本文发表于一年多前。旧文章可能包含过时内容。请检查页面中的信息自发布以来是否已变得不正确。

为 Pod 标签编写控制器

Operator 被证明是在 Kubernetes 中运行有状态分布式应用的绝佳解决方案。像 Operator SDK 这样的开源工具提供了构建可靠且可维护的 Operator 的方法,使得扩展 Kubernetes 和实现自定义调度变得更加容易。

Kubernetes Operator 在您的集群中运行复杂的软件。开源社区已经为 Prometheus、Elasticsearch 或 Argo CD 等分布式应用程序构建了许多 Operator。即使在开源之外,Operator 也可以帮助为您的 Kubernetes 集群带来新功能。

一个 Operator 是一组自定义资源和一组控制器。控制器监视 Kubernetes API 中特定资源的变化,并通过创建、更新或删除资源来做出反应。

Operator SDK 最适合构建功能完备的 Operator。尽管如此,您仍然可以使用它来编写单个控制器。本文将引导您完成用 Go 语言编写一个 Kubernetes 控制器,该控制器将为具有特定注解的 Pod 添加一个 `pod-name` 标签。

为什么我们需要一个控制器来做这件事?

我最近在一个项目中工作,我们需要创建一个 Service,将流量路由到 ReplicaSet 中的特定 Pod。问题是 Service 只能通过标签选择 Pod,而 ReplicaSet 中的所有 Pod 都具有相同的标签。有两种方法可以解决这个问题:

  1. 创建一个没有选择器的 Service,并直接管理该 Service 的 Endpoint 或 EndpointSlice。我们需要编写一个自定义控制器来将 Pod 的 IP 地址插入到这些资源中。
  2. 为 Pod 添加一个具有唯一值的标签。然后我们可以在 Service 的选择器中使用这个标签。同样,我们需要编写一个自定义控制器来添加这个标签。

控制器是一个控制循环,它跟踪一个或多个 Kubernetes 资源类型。上面选项 2 中的控制器只需要跟踪 Pod,这使得实现起来更简单。这就是我们将通过编写一个 Kubernetes 控制器来实现的选项,该控制器将为我们的 Pod 添加一个 `pod-name` 标签。

StatefulSet 原生支持通过为集合中的每个 Pod 添加 `pod-name` 标签来实现此功能。但是,如果我们不想或不能使用 StatefulSet 呢?

我们很少直接创建 Pod;大多数情况下,我们使用 Deployment、ReplicaSet 或其他高级资源。我们可以在 PodSpec 中指定要添加到每个 Pod 的标签,但不能使用动态值,因此无法复制 StatefulSet 的 `pod-name` 标签。

我们尝试过使用可变准入 Webhook。当任何人创建 Pod 时,Webhook 会用包含 Pod 名称的标签来修补 Pod。令人失望的是,这不起作用:并非所有 Pod 在创建前都有名称。例如,当 ReplicaSet 控制器创建 Pod 时,它会向 Kubernetes API 服务器发送 `namePrefix` 而不是 `name`。API 服务器在将新 Pod 持久化到 etcd 之前,但仅在调用我们的准入 Webhook 之后,才会生成一个唯一的名称。因此,在大多数情况下,我们无法通过可变 Webhook 知道 Pod 的名称。

一旦 Pod 存在于 Kubernetes API 中,它就几乎是不可变的,但我们仍然可以添加一个标签。我们甚至可以从命令行执行此操作。

kubectl label my-pod my-label-key=my-label-value

我们需要监视 Kubernetes API 中任何 Pod 的变化并添加我们想要的标签。我们不会手动执行此操作,而是编写一个控制器来为我们完成。

使用 Operator SDK 引导控制器

控制器是一个协调循环,它从 Kubernetes API 读取资源的期望状态,并采取行动使集群的实际状态接近期望状态。

为了尽快编写这个控制器,我们将使用 Operator SDK。如果尚未安装,请按照官方文档进行安装。

$ operator-sdk version
operator-sdk version: "v1.4.2", commit: "4b083393be65589358b3e0416573df04f4ae8d9b", kubernetes version: "v1.19.4", go version: "go1.15.8", GOOS: "darwin", GOARCH: "amd64"

让我们创建一个新目录来编写控制器

mkdir label-operator && cd label-operator

接下来,让我们初始化一个新的 Operator,然后我们将向其中添加一个控制器。为此,您需要指定一个域和一个仓库。该域用作自定义 Kubernetes 资源所属组的前缀。由于我们不定义自定义资源,因此该域无关紧要。该仓库将是我们即将编写的 Go 模块的名称。按照惯例,这是您将存储代码的仓库。

举个例子,这是我运行的命令:

# Feel free to change the domain and repo values.
operator-sdk init --domain=padok.fr --repo=github.com/busser/label-operator

