构建异构请求合并网关 Tornado Micronaut 与 Ray 的协同实践


问题的起点非常明确:一个外部系统以极高频率向我们推送小型计算任务,峰值可达每秒数万次。每个任务本身处理起来并不复杂,但其背后依赖一个CPU密集型的模拟模型。直接处理这些原子化请求会立刻摧毁后端系统,因为模型的初始化和单次运行的固定开销远大于计算本身,导致CPU在上下文切换和准备工作中空转,而非有效计算。批处理是唯一可行的出路,模型在处理1000个任务的批次时,其总耗时仅比处理单个任务慢不到20%。

我们需要构建一个前置层,它必须能稳定地承受高并发连接和请求流入,在内部将这些碎片化的请求聚合成结构化的批次,然后可靠地分发给后端的分布式计算集群。这是一个典型的生产者-消费者问题,但叠加了对接入层极致的I/O性能和对后端编排层企业级稳定性的双重考验。

方案A:纯粹的Python技术栈

最初的构想是使用一个统一的Python技术栈。具体来说,使用Tornado作为接入层,利用其成熟的异步I/O模型处理海量连接。当请求在内存中聚合成批次后,直接通过Ray Client或内部RPC调用,将任务分发给Ray集群。

优势:

  1. 技术同构: 整个链路从接入到计算都使用Python,降低了团队的技术多样性成本。
  2. 低延迟通信: Tornado和Ray之间的数据传递可以避免跨语言序列化的开销,理论上能实现更低的内部延迟。

劣势与现实考量:

  1. 编排层的健壮性: 中间的“批处理编排”逻辑,在真实项目中远比“攒批次、发送”要复杂。它需要处理事务性、与配置中心的交互、精细化的监控指标上报、以及与公司现有Java生态(如认证、日志、告警系统)的集成。用Python从头构建这一套企业级的“中间件”服务,无论是工作量还是长期维护性,都是一个巨大的挑战。
  2. GIL的潜在影响: 尽管Tornado和Ray都很好地绕开了全局解释器锁(GIL)的限制(Tornado通过事件循环,Ray通过多进程),但编排逻辑本身如果包含任何CPU密集型操作(如复杂的数据校验、转换),GIL依然会成为单进程内的瓶颈。
  3. 类型安全与大型项目维护: 对于这种系统的核心枢纽部分,静态类型带来的编译期检查和代码可维护性至关重要。Python的类型提示虽有进步,但在大型、复杂的业务逻辑编排上,其健壮性相较于JVM语言仍有差距。

方案B:彻底的JVM技术栈

另一个极端是完全拥抱JVM生态。使用Netty或Vert.x构建异步接入网关,业务编排层自然使用Micronaut或Spring,并通过Ray的Java API将任务分发到计算集群。

优势:

  1. 端到端类型安全: 从接入到分发,整个链路都在JVM的强类型体系内,稳定性高。
  2. 生态成熟: 可以无缝利用JVM生态中海量的、经过生产环境严酷考验的库。

劣势与现实考量:

  1. 计算生态的壁垒: 我们的核心计算模型和依赖库(如NumPy, Pandas, Scikit-learn)是纯Python的。Ray的核心优势之一就是与这个生态的无缝集成。虽然Ray提供了Java API,但在实践中,它更像是一个“二等公民”。跨语言调用不仅会引入额外的序列化开销和复杂性,而且一旦计算逻辑需要调整,我们就必须同时维护Python算法代码和Java的调用接口,这在快速迭代的AI/ML场景中是不可接受的。
  2. 异步编程的复杂性: 虽然Netty等框架性能卓越,但其编程模型相对Tornado更为底层和复杂,对于仅仅需要一个稳定高效HTTP/WebSocket接入层的场景,有点“杀鸡用牛刀”。

最终决策:异构协同架构

权衡之下,我们决定采用一种混合技术栈的异构架构,旨在让每个组件都做自己最擅长的事情。

