s3-orchestrator

reconcile

import "github.com/afreidah/s3-orchestrator/internal/proxy/reconcile"

Index

func DeleteHandler

func DeleteHandler(log *slog.Logger, backendName string, deleter DeleterFn, result *Result) func(context.Context, string) error

DeleteHandler returns the onDelete callback used by the merge. Failures are logged but do not abort the pass.

func ImportHandler

func ImportHandler(log *slog.Logger, backendName string, importer ImporterFn, result *Result) func(context.Context, Entry) error

ImportHandler returns the onImport callback used by the merge. Failures are logged but do not abort the reconcile pass: a single import failure should not stop the diff for thousands of other keys.

func SiblingPrefixes

func SiblingPrefixes(knownBuckets []string, current string) []string

SiblingPrefixes returns the bucket-prefix list (each suffixed with ‘/’) for every known bucket except the one currently being reconciled. Used by ReconcileBackend so the merge skips keys that belong to sibling virtual buckets stored on the same backend.
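A rough sketch of the behavior described above, assuming the function simply filters and suffixes in input order. `siblingPrefixes` is a hypothetical re-implementation for illustration; the real function may differ in details such as ordering or deduplication.

```go
package main

import "fmt"

// siblingPrefixes returns "<bucket>/" for every known bucket except current.
// Hypothetical re-implementation; input order is preserved.
func siblingPrefixes(knownBuckets []string, current string) []string {
	out := make([]string, 0, len(knownBuckets))
	for _, b := range knownBuckets {
		if b == current {
			continue // the bucket being reconciled is not its own sibling
		}
		out = append(out, b+"/")
	}
	return out
}

func main() {
	fmt.Println(siblingPrefixes([]string{"photos", "logs", "backups"}, "logs"))
	// prints: [photos/ backups/]
}
```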

func Sorted

func Sorted(ctx context.Context, s3, dbIter keySource, onImport func(ctx context.Context, e Entry) error, onDelete func(ctx context.Context, key string) error) error

Sorted walks two ascending key streams in lockstep, invoking onImport for keys present only on s3 and onDelete for keys present only in the DB. Keys present on both sides are no-ops. Memory is bounded by each iterator’s internal buffer.

The first error returned by onImport or onDelete stops the walk and is returned to the caller; iterator errors are handled the same way.

type DBCursorStream

DBCursorStream walks store.ListObjectsByBackendKeyAsc one bounded page at a time and filters rows to those belonging to bucketPrefix. Keys for sibling buckets stored on the same backend are skipped.

type DBCursorStream struct {
    // contains filtered or unexported fields
}

func NewDBCursorStream

func NewDBCursorStream(s DBKeyLister, backendName, bucketPrefix string, otherPrefixes []string) *DBCursorStream

NewDBCursorStream prepares the iterator without issuing a query; the first Next call fetches the first page.

func (*DBCursorStream) Next

func (d *DBCursorStream) Next(ctx context.Context) (Entry, bool, error)

Next returns the next bucket-scoped row from the DB cursor, fetching a fresh bounded page when the in-memory buffer drains. Rows for sibling buckets stored on the same backend are skipped silently. It returns (zero, false, nil) at end-of-stream and does not query the DB again once the stream is exhausted.

func (*DBCursorStream) Stop

func (d *DBCursorStream) Stop()

Stop is a no-op for the DB cursor: the iterator owns no goroutine and holds no other resource that needs explicit teardown. It is defined so the type satisfies keySource alongside S3KeyStream, which does need cleanup.

type DBKeyLister

DBKeyLister is the narrow contract the cursor needs from the store.

type DBKeyLister interface {
    ListObjectsByBackendKeyAsc(ctx context.Context, backendName, afterKey string, limit int) ([]core.ObjectLocation, error)
}

type DeleterFn

DeleterFn removes a metadata row whose backend confirmed it does not hold the key. Carrier type so tests can substitute a fake deleter.

type DeleterFn func(ctx context.Context, key, backendName string) error

type Entry

Entry is the unit consumed by the merge: a key already namespaced to the current virtual bucket and its size on whichever side produced it.

type Entry struct {
    // contains filtered or unexported fields
}

type ImporterFn

ImporterFn imports a backend-listed key into the metadata store. Returns (inserted, error). inserted is false when the row already existed, which the reconciler treats as a benign no-op. Carrier type so tests can substitute a fake importer.

type ImporterFn func(ctx context.Context, key, backendName string, size int64) (bool, error)

type ObjectLister

ObjectLister is the narrow surface of *backend.S3Backend that S3KeyStream depends on. Defining it here keeps the iterator decoupled from the concrete S3 client and lets tests substitute a fake.

type ObjectLister interface {
    ListObjects(ctx context.Context, prefix string, fn func([]backend.ListedObject) error) error
}

type Result

Result is the in-progress accumulator handed to the import / delete callbacks. Promoted to a struct so tests can assert on it without importing internal/worker.

type Result struct {
    Imported int64
    Removed  int64
}

type S3KeyStream

S3KeyStream inverts the page-callback shape of ObjectLister.ListObjects into a forward iterator. A single goroutine drives the callback, dropping keys that belong to other virtual buckets and namespacing the rest under bucketPrefix. apiPages, when non-nil, is incremented per page so the caller can record API usage.

type S3KeyStream struct {
    // contains filtered or unexported fields
}

func NewS3KeyStream

func NewS3KeyStream(ctx context.Context, s3b ObjectLister, bucketPrefix string, otherPrefixes []string, apiPages *int64) *S3KeyStream

NewS3KeyStream starts the goroutine that walks the backend and returns a stream that satisfies keySource. The caller must invoke Stop when done so that a partial walk does not leak the goroutine.

func (*S3KeyStream) Next

func (s *S3KeyStream) Next(ctx context.Context) (Entry, bool, error)

Next pulls the next entry off the streaming channel that the background goroutine fills with backend-listed keys. It returns (entry, true, nil) on a successful read, (zero, false, nil) on graceful end-of-stream, or (zero, false, err) on either a producer error or a context cancellation. Once an error is observed it is latched into s.pending, so subsequent calls return the same error rather than reading from an empty channel.

func (*S3KeyStream) Stop

func (s *S3KeyStream) Stop()

Stop cancels the producer goroutine if it is still running. It is idempotent via closeOnce, so repeated Stop calls (Reconcile early-exits, deferred cleanup, error paths) are safe and invoke the cancel func only once.

Generated by gomarkdoc