一个由LlamaIndex驱动的智能体,如果只停留在问答层面,其价值是有限的。真正的挑战在于授权它执行具体、有状态、且必须保证原子性的业务操作。设想一个场景:用户通过自然语言下达指令,“从我的A账户转移1000元到B账户,并为这次操作记录一条审计日志”。这个指令会被智能体解析为两个独立的动作:调用TransactionService
执行转账,然后调用AuditService
记录日志。如果转账成功,但审计日志服务因网络抖动或自身故障而失败,系统就进入了数据不一致的危险状态。这在任何严肃的生产环境中都是不可接受的。
要解决这个问题,我们需要一个跨服务的事务协调机制。整个操作要么完全成功,要么完全失败,不能停在中间状态。
定义问题:原子性与微服务架构的冲突
我们的基础架构如下:一个面向外部的API Gateway,负责路由和安全验证;一个核心的AgentService
,它使用LlamaIndex解析用户意图并调用下游业务服务;以及多个独立的微服务,如TransactionService
和AuditService
。所有服务均使用NestJS构建。
graph TD User -- "1. NL Command (via WebAuthn JWT)" --> APIGateway APIGateway -- "2. Forward Request" --> AgentService AgentService -- "3. Parse & Plan" --> AgentService AgentService -- "4. Execute Step 1" --> TransactionService AgentService -- "5. Execute Step 2" --> AuditService subgraph Security Layer APIGateway end subgraph Core Logic AgentService["AgentService (NestJS + LlamaIndex)"] end subgraph Business Services TransactionService["TransactionService (NestJS)"] AuditService["AuditService (NestJS)"] end
用户通过WebAuthn进行无密码认证后,API Gateway会验证其JWT,然后将请求转发给AgentService
。AgentService
中的LlamaIndex模块将自然语言指令转换为一个结构化的执行计划,即依次调用TransactionService
和AuditService
。这里的根本矛盾在于,HTTP协议是无状态的,而微服务数据库各自独立,我们无法使用传统的单体应用中的数据库事务来保证这两个操作的原子性。
方案A:经典的教科书式两阶段提交(2PC)
两阶段提交(Two-Phase Commit, 2PC)是分布式事务的经典解决方案,旨在实现跨多个节点的强一致性。它引入了一个“协调者”角色,由它来统一指挥所有“参与者”的行为。
- 第一阶段 (Prepare): 协调者向所有参与者发送
prepare
请求。参与者收到请求后,执行本地事务操作,锁定所需资源,但不提交。如果能成功,就向协调者返回ready
;否则返回abort
。 - 第二阶段 (Commit/Abort):
- 如果协调者收到了所有参与者的
ready
响应,它就向所有参与者发送commit
指令,参与者提交本地事务。 - 如果任何一个参与者返回
abort
或超时,协调者就向所有参与者发送abort
指令,参与者回滚本地事务。
- 如果协调者收到了所有参与者的
优劣分析:
- 优点: 提供了原子性保证,实现了数据的强一致性。对于理解分布式事务,它是一个极佳的理论模型。
- 缺点 (在真实项目中是致命的):
- 同步阻塞: 从
prepare
阶段开始,所有参与者都必须锁定资源并等待协调者的最终指令。在整个过程中,资源是被占用的,这极大地降低了系统的并发能力。在高负载的Web服务中,这种长时间的阻塞是灾难性的。 - 协调者单点故障: 如果协调者在发送
commit
指令前崩溃,所有参与者将永远处于锁定状态,等待一个永远不会到来的指令。 - 性能开销: 协议需要多次网络往返,延迟很高。
- 实现复杂性: 2PC与HTTP这种无状态协议天然不兼容,强行实现需要复杂的上下文管理和状态机。
- 同步阻塞: 从
在我们的场景中,AgentService
将扮演协调者的角色,而TransactionService
和AuditService
则是参与者。可以预见,这种模型会让整个系统的响应时间变得非常长,且极为脆弱。
方案B:更务实的Saga模式(业务补偿)
Saga模式是一种通过业务补偿来实现最终一致性的分布式事务模型。它将一个全局事务分解为一系列本地事务,每个本地事务都有一个对应的补偿操作。
- 正向操作: 依次执行
T1, T2, T3, ..., Tn
。 - 反向补偿: 如果
Ti
失败,则依次执行C(i-1), ..., C3, C2, C1
来撤销之前所有已成功操作的影响。
Saga有两种主要的实现方式:
- Choreography (编排式): 服务间通过事件进行通信。一个服务完成本地事务后,发布一个事件,下一个服务订阅该事件并开始自己的本地事务。这种方式去中心化,但整个业务流程散落在各个服务中,难以追踪和调试。
- Orchestration (协调式): 引入一个协调器(Orchestrator),由它来集中管理整个Saga流程,显式地调用每个参与者的服务。这正是我们的
AgentService
可以扮演的角色。
优劣分析:
- 优点:
- 无长期锁定: 每个服务执行完本地事务后就立即提交,资源不会被长时间锁定,系统吞吐量高。
- 高可用性: 没有2PC中协调者的单点故障问题那么严重,协调器本身可以做成高可用,且参与者故障不影响其他服务。
- 松耦合: 服务之间通过API或消息进行交互,耦合度低。
- 缺点:
- 最终一致性: 在Saga执行过程中,系统可能会处于中间状态(例如,钱转了,日志还没记),数据的一致性有延迟。
- 补偿逻辑复杂: 编写健壮的补偿操作是Saga模式的核心难点。补偿操作不是简单的数据库回滚,它本身也是一个业务操作。例如,转账的补偿操作不是删除转账记录,而是再发起一笔反向的转账。
- 隔离性问题: 由于没有全局锁定,Saga无法保证读已提交(Read Committed)之外的隔离级别。一个事务可能会读到另一个未完成的Saga的中间结果。
最终选择与理由:
在微服务架构下,可用性和性能通常比强一致性更为重要。2PC的同步阻塞模型与微服务追求高内聚、低耦合、独立部署的理念背道而驰。因此,我们选择Saga协调式模式。AgentService
将作为Saga协调器,负责驱动整个流程、处理失败并执行补偿。我们接受最终一致性这个代价,并通过精心设计补偿逻辑和完善的监控来弥补其不足。
核心实现概览
我们将构建一个简化的Saga协调器,并集成到AgentService
中。
1. 项目结构 (Monorepo)
/apps
/api-gateway
/agent-service
/transaction-service
/audit-service
2. 服务接口定义
每个参与者服务都需要提供一个执行操作的接口和一个补偿操作的接口。接口设计必须考虑幂等性,即多次调用同一接口应产生相同的结果。这通常通过传递一个唯一的事务ID来实现。
transaction-service
DTOs:
// apps/transaction-service/src/dto/create-transaction.dto.ts
import { IsString, IsNumber, IsNotEmpty } from 'class-validator';
export class CreateTransactionDto {
@IsString()
@IsNotEmpty()
sagaId: string; // 用于幂等性检查
@IsString()
@IsNotEmpty()
fromAccountId: string;
@IsString()
@IsNotEmpty()
toAccountId: string;
@IsNumber()
amount: number;
}
export class CompensateTransactionDto {
@IsString()
@IsNotEmpty()
sagaId: string;
// 关联的原始交易ID
@IsString()
@IsNotEmpty()
originalTransactionId: string;
}
audit-service
DTOs:
// apps/audit-service/src/dto/create-audit-log.dto.ts
import { IsString, IsNotEmpty, IsObject } from 'class-validator';
export class CreateAuditLogDto {
@IsString()
@IsNotEmpty()
sagaId: string;
@IsString()
@IsNotEmpty()
action: string;
@IsObject()
details: Record<string, any>;
}
export class CompensateAuditLogDto {
@IsString()
@IsNotEmpty()
sagaId: string;
@IsString()
@IsNotEmpty()
originalLogId: string;
}
3. Saga协调器 (agent-service
)
协调器的核心是一个状态机,用于追踪Saga的执行进度。我们将这个逻辑封装在一个SagaOrchestrator
服务中。
// apps/agent-service/src/saga/saga.definition.ts
export interface SagaStep {
name: string;
// 执行正向操作
invoke: (context: any) => Promise<any>;
// 执行补偿操作
compensate: (context: any) => Promise<any>;
}
export interface SagaContext {
sagaId: string;
payload: any;
// 用于存储每一步的结果,以便后续步骤或补偿操作使用
stepResults: Record<string, any>;
}
// apps/agent-service/src/saga/saga.orchestrator.ts
import { Injectable, Logger } from '@nestjs/common';
import { v4 as uuidv4 } from 'uuid';
import { SagaStep, SagaContext } from './saga.definition';
@Injectable()
export class SagaOrchestrator {
private readonly logger = new Logger(SagaOrchestrator.name);
async execute(steps: SagaStep[], initialPayload: any): Promise<void> {
const context: SagaContext = {
sagaId: uuidv4(),
payload: initialPayload,
stepResults: {},
};
this.logger.log(`[SagaID: ${context.sagaId}] Starting saga execution.`);
const completedSteps: { step: SagaStep; index: number }[] = [];
for (let i = 0; i < steps.length; i++) {
const step = steps[i];
try {
this.logger.log(`[SagaID: ${context.sagaId}] Invoking step: ${step.name}`);
// 将上一步的结果传入下一步
const result = await step.invoke({ ...context, ...context.stepResults });
context.stepResults[step.name] = result;
completedSteps.push({ step, index: i });
this.logger.log(`[SagaID: ${context.sagaId}] Step ${step.name} completed successfully.`);
} catch (error) {
this.logger.error(`[SagaID: ${context.sagaId}] Step ${step.name} failed. Starting compensation.`, error.stack);
await this.compensate(context, completedSteps);
// 抛出异常,让上层知道Saga失败了
throw new Error(`Saga failed at step ${step.name}`);
}
}
this.logger.log(`[SagaID: ${context.sagaId}] Saga completed successfully.`);
}
private async compensate(context: SagaContext, completedSteps: { step: SagaStep; index: number }[]): Promise<void> {
// 按相反顺序执行补偿操作
for (let i = completedSteps.length - 1; i >= 0; i--) {
const { step } = completedSteps[i];
try {
this.logger.log(`[SagaID: ${context.sagaId}] Compensating step: ${step.name}`);
await step.compensate({ ...context, ...context.stepResults });
this.logger.log(`[SagaID: ${context.sagaId}] Step ${step.name} compensated successfully.`);
} catch (compensationError) {
// 这是一个灾难性的情况:补偿操作也失败了。
// 在真实项目中,这里需要触发紧急警报,需要人工介入。
this.logger.fatal(
`[SagaID: ${context.sagaId}] CRITICAL: Compensation for step ${step.name} failed. Manual intervention required.`,
compensationError.stack,
);
// 停止进一步的补偿,因为状态已经不可预测
return;
}
}
}
}
4. LlamaIndex与Saga的集成
在AgentService
中,当LlamaIndex的函数调用工具被触发时,我们不是直接调用微服务,而是构建一个Saga并执行它。
// apps/agent-service/src/agent.service.ts
import { Injectable } from '@nestjs/common';
import { HttpService } from '@nestjs/axios';
import { SagaOrchestrator } from './saga/saga.orchestrator';
import { SagaStep } from './saga/saga.definition';
import { lastValueFrom } from 'rxjs';
@Injectable()
export class AgentService {
constructor(
private readonly sagaOrchestrator: SagaOrchestrator,
private readonly httpService: HttpService,
) {}
// 这是一个由LlamaIndex工具触发的函数
async executeFinancialTransfer(from: string, to: string, amount: number): Promise<void> {
const transferPayload = { fromAccountId: from, toAccountId: to, amount };
// 定义Saga的各个步骤
const steps: SagaStep[] = [
{
name: 'CreateTransaction',
invoke: async (context) => {
const { sagaId } = context;
const url = 'http://localhost:3002/transactions'; // TransactionService URL
// 这里的关键是把sagaId传下去
const response = await lastValueFrom(
this.httpService.post(url, { ...transferPayload, sagaId }),
);
// 返回的结果会被存储在context中,供后续步骤使用
return { transactionId: response.data.id };
},
compensate: async (context) => {
const { sagaId, CreateTransaction: { transactionId } } = context;
const url = 'http://localhost:3002/transactions/compensate';
await lastValueFrom(
this.httpService.post(url, { sagaId, originalTransactionId: transactionId }),
);
},
},
{
name: 'CreateAuditLog',
invoke: async (context) => {
const { sagaId, payload, CreateTransaction: { transactionId } } = context;
const url = 'http://localhost:3003/audit-logs'; // AuditService URL
const auditDetails = { ...payload, transactionId };
const response = await lastValueFrom(
this.httpService.post(url, { sagaId, action: 'FINANCIAL_TRANSFER', details: auditDetails }),
);
return { logId: response.data.id };
},
compensate: async (context) => {
// 这里为了演示,我们让审计服务的补偿操作失败
if (process.env.NODE_ENV === 'test_compensation_failure') {
throw new Error("Simulated audit compensation failure");
}
const { sagaId, CreateAuditLog: { logId } } = context;
const url = 'http://localhost:3003/audit-logs/compensate';
await lastValueFrom(
this.httpService.post(url, { sagaId, originalLogId: logId }),
);
},
},
];
await this.sagaOrchestrator.execute(steps, transferPayload);
}
}
5. WebAuthn与API Gateway
这部分相对独立。在生产环境中,通常会有一个专门的AuthService
来处理WebAuthn的注册和验证流程。
- 用户在前端完成WebAuthn挑战,将凭证发送到
AuthService
。 -
AuthService
验证凭证,成功后签发一个包含用户信息的JWT。 - 用户后续请求(如向
AgentService
发送指令)时,在HTTPAuthorization
头中携带此JWT。 - API Gateway作为流量入口,配置一个JWT验证插件。它会拦截所有到
AgentService
的请求,验证JWT的签名和有效期。只有合法的请求才会被转发到后端。
在NestJS中,可以在Gateway或AgentService
中用一个Guard来实现这个逻辑。
// apps/agent-service/src/auth/jwt.guard.ts
import { Injectable, CanActivate, ExecutionContext } from '@nestjs/common';
import { Observable } from 'rxjs';
// 假设使用passport-jwt
import { AuthGuard } from '@nestjs/passport';
@Injectable()
export class JwtAuthGuard extends AuthGuard('jwt') implements CanActivate {
// 这里可以添加自定义的验证逻辑
// API Gateway已经验证过一遍,这里可以做更细粒度的检查
}
// 在Controller中使用
// @UseGuards(JwtAuthGuard)
// @Post('execute')
// async executeCommand(...)
架构的扩展性与局限性
当前这套基于协调式的Saga实现,虽然解决了基本的分布式事务问题,但在生产环境中还存在一些局限性。
首先,协调器本身是有状态的。如果AgentService
在执行Saga的过程中重启,当前的内存状态会丢失,导致事务中断。一个生产级的协调器必须将Saga的执行状态(当前执行到哪一步、每一步的结果)持久化到数据库(如PostgreSQL或Redis)中。每次执行或补偿前,都先更新状态记录。
其次,服务间的通信使用的是同步HTTP调用。如果某个下游服务响应缓慢,会阻塞协调器。更健壮的方案是使用消息队列(如RabbitMQ或Kafka)。协调器向队列发送命令消息,下游服务消费消息并执行操作,然后将结果通过另一个队列返回。这种异步模式能更好地解耦服务,提高系统的整体韧性。
最后,补偿逻辑的健壮性是整个模式的基石。一个常见的错误是认为补偿操作永远不会失败。如果补偿操作失败,系统将进入一个无法自动恢复的状态,必须有人工介入。因此,对补偿失败的监控和告警是至关重要的。必须为“补偿失败”这种情况设计明确的应急预案。