graph TD
    subgraph "接入层 (Python / Tornado)"
        A[Client 1] --> T
        B[Client 2] --> T
        C[Client N] --> T
        T{Tornado Server}
        T -- "1. 接收高频请求" --> BFR(In-Memory Buffer)
        BFR -- "2. 聚合批次 (基于大小/时间)" --> G_CLIENT(gRPC Client)
    end

    subgraph "编排层 (JVM / Micronaut)"
        G_CLIENT -- "3. 发送gRPC BatchRequest" --> G_SERVER(Micronaut gRPC Server)
        G_SERVER -- "4. 验证与业务逻辑" --> LOGIC(Orchestration Logic)
        LOGIC -- "5. 调用Ray Java API" --> RAY_JAVA(Ray Java Driver)
    end
    
    subgraph "计算层 (Python / Ray)"
         RAY_JAVA -- "6. 提交远程任务" --> RC(Ray Cluster)
         RC --> W1(Ray Worker 1: @ray.remote)
         RC --> W2(Ray Worker 2: @ray.remote)
         RC --> WN(Ray Worker N: @ray.remote)
    end
  1. Tornado (I/O接入层): 它的唯一职责就是利用其高效的事件循环,稳定地处理海量并发连接,将请求快速放入内存缓冲区,并在满足批处理条件(大小或超时)时,通过gRPC将整个批次发送给下游。它极其轻量,专注I/O。
  2. Micronaut (业务编排层): 作为系统的“大脑”,它负责接收来自Tornado的批处理请求。我们选择Micronaut是因为其基于AOT编译,提供了极快的启动速度和非常低的内存占用,非常适合作为云原生服务。在这里,我们可以利用Java的强类型、成熟的依赖注入、数据库访问、配置管理等能力,实现健壮的业务逻辑、失败重试、监控告警等。
  3. Ray (分布式计算层): 负责执行真正的CPU密集型任务。计算逻辑依然用Python编写,享受其丰富的科学计算生态。Micronaut通过Ray的Java API与Ray集群通信,将任务分派出去。

这个架构的本质是一种责任分离:将I/O密集型、计算密集型和逻辑密集型的关注点,映射到最适合它们的工具上。

核心实现剖析

1. gRPC 协议定义 (gateway.proto)

这是三大组件之间的通信契约,清晰地定义了服务和数据结构。

syntax = "proto3";

package gateway;

option java_package = "com.example.gateway.grpc";
option java_multiple_files = true;

// 服务定义:编排器服务
service Orchestrator {
  // 处理一个请求批次
  rpc ProcessBatch(BatchRequest) returns (BatchResponse);
}

// 单个任务的请求体
message TaskRequest {
  string task_id = 1;
  string payload = 2; // 使用JSON字符串或其他序列化格式
  map<string, string> metadata = 3;
}

// 请求批次
message BatchRequest {
  string batch_id = 1;
  int64 created_timestamp = 2;
  repeated TaskRequest tasks = 3;
}

// 批次处理结果
message BatchResponse {
  string batch_id = 1;
  bool success = 2;
  int32 processed_count = 3;
  string message = 4; // 用于错误信息
}

2. Tornado 接入层实现 (gateway_server.py)

Tornado层代码必须是非阻塞的,并且要有一个优雅的机制来触发批处理。

import asyncio
import logging
import time
import uuid
from collections import deque
from concurrent.futures import ThreadPoolExecutor

import grpc
import tornado.ioloop
import tornado.web
from tornado.options import define, options, parse_command_line

# 导入生成的 gRPC 代码
import gateway_pb2
import gateway_pb2_grpc

