s3-orchestrator

core

import "github.com/afreidah/s3-orchestrator/internal/store/core"

Package core holds the engine-agnostic orchestration that both store engines share: the TxAdapter seam, the Runner abstraction, narrow store role interfaces, and operations that span multiple statements within a single transaction.

Constants

Advisory lock IDs assigned to each background service.

  • LockRebalancer (1001): periodic object distribution across backends
  • LockReplicator (1002): background replica creation
  • LockCleanupQueue (1003): failed deletion retry processing
  • LockMultipartCleanup (1004): stale multipart upload removal
  • LockLifecycle (1005): object expiration rule evaluation
  • LockDrain (1006): backend drain and object migration
  • LockUsageFlush (1007): usage counter flush to the database
  • LockOverReplication (1008): excess replica removal
  • LockReconcile (1009): backend-vs-database consistency check
  • LockScrubber (1010): background integrity verification
  • LockPendingReaper (1011): abandoned PUT-intent resolution

const (
    // LockRebalancer is held by the periodic rebalancer.
    LockRebalancer int64 = 1001
    // LockReplicator is held by the background replicator.
    LockReplicator int64 = 1002
    // LockCleanupQueue is held by the cleanup-queue retry worker.
    LockCleanupQueue int64 = 1003
    // LockMultipartCleanup is held by the stale-multipart sweeper.
    LockMultipartCleanup int64 = 1004
    // LockLifecycle is held by the lifecycle expiration worker.
    LockLifecycle int64 = 1005
    // LockDrain is held by an in-progress backend drain.
    LockDrain int64 = 1006
    // LockUsageFlush is held by the usage-counter flush worker.
    LockUsageFlush int64 = 1007
    // LockOverReplication is held by the excess-replica cleaner.
    LockOverReplication int64 = 1008
    // LockReconcile is held by the reconciler.
    LockReconcile int64 = 1009
    // LockScrubber is held by the integrity scrubber.
    LockScrubber int64 = 1010
    // LockPendingReaper is held by the pending-intent reaper.
    LockPendingReaper int64 = 1011
)

Variables

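Sentinel errors returned by the store layer. The plain errors are created with errors.New; the *S3Error values carry the HTTP status code, S3 error code, and message for the corresponding failure.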

var (
    // ErrNoSpaceAvailable is returned when no backend has sufficient quota.
    ErrNoSpaceAvailable = errors.New("no backend has sufficient quota")

    // ErrDBUnavailable is returned by CircuitBreakerStore when the
    // circuit is open. Manager uses errors.Is to trigger broadcast
    // fallback on reads or 503 rejection on writes.
    ErrDBUnavailable = errors.New("database unavailable")

    // ErrObjectNotFound is returned when an object is not in the
    // location table.
    ErrObjectNotFound = &S3Error{
        StatusCode: 404,
        Code:       "NoSuchKey",
        Message:    "object not found",
    }

    // ErrMultipartUploadNotFound is returned when a multipart upload
    // ID is not found.
    ErrMultipartUploadNotFound = &S3Error{
        StatusCode: 404,
        Code:       "NoSuchUpload",
        Message:    "multipart upload not found",
    }

    // ErrServiceUnavailable is returned to S3 clients when writes are
    // rejected during a database outage.
    ErrServiceUnavailable = &S3Error{
        StatusCode: 503,
        Code:       "ServiceUnavailable",
        Message:    "database unavailable, writes are temporarily rejected",
    }

    // ErrInsufficientStorage is returned when no backend has enough
    // quota at the manager-routing layer.
    ErrInsufficientStorage = &S3Error{
        StatusCode: 507,
        Code:       "InsufficientStorage",
        Message:    "no backend has sufficient quota",
    }

    // ErrUsageLimitExceeded is returned when all backends holding an
    // object have exceeded their monthly usage limits.
    ErrUsageLimitExceeded = &S3Error{
        StatusCode: 429,
        Code:       "SlowDown",
        Message:    "monthly usage limit exceeded for all backends holding this object",
    }

    // ErrCleanupItemNotFound is returned by CleanupTxAdapter.GetCleanupQueueRow
    // when the row no longer exists - typically because another worker
    // already moved it to the DLQ or completed it. Callers treat this as
    // a benign no-op rather than an error.
    ErrCleanupItemNotFound = errors.New("cleanup queue row not found")
)
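
Because two kinds of sentinel appear in this block (plain errors and *S3Error values), a caller usually has to handle both when turning an error chain into a response. The sketch below is one way to do that, not the package's own code. It assumes S3Error has an Error method (the type definition is not shown in this section), and statusFor is a hypothetical helper. The 503 mapping follows the write-path behaviour described in the ErrDBUnavailable comment.

```go
package main

import (
	"errors"
	"fmt"
)

// S3Error mirrors the three fields visible in the variable block.
// The Error method is an assumption; the real type is not shown here.
type S3Error struct {
	StatusCode int
	Code       string
	Message    string
}

func (e *S3Error) Error() string { return e.Code + ": " + e.Message }

var (
	ErrDBUnavailable  = errors.New("database unavailable")
	ErrObjectNotFound = &S3Error{StatusCode: 404, Code: "NoSuchKey", Message: "object not found"}
)

// statusFor is a hypothetical helper that maps an error chain to an
// HTTP status. *S3Error values supply their own status; a wrapped
// ErrDBUnavailable is treated as a rejected write (503).
func statusFor(err error) int {
	var s3err *S3Error
	switch {
	case err == nil:
		return 200
	case errors.As(err, &s3err):
		return s3err.StatusCode
	case errors.Is(err, ErrDBUnavailable):
		return 503
	default:
		return 500
	}
}

func main() {
	fmt.Println(statusFor(fmt.Errorf("get object: %w", ErrObjectNotFound))) // 404
	fmt.Println(statusFor(fmt.Errorf("lookup: %w", ErrDBUnavailable)))      // 503
}
```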

func DeleteObjectsBatch

func DeleteObjectsBatch(ctx context.Context, runner Runner, keys []string) (map[string][]DeletedCopy, error)

DeleteObjectsBatch removes every supplied key (and all its replicas) in a single transaction, decrementing each affected backend’s quota once by the sum of removed bytes. Returns a map from key to its displaced copies so the caller can fan out to the backend cleanup path. Keys with no copies on disk are absent from the returned map (treated as success-with-nothing-to-clean-up). Empty input yields an empty map without opening a transaction.
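
The bookkeeping described above (one quota decrement per backend, one map entry per key that actually had copies) can be sketched as a pure function over the locked rows. This is an illustration under assumptions, not the package's implementation: summarize is a hypothetical name, and the real function performs the deletes and quota updates inside a transaction.

```go
package main

import "fmt"

type KeyedExistingCopy struct {
	ObjectKey   string
	BackendName string
	SizeBytes   int64
}

type DeletedCopy struct {
	BackendName string
	SizeBytes   int64
}

// summarize is a hypothetical helper. It groups the locked rows by key
// (the value returned to the caller) and sums the removed bytes per
// backend (the amount each quota is decremented by, once).
func summarize(rows []KeyedExistingCopy) (map[string][]DeletedCopy, map[string]int64) {
	byKey := make(map[string][]DeletedCopy)
	perBackend := make(map[string]int64)
	for _, r := range rows {
		byKey[r.ObjectKey] = append(byKey[r.ObjectKey],
			DeletedCopy{BackendName: r.BackendName, SizeBytes: r.SizeBytes})
		perBackend[r.BackendName] += r.SizeBytes
	}
	return byKey, perBackend
}

func main() {
	rows := []KeyedExistingCopy{
		{ObjectKey: "a.txt", BackendName: "b1", SizeBytes: 10},
		{ObjectKey: "a.txt", BackendName: "b2", SizeBytes: 10},
		{ObjectKey: "c.txt", BackendName: "b1", SizeBytes: 5},
	}
	byKey, perBackend := summarize(rows)
	// A requested key with no rows never appears in byKey.
	fmt.Println(len(byKey["a.txt"]), perBackend["b1"], perBackend["b2"]) // 2 15 10
}
```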

func GroupByKey

func GroupByKey(locations []ObjectLocation) map[string][]ObjectLocation

GroupByKey groups a flat list of object locations into a map keyed by object_key.

func ImportObject

func ImportObject(ctx context.Context, runner Runner, key, backend string, size int64) (bool, error)

ImportObject records a pre-existing object in the database without overwriting. Returns true if the object was newly imported, false if it already existed for this backend. Used by the sync subcommand to bring existing bucket objects under proxy management.

func MoveCleanupToDLQ

func MoveCleanupToDLQ(ctx context.Context, runner Runner, id int64, lastError string) (bool, error)

MoveCleanupToDLQ atomically graduates an exhausted cleanup_queue row (one whose retry budget is spent without ever succeeding at the physical backend delete) to the cleanup_dlq table. The single transaction reads the queue row, inserts a corresponding DLQ row, and deletes the queue row.

orphan_bytes is intentionally left untouched: the backend object is still on disk and still counts against the backend’s quota, so decrementing here would misreport reclaimed capacity. The DLQ table exists so an operator can see the unrecoverable orphan, decide whether to retry it manually or write it off, and reconcile orphan_bytes deliberately as part of that workflow.

Returns true if a row was moved, false if no row existed for id (a benign concurrent-finaliser race).
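
The read, insert, delete sequence can be sketched against an in-memory stand-in for the transaction. Everything here is illustrative: memTx is hypothetical, the context parameter is dropped for brevity, the row type is trimmed, and storing lastError on the DLQ row is an assumption about how that argument is used.

```go
package main

import (
	"errors"
	"fmt"
)

var ErrCleanupItemNotFound = errors.New("cleanup queue row not found")

type CleanupQueueRow struct {
	ID          int64
	BackendName string
	ObjectKey   string
	SizeBytes   int64
	LastError   string
}

// memTx is a hypothetical in-memory stand-in for the transactional adapter.
type memTx struct {
	queue       map[int64]CleanupQueueRow
	dlq         []CleanupQueueRow
	orphanBytes int64 // never modified by the move
}

func (tx *memTx) GetCleanupQueueRow(id int64) (CleanupQueueRow, error) {
	row, ok := tx.queue[id]
	if !ok {
		return CleanupQueueRow{}, ErrCleanupItemNotFound
	}
	return row, nil
}

func (tx *memTx) InsertCleanupDLQ(row *CleanupQueueRow) error {
	tx.dlq = append(tx.dlq, *row)
	return nil
}

func (tx *memTx) DeleteCleanupItem(id int64) error {
	delete(tx.queue, id)
	return nil
}

// moveToDLQ reads the queue row, copies it into the DLQ, and deletes it.
// A missing row is reported as (false, nil) rather than an error.
func moveToDLQ(tx *memTx, id int64, lastError string) (bool, error) {
	row, err := tx.GetCleanupQueueRow(id)
	if errors.Is(err, ErrCleanupItemNotFound) {
		return false, nil
	}
	if err != nil {
		return false, err
	}
	row.LastError = lastError // assumption: the final error is stored on the DLQ row
	if err := tx.InsertCleanupDLQ(&row); err != nil {
		return false, err
	}
	if err := tx.DeleteCleanupItem(id); err != nil {
		return false, err
	}
	return true, nil
}

func main() {
	tx := &memTx{
		queue:       map[int64]CleanupQueueRow{7: {ID: 7, BackendName: "b1", ObjectKey: "k", SizeBytes: 42}},
		orphanBytes: 42,
	}
	moved, _ := moveToDLQ(tx, 7, "backend timeout")
	again, _ := moveToDLQ(tx, 7, "backend timeout")
	fmt.Println(moved, again, len(tx.dlq), len(tx.queue), tx.orphanBytes) // true false 1 0 42
}
```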

func MoveObjectLocation

func MoveObjectLocation(ctx context.Context, runner Runner, key, fromBackend, toBackend string) (int64, error)

MoveObjectLocation atomically moves a copy of an object from one backend to another. Uses row-level locks to prevent races. Returns (0, nil) if the source copy is gone or the target already has a copy.

func PromotePending

func PromotePending(ctx context.Context, runner Runner, p *PendingObject) (PendingPromoteResult, []DeletedCopy, error)

PromotePending resolves a pending intent transactionally. The pending row is locked first so two reaper instances cannot promote the same intent concurrently. The destination is then inspected:

  • If no row for (object_key, backend_name) exists in object_locations, the pending row is promoted: any displaced copies on other backends are cleared, the new row is inserted with the pending’s metadata, quotas are adjusted, and the pending row is deleted in the same tx. The displaced copies are returned so the caller can enqueue cleanup.

  • If any object_locations row for the key was created after this intent was inserted, the intent is provably stale and the pending row is dropped (PendingPromoteSuperseded).

  • If the pending row is already gone (another reaper resolved it between GetStalePending and the lock acquire), the call returns PendingPromoteAlreadyResolved.
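
The three outcomes listed above can be expressed as a small decision function. This is a sketch under stated assumptions rather than the resolver itself: resolve and errNotDescribed are hypothetical names, the order in which the checks run is an assumption, and any situation the list does not mention (for example, an older row already present on the same backend) is reported as an error instead of being guessed.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type PendingPromoteResult int

const (
	PendingPromoteCommitted PendingPromoteResult = iota
	PendingPromoteAmbiguous
	PendingPromoteAlreadyResolved
	PendingPromoteSuperseded
)

type ExistingCopy struct {
	BackendName string
	SizeBytes   int64
	CreatedAt   time.Time
}

// errNotDescribed marks inputs the documented cases do not cover.
var errNotDescribed = errors.New("case not described in the documentation")

// resolve is a hypothetical pure-function view of the decision.
// pendingFound reports whether the pending row could still be locked.
func resolve(pendingFound bool, intentAt time.Time, backend string, copies []ExistingCopy) (PendingPromoteResult, error) {
	if !pendingFound {
		return PendingPromoteAlreadyResolved, nil
	}
	onBackend := false
	for _, c := range copies {
		if c.CreatedAt.After(intentAt) {
			return PendingPromoteSuperseded, nil // a newer write exists
		}
		if c.BackendName == backend {
			onBackend = true
		}
	}
	if !onBackend {
		return PendingPromoteCommitted, nil
	}
	var none PendingPromoteResult
	return none, errNotDescribed
}

// demo runs the three documented cases with fixed timestamps.
func demo() []PendingPromoteResult {
	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	older := []ExistingCopy{{BackendName: "b2", CreatedAt: t0.Add(-time.Hour)}}
	newer := []ExistingCopy{{BackendName: "b2", CreatedAt: t0.Add(time.Hour)}}
	r1, _ := resolve(true, t0, "b1", older)
	r2, _ := resolve(true, t0, "b1", newer)
	r3, _ := resolve(false, t0, "b1", nil)
	return []PendingPromoteResult{r1, r2, r3}
}

func main() {
	fmt.Println(demo()) // [0 3 2]
}
```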

func RecordReplica

func RecordReplica(ctx context.Context, runner Runner, key, targetBackend, sourceBackend string) (int64, bool, error)

RecordReplica inserts a replica copy of an object, but only if the source copy still exists. This prevents stale replicas when an object is overwritten or deleted during the (potentially slow) replication copy. Returns the size that was actually written into object_locations.size_bytes (read from the source row inside InsertReplicaConditional) and inserted=true on success, or (0, false, nil) when the source copy is gone or the target already holds a copy.

IncrementBackendQuota is called with the same size the row was inserted with, so object_locations.size_bytes and backend_quotas.bytes_used always agree - even if the in-memory copy size the caller observed before this call differs (concurrent overwrite mid-replication).
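
The point of returning the inserted size is easiest to see with a toy model. In the sketch below, memTx and locKey are hypothetical in-memory stand-ins for the object_locations and backend_quotas tables, and the context parameter is omitted. The quota is incremented with the size read from the source row, never with a size the caller remembered from earlier.

```go
package main

import "fmt"

type locKey struct{ key, backend string }

// memTx is a hypothetical in-memory stand-in for one transaction.
type memTx struct {
	sizes map[locKey]int64 // object_locations.size_bytes
	used  map[string]int64 // backend_quotas.bytes_used
}

// insertReplicaConditional inserts the target row only when the source
// row exists and the target row does not, returning the source size.
func (tx *memTx) insertReplicaConditional(key, target, source string) (int64, bool) {
	size, ok := tx.sizes[locKey{key, source}]
	if !ok {
		return 0, false
	}
	if _, exists := tx.sizes[locKey{key, target}]; exists {
		return 0, false
	}
	tx.sizes[locKey{key, target}] = size
	return size, true
}

// recordReplica increments the target quota by exactly the inserted size.
func recordReplica(tx *memTx, key, target, source string) (int64, bool) {
	size, inserted := tx.insertReplicaConditional(key, target, source)
	if !inserted {
		return 0, false
	}
	tx.used[target] += size
	return size, true
}

func main() {
	tx := &memTx{
		sizes: map[locKey]int64{locKey{"k", "src"}: 100},
		used:  map[string]int64{"src": 100},
	}
	size, ok := recordReplica(tx, "k", "dst", "src")
	fmt.Println(size, ok, tx.used["dst"]) // 100 true 100
	size, ok = recordReplica(tx, "k", "dst", "src")
	fmt.Println(size, ok, tx.used["dst"]) // 0 false 100
}
```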

func RemoveExcessCopy

func RemoveExcessCopy(ctx context.Context, runner Runner, key, backendName string, size int64) error

RemoveExcessCopy deletes one copy of an object from the given backend inside a transaction, decrementing the backend quota atomically. The caller must have already performed FOR UPDATE locking and copy-count validation.

func SweepStaleCleanupQueueRows

func SweepStaleCleanupQueueRows(ctx context.Context, runner Runner, objectKey, backend string) (int64, error)

SweepStaleCleanupQueueRows removes every cleanup_queue row matching the (objectKey, backend) pair and decrements the backend’s orphan_bytes counter by the sum of their size_bytes. Used by the reconciler when it deletes a stale object_locations row so the queue does not retain orphan entries pointing at a key the backend no longer holds. Returns the number of rows deleted.
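
A minimal in-memory sketch of the sweep follows. memTx and queueRow are hypothetical stand-ins, sumAndDelete only mirrors the shape of CleanupTxAdapter.SumAndDeleteCleanupQueueRows, and the context parameter is omitted.

```go
package main

import "fmt"

type queueRow struct {
	ObjectKey   string
	BackendName string
	SizeBytes   int64
}

// memTx is a hypothetical in-memory stand-in for one transaction.
type memTx struct {
	queue       []queueRow
	orphanBytes map[string]int64
}

// sumAndDelete removes every row for (objectKey, backend) and returns
// how many rows were removed and their combined size.
func (tx *memTx) sumAndDelete(objectKey, backend string) (deleted, totalBytes int64) {
	kept := tx.queue[:0]
	for _, r := range tx.queue {
		if r.ObjectKey == objectKey && r.BackendName == backend {
			deleted++
			totalBytes += r.SizeBytes
			continue
		}
		kept = append(kept, r)
	}
	tx.queue = kept
	return deleted, totalBytes
}

// sweep deletes the matching rows and lowers orphan_bytes by their sum.
func sweep(tx *memTx, objectKey, backend string) int64 {
	deleted, total := tx.sumAndDelete(objectKey, backend)
	if deleted > 0 {
		tx.orphanBytes[backend] -= total
	}
	return deleted
}

func main() {
	tx := &memTx{
		queue: []queueRow{
			{ObjectKey: "k", BackendName: "b1", SizeBytes: 10},
			{ObjectKey: "k", BackendName: "b1", SizeBytes: 20},
			{ObjectKey: "k", BackendName: "b2", SizeBytes: 5},
		},
		orphanBytes: map[string]int64{"b1": 30, "b2": 5},
	}
	fmt.Println(sweep(tx, "k", "b1"), tx.orphanBytes["b1"], len(tx.queue)) // 2 0 1
}
```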

func WithTxVal

func WithTxVal[T any](ctx context.Context, runner Runner, fn func(ctx context.Context, tx TxAdapter) (T, error)) (T, error)

WithTxVal opens a transaction via runner and invokes fn. If fn returns an error, the transaction is rolled back and the zero value of T is returned. Otherwise the transaction commits and fn’s return value is propagated. This is the calling convention core operations use to surface a value out of a transactional body.
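
The calling convention can be sketched as follows. The Runner interface is not shown in this section, so the single WithTx method used here is an assumed shape, TxAdapter is reduced to an empty placeholder, and fakeRunner is a hypothetical test double that only counts commits and rollbacks. The lowercase withTxVal is a re-creation for illustration, not the package function.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// TxAdapter is a placeholder for the real transactional surface.
type TxAdapter interface{}

// Runner is an assumed shape: run fn in a transaction, commit on nil,
// roll back on error.
type Runner interface {
	WithTx(ctx context.Context, fn func(ctx context.Context, tx TxAdapter) error) error
}

// withTxVal captures fn's value in a closure and returns it only when
// the transaction commits; on error the zero value of T is returned.
func withTxVal[T any](ctx context.Context, runner Runner, fn func(ctx context.Context, tx TxAdapter) (T, error)) (T, error) {
	var out T
	err := runner.WithTx(ctx, func(ctx context.Context, tx TxAdapter) error {
		v, err := fn(ctx, tx)
		if err != nil {
			return err
		}
		out = v
		return nil
	})
	if err != nil {
		var zero T
		return zero, err
	}
	return out, nil
}

// fakeRunner is a hypothetical double that records the outcome.
type fakeRunner struct{ commits, rollbacks int }

func (r *fakeRunner) WithTx(ctx context.Context, fn func(ctx context.Context, tx TxAdapter) error) error {
	if err := fn(ctx, struct{}{}); err != nil {
		r.rollbacks++
		return err
	}
	r.commits++
	return nil
}

// demo runs one successful and one failing transactional body.
func demo() (ok, failed, commits, rollbacks int) {
	r := &fakeRunner{}
	ctx := context.Background()
	ok, _ = withTxVal(ctx, r, func(context.Context, TxAdapter) (int, error) { return 42, nil })
	failed, _ = withTxVal(ctx, r, func(context.Context, TxAdapter) (int, error) { return 7, errors.New("boom") })
	return ok, failed, r.commits, r.rollbacks
}

func main() {
	fmt.Println(demo()) // 42 0 1 1
}
```

Note that the failing body returned 7 alongside its error, yet the caller sees 0: the value is discarded together with the rolled-back transaction.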

type AdvisoryLocker

AdvisoryLocker defines the leader-election helper used by background services. SQLite implementations no-op since the engine serializes writers; on Postgres this is pg_try_advisory_lock with the supplied lockID.

type AdvisoryLocker interface {
    WithAdvisoryLock(ctx context.Context, lockID int64, fn func(ctx context.Context) error) (bool, error)
}
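
A background service would typically wrap its tick in WithAdvisoryLock and skip the work when the lock is held elsewhere. The sketch below uses a hypothetical in-memory locker to show that pattern. It assumes the boolean result means "the lock was acquired and fn ran", which this section does not state explicitly, and it models two separate instances contending for the same ID rather than any particular database's session semantics.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

const LockRebalancer int64 = 1001

// memLocker is a hypothetical in-memory AdvisoryLocker.
type memLocker struct {
	mu   sync.Mutex
	held map[int64]bool
}

// WithAdvisoryLock runs fn only if lockID is free, and releases the
// lock when fn returns. It never blocks waiting for the lock.
func (l *memLocker) WithAdvisoryLock(ctx context.Context, lockID int64, fn func(ctx context.Context) error) (bool, error) {
	l.mu.Lock()
	if l.held[lockID] {
		l.mu.Unlock()
		return false, nil
	}
	l.held[lockID] = true
	l.mu.Unlock()

	defer func() {
		l.mu.Lock()
		delete(l.held, lockID)
		l.mu.Unlock()
	}()
	return true, fn(ctx)
}

// demo has a second contender try the same lock while the first holds it.
func demo() (outer, inner bool) {
	l := &memLocker{held: make(map[int64]bool)}
	ctx := context.Background()
	outer, _ = l.WithAdvisoryLock(ctx, LockRebalancer, func(ctx context.Context) error {
		inner, _ = l.WithAdvisoryLock(ctx, LockRebalancer, func(context.Context) error { return nil })
		return nil
	})
	return outer, inner
}

func main() {
	fmt.Println(demo()) // true false
}
```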

type BackendLifecycleStore

BackendLifecycleStore defines backend-level admin operations.

type BackendLifecycleStore interface {
    BackendObjectStats(ctx context.Context, backendName string) (int64, int64, error)
    DeleteBackendData(ctx context.Context, backendName string) error
}

type CleanupItem

CleanupItem represents a pending cleanup operation in the retry queue.

ClaimedAt and ClaimedBy are populated by ClaimPendingCleanups (the worker path) and surfaced through GetPendingCleanups (the admin display path); both are nil when no worker has ever held the row. Reclaimed is set by ClaimPendingCleanups only and is true when this claim recovered a row whose previous claim aged past the grace cutoff - the cleanup worker uses it to drive the s3o_cleanup_queue_stale_claims_recovered_total metric and the cleanup_queue.claim_recovered audit event.

type CleanupItem struct {
    ID          int64
    BackendName string
    ObjectKey   string
    Reason      string
    Attempts    int32
    SizeBytes   int64
    ClaimedAt   *time.Time
    ClaimedBy   *string
    Reclaimed   bool `json:"-"`
}

type CleanupQueueRow

CleanupQueueRow is the full payload of a single cleanup_queue row, returned by GetCleanupQueueRow inside the move-to-DLQ transaction so every column the DLQ insert needs travels with one read.

type CleanupQueueRow struct {
    ID          int64
    BackendName string
    ObjectKey   string
    Reason      string
    SizeBytes   int64
    Attempts    int32
    CreatedAt   time.Time
    LastError   string
}

type CleanupStore

CleanupStore defines cleanup queue and orphan byte tracking operations.

type CleanupStore interface {
    EnqueueCleanup(ctx context.Context, backendName, objectKey, reason string, sizeBytes int64) error

    // GetPendingCleanups returns a read-only snapshot of pending rows for
    // admin / dashboard display. It does not stamp claim columns; the
    // cleanup worker uses ClaimPendingCleanups instead.
    GetPendingCleanups(ctx context.Context, limit int) ([]CleanupItem, error)

    // ClaimPendingCleanups atomically reserves a batch of cleanup rows
    // for the calling instance. Postgres uses FOR UPDATE SKIP LOCKED so
    // concurrent claim transactions across instances return disjoint row
    // sets; SQLite serialises writes intrinsically. A row is eligible
    // when claimed_at IS NULL or older than graceCutoff (a stale claim
    // from a worker that died mid-process). Returned rows have Reclaimed
    // set true when their previous claim was reclaimed by this call.
    ClaimPendingCleanups(ctx context.Context, limit int, instanceID string, graceCutoff time.Time) ([]CleanupItem, error)

    // CompleteCleanupItem atomically deletes a successfully-processed row
    // and decrements the backing backend's orphan_bytes by the row's
    // size_bytes. Idempotent against re-claim retries: if the row was
    // already deleted by a previous worker, orphan_bytes is not
    // double-decremented.
    CompleteCleanupItem(ctx context.Context, id int64) error

    // RetryCleanupItem advances next_retry, records the error, and
    // clears the claim so the row is immediately re-eligible for the
    // next worker tick.
    RetryCleanupItem(ctx context.Context, id int64, backoff time.Duration, lastError string) error
    CleanupQueueDepth(ctx context.Context) (int64, error)
    IncrementOrphanBytes(ctx context.Context, backendName string, amount int64) error
    DecrementOrphanBytes(ctx context.Context, backendName string, amount int64) error
    SweepStaleCleanupQueueRows(ctx context.Context, key, backend string) (int64, error)

    // MoveCleanupToDLQ atomically graduates an exhausted cleanup_queue
    // row to cleanup_dlq. orphan_bytes is intentionally left untouched
    // because the underlying backend object is still on disk; the DLQ
    // entry exists so an operator can investigate, retry, or write off
    // each unrecoverable orphan deliberately. Returns true if a row was
    // moved, false if no row existed (benign concurrent-finaliser race).
    MoveCleanupToDLQ(ctx context.Context, id int64, lastError string) (bool, error)

    // CleanupDLQDepth returns the number of rows currently in
    // cleanup_dlq. Updates the cleanup_dlq_depth gauge so dashboards can
    // surface the count of unrecoverable orphans needing attention.
    CleanupDLQDepth(ctx context.Context) (int64, error)
}
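
The eligibility rule in the ClaimPendingCleanups comment can be written as a small predicate. This is a sketch, not the query the engines run: claimable is a hypothetical name, the five-minute grace window in demo is an arbitrary example value, and treating a claim exactly at the cutoff as still live is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// claimable reports whether a row may be claimed and, if so, whether
// doing so recovers a stale claim (the Reclaimed flag on CleanupItem).
func claimable(claimedAt *time.Time, graceCutoff time.Time) (eligible, reclaimed bool) {
	if claimedAt == nil {
		return true, false // never claimed
	}
	if claimedAt.Before(graceCutoff) {
		return true, true // previous claim aged past the cutoff
	}
	return false, false // a live claim is still in progress
}

// demo evaluates an unclaimed row, a stale claim, and a fresh claim.
func demo() [3][2]bool {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	cutoff := now.Add(-5 * time.Minute)
	stale := now.Add(-10 * time.Minute)
	fresh := now.Add(-1 * time.Minute)

	var out [3][2]bool
	out[0][0], out[0][1] = claimable(nil, cutoff)
	out[1][0], out[1][1] = claimable(&stale, cutoff)
	out[2][0], out[2][1] = claimable(&fresh, cutoff)
	return out
}

func main() {
	fmt.Println(demo()) // [[true false] [true true] [false false]]
}
```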

type CleanupTxAdapter

CleanupTxAdapter exposes the transactional operations on the cleanup_queue table needed by core orchestration. Background-worker helpers that already live entirely on a single transaction (Enqueue, Retry, Complete) stay on the read/write path through CleanupStore.

type CleanupTxAdapter interface {
    // SumAndDeleteCleanupQueueRows deletes every cleanup_queue row for
    // the given (objectKey, backend) pair and returns the count and
    // total size of the deleted rows.
    SumAndDeleteCleanupQueueRows(ctx context.Context, objectKey, backend string) (deleted int64, totalBytes int64, err error)

    // GetCleanupQueueRow returns the full payload of a single
    // cleanup_queue row by id. Used inside MoveCleanupToDLQ so the row
    // contents (key, backend, size, attempts, created_at, last_error)
    // can be copied into the DLQ insert without a separate round trip.
    GetCleanupQueueRow(ctx context.Context, id int64) (CleanupQueueRow, error)

    // InsertCleanupDLQ inserts the supplied row into cleanup_dlq. The
    // original_id retains the queue row's id for forensic correlation;
    // first_enqueued_at carries the original created_at so the DLQ
    // entry remembers how long the cleanup was outstanding. Takes a
    // pointer so the 112-byte row payload travels by reference.
    InsertCleanupDLQ(ctx context.Context, row *CleanupQueueRow) error

    // DeleteCleanupItem removes a cleanup_queue row by id. Used inside
    // MoveCleanupToDLQ so the queue->DLQ move is atomic.
    DeleteCleanupItem(ctx context.Context, id int64) error
}

type CreateMultipartUploadParams

CreateMultipartUploadParams bundles the fields a CreateMultipartUpload row needs at insert time. Pulled into a struct because the optional upload-level encryption fields would otherwise push the call signature past gocritic’s parameter-count limit and force every call site to pass empty values for non-encrypted uploads.

type CreateMultipartUploadParams struct {
    UploadID      string
    ObjectKey     string
    BackendName   string
    ContentType   string
    Metadata      map[string]string
    EncryptionKey []byte // empty for unencrypted uploads
    KeyID         string // empty for unencrypted uploads
}

type DashboardStore

DashboardStore defines the methods used by the web UI dashboard aggregator.

type DashboardStore interface {
    GetQuotaStats(ctx context.Context) (map[string]QuotaStat, error)
    GetObjectCounts(ctx context.Context) (map[string]int64, error)
    GetUnverifiedObjectCounts(ctx context.Context) (map[string]int64, error)
    GetActiveMultipartCounts(ctx context.Context) (map[string]int64, error)
    GetUsageForPeriod(ctx context.Context, period string) (map[string]UsageStat, error)
    ListDirectoryChildren(ctx context.Context, prefix, startAfter string, maxKeys int) (*DirectoryListResult, error)
}

type DecryptableLocation

DecryptableLocation represents an encrypted object location with all metadata needed for decryption.

type DecryptableLocation struct {
    ObjectKey     string
    BackendName   string
    SizeBytes     int64
    EncryptionKey []byte
    KeyID         string
    PlaintextSize int64
}

type DeletedCopy

DeletedCopy describes a copy displaced by an overwrite or delete. The caller enqueues these for physical orphan cleanup.

type DeletedCopy struct {
    BackendName string
    SizeBytes   int64
}

func DeleteObject

func DeleteObject(ctx context.Context, runner Runner, key string) ([]DeletedCopy, error)

DeleteObject removes all copies of an object and decrements their quotas. Returns ErrObjectNotFound if the object doesn’t exist; otherwise returns the deleted copies for cleanup. Quota deltas apply in stable backend_name order (see #687).
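
The ordering guarantee can be illustrated with an in-memory sketch. memTx is hypothetical, ErrObjectNotFound is simplified to a plain error, and the context parameter is omitted. The documentation only cites #687 for the ordering; touching rows in one fixed order is a common way to keep concurrent transactions from locking the same rows in opposite orders, but that motivation is an assumption here.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// Simplified: the real sentinel is an *S3Error.
var ErrObjectNotFound = errors.New("object not found")

type DeletedCopy struct {
	BackendName string
	SizeBytes   int64
}

// memTx is a hypothetical in-memory stand-in for one transaction.
type memTx struct {
	copies  map[string][]DeletedCopy // object key -> copies
	used    map[string]int64         // backend -> bytes used
	applied []string                 // order in which quotas were touched
}

// deleteObject removes every copy of key and decrements each backend's
// quota, visiting backends in ascending name order.
func deleteObject(tx *memTx, key string) ([]DeletedCopy, error) {
	copies := tx.copies[key]
	if len(copies) == 0 {
		return nil, ErrObjectNotFound
	}
	sorted := append([]DeletedCopy(nil), copies...)
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].BackendName < sorted[j].BackendName
	})
	for _, c := range sorted {
		tx.used[c.BackendName] -= c.SizeBytes
		tx.applied = append(tx.applied, c.BackendName)
	}
	delete(tx.copies, key)
	return sorted, nil
}

func main() {
	tx := &memTx{
		copies: map[string][]DeletedCopy{"k": {
			{BackendName: "zeta", SizeBytes: 5},
			{BackendName: "alpha", SizeBytes: 3},
		}},
		used: map[string]int64{"alpha": 3, "zeta": 5},
	}
	_, err := deleteObject(tx, "k")
	fmt.Println(tx.applied, err) // [alpha zeta] <nil>
	_, err = deleteObject(tx, "k")
	fmt.Println(errors.Is(err, ErrObjectNotFound)) // true
}
```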

func RecordObject

func RecordObject(ctx context.Context, runner Runner, key, backend string, size int64, enc *EncryptionMeta) ([]DeletedCopy, error)

RecordObject records an object’s location and updates the backend quota. On overwrite, all existing copies (including replicas) are removed and their quotas decremented before inserting the new primary copy. Returns the displaced copies for cleanup.

func RecordObjectAndClearPending

func RecordObjectAndClearPending(ctx context.Context, runner Runner, key, backend string, size int64, enc *EncryptionMeta, intentID string) ([]DeletedCopy, error)

RecordObjectAndClearPending performs the same atomic commit as RecordObject and additionally deletes the matching pending_objects intent inside the same transaction. The write path uses this on a successful PUT so the intent never outlives a committed location.

type DirEntry

DirEntry holds aggregate stats for one immediate child of a directory prefix.

type DirEntry struct {
    Name      string   `json:"name"`
    IsDir     bool     `json:"isDir"`
    FileCount int64    `json:"fileCount"`
    TotalSize int64    `json:"totalSize"`
    Backends  []string `json:"backends"`
    CreatedAt string   `json:"createdAt"`
}

type DirectoryListResult

DirectoryListResult holds the response for a lazy-loaded directory listing.

type DirectoryListResult struct {
    Entries    []DirEntry `json:"entries"`
    HasMore    bool       `json:"hasMore"`
    NextCursor string     `json:"nextCursor"`
}

type EncryptedLocation

EncryptedLocation represents an encrypted object location for key rotation.

type EncryptedLocation struct {
    ObjectKey     string
    BackendName   string
    EncryptionKey []byte
    KeyID         string
}

type EncryptionAdmin

EncryptionAdmin defines the admin-only encryption key rotation and encrypt/decrypt batch operations used by the admin HTTP handler. These are not on the request hot path, so they bypass the circuit breaker.

type EncryptionAdmin interface {
    ListEncryptedLocations(ctx context.Context, keyID string, limit, offset int) ([]EncryptedLocation, error)
    UpdateEncryptionKey(ctx context.Context, objectKey, backendName string, newEncryptionKey []byte, newKeyID string) error
    ListUnencryptedLocations(ctx context.Context, limit, offset int) ([]UnencryptedLocation, error)
    MarkObjectEncrypted(ctx context.Context, objectKey, backendName string, encryptionKey []byte, keyID string, plaintextSize, ciphertextSize int64) error
    ListAllEncryptedLocations(ctx context.Context, limit, offset int) ([]DecryptableLocation, error)
    MarkObjectDecrypted(ctx context.Context, objectKey, backendName string, plaintextSize int64) error
}

type EncryptionMeta

EncryptionMeta carries envelope-encryption metadata stored alongside an object location. The zero value represents an unencrypted object.

type EncryptionMeta struct {
    Encrypted     bool
    EncryptionKey []byte
    KeyID         string
    PlaintextSize int64
    ContentHash   string
}

type ExistingCopy

ExistingCopy is the projection of an object_locations row that promotion and overwrite logic needs from a SELECT-for-update read.

type ExistingCopy struct {
    BackendName string
    SizeBytes   int64
    CreatedAt   time.Time
}

type ExpiredObjectsLister

ExpiredObjectsLister defines object lifecycle expiration operations.

type ExpiredObjectsLister interface {
    ListExpiredObjects(ctx context.Context, prefix string, cutoff time.Time, limit int) ([]ObjectLocation, error)
}

type IntegrityStore

IntegrityStore defines content hash verification operations.

type IntegrityStore interface {
    GetRandomHashedObjects(ctx context.Context, limit int) ([]ObjectLocation, error)
    GetObjectsWithoutHash(ctx context.Context, limit, offset int) ([]ObjectLocation, error)
    UpdateContentHash(ctx context.Context, key, backendName, hash string) error
}

type KeyedExistingCopy

KeyedExistingCopy carries the backend name and size of an ExistingCopy together with the object_key, so batch operations can group rows by key. Unlike ExistingCopy, it has no CreatedAt field.

type KeyedExistingCopy struct {
    ObjectKey   string
    BackendName string
    SizeBytes   int64
}

type LifecycleAdmin

LifecycleAdmin defines startup, shutdown, and schema-management operations on the concrete store. These methods are not wrapped by CircuitBreakerStore - they run during boot before the breaker is wired up, and Close() releases pool resources directly.

type LifecycleAdmin interface {
    RunMigrations(ctx context.Context) error
    VerifySchemaVersion(ctx context.Context) error
    SyncQuotaLimits(ctx context.Context, backends []config.BackendConfig) error
    Close()
}

type ListObjectsResult

ListObjectsResult holds the result of a list-objects query.

type ListObjectsResult struct {
    Objects               []ObjectLocation
    IsTruncated           bool
    NextContinuationToken string
}

type MetadataStore

MetadataStore is the union of every method the metadata-persistence layer exposes. Both *postgres.Store and *sqlite.Store satisfy it, and every consumer (proxy / transport / cli / worker) takes it as the one dependency declaration. The narrow per-role interfaces are kept only as embedding sources for this composite; consumer code does not reference them directly. Circuit-breaker (CB) protection lives in each driver’s DBTX chokepoint, so this single typed surface carries the breaker semantics transparently.

type MetadataStore interface {
    ObjectStore
    QuotaStore
    MultipartStore
    ReplicationStore
    CleanupStore
    PendingStore
    IntegrityStore
    ExpiredObjectsLister
    BackendLifecycleStore
    UsageFlusher
    AdvisoryLocker
    DashboardStore
    LifecycleAdmin
    EncryptionAdmin
    NotificationOutbox
}

type MultipartPart

MultipartPart describes a single uploaded part of an active upload. PartNumber is int (not int32) to match S3 SDK conventions and the existing engine convention; the sqlc row’s int32 column value is widened by the engine adapter on read. UploadID is omitted because parts are always queried in the context of a specific upload, and adding it would push the struct past gocritic’s hugeParam threshold in callers that range over a slice of parts.

type MultipartPart struct {
    PartNumber    int
    ETag          string
    SizeBytes     int64
    CreatedAt     time.Time
    Encrypted     bool
    EncryptionKey []byte
    KeyID         string
    PlaintextSize int64
}

type MultipartStore

MultipartStore defines multipart upload lifecycle operations.

type MultipartStore interface {
    CreateMultipartUpload(ctx context.Context, params *CreateMultipartUploadParams) error
    GetMultipartUpload(ctx context.Context, uploadID string) (*MultipartUpload, error)
    RecordPart(ctx context.Context, uploadID string, partNumber int, etag string, size int64, enc *EncryptionMeta) error
    GetParts(ctx context.Context, uploadID string) ([]MultipartPart, error)
    DeleteMultipartUpload(ctx context.Context, uploadID string) error
    ListMultipartUploads(ctx context.Context, prefix string, maxUploads int) ([]MultipartUpload, error)
    CountActiveMultipartUploads(ctx context.Context, bucketPrefix string) (int64, error)
    GetStaleMultipartUploads(ctx context.Context, olderThan time.Duration) ([]MultipartUpload, error)
    GetMultipartUploadsByBackend(ctx context.Context, backendName string) ([]MultipartUpload, error)
}

type MultipartUpload

MultipartUpload describes an active multipart upload’s metadata.

EncryptionKey, KeyID, and Encrypted carry the upload-level wrapped DEK shared across every part of an encrypted multipart upload. Encrypted is true when EncryptionKey is non-empty. EncryptionKey uses the same packed format as MultipartPart.EncryptionKey and ObjectLocation.EncryptionKey: encryption.PackKeyData(baseNonce, wrappedDEK).

type MultipartUpload struct {
    UploadID      string
    ObjectKey     string
    BackendName   string
    ContentType   string
    Metadata      map[string]string
    Encrypted     bool
    EncryptionKey []byte
    KeyID         string
    CreatedAt     time.Time
}

type NotificationOutbox

NotificationOutbox defines the durable notification outbox operations the notifier worker uses to deliver webhook events with retry/backoff semantics. Leader election around the drain loop comes from a separate AdvisoryLocker dependency.

type NotificationOutbox interface {
    InsertNotification(ctx context.Context, eventType, payload, endpointURL string) error
    GetPendingNotifications(ctx context.Context, limit int) ([]NotificationRow, error)
    CompleteNotification(ctx context.Context, id int64) error
    RetryNotification(ctx context.Context, id int64, backoff time.Duration, lastError string) error
}

type NotificationRow

NotificationRow represents a pending notification in the outbox table.

type NotificationRow struct {
    ID          int64
    EventType   string
    Payload     []byte
    EndpointURL string
    Attempts    int32
}

type ObjectLocation

ObjectLocation records that a backend currently holds a copy of a key, along with the size and any encryption or integrity metadata.

type ObjectLocation struct {
    ObjectKey     string
    BackendName   string
    SizeBytes     int64
    CreatedAt     time.Time
    Encrypted     bool
    EncryptionKey []byte
    KeyID         string
    PlaintextSize int64
    ContentHash   string
}

type ObjectStore

ObjectStore defines object location CRUD and listing operations.

type ObjectStore interface {
    GetAllObjectLocations(ctx context.Context, key string) ([]ObjectLocation, error)
    GetObjectBackendsForKeys(ctx context.Context, keys []string) (map[string][]string, error)
    RecordObject(ctx context.Context, key, backend string, size int64, enc *EncryptionMeta) ([]DeletedCopy, error)
    RecordObjectAndClearPending(ctx context.Context, key, backend string, size int64, enc *EncryptionMeta, intentID string) ([]DeletedCopy, error)
    DeleteObject(ctx context.Context, key string) ([]DeletedCopy, error)
    DeleteObjectsBatch(ctx context.Context, keys []string) (map[string][]DeletedCopy, error)
    ListObjects(ctx context.Context, prefix, startAfter string, maxKeys int) (*ListObjectsResult, error)
    ListObjectsByBackend(ctx context.Context, backendName string, limit int) ([]ObjectLocation, error)
    ListObjectsByBackendKeyAsc(ctx context.Context, backendName, afterKey string, limit int) ([]ObjectLocation, error)
    MoveObjectLocation(ctx context.Context, key, fromBackend, toBackend string) (int64, error)
    ImportObject(ctx context.Context, key, backend string, size int64) (bool, error)
    DeleteObjectLocation(ctx context.Context, key, backendName string) error
}

type ObjectsTxAdapter

ObjectsTxAdapter exposes the transactional operations on the object_locations table.

type ObjectsTxAdapter interface {
    GetExistingCopiesForUpdate(ctx context.Context, objectKey string) ([]ExistingCopy, error)
    InsertObjectLocation(ctx context.Context, loc *ObjectLocation) error
    DeleteObjectCopies(ctx context.Context, objectKey string) error

    // GetCopiesForKeysForUpdate returns every (key, backend, size) row
    // matching any key in the supplied list, locked FOR UPDATE so the
    // same transaction can delete the rows and decrement quotas
    // atomically. Used by the batch-delete path.
    GetCopiesForKeysForUpdate(ctx context.Context, keys []string) ([]KeyedExistingCopy, error)

    // DeleteObjectsByKeys removes every object_locations row whose key
    // is in the supplied list. Caller must have already locked the
    // rows via GetCopiesForKeysForUpdate.
    DeleteObjectsByKeys(ctx context.Context, keys []string) error

    // CheckObjectExistsOnBackend reports whether the object_locations
    // table holds a row for (key, backend).
    CheckObjectExistsOnBackend(ctx context.Context, objectKey, backend string) (bool, error)

    // LockObjectOnBackend takes a FOR UPDATE lock on the
    // (key, backend) row and returns its full payload. Returns
    // (nil, false, nil) when the row is gone - benign race the caller
    // treats as nothing to act on.
    LockObjectOnBackend(ctx context.Context, objectKey, backend string) (loc *ObjectLocation, ok bool, err error)

    // DeleteObjectFromBackend removes the single (key, backend)
    // object_locations row.
    DeleteObjectFromBackend(ctx context.Context, objectKey, backend string) error

    // InsertObjectLocationIfNotExists is the import-side INSERT that
    // preserves an existing row. Returns true if a row was inserted.
    InsertObjectLocationIfNotExists(ctx context.Context, loc *ObjectLocation) (inserted bool, err error)

    // InsertReplicaConditional inserts a replica row only if the
    // source copy still exists. Returns the size_bytes the row was
    // inserted with (read from the source row inside the same
    // statement) and inserted=true on success, or (0, false, nil)
    // when the source copy is gone or the target already has a copy.
    // Callers use the returned size for IncrementBackendQuota so
    // object_locations.size_bytes and backend_quotas.bytes_used always
    // agree.
    InsertReplicaConditional(ctx context.Context, objectKey, targetBackend, sourceBackend string) (size int64, inserted bool, err error)
}

type PendingObject

PendingObject is an in-flight PUT intent recorded before the backend upload. The reaper resolves intents that survive a failed metadata commit so a DB outage between PUT and RecordObject cannot silently destroy the prior copy of an overwritten key.

type PendingObject struct {
    IntentID      string
    ObjectKey     string
    BackendName   string
    SizeBytes     int64
    Encrypted     bool
    EncryptionKey []byte
    KeyID         string
    PlaintextSize int64
    ContentHash   string
    CreatedAt     time.Time
}

type PendingPromoteResult

PendingPromoteResult describes how PromotePending resolved an intent.

type PendingPromoteResult int

The PendingPromoteResult values that PromotePending can return.

const (
    // PendingPromoteCommitted means the pending row was promoted into
    // object_locations and removed in the same transaction.
    PendingPromoteCommitted PendingPromoteResult = iota

    // PendingPromoteAmbiguous is reserved for pathological cases the
    // resolver cannot decide between promotion and dropping. The current
    // resolver does not produce this result; the timestamp comparison
    // covers every previously-ambiguous case as Superseded instead. Kept
    // so the metric label and constant stay stable across releases.
    PendingPromoteAmbiguous

    // PendingPromoteAlreadyResolved means the pending row was gone by
    // the time the transaction acquired its lock - another reaper
    // instance already resolved it. Benign no-op.
    PendingPromoteAlreadyResolved

    // PendingPromoteSuperseded means a successful write for the same
    // key committed after this intent was inserted. The intent is
    // provably stale, so the resolver deletes the pending row.
    PendingPromoteSuperseded
)
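
The constants rely on iota ordering, and the Ambiguous comment mentions a metric label. A small sketch of a result-to-label mapping follows; the type and constants are re-declared locally so the example compiles on its own, and the label strings and the resultLabel helper are assumptions, not values taken from this package.

```go
package main

import "fmt"

// PendingPromoteResult mirrors the documented type for illustration.
type PendingPromoteResult int

const (
	PendingPromoteCommitted PendingPromoteResult = iota
	PendingPromoteAmbiguous
	PendingPromoteAlreadyResolved
	PendingPromoteSuperseded
)

// resultLabel maps a result to a metric label. The label strings are
// hypothetical; the package's real label values are not shown here.
func resultLabel(r PendingPromoteResult) string {
	switch r {
	case PendingPromoteCommitted:
		return "committed"
	case PendingPromoteAmbiguous:
		return "ambiguous"
	case PendingPromoteAlreadyResolved:
		return "already_resolved"
	case PendingPromoteSuperseded:
		return "superseded"
	default:
		return "unknown"
	}
}

func main() {
	for r := PendingPromoteCommitted; r <= PendingPromoteSuperseded; r++ {
		fmt.Printf("%d %s\n", r, resultLabel(r))
	}
}
```

Keeping PendingPromoteAmbiguous in the second position preserves the numeric values of the constants that follow it, which matters if the integer value is ever persisted or exported.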

type PendingStore

PendingStore defines in-flight PutObject intent tracking. The write path inserts an intent before the backend PUT and removes it on a successful commit; the pending reaper resolves intents left behind by a failed commit so a DB outage between PUT and RecordObject cannot silently destroy the prior copy of an overwritten key.

type PendingStore interface {
    InsertPending(ctx context.Context, p *PendingObject) error
    DeletePending(ctx context.Context, intentID string) error
    GetStalePending(ctx context.Context, olderThan time.Time, limit int) ([]PendingObject, error)
    PromotePending(ctx context.Context, p *PendingObject) (PendingPromoteResult, []DeletedCopy, error)
    PendingDepth(ctx context.Context) (int64, error)
    DeletePendingByBackend(ctx context.Context, backendName string) error
}

type PendingTxAdapter

PendingTxAdapter exposes the transactional operations on the pending_objects table.

type PendingTxAdapter interface {
    // ClaimPending returns true if the pending row was successfully
    // claimed for promotion, false if it has already been resolved by
    // another worker. Postgres uses SELECT FOR UPDATE; SQLite uses an
    // existence probe inside the writer-serialized txn - same
    // guarantee.
    ClaimPending(ctx context.Context, intentID string) (claimed bool, err error)

    InsertPending(ctx context.Context, p *PendingObject) error
    DeletePending(ctx context.Context, intentID string) error
    DeletePendingByBackend(ctx context.Context, backendName string) error
}

type QuotaStat

QuotaStat holds quota statistics for a single backend.

type QuotaStat struct {
    BackendName string
    BytesUsed   int64
    BytesLimit  int64
    OrphanBytes int64
    UpdatedAt   time.Time
}

type QuotaStore

QuotaStore defines quota routing queries.

type QuotaStore interface {
    GetBackendWithSpace(ctx context.Context, size int64, backendOrder []string) (string, error)
    GetLeastUtilizedBackend(ctx context.Context, size int64, eligible []string) (string, error)
    GetQuotaStats(ctx context.Context) (map[string]QuotaStat, error)
}

type QuotaTxAdapter

QuotaTxAdapter exposes the transactional operations on the backend_quotas table.

type QuotaTxAdapter interface {
    // IncrementBackendQuota credits delta bytes to the backend's
    // quota. It returns ErrNoSpaceAvailable when the update affects
    // zero rows, meaning the increment would exceed the quota.
    IncrementBackendQuota(ctx context.Context, backendName string, delta int64) error

    // DecrementBackendQuota debits delta bytes from the backend's
    // quota.
    DecrementBackendQuota(ctx context.Context, backendName string, delta int64) error

    // DecrementOrphanBytes debits delta bytes from the backend's
    // orphan_bytes counter, clamped at zero.
    DecrementOrphanBytes(ctx context.Context, backendName string, delta int64) error
}

type Reader

Reader exposes the engine’s read-only operations - those that do not require a transaction or row-level lock. Engine packages return one alongside the Runner so background services can issue queries without paying the cost of a transaction.

type Reader interface {
    // Pending reads
    GetStalePending(ctx context.Context, olderThan time.Time, limit int) ([]PendingObject, error)
    CountPending(ctx context.Context) (int64, error)

    // Object listing reads
    GetAllObjectLocations(ctx context.Context, key string) ([]ObjectLocation, error)
    ListObjectsByBackend(ctx context.Context, backendName string, limit int) ([]ObjectLocation, error)
    ListObjectsByBackendKeyAsc(ctx context.Context, backendName, afterKey string, limit int) ([]ObjectLocation, error)
    ListObjectsByPrefix(ctx context.Context, prefix, startAfter string, maxKeys int) ([]ObjectLocation, error)
    ListExpiredObjects(ctx context.Context, prefix string, cutoff time.Time, limit int) ([]ObjectLocation, error)

    // Cleanup queue reads
    GetPendingCleanups(ctx context.Context, limit int) ([]CleanupItem, error)
    CleanupQueueDepth(ctx context.Context) (int64, error)

    // CleanupDLQDepth returns the number of rows in cleanup_dlq. Used
    // by the cleanup_dlq_depth gauge so dashboards surface the count
    // of unrecoverable orphans needing operator attention.
    CleanupDLQDepth(ctx context.Context) (int64, error)
}

type ReplicationStore

ReplicationStore defines replication management operations.

type ReplicationStore interface {
    GetUnderReplicatedObjects(ctx context.Context, factor, limit int) ([]ObjectLocation, error)
    GetUnderReplicatedObjectsExcluding(ctx context.Context, factor, limit int, excludedBackends []string) ([]ObjectLocation, error)
    RecordReplica(ctx context.Context, key, targetBackend, sourceBackend string) (size int64, inserted bool, err error)
    GetOverReplicatedObjects(ctx context.Context, factor, limit int) ([]ObjectLocation, error)
    CountOverReplicatedObjects(ctx context.Context, factor int) (int64, error)
    RemoveExcessCopy(ctx context.Context, key, backendName string, size int64) error
}

type Runner

Runner opens a transaction and invokes fn with a TxAdapter scoped to it. The transaction commits if fn returns a nil error and rolls back otherwise.

type Runner interface {
    // WithTx is method-form here for Go interface satisfaction; the
    // generic helper below is the calling convention used by core
    // operations. A type that implements WithTx with the unwrapped
    // signature satisfies this interface; the WithTxVal helper wraps
    // it.
    WithTx(ctx context.Context, fn func(ctx context.Context, tx TxAdapter) error) error
}

type S3Error

S3Error is a structured error that carries an HTTP status code and S3 error code, allowing the server layer to translate storage errors into S3 XML responses without per-handler error mapping.

type S3Error struct {
    StatusCode int
    Code       string
    Message    string
}

func (*S3Error) Error

func (e *S3Error) Error() string

Error returns the human-readable error message.

type TxAdapter

TxAdapter is the per-engine transactional seam. A core operation receives one of these from Runner.WithTx, runs business logic against it, and never touches a driver-specific transaction directly. The parent embeds the per-feature adapters so callers depend only on the narrowest interface that fits their needs.

type TxAdapter interface {
    PendingTxAdapter
    ObjectsTxAdapter
    CleanupTxAdapter
    QuotaTxAdapter

    // AcquireKeyLock takes a transaction-scoped lock keyed by the
    // object key. Postgres uses pg_advisory_xact_lock derived from a
    // hash of the key; SQLite no-ops because the engine serializes
    // writers and the in-tx existence probe provides the same
    // guarantee.
    AcquireKeyLock(ctx context.Context, objectKey string) error
}
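
AcquireKeyLock is documented as deriving a Postgres advisory lock ID from a hash of the object key, but the hash function is not named. The sketch below uses 64-bit FNV-1a purely as an assumption to show the shape of such a derivation; keyLockID is a hypothetical helper.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// keyLockID maps an object key to a signed 64-bit lock ID, the argument
// type pg_advisory_xact_lock(bigint) expects. FNV-1a is an assumption;
// the engine's actual hash is not shown in this documentation.
func keyLockID(objectKey string) int64 {
	h := fnv.New64a()
	h.Write([]byte(objectKey)) // Write on a hash.Hash never returns an error
	return int64(h.Sum64())    // reinterpret the unsigned sum as signed
}

func main() {
	// A Postgres adapter could then run, inside the transaction:
	//   SELECT pg_advisory_xact_lock($1)
	// with the ID computed below as the parameter.
	fmt.Println(keyLockID("bucket/a.txt") == keyLockID("bucket/a.txt"))
	fmt.Println(keyLockID("a") != keyLockID("b"))
}
```

Two different keys can hash to the same ID. With a transaction-scoped lock that only causes extra serialization between unrelated keys, not incorrect results.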

type UnencryptedLocation

UnencryptedLocation represents an unencrypted object location.

type UnencryptedLocation struct {
    ObjectKey   string
    BackendName string
    SizeBytes   int64
}

type UsageFlusher

UsageFlusher defines the store method used by UsageTracker.FlushUsage.

type UsageFlusher interface {
    FlushUsageDeltas(ctx context.Context, backendName, period string, apiRequests, egressBytes, ingressBytes int64) error
}

type UsageLimits

UsageLimits holds configurable monthly usage limits for a single backend. Zero in a field means unlimited for that dimension.

type UsageLimits struct {
    APIRequestLimit  int64
    EgressByteLimit  int64
    IngressByteLimit int64
}
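
A short sketch of the "zero means unlimited" rule. UsageLimits and UsageStat are re-declared locally; the withinLimits helper and the choice to treat usage equal to the limit as still within it are assumptions.

```go
package main

import "fmt"

// UsageLimits and UsageStat mirror the documented structs.
type UsageLimits struct {
	APIRequestLimit  int64
	EgressByteLimit  int64
	IngressByteLimit int64
}

type UsageStat struct {
	APIRequests  int64
	EgressBytes  int64
	IngressBytes int64
}

// under reports whether used fits within limit, treating zero as unlimited.
func under(used, limit int64) bool {
	return limit == 0 || used <= limit
}

// withinLimits checks every dimension independently.
func withinLimits(s UsageStat, l UsageLimits) bool {
	return under(s.APIRequests, l.APIRequestLimit) &&
		under(s.EgressBytes, l.EgressByteLimit) &&
		under(s.IngressBytes, l.IngressByteLimit)
}

func main() {
	limits := UsageLimits{APIRequestLimit: 1000} // egress and ingress unlimited

	fmt.Println(withinLimits(UsageStat{APIRequests: 999, EgressBytes: 1 << 40}, limits))
	fmt.Println(withinLimits(UsageStat{APIRequests: 1001}, limits))
}
```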

type UsageStat

UsageStat holds usage statistics for a single backend in a given period.

type UsageStat struct {
    APIRequests  int64
    EgressBytes  int64
    IngressBytes int64
}
