微服务架构设计:从单体到分布式系统

掌握微服务架构核心模式、服务拆分策略和分布式系统最佳实践

微服务架构设计:从单体到分布式系统

微服务架构已成为构建大规模分布式系统的主流方案。本文将深入探讨微服务的核心概念、设计模式和实践经验。

架构演进

从单体到微服务

架构演进路径:
┌─────────────────────────────────────────────────────┐
│                                                     │
│   单体架构 (Monolith)                               │
│   ├── 所有功能在一个应用                             │
│   ├── 简单部署                                      │
│   └── 随规模增长难以维护                             │
│              │                                      │
│              ▼                                      │
│   模块化单体 (Modular Monolith)                     │
│   ├── 内部模块化                                    │
│   ├── 清晰边界                                      │
│   └── 单一部署单元                                  │
│              │                                      │
│              ▼                                      │
│   微服务架构 (Microservices)                        │
│   ├── 独立部署服务                                  │
│   ├── 技术多样性                                    │
│   └── 分布式复杂性                                  │
│                                                     │
└─────────────────────────────────────────────────────┘

何时采用微服务

场景单体微服务
团队规模< 10人> 10人
代码库大小< 100K LOC> 100K LOC
部署频率周/月日/小时
扩展需求整体扩展按需扩展
技术栈统一多样化

核心设计原则

单一职责

服务职责划分:
┌─────────────────────────────────────────────────────┐
│                                                     │
│   ✅ 良好划分                                        │
│   ┌─────────┐  ┌─────────┐  ┌─────────┐            │
│   │ 用户服务 │  │ 订单服务 │  │ 支付服务 │            │
│   │         │  │         │  │         │            │
│   │ • 注册   │  │ • 创建   │  │ • 处理   │            │
│   │ • 认证   │  │ • 查询   │  │ • 退款   │            │
│   │ • 资料   │  │ • 状态   │  │ • 对账   │            │
│   └─────────┘  └─────────┘  └─────────┘            │
│                                                     │
│   ❌ 职责混乱                                        │
│   ┌─────────────────────────────────────────┐      │
│   │            业务服务                       │      │
│   │  用户 + 订单 + 支付 + 库存 + 通知...      │      │
│   └─────────────────────────────────────────┘      │
│                                                     │
└─────────────────────────────────────────────────────┘

领域驱动设计 (DDD)

// 聚合根定义
interface Order {
  id: string;
  customerId: string;
  items: OrderItem[];
  status: OrderStatus;
  totalAmount: Money;
  createdAt: Date;

  // 领域行为
  addItem(product: Product, quantity: number): void;
  removeItem(itemId: string): void;
  confirm(): void;
  cancel(): void;
}

// 值对象
interface Money {
  amount: number;
  currency: string;
}

// 领域事件
interface OrderCreatedEvent {
  orderId: string;
  customerId: string;
  items: OrderItem[];
  totalAmount: Money;
  occurredAt: Date;
}

// 仓储接口
interface OrderRepository {
  save(order: Order): Promise<void>;
  findById(id: string): Promise<Order | null>;
  findByCustomerId(customerId: string): Promise<Order[]>;
}

限界上下文

电商系统限界上下文:
┌─────────────────────────────────────────────────────┐
│                                                     │
│  ┌──────────────────┐   ┌──────────────────┐       │
│  │   销售上下文      │   │   库存上下文      │       │
│  │                  │   │                  │       │
│  │  Product         │   │  StockItem       │       │
│  │  • name          │   │  • sku           │       │
│  │  • price         │   │  • quantity      │       │
│  │  • description   │   │  • location      │       │
│  │                  │   │                  │       │
│  └──────────────────┘   └──────────────────┘       │
│           │                      │                  │
│           └──────────┬───────────┘                  │
│                      │                              │
│            ┌─────────▼─────────┐                   │
│            │   防腐层 (ACL)     │                   │
│            │                   │                   │
│            │ ProductId ↔ SKU   │                   │
│            └───────────────────┘                   │
│                                                     │
└─────────────────────────────────────────────────────┘

服务间通信

同步通信

// REST API
// user-service/src/controllers/user.controller.ts
import express from 'express';

const router = express.Router();

router.get('/users/:id', async (req, res) => {
  const user = await userService.findById(req.params.id);
  if (!user) {
    return res.status(404).json({ error: 'User not found' });
  }
  res.json(user);
});

router.post('/users', async (req, res) => {
  const user = await userService.create(req.body);
  res.status(201).json(user);
});

