一个看似合理的需求摆在面前:用户希望在一个内部数据平台上,通过一个简洁的前端界面,对存储在数据湖(基于 Apache Iceberg)中的数十亿条记录进行即席查询、聚合与分析,并期望获得类似数据库应用的交互体验。这里的核心矛盾显而易见:前端 UI 组件库追求的是亚秒级的响应,而 Iceberg 上的大规模分析查询(通常通过 Trino 或 Spark 执行)本质上是批处理任务,其执行时间从数十秒到数十分钟不等。
直接将两者连接起来是行不通的。任何试图构建一个同步的 RESTful API——即 Express.js 接收请求,立即执行一个 Trino 查询,然后等待结果返回给前端——的架构,都会在生产环境中迅速崩溃。HTTP 连接会因超时而中断,API 服务器的线程池会被长时间运行的查询占满,导致服务对其他请求完全无响应,最终引发雪崩。
方案 A:同步代理模式的必然失败
我们首先排除最直观但完全错误的方案:同步代理。
// anti-pattern-do-not-use.js
// 这是一个典型的错误示范,用于说明为何同步模式不可行
const express = require('express');
const { Trino } = require('trino-client'); // 假设存在一个 Trino 客户端
const app = express();
app.use(express.json());
const trinoClient = Trino.create({
server: 'http://trino-coordinator:8080',
catalog: 'iceberg',
schema: 'default',
user: 'api-service'
});
// 致命缺陷:这是一个长时间阻塞的同步 API
app.post('/api/v1/query/sync', async (req, res) => {
const { sqlQuery } = req.body;
if (!sqlQuery) {
return res.status(400).json({ error: 'sqlQuery is required' });
}
try {
console.log(`[${new Date().toISOString()}] Executing synchronous query: ${sqlQuery}`);
// 这里的 await 将会阻塞整个请求处理流程,可能长达数分钟
const result = await trinoClient.query(sqlQuery);
console.log(`[${new Date().toISOString()}] Query finished. Returning ${result.rows.length} rows.`);
// 如果结果集巨大,这里的序列化和网络传输也会成为瓶颈
res.status(200).json({ data: result.rows });
} catch (error) {
console.error(`[${new Date().toISOString()}] Query failed: ${error.message}`);
res.status(500).json({ error: 'Failed to execute query', details: error.message });
}
});
const port = 3000;
app.listen(port, () => {
console.log(`Synchronous API server (anti-pattern) listening on port ${port}`);
});
这个方案的缺陷是架构层面的:
- HTTP 超时: 大多数 Web 服务器和负载均衡器(如 Nginx)的默认超时时间是 60-120 秒。任何超过此时间的查询都会导致客户端收到 504 Gateway Timeout 错误,即使用户愿意等待。
- 资源耗尽: Node.js 的事件循环模型虽然善于处理高并发 I/O,但对于长时间运行的 CPU 密集型任务(或在此场景下,等待一个远程的、长时间运行的计算任务)同样无力。每个被阻塞的请求都会持有一个连接和相应的内存资源,很快就会耗尽服务器能力。
- 糟糕的用户体验: 前端 UI 将会长时间冻结,等待一个永远不会及时到达的响应。用户无法取消查询,也无法获得任何进度反馈。
方案 B:基于任务队列的异步查询架构
一个务实的架构必须解耦“请求提交”与“结果获取”这两个环节。我们将 API 的角色从一个“查询执行者”转变为一个“查询协调者”。
整个流程将被拆分为三个主要步骤:
- 提交查询: 客户端通过 API 提交一个查询任务。API 验证请求,生成一个唯一的
jobId,将任务推送到一个消息队列,并立即向客户端返回202 Accepted响应,附带该jobId。 - 执行查询: 一个或多个独立的 Worker 进程监听消息队列。当接收到新任务时,它们负责执行耗时的 Trino/Spark 查询。查询完成后,将结果存储到一个临时的、高速的存储介质中(如 Redis 或 S3),并更新任务状态。
- 获取结果: 客户端使用
jobId轮询一个状态接口,获取任务的实时状态(PENDING,RUNNING,SUCCEEDED,FAILED)。一旦任务成功,客户端就可以通过另一个接口,分页地拉取存储在临时介质中的结果集。
sequenceDiagram
participant UI as UI 组件库
participant API as Express.js API
participant MQ as 消息队列 (e.g., Redis Streams)
participant Worker as 查询执行 Worker
participant Trino as Trino/Iceberg
participant Cache as 结果缓存 (e.g., Redis)
UI->>+API: POST /api/v2/jobs (提交查询 SQL)
API->>API: 1. 验证请求, 生成 jobId
API->>+MQ: 2. 将 {jobId, sql} 推入队列
API-->>-UI: 3. 返回 202 Accepted { jobId }
Note right of Worker: Worker 持续监听队列
MQ-->>+Worker: 4. 消费任务 {jobId, sql}
Worker->>Worker: 5. 更新任务状态为 RUNNING
Worker->>+Trino: 6. 执行长时间的 Iceberg 查询
Trino-->>-Worker: 7. 返回查询结果
Worker->>+Cache: 8. 将结果分页存入 Redis (以 jobId 为 key)
Worker->>Worker: 9. 更新任务状态为 SUCCEEDED/FAILED
deactivate Worker
loop 轮询状态
UI->>+API: GET /api/v2/jobs/{jobId}/status
API-->>-UI: 返回 { status: 'RUNNING' }
end
UI->>+API: GET /api/v2/jobs/{jobId}/status
API-->>-UI: 返回 { status: 'SUCCEEDED' }
UI->>+API: GET /api/v2/jobs/{jobId}/results?page=1&limit=100
API->>+Cache: 从 Redis 读取分页结果
Cache-->>-API: 返回结果数据
API-->>-UI: 返回分页后的 JSON 数据
核心实现概览
我们将使用 Express.js 构建 API 服务,Redis 同时作为消息队列(使用 Streams)和结果缓存。Worker 可以是一个独立的 Node.js 进程。
1. Express.js API (Query Coordinator)
这个服务只负责接收任务、管理状态和提供结果,它本身不执行任何慢查询。
目录结构:
/iceberg-query-api
├── src/
│ ├── api/
│ │ └── jobs.js // Express 路由和控制器
│ ├── services/
│ │ ├── redisClient.js // Redis 连接和操作封装
│ │ └── jobManager.js // 任务管理逻辑
│ ├── utils/
│ │ └── logger.js // 日志配置
│ ├── app.js // Express 应用主文件
│ └── server.js // 服务器启动文件
├── workers/
│ └── queryExecutor.js // 查询执行 Worker
└── package.json
src/api/jobs.js - API 路由实现
// src/api/jobs.js
const express = require('express');
const { submitJob, getJobStatus, getJobResults } = require('../services/jobManager');
const logger = require('../utils/logger');
const router = express.Router();
// POST /api/v2/jobs - 提交新查询任务
router.post('/', async (req, res, next) => {
const { sqlQuery, user } = req.body;
if (!sqlQuery || typeof sqlQuery !== 'string') {
return res.status(400).json({ error: 'A non-empty "sqlQuery" string is required.' });
}
try {
// jobManager 负责处理任务提交的复杂性
const { jobId } = await submitJob({ sqlQuery, requestedBy: user || 'anonymous' });
logger.info(`Job submitted successfully with ID: ${jobId}`);
// 立即返回 202 Accepted,这是异步 API 的标准实践
res.status(202).json({
jobId,
statusUrl: `/api/v2/jobs/${jobId}/status`,
resultsUrl: `/api/v2/jobs/${jobId}/results`
});
} catch (error) {
next(error); // 传递给全局错误处理中间件
}
});
// GET /api/v2/jobs/:jobId/status - 获取任务状态
router.get('/:jobId/status', async (req, res, next) => {
try {
const { jobId } = req.params;
const status = await getJobStatus(jobId);
if (!status) {
return res.status(404).json({ error: 'Job not found.' });
}
res.status(200).json(status);
} catch (error) {
next(error);
}
});
// GET /api/v2/jobs/:jobId/results - 获取分页结果
router.get('/:jobId/results', async (req, res, next) => {
try {
const { jobId } = req.params;
const page = parseInt(req.query.page, 10) || 1;
const limit = parseInt(req.query.limit, 10) || 100;
// 在生产环境中,limit 需要有最大值限制
if (limit > 1000) {
return res.status(400).json({ error: 'Page limit cannot exceed 1000.' });
}
const results = await getJobResults(jobId, page, limit);
if (!results) {
return res.status(404).json({ error: 'Job not found or results have expired.' });
}
// 如果任务尚未完成,应提示用户
if (results.status !== 'SUCCEEDED') {
return res.status(200).json({
message: `Job is not yet complete. Current status: ${results.status}`,
status: results.status
});
}
res.status(200).json(results);
} catch (error) {
next(error);
}
});
module.exports = router;
src/services/jobManager.js - 任务管理服务
// src/services/jobManager.js
const { v4: uuidv4 } = require('uuid');
const redisClient = require('./redisClient');
const logger = require('../utils/logger');
const JOB_STREAM_KEY = 'iceberg_query_jobs'; // Redis Stream for jobs
const JOB_HASH_PREFIX = 'job:'; // Prefix for job status hashes
async function submitJob(jobData) {
const jobId = uuidv4();
const job = {
id: jobId,
sql: jobData.sqlQuery,
user: jobData.requestedBy,
status: 'PENDING',
submittedAt: new Date().toISOString(),
};
// 使用 MULTI 确保原子性
const multi = redisClient.multi();
// 1. 将任务状态存入 Hash
multi.hSet(`${JOB_HASH_PREFIX}${jobId}`, job);
// 2. 将任务推送到 Stream,供 worker 消费
multi.xAdd(JOB_STREAM_KEY, '*', { payload: JSON.stringify({ jobId, sql: job.sql }) });
await multi.exec();
logger.info({ message: 'Job details stored in Redis hash', jobId });
return { jobId };
}
async function getJobStatus(jobId) {
const jobStatus = await redisClient.hGetAll(`${JOB_HASH_PREFIX}${jobId}`);
if (Object.keys(jobStatus).length === 0) {
return null;
}
// 对数字类型进行转换
if (jobStatus.totalRows) jobStatus.totalRows = parseInt(jobStatus.totalRows, 10);
if (jobStatus.totalPages) jobStatus.totalPages = parseInt(jobStatus.totalPages, 10);
return jobStatus;
}
async function getJobResults(jobId, page, limit) {
const jobStatus = await getJobStatus(jobId);
if (!jobStatus) {
return null;
}
if (jobStatus.status !== 'SUCCEEDED') {
return { status: jobStatus.status };
}
const start = (page - 1) * limit;
const end = start + limit - 1;
// 结果存储在以 jobId 命名的 List 中
const resultRows = await redisClient.lRange(`results:${jobId}`, start, end);
const data = resultRows.map(row => JSON.parse(row));
return {
status: 'SUCCEEDED',
metadata: {
jobId,
totalRows: jobStatus.totalRows,
totalPages: jobStatus.totalPages,
currentPage: page,
pageSize: data.length
},
data
};
}
module.exports = { submitJob, getJobStatus, getJobResults };
2. Query Executor Worker
这是一个独立的后台进程,它才是真正与 Trino 交互的组件。
workers/queryExecutor.js
// workers/queryExecutor.js
const { createClient } = require('redis');
const { Trino } = require('trino-client'); // 假设的 Trino 客户端
const logger = require('../src/utils/logger');
const JOB_STREAM_KEY = 'iceberg_query_jobs';
const JOB_HASH_PREFIX = 'job:';
const RESULTS_LIST_PREFIX = 'results:';
const RESULTS_TTL = 3600; // 结果缓存1小时
const PAGE_SIZE = 1000; // Worker 将结果分片写入 Redis
// worker 也应该有自己的 Redis 客户端实例
const redisClient = createClient({ url: process.env.REDIS_URL || 'redis://localhost:6379' });
const trinoClient = Trino.create({ /* ... Trino config ... */ });
async function updateJobStatus(jobId, updates) {
await redisClient.hSet(`${JOB_HASH_PREFIX}${jobId}`, updates);
logger.info({ message: 'Job status updated', jobId, ...updates });
}
async function processJob(jobId, sql) {
try {
await updateJobStatus(jobId, { status: 'RUNNING', startedAt: new Date().toISOString() });
// 关键执行步骤
const queryIterator = await trinoClient.queryIterator(sql);
let rowCount = 0;
let pageBuffer = [];
const resultsKey = `${RESULTS_LIST_PREFIX}${jobId}`;
for await (const row of queryIterator) {
rowCount++;
pageBuffer.push(JSON.stringify(row));
// 每 PAGE_SIZE 行写入一次 Redis,避免内存溢出
if (pageBuffer.length >= PAGE_SIZE) {
await redisClient.rPush(resultsKey, pageBuffer);
pageBuffer = [];
logger.debug(`Wrote page to Redis for job ${jobId}`);
}
}
// 写入最后一页的剩余数据
if (pageBuffer.length > 0) {
await redisClient.rPush(resultsKey, pageBuffer);
}
// 设置结果的过期时间
await redisClient.expire(resultsKey, RESULTS_TTL);
await updateJobStatus(jobId, {
status: 'SUCCEEDED',
finishedAt: new Date().toISOString(),
totalRows: rowCount,
totalPages: Math.ceil(rowCount / 100) // 这里的 100 是 API 的默认 limit
});
} catch (error) {
logger.error({ message: 'Job failed during execution', jobId, error: error.message, stack: error.stack });
await updateJobStatus(jobId, {
status: 'FAILED',
finishedAt: new Date().toISOString(),
error: error.message.substring(0, 1000) // 限制错误信息长度
});
}
}
async function main() {
await redisClient.connect();
logger.info('Query executor worker started, waiting for jobs...');
// 使用阻塞式读取 (XREAD) 从 Stream 中消费任务
// 使用消费者组可以实现更好的容错和扩展性,这里为简化使用单个消费者
let lastId = '0-0'; // 从头开始消费
while (true) {
try {
const response = await redisClient.xRead(
{ key: JOB_STREAM_KEY, id: lastId },
{ BLOCK: 5000, COUNT: 1 } // 阻塞5秒
);
if (response) {
const [stream] = response;
const [message] = stream.messages;
lastId = message.id;
const { jobId, sql } = JSON.parse(message.message.payload);
logger.info({ message: 'Received new job', jobId });
// 不使用 await,让 job 在后台执行,以便 worker 可以立即拉取下一个
processJob(jobId, sql);
}
} catch (err) {
logger.error(`Error reading from stream: ${err.message}`);
await new Promise(resolve => setTimeout(resolve, 5000)); // 发生错误时等待后重试
}
}
}
main().catch(err => logger.error({ message: 'Worker crashed', error: err.message, stack: err.stack }));
3. 前端 UI 组件库的消费策略
前端的交互逻辑同样需要是异步的。一个自定义 React Hook 是封装这种复杂性的好方法。
// hooks/useIcebergQuery.js (示例)
import { useState, useEffect, useCallback } from 'react';
const API_BASE_URL = '/api/v2';
export function useIcebergQuery() {
const [jobId, setJobId] = useState(null);
const [status, setStatus] = useState('IDLE'); // IDLE, SUBMITTED, RUNNING, SUCCEEDED, FAILED
const [error, setError] = useState(null);
const [results, setResults] = useState(null);
const [progress, setProgress] = useState(null); // 可以从 status API 获取更详细的进度
const pollStatus = useCallback(async (currentJobId) => {
try {
const response = await fetch(`${API_BASE_URL}/jobs/${currentJobId}/status`);
if (!response.ok) throw new Error('Failed to fetch status');
const data = await response.json();
setStatus(data.status);
if (data.status === 'SUCCEEDED' || data.status === 'FAILED') {
if (data.status === 'FAILED') setError(data.error);
return; // 停止轮询
}
// 继续轮询
setTimeout(() => pollStatus(currentJobId), 3000); // 轮询间隔可以动态调整
} catch (err) {
setError(err.message);
setStatus('FAILED');
}
}, []);
const executeQuery = useCallback(async (sqlQuery) => {
setJobId(null);
setStatus('SUBMITTING');
setError(null);
setResults(null);
try {
const response = await fetch(`${API_BASE_URL}/jobs`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ sqlQuery }),
});
if (response.status !== 202) throw new Error('Failed to submit job');
const { jobId: newJobId } = await response.json();
setJobId(newJobId);
setStatus('PENDING');
// 开始轮询
pollStatus(newJobId);
} catch (err) {
setError(err.message);
setStatus('FAILED');
}
}, [pollStatus]);
const fetchResults = useCallback(async (page = 1, limit = 100) => {
if (!jobId || status !== 'SUCCEEDED') return;
try {
const response = await fetch(`${API_BASE_URL}/jobs/${jobId}/results?page=${page}&limit=${limit}`);
if (!response.ok) throw new Error('Failed to fetch results');
const data = await response.json();
setResults(data);
} catch (err) {
setError(err.message);
setStatus('FAILED');
}
}, [jobId, status]);
return { executeQuery, status, error, results, jobId, fetchResults };
}
一个UI组件(例如,一个复杂的数据表格)可以使用这个 Hook 来管理整个数据加载生命周期。当用户点击“查询”时,调用 executeQuery。UI 根据 status 变量显示加载指示器或进度条。当 status 变为 SUCCEEDED 时,调用 fetchResults 来获取第一页数据。表格组件的翻页、排序等操作都将调用 fetchResults 获取新数据。对于海量结果的渲染,必须配合虚拟列表(Virtual Scrolling)技术,否则 DOM 元素过多会直接导致浏览器崩溃。
架构的扩展性与局限性
这个架构解决了核心的同步阻塞问题,具备良好的水平扩展能力:可以通过增加 API 实例来应对更高的请求提交并发,通过增加 Query Worker 实例来提高查询处理的吞吐量。
然而,它并非没有局限性:
- 轮询的开销: 当并发查询量巨大时,客户端的轮询会对 API 服务器造成不小的压力。一个更优的方案是使用 WebSocket 或 Server-Sent Events (SSE) 来实现服务器向客户端的状态推送。
- 结果缓存的限制: Redis 作为结果缓存,不适合存储 GigaBytes 级别的巨大结果集。对于超过百万行级别的查询,Worker 应将结果写入 S3 或其他对象存储,并在 Redis 中仅存储元数据和 S3 的预签名 URL。API 则将该 URL 返回给客户端。
- 查询治理的缺失: 当前架构缺少对查询的治理。在真实项目中,必须增加机制来限制单个用户可提交的查询并发数、查询的复杂度(例如,禁止
SELECT *不带LIMIT)、以及查询消耗的计算资源,以防止恶意或低效的查询拖垮整个 Trino 集群。 - 容错与重试: Worker 进程可能崩溃。使用 Redis Stream 的消费者组(Consumer Groups)可以确保即使一个 Worker 失败,其正在处理的任务也能被另一个 Worker 接管,实现至少一次(at-least-once)的处理语义。
这个异步任务架构虽然比简单的同步代理复杂得多,但它是唯一能在生产环境中稳定支持 UI 与大数据后台进行交互的务实选择。它将前端的即时性需求与后端的批处理特性通过解耦和状态管理有效地粘合在一起。