
postgres
CB-aware sqlc DBTX wrapper for the postgres driver. Every sqlc query - pool-bound or tx-bound - flows through this single chokepoint, which calls breaker.PreCheck before the SQL and breaker.PostCheck after. Advisory locks bypass the breaker by going through pool.Acquire() directly (see advisory_lock.go).
Package store provides PostgreSQL metadata persistence for the S3 orchestrator. metadata tracking, quota enforcement, circuit breaker protection, replication, and rebalancing.
Index
- Constants
- type Store
- func NewStore(ctx context.Context, dbCfg *config.DatabaseConfig, cb *breaker.CircuitBreaker) (*Store, error)
- func (s *Store) BackendObjectStats(ctx context.Context, backendName string) (int64, int64, error)
- func (s *Store) ClaimPendingCleanups(ctx context.Context, limit int, instanceID string, graceCutoff time.Time) ([]core.CleanupItem, error)
- func (s *Store) CleanupDLQDepth(ctx context.Context) (int64, error)
- func (s *Store) CleanupQueueDepth(ctx context.Context) (int64, error)
- func (s *Store) Close()
- func (s *Store) CompleteCleanupItem(ctx context.Context, id int64) error
- func (s *Store) CompleteNotification(ctx context.Context, id int64) error
- func (s *Store) CountActiveMultipartUploads(ctx context.Context, bucketPrefix string) (int64, error)
- func (s *Store) CountOverReplicatedObjects(ctx context.Context, factor int) (int64, error)
- func (s *Store) CreateMultipartUpload(ctx context.Context, params *core.CreateMultipartUploadParams) error
- func (s *Store) DecrementOrphanBytes(ctx context.Context, backendName string, amount int64) error
- func (s *Store) DeleteBackendData(ctx context.Context, backendName string) error
- func (s *Store) DeleteMultipartUpload(ctx context.Context, uploadID string) error
- func (s *Store) DeleteObject(ctx context.Context, key string) ([]core.DeletedCopy, error)
- func (s *Store) DeleteObjectLocation(ctx context.Context, key, backendName string) error
- func (s *Store) DeleteObjectsBatch(ctx context.Context, keys []string) (map[string][]core.DeletedCopy, error)
- func (s *Store) DeletePending(ctx context.Context, intentID string) error
- func (s *Store) DeletePendingByBackend(ctx context.Context, backendName string) error
- func (s *Store) EnqueueCleanup(ctx context.Context, backendName, objectKey, reason string, sizeBytes int64) error
- func (s *Store) FlushUsageDeltas(ctx context.Context, backendName, period string, apiRequests, egressBytes, ingressBytes int64) error
- func (s *Store) GetActiveMultipartCounts(ctx context.Context) (map[string]int64, error)
- func (s *Store) GetAllObjectLocations(ctx context.Context, key string) ([]core.ObjectLocation, error)
- func (s *Store) GetBackendWithSpace(ctx context.Context, size int64, backendOrder []string) (string, error)
- func (s *Store) GetLeastUtilizedBackend(ctx context.Context, size int64, eligible []string) (string, error)
- func (s *Store) GetMultipartUpload(ctx context.Context, uploadID string) (*core.MultipartUpload, error)
- func (s *Store) GetMultipartUploadsByBackend(ctx context.Context, backendName string) ([]core.MultipartUpload, error)
- func (s *Store) GetObjectBackendsForKeys(ctx context.Context, keys []string) (map[string][]string, error)
- func (s *Store) GetObjectCopiesForUpdate(ctx context.Context, key string) ([]core.ObjectLocation, error)
- func (s *Store) GetObjectCounts(ctx context.Context) (map[string]int64, error)
- func (s *Store) GetObjectsWithoutHash(ctx context.Context, limit, offset int) ([]core.ObjectLocation, error)
- func (s *Store) GetOverReplicatedObjects(ctx context.Context, factor, limit int) ([]core.ObjectLocation, error)
- func (s *Store) GetParts(ctx context.Context, uploadID string) ([]core.MultipartPart, error)
- func (s *Store) GetPendingCleanups(ctx context.Context, limit int) ([]core.CleanupItem, error)
- func (s *Store) GetPendingNotifications(ctx context.Context, limit int) ([]core.NotificationRow, error)
- func (s *Store) GetQuotaStats(ctx context.Context) (map[string]core.QuotaStat, error)
- func (s *Store) GetRandomHashedObjects(ctx context.Context, limit int) ([]core.ObjectLocation, error)
- func (s *Store) GetStaleMultipartUploads(ctx context.Context, olderThan time.Duration) ([]core.MultipartUpload, error)
- func (s *Store) GetStalePending(ctx context.Context, olderThan time.Time, limit int) ([]core.PendingObject, error)
- func (s *Store) GetUnderReplicatedObjects(ctx context.Context, factor, limit int) ([]core.ObjectLocation, error)
- func (s *Store) GetUnderReplicatedObjectsExcluding(ctx context.Context, factor, limit int, excludedBackends []string) ([]core.ObjectLocation, error)
- func (s *Store) GetUnverifiedObjectCounts(ctx context.Context) (map[string]int64, error)
- func (s *Store) GetUsageForPeriod(ctx context.Context, period string) (map[string]core.UsageStat, error)
- func (s *Store) ImportObject(ctx context.Context, key, backend string, size int64) (bool, error)
- func (s *Store) IncrementOrphanBytes(ctx context.Context, backendName string, amount int64) error
- func (s *Store) InsertNotification(ctx context.Context, eventType, payload, endpointURL string) error
- func (s *Store) InsertPending(ctx context.Context, p *core.PendingObject) error
- func (s *Store) ListAllEncryptedLocations(ctx context.Context, limit, offset int) ([]core.DecryptableLocation, error)
- func (s *Store) ListDirectoryChildren(ctx context.Context, prefix, startAfter string, maxKeys int) (*core.DirectoryListResult, error)
- func (s *Store) ListEncryptedLocations(ctx context.Context, keyID string, limit, offset int) ([]core.EncryptedLocation, error)
- func (s *Store) ListExpiredObjects(ctx context.Context, prefix string, cutoff time.Time, limit int) ([]core.ObjectLocation, error)
- func (s *Store) ListMultipartUploads(ctx context.Context, prefix string, maxUploads int) ([]core.MultipartUpload, error)
- func (s *Store) ListObjects(ctx context.Context, prefix, startAfter string, maxKeys int) (*core.ListObjectsResult, error)
- func (s *Store) ListObjectsByBackend(ctx context.Context, backendName string, limit int) ([]core.ObjectLocation, error)
- func (s *Store) ListObjectsByBackendKeyAsc(ctx context.Context, backendName, afterKey string, limit int) ([]core.ObjectLocation, error)
- func (s *Store) ListUnencryptedLocations(ctx context.Context, limit, offset int) ([]core.UnencryptedLocation, error)
- func (s *Store) MarkObjectDecrypted(ctx context.Context, objectKey, backendName string, plaintextSize int64) error
- func (s *Store) MarkObjectEncrypted(ctx context.Context, objectKey, backendName string, encryptionKey []byte, keyID string, plaintextSize, ciphertextSize int64) error
- func (s *Store) MoveCleanupToDLQ(ctx context.Context, id int64, lastError string) (bool, error)
- func (s *Store) MoveObjectLocation(ctx context.Context, key, fromBackend, toBackend string) (int64, error)
- func (s *Store) PendingDepth(ctx context.Context) (int64, error)
- func (s *Store) PromotePending(ctx context.Context, p *core.PendingObject) (core.PendingPromoteResult, []core.DeletedCopy, error)
- func (s *Store) RecordObject(ctx context.Context, key, backend string, size int64, enc *core.EncryptionMeta) ([]core.DeletedCopy, error)
- func (s *Store) RecordObjectAndClearPending(ctx context.Context, key, backend string, size int64, enc *core.EncryptionMeta, intentID string) ([]core.DeletedCopy, error)
- func (s *Store) RecordPart(ctx context.Context, uploadID string, partNumber int, etag string, size int64, enc *core.EncryptionMeta) error
- func (s *Store) RecordReplica(ctx context.Context, key, targetBackend, sourceBackend string) (int64, bool, error)
- func (s *Store) RemoveExcessCopy(ctx context.Context, key, backendName string, size int64) error
- func (s *Store) RetryCleanupItem(ctx context.Context, id int64, backoff time.Duration, lastError string) error
- func (s *Store) RetryNotification(ctx context.Context, id int64, backoff time.Duration, lastError string) error
- func (s *Store) RunMigrations(ctx context.Context) error
- func (s *Store) SweepStaleCleanupQueueRows(ctx context.Context, key, backend string) (int64, error)
- func (s *Store) SyncQuotaLimits(ctx context.Context, backends []config.BackendConfig) error
- func (s *Store) UpdateContentHash(ctx context.Context, key, backendName, hash string) error
- func (s *Store) UpdateEncryptionKey(ctx context.Context, objectKey, backendName string, newEncryptionKey []byte, newKeyID string) error
- func (s *Store) VerifySchemaVersion(ctx context.Context) error
- func (s *Store) WithAdvisoryLock(ctx context.Context, lockID int64, fn func(ctx context.Context) error) (bool, error)
- func (s *Store) WithTx(ctx context.Context, fn func(ctx context.Context, tx core.TxAdapter) error) error
Constants
ExpectedSchemaVersion is the migration version this binary expects. Updated when new migration files are added.
type Store
Store manages quota and object location data in PostgreSQL.
func NewStore
NewStore creates a new PostgreSQL store connection using pgxpool. When cb is non-nil, every sqlc query (pool-bound or tx-bound) flows through it via wrapDBTX. Pass nil to skip CB wrapping (test fixtures, migration runners).
func (*Store) BackendObjectStats
BackendObjectStats returns the object count and total bytes stored on a backend.
func (*Store) ClaimPendingCleanups
ClaimPendingCleanups atomically reserves a batch of cleanup rows for the calling instance using FOR UPDATE SKIP LOCKED. See the SQL definition for the eligibility rules; this method is the only path the cleanup worker should use to fetch pending rows.
func (*Store) CleanupDLQDepth
CleanupDLQDepth returns the number of rows currently in cleanup_dlq. Surfaces the count of unrecoverable orphans so the dashboard and the cleanup_dlq_depth gauge can flag operator-visible work.
func (*Store) CleanupQueueDepth
CleanupQueueDepth returns the number of items still pending in the queue.
func (*Store) Close
Close closes the connection pool.
func (*Store) CompleteCleanupItem
CompleteCleanupItem atomically deletes a successfully-processed row and decrements the backing backend’s orphan_bytes by the row’s size. The underlying SQL is a single CTE so a worker crash between the delete and the decrement cannot leave the counter inconsistent; idempotent against re-claim retries because the CTE is empty when the row is already gone.
func (*Store) CompleteNotification
CompleteNotification removes a delivered notification from the outbox. Satisfies the notify.OutboxStore interface.
func (*Store) CountActiveMultipartUploads
CountActiveMultipartUploads returns the number of in-progress multipart uploads whose key starts with the given bucket prefix.
func (*Store) CountOverReplicatedObjects
CountOverReplicatedObjects returns the total number of objects with more copies than the target replication factor.
func (*Store) CreateMultipartUpload
CreateMultipartUpload records a new multipart upload in the database. Encryption fields on params are persisted so every UploadPart against the row can reuse the same wrapped DEK; they are zero-length when proxy-side encryption is disabled.
func (*Store) DecrementOrphanBytes
DecrementOrphanBytes subtracts bytes from the orphan_bytes counter for a backend. Called when a cleanup queue item is successfully processed or exhausted (written off).
func (*Store) DeleteBackendData
DeleteBackendData removes all database records for a backend in FK-safe order. Runs in a single transaction.
func (*Store) DeleteMultipartUpload
DeleteMultipartUpload removes a multipart upload and its parts (cascading).
func (*Store) DeleteObject
DeleteObject removes all copies of an object and decrements their quotas. Returns all deleted copies, or ErrObjectNotFound if the object doesn’t exist. Delegates to core.DeleteObject.
func (*Store) DeleteObjectLocation
DeleteObjectLocation removes a single object_locations row for the given key and backend. Used by drain to remove source copies when a replica exists.
func (*Store) DeleteObjectsBatch
DeleteObjectsBatch delegates to core.DeleteObjectsBatch which removes every supplied key in one transaction and returns per-key displaced copies for backend cleanup.
func (*Store) DeletePending
DeletePending removes a pending intent. Called by the write path on a successful commit (atomically inside the same transaction as RecordObject) and by the reaper on the HEAD-404 path.
func (*Store) DeletePendingByBackend
DeletePendingByBackend removes every intent for a backend. Called during drain finalization and admin remove so abandoned intents do not block the FK-cascade delete of the backend’s row in backend_quotas.
func (*Store) EnqueueCleanup
EnqueueCleanup adds a failed cleanup operation to the retry queue.
func (*Store) FlushUsageDeltas
FlushUsageDeltas atomically adds accumulated usage deltas to the persistent usage row. Creates the row if it doesn’t exist for this (backend, period).
func (*Store) GetActiveMultipartCounts
GetActiveMultipartCounts returns the number of in-progress multipart uploads per backend.
func (*Store) GetAllObjectLocations
GetAllObjectLocations returns all copies of an object, ordered by created_at ascending (oldest/primary first). Used for read failover.
func (*Store) GetBackendWithSpace
GetBackendWithSpace finds a backend with enough quota for the given size. Returns the backend name or ErrNoSpaceAvailable if none have enough space.
func (*Store) GetLeastUtilizedBackend
GetLeastUtilizedBackend finds the backend with the lowest utilization ratio that has enough space for the given size. Used by the “spread” routing strategy.
func (*Store) GetMultipartUpload
GetMultipartUpload retrieves metadata for a multipart upload.
func (*Store) GetMultipartUploadsByBackend
GetMultipartUploadsByBackend returns all in-progress multipart uploads on the given backend. Used by drain to abort uploads before migrating objects. Requires live PostgreSQL - covered by integration tests.
func (*Store) GetObjectBackendsForKeys
GetObjectBackendsForKeys returns a map from each supplied object_key to the set of backend names that hold a copy. Empty input yields an empty map; keys with no copies are absent from the result. Used by the rebalancer planner to fold the per-key existence check into a single query per batch instead of N+1.
func (*Store) GetObjectCopiesForUpdate
GetObjectCopiesForUpdate retrieves all copies of an object under a FOR UPDATE lock, suitable for use inside a transaction to prevent concurrent modification during over-replication cleanup.
func (*Store) GetObjectCounts
GetObjectCounts returns the number of objects stored on each backend.
func (*Store) GetObjectsWithoutHash
GetObjectsWithoutHash returns object locations that have no stored content hash, ordered by creation time. Used by the backfill command.
func (*Store) GetOverReplicatedObjects
GetOverReplicatedObjects finds objects with more copies than the target replication factor. Returns all rows for those objects so callers can score each copy and decide which to remove.
func (*Store) GetParts
GetParts returns all parts for a multipart upload, ordered by part number.
func (*Store) GetPendingCleanups
GetPendingCleanups returns a read-only snapshot of pending cleanup rows. Used by the admin endpoint to render the queue. The cleanup worker uses ClaimPendingCleanups instead, which atomically stamps claim columns.
func (*Store) GetPendingNotifications
GetPendingNotifications returns outbox rows ready for delivery.
func (*Store) GetQuotaStats
GetQuotaStats returns quota statistics for all backends.
func (*Store) GetRandomHashedObjects
GetRandomHashedObjects returns random object locations that have a stored content hash. Used by the scrubber to verify data integrity.
func (*Store) GetStaleMultipartUploads
GetStaleMultipartUploads returns uploads older than the given duration.
func (*Store) GetStalePending
GetStalePending returns pending intents whose created_at is at or before olderThan, oldest first, capped at limit rows. Used by the reaper to resolve intents that have outlived their original PUT’s commit window.
func (*Store) GetUnderReplicatedObjects
GetUnderReplicatedObjects finds objects with fewer copies than the target replication factor. Returns all rows for those objects so callers know which backends already have copies.
func (*Store) GetUnderReplicatedObjectsExcluding
GetUnderReplicatedObjectsExcluding finds objects with fewer copies than the target factor, ignoring copies on the excluded backends. Returns all rows for those objects so callers know the full picture.
func (*Store) GetUnverifiedObjectCounts
GetUnverifiedObjectCounts returns the number of objects per backend whose content_hash column is NULL (objects predating integrity verification or otherwise not yet checksummed). Drives the dashboard’s “needs backfill” column. See #405.
func (*Store) GetUsageForPeriod
GetUsageForPeriod returns usage statistics for all backends in the given period.
func (*Store) ImportObject
ImportObject records a pre-existing object in the database without overwriting. Returns true if the object was imported, false if it already existed for this backend. Delegates to core.ImportObject.
func (*Store) IncrementOrphanBytes
IncrementOrphanBytes adds bytes to the orphan_bytes counter for a backend. Called when a physical delete fails and is enqueued for retry.
func (*Store) InsertNotification
InsertNotification adds an event to the notification outbox for async webhook delivery. Satisfies the notify.OutboxStore interface.
func (*Store) InsertPending
InsertPending records an in-flight PUT intent. Called before the backend upload so a metadata commit failure cannot silently destroy the prior copy of an overwritten key.
func (*Store) ListAllEncryptedLocations
ListAllEncryptedLocations returns a page of all encrypted object locations. Used by the decrypt-existing admin endpoint to find objects that need decryption.
func (*Store) ListDirectoryChildren
ListDirectoryChildren returns the immediate children of a directory prefix with aggregate stats for subdirectories. Files include backend and creation time. Prefix must end with “/” (or be "" for root).
func (*Store) ListEncryptedLocations
ListEncryptedLocations returns a page of encrypted object locations filtered by key ID. Used during key rotation to find objects wrapped with the old key.
func (*Store) ListExpiredObjects
ListExpiredObjects returns one row per unique key matching the given prefix whose created_at is older than cutoff, up to limit rows. Used by lifecycle expiration to find objects eligible for deletion.
func (*Store) ListMultipartUploads
ListMultipartUploads returns in-progress multipart uploads whose key matches the given prefix, up to maxUploads entries.
func (*Store) ListObjects
ListObjects returns objects matching the given prefix, sorted by key. Supports pagination via startAfter and maxKeys. Returns one extra row to detect truncation.
func (*Store) ListObjectsByBackend
ListObjectsByBackend returns objects stored on a specific backend, ordered by size ascending (smallest first). Used by the rebalancer to find movable objects.
func (*Store) ListObjectsByBackendKeyAsc
ListObjectsByBackendKeyAsc returns rows for a backend in ascending object_key order, starting strictly after the supplied cursor. The empty string returns the first page. Used by ReconcileBackend to drive a bounded-memory sorted-merge join against an S3 ListObjects walk; both sides are in lex order so the merge is O(n) memory bounded by limit.
func (*Store) ListUnencryptedLocations
ListUnencryptedLocations returns a page of unencrypted object locations. Used by the encrypt-existing admin endpoint to find objects that need encryption.
func (*Store) MarkObjectDecrypted
MarkObjectDecrypted updates a single object location to record that it has been decrypted. Clears the encryption flag, wrapped DEK, key ID, and plaintext size, and updates size_bytes to the plaintext size. The transaction reads the current ciphertext size before overwriting it so backend_quotas.bytes_used can be advanced by plaintextSize - currentSize (a negative delta because plaintext is smaller than ciphertext). Without this the counter drifts permanently from SUM(object_locations.size_bytes) and write-routing rejects writes that should succeed.
func (*Store) MarkObjectEncrypted
MarkObjectEncrypted updates a single object location to record that it has been encrypted. Sets the encryption flag, wrapped DEK, key ID, plaintext size, and updates size_bytes to the ciphertext size. The transaction also advances backend_quotas.bytes_used by ciphertextSize - plaintextSize so the per-backend counter stays in step with the on-disk byte count after the bulk encrypt-existing rewrite path. Without the quota update the counter drifts permanently from SUM(object_locations.size_bytes) and write-routing silently overcommits.
func (*Store) MoveCleanupToDLQ
MoveCleanupToDLQ atomically graduates an exhausted cleanup_queue row to the dead-letter table. Delegates to core.MoveCleanupToDLQ so both engines share the move semantics - notably that orphan_bytes is left untouched because the backend object is still on disk.
func (*Store) MoveObjectLocation
MoveObjectLocation atomically moves a copy of an object from one backend to another. Returns (0, nil) if the source copy is gone or the target already has a copy. Delegates to core.MoveObjectLocation.
func (*Store) PendingDepth
PendingDepth returns the total number of pending intents. Used by the reaper to publish a depth gauge.
func (*Store) PromotePending
PromotePending resolves a pending intent transactionally. Delegates to core.PromotePending which composes the lock, supersession check, commit, and same-tx pending delete against the per-engine TxAdapter.
func (*Store) RecordObject
RecordObject atomically inserts or updates an object location, handling overwrites by returning displaced copies for cleanup. Delegates to core.RecordObject which composes lock, displacement, insert, and quota update against the postgres TxAdapter.
func (*Store) RecordObjectAndClearPending
RecordObjectAndClearPending performs the same atomic commit as RecordObject and additionally deletes the matching pending_objects intent inside the same transaction. Delegates to core.
func (*Store) RecordPart
RecordPart records a completed part for a multipart upload. S3 spec requires part numbers between 1 and 10000.
func (*Store) RecordReplica
RecordReplica inserts a replica copy of an object, but only if the source copy still exists. Delegates to core.RecordReplica.
func (*Store) RemoveExcessCopy
RemoveExcessCopy deletes one copy of an object from the given backend inside a transaction, decrementing the backend quota atomically. Delegates to core.RemoveExcessCopy.
func (*Store) RetryCleanupItem
RetryCleanupItem increments the attempt counter, schedules the next retry, and clears the claim so the row is immediately re-eligible for the next worker tick.
func (*Store) RetryNotification
RetryNotification increments the attempt counter and schedules the next retry. Satisfies the notify.OutboxStore interface.
func (*Store) RunMigrations
RunMigrations applies versioned database migrations using goose. Migrations are embedded in the binary and applied in order. Already-applied migrations are skipped automatically via the goose_db_version tracking table.
func (*Store) SweepStaleCleanupQueueRows
SweepStaleCleanupQueueRows removes every cleanup_queue row matching the (object_key, backend_name) pair and decrements the backend’s orphan_bytes counter by the sum of their size_bytes. Delegates to core.SweepStaleCleanupQueueRows.
func (*Store) SyncQuotaLimits
SyncQuotaLimits ensures the backend_quotas table has entries for all configured backends with their quota limits. Creates new entries or updates existing limits. All updates happen in a single transaction for atomicity.
func (*Store) UpdateContentHash
UpdateContentHash sets the content hash for an object location.
func (*Store) UpdateEncryptionKey
UpdateEncryptionKey re-wraps a single object’s encryption key. Used during key rotation to replace the old wrapped DEK with one wrapped by the new key.
func (*Store) VerifySchemaVersion
VerifySchemaVersion checks that the database schema version matches what this binary expects. Returns an error if the schema is older than expected (partial migration failure). Logs a warning if the schema is newer (possible downgrade).
func (*Store) WithAdvisoryLock
WithAdvisoryLock acquires a PostgreSQL session-level advisory lock on a dedicated connection from the pool. If the lock is acquired, fn runs and the connection is released (which releases the lock). If another session holds the lock, returns (false, nil). On DB error, returns (false, err).
func (*Store) WithTx
WithTx satisfies core.Runner by opening a transaction, wrapping the sqlc Queries in a pgTxAdapter, and invoking fn. Commits on a nil return; rolls back otherwise. Lets engine-agnostic core helpers orchestrate multi-statement operations against the postgres engine.
Generated by gomarkdoc