基于 Axum 与 NATS 构建运行于 GKE 的 Presto 数据管道动态密钥管理服务


我们面临的核心挑战并非数据摄取本身,而是如何在数据流经我们系统的整个生命周期中,为其处理节点动态、安全地授予访问下游资源的权限。在我们的场景中,大量短暂的批处理或流处理作业需要访问 Google Cloud Storage (GCS) 上的数据湖,这些数据最终由 Presto 集群进行查询。将长期有效的服务账号密钥分发并硬编码到这些作业中,是一条通往安全噩梦的捷径。凭证泄露、轮换困难、权限范围过大等问题,在生产环境中是不可接受的。

我们的目标是构建一个中间服务,它不仅负责接收数据并将其推送到消息队列,更核心的职责是充当一个“凭证代理”。它会为每条需要处理的数据请求,向密钥管理系统申请一个有时效性、权限最小化的临时凭证,然后将数据与这个临时凭证一同注入数据管道。处理节点在消费消息时,就能获得恰好在有效期内、拥有恰当权限的“一次性钥匙”,用完即焚。

技术选型决策

  • HTTP 服务层: 我们选择了 Axum。在 Rust 的生态中,Axum 基于 Tokio 和 Tower,其强类型的 Handler、灵活的中间件(Extractor)系统以及优秀的状态共享机制,非常适合构建需要与多个外部系统(如 NATS, Vault)交互的高性能网络服务。编译时安全保证和极低的资源占用,使其成为 GKE 环境中长时间稳定运行的理想选择。
  • 消息队列: NATS JetStream 是我们的不二之选。相比于其他消息系统,NATS 的简洁、高性能和云原生设计理念非常突出。JetStream 提供的持久化、至少一次送达保证以及流控能力,完全满足了我们对数据管道可靠性的要求。更重要的是,它的轻量级特性使得在 Kubernetes 集群内部署和维护的成本极低。
  • 密钥管理: 我们使用 HashiCorp Vault。具体来说,是利用其 GCP Secrets Engine。该引擎能够与 GCP IAM 深度集成,动态地为指定的 GCP 服务账号生成短期的 OAuth 访问令牌(Access Token)。这是实现我们“凭证代理”模式的核心。
  • 数据分析引擎: Presto / Trino 是我们整个数据平台的终端消费者。这个服务的设计初衷就是为了让 Presto 的数据源(GCS)的访问变得更安全、更可控。
  • 部署平台: **GCP GKE (Google Kubernetes Engine)**。我们将利用 GKE 的 Workload Identity 特性,让运行在 Pod 中的服务能够以 Kubernetes Service Account 的身份,安全地模拟 GCP Service Account,从而无需在 Pod 中存储任何静态 GCP 密钥,就能与 GCP API 和 Vault 进行交互。

架构流程设计

整个系统的交互流程可以用下面的图来描述:

sequenceDiagram
    participant Client as 外部客户端
    participant AxumSvc as Axum 凭证代理服务 (GKE Pod)
    participant Vault
    participant NATS as NATS JetStream
    participant Consumer as 下游数据处理作业
    participant GCS as Google Cloud Storage

    Client ->>+ AxumSvc: POST /ingest (携带业务数据)
    Note over AxumSvc, Vault: Pod 使用 Workload Identity 
以 KSA 身份模拟 GCP SA AxumSvc ->>+ Vault: 请求为目标GCP SA生成临时Token Vault-->>-AxumSvc: 返回短期有效的GCP Access Token AxumSvc ->>+ NATS: Publish(主题, {数据 + Token}) NATS-->>-AxumSvc: Ack AxumSvc-->>-Client: 202 Accepted Consumer ->>+ NATS: Fetch Message NATS -->>- Consumer: 返回 {数据 + Token} Consumer ->>+ GCS: 使用获取的临时Token访问数据 GCS -->>- Consumer: 授权通过, 返回数据

核心实现

让我们深入代码,一步步构建这个服务。

1. 项目结构与依赖

