
sqlite
Circuit-breaker-aware (CB-aware) *sql.DB wrapper for the sqlite driver. Every statement the store fires - direct or transaction-bound - flows through this single chokepoint, which calls breaker.PreCheck before the call and breaker.PostCheck after. Advisory locks are emulated by a process-local mutex and never touch *sql.DB, so they bypass the breaker.
Package sqlite implements every core store role plus the admin roles using an embedded SQLite database via modernc.org/sqlite. WAL mode handles concurrent reads, a process-local mutex emulates advisory locks, and the schema migrates on first start.
Index
- type Store
- func NewStore(ctx context.Context, dbCfg *config.DatabaseConfig, cb *breaker.CircuitBreaker) (*Store, error)
- func (s *Store) BackendObjectStats(ctx context.Context, backendName string) (int64, int64, error)
- func (s *Store) ClaimPendingCleanups(ctx context.Context, limit int, instanceID string, graceCutoff time.Time) ([]core.CleanupItem, error)
- func (s *Store) CleanupDLQDepth(ctx context.Context) (int64, error)
- func (s *Store) CleanupQueueDepth(ctx context.Context) (int64, error)
- func (s *Store) Close()
- func (s *Store) CompleteCleanupItem(ctx context.Context, id int64) error
- func (s *Store) CompleteNotification(ctx context.Context, id int64) error
- func (s *Store) CountActiveMultipartUploads(ctx context.Context, bucketPrefix string) (int64, error)
- func (s *Store) CountOverReplicatedObjects(ctx context.Context, factor int) (int64, error)
- func (s *Store) CreateMultipartUpload(ctx context.Context, params *core.CreateMultipartUploadParams) error
- func (s *Store) DecrementOrphanBytes(ctx context.Context, backendName string, amount int64) error
- func (s *Store) DeleteBackendData(ctx context.Context, backendName string) error
- func (s *Store) DeleteMultipartUpload(ctx context.Context, uploadID string) error
- func (s *Store) DeleteObject(ctx context.Context, key string) ([]core.DeletedCopy, error)
- func (s *Store) DeleteObjectLocation(ctx context.Context, key, backendName string) error
- func (s *Store) DeleteObjectsBatch(ctx context.Context, keys []string) (map[string][]core.DeletedCopy, error)
- func (s *Store) DeletePending(ctx context.Context, intentID string) error
- func (s *Store) DeletePendingByBackend(ctx context.Context, backendName string) error
- func (s *Store) EnqueueCleanup(ctx context.Context, backendName, objectKey, reason string, sizeBytes int64) error
- func (s *Store) FlushUsageDeltas(ctx context.Context, backendName, period string, apiRequests, egressBytes, ingressBytes int64) error
- func (s *Store) GetActiveMultipartCounts(ctx context.Context) (map[string]int64, error)
- func (s *Store) GetAllObjectLocations(ctx context.Context, key string) ([]core.ObjectLocation, error)
- func (s *Store) GetBackendWithSpace(ctx context.Context, size int64, backendOrder []string) (string, error)
- func (s *Store) GetLeastUtilizedBackend(ctx context.Context, size int64, eligible []string) (string, error)
- func (s *Store) GetMultipartUpload(ctx context.Context, uploadID string) (*core.MultipartUpload, error)
- func (s *Store) GetMultipartUploadsByBackend(ctx context.Context, backendName string) ([]core.MultipartUpload, error)
- func (s *Store) GetObjectBackendsForKeys(ctx context.Context, keys []string) (map[string][]string, error)
- func (s *Store) GetObjectCounts(ctx context.Context) (map[string]int64, error)
- func (s *Store) GetObjectsWithoutHash(ctx context.Context, limit, offset int) ([]core.ObjectLocation, error)
- func (s *Store) GetOverReplicatedObjects(ctx context.Context, factor, limit int) ([]core.ObjectLocation, error)
- func (s *Store) GetParts(ctx context.Context, uploadID string) ([]core.MultipartPart, error)
- func (s *Store) GetPendingCleanups(ctx context.Context, limit int) ([]core.CleanupItem, error)
- func (s *Store) GetPendingNotifications(ctx context.Context, limit int) ([]core.NotificationRow, error)
- func (s *Store) GetQuotaStats(ctx context.Context) (map[string]core.QuotaStat, error)
- func (s *Store) GetRandomHashedObjects(ctx context.Context, limit int) ([]core.ObjectLocation, error)
- func (s *Store) GetStaleMultipartUploads(ctx context.Context, olderThan time.Duration) ([]core.MultipartUpload, error)
- func (s *Store) GetStalePending(ctx context.Context, olderThan time.Time, limit int) ([]core.PendingObject, error)
- func (s *Store) GetUnderReplicatedObjects(ctx context.Context, factor, limit int) ([]core.ObjectLocation, error)
- func (s *Store) GetUnderReplicatedObjectsExcluding(ctx context.Context, factor, limit int, excludedBackends []string) ([]core.ObjectLocation, error)
- func (s *Store) GetUnverifiedObjectCounts(ctx context.Context) (map[string]int64, error)
- func (s *Store) GetUsageForPeriod(ctx context.Context, period string) (map[string]core.UsageStat, error)
- func (s *Store) ImportObject(ctx context.Context, key, backend string, size int64) (bool, error)
- func (s *Store) IncrementOrphanBytes(ctx context.Context, backendName string, amount int64) error
- func (s *Store) InsertNotification(ctx context.Context, eventType, payload, endpointURL string) error
- func (s *Store) InsertPending(ctx context.Context, p *core.PendingObject) error
- func (s *Store) ListAllEncryptedLocations(ctx context.Context, limit, offset int) ([]core.DecryptableLocation, error)
- func (s *Store) ListDirectoryChildren(ctx context.Context, prefix, startAfter string, maxKeys int) (*core.DirectoryListResult, error)
- func (s *Store) ListEncryptedLocations(ctx context.Context, keyID string, limit, offset int) ([]core.EncryptedLocation, error)
- func (s *Store) ListExpiredObjects(ctx context.Context, prefix string, cutoff time.Time, limit int) ([]core.ObjectLocation, error)
- func (s *Store) ListMultipartUploads(ctx context.Context, prefix string, maxUploads int) ([]core.MultipartUpload, error)
- func (s *Store) ListObjects(ctx context.Context, prefix, startAfter string, maxKeys int) (*core.ListObjectsResult, error)
- func (s *Store) ListObjectsByBackend(ctx context.Context, backendName string, limit int) ([]core.ObjectLocation, error)
- func (s *Store) ListObjectsByBackendKeyAsc(ctx context.Context, backendName, afterKey string, limit int) ([]core.ObjectLocation, error)
- func (s *Store) ListUnencryptedLocations(ctx context.Context, limit, offset int) ([]core.UnencryptedLocation, error)
- func (s *Store) MarkObjectDecrypted(ctx context.Context, objectKey, backendName string, plaintextSize int64) error
- func (s *Store) MarkObjectEncrypted(ctx context.Context, objectKey, backendName string, encryptionKey []byte, keyID string, plaintextSize, ciphertextSize int64) error
- func (s *Store) MoveCleanupToDLQ(ctx context.Context, id int64, lastError string) (bool, error)
- func (s *Store) MoveObjectLocation(ctx context.Context, key, fromBackend, toBackend string) (int64, error)
- func (s *Store) PendingDepth(ctx context.Context) (int64, error)
- func (s *Store) PromotePending(ctx context.Context, p *core.PendingObject) (core.PendingPromoteResult, []core.DeletedCopy, error)
- func (s *Store) RecordObject(ctx context.Context, key, backend string, size int64, enc *core.EncryptionMeta) ([]core.DeletedCopy, error)
- func (s *Store) RecordObjectAndClearPending(ctx context.Context, key, backend string, size int64, enc *core.EncryptionMeta, intentID string) ([]core.DeletedCopy, error)
- func (s *Store) RecordPart(ctx context.Context, uploadID string, partNumber int, etag string, size int64, enc *core.EncryptionMeta) error
- func (s *Store) RecordReplica(ctx context.Context, key, targetBackend, sourceBackend string) (int64, bool, error)
- func (s *Store) RemoveExcessCopy(ctx context.Context, key, backendName string, size int64) error
- func (s *Store) RetryCleanupItem(ctx context.Context, id int64, backoff time.Duration, lastError string) error
- func (s *Store) RetryNotification(ctx context.Context, id int64, backoff time.Duration, lastError string) error
- func (s *Store) RunMigrations(ctx context.Context) error
- func (s *Store) SweepStaleCleanupQueueRows(ctx context.Context, key, backend string) (int64, error)
- func (s *Store) SyncQuotaLimits(ctx context.Context, backends []config.BackendConfig) error
- func (s *Store) UpdateContentHash(ctx context.Context, key, backendName, hash string) error
- func (s *Store) UpdateEncryptionKey(ctx context.Context, objectKey, backendName string, newEncryptionKey []byte, newKeyID string) error
- func (s *Store) VerifySchemaVersion(ctx context.Context) error
- func (s *Store) WithAdvisoryLock(ctx context.Context, _ int64, fn func(ctx context.Context) error) (bool, error)
- func (s *Store) WithTx(ctx context.Context, fn func(ctx context.Context, tx core.TxAdapter) error) error
type Store
Store implements every core role interface plus LifecycleAdmin, EncryptionAdmin, and NotificationOutbox using SQLite. The db field is typed as the local dbAPI interface so the production wiring can hand the store either a raw *sql.DB or a CB-wrapped one transparently. rawDB is the same handle without the wrapper; transactional code paths begin tx through cbBeginTx so the rollback defer can live at the same call site as the begin.
func NewStore
NewStore opens a SQLite database at the configured path, applies pragmas for WAL mode and foreign key enforcement, and runs migrations. When cb is non-nil, every statement the store fires goes through PreCheck/PostCheck. Pass nil to skip CB wrapping (test fixtures).
func (*Store) BackendObjectStats
BackendObjectStats returns the object count and total bytes stored on a backend.
func (*Store) ClaimPendingCleanups
ClaimPendingCleanups atomically claims a batch of pending rows for the calling instance. SQLite serialises writes intrinsically (one writer at a time per database), so a single UPDATE…WHERE id IN (SELECT…) is race-free against itself; the same eligibility predicates as the postgres path apply (next_retry due, attempts < 10, claim NULL or older than graceCutoff). The reclaimed flag is computed at SELECT time.
func (*Store) CleanupDLQDepth
CleanupDLQDepth returns the number of rows currently in cleanup_dlq. Surfaces the count of unrecoverable orphans so the dashboard and the cleanup_dlq_depth gauge can flag operator-visible work.
func (*Store) CleanupQueueDepth
CleanupQueueDepth returns the number of items still pending in the queue (fewer than 10 attempts).
func (*Store) Close
Close closes the underlying database connection.
func (*Store) CompleteCleanupItem
CompleteCleanupItem atomically deletes a successfully-processed row and decrements the owning backend’s orphan_bytes by the row’s size. The two statements run inside a single transaction so a worker crash between them cannot leave the counter inconsistent. Idempotent against re-claim retries: if the row was already deleted, the update affects zero rows.
func (*Store) CompleteNotification
CompleteNotification removes a successfully delivered notification from the outbox.
func (*Store) CountActiveMultipartUploads
CountActiveMultipartUploads returns the number of in-progress multipart uploads whose key starts with the given bucket prefix.
func (*Store) CountOverReplicatedObjects
CountOverReplicatedObjects returns the total number of objects with more copies than the target replication factor.
func (*Store) CreateMultipartUpload
CreateMultipartUpload records a new multipart upload in the database.
func (*Store) DecrementOrphanBytes
DecrementOrphanBytes subtracts bytes from the orphan_bytes counter for a backend. Called when a cleanup queue item is successfully processed or exhausted. Clamps the result at zero with MAX(0, x-y) - SQLite’s counterpart to PostgreSQL’s GREATEST - to prevent underflow.
func (*Store) DeleteBackendData
DeleteBackendData removes all database records for a backend in FK-safe order. Runs in a single transaction.
func (*Store) DeleteMultipartUpload
DeleteMultipartUpload removes a multipart upload and its parts. Parts are deleted first to satisfy foreign key constraints, then the upload row.
func (*Store) DeleteObject
DeleteObject delegates to core.DeleteObject.
func (*Store) DeleteObjectLocation
DeleteObjectLocation removes a single object_locations row for the given key and backend. Used by drain to remove source copies when a replica exists.
func (*Store) DeleteObjectsBatch
DeleteObjectsBatch delegates to core.DeleteObjectsBatch which removes every supplied key in one transaction.
func (*Store) DeletePending
DeletePending removes a pending intent.
func (*Store) DeletePendingByBackend
DeletePendingByBackend removes every intent for a backend. Used during backend drain finalization so abandoned intents do not block the FK-protected delete of the backend’s row in backend_quotas.
func (*Store) EnqueueCleanup
EnqueueCleanup adds a failed cleanup operation to the retry queue.
func (*Store) FlushUsageDeltas
FlushUsageDeltas atomically adds accumulated usage deltas to the persistent usage row. Creates the row if it doesn’t exist for this (backend, period).
func (*Store) GetActiveMultipartCounts
GetActiveMultipartCounts returns the number of in-progress multipart uploads per backend.
func (*Store) GetAllObjectLocations
GetAllObjectLocations returns all copies of an object, ordered by created_at ascending (oldest/primary first). Used for read failover.
func (*Store) GetBackendWithSpace
GetBackendWithSpace finds a backend with enough quota for the given size. Returns the backend name or ErrNoSpaceAvailable if none have enough space.
func (*Store) GetLeastUtilizedBackend
GetLeastUtilizedBackend finds the backend with the lowest utilization ratio that has enough space for the given size. Used by the “spread” routing strategy. The eligible list expands inside SQLite via the JSON1 extension’s json_each, so the query body is a fixed literal with no dynamic SQL string construction.
func (*Store) GetMultipartUpload
GetMultipartUpload retrieves metadata for a multipart upload.
func (*Store) GetMultipartUploadsByBackend
GetMultipartUploadsByBackend returns all in-progress multipart uploads on the given backend. Used by drain to abort uploads before migrating objects.
func (*Store) GetObjectBackendsForKeys
GetObjectBackendsForKeys returns a map from each supplied object_key to the backends that hold a copy. Empty input yields an empty map; keys with no copies are absent from the result. Used by the rebalancer planner to fold the per-key existence check into a single query per batch instead of N+1.
The query uses SQLite’s json_each so the SQL stays static and the keys array is passed as a single JSON-encoded parameter rather than interpolated into the SQL string.
func (*Store) GetObjectCounts
GetObjectCounts returns the number of objects stored on each backend.
func (*Store) GetObjectsWithoutHash
GetObjectsWithoutHash returns object locations that have no stored content hash, ordered by creation time. Used by the backfill command.
func (*Store) GetOverReplicatedObjects
GetOverReplicatedObjects finds objects with more copies than the target replication factor. Returns all rows for those objects so callers can score each copy and decide which to remove.
func (*Store) GetParts
GetParts returns all parts for a multipart upload, ordered by part number.
func (*Store) GetPendingCleanups
GetPendingCleanups returns a read-only snapshot of pending cleanup rows for the admin endpoint. The cleanup worker uses ClaimPendingCleanups instead. claimed_at is parsed back from RFC3339Nano text since SQLite has no native timestamp type.
func (*Store) GetPendingNotifications
GetPendingNotifications returns notifications ready for delivery, ordered by creation time.
func (*Store) GetQuotaStats
GetQuotaStats returns quota statistics for all backends.
func (*Store) GetRandomHashedObjects
GetRandomHashedObjects returns random object locations that have a stored content hash. Used by the scrubber to verify data integrity. Uses ORDER BY RANDOM() LIMIT instead of PostgreSQL TABLESAMPLE BERNOULLI.
func (*Store) GetStaleMultipartUploads
GetStaleMultipartUploads returns uploads older than the given duration.
func (*Store) GetStalePending
GetStalePending returns pending intents at or older than olderThan, oldest first, capped at limit.
func (*Store) GetUnderReplicatedObjects
GetUnderReplicatedObjects finds objects with fewer copies than the target replication factor. Returns all rows for those objects so callers know which backends already have copies.
func (*Store) GetUnderReplicatedObjectsExcluding
GetUnderReplicatedObjectsExcluding finds objects with fewer copies than the target factor, ignoring copies on the excluded backends. Returns all rows for those objects so callers know the full picture.
func (*Store) GetUnverifiedObjectCounts
GetUnverifiedObjectCounts returns the number of objects per backend whose content_hash column is NULL (objects predating integrity verification or otherwise not yet checksummed). Drives the dashboard’s “needs backfill” column. See #405.
func (*Store) GetUsageForPeriod
GetUsageForPeriod returns usage statistics for all backends in the given period.
func (*Store) ImportObject
ImportObject delegates to core.ImportObject.
func (*Store) IncrementOrphanBytes
IncrementOrphanBytes adds bytes to the orphan_bytes counter for a backend. Called when a physical delete fails and is enqueued for retry.
func (*Store) InsertNotification
InsertNotification enqueues a notification for delivery.
func (*Store) InsertPending
InsertPending records an in-flight PUT intent.
func (*Store) ListAllEncryptedLocations
ListAllEncryptedLocations returns a page of all encrypted object locations with decryption metadata. Used by the decrypt-existing admin endpoint.
func (*Store) ListDirectoryChildren
ListDirectoryChildren returns immediate children (directories and files) under prefix, with aggregate stats for directories and detail for files. Pagination uses startAfter as the cursor for file-level detail.
func (*Store) ListEncryptedLocations
ListEncryptedLocations returns a page of encrypted object locations filtered by key ID. Used during key rotation to find objects wrapped with the old key.
func (*Store) ListExpiredObjects
ListExpiredObjects returns one row per unique key matching the given prefix whose created_at is older than cutoff, up to limit rows. Used by lifecycle expiration to find objects eligible for deletion.
func (*Store) ListMultipartUploads
ListMultipartUploads returns in-progress multipart uploads whose key matches the given prefix, up to maxUploads entries.
func (*Store) ListObjects
ListObjects returns objects matching the given prefix, sorted by key. Supports pagination via startAfter and maxKeys. Returns one extra row to detect truncation. Uses a subquery with GROUP BY to deduplicate replicated objects (equivalent to DISTINCT ON in PostgreSQL).
func (*Store) ListObjectsByBackend
ListObjectsByBackend returns objects stored on a specific backend, ordered by size ascending (smallest first). Used by the rebalancer to find movable objects.
func (*Store) ListObjectsByBackendKeyAsc
ListObjectsByBackendKeyAsc returns rows for a backend in ascending object_key order, starting strictly after afterKey. The empty string returns the first page. Used by ReconcileBackend’s bounded-memory sorted-merge join against an S3 ListObjects walk; both sides are in lexicographic order, so the merge needs only O(limit) memory.
func (*Store) ListUnencryptedLocations
ListUnencryptedLocations returns a page of unencrypted object locations. Used by the encrypt-existing admin endpoint to find objects that need encryption.
func (*Store) MarkObjectDecrypted
MarkObjectDecrypted updates an object location to record that it has been decrypted in-place. Clears encryption metadata and restores the plaintext size.
func (*Store) MarkObjectEncrypted
MarkObjectEncrypted updates an object location to record that it has been encrypted in-place. Updates the size to the ciphertext size and stores the encryption metadata.
func (*Store) MoveCleanupToDLQ
MoveCleanupToDLQ atomically graduates an exhausted cleanup_queue row to the dead-letter table. Delegates to core.MoveCleanupToDLQ so both engines share the move semantics - notably that orphan_bytes is left untouched because the backend object is still on disk.
func (*Store) MoveObjectLocation
MoveObjectLocation delegates to core.MoveObjectLocation.
func (*Store) PendingDepth
PendingDepth returns the total number of pending intents.
func (*Store) PromotePending
PromotePending resolves a pending intent transactionally. SQLite serializes writers so no row-level lock is needed. The destination is inspected:
- If any object_locations row for the key was created after this intent was inserted, the intent is provably stale and is dropped (Superseded): the authoritative state is the newer row, and the intent’s bytes are either overwritten or stranded orphans.
- Otherwise the intent is promoted: any prior copies are cleared, the new row is inserted, quotas are adjusted, and the pending row is deleted in the same transaction.
- If the pending row is already gone, the call is a benign no-op.
PromotePending delegates to core.PromotePending which composes the engine-agnostic claim, supersession check, commit, and same-tx delete of the pending row against the SQLite TxAdapter.
func (*Store) RecordObject
RecordObject delegates to core.RecordObject which composes the engine-agnostic transactional sequence against the SQLite TxAdapter.
func (*Store) RecordObjectAndClearPending
RecordObjectAndClearPending delegates to core. Inside the same transaction the pending row is deleted so the intent never outlives a committed location.
func (*Store) RecordPart
RecordPart records a completed part for a multipart upload. Re-uploading the same part number updates the existing row (ON CONFLICT DO UPDATE).
func (*Store) RecordReplica
RecordReplica delegates to core.RecordReplica.
func (*Store) RemoveExcessCopy
RemoveExcessCopy delegates to core.RemoveExcessCopy.
func (*Store) RetryCleanupItem
RetryCleanupItem increments the attempt counter, schedules the next retry, and clears the claim so the row can be re-claimed as soon as its retry time is due, rather than waiting for the claim grace period to expire.
func (*Store) RetryNotification
RetryNotification increments the attempt counter and schedules the next retry at an absolute time computed from the backoff duration.
func (*Store) RunMigrations
RunMigrations applies the embedded SQLite schema if the database has not been initialised yet. If schema_version already exists and the version matches, this is a no-op. If the version does not match, an error is returned so the operator can take corrective action.
func (*Store) SweepStaleCleanupQueueRows
SweepStaleCleanupQueueRows delegates to core.SweepStaleCleanupQueueRows.
func (*Store) SyncQuotaLimits
SyncQuotaLimits ensures the backend_quotas table has entries for all configured backends with their quota limits. Creates new entries or updates existing limits.
func (*Store) UpdateContentHash
UpdateContentHash sets the content hash for an object location.
func (*Store) UpdateEncryptionKey
UpdateEncryptionKey updates the wrapped DEK and key ID for a single object location. Used after re-wrapping a DEK with a new master key.
func (*Store) VerifySchemaVersion
VerifySchemaVersion checks that the database schema version matches what this binary expects. Returns an error if schema_version is missing or if the recorded version is older than expected. Logs a warning if the schema is newer (possible downgrade).
func (*Store) WithAdvisoryLock
WithAdvisoryLock emulates PostgreSQL advisory locks using a process-local mutex. For single-instance SQLite deployments, this is correct - there are no competing instances. Returns (false, nil) if the lock is already held by another goroutine.
func (*Store) WithTx
WithTx satisfies core.Runner by opening a transaction, wrapping it in a sqliteTxAdapter, and invoking fn. Commits on a nil return; rolls back otherwise. Lets engine-agnostic core helpers orchestrate multi-statement operations against the SQLite engine.
Generated by gomarkdoc