gabsurd/context
Execution context for a running task.
Constructed by the worker and passed to your handler. Encapsulates the database connection, queue, claim details, and claim timeout so you don’t have to pass them around. Provides high-level operations for idempotent steps, checkpoints, heartbeats, and event coordination.
Example
// Assumes the module is imported under an alias, e.g. `import gabsurd/context as ctx`.
let handler = Handler(
  task_name: "process_order",
  execute: fn(ctx) {
    case order_workflow(ctx) {
      Ok(Nil) -> Complete(json.object([#("status", json.string("done"))]))
      Error(e) -> Fail(encode_error(e))
    }
  },
  on_error: option.None,
)

fn order_workflow(ctx) -> Result(Nil, GabsurdError) {
  // Each step is checkpointed by name, so a retry skips steps that already ran.
  use _ <- result.try(ctx.step(ctx, "charge", decode.success(Nil), fn() {
    charge_card(decode_params(ctx.params(ctx)))
    json.null()
  }))
  use _ <- result.try(ctx.step(ctx, "reserve", decode.success(Nil), fn() {
    reserve_inventory(decode_params(ctx.params(ctx)))
    json.null()
  }))
  Ok(Nil)
}
Types
Execution context for a running task. Constructed by the worker, passed to your handler.
pub type Context {
  Context(
    db: client.Db,
    queue_name: String,
    claim: task.Claim,
    claim_timeout: Int,
  )
}
Constructors
- Context(db: client.Db, queue_name: String, claim: task.Claim, claim_timeout: Int)
Result of await_event. The event was either received (task continues)
or the task is now sleeping (return Suspend from your handler).
pub type EventResult {
  Received(String)
  Suspended
}
Constructors
- Received(String)
- Suspended
Values
pub fn await_event(
  ctx: Context,
  event_name: String,
  timeout: Int,
) -> Result(EventResult, client.GabsurdError)
Await an external event. If the event is already available, returns
Received(payload). If not, the task is put to sleep and Suspended is
returned; your handler should return Suspend in this case.
timeout is in seconds. Set to 0 for no timeout.
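A minimal sketch of how a handler might branch on the result of await_event. The event name "payment_confirmed", the one-hour timeout, the import path gabsurd/client, and the local Outcome type are all assumptions made for illustration. Outcome only stands in for the handler's real return variants, which this page does not define.

```gleam
import gabsurd/client
import gabsurd/context.{type Context, type EventResult, Received, Suspended}

/// Hypothetical stand-in for the handler's real return variants.
pub type Outcome {
  Continue(payload: String)
  Sleep
  Failed(error: client.GabsurdError)
}

/// Maps the result of await_event onto the next action to take.
pub fn to_outcome(
  result: Result(EventResult, client.GabsurdError),
) -> Outcome {
  case result {
    // The event was already available: carry on with its payload.
    Ok(Received(payload)) -> Continue(payload)
    // The task is now sleeping: the handler should return Suspend here.
    Ok(Suspended) -> Sleep
    Error(e) -> Failed(e)
  }
}

/// Waits up to 3600 seconds for the hypothetical "payment_confirmed" event.
pub fn wait_for_payment(ctx: Context) -> Outcome {
  to_outcome(context.await_event(ctx, "payment_confirmed", 3600))
}
```

Keeping the mapping in a separate pure function makes the branching easy to check without a database connection.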
pub fn get_checkpoint(
  ctx: Context,
  name: String,
) -> Result(option.Option(String), client.GabsurdError)
Get a checkpoint’s raw JSON state string.
Returns Ok(Some(json_string)) if found, Ok(None) if not found.
pub fn heartbeat(
  ctx: Context,
) -> Result(Nil, client.GabsurdError)
Extend the claim lease by claim_timeout seconds.
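As an illustration, a long-running loop could extend the lease after each unit of work so the claim does not expire midway. This is a sketch under assumptions: the helper names each_with_beat and process_all are hypothetical, and the import path gabsurd/client is inferred from the client.GabsurdError type shown above.

```gleam
import gabsurd/client
import gabsurd/context.{type Context}
import gleam/list

/// Runs `work` on every item and calls `beat` after each one.
/// Stops at the first error returned by `beat`.
pub fn each_with_beat(
  items: List(a),
  work: fn(a) -> Nil,
  beat: fn() -> Result(Nil, e),
) -> Result(Nil, e) {
  list.try_each(items, fn(item) {
    work(item)
    beat()
  })
}

/// Processes items one by one, extending the claim lease between them.
pub fn process_all(
  ctx: Context,
  items: List(String),
  work: fn(String) -> Nil,
) -> Result(Nil, client.GabsurdError) {
  each_with_beat(items, work, fn() { context.heartbeat(ctx) })
}
```

Passing the heartbeat in as a function keeps the loop itself independent of the database, which is a design choice for this sketch rather than anything the library requires.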
pub fn set_checkpoint(
  ctx: Context,
  name: String,
  state: json.Json,
) -> Result(Nil, client.GabsurdError)
Set a checkpoint and extend the claim lease.
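The two checkpoint functions can be paired to track progress by hand. The sketch below assumes that get_checkpoint returns the same JSON that set_checkpoint stored. The checkpoint name "progress", the "processed" field, the fallback to 0, and the import path gabsurd/client are illustrative assumptions.

```gleam
import gabsurd/client
import gabsurd/context.{type Context}
import gleam/dynamic/decode
import gleam/json
import gleam/option.{None, Some}
import gleam/result

/// Parses the raw checkpoint string, falling back to 0 if it cannot be decoded.
pub fn decode_progress(raw: String) -> Int {
  json.parse(raw, decode.at(["processed"], decode.int))
  |> result.unwrap(0)
}

/// Loads the hypothetical "progress" checkpoint; a missing checkpoint counts as 0.
pub fn load_progress(ctx: Context) -> Result(Int, client.GabsurdError) {
  use stored <- result.map(context.get_checkpoint(ctx, "progress"))
  case stored {
    Some(raw) -> decode_progress(raw)
    None -> 0
  }
}

/// Stores the counter as {"processed": n}, which also extends the claim lease.
pub fn save_progress(
  ctx: Context,
  processed: Int,
) -> Result(Nil, client.GabsurdError) {
  let state = json.object([#("processed", json.int(processed))])
  context.set_checkpoint(ctx, "progress", state)
}
```

For most cases step is likely the simpler choice, since it combines the lookup, the decode, and the write in one call.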
pub fn step(
  ctx: Context,
  name: String,
  decoder: decode.Decoder(a),
  run: fn() -> json.Json,
) -> Result(a, client.GabsurdError)
Run an idempotent step identified by name.
If the checkpoint already exists (from a previous attempt), the stored
value is decoded with decoder and returned without re-running run.
If not, run is executed, the result is persisted as a checkpoint, and
the claim lease is extended by claim_timeout seconds.
The decoder parameter is required because Gleam’s json.Json type is
write-only — values loaded from the database must be parsed with an
explicit decoder. For steps that don’t need a return value, use
decode.success(Nil).
Example
// Step that returns a value:
use charge_id <- result.try(
  ctx.step(ctx, "charge", decode.at(["charge_id"], decode.string), fn() {
    let result = charge_card(...)
    json.object([#("charge_id", json.string(result.id))])
  }),
)

// Step that doesn't return a value:
use _ <- result.try(ctx.step(ctx, "notify", decode.success(Nil), fn() {
  send_email(...)
  json.null()
}))