
worker
Index
- Constants
- func CleanupBackoff(attempts int32) time.Duration
- func ExceedsThreshold(stats map[string]core.QuotaStat, order []string, threshold float64) bool
- func NewCleanupQueueService(cleanup *CleanupWorker, locker tickrunner.AdvisoryLocker) lifecycle.Runner
- func NewOverReplicationService(manager tickrunner.QuotaMetricsRefresher, overRep *OverReplicationCleaner, locker tickrunner.AdvisoryLocker) lifecycle.Runner
- func NewPendingReaperService(reaper *PendingReaper, locker tickrunner.AdvisoryLocker, tick time.Duration) lifecycle.Runner
- func NewRebalancerService(manager tickrunner.QuotaMetricsRefresher, rebalancer *Rebalancer, locker tickrunner.AdvisoryLocker) lifecycle.Runner
- func NewReconcileService(reconciler *Reconciler, locker tickrunner.AdvisoryLocker, interval time.Duration) lifecycle.Runner
- func NewReplicatorService(manager tickrunner.QuotaMetricsRefresher, replicator *Replicator, locker tickrunner.AdvisoryLocker) lifecycle.Runner
- func NewScrubberService(scrubber *Scrubber, locker tickrunner.AdvisoryLocker) lifecycle.Runner
- func WithAdmission(ctx context.Context, ac AdmissionControl, name string, fn func())
- type AdmissionControl
- type BackendAccess
- type BackendSyncer
- type CleanupOps
- type CleanupWorker
- type DataMover
- type Ops
- type OverReplicationCleaner
- func NewOverReplicationCleaner(ops Ops, store core.MetadataStore) *OverReplicationCleaner
- func (c *OverReplicationCleaner) Clean(ctx context.Context, cfg config.ReplicationConfig) (int, error)
- func (c *OverReplicationCleaner) Config() *config.ReplicationConfig
- func (c *OverReplicationCleaner) CountPending(ctx context.Context, factor int) (int64, error)
- func (c *OverReplicationCleaner) ScoreCopy(loc *core.ObjectLocation, stats map[string]core.QuotaStat) float64
- func (c *OverReplicationCleaner) SetConfig(cfg *config.ReplicationConfig)
- type PendingReaper
- type RebalanceMove
- type Rebalancer
- func NewRebalancer(ops Ops, store core.MetadataStore) *Rebalancer
- func (r *Rebalancer) Config() *config.RebalanceConfig
- func (r *Rebalancer) ExecuteMoves(ctx context.Context, plan []RebalanceMove, strategy string, concurrency int) int
- func (r *Rebalancer) ExecuteOneMove(ctx context.Context, move RebalanceMove, strategy string) bool
- func (r *Rebalancer) PlanPackTight(ctx context.Context, stats map[string]core.QuotaStat, batchSize int) ([]RebalanceMove, error)
- func (r *Rebalancer) PlanSpreadEven(ctx context.Context, stats map[string]core.QuotaStat, batchSize int) ([]RebalanceMove, error)
- func (r *Rebalancer) Rebalance(ctx context.Context, cfg config.RebalanceConfig) (int, error)
- func (r *Rebalancer) SetConfig(cfg *config.RebalanceConfig)
- type ReconcileResult
- type Reconciler
- type RecorderProvider
- type ReplicationOutcome
- type Replicator
- func NewReplicator(ops Ops, store core.MetadataStore) *Replicator
- func (r *Replicator) CleanupOrphan(ctx context.Context, backendName, key string, sizeBytes int64)
- func (r *Replicator) Config() *config.ReplicationConfig
- func (r *Replicator) CopyToReplica(ctx context.Context, key string, copies []core.ObjectLocation, target string) (string, int64, error)
- func (r *Replicator) FindReplicaTarget(ctx context.Context, key string, size int64, exclusion map[string]bool) string
- func (r *Replicator) IsBackendHealthy(name string) bool
- func (r *Replicator) Replicate(ctx context.Context, cfg config.ReplicationConfig) (int, error)
- func (r *Replicator) ReplicateObject(ctx context.Context, key string, existingCopies []core.ObjectLocation, needed int) ReplicationOutcome
- func (r *Replicator) SetConfig(cfg *config.ReplicationConfig)
- func (r *Replicator) UnhealthyBackends(threshold time.Duration) []string
- type Scrubber
- func NewScrubber(deps ScrubberOps, store core.MetadataStore, encryptor *encryption.Encryptor) *Scrubber
- func (s *Scrubber) Backfill(ctx context.Context, batchSize, offset int) (processed, nextOffset int)
- func (s *Scrubber) Config() *config.IntegrityConfig
- func (s *Scrubber) Scrub(ctx context.Context, batchSize int) (checked, failed int)
- func (s *Scrubber) SetConfig(cfg *config.IntegrityConfig)
- type ScrubberOps
- type UsageAccessor
Constants
Worker name labels used by the admission-rejection metric and other worker-scoped telemetry. Constants prevent the silent label drift that would otherwise split a metric’s time series in Grafana when a typo lands.
DefaultCleanupQueueTick is the cleanup-queue worker’s per-tick cadence when the config does not override it.
DefaultOverReplicationTick is the over-replication cleaner’s per-tick cadence when the config does not specify one.
DefaultPendingReaperTick is the pending reaper’s per-tick cadence when the config does not specify one.
DefaultRebalanceInterval is the rebalancer’s per-tick cadence when the config does not specify one.
DefaultReplicatorTick is the replicator’s per-tick cadence when the config does not specify one.
DefaultScrubberInterval is the integrity scrubber’s per-tick cadence when the integrity config does not specify one.
func CleanupBackoff
CleanupBackoff returns the backoff duration for the given attempt number. Uses exponential backoff: min(1m * 2^attempts, 24h). Short-circuits the shift for attempts >= 11 (where the doubling already exceeds the cap) and for negative inputs: in Go a negative shift count panics at run time, and a sufficiently large count silently overflows the duration instead of saturating at the cap.
func ExceedsThreshold
ExceedsThreshold reports whether the utilization spread across backends (max ratio minus min ratio) is at least the configured threshold.
func NewCleanupQueueService
NewCleanupQueueService constructs the cleanup-queue background service.
func NewOverReplicationService
NewOverReplicationService constructs the over-replication cleanup service.
func NewPendingReaperService
NewPendingReaperService constructs the pending-objects reaper background service. The reaper resolves abandoned PUT intents by HEADing the destination backend and either promoting the intent into object_locations (bytes present) or dropping it (bytes absent). Returns nil when no pending reaper is configured.
func NewRebalancerService
NewRebalancerService constructs the rebalancer background service.
func NewReconcileService
NewReconcileService constructs the reconcile background service.
func NewReplicatorService
NewReplicatorService constructs the replication background service.
func NewScrubberService
NewScrubberService constructs the integrity scrubber background service.
func WithAdmission
WithAdmission acquires an admission slot, runs fn if granted, and releases the slot afterward. When admission is rejected, increments the per-worker rejection counter and returns without invoking fn. name must be one of the WorkerName* constants above.
type AdmissionControl
AdmissionControl gates concurrent access to backends.
type BackendAccess
BackendAccess provides backend fleet discovery and drain-awareness.
type BackendSyncer
BackendSyncer is the interface the reconciler needs from the proxy layer. Defined here to avoid a worker->proxy import cycle.
type CleanupOps
CleanupOps is the dependency contract for CleanupWorker. It omits BackendAccess because cleanup operates on specific named backends.
type CleanupWorker
CleanupWorker processes the retry queue for failed object deletions.
func NewCleanupWorker
NewCleanupWorker creates a CleanupWorker with explicit dependencies. instanceID is stamped into cleanup_queue.claimed_by for observability; claimGracePeriod is the threshold past which an outstanding claim becomes reclaimable by another worker tick (typically 5m).
func (*CleanupWorker) ProcessCleanupQueue
ProcessCleanupQueue fetches pending cleanup items and attempts to delete the orphaned objects from their respective backends.
type DataMover
DataMover provides object data movement primitives.
type Ops
Ops combines fleet access, admission control, data movement, and usage tracking. Used by Rebalancer, Replicator, and OverReplicationCleaner.
type OverReplicationCleaner
OverReplicationCleaner removes excess copies of objects that exceed the configured replication factor. Embeds *backendCore for access to shared infrastructure.
func NewOverReplicationCleaner
NewOverReplicationCleaner creates an OverReplicationCleaner with fleet operations and a metadata store.
func (*OverReplicationCleaner) Clean
Clean finds over-replicated objects and removes excess copies to reach the target replication factor. Returns the number of copies removed.
func (*OverReplicationCleaner) Config
Config returns the current replication configuration.
func (*OverReplicationCleaner) CountPending
CountPending returns the number of objects exceeding the replication factor.
func (*OverReplicationCleaner) ScoreCopy
ScoreCopy assigns a retention score to a copy based on its backend’s state:
- draining backend: 0 (always remove first)
- circuit-broken backend: 1 (remove next)
- healthy backend: 2 + (1 - utilization_ratio), range [2..3]
Among healthy backends, the most utilized backend gets the lowest score, making its copy the first candidate for removal -- freeing space where it is scarcest.
func (*OverReplicationCleaner) SetConfig
SetConfig atomically stores the replication configuration.
type PendingReaper
PendingReaper resolves abandoned PUT intents by inspecting the destination backend. The min-age window protects in-flight PUTs whose commit has not yet had a chance to clear the intent on the synchronous path.
func NewPendingReaper
NewPendingReaper creates a PendingReaper with explicit dependencies. concurrency, minAge, and batchSize fall back to safe defaults when zero or negative.
func (*PendingReaper) ProcessPendingQueue
ProcessPendingQueue runs one reaper tick: fetch stale intents, HEAD their destinations, and promote or drop based on what the backend reports. Returns the number of intents that completed (committed or dropped) and the number that failed (left for the next tick).
type RebalanceMove
RebalanceMove describes a single object move from one backend to another.
type Rebalancer
Rebalancer moves objects between backends to optimize space distribution.
func NewRebalancer
NewRebalancer creates a Rebalancer with fleet operations and a metadata store.
func (*Rebalancer) Config
Config returns the current rebalance configuration.
func (*Rebalancer) ExecuteMoves
ExecuteMoves runs the planned object moves with bounded concurrency. Skips individual moves that fail and continues with the rest, returning the count of successful moves.
func (*Rebalancer) ExecuteOneMove
ExecuteOneMove performs a single object move: stream the bytes from source to destination, swap the DB location with compare-and-swap, and delete the source copy. Returns true when all steps succeed.
func (*Rebalancer) PlanPackTight
PlanPackTight consolidates objects onto the most-utilized backends by pulling from the least-utilized. Sorts by percent full descending and only moves an object from a less-full source to a more-full destination. Skips moves that would not increase the destination’s packing ratio.
func (*Rebalancer) PlanSpreadEven
PlanSpreadEven equalizes utilization ratios across backends by moving objects from over-utilized backends to under-utilized ones. Returns nil when no backend has a usable byte limit.
func (*Rebalancer) Rebalance
Rebalance moves objects between backends to optimize space distribution. Returns the number of objects successfully moved.
func (*Rebalancer) SetConfig
SetConfig atomically stores the rebalance configuration.
type ReconcileResult
ReconcileResult holds the outcome of a reconciliation pass for one backend.
type Reconciler
Reconciler scans backends for untracked objects and imports them into the metadata database.
func NewReconciler
NewReconciler creates a reconciler that uses the syncer’s SyncBackend to import untracked objects.
func (*Reconciler) Reconcile
Reconcile performs a full reconciliation for the given backend (or all backends if backendName is empty). Lists objects on each backend, diffs against DB entries, imports untracked objects, and removes stale entries.
func (*Reconciler) Run
Run performs a full reconciliation pass: for each backend, list all objects and import any that are not tracked in the metadata database.
type RecorderProvider
RecorderProvider provides the shared per-backend accounting recorder. Prefer this over UsageAccessor for per-attempt API call / byte counters: APICall / Egress / Ingress / Operation document intent at the call site and prevent argument-order bugs.
type ReplicationOutcome
ReplicationOutcome captures the per-object result of one ReplicateObject invocation. Counts are populated regardless of success so the reporter and unit tests can reason about retry behaviour without parsing log lines. NoTarget reflects whether the loop exited because target selection ran out, not whether any individual attempt failed.
func (ReplicationOutcome) Failed
Failed reports the total number of failed copy attempts for this object, irrespective of failure stage.
type Replicator
Replicator creates additional copies of under-replicated objects across backends.
func NewReplicator
NewReplicator creates a Replicator with fleet operations and a metadata store.
func (*Replicator) CleanupOrphan
CleanupOrphan deletes an object from a backend when the DB record was not created (e.g. source was deleted during replication). Looks up the backend by name and dispatches to DeleteOrEnqueue, which handles its own API accounting and orphan-byte tracking.
func (*Replicator) Config
Config returns the current replication configuration.
func (*Replicator) CopyToReplica
CopyToReplica reads the object from an existing copy and writes it to the target backend. Tries each existing copy in order for failover. Returns the source backend name that was successfully read from and the size_bytes recorded on that source’s ObjectLocation row (the size of the bytes that were actually transferred). The input slice is cloned before sorting so callers retain their original ordering; see #904 for the aliasing bug the clone prevents.
func (*Replicator) FindReplicaTarget
FindReplicaTarget selects a backend for a replication copy using the same routing strategy as normal writes. Returns empty string if no suitable target exists.
func (*Replicator) IsBackendHealthy
IsBackendHealthy returns true if the backend has a closed circuit breaker or has no circuit breaker wrapper.
func (*Replicator) Replicate
Replicate finds under-replicated objects and creates additional copies to reach the target replication factor. Returns the number of copies created.
func (*Replicator) ReplicateObject
ReplicateObject creates up to `needed` additional copies of a single object. Returns a ReplicationOutcome the caller can use to drive metrics, audit, and log reporting without re-parsing logs.
Per-attempt diagnostic logs are preserved so incident responders can still trace each failed retry; the outcome is a *structured summary* on top of those, not a replacement.
func (*Replicator) SetConfig
SetConfig atomically stores the replication configuration.
func (*Replicator) UnhealthyBackends
UnhealthyBackends returns backend names whose circuit breakers have been open longer than the given threshold. Returns nil when all backends are healthy or circuit breakers are not enabled.
type Scrubber
Scrubber periodically verifies stored object integrity by reading objects from backends, computing their SHA-256 hash, and comparing against the stored content hash. Also supports backfilling hashes for objects that were written before integrity was enabled.
func NewScrubber
NewScrubber creates a Scrubber with the given dependencies and optional encryptor.
func (*Scrubber) Backfill
Backfill reads objects that have no stored content hash, computes the SHA-256 digest, and stores it in the database. Processes up to batchSize objects starting at the given offset. Returns the number of objects processed and the next offset for pagination (0 when done).
func (*Scrubber) Config
Config returns the current integrity configuration.
func (*Scrubber) Scrub
Scrub verifies a batch of objects with stored content hashes. Returns the number of objects checked and the number of hash mismatches found.
func (*Scrubber) SetConfig
SetConfig atomically stores the integrity configuration.
type ScrubberOps
ScrubberOps is the dependency contract for Scrubber. It omits AdmissionControl and BackendAccess because integrity checks are best-effort background work.
type UsageAccessor
UsageAccessor provides usage tracking.
Generated by gomarkdoc