WEBKT

深入浅出 Kubernetes Operator:原理、实践与简单示例

14 0 0 0

Operator 的核心概念

Operator 的工作原理

编写一个简单的 Operator

1. 定义 CRD

2. 创建 CR

3. 编写 Controller

4. 部署 Operator

5. 验证 Operator

总结

Kubernetes Operator 是一种扩展 Kubernetes API 的方式,用于自动化管理和运维复杂的应用程序。它通过自定义资源(Custom Resources,CR)和控制器(Controller)来实现,将运维知识编码到软件中,从而实现应用程序的自动化部署、配置、升级、备份和恢复等操作。简单来说,Operator就像一个专门为你的应用量身定制的 Kubernetes 扩展,让 Kubernetes 更好地理解和管理你的应用。

Operator 的核心概念

  1. Custom Resource Definition (CRD):CRD 允许你定义自己的 Kubernetes 资源类型。例如,你可以定义一个名为 MyApp 的 CRD,用于描述你的应用程序的配置和状态。
  2. Custom Resource (CR):CR 是 CRD 的一个实例。例如,你可以创建一个 MyApp 类型的 CR,指定你的应用程序的名称、版本和所需的资源。
  3. Controller:Controller 负责监听 CR 的变化,并根据 CR 的定义,执行相应的操作,例如部署应用程序、配置服务、创建备份等。Controller 通过 Kubernetes API Server 与 Kubernetes 集群进行交互。

Operator 的工作原理

Operator 的工作原理可以用一个循环来概括:

  1. 监听:Controller 监听 Kubernetes API Server,关注 CR 的变化。
  2. 对比:当 CR 发生变化时,Controller 获取 CR 的最新状态,并与期望状态进行对比。
  3. 调谐:如果 CR 的当前状态与期望状态不一致,Controller 执行相应的操作,使 CR 的状态与期望状态保持一致。这个过程称为“调谐”(Reconcile)。
  4. 更新:Controller 更新 CR 的状态,以便 Kubernetes API Server 能够反映 CR 的最新状态。

这个循环不断重复,确保应用程序始终处于期望的状态。

编写一个简单的 Operator

为了更好地理解 Operator 的工作原理,我们来编写一个简单的 Operator,用于管理一个名为 MyWebApp 的 Web 应用程序。这个 Operator 将实现以下功能:

  • 部署一个 MyWebApp 的 Deployment。
  • 创建一个 Service,将 MyWebApp 暴露给外部访问。

1. 定义 CRD

首先,我们需要定义一个名为 MyWebApp 的 CRD。CRD 的 YAML 文件如下所示:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: mywebapp.example.com
spec:
group: example.com
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
size: # 应用副本数
type: integer
image: # 应用镜像
type: string
scope: Namespaced
names:
plural: mywebapp
singular: mywebapp
kind: MyWebApp
shortNames: # 简称
- mw

这个 CRD 定义了一个名为 MyWebApp 的资源,它属于 example.com 组,版本为 v1MyWebApp 资源具有一个 spec 字段,用于指定应用程序的配置,包括 size(副本数)和 image(镜像)。

2. 创建 CR

接下来,我们可以创建一个 MyWebApp 类型的 CR,指定应用程序的名称、版本和所需的资源。CR 的 YAML 文件如下所示:

apiVersion: example.com/v1
kind: MyWebApp
metadata:
name: mywebapp-sample
spec:
size: 3
image: nginx:latest

这个 CR 定义了一个名为 mywebapp-sampleMyWebApp 资源,它指定应用程序的副本数为 3,镜像为 nginx:latest

3. 编写 Controller

现在,我们需要编写 Controller,负责监听 MyWebApp CR 的变化,并根据 CR 的定义,执行相应的操作。Controller 可以使用多种编程语言编写,例如 Go、Python、Java 等。这里我们使用 Go 语言编写 Controller。

首先,我们需要安装 Kubernetes 的 Go 客户端库:

go get k8s.io/client-go@kubernetes-1.28.0
go get k8s.io/apimachinery@kubernetes-1.28.0

然后,我们可以编写 Controller 的代码。以下是一个简单的 Controller 示例:

