s3-orchestrator

counter

import "github.com/afreidah/s3-orchestrator/internal/counter"

Package counter provides usage tracking with per-backend atomic counters and monthly limit enforcement. Supports local in-memory counters and Redis-backed shared counters for multi-instance deployments.

Index

Constants

Counter field names used as keys in CounterBackend operations.

const (
    FieldAPIRequests  = "api_requests"
    FieldEgressBytes  = "egress_bytes"
    FieldIngressBytes = "ingress_bytes"
)

func CurrentPeriod

func CurrentPeriod() string

CurrentPeriod returns the current month as “YYYY-MM” for usage aggregation.

type CounterBackend

CounterBackend abstracts the storage of per-backend usage deltas. Each backend (identified by name) tracks three counters: API requests, egress bytes, and ingress bytes. Implementations must be safe for concurrent use.

type CounterBackend interface {
    // Backends returns the names of all tracked backends.
    Backends() []string

    // Add increments a single counter field for the given backend.
    Add(backend, field string, delta int64)

    // Load returns the current value of a single counter field.
    Load(backend, field string) int64

    // Swap atomically reads and resets a single counter field, returning
    // the value immediately before the reset.
    Swap(backend, field string) int64

    // AddAll increments all three counter fields for the given backend in
    // a single call. Implementations may pipeline the operations.
    AddAll(backend string, apiReqs, egress, ingress int64)

    // LoadAll reads all three counter fields for the given backend in a
    // single call. Implementations may pipeline the operations.
    LoadAll(backend string) LoadAllResult
}

type LoadAllResult

LoadAllResult holds the values returned by CounterBackend.LoadAll.

type LoadAllResult struct {
    APIRequests  int64
    EgressBytes  int64
    IngressBytes int64
}

type LocalCounterBackend

LocalCounterBackend stores per-backend usage deltas in local atomic counters. Safe for concurrent use.

type LocalCounterBackend struct {
    // contains filtered or unexported fields
}

func NewLocalCounterBackend

func NewLocalCounterBackend(backendNames []string) *LocalCounterBackend

NewLocalCounterBackend creates a local counter backend pre-initialized with the given backend names.

func (*LocalCounterBackend) Add

func (l *LocalCounterBackend) Add(backend, field string, delta int64)

Add increments a single counter field for a backend.

func (*LocalCounterBackend) AddAll

func (l *LocalCounterBackend) AddAll(backend string, apiReqs, egress, ingress int64)

Add increments a single counter field for a backend. AddAll increments all three counter fields (API requests, egress, ingress) atomically. AddAll add all. AddAll add all.

func (*LocalCounterBackend) Backends

func (l *LocalCounterBackend) Backends() []string

Backends returns the list of backend names this counter tracks.

func (*LocalCounterBackend) Load

func (l *LocalCounterBackend) Load(backend, field string) int64

Load returns the current value of a counter field.

func (*LocalCounterBackend) LoadAll

func (l *LocalCounterBackend) LoadAll(backend string) LoadAllResult

Load returns the current value of a counter field. LoadAll returns all three counter values for a backend. LoadAll load all. LoadAll load all.

func (*LocalCounterBackend) Swap

func (l *LocalCounterBackend) Swap(backend, field string) int64

Swap atomically reads and resets a counter field, returning the old value.

func (*LocalCounterBackend) SwapAll

func (l *LocalCounterBackend) SwapAll(backend string) LoadAllResult

SwapAll atomically reads and resets all three counter fields for a backend, returning the old values. Each field is independently atomic.

func (*LocalCounterBackend) SwapAllBackends

func (l *LocalCounterBackend) SwapAllBackends() map[string]LoadAllResult

SwapAllBackends atomically reads and resets counters for every backend in a single operation by swapping the entire map. Returns the old values keyed by backend name. This avoids the race where per-backend SwapAll calls allow concurrent Add calls to slip between swaps.

type RedisClient

RedisClient abstracts the Redis operations used by RedisCounterBackend. The production implementation is *redis.Client; tests provide a mock.

type RedisClient interface {
    IncrBy(ctx context.Context, key string, value int64) *redis.IntCmd
    Get(ctx context.Context, key string) *redis.StringCmd
    GetSet(ctx context.Context, key string, value any) *redis.StringCmd
    Del(ctx context.Context, keys ...string) *redis.IntCmd
    Expire(ctx context.Context, key string, expiration time.Duration) *redis.BoolCmd
    Ping(ctx context.Context) *redis.StatusCmd
    Pipeline() redis.Pipeliner
    Close() error
}

type RedisCounterBackend

RedisCounterBackend stores per-backend usage deltas in Redis for cross-instance visibility. Falls back to local counters when Redis is unavailable.

type RedisCounterBackend struct {
    // contains filtered or unexported fields
}

func NewRedisCounterBackend

func NewRedisCounterBackend(client RedisClient, cfg *config.RedisConfig, backendNames []string) (*RedisCounterBackend, error)

NewRedisCounterBackend creates a shared counter backend backed by Redis. Pings Redis on creation; returns an error if Redis is unreachable (a configured dependency must be available at boot). Starts a background health probe goroutine.

func (*RedisCounterBackend) Add

func (r *RedisCounterBackend) Add(backend, field string, delta int64)