首先是我们的 Cargo.toml,定义了项目依赖。

[package]
name = "credential-delegation-service"
version = "0.1.0"
edition = "2021"

[dependencies]
axum = "0.7"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
async-nats = "0.34"
anyhow = "1.0"
thiserror = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
reqwest = { version = "0.12", features = ["json"] }
config = "0.14"
once_cell = "1.19"

2. 配置管理

在真实项目中,配置管理至关重要。我们使用 config crate 从文件中加载配置。

config/default.toml:

[server]
addr = "0.0.0.0:8080"

[nats]
url = "nats://localhost:4222"
stream_name = "DATA_PIPELINE"
subjects = ["pipeline.raw.events"]

[vault]
addr = "http://localhost:8200"
# 在 GKE 中,这个 token 路径由 Vault Agent Injector 提供
# 在本地开发时,需要手动设置 VAULT_TOKEN 环境变量
token_path = "/var/run/secrets/vault.hashicorp.com/token" 
# 这是 Vault 中配置的 GCP Secrets Engine 的角色名称
gcp_role_name = "presto-gcs-accessor" 
# 生成的临时 Token 的 TTL
gcp_token_ttl = "15m" 

src/config.rs:

use serde::Deserialize;

#[derive(Debug, Deserialize, Clone)]
pub struct ServerConfig {
    pub addr: String,
}

#[derive(Debug, Deserialize, Clone)]
pub struct NatsConfig {
    pub url: String,
    pub stream_name: String,
    pub subjects: Vec<String>,
}

#[derive(Debug, Deserialize, Clone)]
pub struct VaultConfig {
    pub addr: String,
    pub token_path: String,
    pub gcp_role_name: String,
    pub gcp_token_ttl: String,
}

#[derive(Debug, Deserialize, Clone)]
pub struct AppConfig {
    pub server: ServerConfig,
    pub nats: NatsConfig,
    pub vault: VaultConfig,
}

impl AppConfig {
    pub fn load() -> Result<Self, config::ConfigError> {
        let builder = config::Config::builder()
            .add_source(config::File::with_name("config/default"))
            .add_source(config::Environment::with_prefix("APP").separator("__"));

        builder.build()?.try_deserialize()
    }
}

3. 服务状态与错误处理

我们需要定义一个共享状态 AppState,它将包含 NATS 客户端、Vault 客户端以及配置信息。

src/errors.rs:

use axum::{
    http::StatusCode,
    response::{IntoResponse, Response},
    Json,
};
use serde_json::json;
use thiserror::Error;

