Message Queue in Practice: Async Processing and System Decoupling

Master the core concepts of RabbitMQ and Kafka: messaging patterns, reliable delivery, and distributed transactions

Message queues are the cornerstone of distributed systems, enabling service decoupling, async processing, and traffic smoothing. This article explores core concepts and practical applications of message queues.

Why Message Queues

Core Use Cases

Message Queue Use Cases:
┌─────────────────────────────────────────────────────┐
│                                                     │
│   Async Processing                                  │
│   ├── Email/SMS sending                            │
│   ├── Report generation                            │
│   └── Data synchronization                         │
│                                                     │
│   Service Decoupling                               │
│   ├── Order system → Inventory system              │
│   ├── User system → Notification system            │
│   └── Payment system → Points system               │
│                                                     │
│   Traffic Smoothing                                │
│   ├── Flash sales                                  │
│   ├── Batch data processing                        │
│   └── Log collection                               │
│                                                     │
│   Event-Driven                                     │
│   ├── Real-time data processing                   │
│   ├── Event Sourcing                              │
│   └── CQRS architecture                           │
│                                                     │
└─────────────────────────────────────────────────────┘
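
To make the async-processing case concrete: instead of calling a slow email provider on the request path, the handler enqueues an event and returns immediately; a worker sends the email later. A minimal sketch (the Queue interface and handlers are hypothetical):

// Hypothetical queue client; any of the brokers discussed below fits this shape
interface Queue {
  publish(topic: string, payload: object): Promise<void>;
}

// Before: the HTTP request blocks on a slow email provider
async function signupSync(
  email: string,
  sendEmail: (to: string) => Promise<void>
): Promise<void> {
  await sendEmail(email);  // Hundreds of ms on the critical path
}

// After: enqueue and return immediately; a worker consumes the event later
async function signupAsync(email: string, queue: Queue): Promise<void> {
  await queue.publish('user.signed-up', { email });  // Milliseconds
}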

Message Queue Comparison

Feature          RabbitMQ        Kafka            Redis Streams
----------------------------------------------------------------
Delivery model   Push            Pull             Pull
Throughput       ~10K msg/s      ~1M msg/s        ~100K msg/s
Latency          Microseconds    Milliseconds     Microseconds
Persistence      Optional        Required         Optional
Message replay   No              Yes              Yes
Typical use      Business msgs   Logs/Streaming   Lightweight queues

RabbitMQ in Practice

Core Concepts

RabbitMQ Architecture:
┌─────────────────────────────────────────────────────┐
│                                                     │
│   Producer → Exchange → Binding → Queue → Consumer │
│                                                     │
│   Exchange Types:                                   │
│   ├── Direct   → Exact routing key match           │
│   ├── Fanout   → Broadcast to all queues          │
│   ├── Topic    → Pattern matching routing key     │
│   └── Headers  → Header-based matching            │
│                                                     │
└─────────────────────────────────────────────────────┘
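
The topic exchange is the most flexible of these: queues bind with wildcard patterns and receive only matching routing keys. A minimal sketch (exchange, queue, and key names are illustrative):

async function setupTopicRouting(channel: amqp.Channel): Promise<void> {
  await channel.assertExchange('logs.topic', 'topic', { durable: true });

  // '*' matches exactly one word; '#' matches zero or more words
  await channel.assertQueue('errors.queue', { durable: true });
  await channel.bindQueue('errors.queue', 'logs.topic', '*.error');

  await channel.assertQueue('order-events.queue', { durable: true });
  await channel.bindQueue('order-events.queue', 'logs.topic', 'order.#');

  // Delivered to errors.queue (matches '*.error'), not to order-events.queue
  channel.publish('logs.topic', 'payment.error', Buffer.from('{"msg":"declined"}'));
}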

Connection and Basic Operations

import amqp from 'amqplib';

class RabbitMQClient {
  private connection: amqp.Connection | null = null;
  private channel: amqp.Channel | null = null;

  async connect(url: string): Promise<void> {
    this.connection = await amqp.connect(url);
    this.channel = await this.connection.createChannel();

    // Set prefetch to control consumption rate
    await this.channel.prefetch(10);

    // Connection error handling
    this.connection.on('error', (err) => {
      console.error('RabbitMQ connection error:', err);
      this.reconnect(url);
    });
  }

  private async reconnect(url: string): Promise<void> {
    // Fixed 5s delay before reconnecting; production code would add
    // exponential backoff and a retry limit
    await new Promise(resolve => setTimeout(resolve, 5000));
    try {
      await this.connect(url);
    } catch {
      await this.reconnect(url);
    }
  }

  async close(): Promise<void> {
    await this.channel?.close();
    await this.connection?.close();
  }
}

Publish/Subscribe Pattern

// Publisher
import { randomUUID } from 'node:crypto';

class Publisher {
  constructor(private channel: amqp.Channel) {}

  async publish(exchange: string, routingKey: string, message: object): Promise<void> {
    const content = Buffer.from(JSON.stringify(message));

    this.channel.publish(exchange, routingKey, content, {
      persistent: true,           // Message persistence
      contentType: 'application/json',
      timestamp: Date.now(),
      messageId: randomUUID(),
    });
  }
}

// Subscriber
class Subscriber {
  constructor(private channel: amqp.Channel) {}

  async subscribe(
    queue: string,
    handler: (msg: object) => Promise<void>
  ): Promise<void> {
    await this.channel.consume(queue, async (msg) => {
      if (!msg) return;

      try {
        const content = JSON.parse(msg.content.toString());
        await handler(content);
        this.channel.ack(msg);  // Acknowledge message
      } catch (error) {
        console.error('Message processing failed:', error);
        // Reject and requeue; note that unconditional requeueing can loop a
        // poison message forever — see the retry handling further below
        this.channel.nack(msg, false, true);
      }
    });
  }
}
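
Wiring the two together: declare the topology once, then subscribe before publishing (names are illustrative):

async function wireOrderEvents(channel: amqp.Channel): Promise<void> {
  // Declare the topology once: exchange → binding → queue
  await channel.assertExchange('orders.exchange', 'direct', { durable: true });
  await channel.assertQueue('orders.created', { durable: true });
  await channel.bindQueue('orders.created', 'orders.exchange', 'order.created');

  // Subscribe first, then publish
  await new Subscriber(channel).subscribe('orders.created', async (msg) => {
    console.log('Handling order:', msg);
  });

  await new Publisher(channel).publish('orders.exchange', 'order.created', { orderId: 1 });
}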

Delayed Queue

// Using Dead Letter Exchange for delayed queue
async function setupDelayedQueue(channel: amqp.Channel): Promise<void> {
  // Delayed exchange
  await channel.assertExchange('delayed.exchange', 'direct', { durable: true });

  // Delayed queue (forwards to process queue on expiration)
  await channel.assertQueue('delayed.queue', {
    durable: true,
    deadLetterExchange: 'process.exchange',
    deadLetterRoutingKey: 'process',
  });

  // Process exchange and queue
  await channel.assertExchange('process.exchange', 'direct', { durable: true });
  await channel.assertQueue('process.queue', { durable: true });

  await channel.bindQueue('delayed.queue', 'delayed.exchange', 'delayed');
  await channel.bindQueue('process.queue', 'process.exchange', 'process');
}

// Send delayed message
async function sendDelayedMessage(
  channel: amqp.Channel,
  message: object,
  delayMs: number
): Promise<void> {
  channel.publish('delayed.exchange', 'delayed', Buffer.from(JSON.stringify(message)), {
    persistent: true,
    expiration: String(delayMs),
  });
}
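
One caveat: RabbitMQ only expires messages at the head of a queue, so a short-TTL message stuck behind a long-TTL one is delivered late. The approach above therefore works best with a single fixed delay per queue. When mixed delays are needed and the rabbitmq-delayed-message-exchange plugin is available, a delayed exchange avoids the problem; a sketch assuming the plugin is enabled:

// Requires the rabbitmq-delayed-message-exchange plugin
async function sendViaDelayedExchange(
  channel: amqp.Channel,
  message: object,
  delayMs: number
): Promise<void> {
  await channel.assertExchange('delay.exchange', 'x-delayed-message', {
    durable: true,
    arguments: { 'x-delayed-type': 'direct' },  // Routes like a direct exchange after the delay
  });

  channel.publish('delay.exchange', 'process', Buffer.from(JSON.stringify(message)), {
    persistent: true,
    headers: { 'x-delay': delayMs },  // Per-message delay, no head-of-line blocking
  });
}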

Kafka in Practice

Core Concepts

Kafka Architecture:
┌─────────────────────────────────────────────────────┐
│                                                     │
│   Topic                                             │
│   ├── Partition 0  [msg0, msg3, msg6, ...]         │
│   ├── Partition 1  [msg1, msg4, msg7, ...]         │
│   └── Partition 2  [msg2, msg5, msg8, ...]         │
│                                                     │
│   Consumer Group                                    │
│   ├── Consumer A  → Partition 0                    │
│   ├── Consumer B  → Partition 1                    │
│   └── Consumer C  → Partition 2                    │
│                                                     │
│   Key Concepts:                                     │
│   ├── Offset    → Message position in partition    │
│   ├── Replica   → Partition replicas for HA       │
│   └── Leader    → Replica handling reads/writes   │
│                                                     │
└─────────────────────────────────────────────────────┘
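
Partition count caps consumer parallelism within a group and can be increased after creation but never decreased, so it deserves planning up front. Creating a topic with the kafkajs admin API (broker address and counts are illustrative):

import { Kafka } from 'kafkajs';

const kafkaAdmin = new Kafka({ clientId: 'admin-cli', brokers: ['kafka1:9092'] });

async function createOrdersTopic(): Promise<void> {
  const admin = kafkaAdmin.admin();
  await admin.connect();

  await admin.createTopics({
    topics: [{
      topic: 'orders',
      numPartitions: 3,      // Upper bound on consumer parallelism within one group
      replicationFactor: 2,  // Copies of each partition, for availability
    }],
  });

  await admin.disconnect();
}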

Producer

import { Kafka, Producer, Partitioners } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092'],
  retry: {
    initialRetryTime: 100,
    retries: 8,
  },
});

class KafkaProducer {
  private producer: Producer;

  constructor() {
    this.producer = kafka.producer({
      createPartitioner: Partitioners.DefaultPartitioner,
      allowAutoTopicCreation: false,
    });
  }

  async connect(): Promise<void> {
    await this.producer.connect();
  }

  async send(topic: string, messages: Array<Record<string, unknown> & { id?: string | number }>): Promise<void> {
    await this.producer.send({
      topic,
      messages: messages.map(msg => ({
        // Same key → same partition → per-entity ordering
        key: msg.id !== undefined ? String(msg.id) : undefined,
        value: JSON.stringify(msg),
        headers: {
          'content-type': 'application/json',
          timestamp: Date.now().toString(),
        },
      })),
    });
  }

  // Batch sending optimization
  async sendBatch(messages: { topic: string; data: object }[]): Promise<void> {
    const grouped = messages.reduce((acc, msg) => {
      if (!acc[msg.topic]) acc[msg.topic] = [];
      acc[msg.topic].push(msg.data);
      return acc;
    }, {} as Record<string, object[]>);

    await this.producer.sendBatch({
      topicMessages: Object.entries(grouped).map(([topic, data]) => ({
        topic,
        messages: data.map(d => ({ value: JSON.stringify(d) })),
      })),
    });
  }
}
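
Because Kafka guarantees ordering only within a partition, the key set in send above matters: all messages sharing a key land on the same partition. For example:

const producer = new KafkaProducer();
await producer.connect();

// Both messages carry key '42', so they land on the same partition and
// consumers see 'created' strictly before 'paid'
await producer.send('orders', [
  { id: 42, type: 'created', total: 99.5 },
  { id: 42, type: 'paid' },
]);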

Consumer

import { Consumer, EachMessagePayload } from 'kafkajs';

class KafkaConsumer {
  private consumer: Consumer;

  constructor(groupId: string) {
    this.consumer = kafka.consumer({
      groupId,
      sessionTimeout: 30000,
      heartbeatInterval: 3000,
    });
  }

  async connect(): Promise<void> {
    await this.consumer.connect();
  }

  async subscribe(topics: string[]): Promise<void> {
    // kafkajs v2 accepts the whole topic list in one call
    await this.consumer.subscribe({ topics, fromBeginning: false });
  }

  async run(handler: (payload: EachMessagePayload) => Promise<void>): Promise<void> {
    await this.consumer.run({
      eachMessage: async (payload) => {
        const { topic, partition, message } = payload;

        try {
          await handler(payload);
        } catch (error) {
          console.error(`Error processing message from ${topic}:${partition}`, error);
          await this.sendToDeadLetterQueue(topic, message);
        }
      },
    });
  }

  private async sendToDeadLetterQueue(topic: string, message: any): Promise<void> {
    // Dead letter queue implementation — e.g. produce the raw message to a
    // `<topic>.dlq` topic so it can be inspected and replayed later
  }
}

// Usage example
const consumer = new KafkaConsumer('order-service');
await consumer.connect();
await consumer.subscribe(['orders', 'payments']);

await consumer.run(async ({ topic, message }) => {
  const data = JSON.parse(message.value?.toString() || '{}');
  console.log(`Received from ${topic}:`, data);
});

Message Replay

import { Admin } from 'kafkajs';

// Consume from a specific offset (the consumer must already be running)
async function seekToOffset(
  consumer: Consumer,
  topic: string,
  partition: number,
  offset: string
): Promise<void> {
  consumer.seek({ topic, partition, offset });
}

// Consume from a specific timestamp
async function seekToTimestamp(
  admin: Admin,
  consumer: Consumer,
  topic: string,
  timestamp: number
): Promise<void> {
  // Resolve, per partition, the earliest offset whose timestamp is >= the target
  const offsets = await admin.fetchTopicOffsetsByTimestamp(topic, timestamp);

  for (const { partition, offset } of offsets) {
    consumer.seek({ topic, partition, offset });
  }
}
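
To replay for an entire consumer group rather than one running consumer, rewrite the group's committed offsets with the admin API; the group must have no active members while the offsets are changed. A sketch:

// Rewind a stopped consumer group to the beginning of a topic;
// on restart, every member re-consumes from the earliest offset
async function rewindGroup(admin: Admin, groupId: string, topic: string): Promise<void> {
  await admin.resetOffsets({ groupId, topic, earliest: true });
}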

Message Reliability

Producer Acknowledgment

// RabbitMQ confirm mode
async function publishWithConfirm(channel: amqp.ConfirmChannel, message: object): Promise<void> {
  await channel.assertQueue('reliable.queue', { durable: true });

  return new Promise((resolve, reject) => {
    channel.sendToQueue(
      'reliable.queue',
      Buffer.from(JSON.stringify(message)),
      { persistent: true },
      (err) => {
        if (err) reject(err);
        else resolve();
      }
    );
  });
}
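
The callback above only fires on a ConfirmChannel, which amqplib creates explicitly; a minimal sketch:

// Confirm channels are created with createConfirmChannel, not createChannel
const connection = await amqp.connect('amqp://localhost');
const confirmChannel = await connection.createConfirmChannel();

await publishWithConfirm(confirmChannel, { orderId: 42 });

// For batches: publish several messages, then wait for all outstanding confirms
await confirmChannel.waitForConfirms();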

// Kafka: in kafkajs, acks is a per-send() option, not a producer config
const producer = kafka.producer({
  idempotent: true,  // Idempotency guarantee; sends must use acks=-1
});

await producer.send({
  topic: 'orders',
  messages: [{ value: 'order-created' }],
  acks: -1,  // Wait for all in-sync replicas to acknowledge
});

Consumer Acknowledgment

// Manual acknowledgment mode
// Application-defined marker for transient failures (e.g. timeouts) that are safe to retry
class TemporaryError extends Error {}

class ReliableConsumer {
  async consume(channel: amqp.Channel, queue: string): Promise<void> {
    await channel.consume(queue, async (msg) => {
      if (!msg) return;

      try {
        await this.processMessage(msg);
        channel.ack(msg);
      } catch (error) {
        if (this.shouldRetry(error)) {
          channel.nack(msg, false, true);
        } else {
          await this.sendToDeadLetter(msg);
          channel.ack(msg);
        }
      }
    }, { noAck: false });
  }

  private shouldRetry(error: Error): boolean {
    return error instanceof TemporaryError;
  }
}
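
Unbounded requeueing can spin forever on a poison message. A common refinement routes rejections through a TTL retry queue via DLX and parks the message after N attempts, counting attempts with the x-death header RabbitMQ attaches on each dead-lettering (a sketch; work.queue, dlq.exchange, and handleMessage are illustrative):

// Bounded retries using the x-death header.
// Assumes topology: work.queue's DLX feeds a TTL retry queue that routes
// back to work.queue; after MAX_RETRIES the message is parked in a final DLQ.
const MAX_RETRIES = 3;

function retryCount(msg: amqp.ConsumeMessage): number {
  const deaths = msg.properties.headers?.['x-death'] as
    | Array<{ count: number }>
    | undefined;
  return Number(deaths?.[0]?.count ?? 0);
}

await channel.consume('work.queue', async (msg) => {
  if (!msg) return;
  try {
    await handleMessage(JSON.parse(msg.content.toString()));
    channel.ack(msg);
  } catch {
    if (retryCount(msg) >= MAX_RETRIES) {
      // Park permanently and stop the retry loop
      channel.publish('dlq.exchange', 'parked', msg.content, { persistent: true });
      channel.ack(msg);
    } else {
      // Dead-letter into the retry loop (requeue = false)
      channel.nack(msg, false, false);
    }
  }
});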

Idempotency

// Message ID-based idempotency (ioredis assumed; any Redis client with SET NX works)
import Redis from 'ioredis';

class IdempotentProcessor {
  constructor(private redis: Redis) {}

  async process(messageId: string, handler: () => Promise<void>): Promise<void> {
    const key = `processed:${messageId}`;

    // Check if already processed
    const exists = await this.redis.exists(key);
    if (exists) {
      console.log(`Message ${messageId} already processed`);
      return;
    }

    // Atomic guard: only one instance wins the NX set (the EXISTS above is just a fast path)
    const acquired = await this.redis.set(key, '1', 'EX', 86400, 'NX');
    if (!acquired) {
      return; // Another instance is processing
    }

    try {
      await handler();
    } catch (error) {
      await this.redis.del(key);
      throw error;
    }
  }
}
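
Usage: deduplicate on the messageId the publisher stamped on each message (handleOrder is illustrative):

const processor = new IdempotentProcessor(new Redis());

await channel.consume('orders.queue', async (msg) => {
  if (!msg) return;

  // The publisher set messageId (see the Publisher example above)
  await processor.process(msg.properties.messageId, async () => {
    await handleOrder(JSON.parse(msg.content.toString()));
  });

  channel.ack(msg);
});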

Distributed Transactions

Saga Pattern

// Saga Orchestrator
class SagaOrchestrator {
  private steps: SagaStep[] = [];

  addStep(step: SagaStep): this {
    this.steps.push(step);
    return this;
  }

  async execute(data: any): Promise<void> {
    const completedSteps: SagaStep[] = [];

    try {
      for (const step of this.steps) {
        await step.execute(data);
        completedSteps.push(step);
      }
    } catch (error) {
      // Compensate in reverse order
      for (const step of completedSteps.reverse()) {
        try {
          await step.compensate(data);
        } catch (compensateError) {
          console.error('Compensation failed:', compensateError);
        }
      }
      throw error;
    }
  }
}

interface SagaStep {
  execute(data: any): Promise<void>;
  compensate(data: any): Promise<void>;
}

// Usage: Order creation Saga
const orderSaga = new SagaOrchestrator()
  .addStep({
    execute: async (data) => await createOrder(data),
    compensate: async (data) => await cancelOrder(data.orderId),
  })
  .addStep({
    execute: async (data) => await reserveInventory(data),
    compensate: async (data) => await releaseInventory(data),
  })
  .addStep({
    execute: async (data) => await processPayment(data),
    compensate: async (data) => await refundPayment(data),
  });

Outbox Pattern

// Using an Outbox table for guaranteed delivery: the business write and the
// outgoing message commit in the same database transaction (Prisma-style API assumed)
class OutboxPattern {
  async executeWithOutbox(
    db: Database,
    businessLogic: (tx: Transaction) => Promise<void>,
    message: OutboxMessage
  ): Promise<void> {
    await db.transaction(async (tx) => {
      // 1. Execute business logic inside the transaction
      await businessLogic(tx);

      // 2. Write to the Outbox table — commits atomically with step 1, so the
      //    message is never lost and never published for a rolled-back write
      await tx.outbox.create({
        data: {
          id: randomUUID(),
          topic: message.topic,
          payload: JSON.stringify(message.payload),
          status: 'PENDING',
          createdAt: new Date(),
        },
      });
    });
  }
}

// Outbox publisher (background task that drains PENDING rows)
class OutboxPublisher {
  constructor(private db: Database, private producer: KafkaProducer) {}

  async publishPendingMessages(): Promise<void> {
    const messages = await this.db.outbox.findMany({
      where: { status: 'PENDING' },
      orderBy: { createdAt: 'asc' },
      take: 100,
    });

    for (const msg of messages) {
      try {
        await this.producer.send(msg.topic, [JSON.parse(msg.payload)]);
        await this.db.outbox.update({
          where: { id: msg.id },
          data: { status: 'SENT', sentAt: new Date() },
        });
      } catch (error) {
        // Leave the row PENDING so the next run retries it
        await this.db.outbox.update({
          where: { id: msg.id },
          data: { retryCount: { increment: 1 } },
        });
      }
    }
  }
}
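
A simple way to run the publisher is a polling loop; higher-volume systems often tail the outbox table with change data capture (e.g. Debezium) instead. A sketch (db and producer as wired above):

const outboxPublisher = new OutboxPublisher(db, producer);

// Drain pending rows every second; overlapping runs and multi-instance
// locking are ignored here for brevity
setInterval(() => {
  outboxPublisher.publishPendingMessages().catch(err =>
    console.error('Outbox publishing failed:', err)
  );
}, 1000);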

Monitoring and Operations

// Consumer lag monitoring
async function monitorConsumerLag(admin: Admin, groupId: string): Promise<void> {
  const offsets = await admin.fetchOffsets({ groupId, topics: ['orders'] });

  for (const { topic, partitions } of offsets) {
    for (const { partition, offset } of partitions) {
      const topicOffsets = await admin.fetchTopicOffsets(topic);
      const latestOffset = topicOffsets.find(t => t.partition === partition)?.high;

      const lag = Number(latestOffset) - Number(offset);
      console.log(`Topic ${topic} Partition ${partition}: lag = ${lag}`);

      if (lag > 10000) {
        alertOps(`High consumer lag detected: ${topic}:${partition} = ${lag}`);
      }
    }
  }
}

Best Practices Summary

Message Queue Best Practices:
┌─────────────────────────────────────────────────────┐
│                                                     │
│   Producer                                          │
│   ├── Use acknowledgment mechanism                 │
│   ├── Implement retry logic                        │
│   ├── Batch sending for performance               │
│   └── Set reasonable timeouts                      │
│                                                     │
│   Consumer                                          │
│   ├── Manual message acknowledgment               │
│   ├── Implement idempotent processing             │
│   ├── Set appropriate concurrency                 │
│   └── Proper error handling                        │
│                                                     │
│   Operations                                        │
│   ├── Monitor consumer lag                        │
│   ├── Set up dead letter queues                   │
│   ├── Clean up expired messages                   │
│   └── Capacity planning                           │
│                                                     │
└─────────────────────────────────────────────────────┘

Scenario            Recommended Solution
----------------------------------------
Business messages   RabbitMQ
Log collection      Kafka
Lightweight queue   Redis Streams
Delayed tasks       RabbitMQ DLX
Event sourcing      Kafka

Message queues are key building blocks for reliable distributed systems. Understanding their principles and matching the right tool to each scenario is essential to getting the most out of them.


Messages are the bloodstream of a system; queues are the art of scheduling. Async processing makes systems more elegant.