package main
import (
"context"
"fmt"
"os"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
// 自定义的CRD
webappv1 "./pkg/apis/webapp/v1"
webappclientset "./pkg/client/clientset/versioned"
webappinformers "./pkg/client/informers/externalversions/webapp/v1"
)
const controllerName = "mywebapp-controller"
func main() {
// 1. 初始化 K8S 配置
config, err := rest.InClusterConfig()
if err != nil {
config, err = clientcmd.BuildConfigFromFlags("", os.Getenv("HOME")+"/.kube/config")
if err != nil {
klog.Fatalf("Error building kubeconfig: %s", err.Error())
}
}
// 2. 创建 K8S 客户端
k8sClient, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
// 3. 创建 CRD 客户端
webappClient, err := webappclientset.NewForConfig(config)
if err != nil {
panic(err.Error())
}
// 4. 创建 Informer
webappInformerFactory := webappinformers.NewSharedInformerFactory(webappClient, 0)
webappInformer := webappInformerFactory.Webapp().V1().MyWebApps().Informer()
// 5. 创建 Controller
controller := &Controller{
k8sClient: k8sClient,
webappClient: webappClient,
webappInformer: webappInformer,
}
// 6. 启动 Informer
stopCh := make(chan struct{})
defer close(stopCh)
webappInformerFactory.Start(stopCh)
// 7. 启动 Controller
err = controller.Run(2, stopCh)
if err != nil {
klog.Fatalf("Error running controller: %s", err.Error())
}
}
type Controller struct {
k8sClient kubernetes.Interface
webappClient webappclientset.Interface
webappInformer cache.SharedIndexInformer
}
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
// 同步缓存,确保 Informer 已经同步了所有资源
if !cache.WaitForCacheSync(stopCh, c.webappInformer.HasSynced) {
return fmt.Errorf("failed to wait for caches to sync")
}
// 启动 worker 线程
for i := 0; i < threadiness; i++ {
go func() {
for c.processNextItem() {
}
}()
}
<-stopCh
return nil
}
func (c *Controller) processNextItem() bool {
// 从 Informer 的队列中获取一个事件
obj, shutdown := c.webappInformer.GetIndexer().Pop()
if shutdown {
return false
}
// 处理事件
err := func(obj interface{}) error {
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered from panic:", r)
}
}()
// 将事件对象转换为 MyWebApp 对象
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
return err
}
// 从 key 中提取 namespace 和 name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
// 获取 MyWebApp 对象
myWebApp, err := c.webappClient.WebappV1().MyWebApps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
// 如果 MyWebApp 对象不存在,则说明已经被删除
if apierrors.IsNotFound(err) {
klog.Infof("MyWebApp '%s/%s' in work queue no longer exists", namespace, name)
return nil
}
return err
}
// 调谐 MyWebApp 对象
err = c.syncHandler(myWebApp, namespace)
if err != nil {
return err
}
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}
func (c *Controller) syncHandler(myWebApp *webappv1.MyWebApp, namespace string) error {
// 1. 创建 Deployment
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: myWebApp.Name + "-deployment",
Namespace: namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(myWebApp, webappv1.SchemeGroupVersion.WithKind("MyWebApp")),
},
},
Spec: appsv1.DeploymentSpec{
Replicas: int32Ptr(int32(myWebApp.Spec.Size)),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": myWebApp.Name,
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": myWebApp.Name,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "webapp",
Image: myWebApp.Spec.Image,
},
},
},
},
},
}
_, err := c.k8sClient.AppsV1().Deployments(namespace).Create(context.TODO(), deployment, metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return err
}
// 2. 创建 Service
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: myWebApp.Name + "-service",
Namespace: namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(myWebApp, webappv1.SchemeGroupVersion.WithKind("MyWebApp")),
},
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"app": myWebApp.Name,
},
Ports: []corev1.ServicePort{
{
Protocol: corev1.ProtocolTCP,
Port: 80,
TargetPort: int32ToIntstr(80),
},
},
},
}
_, err = c.k8sClient.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
return err
}
return nil
}
func int32Ptr(i int32) *int32 {
return &i
}
func int32ToIntstr(i int32) interface{} {
return i
}

这个 Controller 的主要逻辑如下:

  • main 函数
    • 初始化 Kubernetes 配置。
    • 创建 Kubernetes 客户端和 CRD 客户端。
    • 创建 Informer,用于监听 MyWebApp 资源的变化。
    • 创建 Controller 实例。
    • 启动 Informer 和 Controller。
  • Controller 结构体
    • 包含 Kubernetes 客户端、CRD 客户端和 Informer。
  • Run 函数
    • 同步缓存,确保 Informer 已经同步了所有资源。
    • 启动 worker 线程,处理事件。
  • processNextItem 函数
    • 从 Informer 的队列中获取一个事件。
    • 将事件对象转换为 MyWebApp 对象。
    • 调用 syncHandler 函数,调谐 MyWebApp 对象。
  • syncHandler 函数
    • 创建 Deployment 和 Service,确保应用程序按照 CR 的定义运行。

4. 部署 Operator

要部署 Operator,你需要:

  1. 将 CRD 的 YAML 文件应用到 Kubernetes 集群:

    kubectl apply -f mywebapp-crd.yaml
    
  2. 编译 Controller 代码,并将其打包成 Docker 镜像。

  3. 创建一个 Deployment,将 Controller 部署到 Kubernetes 集群。

  4. 创建一个 ServiceAccount,并授予 Controller 相应的权限,以便它可以访问 Kubernetes API Server。

5. 验证 Operator

部署完成后,你可以创建一个 MyWebApp 类型的 CR,验证 Operator 是否正常工作:

kubectl apply -f mywebapp-cr.yaml

如果 Operator 正常工作,它将自动创建一个 Deployment 和 Service,用于运行 MyWebApp 应用程序。

总结

Kubernetes Operator 是一种强大的工具,可以自动化管理和运维复杂的应用程序。通过自定义资源和控制器,Operator 将运维知识编码到软件中,从而实现应用程序的自动化部署、配置、升级、备份和恢复等操作。希望本文能够帮助你理解 Operator 的工作原理,并掌握编写一个简单的 Operator 的方法。当然,这只是一个非常简单的示例,实际的 Operator 可能会更加复杂,需要处理更多的细节和错误情况。但是,掌握了 Operator 的核心概念和基本原理,你就可以构建出更加强大的 Operator,从而更好地管理你的应用程序。

运维小能手 KubernetesOperator自动化运维

评论点评

打赏赞助
sponsor

感谢您的支持让我们更好的前行

分享

QRcode

https://www.webkt.com/article/10175