Skip to main content
A Maitai workflow is a single Python file. Maitai loads it into the workflow worker pool, calls one function — execute(ctx) — and returns whatever that function returns. Everything you need at runtime arrives on the ctx object: input, secrets, LLM access, HTTP, datastores, and more. Here is a complete, minimal workflow:
from maitai_workflow import WorkflowContext

APPLICATION = "My Application"

INPUT_SCHEMA = {
    "query": {"type": "string", "description": "The user's question", "required": True},
    "context": {"type": "string", "description": "Optional background context"},
}

OUTPUT_SCHEMA = {
    "answer": {"type": "string", "description": "The generated answer"},
    "confidence": {"type": "number", "description": "Confidence between 0 and 1"},
}


def execute(ctx: WorkflowContext):
    inp = ctx.input or {}
    query = inp.get("query", "")
    if not query:
        return {"error": "query is required"}

    resp = ctx.chat.completions.create(
        messages=[{"role": "user", "content": query}],
        application=APPLICATION,
        intent="answer_query",
    )
    answer = resp["choices"][0]["message"]["content"]

    return {"answer": answer, "confidence": 0.85}
The repo ships this as a copy-paste starting point at workflows/workflow.example.py. You can also fetch the latest template from the API at GET /workflows/default-script.

The execute(ctx) entrypoint

Every workflow must define exactly one top-level execute function.
  • Required and unique. There must be precisely one module-level def execute(...). Upload validation rejects a script with zero or multiple.
  • Synchronous. Write def execute(ctx):, not async def. The worker runs your function on a thread — use ctx’s synchronous methods (they handle their own I/O) rather than await.
  • Takes the context. It receives a single argument, the WorkflowContext, conventionally named ctx.
  • Returns a value. Return your result — typically a dict matching OUTPUT_SCHEMA. A bare return or return None is rejected at upload. The returned object is exposed verbatim as response.output, and a JSON-stringified copy becomes choices[0].message.content for OpenAI-shaped consumers.
You can define as many helper functions, constants, and imports at module level as you like — only execute is special. Keep helpers as plain top-level defs; the flow graph inlines them when visualizing your workflow.
SYSTEM_PROMPT = "You are a careful financial analyst."

def _build_messages(query: str) -> list:
    return [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": query},
    ]

def execute(ctx: WorkflowContext):
    resp = ctx.chat.completions.create(
        messages=_build_messages(ctx.input["query"]),
        application="Finance", intent="analyze",
    )
    return {"answer": resp["choices"][0]["message"]["content"]}

The WorkflowContext import

from maitai_workflow import WorkflowContext
If you reference WorkflowContext anywhere — most commonly as the type hint on execute(ctx: WorkflowContext) — you must import it, or the script fails at runtime with a NameError. Upload validation flags a missing import. The import is also what makes editor autocomplete work for the full ctx surface.

Input and output schemas

INPUT_SCHEMA and OUTPUT_SCHEMA are optional module-level dicts that describe the shape of ctx.input and your return value.
INPUT_SCHEMA = {
    "document_url": {"type": "string", "description": "URL of the PDF to process", "required": True},
    "output_format": {"type": "string", "description": "markdown | html"},
    "max_pages": {"type": "number", "description": "Cap on pages to read"},
}
  • Format. A dict mapping each field name to {type, description, required?}.
  • Types. "string", "number", "boolean", "object", "array".
  • Required fields. Add "required": True. Omitting the key (or False) marks the field optional.
Schemas are informational, not enforced at runtime. They power the Portal — describing what ctx.input expects and what your output contains, and rendering the Input and Output nodes in the flow diagram. Validate inputs you actually depend on inside execute.
The upload step extracts these dicts statically (via AST), so they must be literal dicts at module level — don’t compute them at import time or build them inside a function.

Recognized constants

ConstantPurpose
APPLICATIONThe application this workflow’s LLM calls belong to. Used as the default application for ctx.chat.completions.create(...) when you pass it explicitly, and it documents intent. Binding is set at upload via --application-name / --application-id (see Uploading).
INPUT_SCHEMAInput shape (above).
OUTPUT_SCHEMAOutput shape (above).
Any other module-level constants (like SYSTEM_PROMPT above) are yours to use freely — they aren’t special to Maitai.

Streaming output

If callers invoke your workflow with stream=True, you can push intermediate events to them as the script runs by calling ctx.emit(...). When not streaming, emitted chunks are buffered and returned with the final response, so the same code works either way.
def execute(ctx: WorkflowContext):
    ctx.emit({"type": "status", "content": "fetching document"})
    doc = ctx.get(ctx.input["document_url"])

    ctx.emit({"type": "status", "content": "summarizing"})
    resp = ctx.chat.completions.create(
        messages=[{"role": "user", "content": doc["text"]}],
        application="Docs", intent="summarize",
    )
    return {"summary": resp["choices"][0]["message"]["content"]}
See Invoking workflows — streaming responses for the consumer side.

transform_session (optional)

Define transform_session(session) if you want to convert recorded Maitai sessions into test set items for this workflow. It takes a session and returns the input/output pair that execute would have seen:
def transform_session(session):
    workflow_input, workflow_output = {}, {"content": ""}
    if session.timeline:
        first = session.timeline[0]
        if first.request and first.request.params:
            for msg in reversed(first.request.params.messages):
                if msg.role == "user" and msg.content:
                    workflow_input = {"query": msg.content}
                    break
        last = session.timeline[-1]
        if last.response and last.response.choices:
            workflow_output = {"content": last.response.choices[0].message.content or ""}
    return {"input": workflow_input, "output": workflow_output}
The return value must be a dict with exactly two keys, "input" and "output" (where "output" is itself a dict). When present, the Portal lets you import sessions into a test set; when absent, that option is simply hidden.

Next