Event Bus
The event bus provides decoupled, type-safe communication between domains without introducing direct dependencies. It’s a local, in-memory publish-subscribe system — not a distributed message broker.
Overview
| Feature | Description |
|---|---|
| Package | @repo/events |
| Pattern | In-memory pub/sub |
| Type Safety | Full TypeScript support via discriminated unions |
| Execution Modes | Sync (blocking) or async (fire-and-forget) |
Not a Replacement for SNS/SQS
The event bus is a local, in-memory system running within a single Node.js process. Messages are not persisted and will be lost if the process crashes mid-handling.
For durable, distributed messaging with guaranteed delivery and retry semantics, use SQS queues instead (see Queue System).
A future migration to AWS SNS for cross-service pub/sub is planned, but the current implementation is intentionally simple for in-process domain decoupling.
When to Use Event Bus vs SQS
| Use Case | Event Bus | SQS |
|---|---|---|
| Cross-domain communication within a request | ✅ | ❌ |
| Fire-and-forget side effects (stats, snapshots) | ✅ | ❌ |
| Work that must survive process crashes | ❌ | ✅ |
| Work that needs retry on failure | ❌ | ✅ |
| Long-running background jobs | ❌ | ✅ |
| External API calls that may fail | ⚠️ | ✅ |
The Golden Rule: If the work must complete, use SQS. If it’s a “nice to have” side effect that can be retried on the next event, the event bus is fine.
Best Practice: Event Bus + SQS Together
For important but non-blocking work, combine both: the event bus triggers the flow, and the subscriber immediately enqueues an SQS message for durable processing.
// OrderChatSubscriber: Event triggers SQS for durability
@injectable()
export class OrderChatSubscriber extends DomainSubscriber {
constructor(
@inject(EventBus) eventBus: EventBus,
@inject(QueuePublisher) private queuePublisher: QueuePublisher,
) {
super(eventBus, "OrderChatSubscriber")
this.subscribe("transaction.completed", this.onTransactionCompleted.bind(this), {
mode: "async", // Non-blocking
})
}
private async onTransactionCompleted(event: TransactionCompletedEvent): Promise<void> {
// Immediately enqueue to SQS - if this fails, we lose nothing critical
// SQS handles retry, DLQ, etc.
await this.queuePublisher.publish(
"create-order-channels",
createOrderChannelsPayloadSchema,
{ orderIds: event.orderIds, buyerId: event.buyerId }
)
}
}This pattern gives you:
- Decoupling: Publisher doesn’t know about chat channels
- Durability: SQS handles retry and dead-letter
- Performance: Async mode doesn’t block the original request
Execution Modes
Async (Default)
Fire-and-forget — handler runs in background, doesn’t block emitter.
Use for:
- Stats recalculation
- Analytics/logging
- Non-critical side effects
- Anything that can fail silently
this.subscribe("inventory.changed", this.onInventoryChanged.bind(this), {
mode: "async", // Default, can omit
})Limitations
- No Persistence: Messages exist only in memory. Process crash = message lost.
- Single Process: Events don’t propagate to other server instances.
- No Retry: Failed handlers are logged but not retried.
- No Ordering Guarantees: Multiple async handlers may complete in any order.
- No Dead Letter: Failed messages aren’t captured for later inspection.
These limitations are intentional — the event bus solves in-process decoupling, not distributed messaging. For those needs, use SQS or wait for SNS integration.
Adding a New Event
1. Define the Event Type
Add your event interface and register it in packages/core/events/src/domain-events.ts:
// Define the payload interface
export interface MyDomainActionEvent {
userId: string
entityId: string
action: "created" | "updated" | "deleted"
}
// Register in the event registry
export interface DomainEvents {
// ... existing events
"mydomain.action": MyDomainActionEvent
}2. Emit the Event
Inject EventBus and call emit() from your service:
import { inject, injectable } from "tsyringe"
import { EventBus } from "@repo/events"
@injectable()
export class MyDomainService {
constructor(@inject(EventBus) private eventBus: EventBus) {}
async doSomething(userId: string, entityId: string): Promise<void> {
// ... business logic ...
// Emit event after the fact
await this.eventBus.emit("mydomain.action", {
userId,
entityId,
action: "created",
})
}
}Always await the emit call, even for async handlers. This ensures sync handlers complete before your method returns.
Creating a Subscriber
1. Extend DomainSubscriber
Create a subscriber class that extends DomainSubscriber:
// Example: packages/backend/domains/mydomain/src/subscribers/my.subscriber.ts
import { inject, injectable } from "tsyringe"
import { DomainSubscriber, EventBus, type SomeEvent } from "@repo/events"
@injectable()
export class MySubscriber extends DomainSubscriber {
constructor(
@inject(EventBus) eventBus: EventBus,
@inject(SomeService) private someService: SomeService,
) {
super(eventBus, "MySubscriber")
// Register handlers in constructor
this.subscribe("some.event", this.onSomeEvent.bind(this))
this.subscribe("another.event", this.onAnotherEvent.bind(this), { mode: "sync" })
}
private async onSomeEvent(event: SomeEvent): Promise<void> {
await this.someService.handleEvent(event)
}
private async onAnotherEvent(event: AnotherEvent): Promise<void> {
// Sync handler - blocks emitter until complete
await this.someService.criticalUpdate(event)
}
}2. Register at Startup
Add your subscriber to apps/backend/src/bootstrap/event-subscribers.ts:
import { MySubscriber } from "@repo/mydomain"
export function registerEventSubscribers(container: DependencyContainer): void {
// ... existing subscribers
container.resolve(MySubscriber) // Constructor runs, handlers register
}The DI container resolution triggers the constructor, which registers the event handlers.
Event Design Guidelines
Keep Events Thin
Events should contain IDs and essential context, not entire objects:
// âś… Good: Thin event with IDs
export interface OrderCreatedEvent {
orderId: string
transactionId: string
sellerId: string
buyerId: string
subtotalCents: number
}
// ❌ Bad: Fat event with nested objects
export interface OrderCreatedEvent {
order: Order // Don't embed entire models
transaction: Transaction
seller: User
buyer: User
}Subscribers should fetch additional data if needed. This keeps events small and avoids stale data issues.
Name Events as Facts
Events describe what happened, not what should happen:
// âś… Good: Describes what happened
"inventory.changed"
"order.shipped"
"transaction.completed"
// ❌ Bad: Imperative/command style
"update.inventory"
"send.shipping.notification"
"process.transaction"Include Context for Filtering
Include enough context so subscribers can filter efficiently:
export interface InventoryChangedEvent {
userId: string
operationType: InventoryOperationType // add, edit, delete
affectedProductIds: string[] // For targeted cache invalidation
ignoreListIds?: string[] // Skip certain handlers
}API Reference
EventBus
class EventBus {
// Emit an event to all subscribers
async emit<E extends DomainEventName>(
event: E,
payload: DomainEventPayload<E>,
): Promise<void>
// Subscribe to an event, returns unsubscribe function
on<E extends DomainEventName>(
event: E,
handler: (payload: DomainEventPayload<E>) => Promise<void>,
options?: { mode?: "sync" | "async" },
): () => void
// Subscribe once (auto-unsubscribes after first call)
once<E extends DomainEventName>(
event: E,
handler: (payload: DomainEventPayload<E>) => Promise<void>,
options?: { mode?: "sync" | "async" },
): () => void
// Remove handlers for an event (or all if no event specified)
off<E extends DomainEventName>(event?: E): void
}DomainSubscriber
abstract class DomainSubscriber {
constructor(eventBus: EventBus, subscriberName: string)
// Subscribe to an event (call in constructor)
protected subscribe<E extends DomainEventName>(
event: E,
handler: (payload: DomainEventPayload<E>) => Promise<void>,
options?: { mode?: "sync" | "async" },
): void
// Unsubscribe from all events
public dispose(): void
}Future: SNS Migration
The event bus may eventually be backed by AWS SNS for:
- Cross-service pub/sub
- Persistence and replay
- Fan-out to multiple consumers
- Better observability
When this happens, the EventBus interface will remain the same — only the underlying transport changes. Design your subscribers with this in mind: don’t rely on in-process assumptions like shared memory or synchronous side effects.