# --- 配置 ---
define("port", default=8888, help="run on the given port", type=int)
define("micronaut_grpc_host", default="localhost", help="Micronaut gRPC server host")
define("micronaut_grpc_port", default=50051, help="Micronaut gRPC server port")
define("batch_size", default=100, help="Max tasks in a batch", type=int)
define("batch_timeout", default=1.0, help="Max seconds to wait before flushing a batch", type=float)

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class TaskBuffer:
    """
    线程安全的任务缓冲区,用于暂存请求
    """
    def __init__(self):
        self.buffer = deque()
        self.lock = asyncio.Lock()

    async def add(self, task: gateway_pb2.TaskRequest):
        async with self.lock:
            self.buffer.append(task)

    async def flush(self) -> list[gateway_pb2.TaskRequest]:
        async with self.lock:
            tasks = list(self.buffer)
            self.buffer.clear()
            return tasks
            
    async def size(self) -> int:
        async with self.lock:
            return len(self.buffer)

class BatchProcessor:
    """
    负责将缓冲区中的任务批量发送到 Micronaut 服务
    """
    def __init__(self, buffer: TaskBuffer):
        self.buffer = buffer
        self.grpc_target = f"{options.micronaut_grpc_host}:{options.micronaut_grpc_port}"
        # 使用 aio-grpc channel for async operations
        self.channel = grpc.aio.insecure_channel(self.grpc_target)
        self.stub = gateway_pb2_grpc.OrchestratorStub(self.channel)
        self.last_flush_time = time.time()
        self.is_running = False

    async def start(self):
        self.is_running = True
        logging.info("BatchProcessor started.")
        while self.is_running:
            await self.maybe_flush()
            await asyncio.sleep(0.1) # 检查频率

    def stop(self):
        self.is_running = False
        logging.info("BatchProcessor stopping...")

    async def maybe_flush(self, force: bool = False):
        """
        检查是否满足批处理条件,如果满足则执行
        """
        buffer_size = await self.buffer.size()
        time_since_last_flush = time.time() - self.last_flush_time
        
        # 核心触发条件:达到批次大小,或超时,或强制执行
        if buffer_size > 0 and (force or buffer_size >= options.batch_size or time_since_last_flush >= options.batch_timeout):
            tasks_to_send = await self.buffer.flush()
            if tasks_to_send:
                self.last_flush_time = time.time()
                # 使用 asyncio.create_task 避免阻塞主循环
                asyncio.create_task(self.send_batch(tasks_to_send))

    async def send_batch(self, tasks: list[gateway_pb2.TaskRequest]):
        batch_id = str(uuid.uuid4())
        logging.info(f"Sending batch {batch_id} with {len(tasks)} tasks.")
        
        request = gateway_pb2.BatchRequest(
            batch_id=batch_id,
            created_timestamp=int(time.time()),
            tasks=tasks
        )

        try:
            response = await self.stub.ProcessBatch(request, timeout=5.0)
            if response.success:
                logging.info(f"Batch {batch_id} processed successfully. Count: {response.processed_count}")
            else:
                logging.error(f"Batch {batch_id} processing failed: {response.message}")
        except grpc.aio.AioRpcError as e:
            # 在真实项目中,这里应该有重试逻辑或将失败的任务推送到死信队列
            logging.error(f"gRPC call failed for batch {batch_id}: {e.code()} - {e.details()}")


class MainHandler(tornado.web.RequestHandler):
    """
    接收外部HTTP请求的Handler
    """
    def initialize(self, buffer: TaskBuffer):
        self.buffer = buffer

    async def post(self):
        try:
            # 假设 payload 是 JSON,实际应做更严格的校验
            payload_str = self.request.body.decode('utf-8')
            
            task = gateway_pb2.TaskRequest(
                task_id=str(uuid.uuid4()),
                payload=payload_str,
                metadata={"source_ip": self.request.remote_ip}
            )
            await self.buffer.add(task)
            self.set_status(202) # Accepted
            self.write({"status": "accepted", "task_id": task.task_id})
        except Exception as e:
            self.set_status(500)
            self.write({"status": "error", "message": str(e)})


