s3-orchestrator

worker

import "github.com/afreidah/s3-orchestrator/internal/worker"

Index

Constants

Worker name labels used by the admission-rejection metric and other worker-scoped telemetry. Using constants prevents the silent label drift that a typo would otherwise cause, which would split a metric’s time series in Grafana.

const (
    WorkerNameCleanup         = "cleanup"
    WorkerNameOverReplication = "over_replication"
    WorkerNamePendingReaper   = "pending_reaper"
    WorkerNameRebalancer      = "rebalancer"
    WorkerNameReplicator      = "replicator"
)

DefaultCleanupQueueTick is the cleanup-queue worker’s per-tick cadence when the config does not override it.

const DefaultCleanupQueueTick = 1 * time.Minute

DefaultOverReplicationTick is the over-replication cleaner’s per-tick cadence when the config does not specify one.

const DefaultOverReplicationTick = 5 * time.Minute

DefaultPendingReaperTick is the pending reaper’s per-tick cadence when the config does not specify one.

const DefaultPendingReaperTick = 1 * time.Minute

DefaultRebalanceInterval is the rebalancer’s per-tick cadence when the config does not specify one.

const DefaultRebalanceInterval = 6 * time.Hour

DefaultReplicatorTick is the replicator’s per-tick cadence when the config does not specify one.

const DefaultReplicatorTick = 5 * time.Minute

DefaultScrubberInterval is the integrity scrubber’s per-tick cadence when the integrity config does not specify one.

const DefaultScrubberInterval = 6 * time.Hour

func CleanupBackoff

func CleanupBackoff(attempts int32) time.Duration

CleanupBackoff returns the backoff duration for the given attempt number. Uses exponential backoff: min(1m * 2^attempts, 24h). Short-circuits the shift for attempts >= 11, where the doubling already exceeds the cap (1m * 2^11 ≈ 34h). It also short-circuits for negative inputs. In Go, a negative shift count panics at run time, and a large shift count would overflow the int64-backed time.Duration.

func ExceedsThreshold

func ExceedsThreshold(stats map[string]core.QuotaStat, order []string, threshold float64) bool

ExceedsThreshold reports whether the utilization spread across backends (max ratio minus min ratio) is at least the configured threshold.

func NewCleanupQueueService

func NewCleanupQueueService(cleanup *CleanupWorker, locker tickrunner.AdvisoryLocker) lifecycle.Runner

NewCleanupQueueService constructs the cleanup-queue background service.

func NewOverReplicationService

func NewOverReplicationService(manager tickrunner.QuotaMetricsRefresher, overRep *OverReplicationCleaner, locker tickrunner.AdvisoryLocker) lifecycle.Runner

NewOverReplicationService constructs the over-replication cleanup service.

func NewPendingReaperService

func NewPendingReaperService(reaper *PendingReaper, locker tickrunner.AdvisoryLocker, tick time.Duration) lifecycle.Runner

NewPendingReaperService constructs the pending-objects reaper background service. The reaper resolves abandoned PUT intents by HEADing the destination backend and either promoting the intent into object_locations (bytes present) or dropping it (bytes absent). Returns nil when no pending reaper is configured.

func NewRebalancerService

func NewRebalancerService(manager tickrunner.QuotaMetricsRefresher, rebalancer *Rebalancer, locker tickrunner.AdvisoryLocker) lifecycle.Runner

NewRebalancerService constructs the rebalancer background service.

func NewReconcileService

func NewReconcileService(reconciler *Reconciler, locker tickrunner.AdvisoryLocker, interval time.Duration) lifecycle.Runner

NewReconcileService constructs the reconcile background service.

func NewReplicatorService

func NewReplicatorService(manager tickrunner.QuotaMetricsRefresher, replicator *Replicator, locker tickrunner.AdvisoryLocker) lifecycle.Runner

NewReplicatorService constructs the replication background service.

func NewScrubberService

func NewScrubberService(scrubber *Scrubber, locker tickrunner.AdvisoryLocker) lifecycle.Runner

