Worker
sheaf.worker
Async job-queue worker: long-running inference decoupled from HTTP.
Intended for jobs where a synchronous request/response cycle is the wrong
shape (50-step FLUX generation, multi-day GraphCast rollouts, batch SDXL):
clients enqueue a typed request and then either poll the result store or
wait for a webhook.
Public surface::
    from sheaf.worker import (
        SheafWorker,
        WorkerSpec,
        JobQueueClient,
        JobQueue,
        ResultStore,
        RedisStreamsQueue,
        RedisHashResultStore,
        Job,
        JobResult,
    )
Install with: pip install 'sheaf-serve[worker]'
Job
Bases: BaseModel
A dequeued job — request + delivery metadata.
JobQueue
Bases: ABC
Abstract job queue. Subclass to add new backends (SQS, Kafka, ...).
enqueue (abstractmethod)
Add a job to the queue. Returns the job_id.
dequeue (abstractmethod)
dequeue(block_ms: int) -> Job | None
Block for up to block_ms milliseconds waiting for the next job. Returns None on timeout.
ack (abstractmethod)
ack(job: Job) -> None
Confirm successful processing: removes the job from in-flight tracking.
nack (abstractmethod)
nack(job: Job) -> None
Mark the job as failed; it is left for retry by the consumer group.
dead_letter (abstractmethod)
dead_letter(job: Job, reason: str) -> None
Move the job to the dead-letter queue with a failure reason.
queue_depth
Approximate number of pending plus in-flight jobs, or None if the backend does not support it.
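To make the JobQueue contract concrete, the following is a minimal in-memory sketch of a backend. It is not Sheaf code.

Several pieces are assumptions:

- The enqueue signature is not shown on this page, so `enqueue(request, job_id=None)` is a guess.
- The `Job` fields (`job_id`, `request`, `attempts`) are hypothetical.
- The class names `JobQueueSketch` and `InMemoryJobQueue` are hypothetical.

The sketch shows the intended life cycle: dequeue moves a job to in-flight, ack forgets it, nack puts it back for retry, and dead_letter parks it with a reason.

```python
from __future__ import annotations

import uuid
from abc import ABC, abstractmethod
from collections import deque
from dataclasses import dataclass
from typing import Any


@dataclass
class Job:
    # Hypothetical stand-in for sheaf.worker.Job; the real model is a
    # pydantic BaseModel whose fields are not listed on this page.
    job_id: str
    request: dict[str, Any]
    attempts: int = 0


class JobQueueSketch(ABC):
    """Same method names as JobQueue; the enqueue signature is assumed."""

    @abstractmethod
    def enqueue(self, request: dict[str, Any], job_id: str | None = None) -> str: ...

    @abstractmethod
    def dequeue(self, block_ms: int) -> Job | None: ...

    @abstractmethod
    def ack(self, job: Job) -> None: ...

    @abstractmethod
    def nack(self, job: Job) -> None: ...

    @abstractmethod
    def dead_letter(self, job: Job, reason: str) -> None: ...

    def queue_depth(self) -> int | None:
        return None  # backends that cannot count jobs keep this default


class InMemoryJobQueue(JobQueueSketch):
    """Single-process backend for tests; dequeue never actually blocks."""

    def __init__(self) -> None:
        self._pending: deque[Job] = deque()
        self._inflight: dict[str, Job] = {}
        self.dead: list[tuple[Job, str]] = []

    def enqueue(self, request: dict[str, Any], job_id: str | None = None) -> str:
        job = Job(job_id=job_id or uuid.uuid4().hex, request=request)
        self._pending.append(job)
        return job.job_id

    def dequeue(self, block_ms: int) -> Job | None:
        # A networked backend would wait up to block_ms; this returns at once.
        if not self._pending:
            return None
        job = self._pending.popleft()
        job.attempts += 1
        self._inflight[job.job_id] = job
        return job

    def ack(self, job: Job) -> None:
        self._inflight.pop(job.job_id, None)  # done: stop tracking it

    def nack(self, job: Job) -> None:
        # Failed attempt: put the job back so a later dequeue retries it.
        if self._inflight.pop(job.job_id, None) is not None:
            self._pending.append(job)

    def dead_letter(self, job: Job, reason: str) -> None:
        self._inflight.pop(job.job_id, None)
        self.dead.append((job, reason))

    def queue_depth(self) -> int | None:
        return len(self._pending) + len(self._inflight)
```

A backend like this is handy for unit-testing worker logic without a Redis server. A real backend would honour block_ms instead of returning immediately.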
JobQueueClient
JobQueueClient(queue: JobQueue, results: ResultStore)
Application-side helper: enqueue jobs and, optionally, wait for results.
Wraps a JobQueue (for enqueue) and a ResultStore (for polling).
Validates the request against AnyRequest before enqueueing, so
callers see schema errors immediately rather than later in a worker log.
enqueue
enqueue(request: dict | BaseModel, webhook_url: str | None = None, job_id: str | None = None) -> str
Validate and submit a request. Returns the assigned job_id.
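The client's value is that bad requests fail in the caller's process, before anything is queued. The sketch below illustrates that ordering plus a polling wait. It does not depend on Sheaf or pydantic, so several stand-ins are used:

- `REQUIRED_FIELDS` / `validate_request` is a hypothetical substitute for validation against AnyRequest.
- `SketchClient` is a hypothetical class that stands in for JobQueueClient.
- The `wait` method name is an assumption, since the waiting API is not documented on this page.
- A plain list plays the queue and a dict plays the result store.

```python
from __future__ import annotations

import time
import uuid
from typing import Any


class RequestValidationError(ValueError):
    """Raised in the caller's process, before anything is enqueued."""


# Hypothetical schema table standing in for AnyRequest validation.
REQUIRED_FIELDS = {"diffusion": {"prompt"}}


def validate_request(request: dict[str, Any]) -> dict[str, Any]:
    model_type = request.get("model_type")
    if model_type not in REQUIRED_FIELDS:
        raise RequestValidationError(f"unknown model_type: {model_type!r}")
    missing = REQUIRED_FIELDS[model_type] - set(request)
    if missing:
        raise RequestValidationError(f"missing fields: {sorted(missing)}")
    return request


class SketchClient:
    """Toy client: a list plays the queue, a dict plays the result store."""

    def __init__(self) -> None:
        self.queued: list[tuple[str, dict[str, Any], str | None]] = []
        self.results: dict[str, dict[str, Any]] = {}

    def enqueue(self, request: dict[str, Any], webhook_url: str | None = None,
                job_id: str | None = None) -> str:
        validate_request(request)  # fail fast: nothing invalid reaches the queue
        job_id = job_id or uuid.uuid4().hex
        self.queued.append((job_id, request, webhook_url))
        return job_id

    def wait(self, job_id: str, timeout_s: float = 30.0,
             poll_s: float = 0.5) -> dict[str, Any] | None:
        # Hypothetical helper: poll the result store until a record shows up.
        deadline = time.monotonic() + timeout_s
        while time.monotonic() < deadline:
            result = self.results.get(job_id)
            if result is not None:
                return result
            time.sleep(poll_s)
        return None
```

Polling trades latency (up to poll_s) for simplicity. Passing a webhook_url lets the worker push the result instead.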
JobResult
Bases: BaseModel
Result record written to the result store.
RedisHashResultStore
RedisHashResultStore(prefix: str = 'sheaf:result', url: str = 'redis://localhost:6379/0', ttl_seconds: int | None = 86400, client: Any = None)
Bases: ResultStore
Redis hash-per-job result store.
Each result is stored under the key f"{prefix}:{job_id}" as a
Redis hash with fields status, response, error,
completed_at. Optional TTL (ttl_seconds) lets results
auto-expire so the store doesn't grow unbounded.
RedisStreamsQueue
RedisStreamsQueue(stream: str, group: str, consumer: str, url: str = 'redis://localhost:6379/0', dead_letter_stream: str | None = None, client: Any = None)
Bases: JobQueue
Redis Streams + consumer groups job queue.
Multiple workers using the same stream + group form a
horizontal-scaling consumer pool: each new job is delivered to exactly
one consumer in the group. The retry limit (max_retries) is enforced
by the worker, not the queue; jobs that exceed it are XADDed to a
separate dead-letter stream via dead_letter().
Notes
- The stream and the consumer group are created lazily on first use (idempotent XGROUP CREATE ... MKSTREAM).
- XACK is the only way a job leaves "pending" state for this consumer. The worker calls ack() only after the result is persisted to the result store, so a crash between infer and ack causes redelivery (at-least-once delivery).
- redis-py is imported lazily: the [worker] extra adds redis>=5.0, but sheaf.worker.queue should be importable for type-only usage without it.
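The at-least-once guarantee in the notes follows from how a consumer group tracks delivered-but-unacknowledged entries in its pending entries list. The toy model below is neither redis-py nor Sheaf's code.

- `read_new` mimics `XREADGROUP ... ">"`.
- `xack` mimics XACK.
- `claim_from` stands in for XCLAIM/XAUTOCLAIM, which is one way a stranded entry gets redelivered.

How RedisStreamsQueue actually reclaims pending jobs is not documented on this page. All class and function names here are hypothetical.

```python
from __future__ import annotations

import itertools
from dataclasses import dataclass, field


@dataclass
class ConsumerGroupSim:
    """Toy model of one Redis Streams consumer group (not redis-py)."""

    entries: list[tuple[str, dict]] = field(default_factory=list)
    last_delivered: int = -1  # index of the last entry handed out via ">"
    pending: dict[str, str] = field(default_factory=dict)  # entry id -> consumer
    _ids: itertools.count = field(default_factory=itertools.count, repr=False)

    def xadd(self, fields: dict) -> str:
        entry_id = f"{next(self._ids)}-0"
        self.entries.append((entry_id, fields))
        return entry_id

    def read_new(self, consumer: str) -> tuple[str, dict] | None:
        # Like XREADGROUP ... STREAMS s ">": a new entry goes to one consumer.
        if self.last_delivered + 1 >= len(self.entries):
            return None
        self.last_delivered += 1
        entry_id, fields = self.entries[self.last_delivered]
        self.pending[entry_id] = consumer  # stays pending until acked
        return entry_id, fields

    def xack(self, entry_id: str) -> None:
        self.pending.pop(entry_id, None)

    def claim_from(self, dead_consumer: str, new_consumer: str) -> list[str]:
        # Like XCLAIM/XAUTOCLAIM: hand a crashed consumer's entries to another.
        claimed = [eid for eid, c in self.pending.items() if c == dead_consumer]
        for eid in claimed:
            self.pending[eid] = new_consumer
        return claimed


def handle_one(group: ConsumerGroupSim, consumer: str, store: dict,
               crash_before_ack: bool = False) -> str | None:
    delivered = group.read_new(consumer)
    if delivered is None:
        return None
    entry_id, fields = delivered
    store[entry_id] = f"result for {fields['prompt']}"  # 1. persist the result
    if crash_before_ack:
        return entry_id  # 2. simulated crash: XACK is never sent
    group.xack(entry_id)  # 3. ack only after the result is stored
    return entry_id
```

Because a crashed job can be processed twice, result writes keyed by job_id should be idempotent overwrites. In real Redis, a consumer restarting under the same name can also re-read its own pending entries with `XREADGROUP` and ID `0` instead of `>`.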
ResultStore
Bases: ABC
Abstract result store. Subclass to add new backends (S3, Postgres, ...).
SheafWorker
SheafWorker(spec: WorkerSpec, backend: ModelBackend | None = None)
Async-job consumer for any ModelBackend.
Example::
    from sheaf.api.base import ModelType
    from sheaf.worker import (
        SheafWorker, WorkerSpec, RedisStreamsQueue, RedisHashResultStore,
    )

    spec = WorkerSpec(
        name="flux-worker",
        model_type=ModelType.DIFFUSION,
        backend="flux",
        backend_kwargs={"model_id": "black-forest-labs/FLUX.1-schnell"},
        queue=RedisStreamsQueue(
            stream="sheaf:flux", group="workers", consumer="w1"
        ),
        results=RedisHashResultStore(prefix="sheaf:flux:result"),
        max_retries=3,
    )
    SheafWorker(spec).start()  # blocks until SIGINT or SIGTERM
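Per job, the worker's control flow is roughly: run inference, persist the result, then ack. On failure it either nacks for a retry or, once max_retries is exceeded, dead-letters the job. The sketch below is an assumption-laden illustration of that ordering, not SheafWorker's implementation.

The following are hypothetical:

- the `attempts` counter;
- the rule "dead-letter once attempts > max_retries" (one initial try plus max_retries retries);
- writing a `failed` record with an `error` field;
- `RecordingQueue` and `handle`.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Job:
    job_id: str
    request: dict[str, Any]
    attempts: int = 1  # hypothetical delivery count, including this one


class RecordingQueue:
    """Test double that records what the worker did with each job."""

    def __init__(self) -> None:
        self.acked: list[str] = []
        self.nacked: list[str] = []
        self.dead: list[tuple[str, str]] = []

    def ack(self, job: Job) -> None:
        self.acked.append(job.job_id)

    def nack(self, job: Job) -> None:
        self.nacked.append(job.job_id)

    def dead_letter(self, job: Job, reason: str) -> None:
        self.dead.append((job.job_id, reason))


def handle(job: Job, queue: RecordingQueue, results: dict[str, dict],
           infer: Callable[[dict[str, Any]], Any], max_retries: int = 3) -> None:
    try:
        response = infer(job.request)
    except Exception as exc:
        if job.attempts > max_retries:
            # Assumed: record the failure so pollers see a terminal state.
            results[job.job_id] = {"status": "failed", "error": repr(exc)}
            queue.dead_letter(job, reason=repr(exc))
        else:
            queue.nack(job)  # leave it for another delivery
        return
    results[job.job_id] = {"status": "completed", "response": response}
    queue.ack(job)  # ack only after the result is persisted
```

Keeping the retry decision in the worker rather than the queue matches the RedisStreamsQueue description: the queue only offers nack and dead_letter, and the worker chooses between them.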
start
Run the consume loop until SIGINT/SIGTERM.
Blocks the calling thread. After receiving a stop signal, the worker finishes its current job (if any) before returning.
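"Finishes its current job before returning" is commonly implemented by having the signal handler only set a flag that the loop checks between jobs. A minimal sketch of that pattern follows. `GracefulLoop`, `next_job` and `process` are hypothetical names, not SheafWorker's internals.

```python
from __future__ import annotations

import signal
import threading
from typing import Any, Callable


class GracefulLoop:
    """Consume loop that stops between jobs, never in the middle of one."""

    def __init__(self) -> None:
        self._stop = threading.Event()

    def request_stop(self, signum: int | None = None, frame: Any = None) -> None:
        # Matches the (signum, frame) handler signature; it only sets a flag.
        self._stop.set()

    def install_signal_handlers(self) -> None:
        # signal.signal may only be called from the main thread.
        signal.signal(signal.SIGINT, self.request_stop)
        signal.signal(signal.SIGTERM, self.request_stop)

    def run(self, next_job: Callable[[], Any | None],
            process: Callable[[Any], None]) -> int:
        done = 0
        while not self._stop.is_set():
            job = next_job()  # e.g. queue.dequeue(block_ms=1000)
            if job is None:
                continue  # timed out: loop back and re-check the stop flag
            process(job)  # a stop requested now takes effect after this returns
            done += 1
        return done
```

The bounded block_ms on dequeue matters here. An unbounded blocking read would keep the loop from noticing a stop request until another job arrived.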
WorkerSpec
Bases: BaseModel
Declares an async worker process consuming jobs from a queue.
Example::
    from sheaf.api.base import ModelType
    from sheaf.worker import (
        SheafWorker, WorkerSpec, RedisStreamsQueue, RedisHashResultStore,
    )

    spec = WorkerSpec(
        name="flux-worker",
        model_type=ModelType.DIFFUSION,
        backend="flux",
        backend_kwargs={"model_id": "black-forest-labs/FLUX.1-schnell"},
        queue=RedisStreamsQueue(
            stream="sheaf:flux", group="workers", consumer="w1"
        ),
        results=RedisHashResultStore(prefix="sheaf:flux:result"),
        max_retries=3,
    )
    SheafWorker(spec).start()
backend / backend_cls / backend_kwargs follow the same
semantics as on ModelSpec: backend_cls takes precedence over
the registry lookup when set.
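The precedence rule (backend_cls wins over the registry lookup by name, then backend_kwargs are passed to the constructor) can be sketched as follows. The following are all hypothetical illustrations, not Sheaf's registry or backend classes:

- `ModelBackend` here;
- `FluxBackend` and `CustomFlux`;
- `BACKEND_REGISTRY`;
- `resolve_backend`.

```python
from __future__ import annotations

from typing import Any


class ModelBackend:
    """Hypothetical minimal stand-in for sheaf's ModelBackend."""

    def __init__(self, **kwargs: Any) -> None:
        self.kwargs = kwargs


class FluxBackend(ModelBackend):
    pass


class CustomFlux(ModelBackend):
    pass


# Hypothetical registry mapping backend names to classes.
BACKEND_REGISTRY: dict[str, type[ModelBackend]] = {"flux": FluxBackend}


def resolve_backend(backend: str | None,
                    backend_cls: type[ModelBackend] | None,
                    backend_kwargs: dict[str, Any] | None = None) -> ModelBackend:
    if backend_cls is not None:
        cls = backend_cls  # an explicit class takes precedence over the registry
    elif backend is not None:
        try:
            cls = BACKEND_REGISTRY[backend]
        except KeyError:
            raise ValueError(f"unknown backend {backend!r}") from None
    else:
        raise ValueError("set either backend or backend_cls")
    return cls(**(backend_kwargs or {}))
```

Letting an explicit class override the name is useful for custom or test backends that were never registered.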