微服务架构设计与实践:从单体到分布式

掌握微服务拆分策略、服务通信、数据一致性和运维治理

微服务架构设计与实践:从单体到分布式

微服务架构已成为构建大型分布式系统的主流选择。本文深入探讨微服务的设计原则和实践经验。

单体 vs 微服务

架构对比

单体 vs 微服务:
┌─────────────────────────────────────────────────────┐
│                                                     │
│   单体架构                                          │
│   ┌─────────────────────────────────┐              │
│   │           单一应用              │              │
│   │  ┌─────┐ ┌─────┐ ┌─────┐       │              │
│   │  │用户 │ │订单 │ │支付 │       │              │
│   │  └─────┘ └─────┘ └─────┘       │              │
│   │         共享数据库              │              │
│   └─────────────────────────────────┘              │
│                                                     │
│   微服务架构                                        │
│   ┌──────┐  ┌──────┐  ┌──────┐                    │
│   │用户  │  │订单  │  │支付  │                    │
│   │服务  │  │服务  │  │服务  │                    │
│   └──┬───┘  └──┬───┘  └──┬───┘                    │
│      │         │         │                         │
│   ┌──┴───┐  ┌──┴───┐  ┌──┴───┐                    │
│   │ DB1  │  │ DB2  │  │ DB3  │                    │
│   └──────┘  └──────┘  └──────┘                    │
│                                                     │
└─────────────────────────────────────────────────────┘
特性单体微服务
开发复杂度
部署整体部署独立部署
扩展整体扩展按需扩展
技术栈统一可异构
团队协作紧耦合松耦合

服务拆分策略

领域驱动设计 (DDD)

领域驱动拆分:
┌─────────────────────────────────────────────────────┐
│                                                     │
│   电商系统限界上下文                                │
│                                                     │
│   ┌─────────────┐     ┌─────────────┐              │
│   │  用户上下文  │     │  商品上下文  │              │
│   │  ├─用户     │     │  ├─商品     │              │
│   │  ├─认证     │     │  ├─分类     │              │
│   │  └─权限     │     │  └─库存     │              │
│   └─────────────┘     └─────────────┘              │
│                                                     │
│   ┌─────────────┐     ┌─────────────┐              │
│   │  订单上下文  │     │  支付上下文  │              │
│   │  ├─购物车   │     │  ├─支付     │              │
│   │  ├─订单     │     │  ├─退款     │              │
│   │  └─物流     │     │  └─账单     │              │
│   └─────────────┘     └─────────────┘              │
│                                                     │
└─────────────────────────────────────────────────────┘

拆分原则

// 服务边界定义
interface ServiceBoundary {
  // 单一职责:每个服务只做一件事
  responsibility: string;

  // 高内聚:相关功能在同一服务
  cohesion: 'high' | 'medium' | 'low';

  // 低耦合:服务间依赖最小化
  coupling: 'loose' | 'tight';

  // 数据所有权:每个服务拥有自己的数据
  dataOwnership: boolean;

  // 团队边界:匹配组织结构
  teamOwnership: string;
}

// 拆分决策
const splitDecision = {
  // 按业务能力拆分
  byBusinessCapability: [
    'user-service',      // 用户管理
    'product-service',   // 商品管理
    'order-service',     // 订单处理
    'payment-service',   // 支付处理
    'notification-service', // 通知服务
  ],

  // 按子域拆分
  bySubdomain: {
    core: ['order', 'payment'],      // 核心子域
    supporting: ['user', 'product'], // 支撑子域
    generic: ['notification', 'auth'], // 通用子域
  },
};

服务通信

同步通信 (REST/gRPC)

// REST API 网关
import express from 'express';
import { createProxyMiddleware } from 'http-proxy-middleware';

const app = express();

// 路由到不同服务
app.use('/api/users', createProxyMiddleware({
  target: 'http://user-service:3001',
  pathRewrite: { '^/api/users': '' },
}));

app.use('/api/orders', createProxyMiddleware({
  target: 'http://order-service:3002',
  pathRewrite: { '^/api/orders': '' },
}));

app.use('/api/products', createProxyMiddleware({
  target: 'http://product-service:3003',
  pathRewrite: { '^/api/products': '' },
}));

// gRPC 服务定义
// order.proto
syntax = "proto3";

service OrderService {
  rpc CreateOrder(CreateOrderRequest) returns (Order);
  rpc GetOrder(GetOrderRequest) returns (Order);
  rpc ListOrders(ListOrdersRequest) returns (stream Order);
}

