Workflows often need reference data: a lookup table, a prompt library, a corpus of documents to search. Maitai offers two mechanisms, and the right one depends on size and access pattern.
| | Accessory file | Datastore |
|---|---|---|
| What it is | One read-only JSON blob bundled with the script | An S3-backed, indexed record store |
| You read it with | ctx.load_accessory("key") | ctx.datastore.get(...) / ctx.datastore.search(...) |
| Access pattern | Load the whole file into memory | Query by id, keyword, or vector similarity |
| Best for | Small lookup tables, enums, config, prompt sets | Large corpora, semantic/keyword search, retrieval |
| Size | Up to ~50 MB per file | Effectively unlimited |
A rule of thumb: if you’ll hold the entire file in memory and index into it yourself, use an accessory. If you need to search or selectively fetch from a large collection, use a datastore.

Accessory files

An accessory is a single JSON file you upload alongside your workflow under a short key. At runtime, ctx.load_accessory(key) returns its parsed contents.
def execute(ctx: WorkflowContext):
    # Loaded once, then cached in-process for the worker's lifetime and in Redis (24h TTL).
    hts_codes = ctx.load_accessory("hts_codes")
    notes = ctx.load_accessory("hts_notes", fallback_path="./hts_notes.json")
    ...
How the key maps to storage. When you upload an accessory under key hts_codes, Maitai stores the file in S3 and sets an env var HTS_CODES_S3_KEY in the workflow’s config. load_accessory("hts_codes") reads that env var to locate the object. The key is uppercased and hyphens become underscores (hts-codes → HTS_CODES_S3_KEY), so keep your load_accessory key identical to the upload key.

Resolution order. load_accessory checks, in order: in-process cache → Redis (24h TTL) → S3 (then writes back to Redis). If no S3 key is configured and you passed fallback_path, it loads that local file instead, which is convenient for local development before the accessory is uploaded.

Limits. Up to ~50 MB per accessory and 100 accessories per workflow. Keys are lowercase alphanumeric plus _/-. Files must be valid UTF-8 JSON. See Uploading a workflow for the --accessory flag.
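The key mapping and lookup order described above can be modeled roughly as follows. This is an illustrative sketch, not Maitai's implementation: env_var_for_key, load_tiered, the two dict-backed caches (standing in for the in-process cache and Redis), and the fetch_remote callable (standing in for the S3 read) are all hypothetical names.

```python
import json
import os
from typing import Any, Callable, Optional


def env_var_for_key(key: str) -> str:
    """Derive the env var name for an accessory key: uppercase, '-' becomes '_'."""
    return key.upper().replace("-", "_") + "_S3_KEY"


def load_tiered(
    key: str,
    local_cache: dict,
    shared_cache: dict,
    fetch_remote: Callable[[str], str],
    fallback_path: Optional[str] = None,
) -> Any:
    """Hypothetical lookup order: in-process cache, shared cache, remote, local fallback."""
    if key in local_cache:
        return local_cache[key]

    if key in shared_cache:
        value = json.loads(shared_cache[key])
    else:
        object_key = os.environ.get(env_var_for_key(key))
        if object_key is not None:
            raw = fetch_remote(object_key)
            shared_cache[key] = raw  # write back so later lookups skip the remote read
            value = json.loads(raw)
        elif fallback_path is not None:
            # No storage key configured: fall back to a local file (local development).
            with open(fallback_path, encoding="utf-8") as handle:
                value = json.load(handle)
        else:
            raise KeyError(f"no storage key or fallback configured for {key!r}")

    local_cache[key] = value
    return value
```

Because the env var name is derived from the key, hts-codes and hts_codes both resolve to HTS_CODES_S3_KEY in this model, which is one reason to keep upload and load keys identical.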

Datastores

A datastore is a queryable collection of JSON records. You define which fields to index and how; Maitai stores the raw records in S3 and builds a search index over the indexed fields. At runtime your workflow queries it through ctx.datastore — by record id, by keyword/tag filter, or by vector similarity — without managing any of the underlying infrastructure. Each workflow has at most one datastore. (Need several datasets? Split them across workflows and compose with nested calls.)

Schema

A datastore is defined by a small schema: which field is the record key, where the raw records live, and how each indexed field is treated.
record_key: ruling_id                     # primary id field on each record
records_prefix: avalara/rulings/enriched  # S3 prefix holding the raw JSON records
fields:
  ruling_id:
    type: tag
  hts_codes:
    type: tag
    separator: ","      # multi-value tag field
  product_short:
    type: text          # full-text searchable
  summary:
    type: text
  embedding:
    type: vector        # dense-vector similarity search
    dim: 1536
    algorithm: HNSW       # HNSW | FLAT
    distance_metric: COSINE  # COSINE | L2 | IP
Field types:
| Type | Query with | Notes |
|---|---|---|
| tag | Tag(field, [...]) | Exact or prefix match; OR across values. separator enables multi-value fields. |
| text | Text("...") | Tokenized full-text search. |
| numeric | Raw(...) | Numeric range filters via the raw escape hatch. |
| vector | search(vector=...) | KNN similarity. Requires dim; algorithm (HNSW/FLAT) and distance_metric (COSINE/L2/IP) optional. |
Only the fields you list are indexed; ctx.datastore.get always returns the complete record from S3.
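Before submitting a schema, it can help to check it against the field rules listed above. The following is a minimal sketch of such a check, assuming the schema has already been parsed into a Python dict; validate_schema is a hypothetical helper, not part of Maitai, and the server may enforce additional rules.

```python
from typing import Any

FIELD_TYPES = {"tag", "text", "numeric", "vector"}
VECTOR_ALGORITHMS = {"HNSW", "FLAT"}
DISTANCE_METRICS = {"COSINE", "L2", "IP"}


def validate_schema(schema: dict) -> list:
    """Return a list of human-readable problems; an empty list means no issues found."""
    problems = []
    if not schema.get("record_key"):
        problems.append("record_key is missing")

    fields: dict = schema.get("fields") or {}
    for name, spec in fields.items():
        spec: Any = spec or {}
        field_type = spec.get("type")
        if field_type not in FIELD_TYPES:
            problems.append(f"{name}: unknown type {field_type!r}")
            continue
        if field_type == "vector":
            dim = spec.get("dim")
            if not isinstance(dim, int) or dim <= 0:
                problems.append(f"{name}: vector fields require a positive integer dim")
            # algorithm and distance_metric are optional, but must be valid if present.
            algorithm = spec.get("algorithm")
            if algorithm is not None and algorithm not in VECTOR_ALGORITHMS:
                problems.append(f"{name}: algorithm must be HNSW or FLAT")
            metric = spec.get("distance_metric")
            if metric is not None and metric not in DISTANCE_METRICS:
                problems.append(f"{name}: distance_metric must be COSINE, L2, or IP")
    return problems
```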

Querying

Import the filter helpers from maitai_workflow and pass a filter as where. Helpers compose with all_/any_/not_ (or the & | ~ operators).
from maitai_workflow import Tag, Text, Raw, all_, any_, not_

# Exact id fetch (full record from S3)
record = ctx.datastore.get("ruling-12345")

# Semantic search: pass an embedding vector
hits = ctx.datastore.search(vector=query_embedding, k=10)

# Keyword search with a tag filter
hits = ctx.datastore.search(where=Tag("keywords", ["leather", "boots"]), k=20)

# Hybrid: vector + structured filter
hits = ctx.datastore.search(
    vector=query_embedding,
    where=all_(
        Tag("hts_codes", ["6202"], prefix=True),       # prefix match
        any_(Text("leather boots"), Tag("keywords", ["leather"])),
        not_(Tag("keywords", ["children"])),
    ),
    k=10,
)

# Power user: raw RediSearch (phrases, wildcards, numeric ranges)
hits = ctx.datastore.search(where=Text('"steel toe" leather*', op="raw"), k=20)
  • Tag(field, values, prefix=False): match a tag field against one or more values (OR). prefix=True matches by prefix ("6202" → 6202.10.4080, …).
  • Text(terms, field=None, op="any"): full-text search. op="any" (OR, recall), op="all" (AND, precision), or op="raw" for verbatim RediSearch (phrases, leather*, etc.).
  • Raw(expr): drop in a literal RediSearch fragment for anything the DSL doesn’t model (numeric/geo ranges).
search returns only the indexed fields by default; pass full=True to hydrate the complete records from S3 (slower). With a vector, results are sorted by similarity; without one, by text relevance.
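Since op="raw" and Raw(...) accept RediSearch syntax directly, it helps to have a rough mental model of what the composed filters express. The toy functions below translate the hybrid example into a RediSearch-style query string. They are hypothetical and not part of maitai_workflow; the query strings Maitai actually generates may differ, and the embedding field name in the KNN clause is taken from the example schema.

```python
import re


def _escape_tag(value: str) -> str:
    # RediSearch tag values need punctuation and spaces backslash-escaped.
    return re.sub(r"([^A-Za-z0-9_])", r"\\\1", value)


def tag_clause(field: str, values: list, prefix: bool = False) -> str:
    """Tag match: values are OR-ed; a trailing * turns each into a prefix match."""
    suffix = "*" if prefix else ""
    alternatives = "|".join(_escape_tag(v) + suffix for v in values)
    return f"@{field}:{{{alternatives}}}"


def text_clause(terms: str, op: str = "any") -> str:
    """Full-text match: 'any' ORs the words, 'all' ANDs them, 'raw' passes through."""
    if op == "raw":
        return terms
    joiner = "|" if op == "any" else " "
    return "(" + joiner.join(terms.split()) + ")"


def all_of(*clauses: str) -> str:
    return "(" + " ".join(clauses) + ")"  # whitespace means intersection


def any_of(*clauses: str) -> str:
    return "(" + " | ".join(clauses) + ")"  # pipe means union


def negate(clause: str) -> str:
    return "-" + clause


def with_knn(filter_expr: str, k: int, vector_field: str = "embedding") -> str:
    """Hybrid query: pre-filter, then K nearest neighbours on the vector field."""
    return f"{filter_expr}=>[KNN {k} @{vector_field} $vec]"


query = all_of(
    tag_clause("hts_codes", ["6202"], prefix=True),
    any_of(text_clause("leather boots"), tag_clause("keywords", ["leather"])),
    negate(tag_clause("keywords", ["children"])),
)
print(with_knn(query, 10))
```

Reading the printed string next to the hybrid search call makes the AND/OR/NOT nesting easier to verify before reaching for the raw escape hatch.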

Creating a datastore

  1. Define the schema for the workflow (a YAML/JSON document like above):
    POST /workflows/{workflow_id}/datastores/{name}
    
  2. Upload records — a JSON array of records, or an S3 URI to ingest:
    POST /workflows/{workflow_id}/datastores/{name}/upload
    
    Maitai stores the records, computes/loads the indexed fields, and builds the search index in the background. Once ready, workers serve ctx.datastore queries against it.
Manage datastores with GET /workflows/{workflow_id}/datastores and DELETE /workflows/{workflow_id}/datastores/{name}.
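The two creation calls might be prepared like this. This is a sketch under stated assumptions: the base URL, any authentication headers, and the exact request body shapes are not specified on this page, so base_url, headers, and the JSON bodies below are placeholders. The functions only build the requests; sending them is left to the caller.

```python
import json
import urllib.request
from typing import Optional


def _post_json(url: str, payload, headers: Optional[dict] = None) -> urllib.request.Request:
    """Build (but do not send) a JSON POST request."""
    merged = {"Content-Type": "application/json", **(headers or {})}
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(url, data=body, headers=merged, method="POST")


def define_schema_request(base_url, workflow_id, name, schema, headers=None):
    """Step 1: POST the schema to /workflows/{workflow_id}/datastores/{name}."""
    url = f"{base_url}/workflows/{workflow_id}/datastores/{name}"
    return _post_json(url, schema, headers)


def upload_records_request(base_url, workflow_id, name, records, headers=None):
    """Step 2: POST a JSON array of records to .../datastores/{name}/upload."""
    url = f"{base_url}/workflows/{workflow_id}/datastores/{name}/upload"
    return _post_json(url, records, headers)


# Hypothetical values for illustration only.
request = define_schema_request(
    "https://api.example.invalid", "wf_123", "rulings",
    {"record_key": "ruling_id", "fields": {"ruling_id": {"type": "tag"}}},
)
print(request.get_method(), request.full_url)
```

To send a prepared request, pass it to urllib.request.urlopen with whatever credentials your environment requires. Index building happens in the background, so a successful upload response does not by itself mean queries are being served yet.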

Uploading a workflow

Register a workflow — and bundle its accessories — with the upload script, run from the repository root:
python scripts/db_ops/upload_workflow.py <company_id> <workflow_ref_name> <script_path> [options]
This uploads the script to S3, upserts the workflow row (timeout, execution mode, application binding), and uploads any accessory files next to the script. Callers then invoke it as model="workflow:<workflow_ref_name>".
# Minimal
python scripts/db_ops/upload_workflow.py 22 my-workflow ./workflows/my_workflow.py

# Bind to an application (created if missing) and bundle two accessories
python scripts/db_ops/upload_workflow.py 2 hs-classification ./workflows/avalara/hs_classification.py \
  --application-name "HS Classification" \
  --accessory hts_codes=./workflows/avalara/hts_codes_v2.json \
  --accessory hts_notes=./workflows/avalara/hts_notes.json
| Option | Meaning |
|---|---|
| --name | Display name (defaults to a title-cased ref name). |
| --desc | Description. |
| --timeout | Worker timeout in seconds (default 600). |
| --mode | Execution mode: sync, async, or stream (default sync). |
| --application-name | Bind to an application by name, creating it if needed. Don’t combine with --application-id. |
| --application-id | Bind to an existing application id (its company is used; company_id may then be omitted). |
| --accessory KEY=path | Repeatable. Uploads the file and exposes it to ctx.load_accessory("KEY") via {KEY_UPPER}_S3_KEY. |
| --publish-version | After upsert, publish an immutable version (like the Portal’s Publish). |
| --version-notes | Notes for the published version. |
| --version-bump | major / minor / patch (default minor) when publishing. |
Use company_id = -1 for a global workflow. Run from maitai-backend/ so relative paths like ./workflows/... resolve, with AWS + database credentials configured for your target environment.
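If you script uploads (for example from CI), a small wrapper can assemble the command line and reject invalid accessory keys before anything is uploaded. The sketch below is a hypothetical helper, not part of the repository; it uses only the positional arguments and flags documented above and applies the key rule from the accessory limits (lowercase alphanumeric plus _ and -).

```python
import re
from typing import Optional

ACCESSORY_KEY = re.compile(r"[a-z0-9_-]+")


def build_upload_command(
    company_id: int,
    ref_name: str,
    script_path: str,
    accessories: Optional[dict] = None,
    application_name: Optional[str] = None,
) -> list:
    """Assemble the argv list for scripts/db_ops/upload_workflow.py."""
    command = [
        "python",
        "scripts/db_ops/upload_workflow.py",
        str(company_id),
        ref_name,
        script_path,
    ]
    if application_name is not None:
        command += ["--application-name", application_name]
    for key, path in (accessories or {}).items():
        if not ACCESSORY_KEY.fullmatch(key):
            raise ValueError(f"invalid accessory key: {key!r}")
        command += ["--accessory", f"{key}={path}"]  # the flag is repeatable
    return command


command = build_upload_command(
    2,
    "hs-classification",
    "./workflows/avalara/hs_classification.py",
    accessories={"hts_codes": "./workflows/avalara/hts_codes_v2.json"},
    application_name="HS Classification",
)
print(" ".join(command))
```

Passing the list form to subprocess.run(command, check=True) avoids shell quoting problems with values such as "HS Classification".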

Execution modes

| Mode | Behavior |
|---|---|
| sync | Caller blocks until the workflow finishes; the response carries the final output. The default. |
| async | The run is queued; the call returns immediately and you poll for the result. |
| stream | Intermediate chunks (from ctx.emit) stream to the caller, followed by the final completion. |
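To illustrate stream mode, here is a workflow that emits progress chunks before returning its final output. The signature of ctx.emit is not documented on this page, so the sketch assumes it takes a single chunk argument; RecordingContext is a hypothetical stand-in for the real context, useful for exercising the workflow locally.

```python
class RecordingContext:
    """Hypothetical stand-in for WorkflowContext that records emitted chunks."""

    def __init__(self):
        self.emitted = []

    def emit(self, chunk):
        # Assumption: the real ctx.emit accepts one chunk per call.
        self.emitted.append(chunk)


def execute(ctx):
    """Emit one chunk per stage, then return the final result."""
    stages = ("loading reference data", "searching", "ranking")
    for stage in stages:
        ctx.emit(stage)
    return {"status": "done", "stages": len(stages)}


ctx = RecordingContext()
result = execute(ctx)
print(ctx.emitted, result)
```

In sync or async mode the caller would only see the returned value; in stream mode the emitted chunks arrive first, followed by the final completion.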
