消息队列实战:异步处理与系统解耦

掌握 RabbitMQ、Kafka 核心概念、消息模式、可靠投递和分布式事务

消息队列实战:异步处理与系统解耦

消息队列是分布式系统的基石,实现服务解耦、异步处理和流量削峰。本文深入探讨消息队列的核心概念和实战应用。

为什么需要消息队列

核心应用场景

消息队列应用场景:
┌─────────────────────────────────────────────────────┐
│                                                     │
│   异步处理                                          │
│   ├── 发送邮件/短信                                 │
│   ├── 生成报表                                      │
│   └── 数据同步                                      │
│                                                     │
│   服务解耦                                          │
│   ├── 订单系统 → 库存系统                           │
│   ├── 用户系统 → 通知系统                           │
│   └── 支付系统 → 积分系统                           │
│                                                     │
│   流量削峰                                          │
│   ├── 秒杀活动                                      │
│   ├── 批量数据处理                                  │
│   └── 日志收集                                      │
│                                                     │
│   事件驱动                                          │
│   ├── 实时数据处理                                  │
│   ├── 事件溯源 (Event Sourcing)                     │
│   └── CQRS 架构                                     │
│                                                     │
└─────────────────────────────────────────────────────┘

消息队列对比

特性RabbitMQKafkaRedis Streams
模型推模式拉模式拉模式
吞吐量万级百万级十万级
延迟微秒级毫秒级微秒级
持久化可选强制可选
消息回溯不支持支持支持
适用场景业务消息日志/流处理轻量级队列

RabbitMQ 实战

基本概念

RabbitMQ 架构:
┌─────────────────────────────────────────────────────┐
│                                                     │
│   Producer → Exchange → Binding → Queue → Consumer │
│                                                     │
│   Exchange 类型:                                    │
│   ├── Direct   → 精确匹配路由键                     │
│   ├── Fanout   → 广播到所有队列                     │
│   ├── Topic    → 模式匹配路由键                     │
│   └── Headers  → 基于消息头匹配                     │
│                                                     │
└─────────────────────────────────────────────────────┘

连接与基本操作

import amqp from 'amqplib';

class RabbitMQClient {
  private connection: amqp.Connection | null = null;
  private channel: amqp.Channel | null = null;

  async connect(url: string): Promise<void> {
    this.connection = await amqp.connect(url);
    this.channel = await this.connection.createChannel();

    // 设置预取数量,控制消费速率
    await this.channel.prefetch(10);

    // 连接错误处理
    this.connection.on('error', (err) => {
      console.error('RabbitMQ connection error:', err);
      this.reconnect(url);
    });
  }

  private async reconnect(url: string): Promise<void> {
    await new Promise(resolve => setTimeout(resolve, 5000));
    await this.connect(url);
  }

  async close(): Promise<void> {
    await this.channel?.close();
    await this.connection?.close();
  }
}

发布订阅模式

// 发布者
class Publisher {
  constructor(private channel: amqp.Channel) {}

  async publish(exchange: string, routingKey: string, message: object): Promise<void> {
    const content = Buffer.from(JSON.stringify(message));

    this.channel.publish(exchange, routingKey, content, {
      persistent: true,           // 消息持久化
      contentType: 'application/json',
      timestamp: Date.now(),
      messageId: generateUUID(),
    });
  }
}

// 订阅者
class Subscriber {
  constructor(private channel: amqp.Channel) {}

  async subscribe(
    queue: string,
    handler: (msg: object) => Promise<void>
  ): Promise<void> {
    await this.channel.consume(queue, async (msg) => {
      if (!msg) return;

      try {
        const content = JSON.parse(msg.content.toString());
        await handler(content);
        this.channel.ack(msg);  // 确认消息
      } catch (error) {
        console.error('Message processing failed:', error);
        // 拒绝并重新入队
        this.channel.nack(msg, false, true);
      }
    });
  }
}

延迟队列

