podにNFSがmountされるまで #kubernetes #コードリーディング
この記事は1年以上前に投稿されました。情報が古い可能性がありますので、ご注意ください。
先日、KubernetesのPersistentVolumeからNFSを利用する場合の挙動について調査する機会がありました。
調査にあたってKubernetesのコードリーディングを行いましたので、内容をこちらで紹介します。
調査したいテーマは下記のものです。
- Persistent VolumeからNFSボリュームをマウントする際に、デフォルトで指定されるマウントオプションは存在するか?
一般的なLinuxサーバでNFSボリュームをマウントする際にはmountコマンドのオプションを指定していますが、kubernetesでマウントする場合はどうなるのか?という疑問です。
挙動
まずソースコードを読む前に挙動を確認します。
実際にPodにPV,PVCを介してNFSをマウントした際に、Podが存在するNode上で /proc/mounts を見てみます。
root@vanilla:~# cat /proc/mounts | grep 10.99.99.40 10.99.99.40:/srv/hoge //var/lib/kubelet/pods/704a1f4c-5634-4b66-8833-9d381adcac92/volumes/kubernetes.io~nfs/pv0001 nfs4 rw,relatime,vers=4.2,rsize=524288,wsize=524288,namlen=255,hard,proto=tcp,timeo=600, retrans=2,sec=sys,clientaddr=10.99.99.42,local_lock=none,addr=10.99.99.40 0 0
(見やすさのため、改行を入れています)
10.99.99.40:/srv/hoge が今回マウントしているNFSサーバです。podのvolume領域にマウントされていることが確認できます。特に変わったオプションは見当たりません。mountコマンドをオプション無しで実行した場合と同じ状態に見えます。ふむ。
ちなみに、コンテナの中から実行しても同じです。唯一の違いは、mount先が /opt に見えています。
10.99.99.40:/srv/hoge /opt nfs4 rw,relatime,vers=4.2,rsize=524288,wsize=524288,namlen=255,hard,proto=tcp,timeo=600, retrans=2,sec=sys,clientaddr=10.99.99.42,local_lock=none,addr=10.99.99.40 0 0
この/var/lib/kubelet/pods/~ と /optは、dockerによってローカルでバインドマウントされてることが確認できます。
root@vanilla:~# docker container inspect cddf9c510f43
[
{
"Id": "cddf9c510f430df7f9300c6d0f93fbed2f81a8bad130bda7cdf7a65a25b4617e",
"Created": "2020-11-12T07:12:33.297648697Z",
...
"Mounts": [
{
"Type": "bind",
"Source": "/var/lib/kubelet/pods/704a1f4c-5634-4b66-8833-9d381adcac92/volumes/kubernetes.io~nfs/pv0001",
"Destination": "/opt",
"Mode": "",
"RW": true,
"Propagation": "rprivate"
},
...
ということで、/var/lib/kubelet/pods/~ にどうNFSボリュームがマウントされているかをコードから確認すればよさそうです。他のコンテナランタイムだとどうなっているかはちょっと気になりますが。
コードリーディング
ソースコードから確認してみます。本記事ではNodeはLinux、バージョンは1.19.3としています。
reconcileからmount処理まで
kubernetesでPodにNFSをmountする役割はkubeletが担います。その中でも、特にvolumeのattach/detachを担うコンポーネントがvolumemanagerというものです。さて、volumemanagerのreconcile loopの中を探すと、mountAttachVolumesなる関数が呼ばれていることを確認できます。
func (rc *reconciler) reconcile() {
// Unmounts are triggered before mounts so that a volume that was
// referenced by a pod that was deleted and is now referenced by another
// pod is unmounted from the first pod before being mounted to the new
// pod.
rc.unmountVolumes()
// Next we mount required volumes. This function could also trigger
// attach if kubelet is responsible for attaching volumes.
// If underlying PVC was resized while in-use then this function also handles volume
// resizing.
rc.mountAttachVolumes()
// Ensure devices that should be detached/unmounted are detached/unmounted.
rc.unmountDetachDevices()
}
mountAttachVolumesの中でいくつか処理の分岐がありますが、NFSのボリュームを新たにマウントする場合にはrc.operationExecutor.MountVolumeでmountが開始されます。
func (rc *reconciler) mountAttachVolumes() {
// Ensure volumes that should be attached/mounted are attached/mounted.
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
volumeToMount.DevicePath = devicePath
if cache.IsVolumeNotAttachedError(err) {
//...
} else if !volMounted || cache.IsRemountRequiredError(err) {
// Volume is not mounted, or is already mounted, but requires remounting
remountingLogStr := ""
isRemount := cache.IsRemountRequiredError(err)
if isRemount {
remountingLogStr = "Volume is already mounted to pod, but remount was requested."
}
klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr))
err := rc.operationExecutor.MountVolume(
rc.waitForAttachTimeout,
volumeToMount.VolumeToMount,
rc.actualStateOfWorld,
isRemount)
処理はoperationExecutorに移譲されます。operationExecutor.MountVolumeでは、まずGenerateMountVolumeFuncという関数でmountするためのGeneratedOperationsというオブジェクトを作成します。GeneratedOperationsは関数を含んだオブジェクトです。
func (oe *operationExecutor) MountVolume(
waitForAttachTimeout time.Duration,
volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
isRemount bool) error {
fsVolume, err := util.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec)
if err != nil {
return err
}
var generatedOperations volumetypes.GeneratedOperations
if fsVolume {
// Filesystem volume case
// Mount/remount a volume when a volume is attached
generatedOperations = oe.operationGenerator.GenerateMountVolumeFunc(
waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount)
} else {
// Block volume case
// Creates a map to device if a volume is attached
generatedOperations, err = oe.operationGenerator.GenerateMapVolumeFunc(
waitForAttachTimeout, volumeToMount, actualStateOfWorld)
}
作成したOperationをここで実行しています。
return oe.pendingOperations.Run(
volumeToMount.VolumeName, podName, "" /* nodeName */, generatedOperations)
では、GeneratedOperationsに含まれる関数の中を見てみます。
func (og *operationGenerator) GenerateMountVolumeFunc(
waitForAttachTimeout time.Duration,
volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
isRemount bool) volumetypes.GeneratedOperations {
生成される関数であるmountVolumeFuncの中では、まずvolumePluginをspecから取得しています。specはyamlファイルによく出てくるあのspecですね。
mountVolumeFunc := func() (error, error) {
// Get mounter plugin
volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
if err != nil || volumePlugin == nil {
return volumeToMount.GenerateError("MountVolume.FindPluginBySpec failed", err)
}
そのあと、volumePluginからvolumeMounterを生成し、
volumeMounter, newMounterErr := volumePlugin.NewMounter(
volumeToMount.VolumeSpec,
volumeToMount.Pod,
volume.VolumeOptions{})
いくつかのvalidationを経て、ここでマウントが実行されます。
// Execute mount
mountErr := volumeMounter.SetUp(volume.MounterArgs{
FsUser: ioutil.FsUserFrom(volumeToMount.Pod),
FsGroup: fsGroup,
DesiredSize: volumeToMount.DesiredSizeLimit,
FSGroupChangePolicy: fsGroupChangePolicy,
})
ということで、マウントの実行処理はvolumePluginと、volumePluginから生成されるvolumeMounterに移譲しているようです。では、今度はこのvolumePluginを追いかけていきます。
volumePluginを取得する
volumePluginを取得しているFindPluginBySpecの処理をまず確認します。
どうも、volumePluginMgrに格納されているvolumePluginからマッチするものを探しているようです。
https://github.com/kubernetes/kubernetes/blob/v1.19.3/pkg/volume/plugins.go#L654
func (pm *VolumePluginMgr) FindPluginBySpec(spec *Spec) (VolumePlugin, error) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
if spec == nil {
return nil, fmt.Errorf("Could not find plugin because volume spec is nil")
}
matches := []VolumePlugin{}
for _, v := range pm.plugins {
if v.CanSupport(spec) {
matches = append(matches, v)
}
}
//...
return matches[0], nil
}
では、volumePluginMgrにvolumePluginが登録される処理を探します。
このvolumePluginMgrは引数で引き回されているのですが、追いかけるとkubelet起動時に初期化していて、どうもInitPluginsという関数にpluginsという引数を与えてセットされています。
https://github.com/kubernetes/kubernetes/blob/v1.19.3/pkg/kubelet/kubelet.go#L708-L709
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeDeps *Dependencies,
//...
klet.volumePluginMgr, err =
NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
https://github.com/kubernetes/kubernetes/blob/v1.19.3/pkg/kubelet/volume_host.go#L89
func NewInitializedVolumePluginMgr(
kubelet ⋆Kubelet,
secretManager secret.Manager,
configMapManager configmap.Manager,
tokenManager ⋆token.Manager,
plugins []volume.VolumePlugin,
prober volume.DynamicPluginProber) (⋆volume.VolumePluginMgr, error) {
//...
if err := kvh.volumePluginMgr.InitPlugins(plugins, prober, kvh); err != nil {
return nil, fmt.Errorf(
"could not initialize volume plugins for KubeletVolumePluginMgr: %v",
err)
}
return &kvh.volumePluginMgr, nil
}
このpluginsですが、kubeDepsなるインスタンスに格納されていましたね。今度はこのkubeDepsを追いかけます。このkubeDepsも引き回されている変数なのですが、元を追いかけるとkubeletの起動コマンドにたどり着きます。初期化処理でしょうし当たり前と言えば当たり前なのですが、ここまでたどり着くと何か感動があります。
https://github.com/kubernetes/kubernetes/blob/v1.19.3/cmd/kubelet/app/server.go#L251
func NewKubeletCommand() *cobra.Command {
cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
kubeletFlags := options.NewKubeletFlags()
kubeletConfig, err := options.NewKubeletConfiguration()
// programmer error
if err != nil {
klog.Fatal(err)
}
cmd := &cobra.Command{
Use: componentKubelet,
//...
Run: func(cmd *cobra.Command, args []string) {
// initial flag parse, since we disable cobra's flag parsing // use kubeletServer to construct the default KubeletDeps
kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
(関係ないですが、programmer error が気になる)
今度はUnsecuredDependenciesの中身を見てみます。すると、ProbeVolumePluginsなる、それっぽい処理が。
https://github.com/kubernetes/kubernetes/blob/v1.19.3/cmd/kubelet/app/server.go#L379
func UnsecuredDependencies(s ⋆options.KubeletServer, featureGate featuregate.FeatureGate) (⋆kubelet.Dependencies, error) {
// Initialize the TLS Options
tlsOptions, err := InitializeTLS(&s.KubeletFlags, &s.KubeletConfiguration)
//...
plugins, err := ProbeVolumePlugins(featureGate)
中を見てみると...ありました!各pluginを呼び出しています。NFSも書かれていますね。
https://github.com/kubernetes/kubernetes/blob/v1.19.3/cmd/kubelet/app/plugins.go#L75
func ProbeVolumePlugins(featureGate featuregate.FeatureGate) ([]volume.VolumePlugin, error) {
allPlugins := []volume.VolumePlugin{}
// The list of plugins to probe is decided by the kubelet binary, not
// by dynamic linking or other "magic". Plugins will be analyzed and
// initialized later.
//
// Kubelet does not currently need to configure volume plugins.
// If/when it does, see kube-controller-manager/app/plugins.go for example of using volume.VolumeConfig
var err error
allPlugins, err = appendLegacyProviderVolumes(allPlugins, featureGate)
if err != nil {
return allPlugins, err
}
allPlugins = append(allPlugins, emptydir.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, git_repo.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, hostpath.ProbeVolumePlugins(volume.VolumeConfig{})...)
allPlugins = append(allPlugins, nfs.ProbeVolumePlugins(volume.VolumeConfig{})...)
https://github.com/kubernetes/kubernetes/blob/v1.19.3/pkg/volume/nfs/nfs.go#L40-L45
// ProbeVolumePlugins is the primary entrypoint for volume plugins.
// This is the primary entrypoint for volume plugins.
// The volumeConfig arg provides the ability to configure recycler behavior. It is implemented as a pointer to allow nils.
// The nfsPlugin is used to store the volumeConfig and give it, when needed, to the func that creates NFS Recyclers.
// Tests that exercise recycling should not use this func but instead use ProbeRecyclablePlugins() to override default behavior.
func ProbeVolumePlugins(volumeConfig volume.VolumeConfig) []volume.VolumePlugin {
return []volume.VolumePlugin{
&nfsPlugin{
host: nil,
config: volumeConfig,
},
}
}
ということで、volumePluginMgrにNFS Pluginが登録されることを確認できました。このNFS PluginがvolumePluginMgr.FindPluginBySpecによって選択されることになります。では続いて、NFS Pluginの中を見ていきます。
NFS Volume Pluginがmountするまで
前に見たように、volumePluginからvolumeMounter が生成されます。nfsの場合はこのあたりですね。nfsMounterが生成されています。
https://github.com/kubernetes/kubernetes/blob/v1.19.3/pkg/volume/nfs/nfs.go#L113-L136
func (plugin *nfsPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
return plugin.newMounterInternal(spec, pod, plugin.host.GetMounter(plugin.GetPluginName()))
}
func (plugin *nfsPlugin) newMounterInternal(spec *volume.Spec, pod *v1.Pod, mounter mount.Interface) (volume.Mounter, error) {
source, readOnly, err := getVolumeSource(spec)
if err != nil {
return nil, err
}
return &nfsMounter{
nfs: &nfs{
volName: spec.Name(),
mounter: mounter,
pod: pod,
plugin: plugin,
MetricsProvider: volume.NewMetricsStatFS(getPath(pod.UID, spec.Name(), plugin.host)),
},
server: source.Server,
exportPath: source.Path,
readOnly: readOnly,
mountOptions: util.MountOptionFromSpec(spec),
}, nil
}
mount処理では、このmounterからmounter.SetUpを呼んでいましたね。見ていくと、SetUpからSetUpAtが呼ばれます。
この中を見ると、マウントオプションが作られていますね!nfs.readOnly: true の場合に ro を付与して、mounterのmountOptionsとマージしています。
https://github.com/kubernetes/kubernetes/blob/v1.19.3/pkg/volume/nfs/nfs.go#L244-L261
// SetUp attaches the disk and bind mounts to the volume path.
func (nfsMounter *nfsMounter) SetUp(mounterArgs volume.MounterArgs) error {
return nfsMounter.SetUpAt(nfsMounter.GetPath(), mounterArgs)
}
func (nfsMounter *nfsMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
//...
options := []string{}
if nfsMounter.readOnly {
options = append(options, "ro")
}
mountOptions := util.JoinMountOptions(nfsMounter.mountOptions, options)
ちなみに、mounterのmountOptionsはspec.mountOptionsから取得していました。
https://github.com/kubernetes/kubernetes/blob/v1.19.3/pkg/volume/nfs/nfs.go#L134
https://github.com/kubernetes/kubernetes/blob/v1.19.3/pkg/volume/util/util.go#L263
// MountOptionFromSpec extracts and joins mount options from volume spec with supplied options
func MountOptionFromSpec(spec *volume.Spec, options ...string) []string {
pv := spec.PersistentVolume
if pv != nil {
// Use beta annotation first
if mo, ok := pv.Annotations[v1.MountOptionAnnotation]; ok {
moList := strings.Split(mo, ",")
return JoinMountOptions(moList, options)
}
if len(pv.Spec.MountOptions) > 0 {
return JoinMountOptions(pv.Spec.MountOptions, options)
}
}
return options
}
SetUpAtの処理に戻ります。mountOptionのセット後にmounter.Mountが呼ばれています。
https://github.com/kubernetes/kubernetes/blob/v1.19.3/pkg/volume/nfs/nfs.go#L262
func (nfsMounter *nfsMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
//...
mountOptions := util.JoinMountOptions(nfsMounter.mountOptions, options)
err = nfsMounter.mounter.Mount(source, dir, "nfs", mountOptions)
このmounterは、nfsMounterの初期化時にplugin.host.GetMounterで与えられていました。
https://github.com/kubernetes/kubernetes/blob/v1.19.3/pkg/volume/nfs/nfs.go#L114
func (plugin *nfsPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
return plugin.newMounterInternal(spec, pod, plugin.host.GetMounter(plugin.GetPluginName()))
}
ではこのGetMounterですが、これも実は起動時の処理で設定されていて、遡るとkubeDepsにセットされていました。UnsecutredDependenciesの下記の処理で生成されています。
https://github.com/kubernetes/kubernetes/blob/v1.19.3/cmd/kubelet/app/server.go#L365
func UnsecuredDependencies(s ⋆options.KubeletServer, featureGate featuregate.FeatureGate) (⋆kubelet.Dependencies, error) {
// Initialize the TLS Options
tlsOptions, err := InitializeTLS(&s.KubeletFlags, &s.KubeletConfiguration)
if err != nil {
return nil, err
}
mounter := mount.New(s.ExperimentalMounterPath)
この先の処理はmountパッケージにあります。linux向けの場合は、mount_linux.goがbuildされています。ここの処理でmouterがnewされていますね。ちなみに余談ですが、mount_windows.go見るとcifsやらsmbやらの処理が書かれています。
https://github.com/kubernetes/kubernetes/blob/v1.19.3/vendor/k8s.io/utils/mount/mount_linux.go#L59
func New(mounterPath string) Interface {
return &Mounter{
mounterPath: mounterPath,
withSystemd: detectSystemd(),
}
}
mounter.Mountを追いかけると、実際のmount処理にたどり着きます。
https://github.com/kubernetes/kubernetes/blob/v1.19.3/vendor/k8s.io/utils/mount/mount_linux.go#L147
func (mounter *Mounter) Mount(source string, target string, fstype string, options []string) error {
return mounter.MountSensitive(source, target, fstype, options, nil)
}
func (mounter *Mounter) MountSensitive(source string, target string, fstype string, options []string, sensitiveOptions []string) error {
//...
return mounter.doMount(mounterPath, defaultMountCommand, source, target, fstype, options, sensitiveOptions)
}
// doMount runs the mount command. mounterPath is the path to mounter binary if containerized mounter is used.
// sensitiveOptions is an extension of options except they will not be logged (because they may contain sensitive material)
func (mounter *Mounter) doMount(mounterPath string, mountCmd string, source string, target string, fstype string, options []string, sensitiveOptions []string) error {
mountArgs, mountArgsLogStr := MakeMountArgsSensitive(source, target, fstype, options, sensitiveOptions)
if len(mounterPath) > 0 {
mountArgs = append([]string{mountCmd}, mountArgs...)
mountArgsLogStr = mountCmd + " " + mountArgsLogStr
mountCmd = mounterPath
}
if mounter.withSystemd {
//...
mountCmd, mountArgs, mountArgsLogStr = AddSystemdScopeSensitive("systemd-run", target, mountCmd, mountArgs, mountArgsLogStr)
} else {
// No systemd-run on the host (or we failed to check it), assume kubelet
// does not run as a systemd service.
// No code here, mountCmd and mountArgs are already populated.
}
// Logging with sensitive mount options removed.
klog.V(4).Infof("Mounting cmd (%s) with arguments (%s)", mountCmd, mountArgsLogStr)
command := exec.Command(mountCmd, mountArgs...)
optionなどの引数をごにょごにょ操作してmountCmd、mountArgsを作って、exec.Commandで単に実行しています。ちなみにmounterPathはnilで渡ってくるようになっています。systemd管理下のOSの場合は、systemd-run --scope -- をコマンドの頭につけているようですね。さて、mountCmdの大元となるdefaultMountCommandをたどると...
https://github.com/kubernetes/kubernetes/blob/v1.19.3/vendor/k8s.io/utils/mount/mount.go#L33
package mount
//...
const (
// Default mount command if mounter path is not specified.
defaultMountCommand = "mount"
// Log message where sensitive mount options were removed
sensitiveOptionsRemoved = "<masked>"
)
なるほど、単にmountですね。ということでようやく、kubeletがmountコマンドをexecしているところまでたどり着きました。なるほど、単にmountOptionをつけてmountをexecしているということですね。ソースディレクトリ、ターゲットディレクトリも指定していますが、ここでは追いかけません。
また、コードではExperimentalMounterPathを指定することでこのmountCmdであるmountをdefaultから書き換えられるように見えますが、実際に書き換える処理はありません。実際にkubeletのconfigでexperimentalMounterPathを書き換えても、反映されないようです。
まとめ
ということで、コードから下記のことがわかりました。実際には環境や状況に応じていくつかの分岐がありますが、最終的にはmountコマンドを実行しているだけなんですね。
- kubernetesでNFSをmountするPodを作成すると、kubeletがmountコマンドを実行する
- マウントオプションにはspec.mountOptionsと、nfs.readOnly: trueのときだけroが設定される
https://github.com/kubernetes/kubernetesは大きなレポジトリですが、1つ1つ追いかけていくと意外と難しくはありません。とは言え、当たりをつけるのにkubernetesの基本的なアーキテクチャだけ理解しておく必要はありそうです。
おわりに
本記事では、Kubernetesのソースコード調査の内容をご紹介しました。クリエーションラインでは、Docker EnterpriseをはじめKubernetes Boost, Kubernetes TrainingなどKubernetesに関わる製品の取り扱いや技術支援を行っています。ご相談を承っておりますので、興味がありましたらお問い合わせよりご連絡ください。
