Building an Observability Pipeline for a TensorFlow CV Model Service with tRPC and Fluentd


Deploying a model is only the starting point; the real challenge is keeping it running reliably. A computer vision (CV) model service that shows intermittent latency spikes under load, or returns low-confidence results for certain kinds of images, is business as usual in production. The trouble is that a conventional logging setup only tells you that the service received a request and returned a result; everything in between is a black box. When the Node.js API gateway logs and the Python TensorFlow inference logs are completely disconnected, pinning down the end-to-end latency and internal state of one specific request is like looking for a needle in a haystack.

Our goal is to open up that black box. We want a system in which every inference request, from the moment it enters the tRPC gateway, through the TensorFlow computation in the Python process, to the moment the result is returned, has the key points of its lifecycle tied together by a single traceId and aggregated in one place in structured form. This is more than just logging: it is an observability data pipeline that provides the foundation for later performance analysis, model drift detection, and fine-grained operations.

The technology trade-offs are straightforward. For the external interface we choose tRPC, because its end-to-end type safety greatly improves integration with upstream consumers (a web frontend or other microservices). The inference core is, unsurprisingly, Python and TensorFlow, the standard ecosystem for CV. For the bridge between the two we skip plain REST in favor of internal gRPC: it performs better and lets us define a strict service contract with Protobuf. Finally, log aggregation is handled by Fluentd, whose flexibility and rich plugin ecosystem make it an ideal choice for heterogeneous log streams.

This is a typical multi-language, multi-component setup, and observability is the glue that holds it all together.

graph TD
    subgraph "Client"
        A[Browser/Mobile App]
    end

    subgraph "Node.js Service"
        B[tRPC Gateway]
        C[Structured Logger: Pino]
    end
    
    subgraph "Python Service"
        D[gRPC Inference Worker]
        E[TensorFlow/Keras]
        F[Structured Logger: python-json-logger]
    end

    subgraph "Observability Pipeline"
        G[Fluentd Aggregator]
        H[Elasticsearch/Other Sink]
    end

    A -- "tRPC Request" --> B
    B -- "Generate traceId & Log" --> C
    B -- "gRPC Call with traceId" --> D
    C -- "TCP/JSON" --> G
    D -- "Loads Model" --> E
    D -- "Extract traceId & Log" --> F
    F -- "TCP/JSON" --> G
    G -- "Parse & Forward" --> H

Step 1: Define the Service Contracts

In a multi-language system, clear interface definitions are the foundation for everything else. We need two layers of contracts: the externally facing tRPC interface, and the internal gRPC interface between the Node.js gateway and the Python inference service.

The internal gRPC contract (inference.proto)

This is the system's core communication protocol. Besides the request and response payloads, the crucial part is propagating our traceId: here it travels as an explicit trace_id field in the request message (gRPC metadata would work just as well).

// syntax="proto3";
// package inference;

// service Definition for the internal CV model inference.
service CVInference {
  // Performs inference on a given image.
  rpc Predict(InferenceRequest) returns (InferenceResponse);
}

// The request message containing the image data and tracing context.
message InferenceRequest {
  // A unique identifier to trace this request across services.
  string trace_id = 1;
  
  // The raw bytes of the image to be processed.
  bytes image_data = 2;

  // Optional model specifier, for routing to different models in the future.
  string model_name = 3;
}

// Represents a single prediction result.
message Prediction {
  string label = 1;
  float confidence = 2;
}

// The response message containing the inference results.
message InferenceResponse {
  // The same trace_id from the request for correlation.
  string trace_id = 1;

  // A list of predictions, typically sorted by confidence.
  repeated Prediction predictions = 2;

  // Metadata about the inference process.
  int64 inference_time_ms = 3;
}

This .proto file defines the minimal unit of communication between the services. trace_id appears in both the request and the response, which closes the loop on the trace.
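
To generate the Python stubs referenced in the next step (inference_pb2.py and inference_pb2_grpc.py), one option is to drive protoc through the grpcio-tools package. This is only a minimal sketch, assuming grpcio-tools is installed and the layout shown below:

# generate_stubs.py -- regenerate the gRPC stubs from inference.proto
# Assumes: pip install grpcio-tools, with the proto file under ./protos/.
from grpc_tools import protoc

exit_code = protoc.main([
    "protoc",
    "--proto_path=./protos",
    "--python_out=./protos",
    "--grpc_python_out=./protos",
    "./protos/inference.proto",
])
# Note: the generated inference_pb2_grpc.py does a plain `import inference_pb2`,
# so either add protos/ to PYTHONPATH or patch that import (e.g. make it relative)
# before importing the stubs as protos.inference_pb2_grpc.
raise SystemExit(exit_code)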

Step 2: Build the Python TensorFlow Inference Service

This service is the compute-intensive heart of the system. It has to implement the gRPC service, load the TensorFlow model and, most importantly, emit structured logs that carry the request context.

Project structure:

python-worker/
├── protos/
│   └── inference_pb2_grpc.py
│   └── inference_pb2.py
├── models/
│   └── mobilenet_v2/
│       └── ... (SavedModel files)
├── app.py
├── requirements.txt
└── utils/
    └── logger.py
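
The layout lists requirements.txt without showing it; the following is only a plausible sketch inferred from the imports used below, with versions left to whatever you actually test against:

# requirements.txt (assumed package set; pin versions for real deployments)
grpcio
grpcio-tools
tensorflow
numpy
Pillow
python-json-logger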

First, we need a logging utility that handles that context.

utils/logger.py

import logging
import sys
from pythonjsonlogger import jsonlogger

class _MergingAdapter(logging.LoggerAdapter):
    """
    LoggerAdapter that merges per-call `extra` fields with the bound context.
    (The stock LoggerAdapter.process would otherwise overwrite them.)
    """
    def process(self, msg, kwargs):
        kwargs["extra"] = {**self.extra, **kwargs.get("extra", {})}
        return msg, kwargs


class ContextualLogger:
    """
    A logger that injects a trace_id into every log record.
    """
    def __init__(self, name="inference_worker"):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        # Prevent logs from propagating to the root logger
        self.logger.propagate = False

        # Use a handler that hasn't been added yet
        if not self.logger.handlers:
            log_handler = logging.StreamHandler(sys.stdout)
            # Add trace_id to the format string
            formatter = jsonlogger.JsonFormatter(
                '%(asctime)s %(name)s %(levelname)s %(trace_id)s %(message)s'
            )
            log_handler.setFormatter(formatter)
            self.logger.addHandler(log_handler)

    def get_logger(self, trace_id: str = "NO_TRACE_ID"):
        """
        Returns a logger adapter with the trace_id injected into the context.
        """
        return _MergingAdapter(self.logger, {'trace_id': trace_id})

# Singleton instance
log_factory = ContextualLogger()

def get_logger(trace_id: str):
    return log_factory.get_logger(trace_id)

This simple factory pattern lets us obtain a logger bound to a trace_id anywhere in the application.
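
For illustration, using the factory looks like this; the exact output depends on python-json-logger's formatting, so the record shown in the comment is only indicative:

from utils.logger import get_logger

log = get_logger("abc-123")
log.info("Preprocessing finished.", extra={"preprocess_ms": 25})
# Roughly: {"asctime": "...", "name": "inference_worker", "levelname": "INFO",
#           "trace_id": "abc-123", "message": "Preprocessing finished.", "preprocess_ms": 25}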

Next, the core service implementation.

app.py

import grpc
import time
import os
import numpy as np
import tensorflow as tf
from concurrent import futures
from PIL import Image
import io

# Import generated gRPC files
import protos.inference_pb2 as inference_pb2
import protos.inference_pb2_grpc as inference_pb2_grpc

from utils.logger import get_logger

# --- Configuration ---
SERVER_ADDRESS = os.getenv("SERVER_ADDRESS", "0.0.0.0:50051")
MAX_WORKERS = int(os.getenv("MAX_WORKERS", 10))
MODEL_PATH = os.getenv("MODEL_PATH", "./models/mobilenet_v2")

class CVInferenceServicer(inference_pb2_grpc.CVInferenceServicer):
    """
    Implements the gRPC service for TensorFlow model inference.
    """
    def __init__(self):
        # A common anti-pattern is loading the model per-request.
        # In a real project, this MUST be done once on initialization.
        self.logger = get_logger("init")
        self.logger.info(f"Loading model from {MODEL_PATH}...")
        try:
            self.model = tf.keras.models.load_model(MODEL_PATH)
            self.decode_predictions = tf.keras.applications.mobilenet_v2.decode_predictions
            self.preprocess_input = tf.keras.applications.mobilenet_v2.preprocess_input
            self.logger.info("Model loaded successfully.")
        except Exception as e:
            self.logger.error(f"Failed to load model: {e}", exc_info=True)
            # In production, this should trigger a non-ready state or exit.
            raise

    def Predict(self, request: inference_pb2.InferenceRequest, context):
        trace_id = request.trace_id
        logger = get_logger(trace_id)

        try:
            start_time = time.perf_counter()
            logger.info("Received inference request.", extra={'model_name': request.model_name})

            # 1. Preprocessing
            preprocess_start = time.perf_counter()
            image_bytes = request.image_data
            # Convert to RGB to guard against grayscale/RGBA inputs, then resize.
            image = Image.open(io.BytesIO(image_bytes)).convert("RGB").resize((224, 224))
            image_array = np.array(image)[np.newaxis, ...]
            processed_image = self.preprocess_input(image_array)
            preprocess_end = time.perf_counter()

            # 2. Inference
            inference_start = time.perf_counter()
            predictions = self.model.predict(processed_image)
            inference_end = time.perf_counter()

            # 3. Postprocessing
            postprocess_start = time.perf_counter()
            decoded = self.decode_predictions(predictions, top=3)[0]
            response_predictions = [
                inference_pb2.Prediction(label=label, confidence=float(score))
                for _, label, score in decoded
            ]
            postprocess_end = time.perf_counter()

            total_end = time.perf_counter()
            total_duration_ms = int((total_end - start_time) * 1000)

            # This is the structured log we care about.
            # It contains performance metrics and prediction metadata.
            logger.info(
                "Inference successful.",
                extra={
                    'total_duration_ms': total_duration_ms,
                    'preprocess_ms': int((preprocess_end - preprocess_start) * 1000),
                    'inference_ms': int((inference_end - inference_start) * 1000),
                    'postprocess_ms': int((postprocess_end - postprocess_start) * 1000),
                    'top_prediction': response_predictions[0].label,
                    'top_confidence': response_predictions[0].confidence,
                }
            )

            return inference_pb2.InferenceResponse(
                trace_id=trace_id,
                predictions=response_predictions,
                inference_time_ms=total_duration_ms
            )

        except Exception as e:
            logger.error(f"An error occurred during prediction: {e}", exc_info=True)
            # Inform the gRPC client of the error.
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f"Internal error processing request with trace_id: {trace_id}")
            return inference_pb2.InferenceResponse()


def serve():
    """Starts the gRPC server."""
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=MAX_WORKERS))
    inference_pb2_grpc.add_CVInferenceServicer_to_server(CVInferenceServicer(), server)
    server.add_insecure_port(SERVER_ADDRESS)
    
    init_logger = get_logger("server_startup")
    init_logger.info(f"Server starting on {SERVER_ADDRESS}")
    server.start()
    init_logger.info("Server started. Awaiting requests...")
    
    try:
        server.wait_for_termination()
    except KeyboardInterrupt:
        init_logger.info("Shutting down server.")
        server.stop(0)

if __name__ == '__main__':
    serve()

The heart of this code is the Predict method. Besides running the standard preprocess -> predict -> postprocess flow, it emits a trace_id-tagged structured log at every key step. Those logs carry a detailed timing breakdown plus prediction metadata, and they are the raw material for all later analysis.
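
Before wiring up the gateway, it is worth exercising the worker directly. Here is a minimal sketch of a standalone gRPC test client, assuming the generated stubs are importable as in app.py and a hypothetical test image at test.jpg:

# smoke_test_client.py -- call the inference worker directly over gRPC
import uuid

import grpc

import protos.inference_pb2 as inference_pb2
import protos.inference_pb2_grpc as inference_pb2_grpc


def main():
    with open("test.jpg", "rb") as f:  # hypothetical test image
        image_bytes = f.read()

    channel = grpc.insecure_channel("localhost:50051")
    stub = inference_pb2_grpc.CVInferenceStub(channel)

    request = inference_pb2.InferenceRequest(
        trace_id=str(uuid.uuid4()),
        image_data=image_bytes,
        model_name="mobilenet_v2",
    )
    # A deadline keeps a hung worker from hanging the caller as well.
    response = stub.Predict(request, timeout=10)
    for p in response.predictions:
        print(f"{p.label}: {p.confidence:.3f} (trace_id={response.trace_id})")


if __name__ == "__main__":
    main()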

Step 3: Implement the tRPC Gateway Service

The Node.js gateway is the entry point of the system. It handles external tRPC requests, generates the traceId, calls the Python gRPC service, and records equally detailed structured logs of its own.

Project structure:

trpc-gateway/
├── protos/
│   └── inference.proto
├── src/
│   ├── client.ts   // gRPC client
│   ├── logger.ts   // Structured logger (Pino)
│   ├── router.ts   // tRPC router definition
│   └── server.ts   // Main server entrypoint
├── package.json
└── tsconfig.json
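
package.json is listed above but never shown; a plausible dependency set inferred from the imports below might look like this (version ranges are illustrative, not taken from the original project):

{
  "name": "trpc-gateway",
  "private": true,
  "dependencies": {
    "@grpc/grpc-js": "^1.9.0",
    "@grpc/proto-loader": "^0.7.0",
    "@trpc/server": "^10.45.0",
    "pino": "^8.0.0",
    "zod": "^3.22.0"
  },
  "devDependencies": {
    "@types/node": "^20.0.0",
    "ts-node": "^10.9.0",
    "typescript": "^5.0.0"
  }
}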

src/logger.ts

import pino from 'pino';
import { randomUUID } from 'crypto';

// A base logger for the application.
// In production, you might want to stream this to a file or a logging service.
// For Fluentd, we'll configure it to forward stdout.
const logger = pino({
  level: 'info',
  formatters: {
    level: (label) => {
      return { level: label };
    },
  },
  // Base properties to include in every log.
  base: {
    service: 'trpc-gateway',
    pid: process.pid,
  },
  timestamp: pino.stdTimeFunctions.isoTime,
});

// A factory to create child loggers with a traceId.
export const createLoggerForRequest = () => {
  const traceId = randomUUID();
  return {
    traceId,
    log: logger.child({ traceId }),
  };
};

export type Logger = pino.Logger;

Using pino's child loggers is an efficient and elegant way to inject per-request context.

src/client.ts (gRPC client)

import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
import path from 'path';

// --- Configuration ---
// Path is relative to src/; adjust if the compiled output runs from a dist/ directory.
const PROTO_PATH = path.join(__dirname, '../protos/inference.proto');
const INFERENCE_SERVICE_URL = process.env.INFERENCE_SERVICE_URL || 'localhost:50051';

const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true,
});

const inferenceProto = grpc.loadPackageDefinition(packageDefinition).inference as any;

// A singleton gRPC client.
// In a real-world scenario, connection management, retries, and deadlines are crucial.
export const grpcClient = new inferenceProto.CVInference(
  INFERENCE_SERVICE_URL,
  grpc.credentials.createInsecure()
);

src/router.ts & src/server.ts (tRPC 核心逻辑)

// src/router.ts
import { initTRPC } from '@trpc/server';
import { z } from 'zod';
import { grpcClient } from './client';
import { createLoggerForRequest, Logger } from './logger';
import { status } from '@grpc/grpc-js';

// Create a tRPC context that includes our logger.
export const createContext = () => {
  const { traceId, log } = createLoggerForRequest();
  return { traceId, log };
};
type Context = ReturnType<typeof createContext>;

const t = initTRPC.context<Context>().create();

export const appRouter = t.router({
  predictImage: t.procedure
    .input(z.object({
      // Base64 encoded image string
      imageBase64: z.string(),
      modelName: z.string().optional().default('mobilenet_v2'),
    }))
    .mutation(async ({ input, ctx }) => {
      const { log, traceId } = ctx;
      
      log.info({ input_size: input.imageBase64.length }, 'Received predictImage request.');

      const requestPayload = {
        trace_id: traceId,
        image_data: Buffer.from(input.imageBase64, 'base64'),
        model_name: input.modelName,
      };

      const gatewayStartTime = process.hrtime.bigint();

      return new Promise((resolve, reject) => {
        grpcClient.Predict(requestPayload, (error: any, response: any) => {
          const gatewayEndTime = process.hrtime.bigint();
          const gatewayDurationMs = Number((gatewayEndTime - gatewayStartTime) / 1000000n);

          if (error) {
            log.error({
                grpc_code: error.code,
                grpc_message: error.details,
                duration_ms: gatewayDurationMs,
              },
              'gRPC call to inference worker failed.'
            );
            
            // Map gRPC error to a tRPC error
            if (error.code === status.UNAVAILABLE) {
              return reject(new Error('Inference service is currently unavailable.'));
            }
            return reject(new Error('An internal error occurred during inference.'));
          }

          // int64 fields arrive as strings because of `longs: String` in proto-loader.
          const workerDurationMs = Number(response.inference_time_ms);

          log.info({
              worker_duration_ms: workerDurationMs,
              gateway_overhead_ms: gatewayDurationMs - workerDurationMs,
              total_duration_ms: gatewayDurationMs,
            },
            'Successfully received response from inference worker.'
          );

          resolve({
            traceId: response.trace_id,
            predictions: response.predictions,
          });
        });
      });
    }),
});

export type AppRouter = typeof appRouter;


// src/server.ts
import { createHTTPServer } from '@trpc/server/adapters/standalone';
import { appRouter, createContext } from './router';
import pino from 'pino';

const logger = pino({ name: 'server_startup' });
const port = process.env.PORT ? parseInt(process.env.PORT, 10) : 3001;

const server = createHTTPServer({
  router: appRouter,
  createContext,
});

server.listen(port);
logger.info(`tRPC server listening on http://localhost:${port}`);

The predictImage procedure here is the key piece. It:

  1. Generates a unique traceId and a logger instance for each request in createContext.
  2. Decodes the incoming base64 image into a Buffer.
  3. Calls the gRPC client, sending the traceId along with the image data.
  4. Handles both the success and failure paths in the callback, and logs a record that compares the gateway's own elapsed time against the worker's. The gateway_overhead_ms metric is invaluable for telling network or gateway-side problems apart from issues inside the worker. An end-to-end check of this path is sketched below.
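
As a quick end-to-end check, you can hit the gateway over plain HTTP. The sketch below assumes tRPC v10's default non-batched HTTP mapping (a mutation maps to POST /predictImage with the input as the JSON body), no custom data transformer, and a hypothetical test image at test.jpg; adjust for your setup.

# e2e_smoke_test.py -- call the tRPC gateway's predictImage mutation over plain HTTP
import base64
import json
import urllib.request

with open("test.jpg", "rb") as f:  # hypothetical test image
    image_base64 = base64.b64encode(f.read()).decode("ascii")

payload = json.dumps({"imageBase64": image_base64, "modelName": "mobilenet_v2"}).encode()
req = urllib.request.Request(
    "http://localhost:3001/predictImage",
    data=payload,
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(req, timeout=30) as resp:
    body = json.load(resp)

# tRPC wraps successful responses as {"result": {"data": ...}}.
print(json.dumps(body, indent=2))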

Step 4: Configure the Fluentd Aggregation Pipeline

At this point both services emit JSON logs carrying a traceId to stdout. We need Fluentd to capture, parse, and forward them. Assuming the system is deployed with Docker Compose, we can use Docker's fluentd logging driver, which ships container output over Fluentd's forward protocol.

docker-compose.yml (excerpt)

version: '3.8'
services:
  trpc_gateway:
    build: ./trpc-gateway
    ports:
      - "3001:3001"
    environment:
      - INFERENCE_SERVICE_URL=inference_worker:50051
    logging:
      driver: "fluentd"
      options:
        fluentd-address: localhost:24224
        tag: service.trpc_gateway

  inference_worker:
    build: ./python-worker
    # No ports exposed externally
    logging:
      driver: "fluentd"
      options:
        fluentd-address: localhost:24224
        tag: service.inference_worker
  
  fluentd:
    image: fluent/fluentd:v1.16-1
    volumes:
      - ./fluentd/conf:/fluentd/etc
    ports:
      - "24224:24224"
      - "24224:24224/udp"

fluentd/conf/fluent.conf

# --- Input: Receives logs from Docker logging driver ---
<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

# --- Filter: Parse the JSON logs and enrich them ---
# This filter applies to logs from both services.
<filter service.**>
  @type parser
  key_name log
  # Keep the original `log` field alongside the parsed fields
  reserve_data true
  <parse>
    @type json
  </parse>
</filter>

# Add Kubernetes metadata if running in K8s
# <filter service.**>
#   @type kubernetes_metadata
# </filter>

# --- Output: Send unified logs to a destination ---
# For demonstration, we'll output to stdout.
# In a real project, this would be elasticsearch, datadog, etc.
<match service.**>
  @type stdout
  <format>
    @type json
  </format>
</match>

# Example output to Elasticsearch
# <match service.**>
#   @type elasticsearch
#   host elasticsearch
#   port 9200
#   logstash_format true
#   logstash_prefix service_logs
#   logstash_dateformat %Y%m%d
#   include_tag_key true
#   type_name _doc
#   tag_key @log_name
#   flush_interval 1s
# </match>

This configuration is very direct:

  1. The source plugin listens on port 24224 and accepts forward-protocol data from the Docker logging driver (you can also push a hand-crafted record into this input to test it; see the sketch after this list).
  2. The filter plugin is the core. It matches every log whose tag starts with service. (i.e. both of our services). @type parser tells it to parse the log field (where the Docker driver places container stdout by default) as a JSON object and lift its fields to the top level of the record; reserve_data true keeps the original log field as well.
  3. The match plugin emits the processed logs. For this demo it uses @type stdout, which prints the formatted JSON to Fluentd's own console; in production you would swap in the output plugin for Elasticsearch, Splunk, or whatever log backend you use.
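
Before running the full stack, you can smoke-test the forward input with a hand-crafted record. This is a sketch assuming the fluent-logger Python package; the record mimics the Docker driver's shape by putting a JSON string under the log key so the parser filter above has something to parse:

# fluentd_smoke_test.py -- push a fake record into Fluentd's forward input
# Assumes: pip install fluent-logger, Fluentd reachable on localhost:24224.
import json

from fluent import sender

fluent = sender.FluentSender("service", host="localhost", port=24224)

record = {
    # Mimic the Docker fluentd driver: container stdout lands in the `log` field.
    "log": json.dumps({"trace_id": "manual-test-123", "message": "fluentd smoke test"}),
}
# Final tag is "service.smoke_test", which matches the service.** filter/match blocks.
ok = fluent.emit("smoke_test", record)
print("emitted" if ok else f"failed: {fluent.last_error}")
fluent.close()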

With the system running, send a single request and you will see a log stream like this in Fluentd's output:

// From tRPC Gateway
{ "level": "info", "time": "...", "pid": 123, "hostname": "...", "service": "trpc-gateway", "traceId": "abc-123", "input_size": 54321, "msg": "Received predictImage request.", "container_id": "...", "container_name": "...", "tag": "service.trpc_gateway" }

// From Python Worker
{ "asctime": "...", "name": "inference_worker", "levelname": "INFO", "trace_id": "abc-123", "message": "Received inference request.", "model_name": "mobilenet_v2", "container_id": "...", "container_name": "...", "tag": "service.inference_worker" }

// From Python Worker (after inference)
{ "asctime": "...", "name": "inference_worker", "levelname": "INFO", "trace_id": "abc-123", "message": "Inference successful.", "total_duration_ms": 150, "preprocess_ms": 25, "inference_ms": 110, "postprocess_ms": 15, "top_prediction": "...", "top_confidence": 0.98, "container_id": "...", "container_name": "...", "tag": "service.inference_worker" }

// From tRPC Gateway (after response)
{ "level": "info", "time": "...", "pid": 123, "hostname": "...", "service": "trpc-gateway", "traceId": "abc-123", "worker_duration_ms": 150, "gateway_overhead_ms": 12, "total_duration_ms": 162, "msg": "Successfully received response from inference worker.", "container_id": "...", "container_name": "...", "tag": "service.trpc_gateway" }

Once these logs land in Elasticsearch, filtering on traceId (or trace_id) lays out the complete lifecycle of a request at a glance: how its time was split across the two services, the gateway overhead, the model's predictions, and every other detail. The black box is wide open.
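
For example, pulling every record for one request with the official Python client might look like the sketch below. Everything here is an assumption about your sink: the elasticsearch package, the service_logs-* index pattern from the commented-out output above, the @timestamp field added by logstash_format, and the .keyword sub-fields created by dynamic mapping.

# trace_lookup.py -- fetch all log records for a single traceId from Elasticsearch
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")
trace_id = "abc-123"

resp = es.search(
    index="service_logs-*",
    size=100,
    sort=[{"@timestamp": "asc"}],
    query={
        "bool": {
            # The gateway logs `traceId`, the Python worker logs `trace_id`.
            "should": [
                {"term": {"traceId.keyword": trace_id}},
                {"term": {"trace_id.keyword": trace_id}},
            ],
            "minimum_should_match": 1,
        }
    },
)

for hit in resp["hits"]["hits"]:
    print(hit["_source"])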

Limitations and Future Directions

This architecture solves the core observability problem, but it is not without drawbacks. The current implementation makes synchronous gRPC calls: under high concurrency, once the Python worker hits its throughput ceiling, requests pile up in the gateway and latencies climb (the Node.js process accumulates pending promises, even though its event loop is not literally blocked). A more resilient evolution is to introduce a message queue such as RabbitMQ or Kafka: the tRPC gateway publishes requests as messages and a pool of inference workers consumes them, decoupling the services and making the flow asynchronous, which handles traffic spikes far more gracefully.

Beyond that, we have only covered one pillar of observability: logs. A more complete system also needs metrics and traces. The natural next iteration is OpenTelemetry: with its SDK we can create spans automatically or by hand and propagate the trace context (our traceId) across services, while exposing key business metrics (request QPS, latency distributions, GPU utilization) to Prometheus. The end state is a Grafana-style view where logs, metrics, and traces are fully correlated, a qualitatively higher level of observability.
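
As a taste of that direction, a manual span around the inference step on the Python side might look like the following sketch. It assumes the opentelemetry-api and opentelemetry-sdk packages with an exporter configured elsewhere, and it is not part of the pipeline built above:

# Sketch: wrapping the TensorFlow inference call in an OpenTelemetry span.
from opentelemetry import trace

tracer = trace.get_tracer("inference_worker")


def predict_with_span(model, processed_image, trace_id: str):
    with tracer.start_as_current_span("tf.predict") as span:
        # Attach the same correlation id we already log, so logs and traces join up.
        span.set_attribute("app.trace_id", trace_id)
        predictions = model.predict(processed_image)
        span.set_attribute("app.batch_size", int(processed_image.shape[0]))
        return predictions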