// gRPC 服务定义
// proto/user.proto
syntax = "proto3";

service UserService {
  rpc GetUser(GetUserRequest) returns (User);
  rpc CreateUser(CreateUserRequest) returns (User);
  rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
}

message User {
  string id = 1;
  string name = 2;
  string email = 3;
  string created_at = 4;
}

message GetUserRequest {
  string id = 1;
}

异步通信

// 事件发布者
// order-service/src/events/publisher.ts
import { Kafka, Producer } from 'kafkajs';

class EventPublisher {
  private producer: Producer;

  constructor(kafka: Kafka) {
    this.producer = kafka.producer();
  }

  async publish(topic: string, event: DomainEvent): Promise<void> {
    await this.producer.send({
      topic,
      messages: [
        {
          key: event.aggregateId,
          value: JSON.stringify(event),
          headers: {
            eventType: event.type,
            timestamp: event.occurredAt.toISOString(),
          },
        },
      ],
    });
  }
}

// 订单创建后发布事件
async function createOrder(data: CreateOrderDto): Promise<Order> {
  const order = Order.create(data);
  await orderRepository.save(order);

  await eventPublisher.publish('order-events', {
    type: 'OrderCreated',
    aggregateId: order.id,
    payload: {
      orderId: order.id,
      customerId: order.customerId,
      items: order.items,
      totalAmount: order.totalAmount,
    },
    occurredAt: new Date(),
  });

  return order;
}

// 事件消费者
// inventory-service/src/events/consumer.ts
import { Kafka, Consumer } from 'kafkajs';

class OrderEventConsumer {
  private consumer: Consumer;

  async start(): Promise<void> {
    await this.consumer.subscribe({ topic: 'order-events' });

    await this.consumer.run({
      eachMessage: async ({ message }) => {
        const event = JSON.parse(message.value!.toString());

        switch (event.type) {
          case 'OrderCreated':
            await this.handleOrderCreated(event.payload);
            break;
          case 'OrderCancelled':
            await this.handleOrderCancelled(event.payload);
            break;
        }
      },
    });
  }

  private async handleOrderCreated(payload: OrderCreatedPayload): Promise<void> {
    // 扣减库存
    for (const item of payload.items) {
      await inventoryService.reserve(item.productId, item.quantity);
    }
  }
}

分布式模式

API 网关

// gateway/src/index.ts
import express from 'express';
import { createProxyMiddleware } from 'http-proxy-middleware';
import rateLimit from 'express-rate-limit';
import jwt from 'jsonwebtoken';

const app = express();

// 速率限制
const limiter = rateLimit({
  windowMs: 60 * 1000,
  max: 100,
  message: { error: 'Too many requests' },
});

app.use(limiter);

// JWT 认证中间件
const authMiddleware = (req, res, next) => {
  const token = req.headers.authorization?.split(' ')[1];
  if (!token) {
    return res.status(401).json({ error: 'Unauthorized' });
  }

  try {
    const decoded = jwt.verify(token, process.env.JWT_SECRET!);
    req.user = decoded;
    next();
  } catch {
    res.status(401).json({ error: 'Invalid token' });
  }
};

// 服务路由
app.use('/api/users', authMiddleware, createProxyMiddleware({
  target: process.env.USER_SERVICE_URL,
  changeOrigin: true,
  pathRewrite: { '^/api/users': '' },
}));

app.use('/api/orders', authMiddleware, createProxyMiddleware({
  target: process.env.ORDER_SERVICE_URL,
  changeOrigin: true,
  pathRewrite: { '^/api/orders': '' },
}));

app.use('/api/products', createProxyMiddleware({
  target: process.env.PRODUCT_SERVICE_URL,
  changeOrigin: true,
  pathRewrite: { '^/api/products': '' },
}));

服务发现

# Kubernetes Service Discovery
apiVersion: v1
kind: Service
metadata:
  name: user-service
  namespace: production
spec:
  selector:
    app: user-service
  ports:
    - port: 80
      targetPort: 8080

---
# 应用配置使用 DNS 名称
# user-service.production.svc.cluster.local
// Consul 服务注册
import Consul from 'consul';

const consul = new Consul({ host: 'consul.local' });

async function registerService(): Promise<void> {
  await consul.agent.service.register({
    name: 'user-service',
    id: `user-service-${process.env.HOSTNAME}`,
    address: process.env.POD_IP,
    port: 8080,
    check: {
      http: `http://${process.env.POD_IP}:8080/health`,
      interval: '10s',
    },
  });
}