message Order {
  string id = 1;
  string user_id = 2;
  repeated OrderItem items = 3;
  OrderStatus status = 4;
  double total_amount = 5;
}

异步通信 (消息队列)

// 事件发布
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka:9092'],
});

const producer = kafka.producer();

async function publishOrderCreated(order: Order) {
  await producer.send({
    topic: 'order.created',
    messages: [{
      key: order.id,
      value: JSON.stringify({
        eventType: 'ORDER_CREATED',
        timestamp: new Date().toISOString(),
        data: order,
      }),
    }],
  });
}

// 事件消费
const consumer = kafka.consumer({ groupId: 'payment-service' });

async function startConsumer() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'order.created' });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const event = JSON.parse(message.value.toString());

      if (event.eventType === 'ORDER_CREATED') {
        await processPayment(event.data);
      }
    },
  });
}

服务发现

// Consul 服务注册
import Consul from 'consul';

const consul = new Consul({ host: 'consul', port: 8500 });

// 注册服务
async function registerService() {
  await consul.agent.service.register({
    id: `order-service-${process.env.HOSTNAME}`,
    name: 'order-service',
    address: process.env.HOST,
    port: parseInt(process.env.PORT),
    check: {
      http: `http://${process.env.HOST}:${process.env.PORT}/health`,
      interval: '10s',
      timeout: '5s',
    },
  });
}

// 发现服务
async function discoverService(serviceName: string) {
  const result = await consul.health.service({
    service: serviceName,
    passing: true,
  });

  const instances = result.map(entry => ({
    address: entry.Service.Address,
    port: entry.Service.Port,
  }));

  // 负载均衡选择实例
  return instances[Math.floor(Math.random() * instances.length)];
}

数据一致性

Saga 模式

// Saga 协调器
class OrderSaga {
  private steps: SagaStep[] = [];

  constructor() {
    this.steps = [
      {
        name: 'reserveInventory',
        execute: this.reserveInventory,
        compensate: this.releaseInventory,
      },
      {
        name: 'processPayment',
        execute: this.processPayment,
        compensate: this.refundPayment,
      },
      {
        name: 'createShipment',
        execute: this.createShipment,
        compensate: this.cancelShipment,
      },
    ];
  }

  async execute(order: Order): Promise<SagaResult> {
    const executedSteps: SagaStep[] = [];

    try {
      for (const step of this.steps) {
        console.log(`Executing step: ${step.name}`);
        await step.execute(order);
        executedSteps.push(step);
      }

      return { success: true };
    } catch (error) {
      // 补偿已执行的步骤
      console.log('Saga failed, compensating...');
      for (const step of executedSteps.reverse()) {
        try {
          await step.compensate(order);
        } catch (compensateError) {
          console.error(`Compensation failed for ${step.name}`);
        }
      }

      return { success: false, error: error.message };
    }
  }

  private async reserveInventory(order: Order): Promise<void> {
    const response = await fetch('http://inventory-service/reserve', {
      method: 'POST',
      body: JSON.stringify({ orderId: order.id, items: order.items }),
    });
    if (!response.ok) throw new Error('Inventory reservation failed');
  }

  private async releaseInventory(order: Order): Promise<void> {
    await fetch('http://inventory-service/release', {
      method: 'POST',
      body: JSON.stringify({ orderId: order.id }),
    });
  }

  private async processPayment(order: Order): Promise<void> {
    const response = await fetch('http://payment-service/charge', {
      method: 'POST',
      body: JSON.stringify({ orderId: order.id, amount: order.total }),
    });
    if (!response.ok) throw new Error('Payment processing failed');
  }

  private async refundPayment(order: Order): Promise<void> {
    await fetch('http://payment-service/refund', {
      method: 'POST',
      body: JSON.stringify({ orderId: order.id }),
    });
  }
}

事件溯源

// 事件存储
interface DomainEvent {
  eventId: string;
  aggregateId: string;
  aggregateType: string;
  eventType: string;
  data: any;
  version: number;
  timestamp: Date;
}

class EventStore {
  async append(event: DomainEvent): Promise<void> {
    await this.db.insert('events', event);
  }

  async getEvents(aggregateId: string): Promise<DomainEvent[]> {
    return this.db.query(
      'SELECT * FROM events WHERE aggregate_id = ? ORDER BY version',
      [aggregateId]
    );
  }
}

// 聚合重建
class OrderAggregate {
  private state: Order;
  private version: number = 0;

  static async load(eventStore: EventStore, orderId: string): Promise<OrderAggregate> {
    const aggregate = new OrderAggregate();
    const events = await eventStore.getEvents(orderId);

    for (const event of events) {
      aggregate.apply(event);
    }

    return aggregate;
  }

