我们需要一个机制,将线上 PostgreSQL 数据库的变更数据(CDC)近乎实时地同步到数据湖中,用于后续的分析和模型训练。传统的 T+1 式 ETL 批处理延迟太高,无法满足业务对数据新鲜度的要求。而引入 Flink 或 Spark Streaming 这样重型的流处理框架,对于当前阶段的团队规模和业务复杂度而言,运维成本和资源开销又显得过高。我们的目标是构建一个轻量级、事件驱动、成本可控且具备良好弹性的数据入湖管道。
这个场景下,核心的架构挑战在于如何以低延迟、高效率的方式处理源源不断的、细粒度的数据库行级变更(insert, update, delete),并将其可靠地写入到支持事务的数据湖表格式中。这就引出了两个关键的技术选型问题:
- 数据湖表格式选择: Apache Hudi 还是 Apache Iceberg?两者都提供了 ACID 事务、版本控制和模式演进能力,但其底层设计哲学和对实时更新场景的优化侧重点有所不同。
- 计算引擎选择: 如何在不引入重量级流处理框架的前提下,消费上游的 CDC 事件并执行写入操作?Serverless 架构,特别是 OpenFaaS,因其事件驱动的本质和按需伸缩的特性,成为一个有吸引力的候选方案。
本文将记录在这一背景下,我们对 Hudi 和 Iceberg 进行的深度对比分析,最终的技术选型决策,以及如何利用 OpenFaaS 将这套 Serverless CDC 入湖架构付诸实践的完整过程。
# 架构决策:Hudi vs. Iceberg
在将 CDC 数据流写入数据湖的场景中,最核心的操作是 Upsert
(Update + Insert)和 Delete
。这要求表格式能够高效地处理行级变更。Hudi 和 Iceberg 都支持这一点,但实现机制和带来的影响截然不同。
方案A:Apache Hudi 的视角
Hudi 的设计初衷就是为了解决数据湖中的增量数据处理,特别是变更数据的摄取。它提供了两种核心的表类型:写时复制(Copy on Write, CoW)和读时合并(Merge on Read, MoR)。
- CoW (Copy on Write): 每次更新都会重写包含变更数据的整个 Parquet 文件。这种方式下,写操作的成本较高,但读操作非常快,因为它总是读取最新、最干净的数据文件,无需任何合并操作。
- MoR (Merge on Read): 更新操作被写入到基于 Avro 格式的增量日志文件(delta logs)中,而不是直接重写基准的 Parquet 文件。写操作非常快,延迟极低。但在读取时,查询引擎需要实时地将基准文件和对应的增量日志文件进行合并,才能得到最新的数据视图。
对于我们的高频 CDC 场景:
优势:
- 为更新而生: Hudi 的 MoR 表类型天然契合 CDC 场景。它可以以非常低的延迟将大量的、细粒度的变更写入 delta log,避免了频繁重写大型 Parquet 文件带来的巨大 I/O 开销。
- 写入性能: MoR 的写入性能是其最大的杀手锏。在 Serverless 函数这种可能存在执行时间限制的环境中,快速完成写入操作至关重要。
- 主键与索引: Hudi 内建了对主键的强支持,并且提供了索引机制(如 Bloom Index, Simple Index)来加速
Upsert
过程中的记录定位,这对于处理大量的update
操作非常高效。
劣势:
- 读取复杂性: MoR 表在读取时需要合并,这对查询引擎提出了要求,并可能引入读取延迟。虽然 Hudi 提供了读优化视图(只读基准文件)和实时视图(合并后读取),但这增加了数据消费方的理解成本。
- 小文件问题: MoR 模式会产生大量的 delta log 小文件。Hudi 依赖一个称为 Compaction(合并)的异步进程来将这些日志文件合并到新的基准文件中。这个 Compaction 过程需要独立调度和监控,增加了运维的复杂性。
- 生态耦合: 虽然 Hudi 提供了多种客户端,但其最成熟的功能和最佳性能通常与 Spark 紧密绑定。在非 Spark 环境中使用,可能会遇到功能支持不完整或文档较少的问题。
方案B:Apache Iceberg 的视角
Iceberg 的核心设计哲学是提供一个开放、可靠的表格式,它将表的元数据(schema, partitioning, snapshots)与数据文件本身完全解耦。它的事务是通过对元数据指针进行原子性交换来实现的。
- 行级更新: Iceberg v2 格式引入了行级更新和删除的能力。其实现方式类似于 Hudi 的 CoW,即通过识别需要更新的行所在的数据文件,读取这些文件,在内存中应用变更,然后写回新的数据文件。同时,它会生成一个删除文件(delete file)来标记旧数据文件中应被忽略的行。
对于我们的高频 CDC 场景:
优势:
- 架构简洁: Iceberg 的元数据设计非常清晰,其快照(snapshot)机制使得时间旅行和版本回滚等操作直观且可靠。它将计算和存储解耦得非常彻底,不依赖任何特定的计算引擎。
- 模式演进: Iceberg 提供了非常强大且安全的模式演进(schema evolution)保证,这对于上游数据库表结构可能发生变化的场景至关重要。
- 生态广泛: 由于其开放和解耦的设计,Iceberg 正在被越来越多的查询引擎(Trino, Spark, Flink, Dremio 等)原生支持,生态系统发展迅速。
劣势:
- 写入放大: 对于高频的单行
update
,Iceberg 的MERGE INTO
操作本质上仍然是 CoW。这意味着即使只更新一行数据,也可能需要重写整个数据文件(通常是几十到几百MB的 Parquet 文件)。这种写放大会导致严重的性能瓶颈和成本增加。 - 不适合微批次: 在 Serverless 函数中,我们通常处理的是一个微批次(micro-batch)的 CDC 事件。如果每个批次都触发对多个数据文件的重写,其效率会非常低下。
- 小文件问题: 虽然 Iceberg 也有自己的
compact
机制来合并数据文件和清理元数据,但在高频写入场景下,同样面临小文件和元数据文件膨胀的风险。
- 写入放大: 对于高频的单行
最终选择与理由
在真实项目中,技术选型很少是“哪个更好”,而是“哪个更适合当前的问题”。
- Iceberg 的架构设计无疑更优雅、更具通用性。如果我们的场景是每天或每小时进行一次较大批量的合并,Iceberg 会是绝佳选择。
- 然而,我们面对的核心痛点是高频、低延迟、行级别的变更。在这个特定的约束下,Hudi 的 MoR 模式所提供的“快速写入增量日志,异步合并”机制,在架构上与问题域的匹配度更高。它将写入操作的延迟与文件合并的 I/O 开销进行了解耦,这正是我们所需要的。
因此,我们决定选择 Apache Hudi (MoR) 作为我们数据湖的表格式。我们接受其带来的 Compaction 运维成本,以换取关键的写入性能和低延迟。
# 核心实现:基于 OpenFaaS 的 Serverless 入湖函数
选定 Hudi 后,下一步是实现消费 Kafka 中 Debezium CDC 事件并写入 Hudi 表的 OpenFaaS 函数。
整体架构
graph TD A[PostgreSQL] -- Debezium Connector --> B(Kafka Topic: postgres.public.orders) B -- OpenFaaS Kafka Connector --> C{OpenFaaS Function: cdc-hudi-ingestor} C -- Python Hudi Client --> D[S3/MinIO: Hudi Table] E[Spark/Trino/etc.] -- Query --> D subgraph "Data Source" A end subgraph "Event Stream" B end subgraph "Serverless Ingestion Layer" C end subgraph "Data Lake Storage" D end subgraph "Query Layer" E end
这个流程的关键点在于 cdc-hudi-ingestor
函数。它必须高效地处理从 Kafka Connector 传入的一批 Debezium 事件,将其转换为 Hudi 需要的格式,并执行 upsert
操作。
环境准备:docker-compose.yml
为了保证方案的可复现性,我们使用 Docker Compose 搭建了完整的本地开发环境,包括 PostgreSQL、Kafka、Debezium Connect、MinIO (S3 模拟) 和 OpenFaaS。
# docker-compose.yml
# 注意: 这是一个简化的配置,生产环境需要更详细的设置
version: '3.7'
services:
# ... (Zookeeper, Kafka, Postgres, MinIO 服务定义) ...
connect:
image: debezium/connect:2.4
ports:
- 8083:8083
environment:
BOOTSTRAP_SERVERS: 'kafka:9092'
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: my_connect_configs
OFFSET_STORAGE_TOPIC: my_connect_offsets
STATUS_STORAGE_TOPIC: my_connect_statuses
depends_on:
- kafka
- postgres
# ... (OpenFaaS 服务定义, 参考官方文档) ...
Debezium 连接器配置
我们通过 Debezium Connect 的 REST API 创建一个 PostgreSQL 连接器,用于捕获 orders
表的变更。
// pg-orders-connector.json
{
"name": "pg-orders-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "testdb",
"database.server.name": "postgres",
"table.include.list": "public.orders",
"plugin.name": "pgoutput",
"topic.prefix": "postgres",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"heartbeat.interval.ms": "5000"
}
}
通过 curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @pg-orders-connector.json
命令创建连接器。
OpenFaaS 函数实现
这是整个方案的核心。我们使用 Python 编写函数,因为它在数据处理领域有强大的生态。我们将函数打包为 Docker 镜像,并部署到 OpenFaaS。
1. 项目结构
cdc-hudi-ingestor/
├── handler.py # 核心处理逻辑
├── requirements.txt # Python 依赖
├── Dockerfile # Docker 镜像定义
└── stack.yml # OpenFaaS 部署描述文件
2. requirements.txt
apache-hudi==0.14.0
pandas==2.0.3
pyarrow==14.0.1
boto3==1.28.79
fsspec==2023.10.0
s3fs==2023.10.0
3. handler.py
这里的代码是生产级的,包含了配置管理、错误处理、日志记录和关键逻辑的详细注释。
import os
import json
import logging
import pandas as pd
from typing import List, Dict, Any, Optional
from hudi.utilities.deltastreamer import SparkSubmit
from hudi.utilities.sources import AvroDFSSource, JsonDFSSource, DFSSource
from hudi.utilities.transformations import DefautTransformer
from hudi.config import HudiWriteConfig
from hudi.table import HudiTable
from pyspark.sql import SparkSession
# ==================== 配置管理 ====================
# 从环境变量中读取配置,这是云原生应用的最佳实践
# 避免将敏感信息硬编码在代码中
S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL", "http://minio:9000")
S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY", "minioadmin")
S3_SECRET_KEY = os.getenv("S3_SECRET_KEY", "minioadmin")
HUDI_TABLE_NAME = os.getenv("HUDI_TABLE_NAME", "orders_hudi_mor")
HUDI_TABLE_PATH = os.getenv("HUDI_TABLE_PATH", f"s3a://datalake/{HUDI_TABLE_NAME}")
# Hudi 表的核心配置
# RECORDKEY_FIELD: 记录的唯一标识符,对应数据库主键
# PRECOMBINE_FIELD: 在多条记录具有相同主键时,用于决定哪条记录最新的字段,通常是更新时间戳
# PARTITIONPATH_FIELD: 分区字段,对于数据组织和查询性能至关重要
HUDI_RECORD_KEY = "order_id"
HUDI_PRECOMBINE_KEY = "updated_at"
HUDI_PARTITION_PATH = "order_date"
# ==================== 日志设置 ====================
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ==================== Spark Session 初始化 ====================
# 在 Serverless 环境中,Spark Session 的初始化是一个昂贵的操作。
# 在真实生产环境中,可能需要考虑预热或保持函数实例活跃的策略来缓解冷启动问题。
# 这里我们配置 S3 连接信息,使其能与 MinIO 通信。
# jar包需要预先下载或者在Dockerfile中包含进来
spark = (
SparkSession.builder.appName(f"HudiIngestion-{HUDI_TABLE_NAME}")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT_URL)
.config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
.config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
.config("spark.hadoop.fs.s3a.path.style.access", "true")
.getOrCreate()
)
logger.info("Spark Session initialized successfully.")
def parse_debezium_event(event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
解析单条 Debezium CDC 事件。
Debezium 的消息结构相对复杂,我们需要从中提取出操作类型和数据本身。
"""
try:
payload = event.get("payload")
if not payload:
return None
op = payload.get("op")
data = payload.get("after") if op != "d" else payload.get("before")
if not data:
return None
# 如果是删除操作,我们需要添加一个标记字段,以便在写入 Hudi 时处理
if op == "d":
data["_hoodie_is_deleted"] = True
else:
data["_hoodie_is_deleted"] = False
# 简单的数据类型清洗,生产环境可能需要更复杂的转换逻辑
data['order_total'] = float(data['order_total'])
data['updated_at'] = pd.to_datetime(data['updated_at'], unit='ms').isoformat()
# 从 updated_at 字段中派生出分区字段 order_date
data['order_date'] = pd.to_datetime(data['updated_at']).strftime('%Y-%m-%d')
return data
except (KeyError, TypeError, json.JSONDecodeError) as e:
logger.error(f"Failed to parse Debezium event: {event}. Error: {e}")
return None
def handle(req):
"""
OpenFaaS 函数入口。
`req` 是从 Kafka Connector 传入的原始请求体,通常是一个包含多条事件的 JSON 数组字符串。
"""
try:
events_str = req
events = json.loads(events_str)
if not isinstance(events, list):
logger.warning(f"Received non-list payload: {type(events)}")
return "Payload must be a list of events.", 400
logger.info(f"Received a batch of {len(events)} events.")
# ==================== 数据转换 ====================
parsed_records = [parse_debezium_event(e) for e in events]
valid_records = [r for r in parsed_records if r is not None]
if not valid_records:
logger.info("No valid records to process in this batch.")
return "No valid records processed.", 200
df = pd.DataFrame(valid_records)
spark_df = spark.createDataFrame(df)
# ==================== Hudi 写入配置 ====================
hudi_options = {
'hoodie.table.name': HUDI_TABLE_NAME,
'hoodie.datasource.write.recordkey.field': HUDI_RECORD_KEY,
'hoodie.datasource.write.precombine.field': HUDI_PRECOMBINE_KEY,
'hoodie.datasource.write.partitionpath.field': HUDI_PARTITION_PATH,
'hoodie.datasource.write.table.name': HUDI_TABLE_NAME,
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
'hoodie.upsert.shuffle.parallelism': 2, # 根据数据量和集群规模调整
'hoodie.insert.shuffle.parallelism': 2,
# 对于删除操作的支持
'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.EmptyHoodieRecordPayload'
}
# ==================== 执行写入 ====================
logger.info(f"Writing {spark_df.count()} records to Hudi table {HUDI_TABLE_NAME}...")
spark_df.write.format("hudi"). \
options(**hudi_options). \
mode("append"). \
save(HUDI_TABLE_PATH)
logger.info("Successfully wrote batch to Hudi table.")
return f"Successfully processed {len(valid_records)} records.", 200
except Exception as e:
logger.exception("An unhandled error occurred in the function.")
# 在 Serverless 环境中,明确地返回 500 错误码有助于触发重试或告警机制
return f"Internal Server Error: {str(e)}", 500
4. stack.yml
此文件定义了如何将我们的函数部署到 OpenFaaS,关键在于配置 Kafka 触发器。
version: 1.0
provider:
name: openfaas
gateway: http://127.0.0.1:8080
functions:
cdc-hudi-ingestor:
lang: python3-http
handler: ./cdc-hudi-ingestor
image: your-docker-hub-username/cdc-hudi-ingestor:latest
environment:
S3_ENDPOINT_URL: "http://minio.openfaas-fn.svc.cluster.local:9000"
S3_ACCESS_KEY_FILE: /var/secrets/s3-access-key
S3_SECRET_KEY_FILE: /var/secrets/s3-secret-key
HUDI_TABLE_NAME: "orders_hudi_mor"
HUDI_TABLE_PATH: "s3a://datalake/orders_hudi_mor"
# 指定 Spark 和 Hudi 的依赖包,这些需要包含在 Docker 镜像中
SPARK_JARS_PACKAGES: "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0,org.apache.hadoop:hadoop-aws:3.3.2"
secrets:
- s3-access-key
- s3-secret-key
annotations:
topic: "postgres.public.orders"
# 这个 annotation 会被 OpenFaaS Kafka Connector 识别
# 将函数订阅到指定的 Kafka topic
build
, push
和 deploy
之后,这个函数就会自动消费来自 postgres.public.orders
主题的消息,并将数据写入数据湖中的 Hudi 表。
# 架构的局限性与未来迭代路径
这套基于 OpenFaaS 和 Hudi 的 Serverless CDC 入湖架构并非银弹,它在解决了我们核心痛点的同时,也引入了新的考量和局限性。
冷启动延迟: Serverless 函数的冷启动会给数据处理管道带来不可预测的延迟尖峰。对于有严格 SLA 要求的近实时场景,需要通过预热实例(如 OpenFaaS Pro 的
min-scale > 0
配置)或选择其他常驻型轻量级消费者来缓解。小文件与 Compaction: 尽管我们选择了 MoR 模式,它依然会产生大量的 delta log 文件。我们必须建立一个独立的、可靠的调度机制来定期触发 Hudi 的
compaction
作业。这本身也可以是一个定时触发的 OpenFaaS 函数,但其资源消耗和执行时间会远超普通的摄取函数,需要精细的资源配置和监控。状态管理: 当前函数是无状态的,适用于简单的“解析-转换-加载”场景。如果未来需要在数据入湖前进行更复杂的状态计算(例如,窗口聚合、会话分析),这种纯 Serverless 模式将捉襟见肘,届时可能还是需要引入 Flink 等有状态流处理引擎。
事务边界与一致性: 当前的方案依赖于 Kafka 的 at-least-once 语义和 Hudi
upsert
操作的幂等性来保证最终数据的一致性。这在大多数场景下足够,但它并非端到端的 Exactly-Once。如果上游 Kafka topic 发生消息重复,Hudi 会借助 precombine key 正确处理,但会带来额外的计算开销。
未来的优化路径可能包括:实现一个自动化的、由数据量或时间阈值驱动的 Compaction 触发器;为关键路径的函数启用预热策略以保证稳定的低延迟;以及建立更完善的可观测性体系,监控从 Kafka lag 到 Hudi 表文件数量和大小的全链路健康状况。