Kubelet v1.25.x Source Code: RuntimeManager
admin
2024-04-27 00:22:57

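1. Environment

Kubernetes source version: remotes/origin/release-1.25
Kubelet version built from that source: Kubernetes v1.24.0-beta.0.2463+ee7799bab469d7
Kubernetes test cluster: a single-node cluster set up from the Kubernetes v1.25.4 binaries

For setting up a single-node K8S cluster, see the article "Kubernetes v1.25: Setting Up a Single-Node Cluster for Debugging the K8S Source Code"

Golang version: go1.19.3 linux/amd64
IDEA version: 2022.2.3
Delve version: 1.9.1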

[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# dlv version
Delve Debugger
Version: 1.9.1
Build: $Id: d81b9fd12bfa603f3cf7a4bc842398bd61c42940 $
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# go version
go version go1.19.3 linux/amd64
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# kubectl version
WARNING: This version information is deprecated and will be replaced with the output from kubectl version --short.  Use --output=yaml|json to get the full version.
Client Version: version.Info{Major:"1", Minor:"25", GitVersion:"v1.25.4", GitCommit:"872a965c6c6526caa949f0c6ac028ef7aff3fb78", GitTreeState:"clean", BuildDate:"2022-11-09T13:36:36Z", GoVersion:"go1.19.3", Compiler:"gc", Platform:"linux/amd64"}
Kustomize Version: v4.5.7
Server Version: version.Info{Major:"1", Minor:"25", GitVersion:"v1.25.4", GitCommit:"872a965c6c6526caa949f0c6ac028ef7aff3fb78", GitTreeState:"clean", BuildDate:"2022-11-09T13:29:58Z", GoVersion:"go1.19.3", Compiler:"gc", Platform:"linux/amd64"}
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# kubectl get nodes -owide
NAME          STATUS   ROLES    AGE   VERSION   INTERNAL-IP     EXTERNAL-IP   OS-IMAGE                KERNEL-VERSION                CONTAINER-RUNTIME
k8s-master1   Ready    <none>   31h   v1.25.4   192.168.11.71   <none>        CentOS Linux 7 (Core)   3.10.0-1160.80.1.el7.x86_64   containerd://1.6.10
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# kubectl get componentstatus
Warning: v1 ComponentStatus is deprecated in v1.19+
NAME                 STATUS    MESSAGE                         ERROR
etcd-0               Healthy   {"health":"true","reason":""}
controller-manager   Healthy   ok
scheduler            Healthy   ok
[root@k8s-master1 kubernetes]#

The kubelet is started with the following flags:

[root@k8s-master1 kubernetes]# ps -ef|grep "/usr/local/bin/kubelet"
root       7972      1  6 07:06 ?        00:00:06 /usr/local/bin/kubelet --bootstrap-kubeconfig=/etc/kubernetes/bootstrap-kubelet.kubeconfig --kubeconfig=/etc/kubernetes/kubelet.kubeconfig --config=/etc/kubernetes/kubelet-conf.yml --container-runtime-endpoint=unix:///run/containerd/containerd.sock --node-labels=node.kubernetes.io/node= --v=8
root       9549   6424  0 07:07 pts/0    00:00:00 grep --color=auto /usr/local/bin/kubelet
[root@k8s-master1 kubernetes]#

The kubelet configuration file is as follows:

apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
address: 0.0.0.0
port: 10250
readOnlyPort: 10255
authentication:
  anonymous:
    enabled: false
  webhook:
    cacheTTL: 2m0s
    enabled: true
  x509:
    clientCAFile: /etc/kubernetes/pki/ca.pem
authorization:
  mode: Webhook
  webhook:
    cacheAuthorizedTTL: 5m0s
    cacheUnauthorizedTTL: 30s
cgroupDriver: systemd
cgroupsPerQOS: true
clusterDNS:
- 10.96.0.10
clusterDomain: cluster.local
containerLogMaxFiles: 5
containerLogMaxSize: 10Mi
contentType: application/vnd.kubernetes.protobuf
cpuCFSQuota: true
cpuManagerPolicy: none
cpuManagerReconcilePeriod: 10s
enableControllerAttachDetach: true
enableDebuggingHandlers: true
enforceNodeAllocatable:
- pods
eventBurst: 10
eventRecordQPS: 5
evictionHard:
  imagefs.available: 15%
  memory.available: 100Mi
  nodefs.available: 10%
  nodefs.inodesFree: 5%
evictionPressureTransitionPeriod: 5m0s
failSwapOn: true
fileCheckFrequency: 20s
hairpinMode: promiscuous-bridge
healthzBindAddress: 127.0.0.1
healthzPort: 10248
httpCheckFrequency: 20s
imageGCHighThresholdPercent: 85
imageGCLowThresholdPercent: 80
imageMinimumGCAge: 2m0s
iptablesDropBit: 15
iptablesMasqueradeBit: 14
kubeAPIBurst: 10
kubeAPIQPS: 5
makeIPTablesUtilChains: true
maxOpenFiles: 1000000
maxPods: 110
nodeStatusUpdateFrequency: 10s
oomScoreAdj: -999
podPidsLimit: -1
registryBurst: 10
registryPullQPS: 5
resolvConf: /etc/resolv.conf
rotateCertificates: true
runtimeRequestTimeout: 2m0s
serializeImagePulls: true
staticPodPath: /etc/kubernetes/manifests
streamingConnectionIdleTimeout: 4h0m0s
syncFrequency: 1m0s
volumeStatsAggPeriod: 1m0s

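2. Component Overview

RuntimeManager is the component that talks to the underlying container runtime and handles creating, deleting, updating and querying containers.

3. Source Code Analysis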

3.1. KubeGenericRuntime

Let's first look at how KubeGenericRuntime is defined:

// pkg/kubelet/kuberuntime/kuberuntime_manager.go
type KubeGenericRuntime interface {
	kubecontainer.Runtime
	kubecontainer.StreamingRuntime
	kubecontainer.CommandRunner
}

The KubeGenericRuntime interface is composed of three interfaces: Runtime, StreamingRuntime and CommandRunner. The following sections look at how each of them is defined.
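As a reminder of how this kind of composition works in Go, here is a minimal sketch of interface embedding. The names Runner, Streamer, Combined and fakeRuntime are hypothetical and only mirror the shape of KubeGenericRuntime; they are not part of the kubelet.

```go
package main

import "fmt"

// Runner and Streamer are hypothetical stand-ins for the embedded interfaces.
type Runner interface {
	Run(cmd string) string
}

type Streamer interface {
	Stream(id string) string
}

// Combined embeds both interfaces, the same way KubeGenericRuntime embeds
// Runtime, StreamingRuntime and CommandRunner.
type Combined interface {
	Runner
	Streamer
}

// fakeRuntime satisfies Combined because it implements every embedded method.
type fakeRuntime struct{}

func (fakeRuntime) Run(cmd string) string   { return "ran " + cmd }
func (fakeRuntime) Stream(id string) string { return "stream://" + id }

// Compile-time check that fakeRuntime implements Combined.
var _ Combined = fakeRuntime{}

func main() {
	var c Combined = fakeRuntime{}
	fmt.Println(c.Run("ls"))
	fmt.Println(c.Stream("abc"))
}
```

A type only has to implement the union of the embedded method sets; it never has to mention Combined by name.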

3.1.1. kubecontainer.Runtime

Runtime is clearly the most important of the three interfaces. Every container runtime must implement it, and every method implementation must be thread-safe.

type Runtime interface {
	// Returns which kind of container runtime this is
	Type() string
	// Returns the version of the container runtime
	Version() (Version, error)
	// Returns the API version of the container runtime
	APIVersion() (Version, error)
	// Returns the status of the container runtime
	Status() (*RuntimeStatus, error)
	// Gets Pods; the boolean parameter specifies whether to return all Pods, including those whose containers have exited or died
	GetPods(all bool) ([]*Pod, error)
	// Removes dead containers according to the given container GC policy
	GarbageCollect(gcPolicy GCPolicy, allSourcesReady bool, evictNonDeletedPods bool) error
	// Syncs the Pod to the desired state
	SyncPod(pod *v1.Pod, podStatus *PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) PodSyncResult
	// Kills a Pod
	KillPod(pod *v1.Pod, runningPod Pod, gracePeriodOverride *int64) error
	// Gets the status of the Pod
	GetPodStatus(uid types.UID, name, namespace string) (*PodStatus, error)
	// Gets the logs of a container
	GetContainerLogs(ctx context.Context, pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error)
	// Deletes a container
	DeleteContainer(containerID ContainerID) error
	// Image-related interface
	ImageService
	UpdatePodCIDR(podCIDR string) error
	CheckpointContainer(options *runtimeapi.CheckpointContainerRequest) error
}

type ImageService interface {
	PullImage(image ImageSpec, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error)
	GetImageRef(image ImageSpec) (string, error)
	ListImages() ([]Image, error)
	RemoveImage(image ImageSpec) error
	ImageStats() (*ImageStats, error)
}

3.1.2. kubecontainer.StreamingRuntime

Judging by the method names, these three methods most likely back the kubectl exec/attach/port-forward commands.

type StreamingRuntime interface {
	GetExec(id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error)
	GetAttach(id ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error)
	GetPortForward(podName, podNamespace string, podUID types.UID, ports []int32) (*url.URL, error)
}

3.1.3. kubecontainer.CommandRunner

The CommandRunner interface is used to run a command inside a container.

type CommandRunner interface {
	RunInContainer(id ContainerID, cmd []string, timeout time.Duration) ([]byte, error)
}

3.2. kubeGenericRuntimeManager

kubeGenericRuntimeManager is the implementation of KubeGenericRuntime. Let's see which components it depends on in order to implement the KubeGenericRuntime interface.

type kubeGenericRuntimeManager struct {
	// Name of the current container runtime
	runtimeName string
	// Event recorder, used to record the events produced over the whole container lifecycle
	recorder record.EventRecorder
	// Abstraction for creating/removing directories, links, files and pipes; implemented differently per operating system
	osInterface kubecontainer.OSInterface
	// Machine information obtained through cAdvisor
	machineInfo *cadvisorapi.MachineInfo
	// Manages container GC
	containerGC *containerGC
	// Credentials used when pulling images
	keyring credentialprovider.DockerKeyring
	runner  kubecontainer.HandlerRunner
	// Used to generate runtime options
	runtimeHelper          kubecontainer.RuntimeHelper
	livenessManager        proberesults.Manager
	readinessManager       proberesults.Manager
	startupManager         proberesults.Manager
	cpuCFSQuota            bool
	cpuCFSQuotaPeriod      metav1.Duration
	imagePuller            images.ImageManager
	runtimeService         internalapi.RuntimeService
	imageService           internalapi.ImageManagerService
	versionCache           *cache.ObjectCache
	seccompProfileRoot     string
	internalLifecycle      cm.InternalContainerLifecycle
	logManager             logs.ContainerLogManager
	runtimeClassManager    *runtimeclass.Manager
	logReduction           *logreduction.LogReduction
	podStateProvider       podStateProvider
	seccompDefault         bool
	memorySwapBehavior     string
	getNodeAllocatable     func() v1.ResourceList
	memoryThrottlingFactor float64
}

3.3. NewKubeGenericRuntimeManager

Let's look at how the RuntimeManager is instantiated.

As you can see, there are a lot of parameters. The logic itself is not complicated and consists mainly of the following steps:

  • 1. Query the version information of the underlying container runtime
  • 2. If the runtime API version reported by the runtime is not 0.1.0, return ErrVersionNotSupported immediately
    • In other words, this check in Kubernetes v1.25.x only accepts 0.1.0 as the runtime API version
  • 3. If the /var/log/pods directory does not exist on the host running the kubelet, create it
  • 4. Create the ImageManager
  • 5. Create the container GC manager

func NewKubeGenericRuntimeManager(
	recorder record.EventRecorder,
	livenessManager proberesults.Manager,
	readinessManager proberesults.Manager,
	startupManager proberesults.Manager,
	rootDirectory string,
	machineInfo *cadvisorapi.MachineInfo,
	podStateProvider podStateProvider,
	osInterface kubecontainer.OSInterface,
	runtimeHelper kubecontainer.RuntimeHelper,
	httpClient types.HTTPGetter,
	imageBackOff *flowcontrol.Backoff,
	serializeImagePulls bool,
	imagePullQPS float32,
	imagePullBurst int,
	imageCredentialProviderConfigFile string,
	imageCredentialProviderBinDir string,
	cpuCFSQuota bool,
	cpuCFSQuotaPeriod metav1.Duration,
	runtimeService internalapi.RuntimeService,
	imageService internalapi.ImageManagerService,
	internalLifecycle cm.InternalContainerLifecycle,
	logManager logs.ContainerLogManager,
	runtimeClassManager *runtimeclass.Manager,
	seccompDefault bool,
	memorySwapBehavior string,
	getNodeAllocatable func() v1.ResourceList,
	memoryThrottlingFactor float64,
) (KubeGenericRuntime, error) {
	runtimeService = newInstrumentedRuntimeService(runtimeService)
	imageService = newInstrumentedImageManagerService(imageService)
	kubeRuntimeManager := &kubeGenericRuntimeManager{
		recorder:               recorder,
		cpuCFSQuota:            cpuCFSQuota,
		cpuCFSQuotaPeriod:      cpuCFSQuotaPeriod,
		seccompProfileRoot:     filepath.Join(rootDirectory, "seccomp"),
		livenessManager:        livenessManager,
		readinessManager:       readinessManager,
		startupManager:         startupManager,
		machineInfo:            machineInfo,
		osInterface:            osInterface,
		runtimeHelper:          runtimeHelper,
		runtimeService:         runtimeService,
		imageService:           imageService,
		internalLifecycle:      internalLifecycle,
		logManager:             logManager,
		runtimeClassManager:    runtimeClassManager,
		logReduction:           logreduction.NewLogReduction(identicalErrorDelay),
		seccompDefault:         seccompDefault,
		memorySwapBehavior:     memorySwapBehavior,
		getNodeAllocatable:     getNodeAllocatable,
		memoryThrottlingFactor: memoryThrottlingFactor,
	}

	typedVersion, err := kubeRuntimeManager.getTypedVersion()
	if err != nil {
		klog.ErrorS(err, "Get runtime version failed")
		return nil, err
	}

	if typedVersion.Version != kubeRuntimeAPIVersion {
		klog.ErrorS(err, "This runtime api version is not supported",
			"apiVersion", typedVersion.Version,
			"supportedAPIVersion", kubeRuntimeAPIVersion)
		return nil, ErrVersionNotSupported
	}

	kubeRuntimeManager.runtimeName = typedVersion.RuntimeName
	klog.InfoS("Container runtime initialized",
		"containerRuntime", typedVersion.RuntimeName,
		"version", typedVersion.RuntimeVersion,
		"apiVersion", typedVersion.RuntimeApiVersion)

	if _, err := osInterface.Stat(podLogsRootDirectory); os.IsNotExist(err) {
		if err := osInterface.MkdirAll(podLogsRootDirectory, 0755); err != nil {
			klog.ErrorS(err, "Failed to create pod log directory", "path", podLogsRootDirectory)
		}
	}

	if !utilfeature.DefaultFeatureGate.Enabled(features.KubeletCredentialProviders) && (imageCredentialProviderConfigFile != "" || imageCredentialProviderBinDir != "") {
		klog.InfoS("Flags --image-credential-provider-config or --image-credential-provider-bin-dir were set but the feature gate was disabled, these flags will be ignored",
			"featureGate", features.KubeletCredentialProviders)
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletCredentialProviders) && (imageCredentialProviderConfigFile != "" || imageCredentialProviderBinDir != "") {
		if err := plugin.RegisterCredentialProviderPlugins(imageCredentialProviderConfigFile, imageCredentialProviderBinDir); err != nil {
			klog.ErrorS(err, "Failed to register CRI auth plugins")
			os.Exit(1)
		}
	}
	kubeRuntimeManager.keyring = credentialprovider.NewDockerKeyring()

	kubeRuntimeManager.imagePuller = images.NewImageManager(
		kubecontainer.FilterEventRecorder(recorder),
		kubeRuntimeManager,
		imageBackOff,
		serializeImagePulls,
		imagePullQPS,
		imagePullBurst)
	kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(httpClient, kubeRuntimeManager, kubeRuntimeManager)
	kubeRuntimeManager.containerGC = newContainerGC(runtimeService, podStateProvider, kubeRuntimeManager)
	kubeRuntimeManager.podStateProvider = podStateProvider

	kubeRuntimeManager.versionCache = cache.NewObjectCache(
		func() (interface{}, error) {
			return kubeRuntimeManager.getTypedVersion()
		},
		versionCacheTTL,
	)

	return kubeRuntimeManager, nil
}

3.4. Version

func (m *kubeGenericRuntimeManager) getTypedVersion() (*runtimeapi.VersionResponse, error) {
	// Call the Version interface of the underlying runtime over gRPC
	typedVersion, err := m.runtimeService.Version(kubeRuntimeAPIVersion)
	if err != nil {
		return nil, fmt.Errorf("get remote runtime typed version failed: %v", err)
	}
	return typedVersion, nil
}

func (m *kubeGenericRuntimeManager) Version() (kubecontainer.Version, error) {
	// Call the Version interface of the underlying runtime over gRPC
	typedVersion, err := m.getTypedVersion()
	if err != nil {
		return nil, err
	}

	return newRuntimeVersion(typedVersion.RuntimeVersion)
}

3.5. APIVersion

func (m *kubeGenericRuntimeManager) APIVersion() (kubecontainer.Version, error) {
	versionObject, err := m.versionCache.Get(m.machineInfo.MachineID)
	if err != nil {
		return nil, err
	}
	typedVersion := versionObject.(*runtimeapi.VersionResponse)

	return newRuntimeVersion(typedVersion.RuntimeApiVersion)
}

3.6. Status

func (m *kubeGenericRuntimeManager) Status() (*kubecontainer.RuntimeStatus, error) {
	// runtimeService is actually the RemoteRuntimeService, the client for the gRPC interface defined by the CRI
	resp, err := m.runtimeService.Status(false)
	if err != nil {
		return nil, err
	}
	if resp.GetStatus() == nil {
		return nil, errors.New("runtime status is nil")
	}
	return toKubeRuntimeStatus(resp.GetStatus()), nil
}

3.7. GetPods

Here is what GetPods does:

  • 1. Call the CRI ListPodSandbox interface to list the Sandboxes; unless all is true, only Sandboxes in the Ready state are returned
  • 2. Group the Sandboxes by Metadata.Uid; Sandboxes with the same Metadata.Uid belong to the same Pod
  • 3. Call the CRI ListContainers interface to list the Containers; unless all is true, only Containers in the Running state are returned
  • 4. Group the Containers by the Pod UID recorded in their labels; Containers with the same Pod UID belong to the same Pod

func (m *kubeGenericRuntimeManager) getKubeletContainers(allContainers bool) ([]*runtimeapi.Container, error) {
	filter := &runtimeapi.ContainerFilter{}
	if !allContainers {
		filter.State = &runtimeapi.ContainerStateValue{
			State: runtimeapi.ContainerState_CONTAINER_RUNNING,
		}
	}

	containers, err := m.runtimeService.ListContainers(filter)
	if err != nil {
		klog.ErrorS(err, "ListContainers failed")
		return nil, err
	}

	return containers, nil
}

func (m *kubeGenericRuntimeManager) getKubeletSandboxes(all bool) ([]*runtimeapi.PodSandbox, error) {
	var filter *runtimeapi.PodSandboxFilter
	if !all {
		readyState := runtimeapi.PodSandboxState_SANDBOX_READY
		filter = &runtimeapi.PodSandboxFilter{
			State: &runtimeapi.PodSandboxStateValue{
				State: readyState,
			},
		}
	}

	resp, err := m.runtimeService.ListPodSandbox(filter)
	if err != nil {
		klog.ErrorS(err, "Failed to list pod sandboxes")
		return nil, err
	}

	return resp, nil
}

func (m *kubeGenericRuntimeManager) GetPods(all bool) ([]*kubecontainer.Pod, error) {
	pods := make(map[kubetypes.UID]*kubecontainer.Pod)
	sandboxes, err := m.getKubeletSandboxes(all)
	if err != nil {
		return nil, err
	}
	for i := range sandboxes {
		s := sandboxes[i]
		if s.Metadata == nil {
			klog.V(4).InfoS("Sandbox does not have metadata", "sandbox", s)
			continue
		}
		podUID := kubetypes.UID(s.Metadata.Uid)
		if _, ok := pods[podUID]; !ok {
			pods[podUID] = &kubecontainer.Pod{
				ID:        podUID,
				Name:      s.Metadata.Name,
				Namespace: s.Metadata.Namespace,
			}
		}
		p := pods[podUID]
		converted, err := m.sandboxToKubeContainer(s)
		if err != nil {
			klog.V(4).InfoS("Convert sandbox of pod failed", "runtimeName", m.runtimeName, "sandbox", s, "podUID", podUID, "err", err)
			continue
		}
		p.Sandboxes = append(p.Sandboxes, converted)
	}

	containers, err := m.getKubeletContainers(all)
	if err != nil {
		return nil, err
	}
	for i := range containers {
		c := containers[i]
		if c.Metadata == nil {
			klog.V(4).InfoS("Container does not have metadata", "container", c)
			continue
		}

		labelledInfo := getContainerInfoFromLabels(c.Labels)
		pod, found := pods[labelledInfo.PodUID]
		if !found {
			pod = &kubecontainer.Pod{
				ID:        labelledInfo.PodUID,
				Name:      labelledInfo.PodName,
				Namespace: labelledInfo.PodNamespace,
			}
			pods[labelledInfo.PodUID] = pod
		}

		converted, err := m.toKubeContainer(c)
		if err != nil {
			klog.V(4).InfoS("Convert container of pod failed", "runtimeName", m.runtimeName, "container", c, "podUID", labelledInfo.PodUID, "err", err)
			continue
		}

		pod.Containers = append(pod.Containers, converted)
	}

	// Convert map to list.
	var result []*kubecontainer.Pod
	for _, pod := range pods {
		result = append(result, pod)
	}

	return result, nil
}
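
The grouping that GetPods performs can be sketched with plain types. This is a minimal illustration, assuming each sandbox and container already carries its Pod UID; the types sandbox, container and pod and the function groupByPod are hypothetical, not kubelet types.

```go
package main

import (
	"fmt"
	"sort"
)

// Hypothetical, simplified records as they might come back from the runtime.
type sandbox struct{ ID, PodUID string }
type container struct{ ID, PodUID string }

type pod struct {
	UID        string
	Sandboxes  []string
	Containers []string
}

// groupByPod collects sandboxes and containers under the Pod UID they
// belong to, creating the pod entry the first time a UID is seen.
func groupByPod(sandboxes []sandbox, containers []container) []*pod {
	pods := map[string]*pod{}
	get := func(uid string) *pod {
		p, ok := pods[uid]
		if !ok {
			p = &pod{UID: uid}
			pods[uid] = p
		}
		return p
	}
	for _, s := range sandboxes {
		p := get(s.PodUID)
		p.Sandboxes = append(p.Sandboxes, s.ID)
	}
	for _, c := range containers {
		p := get(c.PodUID)
		p.Containers = append(p.Containers, c.ID)
	}
	result := make([]*pod, 0, len(pods))
	for _, p := range pods {
		result = append(result, p)
	}
	// Sorted only to make this example deterministic; the kubelet code
	// above returns the pods in map iteration order.
	sort.Slice(result, func(i, j int) bool { return result[i].UID < result[j].UID })
	return result
}

func main() {
	pods := groupByPod(
		[]sandbox{{ID: "s1", PodUID: "a"}},
		[]container{{ID: "c1", PodUID: "a"}, {ID: "c2", PodUID: "b"}},
	)
	for _, p := range pods {
		fmt.Println(p.UID, p.Sandboxes, p.Containers)
	}
}
```

Note that a container whose sandbox was not listed (Pod "b" here) still produces a pod entry, which matches the `!found` branch in GetPods.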

3.8. computePodActions

Next, let's look at computePodActions. It is a helper that later methods call; by comparing the Pod's desired spec with its actual status, it works out what has to be done to the Pod. The function is not long, but its decision logic is hard to follow. For now, skip the details and remember only that it computes the actions required for the Pod: whether to create a Sandbox, whether to kill the Pod, which containers to start and which containers to kill.

func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions {
	klog.V(5).InfoS("Syncing Pod", "pod", klog.KObj(pod))

	// Use pod and podStatus to decide whether the Pod's Sandbox has changed
	createPodSandbox, attempt, sandboxID := runtimeutil.PodSandboxChanged(pod, podStatus)
	changes := podActions{
		KillPod:           createPodSandbox,
		CreateSandbox:     createPodSandbox,
		SandboxID:         sandboxID,
		Attempt:           attempt,
		ContainersToStart: []int{},
		ContainersToKill:  make(map[kubecontainer.ContainerID]containerToKillInfo),
	}

	if createPodSandbox { // The PodSandbox needs to be rebuilt
		// If the Pod's restart policy is Never, the attempt count is non-zero and
		// container statuses already exist, return without creating a new Sandbox
		if !shouldRestartOnFailure(pod) && attempt != 0 && len(podStatus.ContainerStatuses) != 0 {
			changes.CreateSandbox = false
			return changes
		}

		var containersToStart []int
		for idx, c := range pod.Spec.Containers {
			if pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure && containerSucceeded(&c, podStatus) {
				continue
			}
			containersToStart = append(containersToStart, idx)
		}
		if len(containersToStart) == 0 {
			_, _, done := findNextInitContainerToRun(pod, podStatus)
			if done {
				changes.CreateSandbox = false
				return changes
			}
		}

		if len(pod.Spec.InitContainers) != 0 {
			// Pod has init containers, return the first one.
			changes.NextInitContainerToStart = &pod.Spec.InitContainers[0]
			return changes
		}
		changes.ContainersToStart = containersToStart
		return changes
	}

	// Ephemeral containers may be started even if initialization is not yet complete.
	for i := range pod.Spec.EphemeralContainers {
		c := (*v1.Container)(&pod.Spec.EphemeralContainers[i].EphemeralContainerCommon)

		// Ephemeral Containers are never restarted
		if podStatus.FindContainerStatusByName(c.Name) == nil {
			changes.EphemeralContainersToStart = append(changes.EphemeralContainersToStart, i)
		}
	}

	// Check initialization progress.
	initLastStatus, next, done := findNextInitContainerToRun(pod, podStatus)
	if !done {
		if next != nil {
			initFailed := initLastStatus != nil && isInitContainerFailed(initLastStatus)
			if initFailed && !shouldRestartOnFailure(pod) {
				changes.KillPod = true
			} else {
				// Always try to stop containers in unknown state first.
				if initLastStatus != nil && initLastStatus.State == kubecontainer.ContainerStateUnknown {
					changes.ContainersToKill[initLastStatus.ID] = containerToKillInfo{
						name:      next.Name,
						container: next,
						message: fmt.Sprintf("Init container is in %q state, try killing it before restart",
							initLastStatus.State),
						reason: reasonUnknown,
					}
				}
				changes.NextInitContainerToStart = next
			}
		}
		return changes
	}

	keepCount := 0
	for idx, container := range pod.Spec.Containers {
		containerStatus := podStatus.FindContainerStatusByName(container.Name)

		if containerStatus != nil && containerStatus.State != kubecontainer.ContainerStateRunning {
			if err := m.internalLifecycle.PostStopContainer(containerStatus.ID.ID); err != nil {
				klog.ErrorS(err, "Internal container post-stop lifecycle hook failed for container in pod with error",
					"containerName", container.Name, "pod", klog.KObj(pod))
			}
		}

		if containerStatus == nil || containerStatus.State != kubecontainer.ContainerStateRunning {
			if kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) {
				klog.V(3).InfoS("Container of pod is not in the desired state and shall be started", "containerName", container.Name, "pod", klog.KObj(pod))
				changes.ContainersToStart = append(changes.ContainersToStart, idx)
				if containerStatus != nil && containerStatus.State == kubecontainer.ContainerStateUnknown {
					changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{
						name:      containerStatus.Name,
						container: &pod.Spec.Containers[idx],
						message: fmt.Sprintf("Container is in %q state, try killing it before restart",
							containerStatus.State),
						reason: reasonUnknown,
					}
				}
			}
			continue
		}

		// The container is running, but kill the container if any of the following condition is met.
		var message string
		var reason containerKillReason
		restart := shouldRestartOnFailure(pod)
		if _, _, changed := containerChanged(&container, containerStatus); changed {
			message = fmt.Sprintf("Container %s definition changed", container.Name)
			restart = true
		} else if liveness, found := m.livenessManager.Get(containerStatus.ID); found && liveness == proberesults.Failure {
			// If the container failed the liveness probe, we should kill it.
			message = fmt.Sprintf("Container %s failed liveness probe", container.Name)
			reason = reasonLivenessProbe
		} else if startup, found := m.startupManager.Get(containerStatus.ID); found && startup == proberesults.Failure {
			// If the container failed the startup probe, we should kill it.
			message = fmt.Sprintf("Container %s failed startup probe", container.Name)
			reason = reasonStartupProbe
		} else {
			// Keep the container.
			keepCount++
			continue
		}

		if restart {
			message = fmt.Sprintf("%s, will be restarted", message)
			changes.ContainersToStart = append(changes.ContainersToStart, idx)
		}

		changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{
			name:      containerStatus.Name,
			container: &pod.Spec.Containers[idx],
			message:   message,
			reason:    reason,
		}
		klog.V(2).InfoS("Message for Container of pod", "containerName", container.Name, "containerStatusID", containerStatus.ID, "pod", klog.KObj(pod), "containerMessage", message)
	}

	if keepCount == 0 && len(changes.ContainersToStart) == 0 {
		changes.KillPod = true
	}

	return changes
}
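
The last loop of computePodActions is the densest part, so here is a minimal sketch of just the decision made for a container that is currently running. The decision type and the decide function are hypothetical; the inputs (definition changed, probe failures, and whether the restart policy allows restarts) are passed in as plain booleans instead of being computed from the Pod.

```go
package main

import "fmt"

// decision is a hypothetical summary of what happens to one running container.
type decision struct {
	Kill    bool
	Restart bool
	Message string
}

// decide mirrors the if / else-if chain for a running container:
// a changed definition always restarts, a failed probe restarts only
// if the restart policy allows it, anything else keeps the container.
func decide(name string, definitionChanged, livenessFailed, startupFailed, restartOnFailure bool) decision {
	var msg string
	restart := restartOnFailure
	switch {
	case definitionChanged:
		msg = fmt.Sprintf("Container %s definition changed", name)
		restart = true
	case livenessFailed:
		msg = fmt.Sprintf("Container %s failed liveness probe", name)
	case startupFailed:
		msg = fmt.Sprintf("Container %s failed startup probe", name)
	default:
		// Keep the container as it is.
		return decision{}
	}
	if restart {
		msg += ", will be restarted"
	}
	return decision{Kill: true, Restart: restart, Message: msg}
}

func main() {
	fmt.Printf("%+v\n", decide("app", false, true, false, false))
	fmt.Printf("%+v\n", decide("app", true, false, false, false))
}
```

With a restart policy that forbids restarts, a failed liveness probe leads to a kill without a restart, while a changed definition still forces one.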

3.8.1. PodSandboxChanged

The first parameter of PodSandboxChanged is the Pod as defined in the API server; the kubelet receives Pods of this type and calls the CRI to create the actual containers. The second parameter is the Pod status queried back from the runtime. In other words, pod is the state the user wants and podStatus is the state that is actually running underneath. The kubelet's goal is to compare pod with podStatus and drive the running Pod towards the desired state.

Since a PodSandbox provides the runtime environment that containers run in, why would one Pod have more than one PodSandbox? How should PodSandbox be understood? The code below shows that a Pod should in fact need only one PodSandbox; when a Pod has more than one, the Pod is in an abnormal state and needs to be reconciled.

PodSandboxChanged decides whether the PodSandbox has to be rebuilt, that is, killed and then created again. The PodSandbox is rebuilt only in the following cases:

  • 1. The PodSandbox has not been created yet. Every Pod needs a PodSandbox to provide its runtime environment, so if none exists one has to be created first
    • Can the PodSandbox be thought of as the underlying infra container?
  • 2. More than one PodSandbox of the Pod is in the Ready state, which means the Pod is abnormal and has to be rebuilt
  • 3. The PodSandbox is not in the Ready state
  • 4. The network namespace mode of the PodSandbox does not match the one the Pod asks for
  • 5. The network namespace mode is correct and the Pod does not use the host network, but the PodSandbox has no IP assigned

func PodSandboxChanged(pod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, uint32, string) {
	// No PodSandbox was found, so none has been created yet and one must be created.
	// The PodSandbox is the runtime environment containers need, so a Pod cannot run without one
	if len(podStatus.SandboxStatuses) == 0 {
		klog.V(2).InfoS("No sandbox for pod can be found. Need to start a new one", "pod", klog.KObj(pod))
		return true, 0, ""
	}

	// Count the PodSandboxes that are currently Ready
	readySandboxCount := 0
	for _, s := range podStatus.SandboxStatuses {
		if s.State == runtimeapi.PodSandboxState_SANDBOX_READY {
			readySandboxCount++
		}
	}

	// Take the first PodSandbox; normally each Pod has exactly one PodSandbox
	sandboxStatus := podStatus.SandboxStatuses[0]
	if readySandboxCount > 1 {
		// As the log message says, a Pod should have only one PodSandbox; more than one must be reconciled
		klog.V(2).InfoS("Multiple sandboxes are ready for Pod. Need to reconcile them", "pod", klog.KObj(pod))
		return true, sandboxStatus.Metadata.Attempt + 1, sandboxStatus.Id
	}
	// The Sandbox is not Ready; it will be killed and re-created later
	if sandboxStatus.State != runtimeapi.PodSandboxState_SANDBOX_READY {
		klog.V(2).InfoS("No ready sandbox for pod can be found. Need to start a new one", "pod", klog.KObj(pod))
		return true, sandboxStatus.Metadata.Attempt + 1, sandboxStatus.Id
	}

	// If the network namespace of the Pod does not match, the Sandbox also has to be rebuilt
	if sandboxStatus.GetLinux().GetNamespaces().GetOptions().GetNetwork() != NetworkNamespaceForPod(pod) {
		klog.V(2).InfoS("Sandbox for pod has changed. Need to start a new one", "pod", klog.KObj(pod))
		return true, sandboxStatus.Metadata.Attempt + 1, ""
	}

	// If the Pod uses the Pod network (not the host network) and no IP has been assigned, the PodSandbox also has to be rebuilt
	if !kubecontainer.IsHostNetworkPod(pod) && sandboxStatus.Network != nil && sandboxStatus.Network.Ip == "" {
		klog.V(2).InfoS("Sandbox for pod has no IP address. Need to start a new one", "pod", klog.KObj(pod))
		return true, sandboxStatus.Metadata.Attempt + 1, sandboxStatus.Id
	}

	return false, sandboxStatus.Metadata.Attempt, sandboxStatus.Id
}
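
To make the five cases easier to experiment with, the following is a simplified sketch of the same decision using plain types. The sandboxStatus struct and sandboxChanged function are hypothetical; in particular the network namespace comparison is reduced to a host-network boolean, and an empty IP string stands for "no IP assigned".

```go
package main

import "fmt"

// sandboxStatus is a hypothetical, flattened view of a CRI sandbox status.
type sandboxStatus struct {
	ID      string
	Ready   bool
	Attempt uint32
	HostNet bool   // sandbox runs in the node's network namespace
	IP      string // empty means no IP was assigned
}

// sandboxChanged returns whether the sandbox must be re-created, the
// attempt number to use and the ID of the sandbox being replaced.
func sandboxChanged(podHostNet bool, statuses []sandboxStatus) (recreate bool, attempt uint32, id string) {
	if len(statuses) == 0 {
		return true, 0, "" // case 1: nothing created yet
	}
	ready := 0
	for _, s := range statuses {
		if s.Ready {
			ready++
		}
	}
	latest := statuses[0]
	if ready > 1 {
		return true, latest.Attempt + 1, latest.ID // case 2
	}
	if !latest.Ready {
		return true, latest.Attempt + 1, latest.ID // case 3
	}
	if latest.HostNet != podHostNet {
		return true, latest.Attempt + 1, "" // case 4
	}
	if !podHostNet && latest.IP == "" {
		return true, latest.Attempt + 1, latest.ID // case 5
	}
	return false, latest.Attempt, latest.ID
}

func main() {
	statuses := []sandboxStatus{{ID: "sb-1", Ready: true, Attempt: 2, IP: "10.0.0.5"}}
	fmt.Println(sandboxChanged(false, statuses))
}
```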

3.8.2. findNextInitContainerToRun

This code shows how init containers are handled when a Pod is synced to the runtime. Init containers run in the order in which they are defined, earliest first. An init container does not necessarily succeed on its first run, and init containers that have already succeeded must not run again. The purpose of findNextInitContainerToRun is therefore to find the init container that should run next, starting with any failed one. The logic is as follows:

  • 1. If the user did not define any init containers, return immediately
  • 2. Iterate over all regular containers; if any of them is running, all init containers have already succeeded, so return immediately
  • 3. Iterate over the init containers in reverse order; if one is in the Failed state, return that init container
  • 4. If no failed init container is found, find the next init container that has not run yet: take the last init container that has a status; if it is still running there is nothing to start, if it has exited return the one after it (or report done if it was the last), and if none has a status return the first init container

func findNextInitContainerToRun(pod *v1.Pod, podStatus *kubecontainer.PodStatus) (status *kubecontainer.Status, next *v1.Container, done bool) {
	if len(pod.Spec.InitContainers) == 0 {
		return nil, nil, true
	}

	for i := range pod.Spec.Containers {
		container := &pod.Spec.Containers[i]
		status := podStatus.FindContainerStatusByName(container.Name)
		if status != nil && status.State == kubecontainer.ContainerStateRunning {
			return nil, nil, true
		}
	}

	for i := len(pod.Spec.InitContainers) - 1; i >= 0; i-- {
		container := &pod.Spec.InitContainers[i]
		status := podStatus.FindContainerStatusByName(container.Name)
		if status != nil && isInitContainerFailed(status) {
			return status, container, false
		}
	}

	for i := len(pod.Spec.InitContainers) - 1; i >= 0; i-- {
		container := &pod.Spec.InitContainers[i]
		status := podStatus.FindContainerStatusByName(container.Name)
		if status == nil {
			continue
		}

		// container is still running, return not done.
		if status.State == kubecontainer.ContainerStateRunning {
			return nil, nil, false
		}

		if status.State == kubecontainer.ContainerStateExited {
			// all init containers successful
			if i == (len(pod.Spec.InitContainers) - 1) {
				return nil, nil, true
			}

			// all containers up to i successful, go to i+1
			return nil, &pod.Spec.InitContainers[i+1], false
		}
	}

	return nil, &pod.Spec.InitContainers[0], false
}
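
The selection order is easier to see on a toy model. The sketch below assumes each init container is described only by its name and one of three hypothetical state strings ("running", "succeeded", "failed"; a missing entry means it never ran). The function nextInit is my own simplification and returns just the name of the next init container and the done flag.

```go
package main

import "fmt"

// nextInit returns the name of the init container to start next and
// whether initialization is complete. An empty name with done == false
// means an init container is still running and nothing should be started.
func nextInit(inits []string, status map[string]string, appRunning bool) (next string, done bool) {
	// No init containers, or a regular container already runs: done.
	if len(inits) == 0 || appRunning {
		return "", true
	}
	// A failed init container has priority and is retried first.
	for i := len(inits) - 1; i >= 0; i-- {
		if status[inits[i]] == "failed" {
			return inits[i], false
		}
	}
	// Otherwise continue after the last init container that has a status.
	for i := len(inits) - 1; i >= 0; i-- {
		switch status[inits[i]] {
		case "running":
			return "", false
		case "succeeded":
			if i == len(inits)-1 {
				return "", true
			}
			return inits[i+1], false
		}
	}
	// Nothing has run yet: start with the first init container.
	return inits[0], false
}

func main() {
	inits := []string{"a", "b", "c"}
	fmt.Println(nextInit(inits, map[string]string{"a": "succeeded"}, false))
}
```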

3.9. SyncPod

SyncPod syncs a Pod, which may mean deleting it or creating it; the name is fairly apt. The logic is as follows:

  • 1. Compute whether the PodSandbox and the containers have changed; if the PodSandbox has changed, emit a SandboxChanged event
  • 2. If the Pod needs to be killed, kill all containers of the Pod first and then stop the PodSandbox
  • 3. If the PodSandbox is going to be created again, purge all init containers of the Pod. Once the new PodSandbox is up, the init containers run one at a time, and which one runs next is derived from the container state reported by the runtime, so the remains of the previous run have to be cleaned up first
  • 4. If the Pod does not need to be killed, kill only the containers of the Pod that are listed in ContainersToKill
  • 5. Prune init containers that are no longer needed
  • 6. If the PodSandbox needs to be created:
    • 6.1. Convert the sysctls in the user-supplied pod.Spec.SecurityContext; sysctl names must use . as the separator, and a slash would cause an error
    • 6.2. Call the RunPodSandbox interface of the container runtime to run the PodSandbox; if creation fails, record an event
    • 6.3. Call the PodSandboxStatus interface of the container runtime to query the Sandbox status; if the status is nil, return
    • 6.4. If the Pod does not use the host network, overwrite the Pod IPs with the ones reported by the CRI
  • 7. Generate the PodSandbox configuration from the Pod manifest
  • 8. Start ephemeral containers
  • 9. Start the next init container
  • 10. Start the regular containers

func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	podContainerChanges := m.computePodActions(pod, podStatus)
	klog.V(3).InfoS("computePodActions got for pod", "podActions", podContainerChanges, "pod", klog.KObj(pod))
	if podContainerChanges.CreateSandbox {
		ref, err := ref.GetReference(legacyscheme.Scheme, pod)
		if err != nil {
			klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
		}
		if podContainerChanges.SandboxID != "" {
			m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
		} else {
			klog.V(4).InfoS("SyncPod received new pod, will create a sandbox for it", "pod", klog.KObj(pod))
		}
	}

	if podContainerChanges.KillPod {
		if podContainerChanges.CreateSandbox {
			klog.V(4).InfoS("Stopping PodSandbox for pod, will start new one", "pod", klog.KObj(pod))
		} else {
			klog.V(4).InfoS("Stopping PodSandbox for pod, because all other containers are dead", "pod", klog.KObj(pod))
		}

		killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
		result.AddPodSyncResult(killResult)
		if killResult.Error() != nil {
			klog.ErrorS(killResult.Error(), "killPodWithSyncResult failed")
			return
		}

		if podContainerChanges.CreateSandbox {
			m.purgeInitContainers(pod, podStatus)
		}
	} else {
		for containerID, containerInfo := range podContainerChanges.ContainersToKill {
			klog.V(3).InfoS("Killing unwanted container for pod", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
			killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
			result.AddSyncResult(killContainerResult)
			if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, nil); err != nil {
				killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
				klog.ErrorS(err, "killContainer for pod failed", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
				return
			}
		}
	}

	m.pruneInitContainersBeforeStart(pod, podStatus)

	var podIPs []string
	if podStatus != nil {
		podIPs = podStatus.IPs
	}

	podSandboxID := podContainerChanges.SandboxID
	if podContainerChanges.CreateSandbox {
		var msg string
		var err error

		klog.V(4).InfoS("Creating PodSandbox for pod", "pod", klog.KObj(pod))
		metrics.StartedPodsTotal.Inc()
		createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
		result.AddSyncResult(createSandboxResult)

		sysctl.ConvertPodSysctlsVariableToDotsSeparator(pod.Spec.SecurityContext)

		podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
		if err != nil {
			if m.podStateProvider.IsPodTerminationRequested(pod.UID) {
				klog.V(4).InfoS("Pod was deleted and sandbox failed to be created", "pod", klog.KObj(pod), "podUID", pod.UID)
				return
			}
			metrics.StartedPodsErrorsTotal.Inc()
			createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
			klog.ErrorS(err, "CreatePodSandbox for pod failed", "pod", klog.KObj(pod))
			ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
			if referr != nil {
				klog.ErrorS(referr, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
			}
			m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed to create pod sandbox: %v", err)
			return
		}
		klog.V(4).InfoS("Created PodSandbox for pod", "podSandboxID", podSandboxID, "pod", klog.KObj(pod))

		resp, err := m.runtimeService.PodSandboxStatus(podSandboxID, false)
		if err != nil {
			ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
			if referr != nil {
				klog.ErrorS(referr, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
			}
			m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedStatusPodSandBox, "Unable to get pod sandbox status: %v", err)
			klog.ErrorS(err, "Failed to get pod sandbox status; Skipping pod", "pod", klog.KObj(pod))
			result.Fail(err)
			return
		}
		if resp.GetStatus() == nil {
			result.Fail(errors.New("pod sandbox status is nil"))
			return
		}

		if !kubecontainer.IsHostNetworkPod(pod) {
			// Overwrite the podIPs passed in the pod status, since we just started the pod sandbox.
			podIPs = m.determinePodSandboxIPs(pod.Namespace, pod.Name, resp.GetStatus())
			klog.V(4).InfoS("Determined the ip for pod after sandbox changed", "IPs", podIPs, "pod", klog.KObj(pod))
		}
	}

	podIP := ""
	if len(podIPs) != 0 {
		podIP = podIPs[0]
	}

	// Get podSandboxConfig for containers to start.
	configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
	result.AddSyncResult(configPodSandboxResult)
	podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
	if err != nil {
		message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
		klog.ErrorS(err, "GeneratePodSandboxConfig for pod failed", "pod", klog.KObj(pod))
		configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
		return
	}

	start := func(typeName, metricLabel string, spec *startSpec) error {
		startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
		result.AddSyncResult(startContainerResult)

		isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
		if isInBackOff {
			startContainerResult.Fail(err, msg)
			klog.V(4).InfoS("Backing Off restarting container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
			return err
		}

		metrics.StartedContainersTotal.WithLabelValues(metricLabel).Inc()
		if sc.HasWindowsHostProcessRequest(pod, spec.container) {
			metrics.StartedHostProcessContainersTotal.WithLabelValues(metricLabel).Inc()
		}
		klog.V(4).InfoS("Creating container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
		if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
			metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
			if sc.HasWindowsHostProcessRequest(pod, spec.container) {
				metrics.StartedHostProcessContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
			}
			startContainerResult.Fail(err, msg)
			switch {
			case err == images.ErrImagePullBackOff:
				klog.V(3).InfoS("Container start failed in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod), "containerMessage", msg, "err", err)
			default:
				utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))
			}
			return err
		}

		return nil
	}

	for _, idx := range podContainerChanges.EphemeralContainersToStart {
		start("ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
	}

	// Step 6: start the init container.
	if container := podContainerChanges.NextInitContainerToStart; container != nil {
		// Start the next init container.
		if err := start("init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
			return
		}

		// Successfully started the container; clear the entry in the failure
		klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
	}

	// Step 7: start containers in podContainerChanges.ContainersToStart.
	for _, idx := range podContainerChanges.ContainersToStart {
		start("container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
	}

	return
}

3.9.1. killPodWithSyncResult

killPodWithSyncResult

The logic here is straightforward: it first calls killContainersWithSyncResult to kill all containers of the Pod, then stops every PodSandbox that belongs to the Pod.

func (m *kubeGenericRuntimeManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) {
	// Kill all containers of the Pod and record each result.
	killContainerResults := m.killContainersWithSyncResult(pod, runningPod, gracePeriodOverride)
	for _, containerResult := range killContainerResults {
		result.AddSyncResult(containerResult)
	}

	// Stop all sandboxes of the Pod and record the result.
	killSandboxResult := kubecontainer.NewSyncResult(kubecontainer.KillPodSandbox, runningPod.ID)
	result.AddSyncResult(killSandboxResult)
	// Stop all sandboxes belongs to same pod
	for _, podSandbox := range runningPod.Sandboxes {
		if err := m.runtimeService.StopPodSandbox(podSandbox.ID.ID); err != nil && !crierror.IsNotFound(err) {
			killSandboxResult.Fail(kubecontainer.ErrKillPodSandbox, err.Error())
			klog.ErrorS(nil, "Failed to stop sandbox", "podSandboxID", podSandbox.ID)
		}
	}

	return
}

3.9.1.1. killContainersWithSyncResult

killContainersWithSyncResult

The logic of killContainersWithSyncResult is easy to follow; the part that deserves attention is how GenericRuntimeManager kills a single container, i.e. the killContainer method. One goroutine is started per container to stop it (so the CRI implementation of the underlying runtime must be safe for concurrent use). Once every goroutine has finished, whether its kill succeeded or failed, the main goroutine collects the per-container results and returns them.

func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (syncResults []*kubecontainer.SyncResult) {
	containerResults := make(chan *kubecontainer.SyncResult, len(runningPod.Containers))
	wg := sync.WaitGroup{}

	wg.Add(len(runningPod.Containers))
	for _, container := range runningPod.Containers {
		go func(container *kubecontainer.Container) {
			defer utilruntime.HandleCrash()
			defer wg.Done()

			killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name)
			if err := m.killContainer(pod, container.ID, container.Name, "", reasonUnknown, gracePeriodOverride); err != nil {
				killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
				// Use runningPod for logging as the pod passed in could be *nil*.
				klog.ErrorS(err, "Kill container failed", "pod", klog.KRef(runningPod.Namespace, runningPod.Name), "podUID", runningPod.ID,
					"containerName", container.Name, "containerID", container.ID)
			}
			containerResults <- killContainerResult
		}(container)
	}
	wg.Wait()
	close(containerResults)

	for containerResult := range containerResults {
		syncResults = append(syncResults, containerResult)
	}
	return
}
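
The fan-out pattern used above can be tried in isolation. The following is a minimal sketch, not kubelet code: `killResult`, `killAll` and `errStop` are hypothetical names, and the `kill` callback stands in for `killContainer`. It shows why the channel is buffered to the number of containers and why closing it after `wg.Wait()` is safe.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
)

// errStop is a hypothetical error used only by this sketch.
var errStop = errors.New("stop failed")

// killResult is a hypothetical stand-in for kubecontainer.SyncResult.
type killResult struct {
	Name string
	Err  error
}

// killAll runs kill once per container name, each in its own goroutine,
// and returns one result per container after all goroutines have finished.
func killAll(names []string, kill func(name string) error) []killResult {
	// Buffered to len(names), so no sender ever blocks.
	results := make(chan killResult, len(names))
	var wg sync.WaitGroup
	wg.Add(len(names))
	for _, name := range names {
		go func(name string) {
			defer wg.Done()
			results <- killResult{Name: name, Err: kill(name)}
		}(name)
	}
	wg.Wait()
	close(results) // safe: every sender has already returned

	var out []killResult
	for r := range results {
		out = append(out, r)
	}
	// Arrival order is nondeterministic; sort only to keep the demo stable.
	sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name })
	return out
}

func main() {
	res := killAll([]string{"app", "sidecar"}, func(name string) error {
		if name == "sidecar" {
			return errStop
		}
		return nil
	})
	for _, r := range res {
		fmt.Println(r.Name, r.Err)
	}
}
```

Note that a failed kill does not stop the other goroutines; every container gets its own result, which matches how the kubelet reports one SyncResult per container.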

3.9.2. purgeInitContainers

purgeInitContainers

The purgeInitContainers method removes every instance of every init container of the Pod.

func (m *kubeGenericRuntimeManager) purgeInitContainers(pod *v1.Pod, podStatus *kubecontainer.PodStatus) {
	initContainerNames := sets.NewString()
	for _, container := range pod.Spec.InitContainers {
		initContainerNames.Insert(container.Name)
	}
	for name := range initContainerNames {
		count := 0
		for _, status := range podStatus.ContainerStatuses {
			if status.Name != name {
				continue
			}
			count++
			// Purge all init containers that match this container name
			klog.V(4).InfoS("Removing init container", "containerName", status.Name, "containerID", status.ID.ID, "count", count)
			if err := m.removeContainer(status.ID.ID); err != nil {
				utilruntime.HandleError(fmt.Errorf("failed to remove pod init container %q: %v; Skipping pod %q", status.Name, err, format.Pod(pod)))
				continue
			}
		}
	}
}

3.9.2.1. removeContainer

removeContainer

Let's look at how GenericRuntimeManager removes a container. The logic is:

  • 1. Call the internal lifecycle hook PostStopContainer first; as the name indicates, removal is expected to happen after the container has stopped
  • 2. Inside that hook, if the TopologyManager feature gate is enabled, TopologyManager.RemoveContainer is called to drop the container from its bookkeeping
  • 3. Remove the container's logs: the log manager cleans up rotated logs, then the legacy log symlink is removed through the OS interface; its name is roughly podFullName_ContainerName-ContainerID.log
  • 4. Call the RemoveContainer interface of the CRI-compliant container runtime to remove the container
func (m *kubeGenericRuntimeManager) removeContainer(containerID string) error {
	klog.V(4).InfoS("Removing container", "containerID", containerID)
	if err := m.internalLifecycle.PostStopContainer(containerID); err != nil {
		return err
	}

	if err := m.removeContainerLog(containerID); err != nil {
		return err
	}
	// Remove the container.
	return m.runtimeService.RemoveContainer(containerID)
}

func (i *internalContainerLifecycleImpl) PostStopContainer(containerID string) error {
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
		err := i.topologyManager.RemoveContainer(containerID)
		if err != nil {
			return err
		}
	}
	return nil
}

func (m *kubeGenericRuntimeManager) removeContainerLog(containerID string) error {
	// Use log manager to remove rotated logs.
	err := m.logManager.Clean(containerID)
	if err != nil {
		return err
	}

	// Query the container status from the underlying runtime through CRI.
	resp, err := m.runtimeService.ContainerStatus(containerID, false)
	if err != nil {
		return fmt.Errorf("failed to get container status %q: %v", containerID, err)
	}
	status := resp.GetStatus()
	if status == nil {
		return remote.ErrContainerStatusNil
	}

	labeledInfo := getContainerInfoFromLabels(status.Labels)
	// Build the legacy symlink path, roughly podFullName_ContainerName-ContainerID.log.
	legacySymlink := legacyLogSymlink(containerID, labeledInfo.ContainerName, labeledInfo.PodName,
		labeledInfo.PodNamespace)
	// Remove the symlink through the OS interface; a missing file is not an error.
	if err := m.osInterface.Remove(legacySymlink); err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("failed to remove container %q log legacy symbolic link %q: %v",
			containerID, legacySymlink, err)
	}
	return nil
}

3.9.3. killContainer

killContainer

Let's look at how the kubelet kills a container. The logic is:

  • 1. If pod is non-nil, look up the container spec by container name; if pod is nil, query the runtime through CRI and rebuild the pod and container spec from the container's labels
  • 2. Compute the grace period for stopping the container
  • 3. Record a KillingContainer event
  • 4. Before stopping the container, call the internal lifecycle hook PreStopContainer; the method is currently empty
  • 5. If the container spec defines Lifecycle.PreStop and the grace period is greater than 0, run that hook
  • 6. Recompute gracePeriod by subtracting the time the Lifecycle.PreStop hook took, but never let it drop below minimumGracePeriodInSeconds
  • 7. If gracePeriodOverride is given, it replaces gracePeriod, so the timeout that GenericRuntimeManager passes to the runtime's StopContainer through CRI is whatever gracePeriodOverride specifies; a NotFound error from StopContainer is ignored
func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, message string, reason containerKillReason, gracePeriodOverride *int64) error {
	var containerSpec *v1.Container
	if pod != nil {
		if containerSpec = kubecontainer.GetContainerSpec(pod, containerName); containerSpec == nil {
			return fmt.Errorf("failed to get containerSpec %q (id=%q) in pod %q when killing container for reason %q",
				containerName, containerID.String(), format.Pod(pod), message)
		}
	} else {
		// Restore necessary information if one of the specs is nil.
		restoredPod, restoredContainer, err := m.restoreSpecsFromContainerLabels(containerID)
		if err != nil {
			return err
		}
		pod, containerSpec = restoredPod, restoredContainer
	}

	// From this point, pod and container must be non-nil.
	gracePeriod := setTerminationGracePeriod(pod, containerSpec, containerName, containerID, reason)

	if len(message) == 0 {
		message = fmt.Sprintf("Stopping container %s", containerSpec.Name)
	}
	m.recordContainerEvent(pod, containerSpec, containerID.ID, v1.EventTypeNormal, events.KillingContainer, message)

	// Run internal pre-stop lifecycle hook
	if err := m.internalLifecycle.PreStopContainer(containerID.ID); err != nil {
		return err
	}

	// Run the pre-stop lifecycle hooks if applicable and if there is enough time to run it
	if containerSpec.Lifecycle != nil && containerSpec.Lifecycle.PreStop != nil && gracePeriod > 0 {
		gracePeriod = gracePeriod - m.executePreStopHook(pod, containerID, containerSpec, gracePeriod)
	}
	// always give containers a minimal shutdown window to avoid unnecessary SIGKILLs
	if gracePeriod < minimumGracePeriodInSeconds {
		gracePeriod = minimumGracePeriodInSeconds
	}
	if gracePeriodOverride != nil {
		gracePeriod = *gracePeriodOverride
		klog.V(3).InfoS("Killing container with a grace period override", "pod", klog.KObj(pod), "podUID", pod.UID,
			"containerName", containerName, "containerID", containerID.String(), "gracePeriod", gracePeriod)
	}

	klog.V(2).InfoS("Killing container with a grace period", "pod", klog.KObj(pod), "podUID", pod.UID,
		"containerName", containerName, "containerID", containerID.String(), "gracePeriod", gracePeriod)

	err := m.runtimeService.StopContainer(containerID.ID, gracePeriod)
	if err != nil && !crierror.IsNotFound(err) {
		klog.ErrorS(err, "Container termination failed with gracePeriod", "pod", klog.KObj(pod), "podUID", pod.UID,
			"containerName", containerName, "containerID", containerID.String(), "gracePeriod", gracePeriod)
		return err
	}

	klog.V(3).InfoS("Container exited normally", "pod", klog.KObj(pod), "podUID", pod.UID,
		"containerName", containerName, "containerID", containerID.String())

	return nil
}
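
The order in which killContainer adjusts the grace period matters, so here is a minimal sketch of just that arithmetic. `effectiveGracePeriod` and `minGracePeriod` are hypothetical names, and the value 2 for the minimum is an assumption of this sketch rather than something shown in the code above.

```go
package main

import "fmt"

// minGracePeriod plays the role of minimumGracePeriodInSeconds.
// The value 2 is an assumption made for this sketch.
const minGracePeriod int64 = 2

// effectiveGracePeriod returns the timeout in seconds that would be passed
// to StopContainer. hookSeconds is the time already consumed by the preStop
// hook (0 if no hook ran).
func effectiveGracePeriod(gracePeriod, hookSeconds int64, override *int64) int64 {
	gracePeriod -= hookSeconds
	// Always leave a minimal shutdown window.
	if gracePeriod < minGracePeriod {
		gracePeriod = minGracePeriod
	}
	// The override is applied last, so it wins even over the minimum.
	if override != nil {
		gracePeriod = *override
	}
	return gracePeriod
}

func main() {
	override := int64(0)
	fmt.Println(effectiveGracePeriod(30, 10, nil))       // 20
	fmt.Println(effectiveGracePeriod(30, 30, nil))       // 2
	fmt.Println(effectiveGracePeriod(30, 10, &override)) // 0
}
```

Because the override is applied after the clamp, an override of 0 is passed through unchanged, while a hook that used up the whole grace period still leaves the container the minimal window.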

3.9.3.1. executePreStopHook

executePreStopHook

As the code shows, a user-defined Lifecycle.PreStop handler must not take too long: if it has not finished within gracePeriod, executePreStopHook stops waiting for it and returns. The return value is the number of seconds spent waiting.

func (m *kubeGenericRuntimeManager) executePreStopHook(pod *v1.Pod, containerID kubecontainer.ContainerID, containerSpec *v1.Container, gracePeriod int64) int64 {
	klog.V(3).InfoS("Running preStop hook", "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", containerSpec.Name, "containerID", containerID.String())

	start := metav1.Now()
	done := make(chan struct{})
	go func() {
		defer close(done)
		defer utilruntime.HandleCrash()
		if msg, err := m.runner.Run(containerID, pod, containerSpec, containerSpec.Lifecycle.PreStop); err != nil {
			klog.ErrorS(err, "PreStop hook failed", "pod", klog.KObj(pod), "podUID", pod.UID,
				"containerName", containerSpec.Name, "containerID", containerID.String())
			m.recordContainerEvent(pod, containerSpec, containerID.ID, v1.EventTypeWarning, events.FailedPreStopHook, msg)
		}
	}()

	select {
	case <-time.After(time.Duration(gracePeriod) * time.Second):
		klog.V(2).InfoS("PreStop hook not completed in grace period", "pod", klog.KObj(pod), "podUID", pod.UID,
			"containerName", containerSpec.Name, "containerID", containerID.String(), "gracePeriod", gracePeriod)
	case <-done:
		klog.V(3).InfoS("PreStop hook completed", "pod", klog.KObj(pod), "podUID", pod.UID,
			"containerName", containerSpec.Name, "containerID", containerID.String())
	}

	return int64(metav1.Now().Sub(start.Time).Seconds())
}
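
The wait-with-deadline idiom in executePreStopHook can be reproduced with the standard library alone. This is a minimal sketch under that assumption; `runWithDeadline` and `sleepMillis` are hypothetical helpers, and milliseconds are used instead of seconds only to keep the demo fast.

```go
package main

import (
	"fmt"
	"time"
)

// sleepMillis is a small helper for building slow hooks in the demo.
func sleepMillis(n int64) {
	time.Sleep(time.Duration(n) * time.Millisecond)
}

// runWithDeadline starts hook in a goroutine and waits at most limitMillis
// for it. It reports whether the hook finished in time and how long the
// caller waited. On timeout the hook goroutine is not cancelled; the caller
// merely stops waiting for it.
func runWithDeadline(hook func(), limitMillis int64) (finished bool, waitedMillis int64) {
	start := time.Now()
	done := make(chan struct{})
	go func() {
		defer close(done)
		hook()
	}()

	select {
	case <-time.After(time.Duration(limitMillis) * time.Millisecond):
		finished = false
	case <-done:
		finished = true
	}
	return finished, time.Since(start).Milliseconds()
}

func main() {
	ok, _ := runWithDeadline(func() {}, 1000)
	fmt.Println("fast hook finished:", ok)

	ok, waited := runWithDeadline(func() { sleepMillis(500) }, 50)
	fmt.Println("slow hook finished:", ok, "waited at least 50ms:", waited >= 50)
}
```

The elapsed time is what the caller subtracts from the remaining grace period, so a hook that times out consumes the whole window.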

3.9.4. pruneInitContainersBeforeStart

pruneInitContainersBeforeStart
func (m *kubeGenericRuntimeManager) pruneInitContainersBeforeStart(pod *v1.Pod, podStatus *kubecontainer.PodStatus) {
	initContainerNames := sets.NewString()
	for _, container := range pod.Spec.InitContainers {
		initContainerNames.Insert(container.Name)
	}
	for name := range initContainerNames {
		count := 0
		for _, status := range podStatus.ContainerStatuses {
			if status.Name != name ||
				(status.State != kubecontainer.ContainerStateExited &&
					status.State != kubecontainer.ContainerStateUnknown) {
				continue
			}
			count++
			if count == 1 {
				continue
			}
			klog.V(4).InfoS("Removing init container", "containerName", status.Name, "containerID", status.ID.ID, "count", count)
			if err := m.removeContainer(status.ID.ID); err != nil {
				utilruntime.HandleError(fmt.Errorf("failed to remove pod init container %q: %v; Skipping pod %q", status.Name, err, format.Pod(pod)))
				continue
			}
		}
	}
}
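
Unlike purgeInitContainers, this method keeps one terminated instance per init container name and removes the rest. The sketch below isolates that selection; `status`, `state` and `toPrune` are hypothetical names, and it assumes the statuses are ordered newest first, so that the instance kept is the most recent one.

```go
package main

import "fmt"

type state string

const (
	exited  state = "exited"
	running state = "running"
	unknown state = "unknown"
)

// status is a hypothetical, trimmed-down container status.
type status struct {
	ID    string
	Name  string
	State state
}

// toPrune returns the container IDs that would be removed for the given init
// container names. statuses is assumed to be sorted newest first. For each
// name, the first exited/unknown instance is kept and later ones are returned.
func toPrune(initNames []string, statuses []status) []string {
	var ids []string
	for _, name := range initNames {
		count := 0
		for _, s := range statuses {
			if s.Name != name || (s.State != exited && s.State != unknown) {
				continue
			}
			count++
			if count == 1 {
				continue // keep the most recent terminated instance
			}
			ids = append(ids, s.ID)
		}
	}
	return ids
}

func main() {
	statuses := []status{
		{ID: "c3", Name: "init", State: exited},
		{ID: "c2", Name: "init", State: exited},
		{ID: "c1", Name: "init", State: unknown},
		{ID: "a1", Name: "app", State: running},
	}
	fmt.Println(toPrune([]string{"init"}, statuses)) // [c2 c1]
}
```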

3.9.5. createPodSandbox

createPodSandbox

Let's see how GenericRuntimeManager creates a PodSandbox. The logic is:

  • 1. Generate the PodSandbox configuration from the Pod manifest
  • 2. Create the Pod's log directory
  • 3. Look up the RuntimeHandler for the Pod.Spec.RuntimeClassName specified by the user
  • 4. Call the container runtime's RunPodSandbox interface to run the PodSandbox
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
	podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
	if err != nil {
		message := fmt.Sprintf("Failed to generate sandbox config for pod %q: %v", format.Pod(pod), err)
		klog.ErrorS(err, "Failed to generate sandbox config for pod", "pod", klog.KObj(pod))
		return "", message, err
	}

	// Create pod logs directory
	err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
	if err != nil {
		message := fmt.Sprintf("Failed to create log directory for pod %q: %v", format.Pod(pod), err)
		klog.ErrorS(err, "Failed to create log directory for pod", "pod", klog.KObj(pod))
		return "", message, err
	}

	runtimeHandler := ""
	if m.runtimeClassManager != nil {
		runtimeHandler, err = m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName)
		if err != nil {
			message := fmt.Sprintf("Failed to create sandbox for pod %q: %v", format.Pod(pod), err)
			return "", message, err
		}
		if runtimeHandler != "" {
			klog.V(2).InfoS("Running pod with runtime handler", "pod", klog.KObj(pod), "runtimeHandler", runtimeHandler)
		}
	}

	podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)
	if err != nil {
		message := fmt.Sprintf("Failed to create sandbox for pod %q: %v", format.Pod(pod), err)
		klog.ErrorS(err, "Failed to create sandbox for pod", "pod", klog.KObj(pod))
		return "", message, err
	}

	return podSandBoxID, "", nil
}

3.9.6. SyncPod.func

SyncPod.func

Let's see how this anonymous function inside SyncPod starts a container. The logic is:

  • 1. Check whether the container is still inside its restart back-off window; if so, return immediately
  • 2. Call startContainer, which works in two steps: it first calls the container runtime's CreateContainer interface to create the container, then calls the runtime's StartContainer interface to start it
    • Creating and starting the container involves CpuManager, MemoryManager, TopologyManager, and the container's Lifecycle.PostStart hook
	start := func(typeName, metricLabel string, spec *startSpec) error {
		startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
		result.AddSyncResult(startContainerResult)

		isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
		if isInBackOff {
			startContainerResult.Fail(err, msg)
			klog.V(4).InfoS("Backing Off restarting container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
			return err
		}

		metrics.StartedContainersTotal.WithLabelValues(metricLabel).Inc()
		if sc.HasWindowsHostProcessRequest(pod, spec.container) {
			metrics.StartedHostProcessContainersTotal.WithLabelValues(metricLabel).Inc()
		}
		klog.V(4).InfoS("Creating container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
		if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
			metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
			if sc.HasWindowsHostProcessRequest(pod, spec.container) {
				metrics.StartedHostProcessContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
			}
			startContainerResult.Fail(err, msg)
			switch {
			case err == images.ErrImagePullBackOff:
				klog.V(3).InfoS("Container start failed in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod), "containerMessage", msg, "err", err)
			default:
				utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))
			}
			return err
		}

		return nil
	}

3.9.6.1. doBackOff

doBackOff

doBackOff has nothing to do with backups: "back-off" here means delaying the restart of a container that keeps failing, which is the mechanism behind CrashLoopBackOff. The logic is:

  • 1. Look in PodStatus for a status with the same container name that is in the Exited state
  • 2. If there is none, the container has not exited before, so no back-off applies and the method returns immediately
  • 3. If the container's key is in the BackOff cache and the back-off window, counted from the container's finish time, has not elapsed yet, the container is still backing off: an event is recorded and ErrCrashLoopBackOff is returned
  • 4. Otherwise backOff.Next records this attempt and the container is allowed to start
func (m *kubeGenericRuntimeManager) doBackOff(pod *v1.Pod, container *v1.Container, podStatus *kubecontainer.PodStatus, backOff *flowcontrol.Backoff) (bool, string, error) {
	var cStatus *kubecontainer.Status
	for _, c := range podStatus.ContainerStatuses {
		if c.Name == container.Name && c.State == kubecontainer.ContainerStateExited {
			cStatus = c
			break
		}
	}

	if cStatus == nil {
		return false, "", nil
	}

	klog.V(3).InfoS("Checking backoff for container in pod", "containerName", container.Name, "pod", klog.KObj(pod))
	// Use the finished time of the latest exited container as the start point to calculate whether to do back-off.
	ts := cStatus.FinishedAt
	// backOff requires a unique key to identify the container.
	key := getStableKey(pod, container)
	if backOff.IsInBackOffSince(key, ts) {
		if containerRef, err := kubecontainer.GenerateContainerRef(pod, container); err == nil {
			m.recorder.Eventf(containerRef, v1.EventTypeWarning, events.BackOffStartContainer, "Back-off restarting failed container")
		}
		err := fmt.Errorf("back-off %s restarting failed container=%s pod=%s", backOff.Get(key), container.Name, format.Pod(pod))
		klog.V(3).InfoS("Back-off restarting failed container", "err", err.Error())
		return true, err.Error(), kubecontainer.ErrCrashLoopBackOff
	}

	backOff.Next(key, ts)
	return false, "", nil
}
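
To make the back-off decision concrete, here is a minimal sketch with a hand-written, simplified back-off table. It is not the flowcontrol.Backoff implementation: the `backoff` type and its methods are hypothetical, times are plain seconds, the doubling-with-cap policy is an assumption of the sketch, and it covers only the case where an exited status was found.

```go
package main

import "fmt"

// backoff is a simplified, hypothetical stand-in for flowcontrol.Backoff.
// All durations and timestamps are plain seconds.
type backoff struct {
	initial, maxDelay int64
	delay             map[string]int64 // current delay per container key
}

func newBackoff(initial, maxDelay int64) *backoff {
	return &backoff{initial: initial, maxDelay: maxDelay, delay: map[string]int64{}}
}

// inBackOffSince reports whether now still falls inside the delay window
// that started at since (the container's finish time).
func (b *backoff) inBackOffSince(key string, since, now int64) bool {
	d, ok := b.delay[key]
	if !ok {
		return false
	}
	return now-since < d
}

// next records an attempt: the first one sets the initial delay, later ones
// double it up to maxDelay.
func (b *backoff) next(key string) {
	d, ok := b.delay[key]
	if !ok {
		b.delay[key] = b.initial
		return
	}
	d *= 2
	if d > b.maxDelay {
		d = b.maxDelay
	}
	b.delay[key] = d
}

// shouldStart mirrors the decision in doBackOff: refuse while in back-off,
// otherwise record the attempt and allow the start.
func (b *backoff) shouldStart(key string, finishedAt, now int64) bool {
	if b.inBackOffSince(key, finishedAt, now) {
		return false
	}
	b.next(key)
	return true
}

func main() {
	b := newBackoff(10, 300)
	fmt.Println(b.shouldStart("pod/app", 0, 1))  // true: no entry yet
	fmt.Println(b.shouldStart("pod/app", 5, 6))  // false: 1s since exit < 10s
	fmt.Println(b.shouldStart("pod/app", 5, 15)) // true: window elapsed, delay doubles
}
```

Measuring the window from the container's finish time, rather than from the last start attempt, is what lets a container that ran for a long time before crashing restart without delay.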

3.9.6.2. startContainer

startContainer

Let's see what GenericRuntimeManager does to start a container:

  • 1. Make sure the container image exists, pulling it through the ImageManager (m.imagePuller) if it does not
  • 2. Look up the container's status in PodStatus; if one exists, the restart count is the previous count plus one, otherwise the container is being created for the first time as far as PodStatus knows and the restart count is derived from the container's log directory
  • 3. Resolve the target container ID from PodStatus via spec.getTargetID (relevant for ephemeral containers that target another container)
  • 4. Generate the container configuration
  • 5. Call internalLifecycle.PreCreateContainer, which writes the cpuset chosen by CpuManager and the NUMA nodes chosen by MemoryManager into the container configuration
  • 6. Call the container runtime's CreateContainer interface to create the container
  • 7. Call internalLifecycle.PreStartContainer, which hands the created container over to CpuManager, MemoryManager, and TopologyManager
  • 8. Call the container runtime's StartContainer interface to start the container
  • 9. Create the legacy symlink that points to the container's log file
  • 10. If the user specified Lifecycle.PostStart, run the PostStart hook; if the hook fails, the container is killed
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
	container := spec.container

	imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
	if err != nil {
		s, _ := grpcstatus.FromError(err)
		m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
		return msg, err
	}

	restartCount := 0
	containerStatus := podStatus.FindContainerStatusByName(container.Name)
	if containerStatus != nil {
		restartCount = containerStatus.RestartCount + 1
	} else {
		logDir := BuildContainerLogsDirectory(pod.Namespace, pod.Name, pod.UID, container.Name)
		restartCount, err = calcRestartCountByLogDir(logDir)
		if err != nil {
			klog.InfoS("Log directory exists but could not calculate restartCount", "logDir", logDir, "err", err)
		}
	}

	target, err := spec.getTargetID(podStatus)
	if err != nil {
		s, _ := grpcstatus.FromError(err)
		m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
		return s.Message(), ErrCreateContainerConfig
	}

	containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, podIPs, target)
	if cleanupAction != nil {
		defer cleanupAction()
	}
	if err != nil {
		s, _ := grpcstatus.FromError(err)
		m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
		return s.Message(), ErrCreateContainerConfig
	}

	err = m.internalLifecycle.PreCreateContainer(pod, container, containerConfig)
	if err != nil {
		s, _ := grpcstatus.FromError(err)
		m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Internal PreCreateContainer hook failed: %v", s.Message())
		return s.Message(), ErrPreCreateHook
	}

	containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
	if err != nil {
		s, _ := grpcstatus.FromError(err)
		m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
		return s.Message(), ErrCreateContainer
	}
	err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
	if err != nil {
		s, _ := grpcstatus.FromError(err)
		m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", s.Message())
		return s.Message(), ErrPreStartHook
	}
	m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, fmt.Sprintf("Created container %s", container.Name))

	err = m.runtimeService.StartContainer(containerID)
	if err != nil {
		s, _ := grpcstatus.FromError(err)
		m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", s.Message())
		return s.Message(), kubecontainer.ErrRunContainer
	}
	m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, fmt.Sprintf("Started container %s", container.Name))

	containerMeta := containerConfig.GetMetadata()
	sandboxMeta := podSandboxConfig.GetMetadata()
	legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
		sandboxMeta.Namespace)
	containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
	if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
		if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
			klog.ErrorS(err, "Failed to create legacy symbolic link", "path", legacySymlink,
				"containerID", containerID, "containerLogPath", containerLog)
		}
	}

	// Step 4: execute the post start hook.
	if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
		kubeContainerID := kubecontainer.ContainerID{
			Type: m.runtimeName,
			ID:   containerID,
		}
		msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
		if handlerErr != nil {
			klog.ErrorS(handlerErr, "Failed to execute PostStartHook", "pod", klog.KObj(pod),
				"podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
			m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
			if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", reasonFailedPostStartHook, nil); err != nil {
				klog.ErrorS(err, "Failed to kill container", "pod", klog.KObj(pod),
					"podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
			}
			return msg, ErrPostStartHook
		}
	}

	return "", nil
}
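
The restart-count step is easy to overlook, so here is a hedged sketch of how a count could be recovered from a log directory when no status is available. The body of calcRestartCountByLogDir is not shown in this article; the sketch assumes log files are named `<restartCount>.log` (optionally with a rotation suffix) and works on a list of file names instead of a real directory. `restartCountFromLogNames` and `nextRestartCount` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// restartLogName matches names such as "3.log" or "3.log.20240101-000000".
// The naming scheme is an assumption of this sketch.
var restartLogName = regexp.MustCompile(`^(\d+)\.log(\..*)?`)

// restartCountFromLogNames returns the highest restart number found in the
// file names plus one, or 0 if no name matches.
func restartCountFromLogNames(names []string) int {
	restartCount := 0
	for _, name := range names {
		m := restartLogName.FindStringSubmatch(name)
		if len(m) == 0 {
			continue
		}
		n, err := strconv.Atoi(m[1])
		if err != nil {
			continue
		}
		if n+1 > restartCount {
			restartCount = n + 1
		}
	}
	return restartCount
}

// nextRestartCount follows the branch in startContainer: prefer the previous
// status if there is one, otherwise fall back to the log directory.
func nextRestartCount(previous *int, logNames []string) int {
	if previous != nil {
		return *previous + 1
	}
	return restartCountFromLogNames(logNames)
}

func main() {
	names := []string{"0.log", "1.log", "1.log.20240101-000000", "notes.txt"}
	fmt.Println(nextRestartCount(nil, names)) // 2
	prev := 3
	fmt.Println(nextRestartCount(&prev, names)) // 4
}
```
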
3.9.6.2.1. PreCreateContainer
PreCreateContainer

Sets the container's CPU and memory affinity rather than resource limits: the CPU set allocated by CpuManager is written to CpusetCpus, and the NUMA nodes chosen by MemoryManager are written to CpusetMems.

func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error {
	if i.cpuManager != nil {
		allocatedCPUs := i.cpuManager.GetCPUAffinity(string(pod.UID), container.Name)
		if !allocatedCPUs.IsEmpty() {
			containerConfig.Linux.Resources.CpusetCpus = allocatedCPUs.String()
		}
	}

	if i.memoryManager != nil {
		numaNodes := i.memoryManager.GetMemoryNUMANodes(pod, container)
		if numaNodes.Len() > 0 {
			var affinity []string
			for _, numaNode := range numaNodes.List() {
				affinity = append(affinity, strconv.Itoa(numaNode))
			}
			containerConfig.Linux.Resources.CpusetMems = strings.Join(affinity, ",")
		}
	}

	return nil
}
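
The CpusetMems value is just a comma-separated list of NUMA node IDs. A minimal sketch of that conversion, using a plain slice in place of the set type used by the kubelet (`cpusetMems` is a hypothetical helper):

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// cpusetMems renders NUMA node IDs in the comma-separated form used for
// the cpuset.mems setting. The input is copied and sorted so that the
// result is stable regardless of input order.
func cpusetMems(numaNodes []int) string {
	sorted := append([]int(nil), numaNodes...)
	sort.Ints(sorted)
	parts := make([]string, 0, len(sorted))
	for _, n := range sorted {
		parts = append(parts, strconv.Itoa(n))
	}
	return strings.Join(parts, ",")
}

func main() {
	fmt.Println(cpusetMems([]int{1, 0})) // 0,1
	fmt.Println(cpusetMems([]int{2}))    // 2
}
```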

3.9.6.3. PreStartContainer

func (i *internalContainerLifecycleImpl) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error {
	if i.cpuManager != nil {
		i.cpuManager.AddContainer(pod, container, containerID)
	}

	if i.memoryManager != nil {
		i.memoryManager.AddContainer(pod, container, containerID)
	}

	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
		i.topologyManager.AddContainer(pod, container, containerID)
	}

	return nil
}

3.9.7. generatePodSandboxConfig

generatePodSandboxConfig

As the name says, generatePodSandboxConfig builds the PodSandbox configuration: it fills in the DNS config, the hostname, the log directory, the port mappings, and the sandbox-level resource settings.

func (m *kubeGenericRuntimeManager) generatePodSandboxConfig(pod *v1.Pod, attempt uint32) (*runtimeapi.PodSandboxConfig, error) {
	podUID := string(pod.UID)
	podSandboxConfig := &runtimeapi.PodSandboxConfig{
		Metadata: &runtimeapi.PodSandboxMetadata{
			Name:      pod.Name,
			Namespace: pod.Namespace,
			Uid:       podUID,
			Attempt:   attempt,
		},
		Labels:      newPodLabels(pod),
		Annotations: newPodAnnotations(pod),
	}

	dnsConfig, err := m.runtimeHelper.GetPodDNS(pod)
	if err != nil {
		return nil, err
	}
	podSandboxConfig.DnsConfig = dnsConfig

	if !kubecontainer.IsHostNetworkPod(pod) {
		podHostname, podDomain, err := m.runtimeHelper.GeneratePodHostNameAndDomain(pod)
		if err != nil {
			return nil, err
		}
		podHostname, err = util.GetNodenameForKernel(podHostname, podDomain, pod.Spec.SetHostnameAsFQDN)
		if err != nil {
			return nil, err
		}
		podSandboxConfig.Hostname = podHostname
	}

	logDir := BuildPodLogsDirectory(pod.Namespace, pod.Name, pod.UID)
	podSandboxConfig.LogDirectory = logDir

	portMappings := []*runtimeapi.PortMapping{}
	for _, c := range pod.Spec.Containers {
		containerPortMappings := kubecontainer.MakePortMappings(&c)

		for idx := range containerPortMappings {
			port := containerPortMappings[idx]
			hostPort := int32(port.HostPort)
			containerPort := int32(port.ContainerPort)
			protocol := toRuntimeProtocol(port.Protocol)
			portMappings = append(portMappings, &runtimeapi.PortMapping{
				HostIp:        port.HostIP,
				HostPort:      hostPort,
				ContainerPort: containerPort,
				Protocol:      protocol,
			})
		}
	}
	if len(portMappings) > 0 {
		podSandboxConfig.PortMappings = portMappings
	}

	lc, err := m.generatePodSandboxLinuxConfig(pod)
	if err != nil {
		return nil, err
	}
	podSandboxConfig.Linux = lc

	if runtime.GOOS == "windows" {
		wc, err := m.generatePodSandboxWindowsConfig(pod)
		if err != nil {
			return nil, err
		}
		podSandboxConfig.Windows = wc
	}

	// Update config to include overhead, sandbox level resources
	if err := m.applySandboxResources(pod, podSandboxConfig); err != nil {
		return nil, err
	}
	return podSandboxConfig, nil
}
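
Port mappings are declared per container but configured once on the sandbox, because all containers of a Pod share the sandbox's network namespace. The sketch below shows only that flattening step with hypothetical types (`container`, `portMapping`, `sandboxPortMappings`); it leaves out whatever MakePortMappings does beyond a plain copy.

```go
package main

import "fmt"

// portMapping and container are hypothetical, trimmed-down types.
type portMapping struct {
	HostIP        string
	HostPort      int32
	ContainerPort int32
	Protocol      string
}

type container struct {
	Name  string
	Ports []portMapping
}

// sandboxPortMappings flattens the ports of all containers into one list.
// It returns nil when no container declares a port, mirroring the
// "only set PortMappings if non-empty" check in the config builder.
func sandboxPortMappings(containers []container) []portMapping {
	var mappings []portMapping
	for _, c := range containers {
		mappings = append(mappings, c.Ports...)
	}
	return mappings
}

func main() {
	containers := []container{
		{Name: "web", Ports: []portMapping{{HostPort: 8080, ContainerPort: 80, Protocol: "TCP"}}},
		{Name: "worker"},
		{Name: "metrics", Ports: []portMapping{{ContainerPort: 9090, Protocol: "TCP"}}},
	}
	for _, m := range sandboxPortMappings(containers) {
		fmt.Printf("%d -> %d/%s\n", m.HostPort, m.ContainerPort, m.Protocol)
	}
}
```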

3.10. KillPod

KillPod

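Killing a Pod is straightforward, and we already walked through killPodWithSyncResult while analysing SyncPod. It first kills every container of the Pod, one goroutine per container, and then stops the PodSandbox.

func (m *kubeGenericRuntimeManager) KillPod(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) error {
	// Despite its name, err is a kubecontainer.PodSyncResult; its Error() method
	// combines the individual sync results into a single error value.
	err := m.killPodWithSyncResult(pod, runningPod, gracePeriodOverride)
	return err.Error()
}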

3.11. GetPodStatus

GetPodStatus

Let's see how GetPodStatus obtains the status of a Pod. The logic is:

  • 1. Call the runtime's ListPodSandbox interface to find all sandboxes whose io.kubernetes.pod.uid label equals the Pod's UID
  • 2. Call the PodSandboxStatus interface to query the status of each PodSandbox
  • 3. Call the runtime's ListContainers interface to find all containers whose io.kubernetes.pod.uid label equals the Pod's UID
  • 4. Call the runtime's ContainerStatus interface to query the status of each container
func (m *kubeGenericRuntimeManager) GetPodStatus(uid kubetypes.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
	podSandboxIDs, err := m.getSandboxIDByPodUID(uid, nil)
	if err != nil {
		return nil, err
	}

	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
			UID:       uid,
		},
	}

	podFullName := format.Pod(pod)

	klog.V(4).InfoS("getSandboxIDByPodUID got sandbox IDs for pod", "podSandboxID", podSandboxIDs, "pod", klog.KObj(pod))

	sandboxStatuses := []*runtimeapi.PodSandboxStatus{}
	podIPs := []string{}
	for idx, podSandboxID := range podSandboxIDs {
		resp, err := m.runtimeService.PodSandboxStatus(podSandboxID, false)
		if crierror.IsNotFound(err) {
			continue
		}
		if err != nil {
			klog.ErrorS(err, "PodSandboxStatus of sandbox for pod", "podSandboxID", podSandboxID, "pod", klog.KObj(pod))
			return nil, err
		}
		if resp.GetStatus() == nil {
			return nil, errors.New("pod sandbox status is nil")
		}

		sandboxStatuses = append(sandboxStatuses, resp.Status)
		// Only get pod IP from latest sandbox
		if idx == 0 && resp.Status.State == runtimeapi.PodSandboxState_SANDBOX_READY {
			podIPs = m.determinePodSandboxIPs(namespace, name, resp.Status)
		}
	}

	containerStatuses, err := m.getPodContainerStatuses(uid, name, namespace)
	if err != nil {
		if m.logReduction.ShouldMessageBePrinted(err.Error(), podFullName) {
			klog.ErrorS(err, "getPodContainerStatuses for pod failed", "pod", klog.KObj(pod))
		}
		return nil, err
	}
	m.logReduction.ClearID(podFullName)

	return &kubecontainer.PodStatus{
		ID:                uid,
		Name:              name,
		Namespace:         namespace,
		IPs:               podIPs,
		SandboxStatuses:   sandboxStatuses,
		ContainerStatuses: containerStatuses,
	}, nil
}

func (m *kubeGenericRuntimeManager) getSandboxIDByPodUID(podUID kubetypes.UID, state *runtimeapi.PodSandboxState) ([]string, error) {
	filter := &runtimeapi.PodSandboxFilter{
		LabelSelector: map[string]string{types.KubernetesPodUIDLabel: string(podUID)},
	}
	if state != nil {
		filter.State = &runtimeapi.PodSandboxStateValue{
			State: *state,
		}
	}
	sandboxes, err := m.runtimeService.ListPodSandbox(filter)
	if err != nil {
		klog.ErrorS(err, "Failed to list sandboxes for pod", "podUID", podUID)
		return nil, err
	}

	if len(sandboxes) == 0 {
		return nil, nil
	}

	// Sort with newest first.
	sandboxIDs := make([]string, len(sandboxes))
	sort.Sort(podSandboxByCreated(sandboxes))
	for i, s := range sandboxes {
		sandboxIDs[i] = s.Id
	}

	return sandboxIDs, nil
}
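
Two details in the code above work together: sandboxes are sorted newest first, and the Pod IPs are taken only from index 0, and only if that sandbox is ready. A minimal sketch of that rule, with a hypothetical `sandbox` type and `podIPs` helper:

```go
package main

import (
	"fmt"
	"sort"
)

// sandbox is a hypothetical, trimmed-down sandbox status.
type sandbox struct {
	ID        string
	CreatedAt int64 // creation timestamp; larger means newer
	Ready     bool
	IPs       []string
}

// podIPs sorts a copy of the sandboxes newest first and returns the IPs of
// the newest one if it is ready. Older sandboxes never contribute IPs, even
// when the newest one is not ready.
func podIPs(sandboxes []sandbox) []string {
	sorted := append([]sandbox(nil), sandboxes...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].CreatedAt > sorted[j].CreatedAt })
	if len(sorted) > 0 && sorted[0].Ready {
		return sorted[0].IPs
	}
	return nil
}

func main() {
	fmt.Println(podIPs([]sandbox{
		{ID: "old", CreatedAt: 100, Ready: false, IPs: []string{"10.0.0.5"}},
		{ID: "new", CreatedAt: 200, Ready: true, IPs: []string{"10.0.0.9"}},
	})) // [10.0.0.9]
}
```

With this rule, a stale IP from an older sandbox is never reported while a replacement sandbox is still coming up.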

3.11.1. getPodContainerStatuses

getPodContainerStatuses

The logic is:

  • 1. Call the runtime's ListContainers interface to find all containers whose io.kubernetes.pod.uid label equals the Pod's UID
  • 2. Call the runtime's ContainerStatus interface to query the status of each container
func (m *kubeGenericRuntimeManager) getPodContainerStatuses(uid kubetypes.UID, name, namespace string) ([]*kubecontainer.Status, error) {
	// Select all containers of the given pod.
	containers, err := m.runtimeService.ListContainers(&runtimeapi.ContainerFilter{
		LabelSelector: map[string]string{types.KubernetesPodUIDLabel: string(uid)},
	})
	if err != nil {
		klog.ErrorS(err, "ListContainers error")
		return nil, err
	}

	statuses := []*kubecontainer.Status{}
	// TODO: optimization: set maximum number of containers per container name to examine.
	for _, c := range containers {
		resp, err := m.runtimeService.ContainerStatus(c.Id, false)
		if crierror.IsNotFound(err) {
			continue
		}
		if err != nil {
			// Merely log this here; GetPodStatus will actually report the error out.
			klog.V(4).InfoS("ContainerStatus return error", "containerID", c.Id, "err", err)
			return nil, err
		}
		status := resp.GetStatus()
		if status == nil {
			return nil, remote.ErrContainerStatusNil
		}
		cStatus := toKubeContainerStatus(status, m.runtimeName)
		if status.State == runtimeapi.ContainerState_CONTAINER_EXITED {
			// Populate the termination message if needed.
			annotatedInfo := getContainerInfoFromAnnotations(status.Annotations)
			// If a container cannot even be started, it certainly does not have logs, so no need to fallbackToLogs.
			fallbackToLogs := annotatedInfo.TerminationMessagePolicy == v1.TerminationMessageFallbackToLogsOnError &&
				cStatus.ExitCode != 0 && cStatus.Reason != "ContainerCannotRun"
			tMessage, checkLogs := getTerminationMessage(status, annotatedInfo.TerminationMessagePath, fallbackToLogs)
			if checkLogs {
				tMessage = m.readLastStringFromContainerLogs(status.GetLogPath())
			}
			// Enrich the termination message written by the application is not empty
			if len(tMessage) != 0 {
				if len(cStatus.Message) != 0 {
					cStatus.Message += ": "
				}
				cStatus.Message += tMessage
			}
		}
		statuses = append(statuses, cStatus)
	}

	sort.Sort(containerStatusByCreated(statuses))
	return statuses, nil
}
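
The termination-message handling for exited containers boils down to one condition and one string concatenation. The following sketch restates both with plain values; `fallbackToLogs` and `appendTerminationMessage` are hypothetical helpers, and the policy is compared as a plain string.

```go
package main

import "fmt"

// fallbackToLogs mirrors the condition used for exited containers: logs are
// only consulted when the policy allows it, the container failed, and it
// actually got far enough to run.
func fallbackToLogs(policy string, exitCode int, reason string) bool {
	return policy == "FallbackToLogsOnError" && exitCode != 0 && reason != "ContainerCannotRun"
}

// appendTerminationMessage adds tMessage to the existing status message,
// separated by ": " when both are non-empty.
func appendTerminationMessage(message, tMessage string) string {
	if len(tMessage) == 0 {
		return message
	}
	if len(message) != 0 {
		message += ": "
	}
	return message + tMessage
}

func main() {
	fmt.Println(fallbackToLogs("FallbackToLogsOnError", 1, "Error"))              // true
	fmt.Println(fallbackToLogs("FallbackToLogsOnError", 0, "Completed"))          // false
	fmt.Println(fallbackToLogs("FallbackToLogsOnError", 1, "ContainerCannotRun")) // false
	fmt.Println(appendTerminationMessage("OOMKilled", "out of memory"))           // OOMKilled: out of memory
}
```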

3.12. GarbageCollect

GarbageCollect

This is implemented by the container garbage collection manager (containerGC); we will go through the details when we analyse the GCManager.

func (m *kubeGenericRuntimeManager) GarbageCollect(gcPolicy kubecontainer.GCPolicy, allSourcesReady bool, evictNonDeletedPods bool) error {
	return m.containerGC.GarbageCollect(gcPolicy, allSourcesReady, evictNonDeletedPods)
}

3.13. UpdatePodCIDR

UpdatePodCIDR
func (m *kubeGenericRuntimeManager) UpdatePodCIDR(podCIDR string) error {
	klog.InfoS("Updating runtime config through cri with podcidr", "CIDR", podCIDR)
	return m.runtimeService.UpdateRuntimeConfig(
		&runtimeapi.RuntimeConfig{
			NetworkConfig: &runtimeapi.NetworkConfig{
				PodCidr: podCIDR,
			},
		})
}

3.14. CheckpointContainer

CheckpointContainer
func (m *kubeGenericRuntimeManager) CheckpointContainer(options *runtimeapi.CheckpointContainerRequest) error {
	return m.runtimeService.CheckpointContainer(options)
}

3.15. GetContainerLogs

GetContainerLogs

GetContainerLogs方法用于获取容器的日志,代码倒是不难理解。直接调用容器运行时的ContainerStatus接口查询容器状态,然后根据查询到的状态获取日志的路径,然后直接读取该路径下的日志文件即可

func (m *kubeGenericRuntimeManager) GetContainerLogs(ctx context.Context, pod *v1.Pod, containerID kubecontainer.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) {
	resp, err := m.runtimeService.ContainerStatus(containerID.ID, false)
	if err != nil {
		klog.V(4).InfoS("Failed to get container status", "containerID", containerID.String(), "err", err)
		return fmt.Errorf("unable to retrieve container logs for %v", containerID.String())
	}
	status := resp.GetStatus()
	if status == nil {
		return remote.ErrContainerStatusNil
	}
	return m.ReadLogs(ctx, status.GetLogPath(), containerID.ID, logOptions, stdout, stderr)
}

func (m *kubeGenericRuntimeManager) ReadLogs(ctx context.Context, path, containerID string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer) error {
	// Convert v1.PodLogOptions into internal log options.
	opts := logs.NewLogOptions(apiOpts, time.Now())

	return logs.ReadLogs(ctx, path, containerID, opts, m.runtimeService, stdout, stderr)
}
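
The flow "look up the status, take the log path, read the file" can be tried out without a container runtime. The following is a rough sketch under simplifying assumptions: statusLookup, containerLogs and demo are hypothetical names, the runtime is replaced by a map from container ID to log path, and the file is copied verbatim. The real logs.ReadLogs also applies the log options, which this sketch does not attempt.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strings"
)

// statusLookup is a hypothetical stand-in for the runtime's ContainerStatus
// call: it maps a container ID to the log path reported in its status.
type statusLookup map[string]string

// containerLogs resolves the log path from the status, then streams the file
// at that path to stdout.
func containerLogs(rt statusLookup, containerID string, stdout io.Writer) error {
	path, ok := rt[containerID]
	if !ok {
		return fmt.Errorf("unable to retrieve container logs for %v", containerID)
	}
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()
	_, err = io.Copy(stdout, f)
	return err
}

// demo writes a temporary log file, registers it under id, and reads it back.
func demo(id, content string) (string, error) {
	f, err := os.CreateTemp("", "container-*.log")
	if err != nil {
		return "", err
	}
	defer os.Remove(f.Name())
	if _, err := f.WriteString(content); err != nil {
		return "", err
	}
	if err := f.Close(); err != nil {
		return "", err
	}
	var out strings.Builder
	err = containerLogs(statusLookup{id: f.Name()}, id, &out)
	return out.String(), err
}

func main() {
	logs, err := demo("abc123", "hello from the container\n")
	fmt.Printf("%q %v\n", logs, err)
}
```

As in GetContainerLogs, a failed status lookup is reported as "unable to retrieve container logs" instead of exposing the underlying error to the caller.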

3.16. DeleteContainer

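The logic of DeleteContainer is very simple: it delegates directly to removeContainer, which we already covered when analyzing SyncPod. removeContainer first calls the internal lifecycle hook internalLifecycle.PostStopContainer, then removes the container log, and finally calls the container runtime's RemoveContainer to remove the container.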

// DeleteContainer removes a container.
func (m *kubeGenericRuntimeManager) DeleteContainer(containerID kubecontainer.ContainerID) error {
	return m.removeContainer(containerID.ID)
}

func (m *kubeGenericRuntimeManager) removeContainer(containerID string) error {
	klog.V(4).InfoS("Removing container", "containerID", containerID)
	// Call internal container post-stop lifecycle hook.
	if err := m.internalLifecycle.PostStopContainer(containerID); err != nil {
		return err
	}

	// Remove the container log.
	// TODO: Separate log and container lifecycle management.
	if err := m.removeContainerLog(containerID); err != nil {
		return err
	}
	// Remove the container.
	return m.runtimeService.RemoveContainer(containerID)
}
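
One consequence of this ordering is worth spelling out: because each stage returns early on error, a failure in an earlier stage means the runtime's RemoveContainer is never reached. Here is a small sketch of that short-circuit behavior. The step type and the removeInOrder and pipeline helpers are hypothetical; only the stage names are taken from the code above.

```go
package main

import (
	"errors"
	"fmt"
)

// step is a hypothetical stand-in for one stage of removeContainer.
type step struct {
	name string
	run  func(containerID string) error
}

// removeInOrder runs the steps sequentially and stops at the first error,
// returning the names of the steps that were attempted.
func removeInOrder(containerID string, steps []step) ([]string, error) {
	var attempted []string
	for _, s := range steps {
		attempted = append(attempted, s.name)
		if err := s.run(containerID); err != nil {
			return attempted, err
		}
	}
	return attempted, nil
}

// pipeline builds the three stages; failLog makes the log-removal stage fail.
func pipeline(failLog bool) []step {
	ok := func(string) error { return nil }
	removeLog := ok
	if failLog {
		removeLog = func(string) error { return errors.New("log removal failed") }
	}
	return []step{
		{"PostStopContainer", ok},
		{"removeContainerLog", removeLog},
		{"RemoveContainer", ok},
	}
}

func main() {
	fmt.Println(removeInOrder("abc123", pipeline(false)))
	// [PostStopContainer removeContainerLog RemoveContainer] <nil>
	fmt.Println(removeInOrder("abc123", pipeline(true)))
	// [PostStopContainer removeContainerLog] log removal failed
}
```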

3.17. GetExec

func (m *kubeGenericRuntimeManager) GetExec(id kubecontainer.ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) {
	req := &runtimeapi.ExecRequest{
		ContainerId: id.ID,
		Cmd:         cmd,
		Tty:         tty,
		Stdin:       stdin,
		Stdout:      stdout,
		Stderr:      stderr,
	}
	resp, err := m.runtimeService.Exec(req)
	if err != nil {
		return nil, err
	}

	return url.Parse(resp.Url)
}
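
Note that GetExec does not run the command itself: it asks the runtime to prepare a streaming endpoint and only returns that endpoint's URL to the caller. The sketch below shows the last step, turning the URL string from the response into a *url.URL. The execResponse type, the streamingEndpoint function, and the address and token in the example are all invented for illustration; the actual URL layout is up to the runtime.

```go
package main

import (
	"fmt"
	"net/url"
)

// execResponse is a hypothetical stand-in for runtimeapi.ExecResponse.
type execResponse struct {
	Url string
}

// streamingEndpoint parses the URL string from the response, as GetExec does
// with url.Parse(resp.Url).
func streamingEndpoint(resp execResponse) (*url.URL, error) {
	return url.Parse(resp.Url)
}

func main() {
	// The address and token below are invented for illustration.
	u, err := streamingEndpoint(execResponse{Url: "http://127.0.0.1:34567/exec/some-token"})
	if err != nil {
		panic(err)
	}
	fmt.Println(u.Host, u.Path) // 127.0.0.1:34567 /exec/some-token
}
```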

3.18. GetAttach

func (m *kubeGenericRuntimeManager) GetAttach(id kubecontainer.ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error) {
	req := &runtimeapi.AttachRequest{
		ContainerId: id.ID,
		Stdin:       stdin,
		Stdout:      stdout,
		Stderr:      stderr,
		Tty:         tty,
	}
	resp, err := m.runtimeService.Attach(req)
	if err != nil {
		return nil, err
	}
	return url.Parse(resp.Url)
}

3.19. GetPortForward

func (m *kubeGenericRuntimeManager) GetPortForward(podName, podNamespace string, podUID kubetypes.UID, ports []int32) (*url.URL, error) {
	sandboxIDs, err := m.getSandboxIDByPodUID(podUID, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to find sandboxID for pod %s: %v", format.PodDesc(podName, podNamespace, podUID), err)
	}
	if len(sandboxIDs) == 0 {
		return nil, fmt.Errorf("failed to find sandboxID for pod %s", format.PodDesc(podName, podNamespace, podUID))
	}
	req := &runtimeapi.PortForwardRequest{
		PodSandboxId: sandboxIDs[0],
		Port:         ports,
	}
	resp, err := m.runtimeService.PortForward(req)
	if err != nil {
		return nil, err
	}
	return url.Parse(resp.Url)
}

3.20. AttachContainer

// Note: the receiver here is *MockAttacher, a gomock-generated mock used in tests,
// not *kubeGenericRuntimeManager.
func (m *MockAttacher) AttachContainer(id container.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "AttachContainer", id, stdin, stdout, stderr, tty, resize)
	ret0, _ := ret[0].(error)
	return ret0
}

3.21. RunInContainer

func (m *kubeGenericRuntimeManager) RunInContainer(id kubecontainer.ContainerID, cmd []string, timeout time.Duration) ([]byte, error) {
	stdout, stderr, err := m.runtimeService.ExecSync(id.ID, cmd, timeout)
	return append(stdout, stderr...), err
}

3.22. PullImage

func (m *kubeGenericRuntimeManager) PullImage(image kubecontainer.ImageSpec, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
	img := image.Image
	repoToPull, _, _, err := parsers.ParseImageName(img)
	if err != nil {
		return "", err
	}

	keyring, err := credentialprovidersecrets.MakeDockerKeyring(pullSecrets, m.keyring)
	if err != nil {
		return "", err
	}

	imgSpec := toRuntimeAPIImageSpec(image)

	creds, withCredentials := keyring.Lookup(repoToPull)
	if !withCredentials {
		klog.V(3).InfoS("Pulling image without credentials", "image", img)

		imageRef, err := m.imageService.PullImage(imgSpec, nil, podSandboxConfig)
		if err != nil {
			klog.ErrorS(err, "Failed to pull image", "image", img)
			return "", err
		}

		return imageRef, nil
	}

	var pullErrs []error
	for _, currentCreds := range creds {
		auth := &runtimeapi.AuthConfig{
			Username:      currentCreds.Username,
			Password:      currentCreds.Password,
			Auth:          currentCreds.Auth,
			ServerAddress: currentCreds.ServerAddress,
			IdentityToken: currentCreds.IdentityToken,
			RegistryToken: currentCreds.RegistryToken,
		}

		imageRef, err := m.imageService.PullImage(imgSpec, auth, podSandboxConfig)
		// If there was no error, return success
		if err == nil {
			return imageRef, nil
		}

		pullErrs = append(pullErrs, err)
	}

	return "", utilerrors.NewAggregate(pullErrs)
}
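
The core of PullImage is a "try each credential until one works" loop: with no matching credentials it pulls anonymously, otherwise it tries every credential in order, returns on the first success, and reports all collected errors only if every attempt fails. A simplified sketch of that control flow follows. The credential and pullFunc types, pullWithCredentials, and fakeRegistry are hypothetical, and the errors are joined into one string here as a simplification of utilerrors.NewAggregate.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// credential is a hypothetical, trimmed-down stand-in for runtimeapi.AuthConfig.
type credential struct{ Username, Password string }

// pullFunc is a hypothetical stand-in for the image service's PullImage call.
type pullFunc func(image string, auth *credential) (string, error)

// pullWithCredentials pulls anonymously when no credentials are available;
// otherwise it tries each credential in order and returns the first success.
// If every attempt fails, all error messages are reported together.
func pullWithCredentials(image string, creds []credential, pull pullFunc) (string, error) {
	if len(creds) == 0 {
		return pull(image, nil)
	}
	var msgs []string
	for i := range creds {
		ref, err := pull(image, &creds[i])
		if err == nil {
			return ref, nil
		}
		msgs = append(msgs, err.Error())
	}
	return "", errors.New(strings.Join(msgs, "; "))
}

// fakeRegistry returns a pullFunc that only accepts the given password.
func fakeRegistry(validPassword string) pullFunc {
	return func(image string, auth *credential) (string, error) {
		if auth == nil {
			return "", errors.New("anonymous pull denied")
		}
		if auth.Password != validPassword {
			return "", fmt.Errorf("bad credentials for %s", auth.Username)
		}
		return "sha256:example-ref-for-" + image, nil
	}
}

func main() {
	creds := []credential{{"alice", "wrong"}, {"bob", "s3cret"}}
	// The second credential succeeds, so the first failure is discarded.
	fmt.Println(pullWithCredentials("registry.example.com/app:v1", creds, fakeRegistry("s3cret")))
	// Only the failing credential is available, so its error is returned.
	fmt.Println(pullWithCredentials("registry.example.com/app:v1", creds[:1], fakeRegistry("s3cret")))
}
```

Errors from earlier attempts are dropped as soon as any credential succeeds, so the caller only sees them when the whole pull fails.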

3.23. GetImageRef

func (m *kubeGenericRuntimeManager) GetImageRef(image kubecontainer.ImageSpec) (string, error) {
	resp, err := m.imageService.ImageStatus(toRuntimeAPIImageSpec(image), false)
	if err != nil {
		klog.ErrorS(err, "Failed to get image status", "image", image.Image)
		return "", err
	}
	if resp.Image == nil {
		return "", nil
	}
	return resp.Image.Id, nil
}

3.24. ListImages

func (m *kubeGenericRuntimeManager) ListImages() ([]kubecontainer.Image, error) {
	var images []kubecontainer.Image

	allImages, err := m.imageService.ListImages(nil)
	if err != nil {
		klog.ErrorS(err, "Failed to list images")
		return nil, err
	}

	for _, img := range allImages {
		images = append(images, kubecontainer.Image{
			ID:          img.Id,
			Size:        int64(img.Size_),
			RepoTags:    img.RepoTags,
			RepoDigests: img.RepoDigests,
			Spec:        toKubeContainerImageSpec(img),
		})
	}

	return images, nil
}

3.25. RemoveImage

func (m *kubeGenericRuntimeManager) RemoveImage(image kubecontainer.ImageSpec) error {
	err := m.imageService.RemoveImage(&runtimeapi.ImageSpec{Image: image.Image})
	if err != nil {
		klog.ErrorS(err, "Failed to remove image", "image", image.Image)
		return err
	}

	return nil
}

3.26. ImageStats

func (m *kubeGenericRuntimeManager) ImageStats() (*kubecontainer.ImageStats, error) {
	allImages, err := m.imageService.ListImages(nil)
	if err != nil {
		klog.ErrorS(err, "Failed to list images")
		return nil, err
	}
	stats := &kubecontainer.ImageStats{}
	for _, img := range allImages {
		stats.TotalStorageBytes += img.Size_
	}
	return stats, nil
}
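
ImageStats is just a sum: it lists every image and adds up the reported sizes, counting each image's size in full. A tiny sketch of that accumulation, using a hypothetical image type in place of runtimeapi.Image and made-up IDs and sizes:

```go
package main

import "fmt"

// image is a hypothetical, trimmed-down stand-in for runtimeapi.Image.
type image struct {
	ID   string
	Size uint64
}

// totalStorageBytes adds up the reported size of every image, the same way
// ImageStats accumulates TotalStorageBytes.
func totalStorageBytes(images []image) uint64 {
	var total uint64
	for _, img := range images {
		total += img.Size
	}
	return total
}

func main() {
	// IDs and sizes are made up: 5 MiB + 12 MiB.
	images := []image{{"sha256:aaa", 5 << 20}, {"sha256:bbb", 12 << 20}}
	fmt.Println(totalStorageBytes(images)) // 17825792
}
```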
