Skip to Content
BackendEvent Bus

Event Bus

The event bus provides decoupled, type-safe communication between domains without introducing direct dependencies. It’s a local, in-memory publish-subscribe system — not a distributed message broker.

Overview

FeatureDescription
Package@repo/events
PatternIn-memory pub/sub
Type SafetyFull TypeScript support via discriminated unions
Execution ModesSync (blocking) or async (fire-and-forget)

Not a Replacement for SNS/SQS

The event bus is a local, in-memory system running within a single Node.js process. Messages are not persisted and will be lost if the process crashes mid-handling.

For durable, distributed messaging with guaranteed delivery and retry semantics, use SQS queues instead (see Queue System).

A future migration to AWS SNS for cross-service pub/sub is planned, but the current implementation is intentionally simple for in-process domain decoupling.

When to Use Event Bus vs SQS

Use CaseEvent BusSQS
Cross-domain communication within a request✅❌
Fire-and-forget side effects (stats, snapshots)✅❌
Work that must survive process crashes❌✅
Work that needs retry on failure❌✅
Long-running background jobs❌✅
External API calls that may fail⚠️✅

The Golden Rule: If the work must complete, use SQS. If it’s a “nice to have” side effect that can be retried on the next event, the event bus is fine.

Best Practice: Event Bus + SQS Together

For important but non-blocking work, combine both: the event bus triggers the flow, and the subscriber immediately enqueues an SQS message for durable processing.

// OrderChatSubscriber: Event triggers SQS for durability @injectable() export class OrderChatSubscriber extends DomainSubscriber { constructor( @inject(EventBus) eventBus: EventBus, @inject(QueuePublisher) private queuePublisher: QueuePublisher, ) { super(eventBus, "OrderChatSubscriber") this.subscribe("transaction.completed", this.onTransactionCompleted.bind(this), { mode: "async", // Non-blocking }) } private async onTransactionCompleted(event: TransactionCompletedEvent): Promise<void> { // Immediately enqueue to SQS - if this fails, we lose nothing critical // SQS handles retry, DLQ, etc. await this.queuePublisher.publish( "create-order-channels", createOrderChannelsPayloadSchema, { orderIds: event.orderIds, buyerId: event.buyerId } ) } }

This pattern gives you:

  • Decoupling: Publisher doesn’t know about chat channels
  • Durability: SQS handles retry and dead-letter
  • Performance: Async mode doesn’t block the original request

Execution Modes

Fire-and-forget — handler runs in background, doesn’t block emitter.

Use for:

  • Stats recalculation
  • Analytics/logging
  • Non-critical side effects
  • Anything that can fail silently