Add increments a single counter field in Redis, falling back to local on error.

func (*RedisCounterBackend) AddAll

func (r *RedisCounterBackend) AddAll(backend string, apiReqs, egress, ingress int64)

AddAll increments all three counter fields in a Redis pipeline.

func (*RedisCounterBackend) Backends

func (r *RedisCounterBackend) Backends() []string

Backends returns the list of backend names this counter tracks.

func (*RedisCounterBackend) Close

func (r *RedisCounterBackend) Close() error

Close stops the health probe goroutine and closes the Redis client. Safe to call multiple times.

func (*RedisCounterBackend) IsHealthy

func (r *RedisCounterBackend) IsHealthy() bool

IsHealthy returns true when Redis is reachable and the circuit is closed.

func (*RedisCounterBackend) Load

func (r *RedisCounterBackend) Load(backend, field string) int64

Load reads a counter field from Redis, falling back to local on error.

func (*RedisCounterBackend) LoadAll

func (r *RedisCounterBackend) LoadAll(backend string) LoadAllResult

LoadAll reads all three counter values from Redis in a pipeline.

func (*RedisCounterBackend) Swap

func (r *RedisCounterBackend) Swap(backend, field string) int64

Swap atomically reads and resets a counter field via Redis GETSET.

type UsageTracker

UsageTracker tracks per-backend usage counters, enforces monthly usage limits, and flushes accumulated deltas to the database. Counter storage is delegated to a CounterBackend (local atomics or shared Redis). The limits and baseline maps live behind atomic.Pointer snapshots so reads require no locking and writes copy-on-write.

type UsageTracker struct {
    // contains filtered or unexported fields
}

func NewUsageTracker

func NewUsageTracker(backend CounterBackend, limits map[string]core.UsageLimits) *UsageTracker

NewUsageTracker creates a usage tracker with the given counter backend and per-backend limits. The counter backend determines whether deltas are stored locally (default) or in a shared store like Redis.

func (*UsageTracker) Backend

func (u *UsageTracker) Backend() CounterBackend

Backend returns the underlying CounterBackend (local or Redis).

func (*UsageTracker) BackendsWithinLimits

func (u *UsageTracker) BackendsWithinLimits(order []string, apiCalls, egress, ingress int64) []string

BackendsWithinLimits returns the subset of the given order whose backends are within their monthly usage limits for the proposed operation dimensions. Loads the limits and baseline snapshots ONCE so every per-backend check sees a consistent view; with the old RWMutex pair each WithinLimits call could see a different snapshot if a writer landed between iterations.

func (*UsageTracker) FlushUsage

func (u *UsageTracker) FlushUsage(ctx context.Context, store usageFlusher, skip map[string]bool) error

FlushUsage reads and resets the counter backend, then writes the accumulated deltas to the database. Called periodically (every 30s). On DB error, deltas are added back to avoid data loss. Backends in the skip set have their counters discarded (used for drained backends whose DB records are gone).

func (*UsageTracker) GetLimits

func (u *UsageTracker) GetLimits() map[string]core.UsageLimits

GetLimits returns a shallow copy of the current per-backend usage limits. The snapshot itself is immutable, but a defensive copy keeps the API contract from leaking the live snapshot to callers that might mutate it.

func (*UsageTracker) NearLimit

func (u *UsageTracker) NearLimit(threshold float64) bool

NearLimit returns true if any backend’s effective usage (baseline + unflushed) exceeds the given threshold ratio for any non-zero limit dimension. Used by adaptive flushing to shorten the flush interval when enforcement accuracy matters. Loads both snapshots once so the loop sees a consistent view.

func (*UsageTracker) Record

func (u *UsageTracker) Record(backendName string, apiCalls, egress, ingress int64)

Record increments the usage counters for a backend.

func (*UsageTracker) ResetBaselines

func (u *UsageTracker) ResetBaselines(names []string)

ResetBaselines zeroes out the baseline for the given backend names via copy-on-write swap.

func (*UsageTracker) SetBaseline

func (u *UsageTracker) SetBaseline(name string, stat core.UsageStat)

SetBaseline updates the cached DB usage baseline for a single backend via copy-on-write swap.

func (*UsageTracker) UpdateLimits

func (u *UsageTracker) UpdateLimits(limits map[string]core.UsageLimits)

UpdateLimits replaces the per-backend usage limits via copy-on-write swap. Safe to call concurrently with request handling: in-flight WithinLimits calls keep using whichever snapshot they loaded.

func (*UsageTracker) WithinLimits

func (u *UsageTracker) WithinLimits(backendName string, apiCalls, egress, ingress int64) bool

WithinLimits checks whether the proposed operation would keep the given backend within its configured monthly usage limits. It computes:

effective = baseline (from DB) + unflushed counter + proposed

Returns true if no non-zero limit is exceeded.

Enforcement is approximate: the snapshot pair (limits, baseline) is read separately from the live counter, so concurrent requests may all pass the check and collectively exceed the limit by a small margin. This is intentional - exact enforcement would require a mutex on every request. The overshoot is bounded by one flush interval worth of concurrent traffic, and s3o_usage_limit_rejections_total tracks when limits are actively enforced.

Generated by gomarkdoc