利用MyBatis与Matplotlib量化遗留系统的数据库耦合度以指导DDD重构


我们接手了一个运行了近十年的单体系统。业务逻辑盘根错节,而最棘手的部分在于它的数据库——一个拥有超过800张表的“巨石”。任何一个微小的需求变更,都可能引发一场对数据库完整性的灾难性回归测试。团队决定采用领域驱动设计(DDD)进行战略重构,拆分出限界上下文。问题是,面对这个巨大的泥球,第一刀该从哪里切下?业务文档早已过时,唯一能信任的“真相来源”,就是代码本身。

这个系统的持久层完全基于MyBatis。这意味着,所有的数据库交互逻辑都清晰地定义在Mapper.xml文件中。这是一个突破口。我的初步构想是:如果两张表总是在同一个业务事务中被同时访问,那么它们极有可能属于同一个领域概念,也就是同一个限界上下文。我们能不能通过分析MyBatis的运行时行为,来量化这种表与表之间的“耦合度”,并将其可视化,从而为DDD的上下文划分提供数据支撑?

这个想法最终演变成一个跨语言的技术方案:在Java端,利用MyBatis拦截器捕获运行时SQL,记录事务中的表访问序列;在Python端,利用数据分析和可视化库(Pandas, Matplotlib, NetworkX)对这些数据进行处理,生成数据库表耦合关系图。

第一步:通过MyBatis拦截器捕获动态数据

静态分析MyBatis的XML文件是不可靠的。大量的动态SQL(<if>, <foreach>)使得我们无法仅通过解析XML就准确知道一次调用到底访问了哪些表。因此,唯一的办法是深入运行时,在SQL真正执行前捕获它。MyBatis的Interceptor插件机制是实现这一点的完美工具。

我们的目标是创建一个拦截器,它能:

  1. 拦截所有数据库的读写操作(queryupdate)。
  2. 从即将执行的BoundSql中获取SQL语句。
  3. 解析SQL,提取出所有被操作的表名。
  4. 将“当前事务ID”与“访问的表集合”关联起来,并以结构化日志的形式输出。

在真实项目中,直接解析SQL是个脏活,容易出错。一个常见的错误是使用简单的正则表达式,这无法处理复杂的SQL,比如带子查询或JOIN的语句。幸运的是,有成熟的库可以处理这个问题。我们选用了JSqlParser

下面是这个拦截器的核心实现,TransactionTableInterceptor.java

package com.example.legacy.interceptor;

import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.util.TablesNamesFinder;
import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@Intercepts({
    @Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}),
    @Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class})
})
public class TransactionTableInterceptor implements Interceptor {

    // 使用专门的Logger记录分析数据,便于后续处理
    private static final Logger metricLog = LoggerFactory.getLogger("TableAccessMetricLogger");

    // 线程安全的Map,用于在事务级别缓存访问过的表
    // Key: 事务标识 (这里用MDC的requestId)
    // Value: 该事务已访问的表集合
    private static final ThreadLocal<Set<String>> transactionTables = ThreadLocal.withInitial(ConcurrentHashMap::newKeySet);
    
    // MDC key,用于从上游中间件或过滤器获取请求唯一ID
    private static final String REQUEST_ID_KEY = "requestId";

    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        // 确保每个请求线程有一个唯一的事务标识
        // 在真实项目中,这个ID通常由网关或第一个Filter生成并放入MDC
        if (MDC.get(REQUEST_ID_KEY) == null) {
            MDC.put(REQUEST_ID_KEY, UUID.randomUUID().toString().substring(0, 8));
        }
        
        MappedStatement mappedStatement = (MappedStatement) invocation.getArgs()[0];
        Object parameter = invocation.getArgs().length > 1 ? invocation.getArgs()[1] : null;
        BoundSql boundSql = mappedStatement.getBoundSql(parameter);
        String sql = boundSql.getSql().replaceAll("\\s+", " ");

        try {
            Statement statement = CCJSqlParserUtil.parse(sql);
            TablesNamesFinder tablesNamesFinder = new TablesNamesFinder();
            List<String> tableList = tablesNamesFinder.getTableList(statement);

            if (tableList != null && !tableList.isEmpty()) {
                // 将所有表名转换为小写,以避免大小写问题
                Set<String> lowerCaseTables = tableList.stream()
                                                       .map(String::toLowerCase)
                                                       .collect(Collectors.toSet());
                transactionTables.get().addAll(lowerCaseTables);
            }
        } catch (Exception e) {
            // JSqlParser可能无法解析某些特殊的方言SQL,这里只记录错误,不中断业务
            // 这里的坑在于,如果解析失败率很高,说明这个方案不适用,需要寻找替代品
            LoggerFactory.getLogger(TransactionTableInterceptor.class)
                         .warn("Failed to parse SQL: [{}]. Error: {}", sql, e.getMessage());
        }