接下来,我们需要创建一个新的控制器。此控制器将处理 Pod 而不是自定义资源,因此无需生成资源代码。让我们运行此命令来搭建所需的代码:

operator-sdk create api --group=core --version=v1 --kind=Pod --controller=true --resource=false

我们现在有了一个新文件:`controllers/pod_controller.go`。此文件包含一个 `PodReconciler` 类型,其中包含我们需要实现的两个方法。第一个是 `Reconcile`,它目前看起来像这样:

func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    _ = r.Log.WithValues("pod", req.NamespacedName)

    // your logic here

    return ctrl.Result{}, nil
}

`Reconcile` 方法在 Pod 创建、更新或删除时调用。Pod 的名称和命名空间在 `ctrl.Request` 中,该方法作为参数接收。

第二个方法是 `SetupWithManager`,目前它看起来像这样:

func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        // Uncomment the following line adding a pointer to an instance of the controlled resource as an argument
        // For().
        Complete(r)
}

`SetupWithManager` 方法在 Operator 启动时调用。它用于告诉 Operator 框架 `PodReconciler` 需要监视哪些类型。要使用 Kubernetes 内部使用的相同 `Pod` 类型,我们需要导入它的一些代码。所有 Kubernetes 源代码都是开源的,因此您可以在自己的 Go 代码中导入任何您喜欢的部分。您可以在 Kubernetes 源代码中或pkg.go.dev上找到可用包的完整列表。要使用 Pod,我们需要 `k8s.io/api/core/v1` 包。

package controllers

import (
    // other imports...
    corev1 "k8s.io/api/core/v1"
    // other imports...
)

让我们在 `SetupWithManager` 中使用 `Pod` 类型,告诉 Operator 框架我们想要监视 Pod:

func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&corev1.Pod{}).
        Complete(r)
}

在继续之前,我们应该设置控制器所需的 RBAC 权限。在 `Reconcile` 方法上方,我们有一些默认权限:

// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pods/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=core,resources=pods/finalizers,verbs=update

我们不需要所有这些。我们的控制器永远不会与 Pod 的状态或其终结器交互。它只需要读取和更新 Pod。让我们删除不必要的权限,只保留我们需要的:

// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;update;patch

我们现在准备编写控制器的协调逻辑。

实现协调

这是我们希望 `Reconcile` 方法执行的操作:

  1. 使用 `ctrl.Request` 中的 Pod 名称和命名空间从 Kubernetes API 获取 Pod。
  2. 如果 Pod 具有 `add-pod-name-label` 注解,则向 Pod 添加 `pod-name` 标签;如果缺少该注解,则不添加标签。
  3. 在 Kubernetes API 中更新 Pod 以持久化所做的更改。

让我们为注解和标签定义一些常量:

const (
    addPodNameLabelAnnotation = "padok.fr/add-pod-name-label"
    podNameLabel              = "padok.fr/pod-name"
)

协调函数的第一步是从 Kubernetes API 获取我们正在处理的 Pod:

// Reconcile handles a reconciliation request for a Pod.
// If the Pod has the addPodNameLabelAnnotation annotation, then Reconcile
// will make sure the podNameLabel label is present with the correct value.
// If the annotation is absent, then Reconcile will make sure the label is too.
func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    log := r.Log.WithValues("pod", req.NamespacedName)

    /*
        Step 0: Fetch the Pod from the Kubernetes API.
    */

    var pod corev1.Pod
    if err := r.Get(ctx, req.NamespacedName, &pod); err != nil {
        log.Error(err, "unable to fetch Pod")
        return ctrl.Result{}, err
    }

    return ctrl.Result{}, nil
}

当 Pod 被创建、更新或删除时,将调用 `Reconcile` 方法。在删除的情况下,我们对 `r.Get` 的调用将返回一个特定的错误。让我们导入定义此错误的包:

package controllers

import (
    // other imports...
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    // other imports...
)

我们现在可以处理这个特定的错误,并且——因为我们的控制器不关心被删除的 Pod——明确地忽略它:

    /*
        Step 0: Fetch the Pod from the Kubernetes API.
    */

    var pod corev1.Pod
    if err := r.Get(ctx, req.NamespacedName, &pod); err != nil {
        if apierrors.IsNotFound(err) {
            // we'll ignore not-found errors, since we can get them on deleted requests.
            return ctrl.Result{}, nil
        }
        log.Error(err, "unable to fetch Pod")
        return ctrl.Result{}, err
    }

接下来,让我们编辑我们的 Pod,以便只有在我们的注解存在时才存在我们的动态标签:

    /*
        Step 1: Add or remove the label.
    */

    labelShouldBePresent := pod.Annotations[addPodNameLabelAnnotation] == "true"
    labelIsPresent := pod.Labels[podNameLabel] == pod.Name

    if labelShouldBePresent == labelIsPresent {
        // The desired state and actual state of the Pod are the same.
        // No further action is required by the operator at this moment.
        log.Info("no update required")
        return ctrl.Result{}, nil
    }

    if labelShouldBePresent {
        // If the label should be set but is not, set it.
        if pod.Labels == nil {
            pod.Labels = make(map[string]string)
        }
        pod.Labels[podNameLabel] = pod.Name
        log.Info("adding label")
    } else {
        // If the label should not be set but is, remove it.
        delete(pod.Labels, podNameLabel)
        log.Info("removing label")
    }

