Reindex Migration
OpenSearch mappings are immutable once created — you can’t change a field’s type, add a normalizer, or modify dynamic templates on an existing index. Any schema change requires creating a new index and reindexing all data.
This page documents the zero-downtime reindex migration harness that automates this process as a Trigger.dev task.
Overview
| Feature | Description |
|---|---|
| Service | ReindexMigrationService in @repo/search |
| Task | reindexOpenSearch in @repo/tasks (Trigger.dev) |
| DTO | reindexOpenSearchSchema in @repo/tasks-dtos |
| Pattern | Alias swap with two catch-up passes |
| Duration | Depends on data volume (minutes to hours) |
| Concurrency | One migration at a time (queue limit) |
When You Need This
- Changing a field type (e.g., keyword to text)
- Adding or removing a normalizer (e.g., lowercase_norm)
- Modifying dynamic templates
- Adding new mapped fields to the schema
- Any change to buildListingSchema() in listing-schema.ts
Adding new data to existing fields (e.g., indexing a new game attribute that matches an existing dynamic template) does NOT require a reindex — the dynamic template handles it automatically.
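To illustrate why a new attribute needs no reindex, here is a minimal sketch of a dynamic template and a simplified matcher for its path_match pattern. The template name, the attributes.* path, and the matchesPathMatch helper are assumptions for illustration; the real templates are whatever buildListingSchema() defines.

```typescript
// Hypothetical dynamic template; the real ones live in buildListingSchema().
const attributeTemplate = {
  path_match: "attributes.*",
  mapping: { type: "keyword" },
}
const dynamicTemplates = [{ attributes_as_keyword: attributeTemplate }]

// Simplified stand-in for path_match: "*" matches any run of characters,
// every other character is matched literally.
function matchesPathMatch(pattern: string, fieldPath: string): boolean {
  const escaped = pattern
    .split("*")
    .map((part) => part.replace(/[.+?^${}()|[\]\\]/g, "\\$&"))
    .join(".*")
  return new RegExp(`^${escaped}$`).test(fieldPath)
}

// A newly introduced attribute is covered by the existing template,
// so it is mapped on first write without a schema change.
const coveredByTemplate = matchesPathMatch(attributeTemplate.path_match, "attributes.rarity")
```

A field that falls outside every template pattern (for example a new top-level field) is the case that calls for a schema change and a reindex.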
Migration Flow
Products vs Listings
Products are system-controlled (imported via CLI, not user-generated). During a migration, the full product stream in step 4 is sufficient — no timestamp-based catch-up is needed.
Listings are user-generated and continuously changing via the event-driven sync pipeline. Two catch-up passes ensure zero data loss:
| Pass | Timing | What it covers | Target |
|---|---|---|---|
| Catch-up #1 | Before alias swap | Listings created/updated during bulk sync (steps 4-5) | Indexes into v{N} |
| Cleanup #1 | Before alias swap | Listings deactivated during bulk sync (steps 4-5) | Deletes from v{N} |
| Catch-up #2 | After alias swap | Listings created/updated between T2 and alias swap (steps 7-9) | Indexes via alias |
| Cleanup #2 | After alias swap | Listings deactivated between T2 and alias swap (steps 7-9) | Deletes via alias |
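Each catch-up and cleanup pair in the table can be sketched as one split over recently changed listings. The Listing shape, its updatedAt and active fields, and the planCatchUp helper are hypothetical; the actual service may select changes differently.

```typescript
// Hypothetical listing shape; field names are assumptions for illustration.
interface Listing {
  id: string
  active: boolean
  updatedAt: Date
}

interface CatchUpPlan {
  toIndex: Listing[] // created or updated since the cutoff (catch-up)
  toDelete: string[] // deactivated since the cutoff (cleanup)
}

// Pick every listing changed at or after `since` and decide what to do with it.
function planCatchUp(listings: Listing[], since: Date): CatchUpPlan {
  const plan: CatchUpPlan = { toIndex: [], toDelete: [] }
  for (const listing of listings) {
    // Older changes were already covered by the previous pass.
    if (listing.updatedAt.getTime() < since.getTime()) continue
    if (listing.active) plan.toIndex.push(listing)
    else plan.toDelete.push(listing.id)
  }
  return plan
}
```

Under this sketch, the first pass would use the start of the bulk sync as its cutoff and the second pass would use T2; only the write target differs (v{N} directly, then the alias).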
The event-driven sync pipeline continues writing to the alias (old index) throughout steps 3-9. After step 9, new events automatically write to v{N} via the alias.
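The swap itself maps onto a single request to the OpenSearch _aliases endpoint, which applies all of its actions atomically. The sketch below builds only the request body; buildAliasSwapBody is a hypothetical helper, and how ReindexMigrationService actually sends the request is not shown here.

```typescript
type AliasAction =
  | { remove: { index: string; alias: string } }
  | { add: { index: string; alias: string } }

// Build the body for POST /_aliases: detach the alias from the old index
// and attach it to the new one in the same request.
function buildAliasSwapBody(
  alias: string,
  oldIndex: string,
  newIndex: string,
): { actions: AliasAction[] } {
  return {
    actions: [
      { remove: { index: oldIndex, alias } },
      { add: { index: newIndex, alias } },
    ],
  }
}

const swapBody = buildAliasSwapBody(
  "product_listing_current",
  "product_listing_v1",
  "product_listing_v2",
)
```

Because both actions travel in one request, readers and writers never observe the alias pointing at no index or at both.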
Idempotency
If the task fails mid-way, re-triggering with the same targetVersion is safe:
- The index already exists, so creation is skipped
- Bulk index operations are upserts, so re-streaming overwrites existing documents
- The alias swap is atomic: it either succeeds or doesn’t
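The upsert behaviour comes from the bulk index action: with an explicit _id, writing the same document twice replaces the first copy instead of duplicating it. A minimal sketch of such a bulk body follows, assuming documents carry a string id; buildBulkIndexBody and IndexableDoc are illustrative names, not the indexer's real API.

```typescript
// Hypothetical document shape: a stable id plus arbitrary source fields.
interface IndexableDoc {
  id: string
  [field: string]: unknown
}

// Build the newline-delimited body for POST /_bulk.
// Each document becomes an action line followed by a source line.
function buildBulkIndexBody(indexName: string, docs: IndexableDoc[]): string {
  const lines: string[] = []
  for (const { id, ...source } of docs) {
    lines.push(JSON.stringify({ index: { _index: indexName, _id: id } }))
    lines.push(JSON.stringify(source))
  }
  // The bulk API requires a trailing newline after the last line.
  return lines.length > 0 ? lines.join("\n") + "\n" : ""
}
```

Re-running the function over the same documents yields the same body, which is why re-streaming after a failure is safe.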
Error Handling
The service aborts immediately on the first error — any transform failure, batch index failure, or user fetch failure causes the task to stop. This is intentional: a migration must be all-or-nothing to avoid silently losing data. Since the migration is idempotent, you can fix the root cause and re-trigger safely.
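The abort-on-first-error rule can be sketched against the shape of a bulk response, which sets a top-level errors flag and reports a per-item error. This is an assumption about how the check might look, not the service's code: assertBulkSucceeded is a hypothetical helper, and it throws a plain Error rather than the types in indexer.errors.ts.

```typescript
interface BulkItemResult {
  _id: string
  status: number
  error?: { type: string; reason: string }
}

interface BulkResponse {
  errors: boolean
  items: Array<{ index?: BulkItemResult; delete?: BulkItemResult }>
}

// Throw on the first failed item so the caller stops the migration
// instead of continuing with a partially written batch.
function assertBulkSucceeded(response: BulkResponse): void {
  if (!response.errors) return
  for (const item of response.items) {
    const result = item.index ?? item.delete
    if (result?.error) {
      throw new Error(
        `Bulk item ${result._id} failed (${result.status}): ${result.error.reason}`,
      )
    }
  }
  throw new Error("Bulk response reported errors but no failing item was found")
}
```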
Operator Recipe
Modify the Schema
Edit buildListingSchema() in listing-schema.ts.
Bump the Version
Increment SCHEMA_VERSION in the same file (e.g., 1 → 2).
export const SCHEMA_VERSION = 2 // Was 1
Update Transforms (if needed)
If the document shape changed (new fields, renamed fields), update the transform functions in @repo/opensearch-sync/transforms/.
Deploy
Merge and deploy. On startup, SearchService.init() logs a warning:
OpenSearch alias points to a different version than expected.
Run the reindex migration task to update.
The application continues working normally: reads and writes go through the alias to the old index.
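The startup warning amounts to comparing the index behind the alias with the index name the code expects. The sketch below works on the response shape of GET /_alias/{name}, where each key is an index name; findAliasTarget and needsMigration are hypothetical helpers, and the actual check in SearchService.init() may differ.

```typescript
// Response shape of GET /_alias/<alias>: index name -> its aliases.
type AliasResponse = Record<string, { aliases: Record<string, unknown> }>

// Return the index the alias currently points to, if any.
function findAliasTarget(response: AliasResponse, alias: string): string | undefined {
  for (const [indexName, { aliases }] of Object.entries(response)) {
    if (alias in aliases) return indexName
  }
  return undefined
}

// True when the alias exists but points at an index other than the expected one.
// A missing alias is treated as "nothing to migrate" in this sketch.
function needsMigration(
  response: AliasResponse,
  alias: string,
  expectedIndex: string,
): boolean {
  const target = findAliasTarget(response, alias)
  return target !== undefined && target !== expectedIndex
}
```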
Trigger the Task
In the Trigger.dev dashboard, trigger reindexOpenSearch:
{
"targetVersion": 2
}
Optional parameters:
| Parameter | Default | Description |
|---|---|---|
| batchSize | 500 | Documents per bulk batch |
| skipCleanup | false | Keep old index after swap |
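As a rough illustration of what batchSize controls, the sketch below splits a list of documents into bulk-sized chunks. toBatches is a hypothetical helper; the real service streams data, so it may batch differently.

```typescript
// Split items into consecutive batches of at most `batchSize` elements.
function toBatches<T>(items: T[], batchSize = 500): T[][] {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError("batchSize must be a positive integer")
  }
  const batches: T[][] = []
  for (let start = 0; start < items.length; start += batchSize) {
    batches.push(items.slice(start, start + batchSize))
  }
  return batches
}
```

With the default of 500, 1,200 documents produce three bulk requests of 500, 500, and 200 documents.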
Monitor
Watch progress in Trigger.dev logs:
[creating_index] 0 indexed (0s)
[syncing_products] 45000 indexed (120s)
[syncing_listings] 12000 indexed (180s)
[catchup_listings] 150 indexed (182s)
[cleanup_deactivated] 3 indexed (182s)
[swapping_alias] 0 indexed (183s)
[post_swap_catchup] 8 indexed (184s)
[post_swap_cleanup_deactivated] 0 indexed (184s)
[cleanup] 0 indexed (184s)
Done
Alias swapped, old index deleted. Verify in OpenSearch:
curl http://localhost:9200/_aliases | jq
# Should show: product_listing_v2 → product_listing_current
Task Payload
import { z } from "zod"
export const reindexOpenSearchSchema = z.object({
targetVersion: z.number().int().positive(),
batchSize: z.number().int().positive().prefault(500),
skipCleanup: z.boolean().prefault(false),
})
Schema Versioning
The schema version is a simple integer constant in listing-schema.ts:
export const SCHEMA_VERSION = 1
export const buildIndexName = (version: number) => `product_listing_v${version}`
export const CURRENT_INDEX_NAME = buildIndexName(SCHEMA_VERSION)
- SCHEMA_VERSION: bump when the schema changes
- buildIndexName(): produces predictable index names (product_listing_v1, product_listing_v2, …)
- CURRENT_INDEX_NAME: used by SearchService.init() for the create-if-not-exists check
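Predictable names also make the version recoverable from an index name, which is handy when checking what an alias points to. In the sketch below, buildIndexName is copied from the snippet above, while parseIndexVersion is a hypothetical inverse that is not part of listing-schema.ts.

```typescript
// Copied from listing-schema.ts.
const buildIndexName = (version: number) => `product_listing_v${version}`

// Hypothetical inverse: extract the version from a versioned index name.
// Returns undefined for names that do not follow the product_listing_v<N> pattern.
function parseIndexVersion(indexName: string): number | undefined {
  const match = /^product_listing_v(\d+)$/.exec(indexName)
  return match ? Number(match[1]) : undefined
}

const parsedVersion = parseIndexVersion(buildIndexName(2))
```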
Code Locations
| Component | Location |
|---|---|
| Schema & versioning | packages/backend/features/search/src/listings/listing-schema.ts |
| Migration service | packages/backend/features/search/src/reindex-migration.service.ts |
| Error types | packages/backend/features/search/src/indexer.errors.ts |
| Search init | packages/backend/features/search/src/search.service.ts |
| Product indexer | packages/backend/features/search/src/product-indexer.service.ts |
| Listing indexer | packages/backend/features/search/src/listing-indexer.service.ts |
| Product transform | packages/backend/features/search/src/transforms/product-transform.ts |
| Listing transform | packages/backend/features/search/src/transforms/listing-transform.ts |
| Trigger.dev task | packages/backend/tasks/src/trigger/reindex-opensearch.task.ts |
| Task DTO | packages/core/tasks-dtos/src/reindex-opensearch.ts |