        // 只有当MyBatis的事务管理器提交或回滚时,我们才认为一个业务单元结束
        // 但拦截器层面很难优雅地获取事务结束事件。
        // 一个务实的替代方案是,在请求处理的最后(比如在一个Servlet Filter的finally块中)调用一个方法来清空和记录数据。
        // 为了简化演示,我们假设每个SQL执行后都可以记录当前累积的表。这在简单场景下够用。
        logMetrics();

        return invocation.proceed();
    }
    
    /**
     * 该方法应在Web请求结束时被调用,以确保记录的是一个完整业务事务所访问的所有表。
     * 例如,在一个全局的Filter的`finally`块中调用。
     */
    public static void finalizeAndLog() {
        String requestId = MDC.get(REQUEST_ID_KEY);
        Set<String> tables = transactionTables.get();
        if (requestId != null && !tables.isEmpty()) {
            // 日志格式:requestId|table1,table2,table3
            metricLog.info("{}|{}", requestId, String.join(",", tables));
        }
        // 清理ThreadLocal,防止内存泄漏
        transactionTables.remove();
        MDC.remove(REQUEST_ID_KEY);
    }
    
    // 这是一个简化的记录方式,每次执行SQL都记录一次。
    // 在高并发下会产生大量冗余日志。更优的方式是使用上面的 finalizeAndLog()。
    private void logMetrics() {
        String requestId = MDC.get(REQUEST_ID_KEY);
        Set<String> tables = transactionTables.get();
        if (requestId != null && !tables.isEmpty()) {
            metricLog.info("{}|{}", requestId, String.join(",", tables));
        }
    }

    @Override
    public Object plugin(Object target) {
        return Plugin.wrap(target, this);
    }

    @Override
    public void setProperties(Properties properties) {
        // 可以接收配置属性
    }
}

要启用这个拦截器,需要在mybatis-config.xml中进行配置:

<!-- mybatis-config.xml -->
<configuration>
    <!-- ... other settings ... -->
    <plugins>
        <plugin interceptor="com.example.legacy.interceptor.TransactionTableInterceptor" />
    </plugins>
    <!-- ... other settings ... -->
</configuration>

我们还需要配置一个专门的Logback appender来将这些指标日志输出到独立的文件,方便后续处理。

<!-- logback.xml -->
<appender name="METRICS_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>/var/log/app/table-access-metrics.log</file>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
        <fileNamePattern>/var/log/app/table-access-metrics.%d{yyyy-MM-dd}.log</fileNamePattern>
        <maxHistory>30</maxHistory>
    </rollingPolicy>
    <encoder>
        <!-- 只输出原始消息,不带时间戳、线程名等多余信息 -->
        <pattern>%msg%n</pattern>
    </encoder>
</appender>

<logger name="TableAccessMetricLogger" level="INFO" additivity="false">
    <appender-ref ref="METRICS_FILE"/>
</logger>

这个拦截器部署到预发环境运行一周后,我们收集到了大量的原始数据文件 table-access-metrics.log。每一行都记录了一个请求ID及其在该请求生命周期内访问过的所有数据表。

# table-access-metrics.log sample
req-001|orders,order_items,products,customers
req-002|users,user_profiles
req-001|orders,order_items,products,customers,stock
req-003|products,categories
req-002|users,user_profiles,user_addresses
...

这里的日志有冗余,因为每次SQL执行都会记录。我们需要在Python端进行去重和聚合。

第二步:使用Python进行数据处理和耦合度计算

现在进入数据分析阶段。我们的目标是创建一个“共现矩阵”(co-occurrence matrix),矩阵的每个元素 (i, j) 代表表 i 和表 j 在同一个事务中同时出现的次数。

graph LR
    subgraph Java Application
        A[MyBatis Interceptor] -- writes --> B(table-access-metrics.log);
    end
    subgraph Python Analysis Script
        C[Pandas Reader] -- reads --> B;
        C -- outputs DataFrame --> D[Data Aggregation];
        D -- calculates --> E[Co-occurrence Matrix];
        E -- feeds --> F[NetworkX Graph];
        F -- renders via --> G[Matplotlib];
    end
    G -- generates --> H{Coupling Graph PNG};