def make_app(buffer: TaskBuffer):
    return tornado.web.Application([
        (r"/submit", MainHandler, dict(buffer=buffer)),
    ])

async def main():
    parse_command_line()
    
    task_buffer = TaskBuffer()
    app = make_app(task_buffer)
    app.listen(options.port)
    logging.info(f"Tornado server listening on port {options.port}")
    
    batch_processor = BatchProcessor(task_buffer)
    processor_task = asyncio.create_task(batch_processor.start())

    # 优雅地关闭
    shutdown_event = asyncio.Event()
    loop = asyncio.get_running_loop()

    def signal_handler():
        logging.info("Shutdown signal received.")
        shutdown_event.set()

    # for sig in (signal.SIGTERM, signal.SIGINT):
    #    loop.add_signal_handler(sig, signal_handler)
    # Windows不支持 add_signal_handler, 这里简化处理
    
    await shutdown_event.wait()

    batch_processor.stop()
    await processor_task
    logging.info("Flushing remaining tasks before shutdown...")
    await batch_processor.maybe_flush(force=True)
    await asyncio.sleep(1) # 等待最后的gRPC调用完成
    await batch_processor.channel.close()
    logging.info("Server stopped.")


if __name__ == "__main__":
    # tornado.ioloop.IOLoop.current().run_sync(main)
    # 在Python 3.8+,推荐使用 asyncio.run
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Application interrupted.")

关键设计点:

  • TaskBuffer是核心的暂存区,使用asyncio.Lock保证并发安全。
  • BatchProcessor是一个独立的异步循环,它周期性地检查缓冲区状态,解耦了请求接收和批处理发送逻辑。
  • asyncio.create_task(self.send_batch(...))至关重要,它确保了gRPC调用不会阻塞BatchProcessor的主循环,否则整个系统都会被一个慢速的RPC调用卡住。
  • 提供了基本的配置项,可以通过命令行修改,这是生产级应用的基本要求。

3. Micronaut 编排层实现 (OrchestratorService.java)

这是JVM世界的入口,负责稳定性和业务逻辑。

// build.gradle (关键依赖)
/*
dependencies {
    ...
    implementation("io.micronaut.grpc:micronaut-grpc-runtime")
    implementation("io.ray:ray-api:2.6.0") // Ray Java API
    implementation("io.ray:ray-runtime:2.6.0")
    runtimeOnly("io.grpc:grpc-netty")
    ...
}
*/

package com.example.gateway.grpc;

import com.example.gateway.ray.RayTaskExecutor;
import com.example.gateway.ray.SimulationTask;
import io.grpc.stub.StreamObserver;
import io.micronaut.grpc.annotation.GrpcService;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.stream.Collectors;

@GrpcService
@Singleton
public class OrchestratorService extends OrchestratorGrpc.OrchestratorImplBase {

    private static final Logger LOG = LoggerFactory.getLogger(OrchestratorService.class);
    private final RayTaskExecutor rayTaskExecutor;

    // 使用 Micronaut 的依赖注入来获取 Ray 执行器
    public OrchestratorService(RayTaskExecutor rayTaskExecutor) {
        this.rayTaskExecutor = rayTaskExecutor;
    }

