s3-orchestrator

multipart

import "github.com/afreidah/s3-orchestrator/internal/proxy/multipart"

Index

Constants

Default cleanup parameters used when the config does not override them.

const (
    DefaultMultipartStaleTimeout = 24 * time.Hour
    DefaultMultipartCleanupTick  = 1 * time.Hour
)

func NewCleanupService

func NewCleanupService(mgr *Manager, locker tickrunner.AdvisoryLocker, staleTimeout time.Duration) lifecycle.Runner

NewCleanupService constructs the multipart-cleanup background service backed by mgr.CleanupStaleMultipartUploads.

type Manager

Manager handles the multipart upload lifecycle.

dekCache holds the unwrapped per-upload DEK keyed by uploadID so an instance that handles many UploadPart calls for the same upload pays for the KeyProvider unwrap round-trip once. The cache lifetime is pegged to the multipart stale-upload sweep interval so an abandoned upload’s DEK does not linger in memory beyond its server-side existence. Concurrent UploadPart calls on the same uploadID with a cold cache will each issue their own Unwrap; the design accepts that minor cold-start cost in exchange for not pulling in singleflight.

type Manager struct {
    // contains filtered or unexported fields
}

func New

func New(core MultipartCore, coord *writepath.Coordinator, stores core.MetadataStore, encryptor *encryption.Encryptor, objectCache objcache.ObjectCache, dekCacheTTL time.Duration, integrityCfg *syncutil.AtomicConfig[config.IntegrityConfig]) *Manager

New creates a Manager sharing the given core infrastructure and write coordinator. All dependencies must be non-nil; nothing is patched in post-construction. integrityCfg is nil-safe - when nil or disabled, CompleteMultipartUpload skips the plaintext-hash tee that populates content_hash on the recorded location (#916). The component-scoped logger is built in the constructor body per the project’s logging convention.

func (*Manager) AbortMultipartUpload

func (mp *Manager) AbortMultipartUpload(ctx context.Context, bucket, key, uploadID string) error

AbortMultipartUpload cleans up an in-progress multipart upload, removing all part objects from the backend and the upload records from the database. The bucket/key arguments scope the operation to the requesting client’s URL, matching them against the stored ObjectKey via validateMultipartScope so a caller for one bucket cannot abort an upload that belongs to another.

func (*Manager) AbortMultipartUploadsOnBackend

func (mp *Manager) AbortMultipartUploadsOnBackend(ctx context.Context, backendName string)

AbortMultipartUploadsOnBackend aborts all in-progress multipart uploads on the given backend.

func (*Manager) CleanupStaleMultipartUploads

func (mp *Manager) CleanupStaleMultipartUploads(ctx context.Context, olderThan time.Duration)

CleanupStaleMultipartUploads aborts multipart uploads older than the given duration. Run periodically to prevent quota leaks from abandoned uploads.

func (*Manager) Close

func (mp *Manager) Close()

Close stops the per-upload DEK cache eviction loop.

func (*Manager) CompleteMultipartUpload

func (mp *Manager) CompleteMultipartUpload(ctx context.Context, bucket, key, uploadID string, partNumbers []int) (string, error)

CompleteMultipartUpload reassembles parts into the final object. Downloads each part, concatenates them into a single upload, cleans up temp keys, and records the final object location with quota tracking.

The body runs under a session-scoped advisory lock keyed by uploadID so two concurrent Complete calls for the same upload cannot both stream parts and PUT the assembled object on top of each other (which would leave the backend bytes and the metadata row pointing at different writers). When the lock is contended the second caller fails fast with a 409 OperationAborted so the client can decide whether to retry or abort.

func (*Manager) CreateMultipartUpload

func (mp *Manager) CreateMultipartUpload(ctx context.Context, key, contentType string, metadata map[string]string) (string, string, error)

CreateMultipartUpload initiates a multipart upload by selecting a backend with available quota and recording the upload in the database. When proxy-side encryption is configured, a single DEK is wrapped once here and persisted on the multipart_uploads row so every subsequent UploadPart can reuse it without paying its own KeyProvider round-trip (this is the shared-DEK invariant CompleteMultipartUpload also depends on).

func (*Manager) GetParts

func (mp *Manager) GetParts(ctx context.Context, bucket, key, uploadID string) ([]core.MultipartPart, error)

GetParts returns all parts for a multipart upload.

func (*Manager) ListMultipartUploads

func (mp *Manager) ListMultipartUploads(ctx context.Context, prefix string, maxUploads int) ([]core.MultipartUpload, error)

ListMultipartUploads returns active multipart uploads matching the given prefix, up to maxUploads results. Pass-through to the metadata store.

func (*Manager) UnwrapUploadDEK

func (mp *Manager) UnwrapUploadDEK(ctx context.Context, mu *core.MultipartUpload) (dek, wrappedDEK []byte, baseNonce []byte, err error)

UnwrapUploadDEK returns the unwrapped DEK for a multipart upload, caching the result for the lifetime of the upload so subsequent UploadParts on this instance do not re-issue the KeyProvider round- trip. Returns the unwrapped DEK and the wrapped form (for write-path metadata that needs the wrapped value).

func (*Manager) UploadPart

func (mp *Manager) UploadPart(ctx context.Context, bucket, key, uploadID string, partNumber int, body io.Reader, size int64) (string, error)

UploadPart uploads a single part to the backend. Parts are stored under a temporary key prefix and reassembled on completion.

type MultipartCore

MultipartCore is the subset of *infra.Core the multipart Manager needs.

type MultipartCore interface {
    GetBackend(name string) (backend.ObjectBackend, error)
    Usage() *counter.UsageTracker // still needed for WithinLimits pre-flight checks; per-backend Record calls flow through Acct
    WithTimeout(ctx context.Context) (context.Context, context.CancelFunc)
    ClassifyWriteError(span trace.Span, operation string, err error) error
    Acct() *accounting.Recorder
}

Generated by gomarkdoc