最后,让我们将更新后的 Pod 推送到 Kubernetes API:

    /*
        Step 2: Update the Pod in the Kubernetes API.
    */

    if err := r.Update(ctx, &pod); err != nil {
        log.Error(err, "unable to update Pod")
        return ctrl.Result{}, err
    }

将更新后的 Pod 写入 Kubernetes API 时,存在 Pod 自我们首次读取以来已被更新或删除的风险。在编写 Kubernetes 控制器时,我们应该记住,我们不是集群中唯一的参与者。发生这种情况时,最好的做法是通过重新排队事件从头开始协调。我们正是这样做:

    /*
        Step 2: Update the Pod in the Kubernetes API.
    */

    if err := r.Update(ctx, &pod); err != nil {
        if apierrors.IsConflict(err) {
            // The Pod has been updated since we read it.
            // Requeue the Pod to try to reconciliate again.
            return ctrl.Result{Requeue: true}, nil
        }
        if apierrors.IsNotFound(err) {
            // The Pod has been deleted since we read it.
            // Requeue the Pod to try to reconciliate again.
            return ctrl.Result{Requeue: true}, nil
        }
        log.Error(err, "unable to update Pod")
        return ctrl.Result{}, err
    }

别忘了在方法结束时成功返回:

    return ctrl.Result{}, nil
}

就这样!我们现在可以在集群上运行控制器了。

在集群上运行控制器

要在您的集群上运行我们的控制器,我们需要运行 Operator。为此,您只需要 `kubectl`。如果您没有 Kubernetes 集群,我建议您使用 KinD (Kubernetes in Docker) 在本地启动一个。

从您的机器运行 Operator 只需此命令:

make run

几秒钟后,您应该会看到 Operator 的日志。请注意,我们控制器的 `Reconcile` 方法已为集群中所有已运行的 Pod 调用。

让我们保持 Operator 运行,并在另一个终端中创建一个新的 Pod:

kubectl run --image=nginx my-nginx

Operator 应该迅速打印一些日志,表明它对 Pod 的创建和随后的状态变化做出了反应。

INFO    controllers.Pod no update required  {"pod": "default/my-nginx"}
INFO    controllers.Pod no update required  {"pod": "default/my-nginx"}
INFO    controllers.Pod no update required  {"pod": "default/my-nginx"}
INFO    controllers.Pod no update required  {"pod": "default/my-nginx"}

让我们检查 Pod 的标签:

$ kubectl get pod my-nginx --show-labels
NAME       READY   STATUS    RESTARTS   AGE   LABELS
my-nginx   1/1     Running   0          11m   run=my-nginx

让我们向 Pod 添加一个注解,以便我们的控制器知道为其添加动态标签:

kubectl annotate pod my-nginx padok.fr/add-pod-name-label=true

请注意,控制器立即做出反应并在其日志中生成了一个新行:

INFO    controllers.Pod adding label    {"pod": "default/my-nginx"}
$ kubectl get pod my-nginx --show-labels
NAME       READY   STATUS    RESTARTS   AGE   LABELS
my-nginx   1/1     Running   0          13m   padok.fr/pod-name=my-nginx,run=my-nginx

太棒了!您刚刚成功编写了一个 Kubernetes 控制器,能够为集群中的资源添加具有动态值的标签。

控制器和 Operator,无论大小,都可能是您 Kubernetes 之旅的重要组成部分。现在编写 Operator 比以往任何时候都容易。可能性是无限的。

下一步是什么?

如果您想进一步,我建议从在集群内部署控制器或 Operator 开始。Operator SDK 生成的 `Makefile` 将完成大部分工作。

在将 Operator 部署到生产环境时,实施健壮的测试始终是一个好主意。朝这个方向迈出的第一步是编写单元测试。此文档将指导您为 Operator 编写测试。我为我们刚刚编写的 Operator 编写了测试;您可以在此 GitHub 仓库中找到我所有的代码。

如何了解更多?

Operator SDK 文档详细介绍了如何进一步实现更复杂的 Operator。

在建模更复杂的用例时,作用于内置 Kubernetes 类型的单个控制器可能不够。您可能需要使用自定义资源定义 (CRD) 和多个控制器来构建更复杂的 Operator。Operator SDK 是一个很好的工具来帮助您实现这一点。

如果您想讨论构建 Operator,请加入 Kubernetes Slack 工作区中的 #kubernetes-operator 频道!