s3-orchestrator

object

import "github.com/afreidah/s3-orchestrator/internal/proxy/object"

Variables

ListObjectsMaxPages caps DB round trips per ListObjects request so a single client call cannot drag the database through unbounded scans on pathological prefix layouts. Exposed as a var (rather than const) so tests can lower it without generating hundreds of mock pages.

var ListObjectsMaxPages = 100

func AdvancePastEmittedCommonPrefix

func AdvancePastEmittedCommonPrefix(prefix, delimiter, cursor string, seen map[string]bool) string

AdvancePastEmittedCommonPrefix rewrites a continuation cursor so the next ListObjects call cannot re-emit a CommonPrefix (CP) the current call already returned. The seen map is local to a single ListObjects invocation, so without this rewrite a cursor that lands inside an already-emitted CP (e.g., when the maxPages cap is reached deep in a tenant’s keys, or a page boundary falls mid-group) would let the next call walk the same group and emit its CP a second time.

The rewrite increments the last byte of the CP, producing the smallest string lexicographically greater than every key starting with that CP. The store’s next-page predicate WHERE object_key > cursor then skips the rest of the group cleanly. Returns the input unchanged when the delimiter is unset, when the cursor does not fall inside an emitted CP, or when the last byte is 0xff (no representable advance; the function accepts a potential re-emission rather than corrupting the cursor).
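The last-byte increment can be illustrated with a minimal sketch. The function name advancePastCommonPrefix and the way it locates the CP inside the cursor are assumptions made for illustration; this is not the package's actual implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// advancePastCommonPrefix is a hypothetical stand-in for the documented
// function. If cursor falls inside a CommonPrefix recorded in seen, it
// returns that prefix with its last byte incremented; otherwise it returns
// cursor unchanged.
func advancePastCommonPrefix(prefix, delimiter, cursor string, seen map[string]bool) string {
	if delimiter == "" || !strings.HasPrefix(cursor, prefix) {
		return cursor
	}
	rest := cursor[len(prefix):]
	idx := strings.Index(rest, delimiter)
	if idx < 0 {
		return cursor // cursor is a plain key, not inside a group
	}
	cp := cursor[:len(prefix)+idx+len(delimiter)]
	if !seen[cp] {
		return cursor // this group was not emitted by the current call
	}
	last := cp[len(cp)-1]
	if last == 0xff {
		return cursor // no representable advance
	}
	return cp[:len(cp)-1] + string([]byte{last + 1})
}

func main() {
	seen := map[string]bool{"logs/2024/": true}
	// '/' is 0x2f, so the incremented byte is 0x30 ('0').
	fmt.Printf("%q\n", advancePastCommonPrefix("logs/", "/", "logs/2024/01/a.txt", seen))
}
```

Every key under "logs/2024/" sorts below "logs/20240", so a query of the form object_key > cursor resumes after the whole group.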

func HashBody

func HashBody(data []byte) string

HashBody computes the SHA-256 hex digest of a byte slice.
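A function with this contract can be written with the standard library alone. The sketch below uses a hypothetical lowercase name, hashBody, and is an assumption about how such a helper could look rather than a copy of the package's code.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// hashBody returns the lowercase hex SHA-256 digest of data.
func hashBody(data []byte) string {
	sum := sha256.Sum256(data)
	return hex.EncodeToString(sum[:])
}

func main() {
	fmt.Println(hashBody([]byte("hello")))
}
```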

func ParsePlaintextRange

func ParsePlaintextRange(rangeHeader string, plaintextSize int64) (start, end int64, ok bool)

ParsePlaintextRange extracts the start and end byte offsets from an HTTP Range header value (e.g., “bytes=0-99”). Suffix ranges and open-ended ranges are resolved against plaintextSize.
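The three range forms can be resolved as in the following sketch. The function parseRange is hypothetical; its handling of edge cases (multi-range headers rejected, end offsets clamped to the object size) is an assumption and may differ from ParsePlaintextRange.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseRange resolves a single-range "bytes=" header against size.
// It returns inclusive start and end offsets, or ok=false if the header
// cannot be satisfied.
func parseRange(header string, size int64) (start, end int64, ok bool) {
	if !strings.HasPrefix(header, "bytes=") || size <= 0 {
		return 0, 0, false
	}
	spec := strings.TrimPrefix(header, "bytes=")
	if strings.Contains(spec, ",") {
		return 0, 0, false // multi-range requests are out of scope here
	}
	first, last, found := strings.Cut(spec, "-")
	if !found {
		return 0, 0, false
	}
	switch {
	case first == "": // suffix range: the last n bytes
		n, err := strconv.ParseInt(last, 10, 64)
		if err != nil || n <= 0 {
			return 0, 0, false
		}
		if n > size {
			n = size
		}
		return size - n, size - 1, true
	case last == "": // open-ended range: from s to the end
		s, err := strconv.ParseInt(first, 10, 64)
		if err != nil || s < 0 || s >= size {
			return 0, 0, false
		}
		return s, size - 1, true
	default: // explicit start and end
		s, err1 := strconv.ParseInt(first, 10, 64)
		e, err2 := strconv.ParseInt(last, 10, 64)
		if err1 != nil || err2 != nil || s < 0 || s > e || s >= size {
			return 0, 0, false
		}
		if e >= size {
			e = size - 1
		}
		return s, e, true
	}
}

func main() {
	for _, h := range []string{"bytes=0-99", "bytes=-100", "bytes=500-"} {
		s, e, ok := parseRange(h, 1000)
		fmt.Println(h, s, e, ok)
	}
}
```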

type DeleteObjectResult

DeleteObjectResult holds the outcome of a single key within a batch delete.

type DeleteObjectResult struct {
    Key string `json:"key,omitempty"`
    Err error  `json:"err,omitempty"`
}

type Deps

Deps bundles the dependencies New needs so the call signature stays under the parameter-count ceiling. Core and Coord are consumer-declared interfaces; the concrete *infra.Core and *writepath.Coordinator that BackendManager builds satisfy them implicitly.

type Deps struct {
    Core              ObjectCore
    Coord             ObjectCoordinator
    Stores            core.MetadataStore
    Encryptor         *encryption.Encryptor
    LocationCache     *LocationCache
    ObjectCache       objcache.ObjectCache
    ParallelBroadcast bool
    // DegradedBroadcastParallelism caps concurrent probes during
    // parallel degraded-mode broadcasts. 0 = uncapped.
    DegradedBroadcastParallelism int
    // DisableDegradedReads makes the degraded path fail fast instead of broadcasting.
    DisableDegradedReads bool
    IntegrityCfg         *syncutil.AtomicConfig[config.IntegrityConfig]
}

type ListObjectsV2Result

ListObjectsV2Result holds the processed result for the S3 ListObjectsV2 response.

type ListObjectsV2Result struct {
    Objects               []core.ObjectLocation `json:"objects,omitempty"`
    CommonPrefixes        []string              `json:"common_prefixes,omitempty"`
    IsTruncated           bool                  `json:"is_truncated,omitempty"`
    NextContinuationToken string                `json:"next_continuation_token,omitempty"`
    KeyCount              int                   `json:"key_count,omitempty"`
}

type LocationCache

LocationCache is a TTL-based cache mapping object keys to backend names. It delegates storage and eviction to a generic TTLCache and applies random jitter (+/-20%) on each Set to stagger expiry times.

type LocationCache struct {
    // contains filtered or unexported fields
}

func NewLocationCache

func NewLocationCache(ttl time.Duration) *LocationCache

NewLocationCache creates a location cache with the given TTL. If ttl > 0, a background goroutine periodically evicts expired entries.

func (*LocationCache) Clear

func (c *LocationCache) Clear()

Clear removes all entries from the cache.

func (*LocationCache) Close

func (c *LocationCache) Close()

Close stops the background eviction goroutine. Safe to call multiple times.

func (*LocationCache) Delete

func (c *LocationCache) Delete(key string)

Delete removes a single key from the cache.

func (*LocationCache) Get

func (c *LocationCache) Get(key string) (string, bool)

Get returns the cached backend for a key, or false if not cached or expired.

func (*LocationCache) Len

func (c *LocationCache) Len() int

Len returns the number of entries in the cache, including expired entries not yet swept by the background eviction goroutine.

func (*LocationCache) Set

func (c *LocationCache) Set(key, backend string)

Set stores a key-to-backend mapping with the configured TTL. A random jitter of +/-20% is applied to prevent synchronized cache expiry storms.

type Manager

Manager handles object-level CRUD operations with read failover, broadcast reads during degraded mode, and location caching.

type Manager struct {
    // contains filtered or unexported fields
}

func New

func New(d *Deps) *Manager

New creates a Manager sharing the given core infrastructure and write coordinator. All dependencies must be non-nil; nothing is patched in post-construction. The component-scoped logger is built in the constructor body per the project’s logging convention. The read-failover orchestrator is built once and reused for every GET / HEAD; it captures the same Core, Stores, LocationCache, and the parallelBroadcast flag, so per-call read paths stay short.

func (*Manager) BackendCapacityStats

func (o *Manager) BackendCapacityStats(ctx context.Context) map[string]core.QuotaStat

BackendCapacityStats returns the current per-backend used/limit byte snapshot. Used by the InsufficientStorage error path so the response body can name the backends that are at capacity instead of returning a generic message. Returns nil on a DB lookup failure so the caller can fall back to its terse default.

func (*Manager) CanAcceptWrite

func (o *Manager) CanAcceptWrite(size int64) bool

CanAcceptWrite reports whether any backend can accept a write of the given size. Used by the HTTP handler to reject uploads before the request body is transmitted (Expect: 100-Continue support).

func (*Manager) CopyObject

func (o *Manager) CopyObject(ctx context.Context, sourceKey, destKey string) (string, error)

CopyObject copies an object from sourceKey to destKey. Materializes the source body into a seekable buffer - in-memory for small objects, a self-unlinking tempfile above materializeMemThreshold - before handing it to the destination PutObject. A non-seekable body would force the AWS SDK onto its streaming-unsigned-payload signing path, which uses chunked transfer encoding and drops Content-Length; S3 implementations that require Content-Length (notably OCI) then reject the upload with HTTP 411. Supports cross-backend copies and read failover from replicas.

func (*Manager) DeleteObject

func (o *Manager) DeleteObject(ctx context.Context, key string) error

DeleteObject removes an object from the backend where it’s stored.

func (*Manager) DeleteObjects

func (o *Manager) DeleteObjects(ctx context.Context, keys []string) []DeleteObjectResult

DeleteObjects deletes multiple objects in a single request. Metadata removal happens in a single transaction via DeleteObjectsBatch; backend S3 deletes run concurrently with bounded parallelism to avoid overwhelming backends.

func (*Manager) GetObject

func (o *Manager) GetObject(ctx context.Context, key string, rangeHeader string) (*s3be.GetObjectResult, error)

GetObject retrieves an object from the backend where it’s stored. Tries the primary copy first, then falls back to replicas if the primary fails. When the object is encrypted, the response body is transparently decrypted and the reported size reflects the original plaintext size.

func (*Manager) HeadObject

func (o *Manager) HeadObject(ctx context.Context, key string) (*s3be.HeadObjectResult, error)

HeadObject retrieves object metadata. Tries the primary copy first, then falls back to replicas if the primary fails. When the object is encrypted, the reported size reflects the original plaintext size.

func (*Manager) ListObjects

func (o *Manager) ListObjects(ctx context.Context, prefix, delimiter, startAfter string, maxKeys int) (*ListObjectsV2Result, error)

ListObjects returns objects matching the given prefix with optional delimiter support for virtual directory grouping. When a delimiter is set, many raw objects may collapse into a single CommonPrefix, so the loop fetches store pages until maxKeys post-grouping items are collected or the store is exhausted.
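The collapse of raw keys into CommonPrefixes can be sketched as follows. The function groupKeys is hypothetical and works on an in-memory slice of sorted keys; the paging loop, maxKeys accounting, and continuation tokens of the real ListObjects are omitted.

```go
package main

import (
	"fmt"
	"strings"
)

// groupKeys splits keys under prefix into plain objects and CommonPrefixes.
// A key whose remainder after prefix contains delimiter is folded into the
// prefix ending at the first delimiter occurrence.
func groupKeys(keys []string, prefix, delimiter string) (objects, commonPrefixes []string) {
	seen := map[string]bool{}
	for _, k := range keys {
		if !strings.HasPrefix(k, prefix) {
			continue
		}
		rest := k[len(prefix):]
		if delimiter != "" {
			if i := strings.Index(rest, delimiter); i >= 0 {
				cp := prefix + rest[:i+len(delimiter)]
				if !seen[cp] {
					seen[cp] = true
					commonPrefixes = append(commonPrefixes, cp)
				}
				continue
			}
		}
		objects = append(objects, k)
	}
	return objects, commonPrefixes
}

func main() {
	keys := []string{"a/1", "a/2", "b/1", "c"}
	objects, prefixes := groupKeys(keys, "", "/")
	fmt.Println(objects, prefixes)
}
```

Here four raw keys yield only three listed items, which is why a single store page may not be enough to fill maxKeys.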

func (*Manager) LocationCache

func (o *Manager) LocationCache() *LocationCache

LocationCache returns the location cache the manager holds. Exposed for BackendManager and tests so the lifecycle (Close, Clear) can be driven from the root package without reaching into the unexported field.

func (*Manager) ObjectExists

func (o *Manager) ObjectExists(ctx context.Context, key string) (bool, error)

ObjectExists reports whether at least one location row exists for key. Used by the conditional-write path (If-None-Match: *) to fail fast before the body upload. Best-effort: a concurrent racing PUT can land between this read and the eventual RecordObject commit, matching AWS S3’s documented best-effort precondition semantics. ErrObjectNotFound is the canonical “no row” signal and is normalised to (false, nil).

func (*Manager) PutObject

func (o *Manager) PutObject(ctx context.Context, key string, body io.Reader, size int64, contentType string, metadata map[string]string) (string, error)

PutObject uploads an object to the first backend with available quota. If the upload fails, it retries on remaining eligible backends before returning an error to the caller (write failover).

type ObjectCoordinator

ObjectCoordinator is the subset of *writepath.Coordinator the object Manager needs. RecoverFromRecordFailure is here for the drain-race abort path in attemptPutOnBackend: when the post-PUT IsDraining re-check fires, the orchestrator reuses the existing post-record-failure recovery sequence to delete the orphaned bytes and account for the cleanup DELETE.

type ObjectCoordinator interface {
    SelectBackendForWrite(ctx context.Context, size int64, eligible []string) (string, error)
    SelectWriteTarget(ctx context.Context, span trace.Span, operation string, size int64) (string, error)
    InsertPendingIntent(ctx context.Context, key, backendName string, size int64, enc *core.EncryptionMeta) (string, error)
    RecordObjectAndPromoteIntent(ctx context.Context, span trace.Span, key, backendName string, size int64, enc *core.EncryptionMeta, intentID string) error
    RecordObjectOrCleanup(ctx context.Context, span trace.Span, be backend.ObjectBackend, key, backendName string, size int64, enc *core.EncryptionMeta) error
    RecoverFromRecordFailure(ctx context.Context, be backend.ObjectBackend, backendName, key, cleanupReason string, size int64)
    DeleteOrEnqueue(ctx context.Context, be backend.ObjectBackend, backendName, key, reason string, sizeBytes int64)
}

type ObjectCore

ObjectCore is the subset of *infra.Core the object Manager needs. BackendOrder is here for *readpath.Failover, which Manager constructs from its own ObjectCore; the transitive requirement satisfies its type-check at this boundary. IsDraining is here for the post-PUT drain-race re-check in attemptPutOnBackend (the upstream EligibleForWrite filter is racy; the re-check closes the window).

type ObjectCore interface {
    Backends() map[string]backend.ObjectBackend
    BackendOrder() []string
    GetBackend(name string) (backend.ObjectBackend, error)
    Usage() *counter.UsageTracker // still needed for WithinLimits pre-flight checks; per-backend Record calls flow through Acct
    IsDraining(name string) bool
    WithTimeout(ctx context.Context) (context.Context, context.CancelFunc)
    EligibleForWrite(apiCalls, egress, ingress int64) []string
    ClassifyWriteError(span trace.Span, operation string, err error) error
    Acct() *accounting.Recorder
}

type VerifyingReader

VerifyingReader wraps an io.ReadCloser and computes SHA-256 as data is read. After the underlying reader returns EOF, call Verify to check the hash.

type VerifyingReader struct {
    // contains filtered or unexported fields
}

func NewVerifyingReader

func NewVerifyingReader(r io.ReadCloser) *VerifyingReader

NewVerifyingReader wraps r with a streaming SHA-256 computation.

func (*VerifyingReader) Close

func (vr *VerifyingReader) Close() error

Close closes the underlying reader. If an OnMismatch callback is set and verification fails, it is called before returning.

func (*VerifyingReader) Read

func (vr *VerifyingReader) Read(p []byte) (int, error)

Read implements io.Reader. Data passes through to the caller while being hashed incrementally.

func (*VerifyingReader) SetVerification

func (vr *VerifyingReader) SetVerification(expected string, onMismatch func(expected, actual string))

SetVerification configures the reader to check the hash on Close and call onMismatch if the digest doesn’t match. This allows the caller to trigger cleanup of corrupted copies after streaming completes.

func (*VerifyingReader) Verify

func (vr *VerifyingReader) Verify(expected string) error

Verify checks the computed hash against the expected hex digest. Returns nil if they match, or an error describing the mismatch. Returns nil if expected is empty (object has no stored hash).

Generated by gomarkdoc