Async Messaging Patterns

This guide covers common patterns and best practices for working with asynchronous messaging in Nitric, including topics and queues.

Message Delivery Guarantees

At-Least-Once Delivery

Topics and queues in Nitric provide at-least-once delivery guarantees. This means that a message may be delivered more than once, which is important to consider when designing your message processing logic.

Idempotency

To handle potential duplicate messages, you should design your message processing to be idempotent. Here's an example of how to implement idempotency:

import { queue } from '@nitric/sdk'

const orderQueue = queue('orders').allow('enqueue', 'dequeue')

// Add a unique ID to each message
await orderQueue.enqueue({
  id: 'order-123',
  data: {
    /* order details */
  },
})

// Process messages with idempotency check
const messages = await orderQueue.dequeue(10)
for (const message of messages) {
  const orderId = message.id
  // Check if order was already processed
  if (!(await isOrderProcessed(orderId))) {
    await processOrder(message.data)
    await markOrderProcessed(orderId)
  }
  await message.complete()
}
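
The snippet above calls isOrderProcessed and markOrderProcessed without defining them. The following is a minimal sketch of what they might look like, assuming an in-memory Set stands in for a durable store. The Set and the processOnce wrapper are hypothetical names used only for illustration; a real deployment would likely need a database table or key/value store, since in-memory state is lost on restart and is not shared between instances.

```typescript
// Hypothetical in-memory stand-in for a durable "processed IDs" store.
const processedOrders = new Set<string>()

async function isOrderProcessed(orderId: string): Promise<boolean> {
  return processedOrders.has(orderId)
}

async function markOrderProcessed(orderId: string): Promise<void> {
  processedOrders.add(orderId)
}

// Wraps a handler so repeated deliveries of the same ID run it only once.
// Returns true if the handler ran, false if the delivery was a duplicate.
async function processOnce(
  orderId: string,
  handler: () => Promise<void>,
): Promise<boolean> {
  if (await isOrderProcessed(orderId)) {
    return false
  }
  await handler()
  await markOrderProcessed(orderId)
  return true
}
```

Because the ID is marked only after the handler succeeds, a handler that throws leaves the ID unmarked and the message can be retried. The check and the mark are separate steps, so two consumers receiving the same message at the same moment could both run the handler; a durable store with an atomic insert-if-absent operation would close that gap.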

Outbox Pattern

The Outbox Pattern is a reliable way to ensure messages are published when they're part of a database transaction. It works by storing messages in a database table (the "outbox") as part of the transaction, then publishing them after the transaction commits.

First, create a migration file for the outbox table:

-- migrations/orders/1_create_outbox.up.sql
CREATE TABLE IF NOT EXISTS outbox (
  id SERIAL PRIMARY KEY,
  topic VARCHAR(255) NOT NULL,
  message JSONB NOT NULL,
  created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
  published_at TIMESTAMP WITH TIME ZONE
);

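Here's an example of implementing the Outbox Pattern with Nitric's SQL resource. It assumes an orders table with a data column already exists, since the migration above only creates the outbox table: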

import { sql, topic, api, schedule } from '@nitric/sdk'
import { Pool } from 'pg'

// Create the database with migrations
const db = sql('orders', {
  migrations: 'file://migrations/orders',
})

// Create the topic for publishing
const orderTopic = topic('orders').allow('publish')

// Create the API
const ordersApi = api('orders')

// Function to create an order and publish a message in a transaction
async function createOrder(orderData) {
  const connStr = await db.connectionString()
  const pool = new Pool({ connectionString: connStr })
  const client = await pool.connect()
  try {
    await client.query('BEGIN')
    // Insert the order into the orders table
    await client.query('INSERT INTO orders (data) VALUES ($1) RETURNING id', [
      orderData,
    ])
    // Insert the message into the outbox
    await client.query('INSERT INTO outbox (topic, message) VALUES ($1, $2)', [
      'orders',
      JSON.stringify({
        type: 'order.created',
        data: orderData,
      }),
    ])
    await client.query('COMMIT')
  } catch (error) {
    await client.query('ROLLBACK')
    throw error
  } finally {
    client.release()
    await pool.end()
  }
}

// API endpoint to create orders
ordersApi.post('/orders', async ({ req, res }) => {
  try {
    await createOrder(req.json())
    res.json({ success: true })
  } catch (error) {
    // status is a property on the Nitric response, not a chainable method
    res.status = 500
    res.json({ error: error.message })
  }
})

// Schedule to publish messages from the outbox
const publishSchedule = schedule('publish-outbox').every(
  '1 minute',
  async () => {
    const connStr = await db.connectionString()
    const pool = new Pool({ connectionString: connStr })
    try {
      const result = await pool.query(`
        SELECT id, topic, message
        FROM outbox
        WHERE published_at IS NULL
        ORDER BY created_at ASC
      `)
      for (const msg of result.rows) {
        try {
          // Publish the message
          await orderTopic.publish(msg.message)
          // Mark as published
          await pool.query(
            'UPDATE outbox SET published_at = CURRENT_TIMESTAMP WHERE id = $1',
            [msg.id],
          )
        } catch (error) {
          console.error('Failed to publish message:', error)
          // Message will be retried on next run
        }
      }
    } finally {
      await pool.end()
    }
  },
)

In this example, the background worker is a Nitric schedule that runs every minute and publishes any rows in the outbox table that have not yet been marked as published. If a publish succeeds but the following UPDATE fails, the row stays unpublished and the message is sent again on the next run, so subscribers should process these messages idempotently.
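
The retry behaviour of the publishing loop can be modelled without a database. The sketch below is a hypothetical in-memory version: OutboxRow and relayOnce are illustrative names, not part of Nitric or pg, and rows are ordered by id rather than created_at to keep the example short.

```typescript
// Hypothetical in-memory model of an outbox row; mirrors the table columns.
interface OutboxRow {
  id: number
  topic: string
  message: unknown
  publishedAt: Date | null
}

// One relay pass: publish each unpublished row in ID order, then mark it.
// Returns the IDs that were published during this pass.
function relayOnce(
  outbox: OutboxRow[],
  publish: (row: OutboxRow) => void,
): number[] {
  const published: number[] = []
  const pending = outbox
    .filter((row) => row.publishedAt === null)
    .sort((a, b) => a.id - b.id)
  for (const row of pending) {
    try {
      publish(row)
      row.publishedAt = new Date()
      published.push(row.id)
    } catch {
      // Leave publishedAt null so the next pass retries this row.
    }
  }
  return published
}
```

With three pending rows and a publish function that fails only for row 2, the first pass publishes rows 1 and 3 and the second pass publishes row 2. A failed row is skipped rather than blocking the rows behind it, so messages can reach subscribers out of order; stopping the loop at the first failure is one way to preserve ordering, at the cost of delaying later messages.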

Error Handling

Dead Letter Queues

Dead letter queues may become a native feature of Nitric in the future. If you need this functionality, please let us know by voting or commenting on the GitHub issue.

For queues, you can implement a dead letter queue (DLQ) to handle messages that fail processing after multiple attempts:

import { queue } from '@nitric/sdk'

const mainQueue = queue('orders').allow('enqueue', 'dequeue')
const dlq = queue('orders-dlq').allow('enqueue')

// Process messages with retry logic
const messages = await mainQueue.dequeue(10)
for (const message of messages) {
  try {
    await processOrder(message.data)
    await message.complete()
  } catch (error) {
    // Move failed message to DLQ after max retries
    if (message.retryCount >= 3) {
      await dlq.enqueue({
        originalMessage: message.data,
        error: error.message,
      })
      await message.complete()
    }
  }
}
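
The example above depends on a retry count being available on each message. Where that metadata is not available, one option is to carry the attempt counter inside the payload. The sketch below shows only the decision logic, under that assumption: Envelope, Outcome, MAX_ATTEMPTS and handleWithDlq are hypothetical names, and the handler is synchronous to keep the example short.

```typescript
// Hypothetical envelope: the attempt counter travels inside the payload,
// so the sketch does not depend on any queue-provided retry metadata.
interface Envelope<T> {
  attempts: number
  data: T
}

type Outcome<T> =
  | { action: 'done' }
  | { action: 'retry'; next: Envelope<T> }
  | { action: 'dead-letter'; original: T; error: string }

const MAX_ATTEMPTS = 3

// Runs the handler once and decides what should happen to the message.
function handleWithDlq<T>(
  envelope: Envelope<T>,
  handler: (data: T) => void,
): Outcome<T> {
  try {
    handler(envelope.data)
    return { action: 'done' }
  } catch (error) {
    const attempts = envelope.attempts + 1
    const reason = error instanceof Error ? error.message : String(error)
    if (attempts >= MAX_ATTEMPTS) {
      return { action: 'dead-letter', original: envelope.data, error: reason }
    }
    return { action: 'retry', next: { attempts, data: envelope.data } }
  }
}
```

A caller could then complete the original message in every case, enqueueing next back onto the main queue for a 'retry' outcome and enqueueing the original payload and error onto the DLQ for a 'dead-letter' outcome.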

Best Practices

  1. Message Size: Keep messages small and focused. Large messages can impact performance and increase costs.

  2. Batch Operations: When possible, use batch operations for better performance:

    • Batch enqueue messages
    • Process multiple messages in a single dequeue operation

  3. Error Handling: Implement proper error handling and retry logic:

    • Use dead letter queues for failed messages
    • Implement exponential backoff for retries
    • Log errors for monitoring and debugging

  4. Monitoring: Set up monitoring for your queues and topics:

    • Track message processing times
    • Monitor error rates
    • Set up alerts for queue depth

  5. Security: Follow security best practices:

    • Use minimal required permissions
    • Encrypt sensitive data in messages
    • Validate message content

  6. Testing: Test your message processing logic thoroughly:

    • Test with duplicate messages
    • Test error scenarios
    • Test with different message sizes and types
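
The exponential backoff mentioned under Error Handling can be sketched as a pair of small functions. The names backoffDelayMs and jitteredDelayMs and the default values (200 ms base, 30 second cap) are assumptions chosen for illustration, not Nitric settings.

```typescript
// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at maxMs.
function backoffDelayMs(
  attempt: number,
  baseMs: number = 200,
  maxMs: number = 30000,
): number {
  return Math.min(maxMs, baseMs * 2 ** attempt)
}

// "Full jitter": pick a random delay in [0, backoffDelayMs(attempt)) so that
// many consumers retrying at once do not all hit a dependency together.
// The random source is injectable so the function can be tested deterministically.
function jitteredDelayMs(
  attempt: number,
  random: () => number = Math.random,
): number {
  return Math.floor(random() * backoffDelayMs(attempt))
}
```

With these defaults the uncapped delays are 200, 400, 800 and 1600 ms for attempts 0 to 3, and any attempt whose computed delay would exceed 30 seconds waits 30 seconds instead.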

The implementation details of these patterns may vary depending on your cloud provider. See the provider-specific documentation for more details.

Last updated on Apr 1, 2025