gabsurd/context

Execution context for a running task.

Constructed by the worker and passed to your handler. Encapsulates the database connection, queue, claim details, and claim timeout so you don’t have to pass them around. Provides high-level operations for idempotent steps, checkpoints, heartbeats, and event coordination.

Example

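// Assumes `import gabsurd/context` plus gleam/json, gleam/option, gleam/result
// and gleam/dynamic/decode. Handler, Complete, Fail, GabsurdError, encode_error,
// decode_params, charge_card and reserve_inventory are taken to be in scope.
let handler = Handler(
  task_name: "process_order",
  execute: fn(ctx) {
    case order_workflow(ctx) {
      Ok(Nil) -> Complete(json.object([#("status", json.string("done"))]))
      Error(e) -> Fail(encode_error(e))
    }
  },
  on_error: option.None,
)

fn order_workflow(ctx) -> Result(Nil, GabsurdError) {
  // Each step is checkpointed, so a retry skips work that already succeeded.
  use _ <- result.try(context.step(ctx, "charge", decode.success(Nil), fn() {
    charge_card(decode_params(context.params(ctx)))
    json.null()
  }))
  use _ <- result.try(context.step(ctx, "reserve", decode.success(Nil), fn() {
    reserve_inventory(decode_params(context.params(ctx)))
    json.null()
  }))
  Ok(Nil)
}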

Types

Execution context for a running task. Constructed by the worker, passed to your handler.

pub type Context {
  Context(
    db: client.Db,
    queue_name: String,
    claim: task.Claim,
    claim_timeout: Int,
  )
}

Constructors

  • Context(db: client.Db, queue_name: String, claim: task.Claim, claim_timeout: Int)

Result of await_event. The event was either received (task continues) or the task is now sleeping (return Suspend from your handler).

pub type EventResult {
  Received(String)
  Suspended
}

Constructors

  • Received(String)
  • Suspended

Values

pub fn attempt(ctx: Context) -> Int

The current attempt number (1-based).

pub fn await_event(
  ctx: Context,
  event_name: String,
  timeout: Int,
) -> Result(EventResult, client.GabsurdError)

Await an external event. If the event is already available, returns Ok(Received(payload)). Otherwise the task is put to sleep and the call returns Ok(Suspended); your handler should then return Suspend.

timeout is in seconds. Set to 0 for no timeout.
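
The two outcomes map naturally onto a case expression. The following is a minimal sketch rather than the library's own code: Next, next_step, wait_for_payment and the event name "payment_confirmed" are hypothetical, and the import assumes this module is available as gabsurd/context.

```gleam
import gabsurd/context.{type Context, type EventResult, Received, Suspended}
import gleam/result

/// Hypothetical local type describing what the handler should do next.
pub type Next {
  /// The event arrived; continue the workflow with its payload.
  Continue(payload: String)
  /// The task is now sleeping; the handler should return Suspend.
  ReturnSuspend
}

/// Pure mapping from an EventResult to the handler's next move.
pub fn next_step(outcome: EventResult) -> Next {
  case outcome {
    Received(payload) -> Continue(payload)
    Suspended -> ReturnSuspend
  }
}

/// Waits for a hypothetical "payment_confirmed" event with no timeout (0).
pub fn wait_for_payment(ctx: Context) {
  context.await_event(ctx, "payment_confirmed", 0)
  |> result.map(next_step)
}
```

Keeping the mapping in a pure function means the branching can be checked without a database; only wait_for_payment touches the running task.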

pub fn claim_timeout(ctx: Context) -> Int

The claim timeout in seconds.

pub fn get_checkpoint(
  ctx: Context,
  name: String,
) -> Result(option.Option(String), client.GabsurdError)

Get a checkpoint’s raw JSON state string. Returns Ok(Some(json_string)) if found, Ok(None) if not found.

pub fn heartbeat(
  ctx: Context,
) -> Result(Nil, client.GabsurdError)

Extend the claim lease by claim_timeout seconds.
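
For long-running work that is not broken into steps, one option is to call heartbeat between units of work so the lease does not lapse. A minimal sketch under assumptions: sum_batches and sum_with_lease are hypothetical names, the batch-summing work is a stand-in, and the renewal is passed in as a function so the loop can be exercised without a database.

```gleam
import gabsurd/context.{type Context}
import gleam/int
import gleam/list
import gleam/result

/// Sums each batch in turn, calling `beat` after every batch.
/// Stops at the first failed renewal and returns that error.
pub fn sum_batches(
  batches: List(List(Int)),
  beat: fn() -> Result(Nil, e),
) -> Result(Int, e) {
  list.try_fold(batches, 0, fn(total, batch) {
    let total = total + int.sum(batch)
    // Renew the lease before moving on to the next batch.
    use _ <- result.map(beat())
    total
  })
}

/// Wires the loop to the real heartbeat for a running task.
pub fn sum_with_lease(ctx: Context, batches: List(List(Int))) {
  sum_batches(batches, fn() { context.heartbeat(ctx) })
}
```

How often to renew depends on how long each unit of work takes relative to claim_timeout; the sketch simply renews after every batch.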

pub fn params(ctx: Context) -> String

The task parameters as a raw JSON string.
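
Because params returns a raw string, handlers typically parse it before use. The sketch below shows one way to do that with gleam/json and gleam/dynamic/decode; OrderParams, its fields, and decode_params are hypothetical and only illustrate the shape of such a helper.

```gleam
import gleam/dynamic/decode
import gleam/json

/// Hypothetical parameter shape for an order-processing task.
pub type OrderParams {
  OrderParams(order_id: String, amount_cents: Int)
}

/// Parses the raw JSON string returned by params into OrderParams.
pub fn decode_params(raw: String) -> Result(OrderParams, json.DecodeError) {
  let decoder = {
    use order_id <- decode.field("order_id", decode.string)
    use amount_cents <- decode.field("amount_cents", decode.int)
    decode.success(OrderParams(order_id: order_id, amount_cents: amount_cents))
  }
  json.parse(from: raw, using: decoder)
}
```

Inside a handler this would be called as decode_params(context.params(ctx)), assuming the module is imported as context.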

pub fn run_id(ctx: Context) -> BitArray

The current run’s unique identifier.

pub fn set_checkpoint(
  ctx: Context,
  name: String,
  state: json.Json,
) -> Result(Nil, client.GabsurdError)

Set a checkpoint and extend the claim lease.
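
Together with get_checkpoint, this can be used to record progress manually. The following sketch assumes the string returned by get_checkpoint is the JSON previously passed to set_checkpoint; the checkpoint name "progress", the "offset" field, and the three function names are hypothetical.

```gleam
import gabsurd/context.{type Context}
import gleam/dynamic/decode
import gleam/json
import gleam/option.{type Option, None, Some}
import gleam/result

/// Reads the offset out of a raw checkpoint string; 0 if absent or unreadable.
pub fn offset_from(raw: Option(String)) -> Int {
  case raw {
    None -> 0
    Some(text) ->
      json.parse(from: text, using: decode.at(["offset"], decode.int))
      |> result.unwrap(0)
  }
}

/// Loads the saved offset for this task, defaulting to 0 on first run.
pub fn load_offset(ctx: Context) {
  context.get_checkpoint(ctx, "progress")
  |> result.map(offset_from)
}

/// Persists the offset; per the description above this also extends the lease.
pub fn save_offset(ctx: Context, offset: Int) {
  context.set_checkpoint(
    ctx,
    "progress",
    json.object([#("offset", json.int(offset))]),
  )
}
```

Falling back to 0 on an unreadable checkpoint is a design choice for this sketch; a stricter handler might surface the decode error instead.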

pub fn step(
  ctx: Context,
  name: String,
  decoder: decode.Decoder(a),
  run: fn() -> json.Json,
) -> Result(a, client.GabsurdError)

Run an idempotent step identified by name.

If the checkpoint already exists (from a previous attempt), the stored value is decoded with decoder and returned without re-running run. If not, run is executed, the result is persisted as a checkpoint, and the claim lease is extended by claim_timeout seconds.

The decoder parameter is required because Gleam’s json.Json type can only be built and serialised, not inspected, so values loaded from the database must be parsed with an explicit decoder. For steps that don’t need a return value, use decode.success(Nil).
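
To see why a decoder is needed, it can help to look at the round trip in isolation. This is a sketch of the idea only, assuming the step result is serialised to a JSON string and parsed back on a later attempt; round_trip, round_trip_nil and the "ch_123" value are illustrative, and no gabsurd calls are involved.

```gleam
import gleam/dynamic/decode
import gleam/json

/// Serialises a step result and reads one field back, as a later attempt would.
pub fn round_trip() -> Result(String, json.DecodeError) {
  let stored =
    json.object([#("charge_id", json.string("ch_123"))])
    |> json.to_string
  json.parse(from: stored, using: decode.at(["charge_id"], decode.string))
}

/// A result stored as JSON null decodes to Nil with decode.success(Nil).
pub fn round_trip_nil() -> Result(Nil, json.DecodeError) {
  json.parse(from: json.to_string(json.null()), using: decode.success(Nil))
}
```

decode.success ignores its input entirely, which is why it suits steps whose stored value is never read.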

Example

// Step that returns a value:
use charge_id <- result.try(
  // decode.at reads the string stored under "charge_id".
  context.step(ctx, "charge", decode.at(["charge_id"], decode.string), fn() {
    let charge = charge_card(...)
    json.object([#("charge_id", json.string(charge.id))])
  }),
)

// Step that doesn't return a value:
use _ <- result.try(context.step(ctx, "notify", decode.success(Nil), fn() {
  send_email(...)
  json.null()
}))

pub fn task_id(ctx: Context) -> BitArray

The task’s unique identifier.

pub fn task_name(ctx: Context) -> String

The task name.
