我们面临的核心挑战并非数据摄取本身,而是如何在数据流经我们系统的整个生命周期中,为其处理节点动态、安全地授予访问下游资源的权限。在我们的场景中,大量短暂的批处理或流处理作业需要访问 Google Cloud Storage (GCS) 上的数据湖,这些数据最终由 Presto 集群进行查询。将长期有效的服务账号密钥分发并硬编码到这些作业中,是一条通往安全噩梦的捷径。凭证泄露、轮换困难、权限范围过大等问题,在生产环境中是不可接受的。
我们的目标是构建一个中间服务,它不仅负责接收数据并将其推送到消息队列,更核心的职责是充当一个“凭证代理”。它会为每条需要处理的数据请求,向密钥管理系统申请一个有时效性、权限最小化的临时凭证,然后将数据与这个临时凭证一同注入数据管道。处理节点在消费消息时,就能获得恰好在有效期内、拥有恰当权限的“一次性钥匙”,用完即焚。
技术选型决策
- HTTP 服务层: 我们选择了 Axum。在 Rust 的生态中,Axum 基于 Tokio 和 Tower,其强类型的 Handler、灵活的中间件(Extractor)系统以及优秀的状态共享机制,非常适合构建需要与多个外部系统(如 NATS, Vault)交互的高性能网络服务。编译时安全保证和极低的资源占用,使其成为 GKE 环境中长时间稳定运行的理想选择。
- 消息队列: NATS JetStream 是我们的不二之选。相比于其他消息系统,NATS 的简洁、高性能和云原生设计理念非常突出。JetStream 提供的持久化、至少一次送达保证以及流控能力,完全满足了我们对数据管道可靠性的要求。更重要的是,它的轻量级特性使得在 Kubernetes 集群内部署和维护的成本极低。
- 密钥管理: 我们使用 HashiCorp Vault。具体来说,是利用其 GCP Secrets Engine。该引擎能够与 GCP IAM 深度集成,动态地为指定的 GCP 服务账号生成短期的 OAuth 访问令牌(Access Token)。这是实现我们“凭证代理”模式的核心。
- 数据分析引擎: Presto / Trino 是我们整个数据平台的终端消费者。这个服务的设计初衷就是为了让 Presto 的数据源(GCS)的访问变得更安全、更可控。
- 部署平台: **GCP GKE (Google Kubernetes Engine)**。我们将利用 GKE 的 Workload Identity 特性,让运行在 Pod 中的服务能够以 Kubernetes Service Account 的身份,安全地模拟 GCP Service Account,从而无需在 Pod 中存储任何静态 GCP 密钥,就能与 GCP API 和 Vault 进行交互。
架构流程设计
整个系统的交互流程可以用下面的图来描述:
sequenceDiagram participant Client as 外部客户端 participant AxumSvc as Axum 凭证代理服务 (GKE Pod) participant Vault participant NATS as NATS JetStream participant Consumer as 下游数据处理作业 participant GCS as Google Cloud Storage Client ->>+ AxumSvc: POST /ingest (携带业务数据) Note over AxumSvc, Vault: Pod 使用 Workload Identity
以 KSA 身份模拟 GCP SA AxumSvc ->>+ Vault: 请求为目标GCP SA生成临时Token Vault-->>-AxumSvc: 返回短期有效的GCP Access Token AxumSvc ->>+ NATS: Publish(主题, {数据 + Token}) NATS-->>-AxumSvc: Ack AxumSvc-->>-Client: 202 Accepted Consumer ->>+ NATS: Fetch Message NATS -->>- Consumer: 返回 {数据 + Token} Consumer ->>+ GCS: 使用获取的临时Token访问数据 GCS -->>- Consumer: 授权通过, 返回数据
核心实现
让我们深入代码,一步步构建这个服务。
1. 项目结构与依赖
首先是我们的 Cargo.toml
,定义了项目依赖。
[package]
name = "credential-delegation-service"
version = "0.1.0"
edition = "2021"
[dependencies]
axum = "0.7"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
async-nats = "0.34"
anyhow = "1.0"
thiserror = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
reqwest = { version = "0.12", features = ["json"] }
config = "0.14"
once_cell = "1.19"
2. 配置管理
在真实项目中,配置管理至关重要。我们使用 config
crate 从文件中加载配置。
config/default.toml
:
[server]
addr = "0.0.0.0:8080"
[nats]
url = "nats://localhost:4222"
stream_name = "DATA_PIPELINE"
subjects = ["pipeline.raw.events"]
[vault]
addr = "http://localhost:8200"
# 在 GKE 中,这个 token 路径由 Vault Agent Injector 提供
# 在本地开发时,需要手动设置 VAULT_TOKEN 环境变量
token_path = "/var/run/secrets/vault.hashicorp.com/token"
# 这是 Vault 中配置的 GCP Secrets Engine 的角色名称
gcp_role_name = "presto-gcs-accessor"
# 生成的临时 Token 的 TTL
gcp_token_ttl = "15m"
src/config.rs
:
use serde::Deserialize;
#[derive(Debug, Deserialize, Clone)]
pub struct ServerConfig {
pub addr: String,
}
#[derive(Debug, Deserialize, Clone)]
pub struct NatsConfig {
pub url: String,
pub stream_name: String,
pub subjects: Vec<String>,
}
#[derive(Debug, Deserialize, Clone)]
pub struct VaultConfig {
pub addr: String,
pub token_path: String,
pub gcp_role_name: String,
pub gcp_token_ttl: String,
}
#[derive(Debug, Deserialize, Clone)]
pub struct AppConfig {
pub server: ServerConfig,
pub nats: NatsConfig,
pub vault: VaultConfig,
}
impl AppConfig {
pub fn load() -> Result<Self, config::ConfigError> {
let builder = config::Config::builder()
.add_source(config::File::with_name("config/default"))
.add_source(config::Environment::with_prefix("APP").separator("__"));
builder.build()?.try_deserialize()
}
}
3. 服务状态与错误处理
我们需要定义一个共享状态 AppState
,它将包含 NATS 客户端、Vault 客户端以及配置信息。
src/errors.rs
:
use axum::{
http::StatusCode,
response::{IntoResponse, Response},
Json,
};
use serde_json::json;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum AppError {
#[error("NATS client error: {0}")]
NatsError(#[from] async_nats::Error),
#[error("NATS publish error: {0}")]
NatsPublishError(#[from] async_nats::jetstream::context::PublishError),
#[error("Vault interaction error: {0}")]
VaultError(String),
#[error("Configuration error: {0}")]
ConfigError(#[from] config::ConfigError),
#[error("I/O error: {0}")]
IoError(#[from] std::io::Error),
#[error("JSON serialization/deserialization error: {0}")]
JsonError(#[from] serde_json::Error),
#[error("HTTP client error: {0}")]
ReqwestError(#[from] reqwest::Error),
}
impl IntoResponse for AppError {
fn into_response(self) -> Response {
let (status, error_message) = match self {
AppError::NatsError(_) | AppError::NatsPublishError(_) => {
(StatusCode::INTERNAL_SERVER_ERROR, "Messaging service unavailable".to_string())
}
AppError::VaultError(msg) => (StatusCode::INTERNAL_SERVER_ERROR, format!("Credential service error: {}", msg)),
_ => (StatusCode::INTERNAL_SERVER_ERROR, "An internal error occurred".to_string()),
};
// 在生产环境中,日志应该更详细
tracing::error!("Error: {:?}", self);
let body = Json(json!({ "error": error_message }));
(status, body).into_response()
}
}
4. Vault 客户端实现
这是动态密钥管理的核心。我们将实现一个简单的 Vault 客户端,专门用于从 GCP Secrets Engine 获取临时令牌。
src/vault_client.rs
:
use crate::config::VaultConfig;
use crate::errors::AppError;
use serde::{Deserialize, Serialize};
use std::fs;
use std::time::Duration;
#[derive(Debug, Clone)]
pub struct VaultClient {
client: reqwest::Client,
config: VaultConfig,
token: String,
}
#[derive(Deserialize, Debug)]
struct VaultTokenResponseData {
token: String,
token_ttl: u64,
}
#[derive(Deserialize, Debug)]
struct VaultTokenResponse {
data: VaultTokenResponseData,
}
impl VaultClient {
pub async fn new(config: VaultConfig) -> Result<Self, AppError> {
let token = fs::read_to_string(&config.token_path)
.map_err(|e| AppError::IoError(e))?;
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?;
Ok(Self {
client,
config,
token: token.trim().to_string(),
})
}
/// 从 Vault 的 GCP Secrets Engine 获取一个短期的 Access Token
pub async fn get_gcp_token(&self) -> Result<String, AppError> {
let url = format!(
"{}/v1/gcp/token/{}",
self.config.addr, self.config.gcp_role_name
);
let response = self
.client
.get(&url)
.header("X-Vault-Token", &self.token)
.send()
.await
.map_err(|e| AppError::VaultError(format!("Failed to send request to Vault: {}", e)))?;
if !response.status().is_success() {
let error_body = response.text().await.unwrap_or_else(|_| "Failed to read error body".to_string());
return Err(AppError::VaultError(format!(
"Vault returned non-success status: {}. Body: {}",
response.status(),
error_body
)));
}
let token_response: VaultTokenResponse = response
.json()
.await
.map_err(|e| AppError::VaultError(format!("Failed to parse Vault response: {}", e)))?;
Ok(token_response.data.token)
}
}
- 注释: 这里的关键是
get_gcp_token
方法。它向 Vault 中预先配置好的gcp/token/:role_name
端点发出请求。Vault 会使用它自己的 GCP 凭证,代表我们请求的角色,向 GCP IAM 请求一个临时的服务账号令牌,并返回给我们。Pod 内的 Vault token 是通过 Vault Agent Injector 自动挂载到token_path
的,我们的代码只需读取即可。
5. 主服务与 API Handler
现在,我们将所有部分组合在一起。
src/main.rs
:
use axum::{extract::State, http::StatusCode, routing::post, Json, Router};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::sync::Arc;
use tokio::net::TcpListener;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
mod config;
mod errors;
mod vault_client;
use config::AppConfig;
use errors::AppError;
use vault_client::VaultClient;
use async_nats::jetstream::{self, stream::Stream};
// 共享的服务状态
#[derive(Clone)]
struct AppState {
nats_context: jetstream::Context,
vault_client: Arc<VaultClient>,
}
// 输入的业务数据结构
#[derive(Deserialize)]
struct IngestRequest {
event_type: String,
payload: Value,
}
// 发布到 NATS 的消息结构,包含了原始数据和临时凭证
#[derive(Serialize)]
struct PipelineMessage {
original_request: IngestRequest,
gcp_access_token: String,
timestamp: i64,
}
impl Serialize for IngestRequest {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut state = serializer.serialize_struct("IngestRequest", 2)?;
state.serialize_field("event_type", &self.event_type)?;
state.serialize_field("payload", &self.payload)?;
state.end()
}
}
async fn ingest_handler(
State(state): State<Arc<AppState>>,
Json(payload): Json<IngestRequest>,
) -> Result<StatusCode, AppError> {
// 1. 从 Vault 获取临时 GCP Token
tracing::info!("Requesting GCP token from Vault...");
let gcp_token = state.vault_client.get_gcp_token().await?;
tracing::info!("Successfully obtained GCP token.");
// 2. 构建要发布到 NATS 的消息
let message = PipelineMessage {
original_request: payload,
gcp_access_token: gcp_token,
timestamp: chrono::Utc::now().timestamp_millis(),
};
let message_bytes = serde_json::to_vec(&message)?;
// 3. 将消息发布到 NATS JetStream
// 我们假设主题是配置中的第一个
let subject = "pipeline.raw.events"; // 在生产中应该从配置中动态获取
tracing::info!("Publishing message to NATS subject: {}", subject);
state
.nats_context
.publish(subject, message_bytes.into())
.await?
.await?; // 等待 server 的 ack
tracing::info!("Message published successfully.");
Ok(StatusCode::ACCEPTED)
}
#[tokio::main]
async fn main() -> Result<(), AppError> {
// 初始化日志
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new(
std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into()),
))
.with(tracing_subscriber::fmt::layer())
.init();
// 加载配置
let config = AppConfig::load()?;
// 初始化 Vault 客户端
let vault_client = Arc::new(VaultClient::new(config.vault.clone()).await?);
// 初始化 NATS 客户端并确保 Stream 存在
let nats_client = async_nats::connect(&config.nats.url).await?;
let jetstream = jetstream::new(nats_client);
let stream: Stream = jetstream.get_or_create_stream(jetstream::stream::Config {
name: config.nats.stream_name.clone(),
subjects: config.nats.subjects.clone(),
..Default::default()
}).await?;
tracing::info!("NATS JetStream stream '{}' is ready.", stream.info().await?.config.name);
// 创建共享状态
let app_state = Arc::new(AppState {
nats_context: jetstream,
vault_client,
});
// 创建 Axum 路由
let app = Router::new()
.route("/ingest", post(ingest_handler))
.with_state(app_state);
// 启动服务器
let listener = TcpListener::bind(&config.server.addr).await?;
tracing::info!("Server listening on {}", config.server.addr);
axum::serve(listener, app).await?;
Ok(())
}
- 注释:
ingest_handler
函数清晰地展示了我们的核心逻辑:接收请求 -> 调用 Vault 获取令牌 -> 封装新消息 -> 发布到 NATS。main
函数负责初始化所有组件,包括加载配置、连接外部服务、创建 JetStream 流以及启动 Axum 服务器。get_or_create_stream
确保了即使 NATS 中还没有对应的流,服务也能自动创建,这增强了系统的鲁棒性。
6. GKE 部署与 Workload Identity 配置
代码只是故事的一半,在 GKE 上的正确部署和配置才是让这套机制安全运转的关键。
deployment.yaml
:
apiVersion: apps/v1
kind: Deployment
metadata:
name: credential-delegator
spec:
replicas: 2
selector:
matchLabels:
app: credential-delegator
template:
metadata:
labels:
app: credential-delegator
spec:
# 这个 KSA 将被绑定到 GCP SA
serviceAccountName: credential-delegator-ksa
containers:
- name: app
image: gcr.io/YOUR_PROJECT/credential-delegation-service:latest
ports:
- containerPort: 8080
env:
- name: RUST_LOG
value: "info,credential_delegation_service=debug"
# Vault Agent Injector 将会在这里注入 sidecar 和 token volume
# 我们需要添加注解来启用它
# ... liveness/readiness probes, resource requests/limits ...
---
apiVersion: v1
kind: Service
metadata:
name: credential-delegator-svc
spec:
selector:
app: credential-delegator
ports:
- protocol: TCP
port: 80
targetPort: 8080
type: ClusterIP
要让 Workload Identity 和 Vault Agent Injector 工作,需要执行以下步骤(通常由 DevOps 或平台工程师通过 Terraform/gcloud CLI 完成):
创建 GKE Service Account (KSA):
kubectl create serviceaccount credential-delegator-ksa
创建 GCP Service Account (GCP SA):
gcloud iam service-accounts create credential-delegator-gcp-sa
为 GCP SA 授予绑定到 Vault Role 的权限:
这个 GCP SA 自身并不需要 GCS 的访问权限。它唯一需要的权限是能够生成其他服务账号的令牌。这需要iam.serviceAccountTokenCreator
角色。
gcloud iam service-accounts add-iam-policy-binding "[email protected]" --role="roles/iam.serviceAccountTokenCreator" --member="serviceAccount:[email protected]"
将 KSA 绑定到 GCP SA (Workload Identity 的核心):
gcloud iam service-accounts add-iam-policy-binding "[email protected]" --role="roles/iam.workloadIdentityUser" --member="serviceAccount:YOUR_PROJECT.svc.id.goog[default/credential-delegator-ksa]"
配置 Vault:
- 在 Vault 中启用 GCP Auth Method,并配置它信任我们的
credential-delegator-gcp-sa
。 - 创建一个 Vault Policy,允许读取
gcp/token/:role_name
路径。 - 将这个 Policy 绑定到一个 Vault Role,该 Role 允许
credential-delegator-ksa
进行认证。 - 在 Vault 中配置 GCP Secrets Engine,并创建一个名为
presto-gcs-accessor
的 Role。这个 Role 绑定了真正拥有 GCS 读取权限的target-gcs-reader-sa
服务账号。
- 在 Vault 中启用 GCP Auth Method,并配置它信任我们的
为 Deployment 添加 Vault Agent Injector 注解:
在deployment.yaml
的template.metadata.annotations
中添加:vault.hashicorp.com/agent-inject: 'true' vault.hashicorp.com/role: 'gcp-auth-role-for-delegator' # Vault 中配置的 role vault.hashicorp.com/agent-inject-secret-config: 'path/to/secret' # (可选)
完成这些配置后,当我们的 Pod 启动时,GKE 会通过 Workload Identity 机制让 Pod 内的 KSA 能够模拟 GCP SA。Vault Agent Injector sidecar 会利用这个身份向 Vault 进行认证,获取一个 Vault Token,并将其写入 Pod 内的共享卷 /var/run/secrets/vault.hashicorp.com/token
。我们的 Rust 应用启动后,就能从这个路径读取到有效的 Vault Token,从而完成整个认证链条。
方案的局限性与未来展望
此方案优雅地解决了动态凭证分发的问题,但并非没有权衡。
首先,引入了新的复杂性。Vault 和 NATS 成为了系统的关键依赖,它们的可用性和性能直接影响整个数据管道。对这两个组件的监控、维护和高可用部署是必须考虑的成本。
其次,性能瓶颈。每次数据摄取都需要与 Vault 进行一次同步的 API 调用。在高吞吐量场景下,Vault 的响应延迟可能会成为瓶颈。一种可能的优化是,服务可以在内部缓存一个有效的 GCP Token,并在其过期前的一小段时间内复用它来处理多个请求,但这会牺牲一部分安全性(一个 Token 被用于多个消息),需要在安全性和性能之间做出权衡。
最后,适用边界。此模式非常适用于处理作业生命周期较短(例如几分钟到几小时)的场景,因为临时凭证的有效期可以设置得与作业执行时间相匹配。对于需要长期运行的流处理作业,可能需要设计一种凭证自动续期的机制,例如消费者在凭证即将过期时主动向一个“续期服务”请求新的凭证。
未来的迭代方向可能包括:将 Vault 客户端的逻辑抽象成一个通用的 Tower Service 中间件;探索使用 gRPC 替换 REST 与 Vault 通信以获得更好的性能;以及为下游消费者提供一个轻量级的 SDK,以简化它们解析消息和使用临时凭证的逻辑。