# Workflow Context (ctx)

> The runtime API passed to every workflow — LLMs, agents, HTTP, datastores, streaming, and logging.

`execute(ctx)` receives a single argument: the **workflow context**. It's how your script reads its input and reaches the outside world — calling models, agents, and other workflows, making HTTP requests, querying datastores, streaming progress, and logging.

Everything here is synchronous. The methods handle their own network I/O; you call them directly, no `await`.

```python theme={null}
def execute(ctx: WorkflowContext):
    query = ctx.input["query"]
    resp = ctx.chat.completions.create(
        messages=[{"role": "user", "content": query}],
        application="Support", intent="answer",
    )
    return {"answer": resp["choices"][0]["message"]["content"]}
```

## Input & request data

These attributes carry what the caller sent. See [Invoking workflows](/build/workflows/overview) for how they're populated.

<ParamField path="ctx.input" type="Any">
  The structured payload from the request's `input` field — usually a dict. `None` if the caller sent only `messages`. This is the primary input for most workflows.
</ParamField>

<ParamField path="ctx.messages" type="list[dict]">
  Chat messages (OpenAI format) from the request's `messages` field. Use for chat-style workflows.
</ParamField>

<ParamField path="ctx.secrets" type="dict[str, str]">
  Runtime credentials passed via the request's `secrets` field (e.g. third-party API keys). Never stored — available only for the duration of the run.
</ParamField>

<ParamField path="ctx.metadata" type="dict">
  Arbitrary metadata passed by the caller.
</ParamField>

<ParamField path="ctx.env" type="dict">
  Values from the workflow's stored configuration, including accessory S3 keys. Also mirrored into `os.environ`.
</ParamField>

<ParamField path="ctx.params" type="dict">
  Inference params from the request: `max_tokens`, `temperature`, `stream`.
</ParamField>

<ParamField path="ctx.session_id" type="str">
  The session ID for this run. Pass it through to `ctx.chat`/`ctx.agents` calls to correlate every step under one session in Maitai analytics.
</ParamField>
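
Because `ctx.input` is `None` when the caller sends only `messages`, a workflow that accepts both request styles needs a fallback. The sketch below is one way to do that. The helper name `resolve_query` and the `query` key are illustrative assumptions, not part of the ctx API.

```python
def resolve_query(ctx, key="query"):
    """Return the caller's query from ctx.input, else the last user message.

    `resolve_query` and `key` are hypothetical; only ctx.input and
    ctx.messages come from the documented context.
    """
    payload = ctx.input
    if isinstance(payload, dict) and payload.get(key):
        return payload[key]
    # ctx.input is None (or lacks the key): fall back to the chat messages.
    for message in reversed(ctx.messages or []):
        if message.get("role") == "user":
            return message.get("content")
    return None
```

Inside `execute(ctx)` you would call `resolve_query(ctx)` once and return an error dict if it comes back `None`.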

## Call a model

Run an LLM call through Maitai — fully monitored, with your intent's configured model, fallbacks, and Sentinels applied.

```python theme={null}
resp = ctx.chat.completions.create(
    messages=[
        {"role": "system", "content": "You are a financial analyst."},
        {"role": "user", "content": "Summarize Q3 earnings."},
    ],
    application="Finance",      # defaults to the workflow's APPLICATION binding
    intent="summarize",         # required
    session_id=ctx.session_id,
)
content = resp["choices"][0]["message"]["content"]
```

<ParamField path="intent" type="string" required>
  The intent name. Determines which model, prompt config, and Sentinels apply. (`action_type` is accepted as an alias.)
</ParamField>

<ParamField path="application" type="string">
  The application this call belongs to. Defaults to the workflow's bound application; pass explicitly to override. (`application_ref_name` is an alias.)
</ParamField>

<ParamField path="messages" type="list[dict]">
  Chat messages. Defaults to `ctx.messages` if omitted.
</ParamField>

<ParamField path="model" type="string">
  Optional model override. When omitted, the intent's configured model is used.
</ParamField>

<ParamField path="stream" type="boolean">
  When `True`, returns an iterator of chunk dicts instead of a single response. Default `False`.
</ParamField>

Any additional keyword arguments (`temperature`, `max_tokens`, `response_format`, …) are forwarded to the underlying completion. The non-streaming call returns an OpenAI-shaped dict (`choices`, `usage`, `model`, `request_id`, …).

<Note>
  `ctx.chat.completions.create(...)` and the flat alias `ctx.chat_completion(...)` are equivalent. The namespaced form mirrors the OpenAI SDK and is recommended.
</Note>

### Streaming a model call

```python theme={null}
for chunk in ctx.chat.completions.create(
    messages=[{"role": "user", "content": "Write a poem."}],
    application="Content", intent="compose", stream=True,
):
    delta = chunk["choices"][0]["delta"].get("content", "")
    ctx.emit({"type": "token", "content": delta})
```
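
If you also need the full text once the stream ends, you can accumulate the deltas while forwarding them. This is a minimal sketch that assumes the chunks follow the OpenAI-style shape used above and that some chunks may carry no content. The helper name `stream_completion` is hypothetical.

```python
def stream_completion(ctx, chunks):
    """Forward each non-empty delta via ctx.emit and return the joined text."""
    parts = []
    for chunk in chunks:
        choices = chunk.get("choices") or []
        if not choices:
            continue  # assumed: a chunk may arrive without choices
        delta = (choices[0].get("delta") or {}).get("content") or ""
        if delta:
            ctx.emit({"type": "token", "content": delta})
            parts.append(delta)
    return "".join(parts)
```

Pass it the iterator returned by `ctx.chat.completions.create(..., stream=True)`.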

## Call an agent

Invoke a Maitai [Agent](/build/agents/create_agent) — including its full reasoning loop — from inside a workflow.

```python theme={null}
result = ctx.agents.completions.create(
    agent="support-triage",
    messages=[{"role": "user", "content": "My card was declined."}],
    max_iterations=6,
)
```

<ParamField path="agent" type="string" required>
  The agent reference to invoke.
</ParamField>

<ParamField path="messages" type="list[dict]">
  Conversation to send the agent. Defaults to `ctx.messages`.
</ParamField>

<ParamField path="actions / state / secrets / config / max_iterations / response_format" type="—">
  The full agent request surface — same overlays as the [agent SDK](/sdk/agent_call): capability mask (`actions`), form seed (`state`), per-request `secrets`, `config` overlay, iteration cap, and structured `response_format`.
</ParamField>

<ParamField path="model" type="string">
  Optional LLM override, separate from the agent reference.
</ParamField>

<ParamField path="stream" type="boolean">
  Stream the agent's events as an iterator. Default `False`.
</ParamField>

The flat alias is `ctx.agent_completion(agent, ...)`. A `400` from the agent (e.g. a missing required secret) surfaces as a `ValueError` you can catch.
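
Since a rejected agent request surfaces as a `ValueError`, a workflow can catch it and return a structured error instead of failing the run. The sketch below shows one way to do this. The wrapper name `run_agent` and the `ok`/`error`/`result` keys it returns are illustrative choices, not a documented response shape.

```python
def run_agent(ctx, agent, messages, max_iterations=6):
    """Invoke an agent; turn a 400-style ValueError into an error dict."""
    try:
        result = ctx.agents.completions.create(
            agent=agent,
            messages=messages,
            max_iterations=max_iterations,
        )
    except ValueError as exc:
        # e.g. a missing required secret; log it and report to the caller
        ctx.log("agent call rejected", agent=agent, error=str(exc))
        return {"ok": False, "error": str(exc)}
    return {"ok": True, "result": result}
```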

## Call another workflow

Compose workflows by invoking one from another.

```python theme={null}
rate = ctx.workflows.completions.create(
    workflow="hs-rate-line-classifier",
    input={"subheading": subheading, "product": product},
)
```

<ParamField path="workflow" type="string" required>
  Reference of the workflow to call (optionally `ref:tag`).
</ParamField>

<ParamField path="input" type="dict">
  Structured input for the nested workflow.
</ParamField>

Returns the nested workflow's result dict (with `output` and `session_id`). The flat alias is `ctx.workflow_completion(workflow, input=...)`.
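
To classify several items, you can call the nested workflow in a loop and keep only each result's `output`. This is a sketch under the assumption that every call returns the documented dict with an `output` key. The helper name `classify_lines` is hypothetical.

```python
def classify_lines(ctx, lines, workflow="hs-rate-line-classifier"):
    """Call the nested workflow once per input dict and collect each output."""
    outputs = []
    for line in lines:
        result = ctx.workflows.completions.create(workflow=workflow, input=line)
        outputs.append(result["output"])
    return outputs
```

The calls are synchronous, so they run one after another in list order.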

## Make HTTP requests

`ctx.request(method, url, ...)` is a thin, pooled wrapper over [httpx](https://www.python-httpx.org/). Any keyword httpx accepts — `json`, `data`, `params`, `headers`, `auth`, `timeout`, `files`, … — is forwarded verbatim.

```python theme={null}
resp = ctx.post(
    "https://api.example.com/score",
    json={"text": ctx.input["text"]},
    headers={"Authorization": f"Bearer {ctx.secrets['EXAMPLE_KEY']}"},
    retries=2,
)
if resp["ok"]:
    score = resp["json"]["score"]
```

The call returns a plain, JSON-serializable dict:

```python theme={null}
{
    "status_code": 200,
    "ok": True,            # True for 2xx
    "headers": {...},
    "json": {...},         # parsed body, or None if not JSON
    "text": "...",         # raw body text
    "url": "https://...",  # final URL after redirects
    "elapsed_ms": 142.5,
}
```

<ParamField path="retries" type="int" default="0">
  Additional attempts on a transport error or a transient status. `0` means a single attempt.
</ParamField>

<ParamField path="retry_statuses" type="tuple" default="(429, 500, 502, 503, 504)">
  Status codes treated as transient and retried when `retries > 0`.
</ParamField>

<ParamField path="raise_for_status" type="boolean" default="false">
  When `True`, raise on a 4xx/5xx. Default `False` — branch on `status_code`/`ok` instead.
</ParamField>

Verb shorthands exist for every method: `ctx.get`, `ctx.post`, `ctx.put`, `ctx.patch`, `ctx.delete`, `ctx.head`, `ctx.options`. `follow_redirects` defaults to `True`. Transport-level failures (DNS, connection refused, timeout) raise after retries are exhausted.
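
Because `raise_for_status` defaults to `False`, the usual pattern is to branch on the returned dict. The sketch below handles a non-2xx status and a non-JSON body in one place. The helper name `fetch_score` and the shape of its return value are illustrative. The URL and the `score` field come from the example above.

```python
def fetch_score(ctx, text, api_key):
    """POST text to the scoring API and return a score or an error dict."""
    resp = ctx.post(
        "https://api.example.com/score",
        json={"text": text},
        headers={"Authorization": f"Bearer {api_key}"},
        retries=2,
        retry_statuses=(429, 503),  # narrower than the default set
    )
    if resp["ok"] and isinstance(resp["json"], dict):
        return {"score": resp["json"].get("score")}
    # Non-2xx status or non-JSON body: report the status and a short excerpt.
    return {"error": resp["status_code"], "detail": resp["text"][:200]}
```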

### Streaming an HTTP response

`ctx.request_stream(method, url, ...)` is the streaming sibling of `ctx.request`. Instead of buffering the whole body, it returns a context-managed handle you iterate incrementally — ideal for forwarding an upstream NDJSON or SSE stream to your caller one event at a time.

```python theme={null}
import json

with ctx.request_stream("POST", "https://api.example.com/run", json=payload, retries=2) as resp:
    if not resp.ok:
        return {"error": resp.read()}
    for line in resp.iter_lines():
        if line:
            ctx.emit(json.loads(line))   # forward each event live
```

The handle exposes `status_code`, `ok`, `headers`, and `url` **before** you read the body (so you can branch on status first), plus:

| Method                             | Returns           | Use for                                  |
| ---------------------------------- | ----------------- | ---------------------------------------- |
| `resp.iter_lines()`                | `Iterator[str]`   | NDJSON / line-delimited streams          |
| `resp.iter_bytes(chunk_size=None)` | `Iterator[bytes]` | Binary, SSE, custom framing              |
| `resp.read()`                      | `str`             | Buffer the remaining body (escape hatch) |

It accepts the same `retries` / `retry_backoff` / `retry_statuses` / `raise_for_status` options as `ctx.request`, with one caveat: **retries apply only while opening the stream.** Once you start iterating the body, a mid-stream failure can't be replayed and propagates to you. Always use it as a context manager (`with ... as resp:`) — the connection is released on exit, including on early `break` or an exception.
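
An upstream NDJSON stream can contain blank keep-alive lines or an occasional malformed line. One possible way to guard against that is a small generator placed between `resp.iter_lines()` and `ctx.emit`. The name `iter_events` is hypothetical, and silently skipping bad lines is a design choice you may not want.

```python
import json


def iter_events(lines):
    """Yield one dict per NDJSON line, skipping blank and malformed lines."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # skipped here; raising instead is equally reasonable
        if isinstance(event, dict):
            yield event
```

Inside the `with` block you would then write `for event in iter_events(resp.iter_lines()): ctx.emit(event)`.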

## Stream progress to the caller

<a id="ctx-emit" />

`ctx.emit(data)` pushes an intermediate chunk to the caller. When the workflow was invoked with `stream=True`, each chunk is written to the response stream immediately; otherwise chunks are buffered and returned with the final result — so the same code works in both modes.

```python theme={null}
ctx.emit({"type": "status", "content": "Step 2 of 3: scoring candidates"})
```

Pass any dict. It's wrapped as a `WorkflowChunk` (`type`, `content`, `metadata`) and stamped with an epoch-millisecond `timestamp` so consumers can order and debug events. See the [consumer side](/build/workflows/overview#streaming-responses) for how clients read these.
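
For multi-step workflows, a small runner can emit a status chunk before each step so callers see progress in either mode. This is a sketch only. The helper `run_steps` and the `(label, callable)` convention are assumptions made for illustration.

```python
def run_steps(ctx, steps):
    """Run (label, callable) pairs in order, emitting a status chunk before each."""
    results = {}
    total = len(steps)
    for index, (label, step) in enumerate(steps, start=1):
        ctx.emit({"type": "status", "content": f"Step {index} of {total}: {label}"})
        results[label] = step()
    return results
```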

## Query a datastore

If your workflow has an attached [datastore](/build/workflows/data#datastores), reach it through `ctx.datastore`.

```python theme={null}
from maitai_workflow import Tag, Text, all_, any_, not_

# Fetch a full record by id
ruling = ctx.datastore.get("ruling-12345")

# Vector (semantic) search
hits = ctx.datastore.search(vector=embedding, k=10)

# Keyword / hybrid search with the filter DSL
hits = ctx.datastore.search(
    where=all_(
        Tag("hts_codes", ["6202"], prefix=True),
        any_(Text("leather boots"), Tag("keywords", ["leather"])),
        not_(Tag("keywords", ["children"])),
    ),
    k=20,
)
```

<ParamField path="ctx.datastore.get(record_id, datastore_name=None)" type="dict | list[dict] | None">
  Fetch one full record by id, or pass a list of ids for a batch (missing ids are omitted). Reads from S3 with in-process caching.
</ParamField>

<ParamField path="ctx.datastore.search(vector=None, where=None, k=10, datastore_name=None, full=False)" type="list[dict]">
  Query the index. Provide `vector` for KNN semantic search (sorted by similarity), and/or `where` to filter. `full=True` hydrates complete records from S3; the default returns only indexed fields. `datastore_name` is required only when the workflow has more than one datastore.
</ParamField>

The `where` filter accepts a composable `Filter` (`Tag`, `Text`, combined with `all_`/`any_`/`not_` or the `&` `|` `~` operators), a raw RediSearch string, or `None` (matches everything). Full DSL details live in [Datastores & Accessories](/build/workflows/data#querying).
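
When the filter DSL is not convenient, `where` also takes a raw RediSearch string, and it can be combined with `vector`. The sketch below assumes the index has a tag field named `keywords`, as in the example above. The helper name `search_rulings` is hypothetical, and the keyword is not escaped, so it is only safe for simple alphanumeric values.

```python
def search_rulings(ctx, embedding, keyword, k=10):
    """KNN search restricted to records tagged with `keyword`, fully hydrated."""
    return ctx.datastore.search(
        vector=embedding,
        where=f"@keywords:{{{keyword}}}",  # renders as @keywords:{leather}
        k=k,
        full=True,  # hydrate complete records from S3
    )
```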

## Load an accessory file

Read a [bundled reference file](/build/workflows/data#accessory-files) by key. Resolution is cache → Redis → S3, with an optional local fallback for development.

```python theme={null}
hts_codes = ctx.load_accessory("hts_codes", fallback_path="./hts_codes.json")
```

## Embeddings & web search

```python theme={null}
vecs = ctx.embeddings.create(input=["text one", "text two"])
answer = ctx.web_search("latest USTR tariff announcements")
```

<ParamField path="ctx.embeddings.create(input, model=None)" type="dict">
  Embed a string or list of strings. Returns the embeddings API response.
</ParamField>

<ParamField path="ctx.web_search(query, system_prompt=None)" type="str">
  Web search (Perplexity). Returns plain text — concatenated snippets by default, or an LLM-composed answer when `system_prompt` is given. Returns an empty string if unavailable.
</ParamField>
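
Because `ctx.web_search` returns an empty string when search is unavailable, it is worth checking the result before passing it to a model. This is a minimal sketch. The helper name `search_with_fallback` and the fallback text are illustrative assumptions.

```python
def search_with_fallback(ctx, query, fallback="No web results available."):
    """Return web search text, or a fallback string when the search is empty."""
    answer = ctx.web_search(query)
    if not answer.strip():
        ctx.log("web search returned nothing", query=query)
        return fallback
    return answer
```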

## Logging & profiling

```python theme={null}
ctx.log("scoring complete", candidates=len(hits), top_score=hits[0]["score"] if hits else None)

ctx.profile_span("retrieval")     # time since the previous span, or script start for the first one
ctx.profile_span("generation")
```

<ParamField path="ctx.log(msg, **fields)" type="None">
  Structured log line. Keyword fields are attached to the log record and rendered inline, so they show up in your observability tooling.
</ParamField>

<ParamField path="ctx.profile_span(label)" type="None">
  Record a timing span from the last checkpoint to now. No-op unless profiling is enabled (send `X-Profile: true`). `ctx.profile_span_from_start(label)` measures from script start instead. Spans also drive the **step** boundaries in the [flow diagram](/build/workflows/overview).
</ParamField>
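
Logging and profiling calls often sit next to each other at the end of a step. The sketch below wraps both and guards against an empty result list. The helper name `log_top_hit` is hypothetical, and it assumes each hit may carry a `score` field.

```python
def log_top_hit(ctx, hits, label="scoring"):
    """Log the candidate count and best score, then close a profiling span."""
    top_score = hits[0].get("score") if hits else None
    ctx.log("scoring complete", candidates=len(hits), top_score=top_score)
    ctx.profile_span(label)  # no-op unless profiling is enabled
    return top_score
```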

## Next

* How a workflow file is laid out: [Workflow Structure](/build/workflows/structure)
* Attach reference data: [Datastores & Accessories](/build/workflows/data)
* Call your workflow from code: [Invoking workflows](/build/workflows/overview)
