s3-orchestrator

writepath

import "github.com/afreidah/s3-orchestrator/internal/proxy/writepath"


Variables

ErrMoveStale signals that MoveObject lost a race: MoveObjectLocation returned movedSize=0, meaning another process (or the same caller on a prior tick) already moved or deleted the object. By the time this error is returned, the destination's now-orphaned bytes have been enqueued for cleanup under req.StaleOrphanReason. Callers should treat this as a no-op rather than a failure, and increment a “stale” or “skipped” counter instead of an error counter.

var ErrMoveStale = errors.New("object already moved or deleted")

type Coordinator

Coordinator bundles the infrastructure subset (WritepathCore) with the metadata-store contract and the pending-pattern flag so the write-path helpers can be expressed as plain methods on a value owned by BackendManager. The managers hold a *Coordinator rather than a *BackendManager back-pointer, eliminating the post-construction wiring step.

type Coordinator struct {
    // contains filtered or unexported fields
}

func New

func New(core WritepathCore, stores core.MetadataStore, pendingEnabled bool) *Coordinator

New constructs a Coordinator. The supplied core must observe the same admission, usage, drain, and backend state as the *infra.Core embedded in BackendManager (in production they are the same instance). The component-scoped logger is built in the constructor body per the project’s logging convention.
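The "same instance" arrangement can be sketched as follows. Everything here is hypothetical and heavily reduced: infraCore stands in for *infra.Core with only drain state, drainView for WritepathCore, and backendManager for BackendManager. The manager embeds the core and hands the same pointer to the coordinator at construction time, so the coordinator observes the manager's state without a back-pointer or a later wiring step.

```go
package main

import (
	"fmt"
	"sync"
)

// infraCore is a hypothetical stand-in for *infra.Core, reduced to drain state.
type infraCore struct {
	mu       sync.RWMutex
	draining map[string]bool
}

func (c *infraCore) SetDraining(name string, on bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.draining[name] = on
}

func (c *infraCore) IsDraining(name string) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.draining[name]
}

// drainView is the narrowed interface the coordinator depends on.
type drainView interface{ IsDraining(name string) bool }

type coordinator struct{ core drainView }

// backendManager embeds the core and owns a coordinator built from the same pointer.
type backendManager struct {
	*infraCore
	wp *coordinator
}

func newBackendManager() *backendManager {
	core := &infraCore{draining: map[string]bool{}}
	return &backendManager{infraCore: core, wp: &coordinator{core: core}}
}

func main() {
	bm := newBackendManager()
	bm.SetDraining("b1", true) // promoted method from the embedded core
	fmt.Println(bm.wp.core.IsDraining("b1")) // true
}
```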

func (*Coordinator) DeleteOrEnqueue

func (w *Coordinator) DeleteOrEnqueue(ctx context.Context, be backend.ObjectBackend, backendName, key, reason string, sizeBytes int64)

DeleteOrEnqueue attempts to delete an object from a backend. On failure it logs a warning and enqueues the key for background retry. It is the standard “best-effort orphan cleanup” primitive used throughout the manager by the rebalancer, replicator, multipart cleanup, and delete paths. When the delete is enqueued, sizeBytes is tracked as orphan bytes. The cleanup DELETE is always counted as one API call against the backend’s usage counter, whether it succeeds or fails, because the HTTP call to the backend was made either way.

func (*Coordinator) EnqueueCleanup

func (w *Coordinator) EnqueueCleanup(ctx context.Context, backendName, objectKey, reason string, sizeBytes int64)

EnqueueCleanup adds a failed cleanup operation to the retry queue and increments orphan_bytes so that the write path accounts for space that has not been physically released. It is best-effort: if the enqueue or the orphan update fails (e.g. because the DB is down), it logs the error and moves on, since the circuit breaker already handles DB outages.

Failures here mean a backend object exists with no entry in the cleanup queue (stage=“enqueue”) or no matching orphan_bytes increment (stage=“orphan_bytes”). Both failure modes increment s3o_cleanup_enqueue_failures_total and emit a storage.OrphanEnqueueFailed audit event so operators can pivot from “metric incremented” to the exact backend/key/size, then run POST /admin/api/reconcile to recover untracked orphans once DB connectivity is restored. See docs/admin-guide.md for the runbook.

func (*Coordinator) InsertPendingIntent

func (w *Coordinator) InsertPendingIntent(ctx context.Context, key, backendName string, size int64, enc *core.EncryptionMeta) (string, error)

InsertPendingIntent records an in-flight PUT intent before the backend upload. It returns the generated intent ID, or an empty string if no pending store is configured (in which case the legacy delete-on-record-failure path stays in effect for that PUT). If pending tracking is configured and the intent insert fails, the PUT fails: proceeding without the intent would reintroduce the data-loss window the pattern exists to close.

func (*Coordinator) MoveObject

func (w *Coordinator) MoveObject(ctx context.Context, req *MoveRequest) (int64, error)

MoveObject performs a single src -> dest object move with the cleanup semantics that drain and rebalance share:

  • StreamCopy the source body to dest.
  • Run the atomic MoveObjectLocation CAS.
  • If the CAS errors, clean up the orphan on dest.
  • If the CAS reports a raced row, clean up the stale orphan on dest.
  • On success, DeleteOrEnqueue the source and apply the canonical accounting (Egress on src + Ingress on dest).

DeleteOrEnqueue owns the per-backend DELETE API-call tick (#881 / #917), so this method does NOT call Acct().APICall(…) for either the destination cleanup or the source delete.

Returns:

  • (movedSize, nil) on success
  • (0, ErrMoveStale) when MoveObjectLocation returned movedSize=0
  • (0, err) wrapping the underlying StreamCopy / MoveObjectLocation failure for every other failure mode

func (*Coordinator) RecordObjectAndPromoteIntent

func (w *Coordinator) RecordObjectAndPromoteIntent(ctx context.Context, span trace.Span, key, backendName string, size int64, enc *core.EncryptionMeta, intentID string) error

RecordObjectAndPromoteIntent commits the object location, updates quota, and clears the pending intent in a single transaction. On failure, the pending row is left in place and the backend bytes are NOT deleted: the pending reaper resolves the intent on a later tick by HEADing the backend, promoting the metadata if the bytes are present and removing the intent if they are absent.

When intentID is empty (no pending store configured) this falls back to the legacy RecordObjectOrCleanup behavior so existing call sites and tests retain their previous semantics.

func (*Coordinator) RecordObjectOrCleanup

func (w *Coordinator) RecordObjectOrCleanup(ctx context.Context, span trace.Span, be backend.ObjectBackend, key, backendName string, size int64, enc *core.EncryptionMeta) error

RecordObjectOrCleanup calls RecordObject and, on failure, deletes the orphaned object from the backend. On success, it enqueues cleanup for any copies on other backends that an overwrite displaced. On error, it also updates the tracing span.

func (*Coordinator) RecoverFromRecordFailure

func (w *Coordinator) RecoverFromRecordFailure(ctx context.Context, be backend.ObjectBackend, backendName, key, cleanupReason string, size int64)

RecoverFromRecordFailure runs the post-record-failure cleanup sequence shared by RecordObjectOrCleanup and the multipart UploadPart record path. It accounts for both API calls the failure path made (the original PUT and the cleanup DELETE), whether or not the cleanup succeeds. If the cleanup fails, the orphan is enqueued for the cleanup-queue worker with the supplied reason. A backend 404 is treated as idempotent success and skips the enqueue, so the cleanup queue does not collect phantom rows for objects the backend already agrees are gone (#880 completes the GH_ISSUE_843 story). Callers are responsible for logging the failure and setting the span status before or after this call.

func (*Coordinator) SelectBackendForWrite

func (w *Coordinator) SelectBackendForWrite(ctx context.Context, size int64, eligible []string) (string, error)

SelectBackendForWrite picks the target backend for a write operation using the configured routing strategy: “pack” returns the first backend with space, and “spread” returns the least-utilized backend.

func (*Coordinator) SelectWriteTarget

func (w *Coordinator) SelectWriteTarget(ctx context.Context, span trace.Span, operation string, size int64) (string, error)

SelectWriteTarget picks a backend for a write operation, combining eligibility filtering, backend selection, and error classification into a single call. Returns ErrInsufficientStorage when no backend can accept the write, or the classified error from the routing query.

func (*Coordinator) SetPendingEnabledForTest

func (w *Coordinator) SetPendingEnabledForTest(enabled bool)

SetPendingEnabledForTest toggles the pending-pattern flag. Test-only: production wiring always passes the flag through New.

type MoveRequest

MoveRequest bundles the inputs to a single src -> dest object move. Callers supply distinct cleanup-queue reason strings per failure mode so a future operator triaging the cleanup_queue can tell which subsystem orphaned each row.

type MoveRequest struct {
    // Key is the object key being moved.
    Key string
    // SizeBytes is the size as known to the caller before the move
    // runs. Used for the orphan-cleanup paths where MoveObjectLocation
    // did not return the row's actual size. The success path uses the
    // authoritative movedSize from MoveObjectLocation instead.
    SizeBytes int64

    SrcBackend  backend.ObjectBackend
    SrcName     string
    DestBackend backend.ObjectBackend
    DestName    string

    // OrphanReason names the cleanup-queue reason used when
    // MoveObjectLocation errors after the destination PUT has landed.
    OrphanReason string
    // StaleOrphanReason names the cleanup-queue reason used when
    // MoveObjectLocation returns movedSize=0 (another process won the
    // race; the destination bytes are now orphaned).
    StaleOrphanReason string
    // SourceDeleteReason names the cleanup-queue reason used for the
    // successful-move source delete.
    SourceDeleteReason string
}

type WritepathCore

WritepathCore is the subset of *infra.Core the Coordinator needs.

type WritepathCore interface {
    Backends() map[string]backend.ObjectBackend
    RoutingStrategy() config.RoutingStrategy
    EligibleForWrite(apiCalls, egress, ingress int64) []string
    ClassifyWriteError(span trace.Span, operation string, err error) error
    DeleteWithTimeout(ctx context.Context, be backend.ObjectBackend, key string) error
    StreamCopy(ctx context.Context, src, dst backend.ObjectBackend, key string) error
    Acct() *accounting.Recorder
}

Generated by gomarkdoc