async function discoverService(name: string): Promise<string[]> {
  const services = await consul.catalog.service.nodes(name);
  return services.map(s => `http://${s.ServiceAddress}:${s.ServicePort}`);
}

断路器模式

// circuit-breaker.ts
import CircuitBreaker from 'opossum';

interface CircuitBreakerOptions {
  timeout: number;
  errorThresholdPercentage: number;
  resetTimeout: number;
}

function createCircuitBreaker<T>(
  fn: (...args: any[]) => Promise<T>,
  options: CircuitBreakerOptions
): CircuitBreaker {
  const breaker = new CircuitBreaker(fn, {
    timeout: options.timeout,
    errorThresholdPercentage: options.errorThresholdPercentage,
    resetTimeout: options.resetTimeout,
  });

  breaker.on('open', () => {
    console.log('Circuit breaker opened');
  });

  breaker.on('halfOpen', () => {
    console.log('Circuit breaker half-open');
  });

  breaker.on('close', () => {
    console.log('Circuit breaker closed');
  });

  breaker.fallback(() => {
    return { error: 'Service temporarily unavailable' };
  });

  return breaker;
}

// 使用示例
const userServiceBreaker = createCircuitBreaker(
  (userId: string) => fetch(`http://user-service/users/${userId}`).then(r => r.json()),
  {
    timeout: 3000,
    errorThresholdPercentage: 50,
    resetTimeout: 30000,
  }
);

async function getUser(userId: string) {
  return userServiceBreaker.fire(userId);
}

Saga 模式

// saga/order-saga.ts
interface SagaStep<T> {
  name: string;
  execute: (context: T) => Promise<void>;
  compensate: (context: T) => Promise<void>;
}

class OrderSaga {
  private steps: SagaStep<OrderContext>[] = [
    {
      name: 'reserveInventory',
      execute: async (ctx) => {
        ctx.inventoryReservationId = await inventoryService.reserve(
          ctx.items
        );
      },
      compensate: async (ctx) => {
        if (ctx.inventoryReservationId) {
          await inventoryService.cancelReservation(ctx.inventoryReservationId);
        }
      },
    },
    {
      name: 'processPayment',
      execute: async (ctx) => {
        ctx.paymentId = await paymentService.process({
          orderId: ctx.orderId,
          amount: ctx.totalAmount,
          customerId: ctx.customerId,
        });
      },
      compensate: async (ctx) => {
        if (ctx.paymentId) {
          await paymentService.refund(ctx.paymentId);
        }
      },
    },
    {
      name: 'createShipment',
      execute: async (ctx) => {
        ctx.shipmentId = await shippingService.createShipment({
          orderId: ctx.orderId,
          items: ctx.items,
          address: ctx.shippingAddress,
        });
      },
      compensate: async (ctx) => {
        if (ctx.shipmentId) {
          await shippingService.cancelShipment(ctx.shipmentId);
        }
      },
    },
  ];

  async execute(context: OrderContext): Promise<void> {
    const executedSteps: SagaStep<OrderContext>[] = [];

    try {
      for (const step of this.steps) {
        console.log(`Executing step: ${step.name}`);
        await step.execute(context);
        executedSteps.push(step);
      }
    } catch (error) {
      console.error('Saga failed, compensating...');

      // 逆序补偿
      for (const step of executedSteps.reverse()) {
        try {
          console.log(`Compensating step: ${step.name}`);
          await step.compensate(context);
        } catch (compensateError) {
          console.error(`Compensation failed for ${step.name}`, compensateError);
          // 记录失败,人工干预
        }
      }

      throw error;
    }
  }
}

数据管理

数据库每服务

数据库隔离策略:
┌─────────────────────────────────────────────────────┐
│                                                     │
│   ┌──────────┐  ┌──────────┐  ┌──────────┐         │
│   │用户服务   │  │订单服务   │  │库存服务   │         │
│   └────┬─────┘  └────┬─────┘  └────┬─────┘         │
│        │             │             │                │
│   ┌────▼─────┐  ┌────▼─────┐  ┌────▼─────┐         │
│   │PostgreSQL│  │ MongoDB  │  │  Redis   │         │
│   │  Users   │  │  Orders  │  │ Inventory│         │
│   └──────────┘  └──────────┘  └──────────┘         │
│                                                     │
│   优势:                                            │
│   • 技术多样性                                      │
│   • 独立扩展                                        │
│   • 故障隔离                                        │
│                                                     │
│   挑战:                                            │
│   • 跨服务查询                                      │
│   • 数据一致性                                      │
│   • 分布式事务                                      │
│                                                     │
└─────────────────────────────────────────────────────┘

