s3-orchestrator

postgres

import "github.com/afreidah/s3-orchestrator/internal/store/postgres"

Package postgres provides PostgreSQL metadata persistence for the S3 orchestrator, covering metadata tracking, quota enforcement, circuit breaker protection, replication, and rebalancing.

Queries run through a circuit-breaker-aware wrapper around sqlc's DBTX interface. Every sqlc query, whether pool-bound or tx-bound, flows through this single chokepoint, which calls breaker.PreCheck before the SQL and breaker.PostCheck after it. Advisory locks bypass the breaker by going through pool.Acquire() directly (see advisory_lock.go).


Constants

ExpectedSchemaVersion is the migration version this binary expects. Updated when new migration files are added.

const ExpectedSchemaVersion = 11

type Store

Store manages quota and object location data in PostgreSQL.

type Store struct {
    // contains filtered or unexported fields
}

func NewStore

func NewStore(ctx context.Context, dbCfg *config.DatabaseConfig, cb *breaker.CircuitBreaker) (*Store, error)

NewStore opens a PostgreSQL connection pool with pgxpool and returns a Store backed by it. When cb is non-nil, every sqlc query (pool-bound or tx-bound) flows through the circuit breaker via wrapDBTX. Pass nil to skip circuit-breaker wrapping, as test fixtures and migration runners do.

func (*Store) BackendObjectStats

func (s *Store) BackendObjectStats(ctx context.Context, backendName string) (int64, int64, error)

BackendObjectStats returns the object count and total bytes stored on a backend.

func (*Store) ClaimPendingCleanups

func (s *Store) ClaimPendingCleanups(ctx context.Context, limit int, instanceID string, graceCutoff time.Time) ([]core.CleanupItem, error)

ClaimPendingCleanups atomically reserves a batch of cleanup rows for the calling instance using FOR UPDATE SKIP LOCKED. See the SQL definition for the eligibility rules; this method is the only path the cleanup worker should use to fetch pending rows.

func (*Store) CleanupDLQDepth

func (s *Store) CleanupDLQDepth(ctx context.Context) (int64, error)

CleanupDLQDepth returns the number of rows currently in cleanup_dlq. It surfaces the count of unrecoverable orphans so that the dashboard and the cleanup_dlq_depth gauge can flag work that needs operator attention.

func (*Store) CleanupQueueDepth

func (s *Store) CleanupQueueDepth(ctx context.Context) (int64, error)

CleanupQueueDepth returns the number of items still pending in the cleanup queue.

func (*Store) Close

func (s *Store) Close()

Close closes the connection pool.

func (*Store) CompleteCleanupItem

func (s *Store) CompleteCleanupItem(ctx context.Context, id int64) error

CompleteCleanupItem atomically deletes a successfully processed row and decrements the owning backend’s orphan_bytes by the row’s size. The underlying SQL is a single CTE, so a worker crash between the delete and the decrement cannot leave the counter inconsistent. The operation is also idempotent under re-claim retries: if the row is already gone, the CTE matches nothing and orphan_bytes is left unchanged.
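The invariant this CTE protects can be modeled in memory. Deleting the row and decrementing orphan_bytes happen as one step, and a repeated call for the same ID changes nothing. The types and method below are hypothetical. The real method performs the equivalent work in a single SQL statement.

```go
package main

// queuedCleanup is a hypothetical, simplified cleanup_queue row.
type queuedCleanup struct {
	backend string
	size    int64
}

// cleanupModel mimics cleanup_queue together with backend_quotas.orphan_bytes.
type cleanupModel struct {
	queue       map[int64]queuedCleanup
	orphanBytes map[string]int64
}

// complete removes the row and decrements its backend's orphan_bytes as one
// unit. If the row is already gone (for example, a re-claimed retry), it is
// a no-op, mirroring the empty CTE in the real query.
func (m *cleanupModel) complete(id int64) {
	row, ok := m.queue[id]
	if !ok {
		return
	}
	delete(m.queue, id)
	m.orphanBytes[row.backend] -= row.size
}
```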

func (*Store) CompleteNotification

func (s *Store) CompleteNotification(ctx context.Context, id int64) error

CompleteNotification removes a delivered notification from the outbox. Satisfies the notify.OutboxStore interface.

func (*Store) CountActiveMultipartUploads

func (s *Store) CountActiveMultipartUploads(ctx context.Context, bucketPrefix string) (int64, error)

CountActiveMultipartUploads returns the number of in-progress multipart uploads whose key starts with the given bucket prefix.

func (*Store) CountOverReplicatedObjects

func (s *Store) CountOverReplicatedObjects(ctx context.Context, factor int) (int64, error)

CountOverReplicatedObjects returns the total number of objects with more copies than the target replication factor.

func (*Store) CreateMultipartUpload

func (s *Store) CreateMultipartUpload(ctx context.Context, params *core.CreateMultipartUploadParams) error

CreateMultipartUpload records a new multipart upload in the database. Encryption fields on params are persisted so every UploadPart against the row can reuse the same wrapped DEK; they are zero-length when proxy-side encryption is disabled.

func (*Store) DecrementOrphanBytes

func (s *Store) DecrementOrphanBytes(ctx context.Context, backendName string, amount int64) error

DecrementOrphanBytes subtracts bytes from the orphan_bytes counter for a backend. Called when a cleanup queue item is successfully processed or exhausted (written off).

func (*Store) DeleteBackendData

func (s *Store) DeleteBackendData(ctx context.Context, backendName string) error

DeleteBackendData removes all database records for a backend in FK-safe order. Runs in a single transaction.

func (*Store) DeleteMultipartUpload

func (s *Store) DeleteMultipartUpload(ctx context.Context, uploadID string) error

DeleteMultipartUpload removes a multipart upload. Its parts are removed by a cascading delete.

func (*Store) DeleteObject

func (s *Store) DeleteObject(ctx context.Context, key string) ([]core.DeletedCopy, error)

DeleteObject removes all copies of an object and decrements their quotas. Returns all deleted copies, or ErrObjectNotFound if the object doesn’t exist. Delegates to core.DeleteObject.

func (*Store) DeleteObjectLocation

func (s *Store) DeleteObjectLocation(ctx context.Context, key, backendName string) error

DeleteObjectLocation removes a single object_locations row for the given key and backend. Used by drain to remove source copies when a replica exists.

func (*Store) DeleteObjectsBatch

func (s *Store) DeleteObjectsBatch(ctx context.Context, keys []string) (map[string][]core.DeletedCopy, error)

DeleteObjectsBatch delegates to core.DeleteObjectsBatch, which removes every supplied key in one transaction and returns the displaced copies for each key so they can be cleaned up on the backends.

func (*Store) DeletePending

func (s *Store) DeletePending(ctx context.Context, intentID string) error

DeletePending removes a pending intent. Called by the write path on a successful commit (atomically inside the same transaction as RecordObject) and by the reaper on the HEAD-404 path.

func (*Store) DeletePendingByBackend

func (s *Store) DeletePendingByBackend(ctx context.Context, backendName string) error

DeletePendingByBackend removes every intent for a backend. Called during drain finalization and admin remove so abandoned intents do not block the FK-cascade delete of the backend’s row in backend_quotas.

func (*Store) EnqueueCleanup

func (s *Store) EnqueueCleanup(ctx context.Context, backendName, objectKey, reason string, sizeBytes int64) error

EnqueueCleanup adds a failed cleanup operation to the retry queue.

func (*Store) FlushUsageDeltas

func (s *Store) FlushUsageDeltas(ctx context.Context, backendName, period string, apiRequests, egressBytes, ingressBytes int64) error

FlushUsageDeltas atomically adds accumulated usage deltas to the persistent usage row. Creates the row if it doesn’t exist for this (backend, period).

func (*Store) GetActiveMultipartCounts

func (s *Store) GetActiveMultipartCounts(ctx context.Context) (map[string]int64, error)

GetActiveMultipartCounts returns the number of in-progress multipart uploads per backend.

func (*Store) GetAllObjectLocations

func (s *Store) GetAllObjectLocations(ctx context.Context, key string) ([]core.ObjectLocation, error)

GetAllObjectLocations returns all copies of an object, ordered by created_at ascending (oldest/primary first). Used for read failover.

func (*Store) GetBackendWithSpace

func (s *Store) GetBackendWithSpace(ctx context.Context, size int64, backendOrder []string) (string, error)

GetBackendWithSpace finds a backend with enough quota for the given size. Returns the backend name or ErrNoSpaceAvailable if none have enough space.

func (*Store) GetLeastUtilizedBackend

func (s *Store) GetLeastUtilizedBackend(ctx context.Context, size int64, eligible []string) (string, error)

GetLeastUtilizedBackend finds the backend with the lowest utilization ratio that has enough space for the given size. Used by the “spread” routing strategy.

func (*Store) GetMultipartUpload

func (s *Store) GetMultipartUpload(ctx context.Context, uploadID string) (*core.MultipartUpload, error)

GetMultipartUpload retrieves metadata for a multipart upload.

func (*Store) GetMultipartUploadsByBackend

func (s *Store) GetMultipartUploadsByBackend(ctx context.Context, backendName string) ([]core.MultipartUpload, error)

GetMultipartUploadsByBackend returns all in-progress multipart uploads on the given backend. Used by drain to abort uploads before migrating objects. Because it requires a live PostgreSQL instance, it is covered by integration tests.

func (*Store) GetObjectBackendsForKeys

func (s *Store) GetObjectBackendsForKeys(ctx context.Context, keys []string) (map[string][]string, error)

GetObjectBackendsForKeys returns a map from each supplied object_key to the set of backend names that hold a copy. Empty input yields an empty map; keys with no copies are absent from the result. Used by the rebalancer planner to fold the per-key existence check into a single query per batch instead of N+1.

func (*Store) GetObjectCopiesForUpdate

func (s *Store) GetObjectCopiesForUpdate(ctx context.Context, key string) ([]core.ObjectLocation, error)

GetObjectCopiesForUpdate retrieves all copies of an object under a FOR UPDATE lock, suitable for use inside a transaction to prevent concurrent modification during over-replication cleanup.

func (*Store) GetObjectCounts

func (s *Store) GetObjectCounts(ctx context.Context) (map[string]int64, error)

GetObjectCounts returns the number of objects stored on each backend.

func (*Store) GetObjectsWithoutHash

func (s *Store) GetObjectsWithoutHash(ctx context.Context, limit, offset int) ([]core.ObjectLocation, error)

GetObjectsWithoutHash returns object locations that have no stored content hash, ordered by creation time. Used by the backfill command.

func (*Store) GetOverReplicatedObjects

func (s *Store) GetOverReplicatedObjects(ctx context.Context, factor, limit int) ([]core.ObjectLocation, error)

GetOverReplicatedObjects finds objects with more copies than the target replication factor. Returns all rows for those objects so callers can score each copy and decide which to remove.

func (*Store) GetParts

func (s *Store) GetParts(ctx context.Context, uploadID string) ([]core.MultipartPart, error)

GetParts returns all parts for a multipart upload, ordered by part number.

func (*Store) GetPendingCleanups

func (s *Store) GetPendingCleanups(ctx context.Context, limit int) ([]core.CleanupItem, error)

GetPendingCleanups returns a read-only snapshot of pending cleanup rows. Used by the admin endpoint to render the queue. The cleanup worker uses ClaimPendingCleanups instead, which atomically stamps claim columns.

func (*Store) GetPendingNotifications

func (s *Store) GetPendingNotifications(ctx context.Context, limit int) ([]core.NotificationRow, error)

GetPendingNotifications returns outbox rows ready for delivery.

func (*Store) GetQuotaStats

func (s *Store) GetQuotaStats(ctx context.Context) (map[string]core.QuotaStat, error)

GetQuotaStats returns quota statistics for all backends.

func (*Store) GetRandomHashedObjects

func (s *Store) GetRandomHashedObjects(ctx context.Context, limit int) ([]core.ObjectLocation, error)

GetRandomHashedObjects returns random object locations that have a stored content hash. Used by the scrubber to verify data integrity.

func (*Store) GetStaleMultipartUploads

func (s *Store) GetStaleMultipartUploads(ctx context.Context, olderThan time.Duration) ([]core.MultipartUpload, error)

GetStaleMultipartUploads returns uploads older than the given duration.

func (*Store) GetStalePending

func (s *Store) GetStalePending(ctx context.Context, olderThan time.Time, limit int) ([]core.PendingObject, error)

GetStalePending returns pending intents whose created_at is at or before olderThan, oldest first, capped at limit rows. Used by the reaper to resolve intents that have outlived their original PUT’s commit window.

func (*Store) GetUnderReplicatedObjects

func (s *Store) GetUnderReplicatedObjects(ctx context.Context, factor, limit int) ([]core.ObjectLocation, error)

GetUnderReplicatedObjects finds objects with fewer copies than the target replication factor. Returns all rows for those objects so callers know which backends already have copies.

func (*Store) GetUnderReplicatedObjectsExcluding

func (s *Store) GetUnderReplicatedObjectsExcluding(ctx context.Context, factor, limit int, excludedBackends []string) ([]core.ObjectLocation, error)

GetUnderReplicatedObjectsExcluding finds objects with fewer copies than the target factor, ignoring copies on the excluded backends. Returns all rows for those objects so callers know the full picture.

func (*Store) GetUnverifiedObjectCounts

func (s *Store) GetUnverifiedObjectCounts(ctx context.Context) (map[string]int64, error)

GetUnverifiedObjectCounts returns the number of objects per backend whose content_hash column is NULL (objects predating integrity verification or otherwise not yet checksummed). Drives the dashboard’s “needs backfill” column. See #405.

func (*Store) GetUsageForPeriod

func (s *Store) GetUsageForPeriod(ctx context.Context, period string) (map[string]core.UsageStat, error)

GetUsageForPeriod returns usage statistics for all backends in the given period.

func (*Store) ImportObject

func (s *Store) ImportObject(ctx context.Context, key, backend string, size int64) (bool, error)

ImportObject records a pre-existing object in the database without overwriting. Returns true if the object was imported, false if it already existed for this backend. Delegates to core.ImportObject.

func (*Store) IncrementOrphanBytes

func (s *Store) IncrementOrphanBytes(ctx context.Context, backendName string, amount int64) error

IncrementOrphanBytes adds bytes to the orphan_bytes counter for a backend. Called when a physical delete fails and is enqueued for retry.

func (*Store) InsertNotification

func (s *Store) InsertNotification(ctx context.Context, eventType, payload, endpointURL string) error

InsertNotification adds an event to the notification outbox for async webhook delivery. Satisfies the notify.OutboxStore interface.

func (*Store) InsertPending

func (s *Store) InsertPending(ctx context.Context, p *core.PendingObject) error

InsertPending records an in-flight PUT intent. Called before the backend upload so a metadata commit failure cannot silently destroy the prior copy of an overwritten key.

func (*Store) ListAllEncryptedLocations

func (s *Store) ListAllEncryptedLocations(ctx context.Context, limit, offset int) ([]core.DecryptableLocation, error)

ListAllEncryptedLocations returns a page of all encrypted object locations. Used by the decrypt-existing admin endpoint to find objects that need decryption.

func (*Store) ListDirectoryChildren

func (s *Store) ListDirectoryChildren(ctx context.Context, prefix, startAfter string, maxKeys int) (*core.DirectoryListResult, error)

ListDirectoryChildren returns the immediate children of a directory prefix with aggregate stats for subdirectories. Files include backend and creation time. Prefix must end with "/" (or be "" for the root).

func (*Store) ListEncryptedLocations

func (s *Store) ListEncryptedLocations(ctx context.Context, keyID string, limit, offset int) ([]core.EncryptedLocation, error)

ListEncryptedLocations returns a page of encrypted object locations filtered by key ID. Used during key rotation to find objects wrapped with the old key.

func (*Store) ListExpiredObjects

func (s *Store) ListExpiredObjects(ctx context.Context, prefix string, cutoff time.Time, limit int) ([]core.ObjectLocation, error)

ListExpiredObjects returns one row per unique key matching the given prefix whose created_at is older than cutoff, up to limit rows. Used by lifecycle expiration to find objects eligible for deletion.

func (*Store) ListMultipartUploads

func (s *Store) ListMultipartUploads(ctx context.Context, prefix string, maxUploads int) ([]core.MultipartUpload, error)

ListMultipartUploads returns in-progress multipart uploads whose key matches the given prefix, up to maxUploads entries.

func (*Store) ListObjects

func (s *Store) ListObjects(ctx context.Context, prefix, startAfter string, maxKeys int) (*core.ListObjectsResult, error)

ListObjects returns objects matching the given prefix, sorted by key. Supports pagination via startAfter and maxKeys. Queries one row beyond maxKeys to detect truncation.
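The extra-row technique works like this. Fetch up to maxKeys+1 matching keys after the cursor. If the extra key appears, the page is truncated, and the last kept key becomes the next cursor. Below is a minimal sketch over a sorted in-memory slice. The page type and listPage function are hypothetical and do not reproduce core.ListObjectsResult.

```go
package main

import (
	"sort"
	"strings"
)

// page is a hypothetical, simplified stand-in for core.ListObjectsResult.
type page struct {
	Keys        []string
	IsTruncated bool
	NextAfter   string // pass as startAfter to fetch the next page
}

// listPage returns up to maxKeys keys with the given prefix that sort strictly
// after startAfter. sortedKeys must be in ascending order.
func listPage(sortedKeys []string, prefix, startAfter string, maxKeys int) page {
	if maxKeys <= 0 {
		return page{}
	}
	// Position just past the cursor using binary search on the sorted input.
	i := sort.SearchStrings(sortedKeys, startAfter)
	if i < len(sortedKeys) && sortedKeys[i] == startAfter {
		i++
	}
	// Collect up to maxKeys+1 matches; the extra one only signals truncation.
	var rows []string
	for ; i < len(sortedKeys) && len(rows) <= maxKeys; i++ {
		if strings.HasPrefix(sortedKeys[i], prefix) {
			rows = append(rows, sortedKeys[i])
		}
	}
	if len(rows) <= maxKeys {
		return page{Keys: rows}
	}
	kept := rows[:maxKeys]
	return page{Keys: kept, IsTruncated: true, NextAfter: kept[maxKeys-1]}
}
```

Fetching a single extra row avoids a separate COUNT query, and it avoids the empty trailing page a client would otherwise request when the last page happens to be exactly full.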

func (*Store) ListObjectsByBackend

func (s *Store) ListObjectsByBackend(ctx context.Context, backendName string, limit int) ([]core.ObjectLocation, error)

ListObjectsByBackend returns objects stored on a specific backend, ordered by size ascending (smallest first). Used by the rebalancer to find movable objects.

func (*Store) ListObjectsByBackendKeyAsc

func (s *Store) ListObjectsByBackendKeyAsc(ctx context.Context, backendName, afterKey string, limit int) ([]core.ObjectLocation, error)

ListObjectsByBackendKeyAsc returns rows for a backend in ascending object_key order, starting strictly after the supplied cursor. An empty afterKey returns the first page. Used by ReconcileBackend to drive a sorted-merge join against an S3 ListObjects walk. Because both sides are in lexicographic order, the merge completes in a single pass with memory bounded by limit.
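The sorted-merge join can be sketched as two cursors advancing over ordered key lists. When the keys are equal, they match. When they differ, the side holding the smaller key has an entry the other side lacks. This sketch works on in-memory slices, and mergeDiff is a hypothetical name. In a paged reconciler, each side would be refilled one page at a time, which is what keeps memory bounded. On the database side, the next page would come from ListObjectsByBackendKeyAsc with the last seen key as afterKey.

```go
package main

// mergeDiff walks two ascending key lists in one pass and reports keys that
// appear only in the metadata store (dbOnly) or only on the backend (s3Only).
func mergeDiff(dbKeys, s3Keys []string) (dbOnly, s3Only []string) {
	i, j := 0, 0
	for i < len(dbKeys) && j < len(s3Keys) {
		switch {
		case dbKeys[i] == s3Keys[j]:
			i++
			j++
		case dbKeys[i] < s3Keys[j]:
			dbOnly = append(dbOnly, dbKeys[i])
			i++
		default:
			s3Only = append(s3Only, s3Keys[j])
			j++
		}
	}
	// Whatever remains on either side has no counterpart on the other.
	dbOnly = append(dbOnly, dbKeys[i:]...)
	s3Only = append(s3Only, s3Keys[j:]...)
	return dbOnly, s3Only
}
```

The merge is only correct if both sides agree on ordering. Go's string comparison is byte-wise, as is S3's UTF-8 binary key order. A PostgreSQL ORDER BY under a non-C collation could order some keys differently.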

func (*Store) ListUnencryptedLocations

func (s *Store) ListUnencryptedLocations(ctx context.Context, limit, offset int) ([]core.UnencryptedLocation, error)

ListUnencryptedLocations returns a page of unencrypted object locations. Used by the encrypt-existing admin endpoint to find objects that need encryption.

func (*Store) MarkObjectDecrypted

func (s *Store) MarkObjectDecrypted(ctx context.Context, objectKey, backendName string, plaintextSize int64) error

MarkObjectDecrypted updates a single object location to record that it has been decrypted. It clears the encryption flag, wrapped DEK, key ID, and plaintext size, and sets size_bytes to the plaintext size. The transaction reads the current ciphertext size before overwriting it, so backend_quotas.bytes_used can be adjusted by plaintextSize - currentSize (a negative delta, because plaintext is smaller than ciphertext). Without this adjustment, the counter would drift permanently from SUM(object_locations.size_bytes), and write routing would reject writes that should succeed.

func (*Store) MarkObjectEncrypted

func (s *Store) MarkObjectEncrypted(ctx context.Context, objectKey, backendName string, encryptionKey []byte, keyID string, plaintextSize, ciphertextSize int64) error

MarkObjectEncrypted updates a single object location to record that it has been encrypted. It sets the encryption flag, wrapped DEK, key ID, and plaintext size, and sets size_bytes to the ciphertext size. The transaction also advances backend_quotas.bytes_used by ciphertextSize - plaintextSize, so the per-backend counter stays in step with the on-disk byte count after the bulk encrypt-existing rewrite. Without this quota update, the counter would drift permanently from SUM(object_locations.size_bytes), and write routing would silently overcommit.
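The bytes_used bookkeeping in MarkObjectEncrypted and MarkObjectDecrypted reduces to a signed delta. The helper names below are hypothetical. The sketch shows only that the two adjustments cancel, so an encrypt followed by a decrypt leaves bytes_used where it started.

```go
package main

// encryptDelta is the bytes_used adjustment when a plaintext object is
// rewritten as ciphertext (positive, since ciphertext carries overhead).
func encryptDelta(plaintextSize, ciphertextSize int64) int64 {
	return ciphertextSize - plaintextSize
}

// decryptDelta is the adjustment when ciphertext is rewritten as plaintext;
// currentSize is the ciphertext size_bytes read inside the same transaction.
func decryptDelta(plaintextSize, currentSize int64) int64 {
	return plaintextSize - currentSize
}
```

Reading currentSize inside the transaction matters because the delta must be computed from the value being overwritten, not from a possibly stale copy held by the caller.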

func (*Store) MoveCleanupToDLQ

func (s *Store) MoveCleanupToDLQ(ctx context.Context, id int64, lastError string) (bool, error)

MoveCleanupToDLQ atomically graduates an exhausted cleanup_queue row to the dead-letter table. Delegates to core.MoveCleanupToDLQ so both engines share the move semantics. Notably, orphan_bytes is left untouched because the backend object is still on disk.

func (*Store) MoveObjectLocation

func (s *Store) MoveObjectLocation(ctx context.Context, key, fromBackend, toBackend string) (int64, error)

MoveObjectLocation atomically moves a copy of an object from one backend to another. Returns (0, nil) if the source copy is gone or the target already has a copy. Delegates to core.MoveObjectLocation.

func (*Store) PendingDepth

func (s *Store) PendingDepth(ctx context.Context) (int64, error)

PendingDepth returns the total number of pending intents. Used by the reaper to publish a depth gauge.

func (*Store) PromotePending

func (s *Store) PromotePending(ctx context.Context, p *core.PendingObject) (core.PendingPromoteResult, []core.DeletedCopy, error)

PromotePending resolves a pending intent transactionally. Delegates to core.PromotePending which composes the lock, supersession check, commit, and same-tx pending delete against the per-engine TxAdapter.

func (*Store) RecordObject

func (s *Store) RecordObject(ctx context.Context, key, backend string, size int64, enc *core.EncryptionMeta) ([]core.DeletedCopy, error)

RecordObject atomically inserts or updates an object location, handling overwrites by returning displaced copies for cleanup. Delegates to core.RecordObject which composes lock, displacement, insert, and quota update against the postgres TxAdapter.

func (*Store) RecordObjectAndClearPending

func (s *Store) RecordObjectAndClearPending(ctx context.Context, key, backend string, size int64, enc *core.EncryptionMeta, intentID string) ([]core.DeletedCopy, error)

RecordObjectAndClearPending performs the same atomic commit as RecordObject and additionally deletes the matching pending_objects intent inside the same transaction. Delegates to core.

func (*Store) RecordPart

func (s *Store) RecordPart(ctx context.Context, uploadID string, partNumber int, etag string, size int64, enc *core.EncryptionMeta) error

RecordPart records a completed part for a multipart upload. The S3 specification requires part numbers between 1 and 10,000 inclusive.
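A guard for the S3 part-number range is short. This documentation does not say whether RecordPart performs the check itself or relies on its callers, so treat validatePartNumber and its error text as hypothetical.

```go
package main

import "fmt"

const (
	minPartNumber = 1
	maxPartNumber = 10000
)

// validatePartNumber rejects part numbers outside the inclusive S3 range.
func validatePartNumber(n int) error {
	if n < minPartNumber || n > maxPartNumber {
		return fmt.Errorf("part number %d out of range [%d, %d]", n, minPartNumber, maxPartNumber)
	}
	return nil
}
```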

func (*Store) RecordReplica

func (s *Store) RecordReplica(ctx context.Context, key, targetBackend, sourceBackend string) (int64, bool, error)

RecordReplica inserts a replica copy of an object, but only if the source copy still exists. Delegates to core.RecordReplica.

func (*Store) RemoveExcessCopy

func (s *Store) RemoveExcessCopy(ctx context.Context, key, backendName string, size int64) error

RemoveExcessCopy deletes one copy of an object from the given backend inside a transaction, decrementing the backend quota atomically. Delegates to core.RemoveExcessCopy.

func (*Store) RetryCleanupItem

func (s *Store) RetryCleanupItem(ctx context.Context, id int64, backoff time.Duration, lastError string) error

RetryCleanupItem increments the attempt counter, schedules the next retry, and clears the claim, so the row is eligible for the next worker tick once its retry time arrives.

func (*Store) RetryNotification

func (s *Store) RetryNotification(ctx context.Context, id int64, backoff time.Duration, lastError string) error

RetryNotification increments the attempt counter and schedules the next retry. Satisfies the notify.OutboxStore interface.

func (*Store) RunMigrations

func (s *Store) RunMigrations(ctx context.Context) error

RunMigrations applies versioned database migrations using goose. Migrations are embedded in the binary and applied in order. Already-applied migrations are skipped automatically via the goose_db_version tracking table.

func (*Store) SweepStaleCleanupQueueRows

func (s *Store) SweepStaleCleanupQueueRows(ctx context.Context, key, backend string) (int64, error)

SweepStaleCleanupQueueRows removes every cleanup_queue row matching the (object_key, backend_name) pair and decrements the backend’s orphan_bytes counter by the sum of their size_bytes. Delegates to core.SweepStaleCleanupQueueRows.

func (*Store) SyncQuotaLimits

func (s *Store) SyncQuotaLimits(ctx context.Context, backends []config.BackendConfig) error

SyncQuotaLimits ensures the backend_quotas table has entries for all configured backends with their quota limits. Creates new entries or updates existing limits. All updates happen in a single transaction for atomicity.

func (*Store) UpdateContentHash

func (s *Store) UpdateContentHash(ctx context.Context, key, backendName, hash string) error

UpdateContentHash sets the content hash for an object location.

func (*Store) UpdateEncryptionKey

func (s *Store) UpdateEncryptionKey(ctx context.Context, objectKey, backendName string, newEncryptionKey []byte, newKeyID string) error

UpdateEncryptionKey re-wraps a single object’s encryption key. Used during key rotation to replace the old wrapped DEK with one wrapped by the new key.

func (*Store) VerifySchemaVersion

func (s *Store) VerifySchemaVersion(ctx context.Context) error

VerifySchemaVersion checks that the database schema version matches what this binary expects. Returns an error if the schema is older than expected (partial migration failure). Logs a warning if the schema is newer (possible downgrade).
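The comparison VerifySchemaVersion describes can be sketched as a pure function. A schema older than expected is an error, and a newer one only produces a warning. The function name, the error text, and the use of log/slog for the warning are assumptions. The local constant mirrors ExpectedSchemaVersion.

```go
package main

import (
	"fmt"
	"log/slog"
)

// expectedSchemaVersion mirrors ExpectedSchemaVersion in this package.
const expectedSchemaVersion = 11

// checkSchemaVersion returns an error when the database lags the binary
// (for example, a partially applied migration) and logs a warning when it is
// ahead (for example, the binary was rolled back after a newer release migrated).
func checkSchemaVersion(current int64) error {
	switch {
	case current < expectedSchemaVersion:
		return fmt.Errorf("schema version %d is older than expected %d; run migrations",
			current, expectedSchemaVersion)
	case current > expectedSchemaVersion:
		slog.Warn("schema version is newer than this binary expects",
			"current", current, "expected", expectedSchemaVersion)
	}
	return nil
}
```

The asymmetry is deliberate. An older schema means queries may reference columns that do not exist yet, whereas a newer schema is usually backward compatible with an older binary.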

func (*Store) WithAdvisoryLock

func (s *Store) WithAdvisoryLock(ctx context.Context, lockID int64, fn func(ctx context.Context) error) (bool, error)

WithAdvisoryLock acquires a PostgreSQL session-level advisory lock on a dedicated connection from the pool. If the lock is acquired, fn runs and the connection is released (which releases the lock). If another session holds the lock, returns (false, nil). On DB error, returns (false, err).

func (*Store) WithTx

func (s *Store) WithTx(ctx context.Context, fn func(ctx context.Context, tx core.TxAdapter) error) error

WithTx satisfies core.Runner by opening a transaction, wrapping the sqlc Queries in a pgTxAdapter, and invoking fn. Commits on a nil return; rolls back otherwise. Lets engine-agnostic core helpers orchestrate multi-statement operations against the postgres engine.

Generated by gomarkdoc