Package core holds the engine-agnostic orchestration that both store engines share: the TxAdapter seam, the Runner abstraction, narrow store role interfaces, and operations that span multiple statements within a single transaction.
const (
	// LockRebalancer is held by the periodic rebalancer.
	LockRebalancer int64 = 1001
	// LockReplicator is held by the background replicator.
	LockReplicator int64 = 1002
	// LockCleanupQueue is held by the cleanup-queue retry worker.
	LockCleanupQueue int64 = 1003
	// LockMultipartCleanup is held by the stale-multipart sweeper.
	LockMultipartCleanup int64 = 1004
	// LockLifecycle is held by the lifecycle expiration worker.
	LockLifecycle int64 = 1005
	// LockDrain is held by an in-progress backend drain.
	LockDrain int64 = 1006
	// LockUsageFlush is held by the usage-counter flush worker.
	LockUsageFlush int64 = 1007
	// LockOverReplication is held by the excess-replica cleaner.
	LockOverReplication int64 = 1008
	// LockReconcile is held by the reconciler.
	LockReconcile int64 = 1009
	// LockScrubber is held by the integrity scrubber.
	LockScrubber int64 = 1010
	// LockPendingReaper is held by the pending-intent reaper.
	LockPendingReaper int64 = 1011
)
Variables
ErrNoSpaceAvailable and related package-level variables used by this package.
var (
	// ErrNoSpaceAvailable is returned when no backend has sufficient quota.
	ErrNoSpaceAvailable = errors.New("no backend has sufficient quota")

	// ErrDBUnavailable is returned by CircuitBreakerStore when the
	// circuit is open. Manager uses errors.Is to trigger broadcast
	// fallback on reads or 503 rejection on writes.
	ErrDBUnavailable = errors.New("database unavailable")

	// ErrObjectNotFound is returned when an object is not in the
	// location table.
	ErrObjectNotFound = &S3Error{
		StatusCode: 404,
		Code:       "NoSuchKey",
		Message:    "object not found",
	}

	// ErrMultipartUploadNotFound is returned when a multipart upload
	// ID is not found.
	ErrMultipartUploadNotFound = &S3Error{
		StatusCode: 404,
		Code:       "NoSuchUpload",
		Message:    "multipart upload not found",
	}

	// ErrServiceUnavailable is returned to S3 clients when writes are
	// rejected during a database outage.
	ErrServiceUnavailable = &S3Error{
		StatusCode: 503,
		Code:       "ServiceUnavailable",
		Message:    "database unavailable, writes are temporarily rejected",
	}

	// ErrInsufficientStorage is returned when no backend has enough
	// quota at the manager-routing layer.
	ErrInsufficientStorage = &S3Error{
		StatusCode: 507,
		Code:       "InsufficientStorage",
		Message:    "no backend has sufficient quota",
	}

	// ErrUsageLimitExceeded is returned when all backends holding an
	// object have exceeded their monthly usage limits.
	ErrUsageLimitExceeded = &S3Error{
		StatusCode: 429,
		Code:       "SlowDown",
		Message:    "monthly usage limit exceeded for all backends holding this object",
	}

	// ErrCleanupItemNotFound is returned by CleanupTxAdapter.GetCleanupQueueRow
	// when the row no longer exists - typically because another worker
	// already moved it to the DLQ or completed it. Callers treat this as
	// a benign no-op rather than an error.
	ErrCleanupItemNotFound = errors.New("cleanup queue row not found")
)
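As a rough illustration of how a caller might consume these sentinels, the sketch below matches a wrapped error with errors.Is and extracts the HTTP status with errors.As. The S3Error fields mirror the ones shown above; the Error method and the statusFor helper are assumptions made for this example, not part of the documented package.

```go
package main

import (
	"errors"
	"fmt"
)

// S3Error mirrors the three fields visible in the variable block.
// The Error method is an assumption; the source does not show it.
type S3Error struct {
	StatusCode int
	Code       string
	Message    string
}

func (e *S3Error) Error() string { return e.Code + ": " + e.Message }

// ErrObjectNotFound is declared the same way as in the package.
var ErrObjectNotFound = &S3Error{StatusCode: 404, Code: "NoSuchKey", Message: "object not found"}

// statusFor is a hypothetical helper: it looks for an *S3Error anywhere
// in the error chain and falls back to 500 when none is present.
func statusFor(err error) int {
	var s3err *S3Error
	if errors.As(err, &s3err) {
		return s3err.StatusCode
	}
	return 500
}

func main() {
	wrapped := fmt.Errorf("get object: %w", ErrObjectNotFound)
	// Pointer sentinels compare by identity, so errors.Is still matches
	// after wrapping with %w.
	fmt.Println(errors.Is(wrapped, ErrObjectNotFound), statusFor(wrapped))
}
```

Because the S3 sentinels are pointers, a freshly constructed S3Error with identical fields would not match errors.Is; only the package-level variable does.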
DeleteObjectsBatch removes every supplied key (and all its replicas) in a single transaction, decrementing each affected backend’s quota once by the sum of removed bytes. Returns a map from key to its displaced copies so the caller can fan out to the backend cleanup path. Keys with no copies on disk are absent from the returned map (treated as success-with-nothing-to-clean-up). Empty input yields an empty map without opening a transaction.
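The bookkeeping described above can be sketched as a pure function: group the locked rows by key for the caller, and sum bytes per backend so each quota is decremented once. The keyedCopy type and planBatchDelete name are hypothetical stand-ins chosen for this example; the real operation performs these steps against the database inside one transaction.

```go
package main

import "fmt"

// keyedCopy is a hypothetical stand-in for one locked object_locations row.
type keyedCopy struct {
	Key       string
	Backend   string
	SizeBytes int64
}

// planBatchDelete groups rows by key (the map handed back to the caller)
// and totals the bytes to subtract from each backend's quota.
// Keys with no rows simply never appear in byKey.
func planBatchDelete(rows []keyedCopy) (byKey map[string][]keyedCopy, perBackend map[string]int64) {
	byKey = make(map[string][]keyedCopy)
	perBackend = make(map[string]int64)
	for _, r := range rows {
		byKey[r.Key] = append(byKey[r.Key], r)
		perBackend[r.Backend] += r.SizeBytes
	}
	return byKey, perBackend
}

func main() {
	rows := []keyedCopy{
		{Key: "a", Backend: "b1", SizeBytes: 10},
		{Key: "a", Backend: "b2", SizeBytes: 10},
		{Key: "c", Backend: "b1", SizeBytes: 20},
	}
	byKey, perBackend := planBatchDelete(rows)
	fmt.Println(len(byKey["a"]), perBackend["b1"], perBackend["b2"]) // 2 30 10
}
```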
ImportObject records a pre-existing object in the database without overwriting. Returns true if the object was newly imported, false if it already existed for this backend. Used by the sync subcommand to bring existing bucket objects under proxy management.
MoveCleanupToDLQ atomically graduates an exhausted cleanup_queue row (one whose retry budget is spent without ever succeeding at the physical backend delete) to the cleanup_dlq table. The single transaction reads the queue row, inserts a corresponding DLQ row, and deletes the queue row.
orphan_bytes is intentionally left untouched: the backend object is still on disk, so the bytes really are still occupying the backend’s quota - decrementing here would lie about reclaimed capacity. The DLQ table exists so an operator can see the unrecoverable orphan, decide whether to retry it manually or write it off, and reconcile orphan_bytes deliberately as part of that workflow.
Returns true if a row was moved, false if no row existed for id (a benign concurrent-finaliser race).
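The read, insert, delete sequence can be sketched against an in-memory stand-in for the transaction. Everything here except ErrCleanupItemNotFound is hypothetical: queueRow is a trimmed-down guess at the row payload, memTx replaces the real adapter, and recording lastError on the DLQ row is an assumption based on the parameter name.

```go
package main

import (
	"errors"
	"fmt"
)

var ErrCleanupItemNotFound = errors.New("cleanup queue row not found")

// queueRow is a hypothetical, trimmed-down cleanup_queue row.
type queueRow struct {
	ID        int64
	ObjectKey string
	Backend   string
	SizeBytes int64
	LastError string
}

// memTx is an in-memory stand-in for the transactional adapter.
type memTx struct {
	queue       map[int64]queueRow
	dlq         []queueRow
	orphanBytes map[string]int64
}

func (t *memTx) getRow(id int64) (queueRow, error) {
	r, ok := t.queue[id]
	if !ok {
		return queueRow{}, ErrCleanupItemNotFound
	}
	return r, nil
}

// moveToDLQ reads the queue row, appends it to the DLQ, and deletes the
// queue row. orphanBytes is deliberately not touched.
func moveToDLQ(tx *memTx, id int64, lastError string) (bool, error) {
	row, err := tx.getRow(id)
	if errors.Is(err, ErrCleanupItemNotFound) {
		return false, nil // benign race: someone else already finalised it
	}
	if err != nil {
		return false, err
	}
	row.LastError = lastError
	tx.dlq = append(tx.dlq, row)
	delete(tx.queue, id)
	return true, nil
}

func main() {
	tx := &memTx{
		queue:       map[int64]queueRow{7: {ID: 7, ObjectKey: "k", Backend: "b1", SizeBytes: 100}},
		orphanBytes: map[string]int64{"b1": 100},
	}
	moved, err := moveToDLQ(tx, 7, "backend timeout")
	fmt.Println(moved, err, len(tx.dlq), tx.orphanBytes["b1"]) // true <nil> 1 100
	moved, err = moveToDLQ(tx, 7, "backend timeout")
	fmt.Println(moved, err) // false <nil>
}
```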
MoveObjectLocation atomically moves a copy of an object from one backend to another. Uses row-level locks to prevent races. Returns (0, nil) if the source copy is gone or the target already has a copy.
PromotePending resolves a pending intent transactionally. The pending row is locked first so two reaper instances cannot promote the same intent concurrently. The destination is then inspected:
  - If no row for (object_key, backend_name) exists in object_locations, the pending row is promoted: any displaced copies on other backends are cleared, the new row is inserted with the pending’s metadata, quotas are adjusted, and the pending row is deleted in the same tx. The displaced copies are returned so the caller can enqueue cleanup.
  - If any object_locations row for the key was created after this intent was inserted, the intent is provably stale and the pending row is dropped (Superseded).
  - If the pending row is already gone (another reaper resolved it between GetStalePending and the lock acquire), the call returns PendingPromoteAlreadyResolved.
RecordReplica inserts a replica copy of an object, but only if the source copy still exists. This prevents stale replicas when an object is overwritten or deleted during the (potentially slow) replication copy. Returns the size that was actually written into object_locations.size_bytes (read from the source row inside InsertReplicaConditional) and inserted=true on success, or (0, false, nil) when the source copy is gone or the target already holds a copy.
IncrementBackendQuota is called with the same size the row was inserted with, so object_locations.size_bytes and backend_quotas.bytes_used always agree - even if the in-memory copy size the caller observed before this call differs (concurrent overwrite mid-replication).
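The invariant described above (the quota is incremented by the size the insert actually used, not by a size the caller observed earlier) can be sketched with an in-memory model. The store type, its maps, and the lower-case function names are hypothetical; in the real package the conditional insert is a single SQL statement.

```go
package main

import "fmt"

type locKey struct{ Key, Backend string }

// store is a hypothetical in-memory model of object_locations
// (size_bytes per key and backend) and backend_quotas (bytes_used).
type store struct {
	locations map[locKey]int64
	used      map[string]int64
}

// insertReplicaConditional copies the source row's size to the target,
// but only if the source still exists and the target has no copy yet.
func (s *store) insertReplicaConditional(key, target, source string) (int64, bool) {
	size, ok := s.locations[locKey{key, source}]
	if !ok {
		return 0, false // source overwritten or deleted mid-replication
	}
	if _, exists := s.locations[locKey{key, target}]; exists {
		return 0, false // target already holds a copy
	}
	s.locations[locKey{key, target}] = size
	return size, true
}

// recordReplica increments the quota by the size returned from the
// insert, so the two tables cannot drift apart.
func (s *store) recordReplica(key, target, source string) (int64, bool) {
	size, inserted := s.insertReplicaConditional(key, target, source)
	if !inserted {
		return 0, false
	}
	s.used[target] += size
	return size, true
}

func main() {
	s := &store{
		locations: map[locKey]int64{{"k", "src"}: 512},
		used:      map[string]int64{"src": 512},
	}
	fmt.Println(s.recordReplica("k", "dst", "src")) // 512 true
	fmt.Println(s.recordReplica("k", "dst", "src")) // 0 false
	fmt.Println(s.used["dst"])                      // 512
}
```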
RemoveExcessCopy deletes one copy of an object from the given backend inside a transaction, decrementing the backend quota atomically. The caller must have already performed FOR UPDATE locking and copy-count validation.
SweepStaleCleanupQueueRows removes every cleanup_queue row matching the (objectKey, backend) pair and decrements the backend’s orphan_bytes counter by the sum of their size_bytes. Used by the reconciler when it deletes a stale object_locations row so the queue does not retain orphan entries pointing at a key the backend no longer holds. Returns the number of rows deleted.
WithTxVal opens a transaction via runner and invokes fn. If fn returns an error, the transaction is rolled back and the zero value of T is returned. Otherwise the transaction commits and fn’s return value is propagated. This is the calling convention core operations use to surface a value out of a transactional body.
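One plausible shape for such a helper is a generic function that captures fn's result in a closure variable. The parameter order of WithTxVal below is an assumption, TxAdapter is reduced to an empty placeholder, and fakeRunner exists only to count commits and rollbacks for the demonstration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// TxAdapter is an empty placeholder for the real transactional seam.
type TxAdapter interface{}

type Runner interface {
	WithTx(ctx context.Context, fn func(ctx context.Context, tx TxAdapter) error) error
}

// WithTxVal runs fn inside r.WithTx and surfaces fn's value on commit,
// or the zero value of T when the transaction fails.
func WithTxVal[T any](ctx context.Context, r Runner, fn func(ctx context.Context, tx TxAdapter) (T, error)) (T, error) {
	var out T
	err := r.WithTx(ctx, func(ctx context.Context, tx TxAdapter) error {
		v, err := fn(ctx, tx)
		if err != nil {
			return err
		}
		out = v
		return nil
	})
	if err != nil {
		var zero T
		return zero, err
	}
	return out, nil
}

// fakeRunner is a hypothetical Runner that only counts outcomes.
type fakeRunner struct{ commits, rollbacks int }

func (f *fakeRunner) WithTx(ctx context.Context, fn func(ctx context.Context, tx TxAdapter) error) error {
	if err := fn(ctx, nil); err != nil {
		f.rollbacks++
		return err
	}
	f.commits++
	return nil
}

// demo runs one successful and one failing body against a fakeRunner.
func demo() (okVal, failVal int, r *fakeRunner) {
	r = &fakeRunner{}
	ctx := context.Background()
	okVal, _ = WithTxVal(ctx, r, func(context.Context, TxAdapter) (int, error) { return 42, nil })
	failVal, _ = WithTxVal(ctx, r, func(context.Context, TxAdapter) (int, error) {
		return 7, errors.New("boom")
	})
	return okVal, failVal, r
}

func main() {
	okVal, failVal, r := demo()
	fmt.Println(okVal, failVal, r.commits, r.rollbacks) // 42 0 1 1
}
```

Note that the failing body's 7 never reaches the caller: the helper discards any value that accompanies an error.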
AdvisoryLocker defines the leader-election helper used by background services. SQLite implementations no-op since the engine serializes writers; on Postgres this is pg_try_advisory_lock with the supplied lockID.
CleanupItem represents a pending cleanup operation in the retry queue.
ClaimedAt and ClaimedBy are populated by ClaimPendingCleanups (the worker path) and surfaced through GetPendingCleanups (the admin display path); both are nil when no worker has ever held the row. Reclaimed is set by ClaimPendingCleanups only and is true when this claim recovered a row whose previous claim aged past the grace cutoff - the cleanup worker uses it to drive the s3o_cleanup_queue_stale_claims_recovered_total metric and the cleanup_queue.claim_recovered audit event.
CleanupQueueRow is the full payload of a single cleanup_queue row, returned by GetCleanupQueueRow inside the move-to-DLQ transaction so every column the DLQ insert needs travels with one read.
CleanupStore defines cleanup queue and orphan byte tracking operations.
type CleanupStore interface {
	EnqueueCleanup(ctx context.Context, backendName, objectKey, reason string, sizeBytes int64) error

	// GetPendingCleanups returns a read-only snapshot of pending rows for
	// admin / dashboard display. It does not stamp claim columns; the
	// cleanup worker uses ClaimPendingCleanups instead.
	GetPendingCleanups(ctx context.Context, limit int) ([]CleanupItem, error)

	// ClaimPendingCleanups atomically reserves a batch of cleanup rows
	// for the calling instance. Postgres uses FOR UPDATE SKIP LOCKED so
	// concurrent claim transactions across instances return disjoint row
	// sets; SQLite serialises writes intrinsically. A row is eligible
	// when claimed_at IS NULL or older than graceCutoff (a stale claim
	// from a worker that died mid-process). Returned rows have Reclaimed
	// set true when their previous claim was reclaimed by this call.
	ClaimPendingCleanups(ctx context.Context, limit int, instanceID string, graceCutoff time.Time) ([]CleanupItem, error)

	// CompleteCleanupItem atomically deletes a successfully-processed row
	// and decrements the backing backend's orphan_bytes by the row's
	// size_bytes. Idempotent against re-claim retries: if the row was
	// already deleted by a previous worker, orphan_bytes is not
	// double-decremented.
	CompleteCleanupItem(ctx context.Context, id int64) error

	// RetryCleanupItem advances next_retry, records the error, and
	// clears the claim so the row is immediately re-eligible for the
	// next worker tick.
	RetryCleanupItem(ctx context.Context, id int64, backoff time.Duration, lastError string) error

	CleanupQueueDepth(ctx context.Context) (int64, error)

	IncrementOrphanBytes(ctx context.Context, backendName string, amount int64) error
	DecrementOrphanBytes(ctx context.Context, backendName string, amount int64) error
	SweepStaleCleanupQueueRows(ctx context.Context, key, backend string) (int64, error)

	// MoveCleanupToDLQ atomically graduates an exhausted cleanup_queue
	// row to cleanup_dlq. orphan_bytes is intentionally left untouched
	// because the underlying backend object is still on disk; the DLQ
	// entry exists so an operator can investigate, retry, or write off
	// each unrecoverable orphan deliberately. Returns true if a row was
	// moved, false if no row existed (benign concurrent-finaliser race).
	MoveCleanupToDLQ(ctx context.Context, id int64, lastError string) (bool, error)

	// CleanupDLQDepth returns the number of rows currently in
	// cleanup_dlq. Updates the cleanup_dlq_depth gauge so dashboards can
	// surface the count of unrecoverable orphans needing attention.
	CleanupDLQDepth(ctx context.Context) (int64, error)
}
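The claim eligibility rule documented on ClaimPendingCleanups (unclaimed, or claimed before graceCutoff) can be modelled in memory as follows. The row type and the claim function are hypothetical, and the sketch ignores next_retry and any ordering the real query may apply; it only illustrates how a stale claim is recovered and flagged.

```go
package main

import (
	"fmt"
	"time"
)

// row is a hypothetical in-memory stand-in for a cleanup_queue row.
type row struct {
	ID        int64
	ClaimedAt *time.Time
	ClaimedBy *string
}

// claim stamps up to limit eligible rows for instanceID and returns the
// claimed IDs plus the subset whose previous claim had gone stale.
func claim(rows []*row, limit int, instanceID string, graceCutoff, now time.Time) (claimed, reclaimed []int64) {
	for _, r := range rows {
		if len(claimed) >= limit {
			break
		}
		stale := r.ClaimedAt != nil && r.ClaimedAt.Before(graceCutoff)
		if r.ClaimedAt != nil && !stale {
			continue // a live worker still holds this row
		}
		if stale {
			reclaimed = append(reclaimed, r.ID)
		}
		stamp, owner := now, instanceID
		r.ClaimedAt, r.ClaimedBy = &stamp, &owner
		claimed = append(claimed, r.ID)
	}
	return claimed, reclaimed
}

// demoClaim builds three rows: unclaimed, freshly claimed, and stale.
func demoClaim() (claimed, reclaimed []int64) {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	fresh := now.Add(-1 * time.Minute)
	old := now.Add(-10 * time.Minute)
	rows := []*row{
		{ID: 1},
		{ID: 2, ClaimedAt: &fresh},
		{ID: 3, ClaimedAt: &old},
	}
	return claim(rows, 10, "instance-a", now.Add(-5*time.Minute), now)
}

func main() {
	claimed, reclaimed := demoClaim()
	fmt.Println(claimed, reclaimed) // [1 3] [3]
}
```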
CleanupTxAdapter exposes the transactional operations on the cleanup_queue table needed by core orchestration. Background-worker helpers that already live entirely on a single transaction (Enqueue, Retry, Complete) stay on the read/write path through CleanupStore.
type CleanupTxAdapter interface {
	// SumAndDeleteCleanupQueueRows deletes every cleanup_queue row for
	// the given (objectKey, backend) pair and returns the count and
	// total size of the deleted rows.
	SumAndDeleteCleanupQueueRows(ctx context.Context, objectKey, backend string) (deleted int64, totalBytes int64, err error)

	// GetCleanupQueueRow returns the full payload of a single
	// cleanup_queue row by id. Used inside MoveCleanupToDLQ so the row
	// contents (key, backend, size, attempts, created_at, last_error)
	// can be copied into the DLQ insert without a separate round trip.
	GetCleanupQueueRow(ctx context.Context, id int64) (CleanupQueueRow, error)

	// InsertCleanupDLQ inserts the supplied row into cleanup_dlq. The
	// original_id retains the queue row's id for forensic correlation;
	// first_enqueued_at carries the original created_at so the DLQ
	// entry remembers how long the cleanup was outstanding. Takes a
	// pointer so the 112-byte row payload travels by reference.
	InsertCleanupDLQ(ctx context.Context, row *CleanupQueueRow) error

	// DeleteCleanupItem removes a cleanup_queue row by id. Used inside
	// MoveCleanupToDLQ so the queue->DLQ move is atomic.
	DeleteCleanupItem(ctx context.Context, id int64) error
}
CreateMultipartUploadParams bundles the fields a CreateMultipartUpload row needs at insert time. Pulled into a struct because the optional upload-level encryption fields would otherwise push the call signature past gocritic’s parameter-count limit and force every call site to pass empty values for non-encrypted uploads.
type CreateMultipartUploadParams struct {
	UploadID      string
	ObjectKey     string
	BackendName   string
	ContentType   string
	Metadata      map[string]string
	EncryptionKey []byte // empty for unencrypted uploads
	KeyID         string // empty for unencrypted uploads
}
DeleteObject removes all copies of an object and decrements their quotas. Returns ErrObjectNotFound if the object doesn’t exist; otherwise returns the deleted copies for cleanup. Quota deltas apply in stable backend_name order (see #687).
RecordObject records an object’s location and updates the backend quota. On overwrite, all existing copies (including replicas) are removed and their quotas decremented before inserting the new primary copy. Returns the displaced copies for cleanup.
RecordObjectAndClearPending performs the same atomic commit as RecordObject and additionally deletes the matching pending_objects intent inside the same transaction. The write path uses this on a successful PUT so the intent never outlives a committed location.
EncryptionAdmin defines the admin-only encryption key rotation and encrypt/decrypt batch operations used by the admin HTTP handler. These are not on the request hot path, so they bypass the circuit breaker.
LifecycleAdmin defines startup, shutdown, and schema-management operations on the concrete store. These methods are not wrapped by CircuitBreakerStore - they run during boot before the breaker is wired up, and Close() releases pool resources directly.
MetadataStore is the union of every method the metadata-persistence layer exposes. Both *postgres.Store and *sqlite.Store satisfy it, and every consumer (proxy / transport / cli / worker) takes it as the one dependency declaration. The narrow per-role interfaces below are kept only as embedding sources for this composite; consumer code does not reference them directly. CB protection lives in each driver’s DBTX chokepoint, so this single typed surface carries the breaker semantics transparently.
MultipartPart describes a single uploaded part of an active upload. PartNumber is int (not int32) to match S3 SDK conventions and the existing engine convention; the sqlc row’s int32 column value is widened by the engine adapter on read. UploadID is omitted because parts are always queried in the context of a specific upload, and adding it would push the struct past gocritic’s hugeParam threshold in callers that range over a slice of parts.
MultipartUpload describes an active multipart upload’s metadata.
EncryptionKey, KeyID, and Encrypted carry the upload-level wrapped DEK shared across every part of an encrypted multipart upload. Encrypted is true when EncryptionKey is non-empty. EncryptionKey uses the same packed format as MultipartPart.EncryptionKey and ObjectLocation.EncryptionKey: encryption.PackKeyData(baseNonce, wrappedDEK).
NotificationOutbox defines the durable notification outbox operations the notifier worker uses to deliver webhook events with retry/backoff semantics. Leader election around the drain loop comes from a separate AdvisoryLocker dependency.
ObjectsTxAdapter exposes the transactional operations on the object_locations table.
type ObjectsTxAdapter interface {
	GetExistingCopiesForUpdate(ctx context.Context, objectKey string) ([]ExistingCopy, error)
	InsertObjectLocation(ctx context.Context, loc *ObjectLocation) error
	DeleteObjectCopies(ctx context.Context, objectKey string) error

	// GetCopiesForKeysForUpdate returns every (key, backend, size) row
	// matching any key in the supplied list, locked FOR UPDATE so the
	// same transaction can delete the rows and decrement quotas
	// atomically. Used by the batch-delete path.
	GetCopiesForKeysForUpdate(ctx context.Context, keys []string) ([]KeyedExistingCopy, error)

	// DeleteObjectsByKeys removes every object_locations row whose key
	// is in the supplied list. Caller must have already locked the
	// rows via GetCopiesForKeysForUpdate.
	DeleteObjectsByKeys(ctx context.Context, keys []string) error

	// CheckObjectExistsOnBackend reports whether the object_locations
	// table holds a row for (key, backend).
	CheckObjectExistsOnBackend(ctx context.Context, objectKey, backend string) (bool, error)

	// LockObjectOnBackend takes a FOR UPDATE lock on the
	// (key, backend) row and returns its full payload. Returns
	// (nil, false, nil) when the row is gone - benign race the caller
	// treats as nothing to act on.
	LockObjectOnBackend(ctx context.Context, objectKey, backend string) (loc *ObjectLocation, ok bool, err error)

	// DeleteObjectFromBackend removes the single (key, backend)
	// object_locations row.
	DeleteObjectFromBackend(ctx context.Context, objectKey, backend string) error

	// InsertObjectLocationIfNotExists is the import-side INSERT that
	// preserves an existing row. Returns true if a row was inserted.
	InsertObjectLocationIfNotExists(ctx context.Context, loc *ObjectLocation) (inserted bool, err error)

	// InsertReplicaConditional inserts a replica row only if the
	// source copy still exists. Returns the size_bytes the row was
	// inserted with (read from the source row inside the same
	// statement) and inserted=true on success, or (0, false, nil)
	// when the source copy is gone or the target already has a copy.
	// Callers use the returned size for IncrementBackendQuota so
	// object_locations.size_bytes and backend_quotas.bytes_used always
	// agree.
	InsertReplicaConditional(ctx context.Context, objectKey, targetBackend, sourceBackend string) (size int64, inserted bool, err error)
}
PendingObject is an in-flight PUT intent recorded before the backend upload. The reaper resolves intents that survive a failed metadata commit so a DB outage between PUT and RecordObject cannot silently destroy the prior copy of an overwritten key.
PendingPromoteResult describes how PromotePending resolved an intent.
type PendingPromoteResult int
PendingPromoteCommitted and related constants used by this package.
const (
	// PendingPromoteCommitted means the pending row was promoted into
	// object_locations and removed in the same transaction.
	PendingPromoteCommitted PendingPromoteResult = iota

	// PendingPromoteAmbiguous is reserved for pathological cases the
	// resolver cannot decide between promotion and dropping. The current
	// resolver does not produce this result; the timestamp comparison
	// covers every previously-ambiguous case as Superseded instead. Kept
	// so the metric label and constant stay stable across releases.
	PendingPromoteAmbiguous

	// PendingPromoteAlreadyResolved means the pending row was gone by
	// the time the transaction acquired its lock - another reaper
	// instance already resolved it. Benign no-op.
	PendingPromoteAlreadyResolved

	// PendingPromoteSuperseded means a successful write for the same
	// key committed after this intent was inserted. The intent is
	// provably stale, so the resolver deletes the pending row.
	PendingPromoteSuperseded
)
PendingStore defines in-flight PutObject intent tracking. The write path inserts an intent before the backend PUT and removes it on a successful commit; the pending reaper resolves intents left behind by a failed commit so a DB outage between PUT and RecordObject cannot silently destroy the prior copy of an overwritten key.
PendingTxAdapter exposes the transactional operations on the pending_objects table.
type PendingTxAdapter interface {
	// ClaimPending returns true if the pending row was successfully
	// claimed for promotion, false if it has already been resolved by
	// another worker. Postgres uses SELECT FOR UPDATE; SQLite uses an
	// existence probe inside the writer-serialized txn - same
	// guarantee.
	ClaimPending(ctx context.Context, intentID string) (claimed bool, err error)

	InsertPending(ctx context.Context, p *PendingObject) error
	DeletePending(ctx context.Context, intentID string) error
	DeletePendingByBackend(ctx context.Context, backendName string) error
}
QuotaTxAdapter exposes the transactional operations on the backend_quotas table.
type QuotaTxAdapter interface {
	// IncrementBackendQuota credits delta bytes to the backend's
	// quota and returns ErrNoSpaceAvailable when the row reports zero
	// rows updated (quota would be exceeded).
	IncrementBackendQuota(ctx context.Context, backendName string, delta int64) error

	// DecrementBackendQuota debits delta bytes from the backend's
	// quota.
	DecrementBackendQuota(ctx context.Context, backendName string, delta int64) error

	// DecrementOrphanBytes debits delta bytes from the backend's
	// orphan_bytes counter, clamped at zero.
	DecrementOrphanBytes(ctx context.Context, backendName string, delta int64) error
}
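The two edge behaviours documented on QuotaTxAdapter (rejecting an increment that would exceed the quota, and clamping orphan_bytes at zero) can be modelled in memory as below. The quota struct, its BytesLimit field, and the method names are assumptions for illustration; the real adapter expresses the guard as a conditional UPDATE and infers failure from zero rows affected.

```go
package main

import (
	"errors"
	"fmt"
)

var ErrNoSpaceAvailable = errors.New("no backend has sufficient quota")

// quota is a hypothetical in-memory model of one backend_quotas row.
// BytesLimit is an assumed column name.
type quota struct {
	BytesUsed   int64
	BytesLimit  int64
	OrphanBytes int64
}

// increment mirrors a guarded UPDATE: when the guard fails, nothing is
// written and the caller sees ErrNoSpaceAvailable.
func (q *quota) increment(delta int64) error {
	if q.BytesUsed+delta > q.BytesLimit {
		return ErrNoSpaceAvailable
	}
	q.BytesUsed += delta
	return nil
}

// decrementOrphan subtracts delta but never lets the counter go negative.
func (q *quota) decrementOrphan(delta int64) {
	q.OrphanBytes -= delta
	if q.OrphanBytes < 0 {
		q.OrphanBytes = 0
	}
}

func main() {
	q := &quota{BytesUsed: 50, BytesLimit: 100, OrphanBytes: 30}
	fmt.Println(q.increment(40), q.BytesUsed) // <nil> 90
	fmt.Println(q.increment(20), q.BytesUsed) // no backend has sufficient quota 90
	q.decrementOrphan(45)
	fmt.Println(q.OrphanBytes) // 0
}
```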
Reader exposes the engine’s read-only operations - those that do not require a transaction or row-level lock. Engine packages return one alongside the Runner so background services can issue queries without paying the cost of a transaction.
type Reader interface {
	// Pending reads
	GetStalePending(ctx context.Context, olderThan time.Time, limit int) ([]PendingObject, error)
	CountPending(ctx context.Context) (int64, error)

	// Object listing reads
	GetAllObjectLocations(ctx context.Context, key string) ([]ObjectLocation, error)
	ListObjectsByBackend(ctx context.Context, backendName string, limit int) ([]ObjectLocation, error)
	ListObjectsByBackendKeyAsc(ctx context.Context, backendName, afterKey string, limit int) ([]ObjectLocation, error)
	ListObjectsByPrefix(ctx context.Context, prefix, startAfter string, maxKeys int) ([]ObjectLocation, error)
	ListExpiredObjects(ctx context.Context, prefix string, cutoff time.Time, limit int) ([]ObjectLocation, error)

	// Cleanup queue reads
	GetPendingCleanups(ctx context.Context, limit int) ([]CleanupItem, error)
	CleanupQueueDepth(ctx context.Context) (int64, error)

	// CleanupDLQDepth returns the number of rows in cleanup_dlq. Used
	// by the cleanup_dlq_depth gauge so dashboards surface the count
	// of unrecoverable orphans needing operator attention.
	CleanupDLQDepth(ctx context.Context) (int64, error)
}
Runner opens a transaction and invokes fn with a TxAdapter scoped to it. The transaction commits if fn returns a nil error and rolls back otherwise.
type Runner interface {
	// WithTx is method-form here for Go interface satisfaction; the
	// generic helper below is the calling convention used by core
	// operations. A type that implements WithTx with the unwrapped
	// signature satisfies this interface; the WithTxVal helper wraps
	// it.
	WithTx(ctx context.Context, fn func(ctx context.Context, tx TxAdapter) error) error
}
S3Error is a structured error that carries an HTTP status code and S3 error code, allowing the server layer to translate storage errors into S3 XML responses without per-handler error mapping.
TxAdapter is the per-engine transactional seam. A core operation receives one of these from Runner.WithTx, runs business logic against it, and never touches a driver-specific transaction directly. The parent embeds the per-feature adapters so callers depend only on the narrowest interface that fits their needs.
type TxAdapter interface {
	PendingTxAdapter
	ObjectsTxAdapter
	CleanupTxAdapter
	QuotaTxAdapter

	// AcquireKeyLock takes a transaction-scoped lock keyed by the
	// object key. Postgres uses pg_advisory_xact_lock derived from a
	// hash of the key; SQLite no-ops because the engine serializes
	// writers and the in-tx existence probe provides the same
	// guarantee.
	AcquireKeyLock(ctx context.Context, objectKey string) error
}
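Since pg_advisory_xact_lock takes a bigint key, the Postgres side of AcquireKeyLock needs some mapping from an object key to a 64-bit integer. The source does not say which hash is used; the sketch below uses FNV-1a purely as an assumption, and keyLockID is a hypothetical name.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// keyLockID maps an object key to a signed 64-bit lock ID.
// FNV-1a is an illustrative choice; the real hash is not documented.
func keyLockID(objectKey string) int64 {
	h := fnv.New64a()
	h.Write([]byte(objectKey)) // Write on an FNV hash never returns an error
	// Reinterpret the unsigned sum as int64; negative IDs are valid
	// bigint arguments.
	return int64(h.Sum64())
}

func main() {
	// The same key always yields the same lock ID, so two transactions
	// touching one object contend on one advisory lock.
	fmt.Println(keyLockID("photos/cat.jpg") == keyLockID("photos/cat.jpg"))
	fmt.Println(keyLockID("photos/cat.jpg"))
}
```

Any 64-bit hash can collide, in which case two unrelated keys would briefly serialise on the same lock; that costs some concurrency but does not affect correctness.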