CQRS 模式

// 命令端 - 写模型
interface CreateOrderCommand {
  customerId: string;
  items: { productId: string; quantity: number }[];
  shippingAddress: Address;
}

class OrderCommandHandler {
  async handle(command: CreateOrderCommand): Promise<string> {
    const order = Order.create(command);
    await this.orderRepository.save(order);

    // 发布事件用于更新读模型
    await this.eventBus.publish(new OrderCreatedEvent(order));

    return order.id;
  }
}

// 查询端 - 读模型
interface OrderReadModel {
  id: string;
  customerName: string;
  customerEmail: string;
  items: {
    productName: string;
    quantity: number;
    price: number;
  }[];
  totalAmount: number;
  status: string;
  createdAt: Date;
}

class OrderQueryHandler {
  // 使用优化的读模型数据库 (如 Elasticsearch)
  private readStore: ReadModelStore;

  async getOrderDetails(orderId: string): Promise<OrderReadModel> {
    return this.readStore.findById(orderId);
  }

  async getOrdersByCustomer(customerId: string): Promise<OrderReadModel[]> {
    return this.readStore.findByCustomerId(customerId);
  }
}

// 投影器 - 更新读模型
class OrderProjection {
  @EventHandler(OrderCreatedEvent)
  async onOrderCreated(event: OrderCreatedEvent): Promise<void> {
    const customer = await this.customerService.getById(event.customerId);
    const products = await this.productService.getByIds(
      event.items.map(i => i.productId)
    );

    const readModel: OrderReadModel = {
      id: event.orderId,
      customerName: customer.name,
      customerEmail: customer.email,
      items: event.items.map(item => ({
        productName: products.find(p => p.id === item.productId)!.name,
        quantity: item.quantity,
        price: item.price,
      })),
      totalAmount: event.totalAmount,
      status: 'created',
      createdAt: event.occurredAt,
    };

    await this.readStore.save(readModel);
  }
}

事件溯源

// 事件存储
interface Event {
  id: string;
  aggregateId: string;
  aggregateType: string;
  type: string;
  payload: Record<string, any>;
  version: number;
  occurredAt: Date;
}

class EventStore {
  async append(event: Event): Promise<void> {
    await this.db.collection('events').insertOne(event);
  }

  async getEvents(aggregateId: string): Promise<Event[]> {
    return this.db
      .collection('events')
      .find({ aggregateId })
      .sort({ version: 1 })
      .toArray();
  }
}

// 聚合重建
class Order {
  private id: string;
  private status: OrderStatus;
  private items: OrderItem[] = [];

  static async fromEvents(events: Event[]): Promise<Order> {
    const order = new Order();

    for (const event of events) {
      order.apply(event);
    }

    return order;
  }

  private apply(event: Event): void {
    switch (event.type) {
      case 'OrderCreated':
        this.id = event.payload.orderId;
        this.status = 'created';
        break;
      case 'ItemAdded':
        this.items.push(event.payload.item);
        break;
      case 'OrderConfirmed':
        this.status = 'confirmed';
        break;
      case 'OrderShipped':
        this.status = 'shipped';
        break;
    }
  }
}

可观测性

分布式追踪

// OpenTelemetry 配置
import { NodeSDK } from '@opentelemetry/sdk-node';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { HttpInstrumentation } from '@opentelemetry/instrumentation-http';
import { ExpressInstrumentation } from '@opentelemetry/instrumentation-express';

const sdk = new NodeSDK({
  traceExporter: new OTLPTraceExporter({
    url: 'http://jaeger:4318/v1/traces',
  }),
  instrumentations: [
    new HttpInstrumentation(),
    new ExpressInstrumentation(),
  ],
});

sdk.start();

// 手动追踪
import { trace, SpanStatusCode } from '@opentelemetry/api';

const tracer = trace.getTracer('order-service');

async function processOrder(orderId: string): Promise<void> {
  const span = tracer.startSpan('processOrder');
  span.setAttribute('orderId', orderId);

  try {
    await validateOrder(orderId);
    span.addEvent('Order validated');

    await reserveInventory(orderId);
    span.addEvent('Inventory reserved');

    await processPayment(orderId);
    span.addEvent('Payment processed');

    span.setStatus({ code: SpanStatusCode.OK });
  } catch (error) {
    span.setStatus({
      code: SpanStatusCode.ERROR,
      message: error.message,
    });
    throw error;
  } finally {
    span.end();
  }
}

