開發 Operator 調度 GPU 實例資源池
本章節將引入一個新的概念——K8s Operator,它是 K8s 的一種擴展形式,可以幫助用戶以 K8s 聲明式 API 的方式管理應用及服務,Operator 定義了一組在 Kubernetes 集群中打包和部署復雜業務應用的方法,主要是為解決特定應用或服務關于如何運行、部署及出現問題時如何處理提供的一種特定的自定義方式。比如:
- 按需部署應用服務
- 實現應用狀態的備份和還原,完成版本升級
- 數據庫 schema 或額外的配置設置的改動
在 K8s 中我們使用的 Deployment、Daemonset、Statefulset 等這些都是 K8s 的資源,這些資源的創建、刪除、更新等動作都會被稱為事件,K8s 的 Controller Manager 負責事件的監聽,并觸發對應的動作來滿足期望,這種方式就是聲明式,即用戶只需要關心應用程序的最終狀態。當我們在使用中發現有些資源并不能滿足日常的需求,對于這類需求可以使用 K8s 的自定義資源和 Operator 為應用程序提供基于 K8s 的擴展。
在這其中,CRD 就是對自定義資源的描述,如果要自定義資源,就需要先定義好 CRD,也就是介紹這個資源有什么屬性,這些屬性的類型、結構是怎樣的。
比如 PG 的 Operator 如下:
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: postgresqls.acid.zalan.do
labels:
app.kubernetes.io/name: postgres-operator
annotations:
"helm.sh/hook": crd-install
spec:
group: acid.zalan.do
names:
kind: postgresql
listKind: postgresqlList
plural: postgresqls
singular: postgresql
shortNames:
- pg additionalPrinterColumns:
- name: Team
type: string
description: Team responsible for Postgres CLuster
JSONPath: .spec.teamId
- name: Version
type: string
description: PostgreSQL version
JSONPath: .spec.postgresql.version
- name: Pods
type: integer
description: Number of Pods per Postgres cluster
JSONPath: .spec.numberOfInstances
- name: Volume
type: string
description: Size of the bound volume
JSONPath: .spec.volume.sizeCRD 主要包括 apiVersion、kind、metadata 和 spec 四個部分。其中最關鍵的是 apiVersion 和 kind,apiVersion 表示資源所屬組織和版本,apiVersion 一般由 APIGourp 和 Version 組成,這里的 APIGourp 是http://apiextensions.k8s.io,Version 是 v1beta1,相關信息可以通過kubectl api-resoures查看。kind 表示資源類型,這里是CustomResourceDefinition,表示是一個自定義的資源描述。
本文我們將自己開發一個 Operator 來維護 GPU 資源池的穩定,解決 AI 模型訓練的基礎平臺的穩定性。其架構如下:

