深入浅出 Kubernetes Operator:原理、实践与简单示例
Kubernetes Operator 是一种扩展 Kubernetes API 的方式,用于自动化管理和运维复杂的应用程序。它通过自定义资源(Custom Resources,CR)和控制器(Controller)来实现,将运维知识编码到软件中,从而实现应用程序的自动化部署、配置、升级、备份和恢复等操作。简单来说,Operator就像一个专门为你的应用量身定制的 Kubernetes 扩展,让 Kubernetes 更好地理解和管理你的应用。
Operator 的核心概念
- Custom Resource Definition (CRD):CRD 允许你定义自己的 Kubernetes 资源类型。例如,你可以定义一个名为
MyApp的 CRD,用于描述你的应用程序的配置和状态。 - Custom Resource (CR):CR 是 CRD 的一个实例。例如,你可以创建一个
MyApp类型的 CR,指定你的应用程序的名称、版本和所需的资源。 - Controller:Controller 负责监听 CR 的变化,并根据 CR 的定义,执行相应的操作,例如部署应用程序、配置服务、创建备份等。Controller 通过 Kubernetes API Server 与 Kubernetes 集群进行交互。
Operator 的工作原理
Operator 的工作原理可以用一个循环来概括:
- 监听:Controller 监听 Kubernetes API Server,关注 CR 的变化。
- 对比:当 CR 发生变化时,Controller 获取 CR 的最新状态,并与期望状态进行对比。
- 调谐:如果 CR 的当前状态与期望状态不一致,Controller 执行相应的操作,使 CR 的状态与期望状态保持一致。这个过程称为“调谐”(Reconcile)。
- 更新:Controller 更新 CR 的状态,以便 Kubernetes API Server 能够反映 CR 的最新状态。
这个循环不断重复,确保应用程序始终处于期望的状态。
编写一个简单的 Operator
为了更好地理解 Operator 的工作原理,我们来编写一个简单的 Operator,用于管理一个名为 MyWebApp 的 Web 应用程序。这个 Operator 将实现以下功能:
- 部署一个
MyWebApp的 Deployment。 - 创建一个 Service,将
MyWebApp暴露给外部访问。
1. 定义 CRD
首先,我们需要定义一个名为 MyWebApp 的 CRD。CRD 的 YAML 文件如下所示:
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: mywebapp.example.com
spec:
group: example.com
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
size: # 应用副本数
type: integer
image: # 应用镜像
type: string
scope: Namespaced
names:
plural: mywebapp
singular: mywebapp
kind: MyWebApp
shortNames: # 简称
- mw
这个 CRD 定义了一个名为 MyWebApp 的资源,它属于 example.com 组,版本为 v1。MyWebApp 资源具有一个 spec 字段,用于指定应用程序的配置,包括 size(副本数)和 image(镜像)。
2. 创建 CR
接下来,我们可以创建一个 MyWebApp 类型的 CR,指定应用程序的名称、版本和所需的资源。CR 的 YAML 文件如下所示:
apiVersion: example.com/v1
kind: MyWebApp
metadata:
name: mywebapp-sample
spec:
size: 3
image: nginx:latest
这个 CR 定义了一个名为 mywebapp-sample 的 MyWebApp 资源,它指定应用程序的副本数为 3,镜像为 nginx:latest。
3. 编写 Controller
现在,我们需要编写 Controller,负责监听 MyWebApp CR 的变化,并根据 CR 的定义,执行相应的操作。Controller 可以使用多种编程语言编写,例如 Go、Python、Java 等。这里我们使用 Go 语言编写 Controller。
首先,我们需要安装 Kubernetes 的 Go 客户端库:
go get k8s.io/client-go@kubernetes-1.28.0
go get k8s.io/apimachinery@kubernetes-1.28.0
然后,我们可以编写 Controller 的代码。以下是一个简单的 Controller 示例:
package main
import (
"context"
"fmt"
"os"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
// 自定义的CRD
webappv1 "./pkg/apis/webapp/v1"
webappclientset "./pkg/client/clientset/versioned"
webappinformers "./pkg/client/informers/externalversions/webapp/v1"
)
const controllerName = "mywebapp-controller"
func main() {
// 1. 初始化 K8S 配置
config, err := rest.InClusterConfig()
if err != nil {
config, err = clientcmd.BuildConfigFromFlags("", os.Getenv("HOME")+"/.kube/config")
if err != nil {
klog.Fatalf("Error building kubeconfig: %s", err.Error())
}
}
// 2. 创建 K8S 客户端
k8sClient, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
// 3. 创建 CRD 客户端
webappClient, err := webappclientset.NewForConfig(config)
if err != nil {
panic(err.Error())
}
// 4. 创建 Informer
webappInformerFactory := webappinformers.NewSharedInformerFactory(webappClient, 0)
webappInformer := webappInformerFactory.Webapp().V1().MyWebApps().Informer()
// 5. 创建 Controller
controller := &Controller{
k8sClient: k8sClient,
webappClient: webappClient,
webappInformer: webappInformer,
}
// 6. 启动 Informer
stopCh := make(chan struct{})
defer close(stopCh)
webappInformerFactory.Start(stopCh)
// 7. 启动 Controller
err = controller.Run(2, stopCh)
if err != nil {
klog.Fatalf("Error running controller: %s", err.Error())
}
}
type Controller struct {
k8sClient kubernetes.Interface
webappClient webappclientset.Interface
webappInformer cache.SharedIndexInformer
}
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
// 同步缓存,确保 Informer 已经同步了所有资源
if !cache.WaitForCacheSync(stopCh, c.webappInformer.HasSynced) {
return fmt.Errorf("failed to wait for caches to sync")
}
// 启动 worker 线程
for i := 0; i < threadiness; i++ {
go func() {
for c.processNextItem() {
}
}()
}
<-stopCh
return nil
}
func (c *Controller) processNextItem() bool {
// 从 Informer 的队列中获取一个事件
obj, shutdown := c.webappInformer.GetIndexer().Pop()
if shutdown {
return false
}
// 处理事件
err := func(obj interface{}) error {
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered from panic:", r)
}
}()
// 将事件对象转换为 MyWebApp 对象
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
return err
}
// 从 key 中提取 namespace 和 name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
// 获取 MyWebApp 对象
myWebApp, err := c.webappClient.WebappV1().MyWebApps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
// 如果 MyWebApp 对象不存在,则说明已经被删除
if apierrors.IsNotFound(err) {
klog.Infof("MyWebApp '%s/%s' in work queue no longer exists", namespace, name)
return nil
}
return err
}
// 调谐 MyWebApp 对象
err = c.syncHandler(myWebApp, namespace)
if err != nil {
return err
}
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}
func (c *Controller) syncHandler(myWebApp *webappv1.MyWebApp, namespace string) error {
// 1. 创建 Deployment
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: myWebApp.Name + "-deployment",
Namespace: namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(myWebApp, webappv1.SchemeGroupVersion.WithKind("MyWebApp")),
},
},
Spec: appsv1.DeploymentSpec{
Replicas: int32Ptr(int32(myWebApp.Spec.Size)),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": myWebApp.Name,
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": myWebApp.Name,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "webapp",
Image: myWebApp.Spec.Image,
},
},
},
},
},
}
_, err := c.k8sClient.AppsV1().Deployments(namespace).Create(context.TODO(), deployment, metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return err
}
// 2. 创建 Service
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: myWebApp.Name + "-service",
Namespace: namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(myWebApp, webappv1.SchemeGroupVersion.WithKind("MyWebApp")),
},
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"app": myWebApp.Name,
},
Ports: []corev1.ServicePort{
{
Protocol: corev1.ProtocolTCP,
Port: 80,
TargetPort: int32ToIntstr(80),
},
},
},
}
_, err = c.k8sClient.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return err
}
return nil
}
func int32Ptr(i int32) *int32 {
return &i
}
func int32ToIntstr(i int32) interface{} {
return i
}
这个 Controller 的主要逻辑如下:
main函数:- 初始化 Kubernetes 配置。
- 创建 Kubernetes 客户端和 CRD 客户端。
- 创建 Informer,用于监听
MyWebApp资源的变化。 - 创建 Controller 实例。
- 启动 Informer 和 Controller。
Controller结构体:- 包含 Kubernetes 客户端、CRD 客户端和 Informer。
Run函数:- 同步缓存,确保 Informer 已经同步了所有资源。
- 启动 worker 线程,处理事件。
processNextItem函数:- 从 Informer 的队列中获取一个事件。
- 将事件对象转换为
MyWebApp对象。 - 调用
syncHandler函数,调谐MyWebApp对象。
syncHandler函数:- 创建 Deployment 和 Service,确保应用程序按照 CR 的定义运行。
4. 部署 Operator
要部署 Operator,你需要:
将 CRD 的 YAML 文件应用到 Kubernetes 集群:
kubectl apply -f mywebapp-crd.yaml编译 Controller 代码,并将其打包成 Docker 镜像。
创建一个 Deployment,将 Controller 部署到 Kubernetes 集群。
创建一个 ServiceAccount,并授予 Controller 相应的权限,以便它可以访问 Kubernetes API Server。
5. 验证 Operator
部署完成后,你可以创建一个 MyWebApp 类型的 CR,验证 Operator 是否正常工作:
kubectl apply -f mywebapp-cr.yaml
如果 Operator 正常工作,它将自动创建一个 Deployment 和 Service,用于运行 MyWebApp 应用程序。
总结
Kubernetes Operator 是一种强大的工具,可以自动化管理和运维复杂的应用程序。通过自定义资源和控制器,Operator 将运维知识编码到软件中,从而实现应用程序的自动化部署、配置、升级、备份和恢复等操作。希望本文能够帮助你理解 Operator 的工作原理,并掌握编写一个简单的 Operator 的方法。当然,这只是一个非常简单的示例,实际的 Operator 可能会更加复杂,需要处理更多的细节和错误情况。但是,掌握了 Operator 的核心概念和基本原理,你就可以构建出更加强大的 Operator,从而更好地管理你的应用程序。