
Queue System

The queue system is a wrapper around AWS SQS that provides type-safe message publishing and consumption with automatic retries, distributed tracing, and large payload handling via S3.

Overview

| Feature | Description |
| --- | --- |
| Provider | AWS SQS (LocalStack in development) |
| Package | @repo/queue |
| Worker | apps/backend/src/worker.ts |
| Tracing | DataDog APM integration |

Adding a New Queue

Define Queue in Infrastructure

Add your queue to catalog/units/sqs-queues/terragrunt.hcl in the infrastructure repo:

```hcl
my-new-queue = {
  fifo                        = true  # true for FIFO, false for standard
  content_based_deduplication = true  # FIFO only: dedupe by message body
  visibility_timeout_seconds  = 300   # Time message is hidden after receive
  message_retention_seconds   = 86400 # How long messages stay in queue
  max_message_size            = 1024  # Max message size in bytes
  create_dlq                  = true  # Create dead-letter queue
  max_receive_count           = 3     # Retries before DLQ
  # schedule = "cron(0 * * * ? *)"    # Optional: EventBridge schedule
}
```

Queue naming convention: {queue-key}-{environment}[.fifo]

Example: my-new-queue-prod.fifo
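
The convention can be expressed as a small helper (a sketch for illustration only; `queueName` is hypothetical and not part of `@repo/queue`):

```typescript
// Build the physical queue name from the convention
// {queue-key}-{environment}[.fifo]. Hypothetical helper for illustration.
function queueName(queueKey: string, environment: string, fifo: boolean): string {
  return `${queueKey}-${environment}${fifo ? ".fifo" : ""}`
}

// queueName("my-new-queue", "prod", true) → "my-new-queue-prod.fifo"
```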

Register Queue Configuration

Add the queue config to packages/backend/queue/src/config.ts:

```typescript
export const QUEUE_CONFIGS: Record<string, QueueConfig> = {
  // ... existing queues
  "my-new-queue": {
    fifo: true,
    dlq: true,
    concurrency: 1, // Concurrent message processors
    batchSize: 1,   // Messages per poll (1-10)
  },
}
```

Define the Message Schema

Create a DTO in packages/backend/queue/src/dtos/:

```typescript
// packages/backend/queue/src/dtos/my-new-queue.dto.ts
import { z } from "zod"

export const myNewQueuePayloadSchema = z.object({
  userId: z.string(),
  itemId: z.string(),
  action: z.enum(["create", "update", "delete"]),
})

export type MyNewQueuePayload = z.infer<typeof myNewQueuePayloadSchema>
```

Export from the index:

```typescript
// packages/backend/queue/src/dtos/index.ts
export * from "./my-new-queue.dto"
```

Create the Handler

Create a handler class:

```typescript
// packages/backend/domains/my-domain/src/my-new-queue.handler.ts
import { injectable, inject } from "tsyringe"
import type { MyNewQueuePayload } from "@repo/queue"
import { SomeService } from "./some.service" // your domain service

@injectable()
export class MyNewQueueHandler {
  constructor(
    @inject(SomeService) private someService: SomeService
  ) {}

  async handle(payload: MyNewQueuePayload): Promise<void> {
    // Process the message
    await this.someService.doSomething(payload)
  }
}
```

Register the Handler

Add the handler to apps/backend/src/handlers/registry.ts:

```typescript
import { container } from "tsyringe"
import { MyNewQueueHandler } from "@repo/my-domain"
import { myNewQueuePayloadSchema } from "@repo/queue"

export class HandlerRegistry {
  private registerHandlers(): void {
    // ... existing handlers
    const myNewQueueHandler = container.resolve(MyNewQueueHandler)
    this.handlers.set("my-new-queue", {
      schema: myNewQueuePayloadSchema,
      handler: async (payload, metadata) => {
        await myNewQueueHandler.handle(payload)
      },
    })
  }
}
```

Publishing Messages

Use QueuePublisher to send messages:

```typescript
import { inject, injectable } from "tsyringe"
import { QueuePublisher, myNewQueuePayloadSchema } from "@repo/queue"

@injectable()
export class MyService {
  constructor(
    @inject(QueuePublisher) private queuePublisher: QueuePublisher
  ) {}

  async queueWork(userId: string, itemId: string): Promise<string> {
    const { messageId } = await this.queuePublisher.publish(
      "my-new-queue",
      myNewQueuePayloadSchema,
      {
        userId,
        itemId,
        action: "create",
      },
      {
        // FIFO options (required for FIFO queues)
        messageGroupId: userId, // Messages with the same group are ordered
        messageDeduplicationId: `${userId}-${itemId}-${Date.now()}`,
      }
    )
    return messageId
  }
}
```

Publish Options

```typescript
interface PublishOptions {
  delaySeconds?: number           // Delay delivery (0-900 seconds)
  messageGroupId?: string         // FIFO: ordering group
  messageDeduplicationId?: string // FIFO: deduplication key
  messageAttributes?: Record<string, MessageAttributeValue>
}
```

Message Envelope

Messages are automatically wrapped with metadata:

```typescript
{
  metadata: {
    messageId: "uuid",
    timestamp: "2024-01-25T12:00:00.000Z",
    retryCount: 0,
    // Distributed tracing headers (auto-injected)
    "x-datadog-trace-id": "...",
    "x-datadog-parent-id": "...",
  },
  payload: {
    // Your message data
  }
}
```

Consuming Messages

The worker (apps/backend/src/worker.ts) automatically starts consumers for all registered queues.
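
The dispatch step can be approximated as follows (a simplified, self-contained sketch: the real worker polls SQS and validates with zod schemas, while `validate` and the in-memory registry here are stand-ins for illustration):

```typescript
// Simplified model of the worker's dispatch: look up the handler
// registered for a queue key, validate the raw payload, then invoke it.
type Metadata = { messageId: string; timestamp: string; retryCount: number }

interface RegisteredHandler<T> {
  validate: (raw: unknown) => T // stand-in for schema.parse
  handler: (payload: T, metadata: Metadata) => Promise<void>
}

const handlers = new Map<string, RegisteredHandler<any>>()
const processed: string[] = []

handlers.set("my-new-queue", {
  validate: (raw) => raw as { userId: string },
  handler: async (payload, metadata) => {
    processed.push(`${metadata.messageId}:${payload.userId}`)
  },
})

async function dispatch(queueKey: string, raw: unknown, metadata: Metadata): Promise<void> {
  const entry = handlers.get(queueKey)
  if (!entry) throw new Error(`No handler registered for ${queueKey}`)
  const payload = entry.validate(raw)
  await entry.handler(payload, metadata)
}
```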

Handler Signature

```typescript
type MessageHandler<T> = (
  payload: T,
  metadata: {
    messageId: string
    timestamp: string
    retryCount: number
  }
) => Promise<void>
```

Consumer Behavior

| Feature | Behavior |
| --- | --- |
| Long Polling | 20-second wait time |
| Concurrency | Configurable per queue |
| Batch Size | 1-10 messages per poll |
| Visibility Timeout | Extended based on retry backoff |
| Graceful Shutdown | Waits up to 30s for active processors |

Retry Strategy

Failed messages are retried with exponential backoff:

| Retry | Delay |
| --- | --- |
| 1st | 30 seconds |
| 2nd | 60 seconds |
| 3rd | 120 seconds |
| 4th | 240 seconds |
| 5th+ | 600 seconds (max) |

After maxReceiveCount failures, messages go to the Dead Letter Queue (DLQ).
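
The schedule above corresponds to doubling from a 30-second base until the 600-second cap kicks in (a sketch; `retryDelaySeconds` is illustrative, not the actual implementation):

```typescript
// Delay before the nth retry, matching the table: 30s doubled per
// retry, with a 600s cap from the 5th retry onward. Illustrative only.
function retryDelaySeconds(retry: number): number {
  if (retry >= 5) return 600 // cap
  return 30 * 2 ** (retry - 1)
}
```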

Large Payloads

For payloads exceeding the SQS limit (256 KB), use QueueS3PayloadService:

```typescript
import { inject, injectable } from "tsyringe"
import { QueuePublisher, QueueS3PayloadService, myQueueSchema } from "@repo/queue"

@injectable()
export class MyService {
  constructor(
    @inject(QueuePublisher) private publisher: QueuePublisher,
    @inject(QueueS3PayloadService) private s3Payload: QueueS3PayloadService
  ) {}

  async queueLargePayload(items: Item[]): Promise<void> {
    const messageId = crypto.randomUUID()

    // Store large data in S3
    const s3Key = await this.s3Payload.store("my-queue", messageId, items)

    // Send only a reference in the queue message
    await this.publisher.publish(
      "my-queue",
      myQueueSchema,
      {
        s3Key,
        itemCount: items.length,
      },
      { messageGroupId: "batch" }
    )
  }
}
```

Retrieve in handler:

```typescript
async handle(payload: MyPayload): Promise<void> {
  const items = await this.s3Payload.retrieve<Item[]>(payload.s3Key)
  // Process items...
}
```

S3 payloads are stored in cardnexus-queue-payloads-{environment} with a 30-day expiration.

Cron-Triggered Queues

Some queues are triggered by EventBridge Scheduler instead of direct publishing:

```hcl
# In terragrunt.hcl
process-stuck-fund-operations = {
  fifo       = false
  create_dlq = false
  schedule   = "cron(*/5 * * * ? *)" # Every 5 minutes
}
```

These handlers receive an empty payload and query the database for work:

```typescript
@injectable()
export class StuckFundOperationsHandler {
  constructor(
    @inject(FundService) private fundService: FundService
  ) {}

  async handle(): Promise<void> {
    // Find stuck operations in the DB
    const stuckOps = await this.fundService.findStuckOperations()
    for (const op of stuckOps) {
      await this.fundService.retryOperation(op)
    }
  }
}
```

FIFO vs Standard Queues

Use a FIFO queue when:

  • Message ordering matters within a group
  • Need deduplication
  • Processing must be sequential per user/entity

Configuration:

```typescript
{
  fifo: true,
  concurrency: 1, // Usually 1 for strict ordering
  batchSize: 1,
}
```

API Reference

QueuePublisher

```typescript
class QueuePublisher {
  publish<T extends z.ZodTypeAny>(
    queueKey: string,
    payloadSchema: T,
    payload: z.infer<T>,
    options?: PublishOptions
  ): Promise<{ messageId: string }>
}
```

QueueConsumer

```typescript
class QueueConsumer {
  consume<T extends z.ZodType>(
    queueKey: string,
    payloadSchema: T,
    handler: MessageHandler<z.infer<T>>
  ): Promise<void>

  stop(): Promise<void> // Graceful shutdown
}
```

QueueS3PayloadService

```typescript
class QueueS3PayloadService {
  store<T>(
    queueKey: string,
    messageId: string,
    data: T
  ): Promise<string> // Returns the S3 key

  retrieve<T>(s3Key: string): Promise<T>
}
```

QueueConfig

```typescript
interface QueueConfig {
  fifo: boolean       // FIFO or standard queue
  dlq: boolean        // Has a dead-letter queue
  concurrency: number // Concurrent processors
  batchSize: number   // Messages per poll (1-10)
}
```

Local Development

LocalStack provides SQS locally:

```shell
# Start infrastructure
docker compose up -d

# Queues are auto-created by init-localstack.sh
# Endpoint: http://localhost:4566
```

The queue system auto-detects local environment and configures accordingly:

  • Uses LocalStack endpoint
  • Uses test credentials
  • Auto-provisions missing queues
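
That selection looks roughly like this (a sketch; the environment check, region default, and credential values are assumptions, not the actual detection code):

```typescript
// Hypothetical sketch of environment-based SQS client configuration.
interface SqsClientConfig {
  region: string
  endpoint?: string
  credentials?: { accessKeyId: string; secretAccessKey: string }
}

function sqsClientConfig(env: string): SqsClientConfig {
  if (env === "local") {
    return {
      region: "us-east-1",
      endpoint: "http://localhost:4566", // LocalStack
      credentials: { accessKeyId: "test", secretAccessKey: "test" },
    }
  }
  // In AWS, the region comes from the environment and credentials
  // from the instance/task role, so neither is set explicitly here.
  return { region: process.env.AWS_REGION ?? "us-east-1" }
}
```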

Distributed Tracing

The queue system integrates with DataDog APM:

  1. Publishing: Injects trace context from active span into message metadata
  2. Consuming: Creates child span linked to publisher’s trace
  3. Visibility: Full trace from API request → queue → handler

Trace headers are automatically propagated:

  • x-datadog-trace-id
  • x-datadog-parent-id
  • x-datadog-sampling-priority
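
Propagation amounts to copying the active trace context into the message envelope on publish and reading it back on consume (a self-contained sketch; `injectTraceHeaders` is illustrative, and the real system delegates this to the dd-trace tracer):

```typescript
// Merge Datadog trace headers into outgoing message metadata so the
// consumer can continue the publisher's trace. Illustrative sketch only.
const TRACE_HEADER_KEYS = [
  "x-datadog-trace-id",
  "x-datadog-parent-id",
  "x-datadog-sampling-priority",
] as const

function injectTraceHeaders(
  metadata: Record<string, string>,
  traceContext: Record<string, string>
): Record<string, string> {
  const out = { ...metadata }
  for (const key of TRACE_HEADER_KEYS) {
    if (traceContext[key] !== undefined) out[key] = traceContext[key]
  }
  return out
}
```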