构建一个管理 Qdrant 与 CV 工作负载的 K8s Operator:从 etcd 一致性到 Jotai 前端状态同步


管理一套生产级的计算机视觉(CV)推理系统,其复杂性远超部署一个无状态应用。它通常包含一个用于特征向量检索的 Qdrant 向量数据库集群,以及一组用于模型推理的 CV 工作负载。当这些组件部署在 Kubernetes 上时,我们面临的第一个现实问题就是状态管理和生命周期协同。仅使用 Helm Chart 和手动脚本来部署和运维,很快就会在版本升级、故障恢复和弹性伸缩等 day-2 操作中暴露出其脆弱性。一个配置的漂移或一个失败的升级脚本,就可能导致整个服务中断。

问题的核心在于,我们需要一个能够理解整个 CV 应用拓扑,并能主动维护其期望状态的控制平面。这正是 Kubernetes Operator 模式的用武之地。本文将详细记录构建这样一个 Operator 的架构决策与核心实现,它不仅负责编排 Qdrant StatefulSet 和 CV Deployment,还将探讨如何将 K8s 控制平面的状态变更实时同步到一个基于 Jotai 的前端监控面板,从而实现端到端的声明式管理。

定义复杂技术问题:声明式 CV 推理平台

我们的目标是构建一个平台,允许用户通过一个简单的 YAML 文件来声明一个完整的 CV 推理服务。这个服务包括:

  1. 一个 Qdrant 向量数据库集群:用于存储和检索图像特征向量,需要持久化存储和稳定的网络标识。
  2. 一组 CV 推理工作负载:无状态的 Pod,负责加载模型、预处理图像、生成向量并与 Qdrant 交互。
  3. 协同与自动化:平台应能自动处理组件的创建、更新、扩缩容和故障恢复。

例如,用户提交如下 CustomResource (CR):

# config/samples/cv_v1alpha1_inferenceplatform.yaml
apiVersion: cv.platform.tech/v1alpha1
kind: InferencePlatform
metadata:
  name: image-retrieval-prod
spec:
  qdrant:
    version: "v1.6.0"
    replicas: 3
    storageClassName: "gp2"
    storageSize: "10Gi"
  inference:
    modelUrl: "s3://models/resnet50-v2-7.onnx"
    replicas: 5
    image: "my-repo/cv-inference:latest"
    resources:
      requests:
        cpu: "1"
        memory: "2Gi"
      limits:
        cpu: "2"
        memory: "4Gi"

我们的 Operator 需要读取这个 CR,并将其转化为 Kubernetes 中实际运行的 StatefulSetDeploymentService 等原生资源,并持续监控它们的状态,确保其实际状态(Status)与用户定义的期望状态(Spec)一致。

方案对比:Helm vs. Operator

在决定投入资源开发 Operator 之前,必须严谨地评估替代方案。

方案A:Helm + ArgoCD/Flux

这是一种常见的 GitOps 实践。我们可以创建一个 Helm Chart 来打包 Qdrant 和 CV 推理应用的所有 K8s 资源。

  • 优势:

    • 生态成熟,上手快。
    • 通过 values.yaml 实现参数化配置。
    • 结合 ArgoCD 或 Flux 可以实现声明式的部署和版本控制。
  • 劣势:

    • 缺乏领域知识: Helm 本质上是一个模板渲染引擎。它无法理解“Qdrant 集群是否健康”或“CV 模型是否加载成功”。它只能创建资源,无法执行如“在 Qdrant 集群完全就绪之前,不要启动推理服务”或“当 Qdrant leader 节点发生切换时,优雅地重启推理 Pod”这类复杂的、有状态的协调逻辑。
    • Day-2 操作的局限: 升级、备份、恢复等操作通常需要外部脚本或手动干预。例如,对 Qdrant 进行一个有状态的滚动更新,可能需要精确控制 Pod 的更新顺序,Helm 本身无法保证这一点。
    • 状态反馈缺失: Helm 无法将应用的运行时状态写回到一个 API 对象中。我们无法通过 kubectl get 轻松地了解整个 InferencePlatform 的健康状况。

方案B:自定义 Kubernetes Operator