NewScrubberService constructs the integrity scrubber background service.

func WithAdmission

func WithAdmission(ctx context.Context, ac AdmissionControl, name string, fn func())

WithAdmission acquires an admission slot, runs fn if the slot is granted, and releases the slot afterward. When admission is rejected, it increments the per-worker rejection counter and returns without invoking fn. name must be one of the WorkerName* constants above.

type AdmissionControl

AdmissionControl gates concurrent access to backends.

type AdmissionControl interface {
    AcquireAdmission(ctx context.Context) bool
    ReleaseAdmission()
}

type BackendAccess

BackendAccess provides backend fleet discovery and drain-awareness.

type BackendAccess interface {
    Backends() map[string]backend.ObjectBackend
    BackendOrder() []string
    IsDraining(name string) bool
    ExcludeDraining(eligible []string) []string
    SelectReplicaTarget(ctx context.Context, size int64, exclusion map[string]bool) (string, error)
}

type BackendSyncer

BackendSyncer is the interface the reconciler needs from the proxy layer. Defined here to avoid a worker->proxy import cycle.

type BackendSyncer interface {
    SyncBackend(ctx context.Context, backendName, bucket string, knownBuckets []string) (imported, skipped int, err error)
    ReconcileBackend(ctx context.Context, backendName, bucket string, knownBuckets []string) (*ReconcileResult, error)
    UpdateQuotaMetrics(ctx context.Context) error
    BackendOrder() []string
}

type CleanupOps

CleanupOps is the dependency contract for CleanupWorker (NewPendingReaper accepts it as well). It omits BackendAccess because cleanup operates on specific named backends.

type CleanupOps interface {
    AdmissionControl
    DataMover
    UsageAccessor
    RecorderProvider
}

type CleanupWorker

CleanupWorker processes the retry queue for failed object deletions.

type CleanupWorker struct {
    // contains filtered or unexported fields
}

func NewCleanupWorker

func NewCleanupWorker(deps CleanupOps, store core.MetadataStore, concurrency int, instanceID string, claimGracePeriod time.Duration) *CleanupWorker

NewCleanupWorker creates a CleanupWorker with explicit dependencies. instanceID is stamped into cleanup_queue.claimed_by for observability; claimGracePeriod is the threshold past which an outstanding claim becomes reclaimable by another worker tick (typically 5m).

func (*CleanupWorker) ProcessCleanupQueue

func (w *CleanupWorker) ProcessCleanupQueue(ctx context.Context) (processed, failed int)

ProcessCleanupQueue fetches pending cleanup items and attempts to delete the orphaned objects from their respective backends.

type DataMover

DataMover provides object data movement primitives.

type DataMover interface {
    GetBackend(name string) (backend.ObjectBackend, error)
    WithTimeout(ctx context.Context) (context.Context, context.CancelFunc)
    StreamCopy(ctx context.Context, src, dst backend.ObjectBackend, key string) error
    DeleteWithTimeout(ctx context.Context, be backend.ObjectBackend, key string) error
    DeleteOrEnqueue(ctx context.Context, be backend.ObjectBackend, backendName, key, reason string, sizeBytes int64)
    MoveObject(ctx context.Context, req *writepath.MoveRequest) (int64, error)
}

type Ops

Ops combines fleet access, admission control, data movement, usage tracking, and the shared accounting recorder. Used by Rebalancer, Replicator, and OverReplicationCleaner.

type Ops interface {
    BackendAccess
    AdmissionControl
    DataMover
    UsageAccessor
    RecorderProvider
}

type OverReplicationCleaner

OverReplicationCleaner removes excess copies of objects whose copy count exceeds the configured replication factor. Embeds *backendCore for access to shared infrastructure.

type OverReplicationCleaner struct {
    // contains filtered or unexported fields
}

func NewOverReplicationCleaner

func NewOverReplicationCleaner(ops Ops, store core.MetadataStore) *OverReplicationCleaner

NewOverReplicationCleaner creates an OverReplicationCleaner with fleet operations and a metadata store.

