Workflows often need reference data: a lookup table, a prompt library, a corpus of documents to search. Maitai offers two mechanisms, and the right one depends on size and access pattern.
| | Accessory file | Datastore |
|---|---|---|
| What it is | One read-only JSON blob bundled with the script | An S3-backed, indexed record store |
| You read it with | ctx.load_accessory("key") | ctx.datastore.get(...) / ctx.datastore.search(...) |
| Access pattern | Load the whole file into memory | Query by id, keyword, or vector similarity |
| Best for | Small lookup tables, enums, config, prompt sets | Large corpora, semantic/keyword search, retrieval |
| Size | Up to ~50 MB per file | Effectively unlimited |
A rule of thumb: if you’ll hold the entire file in memory and index into it yourself, use an accessory. If you need to search or selectively fetch from a large collection, use a datastore.
Accessory files
An accessory is a single JSON file you upload alongside your workflow under a short key. At runtime, ctx.load_accessory(key) returns its parsed contents.
def execute(ctx: WorkflowContext):
    # Loaded once, then cached in-process for the worker's lifetime and in Redis (24h TTL).
    hts_codes = ctx.load_accessory("hts_codes")
    # fallback_path is only read when no S3 key is configured (e.g. local development).
    notes = ctx.load_accessory("hts_notes", fallback_path="./hts_notes.json")
    ...
How the key maps to storage. When you upload an accessory under key hts_codes, Maitai stores the file in S3 and sets an env var HTS_CODES_S3_KEY in the workflow’s config. load_accessory("hts_codes") reads that env var to locate the object. The key is uppercased and hyphens become underscores (hts-codes → HTS_CODES_S3_KEY), so keep your load_accessory key identical to the upload key.
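The naming rule above can be sketched in a few lines. The function name accessory_env_var is hypothetical and is not part of the Maitai SDK; it only mirrors the rule as described here.

```python
def accessory_env_var(key: str) -> str:
    """Return the env var name that would hold the S3 key for an accessory.

    Hypothetical helper: mirrors the documented rule, not Maitai's own code.
    """
    # Uppercase the key, turn hyphens into underscores, append the suffix.
    return key.upper().replace("-", "_") + "_S3_KEY"


print(accessory_env_var("hts-codes"))
print(accessory_env_var("hts_codes"))
```

Under this rule hts-codes and hts_codes resolve to the same variable, so it seems safest not to use two keys that differ only in hyphen versus underscore.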
Resolution order. load_accessory checks, in order: in-process cache → Redis (24h TTL) → S3 (then writes back to Redis). If no S3 key is configured and you passed fallback_path, it loads that local file — convenient for local development before the accessory is uploaded.
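As a rough illustration of that lookup order, here is a self-contained sketch that uses a plain dict in place of Redis and a callable in place of S3. Everything in it (load_accessory_sketch, the argument names, using the S3 key as the Redis key, caching the fallback file in-process) is an assumption made for the example, not Maitai's actual implementation.

```python
import json
from pathlib import Path
from typing import Any, Callable, Dict, Optional

# Stand-in for the worker's in-process cache.
_process_cache: Dict[str, Any] = {}


def load_accessory_sketch(
    key: str,
    s3_key: Optional[str],
    redis: Dict[str, str],
    fetch_s3: Callable[[str], str],
    fallback_path: Optional[str] = None,
) -> Any:
    """Resolve an accessory: process cache, then Redis, then S3, else a local file."""
    if key in _process_cache:              # 1. in-process cache
        return _process_cache[key]

    if s3_key is not None:
        raw = redis.get(s3_key)            # 2. Redis (a dict stands in; no TTL here)
        if raw is None:
            raw = fetch_s3(s3_key)         # 3. S3 ...
            redis[s3_key] = raw            #    ... then write back to Redis
        value = json.loads(raw)
    elif fallback_path is not None:
        # No S3 key configured: read the local development copy.
        value = json.loads(Path(fallback_path).read_text(encoding="utf-8"))
    else:
        raise KeyError(f"accessory {key!r} has no S3 key and no fallback_path")

    _process_cache[key] = value
    return value


# Usage with fakes: the second call is served from the in-process cache.
s3_calls = []


def fake_s3(object_key: str) -> str:
    s3_calls.append(object_key)
    return '{"A": 1}'


fake_redis: Dict[str, str] = {}
first = load_accessory_sketch("hts_codes", "accessories/hts_codes.json", fake_redis, fake_s3)
second = load_accessory_sketch("hts_codes", "accessories/hts_codes.json", fake_redis, fake_s3)
print(first == second, len(s3_calls))
```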
Limits. Up to ~50 MB per accessory and 100 accessories per workflow. Keys are lowercase alphanumeric plus _/-. Files must be valid UTF-8 JSON.
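These limits are easy to check locally before uploading. The sketch below is one possible pre-flight check; check_accessory is a hypothetical helper, and treating "~50 MB" as exactly 50 * 1024 * 1024 bytes is an assumption.

```python
import json
import re
from typing import List

# Assumption: "~50 MB" is treated as 50 MiB for the purpose of this check.
MAX_ACCESSORY_BYTES = 50 * 1024 * 1024
# Lowercase alphanumeric plus "_" and "-".
KEY_PATTERN = re.compile(r"[a-z0-9_-]+")


def check_accessory(key: str, payload: bytes) -> List[str]:
    """Return a list of problems; an empty list means the file looks uploadable."""
    problems = []
    if not KEY_PATTERN.fullmatch(key):
        problems.append("key must be lowercase alphanumeric plus '_' or '-'")
    if len(payload) > MAX_ACCESSORY_BYTES:
        problems.append("file is larger than ~50 MB")
    try:
        json.loads(payload.decode("utf-8"))
    except UnicodeDecodeError:
        problems.append("file is not valid UTF-8")
    except json.JSONDecodeError:
        problems.append("file is not valid JSON")
    return problems


print(check_accessory("hts_codes", b'{"a": 1}'))
print(check_accessory("HTS Codes", b"{oops"))
```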
See Uploading a workflow for the --accessory flag.
Datastores
A datastore is a queryable collection of JSON records. You define which fields to index and how; Maitai stores the raw records in S3 and builds a search index over the indexed fields. At runtime your workflow queries it through ctx.datastore — by record id, by keyword/tag filter, or by vector similarity — without managing any of the underlying infrastructure.
Each workflow has at most one datastore. (Need several datasets? Split them across workflows and compose with nested calls.)
Schema
A datastore is defined by a small schema: which field is the record key, where the raw records live, and how each indexed field is treated.
record_key: ruling_id                      # primary id field on each record
records_prefix: avalara/rulings/enriched   # S3 prefix holding the raw JSON records
fields:
  ruling_id:
    type: tag
  hts_codes:
    type: tag
    separator: ","                         # multi-value tag field
  product_short:
    type: text                             # full-text searchable
  summary:
    type: text
  embedding:
    type: vector                           # dense-vector similarity search
    dim: 1536
    algorithm: HNSW                        # HNSW | FLAT
    distance_metric: COSINE                # COSINE | L2 | IP
Field types:
| Type | Query with | Notes |
|---|---|---|
| tag | Tag(field, [...]) | Exact or prefix match; OR across values. separator enables multi-value fields. |
| text | Text("...") | Tokenized full-text search. |
| numeric | Raw(...) | Numeric range filters via the raw escape hatch. |
| vector | search(vector=...) | KNN similarity. Requires dim; algorithm (HNSW/FLAT) and distance_metric (COSINE/L2/IP) optional. |
Only the fields you list are indexed; ctx.datastore.get always returns the complete record from S3.
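The field rules above lend themselves to a quick sanity check before a schema is submitted. The following is a minimal sketch, assuming the schema has already been parsed into a dict; check_schema is a hypothetical helper, and treating record_key and records_prefix as required is an assumption based on the example schema.

```python
from typing import Any, Dict, List

FIELD_TYPES = {"tag", "text", "numeric", "vector"}
ALGORITHMS = {"HNSW", "FLAT"}
METRICS = {"COSINE", "L2", "IP"}


def check_schema(schema: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in a parsed datastore schema."""
    problems = []
    # Assumption: both top-level entries are mandatory.
    for required in ("record_key", "records_prefix"):
        if not schema.get(required):
            problems.append(f"missing {required}")
    for name, spec in schema.get("fields", {}).items():
        ftype = spec.get("type")
        if ftype not in FIELD_TYPES:
            problems.append(f"{name}: unknown type {ftype!r}")
            continue
        if ftype == "vector":
            dim = spec.get("dim")
            if not isinstance(dim, int) or dim <= 0:
                problems.append(f"{name}: vector fields require a positive dim")
            if "algorithm" in spec and spec["algorithm"] not in ALGORITHMS:
                problems.append(f"{name}: algorithm must be HNSW or FLAT")
            if "distance_metric" in spec and spec["distance_metric"] not in METRICS:
                problems.append(f"{name}: distance_metric must be COSINE, L2 or IP")
    return problems


example = {
    "record_key": "ruling_id",
    "records_prefix": "avalara/rulings/enriched",
    "fields": {
        "ruling_id": {"type": "tag"},
        "hts_codes": {"type": "tag", "separator": ","},
        "summary": {"type": "text"},
        "embedding": {"type": "vector", "dim": 1536, "algorithm": "HNSW"},
    },
}
print(check_schema(example))
```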
Querying
Import the filter helpers from maitai_workflow and pass a filter as where. Helpers compose with all_/any_/not_ (or the & | ~ operators).
from maitai_workflow import Tag, Text, Raw, all_, any_, not_
# Exact id fetch (full record from S3)
record = ctx.datastore.get("ruling-12345")
# Semantic search: pass an embedding vector
hits = ctx.datastore.search(vector=query_embedding, k=10)
# Keyword search with a tag filter
hits = ctx.datastore.search(where=Tag("keywords", ["leather", "boots"]), k=20)
# Hybrid: vector + structured filter
hits = ctx.datastore.search(
    vector=query_embedding,
    where=all_(
        Tag("hts_codes", ["6202"], prefix=True),  # prefix match
        any_(Text("leather boots"), Tag("keywords", ["leather"])),
        not_(Tag("keywords", ["children"])),
    ),
    k=10,
)
# Power user: raw RediSearch (phrases, wildcards, numeric ranges)
hits = ctx.datastore.search(where=Text('"steel toe" leather*', op="raw"), k=20)
Tag(field, values, prefix=False) — match a tag field against one or more values (OR). prefix=True matches by prefix ("6202" → 6202.10.4080, …).
Text(terms, field=None, op="any") — full-text search. op="any" (OR, recall), op="all" (AND, precision), or op="raw" for verbatim RediSearch (phrases, leather*, etc.).
Raw(expr) — drop in a literal RediSearch fragment for anything the DSL doesn’t model (numeric/geo ranges).
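To make the OR/AND/prefix semantics concrete, the sketch below shows how filters like these could be written as RediSearch query strings. It is not the maitai_workflow implementation: tag_query and text_query are hypothetical names, and the real helpers may escape or group terms differently. It relies only on standard RediSearch syntax, where `|` means OR, a space means AND, `@field:{...}` targets a tag field, and a trailing `*` requests a prefix match.

```python
import re
from typing import List


def _escape_tag(value: str) -> str:
    # Punctuation and spaces inside a tag value must be backslash-escaped.
    return re.sub(r"([^A-Za-z0-9_])", r"\\\1", value)


def tag_query(field: str, values: List[str], prefix: bool = False) -> str:
    """OR across values on one tag field; optional prefix match."""
    suffix = "*" if prefix else ""
    alternatives = "|".join(_escape_tag(v) + suffix for v in values)
    return "@%s:{%s}" % (field, alternatives)


def text_query(terms: str, op: str = "any") -> str:
    """Full-text query: 'any' is OR, 'all' is AND, 'raw' is passed through."""
    if op == "raw":
        return terms
    if op == "any":
        return "(" + "|".join(terms.split()) + ")"
    if op == "all":
        return "(" + " ".join(terms.split()) + ")"
    raise ValueError(f"unknown op {op!r}")


print(tag_query("hts_codes", ["6202"], prefix=True))
print(tag_query("keywords", ["leather", "boots"]))
print(text_query("leather boots"))
print(text_query("leather boots", op="all"))
print(text_query('"steel toe" leather*', op="raw"))
```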
search returns only the indexed fields by default; pass full=True to hydrate the complete records from S3 (slower). With a vector, results are sorted by similarity; without one, by text relevance.
Creating a datastore
- Define the schema for the workflow (a YAML/JSON document like above):
  POST /workflows/{workflow_id}/datastores/{name}
- Upload records (a JSON array of records, or an S3 URI to ingest):
  POST /workflows/{workflow_id}/datastores/{name}/upload
Maitai stores the records, computes/loads the indexed fields, and builds the search index in the background. Once ready, workers serve ctx.datastore queries against it.
Manage datastores with GET /workflows/{workflow_id}/datastores and DELETE /workflows/{workflow_id}/datastores/{name}.
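The four endpoints above can be exercised from any HTTP client. The sketch below only builds the requests with Python's standard library and does not send them, because several details are not specified here and are assumptions: the base URL (a placeholder), the workflow id and datastore name, sending the schema and records as JSON bodies, and the absence of authentication headers.

```python
import json
import urllib.request
from typing import Any, Optional

BASE_URL = "https://maitai.example.invalid"  # placeholder, not a real endpoint


def build_request(method: str, path: str, body: Optional[Any] = None) -> urllib.request.Request:
    """Build (but do not send) a request, JSON-encoding the body if given."""
    data = None if body is None else json.dumps(body).encode("utf-8")
    headers = {} if data is None else {"Content-Type": "application/json"}
    return urllib.request.Request(BASE_URL + path, data=data, headers=headers, method=method)


workflow_id, name = 42, "rulings"  # hypothetical values
schema = {
    "record_key": "ruling_id",
    "records_prefix": "avalara/rulings/enriched",
    "fields": {"ruling_id": {"type": "tag"}, "summary": {"type": "text"}},
}
records = [{"ruling_id": "ruling-12345", "summary": "leather boots"}]

create = build_request("POST", f"/workflows/{workflow_id}/datastores/{name}", schema)
upload = build_request("POST", f"/workflows/{workflow_id}/datastores/{name}/upload", records)
listing = build_request("GET", f"/workflows/{workflow_id}/datastores")
delete = build_request("DELETE", f"/workflows/{workflow_id}/datastores/{name}")

for req in (create, upload, listing, delete):
    print(req.get_method(), req.full_url)
```

Once the real base URL and credentials are known, each request could be sent with urllib.request.urlopen(req).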
Uploading a workflow
Register a workflow — and bundle its accessories — with the upload script, run from the repository root:
python scripts/db_ops/upload_workflow.py <company_id> <workflow_ref_name> <script_path> [options]
This uploads the script to S3, upserts the workflow row (timeout, execution mode, application binding), and uploads any accessory files next to the script. Callers then invoke it as model="workflow:<workflow_ref_name>".
# Minimal
python scripts/db_ops/upload_workflow.py 22 my-workflow ./workflows/my_workflow.py
# Bind to an application (created if missing) and bundle two accessories
python scripts/db_ops/upload_workflow.py 2 hs-classification ./workflows/avalara/hs_classification.py \
  --application-name "HS Classification" \
  --accessory hts_codes=./workflows/avalara/hts_codes_v2.json \
  --accessory hts_notes=./workflows/avalara/hts_notes.json
| Option | Meaning |
|---|---|
| --name | Display name (defaults to a title-cased ref name). |
| --desc | Description. |
| --timeout | Worker timeout in seconds (default 600). |
| --mode | Execution mode: sync, async, or stream (default sync). |
| --application-name | Bind to an application by name, creating it if needed. Don’t combine with --application-id. |
| --application-id | Bind to an existing application id (its company is used; company_id may then be omitted). |
| --accessory KEY=path | Repeatable. Uploads the file and exposes it to ctx.load_accessory("KEY") via {KEY_UPPER}_S3_KEY. |
| --publish-version | After upsert, publish an immutable version (like the Portal’s Publish). |
| --version-notes | Notes for the published version. |
| --version-bump | major / minor / patch (default minor) when publishing. |
Use company_id = -1 for a global workflow. Run from maitai-backend/ so relative paths like ./workflows/... resolve, with AWS + database credentials configured for your target environment.
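If uploads are scripted (for example from CI), the command line can be assembled programmatically. The helper below is a hypothetical convenience wrapper, not something shipped with the upload script; it only uses the positional arguments and flags listed above and returns an argv list that could be handed to subprocess.run.

```python
from typing import Dict, List, Optional


def build_upload_command(
    company_id: int,
    ref_name: str,
    script_path: str,
    accessories: Optional[Dict[str, str]] = None,
    application_name: Optional[str] = None,
    application_id: Optional[int] = None,
    mode: Optional[str] = None,
    timeout: Optional[int] = None,
) -> List[str]:
    """Assemble the argv for scripts/db_ops/upload_workflow.py."""
    if application_name is not None and application_id is not None:
        raise ValueError("use --application-name or --application-id, not both")
    if mode is not None and mode not in ("sync", "async", "stream"):
        raise ValueError(f"unknown mode {mode!r}")

    cmd = ["python", "scripts/db_ops/upload_workflow.py",
           str(company_id), ref_name, script_path]
    if application_name is not None:
        cmd += ["--application-name", application_name]
    if application_id is not None:
        cmd += ["--application-id", str(application_id)]
    if mode is not None:
        cmd += ["--mode", mode]
    if timeout is not None:
        cmd += ["--timeout", str(timeout)]
    # --accessory is repeatable: one KEY=path pair per flag.
    for key, path in (accessories or {}).items():
        cmd += ["--accessory", f"{key}={path}"]
    return cmd


cmd = build_upload_command(
    2,
    "hs-classification",
    "./workflows/avalara/hs_classification.py",
    accessories={"hts_codes": "./workflows/avalara/hts_codes_v2.json"},
    application_name="HS Classification",
)
print(" ".join(cmd))
```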
Execution modes
| Mode | Behavior |
|---|---|
| sync | Caller blocks until the workflow finishes; the response carries the final output. The default. |
| async | The run is queued; the call returns immediately and you poll for the result. |
| stream | Intermediate chunks (from ctx.emit) stream to the caller, followed by the final completion. |
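To picture what stream mode delivers, here is a small sketch that runs a workflow body against a stand-in context. RecordingContext is hypothetical, and two things are assumptions rather than documented behavior: that ctx.emit takes a single chunk argument, and that the value returned by execute becomes the final output.

```python
from typing import Any, List


class RecordingContext:
    """Stand-in for WorkflowContext that only records emitted chunks."""

    def __init__(self) -> None:
        self.chunks: List[Any] = []

    def emit(self, chunk: Any) -> None:
        # Assumed signature: one positional chunk per call.
        self.chunks.append(chunk)


def execute(ctx: RecordingContext) -> str:
    # Emit progress as the work advances, then return the final result.
    for step in ("loading codes", "ranking candidates"):
        ctx.emit(step)
    return "final answer"


ctx = RecordingContext()
result = execute(ctx)
print(ctx.chunks, result)
```

In stream mode a caller would presumably see the two emitted chunks in order and then the final completion; in sync mode only the final output is described as reaching the caller.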
Next