Building a Distributed Tracing Frontend for a Docker Swarm Cluster with OpenSearch and GraphQL


After running a dozen or so microservices on Docker Swarm, troubleshooting a single cross-service request became a nightmare. Logs were scattered across the stdout of different containers, and paging through them one service at a time with docker service logs <service_name>, with no shared traceId, made it impossible to reconstruct the full path of one user action. When a problem surfaced in service A calling service B, which in turn depended on service C, the time cost of finding the root cause was unacceptably high.

Off-the-shelf APM solutions such as Jaeger or Zipkin are powerful, but too heavyweight for our current scale, requiring extra agents and nontrivial configuration. What we actually needed was just a lightweight internal tool: type in a request ID and see the logs from every service it passed through, laid out on a timeline.

So I decided to build one myself. The initial technology stack:

  • Log and trace storage: OpenSearch. An open-source fork of Elasticsearch, largely API-compatible, with strong query and aggregation capabilities, well suited to storing and searching unstructured logs.
  • Backend service and query gateway: Express.js + GraphQL. It serves both as the log ingestion entry point and as a flexible GraphQL interface that lets the frontend query exactly the trace data it needs.
  • Deployment environment: Docker Swarm. Consistent with our existing infrastructure, and we get its built-in service discovery and load balancing for free.
  • Trace visualization frontend: React + Styled-components. Quick to build a single-purpose UI, and Styled-components keeps styles tightly coupled to component logic, which suits small internal-tool components well.

The core idea of the architecture: at the first gate into the system (the gateway), inject a globally unique traceId. That traceId is propagated along the call chain via an HTTP header, and every log line each service emits must carry it. All logs end up indexed in OpenSearch. The frontend then goes through the GraphQL gateway, using the traceId as the key, to pull and visualize the entire request chain.

sequenceDiagram
    participant User
    participant Gateway as Gateway (Express.js + GraphQL)
    participant ServiceA
    participant ServiceB
    participant OpenSearch
    participant TraceUI as TraceUI (React + Styled-components)

    User->>+Gateway: Send API request
    Gateway->>Gateway: Generate traceId
    Gateway->>+ServiceA: Forward request (Header: x-trace-id)
    ServiceA->>ServiceA: Handle logic, emit logs
    ServiceA-->>OpenSearch: Ship logs (with traceId)
    ServiceA->>+ServiceB: Internal call (Header: x-trace-id)
    ServiceB->>ServiceB: Handle logic, emit logs
    ServiceB-->>OpenSearch: Ship logs (with traceId)
    ServiceB-->>-ServiceA: Return result
    ServiceA-->>-Gateway: Return result
    Gateway-->>-User: Return final response

    Note right of User: Some time later...

    User->>TraceUI: Open the trace UI
    TraceUI->>+Gateway: GraphQL query: trace(id: "...")
    Gateway->>+OpenSearch: Query logs by traceId
    OpenSearch-->>-Gateway: Return log documents
    Gateway-->>-TraceUI: Return formatted log list
    TraceUI->>TraceUI: Render timeline with Styled-components

Step 1: Adapting the Backend Services for Context Propagation and Log Shipping

This is the foundation of the whole system. We need a mechanism to create and propagate the traceId. In Node.js, request-scoped context propagation is a well-known pain point because of the asynchronous execution model. async_hooks provides the low-level capability but is too cumbersome to use directly; in a real project you would reach for cls-hooked or the AsyncLocalStorage built into Node.js v14+ (a minimal sketch follows).
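
For comparison, here is a minimal sketch of the same idea built directly on AsyncLocalStorage; the middleware and helper names here are illustrative, not part of this project:

// A minimal AsyncLocalStorage sketch, for comparison only; the rest of
// this post uses express-http-context instead.
const { AsyncLocalStorage } = require('async_hooks');
const crypto = require('crypto'); // crypto.randomUUID() needs Node 14.17+

const als = new AsyncLocalStorage();

function alsTraceMiddleware(req, res, next) {
  const traceId = req.get('x-trace-id') || crypto.randomUUID();
  // Everything downstream of next() runs inside this store.
  als.run({ traceId }, () => next());
}

function getTraceId() {
  // Returns undefined when called outside a request context.
  return als.getStore()?.traceId;
}

module.exports = { alsTraceMiddleware, getTraceId };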

Here we use the express-http-context library instead, which wraps cls-hooked behind a friendlier API.

1. Gateway and Service Middleware (gateway/src/middleware/tracing.js)

// gateway/src/middleware/tracing.js
const { v4: uuidv4 } = require('uuid');
const httpContext = require('express-http-context');

const TRACE_ID_HEADER = 'x-trace-id';

/**
 * Express middleware to establish a trace context for each request.
 * If an incoming request includes an 'x-trace-id' header, it uses that ID.
 * Otherwise, it generates a new UUID v4 as the trace ID.
 * This context is accessible throughout the async execution path of the request.
 */
function traceContextMiddleware(req, res, next) {
  // Use httpContext to manage request-scoped context.
  httpContext.middleware(req, res, () => {
    const traceId = req.get(TRACE_ID_HEADER) || uuidv4();
    httpContext.set('traceId', traceId);
    
    // Set traceId in the response header so clients can capture it.
    res.setHeader(TRACE_ID_HEADER, traceId);
    
    next();
  });
}

module.exports = {
  traceContextMiddleware,
  TRACE_ID_HEADER,
};
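
Mount order matters: the tracing middleware must be registered before anything that logs. A minimal sketch of the gateway's Express wiring (the file name and route are illustrative):

// gateway/src/app.js (illustrative wiring only)
const express = require('express');
const { traceContextMiddleware } = require('./middleware/tracing');
const logger = require('./utils/logger');

const app = express();
app.use(express.json());
app.use(traceContextMiddleware); // Must come before any route or middleware that logs.

app.post('/api/orders', (req, res) => {
  logger.info('Received order request'); // traceId is attached automatically.
  res.json({ ok: true });
});

app.listen(4000);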

2. Configuring a Unified Logger (gateway/src/utils/logger.js)

The logger has to be adapted so that it automatically reads the traceId from the context and attaches it to every log message. We use winston and give it a custom formatter.

// gateway/src/utils/logger.js
const winston = require('winston');
const httpContext = require('express-http-context');
const { OpenSearchTransport } = require('winston-opensearch-transport');

// A custom formatter to inject the traceId from the httpContext into the log message.
const traceFormat = winston.format.printf(({ level, message, timestamp, service, ...meta }) => {
  const traceId = httpContext.get('traceId');
  const logObject = {
    timestamp,
    level,
    message,
    service,
    traceId,
    ...meta,
  };
  // Stringify the object for OpenSearch to ingest.
  return JSON.stringify(logObject);
});

// In a real project, OpenSearch connection details would come from environment variables.
const opensearchNode = process.env.OPENSEARCH_NODE || 'http://opensearch-node1:9200';

const logger = winston.createLogger({
  level: 'info',
  // Only attach a timestamp at the logger level; each transport applies
  // traceFormat below. (Stacking json()/simple()/metadata() here would
  // reshape the info object and break the fields traceFormat destructures,
  // e.g. folding 'service' into a 'metadata' key.)
  format: winston.format.timestamp(),
  defaultMeta: { service: 'gateway-service' }, // Default service name for logs from this instance.
  transports: [
    // Console transport for local development and Docker logs.
    new winston.transports.Console({
      format: winston.format.combine(
        winston.format.colorize(),
        traceFormat
      ),
    }),
    // OpenSearch transport for production-level log aggregation.
    new OpenSearchTransport({
      level: 'info',
      index: `logs-my-app-${new Date().toISOString().substring(0, 10)}`, // Daily index
      clientOpts: { node: opensearchNode },
      format: traceFormat,
      transformer: (log) => {
        // The formatter already returns a stringified JSON. We parse it back
        // to an object for the transport to handle.
        return JSON.parse(log.message);
      }
    }),
  ],
  exitOnError: false, // Do not exit on handled exceptions.
});

// A simple error handler to ensure all caught exceptions are logged.
process.on('uncaughtException', (error) => {
  logger.error('Uncaught Exception:', { error: error.message, stack: error.stack });
  // In production, you might want to gracefully shut down.
  // process.exit(1);
});

module.exports = logger;

The gotcha here is how winston's formatting and the transformer of winston-opensearch-transport work together. Our traceFormat ultimately emits a JSON string, while the transport needs an object, so the transformer has to parse it back. We also create one index per day, logs-my-app-YYYY-MM-DD, which is standard practice for log management.
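
For concreteness, a document indexed this way ends up with roughly the following shape (all values illustrative; the endpoint field comes from the meta passed to logger.info in the client code below):

{
  "timestamp": "2024-01-15T09:30:12.345Z",
  "level": "info",
  "message": "Calling Service A",
  "service": "gateway-service",
  "traceId": "8f14e45f-ceea-4671-9b1a-5d1c2f0a7e3b",
  "endpoint": "http://service-a:3001"
}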

3. Propagating the traceId on Inter-Service Calls

When the gateway calls a downstream service (for example, ServiceA), it must inject the traceId into the request headers.

// gateway/src/services/serviceAClient.js
const axios = require('axios');
const httpContext = require('express-http-context');
const { TRACE_ID_HEADER } = require('../middleware/tracing');
const logger = require('../utils/logger');

const serviceAEndpoint = process.env.SERVICE_A_ENDPOINT || 'http://service-a:3001';

async function callServiceA(data) {
  const traceId = httpContext.get('traceId');
  logger.info('Calling Service A', { endpoint: serviceAEndpoint });

  try {
    const response = await axios.post(`${serviceAEndpoint}/process`, data, {
      headers: {
        [TRACE_ID_HEADER]: traceId, // CRITICAL: Propagate the traceId
        'Content-Type': 'application/json',
      },
      timeout: 5000, // Always set a timeout for inter-service calls.
    });
    return response.data;
  } catch (error) {
    logger.error('Failed to call Service A', {
      error: error.message,
      status: error.response?.status,
    });
    // Re-throw a custom error or handle it as per business logic.
    throw new Error('Service A is unavailable');
  }
}

module.exports = { callServiceA };

The downstream ServiceA has to integrate the traceContextMiddleware and the logger in exactly the same way, so that it picks up the incoming x-trace-id header and its logs carry the same traceId.
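
A minimal sketch of ServiceA's entry point, assuming it reuses the same two modules (the /process route matches the client above; the file layout is an assumption):

// service-a/src/index.js (minimal sketch)
const express = require('express');
const { traceContextMiddleware } = require('./middleware/tracing'); // same module as the gateway
const logger = require('./utils/logger'); // same logger, with defaultMeta service: 'service-a'

const app = express();
app.use(express.json());
app.use(traceContextMiddleware); // Adopts the x-trace-id header set by the gateway.

app.post('/process', (req, res) => {
  logger.info('Processing request in Service A'); // Logged with the propagated traceId.
  res.json({ processed: true });
});

app.listen(3001, () => logger.info('Service A listening on 3001'));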

Step 2: Building the GraphQL Query Gateway

The gateway doesn't just handle business logic; it also exposes a GraphQL endpoint for querying logs, which talks to OpenSearch directly.

1. GraphQL Schema (gateway/src/graphql/schema.js)

// gateway/src/graphql/schema.js
const { gql } = require('apollo-server-express');

const typeDefs = gql`
  type LogEntry {
    id: ID!
    timestamp: String!
    level: String!
    message: String!
    service: String!
    traceId: String
  }

  type Query {
    """
    Fetches all log entries associated with a specific trace ID.
    Results are sorted by timestamp in ascending order.
    """
    trace(id: ID!): [LogEntry]
  }
`;

module.exports = typeDefs;

2. GraphQL Resolver (gateway/src/graphql/resolvers.js)

This is the bridge between GraphQL and OpenSearch.

// gateway/src/graphql/resolvers.js
const { Client } = require('@opensearch-project/opensearch');
const logger = require('../utils/logger');

const opensearchNode = process.env.OPENSEARCH_NODE || 'http://opensearch-node1:9200';
const client = new Client({ node: opensearchNode });

const resolvers = {
  Query: {
    trace: async (_, { id }) => {
      if (!id) {
        throw new Error('Trace ID is required.');
      }

      logger.info(`Querying OpenSearch for traceId: ${id}`);

      try {
        const response = await client.search({
          index: 'logs-my-app-*', // Search across all daily indices
          body: {
            size: 1000, // Set a reasonable limit for the number of logs per trace
            sort: [
              { timestamp: { order: 'asc' } }, // Sort logs chronologically
            ],
            query: {
              match: {
                'traceId.keyword': id, // Use '.keyword' for exact match on non-analyzed field
              },
            },
          },
        });

        if (response.body.hits.hits.length === 0) {
          logger.warn(`No logs found for traceId: ${id}`);
          return [];
        }

        return response.body.hits.hits.map(hit => ({
          id: hit._id,
          ...hit._source,
        }));
      } catch (error) {
        logger.error('OpenSearch query failed', {
          traceId: id,
          error: error.message,
          stack: error.meta?.body?.error?.root_cause
        });
        // Do not expose detailed error messages to the client.
        throw new Error('Failed to retrieve trace data.');
      }
    },
  },
};

module.exports = resolvers;

A common mistake is to query match: { 'traceId': id } directly. If OpenSearch's default dynamic mapping has typed the traceId field as text, the value gets analyzed (tokenized) and the query becomes unreliable. The correct target is traceId.keyword, which holds the raw, unanalyzed string. To make this robust, it's best to define an explicit mapping for the index that declares traceId as type keyword.
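
A sketch of applying such a mapping as an index template with the OpenSearch JS client; the template name and the other field types here are my assumptions, not something the project defines:

// scripts/create-index-template.js (one-off setup sketch)
const { Client } = require('@opensearch-project/opensearch');

const client = new Client({
  node: process.env.OPENSEARCH_NODE || 'http://opensearch-node1:9200',
});

async function createTemplate() {
  await client.indices.putIndexTemplate({
    name: 'logs-my-app-template', // hypothetical template name
    body: {
      index_patterns: ['logs-my-app-*'],
      template: {
        mappings: {
          properties: {
            timestamp: { type: 'date' },
            level: { type: 'keyword' },
            service: { type: 'keyword' },
            traceId: { type: 'keyword' }, // exact match, never analyzed
            message: { type: 'text' },
          },
        },
      },
    },
  });
}

createTemplate().catch(console.error);

One subtlety: with an explicit keyword mapping there is no .keyword subfield, so the resolver would query traceId directly; traceId.keyword only exists under the default dynamic mapping, which indexes strings as text with a keyword subfield.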

Step 3: Deploying to Docker Swarm

Now we containerize the whole stack (OpenSearch, the gateway, ServiceA) and orchestrate it with Docker Swarm.

docker-stack.yml

version: '3.8'

services:
  opensearch-node1:
    image: opensearchproject/opensearch:2.11.0
    environment:
      - cluster.name=opensearch-cluster
      - node.name=opensearch-node1
      - discovery.type=single-node
      - bootstrap.memory_lock=true # along with the memlock settings below
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" # Demo sizing; give production more
      - DISABLE_SECURITY_PLUGIN=true # Demo only: the services connect over plain http with no auth
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-data:/usr/share/opensearch/data
    networks:
      - observability-net
    deploy:
      replicas: 1
      placement:
        constraints: [node.role == manager]

  gateway:
    image: my-tracing-app/gateway:latest # You need to build this image
    environment:
      - OPENSEARCH_NODE=http://opensearch-node1:9200
      - SERVICE_A_ENDPOINT=http://service-a:3001
    ports:
      - "4000:4000"
    networks:
      - observability-net
    depends_on:
      - opensearch-node1
    deploy:
      replicas: 2
      update_config:
        parallelism: 1
        delay: 10s
      restart_policy:
        condition: on-failure

  service-a:
    image: my-tracing-app/service-a:latest # You need to build this image
    environment:
      - OPENSEARCH_NODE=http://opensearch-node1:9200
    networks:
      - observability-net
    depends_on:
      - opensearch-node1
    deploy:
      replicas: 2

volumes:
  opensearch-data:

networks:
  observability-net:
    driver: overlay

This file defines three services on a single overlay network, so they can reach each other by service name (opensearch-node1, service-a). The gateway runs two replicas, load-balanced automatically by Swarm. Deploy the stack with docker stack deploy -c docker-stack.yml tracing. One caveat: docker stack deploy ignores depends_on, so the Node services should tolerate OpenSearch being briefly unreachable at startup.

Step 4: Building the Visualization Frontend with Styled-components

The frontend is a simple React app: an input box plus an area that renders the logs. We use styled-components to build a reusable, styled log-entry component.
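
The component below assumes the app is wrapped in an ApolloProvider pointed at the gateway. A minimal sketch of that wiring (the /graphql path is Apollo Server's default and the port follows from the stack file; the file layout is an assumption):

// client/src/index.js (minimal Apollo wiring sketch)
import React from 'react';
import { createRoot } from 'react-dom/client';
import { ApolloClient, InMemoryCache, ApolloProvider } from '@apollo/client';
import TraceView from './components/TraceView';

const client = new ApolloClient({
  uri: 'http://localhost:4000/graphql', // the gateway publishes port 4000 in the stack file
  cache: new InMemoryCache(),
});

createRoot(document.getElementById('root')).render(
  <ApolloProvider client={client}>
    <TraceView />
  </ApolloProvider>
);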

TraceView.js React Component

// client/src/components/TraceView.js
import React, { useState } from 'react';
import { useLazyQuery, gql } from '@apollo/client';
import styled, { css } from 'styled-components';

const GET_TRACE = gql`
  query GetTrace($id: ID!) {
    trace(id: $id) {
      id
      timestamp
      level
      message
      service
    }
  }
`;

// --- Styled Components ---

const Container = styled.div`
  font-family: 'Menlo', 'Monaco', monospace;
  padding: 2rem;
  max-width: 1200px;
  margin: 0 auto;
`;

const SearchBar = styled.div`
  display: flex;
  margin-bottom: 2rem;
  
  input {
    flex-grow: 1;
    padding: 0.75rem;
    font-size: 1rem;
    border: 1px solid #ccc;
    border-radius: 4px 0 0 4px;
  }

  button {
    padding: 0.75rem 1.5rem;
    font-size: 1rem;
    border: 1px solid #007bff;
    background-color: #007bff;
    color: white;
    cursor: pointer;
    border-radius: 0 4px 4px 0;
    &:disabled {
      background-color: #ccc;
      border-color: #ccc;
      cursor: not-allowed;
    }
  }
`;

const Timeline = styled.div`
  border-left: 2px solid #e0e0e0;
  padding-left: 2rem;
`;

// Dynamically set styles based on log level
const LogEntryWrapper = styled.div`
  position: relative;
  padding: 1rem;
  margin-bottom: 1rem;
  border-radius: 4px;
  border: 1px solid;

  ${({ level }) => {
    switch (level) {
      case 'error':
        return css`
          border-color: #f5c6cb;
          background-color: #f8d7da;
          color: #721c24;
        `;
      case 'warn':
        return css`
          border-color: #ffeeba;
          background-color: #fff3cd;
          color: #856404;
        `;
      default:
        return css`
          border-color: #d6d8db;
          background-color: #f8f9fa;
          color: #343a40;
        `;
    }
  }}

  &:before {
    content: '';
    position: absolute;
    left: -2.5rem;
    top: 50%;
    transform: translateY(-50%);
    width: 1rem;
    height: 1rem;
    border-radius: 50%;
    background-color: ${({ level }) => (level === 'error' ? '#dc3545' : level === 'warn' ? '#ffc107' : '#007bff')};
    border: 2px solid white;
  }
`;

const LogHeader = styled.div`
  display: flex;
  justify-content: space-between;
  font-weight: bold;
  margin-bottom: 0.5rem;
  font-size: 0.9rem;
`;

const ServiceTag = styled.span`
  background-color: #6c757d;
  color: white;
  padding: 0.2rem 0.5rem;
  border-radius: 1rem;
  font-size: 0.8rem;
`;

const LogMessage = styled.div`
  white-space: pre-wrap;
  word-break: break-all;
`;


// --- Component Logic ---

function TraceView() {
  const [traceId, setTraceId] = useState('');
  const [getTrace, { loading, error, data }] = useLazyQuery(GET_TRACE);

  const handleSearch = () => {
    if (traceId.trim()) {
      getTrace({ variables: { id: traceId.trim() } });
    }
  };

  return (
    <Container>
      <h1>Distributed Trace Viewer</h1>
      <SearchBar>
        <input
          type="text"
          value={traceId}
          onChange={(e) => setTraceId(e.target.value)}
          placeholder="Enter Trace ID"
          onKeyPress={(e) => e.key === 'Enter' && handleSearch()}
        />
        <button onClick={handleSearch} disabled={loading}>
          {loading ? 'Searching...' : 'Search'}
        </button>
      </SearchBar>

      {error && <p>Error: {error.message}</p>}
      
      {data && (
        <Timeline>
          {data.trace.length === 0 ? (
            <p>No logs found for this Trace ID.</p>
          ) : (
            data.trace.map(log => (
              <LogEntryWrapper key={log.id} level={log.level}>
                <LogHeader>
                  <span>{new Date(log.timestamp).toISOString()}</span>
                  <ServiceTag>{log.service}</ServiceTag>
                </LogHeader>
                <LogMessage>{log.message}</LogMessage>
              </LogEntryWrapper>
            ))
          )}
        </Timeline>
      )}
    </Container>
  );
}

export default TraceView;

This component shows off what styled-components does best: generating CSS dynamically from props (such as level), so the UI visually distinguishes log levels at a glance. Component-scoped styling keeps the code cohesive and reusable.

Limitations and Future Iterations

Lightweight as it is, and although it solved our original problem, this design is far from perfect.

First, logs travel straight from the application to OpenSearch. In production this is an anti-pattern: it couples the application tightly to the logging backend. A more robust architecture inserts a log aggregation layer such as Fluentd or Logstash; applications just write to stdout, Docker's logging driver forwards to the aggregator, and the aggregator handles buffering, transformation, and shipping.

Second, what we built is log-based tracing, not true distributed tracing. It stitches logs together, but it cannot precisely measure the duration or parent-child relationships of individual operations (database queries, RPC calls). A complete tracing system would follow the OpenTelemetry specification, generating and propagating Spans; each Span represents one unit of work, and together they form a Trace.

A future iteration could layer the OpenTelemetry SDK onto the existing setup: logs would record not just traceId but also spanId and parentSpanId, giving us the full trace tree in OpenSearch. The GraphQL API would need to grow to query and reconstruct that tree, and the frontend would evolve from a flat list into a Gantt chart or flame graph that shows parallel vs. sequential calls and latency bottlenecks at a glance.
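
As a taste of that direction, a sketch of pulling span IDs from the active OpenTelemetry context to enrich log entries; this assumes an OTel SDK has been initialized elsewhere, and only the @opentelemetry/api calls are shown:

// Sketch: enrich log entries with the active span's IDs.
const { trace, context } = require('@opentelemetry/api');

function currentTraceContext() {
  const span = trace.getSpan(context.active());
  if (!span) return {}; // No active span (e.g. outside a request).
  const { traceId, spanId } = span.spanContext();
  return { traceId, spanId };
}

// Hypothetical integration point inside the winston formatter:
// const logObject = { ...currentTraceContext(), timestamp, level, message };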

