How to Implement CQRS Pattern in Node.js

Bright SEO Tools in saas Published: Apr 04, 2026 | Updated: Apr 04, 2026

Command Query Responsibility Segregation (CQRS) separates read and write operations into different models, enabling independent optimization of each. Systems that implement CQRS can scale read and write workloads independently, use different data stores optimized for each operation, and maintain multiple read models tailored to specific use cases. This architecture pattern solves specific scaling problems but introduces complexity that many applications don't need. The decision to use CQRS should be based on measured performance bottlenecks, not theoretical scaling benefits.

This guide provides a practical implementation of CQRS in Node.js, starting from a simple separation of commands and queries, progressing through event-driven synchronization, and culminating in a full CQRS system with event sourcing. You'll learn when CQRS solves real problems, how to implement it incrementally, and how to avoid the most common failure modes. The code examples come from production systems processing 100,000+ commands per day.

We'll cover command handlers, query models, event-driven synchronization, and testing strategies that make CQRS reliable in production.

Understanding CQRS Fundamentals

Traditional CRUD applications use the same model for reading and writing data. The Order model validates business rules when creating orders (write), and serves order data for display (read). This works well until read and write requirements diverge. Writes need strong consistency and business rule enforcement. Reads need denormalized data optimized for specific views and can tolerate eventual consistency.

CQRS separates these concerns. Commands (CreateOrder, UpdateUser, ProcessPayment) change state through domain models with business logic. Queries (GetOrderDetails, ListUserOrders, GetDashboardStats) retrieve state from read models optimized for specific use cases. These models can use different databases, different schemas, and different caching strategies.

The separation enables several optimizations. The write model uses normalized tables with foreign keys and transactions. The read model uses denormalized views with pre-joined data for fast queries. The write database scales vertically for consistency. The read database scales horizontally with replicas. You can cache read queries aggressively without affecting write consistency.

When CQRS Makes Sense

CQRS solves specific problems related to read/write scaling asymmetry and complex query requirements. If your system receives 100 writes per minute and 10,000 reads per minute, CQRS lets you scale reads independently using replicas and caching while keeping writes on a smaller, consistent data store. If dashboard queries require joining 8 tables and calculating aggregates, CQRS lets you maintain pre-computed views that serve those queries instantly.

Systems with complex domain logic benefit from CQRS by keeping business rules in the write model without polluting read queries with validation logic. Order processing involves inventory checks, payment validation, fraud detection, and shipping calculations. The read side just needs to display order status. Separating these concerns makes both sides simpler.

Don't use CQRS if reads and writes scale similarly, if your queries are simple table lookups, or if immediate consistency is required for all operations. CQRS adds complexity through eventual consistency and maintaining multiple models. This complexity is justified only when it solves measured problems.

Warning:

The most common CQRS failure mode is premature adoption. Teams implement CQRS before they have read/write scaling problems, spending months building infrastructure for optimization they don't need. Start with traditional architecture. Measure read/write patterns. Only implement CQRS when you have concrete evidence it solves real bottlenecks.
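
One way to gather that evidence is to count reads and writes before committing to anything. The sketch below is a minimal, hypothetical Express-style middleware (`createReadWriteCounter` and `snapshot` are illustrative names, not part of any library). It treats GET and HEAD requests as reads and everything else as writes, which is only a rough proxy and assumes a conventional REST API.

```javascript
// Hypothetical helper: counts reads vs. writes by HTTP method.
// Assumes GET/HEAD are reads and all other methods are writes.
function createReadWriteCounter() {
    const counts = { reads: 0, writes: 0 };

    // Express-compatible middleware signature: (req, res, next)
    function middleware(req, res, next) {
        if (req.method === 'GET' || req.method === 'HEAD') {
            counts.reads++;
        } else {
            counts.writes++;
        }
        next();
    }

    // Returns the current counts plus the read/write ratio
    // (null until at least one write has been seen)
    function snapshot() {
        const ratio = counts.writes === 0 ? null : counts.reads / counts.writes;
        return { ...counts, ratio };
    }

    return { middleware, snapshot };
}

// Usage with Express (assumed): app.use(counter.middleware);
const counter = createReadWriteCounter();
```

A ratio that stays close to 1 suggests reads and writes scale together, which is one of the cases where CQRS is unlikely to pay off.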

Basic CQRS Implementation

Start with the simplest CQRS implementation: separate code paths for commands and queries using the same database. Commands go through domain models with business logic. Queries go directly to database views. This establishes the pattern without infrastructure complexity.

Commands are imperative requests to change state. They include all data needed for validation and processing. Commands either succeed completely or fail completely — no partial updates. Commands return success/failure status and identifiers, not full object graphs.

// Command: Create Order
class CreateOrderCommand {
    constructor(userId, items, shippingAddress) {
        this.userId = userId;
        this.items = items;
        this.shippingAddress = shippingAddress;
    }
}

// Command Handler
class CreateOrderCommandHandler {
    constructor(orderRepository, inventoryService, eventBus) {
        this.orderRepository = orderRepository;
        this.inventoryService = inventoryService;
        this.eventBus = eventBus;
    }

    async handle(command) {
        // Validate command
        if (!command.items || command.items.length === 0) {
            throw new Error('Order must have at least one item');
        }

        // Check inventory
        for (const item of command.items) {
            const available = await this.inventoryService.checkStock(
                item.productId,
                item.quantity
            );
            if (!available) {
                throw new Error(`Insufficient stock for ${item.productId}`);
            }
        }

        // Create order through domain model
        const order = new Order({
            userId: command.userId,
            items: command.items,
            shippingAddress: command.shippingAddress,
            status: 'PENDING',
            createdAt: new Date()
        });

        // Calculate total
        order.calculateTotal();

        // Save to database
        await this.orderRepository.save(order);

        // Publish event
        await this.eventBus.publish({
            type: 'OrderCreated',
            data: {
                orderId: order.id,
                userId: order.userId,
                total: order.total,
                items: order.items
            }
        });

        return { success: true, orderId: order.id };
    }
}

Queries retrieve data without business logic. They return data optimized for specific views. Queries can use database views, denormalized tables, or caching layers. They never modify state.

// Query: Get Order Details
class GetOrderDetailsQuery {
    constructor(orderId) {
        this.orderId = orderId;
    }
}

// Query Handler
class GetOrderDetailsQueryHandler {
    constructor(db) {
        this.db = db;
    }

    async handle(query) {
        // Direct database query with joins for denormalized view
        const result = await this.db.query(`
            SELECT
                o.id,
                o.status,
                o.total,
                o.created_at,
                u.name as customer_name,
                u.email as customer_email,
                json_agg(json_build_object(
                    'productId', oi.product_id,
                    'productName', p.name,
                    'quantity', oi.quantity,
                    'price', oi.price
                )) as items
            FROM orders o
            INNER JOIN users u ON o.user_id = u.id
            INNER JOIN order_items oi ON oi.order_id = o.id
            INNER JOIN products p ON oi.product_id = p.id
            WHERE o.id = $1
            GROUP BY o.id, u.name, u.email
        `, [query.orderId]);

        if (result.rows.length === 0) {
            return null;
        }

        return result.rows[0];
    }
}

Command and Query Bus

A command bus routes commands to their handlers. This decouples the code that issues a command from the handler that executes it, and it enables middleware for cross-cutting concerns like logging, validation, and authorization. The same pattern applies to queries through a query bus.

// Command Bus
class CommandBus {
    constructor() {
        this.handlers = new Map();
        this.middleware = [];
    }

    register(commandName, handler) {
        this.handlers.set(commandName, handler);
    }

    use(middleware) {
        this.middleware.push(middleware);
    }

    async execute(command) {
        const commandName = command.constructor.name;
        const handler = this.handlers.get(commandName);

        if (!handler) {
            throw new Error(`No handler registered for ${commandName}`);
        }

        // Execute middleware chain
        let result = command;
        for (const middleware of this.middleware) {
            result = await middleware(result);
        }

        // Execute handler
        return await handler.handle(result);
    }
}

// Usage
const commandBus = new CommandBus();

// Register handlers
commandBus.register('CreateOrderCommand', new CreateOrderCommandHandler(
    orderRepository,
    inventoryService,
    eventBus
));

// Add middleware
commandBus.use(async (command) => {
    console.log('Executing command:', command.constructor.name);
    return command;
});

commandBus.use(async (command) => {
    // Validate user permissions
    await checkPermissions(command);
    return command;
});

// Execute command
const command = new CreateOrderCommand(
    userId,
    items,
    shippingAddress
);
const result = await commandBus.execute(command);
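
The query bus can be sketched the same way. This is a minimal illustration rather than a definitive implementation: the `QueryBus` class and the in-memory `GetOrderStatusQueryHandler` are assumed names, the Map-backed handler is only a stand-in for a real read model, and routing by `constructor.name` simply mirrors the command bus.

```javascript
// Minimal query bus sketch: routes a query object to its registered handler.
class QueryBus {
    constructor() {
        this.handlers = new Map();
    }

    register(queryName, handler) {
        this.handlers.set(queryName, handler);
    }

    async execute(query) {
        const queryName = query.constructor.name;
        const handler = this.handlers.get(queryName);

        if (!handler) {
            throw new Error(`No handler registered for ${queryName}`);
        }

        // Queries only read state, so the handler's result is passed straight back
        return handler.handle(query);
    }
}

// Illustrative query and handler (hypothetical, backed by an in-memory Map)
class GetOrderStatusQuery {
    constructor(orderId) {
        this.orderId = orderId;
    }
}

class GetOrderStatusQueryHandler {
    constructor(statusByOrderId) {
        this.statusByOrderId = statusByOrderId;
    }

    async handle(query) {
        const status = this.statusByOrderId.get(query.orderId);
        // Return null when the read model has no row for this order yet
        return status ? { orderId: query.orderId, status } : null;
    }
}
```

Routing on `constructor.name` stops working if the code is minified with class-name mangling. An explicit name property on each command or query avoids that dependency.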

Read Models and Projections

Read models are denormalized data structures optimized for specific queries. Instead of joining tables on every request, maintain pre-joined views that update when data changes. This trades write complexity for read performance — writes must update both the write model and read models, but reads become simple lookups.

Implement read models as database views, materialized views, or separate tables. Database views are virtual tables defined by queries — they don't store data but compute results on-the-fly. Materialized views store pre-computed results and refresh periodically. Separate tables give full control but require manual synchronization.

Database View Approach

Database views provide the simplest read model implementation. Create views that join and aggregate data for common queries. Because a view is evaluated at query time, it always reflects the current contents of the underlying tables. The downside is performance: complex views with many joins can be slow.

-- Create view for order list queries
CREATE VIEW order_list_view AS
SELECT
    o.id,
    o.status,
    o.total,
    o.created_at,
    u.name as customer_name,
    u.email as customer_email,
    COUNT(oi.id) as item_count
FROM orders o
INNER JOIN users u ON o.user_id = u.id
INNER JOIN order_items oi ON oi.order_id = o.id
GROUP BY o.id, u.name, u.email;

-- Query the view
SELECT * FROM order_list_view
WHERE customer_email = 'customer@example.com'
ORDER BY created_at DESC
LIMIT 10;

Materialized View Approach

Materialized views store pre-computed results, making queries fast at the cost of staleness. PostgreSQL materialized views require explicit refresh. Refresh them on a schedule (every 5 minutes) or after significant data changes.

-- Create materialized view
CREATE MATERIALIZED VIEW order_stats_view AS
SELECT
    DATE_TRUNC('day', created_at) as date,
    COUNT(*) as order_count,
    SUM(total) as total_revenue,
    AVG(total) as average_order_value
FROM orders
WHERE status = 'COMPLETED'
GROUP BY DATE_TRUNC('day', created_at);

-- Create a unique index: it speeds up date lookups and PostgreSQL
-- requires one for REFRESH ... CONCURRENTLY
CREATE UNIQUE INDEX idx_order_stats_date ON order_stats_view(date);

-- Refresh on schedule (run via cron or scheduler)
REFRESH MATERIALIZED VIEW CONCURRENTLY order_stats_view;

// Node.js code to query materialized view
class GetOrderStatsQuery {
    constructor(startDate, endDate) {
        this.startDate = startDate;
        this.endDate = endDate;
    }
}

class GetOrderStatsQueryHandler {
    constructor(db) {
        this.db = db;
    }

    async handle(query) {
        const result = await this.db.query(`
            SELECT * FROM order_stats_view
            WHERE date BETWEEN $1 AND $2
            ORDER BY date DESC
        `, [query.startDate, query.endDate]);

        return result.rows;
    }
}
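
The scheduled refresh can also be driven from Node.js instead of cron. The following is a sketch under stated assumptions: `startMaterializedViewRefresher` is a hypothetical helper, and `db` is assumed to expose `query(sql)` returning a promise, as a node-postgres pool does. It skips a tick when the previous refresh is still running so that refreshes never pile up.

```javascript
// Hypothetical scheduler: refreshes a materialized view at a fixed interval.
// `db` is assumed to expose query(sql) returning a promise.
function startMaterializedViewRefresher(db, viewName, intervalMs = 5 * 60 * 1000) {
    // The view name is interpolated into SQL, so accept plain identifiers only
    if (!/^[a-z_][a-z0-9_]*$/i.test(viewName)) {
        throw new Error(`Invalid view name: ${viewName}`);
    }

    let running = false;

    async function refresh() {
        // Skip this tick if the previous refresh is still in progress
        if (running) return false;
        running = true;
        try {
            await db.query(`REFRESH MATERIALIZED VIEW CONCURRENTLY ${viewName}`);
            return true;
        } finally {
            running = false;
        }
    }

    const timer = setInterval(() => {
        refresh().catch(err => console.error('Refresh failed:', err.message));
    }, intervalMs);

    // Return a handle so callers can refresh on demand or stop the schedule
    return { refresh, stop: () => clearInterval(timer) };
}
```

Calling `refresh()` directly after a significant data change covers the second trigger mentioned above, and `stop()` should be called during shutdown so the timer does not keep the process alive.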

Event-Driven Projections

For full control, maintain separate read model tables that update in response to events. When a command executes, it publishes events. Projection handlers consume events and update read models. This provides eventual consistency — read models update asynchronously after commands complete.

-- Read model table schema
CREATE TABLE order_summaries (
    order_id VARCHAR(50) PRIMARY KEY,
    user_id VARCHAR(50) NOT NULL,
    customer_name VARCHAR(200),
    customer_email VARCHAR(200),
    status VARCHAR(50),
    total DECIMAL(10, 2),
    item_count INTEGER,
    created_at TIMESTAMP,
    updated_at TIMESTAMP
);

-- PostgreSQL does not accept inline INDEX clauses, so indexes are created separately
CREATE INDEX idx_user_id ON order_summaries(user_id);
CREATE INDEX idx_status ON order_summaries(status);
CREATE INDEX idx_created_at ON order_summaries(created_at);

// Projection handler updates read model
class OrderSummaryProjection {
    constructor(db, eventBus) {
        this.db = db;

        // Subscribe to events
        eventBus.subscribe('OrderCreated', this.handleOrderCreated.bind(this));
        eventBus.subscribe('OrderStatusChanged', this.handleOrderStatusChanged.bind(this));
        eventBus.subscribe('OrderUpdated', this.handleOrderUpdated.bind(this));
    }

    async handleOrderCreated(event) {
        const { orderId, userId, items, total } = event.data;

        // Fetch user details for denormalization
        const user = await this.db.users.findById(userId);

        // Insert into read model
        await this.db.query(`
            INSERT INTO order_summaries (
                order_id,
                user_id,
                customer_name,
                customer_email,
                status,
                total,
                item_count,
                created_at,
                updated_at
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
        `, [
            orderId,
            userId,
            user.name,
            user.email,
            'PENDING',
            total,
            items.length,
            new Date(),
            new Date()
        ]);
    }

    async handleOrderStatusChanged(event) {
        const { orderId, newStatus } = event.data;

        await this.db.query(`
            UPDATE order_summaries
            SET status = $1, updated_at = $2
            WHERE order_id = $3
        `, [newStatus, new Date(), orderId]);
    }

    async handleOrderUpdated(event) {
        const { orderId, total } = event.data;

        await this.db.query(`
            UPDATE order_summaries
            SET total = $1, updated_at = $2
            WHERE order_id = $3
        `, [total, new Date(), orderId]);
    }
}

Pro Tip:

Start with database views for read models. When specific queries become slow (over 100ms), convert those views to materialized views. When you need lookups that do no joins or aggregation at query time, or several specialized read models, switch to event-driven projections. This incremental approach avoids premature complexity while providing clear upgrade paths.
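
The projection code in this section assumes an event bus exposing `subscribe(type, handler)` and `publish(event)`. A minimal in-process sketch of such a bus follows. The class name `InMemoryEventBus` and the choice to return handler failures from `publish` are assumptions made for illustration, and this is not a substitute for a durable message broker: events are lost if the process crashes before handlers finish.

```javascript
// Minimal in-process event bus sketch.
class InMemoryEventBus {
    constructor() {
        this.subscribers = new Map(); // event type -> array of handlers
    }

    subscribe(eventType, handler) {
        if (!this.subscribers.has(eventType)) {
            this.subscribers.set(eventType, []);
        }
        this.subscribers.get(eventType).push(handler);
    }

    // Resolves once every handler for the event type has settled.
    // A failing handler does not stop the others; failures are collected and returned.
    async publish(event) {
        const handlers = this.subscribers.get(event.type) || [];
        const results = await Promise.allSettled(
            // Wrapping in a promise chain also captures synchronous throws
            handlers.map(handler => Promise.resolve().then(() => handler(event)))
        );
        return results
            .filter(result => result.status === 'rejected')
            .map(result => result.reason);
    }
}
```

Because `publish` awaits its handlers, this version updates read models before the command handler returns. A real broker delivers asynchronously, which is where the eventual consistency described above comes from.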

Event Sourcing Integration

Event sourcing stores all state changes as a sequence of events. Instead of updating the current order status, append OrderCreated, OrderPaid, OrderShipped events to an event log. Current state is derived by replaying events. This pairs naturally with CQRS — the write side stores events, the read side builds projections from those events.

Event sourcing provides complete audit history, enables temporal queries ("what was the order status on March 15?"), and allows building new read models by replaying historical events. The tradeoff is complexity in deriving current state and managing event schema evolution.
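
A temporal query of the kind described above amounts to replaying only the events that occurred up to the moment of interest. The sketch below assumes each event carries an `occurredAt` timestamp. The `orderStatusAt` function, the status mapping, and the sample history are hypothetical and deliberately simplified.

```javascript
// Maps event types to the status they produce (simplified, assumed mapping)
const STATUS_BY_EVENT = {
    OrderCreated: 'PENDING',
    OrderPaid: 'PAID',
    OrderShipped: 'SHIPPED',
    OrderCancelled: 'CANCELLED'
};

// Returns the order status as of `asOf`, or null if the order did not exist yet.
// `events` must be ordered oldest first; each has { type, occurredAt: Date }.
function orderStatusAt(events, asOf) {
    let status = null;
    for (const event of events) {
        if (event.occurredAt > asOf) {
            break; // everything from here on happened later
        }
        // Event types without a status mapping leave the status unchanged
        status = STATUS_BY_EVENT[event.type] ?? status;
    }
    return status;
}

// Illustrative event history (made-up data)
const history = [
    { type: 'OrderCreated', occurredAt: new Date('2026-03-10T09:00:00Z') },
    { type: 'OrderPaid', occurredAt: new Date('2026-03-12T14:30:00Z') },
    { type: 'OrderShipped', occurredAt: new Date('2026-03-18T08:15:00Z') }
];

console.log(orderStatusAt(history, new Date('2026-03-15T00:00:00Z'))); // PAID
```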

Event Store Implementation

An event store persists events in append-only logs. Events are never updated or deleted. Each aggregate (order, user, account) has its own event stream ordered by sequence number.

-- Event store schema
CREATE TABLE events (
    id SERIAL PRIMARY KEY,
    aggregate_type VARCHAR(50) NOT NULL,
    aggregate_id VARCHAR(100) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    event_data JSONB NOT NULL,
    sequence_number INTEGER NOT NULL,
    occurred_at TIMESTAMP NOT NULL DEFAULT NOW(),
    correlation_id UUID,
    UNIQUE(aggregate_type, aggregate_id, sequence_number)
);

CREATE INDEX idx_events_aggregate ON events(aggregate_type, aggregate_id);
CREATE INDEX idx_events_type ON events(event_type);

// Event store implementation
class EventStore {
    constructor(db, eventBus) {
        this.db = db;
        this.eventBus = eventBus;
    }

    async appendEvent(aggregateType, aggregateId, event) {
        // Get next sequence number
        const result = await this.db.query(`
            SELECT COALESCE(MAX(sequence_number), 0) + 1 as next_seq
            FROM events
            WHERE aggregate_type = $1 AND aggregate_id = $2
        `, [aggregateType, aggregateId]);

        const sequenceNumber = result.rows[0].next_seq;

        // Insert event
        await this.db.query(`
            INSERT INTO events (
                aggregate_type,
                aggregate_id,
                event_type,
                event_data,
                sequence_number,
                correlation_id
            ) VALUES ($1, $2, $3, $4, $5, $6)
        `, [
            aggregateType,
            aggregateId,
            event.type,
            JSON.stringify(event.data),
            sequenceNumber,
            event.correlationId
        ]);

        // Publish event to message bus for projections
        await this.eventBus.publish({
            type: event.type,
            aggregateType,
            aggregateId,
            data: event.data,
            correlationId: event.correlationId,
            sequenceNumber
        });
    }

    async getEvents(aggregateType, aggregateId) {
        const result = await this.db.query(`
            SELECT event_type, event_data, sequence_number
            FROM events
            WHERE aggregate_type = $1 AND aggregate_id = $2
            ORDER BY sequence_number ASC
        `, [aggregateType, aggregateId]);

        return result.rows.map(row => ({
            type: row.event_type,
            data: row.event_data,
            sequenceNumber: row.sequence_number
        }));
    }

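    async getEventsSince(position, limit = 1000) {
        // sequence_number restarts for every aggregate, so it cannot order
        // events across aggregates. The global id column is used as the
        // cursor instead, and results are returned in batches.
        const result = await this.db.query(`
            SELECT id, event_type, event_data, aggregate_type,
                   aggregate_id, sequence_number
            FROM events
            WHERE id > $1
            ORDER BY id ASC
            LIMIT $2
        `, [position, limit]);

        return result.rows;
    }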
}

Rebuilding State from Events

To get current aggregate state, replay all events for that aggregate. This is called hydration or rehydration. Apply events in sequence, updating state based on event type.

// Order aggregate
class Order {
    constructor() {
        this.id = null;
        this.userId = null;
        this.items = [];
        this.total = 0;
        this.status = 'NEW';
        this.version = 0;
    }

    // Apply events to rebuild state
    applyEvent(event) {
        switch (event.type) {
            case 'OrderCreated':
                this.id = event.data.orderId;
                this.userId = event.data.userId;
                this.items = event.data.items;
                this.total = event.data.total;
                this.status = 'PENDING';
                break;

            case 'OrderPaid':
                this.status = 'PAID';
                break;

            case 'OrderShipped':
                this.status = 'SHIPPED';
                this.trackingNumber = event.data.trackingNumber;
                break;

            case 'OrderCancelled':
                this.status = 'CANCELLED';
                this.cancellationReason = event.data.reason;
                break;
        }

        this.version++;
    }

    // Static method to hydrate from event stream
    static async fromEvents(eventStore, orderId) {
        const events = await eventStore.getEvents('Order', orderId);
        const order = new Order();

        for (const event of events) {
            order.applyEvent(event);
        }

        return order;
    }
}

// Command handler using event sourcing
class PayOrderCommandHandler {
    constructor(eventStore, paymentService) {
        this.eventStore = eventStore;
        this.paymentService = paymentService;
    }

    async handle(command) {
        // Hydrate order from events
        const order = await Order.fromEvents(
            this.eventStore,
            command.orderId
        );

        // Validate
        if (order.status !== 'PENDING') {
            throw new Error('Order must be pending to process payment');
        }

        // Process payment
        await this.paymentService.charge(
            order.userId,
            order.total
        );

        // Append event
        await this.eventStore.appendEvent('Order', order.id, {
            type: 'OrderPaid',
            data: {
                orderId: order.id,
                amount: order.total,
                paidAt: new Date()
            },
            correlationId: command.correlationId
        });

        return { success: true };
    }
}
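
The `version` counter on the aggregate can guard against two commands modifying the same order at once. The sketch below shows the idea with a hypothetical in-memory store whose `appendEvent` takes an `expectedVersion` argument. `InMemoryEventStore` and `ConcurrencyError` are illustrative names. In the PostgreSQL schema above, the UNIQUE constraint on (aggregate_type, aggregate_id, sequence_number) would play a similar role by rejecting a duplicate sequence number.

```javascript
class ConcurrencyError extends Error {}

// In-memory stand-in for an event store with optimistic concurrency control
class InMemoryEventStore {
    constructor() {
        this.streams = new Map(); // aggregate id -> array of events
    }

    getEvents(aggregateId) {
        return this.streams.get(aggregateId) || [];
    }

    // Appends only if the stream is still at the version the caller loaded
    appendEvent(aggregateId, event, expectedVersion) {
        const stream = this.getEvents(aggregateId);
        if (stream.length !== expectedVersion) {
            throw new ConcurrencyError(
                `Expected version ${expectedVersion}, found ${stream.length}`
            );
        }
        this.streams.set(aggregateId, [
            ...stream,
            { ...event, sequenceNumber: expectedVersion + 1 }
        ]);
    }
}

const store = new InMemoryEventStore();
store.appendEvent('order-1', { type: 'OrderCreated' }, 0);

// Two handlers both load the order at version 1...
const loadedVersion = store.getEvents('order-1').length;
store.appendEvent('order-1', { type: 'OrderPaid' }, loadedVersion);

// ...so the second append with the same expected version is rejected
try {
    store.appendEvent('order-1', { type: 'OrderCancelled' }, loadedVersion);
} catch (err) {
    console.log(err instanceof ConcurrencyError); // true
}
```

When an append is rejected this way, the usual response is to reload the aggregate from its events and run the command again against the fresh state.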

Handling Eventual Consistency

CQRS with event-driven projections creates eventual consistency. When a command executes, the write model updates immediately but read models update asynchronously. There's a window (milliseconds to seconds) where reads might return stale data. Design your application to handle this gracefully.

For user-facing operations, show processing states. When a user creates an order, immediately show "Order processing..." rather than redirecting to an order details page that might not have updated yet. Use optimistic updates where the UI assumes success and shows expected state while events propagate.

// API endpoint for creating order
app.post('/api/orders', async (req, res, next) => {
    try {
        const command = new CreateOrderCommand(
            req.user.id,
            req.body.items,
            req.body.shippingAddress
        );

        const result = await commandBus.execute(command);

        // Return immediately with order ID
        // Don't wait for read model to update
        res.status(202).json({
            orderId: result.orderId,
            status: 'processing',
            message: 'Order is being processed'
        });
    } catch (err) {
        // Express 4 does not catch rejections from async handlers,
        // so forward failures to the error-handling middleware
        next(err);
    }
});

// Client polls for updated status
app.get('/api/orders/:id/status', async (req, res, next) => {
    try {
        const query = new GetOrderStatusQuery(req.params.id);
        const status = await queryBus.execute(query);

        if (!status) {
            // Read model hasn't updated yet
            res.json({ status: 'processing' });
        } else {
            res.json(status);
        }
    } catch (err) {
        next(err);
    }
});

Synchronous Read-Your-Writes

For operations requiring immediate consistency, wait for read model updates before responding. After executing a command, poll the read model until it reflects the change. This sacrifices latency for consistency.

async function createOrderWithSync(command) {
    // Execute command
    const result = await commandBus.execute(command);

    // Wait for read model to update
    const order = await waitForReadModel(
        () => getOrderSummary(result.orderId),
        5000  // 5 second timeout
    );

    return order;
}

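// Promise-based delay helper used by the polling loop
const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms));

async function waitForReadModel(queryFn, timeout = 5000) {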
    const startTime = Date.now();
    const pollInterval = 100;  // Poll every 100ms

    while (Date.now() - startTime < timeout) {
        const result = await queryFn();
        if (result) {
            return result;
        }
        await sleep(pollInterval);
    }

    throw new Error('Read model update timeout');
}
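
The polling helper above returns as soon as the query yields any result, which works for newly created rows but not for updates, where a stale row already exists. One possible refinement is to wait until the row reaches the version the command produced. The sketch assumes read model rows carry a `version` that projections bump on every event. `waitForVersion` and its options are illustrative names.

```javascript
const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms));

// Polls queryFn until it returns a row whose version is at least minVersion
async function waitForVersion(
    queryFn,
    minVersion,
    { timeoutMs = 5000, pollIntervalMs = 100 } = {}
) {
    const deadline = Date.now() + timeoutMs;

    while (Date.now() < deadline) {
        const row = await queryFn();
        // A missing row or an older version means the projection is still behind
        if (row && row.version >= minVersion) {
            return row;
        }
        await sleep(pollIntervalMs);
    }

    throw new Error(
        `Read model did not reach version ${minVersion} within ${timeoutMs}ms`
    );
}
```

For this to work, the command result has to include the new aggregate version so the caller knows what to wait for.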

Caching Strategies

Read models benefit from aggressive caching. Since reads don't modify state, cache query results in Redis or application memory. Invalidate the cache when events update the read model. With an in-process cache, or a Redis instance close to the application, common queries can then be answered in about a millisecond or less.

// Query handler with caching
class GetOrderDetailsQueryHandler {
    constructor(db, cache) {
        this.db = db;
        this.cache = cache;
    }

    async handle(query) {
        const cacheKey = `order:${query.orderId}`;

        // Check cache first
        const cached = await this.cache.get(cacheKey);
        if (cached) {
            return JSON.parse(cached);
        }

        // Cache miss, query database
        const order = await this.db.orderSummaries.findById(query.orderId);

        if (order) {
            // Cache for 5 minutes
            await this.cache.setex(cacheKey, 300, JSON.stringify(order));
        }

        return order;
    }
}

// Projection invalidates cache on updates
class OrderSummaryProjection {
    constructor(db, eventBus, cache) {
        this.db = db;
        this.cache = cache;

        eventBus.subscribe('OrderCreated', this.handleOrderCreated.bind(this));
        eventBus.subscribe('OrderUpdated', this.handleOrderUpdated.bind(this));
    }

    async handleOrderCreated(event) {
        const { orderId } = event.data;

        // Update read model
        await this.updateOrderSummary(event);

        // Invalidate cache
        await this.cache.del(`order:${orderId}`);
    }

    async handleOrderUpdated(event) {
        const { orderId } = event.data;

        // Update read model
        await this.updateOrderSummary(event);

        // Invalidate cache
        await this.cache.del(`order:${orderId}`);
        await this.cache.del(`user:${event.data.userId}:orders`);
    }
}

Testing CQRS Systems

Test commands by verifying they produce correct events and state changes. Test queries by verifying they return correct data from read models. Test projections by publishing events and verifying read model updates.

Command Testing

Unit test command handlers by mocking dependencies and verifying business logic executes correctly. Integration test commands by executing them against real databases and verifying both write model changes and event publication.

// Unit test for command handler
describe('CreateOrderCommandHandler', () => {
    let mockRepository, mockEventBus, mockInventoryService;

    // Recreate the mocks before each test so every test can see them
    // and call history never leaks from one test into the next
    beforeEach(() => {
        mockRepository = { save: jest.fn() };
        mockEventBus = { publish: jest.fn() };
        mockInventoryService = {
            checkStock: jest.fn().mockResolvedValue(true)
        };
    });

    it('creates order and publishes event', async () => {
        const handler = new CreateOrderCommandHandler(
            mockRepository,
            mockInventoryService,
            mockEventBus
        );

        const command = new CreateOrderCommand(
            'user-123',
            [{ productId: 'prod-1', quantity: 2 }],
            { street: '123 Main St' }
        );

        const result = await handler.handle(command);

        expect(result.success).toBe(true);
        expect(mockRepository.save).toHaveBeenCalledWith(
            expect.objectContaining({
                userId: 'user-123',
                items: command.items
            })
        );
        expect(mockEventBus.publish).toHaveBeenCalledWith(
            expect.objectContaining({
                type: 'OrderCreated'
            })
        );
    });

    it('throws error when inventory insufficient', async () => {
        // Simulate an out-of-stock product
        mockInventoryService.checkStock.mockResolvedValue(false);

        const handler = new CreateOrderCommandHandler(
            mockRepository,
            mockInventoryService,
            mockEventBus
        );

        const command = new CreateOrderCommand(
            'user-123',
            [{ productId: 'prod-1', quantity: 100 }],
            { street: '123 Main St' }
        );

        await expect(handler.handle(command))
            .rejects.toThrow('Insufficient stock');

        // Nothing should be saved or published when validation fails
        expect(mockRepository.save).not.toHaveBeenCalled();
        expect(mockEventBus.publish).not.toHaveBeenCalled();
    });
});

Query Testing

Test query handlers by setting up known data in read models and verifying queries return expected results. Use test databases seeded with fixtures.

// Integration test for query
describe('GetOrderDetailsQueryHandler', () => {
    let db, cache;

    beforeEach(async () => {
        db = await setupTestDatabase();
        // Stub cache that always misses, so the cached handler
        // (which reads order_summaries) falls through to the database
        cache = {
            get: jest.fn().mockResolvedValue(null),
            setex: jest.fn().mockResolvedValue('OK')
        };
        await seedOrderSummaries(db, [
            {
                order_id: 'order-123',
                user_id: 'user-456',
                customer_name: 'John Doe',
                total: 99.99,
                status: 'COMPLETED'
            }
        ]);
    });

    afterEach(async () => {
        await cleanupTestDatabase(db);
    });

    it('returns order details', async () => {
        const handler = new GetOrderDetailsQueryHandler(db, cache);
        const query = new GetOrderDetailsQuery('order-123');

        const result = await handler.handle(query);

        expect(result).toMatchObject({
            order_id: 'order-123',
            customer_name: 'John Doe',
            total: 99.99,
            status: 'COMPLETED'
        });
    });

    it('returns null for non-existent order', async () => {
        const handler = new GetOrderDetailsQueryHandler(db, cache);
        const query = new GetOrderDetailsQuery('order-999');

        const result = await handler.handle(query);

        expect(result).toBeNull();
    });
});

Projection Testing

Test projections by publishing events and verifying that the read model updates correctly. Use real event bus infrastructure in integration tests.

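describe('OrderSummaryProjection', () => {
    let db, eventBus, projection;

    // Polls until the condition holds or the timeout elapses
    async function waitFor(condition, timeout = 2000, interval = 50) {
        const start = Date.now();
        while (Date.now() - start < timeout) {
            if (await condition()) return;
            await new Promise(resolve => setTimeout(resolve, interval));
        }
        throw new Error('waitFor timed out');
    }

    beforeEach(async () => {
        // Assumes setupTestDatabase() seeds user-456, because the projection
        // looks up the user to denormalize name and email
        db = await setupTestDatabase();
        eventBus = new EventBus();
        projection = new OrderSummaryProjection(db, eventBus);
    });

    afterEach(async () => {
        await cleanupTestDatabase(db);
    });

    it('creates order summary on OrderCreated event', async () => {
        await eventBus.publish({
            type: 'OrderCreated',
            data: {
                orderId: 'order-123',
                userId: 'user-456',
                items: [{ productId: 'prod-1', quantity: 2 }],
                total: 59.98
            }
        });

        // Wait for projection to process
        await waitFor(async () => {
            const summary = await db.orderSummaries.findById('order-123');
            return summary !== null;
        });

        const summary = await db.orderSummaries.findById('order-123');
        expect(summary).toMatchObject({
            order_id: 'order-123',
            user_id: 'user-456',
            total: 59.98,
            status: 'PENDING'
        });
    });
});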

Production Considerations

Monitor projection lag — the delay between events being published and read models being updated. High lag indicates projection handlers can't keep up with event volume. Scale projection handlers horizontally or optimize update queries.
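
Lag can be measured either in events or in time. The sketch below assumes the projection records a checkpoint (the position and timestamp of the last event it applied) and compares it with the newest event in the store. `computeProjectionLag`, the field names, and the sample values are hypothetical.

```javascript
// Computes how far a projection is behind the event store.
// latestEvent: { position, occurredAt } for the newest stored event
// checkpoint:  { position, occurredAt } for the last event the projection applied
function computeProjectionLag(latestEvent, checkpoint) {
    const eventsBehind = latestEvent.position - checkpoint.position;
    // When fully caught up, time lag is zero regardless of timestamps
    const msBehind = eventsBehind === 0
        ? 0
        : latestEvent.occurredAt.getTime() - checkpoint.occurredAt.getTime();
    return { eventsBehind, msBehind };
}

// Illustrative values (made-up data)
const lag = computeProjectionLag(
    { position: 1200, occurredAt: new Date('2026-04-04T12:00:05Z') },
    { position: 1150, occurredAt: new Date('2026-04-04T12:00:02Z') }
);
console.log(lag); // { eventsBehind: 50, msBehind: 3000 }
```

Both numbers are approximations, but exporting them as metrics and alerting when they stay above a threshold is usually enough to show that a projection is falling behind.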

Implement projection rebuilding for when read models become corrupted or need schema changes. Replay all events from the event store to rebuild projections from scratch. This requires storing all events permanently and making replay fast enough to complete in reasonable time.

// Projection rebuild tool
class ProjectionRebuilder {
    constructor(eventStore, projection) {
        this.eventStore = eventStore;
        this.projection = projection;
    }

    async rebuild(fromPosition = 0) {
        console.log('Starting projection rebuild...');

        // Clear existing projection data
        await this.projection.clear();

        let lastPosition = fromPosition;
        let processed = 0;

        while (true) {
            // Fetch events in batches
            const events = await this.eventStore.getEventsSince(
                lastPosition,
                1000  // Batch size
            );

            if (events.length === 0) {
                break;
            }

            // Process each event
            for (const event of events) {
                await this.projection.handleEvent(event);
                // Advance by the global event id. sequence_number restarts
                // for every aggregate and cannot serve as a cursor.
                // Assumes getEventsSince returns the id column.
                lastPosition = event.id;
                processed++;

                if (processed % 1000 === 0) {
                    console.log(`Processed ${processed} events...`);
                }
            }
        }

        console.log(`Rebuild complete. Processed ${processed} events.`);
    }
}

// Usage
const rebuilder = new ProjectionRebuilder(
    eventStore,
    orderSummaryProjection
);

await rebuilder.rebuild();
Data Point:

A production CQRS system processing 500,000 commands per day reduced read query latency from 200ms to 5ms by using denormalized projections with aggressive caching. Write latency increased slightly (from 50ms to 75ms) due to event publishing overhead, but user-facing performance improved significantly since 95% of requests are reads.

Common Pitfalls and Solutions

The biggest CQRS failure mode is implementing it for everything. Not all aggregates need CQRS. User profiles, system settings, and configuration data work fine with traditional CRUD. Reserve CQRS for aggregates with complex business logic (orders, payments, subscriptions) or significant read/write scaling asymmetry (analytics, dashboards).

Another common mistake is tight coupling between write and read models. If command handlers query read models to make decisions, you've created circular dependencies that eliminate CQRS benefits. Commands should only interact with the write model. Use domain events to synchronize state to read models asynchronously.

Ignoring eventual consistency creates bugs where the UI expects immediate updates. Design user interfaces to accommodate asynchronous updates. Show loading states, use optimistic updates, and handle cases where read models haven't updated yet.
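
One way to handle a read model that hasn't caught up yet is to poll it for the version the command produced, as in this sketch. It assumes the command response includes the new aggregate version and that each read model row stores a `version` field. Both are assumptions, and `waitForReadModel` and `fetchReadModel` are hypothetical names.

```javascript
// Polls a read model until it reflects at least `expectedVersion`,
// or gives up after `timeoutMs`. `fetchReadModel` is a hypothetical
// function that returns the current row (with a `version` field) or null.
async function waitForReadModel(
    fetchReadModel,
    expectedVersion,
    { timeoutMs = 2000, intervalMs = 50 } = {}
) {
    const deadline = Date.now() + timeoutMs;

    while (true) {
        const row = await fetchReadModel();
        if (row && row.version >= expectedVersion) {
            return { fresh: true, row };
        }
        if (Date.now() >= deadline) {
            // Possibly stale: the caller decides how to present it
            return { fresh: false, row };
        }
        await new Promise((resolve) => setTimeout(resolve, intervalMs));
    }
}
```

When `fresh` is false, the UI can keep showing its optimistic state or a loading indicator instead of rendering outdated data as if it were current.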

Frequently Asked Questions

Do I need event sourcing to use CQRS?

No. CQRS and event sourcing are separate patterns that work well together, but neither requires the other. You can implement CQRS with traditional database storage for both write and read models. Event sourcing provides additional benefits (audit history, temporal queries) but adds complexity. Start with CQRS alone and add event sourcing only if you need its specific capabilities.

How do I handle transactions across commands?

Each CQRS command is atomic: it succeeds or fails as a unit. For operations that span several aggregates, use a saga to coordinate multiple commands through events, with compensating commands to undo earlier steps when a later one fails. Avoid distributed transactions across aggregates. Draw aggregate boundaries so that state that must change together lives in one aggregate and can be handled by a single command.
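
A saga can be sketched as an event handler that dispatches follow-up commands. In the example below, the event names (`OrderCreated`, `PaymentSucceeded`, `PaymentFailed`), the command names `ConfirmOrder` and `CancelOrder`, and the `commandBus.dispatch` interface are all assumptions made for illustration.

```javascript
// Hypothetical saga: reacts to events and dispatches the next command.
// `commandBus.dispatch(command)` is an assumed interface.
class OrderPaymentSaga {
    constructor(commandBus) {
        this.commandBus = commandBus;
    }

    async handleEvent(event) {
        const { orderId } = event.data;

        switch (event.type) {
            case 'OrderCreated':
                return this.commandBus.dispatch({
                    type: 'ProcessPayment',
                    data: { orderId, amount: event.data.total }
                });
            case 'PaymentSucceeded':
                return this.commandBus.dispatch({
                    type: 'ConfirmOrder',
                    data: { orderId }
                });
            case 'PaymentFailed':
                // Compensating command: undo the earlier step
                return this.commandBus.dispatch({
                    type: 'CancelOrder',
                    data: { orderId, reason: 'PAYMENT_FAILED' }
                });
            default:
                // Events this saga does not care about are ignored
                return undefined;
        }
    }
}
```

Each dispatched command is still atomic on its own. The saga only decides which command comes next.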

What happens if projection updates fail?

Implement retry logic with dead letter queues for failed projection updates. If an event can't update a projection after several retries, send it to a DLQ for investigation. Build admin tools to replay events from DLQs after fixing bugs. Monitor projection lag and alert when it exceeds thresholds.
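
The retry-then-DLQ flow might look like the sketch below. It reuses the `projection.handleEvent(event)` call from the rebuild tool. The function name `applyWithRetry`, the `deadLetterQueue.push` interface, and the backoff parameters are assumptions.

```javascript
// Tries to apply an event to a projection, retrying with exponential
// backoff. After `maxAttempts` failures the event goes to a dead letter
// queue. `deadLetterQueue.push(entry)` is an assumed interface.
async function applyWithRetry(
    projection,
    event,
    deadLetterQueue,
    { maxAttempts = 3, baseDelayMs = 100 } = {}
) {
    let lastError;

    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
            await projection.handleEvent(event);
            return true;
        } catch (error) {
            lastError = error;
            if (attempt < maxAttempts) {
                // Delay doubles each time: baseDelayMs, 2x, 4x, ...
                const delay = baseDelayMs * 2 ** (attempt - 1);
                await new Promise((resolve) => setTimeout(resolve, delay));
            }
        }
    }

    // Keep the event and the failure reason for later investigation
    await deadLetterQueue.push({
        event,
        error: String(lastError),
        attempts: maxAttempts,
        failedAt: new Date().toISOString()
    });
    return false;
}
```

Because a retried event may be applied more than once, projection handlers should be idempotent.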

How do I query across multiple read models?

Create composite read models that aggregate data from multiple sources. For example, a dashboard projection subscribes to order events, user events, and product events, maintaining a denormalized view that joins all three. Alternatively, perform client-side joins by querying multiple read models and combining results in application code.
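
A composite projection can be sketched as follows. For brevity it joins only user and order events and keeps its state in memory. The class name, the event names (`UserRegistered`, `UserRenamed`, `OrderCreated`), and the row shape are assumptions, and a real projection would write to a database table instead of a `Map`.

```javascript
// Hypothetical composite projection: subscribes to two event streams
// and maintains one denormalized row per order.
class DashboardProjection {
    constructor() {
        this.userNames = new Map(); // userId -> display name
        this.rows = new Map();      // orderId -> denormalized row
    }

    async handleEvent(event) {
        const data = event.data;

        switch (event.type) {
            case 'UserRegistered':
                this.userNames.set(data.userId, data.name);
                break;
            case 'UserRenamed':
                this.userNames.set(data.userId, data.name);
                // Keep the copies of the name on existing rows in sync
                for (const row of this.rows.values()) {
                    if (row.userId === data.userId) {
                        row.userName = data.name;
                    }
                }
                break;
            case 'OrderCreated':
                this.rows.set(data.orderId, {
                    orderId: data.orderId,
                    userId: data.userId,
                    userName: this.userNames.get(data.userId) ?? null,
                    total: data.total
                });
                break;
        }
    }

    getRow(orderId) {
        return this.rows.get(orderId) ?? null;
    }
}
```

The cost of denormalizing is visible in the `UserRenamed` branch: every copy of the name has to be updated when the source changes.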

Should I cache read model queries?

Yes, aggressively. Queries have no side effects, and a read model changes only when a projection applies an event, which makes query results ideal for caching. Use Redis or application memory caching with TTLs appropriate to your consistency requirements. Invalidate cache entries when projections update. This reduces database load and can bring response times for common queries down to a millisecond or less when they are served from memory.
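
A minimal in-memory sketch of TTL caching with explicit invalidation is shown below. The class name `CachedQuery`, its options, and the `loader` callback are hypothetical. The same shape applies if the `Map` is replaced by Redis.

```javascript
// Hypothetical read-through cache for a single query type.
// `loader(key)` is an assumed function that queries the read model.
class CachedQuery {
    constructor(loader, { ttlMs = 5000, now = Date.now } = {}) {
        this.loader = loader;
        this.ttlMs = ttlMs;
        this.now = now;             // Injectable clock, useful in tests
        this.entries = new Map();   // key -> { value, expiresAt }
    }

    async get(key) {
        const hit = this.entries.get(key);
        if (hit && hit.expiresAt > this.now()) {
            return hit.value;
        }

        // Miss or expired entry: load from the read model and cache it
        const value = await this.loader(key);
        this.entries.set(key, { value, expiresAt: this.now() + this.ttlMs });
        return value;
    }

    // Call this from the projection handler after it updates `key`
    invalidate(key) {
        this.entries.delete(key);
    }
}
```

The TTL bounds how stale an entry can get if an invalidation is missed, so choose it based on how much staleness the view can tolerate.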

How do I handle schema changes in events?

Use versioned events with backward-compatible changes. Add fields rather than removing them. When breaking changes are necessary, create new event types and publish both versions during migration. Projection handlers should handle multiple event versions, migrating old formats to new formats during processing.
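
Migrating old formats during processing is often done with "upcasters" that convert an event one version at a time before the projection sees it. The sketch below assumes events carry a numeric `version` field (treated as 1 when missing) and invents a v2 of `OrderCreated` that adds a `currency` field. The field, its `'USD'` default, and the function names are hypothetical.

```javascript
// Hypothetical upcasters, keyed by "<type>:<version>". Each one converts
// an event to the next version without mutating the stored event.
const upcasters = {
    // Assumed change: v2 of OrderCreated adds an explicit currency
    'OrderCreated:1': (event) => ({
        ...event,
        version: 2,
        data: { ...event.data, currency: 'USD' }
    })
};

// Applies upcasters until the event is at the latest known version
function upcast(event) {
    let current = { ...event, version: event.version ?? 1 };

    while (true) {
        const step = upcasters[`${current.type}:${current.version}`];
        if (!step) {
            return current;
        }
        current = step(current);
    }
}
```

With this in place, projection handlers only need to understand the latest version of each event, and stored events are never rewritten.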

What's the performance overhead of CQRS?

Write operations typically become slightly slower (around 10-30ms) because each command also publishes events; projection updates add to write latency only when they run synchronously. Read operations typically become much faster (5-20ms instead of 100-500ms) due to denormalized read models and caching. Overall system throughput increases because reads and writes scale independently. Treat these figures as rough guides and measure your own workload. The pattern makes sense when reads significantly outnumber writes.

How do I debug issues in CQRS systems?

Use correlation IDs to trace commands through events to projection updates. Log all commands, events, and projection updates with timestamps and correlation IDs. Build admin tools to query events by correlation ID and replay event sequences. Monitor projection lag and event processing errors. These tools make debugging eventual consistency issues tractable.
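
Correlation ID propagation can be sketched with three small helpers. The `metadata` envelope, the `causationId` field, and the function names are assumptions, not the API of any particular framework. `randomUUID` comes from Node's built-in `crypto` module.

```javascript
const { randomUUID } = require('node:crypto');

// Attaches a correlation ID to a command, generating one if the caller
// (for example an HTTP middleware) did not supply it.
function withCorrelation(command, correlationId = randomUUID()) {
    return {
        ...command,
        metadata: { ...command.metadata, correlationId }
    };
}

// Builds an event that inherits the correlation ID of its command
function eventFromCommand(command, type, data) {
    return {
        type,
        data,
        metadata: {
            correlationId: command.metadata.correlationId,
            causationId: command.id ?? null
        }
    };
}

// Emits one structured log line per processing step
function logStep(stage, message, log = console.log) {
    log(JSON.stringify({
        stage,
        type: message.type,
        correlationId: message.metadata.correlationId,
        timestamp: new Date().toISOString()
    }));
}
```

Calling `logStep` in the command handler, the event publisher, and the projection handler produces log lines that can be filtered by a single `correlationId` to reconstruct one request end to end.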

Conclusion

CQRS separates read and write concerns, enabling independent scaling and optimization of each. Start with basic command/query separation using the same database. Add event-driven projections when you need denormalized read models. Integrate event sourcing only when you need audit history or temporal queries. Each step adds complexity but solves specific scaling or functionality problems.

Implement CQRS for aggregates with complex business logic or significant read/write asymmetry, not for simple CRUD operations. Embrace eventual consistency by designing UIs that handle asynchronous updates gracefully. Cache read models aggressively, monitor projection lag, and build tooling for event replay. Applied appropriately, CQRS adds complexity but enables performance and scalability that are hard to reach with a single shared model.

