execute(ctx) receives a single argument: the workflow context. It’s how your script reads its input and reaches the outside world — calling models, agents, and other workflows, making HTTP requests, querying datastores, streaming progress, and logging.
Everything here is synchronous. The methods handle their own network I/O; you call them directly, no await.
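As a rough illustration of the synchronous style, the sketch below writes execute(ctx) as a plain function and calls a context method directly. Only ctx.messages and ctx.chat_completion(...) come from this page; the stub context, the `messages` keyword, the dict returned by execute, and the exact response layout are assumptions made so the snippet runs on its own.

```python
from types import SimpleNamespace


def execute(ctx):
    # Plain function, no async/await: the context method blocks until it returns.
    response = ctx.chat_completion(messages=ctx.messages)
    # Assumed OpenAI-style layout: choices[0].message.content.
    return {"reply": response["choices"][0]["message"]["content"]}


# Hypothetical stand-in for the real context so the sketch runs locally.
def _fake_chat_completion(messages=None, **kwargs):
    last = messages[-1]["content"]
    return {"choices": [{"message": {"role": "assistant", "content": f"echo: {last}"}}]}


stub_ctx = SimpleNamespace(
    messages=[{"role": "user", "content": "hello"}],
    chat_completion=_fake_chat_completion,
)

result = execute(stub_ctx)
print(result)
```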
Input & request data
These attributes carry what the caller sent. See Invoking workflows for how they’re populated.
- The structured payload from the request’s input field, usually a dict. None if the caller sent only messages. This is the primary input for most workflows.
- Chat messages (OpenAI format) from the request’s messages field. Use for chat-style workflows.
- Runtime credentials passed via the request’s secrets field (e.g. third-party API keys). Never stored; available only for the duration of the run.
- Arbitrary metadata passed by the caller.
- Config from the workflow’s stored configuration, including accessory S3 keys. Also mirrored into os.environ.
- Inference params from the request: max_tokens, temperature, stream.
- The session ID for this run. Pass it through to ctx.chat/ctx.agents calls to correlate every step under one session in Maitai analytics.

Call a model
Run an LLM call through Maitai: fully monitored, with your intent’s configured model, fallbacks, and Sentinels applied.
- The intent name. Determines which model, prompt config, and Sentinels apply. (action_type is accepted as an alias.)
- The application this call belongs to. Defaults to the workflow’s bound application; pass explicitly to override. (application_ref_name is an alias.)
- Chat messages. Defaults to ctx.messages if omitted.
- Optional model override. When omitted, the intent’s configured model is used.
- When True, returns an iterator of chunk dicts instead of a single response. Default False.

Other keyword arguments (temperature, max_tokens, response_format, …) are forwarded to the underlying completion. The non-streaming call returns an OpenAI-shaped dict (choices, usage, model, request_id, …).
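A non-streaming call might look like the sketch below. The keyword name `intent`, the intent value, and the choices[0].message.content layout are assumptions inferred from the descriptions above; the stub context exists only so the snippet runs outside Maitai.

```python
from types import SimpleNamespace


def execute(ctx):
    response = ctx.chat.completions.create(
        intent="summarize_ticket",  # hypothetical intent name; keyword name assumed
        messages=[{"role": "user", "content": "Summarize: the printer is on fire."}],
        temperature=0.2,  # extra keywords are forwarded to the completion
        max_tokens=200,
    )
    # The non-streaming result is described as an OpenAI-shaped dict.
    choice = response["choices"][0]
    return {
        "summary": choice["message"]["content"],
        "model": response["model"],
        "request_id": response["request_id"],
    }


# Hypothetical stub so the sketch runs locally.
def _fake_create(**kwargs):
    return {
        "choices": [{"message": {"role": "assistant", "content": "Printer fire reported."}}],
        "usage": {"total_tokens": 12},
        "model": "stub-model",
        "request_id": "req_123",
    }


stub_ctx = SimpleNamespace(
    chat=SimpleNamespace(completions=SimpleNamespace(create=_fake_create)),
)

result = execute(stub_ctx)
print(result)
```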
ctx.chat.completions.create(...) and the flat alias ctx.chat_completion(...) are equivalent. The namespaced form mirrors the OpenAI SDK and is recommended.
Streaming a model call
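With streaming enabled, the call returns an iterator of chunk dicts. The sketch below accumulates the text as it arrives. The keyword names `intent` and `stream`, the intent value, and the OpenAI-style chunk layout (choices[0].delta.content) are assumptions; the stub generator stands in for the real call.

```python
from types import SimpleNamespace


def execute(ctx):
    parts = []
    # With stream=True the call yields chunk dicts instead of one response.
    for chunk in ctx.chat.completions.create(
        intent="draft_reply",  # hypothetical intent name
        messages=ctx.messages,
        stream=True,
    ):
        # Assumed OpenAI-style chunk layout: choices[0].delta.content.
        choices = chunk.get("choices") or []
        if not choices:
            continue
        text = choices[0].get("delta", {}).get("content")
        if text:
            parts.append(text)
    return {"text": "".join(parts)}


# Hypothetical stub: yields a few chunks, including one with no content.
def _fake_create(**kwargs):
    for piece in ["Hel", "lo", None, "!"]:
        yield {"choices": [{"delta": {"content": piece}}]}


stub_ctx = SimpleNamespace(
    messages=[{"role": "user", "content": "Say hello."}],
    chat=SimpleNamespace(completions=SimpleNamespace(create=_fake_create)),
)

result = execute(stub_ctx)
print(result)
```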
Call an agent
Invoke a Maitai Agent, including its full reasoning loop, from inside a workflow.
- The agent reference to invoke.
- Conversation to send the agent. Defaults to ctx.messages.
- The full agent request surface, with the same overlays as the agent SDK: capability mask (actions), form seed (state), per-request secrets, config overlay, iteration cap, and structured response_format.
- Optional LLM override, separate from the agent reference.
- Stream the agent’s events as an iterator. Default False.

The flat alias is ctx.agent_completion(agent, ...). A 400 from the agent (e.g. a missing required secret) surfaces as a ValueError you can catch.
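One way to handle the ValueError case is sketched below using the flat alias ctx.agent_completion(agent, ...). The agent reference, the `messages` keyword, the error text, and the dict returned by execute are hypothetical; the stub raises the error so the failure path can be exercised locally.

```python
from types import SimpleNamespace


def execute(ctx):
    try:
        result = ctx.agent_completion(
            "support-agent",  # hypothetical agent reference
            messages=ctx.messages,  # keyword name assumed
        )
    except ValueError as exc:
        # A 400 from the agent (e.g. a missing required secret) lands here.
        return {"ok": False, "error": str(exc)}
    return {"ok": True, "result": result}


# Hypothetical stub that simulates the agent rejecting the request.
def _fake_agent_completion(agent, messages=None, **kwargs):
    raise ValueError("missing required secret: CRM_API_KEY")


stub_ctx = SimpleNamespace(
    messages=[{"role": "user", "content": "Where is my order?"}],
    agent_completion=_fake_agent_completion,
)

result = execute(stub_ctx)
print(result)
```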
Call another workflow
Compose workflows by invoking one from another.
- Reference of the workflow to call (optionally ref:tag).
- Structured input for the nested workflow.

The call returns the nested workflow’s result (including output and session_id). The flat alias is ctx.workflow_completion(workflow, input=...).
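A nested call through the flat alias could be sketched as follows. The workflow reference, the input payload, and the assumption that the result is a dict indexed by "output" and "session_id" are illustrative, not confirmed here.

```python
from types import SimpleNamespace


def execute(ctx):
    nested = ctx.workflow_completion(
        "enrich-lead:prod",  # hypothetical workflow reference in ref:tag form
        input={"email": "ada@example.com"},
    )
    # Assumes the result is a dict carrying output and session_id.
    return {
        "company": nested["output"]["company"],
        "nested_session": nested["session_id"],
    }


# Hypothetical stub standing in for the nested workflow run.
def _fake_workflow_completion(workflow, input=None, **kwargs):
    return {"output": {"company": "Analytical Engines Ltd"}, "session_id": "sess_42"}


stub_ctx = SimpleNamespace(workflow_completion=_fake_workflow_completion)

result = execute(stub_ctx)
print(result)
```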
Make HTTP requests
ctx.request(method, url, ...) is a thin, pooled wrapper over httpx. Any keyword httpx accepts (json, data, params, headers, auth, timeout, files, …) is forwarded verbatim.
- retries: Additional attempts on a transport error or a transient status. 0 means a single attempt.
- retry_statuses: Status codes treated as transient and retried when retries > 0.
- raise_for_status: When True, raise on a 4xx/5xx. Default False; branch on status_code/ok instead.

Per-method shortcuts are also available: ctx.get, ctx.post, ctx.put, ctx.patch, ctx.delete, ctx.head, ctx.options. follow_redirects defaults to True. Transport-level failures (DNS, connection refused, timeout) raise after retries are exhausted.
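The sketch below shows one way to combine retries with status-based branching. The URL is a placeholder, passing retry_statuses as a tuple is an assumption, and resp.json() assumes the returned object behaves like an httpx response; the stub only mimics status_code, ok, and json().

```python
from types import SimpleNamespace


def execute(ctx):
    resp = ctx.request(
        "GET",
        "https://api.example.com/v1/orders",  # placeholder URL
        params={"status": "open"},  # forwarded to httpx
        timeout=10.0,  # forwarded to httpx
        retries=2,  # up to two additional attempts
        retry_statuses=(502, 503, 504),  # value type assumed
    )
    # raise_for_status defaults to False, so branch on the status instead.
    if not resp.ok:
        return {"error": f"upstream returned {resp.status_code}"}
    return {"orders": resp.json()}  # json() assumed, as on an httpx response


# Hypothetical stub returning a successful response.
def _fake_request(method, url, **kwargs):
    return SimpleNamespace(status_code=200, ok=True, json=lambda: [{"id": 1}])


stub_ctx = SimpleNamespace(request=_fake_request)

result = execute(stub_ctx)
print(result)
```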
Streaming an HTTP response
ctx.request_stream(method, url, ...) is the streaming sibling of ctx.request. Instead of buffering the whole body, it returns a context-managed handle you iterate incrementally — ideal for forwarding an upstream NDJSON or SSE stream to your caller one event at a time.
The handle exposes status_code, ok, headers, and url before you read the body (so you can branch on status first), plus:
| Method | Returns | Use for |
|---|---|---|
| resp.iter_lines() | Iterator[str] | NDJSON / line-delimited streams |
| resp.iter_bytes(chunk_size=None) | Iterator[bytes] | Binary, SSE, custom framing |
| resp.read() | str | Buffer the remaining body (escape hatch) |

It accepts the same retries / retry_backoff / retry_statuses / raise_for_status options as ctx.request, with one caveat: retries apply only while opening the stream. Once you start iterating the body, a mid-stream failure can’t be replayed and propagates to you. Always use it as a context manager (with ... as resp:); the connection is released on exit, including on early break or an exception.
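Forwarding an upstream NDJSON stream one event at a time could be sketched as below. The URL is a placeholder, passing a parsed dict to ctx.emit is an assumption, and the stub context manager only imitates the handle attributes used here.

```python
import json
from contextlib import contextmanager
from types import SimpleNamespace


def execute(ctx):
    forwarded = 0
    # The context manager releases the connection on exit, even on early return.
    with ctx.request_stream("GET", "https://api.example.com/v1/events") as resp:
        if not resp.ok:  # status is available before the body is read
            return {"error": f"upstream returned {resp.status_code}"}
        for line in resp.iter_lines():
            if not line.strip():
                continue  # skip blank keep-alive lines
            ctx.emit(json.loads(line))  # forward each event to the caller
            forwarded += 1
    return {"forwarded": forwarded}


# Hypothetical stubs so the sketch runs locally.
emitted = []


@contextmanager
def _fake_request_stream(method, url, **kwargs):
    lines = ['{"n": 1}', "", '{"n": 2}']
    yield SimpleNamespace(status_code=200, ok=True, iter_lines=lambda: iter(lines))


stub_ctx = SimpleNamespace(request_stream=_fake_request_stream, emit=emitted.append)

result = execute(stub_ctx)
print(result, emitted)
```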
Stream progress to the caller
ctx.emit(data) pushes an intermediate chunk to the caller. When the workflow was invoked with stream=True, each chunk is written to the response stream immediately; otherwise chunks are buffered and returned with the final result, so the same code works in both modes.
Each chunk is wrapped in a WorkflowChunk (type, content, metadata) and stamped with an epoch-millisecond timestamp so consumers can order and debug events. See the consumer side for how clients read these.
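A minimal sketch of progress reporting follows. The payload keys (step, index, total) are hypothetical, and the stub simply collects chunks in a list, which loosely resembles the buffered mode; it does not reproduce the WorkflowChunk wrapping or timestamps.

```python
from types import SimpleNamespace


def execute(ctx):
    steps = ["fetch", "analyze", "summarize"]
    for index, step in enumerate(steps, start=1):
        # The same call is used whether or not the caller asked for streaming.
        ctx.emit({"step": step, "index": index, "total": len(steps)})
    return {"done": True}


# Hypothetical stub: collects emitted chunks instead of writing to a stream.
emitted = []
stub_ctx = SimpleNamespace(emit=emitted.append)

result = execute(stub_ctx)
print(result, emitted)
```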
Query a datastore
If your workflow has an attached datastore, reach it through ctx.datastore.
- Fetch one full record by id, or pass a list of ids for a batch (missing ids are omitted). Reads from S3 with in-process caching.
- Query the index. Provide vector for KNN semantic search (sorted by similarity), and/or where to filter. full=True hydrates complete records from S3; the default returns only indexed fields. datastore_name is required only when the workflow has more than one datastore.

The where filter accepts a composable Filter (Tag, Text, combined with all_/any_/not_ or the & | ~ operators), a raw RediSearch string, or None (matches everything). Full DSL details live in Datastores & Accessories.
Load an accessory file
Read a bundled reference file by key. Resolution is cache → Redis → S3, with an optional local fallback for development.
Embeddings & web search
- Embed a string or list of strings. Returns the embeddings API response.
- Web search (Perplexity). Returns plain text: concatenated snippets by default, or an LLM-composed answer when system_prompt is given. Returns an empty string if unavailable.

Logging & profiling
- Structured log line. Keyword fields are attached to the log record and rendered inline, so they show up in your observability tooling.
- Record a timing span from the last checkpoint to now. No-op unless profiling is enabled (send X-Profile: true). ctx.profile_span_from_start(label) measures from script start instead. Spans also drive the step boundaries in the flow diagram.

Next
- How a workflow file is laid out: Workflow Structure
- Attach reference data: Datastores & Accessories
- Call your workflow from code: Invoking workflows