this.subscribe("inventory.changed", this.onInventoryChanged.bind(this), { mode: "async", // Default, can omit })

Limitations

  1. No Persistence: Messages exist only in memory. Process crash = message lost.
  2. Single Process: Events don’t propagate to other server instances.
  3. No Retry: Failed handlers are logged but not retried.
  4. No Ordering Guarantees: Multiple async handlers may complete in any order.
  5. No Dead Letter: Failed messages aren’t captured for later inspection.

These limitations are intentional — the event bus solves in-process decoupling, not distributed messaging. For those needs, use SQS or wait for SNS integration.

Adding a New Event

1. Define the Event Type

Add your event interface and register it in packages/core/events/src/domain-events.ts:

// Define the payload interface export interface MyDomainActionEvent { userId: string entityId: string action: "created" | "updated" | "deleted" } // Register in the event registry export interface DomainEvents { // ... existing events "mydomain.action": MyDomainActionEvent }

2. Emit the Event

Inject EventBus and call emit() from your service:

import { inject, injectable } from "tsyringe" import { EventBus } from "@repo/events" @injectable() export class MyDomainService { constructor(@inject(EventBus) private eventBus: EventBus) {} async doSomething(userId: string, entityId: string): Promise<void> { // ... business logic ... // Emit event after the fact await this.eventBus.emit("mydomain.action", { userId, entityId, action: "created", }) } }

Always await the emit call, even for async handlers. This ensures sync handlers complete before your method returns.

Creating a Subscriber

1. Extend DomainSubscriber

Create a subscriber class that extends DomainSubscriber:

// Example: packages/backend/domains/mydomain/src/subscribers/my.subscriber.ts import { inject, injectable } from "tsyringe" import { DomainSubscriber, EventBus, type SomeEvent } from "@repo/events" @injectable() export class MySubscriber extends DomainSubscriber { constructor( @inject(EventBus) eventBus: EventBus, @inject(SomeService) private someService: SomeService, ) { super(eventBus, "MySubscriber") // Register handlers in constructor this.subscribe("some.event", this.onSomeEvent.bind(this)) this.subscribe("another.event", this.onAnotherEvent.bind(this), { mode: "sync" }) } private async onSomeEvent(event: SomeEvent): Promise<void> { await this.someService.handleEvent(event) } private async onAnotherEvent(event: AnotherEvent): Promise<void> { // Sync handler - blocks emitter until complete await this.someService.criticalUpdate(event) } }

2. Register at Startup

Add your subscriber to apps/backend/src/bootstrap/event-subscribers.ts:

import { MySubscriber } from "@repo/mydomain" export function registerEventSubscribers(container: DependencyContainer): void { // ... existing subscribers container.resolve(MySubscriber) // Constructor runs, handlers register }

The DI container resolution triggers the constructor, which registers the event handlers.

Event Design Guidelines

Keep Events Thin

Events should contain IDs and essential context, not entire objects:

// ✅ Good: Thin event with IDs export interface OrderCreatedEvent { orderId: string transactionId: string sellerId: string buyerId: string subtotalCents: number } // ❌ Bad: Fat event with nested objects export interface OrderCreatedEvent { order: Order // Don't embed entire models transaction: Transaction seller: User buyer: User }

Subscribers should fetch additional data if needed. This keeps events small and avoids stale data issues.

Name Events as Facts

Events describe what happened, not what should happen:

// ✅ Good: Describes what happened "inventory.changed" "order.shipped" "transaction.completed" // ❌ Bad: Imperative/command style "update.inventory" "send.shipping.notification" "process.transaction"

Include Context for Filtering

Include enough context so subscribers can filter efficiently:

export interface InventoryChangedEvent { userId: string operationType: InventoryOperationType // add, edit, delete affectedProductIds: string[] // For targeted cache invalidation ignoreListIds?: string[] // Skip certain handlers }

API Reference

EventBus

class EventBus { // Emit an event to all subscribers async emit<E extends DomainEventName>( event: E, payload: DomainEventPayload<E>, ): Promise<void> // Subscribe to an event, returns unsubscribe function on<E extends DomainEventName>( event: E, handler: (payload: DomainEventPayload<E>) => Promise<void>, options?: { mode?: "sync" | "async" }, ): () => void // Subscribe once (auto-unsubscribes after first call) once<E extends DomainEventName>( event: E, handler: (payload: DomainEventPayload<E>) => Promise<void>, options?: { mode?: "sync" | "async" }, ): () => void // Remove handlers for an event (or all if no event specified) off<E extends DomainEventName>(event?: E): void }

DomainSubscriber

abstract class DomainSubscriber { constructor(eventBus: EventBus, subscriberName: string) // Subscribe to an event (call in constructor) protected subscribe<E extends DomainEventName>( event: E, handler: (payload: DomainEventPayload<E>) => Promise<void>, options?: { mode?: "sync" | "async" }, ): void // Unsubscribe from all events public dispose(): void }

Future: SNS Migration

The event bus may eventually be backed by AWS SNS for:

  • Cross-service pub/sub
  • Persistence and replay
  • Fan-out to multiple consumers
  • Better observability

When this happens, the EventBus interface will remain the same — only the underlying transport changes. Design your subscribers with this in mind: don’t rely on in-process assumptions like shared memory or synchronous side effects.

Last updated on