func (*OverReplicationCleaner) Clean

func (c *OverReplicationCleaner) Clean(ctx context.Context, cfg config.ReplicationConfig) (int, error)

Clean finds over-replicated objects and removes excess copies to reach the target replication factor. Returns the number of copies removed.

func (*OverReplicationCleaner) Config

func (c *OverReplicationCleaner) Config() *config.ReplicationConfig

Config returns the current replication configuration.

func (*OverReplicationCleaner) CountPending

func (c *OverReplicationCleaner) CountPending(ctx context.Context, factor int) (int64, error)

CountPending returns the number of objects exceeding the replication factor.

func (*OverReplicationCleaner) ScoreCopy

func (c *OverReplicationCleaner) ScoreCopy(loc *core.ObjectLocation, stats map[string]core.QuotaStat) float64

ScoreCopy assigns a retention score to a copy based on its backend’s state:

  • draining backend: 0 (always remove first)
  • circuit-broken backend: 1 (remove next)
  • healthy backend: 2 + (1 - utilization_ratio), range [2..3]

Among healthy backends, the most-utilized backend gets the lowest score, which makes its copy the first candidate for removal. This frees space where it is scarcest.

func (*OverReplicationCleaner) SetConfig

func (c *OverReplicationCleaner) SetConfig(cfg *config.ReplicationConfig)

SetConfig atomically stores the replication configuration.

type PendingReaper

PendingReaper resolves abandoned PUT intents by inspecting the destination backend. The min-age window protects in-flight PUTs whose commit has not yet had a chance to clear the intent on the synchronous path.

type PendingReaper struct {
    // contains filtered or unexported fields
}

func NewPendingReaper

func NewPendingReaper(deps CleanupOps, store core.MetadataStore, concurrency int, minAge time.Duration, batchSize int) *PendingReaper

NewPendingReaper creates a PendingReaper with explicit dependencies. concurrency, minAge, and batchSize fall back to safe defaults when zero or negative.

func (*PendingReaper) ProcessPendingQueue

func (r *PendingReaper) ProcessPendingQueue(ctx context.Context) (resolved, failed int)

ProcessPendingQueue runs one reaper tick: fetch stale intents, HEAD their destinations, and promote or drop based on what the backend reports. Returns the number of intents that completed (committed or dropped) and the number that failed (left for the next tick).

type RebalanceMove

RebalanceMove describes a single object move from one backend to another.

type RebalanceMove struct {
    ObjectKey   string
    FromBackend string
    ToBackend   string
    SizeBytes   int64
}

type Rebalancer

Rebalancer moves objects between backends to optimize space distribution.

type Rebalancer struct {
    // contains filtered or unexported fields
}

func NewRebalancer

func NewRebalancer(ops Ops, store core.MetadataStore) *Rebalancer

NewRebalancer creates a Rebalancer with fleet operations and a metadata store.

func (*Rebalancer) Config

func (r *Rebalancer) Config() *config.RebalanceConfig

Config returns the current rebalance configuration.

func (*Rebalancer) ExecuteMoves

func (r *Rebalancer) ExecuteMoves(ctx context.Context, plan []RebalanceMove, strategy string, concurrency int) int

ExecuteMoves runs the planned object moves with bounded concurrency. Skips individual moves that fail and continues with the rest, returning the count of successful moves.

func (*Rebalancer) ExecuteOneMove

func (r *Rebalancer) ExecuteOneMove(ctx context.Context, move RebalanceMove, strategy string) bool

ExecuteOneMove performs a single object move: stream the bytes from source to destination, swap the DB location with compare-and-swap, and delete the source copy. Returns true when all steps succeed.

func (*Rebalancer) PlanPackTight

func (r *Rebalancer) PlanPackTight(ctx context.Context, stats map[string]core.QuotaStat, batchSize int) ([]RebalanceMove, error)

PlanPackTight consolidates objects onto the most-utilized backends by pulling them from the least-utilized ones. It sorts backends by percent full, descending, and only moves an object from a less-full source to a more-full destination. It skips moves that would not increase the destination’s packing ratio.

