gabsurd/task

Task lifecycle operations for the Absurd durable workflow system. Provides high-level functions for spawning, claiming, completing, failing, and cancelling tasks.

Types

Cancellation policy for a task.

pub type Cancellation {
  Cancellation(max_duration: Int)
}

Constructors

  • Cancellation(max_duration: Int)

    Cancel the task if it runs longer than max_duration seconds.

Information returned when claiming a task.

pub type Claim {
  Claim(
    run_id: BitArray,
    task_id: BitArray,
    attempt: Int,
    task_name: String,
    params: String,
    retry_strategy: String,
    max_attempts: Int,
    headers: String,
    wake_event: String,
    event_payload: String,
  )
}

Constructors

  • Claim(
      run_id: BitArray,
      task_id: BitArray,
      attempt: Int,
      task_name: String,
      params: String,
      retry_strategy: String,
      max_attempts: Int,
      headers: String,
      wake_event: String,
      event_payload: String,
    )

Retry strategy for failed tasks.

pub type RetryStrategy {
  FixedRetry(base_seconds: Int)
  ExponentialRetry(
    base_seconds: Int,
    factor: Float,
    max_seconds: option.Option(Float),
  )
}

Constructors

  • FixedRetry(base_seconds: Int)

    Retry with a fixed delay between attempts.

  • ExponentialRetry(
      base_seconds: Int,
      factor: Float,
      max_seconds: option.Option(Float),
    )

    Retry with exponential backoff. base_seconds is the initial delay, factor is the multiplier, max_seconds caps the delay.

Information returned when spawning a task.

pub type SpawnInfo {
  SpawnInfo(
    task_id: BitArray,
    run_id: BitArray,
    attempt: Int,
    created: Bool,
  )
}

Constructors

  • SpawnInfo(
      task_id: BitArray,
      run_id: BitArray,
      attempt: Int,
      created: Bool,
    )

Options for spawning or retrying a task.

Create with new_options() and customize with with_* functions.

Example

let opts =
  task.new_options()
  |> task.with_max_attempts(3)
  |> task.with_retry_strategy(task.FixedRetry(base_seconds: 30))
  |> task.with_idempotency_key("order-123")
pub type SpawnOptions {
  SpawnOptions(
    max_attempts: option.Option(Int),
    retry_strategy: option.Option(RetryStrategy),
    cancellation: option.Option(Cancellation),
    headers: option.Option(json.Json),
    idempotency_key: option.Option(String),
  )
}

Constructors

Result of a completed task.

pub type TaskResult {
  TaskResult(
    state: String,
    result: String,
    failure_reason: String,
  )
}

Constructors

  • TaskResult(state: String, result: String, failure_reason: String)

Values

pub fn cancel(
  db: client.Db,
  queue_name: String,
  task_id: BitArray,
) -> Result(Nil, client.GabsurdError)

Cancel a task by its task_id.

pub fn claim(
  db: client.Db,
  queue_name: String,
  worker_id: String,
  claim_timeout: Int,
  qty: Int,
) -> Result(List(Claim), client.GabsurdError)

Claim available tasks from a queue for a worker.

pub fn complete(
  db: client.Db,
  queue_name: String,
  run_id: BitArray,
  state: json.Json,
) -> Result(Nil, client.GabsurdError)

Mark a run as completed with optional result state.

pub fn encode_options(options: SpawnOptions) -> String

Encode spawn options to a JSON string for the database.

pub fn extend_claim(
  db: client.Db,
  queue_name: String,
  run_id: BitArray,
  extend_by: Int,
) -> Result(Nil, client.GabsurdError)

Extend a worker’s claim lease on a run by extend_by seconds.

This is the manual heartbeat mechanism. The primary lease extension mechanism is checkpoint.set which calls set_task_checkpoint_state with extend_claim_by — every checkpoint write extends the lease.

Use this function when you have long-running work between checkpoints and need to keep the lease alive.

pub fn fail(
  db: client.Db,
  queue_name: String,
  run_id: BitArray,
  reason: json.Json,
) -> Result(Nil, client.GabsurdError)

Mark a run as failed with a reason. Passes NULL for retry_at so the queue’s retry policy controls retries.

pub fn fail_with_retry(
  db: client.Db,
  queue_name: String,
  run_id: BitArray,
  reason: json.Json,
  retry_at: timestamp.Timestamp,
) -> Result(Nil, client.GabsurdError)

Mark a run as failed and schedule a retry at a specific time.

pub fn get_result(
  db: client.Db,
  queue_name: String,
  task_id: BitArray,
) -> Result(TaskResult, client.GabsurdError)

Get the result of a completed task.

pub fn new_options() -> SpawnOptions

Create empty spawn options (all fields default to absent).

pub fn retry(
  db: client.Db,
  queue_name: String,
  task_id: BitArray,
  options: SpawnOptions,
) -> Result(SpawnInfo, client.GabsurdError)

Retry a task with typed options.

pub fn schedule_run(
  db: client.Db,
  queue_name: String,
  run_id: BitArray,
  defer_seconds: Int,
) -> Result(Nil, client.GabsurdError)

Schedule a run to become available again at a future time. Used for deferring unknown tasks during rolling deployments.

defer_seconds is how many seconds from now to reschedule.

pub fn spawn(
  db: client.Db,
  queue_name: String,
  task_name: String,
  params: json.Json,
  options: SpawnOptions,
) -> Result(SpawnInfo, client.GabsurdError)

Spawn a new task in a queue with typed options.

params is a json.Json value — use json.object, json.string, etc. to build it. options is a SpawnOptions record — use new_options() and with_* builders.

Example

let assert Ok(info) = task.spawn(
  db,
  "emails",
  "send_welcome",
  json.object([#("to", json.string("user@example.com"))]),
  task.new_options() |> task.with_max_attempts(3),
)
pub fn with_cancellation(
  options: SpawnOptions,
  cancellation: Cancellation,
) -> SpawnOptions

Set the cancellation policy.

pub fn with_headers(
  options: SpawnOptions,
  headers: json.Json,
) -> SpawnOptions

Set headers (arbitrary JSON metadata).

pub fn with_idempotency_key(
  options: SpawnOptions,
  key: String,
) -> SpawnOptions

Set an idempotency key to prevent duplicate task creation.

pub fn with_max_attempts(
  options: SpawnOptions,
  max: Int,
) -> SpawnOptions

Set the maximum number of attempts for this task.

pub fn with_retry_strategy(
  options: SpawnOptions,
  strategy: RetryStrategy,
) -> SpawnOptions

Set the retry strategy for failed tasks.

Search Document