gabsurd/worker
OTP Worker actor for the Absurd durable workflow system. Polls a queue for tasks, dispatches to registered handlers, and completes/fails tasks based on handler results.
Distributed systems behaviour
- Claim extension: The primary lease extension mechanism is checkpoint.set, which passes claim_timeout as extend_claim_by to set_task_checkpoint_state; every checkpoint write extends the lease. For handlers doing a single long operation without checkpoints, the claim timeout is the safety net: if the handler takes too long, the claim expires and another worker picks up the task.
- Error backoff: On transient claim errors, backs off exponentially up to max_backoff (default 60s), resets on success.
- Unknown task deferral: Tasks with no registered handler are deferred (rescheduled with a delay) rather than failed. This supports rolling deployments where a new task type may arrive before its handler is deployed.
- Terminal state tolerance: complete/fail errors from already-completed or already-failed runs are silently ignored (matching the official Absurd SDK behaviour).
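The error backoff rule can be pictured with a small pure function. This is only a sketch: the doubling factor and the names next_backoff and delay_after are assumptions made for illustration, not the worker's actual internals. The source states only that the delay grows exponentially, is capped at max_backoff, and resets on success.

```gleam
import gleam/int

/// Hypothetical helper: double the delay after a failed claim,
/// never exceeding max_backoff_ms. Assumes current_ms > 0.
pub fn next_backoff(current_ms: Int, max_backoff_ms: Int) -> Int {
  int.min(current_ms * 2, max_backoff_ms)
}

/// Hypothetical helper: choose the delay before the next poll.
/// A successful claim resets the delay to the base poll interval.
pub fn delay_after(
  claim_ok: Bool,
  current_ms: Int,
  poll_interval_ms: Int,
  max_backoff_ms: Int,
) -> Int {
  case claim_ok {
    True -> poll_interval_ms
    False -> next_backoff(current_ms, max_backoff_ms)
  }
}
```

Under these assumptions, a worker at a 40000 ms delay that fails again waits 60000 ms (the default cap), and the next successful claim drops it back to the poll interval.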
Types
A handler for a specific task type.
Define one of these for each kind of task your workers should process.
The worker dispatches claimed tasks to the handler matching their task_name.
Example
let email_handler = worker.Handler(
  task_name: "send_email",
  execute: fn(ctx) {
    // ... send email using ctx.params(ctx) ...
    worker.Complete(json.object([#("sent", json.bool(True))]))
  },
  on_error: option.None,
)
pub type Handler {
  Handler(
    task_name: String,
    execute: fn(context.Context) -> HandlerResult,
    on_error: option.Option(fn(context.Context, json.Json) -> Nil),
  )
}
Constructors
- Handler(task_name: String, execute: fn(context.Context) -> HandlerResult, on_error: option.Option(fn(context.Context, json.Json) -> Nil))
  Arguments
  - task_name: The task_name this handler responds to.
  - execute: Execute the task. Receives a Context with the claim details, database connection, and helper methods. Return Complete, Fail, or Suspend.
  - on_error: Optional hook called when execute returns Fail. Receives the context and the error json. Use for logging/metrics.
The result of a handler’s execute function.
Return Complete(result) to mark the task as successfully done.
Return Fail(reason) to mark the task as failed.
Return Suspend when the task is waiting for an event — the worker
will skip the complete/fail call so the run stays in sleeping state
until emit_event wakes it up.
pub type HandlerResult {
  Complete(json.Json)
  Fail(json.Json)
  Suspend
}
Constructors
- Complete(json.Json)
- Fail(json.Json)
- Suspend
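As an illustration of choosing between the three results, the sketch below maps a made-up domain outcome onto HandlerResult. The Outcome type and to_handler_result are hypothetical names introduced here; only Complete, Fail, and Suspend come from this module, and the import path gabsurd/worker is taken from the module title.

```gleam
import gabsurd/worker
import gleam/json

/// Hypothetical outcome of some business operation inside a handler.
pub type Outcome {
  Sent
  Rejected(reason: String)
  AwaitingEvent
}

/// Translate the outcome into the value the worker expects from execute.
pub fn to_handler_result(outcome: Outcome) -> worker.HandlerResult {
  case outcome {
    // Task finished: the JSON payload is stored as the result.
    Sent -> worker.Complete(json.object([#("sent", json.bool(True))]))
    // Task failed: the JSON payload describes the reason.
    Rejected(reason) ->
      worker.Fail(json.object([#("error", json.string(reason))]))
    // Waiting for an event: the worker skips the complete/fail call.
    AwaitingEvent -> worker.Suspend
  }
}
```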
Messages the worker actor accepts.
pub type Message {
  Poll
  Shutdown
}
Constructors
- Poll
- Shutdown
A running worker actor handle.
pub type Worker {
  Worker(subject: process.Subject(Message))
}
Constructors
- Worker(subject: process.Subject(Message))
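Because Worker wraps a Subject(Message), messages can be sent to a running worker with the standard process.send. The following is a minimal sketch, assuming a gleam_otp version in which actor.Started exposes a data field. The function name start_then_stop is hypothetical, and the effect of each message (Poll triggering a poll, Shutdown stopping the actor) is inferred from the constructor names rather than stated by the source.

```gleam
import gabsurd/worker
import gleam/erlang/process
import gleam/otp/actor

/// Hypothetical helper: start a worker, nudge it, then ask it to stop.
pub fn start_then_stop(
  config: worker.Config,
) -> Result(Nil, actor.StartError) {
  case worker.start(config) {
    Ok(started) -> {
      // started.data is the Worker handle returned by the actor.
      let subject = started.data.subject
      // Assumed behaviour: request a poll without waiting for the interval.
      process.send(subject, worker.Poll)
      // Assumed behaviour: ask the worker to stop.
      process.send(subject, worker.Shutdown)
      Ok(Nil)
    }
    Error(err) -> Error(err)
  }
}
```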
Values
pub fn child_spec(
  name: String,
  config: Config,
) -> supervision.ChildSpecification(Worker)
Create a child spec for adding to a static_supervisor.
pub fn new(
  db: client.Db,
  queue_name: String,
  handlers: List(Handler),
) -> Config
Create a new worker config with defaults.
pub fn pool_child_specs(
  name: String,
  config: Config,
  count: Int,
) -> List(supervision.ChildSpecification(Worker))
Create a list of child specs for a worker pool (N workers). Each worker gets a unique worker_id incorporating a unique integer to avoid collisions between pools.
pub fn start(
  config: Config,
) -> Result(actor.Started(Worker), actor.StartError)
Start a worker actor.
pub fn with_batch_size(config: Config, size: Int) -> Config
Set the batch size (tasks claimed per poll).
pub fn with_claim_timeout(
  config: Config,
  timeout_secs: Int,
) -> Config
Set the claim timeout in seconds.
pub fn with_max_backoff(
  config: Config,
  max_backoff_ms: Int,
) -> Config
Set the maximum backoff in milliseconds for retrying after claim errors. Default: 60000 (60 seconds).
pub fn with_poll_interval(
  config: Config,
  interval_ms: Int,
) -> Config
Set the poll interval in milliseconds.
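The builder functions above are designed to be chained, and the resulting config can be handed to pool_child_specs. The sketch below shows one possible wiring under a static supervisor. The import path gabsurd/client, the queue name "emails", the pool name "email_worker", the function name start_email_workers, and all numeric values are assumptions chosen for illustration.

```gleam
import gabsurd/client
import gabsurd/worker
import gleam/list
import gleam/otp/static_supervisor as supervisor

/// Hypothetical helper: run four workers on one queue under a supervisor.
pub fn start_email_workers(db: client.Db, handlers: List(worker.Handler)) {
  let config =
    worker.new(db, "emails", handlers)
    // Tasks claimed per poll.
    |> worker.with_batch_size(5)
    // Seconds, unlike the two settings below.
    |> worker.with_claim_timeout(120)
    // Milliseconds between polls.
    |> worker.with_poll_interval(1000)
    // Milliseconds; caps the backoff after claim errors.
    |> worker.with_max_backoff(30_000)

  // Add each generated child spec to a one-for-one supervisor, then start it.
  worker.pool_child_specs("email_worker", config, 4)
  |> list.fold(supervisor.new(supervisor.OneForOne), supervisor.add)
  |> supervisor.start
}
```

Note the mixed units: the claim timeout is given in seconds, while the poll interval and maximum backoff are in milliseconds.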