日志聚合

// 结构化日志
import pino from 'pino';

const logger = pino({
  level: process.env.LOG_LEVEL || 'info',
  formatters: {
    level: (label) => ({ level: label }),
  },
  base: {
    service: 'order-service',
    version: process.env.APP_VERSION,
    environment: process.env.NODE_ENV,
  },
});

// 请求日志中间件
const requestLogger = (req, res, next) => {
  const startTime = Date.now();

  res.on('finish', () => {
    logger.info({
      method: req.method,
      path: req.path,
      statusCode: res.statusCode,
      duration: Date.now() - startTime,
      traceId: req.headers['x-trace-id'],
      userId: req.user?.id,
    });
  });

  next();
};

// 业务日志
logger.info({ orderId, customerId }, 'Order created');
logger.error({ orderId, error: err.message }, 'Order processing failed');

健康检查

// 健康检查端点
import express from 'express';

const app = express();

// 存活检查
app.get('/health/live', (req, res) => {
  res.json({ status: 'ok' });
});

// 就绪检查
app.get('/health/ready', async (req, res) => {
  const checks = await Promise.allSettled([
    checkDatabase(),
    checkMessageQueue(),
    checkCache(),
  ]);

  const results = {
    database: checks[0].status === 'fulfilled' ? 'ok' : 'error',
    messageQueue: checks[1].status === 'fulfilled' ? 'ok' : 'error',
    cache: checks[2].status === 'fulfilled' ? 'ok' : 'error',
  };

  const healthy = Object.values(results).every(v => v === 'ok');

  res.status(healthy ? 200 : 503).json({
    status: healthy ? 'ok' : 'degraded',
    checks: results,
  });
});

async function checkDatabase(): Promise<void> {
  await db.query('SELECT 1');
}

async function checkMessageQueue(): Promise<void> {
  await kafka.admin().describeCluster();
}

async function checkCache(): Promise<void> {
  await redis.ping();
}

安全设计

服务间认证

// mTLS 配置
import https from 'https';
import fs from 'fs';

const server = https.createServer({
  key: fs.readFileSync('/certs/server.key'),
  cert: fs.readFileSync('/certs/server.crt'),
  ca: fs.readFileSync('/certs/ca.crt'),
  requestCert: true,
  rejectUnauthorized: true,
}, app);

// JWT 服务间认证
class ServiceAuthMiddleware {
  verify(req, res, next) {
    const token = req.headers['x-service-token'];

    try {
      const decoded = jwt.verify(token, process.env.SERVICE_SECRET);
      if (!decoded.service) {
        throw new Error('Invalid service token');
      }
      req.callingService = decoded.service;
      next();
    } catch {
      res.status(401).json({ error: 'Unauthorized service' });
    }
  }
}

最佳实践总结

微服务最佳实践:
┌─────────────────────────────────────────────────────┐
│                                                     │
│   设计原则                                          │
│   ├── 按业务能力拆分服务                            │
│   ├── 保持服务自治                                  │
│   ├── 使用 API 优先设计                             │
│   └── 实践领域驱动设计                              │
│                                                     │
│   通信模式                                          │
│   ├── 优先异步通信                                  │
│   ├── 使用事件驱动架构                              │
│   ├── 实现幂等性                                    │
│   └── 设计容错机制                                  │
│                                                     │
│   数据管理                                          │
│   ├── 数据库每服务                                  │
│   ├── 最终一致性                                    │
│   ├── 使用 Saga 处理分布式事务                      │
│   └── 考虑 CQRS 和事件溯源                          │
│                                                     │
│   运维实践                                          │
│   ├── 完善可观测性                                  │
│   ├── 自动化 CI/CD                                  │
│   ├── 容器化部署                                    │
│   └── 实施混沌工程                                  │
│                                                     │
└─────────────────────────────────────────────────────┘
模式使用场景
API 网关统一入口、认证、限流
服务发现动态服务注册与发现
断路器故障隔离、优雅降级
Saga分布式事务协调
CQRS读写分离、性能优化
事件溯源审计追踪、状态重建

微服务架构带来灵活性的同时也引入复杂性。从模块化单体开始,在真正需要时再拆分为微服务。


微服务不是目标,而是手段。关键是解决业务问题,而非追求技术复杂性。