gabsurd/task
Task lifecycle operations for the Absurd durable workflow system. Provides high-level functions for spawning, claiming, completing, failing, and cancelling tasks.
Types
Cancellation policy for a task.
pub type Cancellation {
  Cancellation(max_duration: Int)
}
Constructors
- Cancellation(max_duration: Int)
  Cancel the task if it runs longer than max_duration seconds.
Information returned when claiming a task.
pub type Claim {
  Claim(
    run_id: BitArray,
    task_id: BitArray,
    attempt: Int,
    task_name: String,
    params: String,
    retry_strategy: String,
    max_attempts: Int,
    headers: String,
    wake_event: String,
    event_payload: String,
  )
}
Constructors
- Claim(run_id: BitArray, task_id: BitArray, attempt: Int, task_name: String, params: String, retry_strategy: String, max_attempts: Int, headers: String, wake_event: String, event_payload: String)
Retry strategy for failed tasks.
pub type RetryStrategy {
  FixedRetry(base_seconds: Int)
  ExponentialRetry(
    base_seconds: Int,
    factor: Float,
    max_seconds: option.Option(Float),
  )
}
Constructors
- FixedRetry(base_seconds: Int)
  Retry with a fixed delay between attempts.
- ExponentialRetry(base_seconds: Int, factor: Float, max_seconds: option.Option(Float))
  Retry with exponential backoff. base_seconds is the initial delay, factor is the multiplier, and max_seconds caps the delay.
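To make the three ExponentialRetry fields concrete, here is a minimal sketch of how such a delay could be computed. The formula (base_seconds multiplied by factor raised to attempt - 1, then capped at max_seconds) is an assumption for illustration; the actual schedule is computed by Absurd and may differ. assumed_delay is a hypothetical helper, not part of gabsurd.

```gleam
import gleam/float
import gleam/int
import gleam/option.{type Option, None, Some}

/// Hypothetical helper: delay in seconds before retry number `attempt`
/// (1-based), ASSUMING delay = base_seconds * factor^(attempt - 1),
/// capped at max_seconds when a cap is given.
pub fn assumed_delay(
  base_seconds: Int,
  factor: Float,
  max_seconds: Option(Float),
  attempt: Int,
) -> Float {
  let exponent = int.to_float(attempt - 1)
  // float.power returns an Error for undefined cases (for example a
  // negative base with a fractional exponent); fall back to 1.0 there.
  let multiplier = case float.power(factor, of: exponent) {
    Ok(m) -> m
    Error(_) -> 1.0
  }
  let uncapped = int.to_float(base_seconds) *. multiplier
  case max_seconds {
    Some(cap) -> float.min(uncapped, cap)
    None -> uncapped
  }
}
```

Under this assumed formula, base_seconds 10, factor 2.0, and a cap of Some(60.0) give delays of 10.0, 20.0, 40.0, and 60.0 seconds for attempts 1 through 4.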
Information returned when spawning a task.
pub type SpawnInfo {
  SpawnInfo(
    task_id: BitArray,
    run_id: BitArray,
    attempt: Int,
    created: Bool,
  )
}
Constructors
- SpawnInfo(task_id: BitArray, run_id: BitArray, attempt: Int, created: Bool)
Options for spawning or retrying a task.
Create with new_options() and customize with with_* functions.
Example
let opts =
  task.new_options()
  |> task.with_max_attempts(3)
  |> task.with_retry_strategy(task.FixedRetry(base_seconds: 30))
  |> task.with_idempotency_key("order-123")
pub type SpawnOptions {
  SpawnOptions(
    max_attempts: option.Option(Int),
    retry_strategy: option.Option(RetryStrategy),
    cancellation: option.Option(Cancellation),
    headers: option.Option(json.Json),
    idempotency_key: option.Option(String),
  )
}
Constructors
- SpawnOptions(max_attempts: option.Option(Int), retry_strategy: option.Option(RetryStrategy), cancellation: option.Option(Cancellation), headers: option.Option(json.Json), idempotency_key: option.Option(String))
Result of a completed task.
pub type TaskResult {
  TaskResult(
    state: String,
    result: String,
    failure_reason: String,
  )
}
Constructors
- TaskResult(state: String, result: String, failure_reason: String)
Values
pub fn cancel(
  db: client.Db,
  queue_name: String,
  task_id: BitArray,
) -> Result(Nil, client.GabsurdError)
Cancel a task by its task_id.
pub fn claim(
  db: client.Db,
  queue_name: String,
  worker_id: String,
  claim_timeout: Int,
  qty: Int,
) -> Result(List(Claim), client.GabsurdError)
Claim available tasks from a queue for a worker.
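A claim is normally followed by complete or fail for each returned run. The sketch below shows one pass of such a worker. The gabsurd/client import path, the queue name, the worker id, the handle function, and the reading of claim_timeout as seconds are all assumptions made for illustration.

```gleam
import gabsurd/client
import gabsurd/task
import gleam/json
import gleam/list
import gleam/result

/// Hypothetical handler: returns Ok(result) on success or Error(reason).
fn handle(claim: task.Claim) -> Result(json.Json, json.Json) {
  case claim.task_name {
    "send_welcome" -> Ok(json.object([#("sent", json.bool(True))]))
    other ->
      Error(json.object([#("error", json.string("no handler: " <> other))]))
  }
}

/// Claim up to 5 runs from the "emails" queue and settle each one.
pub fn work_once(db: client.Db) -> Result(Nil, client.GabsurdError) {
  // 60 is the claim timeout (assumed here to be in seconds).
  use claims <- result.map(task.claim(db, "emails", "worker-1", 60, 5))
  list.each(claims, fn(claim) {
    // This sketch ignores errors from complete and fail.
    let _ = case handle(claim) {
      Ok(value) -> task.complete(db, "emails", claim.run_id, value)
      Error(reason) -> task.fail(db, "emails", claim.run_id, reason)
    }
    Nil
  })
}
```

A real worker would call something like work_once in a loop and log or act on the errors that this sketch discards.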
pub fn complete(
  db: client.Db,
  queue_name: String,
  run_id: BitArray,
  state: json.Json,
) -> Result(Nil, client.GabsurdError)
Mark a run as completed, recording state as its JSON result.
pub fn encode_options(options: SpawnOptions) -> String
Encode spawn options to a JSON string for the database.
pub fn extend_claim(
  db: client.Db,
  queue_name: String,
  run_id: BitArray,
  extend_by: Int,
) -> Result(Nil, client.GabsurdError)
Extend a worker’s claim lease on a run by extend_by seconds.
This is the manual heartbeat mechanism. The primary lease-extension mechanism is checkpoint.set, which calls set_task_checkpoint_state with extend_claim_by, so every checkpoint write also extends the lease.
Use this function when long-running work happens between checkpoints and the lease must be kept alive.
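A manual heartbeat might look like the following sketch, which extends the lease after each unit of slow work. process_chunk, the "reports" queue name, and the gabsurd/client import path are hypothetical stand-ins.

```gleam
import gabsurd/client
import gabsurd/task
import gleam/list

/// Hypothetical unit of slow work; replace with real processing.
fn process_chunk(_chunk: String) -> Nil {
  Nil
}

/// Process chunks one at a time, extending the lease by 60 seconds
/// after each. Stops at the first error returned by extend_claim.
pub fn process_with_heartbeat(
  db: client.Db,
  run_id: BitArray,
  chunks: List(String),
) -> Result(Nil, client.GabsurdError) {
  list.try_each(chunks, fn(chunk) {
    process_chunk(chunk)
    task.extend_claim(db, "reports", run_id, 60)
  })
}
```

The extension should comfortably exceed the time one chunk takes, otherwise the lease can lapse before the next heartbeat.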
pub fn fail(
  db: client.Db,
  queue_name: String,
  run_id: BitArray,
  reason: json.Json,
) -> Result(Nil, client.GabsurdError)
Mark a run as failed with a reason. Passes NULL for retry_at so the queue’s retry policy controls retries.
pub fn fail_with_retry(
  db: client.Db,
  queue_name: String,
  run_id: BitArray,
  reason: json.Json,
  retry_at: timestamp.Timestamp,
) -> Result(Nil, client.GabsurdError)
Mark a run as failed and schedule a retry at a specific time.
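As a sketch of choosing retry_at, the snippet below schedules the retry five minutes from now. It assumes timestamp.Timestamp is the type from the gleam_time package (gleam/time/timestamp); the queue name, the reason payload, and the gabsurd/client import path are illustrative.

```gleam
import gabsurd/client
import gabsurd/task
import gleam/json
import gleam/time/duration
import gleam/time/timestamp

/// Fail the run and ask for a retry five minutes from the current time.
pub fn fail_and_retry_later(
  db: client.Db,
  run_id: BitArray,
) -> Result(Nil, client.GabsurdError) {
  let retry_at = timestamp.add(timestamp.system_time(), duration.minutes(5))
  let reason = json.object([#("error", json.string("rate limited"))])
  task.fail_with_retry(db, "emails", run_id, reason, retry_at)
}
```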
pub fn get_result(
  db: client.Db,
  queue_name: String,
  task_id: BitArray,
) -> Result(TaskResult, client.GabsurdError)
Get the result of a completed task.
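The three TaskResult fields are plain strings, so reading a result is a matter of field access. The sketch below only formats them; it makes no assumption about which state values exist or how result is encoded. The queue name and the gabsurd/client import path are illustrative.

```gleam
import gabsurd/client
import gabsurd/task
import gleam/result

/// Fetch a task result and render its fields as a single line.
pub fn describe_result(
  db: client.Db,
  task_id: BitArray,
) -> Result(String, client.GabsurdError) {
  use res <- result.map(task.get_result(db, "emails", task_id))
  "state="
  <> res.state
  <> " result="
  <> res.result
  <> " failure_reason="
  <> res.failure_reason
}
```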
pub fn new_options() -> SpawnOptions
Create empty spawn options (all fields default to absent).
pub fn retry(
  db: client.Db,
  queue_name: String,
  task_id: BitArray,
  options: SpawnOptions,
) -> Result(SpawnInfo, client.GabsurdError)
Retry a task with typed options.
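A retry takes the same SpawnOptions builders as spawn. The sketch below retries a task with a higher attempt limit; the queue name and the gabsurd/client import path are assumptions.

```gleam
import gabsurd/client
import gabsurd/task

/// Retry an existing task, allowing up to 5 attempts.
pub fn retry_with_more_attempts(
  db: client.Db,
  task_id: BitArray,
) -> Result(task.SpawnInfo, client.GabsurdError) {
  let opts = task.new_options() |> task.with_max_attempts(5)
  task.retry(db, "emails", task_id, opts)
}
```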
pub fn schedule_run(
  db: client.Db,
  queue_name: String,
  run_id: BitArray,
  defer_seconds: Int,
) -> Result(Nil, client.GabsurdError)
Schedule a run to become available again at a future time. Used for deferring unknown tasks during rolling deployments.
defer_seconds is how many seconds from now to reschedule.
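The rolling-deployment case can be sketched as a dispatcher that defers any task name the current worker version does not recognise, so that a newer worker can pick it up later. The task name, the queue name, the 30-second delay, and the gabsurd/client import path are illustrative.

```gleam
import gabsurd/client
import gabsurd/task
import gleam/json

/// Handle known task names; push unknown ones back by 30 seconds.
pub fn dispatch(
  db: client.Db,
  claim: task.Claim,
) -> Result(Nil, client.GabsurdError) {
  case claim.task_name {
    "send_welcome" -> {
      let value = json.object([#("sent", json.bool(True))])
      task.complete(db, "emails", claim.run_id, value)
    }
    // Unknown to this worker version: make the run available again later
    // instead of failing it.
    _ -> task.schedule_run(db, "emails", claim.run_id, 30)
  }
}
```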
pub fn spawn(
  db: client.Db,
  queue_name: String,
  task_name: String,
  params: json.Json,
  options: SpawnOptions,
) -> Result(SpawnInfo, client.GabsurdError)
Spawn a new task in a queue with typed options.
params is a json.Json value; build it with json.object, json.string, etc. options is a SpawnOptions record; build it with new_options() and the with_* builders.
Example
let assert Ok(info) = task.spawn(
  db,
  "emails",
  "send_welcome",
  json.object([#("to", json.string("user@example.com"))]),
  task.new_options() |> task.with_max_attempts(3),
)
pub fn with_cancellation(
  options: SpawnOptions,
  cancellation: Cancellation,
) -> SpawnOptions
Set the cancellation policy.
pub fn with_headers(
  options: SpawnOptions,
  headers: json.Json,
) -> SpawnOptions
Set headers (arbitrary JSON metadata).
pub fn with_idempotency_key(
  options: SpawnOptions,
  key: String,
) -> SpawnOptions
Set an idempotency key to prevent duplicate task creation.
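One way to observe the effect of an idempotency key is through the created field of SpawnInfo. The sketch below assumes, without confirmation from this page, that created is False when a task with the same key already existed. The task name, queue name, and gabsurd/client import path are illustrative.

```gleam
import gabsurd/client
import gabsurd/task
import gleam/json
import gleam/result

/// Spawn a receipt email keyed by order id and report what happened.
pub fn spawn_receipt(db: client.Db) -> Result(String, client.GabsurdError) {
  let opts = task.new_options() |> task.with_idempotency_key("order-123")
  let params = json.object([#("order", json.string("order-123"))])
  use info <- result.map(task.spawn(db, "emails", "send_receipt", params, opts))
  // ASSUMPTION: created is False when the key matched an existing task.
  case info.created {
    True -> "spawned a new task"
    False -> "a task with this key already existed"
  }
}
```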
pub fn with_max_attempts(
  options: SpawnOptions,
  max: Int,
) -> SpawnOptions
Set the maximum number of attempts for this task.
pub fn with_retry_strategy(
  options: SpawnOptions,
  strategy: RetryStrategy,
) -> SpawnOptions
Set the retry strategy for failed tasks.
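The builders compose in a single pipeline. The sketch below combines exponential backoff, a cancellation policy, and headers; the specific values and the header contents are illustrative only.

```gleam
import gabsurd/task
import gleam/json
import gleam/option.{Some}

/// Options for a long-running report task (example values).
pub fn report_options() -> task.SpawnOptions {
  task.new_options()
  |> task.with_max_attempts(5)
  |> task.with_retry_strategy(task.ExponentialRetry(
    base_seconds: 10,
    factor: 2.0,
    max_seconds: Some(300.0),
  ))
  // Cancel the task if it runs longer than 600 seconds.
  |> task.with_cancellation(task.Cancellation(max_duration: 600))
  |> task.with_headers(json.object([#("tenant", json.string("acme"))]))
}
```

Fields that are never set stay absent, as described for new_options().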