// 使用 Dead Letter Exchange 实现延迟队列
async function setupDelayedQueue(channel: amqp.Channel): Promise<void> {
  // 延迟交换机
  await channel.assertExchange('delayed.exchange', 'direct', { durable: true });

  // 延迟队列(消息过期后转发到处理队列)
  await channel.assertQueue('delayed.queue', {
    durable: true,
    deadLetterExchange: 'process.exchange',
    deadLetterRoutingKey: 'process',
  });

  // 处理交换机和队列
  await channel.assertExchange('process.exchange', 'direct', { durable: true });
  await channel.assertQueue('process.queue', { durable: true });

  await channel.bindQueue('delayed.queue', 'delayed.exchange', 'delayed');
  await channel.bindQueue('process.queue', 'process.exchange', 'process');
}

// 发送延迟消息
async function sendDelayedMessage(
  channel: amqp.Channel,
  message: object,
  delayMs: number
): Promise<void> {
  channel.publish('delayed.exchange', 'delayed', Buffer.from(JSON.stringify(message)), {
    persistent: true,
    expiration: String(delayMs),
  });
}

Kafka 实战

核心概念

Kafka 架构:
┌─────────────────────────────────────────────────────┐
│                                                     │
│   Topic (主题)                                       │
│   ├── Partition 0  [msg0, msg3, msg6, ...]         │
│   ├── Partition 1  [msg1, msg4, msg7, ...]         │
│   └── Partition 2  [msg2, msg5, msg8, ...]         │
│                                                     │
│   Consumer Group                                    │
│   ├── Consumer A  → Partition 0                    │
│   ├── Consumer B  → Partition 1                    │
│   └── Consumer C  → Partition 2                    │
│                                                     │
│   关键概念:                                        │
│   ├── Offset    → 消息在分区中的位置               │
│   ├── Replica   → 分区副本,保证高可用             │
│   └── Leader    → 负责读写的副本                   │
│                                                     │
└─────────────────────────────────────────────────────┘

生产者

import { Kafka, Producer, Partitioners } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092'],
  retry: {
    initialRetryTime: 100,
    retries: 8,
  },
});

class KafkaProducer {
  private producer: Producer;

  constructor() {
    this.producer = kafka.producer({
      createPartitioner: Partitioners.DefaultPartitioner,
      allowAutoTopicCreation: false,
    });
  }

  async connect(): Promise<void> {
    await this.producer.connect();
  }

  async send(topic: string, messages: object[]): Promise<void> {
    await this.producer.send({
      topic,
      messages: messages.map(msg => ({
        key: msg.id?.toString(),
        value: JSON.stringify(msg),
        headers: {
          'content-type': 'application/json',
          timestamp: Date.now().toString(),
        },
      })),
    });
  }

  // 批量发送优化
  async sendBatch(messages: { topic: string; data: object }[]): Promise<void> {
    const grouped = messages.reduce((acc, msg) => {
      if (!acc[msg.topic]) acc[msg.topic] = [];
      acc[msg.topic].push(msg.data);
      return acc;
    }, {} as Record<string, object[]>);

    await this.producer.sendBatch({
      topicMessages: Object.entries(grouped).map(([topic, data]) => ({
        topic,
        messages: data.map(d => ({ value: JSON.stringify(d) })),
      })),
    });
  }
}

消费者

import { Consumer, EachMessagePayload } from 'kafkajs';

class KafkaConsumer {
  private consumer: Consumer;

  constructor(groupId: string) {
    this.consumer = kafka.consumer({
      groupId,
      sessionTimeout: 30000,
      heartbeatInterval: 3000,
    });
  }

  async connect(): Promise<void> {
    await this.consumer.connect();
  }

  async subscribe(topics: string[]): Promise<void> {
    for (const topic of topics) {
      await this.consumer.subscribe({ topic, fromBeginning: false });
    }
  }