    @Override
    public void processBatch(BatchRequest request, StreamObserver<BatchResponse> responseObserver) {
        LOG.info("Received batch {} with {} tasks", request.getBatchId(), request.getTasksCount());

        if (request.getTasksCount() == 0) {
            LOG.warn("Received an empty batch {}. Responding with success.", request.getBatchId());
            BatchResponse response = BatchResponse.newBuilder()
                .setBatchId(request.getBatchId())
                .setSuccess(true)
                .setProcessedCount(0)
                .build();
            responseObserver.onNext(response);
            responseObserver.onCompleted();
            return;
        }

        try {
            // 1. 数据转换: 从 gRPC 对象转换为内部业务对象
            List<SimulationTask> tasks = request.getTasksList().stream()
                .map(grpcTask -> new SimulationTask(grpcTask.getTaskId(), grpcTask.getPayload()))
                .collect(Collectors.toList());

            // 2. 核心逻辑: 将任务列表提交给 Ray 执行器
            // 这是一个异步操作,但为了简化gRPC接口,我们在此处等待结果
            // 在更复杂的场景中,这里可能会返回一个任务ID,客户端稍后查询结果
            boolean success = rayTaskExecutor.executeBatch(tasks);

            // 3. 构建响应
            BatchResponse.Builder responseBuilder = BatchResponse.newBuilder()
                .setBatchId(request.getBatchId())
                .setSuccess(success)
                .setProcessedCount(request.getTasksCount());
            
            if (!success) {
                responseBuilder.setMessage("Partial or total failure during Ray execution. Check logs for details.");
            }

            responseObserver.onNext(responseBuilder.build());
            responseObserver.onCompleted();

        } catch (Exception e) {
            LOG.error("Critical error processing batch " + request.getBatchId(), e);
            BatchResponse response = BatchResponse.newBuilder()
                .setBatchId(request.getBatchId())
                .setSuccess(false)
                .setMessage("Internal server error: " + e.getMessage())
                .build();
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        }
    }
}
// RayTaskExecutor.java (与Ray交互的封装)
package com.example.gateway.ray;

import io.ray.api.ActorHandle;
import io.ray.api.Ray;
import io.ray.api.ObjectRef;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.stream.Collectors;

@Singleton
public class RayTaskExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(RayTaskExecutor.class);

    // 在Micronaut启动时初始化Ray连接
    // 实际项目中,Ray.init() 应通过 @Factory 和生命周期管理来控制
    public RayTaskExecutor() {
        // 连接到已有的Ray集群。如果未启动,会尝试本地启动。
        // System.setProperty("ray.address", "127.0.0.1:6379");
        Ray.init(); 
        LOG.info("Ray driver connected.");
    }

    /**
     * 以批处理方式执行任务
     * @return 如果所有任务都成功提交则返回 true
     */
    public boolean executeBatch(List<SimulationTask> tasks) {
        try {
            // 将整个批次分发给一个远程actor处理,也可以将每个task分发给不同的函数
            // 这里选择为每个任务创建一个远程调用,以展示最大并行度
            List<ObjectRef<String>> results = tasks.stream()
                .map(task -> Ray.task(
                        // Class name, method name
                        "compute_worker.HeavySimulator", "process",
                        task.getTaskId(), 
                        task.getPayload()
                    ).remote()
                )
                .collect(Collectors.toList());
            
            LOG.info("Submitted {} tasks to Ray cluster.", results.size());

            // 等待所有任务完成。设置一个超时来防止无限期阻塞
            // 这里的 get() 会阻塞当前线程,在生产环境中可能需要异步处理
            List<String> finishedResults = Ray.get(results, 60 * 1000); // 60s timeout

            LOG.info("All {} tasks in batch completed.", finishedResults.size());
            // 可以在这里检查结果,例如是否有异常
            return true;

        } catch (Exception e) {
            LOG.error("Failed to execute batch on Ray", e);
            return false;
        }
    }
}
// SimulationTask.java (简单的DTO)
package com.example.gateway.ray;

public class SimulationTask {
    private final String taskId;
    private final String payload;

    public SimulationTask(String taskId, String payload) {
        this.taskId = taskId;
        this.payload = payload;
    }

    public String getTaskId() { return taskId; }
    public String getPayload() { return payload; }
}

关键设计点:

  • 依赖注入: Micronaut的@Singleton和构造函数注入使得管理RayTaskExecutor这样的重量级组件非常清晰。
  • 错误处理: try-catch块捕获了处理过程中的所有异常,确保gRPC调用总能正常返回,不会因为内部错误而挂起。
  • 解耦: RayTaskExecutor将所有与Ray交互的细节封装起来,OrchestratorService只关心业务逻辑。这种分离使得单元测试变得容易。
  • 阻塞与异步: Ray.get()是一个阻塞操作。在当前设计下,gRPC调用会一直等待直到所有Ray任务完成。对于长耗时任务,更好的模式是立即返回一个批处理ID,并提供另一个接口用于查询批处理状态。

