微服务架构已成为构建大规模分布式系统的主流方案。本文将深入探讨微服务的核心概念、设计模式和实践经验。
架构演进
从单体到微服务
架构演进路径:
┌─────────────────────────────────────────────────────┐
│ │
│ 单体架构 (Monolith) │
│ ├── 所有功能在一个应用 │
│ ├── 简单部署 │
│ └── 随规模增长难以维护 │
│ │ │
│ ▼ │
│ 模块化单体 (Modular Monolith) │
│ ├── 内部模块化 │
│ ├── 清晰边界 │
│ └── 单一部署单元 │
│ │ │
│ ▼ │
│ 微服务架构 (Microservices) │
│ ├── 独立部署服务 │
│ ├── 技术多样性 │
│ └── 分布式复杂性 │
│ │
└─────────────────────────────────────────────────────┘
何时采用微服务
| 场景 | 单体 | 微服务 |
|---|---|---|
| 团队规模 | < 10人 | > 10人 |
| 代码库大小 | < 100K LOC | > 100K LOC |
| 部署频率 | 周/月 | 日/小时 |
| 扩展需求 | 整体扩展 | 按需扩展 |
| 技术栈 | 统一 | 多样化 |
核心设计原则
单一职责
服务职责划分:
┌─────────────────────────────────────────────────────┐
│ │
│ ✅ 良好划分 │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 用户服务 │ │ 订单服务 │ │ 支付服务 │ │
│ │ │ │ │ │ │ │
│ │ • 注册 │ │ • 创建 │ │ • 处理 │ │
│ │ • 认证 │ │ • 查询 │ │ • 退款 │ │
│ │ • 资料 │ │ • 状态 │ │ • 对账 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ ❌ 职责混乱 │
│ ┌─────────────────────────────────────────┐ │
│ │ 业务服务 │ │
│ │ 用户 + 订单 + 支付 + 库存 + 通知... │ │
│ └─────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────┘
领域驱动设计 (DDD)
// 聚合根定义
interface Order {
id: string;
customerId: string;
items: OrderItem[];
status: OrderStatus;
totalAmount: Money;
createdAt: Date;
// 领域行为
addItem(product: Product, quantity: number): void;
removeItem(itemId: string): void;
confirm(): void;
cancel(): void;
}
// 值对象
interface Money {
amount: number;
currency: string;
}
// 领域事件
interface OrderCreatedEvent {
orderId: string;
customerId: string;
items: OrderItem[];
totalAmount: Money;
occurredAt: Date;
}
// 仓储接口
interface OrderRepository {
save(order: Order): Promise<void>;
findById(id: string): Promise<Order | null>;
findByCustomerId(customerId: string): Promise<Order[]>;
}
限界上下文
电商系统限界上下文:
┌─────────────────────────────────────────────────────┐
│ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ 销售上下文 │ │ 库存上下文 │ │
│ │ │ │ │ │
│ │ Product │ │ StockItem │ │
│ │ • name │ │ • sku │ │
│ │ • price │ │ • quantity │ │
│ │ • description │ │ • location │ │
│ │ │ │ │ │
│ └──────────────────┘ └──────────────────┘ │
│ │ │ │
│ └──────────┬───────────┘ │
│ │ │
│ ┌─────────▼─────────┐ │
│ │ 防腐层 (ACL) │ │
│ │ │ │
│ │ ProductId ↔ SKU │ │
│ └───────────────────┘ │
│ │
└─────────────────────────────────────────────────────┘
服务间通信
同步通信
// REST API
// user-service/src/controllers/user.controller.ts
import express from 'express';
const router = express.Router();
router.get('/users/:id', async (req, res) => {
const user = await userService.findById(req.params.id);
if (!user) {
return res.status(404).json({ error: 'User not found' });
}
res.json(user);
});
router.post('/users', async (req, res) => {
const user = await userService.create(req.body);
res.status(201).json(user);
});
// gRPC 服务定义
// proto/user.proto
syntax = "proto3";
service UserService {
rpc GetUser(GetUserRequest) returns (User);
rpc CreateUser(CreateUserRequest) returns (User);
rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
}
message User {
string id = 1;
string name = 2;
string email = 3;
string created_at = 4;
}
message GetUserRequest {
string id = 1;
}
异步通信
// 事件发布者
// order-service/src/events/publisher.ts
import { Kafka, Producer } from 'kafkajs';
class EventPublisher {
private producer: Producer;
constructor(kafka: Kafka) {
this.producer = kafka.producer();
}
async publish(topic: string, event: DomainEvent): Promise<void> {
await this.producer.send({
topic,
messages: [
{
key: event.aggregateId,
value: JSON.stringify(event),
headers: {
eventType: event.type,
timestamp: event.occurredAt.toISOString(),
},
},
],
});
}
}
// 订单创建后发布事件
async function createOrder(data: CreateOrderDto): Promise<Order> {
const order = Order.create(data);
await orderRepository.save(order);
await eventPublisher.publish('order-events', {
type: 'OrderCreated',
aggregateId: order.id,
payload: {
orderId: order.id,
customerId: order.customerId,
items: order.items,
totalAmount: order.totalAmount,
},
occurredAt: new Date(),
});
return order;
}
// 事件消费者
// inventory-service/src/events/consumer.ts
import { Kafka, Consumer } from 'kafkajs';
class OrderEventConsumer {
private consumer: Consumer;
async start(): Promise<void> {
await this.consumer.subscribe({ topic: 'order-events' });
await this.consumer.run({
eachMessage: async ({ message }) => {
const event = JSON.parse(message.value!.toString());
switch (event.type) {
case 'OrderCreated':
await this.handleOrderCreated(event.payload);
break;
case 'OrderCancelled':
await this.handleOrderCancelled(event.payload);
break;
}
},
});
}
private async handleOrderCreated(payload: OrderCreatedPayload): Promise<void> {
// 扣减库存
for (const item of payload.items) {
await inventoryService.reserve(item.productId, item.quantity);
}
}
}
分布式模式
API 网关
// gateway/src/index.ts
import express from 'express';
import { createProxyMiddleware } from 'http-proxy-middleware';
import rateLimit from 'express-rate-limit';
import jwt from 'jsonwebtoken';
const app = express();
// 速率限制
const limiter = rateLimit({
windowMs: 60 * 1000,
max: 100,
message: { error: 'Too many requests' },
});
app.use(limiter);
// JWT 认证中间件
const authMiddleware = (req, res, next) => {
const token = req.headers.authorization?.split(' ')[1];
if (!token) {
return res.status(401).json({ error: 'Unauthorized' });
}
try {
const decoded = jwt.verify(token, process.env.JWT_SECRET!);
req.user = decoded;
next();
} catch {
res.status(401).json({ error: 'Invalid token' });
}
};
// 服务路由
app.use('/api/users', authMiddleware, createProxyMiddleware({
target: process.env.USER_SERVICE_URL,
changeOrigin: true,
pathRewrite: { '^/api/users': '' },
}));
app.use('/api/orders', authMiddleware, createProxyMiddleware({
target: process.env.ORDER_SERVICE_URL,
changeOrigin: true,
pathRewrite: { '^/api/orders': '' },
}));
app.use('/api/products', createProxyMiddleware({
target: process.env.PRODUCT_SERVICE_URL,
changeOrigin: true,
pathRewrite: { '^/api/products': '' },
}));
服务发现
# Kubernetes Service Discovery
apiVersion: v1
kind: Service
metadata:
name: user-service
namespace: production
spec:
selector:
app: user-service
ports:
- port: 80
targetPort: 8080
---
# 应用配置使用 DNS 名称
# user-service.production.svc.cluster.local
// Consul 服务注册
import Consul from 'consul';
const consul = new Consul({ host: 'consul.local' });
async function registerService(): Promise<void> {
await consul.agent.service.register({
name: 'user-service',
id: `user-service-${process.env.HOSTNAME}`,
address: process.env.POD_IP,
port: 8080,
check: {
http: `http://${process.env.POD_IP}:8080/health`,
interval: '10s',
},
});
}
async function discoverService(name: string): Promise<string[]> {
const services = await consul.catalog.service.nodes(name);
return services.map(s => `http://${s.ServiceAddress}:${s.ServicePort}`);
}
断路器模式
// circuit-breaker.ts
import CircuitBreaker from 'opossum';
interface CircuitBreakerOptions {
timeout: number;
errorThresholdPercentage: number;
resetTimeout: number;
}
function createCircuitBreaker<T>(
fn: (...args: any[]) => Promise<T>,
options: CircuitBreakerOptions
): CircuitBreaker {
const breaker = new CircuitBreaker(fn, {
timeout: options.timeout,
errorThresholdPercentage: options.errorThresholdPercentage,
resetTimeout: options.resetTimeout,
});
breaker.on('open', () => {
console.log('Circuit breaker opened');
});
breaker.on('halfOpen', () => {
console.log('Circuit breaker half-open');
});
breaker.on('close', () => {
console.log('Circuit breaker closed');
});
breaker.fallback(() => {
return { error: 'Service temporarily unavailable' };
});
return breaker;
}
// 使用示例
const userServiceBreaker = createCircuitBreaker(
(userId: string) => fetch(`http://user-service/users/${userId}`).then(r => r.json()),
{
timeout: 3000,
errorThresholdPercentage: 50,
resetTimeout: 30000,
}
);
async function getUser(userId: string) {
return userServiceBreaker.fire(userId);
}
Saga 模式
// saga/order-saga.ts
interface SagaStep<T> {
name: string;
execute: (context: T) => Promise<void>;
compensate: (context: T) => Promise<void>;
}
class OrderSaga {
private steps: SagaStep<OrderContext>[] = [
{
name: 'reserveInventory',
execute: async (ctx) => {
ctx.inventoryReservationId = await inventoryService.reserve(
ctx.items
);
},
compensate: async (ctx) => {
if (ctx.inventoryReservationId) {
await inventoryService.cancelReservation(ctx.inventoryReservationId);
}
},
},
{
name: 'processPayment',
execute: async (ctx) => {
ctx.paymentId = await paymentService.process({
orderId: ctx.orderId,
amount: ctx.totalAmount,
customerId: ctx.customerId,
});
},
compensate: async (ctx) => {
if (ctx.paymentId) {
await paymentService.refund(ctx.paymentId);
}
},
},
{
name: 'createShipment',
execute: async (ctx) => {
ctx.shipmentId = await shippingService.createShipment({
orderId: ctx.orderId,
items: ctx.items,
address: ctx.shippingAddress,
});
},
compensate: async (ctx) => {
if (ctx.shipmentId) {
await shippingService.cancelShipment(ctx.shipmentId);
}
},
},
];
async execute(context: OrderContext): Promise<void> {
const executedSteps: SagaStep<OrderContext>[] = [];
try {
for (const step of this.steps) {
console.log(`Executing step: ${step.name}`);
await step.execute(context);
executedSteps.push(step);
}
} catch (error) {
console.error('Saga failed, compensating...');
// 逆序补偿
for (const step of executedSteps.reverse()) {
try {
console.log(`Compensating step: ${step.name}`);
await step.compensate(context);
} catch (compensateError) {
console.error(`Compensation failed for ${step.name}`, compensateError);
// 记录失败,人工干预
}
}
throw error;
}
}
}
数据管理
数据库每服务
数据库隔离策略:
┌─────────────────────────────────────────────────────┐
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │用户服务 │ │订单服务 │ │库存服务 │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ ┌────▼─────┐ ┌────▼─────┐ ┌────▼─────┐ │
│ │PostgreSQL│ │ MongoDB │ │ Redis │ │
│ │ Users │ │ Orders │ │ Inventory│ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ 优势: │
│ • 技术多样性 │
│ • 独立扩展 │
│ • 故障隔离 │
│ │
│ 挑战: │
│ • 跨服务查询 │
│ • 数据一致性 │
│ • 分布式事务 │
│ │
└─────────────────────────────────────────────────────┘
CQRS 模式
// 命令端 - 写模型
interface CreateOrderCommand {
customerId: string;
items: { productId: string; quantity: number }[];
shippingAddress: Address;
}
class OrderCommandHandler {
async handle(command: CreateOrderCommand): Promise<string> {
const order = Order.create(command);
await this.orderRepository.save(order);
// 发布事件用于更新读模型
await this.eventBus.publish(new OrderCreatedEvent(order));
return order.id;
}
}
// 查询端 - 读模型
interface OrderReadModel {
id: string;
customerName: string;
customerEmail: string;
items: {
productName: string;
quantity: number;
price: number;
}[];
totalAmount: number;
status: string;
createdAt: Date;
}
class OrderQueryHandler {
// 使用优化的读模型数据库 (如 Elasticsearch)
private readStore: ReadModelStore;
async getOrderDetails(orderId: string): Promise<OrderReadModel> {
return this.readStore.findById(orderId);
}
async getOrdersByCustomer(customerId: string): Promise<OrderReadModel[]> {
return this.readStore.findByCustomerId(customerId);
}
}
// 投影器 - 更新读模型
class OrderProjection {
@EventHandler(OrderCreatedEvent)
async onOrderCreated(event: OrderCreatedEvent): Promise<void> {
const customer = await this.customerService.getById(event.customerId);
const products = await this.productService.getByIds(
event.items.map(i => i.productId)
);
const readModel: OrderReadModel = {
id: event.orderId,
customerName: customer.name,
customerEmail: customer.email,
items: event.items.map(item => ({
productName: products.find(p => p.id === item.productId)!.name,
quantity: item.quantity,
price: item.price,
})),
totalAmount: event.totalAmount,
status: 'created',
createdAt: event.occurredAt,
};
await this.readStore.save(readModel);
}
}
事件溯源
// 事件存储
interface Event {
id: string;
aggregateId: string;
aggregateType: string;
type: string;
payload: Record<string, any>;
version: number;
occurredAt: Date;
}
class EventStore {
async append(event: Event): Promise<void> {
await this.db.collection('events').insertOne(event);
}
async getEvents(aggregateId: string): Promise<Event[]> {
return this.db
.collection('events')
.find({ aggregateId })
.sort({ version: 1 })
.toArray();
}
}
// 聚合重建
class Order {
private id: string;
private status: OrderStatus;
private items: OrderItem[] = [];
static async fromEvents(events: Event[]): Promise<Order> {
const order = new Order();
for (const event of events) {
order.apply(event);
}
return order;
}
private apply(event: Event): void {
switch (event.type) {
case 'OrderCreated':
this.id = event.payload.orderId;
this.status = 'created';
break;
case 'ItemAdded':
this.items.push(event.payload.item);
break;
case 'OrderConfirmed':
this.status = 'confirmed';
break;
case 'OrderShipped':
this.status = 'shipped';
break;
}
}
}
可观测性
分布式追踪
// OpenTelemetry 配置
import { NodeSDK } from '@opentelemetry/sdk-node';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { HttpInstrumentation } from '@opentelemetry/instrumentation-http';
import { ExpressInstrumentation } from '@opentelemetry/instrumentation-express';
const sdk = new NodeSDK({
traceExporter: new OTLPTraceExporter({
url: 'http://jaeger:4318/v1/traces',
}),
instrumentations: [
new HttpInstrumentation(),
new ExpressInstrumentation(),
],
});
sdk.start();
// 手动追踪
import { trace, SpanStatusCode } from '@opentelemetry/api';
const tracer = trace.getTracer('order-service');
async function processOrder(orderId: string): Promise<void> {
const span = tracer.startSpan('processOrder');
span.setAttribute('orderId', orderId);
try {
await validateOrder(orderId);
span.addEvent('Order validated');
await reserveInventory(orderId);
span.addEvent('Inventory reserved');
await processPayment(orderId);
span.addEvent('Payment processed');
span.setStatus({ code: SpanStatusCode.OK });
} catch (error) {
span.setStatus({
code: SpanStatusCode.ERROR,
message: error.message,
});
throw error;
} finally {
span.end();
}
}
日志聚合
// 结构化日志
import pino from 'pino';
const logger = pino({
level: process.env.LOG_LEVEL || 'info',
formatters: {
level: (label) => ({ level: label }),
},
base: {
service: 'order-service',
version: process.env.APP_VERSION,
environment: process.env.NODE_ENV,
},
});
// 请求日志中间件
const requestLogger = (req, res, next) => {
const startTime = Date.now();
res.on('finish', () => {
logger.info({
method: req.method,
path: req.path,
statusCode: res.statusCode,
duration: Date.now() - startTime,
traceId: req.headers['x-trace-id'],
userId: req.user?.id,
});
});
next();
};
// 业务日志
logger.info({ orderId, customerId }, 'Order created');
logger.error({ orderId, error: err.message }, 'Order processing failed');
健康检查
// 健康检查端点
import express from 'express';
const app = express();
// 存活检查
app.get('/health/live', (req, res) => {
res.json({ status: 'ok' });
});
// 就绪检查
app.get('/health/ready', async (req, res) => {
const checks = await Promise.allSettled([
checkDatabase(),
checkMessageQueue(),
checkCache(),
]);
const results = {
database: checks[0].status === 'fulfilled' ? 'ok' : 'error',
messageQueue: checks[1].status === 'fulfilled' ? 'ok' : 'error',
cache: checks[2].status === 'fulfilled' ? 'ok' : 'error',
};
const healthy = Object.values(results).every(v => v === 'ok');
res.status(healthy ? 200 : 503).json({
status: healthy ? 'ok' : 'degraded',
checks: results,
});
});
async function checkDatabase(): Promise<void> {
await db.query('SELECT 1');
}
async function checkMessageQueue(): Promise<void> {
await kafka.admin().describeCluster();
}
async function checkCache(): Promise<void> {
await redis.ping();
}
安全设计
服务间认证
// mTLS 配置
import https from 'https';
import fs from 'fs';
const server = https.createServer({
key: fs.readFileSync('/certs/server.key'),
cert: fs.readFileSync('/certs/server.crt'),
ca: fs.readFileSync('/certs/ca.crt'),
requestCert: true,
rejectUnauthorized: true,
}, app);
// JWT 服务间认证
class ServiceAuthMiddleware {
verify(req, res, next) {
const token = req.headers['x-service-token'];
try {
const decoded = jwt.verify(token, process.env.SERVICE_SECRET);
if (!decoded.service) {
throw new Error('Invalid service token');
}
req.callingService = decoded.service;
next();
} catch {
res.status(401).json({ error: 'Unauthorized service' });
}
}
}
最佳实践总结
微服务最佳实践:
┌─────────────────────────────────────────────────────┐
│ │
│ 设计原则 │
│ ├── 按业务能力拆分服务 │
│ ├── 保持服务自治 │
│ ├── 使用 API 优先设计 │
│ └── 实践领域驱动设计 │
│ │
│ 通信模式 │
│ ├── 优先异步通信 │
│ ├── 使用事件驱动架构 │
│ ├── 实现幂等性 │
│ └── 设计容错机制 │
│ │
│ 数据管理 │
│ ├── 数据库每服务 │
│ ├── 最终一致性 │
│ ├── 使用 Saga 处理分布式事务 │
│ └── 考虑 CQRS 和事件溯源 │
│ │
│ 运维实践 │
│ ├── 完善可观测性 │
│ ├── 自动化 CI/CD │
│ ├── 容器化部署 │
│ └── 实施混沌工程 │
│ │
└─────────────────────────────────────────────────────┘
| 模式 | 使用场景 |
|---|---|
| API 网关 | 统一入口、认证、限流 |
| 服务发现 | 动态服务注册与发现 |
| 断路器 | 故障隔离、优雅降级 |
| Saga | 分布式事务协调 |
| CQRS | 读写分离、性能优化 |
| 事件溯源 | 审计追踪、状态重建 |
微服务架构带来灵活性的同时也引入复杂性。从模块化单体开始,在真正需要时再拆分为微服务。
微服务不是目标,而是手段。关键是解决业务问题,而非追求技术复杂性。