  private apply(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderCreated':
        this.state = {
          id: event.aggregateId,
          status: 'PENDING',
          items: event.data.items,
        };
        break;
      case 'OrderPaid':
        this.state.status = 'PAID';
        break;
      case 'OrderShipped':
        this.state.status = 'SHIPPED';
        this.state.trackingNumber = event.data.trackingNumber;
        break;
    }
    this.version = event.version;
  }
}

服务治理

熔断器

import CircuitBreaker from 'opossum';

const options = {
  timeout: 3000,           // 超时时间
  errorThresholdPercentage: 50,  // 错误率阈值
  resetTimeout: 30000,     // 重置时间
};

const breaker = new CircuitBreaker(callExternalService, options);

// 监听事件
breaker.on('success', (result) => {
  console.log('Call succeeded');
});

breaker.on('timeout', () => {
  console.log('Call timed out');
});

breaker.on('reject', () => {
  console.log('Call rejected (circuit open)');
});

breaker.on('open', () => {
  console.log('Circuit breaker opened');
});

breaker.on('halfOpen', () => {
  console.log('Circuit breaker half-open');
});

breaker.on('close', () => {
  console.log('Circuit breaker closed');
});

// 降级处理
breaker.fallback(() => {
  return { cached: true, data: getCachedData() };
});

// 使用熔断器
async function getProductDetails(productId: string) {
  return breaker.fire(productId);
}

限流

import rateLimit from 'express-rate-limit';
import RedisStore from 'rate-limit-redis';
import { createClient } from 'redis';

const redisClient = createClient({ url: 'redis://redis:6379' });

// 全局限流
const globalLimiter = rateLimit({
  windowMs: 60 * 1000, // 1 分钟
  max: 100, // 最大请求数
  store: new RedisStore({
    sendCommand: (...args: string[]) => redisClient.sendCommand(args),
  }),
  message: { error: 'Too many requests' },
});

// API 级别限流
const apiLimiter = rateLimit({
  windowMs: 60 * 1000,
  max: 10,
  keyGenerator: (req) => `${req.user?.id}:${req.path}`,
  message: { error: 'API rate limit exceeded' },
});

app.use(globalLimiter);
app.use('/api/orders', apiLimiter);

链路追踪

import { trace, context, SpanKind } from '@opentelemetry/api';

const tracer = trace.getTracer('order-service');

// HTTP 请求追踪
async function callService(url: string, data: any) {
  return tracer.startActiveSpan('http-call', {
    kind: SpanKind.CLIENT,
    attributes: {
      'http.url': url,
      'http.method': 'POST',
    },
  }, async (span) => {
    try {
      // 注入追踪上下文到请求头
      const headers = {};
      propagation.inject(context.active(), headers);

      const response = await fetch(url, {
        method: 'POST',
        headers: {
          ...headers,
          'Content-Type': 'application/json',
        },
        body: JSON.stringify(data),
      });

      span.setAttribute('http.status_code', response.status);
      return response.json();
    } catch (error) {
      span.recordException(error);
      throw error;
    } finally {
      span.end();
    }
  });
}

最佳实践总结

微服务最佳实践:
┌─────────────────────────────────────────────────────┐
│                                                     │
│   设计原则                                          │
│   ├── 单一职责原则                                  │
│   ├── 服务自治                                      │
│   ├── 去中心化数据管理                              │
│   └── 容错设计                                      │
│                                                     │
│   通信策略                                          │
│   ├── 同步用于查询                                  │
│   ├── 异步用于命令                                  │
│   ├── 事件驱动解耦                                  │
│   └── API 版本控制                                  │
│                                                     │
│   运维治理                                          │
│   ├── 集中日志和监控                                │
│   ├── 链路追踪                                      │
│   ├── 熔断和限流                                    │
│   └── 灰度发布                                      │
│                                                     │
│   团队协作                                          │
│   ├── 服务所有权明确                                │
│   ├── API 契约优先                                  │
│   ├── 文档即代码                                    │
│   └── 自动化测试                                    │
│                                                     │
└─────────────────────────────────────────────────────┘
场景推荐方案
服务通信gRPC (内部) + REST (外部)
事件总线Kafka / RabbitMQ
服务发现Consul / Kubernetes
配置中心Consul / etcd
API 网关Kong / Envoy

微服务不是银弹,但对于复杂系统是有效的架构选择。理解其复杂性,权衡利弊,做出合适的决策。


分而治之是软件复杂性的解药,微服务将这个理念推向极致。