#[derive(Debug, Error)]
pub enum AppError {
    #[error("NATS client error: {0}")]
    NatsError(#[from] async_nats::Error),

    #[error("NATS publish error: {0}")]
    NatsPublishError(#[from] async_nats::jetstream::context::PublishError),

    #[error("Vault interaction error: {0}")]
    VaultError(String),

    #[error("Configuration error: {0}")]
    ConfigError(#[from] config::ConfigError),

    #[error("I/O error: {0}")]
    IoError(#[from] std::io::Error),

    #[error("JSON serialization/deserialization error: {0}")]
    JsonError(#[from] serde_json::Error),
    
    #[error("HTTP client error: {0}")]
    ReqwestError(#[from] reqwest::Error),
}

impl IntoResponse for AppError {
    fn into_response(self) -> Response {
        let (status, error_message) = match self {
            AppError::NatsError(_) | AppError::NatsPublishError(_) => {
                (StatusCode::INTERNAL_SERVER_ERROR, "Messaging service unavailable".to_string())
            }
            AppError::VaultError(msg) => (StatusCode::INTERNAL_SERVER_ERROR, format!("Credential service error: {}", msg)),
            _ => (StatusCode::INTERNAL_SERVER_ERROR, "An internal error occurred".to_string()),
        };

        // 在生产环境中,日志应该更详细
        tracing::error!("Error: {:?}", self);

        let body = Json(json!({ "error": error_message }));
        (status, body).into_response()
    }
}

4. Vault 客户端实现

这是动态密钥管理的核心。我们将实现一个简单的 Vault 客户端,专门用于从 GCP Secrets Engine 获取临时令牌。

src/vault_client.rs:

use crate::config::VaultConfig;
use crate::errors::AppError;
use serde::{Deserialize, Serialize};
use std::fs;
use std::time::Duration;

#[derive(Debug, Clone)]
pub struct VaultClient {
    client: reqwest::Client,
    config: VaultConfig,
    token: String,
}

#[derive(Deserialize, Debug)]
struct VaultTokenResponseData {
    token: String,
    token_ttl: u64,
}

#[derive(Deserialize, Debug)]
struct VaultTokenResponse {
    data: VaultTokenResponseData,
}

impl VaultClient {
    pub async fn new(config: VaultConfig) -> Result<Self, AppError> {
        let token = fs::read_to_string(&config.token_path)
            .map_err(|e| AppError::IoError(e))?;
            
        let client = reqwest::Client::builder()
            .timeout(Duration::from_secs(10))
            .build()?;

        Ok(Self {
            client,
            config,
            token: token.trim().to_string(),
        })
    }

    /// 从 Vault 的 GCP Secrets Engine 获取一个短期的 Access Token
    pub async fn get_gcp_token(&self) -> Result<String, AppError> {
        let url = format!(
            "{}/v1/gcp/token/{}",
            self.config.addr, self.config.gcp_role_name
        );

        let response = self
            .client
            .get(&url)
            .header("X-Vault-Token", &self.token)
            .send()
            .await
            .map_err(|e| AppError::VaultError(format!("Failed to send request to Vault: {}", e)))?;

        if !response.status().is_success() {
            let error_body = response.text().await.unwrap_or_else(|_| "Failed to read error body".to_string());
            return Err(AppError::VaultError(format!(
                "Vault returned non-success status: {}. Body: {}",
                response.status(),
                error_body
            )));
        }

        let token_response: VaultTokenResponse = response
            .json()
            .await
            .map_err(|e| AppError::VaultError(format!("Failed to parse Vault response: {}", e)))?;
            
        Ok(token_response.data.token)
    }
}
  • 注释: 这里的关键是 get_gcp_token 方法。它向 Vault 中预先配置好的 gcp/token/:role_name 端点发出请求。Vault 会使用它自己的 GCP 凭证,代表我们请求的角色,向 GCP IAM 请求一个临时的服务账号令牌,并返回给我们。Pod 内的 Vault token 是通过 Vault Agent Injector 自动挂载到 token_path 的,我们的代码只需读取即可。

5. 主服务与 API Handler

现在,我们将所有部分组合在一起。

src/main.rs:

use axum::{extract::State, http::StatusCode, routing::post, Json, Router};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::sync::Arc;
use tokio::net::TcpListener;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

mod config;
mod errors;
mod vault_client;

use config::AppConfig;
use errors::AppError;
use vault_client::VaultClient;
use async_nats::jetstream::{self, stream::Stream};

// 共享的服务状态
#[derive(Clone)]
struct AppState {
    nats_context: jetstream::Context,
    vault_client: Arc<VaultClient>,
}

// 输入的业务数据结构
#[derive(Deserialize)]
struct IngestRequest {
    event_type: String,
    payload: Value,
}

// 发布到 NATS 的消息结构,包含了原始数据和临时凭证
#[derive(Serialize)]
struct PipelineMessage {
    original_request: IngestRequest,
    gcp_access_token: String,
    timestamp: i64,
}

impl Serialize for IngestRequest {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut state = serializer.serialize_struct("IngestRequest", 2)?;
        state.serialize_field("event_type", &self.event_type)?;
        state.serialize_field("payload", &self.payload)?;
        state.end()
    }
}

async fn ingest_handler(
    State(state): State<Arc<AppState>>,
    Json(payload): Json<IngestRequest>,
) -> Result<StatusCode, AppError> {
    // 1. 从 Vault 获取临时 GCP Token
    tracing::info!("Requesting GCP token from Vault...");
    let gcp_token = state.vault_client.get_gcp_token().await?;
    tracing::info!("Successfully obtained GCP token.");

    // 2. 构建要发布到 NATS 的消息
    let message = PipelineMessage {
        original_request: payload,
        gcp_access_token: gcp_token,
        timestamp: chrono::Utc::now().timestamp_millis(),
    };

    let message_bytes = serde_json::to_vec(&message)?;

    // 3. 将消息发布到 NATS JetStream
    // 我们假设主题是配置中的第一个
    let subject = "pipeline.raw.events"; // 在生产中应该从配置中动态获取
    tracing::info!("Publishing message to NATS subject: {}", subject);
    state
        .nats_context
        .publish(subject, message_bytes.into())
        .await?
        .await?; // 等待 server 的 ack
    tracing::info!("Message published successfully.");

    Ok(StatusCode::ACCEPTED)
}

#[tokio::main]
async fn main() -> Result<(), AppError> {
    // 初始化日志
    tracing_subscriber::registry()
        .with(tracing_subscriber::EnvFilter::new(
            std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into()),
        ))
        .with(tracing_subscriber::fmt::layer())
        .init();
    
    // 加载配置
    let config = AppConfig::load()?;

    // 初始化 Vault 客户端
    let vault_client = Arc::new(VaultClient::new(config.vault.clone()).await?);

    // 初始化 NATS 客户端并确保 Stream 存在
    let nats_client = async_nats::connect(&config.nats.url).await?;
    let jetstream = jetstream::new(nats_client);

    let stream: Stream = jetstream.get_or_create_stream(jetstream::stream::Config {
        name: config.nats.stream_name.clone(),
        subjects: config.nats.subjects.clone(),
        ..Default::default()
    }).await?;
    tracing::info!("NATS JetStream stream '{}' is ready.", stream.info().await?.config.name);


    // 创建共享状态
    let app_state = Arc::new(AppState {
        nats_context: jetstream,
        vault_client,
    });

    // 创建 Axum 路由
    let app = Router::new()
        .route("/ingest", post(ingest_handler))
        .with_state(app_state);

    // 启动服务器
    let listener = TcpListener::bind(&config.server.addr).await?;
    tracing::info!("Server listening on {}", config.server.addr);
    axum::serve(listener, app).await?;
    
    Ok(())
}
  • 注释: ingest_handler 函数清晰地展示了我们的核心逻辑:接收请求 -> 调用 Vault 获取令牌 -> 封装新消息 -> 发布到 NATS。main 函数负责初始化所有组件,包括加载配置、连接外部服务、创建 JetStream 流以及启动 Axum 服务器。get_or_create_stream 确保了即使 NATS 中还没有对应的流,服务也能自动创建,这增强了系统的鲁棒性。

6. GKE 部署与 Workload Identity 配置

代码只是故事的一半,在 GKE 上的正确部署和配置才是让这套机制安全运转的关键。

deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: credential-delegator
spec:
  replicas: 2
  selector:
    matchLabels:
      app: credential-delegator
  template:
    metadata:
      labels:
        app: credential-delegator
    spec:
      # 这个 KSA 将被绑定到 GCP SA
      serviceAccountName: credential-delegator-ksa 
      containers:
      - name: app
        image: gcr.io/YOUR_PROJECT/credential-delegation-service:latest
        ports:
        - containerPort: 8080
        env:
        - name: RUST_LOG
          value: "info,credential_delegation_service=debug"
        # Vault Agent Injector 将会在这里注入 sidecar 和 token volume
        # 我们需要添加注解来启用它
      # ... liveness/readiness probes, resource requests/limits ...
---
apiVersion: v1
kind: Service
metadata:
  name: credential-delegator-svc
spec:
  selector:
    app: credential-delegator
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8080
  type: ClusterIP

要让 Workload Identity 和 Vault Agent Injector 工作,需要执行以下步骤(通常由 DevOps 或平台工程师通过 Terraform/gcloud CLI 完成):

  1. 创建 GKE Service Account (KSA):
    kubectl create serviceaccount credential-delegator-ksa

  2. 创建 GCP Service Account (GCP SA):
    gcloud iam service-accounts create credential-delegator-gcp-sa

  3. 为 GCP SA 授予绑定到 Vault Role 的权限:
    这个 GCP SA 自身并不需要 GCS 的访问权限。它唯一需要的权限是能够生成其他服务账号的令牌。这需要 iam.serviceAccountTokenCreator 角色。
    gcloud iam service-accounts add-iam-policy-binding "[email protected]" --role="roles/iam.serviceAccountTokenCreator" --member="serviceAccount:[email protected]"

  4. 将 KSA 绑定到 GCP SA (Workload Identity 的核心):
    gcloud iam service-accounts add-iam-policy-binding "[email protected]" --role="roles/iam.workloadIdentityUser" --member="serviceAccount:YOUR_PROJECT.svc.id.goog[default/credential-delegator-ksa]"

  5. 配置 Vault:

    • 在 Vault 中启用 GCP Auth Method,并配置它信任我们的 credential-delegator-gcp-sa
    • 创建一个 Vault Policy,允许读取 gcp/token/:role_name 路径。
    • 将这个 Policy 绑定到一个 Vault Role,该 Role 允许 credential-delegator-ksa 进行认证。
    • 在 Vault 中配置 GCP Secrets Engine,并创建一个名为 presto-gcs-accessor 的 Role。这个 Role 绑定了真正拥有 GCS 读取权限的 target-gcs-reader-sa 服务账号。
  6. 为 Deployment 添加 Vault Agent Injector 注解:
    deployment.yamltemplate.metadata.annotations 中添加:

    vault.hashicorp.com/agent-inject: 'true'
    vault.hashicorp.com/role: 'gcp-auth-role-for-delegator' # Vault 中配置的 role
    vault.hashicorp.com/agent-inject-secret-config: 'path/to/secret' # (可选)

完成这些配置后,当我们的 Pod 启动时,GKE 会通过 Workload Identity 机制让 Pod 内的 KSA 能够模拟 GCP SA。Vault Agent Injector sidecar 会利用这个身份向 Vault 进行认证,获取一个 Vault Token,并将其写入 Pod 内的共享卷 /var/run/secrets/vault.hashicorp.com/token。我们的 Rust 应用启动后,就能从这个路径读取到有效的 Vault Token,从而完成整个认证链条。

方案的局限性与未来展望

此方案优雅地解决了动态凭证分发的问题,但并非没有权衡。

首先,引入了新的复杂性。Vault 和 NATS 成为了系统的关键依赖,它们的可用性和性能直接影响整个数据管道。对这两个组件的监控、维护和高可用部署是必须考虑的成本。

其次,性能瓶颈。每次数据摄取都需要与 Vault 进行一次同步的 API 调用。在高吞吐量场景下,Vault 的响应延迟可能会成为瓶颈。一种可能的优化是,服务可以在内部缓存一个有效的 GCP Token,并在其过期前的一小段时间内复用它来处理多个请求,但这会牺牲一部分安全性(一个 Token 被用于多个消息),需要在安全性和性能之间做出权衡。

最后,适用边界。此模式非常适用于处理作业生命周期较短(例如几分钟到几小时)的场景,因为临时凭证的有效期可以设置得与作业执行时间相匹配。对于需要长期运行的流处理作业,可能需要设计一种凭证自动续期的机制,例如消费者在凭证即将过期时主动向一个“续期服务”请求新的凭证。

未来的迭代方向可能包括:将 Vault 客户端的逻辑抽象成一个通用的 Tower Service 中间件;探索使用 gRPC 替换 REST 与 Vault 通信以获得更好的性能;以及为下游消费者提供一个轻量级的 SDK,以简化它们解析消息和使用临时凭证的逻辑。


  目录