4. Ray 计算层 (compute_worker.py)

这是实际执行计算的Python代码,它运行在Ray集群的Worker节点上。

import time
import random
import logging
import json

import ray

logging.basicConfig(level=logging.INFO)

@ray.remote
class HeavySimulator:
    """
    一个Ray Actor,模拟一个需要初始化开销的计算资源
    """
    def __init__(self):
        # 模拟昂贵的初始化过程
        logging.info("Initializing HeavySimulator actor...")
        time.sleep(2)
        self.model_loaded = True
        logging.info("HeavySimulator actor initialized.")

    def process(self, task_id: str, payload: str) -> str:
        """
        处理单个任务
        """
        if not self.model_loaded:
            raise RuntimeError("Model is not loaded!")
            
        logging.info(f"Processing task {task_id}...")
        try:
            # 模拟CPU密集型计算
            data = json.loads(payload)
            complexity = data.get("complexity", 0.1)
            time.sleep(random.uniform(0.1, 0.5) * (1 + complexity))

            result = {"status": "success", "task_id": task_id, "result_value": random.random()}
            logging.info(f"Task {task_id} finished.")
            return json.dumps(result)
        except Exception as e:
            logging.error(f"Error processing task {task_id}: {e}")
            return json.dumps({"status": "failed", "task_id": task_id, "error": str(e)})

关键设计点:

  • Actor vs. Task: 这里使用了@ray.remote作用于一个类,创建了一个Actor。Actor是有状态的,__init__方法只会在Actor创建时调用一次。这完美地模拟了我们“初始化开销大”的场景。如果每次调用都是无状态的计算,使用@ray.remote修饰一个函数(创建Ray Task)会更合适。
  • 跨语言调用: Micronaut中的Ray.task("compute_worker.HeavySimulator", "process", ...)正是通过类名和方法名的字符串来定位并调用这个Python方法的。Ray的跨语言API在底层处理了序列化和通信。

架构的扩展性与局限性

该架构具备良好的横向扩展能力。Tornado层、Micronaut层都可以作为无状态服务进行多实例部署,并通过负载均衡器接收流量。Ray集群本身就是为分布式扩展而设计的。

然而,这个方案并非银弹,它存在一些固有的复杂性和局限性:

  1. 运维复杂度: 维护一个涉及Python/Tornado、JVM/Micronaut和Ray三种核心组件的系统,对监控、日志、部署流水线和团队技能栈都提出了更高的要求。跨语言的链路追踪和问题定位会变得更具挑战性。
  2. Tornado内存缓冲区的风险: 当前实现中,请求被缓存在Tornado进程的内存里。如果某个Tornado实例崩溃,这部分尚未发送的数据将会丢失。对于要求更高数据可靠性的场景,Tornado和Micronaut之间需要引入一个持久化的消息队列(如Kafka或Redis Streams),但这会增加系统延迟和复杂性。
  3. 背压机制的缺失: 当前的批处理触发机制(大小或超时)是单向的。如果后端Micronaut或Ray集群处理能力跟不上,请求会在Tornado的内存中无限堆积,最终导致内存耗尽。一个生产级系统必须实现背压(Backpressure),例如,当Micronaut感知到处理延迟时,可以通知Tornado层减缓接收新请求或降低批处理频率。
  4. 序列化开销: 数据在 HTTP -> Python Object -> Protobuf -> Java Object -> Ray 的链路中经历了多次序列化和反序列化。尽管gRPC和Ray的序列化性能很高,但这部分开销是客观存在的,对于延迟极其敏感的应用需要仔细评估。

  目录