# Datastores & Accessories

> Give a workflow reference data — small bundled files, or large indexed, queryable record stores.

Workflows often need reference data: a lookup table, a prompt library, a corpus of documents to search. Maitai offers two mechanisms, and the right one depends on **size** and **access pattern**.

|                      | [Accessory file](#accessory-files)              | [Datastore](#datastores)                               |
| -------------------- | ----------------------------------------------- | ------------------------------------------------------ |
| **What it is**       | One read-only JSON blob bundled with the script | An S3-backed, indexed record store                     |
| **You read it with** | `ctx.load_accessory("key")`                     | `ctx.datastore.get(...)` / `ctx.datastore.search(...)` |
| **Access pattern**   | Load the whole file into memory                 | Query by id, keyword, or vector similarity             |
| **Best for**         | Small lookup tables, enums, config, prompt sets | Large corpora, semantic/keyword search, retrieval      |
| **Size**             | Up to \~50 MB per file                          | Effectively unlimited                                  |

A rule of thumb: if you'll hold the entire file in memory and index into it yourself, use an **accessory**. If you need to *search* or *selectively fetch* from a large collection, use a **datastore**.

## Accessory files

An accessory is a single JSON file you upload alongside your workflow under a short **key**. At runtime, [`ctx.load_accessory(key)`](/build/workflows/context#load-an-accessory-file) returns its parsed contents.

```python theme={null}
def execute(ctx: WorkflowContext):
    # Loaded once, then cached in-process for the worker's lifetime and in Redis (24h TTL).
    hts_codes = ctx.load_accessory("hts_codes")
    notes = ctx.load_accessory("hts_notes", fallback_path="./hts_notes.json")
    ...
```

**How the key maps to storage.** When you upload an accessory under key `hts_codes`, Maitai stores the file in S3 and sets an env var `HTS_CODES_S3_KEY` in the workflow's config. `load_accessory("hts_codes")` reads that env var to locate the object. The key is uppercased and hyphens become underscores (`hts-codes` → `HTS_CODES_S3_KEY`), so **keep your `load_accessory` key identical to the upload key.**
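
The mapping rule above is small enough to write down directly. The helper below is only an illustration of that rule; `accessory_env_var` is a hypothetical name, not part of the Maitai SDK.

```python
def accessory_env_var(key: str) -> str:
    """Derive the env var name for an accessory key: uppercase, hyphens to underscores."""
    return key.upper().replace("-", "_") + "_S3_KEY"


print(accessory_env_var("hts_codes"))  # HTS_CODES_S3_KEY
print(accessory_env_var("hts-codes"))  # HTS_CODES_S3_KEY
```

Note that under this rule `hts-codes` and `hts_codes` resolve to the same variable, so avoid keys that differ only in `-` versus `_`.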

**Resolution order.** `load_accessory` checks, in order: in-process cache → Redis (24h TTL) → S3 (then writes back to Redis). If no S3 key is configured and you passed `fallback_path`, it loads that local file — convenient for local development before the accessory is uploaded.
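
A minimal sketch of that lookup order, using plain dicts as stand-ins for the in-process cache and Redis and a callable in place of the S3 read. Everything here (`resolve_accessory` and its parameters) is hypothetical and only illustrates the tiering; the real implementation's TTL handling and error behavior are not modeled.

```python
import json
from typing import Any, Callable, Dict, Optional


def resolve_accessory(
    key: str,
    local_cache: Dict[str, Any],
    shared_cache: Dict[str, str],                # stand-in for Redis, holds JSON text
    fetch_from_s3: Optional[Callable[[], str]],  # None models "no S3 key configured"
    fallback_path: Optional[str] = None,
) -> Any:
    # 1. In-process cache: cheapest, already parsed.
    if key in local_cache:
        return local_cache[key]

    if key in shared_cache:
        # 2. Shared cache hit.
        data = json.loads(shared_cache[key])
    elif fetch_from_s3 is not None:
        # 3. S3 read, then write back to the shared cache.
        raw = fetch_from_s3()
        shared_cache[key] = raw
        data = json.loads(raw)
    elif fallback_path is not None:
        # No S3 key configured: fall back to a local file (local development).
        with open(fallback_path, encoding="utf-8") as fh:
            data = json.load(fh)
    else:
        raise KeyError(f"accessory {key!r} is not configured and no fallback_path was given")

    # Assumption: the parsed value is kept in-process after any successful load.
    local_cache[key] = data
    return data
```

Calling it twice with the same caches shows the effect: the second call returns from `local_cache` and never reaches `fetch_from_s3`.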

**Limits.** Up to \~50 MB per accessory and 100 accessories per workflow. Keys are lowercase alphanumeric plus `_`/`-`. Files must be valid UTF-8 JSON.
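
It can be handy to check these limits locally before uploading. The sketch below is a hypothetical pre-flight check, not something the upload script is documented to do; it reads "~50 MB" as 50 MiB, which is an assumption.

```python
import json
import re
from pathlib import Path
from typing import List

KEY_PATTERN = re.compile(r"[a-z0-9_-]+")  # lowercase alphanumeric plus _ and -
MAX_BYTES = 50 * 1024 * 1024              # assumption: "~50 MB" treated as 50 MiB


def check_accessory(key: str, path: str) -> List[str]:
    """Return a list of problems; an empty list means the file looks uploadable."""
    problems: List[str] = []
    if not KEY_PATTERN.fullmatch(key):
        problems.append(f"key {key!r} must be lowercase alphanumeric plus '_' or '-'")

    file = Path(path)
    if not file.is_file():
        problems.append(f"{path} does not exist")
        return problems
    if file.stat().st_size > MAX_BYTES:
        problems.append(f"{path} is larger than {MAX_BYTES} bytes")

    try:
        json.loads(file.read_bytes().decode("utf-8"))
    except UnicodeDecodeError:
        problems.append(f"{path} is not valid UTF-8")
    except json.JSONDecodeError as exc:
        problems.append(f"{path} is not valid JSON: {exc.msg}")
    return problems
```

For example, `check_accessory("hts_codes", "./workflows/avalara/hts_codes_v2.json")` returns `[]` when the key and file both pass.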

See [Uploading a workflow](#uploading-a-workflow) for the `--accessory` flag.

## Datastores

A datastore is a queryable collection of JSON records. You define which fields to index and how; Maitai stores the raw records in S3 and builds a search index over the indexed fields. At runtime your workflow queries it through [`ctx.datastore`](/build/workflows/context#query-a-datastore) — by record id, by keyword/tag filter, or by vector similarity — without managing any of the underlying infrastructure.

Each workflow has at most one datastore. (Need several datasets? Split them across workflows and compose with [nested calls](/build/workflows/context#call-another-workflow).)

### Schema

A datastore is defined by a small schema: which field is the record key, where the raw records live, and how each indexed field is treated.

```yaml theme={null}
record_key: ruling_id                     # primary id field on each record
records_prefix: avalara/rulings/enriched  # S3 prefix holding the raw JSON records
fields:
  ruling_id:
    type: tag
  hts_codes:
    type: tag
    separator: ","      # multi-value tag field
  product_short:
    type: text          # full-text searchable
  summary:
    type: text
  embedding:
    type: vector        # dense-vector similarity search
    dim: 1536
    algorithm: HNSW       # HNSW | FLAT
    distance_metric: COSINE  # COSINE | L2 | IP
```

**Field types:**

| Type      | Query with           | Notes                                                                                                  |
| --------- | -------------------- | ------------------------------------------------------------------------------------------------------ |
| `tag`     | `Tag(field, [...])`  | Exact or prefix match; OR across values. `separator` enables multi-value fields.                       |
| `text`    | `Text("...")`        | Tokenized full-text search.                                                                            |
| `numeric` | `Raw(...)`           | Numeric range filters via the raw escape hatch.                                                        |
| `vector`  | `search(vector=...)` | KNN similarity. Requires `dim`; `algorithm` (HNSW/FLAT) and `distance_metric` (COSINE/L2/IP) optional. |

Only the fields you list are indexed; `ctx.datastore.get` always returns the complete record from S3.
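
The field rules in the table can be checked before you submit a schema. The validator below is a hypothetical sketch built only from the rules stated above; treating `record_key` as mandatory is an assumption, and the server may enforce more (or different) constraints.

```python
from typing import Any, Dict, List

FIELD_TYPES = {"tag", "text", "numeric", "vector"}
ALGORITHMS = {"HNSW", "FLAT"}
DISTANCE_METRICS = {"COSINE", "L2", "IP"}


def validate_schema(schema: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in a datastore schema dict."""
    errors: List[str] = []
    if not schema.get("record_key"):
        errors.append("record_key is required")  # assumption, see lead-in

    for name, spec in (schema.get("fields") or {}).items():
        ftype = spec.get("type")
        if ftype not in FIELD_TYPES:
            errors.append(f"{name}: unknown type {ftype!r}")
            continue
        if ftype == "vector":
            dim = spec.get("dim")
            if not isinstance(dim, int) or dim <= 0:
                errors.append(f"{name}: vector fields require a positive integer dim")
            if "algorithm" in spec and spec["algorithm"] not in ALGORITHMS:
                errors.append(f"{name}: algorithm must be HNSW or FLAT")
            if "distance_metric" in spec and spec["distance_metric"] not in DISTANCE_METRICS:
                errors.append(f"{name}: distance_metric must be COSINE, L2, or IP")
    return errors


schema = {
    "record_key": "ruling_id",
    "records_prefix": "avalara/rulings/enriched",
    "fields": {
        "ruling_id": {"type": "tag"},
        "hts_codes": {"type": "tag", "separator": ","},
        "product_short": {"type": "text"},
        "summary": {"type": "text"},
        "embedding": {
            "type": "vector",
            "dim": 1536,
            "algorithm": "HNSW",
            "distance_metric": "COSINE",
        },
    },
}
print(validate_schema(schema))  # []
```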

### Querying

<a id="querying" />

Import the filter helpers from `maitai_workflow` and pass a filter as `where`. Helpers compose with `all_`/`any_`/`not_` (or the `&` `|` `~` operators).

```python theme={null}
from maitai_workflow import Tag, Text, Raw, all_, any_, not_

# Exact id fetch (full record from S3)
record = ctx.datastore.get("ruling-12345")

# Semantic search: pass an embedding vector
hits = ctx.datastore.search(vector=query_embedding, k=10)

# Keyword search with a tag filter
hits = ctx.datastore.search(where=Tag("keywords", ["leather", "boots"]), k=20)

# Hybrid: vector + structured filter
hits = ctx.datastore.search(
    vector=query_embedding,
    where=all_(
        Tag("hts_codes", ["6202"], prefix=True),       # prefix match
        any_(Text("leather boots"), Tag("keywords", ["leather"])),
        not_(Tag("keywords", ["children"])),
    ),
    k=10,
)

# Power user: raw RediSearch text syntax (phrases, wildcards); use Raw(...) for numeric ranges
hits = ctx.datastore.search(where=Text('"steel toe" leather*', op="raw"), k=20)
```

* **`Tag(field, values, prefix=False)`** — match a tag field against one or more values (OR). `prefix=True` matches by prefix (`"6202"` → `6202.10.4080`, …).
* **`Text(terms, field=None, op="any")`** — full-text search. `op="any"` (OR, recall), `op="all"` (AND, precision), or `op="raw"` for verbatim RediSearch (phrases, `leather*`, etc.).
* **`Raw(expr)`** — drop in a literal RediSearch fragment for anything the DSL doesn't model (numeric/geo ranges).

`search` returns only the **indexed** fields by default; pass `full=True` to hydrate the complete records from S3 (slower). With a `vector`, results are sorted by similarity; without one, by text relevance.
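
To build intuition for how filters compose with `&`, `|`, and `~`, here is a toy model that renders a filter tree as a RediSearch-style query string. It is not the `maitai_workflow` implementation: the class names (`TagFilter`, `TextFilter`, and so on) are hypothetical, and the exact query text the real helpers produce may differ.

```python
import re
from dataclasses import dataclass
from typing import List, Optional


class Filter:
    """Base class: operator overloads build a tree instead of evaluating anything."""

    def __and__(self, other: "Filter") -> "Filter":
        return And([self, other])

    def __or__(self, other: "Filter") -> "Filter":
        return Or([self, other])

    def __invert__(self) -> "Filter":
        return Not(self)


def _escape_tag(value: str) -> str:
    # Backslash-escape punctuation so a value like 6202.10 stays a single tag.
    return re.sub(r"([^A-Za-z0-9_])", r"\\\1", value)


@dataclass
class TagFilter(Filter):
    field: str
    values: List[str]
    prefix: bool = False

    def render(self) -> str:
        suffix = "*" if self.prefix else ""
        alternatives = "|".join(_escape_tag(v) + suffix for v in self.values)
        return f"@{self.field}:{{{alternatives}}}"


@dataclass
class TextFilter(Filter):
    terms: str
    field: Optional[str] = None
    op: str = "any"  # "any" (OR), "all" (AND), or "raw" (verbatim)

    def render(self) -> str:
        if self.op == "raw":
            body = self.terms
        else:
            body = ("|" if self.op == "any" else " ").join(self.terms.split())
        return f"@{self.field}:({body})" if self.field else f"({body})"


@dataclass
class And(Filter):
    parts: List[Filter]

    def render(self) -> str:
        return "(" + " ".join(p.render() for p in self.parts) + ")"


@dataclass
class Or(Filter):
    parts: List[Filter]

    def render(self) -> str:
        return "(" + " | ".join(p.render() for p in self.parts) + ")"


@dataclass
class Not(Filter):
    inner: Filter

    def render(self) -> str:
        return "-" + self.inner.render()


query = (
    TagFilter("hts_codes", ["6202"], prefix=True)
    & (TextFilter("leather boots") | TagFilter("keywords", ["leather"]))
    & ~TagFilter("keywords", ["children"])
)
print(query.render())
```

Python's operator precedence applies (`~` binds tighter than `&`, which binds tighter than `|`), so parenthesize `|` groups when mixing operators, as in the example.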

### Creating a datastore

1. **Define the schema** for the workflow (a YAML/JSON document like the one above):

   ```http theme={null}
   POST /workflows/{workflow_id}/datastores/{name}
   ```

2. **Upload records** — a JSON array of records, or an S3 URI to ingest:

   ```http theme={null}
   POST /workflows/{workflow_id}/datastores/{name}/upload
   ```

   Maitai stores the records, computes/loads the indexed fields, and builds the search index in the background. Once ready, workers serve `ctx.datastore` queries against it.

Manage datastores with `GET /workflows/{workflow_id}/datastores` and `DELETE /workflows/{workflow_id}/datastores/{name}`.
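
The two creation steps can be scripted with the standard library. This is a sketch under several assumptions: `BASE_URL` is a placeholder, the request bodies are assumed to be JSON, and authentication is left to whatever headers you pass in, since none of these are specified on this page. The helper names are hypothetical.

```python
import json
import urllib.request
from typing import Any, Dict, List, Optional

BASE_URL = "https://api.example.com"  # placeholder: replace with your Maitai API host


def datastore_request(
    method: str,
    path: str,
    payload: Optional[Any] = None,
    headers: Optional[Dict[str, str]] = None,
) -> urllib.request.Request:
    """Build (but do not send) a request against a datastore endpoint."""
    data = json.dumps(payload).encode("utf-8") if payload is not None else None
    request = urllib.request.Request(BASE_URL + path, data=data, method=method)
    if data is not None:
        request.add_header("Content-Type", "application/json")
    for name, value in (headers or {}).items():
        request.add_header(name, value)  # e.g. your auth header
    return request


def create_schema_request(workflow_id: int, name: str, schema: Dict[str, Any],
                          headers: Optional[Dict[str, str]] = None) -> urllib.request.Request:
    # Step 1: define the schema.
    return datastore_request("POST", f"/workflows/{workflow_id}/datastores/{name}", schema, headers)


def upload_records_request(workflow_id: int, name: str, records: List[Dict[str, Any]],
                           headers: Optional[Dict[str, str]] = None) -> urllib.request.Request:
    # Step 2: upload a JSON array of records.
    return datastore_request(
        "POST", f"/workflows/{workflow_id}/datastores/{name}/upload", records, headers
    )
```

Send either request with `urllib.request.urlopen(request)`. Index building happens in the background, so a successful upload response does not necessarily mean the datastore is ready to query.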

## Uploading a workflow

Register a workflow and bundle its accessories with the upload script, run from the repository root (`maitai-backend/`):

```bash theme={null}
python scripts/db_ops/upload_workflow.py <company_id> <workflow_ref_name> <script_path> [options]
```

This uploads the script to S3, upserts the workflow row (timeout, execution mode, application binding), and uploads any accessory files next to the script. Callers then invoke it as `model="workflow:<workflow_ref_name>"`.

```bash theme={null}
# Minimal
python scripts/db_ops/upload_workflow.py 22 my-workflow ./workflows/my_workflow.py

# Bind to an application (created if missing) and bundle two accessories
python scripts/db_ops/upload_workflow.py 2 hs-classification ./workflows/avalara/hs_classification.py \
  --application-name "HS Classification" \
  --accessory hts_codes=./workflows/avalara/hts_codes_v2.json \
  --accessory hts_notes=./workflows/avalara/hts_notes.json
```

| Option                 | Meaning                                                                                              |
| ---------------------- | ---------------------------------------------------------------------------------------------------- |
| `--name`               | Display name (defaults to a title-cased ref name).                                                   |
| `--desc`               | Description.                                                                                         |
| `--timeout`            | Worker timeout in **seconds** (default `600`).                                                       |
| `--mode`               | Execution mode: `sync`, `async`, or `stream` (default `sync`).                                       |
| `--application-name`   | Bind to an application by name, creating it if needed. Don't combine with `--application-id`.        |
| `--application-id`     | Bind to an existing application id (its company is used; `company_id` may then be omitted).          |
| `--accessory KEY=path` | Repeatable. Uploads the file and exposes it to `ctx.load_accessory("KEY")` via `{KEY_UPPER}_S3_KEY`. |
| `--publish-version`    | After upsert, publish an immutable version (like the Portal's **Publish**).                          |
| `--version-notes`      | Notes for the published version.                                                                     |
| `--version-bump`       | `major` / `minor` / `patch` (default `minor`) when publishing.                                       |

<Note>
  Use `company_id = -1` for a global workflow. Run from `maitai-backend/` so relative paths like `./workflows/...` resolve, with AWS + database credentials configured for your target environment.
</Note>
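
If you upload from CI or a release script, it can help to assemble the command programmatically. The helper below is a hypothetical wrapper that uses only the flags listed in the table above; it builds the argument list and leaves running it to you.

```python
import shlex
from typing import Dict, List, Optional


def upload_command(
    company_id: int,
    ref_name: str,
    script_path: str,
    accessories: Optional[Dict[str, str]] = None,
    application_name: Optional[str] = None,
    application_id: Optional[int] = None,
    mode: str = "sync",
    timeout: int = 600,
) -> List[str]:
    """Build the argv list for scripts/db_ops/upload_workflow.py."""
    if application_name and application_id is not None:
        raise ValueError("use --application-name or --application-id, not both")
    if mode not in {"sync", "async", "stream"}:
        raise ValueError(f"unknown mode: {mode}")

    cmd = [
        "python", "scripts/db_ops/upload_workflow.py",
        str(company_id), ref_name, script_path,
        "--mode", mode,
        "--timeout", str(timeout),
    ]
    if application_name:
        cmd += ["--application-name", application_name]
    if application_id is not None:
        cmd += ["--application-id", str(application_id)]
    for key, path in (accessories or {}).items():
        cmd += ["--accessory", f"{key}={path}"]  # repeatable flag, one per file
    return cmd


cmd = upload_command(
    2,
    "hs-classification",
    "./workflows/avalara/hs_classification.py",
    application_name="HS Classification",
    accessories={
        "hts_codes": "./workflows/avalara/hts_codes_v2.json",
        "hts_notes": "./workflows/avalara/hts_notes.json",
    },
)
print(shlex.join(cmd))
```

Run the result with `subprocess.run(cmd, check=True, cwd="maitai-backend")` so the relative paths resolve as described in the note above.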

### Execution modes

| Mode     | Behavior                                                                                                                                                |
| -------- | ------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `sync`   | Caller blocks until the workflow finishes; the response carries the final output. The default.                                                          |
| `async`  | The run is queued; the call returns immediately and you poll for the result.                                                                            |
| `stream` | Intermediate chunks (from [`ctx.emit`](/build/workflows/context#stream-progress-to-the-caller)) stream to the caller, followed by the final completion. |
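
For `async` runs, the caller needs some polling loop. This page does not specify the status endpoint or client call, so the sketch below takes the lookup as a caller-supplied function (`fetch`, hypothetical) and only shows the waiting strategy: exponential backoff with an overall deadline.

```python
import time
from typing import Any, Callable, Optional


def poll_until_done(
    fetch: Callable[[], Optional[Any]],  # returns None while the run is still pending
    interval: float = 1.0,
    max_interval: float = 30.0,
    timeout: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call fetch() until it returns a result, doubling the wait between attempts."""
    deadline = time.monotonic() + timeout
    delay = interval
    while True:
        result = fetch()
        if result is not None:
            return result
        if time.monotonic() + delay > deadline:
            raise TimeoutError("workflow run did not finish before the deadline")
        sleep(delay)
        delay = min(delay * 2, max_interval)
```

Pass a `fetch` that wraps whatever result lookup your client exposes and returns `None` until the run completes. The `sleep` parameter exists so the loop can be tested without real waiting.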

## Next

* Use this data in your script: [Workflow Context (`ctx`)](/build/workflows/context)
* How a workflow file is laid out: [Workflow Structure](/build/workflows/structure)
* Invoke your workflow: [Invoking workflows](/build/workflows/overview)
