Kubernetes Operator 高级特性剖析- Webhook、Finalizer 与 Leader Election 深度实践
Kubernetes Operator 高级特性剖析- Webhook、Finalizer 与 Leader Election 深度实践
1. Webhook:动态验证与变更控制
1.1 Validating Webhook 的应用场景
1.2 Mutating Webhook 的应用场景
2. Finalizer:优雅删除资源
2.1 Finalizer 的工作原理
2.2 Finalizer 的应用场景
3. Leader Election:高可用性保障
3.1 Leader Election 的工作原理
3.2 Leader Election 的应用场景
4. 总结
Kubernetes Operator 高级特性剖析- Webhook、Finalizer 与 Leader Election 深度实践
Operator 作为 Kubernetes 扩展 API 的强大工具,允许开发者以声明式的方式管理复杂的应用程序。本文将深入探讨 Kubernetes Operator 的几个高级特性:Webhook、Finalizer 和 Leader Election,并通过实际案例分析它们在 Operator 开发中的应用。
目标读者:
- 已经熟悉 Kubernetes Operator 的基本概念和架构。
- 希望深入了解 Operator 的实现原理和高级特性。
- 致力于开发和维护 Kubernetes Operator 的工程师。
1. Webhook:动态验证与变更控制
Webhook 允许你在 Kubernetes 资源创建、更新或删除时,通过 HTTP 回调到你自己的服务进行验证或修改。它们是实现自定义策略和增强 Kubernetes 功能的强大机制。Kubernetes 提供了两种类型的 Webhook:
- Validating Webhook (验证 Webhook):在资源被持久化到 etcd 之前调用,用于验证资源的有效性。如果验证失败,API server 会拒绝该请求。
- Mutating Webhook (变更 Webhook):在资源被持久化到 etcd 之前调用,用于修改资源的内容。可以用来添加默认值、标签或其他自定义配置。
1.1 Validating Webhook 的应用场景
Validating Webhook 常用于以下场景:
- 自定义资源校验:例如,确保自定义资源中的某些字段满足特定的格式或范围。
- 安全策略执行:例如,限制特定用户或角色创建某些类型的资源。
- 合规性检查:例如,确保所有 Pod 都符合公司的安全标准。
案例分析:限制 Pod 的资源请求
假设我们希望限制所有 Pod 的 CPU 和内存请求必须在一定的范围内。我们可以创建一个 Validating Webhook 来实现这个策略。
定义 Webhook 配置:
创建一个
ValidatingWebhookConfiguration
对象,指定 Webhook 的名称、要拦截的资源、以及回调的 URL。apiVersion: admissionregistration.k8s.io/v1 kind: ValidatingWebhookConfiguration metadata: name: pod-resource-validator webhooks: - name: pod-resource.example.com rules: - apiGroups: [""] apiVersions: ["v1"] operations: [ "CREATE", "UPDATE" ] resources: ["pods"] clientConfig: service: name: pod-resource-validator-service namespace: default path: /validate caBundle: <CA_BUNDLE> admissionReviewVersions: ["v1", "v1beta1"] sideEffects: None rules
:定义了 Webhook 要拦截的资源和操作。这里我们拦截所有 Pod 的创建和更新操作。clientConfig
:定义了 Webhook 服务的信息。service.name
和service.namespace
指定了服务的名称和命名空间,path
指定了回调的 URL,caBundle
指定了用于验证服务器证书的 CA 证书。
实现 Webhook 服务:
创建一个 HTTP 服务,监听指定的 URL,并处理 AdmissionReview 请求。AdmissionReview 对象包含了请求的资源信息和用户信息。
func validatePodResource(ar admission.AdmissionReview) *admission.AdmissionResponse { podResource := metav1.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"} if ar.Request.Resource != podResource { klog.Errorf("expect resource to be %s", podResource) return &admission.AdmissionResponse{Allowed: true} } pod := corev1.Pod{} err := json.Unmarshal(ar.Request.Object.Raw, &pod) if err != nil { klog.Errorf("could not unmarshal pod object: %v", err) return &admission.AdmissionResponse{Allowed: false, Result: &metav1.Status{Message: err.Error()}} } // 检查 CPU 和内存请求是否在允许的范围内 cpuRequest := pod.Spec.Containers[0].Resources.Requests.Cpu() memoryRequest := pod.Spec.Containers[0].Resources.Requests.Memory() if cpuRequest.Cmp(resource.MustParse("1")) > 0 { return &admission.AdmissionResponse{Allowed: false, Result: &metav1.Status{Message: "CPU request exceeds the limit of 1 core"}} } if memoryRequest.Cmp(resource.MustParse("1Gi")) > 0 { return &admission.AdmissionResponse{Allowed: false, Result: &metav1.Status{Message: "Memory request exceeds the limit of 1Gi"}} } return &admission.AdmissionResponse{Allowed: true} } - 该函数首先检查请求的资源是否为 Pod。
- 然后,它将 AdmissionReview 对象中的 Pod 对象反序列化。
- 最后,它检查 Pod 的 CPU 和内存请求是否在允许的范围内。如果超出范围,则返回一个
admission.AdmissionResponse
对象,其中Allowed
字段设置为false
,并包含错误信息。
部署 Webhook 服务:
将 Webhook 服务部署到 Kubernetes 集群中,并确保它可以被 API server 访问。
配置 TLS:
由于 Webhook 使用 HTTPS 进行通信,因此需要配置 TLS。可以使用 Kubernetes 的 Certificate API 或其他工具来生成和管理证书。
1.2 Mutating Webhook 的应用场景
Mutating Webhook 常用于以下场景:
- 自动注入 sidecar 容器:例如,自动将日志收集或监控代理注入到所有 Pod 中。
- 添加默认标签或注解:例如,自动为所有 Pod 添加版本标签或部署时间戳。
- 修改资源配置:例如,根据集群的资源情况动态调整 Pod 的资源请求。
案例分析:自动注入 sidecar 容器
假设我们希望自动将一个 sidecar 容器注入到所有 Pod 中,用于收集 Pod 的日志。我们可以创建一个 Mutating Webhook 来实现这个功能。
定义 Webhook 配置:
创建一个
MutatingWebhookConfiguration
对象,指定 Webhook 的名称、要拦截的资源、以及回调的 URL。apiVersion: admissionregistration.k8s.io/v1 kind: MutatingWebhookConfiguration metadata: name: pod-sidecar-injector webhooks: - name: pod-sidecar.example.com rules: - apiGroups: [""] apiVersions: ["v1"] operations: [ "CREATE" ] resources: ["pods"] clientConfig: service: name: pod-sidecar-injector-service namespace: default path: /mutate caBundle: <CA_BUNDLE> admissionReviewVersions: ["v1", "v1beta1"] sideEffects: None rules
:定义了 Webhook 要拦截的资源和操作。这里我们拦截所有 Pod 的创建操作。clientConfig
:定义了 Webhook 服务的信息。service.name
和service.namespace
指定了服务的名称和命名空间,path
指定了回调的 URL,caBundle
指定了用于验证服务器证书的 CA 证书。
实现 Webhook 服务:
创建一个 HTTP 服务,监听指定的 URL,并处理 AdmissionReview 请求。AdmissionReview 对象包含了请求的资源信息和用户信息。
func mutatePod(ar admission.AdmissionReview) *admission.AdmissionResponse { podResource := metav1.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"} if ar.Request.Resource != podResource { klog.Errorf("expect resource to be %s", podResource) return &admission.AdmissionResponse{Allowed: true} } pod := corev1.Pod{} err := json.Unmarshal(ar.Request.Object.Raw, &pod) if err != nil { klog.Errorf("could not unmarshal pod object: %v", err) return &admission.AdmissionResponse{Allowed: false, Result: &metav1.Status{Message: err.Error()}} } // 定义 sidecar 容器 sidecarContainer := corev1.Container{ Name: "log-collector", Image: "busybox", Command: []string{ "sh", "-c", "tail -f /var/log/app.log", }, VolumeMounts: []corev1.VolumeMount{ { Name: "log-volume", MountPath: "/var/log", }, }, } // 将 sidecar 容器添加到 Pod 的 containers 列表中 pod.Spec.Containers = append(pod.Spec.Containers, sidecarContainer) // 创建一个 patch,用于更新 Pod 对象 patchBytes, err := json.Marshal(pod) if err != nil { klog.Errorf("could not marshal pod object: %v", err) return &admission.AdmissionResponse{Allowed: false, Result: &metav1.Status{Message: err.Error()}} } return &admission.AdmissionResponse{Allowed: true, Patch: patchBytes, PatchType: func() *admission.PatchType { pt := admission.PatchTypeJSONPatch; return &pt }()} } - 该函数首先检查请求的资源是否为 Pod。
- 然后,它将 AdmissionReview 对象中的 Pod 对象反序列化。
- 接下来,它定义了一个 sidecar 容器,用于收集 Pod 的日志。
- 然后,它将 sidecar 容器添加到 Pod 的 containers 列表中。
- 最后,它创建一个 patch,用于更新 Pod 对象,并将 patch 返回给 API server。
部署 Webhook 服务:
将 Webhook 服务部署到 Kubernetes 集群中,并确保它可以被 API server 访问。
配置 TLS:
由于 Webhook 使用 HTTPS 进行通信,因此需要配置 TLS。可以使用 Kubernetes 的 Certificate API 或其他工具来生成和管理证书。
2. Finalizer:优雅删除资源
Finalizer 是一种 Kubernetes 机制,用于在资源被删除之前执行清理操作。它们允许 Operator 在删除资源之前执行一些必要的步骤,例如:
- 删除关联的资源:例如,在删除一个自定义资源之前,删除所有由该资源创建的 Pod。
- 释放外部资源:例如,在删除一个数据库实例之前,释放所有相关的存储卷。
- 执行数据备份:例如,在删除一个数据库实例之前,执行一次数据备份。
2.1 Finalizer 的工作原理
- 添加 Finalizer:当创建一个自定义资源时,Operator 会向该资源的
metadata.finalizers
列表中添加一个或多个 Finalizer。Finalizer 是一个字符串,用于标识需要执行的清理操作。 - 删除资源:当用户尝试删除一个资源时,Kubernetes API server 会检查该资源的
metadata.finalizers
列表。如果列表不为空,API server 不会立即删除该资源,而是将该资源的metadata.deletionTimestamp
字段设置为当前时间。 - Operator 监听删除事件:Operator 监听资源的删除事件,并检查该资源的
metadata.deletionTimestamp
字段是否已设置。如果已设置,Operator 将执行与该资源相关的清理操作。 - 移除 Finalizer:当 Operator 完成清理操作后,它会从资源的
metadata.finalizers
列表中移除对应的 Finalizer。当metadata.finalizers
列表为空时,API server 才会真正删除该资源。
2.2 Finalizer 的应用场景
Finalizer 常用于以下场景:
- 级联删除:确保在删除父资源之前,先删除所有子资源。
- 资源清理:确保在删除资源之前,释放所有相关的外部资源。
- 数据保护:确保在删除资源之前,执行必要的数据备份。
案例分析:确保在删除数据库实例之前删除所有相关的存储卷
假设我们有一个自定义资源 DatabaseInstance
,用于管理数据库实例。每个数据库实例都关联着一个或多个存储卷。我们希望确保在删除数据库实例之前,先删除所有相关的存储卷。
添加 Finalizer:
当创建一个
DatabaseInstance
资源时,Operator 会向该资源的metadata.finalizers
列表中添加一个 Finalizer,例如databaseinstance.example.com/finalizer
。func (r *DatabaseInstanceReconciler) reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { instance := &examplecomv1.DatabaseInstance{} err := r.Get(ctx, req.NamespacedName, instance) if err != nil { if errors.IsNotFound(err) { return ctrl.Result{}, nil } return ctrl.Result{}, err } // 如果 DatabaseInstance 正在被删除 if !instance.ObjectMeta.DeletionTimestamp.IsZero() { // 检查是否存在 Finalizer if containsString(instance.ObjectMeta.Finalizers, databaseInstanceFinalizer) { // 执行清理操作 if err := r.finalizeDatabaseInstance(ctx, instance); err != nil { return ctrl.Result{}, err } // 移除 Finalizer instance.ObjectMeta.Finalizers = removeString(instance.ObjectMeta.Finalizers, databaseInstanceFinalizer) if err := r.Update(ctx, instance); err != nil { return ctrl.Result{}, err } } return ctrl.Result{}, nil } // 如果 DatabaseInstance 不存在 Finalizer,则添加 Finalizer if !containsString(instance.ObjectMeta.Finalizers, databaseInstanceFinalizer) { instance.ObjectMeta.Finalizers = append(instance.ObjectMeta.Finalizers, databaseInstanceFinalizer) if err := r.Update(ctx, instance); err != nil { return ctrl.Result{}, err } } // ... 其他 reconciliation 逻辑 ... } const databaseInstanceFinalizer = "databaseinstance.example.com/finalizer" // helper functions to check and remove string from a slice func containsString(slice []string, s string) bool { for _, item := range slice { if item == s { return true } } return false } func removeString(slice []string, s string) (result []string) { for _, item := range slice { if item == s { continue } result = append(result, item) } return } 监听删除事件:
Operator 监听
DatabaseInstance
资源的删除事件,并检查该资源的metadata.deletionTimestamp
字段是否已设置。执行清理操作:
如果
metadata.deletionTimestamp
字段已设置,Operator 将执行以下清理操作:- 查找所有与该
DatabaseInstance
资源相关的存储卷。 - 删除这些存储卷。
func (r *DatabaseInstanceReconciler) finalizeDatabaseInstance(ctx context.Context, instance *examplecomv1.DatabaseInstance) error { // 查找所有与该 DatabaseInstance 资源相关的存储卷 // ... // 删除这些存储卷 // ... return nil } - 查找所有与该
移除 Finalizer:
当 Operator 完成清理操作后,它会从
DatabaseInstance
资源的metadata.finalizers
列表中移除databaseinstance.example.com/finalizer
。
3. Leader Election:高可用性保障
在 Operator 的高可用性部署中,需要确保只有一个 Operator 实例处于活动状态,负责处理资源的变化。Leader Election 是一种 Kubernetes 机制,用于在多个 Operator 实例之间选举出一个 Leader。Leader 负责处理资源的变化,而其他实例则处于 Standby 状态,等待 Leader 失败后接管。
3.1 Leader Election 的工作原理
Leader Election 基于 Kubernetes 的 Lease
资源实现。每个 Operator 实例都会尝试创建一个 Lease
资源,只有第一个成功创建 Lease
资源的实例才能成为 Leader。Leader 会定期更新 Lease
资源的续约时间,以表明自己仍然处于活动状态。如果 Leader 失败,Lease
资源的续约时间将过期,其他 Standby 实例将有机会成为新的 Leader。
3.2 Leader Election 的应用场景
Leader Election 常用于以下场景:
- 高可用性 Operator:确保只有一个 Operator 实例处于活动状态,负责处理资源的变化。
- 分布式任务调度:确保只有一个 Worker 节点负责执行某个任务。
- 配置管理:确保只有一个 Config Server 负责管理配置信息。
案例分析:使用 controller-runtime
框架实现 Leader Election
controller-runtime
框架提供了对 Leader Election 的支持。我们可以使用 controller-runtime
框架来简化 Operator 的 Leader Election 实现。
import ( "flag" "os" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run-as-user features work properly when deploying on cloud. _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" examplecomv1 "example.com/my-operator/api/v1" "example.com/my-operator/controllers" ) var ( scheme = runtime.NewScheme() setupLog = ctrl.Log.WithName("setup") ) func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(examplecomv1.AddToScheme(scheme)) //+kubebuilder:scaffold:scheme } func main() { var metricsAddr string var enableLeaderElection bool var probeAddr string flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") flag.BoolVar(&enableLeaderElection, "leader-elect", false, \ "Enable leader election for controller manager. \ Enabling this will ensure there is only one active controller manager.") opts := zap.Options{ Development: true, } opts.BindFlags(flag.CommandLine) flag.Parse() ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, MetricsBindAddress: metricsAddr, Port: 9443, HealthProbeBindAddress: probeAddr, LeaderElection: enableLeaderElection, LeaderElectionID: "my-operator-leader-election", // 设置 Leader Election ID }) if err != nil { setupLog.Error(err, "unable to start manager") os.Exit(1) } if err = (&controllers.DatabaseInstanceReconciler{ Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("DatabaseInstance"), Scheme: mgr.GetScheme(), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "DatabaseInstance") os.Exit(1) } //+kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { setupLog.Error(err, "unable to set up health check") os.Exit(1) } if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { setupLog.Error(err, "unable to set up ready check") os.Exit(1) } setupLog.Info("starting manager") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1) } }
- 启用 Leader Election:在
ctrl.Options
中设置LeaderElection
为true
,并设置LeaderElectionID
为一个唯一的字符串。 - 启动 Manager:调用
mgr.Start
方法启动 Manager。Manager 将自动处理 Leader Election 过程。
4. 总结
本文深入探讨了 Kubernetes Operator 的几个高级特性:Webhook、Finalizer 和 Leader Election。这些特性是构建强大、可靠和高可用的 Operator 的关键。理解和掌握这些特性,可以帮助开发者更好地管理复杂的 Kubernetes 应用程序,并充分利用 Kubernetes 的扩展能力。
希望本文能够帮助你深入了解 Kubernetes Operator 的高级特性,并在实际项目中应用它们。祝你开发顺利!