Operator 通过扩展 Kubernetes API(引入 CRD)并实现一个自定义控制器来封装运维知识。

  • 优势:

    • 声明式 API: 将整个应用的定义抽象成一个高级 CRD,屏蔽了底层资源的复杂性。
    • 自动化运维: 控制器中的调谐循环(Reconciliation Loop)可以编码复杂的运维逻辑,实现真正的自动化,如备份、恢复、故障转移。
    • 状态感知: Operator 可以持续监控其管理的资源,并将聚合后的状态更新到 CR 的 status 字段,为监控和告警提供了权威的数据源。
  • 劣势:

    • 开发成本高: 需要使用 Go 语言和 Kubebuilder 或 Operator SDK 等框架进行开发,对开发人员的技能要求更高。
    • 测试复杂: 需要 envtest 或一个真实的 K8s 集群来进行端到端测试。

最终选择与理由

对于一个需要高可靠性和自动化运维的生产平台而言,Operator 模式是必然选择。它的核心价值在于将运维人员的专业知识代码化,将命令式的操作流程转化为一个持续运行、自我修复的声明式系统。这种投资在系统规模扩大和复杂度增加时,会带来显著的回报。它将我们的应用从“运行在 Kubernetes 上”提升到了“由 Kubernetes 原生管理”。

核心实现:构建 InferencePlatform Operator

我们将使用 Kubebuilder 框架来快速搭建 Operator 的骨架。

1. CRD 定义 (api/v1alpha1/inferenceplatform_types.go)

CRD 是 Operator 的 API 门面。Spec 结构体定义了用户可以配置的字段,Status 结构体则由 Operator 填充,反映实际状态。

// Go

package v1alpha1

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// QdrantSpec defines the desired state of Qdrant cluster
type QdrantSpec struct {
	// +kubebuilder:validation:Required
	// +kubebuilder:validation:MinLength=1
	Version string `json:"version"`

	// +kubebuilder:validation:Minimum=1
	// +kubebuilder:default=1
	Replicas int32 `json:"replicas"`

	StorageClassName string `json:"storageClassName,omitempty"`
	StorageSize      string `json:"storageSize,omitempty"`
}

// InferenceSpec defines the desired state of CV Inference workloads
type InferenceSpec struct {
	// +kubebuilder:validation:Required
	ModelUrl string `json:"modelUrl"`

	// +kubebuilder:validation:Minimum=0
	// +kubebuilder:default=1
	Replicas int32 `json:"replicas"`

	// +kubebuilder:validation:Required
	Image string `json:"image"`

	Resources corev1.ResourceRequirements `json:"resources,omitempty"`
}

// InferencePlatformSpec defines the desired state of InferencePlatform
type InferencePlatformSpec struct {
	Qdrant    QdrantSpec    `json:"qdrant"`
	Inference InferenceSpec `json:"inference"`
}