ee11ee9bb3ba2f232c0f78573956823f MD5
其中:
- GPU 資源池采用的是騰訊云的競價 GPU 實例
- Operator 運行在 K8s 中,通過 SpootPool 控制 GPU 資源池的數量
- 若云平臺釋放了某臺 GPU 實例,當 Operator 監聽到資源池數量和期望的不匹配,會自動補充到期望數量
Operator 的開發有多種腳手架,常用的有 operator-sdk、kubebuilder 等,這里我們將使用 kubebuilder 來完成 Operator 的開發。
前置條件
- 準備一個可用的 K8s 集群,可以使用 kind、kubeadm、二進制等各種形式安裝,如果使用 kubeadm 安裝集群,可以參考 Kubernetes集群管理。
- 安裝好 kubebuilder,可以參考 kubebuild快速安裝。
- 準備好云平臺的 AK,這里是采用騰訊云,其他云類似。
快速開始
1.設計 CRD
在開發之前需要先設計好 CRD(就像業務開發前先設計好表結構一樣),本文的 CRD 主要包含云平臺虛擬機的開通,包括最小和最大實例數,以及騰訊云 SDK 所需要的各種參數,比如地域、可用區、VPC、子網、安全組、鏡像等。
最后 CRD 設計如下:
apiVersion: devops.jokerbai.com/v1
kind: Spotpool
metadata:
labels:
app.kubernetes.io/name: spotpool
app.kubernetes.io/managed-by: kustomize
name: spotpool-sample
spec:
secretId: 密鑰ID
secretKey: 密鑰Key
region: 區域
availabilityZone: 可用區
instanceType: 實例類型
minimum: 最小實例數
maximum: 最大實例數
subnetId: 子網ID
vpcId: VPC ID
securityGroupIds:
- 安全組
imageId: 鏡像ID
instanceChargeType: 實例付費類型2.初始化項目
定義好 CRD 字段之后,我們先使用 kubebuilder 初始化一個 Operator 項目,命令如下:
(1)初始化項目
mkdir spotpool && cd spotpool
kubebuilder init \
--domain jokerbai.com \
--repo github.com/joker-bai/spotpool \
--project-name spotpool \
--plugins go/v4 \
--owner "Joker Bai"(2)創建 API
kubebuilder create api --group devops.jokerbai.com --version v1 --kind Spotpool(3)生成后的目錄結構大致如下:
.
├── api
│ └── v1
│ ├── groupversion_info.go
│ ├── spotpool_types.go
│ └── zz_generated.deepcopy.go
├── bin
│ ├── controller-gen -> /root/workspace/godev/src/github.com/joker-bai/spotpool/bin/controller-gen-v0.18.0
│ └── controller-gen-v0.18.0
├── cmd
│ └── main.go
├── config
│ ├── crd
│ │ ├── kustomization.yaml
│ │ └── kustomizeconfig.yaml
│ ├── default
│ │ ├── cert_metrics_manager_patch.yaml
│ │ ├── kustomization.yaml
│ │ ├── manager_metrics_patch.yaml
│ │ └── metrics_service.yaml
│ ├── manager
│ │ ├── kustomization.yaml
│ │ └── manager.yaml
│ ├── network-policy
│ │ ├── allow-metrics-traffic.yaml
│ │ └── kustomization.yaml
│ ├── prometheus
│ │ ├── kustomization.yaml
│ │ ├── monitor_tls_patch.yaml
│ │ └── monitor.yaml
│ ├── rbac
│ │ ├── kustomization.yaml
│ │ ├── leader_election_role_binding.yaml
│ │ ├── leader_election_role.yaml
│ │ ├── metrics_auth_role_binding.yaml
│ │ ├── metrics_auth_role.yaml
│ │ ├── metrics_reader_role.yaml
│ │ ├── role_binding.yaml
│ │ ├── role.yaml
│ │ ├── service_account.yaml
│ │ ├── spotpool_admin_role.yaml
│ │ ├── spotpool_editor_role.yaml
│ │ └── spotpool_viewer_role.yaml
│ └── samples
│ ├── devops.jokerbai.com_v1_spotpool.yaml
│ └── kustomization.yaml
├── Dockerfile
├── go.mod
├── go.sum
├── hack
│ └── boilerplate.go.txt
├── internal
│ └── controller
│ ├── spotpool_controller.go
│ ├── spotpool_controller_test.go
│ └── suite_test.go
├── Makefile
├── PROJECT
├── README.md
└── test
├── e2e
│ ├── e2e_suite_test.go
│ └── e2e_test.go
└── utils
└── utils.go3.CRD 開發
(1)定義 API
在api/v1alpha1/spotpool_types.go中定義 CRD 的結構體,如下:
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
// SpotpoolSpec defines the desired state of Spotpool
type SpotpoolSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
SecretId string `json:"secretId,omitempty"`
SecretKey string `json:"secretKey,omitempty"`
Region string `json:"region,omitempty"`
AvaliableZone string `json:"availabilityZone,omitempty"`
InstanceType string `json:"instanceType,omitempty"`
SubnetId string `json:"subnetId,omitempty"`
VpcId string `json:"vpcId,omitempty"`
SecurityGroupId []string `json:"securityGroupIds,omitempty"`
ImageId string `json:"imageId,omitempty"`
InstanceChargeType string `json:"instanceChargeType,omitempty"`
Minimum int32 `json:"minimum,omitempty"`
Maximum int32 `json:"maximum,omitempty"`
}
// SpotpoolStatus defines the observed state of Spotpool
type SpotpoolStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
Size int32 `json:"size,omitempty"`
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,rep,name=conditions"`
Instances []Instances `json:"instances,omitempty"`
}
type Instances struct {
InstanceId string `json:"instanceId,omitempty"`
PublicIp string `json:"publicIp,omitempty"`
}
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
// Spotpool is the Schema for the spotpools API
type Spotpool struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec SpotpoolSpec `json:"spec,omitempty"`
Status SpotpoolStatus `json:"status,omitempty"`
}
//+kubebuilder:object:root=true
// SpotpoolList contains a list of Spotpool
type SpotpoolList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []Spotpool `json:"items"`
}
func init() {
SchemeBuilder.Register(&Spotpool{}, &SpotpoolList{})
}在 SpotpoolSpec 中定義設計的 CRD 結構體,這些字段都是創建虛擬機的必要字段。另外,在 SpotpoolStatus 中定義返回狀態里的信息,這里只需要 Instance 相關的信息。
(2)生成代碼
API 相關的代碼開發完后,執行以下命令生成代碼:
make generate
make manifests4.Controller 開發
(1)開發控制器邏輯
控制器的主邏輯是:
- 從云平臺獲取運行的實例數
- 判斷實例數和期望的實例數是否相等
- 如果小于期望值,則創建實例
- 如果大于期望值,則刪除實例
所以主邏輯的代碼如下,修改internal/controller/spotpool_controller.go:
func (r *SpotpoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := logf.FromContext(ctx)
// 獲取用戶期望
spotpool := &devopsjokerbaicomv1.Spotpool{}
if err := r.Get(ctx, req.NamespacedName, spotpool); err != nil {
log.Error(err, "unable to fetch spotspool")
}
// 從云平臺獲取獲取運行的實例
runningVmList, err := r.getRunningInstanceIds(spotpool)
if err != nil {
log.Error(err, "get running vm instance failed")
// 十秒后重試
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}
runningCount := len(runningVmList)
switch {
case runningCount < int(spotpool.Spec.Minimum):
// 創建實例擴容
delta := spotpool.Spec.Minimum - int32(runningCount)
log.Info("creating instances", "delta", delta)
err = r.runInstances(spotpool, delta)
if err != nil {
log.Error(err, "unable to create instances")
return ctrl.Result{RequeueAfter: 40 * time.Second}, nil
}
case runningCount > int(spotpool.Spec.Maximum):
// 刪除實例縮容
delta := int32(runningCount) - spotpool.Spec.Maximum
log.Info("terminating instances", "delta", delta)
err = r.terminateInstances(spotpool, delta)
if err != nil {
log.Error(err, "unable to terminate instances")
return ctrl.Result{RequeueAfter: 40 * time.Second}, nil
}
}
return ctrl.Result{RequeueAfter: 40 * time.Second}, nil
}其中:
r.getRunningInstanceIds(spotpool)用戶獲取云平臺運行的實例數r.runInstances(spotpool, delta)用于調用云平臺進行擴容r.terminateInstances(spotpool, delta)用于調用云平臺進行縮容
接下來分別實現上面的三個方法。
(1)首先,實現 getRunningInstanceIds 方法
func (r *SpotpoolReconciler) getRunningInstanceIds(spotpool *devopsjokerbaicomv1.Spotpool) ([]string, error) {
client, err := r.createCVMClient(spotpool.Spec)
if err != nil {
return nil, err
}
request := cvm.NewDescribeInstancesRequest()
response, err := client.DescribeInstances(request)
if err != nil {
return nil, err
}
var instances []devopsjokerbaicomv1.Instances
var runningInstanceIDs []string
for _, instance := range response.Response.InstanceSet {
if *instance.InstanceState == "RUNNING" || *instance.InstanceState == "PENDING" || *instance.InstanceState == "STARTING" {
runningInstanceIDs = append(runningInstanceIDs, *instance.InstanceId)
}
// 檢查實例的公網 IP,如果不存在公網 IP,則繼續重試
if len(instance.PublicIpAddresses) == 0 {
return nil, fmt.Errorf("instance %s does not have public ip", *instance.InstanceId)
}
instances = append(instances, devopsjokerbaicomv1.Instances{
InstanceId: *instance.InstanceId,
PublicIp: *instance.PublicIpAddresses[0],
})
}
// 更新 status
spotpool.Status.Instances = instances
err = r.Status().Update(context.Background(), spotpool)
if err != nil {
return nil, err
}
return runningInstanceIDs, nil
}
// 獲取騰訊云 SDK client
func (r *SpotpoolReconciler) createCVMClient(spec devopsjokerbaicomv1.SpotpoolSpec) (*cvm.Client, error) {
credential := common.NewCredential(spec.SecretId, spec.SecretKey)
cpf := profile.NewClientProfile()
cpf.HttpProfile.ReqMethod = "POST"
cpf.HttpProfile.ReqTimeout = 30
cpf.SignMethod = "HmacSHA1"
client, err := cvm.NewClient(credential, spec.Region, cpf)
if err != nil {
return nil, err
}
return client, nil
}其中:
- 調用
r.createCVMClient(spotpool.Spec)獲取騰訊云SDK client - 然后調用
client.DescribeInstances(request)獲取實例詳細信息 - 最后通過判斷
instance.InstanceStat和instance.PublicIpAddresses的狀態信息決定是否是需要的實例 - 最后返回實例列表信息
(2)實現 r.runInstances(spotpool, delta) 用于調用云平臺進行擴容
func (r *SpotpoolReconciler) runInstances(spotpool *devopsjokerbaicomv1.Spotpool, count int32) error {
client, err := r.createCVMClient(spotpool.Spec)
if err != nil {
return err
}
request := cvm.NewRunInstancesRequest()
request.ImageId = common.StringPtr(spotpool.Spec.ImageId)
request.Placement = &cvm.Placement{
Zone: common.StringPtr(spotpool.Spec.AvaliableZone),
}
request.InstanceChargeType = common.StringPtr(spotpool.Spec.InstanceChargeType)
request.InstanceCount = common.Int64Ptr(int64(count))
request.InstanceName = common.StringPtr("spotpool" + time.Now().Format("20060102150405"))
request.InstanceType = common.StringPtr(spotpool.Spec.InstanceType)
request.InternetAccessible = &cvm.InternetAccessible{
InternetChargeType: common.StringPtr("BANDWIDTH_POSTPAID_BY_HOUR"),
InternetMaxBandwidthOut: common.Int64Ptr(1),
PublicIpAssigned: common.BoolPtr(true),
}
request.LoginSettings = &cvm.LoginSettings{
Password: common.StringPtr("Password123"),
}
request.SecurityGroupIds = common.StringPtrs(spotpool.Spec.SecurityGroupId)
request.SystemDisk = &cvm.SystemDisk{
DiskType: common.StringPtr("CLOUD_BSSD"),
DiskSize: common.Int64Ptr(100),
}
request.VirtualPrivateCloud = &cvm.VirtualPrivateCloud{
SubnetId: common.StringPtr(spotpool.Spec.SubnetId),
VpcId: common.StringPtr(spotpool.Spec.VpcId),
}
// print request
fmt.Println(request.ToJsonString())
// 創建實例
response, err := client.RunInstances(request)
if _, ok := err.(*errors.TencentCloudSDKError); ok {
return err
}
// other errors
if err != nil {
return err
}
// 獲取到返回的 instancesid
instanceIds := make([]string, 0, len(response.Response.InstanceIdSet))
for _, instanceId := range response.Response.InstanceIdSet {
instanceIds = append(instanceIds, *instanceId)
}
fmt.Println("run instances success", instanceIds)
// 更新 status
_, err = r.getRunningInstanceIds(spotpool)
if err != nil {
return err
}
return nil
}這個方法主要是調用 client.RunInstances(request) 進行實例創建,然后調用 r.getRunningInstanceIds(spotpool) 更新 status 的狀態信息。
(3)開發r.terminateInstances(spotpool, delta) 用于調用云平臺進行縮容
func (r *SpotpoolReconciler) terminateInstances(spotpool *devopsjokerbaicomv1.Spotpool, count int32) error {
client, err := r.createCVMClient(spotpool.Spec)
if err != nil {
return err
}
runningInstances, err := r.getRunningInstanceIds(spotpool)
if err != nil {
return err
}
instancesIds := runningInstances[:count]
request := cvm.NewTerminateInstancesRequest()
request.InstanceIds = common.StringPtrs(instancesIds)
// 獲取返回
response, err := client.TerminateInstances(request)
if _, ok := err.(*errors.TencentCloudSDKError); ok {
return err
}
// other errors
if err != nil {
return err
}
fmt.Println("Terminate response: ", response)
fmt.Println("terminate instances success", instancesIds)
// 更新 status
_, err = r.getRunningInstanceIds(spotpool)
if err != nil {
return err
}
return nil
}刪除實例和創建實例的實現邏輯類似,先調用 client.TerminateInstances(request) 進行刪除,然后調用 r.getRunningInstanceIds(spotpool) 更新狀態。
上面三個步驟完成了主要邏輯開發,可以初步實現具體的效果,如果希望功能更健全,則需要對其進行開發優化。
部署和測試
1.本地測試
# 安裝 CRD
make install
# 運行 controller
make run2.創建 Spotpool 實例測試
(1)創建 Spotpool 資源清單,編輯 config/samples/devops.jokerbai.com_v1_spotpool.yaml
apiVersion: devops.jokerbai.com.jokerbai.com/v1
kind: Spotpool
metadata:
labels:
app.kubernetes.io/name: spotpool
app.kubernetes.io/managed-by: kustomize
name: spotpool-sample
spec:
secretId: xxx
secretKey: xxx
region: ap-singapore
availabilityZone: ap-singapore-2
instanceType: "GN7.2XLARGE32"
minimum: 2
maximum: 2
subnetId: DEFAULT
vpcId: DEFAULT
securityGroupIds:
- sg-xxx
imageId: img-xxx
instanceChargeType: SPOTPAID(2)運行資源清單
# 創建實例
kubectl apply -f config/samples/devops.jokerbai.com_v1_spotpool.yaml
# 查看狀態
kubectl get spotpool(3)構建并部署到集群
# 構建鏡像
make docker-build docker-push IMG=<your-registry>/spotpool:v1
# 部署到集群
make deploy IMG=<your-registry>/spotpool:v1(4)清理
# 刪除 operator
make undeploy
# 刪除 CRD
make uninstall最后
本文通過結合 Kubernetes、AI 和云平臺,深入探討了如何利用 K8s Operator 實現對 GPU 資源池的自動化管理。我們從 Operator 的核心概念出發,介紹了 CRD(自定義資源定義)和控制器的設計原理,并基于 kubebuilder 開發了一個名為 Spotpool 的 Operator,用于在騰訊云上維護競價實例的穩定運行。
整個開發過程遵循“聲明式 API”的思想,用戶只需定義期望的狀態(如最小/最大實例數),Operator 便會在后臺持續監控并自動調整實際狀態,確保資源池始終符合預期。這不僅極大地簡化了運維操作,也提升了 AI 模型訓練平臺的穩定性和彈性。
Operator 是云原生時代自動化運維的重要利器。掌握其開發方法,意味著我們不僅能“用好” Kubernetes,更能“擴展” Kubernetes,為復雜業務場景提供定制化的解決方案。






























