WEBKT

Kubernetes Operator 高级特性剖析- Webhook、Finalizer 与 Leader Election 深度实践

36 0 0 0

Kubernetes Operator 高级特性剖析- Webhook、Finalizer 与 Leader Election 深度实践

1. Webhook:动态验证与变更控制

1.1 Validating Webhook 的应用场景

1.2 Mutating Webhook 的应用场景

2. Finalizer:优雅删除资源

2.1 Finalizer 的工作原理

2.2 Finalizer 的应用场景

3. Leader Election:高可用性保障

3.1 Leader Election 的工作原理

3.2 Leader Election 的应用场景

4. 总结

Kubernetes Operator 高级特性剖析- Webhook、Finalizer 与 Leader Election 深度实践

Operator 作为 Kubernetes 扩展 API 的强大工具,允许开发者以声明式的方式管理复杂的应用程序。本文将深入探讨 Kubernetes Operator 的几个高级特性:Webhook、Finalizer 和 Leader Election,并通过实际案例分析它们在 Operator 开发中的应用。

目标读者

  • 已经熟悉 Kubernetes Operator 的基本概念和架构。
  • 希望深入了解 Operator 的实现原理和高级特性。
  • 致力于开发和维护 Kubernetes Operator 的工程师。

1. Webhook:动态验证与变更控制

Webhook 允许你在 Kubernetes 资源创建、更新或删除时,通过 HTTP 回调到你自己的服务进行验证或修改。它们是实现自定义策略和增强 Kubernetes 功能的强大机制。Kubernetes 提供了两种类型的 Webhook:

  • Validating Webhook (验证 Webhook):在资源被持久化到 etcd 之前调用,用于验证资源的有效性。如果验证失败,API server 会拒绝该请求。
  • Mutating Webhook (变更 Webhook):在资源被持久化到 etcd 之前调用,用于修改资源的内容。可以用来添加默认值、标签或其他自定义配置。

1.1 Validating Webhook 的应用场景

Validating Webhook 常用于以下场景:

  • 自定义资源校验:例如,确保自定义资源中的某些字段满足特定的格式或范围。
  • 安全策略执行:例如,限制特定用户或角色创建某些类型的资源。
  • 合规性检查:例如,确保所有 Pod 都符合公司的安全标准。

案例分析:限制 Pod 的资源请求

假设我们希望限制所有 Pod 的 CPU 和内存请求必须在一定的范围内。我们可以创建一个 Validating Webhook 来实现这个策略。

  1. 定义 Webhook 配置

    创建一个 ValidatingWebhookConfiguration 对象,指定 Webhook 的名称、要拦截的资源、以及回调的 URL。

    apiVersion: admissionregistration.k8s.io/v1
    kind: ValidatingWebhookConfiguration
    metadata:
    name: pod-resource-validator
    webhooks:
    - name: pod-resource.example.com
    rules:
    - apiGroups: [""]
    apiVersions: ["v1"]
    operations: [ "CREATE", "UPDATE" ]
    resources: ["pods"]
    clientConfig:
    service:
    name: pod-resource-validator-service
    namespace: default
    path: /validate
    caBundle: <CA_BUNDLE>
    admissionReviewVersions: ["v1", "v1beta1"]
    sideEffects: None
    • rules:定义了 Webhook 要拦截的资源和操作。这里我们拦截所有 Pod 的创建和更新操作。
    • clientConfig:定义了 Webhook 服务的信息。service.nameservice.namespace 指定了服务的名称和命名空间,path 指定了回调的 URL,caBundle 指定了用于验证服务器证书的 CA 证书。
  2. 实现 Webhook 服务

    创建一个 HTTP 服务,监听指定的 URL,并处理 AdmissionReview 请求。AdmissionReview 对象包含了请求的资源信息和用户信息。

    func validatePodResource(ar admission.AdmissionReview) *admission.AdmissionResponse {
    podResource := metav1.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
    if ar.Request.Resource != podResource {
    klog.Errorf("expect resource to be %s", podResource)
    return &admission.AdmissionResponse{Allowed: true}
    }
    pod := corev1.Pod{}
    err := json.Unmarshal(ar.Request.Object.Raw, &pod)
    if err != nil {
    klog.Errorf("could not unmarshal pod object: %v", err)
    return &admission.AdmissionResponse{Allowed: false, Result: &metav1.Status{Message: err.Error()}}
    }
    // 检查 CPU 和内存请求是否在允许的范围内
    cpuRequest := pod.Spec.Containers[0].Resources.Requests.Cpu()
    memoryRequest := pod.Spec.Containers[0].Resources.Requests.Memory()
    if cpuRequest.Cmp(resource.MustParse("1")) > 0 {
    return &admission.AdmissionResponse{Allowed: false, Result: &metav1.Status{Message: "CPU request exceeds the limit of 1 core"}}
    }
    if memoryRequest.Cmp(resource.MustParse("1Gi")) > 0 {
    return &admission.AdmissionResponse{Allowed: false, Result: &metav1.Status{Message: "Memory request exceeds the limit of 1Gi"}}
    }
    return &admission.AdmissionResponse{Allowed: true}
    }
    • 该函数首先检查请求的资源是否为 Pod。
    • 然后,它将 AdmissionReview 对象中的 Pod 对象反序列化。
    • 最后,它检查 Pod 的 CPU 和内存请求是否在允许的范围内。如果超出范围,则返回一个 admission.AdmissionResponse 对象,其中 Allowed 字段设置为 false,并包含错误信息。
  3. 部署 Webhook 服务

    将 Webhook 服务部署到 Kubernetes 集群中,并确保它可以被 API server 访问。

  4. 配置 TLS

    由于 Webhook 使用 HTTPS 进行通信,因此需要配置 TLS。可以使用 Kubernetes 的 Certificate API 或其他工具来生成和管理证书。

