fbpx

連載: Kubernetesでカスタムコントローラを作ろう! ~第6回 ReplicaSetコントローラからコントローラの実装を学ぶ~

この記事は1年以上前に投稿されました。情報が古い可能性がありますので、ご注意ください。

前回は宣言的な管理やReconciliation Loopについて説明しました。 コントローラがどのように作られているか大まかに理解ができたところで、実際のKubernetesのコントローラの実装を見てみましょう。 細かな実装は無視して、ざっくりとどういう作りになっているか理解していきます。

ReplicaSet の実装を確認する

今回は比較的単純な実装となっているReplicaSetコントローラを見ていきます。 確認したKubernetesのバージョンはv1.26.2となっています。 KubernetesのコードはGithub上で管理されており、こちらのリンクから今回解説するコードが確認できます。 行数とともに説明するので、併せて確認してみてください。

コントローラは以下のRun関数から起動します。 208-210行目の go wait.UntilWithContext で実際にReconcileを実行するワーカーを複数起動しています。 前回説明したように、これによりReconcile処理を複数のワーカーで並行処理できるようになっています。

// Run begins watching and syncing.
func (rsc *ReplicaSetController) Run(ctx context.Context, workers int) {
    defer utilruntime.HandleCrash()

    // Start events processing pipeline.
    rsc.eventBroadcaster.StartStructuredLogging(0)
    rsc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: rsc.kubeClient.CoreV1().Events("")})
    defer rsc.eventBroadcaster.Shutdown()

    defer rsc.queue.ShutDown()

    controllerName := strings.ToLower(rsc.Kind)
    klog.Infof("Starting %v controller", controllerName)
    defer klog.Infof("Shutting down %v controller", controllerName)

    if !cache.WaitForNamedCacheSync(rsc.Kind, ctx.Done(), rsc.podListerSynced, rsc.rsListerSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.UntilWithContext(ctx, rsc.worker, time.Second)
    }

    <-ctx.Done()
}

そして各ワーカーは以下のprocessNextWorkItem関数を無限ループで呼び出します。 この中で呼ばれているsyncHandlerがReconcile関数の実態です。 syncHandlerにはキューから取り出したkeyを渡していますが、一体これはなんなのでしょうか。 次にそちらを確認していきましょう。

func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool {
    key, quit := rsc.queue.Get()
    if quit {
        return false
    }
    defer rsc.queue.Done(key)

    err := rsc.syncHandler(ctx, key.(string))
    if err == nil {
        rsc.queue.Forget(key)
        return true
    }

    utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
    rsc.queue.AddRateLimited(key)

    return true
}

このキューにはReconcileしたいReplicaSetが ネームスペース名/ReplicaSet名 という形でエンキューされています。 そのためReplicaSetのイベントが発生した場合はもちろんそうですが、ReplicaSetに紐づくPodにイベントがあった場合でも、その親であるReplicaSetがキューへエンキューされます。 このキューはReplicaSetControllerのaddPod, updatePod, deletePodaddRS, updateRS, deleteRS関数の中でエンキューされています。 以上から、ReplicaSetコントローラのReconcileはReplicaSetにしか興味がないことがわかります。 これは基本的に他のコントローラも同じで、Reconcileは何か1種類のリソースに対してのみ責任を持つことがほとんどです。

syncHandlerはstructであるReplicaSetContrllerのフィールドであり、実態はsyncReplicaSet代入されています。 ではメインであるsyncReplicaSetの実装を確認していきます。

先ほど説明した通り、syncReplicaSetは引数であるkeyでネームスペース名/ReplicaSet名を受け取ります(658行目)。 そしてそれは分解され、ネームスペース名とReplicaSet名が取り出されます(664行目)。 取り出されたReplicaSetのみを対象にこのReconcile処理が実施され、それ以外のReplicaSetはそのReconcile処理では対象としません。