func (*Rebalancer) PlanSpreadEven

func (r *Rebalancer) PlanSpreadEven(ctx context.Context, stats map[string]core.QuotaStat, batchSize int) ([]RebalanceMove, error)

PlanSpreadEven equalizes utilization ratios across backends by moving objects from over-utilized backends to under-utilized ones. Returns nil when no backend has a usable byte limit.

func (*Rebalancer) Rebalance

func (r *Rebalancer) Rebalance(ctx context.Context, cfg config.RebalanceConfig) (int, error)

Rebalance moves objects between backends to optimize space distribution. Returns the number of objects successfully moved.

func (*Rebalancer) SetConfig

func (r *Rebalancer) SetConfig(cfg *config.RebalanceConfig)

SetConfig atomically stores the rebalance configuration.

type ReconcileResult

ReconcileResult holds the outcome of a reconciliation pass. The pass covers either a single backend or, when Reconcile is called with an empty backend name, all backends.

type ReconcileResult struct {
    Imported        int `json:"imported"`
    Removed         int `json:"removed"`
    BackendsScanned int `json:"backends_scanned"`
}

type Reconciler

Reconciler scans backends for untracked objects and imports them into the metadata database.

type Reconciler struct {
    // contains filtered or unexported fields
}

func NewReconciler

func NewReconciler(syncer BackendSyncer, bucketNames []string) *Reconciler

NewReconciler creates a reconciler that uses the syncer’s SyncBackend to import untracked objects.

func (*Reconciler) Reconcile

func (r *Reconciler) Reconcile(ctx context.Context, backendName string) (*ReconcileResult, error)

Reconcile performs a full reconciliation for the given backend (or all backends if backendName is empty). Lists objects on each backend, diffs against DB entries, imports untracked objects, and removes stale entries.

func (*Reconciler) Run

func (r *Reconciler) Run(ctx context.Context)

Run performs a full reconciliation pass: for each backend, list all objects and import any that are not tracked in the metadata database.

type RecorderProvider

RecorderProvider provides the shared per-backend accounting recorder. Prefer this over UsageAccessor for per-attempt API call / byte counters: APICall / Egress / Ingress / Operation document intent at the call site and prevent argument-order bugs.

type RecorderProvider interface {
    Acct() *accounting.Recorder
}

type ReplicationOutcome

ReplicationOutcome captures the per-object result of one ReplicateObject invocation. Counts are populated regardless of success so the reporter and unit tests can reason about retry behaviour without parsing log lines. NoTarget reflects whether the loop exited because target selection ran out, not whether any individual attempt failed.

type ReplicationOutcome struct {
    Key          string // object key
    Created      int    // copies successfully recorded
    CopyErrors   int    // source -> target stream copies that errored
    RecordErrors int    // RecordReplica failures (after the copy succeeded)
    Superseded   int    // RecordReplica returned inserted=false (source row gone)
    NoTarget     bool   // selection failed before `needed` was reached
}

func (ReplicationOutcome) Failed

func (o ReplicationOutcome) Failed() int

Failed reports the total number of failed copy attempts for this object, irrespective of failure stage.

type Replicator

Replicator creates additional copies of under-replicated objects across backends.

type Replicator struct {
    // contains filtered or unexported fields
}

func NewReplicator

func NewReplicator(ops Ops, store core.MetadataStore) *Replicator

NewReplicator creates a Replicator with fleet operations and a metadata store.

func (*Replicator) CleanupOrphan

func (r *Replicator) CleanupOrphan(ctx context.Context, backendName, key string, sizeBytes int64)

CleanupOrphan deletes an object from a backend when the DB record was not created (e.g. source was deleted during replication). Looks up the backend by name and dispatches to DeleteOrEnqueue, which handles its own API accounting and orphan-byte tracking.

func (*Replicator) Config

func (r *Replicator) Config() *config.ReplicationConfig

Config returns the current replication configuration.