1.2 Mutating Webhook 的应用场景

Mutating Webhook 常用于以下场景:

  • 自动注入 sidecar 容器:例如,自动将日志收集或监控代理注入到所有 Pod 中。
  • 添加默认标签或注解:例如,自动为所有 Pod 添加版本标签或部署时间戳。
  • 修改资源配置:例如,根据集群的资源情况动态调整 Pod 的资源请求。

案例分析:自动注入 sidecar 容器

假设我们希望自动将一个 sidecar 容器注入到所有 Pod 中,用于收集 Pod 的日志。我们可以创建一个 Mutating Webhook 来实现这个功能。

  1. 定义 Webhook 配置

    创建一个 MutatingWebhookConfiguration 对象,指定 Webhook 的名称、要拦截的资源、以及回调的 URL。

    apiVersion: admissionregistration.k8s.io/v1
    kind: MutatingWebhookConfiguration
    metadata:
    name: pod-sidecar-injector
    webhooks:
    - name: pod-sidecar.example.com
    rules:
    - apiGroups: [""]
    apiVersions: ["v1"]
    operations: [ "CREATE" ]
    resources: ["pods"]
    clientConfig:
    service:
    name: pod-sidecar-injector-service
    namespace: default
    path: /mutate
    caBundle: <CA_BUNDLE>
    admissionReviewVersions: ["v1", "v1beta1"]
    sideEffects: None
    • rules:定义了 Webhook 要拦截的资源和操作。这里我们拦截所有 Pod 的创建操作。
    • clientConfig:定义了 Webhook 服务的信息。service.nameservice.namespace 指定了服务的名称和命名空间,path 指定了回调的 URL,caBundle 指定了用于验证服务器证书的 CA 证书。
  2. 实现 Webhook 服务

    创建一个 HTTP 服务,监听指定的 URL,并处理 AdmissionReview 请求。AdmissionReview 对象包含了请求的资源信息和用户信息。

    func mutatePod(ar admission.AdmissionReview) *admission.AdmissionResponse {
    podResource := metav1.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
    if ar.Request.Resource != podResource {
    klog.Errorf("expect resource to be %s", podResource)
    return &admission.AdmissionResponse{Allowed: true}
    }
    pod := corev1.Pod{}
    err := json.Unmarshal(ar.Request.Object.Raw, &pod)
    if err != nil {
    klog.Errorf("could not unmarshal pod object: %v", err)
    return &admission.AdmissionResponse{Allowed: false, Result: &metav1.Status{Message: err.Error()}}
    }
    // 定义 sidecar 容器
    sidecarContainer := corev1.Container{
    Name: "log-collector",
    Image: "busybox",
    Command: []string{
    "sh",
    "-c",
    "tail -f /var/log/app.log",
    },
    VolumeMounts: []corev1.VolumeMount{
    {
    Name: "log-volume",
    MountPath: "/var/log",
    },
    },
    }
    // 将 sidecar 容器添加到 Pod 的 containers 列表中
    pod.Spec.Containers = append(pod.Spec.Containers, sidecarContainer)
    // 创建一个 patch,用于更新 Pod 对象
    patchBytes, err := json.Marshal(pod)
    if err != nil {
    klog.Errorf("could not marshal pod object: %v", err)
    return &admission.AdmissionResponse{Allowed: false, Result: &metav1.Status{Message: err.Error()}}
    }
    return &admission.AdmissionResponse{Allowed: true, Patch: patchBytes, PatchType: func() *admission.PatchType { pt := admission.PatchTypeJSONPatch; return &pt }()}
    }
    • 该函数首先检查请求的资源是否为 Pod。
    • 然后,它将 AdmissionReview 对象中的 Pod 对象反序列化。
    • 接下来,它定义了一个 sidecar 容器,用于收集 Pod 的日志。
    • 然后,它将 sidecar 容器添加到 Pod 的 containers 列表中。
    • 最后,它创建一个 patch,用于更新 Pod 对象,并将 patch 返回给 API server。
  3. 部署 Webhook 服务

    将 Webhook 服务部署到 Kubernetes 集群中,并确保它可以被 API server 访问。

  4. 配置 TLS

    由于 Webhook 使用 HTTPS 进行通信,因此需要配置 TLS。可以使用 Kubernetes 的 Certificate API 或其他工具来生成和管理证书。

