管理一套生产级的计算机视觉(CV)推理系统,其复杂性远超部署一个无状态应用。它通常包含一个用于特征向量检索的 Qdrant 向量数据库集群,以及一组用于模型推理的 CV 工作负载。当这些组件部署在 Kubernetes 上时,我们面临的第一个现实问题就是状态管理和生命周期协同。仅使用 Helm Chart 和手动脚本来部署和运维,很快就会在版本升级、故障恢复和弹性伸缩等 day-2 操作中暴露出其脆弱性。一个配置的漂移或一个失败的升级脚本,就可能导致整个服务中断。
问题的核心在于,我们需要一个能够理解整个 CV 应用拓扑,并能主动维护其期望状态的控制平面。这正是 Kubernetes Operator 模式的用武之地。本文将详细记录构建这样一个 Operator 的架构决策与核心实现,它不仅负责编排 Qdrant StatefulSet 和 CV Deployment,还将探讨如何将 K8s 控制平面的状态变更实时同步到一个基于 Jotai 的前端监控面板,从而实现端到端的声明式管理。
定义复杂技术问题:声明式 CV 推理平台
我们的目标是构建一个平台,允许用户通过一个简单的 YAML 文件来声明一个完整的 CV 推理服务。这个服务包括:
- 一个 Qdrant 向量数据库集群:用于存储和检索图像特征向量,需要持久化存储和稳定的网络标识。
- 一组 CV 推理工作负载:无状态的 Pod,负责加载模型、预处理图像、生成向量并与 Qdrant 交互。
- 协同与自动化:平台应能自动处理组件的创建、更新、扩缩容和故障恢复。
例如,用户提交如下 CustomResource
(CR):
# config/samples/cv_v1alpha1_inferenceplatform.yaml
apiVersion: cv.platform.tech/v1alpha1
kind: InferencePlatform
metadata:
name: image-retrieval-prod
spec:
qdrant:
version: "v1.6.0"
replicas: 3
storageClassName: "gp2"
storageSize: "10Gi"
inference:
modelUrl: "s3://models/resnet50-v2-7.onnx"
replicas: 5
image: "my-repo/cv-inference:latest"
resources:
requests:
cpu: "1"
memory: "2Gi"
limits:
cpu: "2"
memory: "4Gi"
我们的 Operator 需要读取这个 CR,并将其转化为 Kubernetes 中实际运行的 StatefulSet
、Deployment
、Service
等原生资源,并持续监控它们的状态,确保其实际状态(Status)与用户定义的期望状态(Spec)一致。
方案对比:Helm vs. Operator
在决定投入资源开发 Operator 之前,必须严谨地评估替代方案。
方案A:Helm + ArgoCD/Flux
这是一种常见的 GitOps 实践。我们可以创建一个 Helm Chart 来打包 Qdrant 和 CV 推理应用的所有 K8s 资源。
优势:
- 生态成熟,上手快。
- 通过 values.yaml 实现参数化配置。
- 结合 ArgoCD 或 Flux 可以实现声明式的部署和版本控制。
劣势:
- 缺乏领域知识: Helm 本质上是一个模板渲染引擎。它无法理解“Qdrant 集群是否健康”或“CV 模型是否加载成功”。它只能创建资源,无法执行如“在 Qdrant 集群完全就绪之前,不要启动推理服务”或“当 Qdrant leader 节点发生切换时,优雅地重启推理 Pod”这类复杂的、有状态的协调逻辑。
- Day-2 操作的局限: 升级、备份、恢复等操作通常需要外部脚本或手动干预。例如,对 Qdrant 进行一个有状态的滚动更新,可能需要精确控制 Pod 的更新顺序,Helm 本身无法保证这一点。
- 状态反馈缺失: Helm 无法将应用的运行时状态写回到一个 API 对象中。我们无法通过
kubectl get
轻松地了解整个InferencePlatform
的健康状况。
方案B:自定义 Kubernetes Operator
Operator 通过扩展 Kubernetes API(引入 CRD)并实现一个自定义控制器来封装运维知识。
优势:
- 声明式 API: 将整个应用的定义抽象成一个高级 CRD,屏蔽了底层资源的复杂性。
- 自动化运维: 控制器中的调谐循环(Reconciliation Loop)可以编码复杂的运维逻辑,实现真正的自动化,如备份、恢复、故障转移。
- 状态感知: Operator 可以持续监控其管理的资源,并将聚合后的状态更新到 CR 的
status
字段,为监控和告警提供了权威的数据源。
劣势:
- 开发成本高: 需要使用 Go 语言和 Kubebuilder 或 Operator SDK 等框架进行开发,对开发人员的技能要求更高。
- 测试复杂: 需要
envtest
或一个真实的 K8s 集群来进行端到端测试。
最终选择与理由
对于一个需要高可靠性和自动化运维的生产平台而言,Operator 模式是必然选择。它的核心价值在于将运维人员的专业知识代码化,将命令式的操作流程转化为一个持续运行、自我修复的声明式系统。这种投资在系统规模扩大和复杂度增加时,会带来显著的回报。它将我们的应用从“运行在 Kubernetes 上”提升到了“由 Kubernetes 原生管理”。
核心实现:构建 InferencePlatform Operator
我们将使用 Kubebuilder 框架来快速搭建 Operator 的骨架。
1. CRD 定义 (api/v1alpha1/inferenceplatform_types.go
)
CRD 是 Operator 的 API 门面。Spec
结构体定义了用户可以配置的字段,Status
结构体则由 Operator 填充,反映实际状态。
// Go
package v1alpha1
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// QdrantSpec defines the desired state of Qdrant cluster
type QdrantSpec struct {
// +kubebuilder:validation:Required
// +kubebuilder:validation:MinLength=1
Version string `json:"version"`
// +kubebuilder:validation:Minimum=1
// +kubebuilder:default=1
Replicas int32 `json:"replicas"`
StorageClassName string `json:"storageClassName,omitempty"`
StorageSize string `json:"storageSize,omitempty"`
}
// InferenceSpec defines the desired state of CV Inference workloads
type InferenceSpec struct {
// +kubebuilder:validation:Required
ModelUrl string `json:"modelUrl"`
// +kubebuilder:validation:Minimum=0
// +kubebuilder:default=1
Replicas int32 `json:"replicas"`
// +kubebuilder:validation:Required
Image string `json:"image"`
Resources corev1.ResourceRequirements `json:"resources,omitempty"`
}
// InferencePlatformSpec defines the desired state of InferencePlatform
type InferencePlatformSpec struct {
Qdrant QdrantSpec `json:"qdrant"`
Inference InferenceSpec `json:"inference"`
}
// InferencePlatformStatus defines the observed state of InferencePlatform
type InferencePlatformStatus struct {
// Conditions represent the latest available observations of an object's state
Conditions []metav1.Condition `json:"conditions,omitempty"`
QdrantReadyReplicas int32 `json:"qdrantReadyReplicas"`
InferenceReadyReplicas int32 `json:"inferenceReadyReplicas"`
QdrantClusterInfo string `json:"qdrantClusterInfo,omitempty"`
}
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
// InferencePlatform is the Schema for the inferenceplatforms API
type InferencePlatform struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec InferencePlatformSpec `json:"spec,omitempty"`
Status InferencePlatformStatus `json:"status,omitempty"`
}
//+kubebuilder:object:root=true
// InferencePlatformList contains a list of InferencePlatform
type InferencePlatformList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []InferencePlatform `json:"items"`
}
func init() {
SchemeBuilder.Register(&InferencePlatform{}, &InferencePlatformList{})
}
这里的 +kubebuilder
注解非常关键,它们会生成 CRD YAML 文件、OpenAPI v3 schema(用于 kubectl explain
)和 Go client 代码。
2. 调谐循环 (internal/controller/inferenceplatform_controller.go
)
调谐循环是 Operator 的大脑。它接收 CR 变更事件,然后执行逻辑以使集群状态与 CR 的 spec
保持一致。
graph TD A[User applies InferencePlatform CR] --> B{K8s API Server}; B -- Stores in etcd --> C[etcd]; B -- Notifies Watch --> D[InferencePlatform Operator]; D -- Reconcile Loop Triggered --> E{Read CR Spec}; E --> F{Is Qdrant StatefulSet present?}; F -- No --> G[Create Qdrant StatefulSet & Service]; F -- Yes --> H{Does it match Spec?}; H -- No --> I[Update Qdrant StatefulSet]; H -- Yes --> J{Is Inference Deployment present?}; I --> J; G --> J; J -- No --> K[Create Inference Deployment]; J -- Yes --> L{Does it match Spec?}; L -- No --> M[Update Inference Deployment]; L -- Yes --> N{Update CR Status}; M --> N; K --> N; N --> D;
以下是调谐循环核心逻辑的简化版实现,展示了如何管理 Qdrant StatefulSet
。
// Go
// internal/controller/inferenceplatform_controller.go
func (r *InferencePlatformReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
// 1. 获取 InferencePlatform 实例
var platform cv_v1alpha1.InferencePlatform
if err := r.Get(ctx, req.NamespacedName, &platform); err != nil {
if errors.IsNotFound(err) {
// CR 被删除,相关资源会自动被 K8s GC 回收(如果设置了 OwnerReference)
log.Info("InferencePlatform resource not found. Ignoring since object must be deleted")
return ctrl.Result{}, nil
}
log.Error(err, "Failed to get InferencePlatform")
return ctrl.Result{}, err
}
// 2. 调谐 Qdrant StatefulSet
qdrantSts := &appsv1.StatefulSet{}
err := r.Get(ctx, types.NamespacedName{Name: platform.Name + "-qdrant", Namespace: platform.Namespace}, qdrantSts)
if err != nil && errors.IsNotFound(err) {
// StatefulSet 不存在,创建它
log.Info("Creating a new Qdrant StatefulSet", "StatefulSet.Namespace", platform.Namespace, "StatefulSet.Name", platform.Name+"-qdrant")
newSts := r.statefulSetForQdrant(&platform)
if err := r.Create(ctx, newSts); err != nil {
log.Error(err, "Failed to create new Qdrant StatefulSet")
return ctrl.Result{}, err
}
// 创建成功后,返回并等待下一次调谐,以确保状态稳定
return ctrl.Result{Requeue: true}, nil
} else if err != nil {
log.Error(err, "Failed to get Qdrant StatefulSet")
return ctrl.Result{}, err
}
// 3. 检查 StatefulSet 是否需要更新
// 这是一个简化的比较,真实项目中需要更精细的 deep equal
desiredReplicas := platform.Spec.Qdrant.Replicas
if *qdrantSts.Spec.Replicas != desiredReplicas {
log.Info("Qdrant StatefulSet replicas mismatch, updating...", "current", *qdrantSts.Spec.Replicas, "desired", desiredReplicas)
qdrantSts.Spec.Replicas = &desiredReplicas
if err := r.Update(ctx, qdrantSts); err != nil {
log.Error(err, "Failed to update Qdrant StatefulSet")
return ctrl.Result{}, err
}
return ctrl.Result{Requeue: true}, nil
}
// ... 此处省略调谐 Inference Deployment 的逻辑,与 StatefulSet 类似 ...
// 4. 更新 Status
// 在真实项目中,这里会从实际的 Pod 状态中聚合信息
platform.Status.QdrantReadyReplicas = qdrantSts.Status.ReadyReplicas
// ... 更新其他 status 字段 ...
// 使用 Subresource Client 更新 status,避免与 spec 更新冲突
if err := r.Status().Update(ctx, &platform); err != nil {
log.Error(err, "Failed to update InferencePlatform status")
return ctrl.Result{}, err
}
log.Info("Reconciliation finished successfully")
return ctrl.Result{}, nil
}
// statefulSetForQdrant 根据 CR 构建 Qdrant StatefulSet 对象
func (r *InferencePlatformReconciler) statefulSetForQdrant(p *cv_v1alpha1.InferencePlatform) *appsv1.StatefulSet {
// ... 此处是构建 StatefulSet YAML 的 Go 代码 ...
// 关键点:
// 1. 设置 OwnerReference,这样当 InferencePlatform CR 被删除时,
// 这个 StatefulSet 会被 K8s 垃圾回收机制自动删除。
// ctrl.SetControllerReference(p, sts, r.Scheme)
// 2. 将 CR spec 中的字段映射到 StatefulSet 的 spec 中,
// 例如 replicas, image version, storage 等。
// 3. 配置 Qdrant 集群所需的 headless service 和持久化卷模板。
// 4. 配置健康检查探针 (livenessProbe, readinessProbe)。
// 返回构建好的 StatefulSet 对象
return &appsv1.StatefulSet{} // 这是一个占位符
}
这段代码的核心是调谐模式(reconcile pattern
):获取当前状态,与期望状态比较,然后执行动作以弥合差异。OwnerReference
是一个至关重要的机制,它建立了 CR 与其派生资源之间的父子关系,简化了级联删除。
3. 前端状态同步:从 K8s Watch 到 Jotai Atoms
一个纯后端的 Operator 解决了自动化问题,但对于平台用户来说,系统的内部状态仍然是一个黑盒。我们需要一个 UI 来可视化 InferencePlatform
的状态。
架构
- Frontend (React + Jotai): 纯粹的展示层。
- Backend for Frontend (BFF): 一个简单的 Go 服务,它使用 K8s client-go 库来
Watch
InferencePlatform
资源的变化。 - WebSocket: BFF 将从 K8s API Server 收到的事件(ADDED, MODIFIED, DELETED)实时推送到前端。
BFF 核心逻辑 (Go)
// Go
// main.go (in BFF service)
func watchInferencePlatforms(clientset *kubernetes.Interface, platformClientset *versioned.Interface, conn *websocket.Conn) {
watchlist := cache.NewListWatchFromClient(
platformClientset.CvV1alpha1().RESTClient(),
"inferenceplatforms",
v1.NamespaceAll,
fields.Everything(),
)
_, controller := cache.NewInformer(
watchlist,
&cvv1alpha1.InferencePlatform{},
0, // resyncPeriod
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
platform := obj.(*cvv1alpha1.InferencePlatform)
log.Printf("ADD: %s", platform.Name)
// 将 ADD 事件和对象序列化为 JSON 推送到 WebSocket
sendEvent(conn, "ADDED", platform)
},
UpdateFunc: func(oldObj, newObj interface{}) {
platform := newObj.(*cvv1alpha1.InferencePlatform)
log.Printf("UPDATE: %s", platform.Name)
// 将 UPDATE 事件和对象序列化为 JSON 推送到 WebSocket
sendEvent(conn, "MODIFIED", platform)
},
DeleteFunc: func(obj interface{}) {
platform := obj.(*cvv1alpha1.InferencePlatform)
log.Printf("DELETE: %s", platform.Name)
// 将 DELETE 事件和对象(仅含元数据)序列化为 JSON 推送到 WebSocket
sendEvent(conn, "DELETED", platform)
},
},
)
stop := make(chan struct{})
defer close(stop)
go controller.Run(stop)
// 保持连接,直到客户端断开
for {
// 可以添加心跳机制
time.Sleep(15 * time.Second)
}
}
BFF 使用 Informer
机制,这是一个比直接 Watch
更高效的方式,它在本地维护了一个资源缓存,减轻了 K8s API Server 的压力。
Frontend 核心逻辑 (TypeScript + Jotai)
Jotai 的原子化状态管理模型非常适合处理这种来自外部数据源的、离散的状态更新。
// TSX
// store/platformAtoms.ts
import { atom } from 'jotai';
// 定义平台对象类型
interface InferencePlatform {
metadata: {
name: string;
namespace: string;
uid: string;
};
spec: { /* ... */ };
status?: {
qdrantReadyReplicas: number;
inferenceReadyReplicas: number;
conditions: any[];
};
}
// 使用 Map 来存储平台对象,以 uid 为 key,实现高效的增删改查
type PlatformsMap = Map<string, InferencePlatform>;
// 基础 atom,存储所有平台的 Map
export const platformsMapAtom = atom<PlatformsMap>(new Map());
// 派生 atom,用于处理 WebSocket 事件
// 这是一个 write-only atom,组件通过 useSetAtom 来调用它
export const platformEventHandlerAtom = atom(
null,
(get, set, event: { type: string; payload: InferencePlatform }) => {
const currentMap = new Map(get(platformsMapAtom));
const { uid } = event.payload.metadata;
switch (event.type) {
case 'ADDED':
case 'MODIFIED':
currentMap.set(uid, event.payload);
break;
case 'DELETED':
currentMap.delete(uid);
break;
default:
console.warn('Unknown event type:', event.type);
}
// 更新基础 atom,触发所有依赖该 atom 的组件重新渲染
set(platformsMapAtom, currentMap);
}
);
// 派生 atom,将 Map 转换为数组,方便 UI 渲染
export const platformsListAtom = atom((get) =>
Array.from(get(platformsMapAtom).values())
);
React 组件
// TSX
// components/PlatformDashboard.tsx
import { useAtom, useSetAtom } from 'jotai';
import { useEffect } from 'react';
import { platformsListAtom, platformEventHandlerAtom } from '../store/platformAtoms';
const useWebSocket = (url: string, onMessage: (event: any) => void) => {
useEffect(() => {
const ws = new WebSocket(url);
ws.onmessage = (event) => {
onMessage(JSON.parse(event.data));
};
// 在真实项目中需要处理重连和错误
return () => ws.close();
}, [url, onMessage]);
};
export const PlatformDashboard = () => {
const platforms = useAtom(platformsListAtom)[0];
const handlePlatformEvent = useSetAtom(platformEventHandlerAtom);
useWebSocket('ws://localhost:8080/ws/platforms', handlePlatformEvent);
return (
<div>
<h1>Inference Platforms</h1>
<table>
{/* 表头 */}
<tbody>
{platforms.map(p => (
<tr key={p.metadata.uid}>
<td>{p.metadata.name}</td>
<td>
{p.status?.qdrantReadyReplicas || 0} / {p.spec.qdrant.replicas}
</td>
<td>
{p.status?.inferenceReadyReplicas || 0} / {p.spec.inference.replicas}
</td>
{/* 其他状态信息 */}
</tr>
))}
</tbody>
</table>
</div>
);
};
这个架构实现了完美的关注点分离。Operator 负责核心的后端协调,它完全不知道 UI 的存在。UI 通过一个轻量的 BFF 订阅状态变更,Jotai 则以一种高效且可预测的方式管理着前端的实时状态。当管理员通过 kubectl edit
修改一个 InferencePlatform
的副本数时,变更会立即在 UI 上反映出来,无需刷新页面。
架构的扩展性与局限性
我们构建的这个 Operator 只是一个起点。它的价值在于提供了一个可扩展的框架。未来可以轻松地加入更复杂的功能,例如:
- 智能伸缩:集成 KEDA,根据自定义指标(如 GPU 利用率、消息队列积压)自动调整推理服务的副本数。
- 模型热加载:在 Operator 中添加逻辑,当
spec.modelUrl
变更时,不是重启 Pod,而是向 Pod 发送信号,触发模型的热加载,实现零中断模型更新。 - 备份与恢复:在 CRD 中添加
backupSpec
,Operator 可以定期调用 Qdrant API 创建快照并上传到对象存储。
然而,这个方案也存在局限性:
- 耦合于 K8s: Operator 的设计使其与 Kubernetes API 深度绑定,无法脱离 K8s 环境运行。这既是优点也是缺点。
- 复杂性管理: 随着 Operator 功能的增加,调谐循环的逻辑会变得越来越复杂,状态转换的管理和测试将成为一个挑战。编写健壮的、幂等的调谐逻辑至关重要。
- 实时性依赖: 前端 UI 的实时性完全依赖于 K8s API Server 的 Watch 机制和网络链路。在超大规模集群中,API Server 的负载和事件延迟可能会成为瓶颈。
- 粒度: 当前 Operator 管理的是粗粒度的资源生命周期。它不处理 CV Pod 之间的细粒度任务分配或分布式锁等问题。如果需要这类功能,可能还需要引入一个专门的协调服务,Operator 可以负责部署和管理这个协调服务,但具体的业务协调逻辑应在其内部完成。