execute(ctx) receives a single argument: the workflow context. It’s how your script reads its input and reaches the outside world — calling models, agents, and other workflows, making HTTP requests, querying datastores, streaming progress, and logging. Everything here is synchronous. The methods handle their own network I/O; you call them directly, no await.
def execute(ctx: WorkflowContext):
    query = ctx.input["query"]
    resp = ctx.chat.completions.create(
        messages=[{"role": "user", "content": query}],
        application="Support", intent="answer",
    )
    return {"answer": resp["choices"][0]["message"]["content"]}

Input & request data

These attributes carry what the caller sent. See Invoking workflows for how they’re populated.
ctx.input
Any
The structured payload from the request’s input field — usually a dict. None if the caller sent only messages. This is the primary input for most workflows.
ctx.messages
list[dict]
Chat messages (OpenAI format) from the request’s messages field. Use for chat-style workflows.
ctx.secrets
dict[str, str]
Runtime credentials passed via the request’s secrets field (e.g. third-party API keys). Never stored — available only for the duration of the run.
ctx.metadata
dict
Arbitrary metadata passed by the caller.
ctx.env
dict
Config from the workflow’s stored configuration, including accessory S3 keys. Also mirrored into os.environ.
ctx.params
dict
Inference params from the request: max_tokens, temperature, stream.
ctx.session_id
str
The session ID for this run. Pass it through to ctx.chat/ctx.agents calls to correlate every step under one session in Maitai analytics.
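Because ctx.input is None when the caller sends only messages, a workflow that accepts both styles has to pick one source. The helper below is a minimal sketch of that choice. The name resolve_query and the "query" key are illustrative assumptions, and the SimpleNamespace object is only a local stand-in for the real context.

```python
from types import SimpleNamespace


def resolve_query(ctx) -> str:
    """Return the caller's query from ctx.input, else the last user message."""
    payload = ctx.input
    # Structured input wins when the caller sent one.
    if isinstance(payload, dict) and payload.get("query"):
        return str(payload["query"])
    # ctx.input is None when the caller sent only messages.
    for message in reversed(ctx.messages or []):
        if message.get("role") == "user":
            return str(message.get("content") or "")
    raise ValueError("request carried neither input['query'] nor a user message")


# Stand-in for the real workflow context, for local experimentation only.
fake_ctx = SimpleNamespace(
    input=None,
    messages=[
        {"role": "system", "content": "Be brief."},
        {"role": "user", "content": "Where is my order?"},
    ],
)
print(resolve_query(fake_ctx))
```

Raising when neither source is present keeps a malformed request from silently reaching a model call with an empty prompt.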

Call a model

Run an LLM call through Maitai — fully monitored, with your intent’s configured model, fallbacks, and Sentinels applied.
resp = ctx.chat.completions.create(
    messages=[
        {"role": "system", "content": "You are a financial analyst."},
        {"role": "user", "content": "Summarize Q3 earnings."},
    ],
    application="Finance",      # defaults to the workflow's APPLICATION binding
    intent="summarize",         # required
    session_id=ctx.session_id,
)
content = resp["choices"][0]["message"]["content"]
intent
string
required
The intent name. Determines which model, prompt config, and Sentinels apply. (action_type is accepted as an alias.)
application
string
The application this call belongs to. Defaults to the workflow’s bound application; pass explicitly to override. (application_ref_name is an alias.)
messages
list[dict]
Chat messages. Defaults to ctx.messages if omitted.
model
string
Optional model override. When omitted, the intent’s configured model is used.
stream
boolean
When True, returns an iterator of chunk dicts instead of a single response. Default False.
Any additional keyword arguments (temperature, max_tokens, response_format, …) are forwarded to the underlying completion. The non-streaming call returns an OpenAI-shaped dict (choices, usage, model, request_id, …).
ctx.chat.completions.create(...) and the flat alias ctx.chat_completion(...) are equivalent. The namespaced form mirrors the OpenAI SDK and is recommended.

Streaming a model call

for chunk in ctx.chat.completions.create(
    messages=[{"role": "user", "content": "Write a poem."}],
    application="Content", intent="compose", stream=True,
):
    # In OpenAI-format streams, "content" can be missing or None on some chunks.
    delta = chunk["choices"][0]["delta"].get("content") or ""
    if delta:
        ctx.emit({"type": "token", "content": delta})
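A workflow usually needs the complete text as well as the live tokens, for example to return it as the final result. The sketch below accumulates deltas while forwarding them. The helper name stream_answer is hypothetical, the chunk shapes are assumed to follow the OpenAI streaming format, and the fake context exists only so the example runs on its own.

```python
from types import SimpleNamespace


def stream_answer(ctx, messages, application, intent) -> str:
    """Forward tokens to the caller as they arrive and return the full text."""
    parts = []
    stream = ctx.chat.completions.create(
        messages=messages, application=application, intent=intent, stream=True
    )
    for chunk in stream:
        choices = chunk.get("choices") or []
        if not choices:
            continue  # assumption: some chunks may carry no choices
        delta = (choices[0].get("delta") or {}).get("content") or ""
        if delta:
            parts.append(delta)
            ctx.emit({"type": "token", "content": delta})
    return "".join(parts)


# Fake streaming endpoint standing in for ctx.chat.completions.create.
def _fake_create(**kwargs):
    yield {"choices": [{"delta": {"role": "assistant"}}]}
    yield {"choices": [{"delta": {"content": "Roses "}}]}
    yield {"choices": [{"delta": {"content": "are red."}}]}
    yield {"choices": []}


emitted = []
fake_ctx = SimpleNamespace(
    chat=SimpleNamespace(completions=SimpleNamespace(create=_fake_create)),
    emit=emitted.append,
)
poem = stream_answer(
    fake_ctx, [{"role": "user", "content": "Write a poem."}], "Content", "compose"
)
```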

Call an agent

Invoke a Maitai Agent — including its full reasoning loop — from inside a workflow.
result = ctx.agents.completions.create(
    agent="support-triage",
    messages=[{"role": "user", "content": "My card was declined."}],
    max_iterations=6,
)
agent
string
required
The agent reference to invoke.
messages
list[dict]
Conversation to send the agent. Defaults to ctx.messages.
actions / state / secrets / config / max_iterations / response_format
The full agent request surface — same overlays as the agent SDK: capability mask (actions), form seed (state), per-request secrets, config overlay, iteration cap, and structured response_format.
model
string
Optional LLM override, separate from the agent reference.
stream
boolean
Stream the agent’s events as an iterator. Default False.
The flat alias is ctx.agent_completion(agent, ...). A 400 from the agent (e.g. a missing required secret) surfaces as a ValueError you can catch.
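Since a rejected agent request surfaces as a ValueError, a workflow can turn it into a structured result instead of failing the whole run. This is a sketch under assumptions: run_triage and the returned dict shape are illustrative, and the fake context only imitates the calls documented on this page.

```python
from types import SimpleNamespace


def run_triage(ctx, agent: str = "support-triage") -> dict:
    """Invoke an agent and convert a rejected request into a result dict."""
    try:
        result = ctx.agents.completions.create(
            agent=agent, messages=ctx.messages, max_iterations=6
        )
    except ValueError as exc:
        # Raised for a 400 from the agent, e.g. a missing required secret.
        ctx.log("agent rejected request", agent=agent, error=str(exc))
        return {"ok": False, "error": str(exc)}
    return {"ok": True, "result": result}


# Fake context whose agent call always fails, for local experimentation only.
def _reject(**kwargs):
    raise ValueError("missing required secret")


logged = []
fake_ctx = SimpleNamespace(
    messages=[{"role": "user", "content": "My card was declined."}],
    agents=SimpleNamespace(completions=SimpleNamespace(create=_reject)),
    log=lambda msg, **fields: logged.append((msg, fields)),
)
outcome = run_triage(fake_ctx)
```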

Call another workflow

Compose workflows by invoking one from another.
rate = ctx.workflows.completions.create(
    workflow="hs-rate-line-classifier",
    input={"subheading": subheading, "product": product},
)
workflow
string
required
Reference of the workflow to call (optionally ref:tag).
input
dict
Structured input for the nested workflow.
Returns the nested workflow’s result dict (with output and session_id). The flat alias is ctx.workflow_completion(workflow, input=...).
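As an illustration of composition, the sketch below fans one nested workflow out over several products and keeps only each result's output field. The helper name classify_products and the fake return values are assumptions made for the example; the output and session_id keys follow the description above.

```python
from types import SimpleNamespace


def classify_products(ctx, subheading, products):
    """Call the nested classifier once per product and collect the outputs."""
    outputs = []
    for product in products:
        result = ctx.workflows.completions.create(
            workflow="hs-rate-line-classifier",
            input={"subheading": subheading, "product": product},
        )
        outputs.append(result["output"])
    return outputs


# Fake nested workflow that echoes its input, for local experimentation only.
def _fake_workflow(workflow, input):
    return {"output": {"product": input["product"]}, "session_id": "s-1"}


fake_ctx = SimpleNamespace(
    workflows=SimpleNamespace(completions=SimpleNamespace(create=_fake_workflow))
)
rates = classify_products(fake_ctx, "6202.11", ["wool coat", "rain jacket"])
```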

Make HTTP requests

ctx.request(method, url, ...) is a thin, pooled wrapper over httpx. Any keyword httpx accepts — json, data, params, headers, auth, timeout, files, … — is forwarded verbatim.
resp = ctx.post(
    "https://api.example.com/score",
    json={"text": ctx.input["text"]},
    headers={"Authorization": f"Bearer {ctx.secrets['EXAMPLE_KEY']}"},
    retries=2,
)
if resp["ok"]:
    score = resp["json"]["score"]
The call returns a plain, JSON-serializable dict:
{
    "status_code": 200,
    "ok": True,            # True for 2xx
    "headers": {...},
    "json": {...},          # parsed body, or None if not JSON
    "text": "...",         # raw body text
    "url": "https://...",  # final URL after redirects
    "elapsed_ms": 142.5,
}
retries
int
default: 0
Additional attempts on a transport error or a transient status. 0 means a single attempt.
retry_statuses
tuple
default: (429, 500, 502, 503, 504)
Status codes treated as transient and retried when retries > 0.
raise_for_status
boolean
default: False
When True, raise on a 4xx/5xx. Default False — branch on status_code/ok instead.
Verb shorthands exist for every method: ctx.get, ctx.post, ctx.put, ctx.patch, ctx.delete, ctx.head, ctx.options. follow_redirects defaults to True. Transport-level failures (DNS, connection refused, timeout) raise after retries are exhausted.
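The retry options can be easier to reason about as a loop. The function below is a toy model of the semantics described above, not Maitai's implementation: send_with_retries is a hypothetical name, send stands for one HTTP attempt, and returning the last transient response once attempts run out is an assumption consistent with raise_for_status defaulting to False.

```python
def send_with_retries(send, retries=0, retry_statuses=(429, 500, 502, 503, 504)):
    """Call send() up to retries + 1 times and return the final response dict."""
    total_attempts = retries + 1
    for attempt in range(1, total_attempts + 1):
        out_of_attempts = attempt == total_attempts
        try:
            response = send()
        except OSError:
            # Transport-level failure (ConnectionError and TimeoutError are
            # OSError subclasses): retry, or re-raise on the last attempt.
            if out_of_attempts:
                raise
            continue
        if response["status_code"] in retry_statuses and not out_of_attempts:
            continue  # transient status with attempts left: try again
        return response


# One transport error, one 503, then success: needs retries=2 to succeed.
calls = []


def flaky():
    calls.append(1)
    if len(calls) == 1:
        raise ConnectionError("connection refused")
    if len(calls) == 2:
        return {"status_code": 503, "ok": False}
    return {"status_code": 200, "ok": True}


result = send_with_retries(flaky, retries=2)
```

With retries=0 the same function makes exactly one attempt, which matches the note that 0 means a single attempt.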

Streaming an HTTP response

ctx.request_stream(method, url, ...) is the streaming sibling of ctx.request. Instead of buffering the whole body, it returns a context-managed handle you iterate incrementally — ideal for forwarding an upstream NDJSON or SSE stream to your caller one event at a time.
import json  # needed for json.loads below

with ctx.request_stream("POST", "https://api.example.com/run", json=payload, retries=2) as resp:
    if not resp.ok:
        return {"error": resp.read()}
    for line in resp.iter_lines():
        if line:
            ctx.emit(json.loads(line))   # forward each event live
The handle exposes status_code, ok, headers, and url before you read the body (so you can branch on status first), plus:
| Method | Returns | Use for |
| resp.iter_lines() | Iterator[str] | NDJSON / line-delimited streams |
| resp.iter_bytes(chunk_size=None) | Iterator[bytes] | Binary, SSE, custom framing |
| resp.read() | str | Buffer the remaining body (escape hatch) |
It accepts the same retries / retry_backoff / retry_statuses / raise_for_status options as ctx.request, with one caveat: retries apply only while opening the stream. Once you start iterating the body, a mid-stream failure can’t be replayed and propagates to you. Always use it as a context manager (with ... as resp:) — the connection is released on exit, including on early break or an exception.
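Because a mid-stream failure cannot be replayed, it can help to make the forwarding loop tolerant of blank or malformed lines. The sketch below is one way to do that. forward_events is a hypothetical helper, FakeStream is a stand-in that imitates only the handle attributes listed above, and skipping bad lines rather than aborting is a design assumption.

```python
import json
from types import SimpleNamespace


def forward_events(ctx, method, url, **kwargs) -> int:
    """Forward each NDJSON object from an upstream stream; return the count."""
    forwarded = 0
    with ctx.request_stream(method, url, **kwargs) as resp:
        if not resp.ok:
            raise RuntimeError(f"upstream returned {resp.status_code}: {resp.read()}")
        for line in resp.iter_lines():
            if not line.strip():
                continue  # keep-alive or blank separator line
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                ctx.log("skipping malformed line", line=line[:200])
                continue
            if isinstance(event, dict):  # ctx.emit expects a dict
                ctx.emit(event)
                forwarded += 1
    return forwarded


class FakeStream:
    """Imitates the streaming handle: context manager, ok, iter_lines, read."""

    status_code = 200
    ok = True

    def __init__(self, lines):
        self._lines = lines
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.closed = True  # the real handle releases the connection here
        return False

    def iter_lines(self):
        return iter(self._lines)

    def read(self):
        return "\n".join(self._lines)


stream = FakeStream(['{"type": "status", "content": "started"}', "", "not json", '{"type": "done"}'])
emitted, logged = [], []
fake_ctx = SimpleNamespace(
    request_stream=lambda method, url, **kw: stream,
    emit=emitted.append,
    log=lambda msg, **fields: logged.append(msg),
)
count = forward_events(fake_ctx, "POST", "https://api.example.com/run", json={})
```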

Stream progress to the caller

ctx.emit(data) pushes an intermediate chunk to the caller. When the workflow was invoked with stream=True, each chunk is written to the response stream immediately; otherwise chunks are buffered and returned with the final result — so the same code works in both modes.
ctx.emit({"type": "status", "content": "Step 2 of 3: scoring candidates"})
Pass any dict. It’s wrapped as a WorkflowChunk (type, content, metadata) and stamped with an epoch-millisecond timestamp so consumers can order and debug events. See the consumer side for how clients read these.
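The two delivery modes can be pictured with a small model. This is a toy illustration, not the actual WorkflowChunk implementation: ToyEmitter, its write callback, and the "chunks" key on the final result are all hypothetical names chosen for the sketch.

```python
import time


class ToyEmitter:
    """Toy model of emit(): write immediately when streaming, else buffer."""

    def __init__(self, stream: bool, write=None):
        self.stream = stream
        self.write = write      # callable that sends one chunk to the caller
        self.buffered = []

    def emit(self, data: dict) -> None:
        chunk = dict(data)
        chunk["timestamp"] = int(time.time() * 1000)  # epoch milliseconds
        if self.stream:
            self.write(chunk)
        else:
            self.buffered.append(chunk)

    def finish(self, output) -> dict:
        result = {"output": output}
        if not self.stream:
            # Hypothetical key: buffered chunks travel with the final result.
            result["chunks"] = self.buffered
        return result


def run(emitter):
    """The same workflow body works unchanged in both modes."""
    emitter.emit({"type": "status", "content": "Step 1 of 2"})
    emitter.emit({"type": "status", "content": "Step 2 of 2"})
    return emitter.finish({"answer": 42})


live = []
streamed = run(ToyEmitter(stream=True, write=live.append))
buffered = run(ToyEmitter(stream=False))
```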

Query a datastore

If your workflow has an attached datastore, reach it through ctx.datastore.
from maitai_workflow import Tag, Text, all_, any_, not_

# Fetch a full record by id
ruling = ctx.datastore.get("ruling-12345")

# Vector (semantic) search
hits = ctx.datastore.search(vector=embedding, k=10)

# Keyword / hybrid search with the filter DSL
hits = ctx.datastore.search(
    where=all_(
        Tag("hts_codes", ["6202"], prefix=True),
        any_(Text("leather boots"), Tag("keywords", ["leather"])),
        not_(Tag("keywords", ["children"])),
    ),
    k=20,
)
ctx.datastore.get(record_id, datastore_name=None)
dict | list[dict] | None
Fetch one full record by id, or pass a list of ids for a batch (missing ids are omitted). Reads from S3 with in-process caching.
ctx.datastore.search(vector=None, where=None, k=10, datastore_name=None, full=False)
list[dict]
Query the index. Provide vector for KNN semantic search (sorted by similarity), and/or where to filter. full=True hydrates complete records from S3; the default returns only indexed fields. datastore_name is required only when the workflow has more than one datastore.
The where filter accepts a composable Filter (Tag, Text, combined with all_/any_/not_ or the & | ~ operators), a raw RediSearch string, or None (matches everything). Full DSL details live in Datastores & Accessories.
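To show how a composable filter and the & | ~ operators can relate to a RediSearch string, here is a self-contained toy model. It is not the maitai_workflow implementation: every Toy* class, the toy_all/toy_any/toy_not helpers, and the exact rendering rules are assumptions made for illustration, so the real DSL may compile filters differently.

```python
from dataclasses import dataclass


class ToyFilter:
    """Base class giving every filter the & | ~ operators."""

    def __and__(self, other):
        return ToyAll(_parts(ToyAll, self) + _parts(ToyAll, other))

    def __or__(self, other):
        return ToyAny(_parts(ToyAny, self) + _parts(ToyAny, other))

    def __invert__(self):
        return ToyNot(self)


def _parts(kind, node):
    # Flatten nested nodes of the same kind so a & b & c has three parts.
    return node.parts if isinstance(node, kind) else (node,)


@dataclass(frozen=True)
class ToyTag(ToyFilter):
    name: str
    values: tuple
    prefix: bool = False

    def render(self):
        suffix = "*" if self.prefix else ""
        return "@%s:{%s}" % (self.name, " | ".join(v + suffix for v in self.values))


@dataclass(frozen=True)
class ToyText(ToyFilter):
    phrase: str

    def render(self):
        return '"%s"' % self.phrase


@dataclass(frozen=True)
class ToyAll(ToyFilter):
    parts: tuple

    def render(self):
        return "(" + " ".join(p.render() for p in self.parts) + ")"


@dataclass(frozen=True)
class ToyAny(ToyFilter):
    parts: tuple

    def render(self):
        return "(" + " | ".join(p.render() for p in self.parts) + ")"


@dataclass(frozen=True)
class ToyNot(ToyFilter):
    inner: ToyFilter

    def render(self):
        return "-" + self.inner.render()


def toy_all(*parts):
    return ToyAll(tuple(parts))


def toy_any(*parts):
    return ToyAny(tuple(parts))


def toy_not(inner):
    return ToyNot(inner)


codes = ToyTag("hts_codes", ("6202",), prefix=True)
leather = ToyTag("keywords", ("leather",))
children = ToyTag("keywords", ("children",))

by_function = toy_all(codes, toy_any(ToyText("leather boots"), leather), toy_not(children))
by_operator = codes & (ToyText("leather boots") | leather) & ~children
print(by_operator.render())
```

In this model the function form and the operator form build equal objects, which is the property that makes the two spellings interchangeable.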

Load an accessory file

Read a bundled reference file by key. Resolution is cache → Redis → S3, with an optional local fallback for development.
hts_codes = ctx.load_accessory("hts_codes", fallback_path="./hts_codes.json")
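The resolution order can be sketched as a tiered lookup. The code below is a toy model under stated assumptions, not the real loader: load_tiered is a hypothetical name, plain dicts stand in for the in-process cache, Redis, and S3, backfilling faster tiers after a hit is assumed behaviour, and the fallback file is assumed to contain JSON.

```python
import json
import tempfile
from pathlib import Path


def load_tiered(key, tiers, fallback_path=None):
    """Look up key in each tier, fastest first, then try a local JSON file."""
    for position, tier in enumerate(tiers):
        if key in tier:
            value = tier[key]
            # Assumption: a hit in a slower tier is copied into faster ones.
            for faster in tiers[:position]:
                faster[key] = value
            return value
    if fallback_path is not None and Path(fallback_path).is_file():
        return json.loads(Path(fallback_path).read_text(encoding="utf-8"))
    raise KeyError(f"accessory {key!r} not found in any tier")


# Dicts standing in for cache, Redis, and S3.
cache, redis_like, s3_like = {}, {}, {"hts_codes": ["6202", "6403"]}
first = load_tiered("hts_codes", [cache, redis_like, s3_like])

# Development fallback: the key is in no tier, so the local file is used.
with tempfile.TemporaryDirectory() as tmp:
    local = Path(tmp) / "units.json"
    local.write_text(json.dumps({"kg": "kilogram"}), encoding="utf-8")
    from_disk = load_tiered("units", [cache, redis_like, s3_like], fallback_path=local)
```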

Embeddings & web search

vecs = ctx.embeddings.create(input=["text one", "text two"])
answer = ctx.web_search("latest USTR tariff announcements")
ctx.embeddings.create(input, model=None)
dict
Embed a string or list of strings. Returns the embeddings API response.
ctx.web_search(query, system_prompt=None)
str
Web search (Perplexity). Returns plain text — concatenated snippets by default, or an LLM-composed answer when system_prompt is given. Returns an empty string if unavailable.

Logging & profiling

ctx.log("scoring complete", candidates=len(hits), top_score=hits[0]["score"] if hits else None)  # guard against an empty result list

ctx.profile_span("retrieval")     # time since the last span / script start
ctx.profile_span("generation")
ctx.log(msg, **fields)
None
Structured log line. Keyword fields are attached to the log record and rendered inline, so they show up in your observability tooling.
ctx.profile_span(label)
None
Record a timing span from the last checkpoint to now. No-op unless profiling is enabled (send X-Profile: true). ctx.profile_span_from_start(label) measures from script start instead. Spans also drive the step boundaries in the flow diagram.
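The difference between the two span methods is easiest to see with a controllable clock. The class below is a toy model, not the profiler itself: ToySpans and its injected clock are hypothetical, and it assumes that profile_span_from_start does not move the checkpoint used by profile_span.

```python
import time


class ToySpans:
    """Toy model of checkpoint-based span timing."""

    def __init__(self, clock=time.perf_counter):
        self._clock = clock
        self._start = clock()
        self._last = self._start
        self.spans = []

    def profile_span(self, label):
        # Elapsed time since the previous checkpoint (or script start).
        now = self._clock()
        self.spans.append((label, now - self._last))
        self._last = now

    def profile_span_from_start(self, label):
        # Elapsed time since script start; the checkpoint stays where it was.
        now = self._clock()
        self.spans.append((label, now - self._start))


# Deterministic clock: start at 0.0, then readings at 1.5, 4.0, and 4.0 seconds.
ticks = iter([0.0, 1.5, 4.0, 4.0])
profiler = ToySpans(clock=lambda: next(ticks))
profiler.profile_span("retrieval")
profiler.profile_span("generation")
profiler.profile_span_from_start("total")
```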
