# Queue System
The queue system is a wrapper around AWS SQS that provides type-safe message publishing and consumption with automatic retries, distributed tracing, and large payload handling via S3.
## Overview
| Feature | Description |
|---|---|
| Provider | AWS SQS (LocalStack in development) |
| Package | @repo/queue |
| Worker | apps/backend/src/worker.ts |
| Tracing | DataDog APM integration |
## Adding a New Queue

### Define Queue in Infrastructure

Add your queue to `catalog/units/sqs-queues/terragrunt.hcl` in the infrastructure repo:
```hcl
my-new-queue = {
  fifo                        = true   # true for FIFO, false for standard
  content_based_deduplication = true   # FIFO only: dedupe by message body
  visibility_timeout_seconds  = 300    # Time a message is hidden after receive
  message_retention_seconds   = 86400  # How long messages stay in the queue
  max_message_size            = 1024   # Max message size in bytes
  create_dlq                  = true   # Create a dead-letter queue
  max_receive_count           = 3      # Retries before messages go to the DLQ
  # schedule = "cron(0 * * * ? *)"     # Optional: EventBridge schedule
}
```

Queue naming convention: `{queue-key}-{environment}[.fifo]`

Example: `my-new-queue-prod.fifo`
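The convention can be captured in a small helper. This is illustrative only; the real queue-name resolution lives inside `@repo/queue`, and `queueName` here is a hypothetical function:

```typescript
// Hypothetical helper mirroring the naming convention above.
function queueName(queueKey: string, environment: string, fifo: boolean): string {
  const base = `${queueKey}-${environment}`
  return fifo ? `${base}.fifo` : base
}

console.log(queueName("my-new-queue", "prod", true)) // "my-new-queue-prod.fifo"
```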
### Register Queue Configuration

Add the queue config to `packages/backend/queue/src/config.ts`:
```ts
export const QUEUE_CONFIGS: Record<string, QueueConfig> = {
  // ... existing queues
  "my-new-queue": {
    fifo: true,
    dlq: true,
    concurrency: 1, // Concurrent message processors
    batchSize: 1,   // Messages per poll (1-10)
  },
}
```

### Define the Message Schema
Create a DTO in `packages/backend/queue/src/dtos/`:
```ts
// packages/backend/queue/src/dtos/my-new-queue.dto.ts
import { z } from "zod"

export const myNewQueuePayloadSchema = z.object({
  userId: z.string(),
  itemId: z.string(),
  action: z.enum(["create", "update", "delete"]),
})

export type MyNewQueuePayload = z.infer<typeof myNewQueuePayloadSchema>
```

Export from the index:
```ts
// packages/backend/queue/src/dtos/index.ts
export * from "./my-new-queue.dto"
```

### Create the Handler
Create a handler class:
```ts
// packages/backend/domains/my-domain/src/my-new-queue.handler.ts
import { injectable, inject } from "tsyringe"
import type { MyNewQueuePayload } from "@repo/queue"
import { SomeService } from "./some.service" // your domain service

@injectable()
export class MyNewQueueHandler {
  constructor(
    @inject(SomeService) private someService: SomeService
  ) {}

  async handle(payload: MyNewQueuePayload): Promise<void> {
    // Process the message
    await this.someService.doSomething(payload)
  }
}
```

### Register the Handler
Add the handler to `apps/backend/src/handlers/registry.ts`:

```ts
import { container } from "tsyringe"
import { MyNewQueueHandler } from "@repo/my-domain"
import { myNewQueuePayloadSchema } from "@repo/queue"

export class HandlerRegistry {
  private registerHandlers(): void {
    // ... existing handlers
    const myNewQueueHandler = container.resolve(MyNewQueueHandler)
    this.handlers.set("my-new-queue", {
      schema: myNewQueuePayloadSchema,
      handler: async (payload, metadata) => {
        await myNewQueueHandler.handle(payload)
      },
    })
  }
}
```

## Publishing Messages
Use `QueuePublisher` to send messages:

```ts
import { inject, injectable } from "tsyringe"
import { QueuePublisher, myNewQueuePayloadSchema } from "@repo/queue"

@injectable()
export class MyService {
  constructor(
    @inject(QueuePublisher) private queuePublisher: QueuePublisher
  ) {}

  async queueWork(userId: string, itemId: string): Promise<string> {
    const { messageId } = await this.queuePublisher.publish(
      "my-new-queue",
      myNewQueuePayloadSchema,
      {
        userId,
        itemId,
        action: "create",
      },
      {
        // FIFO options (required for FIFO queues)
        messageGroupId: userId, // Messages in the same group are processed in order
        messageDeduplicationId: `${userId}-${itemId}-${Date.now()}`,
      }
    )
    return messageId
  }
}
```

### Publish Options
```ts
interface PublishOptions {
  delaySeconds?: number           // Delay delivery (0-900 seconds)
  messageGroupId?: string         // FIFO: ordering group
  messageDeduplicationId?: string // FIFO: deduplication key
  messageAttributes?: Record<string, MessageAttributeValue>
}
```

### Message Envelope
Messages are automatically wrapped with metadata:
```ts
{
  metadata: {
    messageId: "uuid",
    timestamp: "2024-01-25T12:00:00.000Z",
    retryCount: 0,
    // Distributed tracing headers (auto-injected)
    "x-datadog-trace-id": "...",
    "x-datadog-parent-id": "...",
  },
  payload: {
    // Your message data
  }
}
```

## Consuming Messages
The worker (`apps/backend/src/worker.ts`) automatically starts consumers for all registered queues.
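The startup loop can be sketched roughly as follows. This is a hypothetical shape, assuming the registry exposes its entries and a consumer object per queue; the real worker wiring may differ:

```typescript
type RegisteredHandler = {
  schema: unknown
  handler: (payload: unknown, metadata: unknown) => Promise<void>
}

// Hypothetical bootstrap: start one long-polling consumer per registered queue.
function startWorker(
  registry: Map<string, RegisteredHandler>,
  consumer: {
    consume: (queueKey: string, schema: unknown, handler: RegisteredHandler["handler"]) => Promise<void>
  }
): void {
  for (const [queueKey, entry] of registry) {
    // Fire and forget: each consumer polls its queue until the worker shuts down.
    void consumer.consume(queueKey, entry.schema, entry.handler)
  }
}
```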
### Handler Signature

```ts
type MessageHandler<T> = (
  payload: T,
  metadata: {
    messageId: string
    timestamp: string
    retryCount: number
  }
) => Promise<void>
```

### Consumer Behavior
| Feature | Behavior |
|---|---|
| Long Polling | 20-second wait time |
| Concurrency | Configurable per queue |
| Batch Size | 1-10 messages per poll |
| Visibility Timeout | Extended based on retry backoff |
| Graceful Shutdown | Waits up to 30s for active processors |
## Retry Strategy
Failed messages are retried with exponential backoff:
| Retry | Delay |
|---|---|
| 1st | 30 seconds |
| 2nd | 60 seconds |
| 3rd | 120 seconds |
| 4th | 240 seconds |
| 5th+ | 600 seconds (max) |
After `maxReceiveCount` failures, messages go to the Dead Letter Queue (DLQ).
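The schedule in the table can be expressed as a simple lookup. This is a sketch consistent with the table above, not the consumer's internal implementation, which applies the delay when extending visibility timeouts:

```typescript
// Delays mirror the retry table: doubling from 30s, capped at 600s from the 5th retry on.
function retryBackoffSeconds(retryCount: number): number {
  const delays = [30, 60, 120, 240] // 1st through 4th retries
  return retryCount <= delays.length ? delays[retryCount - 1] : 600 // 5th+ capped
}
```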
## Large Payloads

For payloads exceeding the SQS limit of 256 KB, use `QueueS3PayloadService`:
```ts
import { inject, injectable } from "tsyringe"
import { QueuePublisher, QueueS3PayloadService, myQueueSchema } from "@repo/queue"

@injectable()
export class MyService {
  constructor(
    @inject(QueuePublisher) private publisher: QueuePublisher,
    @inject(QueueS3PayloadService) private s3Payload: QueueS3PayloadService
  ) {}

  async queueLargePayload(items: Item[]): Promise<void> {
    const messageId = crypto.randomUUID()

    // Store large data in S3
    const s3Key = await this.s3Payload.store(
      "my-queue",
      messageId,
      items
    )

    // Send a reference in the queue message instead of the data itself
    await this.publisher.publish(
      "my-queue",
      myQueueSchema,
      {
        s3Key,
        itemCount: items.length,
      },
      { messageGroupId: "batch" }
    )
  }
}
```

Retrieve in handler:
```ts
async handle(payload: MyPayload): Promise<void> {
  const items = await this.s3Payload.retrieve<Item[]>(payload.s3Key)
  // Process items...
}
```

S3 payloads are stored in `cardnexus-queue-payloads-{environment}` with a 30-day expiration.
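When it is not obvious ahead of time whether a payload fits, a serialize-and-measure check can decide between the inline and S3 paths. This sketch assumes the 256 KB SQS limit mentioned above; `shouldOffloadToS3` and its headroom value are hypothetical, not part of `@repo/queue`:

```typescript
const SQS_MAX_MESSAGE_BYTES = 256 * 1024

// Hypothetical helper: serialize once and compare against the SQS limit,
// leaving headroom for the envelope metadata wrapped around the payload.
function shouldOffloadToS3(payload: unknown, headroomBytes = 4 * 1024): boolean {
  const size = Buffer.byteLength(JSON.stringify(payload), "utf8")
  return size > SQS_MAX_MESSAGE_BYTES - headroomBytes
}
```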
## Cron-Triggered Queues

Some queues are triggered by EventBridge Scheduler instead of direct publishing:
```hcl
# In terragrunt.hcl
process-stuck-fund-operations = {
  fifo       = false
  create_dlq = false
  schedule   = "cron(*/5 * * * ? *)" # Every 5 minutes
}
```

These handlers receive an empty payload and query the database for work:
```ts
@injectable()
export class StuckFundOperationsHandler {
  async handle(): Promise<void> {
    // Find stuck operations in the DB
    const stuckOps = await this.fundService.findStuckOperations()
    for (const op of stuckOps) {
      await this.fundService.retryOperation(op)
    }
  }
}
```

## FIFO vs Standard Queues
### FIFO Queue

Use a FIFO queue when:

- Message ordering matters within a group
- You need deduplication
- Processing must be sequential per user/entity
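For deduplication, a deterministic `messageDeduplicationId` is often safer than the timestamp-based one shown earlier, since a retried publish call then deduplicates instead of double-sending. A sketch; the hashing scheme here is an illustration, not the library's behavior:

```typescript
import { createHash } from "node:crypto"

// Hypothetical helper: derive a stable deduplication ID from the business
// identifiers, so re-publishing the same work is deduplicated by SQS.
function deduplicationId(userId: string, itemId: string, action: string): string {
  return createHash("sha256").update(`${userId}:${itemId}:${action}`).digest("hex")
}
```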
Configuration:
```ts
{
  fifo: true,
  concurrency: 1, // Usually 1 for strict ordering
  batchSize: 1,
}
```

## API Reference

### QueuePublisher
```ts
class QueuePublisher {
  publish<T extends z.ZodTypeAny>(
    queueKey: string,
    payloadSchema: T,
    payload: z.infer<T>,
    options?: PublishOptions
  ): Promise<{ messageId: string }>
}
```

### QueueConsumer
```ts
class QueueConsumer {
  consume<T extends z.ZodType>(
    queueKey: string,
    payloadSchema: T,
    handler: MessageHandler<z.infer<T>>
  ): Promise<void>

  stop(): Promise<void> // Graceful shutdown
}
```

### QueueS3PayloadService
```ts
class QueueS3PayloadService {
  store<T>(
    queueKey: string,
    messageId: string,
    data: T
  ): Promise<string> // Returns the S3 key

  retrieve<T>(s3Key: string): Promise<T>
}
```

### QueueConfig
```ts
interface QueueConfig {
  fifo: boolean        // FIFO or standard queue
  dlq: boolean         // Has a dead-letter queue
  concurrency: number  // Concurrent processors
  batchSize: number    // Messages per poll (1-10)
}
```

## Local Development

LocalStack provides SQS locally:
```bash
# Start infrastructure
docker compose up -d

# Queues are auto-created by init-localstack.sh
# Endpoint: http://localhost:4566
```

The queue system auto-detects the local environment and configures itself accordingly:

- Uses the LocalStack endpoint
- Uses test credentials
- Auto-provisions missing queues
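A sketch of what that detection might look like. The environment variables (`NODE_ENV`, `AWS_EXECUTION_ENV`, `LOCALSTACK_ENDPOINT`) and the `sqsClientOptions` helper are assumptions for illustration; the real detection logic may key off different signals:

```typescript
interface SqsClientOptions {
  endpoint?: string
  credentials?: { accessKeyId: string; secretAccessKey: string }
}

// Hypothetical local-environment detection for the SQS client.
function sqsClientOptions(env: Record<string, string | undefined>): SqsClientOptions {
  const isLocal = env.NODE_ENV !== "production" && !env.AWS_EXECUTION_ENV
  if (!isLocal) return {} // In AWS, rely on the default credential chain
  return {
    endpoint: env.LOCALSTACK_ENDPOINT ?? "http://localhost:4566",
    credentials: { accessKeyId: "test", secretAccessKey: "test" },
  }
}
```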
## Distributed Tracing
The queue system integrates with DataDog APM:
- Publishing: Injects trace context from active span into message metadata
- Consuming: Creates child span linked to publisher’s trace
- Visibility: Full trace from API request → queue → handler
Trace headers are automatically propagated:
- `x-datadog-trace-id`
- `x-datadog-parent-id`
- `x-datadog-sampling-priority`
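Conceptually, propagation means copying the active trace context into the envelope metadata on publish and reading it back on consume. A simplified sketch using plain objects as carriers; the actual `dd-trace` API calls the library uses are not shown, and both helper names are hypothetical:

```typescript
type TraceHeaders = Record<string, string>

// Hypothetical carrier helpers: merge the active trace headers into the
// envelope metadata on publish, and pull them back out on consume.
function injectTraceContext(metadata: TraceHeaders, active: TraceHeaders): TraceHeaders {
  return { ...metadata, ...active }
}

function extractTraceContext(metadata: TraceHeaders): TraceHeaders {
  const keys = ["x-datadog-trace-id", "x-datadog-parent-id", "x-datadog-sampling-priority"]
  return Object.fromEntries(keys.flatMap((k) => (metadata[k] ? [[k, metadata[k]]] : [])))
}
```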