s3-orchestrator

proxy

import "github.com/afreidah/s3-orchestrator/internal/proxy"

Package proxy is the domain orchestration layer that coordinates multi-backend S3 storage. It routes writes, manages failover reads, handles multipart uploads, drains backends, and exposes dashboard data. Workers receive the Ops interface instead of direct access.

type BackendManager

BackendManager manages multiple storage backends with quota tracking. It embeds *infra.Core for non-store infrastructure (backends, usage, admission, draining, metrics) and holds the per-role store views and hot-reloadable configuration. Store-touching write-path helpers are methods on *BackendManager (manager_writepath.go); pure infra primitives stay on *infra.Core.

Workers (rebalancer, replicator, scrubber, …) are resolved through DI at the call site rather than carried on the manager. The dashboard aggregator, hot-reload paths, and tests that previously read mgr.Replicator etc. now call do.Invoke directly.

DrainManager is the one dependency wired post-construction via WireDrain. The cycle (drain.Manager needs *BackendManager and mgr.MultipartManager) makes constructor injection impractical without redesigning drain.Core. Code paths that depend on DrainManager (FlushUsage, ClearDrainState) nil-guard the field so a manager constructed without WireDrain (every test path that does not need drain behavior) remains usable.
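The post-construction wiring and nil-guard described above can be sketched as follows. This is a minimal illustration, not the package's code: drainManager, backendManager, wireDrain, and clearDrainState are hypothetical stand-ins for drain.Manager, BackendManager, WireDrain, and ClearDrainState.

```go
package main

import "fmt"

// drainManager and backendManager are hypothetical stand-ins; only the
// wiring shape matters here.
type drainManager struct {
	owner    *backendManager // the cycle: drain needs the manager
	draining map[string]bool
}

type backendManager struct {
	drain *drainManager // nil until wireDrain is called
}

// wireDrain closes the dependency cycle once both values exist.
func (m *backendManager) wireDrain(d *drainManager) { m.drain = d }

// clearDrainState nil-guards the field so an unwired manager stays usable.
func (m *backendManager) clearDrainState() {
	if m.drain == nil {
		return
	}
	for name := range m.drain.draining {
		delete(m.drain.draining, name)
	}
}

func main() {
	mgr := &backendManager{}
	mgr.clearDrainState() // no-op: drain has not been wired

	d := &drainManager{owner: mgr, draining: map[string]bool{"b1": true}}
	mgr.wireDrain(d)
	mgr.clearDrainState()
	fmt.Println(len(d.draining)) // 0
}
```

The guard keeps the unwired path a silent no-op, which matches the documented behaviour for tests that skip WireDrain.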

type BackendManager struct {
    *infra.Core

    MultipartManager *multipart.Manager // multipart upload lifecycle
    ObjectManager    *object.Manager    // CRUD, read failover, broadcast reads

    // DrainManager is the single post-construction wiring point. Set by
    // WireDrain after both *BackendManager and *drain.Manager have been
    // constructed (the dependency cycle prevents constructor injection).
    // Nil-able by design; FlushUsage and ClearDrainState guard the
    // access so tests that do not exercise drain behavior need not call
    // WireDrain.
    DrainManager *drain.Manager
    // contains filtered or unexported fields
}

func NewBackendManager

func NewBackendManager(cfg *BackendManagerConfig) *BackendManager

NewBackendManager constructs a BackendManager. Required dependencies (cfg, Stores, Dashboard, Metrics) panic via must.NotNil at construction so a wiring bug surfaces immediately at DI assembly rather than as a nil-pointer dereference several call frames deep on the first request. Numeric config invariants (negative timeouts, ordering rules) are the config validator’s responsibility; the constructor trusts the values it receives.
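A fail-fast constructor of this kind might look like the sketch below. The helper notNil is a hypothetical stand-in for must.NotNil, whose real signature is not shown in this document, and the config here uses a plain pointer field where the real Stores is an interface.

```go
package main

import "fmt"

// notNil is a hypothetical stand-in for must.NotNil: it panics with the
// dependency name when a required pointer is missing.
func notNil[T any](name string, v *T) *T {
	if v == nil {
		panic("required dependency is nil: " + name)
	}
	return v
}

type store struct{}

type config struct {
	Stores *store
}

type manager struct {
	stores *store
}

// newManager validates required dependencies before building anything.
func newManager(cfg *config) *manager {
	notNil("cfg", cfg)
	return &manager{stores: notNil("cfg.Stores", cfg.Stores)}
}

// tryNew reports the panic message raised by newManager, or "" on success.
func tryNew(cfg *config) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	newManager(cfg)
	return ""
}

func main() {
	fmt.Println(tryNew(&config{}))                    // required dependency is nil: cfg.Stores
	fmt.Println(tryNew(&config{Stores: &store{}}) == "") // true
}
```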

func (*BackendManager) AdmissionSem

func (m *BackendManager) AdmissionSem() chan struct{}

AdmissionSem returns the shared admission semaphore, or nil if none is configured. The HTTP admission controller should use this channel so that HTTP requests and background services share one concurrency budget.
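How callers acquire and release slots is not specified here. The sketch below assumes the common buffered-channel convention (send to acquire, receive to release) and a non-blocking acquire; tryAcquire and release are hypothetical helpers, not part of this package.

```go
package main

import "fmt"

// tryAcquire takes one slot without blocking. A nil sem means admission
// is disabled, so every caller is admitted.
func tryAcquire(sem chan struct{}) bool {
	if sem == nil {
		return true
	}
	select {
	case sem <- struct{}{}:
		return true
	default:
		return false
	}
}

// release returns a slot taken by tryAcquire. No-op for a nil sem.
func release(sem chan struct{}) {
	if sem != nil {
		<-sem
	}
}

func main() {
	sem := make(chan struct{}, 1) // budget of one concurrent operation

	fmt.Println(tryAcquire(sem)) // true: an HTTP request takes the slot
	fmt.Println(tryAcquire(sem)) // false: a background worker is turned away
	release(sem)
	fmt.Println(tryAcquire(sem)) // true: the slot is free again
}
```

Because both request handlers and workers draw from the same channel, neither side can exceed the shared budget on its own.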

func (*BackendManager) ClearCache

func (m *BackendManager) ClearCache()

ClearCache removes all entries from the location cache.

func (*BackendManager) ClearDrainState

func (m *BackendManager) ClearDrainState()

ClearDrainState removes all entries from the draining map. Used by tests to reset state between runs. No-op when DrainManager has not been wired (tests that do not need drain behavior skip WireDrain).

func (*BackendManager) Close

func (m *BackendManager) Close()

Close stops every background cache eviction goroutine the manager owns: the object location cache and the multipart per-upload DEK cache. Safe to call multiple times.
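One common way to make a Close method safe to call repeatedly is sync.Once. Whether BackendManager uses this mechanism is an assumption; evictor is a hypothetical type standing in for a cache with a background eviction goroutine.

```go
package main

import (
	"fmt"
	"sync"
)

// evictor is a hypothetical cache janitor controlled by a stop channel.
type evictor struct {
	stop chan struct{}
	once sync.Once
}

func newEvictor() *evictor { return &evictor{stop: make(chan struct{})} }

// Close signals the eviction goroutine to exit. sync.Once makes repeat
// calls harmless; closing an already-closed channel would otherwise panic.
func (e *evictor) Close() { e.once.Do(func() { close(e.stop) }) }

// stopped reports whether Close has been called.
func (e *evictor) stopped() bool {
	select {
	case <-e.stop:
		return true
	default:
		return false
	}
}

func main() {
	e := newEvictor()
	e.Close()
	e.Close() // second call is a no-op
	fmt.Println(e.stopped()) // true
}
```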

func (*BackendManager) DeleteOrEnqueue

func (m *BackendManager) DeleteOrEnqueue(ctx context.Context, be backend.ObjectBackend, backendName, key, reason string, sizeBytes int64)

DeleteOrEnqueue forwards to the write coordinator. Worker and drain interfaces accept *BackendManager and call DeleteOrEnqueue on it; the implementation lives on the coordinator now, but the call site shape stays the same.

func (*BackendManager) FlushUsage

func (m *BackendManager) FlushUsage(ctx context.Context) error

FlushUsage flushes accumulated in-memory usage counters to the database. Backends that have completed draining are skipped because their DB records (including backend_usage) have been removed. When DrainManager has not been wired (tests that do not exercise drain behavior) the skip set is empty and every backend’s counters flush.
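The skip-set behaviour can be illustrated with a small sketch. The function flush and its map-based inputs are hypothetical; the real method writes to the database rather than returning names.

```go
package main

import (
	"fmt"
	"sort"
)

// flush returns, in sorted order, the backend names whose counters would
// be written. drained may be nil (drain not wired); reading a nil map is
// safe in Go, so nothing is skipped in that case.
func flush(counters map[string]int64, drained map[string]bool) []string {
	var out []string
	for name := range counters {
		if drained[name] {
			continue // DB rows for this backend are already gone
		}
		out = append(out, name)
	}
	sort.Strings(out)
	return out
}

func main() {
	counters := map[string]int64{"b1": 10, "b2": 20}
	fmt.Println(flush(counters, map[string]bool{"b2": true})) // [b1]
	fmt.Println(flush(counters, nil))                         // [b1 b2]
}
```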

func (*BackendManager) GetDashboardData

func (m *BackendManager) GetDashboardData(ctx context.Context) (*dashboard.Data, error)

GetDashboardData delegates to the dashboard.Aggregator and enriches the result with drain status and circuit-breaker health from the BackendManager’s in-memory state.

func (*BackendManager) GetDirectoryChildren

func (m *BackendManager) GetDirectoryChildren(ctx context.Context, prefix, startAfter string, maxKeys int) (*core.DirectoryListResult, error)

GetDirectoryChildren delegates to the dashboard.Aggregator.

func (*BackendManager) IntegrityConfig

func (m *BackendManager) IntegrityConfig() *config.IntegrityConfig

IntegrityConfig returns the current integrity configuration.

func (*BackendManager) LifecycleConfig

func (m *BackendManager) LifecycleConfig() *config.LifecycleConfig

LifecycleConfig returns the current lifecycle configuration.

func (*BackendManager) MoveObject

func (m *BackendManager) MoveObject(ctx context.Context, req *writepath.MoveRequest) (int64, error)

MoveObject forwards to the write coordinator’s shared move primitive (#924). Drain and rebalancer both pass their narrow consumer interface (drain.Core, worker.DataMover) here so that StreamCopy, the MoveObjectLocation CAS, the orphan-cleanup branches, and source-delete accounting all funnel through one implementation, the same way DeleteOrEnqueue does.

func (*BackendManager) NearUsageLimit

func (m *BackendManager) NearUsageLimit(threshold float64) bool

NearUsageLimit returns true if any backend is approaching its usage limits.

func (*BackendManager) ProcessLifecycleRules

func (m *BackendManager) ProcessLifecycleRules(ctx context.Context, rules []config.LifecycleRule) (deleted, failed int)

ProcessLifecycleRules evaluates all lifecycle rules and deletes expired objects. Returns total deleted and failed counts. Terminates processing of a rule when a full batch produces zero successful deletions, preventing infinite loops when backends are unhealthy.
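The zero-progress termination rule can be sketched as a batch loop. processRule, fakeStore, and the callback shapes are hypothetical; only the stop condition reflects the description above.

```go
package main

import (
	"errors"
	"fmt"
)

// processRule deletes expired keys in batches. It stops when a batch is
// short (nothing left) or yields zero successful deletions, so an
// unhealthy backend cannot cause the same batch to be retried forever.
func processRule(batchSize int, next func(n int) []string, del func(key string) error) (deleted, failed int) {
	for {
		batch := next(batchSize)
		ok := 0
		for _, k := range batch {
			if err := del(k); err != nil {
				failed++
				continue
			}
			ok++
		}
		deleted += ok
		if len(batch) < batchSize || ok == 0 {
			return deleted, failed
		}
	}
}

// fakeStore is an in-memory stand-in for the metadata store and backend.
type fakeStore struct {
	keys    []string
	healthy bool
}

func (s *fakeStore) next(n int) []string {
	if n > len(s.keys) {
		n = len(s.keys)
	}
	return append([]string(nil), s.keys[:n]...)
}

func (s *fakeStore) del(key string) error {
	if !s.healthy {
		return errors.New("backend unavailable")
	}
	for i, k := range s.keys {
		if k == key {
			s.keys = append(s.keys[:i], s.keys[i+1:]...)
			break
		}
	}
	return nil
}

func main() {
	good := &fakeStore{keys: []string{"a", "b", "c", "d", "e"}, healthy: true}
	fmt.Println(processRule(2, good.next, good.del)) // 5 0

	bad := &fakeStore{keys: []string{"a", "b", "c"}, healthy: false}
	fmt.Println(processRule(2, bad.next, bad.del)) // 0 2
}
```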

func (*BackendManager) ReconcileBackend

func (m *BackendManager) ReconcileBackend(ctx context.Context, backendName, bucket string, knownBuckets []string) (*worker.ReconcileResult, error)

ReconcileBackend reconciles a single backend against the metadata store using a bounded-memory sorted-merge: both sides are walked in lex key order and diffed in lockstep. The S3 walk and DB cursor each cap their in-flight buffer, so memory is independent of object count.

Behaviour: imports keys present on the backend but not in the DB, and deletes DB rows whose keys are no longer on the backend. Keys owned by sibling virtual buckets stored on the same backend are left alone in both directions - sibling buckets are reconciled by their own pass.
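The lockstep diff at the heart of a sorted-merge can be sketched as below. For brevity the sketch uses in-memory slices where the real implementation walks a paginated S3 listing and a DB cursor, and it omits the sibling-bucket filtering; diffSorted is a hypothetical name.

```go
package main

import "fmt"

// diffSorted walks two lexicographically sorted key lists in lockstep.
// It returns keys present only on the backend (candidates for import)
// and keys present only in the DB (stale rows to delete).
func diffSorted(backendKeys, dbKeys []string) (toImport, toDelete []string) {
	i, j := 0, 0
	for i < len(backendKeys) && j < len(dbKeys) {
		switch {
		case backendKeys[i] == dbKeys[j]:
			i++ // present on both sides: nothing to do
			j++
		case backendKeys[i] < dbKeys[j]:
			toImport = append(toImport, backendKeys[i])
			i++
		default:
			toDelete = append(toDelete, dbKeys[j])
			j++
		}
	}
	// Whatever remains on one side has no counterpart on the other.
	toImport = append(toImport, backendKeys[i:]...)
	toDelete = append(toDelete, dbKeys[j:]...)
	return toImport, toDelete
}

func main() {
	onBackend := []string{"a", "b", "d"}
	inDB := []string{"b", "c", "d", "e"}
	imp, del := diffSorted(onBackend, inDB)
	fmt.Println(imp, del) // [a] [c e]
}
```

Each side only ever needs its current element, which is why a streaming version can keep memory independent of object count.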

func (*BackendManager) RecordUsage

func (m *BackendManager) RecordUsage(backendName string, apiCalls, egress, ingress int64)

RecordUsage increments the in-memory usage counters for a backend. Exposed for admin operations that bypass the normal manager request path.

func (*BackendManager) RedisCounterConfigured

func (m *BackendManager) RedisCounterConfigured() bool

RedisCounterConfigured returns true when the counter backend is a Redis backend, regardless of health status. Used by the flush service to decide whether an advisory lock is needed - the lock must be held even during fallback to prevent double-counting when Redis recovers mid-flush.

func (*BackendManager) SelectReplicaTarget

func (m *BackendManager) SelectReplicaTarget(ctx context.Context, size int64, exclusion map[string]bool) (string, error)

SelectReplicaTarget picks a target backend for a replication copy using the same routing strategy as normal writes. Excludes backends that already hold a copy of the object.
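The exclusion map can be illustrated with a simplified selector. The routing strategy is configurable and not described here, so the first-fit-in-order policy below is purely an assumption, as are pickTarget, errNoTarget, and the free-space map.

```go
package main

import (
	"errors"
	"fmt"
)

var errNoTarget = errors.New("no eligible backend")

// pickTarget returns the first backend in order that is not excluded and
// has at least size bytes free. First-fit is an assumed strategy.
func pickTarget(order []string, free map[string]int64, size int64, exclusion map[string]bool) (string, error) {
	for _, name := range order {
		if exclusion[name] {
			continue // already holds a copy of the object
		}
		if free[name] >= size {
			return name, nil
		}
	}
	return "", errNoTarget
}

func main() {
	order := []string{"b1", "b2", "b3"}
	free := map[string]int64{"b1": 100, "b2": 5, "b3": 100}
	holders := map[string]bool{"b1": true}

	name, err := pickTarget(order, free, 10, holders)
	fmt.Println(name, err) // b3 <nil>
}
```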

func (*BackendManager) SetIntegrityConfig

func (m *BackendManager) SetIntegrityConfig(cfg *config.IntegrityConfig)

SetIntegrityConfig atomically stores the integrity configuration. The scrubber’s own SetConfig is invoked separately by the caller (serve) since the scrubber is a top-level DI service after #676 B.

func (*BackendManager) SetLifecycleConfig

func (m *BackendManager) SetLifecycleConfig(cfg *config.LifecycleConfig)

SetLifecycleConfig atomically stores the lifecycle configuration.

func (*BackendManager) SetUsageFlushConfig

func (m *BackendManager) SetUsageFlushConfig(cfg *config.UsageFlushConfig)

SetUsageFlushConfig atomically stores the usage flush configuration.
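The setter and getter pairs for hot-reloadable configuration could be built on atomic.Pointer (Go 1.19+), as in this sketch. The field layout and the usageFlushConfig type are hypothetical; the document only states that the store is atomic.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// usageFlushConfig is a hypothetical stand-in for config.UsageFlushConfig.
type usageFlushConfig struct {
	IntervalSeconds int
}

type manager struct {
	usageFlush atomic.Pointer[usageFlushConfig]
}

// SetUsageFlushConfig publishes a new config; readers see either the old
// pointer or the new one, never a partially written value.
func (m *manager) SetUsageFlushConfig(c *usageFlushConfig) { m.usageFlush.Store(c) }

// UsageFlushConfig returns the current config, or nil if none was set.
func (m *manager) UsageFlushConfig() *usageFlushConfig { return m.usageFlush.Load() }

func main() {
	m := &manager{}
	fmt.Println(m.UsageFlushConfig() == nil) // true

	m.SetUsageFlushConfig(&usageFlushConfig{IntervalSeconds: 30})
	fmt.Println(m.UsageFlushConfig().IntervalSeconds) // 30
}
```

Callers should treat the returned struct as read-only and publish changes by storing a fresh value, since the pointer itself is the unit of atomicity.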

func (*BackendManager) Stores

func (m *BackendManager) Stores() core.MetadataStore

Stores returns the wrapped metadata-store contract. Exposed for transport handlers that need direct store access (multipart bookkeeping in s3api, per-backend stats / data drop in admin).

func (*BackendManager) SyncBackend

func (m *BackendManager) SyncBackend(ctx context.Context, backendName, bucket string, knownBuckets []string) (imported, skipped int, err error)

SyncBackend scans a backend’s S3 bucket and imports pre-existing objects into the proxy database. Objects already tracked for the backend are skipped. knownBuckets is the full list of configured virtual bucket names, used to distinguish objects belonging to other buckets from externally-uploaded objects that need the bucket prefix prepended. Returns counts of imported vs skipped objects.
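One plausible reading of the knownBuckets rule is sketched below. It assumes internal keys take the form "bucket/key", which this document does not state; classify is a hypothetical helper and the real import logic may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// classify maps a raw backend key to the internal key to import under
// bucket. ok is false when the key belongs to a different known bucket.
// Assumes internal keys look like "<bucket>/<key>".
func classify(key, bucket string, knownBuckets []string) (internalKey string, ok bool) {
	for _, b := range knownBuckets {
		if strings.HasPrefix(key, b+"/") {
			if b == bucket {
				return key, true // already carries this bucket's prefix
			}
			return "", false // owned by another virtual bucket
		}
	}
	// No known prefix: treat as externally uploaded and prepend the prefix.
	return bucket + "/" + key, true
}

func main() {
	known := []string{"photos", "logs"}
	fmt.Println(classify("photos/a.jpg", "photos", known)) // photos/a.jpg true
	fmt.Println(classify("logs/app.log", "photos", known)) //  false
	fmt.Println(classify("a.jpg", "photos", known))        // photos/a.jpg true
}
```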

func (*BackendManager) UpdateUsageLimits

func (m *BackendManager) UpdateUsageLimits(limits map[string]core.UsageLimits)

UpdateUsageLimits replaces the per-backend usage limits. Safe to call concurrently with request handling.

func (*BackendManager) UsageFlushConfig

func (m *BackendManager) UsageFlushConfig() *config.UsageFlushConfig

UsageFlushConfig returns the current usage flush configuration.

func (*BackendManager) WireDrain

func (m *BackendManager) WireDrain(d *drain.Manager)

WireDrain installs the drain.Manager: stores it on BackendManager so dashboard rendering and drain-aware tests can reach it, and points backendCore.drainMgr at it so eligibility filters see drain state. Called by the drain DI provider after both values exist. The dependency cycle between drain.Manager and BackendManager prevents passing the drain manager through the constructor.

type BackendManagerConfig

BackendManagerConfig holds the parameters for creating a BackendManager. Stores carries the metadata-store contract. Metrics carries the narrow proxy.metrics.Deps used by MetricsCollector.

type BackendManagerConfig struct {
    Backends  map[string]backend.ObjectBackend
    Stores    core.MetadataStore
    Metrics   metrics.Deps
    Dashboard core.MetadataStore
    // PendingEnabled toggles the PUT-before-COMMIT pending-row pattern
    // (write_path.pending_pattern.enabled). When false the manager skips
    // pending-intent inserts and pending-promotion paths and falls back
    // to the legacy cleanup-on-failure flow.
    PendingEnabled    bool
    Order             []string
    CacheTTL          time.Duration
    BackendTimeout    time.Duration
    UsageLimits       map[string]core.UsageLimits
    RoutingStrategy   config.RoutingStrategy
    ParallelBroadcast bool // fan-out reads in parallel during degraded mode
    // DegradedBroadcastParallelism caps concurrent probes during a
    // parallel degraded-mode broadcast. 0 = no cap (every backend
    // probed at once, the historical behaviour).
    DegradedBroadcastParallelism int
    // DisableDegradedReads opts the read path out of broadcasting on DB outage.
    DisableDegradedReads bool
    Encryptor            *encryption.Encryptor  // nil when encryption is disabled
    CounterBackend       counter.CounterBackend // nil uses LocalCounterBackend
    ObjectCache          objcache.ObjectCache   // nil when object data caching is disabled
    MaxObjectSizes       map[string]int64       // per-backend max object size in bytes (0 = unlimited)
    // AdmissionSem is the shared concurrency semaphore for write-class
    // traffic. In split mode (MaxConcurrentReads + MaxConcurrentWrites)
    // it is sized to MaxConcurrentWrites and is shared between HTTP
    // writes and all background workers; reads run on a separate sem
    // created in transport/httpserver/routes.go. In merged mode
    // (MaxConcurrentRequests only) it is the global pool for every HTTP
    // request and every background worker. nil disables admission entirely
    // (no cap installed). See admissionSemFor in internal/di/backend.go
    // for the sizing rules and #835 for the documentation rationale.
    AdmissionSem chan struct{}

    // MultipartDEKCacheTTL pegs the lifetime of cached unwrapped DEKs
    // the MultipartManager uses to avoid re-unwrapping the upload-level
    // DEK on every UploadPart. Zero falls back to a 1h default which
    // is shorter than the typical multipart_stale_timeout but long
    // enough to absorb a reasonable upload's part stream.
    MultipartDEKCacheTTL time.Duration

    // ReplicationFactor is invoked by the metrics collector when refreshing
    // the under-replicated-objects gauge. Returns 0 when replication is
    // disabled. Lazy-evaluated so it can resolve the live replicator's
    // configured factor (which is hot-reloadable).
    ReplicationFactor func() int
}
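The DegradedBroadcastParallelism semantics (0 means no cap) can be illustrated with a bounded fan-out. probeAll and its peak-tracking return value are hypothetical and exist only to make the cap observable; the real broadcast read path is not shown in this document.

```go
package main

import (
	"fmt"
	"sync"
)

// probeAll calls probe for every backend concurrently with at most limit
// probes in flight; limit <= 0 means no cap. It returns the peak number
// of concurrent probes observed, for illustration only.
func probeAll(backends []string, limit int, probe func(name string)) int {
	var sem chan struct{}
	if limit > 0 {
		sem = make(chan struct{}, limit)
	}
	var (
		mu             sync.Mutex
		inFlight, peak int
		wg             sync.WaitGroup
	)
	for _, name := range backends {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			if sem != nil {
				sem <- struct{}{} // wait for a free slot
				defer func() { <-sem }()
			}
			mu.Lock()
			inFlight++
			if inFlight > peak {
				peak = inFlight
			}
			mu.Unlock()

			probe(name)

			mu.Lock()
			inFlight--
			mu.Unlock()
		}(name)
	}
	wg.Wait()
	return peak
}

func main() {
	backends := []string{"b1", "b2", "b3", "b4", "b5", "b6"}
	peak := probeAll(backends, 2, func(string) {})
	fmt.Println(peak <= 2) // true
}
```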

Generated by gomarkdoc