  async run(handler: (payload: EachMessagePayload) => Promise<void>): Promise<void> {
    await this.consumer.run({
      eachMessage: async (payload) => {
        const { topic, partition, message } = payload;

        try {
          await handler(payload);
        } catch (error) {
          console.error(`Error processing message from ${topic}:${partition}`, error);
          // 可以发送到死信队列
          await this.sendToDeadLetterQueue(topic, message);
        }
      },
    });
  }

  private async sendToDeadLetterQueue(topic: string, message: any): Promise<void> {
    // 实现死信队列逻辑
  }
}

// 使用示例
const consumer = new KafkaConsumer('order-service');
await consumer.connect();
await consumer.subscribe(['orders', 'payments']);

await consumer.run(async ({ topic, message }) => {
  const data = JSON.parse(message.value?.toString() || '{}');
  console.log(`Received from ${topic}:`, data);
});

消息回溯

// 从指定偏移量消费
async function seekToOffset(consumer: Consumer, topic: string, partition: number, offset: string): Promise<void> {
  consumer.seek({
    topic,
    partition,
    offset,
  });
}

// 从指定时间戳消费
async function seekToTimestamp(admin: Admin, topic: string, timestamp: number): Promise<void> {
  const offsets = await admin.fetchTopicOffsetsByTimestamp(topic, timestamp);

  for (const { partition, offset } of offsets) {
    consumer.seek({ topic, partition, offset });
  }
}

消息可靠性

生产者确认

// RabbitMQ 确认模式
async function publishWithConfirm(channel: amqp.ConfirmChannel, message: object): Promise<void> {
  await channel.assertQueue('reliable.queue', { durable: true });

  return new Promise((resolve, reject) => {
    channel.sendToQueue(
      'reliable.queue',
      Buffer.from(JSON.stringify(message)),
      { persistent: true },
      (err) => {
        if (err) reject(err);
        else resolve();
      }
    );
  });
}

// Kafka acks 配置
const producer = kafka.producer({
  acks: -1,  // 等待所有副本确认
  idempotent: true,  // 幂等性保证
});

消费者确认

// 手动确认模式
class ReliableConsumer {
  async consume(channel: amqp.Channel, queue: string): Promise<void> {
    await channel.consume(queue, async (msg) => {
      if (!msg) return;

      try {
        await this.processMessage(msg);

        // 处理成功后确认
        channel.ack(msg);
      } catch (error) {
        if (this.shouldRetry(error)) {
          // 重新入队
          channel.nack(msg, false, true);
        } else {
          // 发送到死信队列
          await this.sendToDeadLetter(msg);
          channel.ack(msg);
        }
      }
    }, { noAck: false });
  }

  private shouldRetry(error: Error): boolean {
    return error instanceof TemporaryError;
  }
}

幂等性处理

// 使用消息 ID 实现幂等
class IdempotentProcessor {
  private redis: Redis;

  async process(messageId: string, handler: () => Promise<void>): Promise<void> {
    const key = `processed:${messageId}`;

    // 检查是否已处理
    const exists = await this.redis.exists(key);
    if (exists) {
      console.log(`Message ${messageId} already processed`);
      return;
    }

    // 设置处理标记(使用 SETNX 保证原子性)
    const acquired = await this.redis.set(key, '1', 'EX', 86400, 'NX');
    if (!acquired) {
      return; // 其他实例正在处理
    }

    try {
      await handler();
    } catch (error) {
      // 处理失败,删除标记允许重试
      await this.redis.del(key);
      throw error;
    }
  }
}

分布式事务

Saga 模式

// Saga 编排器
class SagaOrchestrator {
  private steps: SagaStep[] = [];

  addStep(step: SagaStep): this {
    this.steps.push(step);
    return this;
  }

  async execute(data: any): Promise<void> {
    const completedSteps: SagaStep[] = [];

    try {
      for (const step of this.steps) {
        await step.execute(data);
        completedSteps.push(step);
      }
    } catch (error) {
      // 按逆序补偿
      for (const step of completedSteps.reverse()) {
        try {
          await step.compensate(data);
        } catch (compensateError) {
          console.error('Compensation failed:', compensateError);
        }
      }
      throw error;
    }
  }
}