2. Finalizer:优雅删除资源

Finalizer 是一种 Kubernetes 机制,用于在资源被删除之前执行清理操作。它们允许 Operator 在删除资源之前执行一些必要的步骤,例如:

  • 删除关联的资源:例如,在删除一个自定义资源之前,删除所有由该资源创建的 Pod。
  • 释放外部资源:例如,在删除一个数据库实例之前,释放所有相关的存储卷。
  • 执行数据备份:例如,在删除一个数据库实例之前,执行一次数据备份。

2.1 Finalizer 的工作原理

  1. 添加 Finalizer:当创建一个自定义资源时,Operator 会向该资源的 metadata.finalizers 列表中添加一个或多个 Finalizer。Finalizer 是一个字符串,用于标识需要执行的清理操作。
  2. 删除资源:当用户尝试删除一个资源时,Kubernetes API server 会检查该资源的 metadata.finalizers 列表。如果列表不为空,API server 不会立即删除该资源,而是将该资源的 metadata.deletionTimestamp 字段设置为当前时间。
  3. Operator 监听删除事件:Operator 监听资源的删除事件,并检查该资源的 metadata.deletionTimestamp 字段是否已设置。如果已设置,Operator 将执行与该资源相关的清理操作。
  4. 移除 Finalizer:当 Operator 完成清理操作后,它会从资源的 metadata.finalizers 列表中移除对应的 Finalizer。当 metadata.finalizers 列表为空时,API server 才会真正删除该资源。

2.2 Finalizer 的应用场景

Finalizer 常用于以下场景:

  • 级联删除:确保在删除父资源之前,先删除所有子资源。
  • 资源清理:确保在删除资源之前,释放所有相关的外部资源。
  • 数据保护:确保在删除资源之前,执行必要的数据备份。

案例分析:确保在删除数据库实例之前删除所有相关的存储卷

假设我们有一个自定义资源 DatabaseInstance,用于管理数据库实例。每个数据库实例都关联着一个或多个存储卷。我们希望确保在删除数据库实例之前,先删除所有相关的存储卷。

  1. 添加 Finalizer

    当创建一个 DatabaseInstance 资源时,Operator 会向该资源的 metadata.finalizers 列表中添加一个 Finalizer,例如 databaseinstance.example.com/finalizer

    func (r *DatabaseInstanceReconciler) reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    instance := &examplecomv1.DatabaseInstance{}
    err := r.Get(ctx, req.NamespacedName, instance)
    if err != nil {
    if errors.IsNotFound(err) {
    return ctrl.Result{}, nil
    }
    return ctrl.Result{}, err
    }
    // 如果 DatabaseInstance 正在被删除
    if !instance.ObjectMeta.DeletionTimestamp.IsZero() {
    // 检查是否存在 Finalizer
    if containsString(instance.ObjectMeta.Finalizers, databaseInstanceFinalizer) {
    // 执行清理操作
    if err := r.finalizeDatabaseInstance(ctx, instance); err != nil {
    return ctrl.Result{}, err
    }
    // 移除 Finalizer
    instance.ObjectMeta.Finalizers = removeString(instance.ObjectMeta.Finalizers, databaseInstanceFinalizer)
    if err := r.Update(ctx, instance); err != nil {
    return ctrl.Result{}, err
    }
    }
    return ctrl.Result{}, nil
    }
    // 如果 DatabaseInstance 不存在 Finalizer,则添加 Finalizer
    if !containsString(instance.ObjectMeta.Finalizers, databaseInstanceFinalizer) {
    instance.ObjectMeta.Finalizers = append(instance.ObjectMeta.Finalizers, databaseInstanceFinalizer)
    if err := r.Update(ctx, instance); err != nil {
    return ctrl.Result{}, err
    }
    }
    // ... 其他 reconciliation 逻辑 ...
    }
    const databaseInstanceFinalizer = "databaseinstance.example.com/finalizer"
    // helper functions to check and remove string from a slice
    func containsString(slice []string, s string) bool {
    for _, item := range slice {
    if item == s {
    return true
    }
    }
    return false
    }
    func removeString(slice []string, s string) (result []string) {
    for _, item := range slice {
    if item == s {
    continue
    }
    result = append(result, item)
    }
    return
    }
  2. 监听删除事件

    Operator 监听 DatabaseInstance 资源的删除事件,并检查该资源的 metadata.deletionTimestamp 字段是否已设置。

  3. 执行清理操作

    如果 metadata.deletionTimestamp 字段已设置,Operator 将执行以下清理操作:

    • 查找所有与该 DatabaseInstance 资源相关的存储卷。
    • 删除这些存储卷。
    func (r *DatabaseInstanceReconciler) finalizeDatabaseInstance(ctx context.Context, instance *examplecomv1.DatabaseInstance) error {
    // 查找所有与该 DatabaseInstance 资源相关的存储卷
    // ...
    // 删除这些存储卷
    // ...
    return nil
    }
  4. 移除 Finalizer

    当 Operator 完成清理操作后,它会从 DatabaseInstance 资源的 metadata.finalizers 列表中移除 databaseinstance.example.com/finalizer

