我们面临的第一个硬性约束,是必须在不中断核心业务的前提下,将一个承载了多年交易数据的 Oracle 11g 数据库逐步迁移至云服务商提供的 MariaDB 集群上。直接的“停机迁移”方案在项目启动的第一天就被否决了。业务方要求新旧系统并行运行至少一个季度,新应用在云端 MariaDB 上开发,老应用继续读写 Oracle,而两者之间的数据必须保持毫秒级的最终一致性。
最初的构想是应用层双写,但这很快被证明是条死路。改造线上稳定运行多年的核心交易代码,风险和成本都无法估量。况且,这无法解决历史存量数据的同步问题。另一个方案是传统的夜间ETL批处理,但这完全无法满足“实时”这个核心需求。最终,我们把目光投向了变更数据捕获(Change Data Capture, CDC)。
技术选型决策的核心是三个组件的组合:Debezium 作为 CDC 源连接器,Apache Kafka 作为高可靠的事件流总线,以及一个定制化的消费者服务将数据变更应用到目标 MariaDB。选择 Debezium 的原因在于其对 Oracle LogMiner 的成熟支持和强大的社区。Kafka 则天然是云原生架构的解耦利器,云服务商提供的托管 Kafka 服务能极大地降低我们的运维负担。目标端之所以不选择现成的 JDBC Sink Connector,是因为我们需要在消费端加入更复杂的业务逻辑,比如数据清洗、格式转换、以及对特定 DDL 变更的告警处理,这些用一个定制化的服务实现起来更灵活、可控。
graph TD subgraph On-Premise Data Center A[Oracle 11g Database] -- Redo Logs --> B{Debezium Oracle Connector} end subgraph Cloud Provider VPC B -- Kafka Connect API --> C[Kafka Connect Cluster on K8s] C -- Produces CDC Events --> D[Apache Kafka Topic: oracle.cdc.customer_events] E[Custom MariaDB Consumer Service on K8s] -- Consumes Events --> D E -- Applies Changes (INSERT/UPDATE/DELETE) --> F[MariaDB Cluster] end G[Monitoring & Alerting] -- Observes --> C G -- Observes --> E G -- Checks Lag --> D style A fill:#f9f,stroke:#333,stroke-width:2px style F fill:#9f9,stroke:#333,stroke-width:2px
第一步:Oracle 源端配置
这是整个管道的起点,也是最容易出问题的地方。Debezium 对 Oracle 的 CDC 依赖于 LogMiner,这要求数据库开启归档日志模式(Archive Log Mode)和补充日志(Supplemental Logging)。
首先,确认归档日志模式已开启。
-- 以 sysdba 身份连接
sqlplus / as sysdba
-- 查询归档模式
ARCHIVE LOG LIST;
-- 如果结果显示 "Database log mode" 不是 "Archive Mode"
-- 则需要执行以下操作(这需要短暂的数据库重启)
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;
接下来,为要捕获的表启用补充日志。这是为了确保即使在 UPDATE
操作中没有修改主键列,Redo Log 中也包含足够的主键信息来定位行。
-- 为整个数据库启用最小补充日志
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- 为特定表启用主键级别的补充日志
-- 假设我们要同步的表是 ORDERS
ALTER TABLE sales.ORDERS ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
-- 如果表没有主键,但有唯一索引,可以基于唯一索引
ALTER TABLE sales.INVENTORY ADD SUPPLEMENTAL LOG DATA (UNIQUE) COLUMNS;
最后,创建一个专用于 Debezium 的用户,并授予必要的权限。在生产环境中,权限必须遵循最小化原则。
-- 创建用户
CREATE USER debezium IDENTIFIED BY "dbz_password";
-- 授予基本连接权限
GRANT CREATE SESSION TO debezium;
-- 授予访问数据字典和日志的权限
GRANT SELECT ON V_$DATABASE TO debezium;
GRANT SELECT ON V_$ARCHIVED_LOG TO debezium;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO debezium;
GRANT EXECUTE ON DBMS_LOGMNR TO debezium;
GRANT EXECUTE ON DBMS_LOGMNR_D TO debezium;
-- 允许用户在数据库范围内选择任何表
-- 在更严格的环境中,应逐一授予特定表的 SELECT 权限
GRANT SELECT ANY TABLE TO debezium;
-- 允许连接器锁定它正在捕获的表,以防止在快照期间发生 DDL
GRANT LOCK ANY TABLE to debezium;
这里的坑在于 SELECT ANY TABLE
权限。虽然方便,但在安全审计严格的环境中通常不被允许。替代方案是为 debezium
用户需要捕获的每一张表精确授权 GRANT SELECT ON schema.table_name TO debezium;
。
第二步:部署与配置 Debezium Oracle 连接器
我们在云服务商的 Kubernetes 集群上部署了一个 Kafka Connect 集群。Debezium 连接器作为插件运行在其中。核心工作是编写连接器的 JSON 配置文件。
这是一个生产级的配置示例,通过 Kafka Connect 的 REST API 创建或更新连接器。
{
"name": "oracle-connector-prod-01",
"config": {
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"tasks.max": "1",
"database.hostname": "oracle.prod.internal",
"database.port": "1521",
"database.user": "debezium",
"database.password": "dbz_password",
"database.dbname": "ORCLPDB1",
"database.server.name": "oracle-prod-server",
"database.history.kafka.bootstrap.servers": "kafka-broker-1:9092,kafka-broker-2:9092",
"database.history.kafka.topic": "dbz-schema-history.oracle-prod-server",
"database.history.kafka.recovery.poll.interval.ms": "5000",
"table.include.list": "SALES.ORDERS,SALES.CUSTOMERS,INVENTORY.PRODUCTS",
"column.exclude.list": "SALES.CUSTOMERS.CREDIT_CARD_INFO",
"snapshot.mode": "initial",
"log.mining.strategy": "online_catalog",
"log.mining.archive.log.only.mode": "false",
"tombstones.on.delete": "false",
"decimal.handling.mode": "string",
"time.precision.mode": "adaptive_time_microseconds",
"provide.transaction.metadata": "true",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false"
}
}
关键配置项解析:
-
database.server.name
: 这是一个逻辑名称,将作为 Kafka topic 的前缀,例如oracle-prod-server.SALES.ORDERS
。必须全局唯一。 -
database.history.kafka.topic
: Debezium 使用这个 topic 来存储 DDL 变更历史。这是保证 schema 演进后连接器能正确重启的关键。这个 topic 必须是单分区的。 -
table.include.list
: 白名单模式,明确指定要捕获的表。在生产环境中,永远不要使用黑名单模式。 -
column.exclude.list
: 可以排除敏感字段,例如信用卡信息。数据在源头就被过滤,不会进入 Kafka。 -
snapshot.mode
: 设置为initial
。连接器首次启动时,会对include.list
中的表进行一次全量快照,然后无缝切换到增量日志捕获。对于已经存在的庞大表,这是一个必须的步骤。 -
decimal.handling.mode
: 设置为string
。Oracle 的NUMBER
类型精度非常高,直接转换为double
或float
可能会丢失精度。转换为字符串是最安全的做法,由消费端根据目标列的精度要求进行处理。 -
tombstones.on.delete
: 设置为false
。默认情况下,DELETE
操作会产生一个 value 为null
的 tombstone 消息。我们的消费逻辑直接处理DELETE
事件,不需要这个机制。 -
key.converter.schemas.enable
&value.converter.schemas.enable
: 设置为false
。我们选择使用无 schema 的 JSON 格式,简化消费端的处理。如果需要严格的 schema 校验,可以配合 Schema Registry 使用 Avro 格式。
第三步:构建健壮的 MariaDB 消费端服务
这是我们定制化工作的核心。我们使用 Python 和 kafka-python
库编写了一个消费者服务,部署为 Kubernetes Deployment。
# consumer_service.py
import json
import logging
import os
import signal
import sys
from typing import Dict, Any
from kafka import KafkaConsumer, TopicPartition
from kafka.errors import NoBrokersAvailable
import mariadb
# --- 日志配置 ---
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('MariaDBConsumer')
# --- 数据库连接配置 ---
DB_CONFIG = {
'host': os.environ.get('DB_HOST', 'mariadb-host'),
'port': int(os.environ.get('DB_PORT', 3306)),
'user': os.environ.get('DB_USER', 'sync_user'),
'password': os.environ.get('DB_PASSWORD'),
'database': os.environ.get('DB_NAME', 'synced_db'),
'autocommit': False # 手动控制事务
}
# --- Kafka 配置 ---
KAFKA_BROKERS = os.environ.get('KAFKA_BROKERS', 'kafka:9092').split(',')
KAFKA_TOPICS = os.environ.get('KAFKA_TOPICS', '').split(',')
CONSUMER_GROUP_ID = os.environ.get('CONSUMER_GROUP_ID', 'mariadb-sync-group')
class GracefulKiller:
"""优雅地处理SIGTERM和SIGINT信号"""
kill_now = False
def __init__(self):
signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)
def exit_gracefully(self, signum, frame):
self.kill_now = True
logger.info("Termination signal received. Shutting down gracefully...")
class DebeziumEventProcessor:
"""处理Debezium CDC事件的核心逻辑"""
def __init__(self, db_conn):
self.db_conn = db_conn
self.cursor = db_conn.cursor()
def process(self, msg: Dict[str, Any]):
try:
payload = msg.get('payload', {})
if not payload:
logger.warning(f"Message without payload: {msg}")
return
op = payload.get('op') # 'c' for create, 'u' for update, 'd' for delete, 'r' for read (snapshot)
source = payload.get('source', {})
table_name = source.get('table').lower()
# 真实项目中,这里可能需要一个表名/列名的映射规则
target_table = f"`{table_name}`"
if op in ['c', 'r']:
self._handle_insert(target_table, payload['after'])
elif op == 'u':
self._handle_update(target_table, payload['before'], payload['after'])
elif op == 'd':
self._handle_delete(target_table, payload['before'])
else:
logger.warning(f"Unsupported operation '{op}'")
# 所有操作成功后提交事务
self.db_conn.commit()
logger.info(f"Successfully processed op '{op}' for table {target_table}")
except Exception as e:
logger.error(f"Error processing message: {msg}. Error: {e}", exc_info=True)
# 发生错误,回滚事务
self.db_conn.rollback()
# 抛出异常,让消费者循环决定是否重试或跳过
raise
def _get_pk_columns(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
一个简化的主键提取逻辑。
生产环境中,应该从外部配置或数据库元数据中获取主键信息。
这里假设主键列名为'ID'。
"""
if 'ID' not in data:
raise ValueError(f"Primary key 'ID' not found in data: {data}")
return {'id': data['ID']}
def _handle_insert(self, table: str, after: Dict[str, Any]):
# 注意:这里的列名可能需要大小写转换或映射
cols = ', '.join([f"`{k.lower()}`" for k in after.keys()])
placeholders = ', '.join(['?'] * len(after))
sql = f"INSERT INTO {table} ({cols}) VALUES ({placeholders}) ON DUPLICATE KEY UPDATE id=id"
# 转换为MariaDB期望的数据类型
values = tuple(self._type_converter(v) for v in after.values())
self.cursor.execute(sql, values)
logger.debug(f"INSERT executed: {self.cursor.statement}")
def _handle_update(self, table: str, before: Dict[str, Any], after: Dict[str, Any]):
pk_where_clause = self._get_pk_columns(before)
set_clauses = []
values = []
for k, v in after.items():
# 仅当值发生变化时才更新
if k in before and before[k] != v:
set_clauses.append(f"`{k.lower()}` = ?")
values.append(self._type_converter(v))
if not set_clauses:
logger.info(f"Update event with no changed values for PK {pk_where_clause}. Skipping.")
return
set_sql = ", ".join(set_clauses)
where_sql = " AND ".join([f"`{k}` = ?" for k in pk_where_clause.keys()])
sql = f"UPDATE {table} SET {set_sql} WHERE {where_sql}"
final_values = tuple(values) + tuple(pk_where_clause.values())
self.cursor.execute(sql, final_values)
if self.cursor.rowcount == 0:
logger.warning(f"UPDATE affected 0 rows for PK {pk_where_clause}. Data might be inconsistent.")
logger.debug(f"UPDATE executed: {self.cursor.statement}")
def _handle_delete(self, table: str, before: Dict[str, Any]):
pk_where_clause = self._get_pk_columns(before)
where_sql = " AND ".join([f"`{k}` = ?" for k in pk_where_clause.keys()])
sql = f"DELETE FROM {table} WHERE {where_sql}"
values = tuple(pk_where_clause.values())
self.cursor.execute(sql, values)
if self.cursor.rowcount == 0:
logger.warning(f"DELETE affected 0 rows for PK {pk_where_clause}. The row might have been already deleted.")
logger.debug(f"DELETE executed: {self.cursor.statement}")
def _type_converter(self, value):
# 可以在这里处理从Oracle到MariaDB的特定类型转换
# 例如,Debezium的时间戳可能是微秒级的,需要适配MariaDB
return value
def main():
if not KAFKA_TOPICS or not KAFKA_TOPICS[0]:
logger.error("KAFKA_TOPICS environment variable is not set.")
sys.exit(1)
killer = GracefulKiller()
db_connection = None
consumer = None
try:
logger.info(f"Connecting to MariaDB at {DB_CONFIG['host']}...")
db_connection = mariadb.connect(**DB_CONFIG)
logger.info("MariaDB connection successful.")
processor = DebeziumEventProcessor(db_connection)
logger.info(f"Connecting to Kafka brokers at {KAFKA_BROKERS}...")
consumer = KafkaConsumer(
*KAFKA_TOPICS,
bootstrap_servers=KAFKA_BROKERS,
group_id=CONSUMER_GROUP_ID,
auto_offset_reset='earliest',
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
enable_auto_commit=False # 手动控制offset提交
)
logger.info("Kafka consumer connected.")
while not killer.kill_now:
msg_pack = consumer.poll(timeout_ms=1000)
if not msg_pack:
continue
for tp, messages in msg_pack.items():
for message in messages:
try:
logger.info(f"Received message from {tp.topic} partition {tp.partition} offset {message.offset}")
processor.process(message.value)
# 仅在数据库事务成功提交后,才手动提交Kafka offset
consumer.commit({tp: message.offset + 1})
except Exception as e:
logger.error(f"Failed to process message at offset {message.offset}. Pausing partition and exiting.", exc_info=True)
# 这是一个关键的容错策略:如果某个消息处理失败,
# 我们不应该继续处理后续消息,以免数据错乱。
# 这里我们选择暂停消费并退出,让Kubernetes重启pod。
# 更复杂的策略可以是将其发送到死信队列。
consumer.pause(tp)
sys.exit(1) # 触发pod重启
except NoBrokersAvailable:
logger.error("Could not connect to Kafka brokers. Please check KAFKA_BROKERS.")
sys.exit(1)
except mariadb.Error as e:
logger.error(f"Database connection error: {e}")
sys.exit(1)
finally:
if consumer:
consumer.close()
if db_connection:
db_connection.close()
logger.info("Resources cleaned up. Exiting.")
if __name__ == '__main__':
main()
消费者服务的核心设计考量:
- 事务性与幂等性: 消费者对数据库的操作与 Kafka offset 的提交必须是原子性的。代码中禁用了
enable_auto_commit
,在数据库事务成功commit
后,才手动提交 offset。这保证了 “at-least-once” 语义。INSERT
语句使用了ON DUPLICATE KEY UPDATE
,即使消息被重复消费,也不会导致数据重复插入,保证了操作的幂等性。 - 错误处理: 这是一个生产级消费者和简单脚本的本质区别。当处理一条消息失败时(例如,数据库连接中断、SQL 语法错误),代码会回滚当前数据库事务,并且不会提交 Kafka offset。我们选择直接让 pod 退出,由 Kubernetes 的重启策略来自动重试。这是一种简单而有效的容错机制。更完善的方案是引入死信队列(Dead Letter Queue),将处理失败的消息发送到DLQ,以便后续人工介入。
- 优雅停机: 服务能响应
SIGTERM
信号,确保在 Kubernetes 缩容或滚动更新时,能完成当前正在处理的消息,然后干净地关闭连接,避免数据不一致。 - 配置化: 所有敏感信息和环境配置都通过环境变量注入,符合云原生应用的十二要素原则。
- 主键提取: 代码中的
_get_pk_columns
是一个简化实现。在真实项目中,这部分逻辑需要更健壮,例如通过配置来定义每个表的主键,或者在服务启动时从information_schema
查询并缓存。
局限性与未来展望
这个方案成功地为我们解决了 Oracle 到 MariaDB 的实时数据同步问题,支撑了新旧系统的并行运行。但它并非银弹,存在一些局限和需要持续关注的运营成本。
首先,DDL 变更处理相对初级。Debezium 会将 DDL 事件发送到 schema history topic,但我们的消费者并未解析这些事件来自动在 MariaDB 中执行 ALTER TABLE
。当前我们依赖于手动的、有计划的 schema 变更流程,先在目标端执行,再在源端执行。自动化 DDL 同步是一个复杂的挑战,需要引入 Schema Registry 或更复杂的 DDL 解析逻辑。
其次,性能和延迟监控至关重要。我们需要持续监控 Kafka consumer lag,一旦出现积压,就意味着同步延迟增大,需要立即排查是消费端处理慢还是网络问题。使用 Prometheus 监控 Kafka exporter 和我们自定义的消费者应用指标是必不可少的。
最后,这条管道的维护需要跨领域的知识,涉及 Oracle DBA、Kafka 运维以及应用开发。虽然比改造核心应用代码的风险小,但其运维成本不容忽视。对于一些标准场景,完全托管的云服务商数据迁移服务(如 AWS DMS, Google Datastream)可能是成本效益更高的选择。但我们的场景需要复杂的自定义转换逻辑,自建管道给予了我们必要的灵活性,这种权衡是架构决策中永恒的主题。