interface SagaStep {
  execute(data: any): Promise<void>;
  compensate(data: any): Promise<void>;
}

// 使用示例:订单创建 Saga
const orderSaga = new SagaOrchestrator()
  .addStep({
    execute: async (data) => await createOrder(data),
    compensate: async (data) => await cancelOrder(data.orderId),
  })
  .addStep({
    execute: async (data) => await reserveInventory(data),
    compensate: async (data) => await releaseInventory(data),
  })
  .addStep({
    execute: async (data) => await processPayment(data),
    compensate: async (data) => await refundPayment(data),
  });

Outbox 模式

// 使用 Outbox 表保证消息投递
class OutboxPattern {
  async executeWithOutbox(
    db: Database,
    businessLogic: () => Promise<void>,
    message: OutboxMessage
  ): Promise<void> {
    await db.transaction(async (tx) => {
      // 1. 执行业务逻辑
      await businessLogic();

      // 2. 写入 Outbox 表
      await tx.outbox.create({
        data: {
          id: generateUUID(),
          topic: message.topic,
          payload: JSON.stringify(message.payload),
          status: 'PENDING',
          createdAt: new Date(),
        },
      });
    });
  }
}

// Outbox 消息发布器(后台任务)
class OutboxPublisher {
  async publishPendingMessages(): Promise<void> {
    const messages = await db.outbox.findMany({
      where: { status: 'PENDING' },
      orderBy: { createdAt: 'asc' },
      take: 100,
    });

    for (const msg of messages) {
      try {
        await this.producer.send(msg.topic, JSON.parse(msg.payload));
        await db.outbox.update({
          where: { id: msg.id },
          data: { status: 'SENT', sentAt: new Date() },
        });
      } catch (error) {
        await db.outbox.update({
          where: { id: msg.id },
          data: { retryCount: { increment: 1 } },
        });
      }
    }
  }
}

监控与运维

// 消费者滞后监控
async function monitorConsumerLag(admin: Admin, groupId: string): Promise<void> {
  const offsets = await admin.fetchOffsets({ groupId, topics: ['orders'] });

  for (const { topic, partitions } of offsets) {
    for (const { partition, offset } of partitions) {
      const topicOffsets = await admin.fetchTopicOffsets(topic);
      const latestOffset = topicOffsets.find(t => t.partition === partition)?.high;

      const lag = Number(latestOffset) - Number(offset);
      console.log(`Topic ${topic} Partition ${partition}: lag = ${lag}`);

      if (lag > 10000) {
        alertOps(`High consumer lag detected: ${topic}:${partition} = ${lag}`);
      }
    }
  }
}

最佳实践总结

消息队列最佳实践:
┌─────────────────────────────────────────────────────┐
│                                                     │
│   生产者                                            │
│   ├── 使用确认机制                                  │
│   ├── 实现重试逻辑                                  │
│   ├── 批量发送优化性能                              │
│   └── 设置合理的超时时间                            │
│                                                     │
│   消费者                                            │
│   ├── 手动确认消息                                  │
│   ├── 实现幂等处理                                  │
│   ├── 合理设置并发数                                │
│   └── 做好错误处理                                  │
│                                                     │
│   运维                                              │
│   ├── 监控消费滞后                                  │
│   ├── 设置死信队列                                  │
│   ├── 定期清理过期消息                              │
│   └── 容量规划                                      │
│                                                     │
└─────────────────────────────────────────────────────┘
场景推荐方案
业务消息RabbitMQ
日志收集Kafka
轻量级队列Redis Streams
延迟任务RabbitMQ DLX
事件溯源Kafka

消息队列是构建可靠分布式系统的关键组件。理解其原理,选择合适的方案,才能发挥最大价值。


消息是系统的血脉,队列是调度的艺术。异步处理,让系统更加优雅。