gabsurd

A Gleam SDK for the Absurd durable workflow system. Provides type-safe database access via Parrot (sqlc) code generation and OTP worker actors for task processing.

Overview

gabsurd wraps the Absurd PostgreSQL schema with two layers:

  1. Generated SQL layer (gabsurd/sql) — Type-safe query functions generated by Parrot (sqlc for Gleam). All database access goes through Absurd’s PL/pgSQL functions.
  2. SDK layer — Hand-written Gleam modules providing ergonomic APIs for queues, tasks, events, checkpoints, and worker actors.

Quick Start

1. Install & Setup

# gleam.toml
[dependencies]
gabsurd = { path = "..." }
gleam_erlang = ">= 1.3.0 and < 2.0.0"
gleam_otp = ">= 1.2.0 and < 2.0.0"
gleam_time = ">= 1.8.0 and < 2.0.0"

Ensure the Absurd schema is loaded into your PostgreSQL database.

2. Connect to the Database

import gabsurd/client

let assert Ok(started) = client.start("postgresql://user:pass@localhost:5432/mydb")
let db = started.data

3. Create a Queue

import gabsurd/queue

let assert Ok(Nil) = queue.create(db, "emails")

4. Spawn Tasks

import gleam/json
import gabsurd/task

let assert Ok(info) =
  task.spawn(
    db,
    "emails",
    "send_welcome",
    json.object([#("to", json.string("user@example.com"))]),
    task.new_options() |> task.with_max_attempts(3),
  )

5. Start a Worker

import gleam/dynamic/decode
import gleam/json
import gleam/option
import gabsurd/worker.{Complete, Handler}

let email_handler = Handler(
  task_name: "send_welcome",
  execute: fn(ctx) {
    // Process the task...
    Complete(json.object([#("sent", json.bool(True))]))
  },
  on_error: option.None,
)

let config =
  worker.new(db, "emails", [email_handler])
  |> worker.with_poll_interval(1000)
  |> worker.with_batch_size(10)

// Start a single worker
let assert Ok(started) = worker.start(config)

6. Worker Pool with Supervisor

import gleam/otp/static_supervisor

let pool = worker.pool_child_specs("email_workers", config, 4)

// pool is a List(ChildSpecification) with exactly 4 elements.
// The pattern is partial, so Gleam requires `let assert` here.
let assert [first, second, third, fourth] = pool

let assert Ok(sup) =
  static_supervisor.new(static_supervisor.OneForOne)
  |> static_supervisor.add(first)
  |> static_supervisor.add(second)
  |> static_supervisor.add(third)
  |> static_supervisor.add(fourth)
  |> static_supervisor.start()
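
When the pool size is not fixed at four, destructuring the list becomes awkward. A minimal sketch that folds every child spec into the supervisor builder instead; `start_pool` is a hypothetical helper name, and it assumes `static_supervisor.add` accepts each spec returned by `pool_child_specs`, as in the snippet above:

```gleam
import gleam/list
import gleam/otp/static_supervisor
import gabsurd/worker

// Hypothetical helper: supervise `size` workers built from one config.
pub fn start_pool(config, size: Int) {
  worker.pool_child_specs("email_workers", config, size)
  |> list.fold(
    static_supervisor.new(static_supervisor.OneForOne),
    // Add one child spec to the builder per fold step.
    static_supervisor.add,
  )
  |> static_supervisor.start
}
```

Calling `start_pool(config, 4)` should be equivalent to the four explicit `add` calls shown earlier.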

API Reference

gabsurd/client

Database connection management.

| Function | Description |
| --- | --- |
| start(url) | Connect to PostgreSQL, returns StartResult(Db) |

gabsurd/queue

Queue lifecycle management.

| Function | Description |
| --- | --- |
| create(db, name) | Create an unpartitioned queue |
| create_with_mode(db, name, mode) | Create a queue with storage mode |
| drop(db, name) | Drop a queue and all its data |
| list(db) | List all queue names |

gabsurd/task

Task lifecycle operations.

| Function | Description |
| --- | --- |
| spawn(db, queue, name, params, options) | Create a new task |
| claim(db, queue, worker_id, timeout, qty) | Claim available tasks |
| complete(db, queue, run_id, state) | Mark a run as completed |
| fail(db, queue, run_id, reason) | Fail a run (uses queue retry policy) |
| fail_with_retry(db, queue, run_id, reason, retry_at) | Fail a run and schedule retry |
| cancel(db, queue, task_id) | Cancel a task |
| get_result(db, queue, task_id) | Get task result as TaskResult record |
| retry(db, queue, task_id, options) | Retry a failed task |
| extend_claim(db, queue, run_id, extend_by) | Manually extend a claim lease (heartbeat) |
| schedule_run(db, queue, run_id, defer_seconds) | Reschedule a run for future execution |
| new_options() | Create empty spawn options |
| with_max_attempts(options, n) | Set max attempts |
| with_retry_strategy(options, strategy) | Set retry strategy |
| with_cancellation(options, policy) | Set cancellation policy |
| with_headers(options, json) | Set headers metadata |
| with_idempotency_key(options, key) | Prevent duplicate spawning |

gabsurd/event

Event coordination for workflows.

| Function | Description |
| --- | --- |
| emit(db, queue, event_name, payload) | Emit an event |
| await(db, queue, task_id, run_id, step, event, timeout) | Await an event |

gabsurd/checkpoint

Workflow checkpoint persistence.

| Function | Description |
| --- | --- |
| set(db, queue, task_id, step, state, run_id, extend_claim_by) | Save a checkpoint (pass claim_timeout to extend lease) |
| get(db, queue, task_id, step, include_pending) | Retrieve a checkpoint, returns Result(Option(Checkpoint), Error) |

gabsurd/worker

OTP worker actor for task processing.

| Type | Description |
| --- | --- |
| Handler(task_name, execute, on_error) | Handler for a specific task type |
| HandlerResult | Return type: Complete(json), Fail(json), or Suspend |
| Config | Worker configuration |
| Worker | Running worker handle |

| Function | Description |
| --- | --- |
| new(db, queue, handlers) | Create config with defaults |
| with_poll_interval(config, ms) | Set poll interval (default: 5000ms) |
| with_batch_size(config, n) | Set tasks claimed per poll (default: 1) |
| with_claim_timeout(config, secs) | Set claim timeout (default: 30s) |
| with_worker_id(config, id) | Set worker ID |
| with_max_backoff(config, ms) | Set max error backoff (default: 60000ms) |
| start(config) | Start a worker actor |
| stop(worker) | Stop a worker gracefully |
| child_spec(name, config) | Create supervisor child spec |
| pool_child_specs(name, config, count) | Create N child specs for a pool |

gabsurd/context

Execution context passed to worker handlers.

| Type | Description |
| --- | --- |
| Context | Encapsulates db, queue, claim, claim_timeout |
| EventResult | Received(String) or Suspended |

| Function | Description |
| --- | --- |
| task_id(ctx) | Task’s unique identifier |
| run_id(ctx) | Current run’s unique identifier |
| params(ctx) | Task parameters as raw JSON string |
| task_name(ctx) | Task name |
| attempt(ctx) | Current attempt number |
| step(ctx, name, decoder, run) | Idempotent step; skips if checkpoint exists |
| get_checkpoint(ctx, name) | Get raw JSON state for a step |
| set_checkpoint(ctx, name, state) | Persist a checkpoint and extend lease |
| heartbeat(ctx) | Extend the claim lease |
| await_event(ctx, event_name, timeout) | Await an event, returns EventResult |

gabsurd/utility

Maintenance and introspection.

| Function | Description |
| --- | --- |
| cleanup_queue(db, queue) | Clean up completed/failed tasks |
| get_schema_version(db) | Get the installed schema version |

Architecture

┌──────────────────────────────────────┐
│           Your Application           │
├──────────────────────────────────────┤
│   worker.gleam   │  task.gleam       │  ← SDK Layer (hand-written)
│   context.gleam  │  queue.gleam      │
│   event.gleam    │  checkpoint.gleam │
│   utility.gleam  │                   │
├──────────────────────────────────────┤
│          sql.gleam (generated)       │  ← Parrot codegen
├──────────────────────────────────────┤
│   param.gleam  │  client.gleam       │  ← pog driver adapter
├──────────────────────────────────────┤
│        PostgreSQL + absurd.sql       │  ← Database
└──────────────────────────────────────┘

Worker Design

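Workers use the Handler Record pattern: each task type gets a Handler with a task_name to match against, an execute function that receives the Context and returns a HandlerResult, and an optional on_error callback.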

The handler receives a Context that encapsulates the database connection, queue, claim details, and claim timeout. It provides context.step(ctx, ...) for idempotent steps and context.await_event(ctx, ...) for event-driven workflows.

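The HandlerResult type has three variants: Complete(json) finishes the task with a result, Fail(json) fails the run with a reason, and Suspend pauses the task until an awaited event arrives.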

The worker polls the queue using process.send_after, claims tasks, constructs a Context, dispatches to matching handlers by task_name, and handles the result.

Idempotent Steps

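context.step(ctx, name, decoder, run) is the primary building block for durable workflows. If a checkpoint named name already exists, the step skips run and returns the stored state decoded with decoder. Otherwise it calls run, persists the returned JSON as the checkpoint, and extends the claim lease.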
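
The skip-or-run behaviour can be sketched without a database. In this simplified model an in-memory dictionary stands in for the checkpoint table and values are plain strings; the `step` function here is a hypothetical stand-in, not gabsurd's implementation, and it leaves out JSON decoding and lease extension:

```gleam
import gleam/dict.{type Dict}

/// Run `run` only if no checkpoint named `name` exists.
/// Returns the (possibly updated) store together with the step's value.
pub fn step(
  store: Dict(String, String),
  name: String,
  run: fn() -> String,
) -> #(Dict(String, String), String) {
  case dict.get(store, name) {
    // Checkpoint found: skip the work and reuse the saved value.
    Ok(saved) -> #(store, saved)
    // No checkpoint: do the work, then persist the result.
    Error(Nil) -> {
      let value = run()
      #(dict.insert(store, name, value), value)
    }
  }
}
```

Running the same step name twice against the returned store yields the first value both times, which is what makes a retried workflow safe to re-execute from the top.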

Unknown Task Deferral

Tasks with no registered handler are deferred (rescheduled with a delay) rather than failed. This supports rolling deployments where a new task type may arrive before its handler code is deployed.
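
A minimal sketch of dispatch by task_name with deferral as the fallback. The `Handler` and `Outcome` types below are simplified, hypothetical stand-ins for the SDK's types, and the 30-second delay is an assumed value:

```gleam
import gleam/list

/// Simplified stand-in for the result of handling one claimed task.
pub type Outcome {
  Completed(String)
  Deferred(seconds: Int)
}

/// Simplified stand-in for a handler: a task name plus a function.
pub type Handler {
  Handler(task_name: String, execute: fn(String) -> String)
}

/// Find the handler whose task_name matches and run it; defer otherwise.
pub fn dispatch(
  handlers: List(Handler),
  task_name: String,
  params: String,
) -> Outcome {
  case list.find(handlers, fn(h: Handler) { h.task_name == task_name }) {
    Ok(handler) -> Completed(handler.execute(params))
    // No handler registered yet: reschedule instead of failing.
    Error(Nil) -> Deferred(seconds: 30)
  }
}
```

Deferring keeps the task's attempt budget intact, so a handler deployed a few minutes later can still pick it up.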

Error Backoff

On transient claim errors, the worker backs off exponentially up to max_backoff (default 60s), resetting on success.
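
As an illustration of capped exponential backoff, the sketch below doubles the delay for each consecutive failure. The doubling factor and the `backoff_ms` name are assumptions; the text above only states that growth is exponential and capped at max_backoff:

```gleam
import gleam/int

/// Delay before the next poll after `failures` consecutive claim errors.
/// Assumes the delay doubles per failure, starting from `base_ms`.
pub fn backoff_ms(base_ms: Int, max_ms: Int, failures: Int) -> Int {
  case failures <= 0 {
    // A successful claim resets the delay to the base interval.
    True -> base_ms
    // Cap at every level so the value never exceeds max_ms.
    False -> int.min(max_ms, 2 * backoff_ms(base_ms, max_ms, failures - 1))
  }
}
```

With a 5000 ms base and a 60000 ms cap, three failures give 40000 ms and any further failures stay at 60000 ms.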

For pools, use pool_child_specs to generate N workers with globally unique IDs inside a static_supervisor.

Examples

Multi-Step Workflow with Checkpoints

Durable workflows survive crashes. Each step uses context.step, so on retry completed steps are skipped. Steps also extend the worker’s claim lease, so the task does not time out between steps as long as each individual step finishes within the claim timeout.

import gleam/dynamic/decode
import gleam/json
import gleam/option
import gleam/result
import gabsurd/client
import gabsurd/context
import gabsurd/worker.{Complete, Fail, Handler}

let process_order = Handler(
  task_name: "process_order",
  execute: fn(ctx) {
    case order_workflow(ctx) {
      Ok(Nil) -> Complete(json.object([#("status", json.string("completed"))]))
      Error(e) -> Fail(json.string(error_to_string(e)))
    }
  },
  on_error: option.None,
)

fn order_workflow(ctx) -> Result(Nil, client.GabsurdError) {
  let params = decode_params(context.params(ctx))

  // Step 1: Charge credit card (skip if checkpoint exists)
  use _ <- result.try(context.step(ctx, "charge", decode.success(Nil), fn() {
    charge_card(params)
    json.null()
  }))

  // Step 2: Reserve inventory (skip if done)
  use _ <- result.try(context.step(ctx, "reserve", decode.success(Nil), fn() {
    reserve_inventory(params)
    json.null()
  }))

  // Step 3: Send confirmation
  use _ <- result.try(context.step(ctx, "notify", decode.success(Nil), fn() {
    send_confirmation(params)
    json.null()
  }))

  Ok(Nil)
}

Steps That Pass Data Forward

fn order_workflow(ctx) -> Result(Nil, client.GabsurdError) {
  // Charge and get back the charge_id
  use charge_id <- result.try(
    context.step(ctx, "charge", charge_id_decoder(), fn() {
      let result = charge_card(decode_params(context.params(ctx)))
      json.object([#("charge_id", json.string(result.id))])
    }),
  )

  // Use charge_id in the next step
  use _ <- result.try(context.step(ctx, "capture", decode.success(Nil), fn() {
    capture_charge(charge_id)
    json.null()
  }))

  Ok(Nil)
}

// Decoder using Gleam's continuation-based decode.field API:
fn charge_id_decoder() -> decode.Decoder(String) {
  use charge_id <- decode.field("charge_id", decode.string)
  decode.success(charge_id)
}

Event-Driven Workflow

Tasks can suspend waiting for an external event (e.g., a webhook callback). Call context.await_event and return Suspend when the task needs to sleep. When event.emit fires, the task wakes up and the handler runs again.

import gleam/dynamic/decode
import gleam/json
import gleam/option
import gabsurd/context.{Received, Suspended}
import gabsurd/worker.{Complete, Fail, Handler, Suspend}

let generate_report = Handler(
  task_name: "generate_report",
  execute: fn(ctx) {
    // Start the remote job (idempotent: skipped if the checkpoint exists)
    let started =
      context.step(ctx, "start", job_id_decoder(), fn() {
        let id = start_remote_job(context.params(ctx))
        json.object([#("job_id", json.string(id))])
      })

    case started {
      // execute must return a HandlerResult, so a step error maps to Fail
      Error(_) -> Fail(json.string("start step failed"))
      Ok(job_id) ->
        // Wait for the remote service to POST back
        case context.await_event(ctx, "report_complete_" <> job_id, 3600) {
          Ok(Received(payload)) -> Complete(json.string(payload))
          Ok(Suspended) -> Suspend
          Error(_) -> Fail(json.string("event error"))
        }
    }
  },
  on_error: option.None,
)

// Decoder for the job_id stored in the checkpoint:
fn job_id_decoder() -> decode.Decoder(String) {
  use job_id <- decode.field("job_id", decode.string)
  decode.success(job_id)
}

Then in your webhook HTTP handler (separate process):

import gleam/json
import gabsurd/event

// When the remote service calls back:
let assert Ok(Nil) = event.emit(
  db, "reports",
  "report_complete_" <> job_id,  // must match event_name from await
  json.object([#("url", json.string(download_url))]),
)

Idempotent Task Spawning

Use with_idempotency_key to prevent duplicate tasks — critical for payment processing where double-spawning means double-charging.

let assert Ok(info) =
  task.spawn(
    db, "payments", "charge_card",
    json.object([#("amount", json.int(9999))]),
    task.new_options()
    |> task.with_idempotency_key("order-" <> order_id),
  )
// If the caller crashes and retries, the same task is returned
// (info.created will be False on subsequent calls)
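
A caller can branch on the created flag to tell a fresh spawn from a deduplicated one. This is a sketch only: `enqueue_charge` and the log messages are hypothetical, and it assumes `task.spawn` returns a Result whose Ok value exposes the `created` field mentioned above:

```gleam
import gleam/io
import gleam/json
import gabsurd/task

// Hypothetical wrapper: spawn a charge task at most once per order.
pub fn enqueue_charge(db, order_id: String, amount_cents: Int) {
  let options =
    task.new_options()
    |> task.with_idempotency_key("order-" <> order_id)
  let params = json.object([#("amount", json.int(amount_cents))])

  case task.spawn(db, "payments", "charge_card", params, options) {
    Ok(info) ->
      case info.created {
        // First call for this key: a new task was created.
        True -> io.println("spawned new charge task")
        // Repeat call: the existing task was returned instead.
        False -> io.println("charge task already exists, reusing it")
      }
    Error(_) -> io.println("spawn failed")
  }
}
```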

Retry Strategies

// Exponential backoff for flaky APIs
let assert Ok(_) =
  task.spawn(
    db, "integrations", "sync_crm",
    json.object([#("contact_id", json.string(id))]),
    task.new_options()
    |> task.with_max_attempts(5)
    |> task.with_retry_strategy(task.ExponentialRetry(
      base_seconds: 10, factor: 2.0, max_seconds: 300.0,
    )),
  )
// Retries at ~10s, 20s, 40s, 80s, capped at 300s
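
The schedule in the comment above follows from a simple formula. The sketch below assumes the delay before retry n is base × factor^(n − 1), capped at max; the actual computation happens in the Absurd schema and may differ in detail, and `retry_delay` is a hypothetical name:

```gleam
import gleam/float
import gleam/list

/// Delay in seconds before retry number `n` (1-based), assuming
/// delay = base * factor^(n - 1), capped at `max`.
pub fn retry_delay(base: Float, factor: Float, max: Float, n: Int) -> Float {
  case n <= 1 {
    True -> float.min(base, max)
    False -> float.min(max, factor *. retry_delay(base, factor, max, n - 1))
  }
}

/// Delays for the first six retries with base 10s, factor 2, cap 300s.
pub fn example_schedule() -> List(Float) {
  [1, 2, 3, 4, 5, 6]
  |> list.map(retry_delay(10.0, 2.0, 300.0, _))
}
```

Under this assumption the cap first applies on the sixth retry (320s becomes 300s); with max_attempts set to 5 only the first four delays are ever used.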

Long-Running Tasks with Manual Heartbeats

For tasks that do a single long operation without checkpoints:

import gleam/json
import gleam/list
import gleam/option
import gabsurd/context
import gabsurd/worker.{Complete, Handler}

let handler = Handler(
  task_name: "batch_import",
  execute: fn(ctx) {
    list.each(records, fn(batch) {
      process_batch(batch)
      // Keep the lease alive after each batch
      let _ = context.heartbeat(ctx)
    })
    Complete(json.object([#("imported", json.int(list.length(records)))]))
  },
  on_error: option.None,
)

Development

# Start PostgreSQL
bash bin/postgres.sh

# Reset database
bash bin/reset_db.sh

# Run tests
DATABASE_URL="postgresql://gabsurd:gabsurd@127.0.0.1:5432/gabsurd" gleam test

# Regenerate SQL (after changing queries.sql)
DATABASE_URL="..." PATH="$(nix eval --raw 'nixpkgs#postgresql_17.out')/bin:$PATH" gleam run -m parrot

License

Apache-2.0
