Async Messaging Patterns
This guide covers common patterns and best practices for working with asynchronous messaging in Nitric, including topics and queues.
Message Delivery Guarantees
At-Least-Once Delivery
Topics and queues in Nitric provide at-least-once delivery guarantees. This means that a message may be delivered more than once, which is important to consider when designing your message processing logic.
Idempotency
To handle potential duplicate messages, you should design your message processing to be idempotent. Here's an example of how to implement idempotency:
```javascript
import { queue } from '@nitric/sdk'

const orderQueue = queue('orders').allow('enqueue', 'dequeue')

// Add a unique ID to each message
await orderQueue.enqueue({
  id: 'order-123',
  data: {
    /* order details */
  },
})

// Process messages with idempotency check
const messages = await orderQueue.dequeue(10)

for (const message of messages) {
  const orderId = message.id

  // Check if order was already processed
  if (!(await isOrderProcessed(orderId))) {
    await processOrder(message.data)
    await markOrderProcessed(orderId)
  }

  await message.complete()
}
```
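In this example, `isOrderProcessed` and `markOrderProcessed` are placeholders for whatever persistent store you use to track handled messages. One possible sketch uses a Nitric key value store; the store name `processed-orders` and the record shape are illustrative:

```javascript
import { kv } from '@nitric/sdk'

// Key value store used to remember which orders have been handled
const processedOrders = kv('processed-orders').allow('get', 'set')

async function isOrderProcessed(orderId) {
  try {
    // Any stored record means the order was already handled
    return (await processedOrders.get(orderId)) !== undefined
  } catch {
    // Treat a missing key as "not processed yet"
    return false
  }
}

async function markOrderProcessed(orderId) {
  await processedOrders.set(orderId, {
    processedAt: new Date().toISOString(),
  })
}
```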
Outbox Pattern
The Outbox Pattern is a reliable way to publish messages as part of a database transaction. It works by storing messages in a database table (the "outbox") within the same transaction, then publishing them after the transaction commits.
First, create a migration file for the outbox table:
```sql
-- migrations/orders/1_create_outbox.up.sql
CREATE TABLE IF NOT EXISTS outbox (
  id SERIAL PRIMARY KEY,
  topic VARCHAR(255) NOT NULL,
  message JSONB NOT NULL,
  created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
  published_at TIMESTAMP WITH TIME ZONE
);
```
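Note that the example below also inserts into an `orders` table. If your schema doesn't already define it, a companion migration along these lines would be needed (the column layout here is illustrative):

```sql
-- migrations/orders/2_create_orders.up.sql (illustrative)
CREATE TABLE IF NOT EXISTS orders (
  id SERIAL PRIMARY KEY,
  data JSONB NOT NULL,
  created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
```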
Here's an example of implementing the Outbox Pattern with Nitric's SQL resource:
```javascript
import { sql, topic, api, schedule } from '@nitric/sdk'
import { Pool } from 'pg'

// Create the database with migrations
const db = sql('orders', {
  migrations: 'file://migrations/orders',
})

// Create the topic for publishing
const orderTopic = topic('orders').allow('publish')

// Create the API
const ordersApi = api('orders')

// Function to create an order and publish a message in a transaction
async function createOrder(orderData) {
  const connStr = await db.connectionString()
  const pool = new Pool({ connectionString: connStr })
  const client = await pool.connect()

  try {
    await client.query('BEGIN')

    // Insert the order into the orders table
    await client.query('INSERT INTO orders (data) VALUES ($1) RETURNING id', [
      orderData,
    ])

    // Insert the message into the outbox
    await client.query('INSERT INTO outbox (topic, message) VALUES ($1, $2)', [
      'orders',
      JSON.stringify({
        type: 'order.created',
        data: orderData,
      }),
    ])

    await client.query('COMMIT')
  } catch (error) {
    await client.query('ROLLBACK')
    throw error
  } finally {
    client.release()
    await pool.end()
  }
}

// API endpoint to create orders
ordersApi.post('/orders', async ({ req, res }) => {
  try {
    await createOrder(req.json())
    res.json({ success: true })
  } catch (error) {
    res.status(500).json({ error: error.message })
  }
})

// Schedule to publish messages from the outbox
const publishSchedule = schedule('publish-outbox').every(
  '1 minute',
  async () => {
    const connStr = await db.connectionString()
    const pool = new Pool({ connectionString: connStr })

    try {
      const result = await pool.query(`
        SELECT id, topic, message
        FROM outbox
        WHERE published_at IS NULL
        ORDER BY created_at ASC
      `)

      for (const msg of result.rows) {
        try {
          // Publish the message
          await orderTopic.publish(msg.message)

          // Mark as published
          await pool.query(
            'UPDATE outbox SET published_at = CURRENT_TIMESTAMP WHERE id = $1',
            [msg.id],
          )
        } catch (error) {
          console.error('Failed to publish message:', error)
          // Message will be retried on next run
        }
      }
    } finally {
      await pool.end()
    }
  },
)
```
This approach guarantees that a message is only published if the transaction that produced it commits. The background worker is implemented as a Nitric schedule that runs every minute and publishes any unpublished messages from the outbox table.
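For completeness, the messages published from the outbox can be consumed with a topic subscription, typically defined in a separate service from the publisher. A minimal sketch, assuming the `order.created` message shape written to the outbox above; the handler body is illustrative:

```javascript
import { topic } from '@nitric/sdk'

// Defined in a separate service from the publisher
topic('orders').subscribe(async (ctx) => {
  const event = ctx.req.json()

  if (event.type === 'order.created') {
    // Handle the new order, e.g. update a read model or send a notification
    console.log('Order created:', event.data)
  }
})
```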
Error Handling
Dead Letter Queues
For queues, you can implement a dead letter queue (DLQ) to handle messages that fail processing after multiple attempts:
Dead letter queues may become a native feature of Nitric in the future. If you need this functionality, please let us know by voting or commenting on the GitHub issue.
```javascript
import { queue } from '@nitric/sdk'

const mainQueue = queue('orders').allow('enqueue', 'dequeue')
const dlq = queue('orders-dlq').allow('enqueue')

// Process messages with retry logic
const messages = await mainQueue.dequeue(10)

for (const message of messages) {
  try {
    await processOrder(message.data)
    await message.complete()
  } catch (error) {
    // Move failed message to DLQ after max retries.
    // retryCount is assumed to be tracked by your application
    // (e.g. carried in the message payload); it isn't provided by the queue itself.
    if (message.retryCount >= 3) {
      await dlq.enqueue({
        originalMessage: message.data,
        error: error.message,
      })
      await message.complete()
    }
  }
}
```
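Messages that land in the DLQ still need somewhere to go. One simple option is a scheduled task that periodically drains and logs them for triage. This is a sketch only; it assumes the DLQ payload shape used above, and the schedule name and rate are illustrative:

```javascript
import { queue, schedule } from '@nitric/sdk'

// Separate permission set for reading the DLQ
const dlqReader = queue('orders-dlq').allow('dequeue')

schedule('inspect-orders-dlq').every('15 minutes', async () => {
  const failed = await dlqReader.dequeue(10)

  for (const message of failed) {
    // Alerting and re-driving are application-specific; logging is the minimum
    console.error('Order failed processing:', message.error, message.originalMessage)
    await message.complete()
  }
})
```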
Best Practices
- Message Size: Keep messages small and focused. Large messages can impact performance and increase costs.
- Batch Operations: When possible, use batch operations for better performance:
  - Batch enqueue messages
  - Process multiple messages in a single dequeue operation
- Error Handling: Implement proper error handling and retry logic:
  - Use dead letter queues for failed messages
  - Implement exponential backoff for retries (see the sketch after this list)
  - Log errors for monitoring and debugging
- Monitoring: Set up monitoring for your queues and topics:
  - Track message processing times
  - Monitor error rates
  - Set up alerts for queue depth
- Security: Follow security best practices:
  - Use minimal required permissions
  - Encrypt sensitive data in messages
  - Validate message content
- Testing: Test your message processing logic thoroughly:
  - Test with duplicate messages
  - Test error scenarios
  - Test with different message sizes and types
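As referenced in the error handling item above, retries are usually spaced out with exponential backoff rather than retried immediately. A minimal, generic sketch; the attempt count and delays are illustrative:

```javascript
// Retry an async operation with exponential backoff (illustrative values)
async function withBackoff(operation, maxAttempts = 4, baseDelayMs = 500) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation()
    } catch (error) {
      if (attempt === maxAttempts) throw error
      // 500ms, 1s, 2s, ... doubling on each failed attempt
      const delay = baseDelayMs * 2 ** (attempt - 1)
      await new Promise((resolve) => setTimeout(resolve, delay))
    }
  }
}

// Example: wrap message processing in backoff before giving up
// await withBackoff(() => processOrder(message.data))
```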
The implementation details of these patterns may vary depending on your cloud provider. See the provider-specific documentation for more details.