OpenSearch

OpenSearch powers marketplace product and listing search in CardNexus. This document covers the architecture, setup, indexing, and querying patterns.

Overview

| Feature | Description |
|---|---|
| Provider | OpenSearch (Docker locally, AWS managed in production) |
| Client Package | @repo/opensearch |
| Search Package | @repo/search |
| Sync Package | @repo/opensearch-sync |
| Index Name | product_listing_v1 (aliased as product_listing_current) |
| Model | Parent-child (products → listings) |

Architecture

Data Flow

  • Events → Queue: Subscribers publish IDs only (productIds or inventoryIds) to SQS.
  • Handlers: Consume message (IDs) → read full documents from MongoDB (ProductView / Inventory + User) → transform → bulk index or delete in OpenSearch.
  • Product sync: ProductSyncHandler uses ProductViewModel.find().
  • Listing sync: ListingSyncHandler uses InventoryModel.find() and UserModel.find() (for seller profile, e.g. shipsTo).
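The handler step above can be sketched as a pure planning function. This is a minimal illustration, not the actual ProductSyncHandler: the `ProductRecord` and `SyncAction` types and the `planProductSync` name are hypothetical, and the rule that an ID with no matching record becomes a delete is an assumption about how "index or delete" is decided.

```typescript
// Hypothetical shapes for illustration; the real models live in the codebase.
type ProductRecord = { id: string; name: string; gameSlug: string };

type SyncAction =
  | { kind: "index"; id: string; routing: string; doc: Record<string, unknown> }
  | { kind: "delete"; id: string; routing: string };

// Plan the bulk actions for one queue message. The message carries IDs only,
// so the caller loads the current records first and passes them in.
// Assumption: an ID with no matching record is removed from the index.
function planProductSync(requestedIds: string[], loaded: ProductRecord[]): SyncAction[] {
  const byId = new Map<string, ProductRecord>();
  for (const record of loaded) byId.set(record.id, record);

  return requestedIds.map((id): SyncAction => {
    const record = byId.get(id);
    if (record === undefined) {
      return { kind: "delete", id, routing: id };
    }
    return {
      kind: "index",
      id,
      routing: id, // products are routed by their own ID
      doc: { docJoin: "product", product: { name: record.name, gameSlug: record.gameSlug } },
    };
  });
}
```

Keeping the message payload to IDs means the handler always indexes the latest state from MongoDB, even if messages arrive late or out of order.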

Components

  • @repo/opensearch: Generic OpenSearch client wrapper with environment-based configuration
  • @repo/search: Index schema definitions, bootstrap service, indexer services, query builder
  • @repo/opensearch-sync: Event subscribers, SQS producers, sync handlers

Setup

Local Development

OpenSearch is included in Docker Compose:

    # Start OpenSearch
    docker compose up opensearch

    # Start with OpenSearch Dashboards (UI)
    docker compose --profile opensearch up opensearch opensearch-dashboards

| Service | Port | URL |
|---|---|---|
| OpenSearch | 9200 | http://localhost:9200 |
| OpenSearch Dashboards | 5601 | http://localhost:5601 |

Environment Variables

Configuration via @t3-oss/env-core:

| Variable | Description | Default |
|---|---|---|
| OPENSEARCH_URL | OpenSearch endpoint | http://localhost:9200 |
| OPENSEARCH_USERNAME | Basic auth username | - |
| OPENSEARCH_PASSWORD | Basic auth password | - |
| OPENSEARCH_BEARER_TOKEN | Bearer token | - |
| OPENSEARCH_REQUEST_TIMEOUT_MS | Request timeout (ms) | 30000 |
| OPENSEARCH_MAX_RETRIES | Max retries | 3 |
| OPENSEARCH_SSL_REJECT_UNAUTHORIZED | SSL verification | true |
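
To make the defaults concrete, here is a dependency-free sketch of how these variables could be resolved. It does not use @t3-oss/env-core and is not the package's actual configuration code; the `OpenSearchConfig` type and `readOpenSearchConfig` function are hypothetical names, and treating only the literal string "false" as disabling SSL verification is an assumption.

```typescript
// Hypothetical config shape mirroring the variables documented here.
type OpenSearchConfig = {
  url: string;
  username: string | undefined;
  password: string | undefined;
  bearerToken: string | undefined;
  requestTimeoutMs: number;
  maxRetries: number;
  sslRejectUnauthorized: boolean;
};

function readOpenSearchConfig(env: Record<string, string | undefined>): OpenSearchConfig {
  // Parse a non-negative integer, falling back to the documented default.
  const int = (name: string, fallback: number): number => {
    const raw = env[name];
    if (raw === undefined || raw === "") return fallback;
    const parsed = Number(raw);
    if (!Number.isInteger(parsed) || parsed < 0) {
      throw new Error(`${name} must be a non-negative integer, got "${raw}"`);
    }
    return parsed;
  };

  return {
    url: env["OPENSEARCH_URL"] ?? "http://localhost:9200",
    username: env["OPENSEARCH_USERNAME"],
    password: env["OPENSEARCH_PASSWORD"],
    bearerToken: env["OPENSEARCH_BEARER_TOKEN"],
    requestTimeoutMs: int("OPENSEARCH_REQUEST_TIMEOUT_MS", 30000),
    maxRetries: int("OPENSEARCH_MAX_RETRIES", 3),
    // Assumption: verification stays on unless explicitly set to "false".
    sslRejectUnauthorized: env["OPENSEARCH_SSL_REJECT_UNAUTHORIZED"] !== "false",
  };
}
```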

Index Bootstrap

The SearchService.init() method runs automatically on backend startup:

  1. Checks if product_listing_v1 index exists
  2. Creates index with schema if missing
  3. Checks if product_listing_current alias exists
  4. Creates alias pointing to product_listing_v1 if missing

Bootstrap is idempotent and safe to run on every startup: it only creates what is missing, and the exists checks are cheap operations.
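
The four steps above reduce to two checks and at most two writes. The sketch below models that decision as a pure function so the idempotence is easy to see; it is not the real SearchService.init(), and `BootstrapState`, `BootstrapStep`, and `planBootstrap` are hypothetical names.

```typescript
// Results of the two "exists" checks (steps 1 and 3).
type BootstrapState = { indexExists: boolean; aliasExists: boolean };

// Writes that still need to happen (steps 2 and 4).
type BootstrapStep =
  | { step: "createIndex"; index: string }
  | { step: "createAlias"; alias: string; index: string };

const INDEX_NAME = "product_listing_v1";
const ALIAS_NAME = "product_listing_current";

function planBootstrap(state: BootstrapState): BootstrapStep[] {
  const steps: BootstrapStep[] = [];
  if (!state.indexExists) {
    steps.push({ step: "createIndex", index: INDEX_NAME });
  }
  if (!state.aliasExists) {
    steps.push({ step: "createAlias", alias: ALIAS_NAME, index: INDEX_NAME });
  }
  return steps;
}
```

When both the index and the alias already exist, the plan is empty, which is why running bootstrap on every startup costs only the two checks.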

Index Schema

Structure

  • Index: product_listing_v1
  • Alias: product_listing_current (points to current version)
  • Mapping Policy: Explicit mapping with dynamic: false
  • Relationship: Parent-child via docJoin field (product → listing)

Document Types

Product Document (Parent)

    {
      "docJoin": "product",
      "product": {
        "name": "Black Lotus",
        "nameSlug": "black-lotus",
        "gameSlug": "mtg",
        "priceUs": 123456,
        "priceEu": 123456,
        "translations": {
          "fr": { "name": "Lotus Noir" },
          "de": { "name": "Schwarze Lotus" }
        },
        "attrs": {
          "manaCost_keyword": "{3}",
          "power_number": 5,
          "toughness_number": 5
        }
      }
    }

Key Points:

  • _id = product ID
  • docJoin = "product" (string)
  • Routing required: When indexing, use routing: productId (Bulk API) so parent and children land on the same shard.
  • Prices stored as double (cents)
  • Game-specific attributes in product.attrs with suffix convention

Listing Document (Child)

    {
      "docJoin": { "name": "listing", "parent": "product_123" },
      "listing": {
        "userId": "user_456",
        "username": "seller123",
        "price": 123456,
        "currency": "USD",
        "condition": "NM",
        "language": "en",
        "quantity": 2,
        "shipsTo": ["US", "CA", "FR", "DE"],
        "graded": {
          "grade": "10",
          "certification": "12345678",
          "gradingService": "PSA"
        }
      }
    }

Key Points:

  • _id = listing ID
  • docJoin = { "name": "listing", "parent": "<productId>" }
  • Routing required: Must use routing: productId when indexing (Bulk API). Parent and child must share the same routing value so they live on the same shard.
  • Prices stored as long (cents as integer)
  • shipsTo: Array of ISO country codes (e.g., ["US", "CA", "FR"]) - populated from user.sellerProfile.shipsTo

Implementation: @repo/search indexers already pass routing for both products and listings. ProductIndexerService.bulkIndexProducts / bulkDeleteProducts and ListingIndexerService.bulkIndexListings / bulkDeleteListings use routing: productId in every bulk index and delete action.
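
As a rough illustration of what routing looks like on the wire, the sketch below builds a Bulk API request body (newline-delimited JSON) in which every index action carries `routing` set to the product ID. It is not the code of the @repo/search indexers; `BulkDoc` and `toBulkNdjson` are hypothetical names.

```typescript
// One document to index. For a product, productId equals id;
// for a listing, productId is the parent product's ID.
type BulkDoc = { id: string; productId: string; body: Record<string, unknown> };

// Build a Bulk API body: one action line followed by one source line per doc.
function toBulkNdjson(index: string, docs: BulkDoc[]): string {
  const lines: string[] = [];
  for (const doc of docs) {
    lines.push(JSON.stringify({ index: { _index: index, _id: doc.id, routing: doc.productId } }));
    lines.push(JSON.stringify(doc.body));
  }
  // The Bulk API requires the body to end with a newline.
  return lines.join("\n") + "\n";
}
```

Because the parent and all of its children share the same routing value, they are stored on the same shard, which the join field needs for has_child queries to work.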

Field Types

| Field Type | Examples | Storage |
|---|---|---|
| Price | product.priceUs, listing.price | double / long (cents) |
| Date | product.createdAt | date (not indexed) |
| Translation | product.translations.fr.name | text with analyzer |
| Game Attributes | product.attrs.manaCost_keyword | Suffix-based (_keyword, _text, _number, _bool, _date) |
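
The suffix convention for game attributes can be read mechanically from the key. The helper below is a small sketch of that idea, assuming the suffix is everything after the last underscore; `parseAttrKey` and `AttrKind` are hypothetical names and not part of @repo/search.

```typescript
// The five suffixes listed for product.attrs fields.
const ATTR_SUFFIXES = ["keyword", "text", "number", "bool", "date"] as const;
type AttrKind = (typeof ATTR_SUFFIXES)[number];

// Split an attrs key such as "manaCost_keyword" into its base name and kind.
// Returns null when the key does not end in a recognized suffix.
function parseAttrKey(key: string): { name: string; kind: AttrKind } | null {
  const cut = key.lastIndexOf("_");
  if (cut <= 0) return null; // no underscore, or nothing before it
  const suffix = key.slice(cut + 1);
  const kind = ATTR_SUFFIXES.find((candidate) => candidate === suffix);
  return kind === undefined ? null : { name: key.slice(0, cut), kind };
}
```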

Data Synchronization

Event-Driven Sync

Products and listings sync automatically via domain events:

  • Product Sync: ProductViewRefreshedEvent → OpenSearchProductSyncSubscriber → SQS → ProductSyncHandler
  • Listing Sync: InventoryChangedEvent → OpenSearchListingSyncSubscriber → SQS → ListingSyncHandler
  • Listing Sync (shipsTo only): SellerShipsToChangedEvent → SellerShipsToSyncSubscriber → SQS → ListingSyncHandler. When a seller updates only sellerProfile.shipsTo (no inventory changes), this subscriber queues all of that seller’s active listings for re-indexing so the listing.shipsTo field in OpenSearch stays in sync.

Listing shipsTo Field

The listing.shipsTo field in OpenSearch is populated from user.sellerProfile.shipsTo (an array of ISO country codes the seller ships to).

Important: listing.shipsTo is refreshed whenever a listing is re-indexed. That happens through InventoryChangedEvent when inventory changes, and through SellerShipsToChangedEvent when a seller updates only sellerProfile.shipsTo. In the second case, SellerShipsToSyncSubscriber (in @repo/opensearch-sync) publishes the inventory IDs of all that seller’s active listings to the listing sync queue, and the listings are re-indexed with the new shipsTo value. No manual sync is needed for shipsTo-only updates.

Manual Sync

Use the CLI to manually sync products:

    # Sync all products
    pnpm cdnx opensearch sync products

    # Sync specific game
    pnpm cdnx opensearch sync products --game mtg

    # Dry run
    pnpm cdnx opensearch sync products --dry-run

Query Examples

Basic Queries

    curl -X POST "http://localhost:9200/product_listing_current/_search" \
      -H 'Content-Type: application/json' \
      -d '{
        "query": {
          "bool": {
            "filter": [
              { "term": { "docJoin": "product" } },
              { "term": { "product.gameSlug": "mtg" } }
            ]
          }
        },
        "size": 20,
        "track_total_hits": true
      }'

Filter by Expansion

    curl -X POST "http://localhost:9200/product_listing_current/_search" \
      -H 'Content-Type: application/json' \
      -d '{
        "query": {
          "bool": {
            "filter": [
              { "term": { "docJoin": "product" } },
              { "term": { "product.gameSlug": "mtg" } },
              { "term": { "product.expansion.slug": "alpha" } }
            ]
          }
        },
        "size": 20
      }'

Product Name Search (with Fuzzy Matching)

    curl -X POST "http://localhost:9200/product_listing_current/_search" \
      -H 'Content-Type: application/json' \
      -d '{
        "query": {
          "bool": {
            "must": [
              { "term": { "docJoin": "product" } },
              {
                "bool": {
                  "should": [
                    {
                      "match": {
                        "product.translations.fr.name": {
                          "query": "lotus",
                          "operator": "or",
                          "fuzziness": "AUTO",
                          "fuzzy_transpositions": true,
                          "prefix_length": 2
                        }
                      }
                    },
                    {
                      "wildcard": {
                        "product.translations.fr.name.keyword": {
                          "value": "*lotus*",
                          "case_insensitive": true
                        }
                      }
                    },
                    {
                      "match": {
                        "product.name": {
                          "query": "lotus",
                          "operator": "or",
                          "fuzziness": "AUTO"
                        }
                      }
                    }
                  ],
                  "minimum_should_match": 1
                }
              }
            ],
            "filter": [
              { "term": { "product.gameSlug": "mtg" } }
            ]
          }
        },
        "size": 20
      }'

Filtering

Filter by Price Range

    curl -X POST "http://localhost:9200/product_listing_current/_search" \
      -H 'Content-Type: application/json' \
      -d '{
        "query": {
          "bool": {
            "filter": [
              { "term": { "docJoin": "product" } },
              { "term": { "product.gameSlug": "mtg" } },
              { "range": { "product.priceUs": { "gte": 1000, "lte": 5000 } } }
            ]
          }
        },
        "size": 20
      }'

Filter by Game-Specific Attributes

    curl -X POST "http://localhost:9200/product_listing_current/_search" \
      -H 'Content-Type: application/json' \
      -d '{
        "query": {
          "bool": {
            "filter": [
              { "term": { "docJoin": "product" } },
              { "term": { "product.gameSlug": "mtg" } },
              { "terms": { "product.rarity": ["mythic", "rare"] } },
              { "range": { "product.attrs.power_number": { "gte": 5 } } }
            ]
          }
        },
        "size": 20
      }'

Faceting

See the dedicated Faceted Search page for a complete guide to the dimensions system, including self-excluding aggregations, the three facet families, and how game-specific attributes are resolved dynamically.

Parent-Child Queries (Listings)

Products with Listings Filter

    curl -X POST "http://localhost:9200/product_listing_current/_search" \
      -H 'Content-Type: application/json' \
      -d '{
        "query": {
          "bool": {
            "must": [
              { "term": { "docJoin": "product" } },
              {
                "has_child": {
                  "type": "listing",
                  "query": {
                    "bool": {
                      "filter": [
                        { "range": { "listing.price": { "gte": 1000, "lte": 5000 } } },
                        { "term": { "listing.condition": "NM" } }
                      ]
                    }
                  },
                  "inner_hits": {
                    "size": 100,
                    "sort": [ { "listing.price": { "order": "asc" } } ]
                  }
                }
              }
            ],
            "filter": [
              { "term": { "product.gameSlug": "mtg" } }
            ]
          }
        },
        "size": 20
      }'

Filter by Graded Listings

    curl -X POST "http://localhost:9200/product_listing_current/_search" \
      -H 'Content-Type: application/json' \
      -d '{
        "query": {
          "bool": {
            "must": [
              { "term": { "docJoin": "product" } },
              {
                "has_child": {
                  "type": "listing",
                  "query": {
                    "bool": {
                      "filter": [
                        { "exists": { "field": "listing.graded" } },
                        { "term": { "listing.graded.gradingService": "PSA" } }
                      ]
                    }
                  },
                  "inner_hits": {
                    "size": 100,
                    "sort": [ { "listing.price": { "order": "asc" } } ]
                  }
                }
              }
            ],
            "filter": [
              { "term": { "product.gameSlug": "mtg" } }
            ]
          }
        },
        "size": 20
      }'

Filter by Shipping Destination (shipsTo)

    curl -X POST "http://localhost:9200/product_listing_current/_search" \
      -H 'Content-Type: application/json' \
      -d '{
        "query": {
          "bool": {
            "must": [
              { "term": { "docJoin": "product" } },
              {
                "has_child": {
                  "type": "listing",
                  "query": {
                    "terms": { "listing.shipsTo": ["US", "CA"] }
                  },
                  "inner_hits": {
                    "size": 100,
                    "sort": [ { "listing.price": { "order": "asc" } } ]
                  }
                }
              }
            ],
            "filter": [
              { "term": { "product.gameSlug": "mtg" } }
            ]
          }
        },
        "size": 20
      }'

The shipsTo field is an array of country codes. Use terms query to match listings that ship to any of the specified countries.
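
The same request body can be assembled programmatically. The function below is a sketch that mirrors the curl example for shipping destinations; `productsShippingTo` is a hypothetical helper, not the query builder from @repo/search, and the default of 100 inner hits is taken from the examples in this document.

```typescript
// Build a search body for products that have at least one listing
// shipping to any of the given countries.
function productsShippingTo(gameSlug: string, countries: string[], innerHitsSize = 100) {
  return {
    query: {
      bool: {
        must: [
          { term: { docJoin: "product" } },
          {
            has_child: {
              type: "listing",
              // "terms" matches a listing if shipsTo contains ANY listed country.
              query: { terms: { "listing.shipsTo": countries } },
              inner_hits: {
                size: innerHitsSize,
                sort: [{ "listing.price": { order: "asc" } }],
              },
            },
          },
        ],
        filter: [{ term: { "product.gameSlug": gameSlug } }],
      },
    },
    size: 20,
  };
}
```

The result can be serialized with JSON.stringify and sent as the body of a _search request against product_listing_current.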

Sorting

Sort by Price

    curl -X POST "http://localhost:9200/product_listing_current/_search" \
      -H 'Content-Type: application/json' \
      -d '{
        "query": {
          "bool": {
            "filter": [
              { "term": { "docJoin": "product" } },
              { "term": { "product.gameSlug": "mtg" } }
            ]
          }
        },
        "sort": [
          { "product.priceUs": { "order": "asc" } },
          { "_id": { "order": "asc" } }
        ],
        "size": 20
      }'

Using the TypeScript API

The normalized query shape uses product (identity, type, category, market, game filters) and listing (listing-level filters). Game-specific attributes are passed via gameFilters, a strongly-typed discriminated union validated per-game at the Zod schema level. The API uses product.type; the query builder maps it to the index field product.productType.

    import { ListingSearchService } from "@repo/search"

    const queryService = container.resolve(ListingSearchService)

    const results = await queryService.search({
      product: { gameFilters: { game: "mtg" } },
      pagination: { limit: 20, offset: 0 }
    })

Search with Filters

    const results = await queryService.search({
      product: {
        gameFilters: {
          game: "mtg",
          rarity: { op: "or", values: ["mythic", "rare"] }
        },
        expansionSlug: "alpha",
        name: { value: "Lotus" },
        type: { op: "and", values: ["card"] },
        market: { priceUs: { min: 1000, max: 5000 } },
      },
      listing: {
        priceRange: { min: 1000, max: 5000 },
        condition: { op: "and", values: ["NM", "LP"] },
        currency: "USD"
      },
      pagination: {
        limit: 20,
        offset: 0,
        sort: [["product.priceUs", "asc"]]
      },
      locale: "en"
    })

Index Versioning

OpenSearch mappings are effectively immutable: you can’t change field types, normalizers, or dynamic templates on an existing index. Schema changes require creating a new index and reindexing all data.

We have an automated zero-downtime reindex migration harness that handles this via a Trigger.dev task. See the dedicated Reindex Migration page for the full guide, including the migration flow, operator recipe, and code locations.
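
For orientation only, the final cut-over in an alias-based versioning scheme is usually a single request to the OpenSearch _aliases endpoint, which applies its actions atomically. The sketch below builds such a request body. It is not the migration harness described on the Reindex Migration page; `aliasSwapBody` is a hypothetical helper and product_listing_v2 is an assumed name for the next index version.

```typescript
type AliasAction =
  | { remove: { index: string; alias: string } }
  | { add: { index: string; alias: string } };

// Body for POST /_aliases that moves an alias from one index to another.
// Both actions are applied in one atomic step, so readers of the alias
// never see a moment where it points at no index.
function aliasSwapBody(alias: string, fromIndex: string, toIndex: string): { actions: AliasAction[] } {
  return {
    actions: [
      { remove: { index: fromIndex, alias } },
      { add: { index: toIndex, alias } },
    ],
  };
}

// Example: point product_listing_current at a hypothetical v2 index.
const swapExample = aliasSwapBody("product_listing_current", "product_listing_v1", "product_listing_v2");
```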

Monitoring

Check Index Status

    # Check if index exists
    curl http://localhost:9200/product_listing_v1

    # Get index stats
    curl http://localhost:9200/product_listing_v1/_stats

    # Get index mapping
    curl http://localhost:9200/product_listing_v1/_mapping

Check Alias

    # List aliases
    curl http://localhost:9200/_aliases

    # Search via alias
    curl -X POST "http://localhost:9200/product_listing_current/_search" \
      -H 'Content-Type: application/json' \
      -d '{"query": {"match_all": {}}}'

Monitor SQS queue depth and DLQ for sync issues. Check sync handler logs for errors.

Best Practices

  1. Always use alias: Write to product_listing_current, not the versioned index
  2. Routing: Always provide routing: productId when indexing or deleting products and listings (the routing key in Bulk API actions) so a parent and its children are on the same shard.
  3. Bulk operations: Use the Bulk API for multiple documents (batches of 1000)
  4. Filters over must: Use filter clauses for exact-match conditions when possible (no scoring, cacheable, faster)
  5. Limit inner_hits: The examples above request size: 100; OpenSearch returns only 3 inner hits per parent when size is omitted. Adjust based on needs
  6. Schema changes: Always version indices, never modify existing mappings
  7. Translation queries: Boost locale-specific fields for better relevance
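
The batching advice in item 3 can be sketched with a small generic helper. `chunk` is a hypothetical utility, not taken from the codebase; the default batch size of 1000 comes from the list above.

```typescript
// Split a list into consecutive batches of at most `size` items,
// e.g. so that each batch becomes one Bulk API request.
function chunk<T>(items: readonly T[], size = 1000): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError("size must be a positive integer");
  }
  const batches: T[][] = [];
  for (let start = 0; start < items.length; start += size) {
    batches.push(items.slice(start, start + size));
  }
  return batches;
}
```

For 2,500 documents this yields three batches of 1000, 1000, and 500 items.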