OpenSearch
OpenSearch powers marketplace product and listing search in CardNexus. This document covers the architecture, setup, indexing, and querying patterns.
Overview
| Feature | Description |
|---|---|
| Provider | OpenSearch (Docker locally, AWS managed in production) |
| Client Package | @repo/opensearch |
| Search Package | @repo/search |
| Sync Package | @repo/opensearch-sync |
| Index Name | product_listing_v1 (aliased as product_listing_current) |
| Model | Parent-child (products → listings) |
Architecture
Data Flow
- Events → Queue: Subscribers publish IDs only (productIds or inventoryIds) to SQS.
- Handlers: Consume message (IDs) → read full documents from MongoDB (ProductView / Inventory + User) → transform → bulk index or delete in OpenSearch.
- Product sync:
ProductSyncHandlerusesProductViewModel.find(). - Listing sync:
ListingSyncHandlerusesInventoryModel.find()andUserModel.find()(for seller profile, e.g.shipsTo).
Components
@repo/opensearch: Generic OpenSearch client wrapper with environment-based configuration@repo/search: Index schema definitions, bootstrap service, indexer services, query builder@repo/opensearch-sync: Event subscribers, SQS producers, sync handlers
Setup
Local Development
OpenSearch is included in Docker Compose:
# Start OpenSearch
docker compose up opensearch
# Start with OpenSearch Dashboards (UI)
docker compose --profile opensearch up opensearch opensearch-dashboards| Service | Port | URL |
|---|---|---|
| OpenSearch | 9200 | http://localhost:9200Â |
| OpenSearch Dashboards | 5601 | http://localhost:5601Â |
Environment Variables
Configuration via @t3-oss/env-core:
| Variable | Description | Default |
|---|---|---|
OPENSEARCH_URL | OpenSearch endpoint | http://localhost:9200 |
OPENSEARCH_USERNAME | Basic auth username | - |
OPENSEARCH_PASSWORD | Basic auth password | - |
OPENSEARCH_BEARER_TOKEN | Bearer token | - |
OPENSEARCH_REQUEST_TIMEOUT_MS | Request timeout (ms) | 30000 |
OPENSEARCH_MAX_RETRIES | Max retries | 3 |
OPENSEARCH_SSL_REJECT_UNAUTHORIZED | SSL verification | true |
Index Bootstrap
The SearchService.init() method runs automatically on backend startup:
- Checks if
product_listing_v1index exists - Creates index with schema if missing
- Checks if
product_listing_currentalias exists - Creates alias pointing to
product_listing_v1if missing
Bootstrap is safe to run on every startup - exists checks are cheap operations.
Index Schema
Structure
- Index:
product_listing_v1 - Alias:
product_listing_current(points to current version) - Mapping Policy: Explicit mapping with
dynamic: false - Relationship: Parent-child via
docJoinfield (product → listing)
Document Types
Product Document (Parent)
{
"docJoin": "product",
"product": {
"name": "Black Lotus",
"nameSlug": "black-lotus",
"gameSlug": "mtg",
"priceUs": 123456,
"priceEu": 123456,
"translations": {
"fr": { "name": "Lotus Noir" },
"de": { "name": "Schwarze Lotus" }
},
"attrs": {
"manaCost_keyword": "{3}",
"power_number": 5,
"toughness_number": 5
}
}
}Key Points:
_id= product IDdocJoin="product"(string)- Routing required: When indexing, use
routing: productId(Bulk API) so parent and children land on the same shard. - Prices stored as
double(cents) - Game-specific attributes in
product.attrswith suffix convention
Listing Document (Child)
{
"docJoin": { "name": "listing", "parent": "product_123" },
"listing": {
"userId": "user_456",
"username": "seller123",
"price": 123456,
"currency": "USD",
"condition": "NM",
"language": "en",
"quantity": 2,
"shipsTo": ["US", "CA", "FR", "DE"],
"graded": {
"grade": "10",
"certification": "12345678",
"gradingService": "PSA"
}
}
}Key Points:
_id= listing IDdocJoin={ "name": "listing", "parent": "<productId>" }- Routing required: Must use
routing: productIdwhen indexing (Bulk API). Parent and child must share the same routing value so they live on the same shard. - Prices stored as
long(cents as integer) shipsTo: Array of ISO country codes (e.g.,["US", "CA", "FR"]) - populated fromuser.sellerProfile.shipsTo
Implementation: @repo/search indexers already pass routing for both products and listings. ProductIndexerService.bulkIndexProducts / bulkDeleteProducts and ListingIndexerService.bulkIndexListings / bulkDeleteListings use routing: productId in every bulk index and delete action.
Field Types
| Field Type | Examples | Storage |
|---|---|---|
| Price | product.priceUs, listing.price | double / long (cents) |
| Date | product.createdAt | date (not indexed) |
| Translation | product.translations.fr.name | text with analyzer |
| Game Attributes | product.attrs.manaCost_keyword | Suffix-based (_keyword, _text, _number, _bool, _date) |
Data Synchronization
Event-Driven Sync
Products and listings sync automatically via domain events:
- Product Sync:
ProductViewRefreshedEvent→OpenSearchProductSyncSubscriber→ SQS →ProductSyncHandler - Listing Sync:
InventoryChangedEvent→OpenSearchListingSyncSubscriber→ SQS →ListingSyncHandler - Listing Sync (shipsTo only):
SellerShipsToChangedEvent→SellerShipsToSyncSubscriber→ SQS →ListingSyncHandler— when a seller updates onlysellerProfile.shipsTo(no inventory changes), this subscriber re-indexes all that seller’s active listings so thelisting.shipsTofield in OpenSearch stays in sync.
Listing shipsTo Field
The listing.shipsTo field in OpenSearch is populated from user.sellerProfile.shipsTo (an array of ISO country codes the seller ships to).
Important: When a user updates their sellerProfile.shipsTo, all their listings in OpenSearch are re-synced automatically. This is triggered either by InventoryChangedEvent (when inventory changes) or by SellerShipsToChangedEvent (when only shipsTo is updated), via SellerShipsToSyncSubscriber in @repo/opensearch-sync. No manual sync is needed for shipsTo-only updates.
The shipsTo field comes from sellerProfile.shipsTo. When sellerProfile.shipsTo is updated, SellerShipsToSyncSubscriber publishes all that seller’s active listing inventory IDs to the listing sync queue, so OpenSearch listings are re-indexed with the new shipsTo value.
Manual Sync
Use the CLI to manually sync products:
# Sync all products
pnpm cdnx opensearch sync products
# Sync specific game
pnpm cdnx opensearch sync products --game mtg
# Dry run
pnpm cdnx opensearch sync products --dry-runQuery Examples
Basic Queries
Simple Product Search
curl -X POST "http://localhost:9200/product_listing_current/_search" \
-H 'Content-Type: application/json' \
-d '{
"query": {
"bool": {
"filter": [
{ "term": { "docJoin": "product" } },
{ "term": { "product.gameSlug": "mtg" } }
]
}
},
"size": 20,
"track_total_hits": true
}'Filter by Expansion
curl -X POST "http://localhost:9200/product_listing_current/_search" \
-H 'Content-Type: application/json' \
-d '{
"query": {
"bool": {
"filter": [
{ "term": { "docJoin": "product" } },
{ "term": { "product.gameSlug": "mtg" } },
{ "term": { "product.expansion.slug": "alpha" } }
]
}
},
"size": 20
}'Text Search
Product Name Search (with Fuzzy Matching)
curl -X POST "http://localhost:9200/product_listing_current/_search" \
-H 'Content-Type: application/json' \
-d '{
"query": {
"bool": {
"must": [
{ "term": { "docJoin": "product" } },
{
"bool": {
"should": [
{
"match": {
"product.translations.fr.name": {
"query": "lotus",
"operator": "or",
"fuzziness": "AUTO",
"fuzzy_transpositions": true,
"prefix_length": 2
}
}
},
{
"wildcard": {
"product.translations.fr.name.keyword": {
"value": "*lotus*",
"case_insensitive": true
}
}
},
{
"match": {
"product.name": {
"query": "lotus",
"operator": "or",
"fuzziness": "AUTO"
}
}
}
],
"minimum_should_match": 1
}
}
],
"filter": [
{ "term": { "product.gameSlug": "mtg" } }
]
}
},
"size": 20
}'Filtering
Filter by Price Range
curl -X POST "http://localhost:9200/product_listing_current/_search" \
-H 'Content-Type: application/json' \
-d '{
"query": {
"bool": {
"filter": [
{ "term": { "docJoin": "product" } },
{ "term": { "product.gameSlug": "mtg" } },
{
"range": {
"product.priceUs": {
"gte": 1000,
"lte": 5000
}
}
}
]
}
},
"size": 20
}'Filter by Game-Specific Attributes
curl -X POST "http://localhost:9200/product_listing_current/_search" \
-H 'Content-Type: application/json' \
-d '{
"query": {
"bool": {
"filter": [
{ "term": { "docJoin": "product" } },
{ "term": { "product.gameSlug": "mtg" } },
{
"terms": {
"product.rarity": ["mythic", "rare"]
}
},
{
"range": {
"product.attrs.power_number": {
"gte": 5
}
}
}
]
}
},
"size": 20
}'Faceting
See the dedicated Faceted Search page for a complete guide to the dimensions system, including self-excluding aggregations, the three facet families, and how game-specific attributes are resolved dynamically.
Parent-Child Queries (Listings)
Products with Listings Filter
curl -X POST "http://localhost:9200/product_listing_current/_search" \
-H 'Content-Type: application/json' \
-d '{
"query": {
"bool": {
"must": [
{ "term": { "docJoin": "product" } },
{
"has_child": {
"type": "listing",
"query": {
"bool": {
"filter": [
{
"range": {
"listing.price": {
"gte": 1000,
"lte": 5000
}
}
},
{
"term": {
"listing.condition": "NM"
}
}
]
}
},
"inner_hits": {
"size": 100,
"sort": [
{
"listing.price": {
"order": "asc"
}
}
]
}
}
}
],
"filter": [
{ "term": { "product.gameSlug": "mtg" } }
]
}
},
"size": 20
}'Filter by Graded Listings
curl -X POST "http://localhost:9200/product_listing_current/_search" \
-H 'Content-Type: application/json' \
-d '{
"query": {
"bool": {
"must": [
{ "term": { "docJoin": "product" } },
{
"has_child": {
"type": "listing",
"query": {
"bool": {
"filter": [
{
"exists": {
"field": "listing.graded"
}
},
{
"term": {
"listing.graded.gradingService": "PSA"
}
}
]
}
},
"inner_hits": {
"size": 100,
"sort": [
{
"listing.price": {
"order": "asc"
}
}
]
}
}
}
],
"filter": [
{ "term": { "product.gameSlug": "mtg" } }
]
}
},
"size": 20
}'Filter by Shipping Destination (shipsTo)
curl -X POST "http://localhost:9200/product_listing_current/_search" \
-H 'Content-Type: application/json' \
-d '{
"query": {
"bool": {
"must": [
{ "term": { "docJoin": "product" } },
{
"has_child": {
"type": "listing",
"query": {
"terms": {
"listing.shipsTo": ["US", "CA"]
}
},
"inner_hits": {
"size": 100,
"sort": [
{
"listing.price": {
"order": "asc"
}
}
]
}
}
}
],
"filter": [
{ "term": { "product.gameSlug": "mtg" } }
]
}
},
"size": 20
}'The shipsTo field is an array of country codes. Use terms query to match listings that ship to any of the specified countries.
Sorting
Sort by Price
curl -X POST "http://localhost:9200/product_listing_current/_search" \
-H 'Content-Type: application/json' \
-d '{
"query": {
"bool": {
"filter": [
{ "term": { "docJoin": "product" } },
{ "term": { "product.gameSlug": "mtg" } }
]
}
},
"sort": [
{
"product.priceUs": {
"order": "asc"
}
},
{
"_id": {
"order": "asc"
}
}
],
"size": 20
}'Using the TypeScript API
The normalized query shape uses product (identity, type, category, market, game filters) and listing (listing-level filters). Game-specific attributes are passed via gameFilters, a strongly-typed discriminated union validated per-game at the Zod schema level. The API uses product.type; the query builder maps it to the index field product.productType.
Basic Search
import { ListingSearchService } from "@repo/search"
const queryService = container.resolve(ListingSearchService)
const results = await queryService.search({
product: {
gameFilters: { game: "mtg" }
},
pagination: {
limit: 20,
offset: 0
}
})Search with Filters
const results = await queryService.search({
product: {
gameFilters: {
game: "mtg",
rarity: { op: "or", values: ["mythic", "rare"] }
},
expansionSlug: "alpha",
name: { value: "Lotus" },
type: {
op: "and",
values: ["card"]
},
market: {
priceUs: {
min: 1000,
max: 5000
}
},
},
listing: {
priceRange: {
min: 1000,
max: 5000
},
condition: {
op: "and",
values: ["NM", "LP"]
},
currency: "USD"
},
pagination: {
limit: 20,
offset: 0,
sort: [["product.priceUs", "asc"]]
},
locale: "en"
})Index Versioning
OpenSearch mappings are immutable — you can’t change field types, normalizers, or dynamic templates on an existing index. Schema changes require creating a new index and reindexing all data.
We have an automated zero-downtime reindex migration harness that handles this via a Trigger.dev task. See the dedicated Reindex Migration page for the full guide, including the migration flow, operator recipe, and code locations.
Monitoring
Check Index Status
# Check if index exists
curl http://localhost:9200/product_listing_v1
# Get index stats
curl http://localhost:9200/product_listing_v1/_stats
# Get index mapping
curl http://localhost:9200/product_listing_v1/_mappingCheck Alias
# List aliases
curl http://localhost:9200/_aliases
# Search via alias
curl -X POST "http://localhost:9200/product_listing_current/_search" \
-H 'Content-Type: application/json' \
-d '{"query": {"match_all": {}}}'Monitor SQS queue depth and DLQ for sync issues. Check sync handler logs for errors.
Best Practices
- Always use alias: Write to
product_listing_current, not versioned index - Routing for listings: Always provide
routing: productIdwhen indexing products and listings (Bulk API keyrouting) so parent and children are on the same shard. - Bulk operations: Use bulk API for multiple documents (batches of 1000)
- Filters over must: Use
filterclauses when possible (cached, faster) - Limit inner_hits: Default is 100, adjust based on needs
- Schema changes: Always version indices, never modify existing mappings
- Translation queries: Boost locale-specific fields for better relevance