消息队列是分布式系统的基石,实现服务解耦、异步处理和流量削峰。本文深入探讨消息队列的核心概念和实战应用。
为什么需要消息队列
核心应用场景
消息队列应用场景:
┌─────────────────────────────────────────────────────┐
│ │
│ 异步处理 │
│ ├── 发送邮件/短信 │
│ ├── 生成报表 │
│ └── 数据同步 │
│ │
│ 服务解耦 │
│ ├── 订单系统 → 库存系统 │
│ ├── 用户系统 → 通知系统 │
│ └── 支付系统 → 积分系统 │
│ │
│ 流量削峰 │
│ ├── 秒杀活动 │
│ ├── 批量数据处理 │
│ └── 日志收集 │
│ │
│ 事件驱动 │
│ ├── 实时数据处理 │
│ ├── 事件溯源 (Event Sourcing) │
│ └── CQRS 架构 │
│ │
└─────────────────────────────────────────────────────┘
消息队列对比
| 特性 | RabbitMQ | Kafka | Redis Streams |
|---|---|---|---|
| 模型 | 推模式 | 拉模式 | 拉模式 |
| 吞吐量 | 万级 | 百万级 | 十万级 |
| 延迟 | 微秒级 | 毫秒级 | 微秒级 |
| 持久化 | 可选 | 强制 | 可选 |
| 消息回溯 | 不支持 | 支持 | 支持 |
| 适用场景 | 业务消息 | 日志/流处理 | 轻量级队列 |
RabbitMQ 实战
基本概念
RabbitMQ 架构:
┌─────────────────────────────────────────────────────┐
│ │
│ Producer → Exchange → Binding → Queue → Consumer │
│ │
│ Exchange 类型: │
│ ├── Direct → 精确匹配路由键 │
│ ├── Fanout → 广播到所有队列 │
│ ├── Topic → 模式匹配路由键 │
│ └── Headers → 基于消息头匹配 │
│ │
└─────────────────────────────────────────────────────┘
连接与基本操作
import amqp from 'amqplib';
class RabbitMQClient {
private connection: amqp.Connection | null = null;
private channel: amqp.Channel | null = null;
async connect(url: string): Promise<void> {
this.connection = await amqp.connect(url);
this.channel = await this.connection.createChannel();
// 设置预取数量,控制消费速率
await this.channel.prefetch(10);
// 连接错误处理
this.connection.on('error', (err) => {
console.error('RabbitMQ connection error:', err);
this.reconnect(url);
});
}
private async reconnect(url: string): Promise<void> {
await new Promise(resolve => setTimeout(resolve, 5000));
await this.connect(url);
}
async close(): Promise<void> {
await this.channel?.close();
await this.connection?.close();
}
}
发布订阅模式
// 发布者
class Publisher {
constructor(private channel: amqp.Channel) {}
async publish(exchange: string, routingKey: string, message: object): Promise<void> {
const content = Buffer.from(JSON.stringify(message));
this.channel.publish(exchange, routingKey, content, {
persistent: true, // 消息持久化
contentType: 'application/json',
timestamp: Date.now(),
messageId: generateUUID(),
});
}
}
// 订阅者
class Subscriber {
constructor(private channel: amqp.Channel) {}
async subscribe(
queue: string,
handler: (msg: object) => Promise<void>
): Promise<void> {
await this.channel.consume(queue, async (msg) => {
if (!msg) return;
try {
const content = JSON.parse(msg.content.toString());
await handler(content);
this.channel.ack(msg); // 确认消息
} catch (error) {
console.error('Message processing failed:', error);
// 拒绝并重新入队
this.channel.nack(msg, false, true);
}
});
}
}
延迟队列
// 使用 Dead Letter Exchange 实现延迟队列
async function setupDelayedQueue(channel: amqp.Channel): Promise<void> {
// 延迟交换机
await channel.assertExchange('delayed.exchange', 'direct', { durable: true });
// 延迟队列(消息过期后转发到处理队列)
await channel.assertQueue('delayed.queue', {
durable: true,
deadLetterExchange: 'process.exchange',
deadLetterRoutingKey: 'process',
});
// 处理交换机和队列
await channel.assertExchange('process.exchange', 'direct', { durable: true });
await channel.assertQueue('process.queue', { durable: true });
await channel.bindQueue('delayed.queue', 'delayed.exchange', 'delayed');
await channel.bindQueue('process.queue', 'process.exchange', 'process');
}
// 发送延迟消息
async function sendDelayedMessage(
channel: amqp.Channel,
message: object,
delayMs: number
): Promise<void> {
channel.publish('delayed.exchange', 'delayed', Buffer.from(JSON.stringify(message)), {
persistent: true,
expiration: String(delayMs),
});
}
Kafka 实战
核心概念
Kafka 架构:
┌─────────────────────────────────────────────────────┐
│ │
│ Topic (主题) │
│ ├── Partition 0 [msg0, msg3, msg6, ...] │
│ ├── Partition 1 [msg1, msg4, msg7, ...] │
│ └── Partition 2 [msg2, msg5, msg8, ...] │
│ │
│ Consumer Group │
│ ├── Consumer A → Partition 0 │
│ ├── Consumer B → Partition 1 │
│ └── Consumer C → Partition 2 │
│ │
│ 关键概念: │
│ ├── Offset → 消息在分区中的位置 │
│ ├── Replica → 分区副本,保证高可用 │
│ └── Leader → 负责读写的副本 │
│ │
└─────────────────────────────────────────────────────┘
生产者
import { Kafka, Producer, Partitioners } from 'kafkajs';
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['kafka1:9092', 'kafka2:9092'],
retry: {
initialRetryTime: 100,
retries: 8,
},
});
class KafkaProducer {
private producer: Producer;
constructor() {
this.producer = kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner,
allowAutoTopicCreation: false,
});
}
async connect(): Promise<void> {
await this.producer.connect();
}
async send(topic: string, messages: object[]): Promise<void> {
await this.producer.send({
topic,
messages: messages.map(msg => ({
key: msg.id?.toString(),
value: JSON.stringify(msg),
headers: {
'content-type': 'application/json',
timestamp: Date.now().toString(),
},
})),
});
}
// 批量发送优化
async sendBatch(messages: { topic: string; data: object }[]): Promise<void> {
const grouped = messages.reduce((acc, msg) => {
if (!acc[msg.topic]) acc[msg.topic] = [];
acc[msg.topic].push(msg.data);
return acc;
}, {} as Record<string, object[]>);
await this.producer.sendBatch({
topicMessages: Object.entries(grouped).map(([topic, data]) => ({
topic,
messages: data.map(d => ({ value: JSON.stringify(d) })),
})),
});
}
}
消费者
import { Consumer, EachMessagePayload } from 'kafkajs';
class KafkaConsumer {
private consumer: Consumer;
constructor(groupId: string) {
this.consumer = kafka.consumer({
groupId,
sessionTimeout: 30000,
heartbeatInterval: 3000,
});
}
async connect(): Promise<void> {
await this.consumer.connect();
}
async subscribe(topics: string[]): Promise<void> {
for (const topic of topics) {
await this.consumer.subscribe({ topic, fromBeginning: false });
}
}
async run(handler: (payload: EachMessagePayload) => Promise<void>): Promise<void> {
await this.consumer.run({
eachMessage: async (payload) => {
const { topic, partition, message } = payload;
try {
await handler(payload);
} catch (error) {
console.error(`Error processing message from ${topic}:${partition}`, error);
// 可以发送到死信队列
await this.sendToDeadLetterQueue(topic, message);
}
},
});
}
private async sendToDeadLetterQueue(topic: string, message: any): Promise<void> {
// 实现死信队列逻辑
}
}
// 使用示例
const consumer = new KafkaConsumer('order-service');
await consumer.connect();
await consumer.subscribe(['orders', 'payments']);
await consumer.run(async ({ topic, message }) => {
const data = JSON.parse(message.value?.toString() || '{}');
console.log(`Received from ${topic}:`, data);
});
消息回溯
// 从指定偏移量消费
async function seekToOffset(consumer: Consumer, topic: string, partition: number, offset: string): Promise<void> {
consumer.seek({
topic,
partition,
offset,
});
}
// 从指定时间戳消费
async function seekToTimestamp(admin: Admin, topic: string, timestamp: number): Promise<void> {
const offsets = await admin.fetchTopicOffsetsByTimestamp(topic, timestamp);
for (const { partition, offset } of offsets) {
consumer.seek({ topic, partition, offset });
}
}
消息可靠性
生产者确认
// RabbitMQ 确认模式
async function publishWithConfirm(channel: amqp.ConfirmChannel, message: object): Promise<void> {
await channel.assertQueue('reliable.queue', { durable: true });
return new Promise((resolve, reject) => {
channel.sendToQueue(
'reliable.queue',
Buffer.from(JSON.stringify(message)),
{ persistent: true },
(err) => {
if (err) reject(err);
else resolve();
}
);
});
}
// Kafka acks 配置
const producer = kafka.producer({
acks: -1, // 等待所有副本确认
idempotent: true, // 幂等性保证
});
消费者确认
// 手动确认模式
class ReliableConsumer {
async consume(channel: amqp.Channel, queue: string): Promise<void> {
await channel.consume(queue, async (msg) => {
if (!msg) return;
try {
await this.processMessage(msg);
// 处理成功后确认
channel.ack(msg);
} catch (error) {
if (this.shouldRetry(error)) {
// 重新入队
channel.nack(msg, false, true);
} else {
// 发送到死信队列
await this.sendToDeadLetter(msg);
channel.ack(msg);
}
}
}, { noAck: false });
}
private shouldRetry(error: Error): boolean {
return error instanceof TemporaryError;
}
}
幂等性处理
// 使用消息 ID 实现幂等
class IdempotentProcessor {
private redis: Redis;
async process(messageId: string, handler: () => Promise<void>): Promise<void> {
const key = `processed:${messageId}`;
// 检查是否已处理
const exists = await this.redis.exists(key);
if (exists) {
console.log(`Message ${messageId} already processed`);
return;
}
// 设置处理标记(使用 SETNX 保证原子性)
const acquired = await this.redis.set(key, '1', 'EX', 86400, 'NX');
if (!acquired) {
return; // 其他实例正在处理
}
try {
await handler();
} catch (error) {
// 处理失败,删除标记允许重试
await this.redis.del(key);
throw error;
}
}
}
分布式事务
Saga 模式
// Saga 编排器
class SagaOrchestrator {
private steps: SagaStep[] = [];
addStep(step: SagaStep): this {
this.steps.push(step);
return this;
}
async execute(data: any): Promise<void> {
const completedSteps: SagaStep[] = [];
try {
for (const step of this.steps) {
await step.execute(data);
completedSteps.push(step);
}
} catch (error) {
// 按逆序补偿
for (const step of completedSteps.reverse()) {
try {
await step.compensate(data);
} catch (compensateError) {
console.error('Compensation failed:', compensateError);
}
}
throw error;
}
}
}
interface SagaStep {
execute(data: any): Promise<void>;
compensate(data: any): Promise<void>;
}
// 使用示例:订单创建 Saga
const orderSaga = new SagaOrchestrator()
.addStep({
execute: async (data) => await createOrder(data),
compensate: async (data) => await cancelOrder(data.orderId),
})
.addStep({
execute: async (data) => await reserveInventory(data),
compensate: async (data) => await releaseInventory(data),
})
.addStep({
execute: async (data) => await processPayment(data),
compensate: async (data) => await refundPayment(data),
});
Outbox 模式
// 使用 Outbox 表保证消息投递
class OutboxPattern {
async executeWithOutbox(
db: Database,
businessLogic: () => Promise<void>,
message: OutboxMessage
): Promise<void> {
await db.transaction(async (tx) => {
// 1. 执行业务逻辑
await businessLogic();
// 2. 写入 Outbox 表
await tx.outbox.create({
data: {
id: generateUUID(),
topic: message.topic,
payload: JSON.stringify(message.payload),
status: 'PENDING',
createdAt: new Date(),
},
});
});
}
}
// Outbox 消息发布器(后台任务)
class OutboxPublisher {
async publishPendingMessages(): Promise<void> {
const messages = await db.outbox.findMany({
where: { status: 'PENDING' },
orderBy: { createdAt: 'asc' },
take: 100,
});
for (const msg of messages) {
try {
await this.producer.send(msg.topic, JSON.parse(msg.payload));
await db.outbox.update({
where: { id: msg.id },
data: { status: 'SENT', sentAt: new Date() },
});
} catch (error) {
await db.outbox.update({
where: { id: msg.id },
data: { retryCount: { increment: 1 } },
});
}
}
}
}
监控与运维
// 消费者滞后监控
async function monitorConsumerLag(admin: Admin, groupId: string): Promise<void> {
const offsets = await admin.fetchOffsets({ groupId, topics: ['orders'] });
for (const { topic, partitions } of offsets) {
for (const { partition, offset } of partitions) {
const topicOffsets = await admin.fetchTopicOffsets(topic);
const latestOffset = topicOffsets.find(t => t.partition === partition)?.high;
const lag = Number(latestOffset) - Number(offset);
console.log(`Topic ${topic} Partition ${partition}: lag = ${lag}`);
if (lag > 10000) {
alertOps(`High consumer lag detected: ${topic}:${partition} = ${lag}`);
}
}
}
}
最佳实践总结
消息队列最佳实践:
┌─────────────────────────────────────────────────────┐
│ │
│ 生产者 │
│ ├── 使用确认机制 │
│ ├── 实现重试逻辑 │
│ ├── 批量发送优化性能 │
│ └── 设置合理的超时时间 │
│ │
│ 消费者 │
│ ├── 手动确认消息 │
│ ├── 实现幂等处理 │
│ ├── 合理设置并发数 │
│ └── 做好错误处理 │
│ │
│ 运维 │
│ ├── 监控消费滞后 │
│ ├── 设置死信队列 │
│ ├── 定期清理过期消息 │
│ └── 容量规划 │
│ │
└─────────────────────────────────────────────────────┘
| 场景 | 推荐方案 |
|---|---|
| 业务消息 | RabbitMQ |
| 日志收集 | Kafka |
| 轻量级队列 | Redis Streams |
| 延迟任务 | RabbitMQ DLX |
| 事件溯源 | Kafka |
消息队列是构建可靠分布式系统的关键组件。理解其原理,选择合适的方案,才能发挥最大价值。
消息是系统的血脉,队列是调度的艺术。异步处理,让系统更加优雅。