func (*Replicator) CopyToReplica

func (r *Replicator) CopyToReplica(ctx context.Context, key string, copies []core.ObjectLocation, target string) (string, int64, error)

CopyToReplica reads the object from an existing copy and writes it to the target backend. It tries each existing copy in order for failover. It returns the name of the source backend that was successfully read from. It also returns the size_bytes recorded on that source’s ObjectLocation row, which is the size of the bytes actually transferred. The input slice is cloned before sorting so callers retain their original ordering; see #904 for the aliasing bug the clone prevents.

func (*Replicator) FindReplicaTarget

func (r *Replicator) FindReplicaTarget(ctx context.Context, key string, size int64, exclusion map[string]bool) string

FindReplicaTarget selects a backend for a replication copy using the same routing strategy as normal writes. Returns empty string if no suitable target exists.

func (*Replicator) IsBackendHealthy

func (r *Replicator) IsBackendHealthy(name string) bool

IsBackendHealthy returns true if the backend has a closed circuit breaker or has no circuit breaker wrapper.

func (*Replicator) Replicate

func (r *Replicator) Replicate(ctx context.Context, cfg config.ReplicationConfig) (int, error)

Replicate finds under-replicated objects and creates additional copies to reach the target replication factor. Returns the number of copies created.

func (*Replicator) ReplicateObject

func (r *Replicator) ReplicateObject(ctx context.Context, key string, existingCopies []core.ObjectLocation, needed int) ReplicationOutcome

ReplicateObject creates up to `needed` additional copies of a single object. Returns a ReplicationOutcome the caller can use to drive metrics, audit, and log reporting without re-parsing logs.

Per-attempt diagnostic logs are preserved so incident responders can still trace each failed retry; the outcome is a *structured summary* on top of those, not a replacement.

func (*Replicator) SetConfig

func (r *Replicator) SetConfig(cfg *config.ReplicationConfig)

SetConfig atomically stores the replication configuration.

func (*Replicator) UnhealthyBackends

func (r *Replicator) UnhealthyBackends(threshold time.Duration) []string

UnhealthyBackends returns backend names whose circuit breakers have been open longer than the given threshold. Returns nil when all backends are healthy or circuit breakers are not enabled.

type Scrubber

Scrubber periodically verifies stored object integrity by reading objects from backends, computing their SHA-256 hash, and comparing against the stored content hash. Also supports backfilling hashes for objects that were written before integrity was enabled.

type Scrubber struct {
    // contains filtered or unexported fields
}

func NewScrubber

func NewScrubber(deps ScrubberOps, store core.MetadataStore, encryptor *encryption.Encryptor) *Scrubber

NewScrubber creates a Scrubber with the given dependencies and optional encryptor.

func (*Scrubber) Backfill

func (s *Scrubber) Backfill(ctx context.Context, batchSize, offset int) (processed, nextOffset int)

Backfill reads objects that have no stored content hash, computes the SHA-256 digest, and stores it in the database. Processes up to batchSize objects starting at the given offset. Returns the number of objects processed and the next offset for pagination (0 when done).

func (*Scrubber) Config

func (s *Scrubber) Config() *config.IntegrityConfig

Config returns the current integrity configuration.

func (*Scrubber) Scrub

func (s *Scrubber) Scrub(ctx context.Context, batchSize int) (checked, failed int)

Scrub verifies a batch of objects with stored content hashes. Returns the number of objects checked and the number of hash mismatches found.

func (*Scrubber) SetConfig

func (s *Scrubber) SetConfig(cfg *config.IntegrityConfig)

SetConfig atomically stores the integrity configuration.

type ScrubberOps

ScrubberOps is the dependency contract for Scrubber. It omits AdmissionControl and BackendAccess because integrity checks are best-effort background work.

type ScrubberOps interface {
    DataMover
    UsageAccessor
    RecorderProvider
}

type UsageAccessor

UsageAccessor provides usage tracking.

type UsageAccessor interface {
    Usage() *counter.UsageTracker
}

Generated by gomarkdoc