// InferencePlatformStatus defines the observed state of InferencePlatform
type InferencePlatformStatus struct {
	// Conditions represent the latest available observations of an object's state
	Conditions []metav1.Condition `json:"conditions,omitempty"`

	QdrantReadyReplicas   int32  `json:"qdrantReadyReplicas"`
	InferenceReadyReplicas int32  `json:"inferenceReadyReplicas"`
	QdrantClusterInfo     string `json:"qdrantClusterInfo,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

// InferencePlatform is the Schema for the inferenceplatforms API
type InferencePlatform struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   InferencePlatformSpec   `json:"spec,omitempty"`
	Status InferencePlatformStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// InferencePlatformList contains a list of InferencePlatform
type InferencePlatformList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []InferencePlatform `json:"items"`
}

func init() {
	SchemeBuilder.Register(&InferencePlatform{}, &InferencePlatformList{})
}

这里的 +kubebuilder 注解非常关键,它们会生成 CRD YAML 文件、OpenAPI v3 schema(用于 kubectl explain)和 Go client 代码。

2. 调谐循环 (internal/controller/inferenceplatform_controller.go)

调谐循环是 Operator 的大脑。它接收 CR 变更事件,然后执行逻辑以使集群状态与 CR 的 spec 保持一致。

graph TD
    A[User applies InferencePlatform CR] --> B{K8s API Server};
    B -- Stores in etcd --> C[etcd];
    B -- Notifies Watch --> D[InferencePlatform Operator];
    D -- Reconcile Loop Triggered --> E{Read CR Spec};
    E --> F{Is Qdrant StatefulSet present?};
    F -- No --> G[Create Qdrant StatefulSet & Service];
    F -- Yes --> H{Does it match Spec?};
    H -- No --> I[Update Qdrant StatefulSet];
    H -- Yes --> J{Is Inference Deployment present?};
    I --> J;
    G --> J;
    J -- No --> K[Create Inference Deployment];
    J -- Yes --> L{Does it match Spec?};
    L -- No --> M[Update Inference Deployment];
    L -- Yes --> N{Update CR Status};
    M --> N;
    K --> N;
    N --> D;

以下是调谐循环核心逻辑的简化版实现,展示了如何管理 Qdrant StatefulSet

// Go
// internal/controller/inferenceplatform_controller.go

func (r *InferencePlatformReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := log.FromContext(ctx)

	// 1. 获取 InferencePlatform 实例
	var platform cv_v1alpha1.InferencePlatform
	if err := r.Get(ctx, req.NamespacedName, &platform); err != nil {
		if errors.IsNotFound(err) {
			// CR 被删除,相关资源会自动被 K8s GC 回收(如果设置了 OwnerReference)
			log.Info("InferencePlatform resource not found. Ignoring since object must be deleted")
			return ctrl.Result{}, nil
		}
		log.Error(err, "Failed to get InferencePlatform")
		return ctrl.Result{}, err
	}

	// 2. 调谐 Qdrant StatefulSet
	qdrantSts := &appsv1.StatefulSet{}
	err := r.Get(ctx, types.NamespacedName{Name: platform.Name + "-qdrant", Namespace: platform.Namespace}, qdrantSts)

	if err != nil && errors.IsNotFound(err) {
		// StatefulSet 不存在,创建它
		log.Info("Creating a new Qdrant StatefulSet", "StatefulSet.Namespace", platform.Namespace, "StatefulSet.Name", platform.Name+"-qdrant")
		newSts := r.statefulSetForQdrant(&platform)
		if err := r.Create(ctx, newSts); err != nil {
			log.Error(err, "Failed to create new Qdrant StatefulSet")
			return ctrl.Result{}, err
		}
		// 创建成功后,返回并等待下一次调谐,以确保状态稳定
		return ctrl.Result{Requeue: true}, nil
	} else if err != nil {
		log.Error(err, "Failed to get Qdrant StatefulSet")
		return ctrl.Result{}, err
	}

	// 3. 检查 StatefulSet 是否需要更新
	// 这是一个简化的比较,真实项目中需要更精细的 deep equal
	desiredReplicas := platform.Spec.Qdrant.Replicas
	if *qdrantSts.Spec.Replicas != desiredReplicas {
		log.Info("Qdrant StatefulSet replicas mismatch, updating...", "current", *qdrantSts.Spec.Replicas, "desired", desiredReplicas)
		qdrantSts.Spec.Replicas = &desiredReplicas
		if err := r.Update(ctx, qdrantSts); err != nil {
			log.Error(err, "Failed to update Qdrant StatefulSet")
			return ctrl.Result{}, err
		}
		return ctrl.Result{Requeue: true}, nil
	}
    
    // ... 此处省略调谐 Inference Deployment 的逻辑,与 StatefulSet 类似 ...

	// 4. 更新 Status
	// 在真实项目中,这里会从实际的 Pod 状态中聚合信息
	platform.Status.QdrantReadyReplicas = qdrantSts.Status.ReadyReplicas
	// ... 更新其他 status 字段 ...
	
	// 使用 Subresource Client 更新 status,避免与 spec 更新冲突
	if err := r.Status().Update(ctx, &platform); err != nil {
		log.Error(err, "Failed to update InferencePlatform status")
		return ctrl.Result{}, err
	}
    
	log.Info("Reconciliation finished successfully")
	return ctrl.Result{}, nil
}

// statefulSetForQdrant 根据 CR 构建 Qdrant StatefulSet 对象
func (r *InferencePlatformReconciler) statefulSetForQdrant(p *cv_v1alpha1.InferencePlatform) *appsv1.StatefulSet {
	// ... 此处是构建 StatefulSet YAML 的 Go 代码 ...
    // 关键点:
    // 1. 设置 OwnerReference,这样当 InferencePlatform CR 被删除时,
    //    这个 StatefulSet 会被 K8s 垃圾回收机制自动删除。
	//    ctrl.SetControllerReference(p, sts, r.Scheme)
    // 2. 将 CR spec 中的字段映射到 StatefulSet 的 spec 中,
    //    例如 replicas, image version, storage 等。
    // 3. 配置 Qdrant 集群所需的 headless service 和持久化卷模板。
    // 4. 配置健康检查探针 (livenessProbe, readinessProbe)。
	
	// 返回构建好的 StatefulSet 对象
	return &appsv1.StatefulSet{} // 这是一个占位符
}

这段代码的核心是调谐模式(reconcile pattern):获取当前状态,与期望状态比较,然后执行动作以弥合差异。OwnerReference 是一个至关重要的机制,它建立了 CR 与其派生资源之间的父子关系,简化了级联删除。

3. 前端状态同步:从 K8s Watch 到 Jotai Atoms

一个纯后端的 Operator 解决了自动化问题,但对于平台用户来说,系统的内部状态仍然是一个黑盒。我们需要一个 UI 来可视化 InferencePlatform 的状态。

架构

  1. Frontend (React + Jotai): 纯粹的展示层。
  2. Backend for Frontend (BFF): 一个简单的 Go 服务,它使用 K8s client-go 库来 Watch InferencePlatform 资源的变化。
  3. WebSocket: BFF 将从 K8s API Server 收到的事件(ADDED, MODIFIED, DELETED)实时推送到前端。

BFF 核心逻辑 (Go)

// Go
// main.go (in BFF service)

func watchInferencePlatforms(clientset *kubernetes.Interface, platformClientset *versioned.Interface, conn *websocket.Conn) {
	watchlist := cache.NewListWatchFromClient(
		platformClientset.CvV1alpha1().RESTClient(),
		"inferenceplatforms",
		v1.NamespaceAll,
		fields.Everything(),
	)

	_, controller := cache.NewInformer(
		watchlist,
		&cvv1alpha1.InferencePlatform{},
		0, // resyncPeriod
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				platform := obj.(*cvv1alpha1.InferencePlatform)
				log.Printf("ADD: %s", platform.Name)
				// 将 ADD 事件和对象序列化为 JSON 推送到 WebSocket
				sendEvent(conn, "ADDED", platform)
			},
			UpdateFunc: func(oldObj, newObj interface{}) {
				platform := newObj.(*cvv1alpha1.InferencePlatform)
				log.Printf("UPDATE: %s", platform.Name)
				// 将 UPDATE 事件和对象序列化为 JSON 推送到 WebSocket
				sendEvent(conn, "MODIFIED", platform)
			},
			DeleteFunc: func(obj interface{}) {
				platform := obj.(*cvv1alpha1.InferencePlatform)
				log.Printf("DELETE: %s", platform.Name)
				// 将 DELETE 事件和对象(仅含元数据)序列化为 JSON 推送到 WebSocket
				sendEvent(conn, "DELETED", platform)
			},
		},
	)

	stop := make(chan struct{})
	defer close(stop)
	go controller.Run(stop)

	// 保持连接,直到客户端断开
	for {
		// 可以添加心跳机制
		time.Sleep(15 * time.Second)
	}
}

BFF 使用 Informer 机制,这是一个比直接 Watch 更高效的方式,它在本地维护了一个资源缓存,减轻了 K8s API Server 的压力。

Frontend 核心逻辑 (TypeScript + Jotai)
Jotai 的原子化状态管理模型非常适合处理这种来自外部数据源的、离散的状态更新。

// TSX
// store/platformAtoms.ts

import { atom } from 'jotai';

// 定义平台对象类型
interface InferencePlatform {
  metadata: {
    name: string;
    namespace: string;
    uid: string;
  };
  spec: { /* ... */ };
  status?: {
    qdrantReadyReplicas: number;
    inferenceReadyReplicas: number;
    conditions: any[];
  };
}

// 使用 Map 来存储平台对象,以 uid 为 key,实现高效的增删改查
type PlatformsMap = Map<string, InferencePlatform>;

// 基础 atom,存储所有平台的 Map
export const platformsMapAtom = atom<PlatformsMap>(new Map());

// 派生 atom,用于处理 WebSocket 事件
// 这是一个 write-only atom,组件通过 useSetAtom 来调用它
export const platformEventHandlerAtom = atom(
  null,
  (get, set, event: { type: string; payload: InferencePlatform }) => {
    const currentMap = new Map(get(platformsMapAtom));
    const { uid } = event.payload.metadata;

    switch (event.type) {
      case 'ADDED':
      case 'MODIFIED':
        currentMap.set(uid, event.payload);
        break;
      case 'DELETED':
        currentMap.delete(uid);
        break;
      default:
        console.warn('Unknown event type:', event.type);
    }
    // 更新基础 atom,触发所有依赖该 atom 的组件重新渲染
    set(platformsMapAtom, currentMap);
  }
);

// 派生 atom,将 Map 转换为数组,方便 UI 渲染
export const platformsListAtom = atom((get) =>
  Array.from(get(platformsMapAtom).values())
);

React 组件

// TSX
// components/PlatformDashboard.tsx

import { useAtom, useSetAtom } from 'jotai';
import { useEffect } from 'react';
import { platformsListAtom, platformEventHandlerAtom } from '../store/platformAtoms';

const useWebSocket = (url: string, onMessage: (event: any) => void) => {
  useEffect(() => {
    const ws = new WebSocket(url);
    ws.onmessage = (event) => {
      onMessage(JSON.parse(event.data));
    };
    // 在真实项目中需要处理重连和错误
    return () => ws.close();
  }, [url, onMessage]);
};

export const PlatformDashboard = () => {
  const platforms = useAtom(platformsListAtom)[0];
  const handlePlatformEvent = useSetAtom(platformEventHandlerAtom);

  useWebSocket('ws://localhost:8080/ws/platforms', handlePlatformEvent);

  return (
    <div>
      <h1>Inference Platforms</h1>
      <table>
        {/* 表头 */}
        <tbody>
          {platforms.map(p => (
            <tr key={p.metadata.uid}>
              <td>{p.metadata.name}</td>
              <td>
                {p.status?.qdrantReadyReplicas || 0} / {p.spec.qdrant.replicas}
              </td>
              <td>
                {p.status?.inferenceReadyReplicas || 0} / {p.spec.inference.replicas}
              </td>
              {/* 其他状态信息 */}
            </tr>
          ))}
        </tbody>
      </table>
    </div>
  );
};

这个架构实现了完美的关注点分离。Operator 负责核心的后端协调,它完全不知道 UI 的存在。UI 通过一个轻量的 BFF 订阅状态变更,Jotai 则以一种高效且可预测的方式管理着前端的实时状态。当管理员通过 kubectl edit 修改一个 InferencePlatform 的副本数时,变更会立即在 UI 上反映出来,无需刷新页面。

架构的扩展性与局限性

我们构建的这个 Operator 只是一个起点。它的价值在于提供了一个可扩展的框架。未来可以轻松地加入更复杂的功能,例如:

  • 智能伸缩:集成 KEDA,根据自定义指标(如 GPU 利用率、消息队列积压)自动调整推理服务的副本数。
  • 模型热加载:在 Operator 中添加逻辑,当 spec.modelUrl 变更时,不是重启 Pod,而是向 Pod 发送信号,触发模型的热加载,实现零中断模型更新。
  • 备份与恢复:在 CRD 中添加 backupSpec,Operator 可以定期调用 Qdrant API 创建快照并上传到对象存储。

然而,这个方案也存在局限性:

  • 耦合于 K8s: Operator 的设计使其与 Kubernetes API 深度绑定,无法脱离 K8s 环境运行。这既是优点也是缺点。
  • 复杂性管理: 随着 Operator 功能的增加,调谐循环的逻辑会变得越来越复杂,状态转换的管理和测试将成为一个挑战。编写健壮的、幂等的调谐逻辑至关重要。
  • 实时性依赖: 前端 UI 的实时性完全依赖于 K8s API Server 的 Watch 机制和网络链路。在超大规模集群中,API Server 的负载和事件延迟可能会成为瓶颈。
  • 粒度: 当前 Operator 管理的是粗粒度的资源生命周期。它不处理 CV Pod 之间的细粒度任务分配或分布式锁等问题。如果需要这类功能,可能还需要引入一个专门的协调服务,Operator 可以负责部署和管理这个协调服务,但具体的业务协调逻辑应在其内部完成。

  目录