func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error {
    startTime := time.Now()
    defer func() {
        klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
    }()

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    ...(続く)

次にReconcile対象のReplicaSetオブジェクトを取得します。 削除されていた場合はそれ以上処理を進める必要がないため、ここで処理が終了します。

...(続き)
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
if apierrors.IsNotFound(err) {
    klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
    rsc.expectations.DeleteExpectations(key)
    return nil
}
if err != nil {
    return err
}
...(続く)

rsc.expectations.SatisfiedExpectationsは、最適化の処理のようなものなので、ここでは無視して大丈夫です(678行目)。 その結果であるrsNeedsSyncにはtrueが入るものとしてこれ以降読み進めてください。

先ほど取得したReplicaSetの結果から、それにより管理されるPodを選択するためのselectorを取得します(679行目)。 その次に、ReplicaSetが所属するネームスペースの全てのPodを取得します(688行目)。 そして取得したPodからアクティブなPodのみを取得します(693行目)。 rsc.claimPodsは少し複雑ですが、対象のReplicaSetで管理されるべきPodをフィルターします(697行目)。 単純なフィルターだけでなく、対象のReplicaSetのセレクタにマッチしていますが、そのReplicaSetに管理されていないPodを管理下にするような修正もされます。 ここまででReplicaSetの管理対象のPodが絞り込めました。

...(続き)
rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
    utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector for rs %v/%v: %v", namespace, name, err))
    return nil
}

// list all pods to include the pods that don't match the rs`s selector
// anymore but has the stale controller ref.
// TODO: Do the List and Filter in a single pass, or use an index.
allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
if err != nil {
    return err
}
// Ignore inactive pods.
filteredPods := controller.FilterActivePods(allPods)

// NOTE: filteredPods are pointing to objects from cache - if you need to
// modify them, you need to copy it first.
filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods)
if err != nil {
    return err
}
...(続く)

ここから本格的なReconcile処理です。 削除されていない場合のみ、rsc.manageReplicasでPodの作成や削除をします(704行目)。 rsc.manageReplicasでは実際に作成されているPod(filteredPods)の数とReplicaSetで作られてほしいPodの数(.spec.replicas)を比較します。 そして足りなければ作成し、逆に多すぎる場合は削除する処理を実施します。 これまでの連載で説明した通り、Reconcile処理はイベントの種類(例えばReplicaSetのCreateやDeleteなど)によって処理を分岐しないことを説明してきましたが、コード上からもそれが確認できました。 ReplicaSetコントローラを通して、Reconcile処理は実際の状態と理想の状態を比較して、差分に対してアクションするものということを、改めて理解していただけたのではないでしょうか。

manageReplicasでは他にもリクエストを急激に流さないようにするための処理だったり、Pod作成時に少しずつリクエスト数を増やしながらリクエストを投げたりといったいくつかの工夫がなされています。 興味があればぜひコードを確認してみてください。

...(続き)
var manageReplicasErr error
if rsNeedsSync && rs.DeletionTimestamp == nil {
    manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs)
}
...(続く)

最後に.statusフィールドを更新してこの関数は終了します。

    ...(続き)
    rs = rs.DeepCopy()
    newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

    // Always updates status as pods come up or die.
    updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
    if err != nil {
        // Multiple things could lead to this update failing. Requeuing the replica set ensures
        // Returning an error causes a requeue without forcing a hotloop
        return err
    }
    // Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
    if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
        updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
        updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
        rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
    }
    return manageReplicasErr
}

コントローラがどういう流れで記述されているのか雰囲気をつかめましたでしょうか?

最後に

今回は実際のコントローラがどういう作りになっているのかを知るために、KubernetesのReplicaSetコントローラのコードを確認しました。 Reconcile処理の流れを確認するために、大事なところをピックアップして確認していきましたが、興味がある方は細部まで確認してみてください。 様々な考慮がされていてとても面白いと思います。 ここまでで学んだことを活用し、次回以降で実際にサンプルのコントローラの開発してみましょう。

クリエーションラインではKubernetesに関する様々なトレーニングを提供しております。 詳しくはこちらをご参照ください。

また、この連載の更新通知を受け取りたい方は、Twitterアカウントのフォローや、このページ下部にリンクのあるCL LAB MAIL MAGAZINEへの登録をぜひお願いします!

新規CTA