3. Leader Election:高可用性保障

在 Operator 的高可用性部署中,需要确保只有一个 Operator 实例处于活动状态,负责处理资源的变化。Leader Election 是一种 Kubernetes 机制,用于在多个 Operator 实例之间选举出一个 Leader。Leader 负责处理资源的变化,而其他实例则处于 Standby 状态,等待 Leader 失败后接管。

3.1 Leader Election 的工作原理

Leader Election 基于 Kubernetes 的 Lease 资源实现。每个 Operator 实例都会尝试创建一个 Lease 资源,只有第一个成功创建 Lease 资源的实例才能成为 Leader。Leader 会定期更新 Lease 资源的续约时间,以表明自己仍然处于活动状态。如果 Leader 失败,Lease 资源的续约时间将过期,其他 Standby 实例将有机会成为新的 Leader。

3.2 Leader Election 的应用场景

Leader Election 常用于以下场景:

  • 高可用性 Operator:确保只有一个 Operator 实例处于活动状态,负责处理资源的变化。
  • 分布式任务调度:确保只有一个 Worker 节点负责执行某个任务。
  • 配置管理:确保只有一个 Config Server 负责管理配置信息。

案例分析:使用 controller-runtime 框架实现 Leader Election

controller-runtime 框架提供了对 Leader Election 的支持。我们可以使用 controller-runtime 框架来简化 Operator 的 Leader Election 实现。

import (
"flag"
"os"
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run-as-user features work properly when deploying on cloud.
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
examplecomv1 "example.com/my-operator/api/v1"
"example.com/my-operator/controllers"
)
var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(examplecomv1.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
}
func main() {
var metricsAddr string
var enableLeaderElection bool
var probeAddr string
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false, \
"Enable leader election for controller manager. \
Enabling this will ensure there is only one active controller manager.")
opts := zap.Options{
Development: true,
}
opts.BindFlags(flag.CommandLine)
flag.Parse()
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Port: 9443,
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "my-operator-leader-election", // 设置 Leader Election ID
})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
if err = (&controllers.DatabaseInstanceReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("DatabaseInstance"),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DatabaseInstance")
os.Exit(1)
}
//+kubebuilder:scaffold:builder
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)
}
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up ready check")
os.Exit(1)
}
setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
}
  • 启用 Leader Election:在 ctrl.Options 中设置 LeaderElectiontrue,并设置 LeaderElectionID 为一个唯一的字符串。
  • 启动 Manager:调用 mgr.Start 方法启动 Manager。Manager 将自动处理 Leader Election 过程。

4. 总结

本文深入探讨了 Kubernetes Operator 的几个高级特性:Webhook、Finalizer 和 Leader Election。这些特性是构建强大、可靠和高可用的 Operator 的关键。理解和掌握这些特性,可以帮助开发者更好地管理复杂的 Kubernetes 应用程序,并充分利用 Kubernetes 的扩展能力。

希望本文能够帮助你深入了解 Kubernetes Operator 的高级特性,并在实际项目中应用它们。祝你开发顺利!

Operator探索者 Kubernetes OperatorWebhookFinalizer

评论点评

打赏赞助
sponsor

感谢您的支持让我们更好的前行

分享

QRcode

https://www.webkt.com/article/9393