这个Python脚本的核心逻辑如下:

  1. 读取日志文件,按请求ID对访问的表进行分组和去重。
  2. 遍历每个事务中的表集合,对集合中任意两个表,其共现次数加一。
  3. 将共现矩阵转换为图的边列表,权重为共现次数。
  4. 过滤掉低权重的边和孤立的节点,使图形更清晰。
# analyze_coupling.py

import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
from collections import defaultdict
from itertools import combinations
import os

# --- 配置区 ---
LOG_FILE_PATH = './table-access-metrics.log'
OUTPUT_GRAPH_FILE = './db_coupling_graph.png'
MIN_COUPLING_THRESHOLD = 50  # 过滤掉共现次数低于此阈值的边
EXCLUDED_TABLES = {'audit_log', 'schema_version'} # 排除通用技术表

# --- 1. 数据加载与预处理 ---

def load_and_process_logs(file_path):
    """
    从日志文件加载数据,并按requestId聚合表。
    真实项目中,数据量可能很大,需要考虑分块读取。
    """
    if not os.path.exists(file_path):
        print(f"Error: Log file not found at {file_path}")
        return None

    # 使用字典按requestId聚合,比DataFrame groupby更快
    transactions = defaultdict(set)
    with open(file_path, 'r') as f:
        for line in f:
            line = line.strip()
            if not line or '|' not in line:
                continue
            request_id, tables_str = line.split('|', 1)
            tables = {tbl for tbl in tables_str.split(',') if tbl}
            transactions[request_id].update(tables)
    
    # 转换为list of sets
    processed_transactions = [tables for tables in transactions.values() if len(tables) > 1]
    return processed_transactions

# --- 2. 计算共现矩阵 ---

def calculate_cooccurrence(transactions):
    """
    计算表之间的共现次数。
    """
    cooccurrence_matrix = defaultdict(int)
    for tables in transactions:
        # 使用combinations来获取一个事务中所有表的配对
        # 这里的坑在于,如果一个事务访问了50个表,combinations数量会爆炸
        # 需要对超大事务做预处理或采样
        if len(tables) > 30: # 设定一个阈值,防止组合爆炸
            print(f"Warning: Skipping a large transaction with {len(tables)} tables.")
            continue
            
        for table1, table2 in combinations(sorted(list(tables)), 2):
            if table1 in EXCLUDED_TABLES or table2 in EXCLUDED_TABLES:
                continue
            pair = tuple(sorted((table1, table2)))
            cooccurrence_matrix[pair] += 1
    return cooccurrence_matrix

# --- 3. 图构建与可视化 ---

def create_and_visualize_graph(cooccurrence_matrix):
    """
    使用NetworkX和Matplotlib创建和可视化耦合图。
    """
    G = nx.Graph()
    
    # 添加边,权重为共现次数
    for (table1, table2), weight in cooccurrence_matrix.items():
        if weight >= MIN_COUPLING_THRESHOLD:
            G.add_edge(table1, table2, weight=weight)
            
    # 如果图中没有节点,直接返回
    if G.number_of_nodes() == 0:
        print("No significant couplings found with the current threshold. The graph is empty.")
        return

    # 移除孤立节点,使视图更清晰
    G.remove_nodes_from(list(nx.isolates(G)))

    # 使用spring_layout,它会把连接紧密的节点放在一起,形成簇
    # k参数控制节点间的距离,iterations控制布局算法的迭代次数
    pos = nx.spring_layout(G, k=0.8, iterations=50, seed=42)
    
    # 节点大小与节点的度(连接数)成正比
    node_sizes = [G.degree(n) * 100 for n in G.nodes()]
    
    # 边的宽度与权重(共现次数)成正比
    edge_weights = [G.edges[e]['weight'] for e in G.edges()]
    # 对权重进行归一化,使其在合理范围内
    max_weight = max(edge_weights) if edge_weights else 1.0
    edge_widths = [1 + (w / max_weight) * 5 for w in edge_weights]

    plt.figure(figsize=(20, 20))
    
    nx.draw_networkx_nodes(G, pos, node_size=node_sizes, node_color='skyblue', alpha=0.8)
    nx.draw_networkx_edges(G, pos, width=edge_widths, edge_color='gray', alpha=0.5)
    nx.draw_networkx_labels(G, pos, font_size=8, font_family='sans-serif')
    
    plt.title('Database Table Coupling Graph', size=20)
    plt.axis('off')
    plt.savefig(OUTPUT_GRAPH_FILE, format="PNG", dpi=300)
    print(f"Graph saved to {OUTPUT_GRAPH_FILE}")


# --- 主流程 ---
if __name__ == '__main__':
    print("Step 1: Loading and processing logs...")
    transactions_data = load_and_process_logs(LOG_FILE_PATH)
    
    if transactions_data:
        print(f"Step 2: Calculating co-occurrence matrix for {len(transactions_data)} transactions...")
        cooccurrence_data = calculate_cooccurrence(transactions_data)
        
        print(f"Step 3: Building and visualizing graph...")
        create_and_visualize_graph(cooccurrence_data)
        
        print("Analysis complete.")

执行这个脚本后,我们得到了一张PNG图片。这张图非常震撼:原本看似混沌的800多个节点(表),通过spring_layout算法的力导向布局,自然地形成了几个高内聚的“星系”。

  • 星系的核心:是一些度非常高(连接非常多)的节点,通常是核心业务表,如orders, customers, products
  • 星系内部:节点之间通过粗壮的边(高权重)紧密连接。例如,orders, order_items, order_status_history, payments 形成了一个密集的集群。这清晰地指向了“订单上下文”。
  • 星系之间:连接稀疏且边很细。比如“订单集群”和“用户集群” (users, user_profiles, user_addresses) 之间可能只有一条细线连接着orderscustomers表,这正是一个理想的上下文边界。
  • 孤立的星球:一些独立的表或小簇,通常是支撑性功能,如配置表、字典表等。

最终成果:数据驱动的重构决策

这张图成为了我们DDD战略会议上最有力的工具。它不再是基于某个架构师的“感觉”来划分边界,而是基于系统在真实负载下的运行时行为数据。

我们围绕图中的明显集群,启动了第一次事件风暴(Event Storming)工作坊。我们把“订单集群”、“用户集群”、“商品集群”的表名打印出来贴在墙上,业务专家和开发团队很快就能将这些技术实体与他们熟悉的业务流程和领域事件对应起来。

例如,我们发现productsstock(库存)之间存在极强的耦合,但在业务上,“商品管理”和“库存管理”是两个独立的领域,由不同团队负责。这张图暴露了一个深刻的设计问题:系统将商品信息和库存信息紧紧地绑在了一起,导致任何一方的变更都互相影响。这是一个明确的技术债,也是我们划清“商品上下文”和“库存上下文”边界时必须解决的核心问题。

这张图也揭示了一些意料之外的耦合。比如,customer_service_tickets(客服工单)表竟然和warehouse_shipments(仓库发货)表有很强的关联。经过深挖代码,我们发现是因为客服系统有一个“一键查询物流状态”的功能,它直接跨域查询了物流表。在DDD重构中,这种查询应该通过发布领域事件或提供一个专用的API来实现,而不是直接的数据库耦合。

方案的局限性与未来迭代方向

这个方案并非银弹,它有其固有的局限性:

  1. 关联不等于因果:该分析只显示了相关性。两张表频繁一起出现,可能不是因为它们属于同一领域,而是因为某个横切关注点(如一个统一的日志记录服务)同时操作了它们。这需要结合业务知识进行人工甄别。

  2. 对“读”操作的偏见:复杂的报表查询可能会一次性连接十几张表,在图中制造出“伪耦合”。在数据处理阶段,需要有策略来识别并降低这些大规模只读事务的权重。

  3. 忽略了业务逻辑的耦合:此方法完全基于数据访问层。如果两个模块在数据库层面解耦了,但在服务层代码中依然存在强依赖,这个工具是发现不了的。

未来的迭代方向可以包括:

  • 结合静态代码分析:除了分析MyBatis的日志,还可以通过静态分析工具(如ArchUnit)分析Java代码的调用关系,构建一个更全面的系统耦合图。
  • 引入时序信息:分析在一个事务中,表访问的先后顺序,可能会揭示出业务流程的依赖关系,而不仅仅是简单的共现。
  • . 交互式可视化:将静态的PNG图升级为使用D3.js或Gephi等工具构建的交互式网页。用户可以点击节点查看表的详细信息,或者动态调整边的权重阈值来观察簇的变化,这将大大提升分析效率。

  目录