Kubernetes源码版本:remotes/origin/release-1.25
Kubernetes编译出来的Kubelet版本:Kubernetes v1.24.0-beta.0.2463+ee7799bab469d7
Kubernetes集群实验环境:使用Kubernetes v1.25.4二进制的方式搭建了一个单节点集群
K8S 单节点单节点搭建可以参考:Kubernetes v1.25 搭建单节点集群用于Debug K8S源码
Golang版本:go1.19.3 linux/amd64
IDEA版本:2022.2.3
Delve版本:1.9.1
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# dlv version
Delve Debugger
Version: 1.9.1
Build: $Id: d81b9fd12bfa603f3cf7a4bc842398bd61c42940 $
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# go version
go version go1.19.3 linux/amd64
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# kubectl version
WARNING: This version information is deprecated and will be replaced with the output from kubectl version --short. Use --output=yaml|json to get the full version.
Client Version: version.Info{Major:"1", Minor:"25", GitVersion:"v1.25.4", GitCommit:"872a965c6c6526caa949f0c6ac028ef7aff3fb78", GitTreeState:"clean", BuildDate:"2022-11-09T13:36:36Z", GoVersion:"go1.19.3", Compiler:"gc", Platform:"linux/amd64"}
Kustomize Version: v4.5.7
Server Version: version.Info{Major:"1", Minor:"25", GitVersion:"v1.25.4", GitCommit:"872a965c6c6526caa949f0c6ac028ef7aff3fb78", GitTreeState:"clean", BuildDate:"2022-11-09T13:29:58Z", GoVersion:"go1.19.3", Compiler:"gc", Platform:"linux/amd64"}
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# kubectl get nodes -owide
NAME STATUS ROLES AGE VERSION INTERNAL-IP EXTERNAL-IP OS-IMAGE KERNEL-VERSION CONTAINER-RUNTIME
k8s-master1 Ready 31h v1.25.4 192.168.11.71 CentOS Linux 7 (Core) 3.10.0-1160.80.1.el7.x86_64 containerd://1.6.10
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]#
[root@k8s-master1 kubernetes]# kubectl get componentstatus
Warning: v1 ComponentStatus is deprecated in v1.19+
NAME STATUS MESSAGE ERROR
etcd-0 Healthy {"health":"true","reason":""}
controller-manager Healthy ok
scheduler Healthy ok
[root@k8s-master1 kubernetes]#
Kubelet启动参数配置如下:
[root@k8s-master1 kubernetes]# ps -ef|grep "/usr/local/bin/kubelet"
root 7972 1 6 07:06 ? 00:00:06 /usr/local/bin/kubelet --bootstrap-kubeconfig=/etc/kubernetes/bootstrap-kubelet.kubeconfig --kubeconfig=/etc/kubernetes/kubelet.kubeconfig --config=/etc/kubernetes/kubelet-conf.yml --container-runtime-endpoint=unix:///run/containerd/containerd.sock --node-labels=node.kubernetes.io/node= --v=8
root 9549 6424 0 07:07 pts/0 00:00:00 grep --color=auto /usr/local/bin/kubelet
[root@k8s-master1 kubernetes]#
Kubelet参数配置如下:
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
address: 0.0.0.0
port: 10250
readOnlyPort: 10255
authentication:anonymous:enabled: falsewebhook:cacheTTL: 2m0senabled: truex509:clientCAFile: /etc/kubernetes/pki/ca.pem
authorization:mode: Webhookwebhook:cacheAuthorizedTTL: 5m0scacheUnauthorizedTTL: 30s
cgroupDriver: systemd
cgroupsPerQOS: true
clusterDNS:
- 10.96.0.10
clusterDomain: cluster.local
containerLogMaxFiles: 5
containerLogMaxSize: 10Mi
contentType: application/vnd.kubernetes.protobuf
cpuCFSQuota: true
cpuManagerPolicy: none
cpuManagerReconcilePeriod: 10s
enableControllerAttachDetach: true
enableDebuggingHandlers: true
enforceNodeAllocatable:
- pods
eventBurst: 10
eventRecordQPS: 5
evictionHard:imagefs.available: 15%memory.available: 100Minodefs.available: 10%nodefs.inodesFree: 5%
evictionPressureTransitionPeriod: 5m0s
failSwapOn: true
fileCheckFrequency: 20s
hairpinMode: promiscuous-bridge
healthzBindAddress: 127.0.0.1
healthzPort: 10248
httpCheckFrequency: 20s
imageGCHighThresholdPercent: 85
imageGCLowThresholdPercent: 80
imageMinimumGCAge: 2m0s
iptablesDropBit: 15
iptablesMasqueradeBit: 14
kubeAPIBurst: 10
kubeAPIQPS: 5
makeIPTablesUtilChains: true
maxOpenFiles: 1000000
maxPods: 110
nodeStatusUpdateFrequency: 10s
oomScoreAdj: -999
podPidsLimit: -1
registryBurst: 10
registryPullQPS: 5
resolvConf: /etc/resolv.conf
rotateCertificates: true
runtimeRequestTimeout: 2m0s
serializeImagePulls: true
staticPodPath: /etc/kubernetes/manifests
streamingConnectionIdleTimeout: 4h0m0s
syncFrequency: 1m0s
volumeStatsAggPeriod: 1m0s
RuntimeManager主要用于和底层的运行时打交道,处理容器的增删改查
| KubeGenericRuntime |
我们先来看看KubeGenericRuntime是如何定义的,如下:
// pkg/kubelet/kuberuntime/kuberuntime_manager.go
type KubeGenericRuntime interface {kubecontainer.Runtimekubecontainer.StreamingRuntimekubecontainer.CommandRunner
}
KubeGenericRuntime接口分别由Runtime, StreamingRuntim, CommandRunner这三个接口组合而成,下面我们分别来看看这三个接口是如何定义的。
Runtime接口显然是这三个接口中最为重要的一个接口,容器运行时必须实现这个接口,并且每个方法的实现都必须是线程安全的。
type Runtime interface {// 返回当前容器运行时到底是哪一种运行时Type() string// 返回容器运行时的版本信息Version() (Version, error)// 返回容器运行时的`API`版本APIVersion() (Version, error)// 返回容器运行时的状态Status() (*RuntimeStatus, error)// 获取所有的`Pod`, 布尔参数用于指定是否要获取所有的Pod,包括已经退出的容器或者死亡的容器GetPods(all bool) ([]*Pod, error)// 使用指定的容器`GC`策略删除已经死亡的容器GarbageCollect(gcPolicy GCPolicy, allSourcesReady bool, evictNonDeletedPods bool) error// 同步Pod到指定的状态SyncPod(pod *v1.Pod, podStatus *PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) PodSyncResult// 杀死一个PodKillPod(pod *v1.Pod, runningPod Pod, gracePeriodOverride *int64) error// 获取容器的状态GetPodStatus(uid types.UID, name, namespace string) (*PodStatus, error)// 获取容器的日志GetContainerLogs(ctx context.Context, pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error)// 删除容器DeleteContainer(containerID ContainerID) error// 镜像相关的接口ImageServiceUpdatePodCIDR(podCIDR string) errorCheckpointContainer(options *runtimeapi.CheckpointContainerRequest) error
}type ImageService interface {PullImage(image ImageSpec, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error)GetImageRef(image ImageSpec) (string, error)ListImages() ([]Image, error)RemoveImage(image ImageSpec) errorImageStats() (*ImageStats, error)
}
通过方法名,我们大致能猜测这三个命令应该和kubectl exec/attach/portforward命令相关
type StreamingRuntime interface {GetExec(id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error)GetAttach(id ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error)GetPortForward(podName, podNamespace string, podUID types.UID, ports []int32) (*url.URL, error)
}
CommandRunner接口用于在容器中执行命令
type CommandRunner interface {RunInContainer(id ContainerID, cmd []string, timeout time.Duration) ([]byte, error)
}
kubeGenericRuntimeManager为KubeGenericRuntime的实现,我们来看看kubeGenericRuntimeManager为了实现kubeGenericRuntime接口到底依赖了哪些组件。
type kubeGenericRuntimeManager struct {// 用于指定当前的运行时的名字runtimeName string// 事件记录器,运用记录容器整个生命周期过程中产生的事件recorder record.EventRecorder// 创建/删除目录、连接、文件、管道的接口抽象,不同操作系统的实现不同osInterface kubecontainer.OSInterface// 通过cAdvisor获取机器的信息machineInfo *cadvisorapi.MachineInfo// 用于管理容器GCcontainerGC *containerGC// 用于拉取镜像keyring credentialprovider.DockerKeyringrunner kubecontainer.HandlerRunner// 用于生成运行时的选项runtimeHelper kubecontainer.RuntimeHelperlivenessManager proberesults.ManagerreadinessManager proberesults.ManagerstartupManager proberesults.ManagercpuCFSQuota boolcpuCFSQuotaPeriod metav1.DurationimagePuller images.ImageManagerruntimeService internalapi.RuntimeServiceimageService internalapi.ImageManagerServiceversionCache *cache.ObjectCacheseccompProfileRoot stringinternalLifecycle cm.InternalContainerLifecyclelogManager logs.ContainerLogManagerruntimeClassManager *runtimeclass.ManagerlogReduction *logreduction.LogReductionpodStateProvider podStateProviderseccompDefault boolmemorySwapBehavior stringgetNodeAllocatable func() v1.ResourceListmemoryThrottlingFactor float64
}
我们一起来看看RuntimeManager是如何实例化出来的
可以看到,参数相当之多。逻辑并不是很复杂,主要有以下逻辑:
0.1.0就直接退出 Kubernetes v1.25.x版本的容器运行时的Api Version只支持0.1.0版本kubelet的宿主机没有创建/var/log/pods目录,就创建该目录ImageManagerCotainerGcManagerfunc NewKubeGenericRuntimeManager(recorder record.EventRecorder,livenessManager proberesults.Manager,readinessManager proberesults.Manager,startupManager proberesults.Manager,rootDirectory string,machineInfo *cadvisorapi.MachineInfo,podStateProvider podStateProvider,osInterface kubecontainer.OSInterface,runtimeHelper kubecontainer.RuntimeHelper,httpClient types.HTTPGetter,imageBackOff *flowcontrol.Backoff,serializeImagePulls bool,imagePullQPS float32,imagePullBurst int,imageCredentialProviderConfigFile string,imageCredentialProviderBinDir string,cpuCFSQuota bool,cpuCFSQuotaPeriod metav1.Duration,runtimeService internalapi.RuntimeService,imageService internalapi.ImageManagerService,internalLifecycle cm.InternalContainerLifecycle,logManager logs.ContainerLogManager,runtimeClassManager *runtimeclass.Manager,seccompDefault bool,memorySwapBehavior string,getNodeAllocatable func() v1.ResourceList,memoryThrottlingFactor float64,
) (KubeGenericRuntime, error) {runtimeService = newInstrumentedRuntimeService(runtimeService)imageService = newInstrumentedImageManagerService(imageService)kubeRuntimeManager := &kubeGenericRuntimeManager{recorder: recorder,cpuCFSQuota: cpuCFSQuota,cpuCFSQuotaPeriod: cpuCFSQuotaPeriod,seccompProfileRoot: filepath.Join(rootDirectory, "seccomp"),livenessManager: livenessManager,readinessManager: readinessManager,startupManager: startupManager,machineInfo: machineInfo,osInterface: osInterface,runtimeHelper: runtimeHelper,runtimeService: runtimeService,imageService: imageService,internalLifecycle: internalLifecycle,logManager: logManager,runtimeClassManager: runtimeClassManager,logReduction: logreduction.NewLogReduction(identicalErrorDelay),seccompDefault: seccompDefault,memorySwapBehavior: memorySwapBehavior,getNodeAllocatable: getNodeAllocatable,memoryThrottlingFactor: memoryThrottlingFactor,}typedVersion, err := kubeRuntimeManager.getTypedVersion()if err != nil {klog.ErrorS(err, "Get runtime version failed")return nil, err}if typedVersion.Version != kubeRuntimeAPIVersion {klog.ErrorS(err, "This runtime api version is not supported","apiVersion", typedVersion.Version,"supportedAPIVersion", kubeRuntimeAPIVersion)return nil, ErrVersionNotSupported}kubeRuntimeManager.runtimeName = typedVersion.RuntimeNameklog.InfoS("Container runtime initialized","containerRuntime", typedVersion.RuntimeName,"version", typedVersion.RuntimeVersion,"apiVersion", typedVersion.RuntimeApiVersion)if _, err := osInterface.Stat(podLogsRootDirectory); os.IsNotExist(err) {if err := osInterface.MkdirAll(podLogsRootDirectory, 0755); err != nil {klog.ErrorS(err, "Failed to create pod log directory", "path", podLogsRootDirectory)}}if !utilfeature.DefaultFeatureGate.Enabled(features.KubeletCredentialProviders) && (imageCredentialProviderConfigFile != "" || imageCredentialProviderBinDir != "") {klog.InfoS("Flags --image-credential-provider-config or --image-credential-provider-bin-dir were set but the feature gate was disabled, these flags will be ignored","featureGate", features.KubeletCredentialProviders)}if utilfeature.DefaultFeatureGate.Enabled(features.KubeletCredentialProviders) && (imageCredentialProviderConfigFile != "" || imageCredentialProviderBinDir != "") {if err := plugin.RegisterCredentialProviderPlugins(imageCredentialProviderConfigFile, imageCredentialProviderBinDir); err != nil {klog.ErrorS(err, "Failed to register CRI auth plugins")os.Exit(1)}}kubeRuntimeManager.keyring = credentialprovider.NewDockerKeyring()kubeRuntimeManager.imagePuller = images.NewImageManager(kubecontainer.FilterEventRecorder(recorder),kubeRuntimeManager,imageBackOff,serializeImagePulls,imagePullQPS,imagePullBurst)kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(httpClient, kubeRuntimeManager, kubeRuntimeManager)kubeRuntimeManager.containerGC = newContainerGC(runtimeService, podStateProvider, kubeRuntimeManager)kubeRuntimeManager.podStateProvider = podStateProviderkubeRuntimeManager.versionCache = cache.NewObjectCache(func() (interface{}, error) {return kubeRuntimeManager.getTypedVersion()},versionCacheTTL,)return kubeRuntimeManager, nil
}
| Version |
func (m *kubeGenericRuntimeManager) getTypedVersion() (*runtimeapi.VersionResponse, error) {// 通过gRPC调用底层运行时的Version接口typedVersion, err := m.runtimeService.Version(kubeRuntimeAPIVersion)if err != nil {return nil, fmt.Errorf("get remote runtime typed version failed: %v", err)}return typedVersion, nil
}func (m *kubeGenericRuntimeManager) Version() (kubecontainer.Version, error) {// 通过gRPC调用底层运行时的Version接口typedVersion, err := m.getTypedVersion()if err != nil {return nil, err}return newRuntimeVersion(typedVersion.RuntimeVersion)
}
| APIVersion |
func (m *kubeGenericRuntimeManager) APIVersion() (kubecontainer.Version, error) {versionObject, err := m.versionCache.Get(m.machineInfo.MachineID)if err != nil {return nil, err}typedVersion := versionObject.(*runtimeapi.VersionResponse)return newRuntimeVersion(typedVersion.RuntimeApiVersion)
}
| Status |
func (m *kubeGenericRuntimeManager) Status() (*kubecontainer.RuntimeStatus, error) {// runtimeService实际上就是RemoteRuntimeService,它是CRI的gRPC接口定义的客户端resp, err := m.runtimeService.Status(false)if err != nil {return nil, err}if resp.GetStatus() == nil {return nil, errors.New("runtime status is nil")}return toKubeRuntimeStatus(resp.GetStatus()), nil
}
| GetPods |
来看看GetPods具体有哪些逻辑:
CRI ListPodSandbox接口查询所有已经处于Ready的SandboxSandbox按照Metadata.Uid进行归类,相同Metadata.Uid属于同一个PodCRI ListContainers接口查询所有处于Ready的ContainerContainer按照Metadata.Uid进行归类,相同Metadata.Uid属于同一个Podfunc (m *kubeGenericRuntimeManager) getKubeletContainers(allContainers bool) ([]*runtimeapi.Container, error) {filter := &runtimeapi.ContainerFilter{}if !allContainers {filter.State = &runtimeapi.ContainerStateValue{State: runtimeapi.ContainerState_CONTAINER_RUNNING,}}containers, err := m.runtimeService.ListContainers(filter)if err != nil {klog.ErrorS(err, "ListContainers failed")return nil, err}return containers, nil
}
func (m *kubeGenericRuntimeManager) getKubeletSandboxes(all bool) ([]*runtimeapi.PodSandbox, error) {var filter *runtimeapi.PodSandboxFilterif !all {readyState := runtimeapi.PodSandboxState_SANDBOX_READYfilter = &runtimeapi.PodSandboxFilter{State: &runtimeapi.PodSandboxStateValue{State: readyState,},}}resp, err := m.runtimeService.ListPodSandbox(filter)if err != nil {klog.ErrorS(err, "Failed to list pod sandboxes")return nil, err}return resp, nil
}
func (m *kubeGenericRuntimeManager) GetPods(all bool) ([]*kubecontainer.Pod, error) {pods := make(map[kubetypes.UID]*kubecontainer.Pod)sandboxes, err := m.getKubeletSandboxes(all)if err != nil {return nil, err}for i := range sandboxes {s := sandboxes[i]if s.Metadata == nil {klog.V(4).InfoS("Sandbox does not have metadata", "sandbox", s)continue}podUID := kubetypes.UID(s.Metadata.Uid)if _, ok := pods[podUID]; !ok {pods[podUID] = &kubecontainer.Pod{ID: podUID,Name: s.Metadata.Name,Namespace: s.Metadata.Namespace,}}p := pods[podUID]converted, err := m.sandboxToKubeContainer(s)if err != nil {klog.V(4).InfoS("Convert sandbox of pod failed", "runtimeName", m.runtimeName, "sandbox", s, "podUID", podUID, "err", err)continue}p.Sandboxes = append(p.Sandboxes, converted)}containers, err := m.getKubeletContainers(all)if err != nil {return nil, err}for i := range containers {c := containers[i]if c.Metadata == nil {klog.V(4).InfoS("Container does not have metadata", "container", c)continue}labelledInfo := getContainerInfoFromLabels(c.Labels)pod, found := pods[labelledInfo.PodUID]if !found {pod = &kubecontainer.Pod{ID: labelledInfo.PodUID,Name: labelledInfo.PodName,Namespace: labelledInfo.PodNamespace,}pods[labelledInfo.PodUID] = pod}converted, err := m.toKubeContainer(c)if err != nil {klog.V(4).InfoS("Convert container of pod failed", "runtimeName", m.runtimeName, "container", c, "podUID", labelledInfo.PodUID, "err", err)continue}pod.Containers = append(pod.Containers, converted)}// Convert map to list.var result []*kubecontainer.Podfor _, pod := range pods {result = append(result, pod)}return result, nil
}
| computePodActions |
接下来我们来看看computePodActions函数,该方法是一个辅助函数,之后的放回会调用到这个接口,该方法主要是通过对比Pod的状态,计算出需要对当前Pod进行何种操作。虽然代码只有一点,但是想要理解其判断逻辑还是挺难理解的,暂时先不关注细节,只需要知道这个函数可以计算当前Pod需要进行哪种动作,譬如创建Sandbox、杀掉Pod,哪些容器需要启动,哪些容器需要被Kill
func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions {klog.V(5).InfoS("Syncing Pod", "pod", klog.KObj(pod))// 根据pod podstatus判断当前pod的Sandbox是否已经发生该改变createPodSandbox, attempt, sandboxID := runtimeutil.PodSandboxChanged(pod, podStatus)changes := podActions{KillPod: createPodSandbox,CreateSandbox: createPodSandbox,SandboxID: sandboxID,Attempt: attempt,ContainersToStart: []int{},ContainersToKill: make(map[kubecontainer.ContainerID]containerToKillInfo),}if createPodSandbox { // 如果需要重建PodSandbox// 如果用户指定当前Pod的重启策略为Never并且尝试次数大于零,并且启动了多个容器,那么直接返回if !shouldRestartOnFailure(pod) && attempt != 0 && len(podStatus.ContainerStatuses) != 0 {changes.CreateSandbox = falsereturn changes}var containersToStart []intfor idx, c := range pod.Spec.Containers {if pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure && containerSucceeded(&c, podStatus) {continue}containersToStart = append(containersToStart, idx)}if len(containersToStart) == 0 {_, _, done := findNextInitContainerToRun(pod, podStatus)if done {changes.CreateSandbox = falsereturn changes}}if len(pod.Spec.InitContainers) != 0 {// Pod has init containers, return the first one.changes.NextInitContainerToStart = &pod.Spec.InitContainers[0]return changes}changes.ContainersToStart = containersToStartreturn changes}// Ephemeral containers may be started even if initialization is not yet complete.for i := range pod.Spec.EphemeralContainers {c := (*v1.Container)(&pod.Spec.EphemeralContainers[i].EphemeralContainerCommon)// Ephemeral Containers are never restartedif podStatus.FindContainerStatusByName(c.Name) == nil {changes.EphemeralContainersToStart = append(changes.EphemeralContainersToStart, i)}}// Check initialization progress.initLastStatus, next, done := findNextInitContainerToRun(pod, podStatus)if !done {if next != nil {initFailed := initLastStatus != nil && isInitContainerFailed(initLastStatus)if initFailed && !shouldRestartOnFailure(pod) {changes.KillPod = true} else {// Always try to stop containers in unknown state first.if initLastStatus != nil && initLastStatus.State == kubecontainer.ContainerStateUnknown {changes.ContainersToKill[initLastStatus.ID] = containerToKillInfo{name: next.Name,container: next,message: fmt.Sprintf("Init container is in %q state, try killing it before restart",initLastStatus.State),reason: reasonUnknown,}}changes.NextInitContainerToStart = next}}return changes}keepCount := 0for idx, container := range pod.Spec.Containers {containerStatus := podStatus.FindContainerStatusByName(container.Name)if containerStatus != nil && containerStatus.State != kubecontainer.ContainerStateRunning {if err := m.internalLifecycle.PostStopContainer(containerStatus.ID.ID); err != nil {klog.ErrorS(err, "Internal container post-stop lifecycle hook failed for container in pod with error","containerName", container.Name, "pod", klog.KObj(pod))}}if containerStatus == nil || containerStatus.State != kubecontainer.ContainerStateRunning {if kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) {klog.V(3).InfoS("Container of pod is not in the desired state and shall be started", "containerName", container.Name, "pod", klog.KObj(pod))changes.ContainersToStart = append(changes.ContainersToStart, idx)if containerStatus != nil && containerStatus.State == kubecontainer.ContainerStateUnknown {changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{name: containerStatus.Name,container: &pod.Spec.Containers[idx],message: fmt.Sprintf("Container is in %q state, try killing it before restart",containerStatus.State),reason: reasonUnknown,}}}continue}// The container is running, but kill the container if any of the following condition is met.var message stringvar reason containerKillReasonrestart := shouldRestartOnFailure(pod)if _, _, changed := containerChanged(&container, containerStatus); changed {message = fmt.Sprintf("Container %s definition changed", container.Name)restart = true} else if liveness, found := m.livenessManager.Get(containerStatus.ID); found && liveness == proberesults.Failure {// If the container failed the liveness probe, we should kill it.message = fmt.Sprintf("Container %s failed liveness probe", container.Name)reason = reasonLivenessProbe} else if startup, found := m.startupManager.Get(containerStatus.ID); found && startup == proberesults.Failure {// If the container failed the startup probe, we should kill it.message = fmt.Sprintf("Container %s failed startup probe", container.Name)reason = reasonStartupProbe} else {// Keep the container.keepCount++continue}if restart {message = fmt.Sprintf("%s, will be restarted", message)changes.ContainersToStart = append(changes.ContainersToStart, idx)}changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{name: containerStatus.Name,container: &pod.Spec.Containers[idx],message: message,reason: reason,}klog.V(2).InfoS("Message for Container of pod", "containerName", container.Name, "containerStatusID", containerStatus.ID, "pod", klog.KObj(pod), "containerMessage", message)}if keepCount == 0 && len(changes.ContainersToStart) == 0 {changes.KillPod = true}return changes
}
| PodSandboxChanged |
PodSandboxChanged方法的第一个参数是ApiServer中定义的Pod,Apiserver通过接收这种类型的Pod调用CRI的接口创建真正的Container。而这第二个参数则是从Runtime中查询回来的Pod状态了。显然,pod参数为用户期望的状态,而podStatus为底层实际运行的状态,kubelet的目标就是通过对比pod以及podStatus的状态,从而实现底层运行的Pod到达用户期望的状态
既然PodSandbox是为Container运行提供一种运行时的环境,那么为什么一个Pod需要多个PodSandbox???到底如何理解PodSandbox???从后续的代码中可以看出来,实际上一个Pod应该只需要一个PodSandbox,当一个Pod出现了多个PodSandbox,就说明当前Pod不正常,需要Reconcile
PodSandboxChanged方法用于判断是否需要重建,即先Kill然后在Create,除了下面的集中情况PodSandbox需要重建外,其余情况都不需要重建Podsandbox,如下:
PodSandbox还未创建,每个Pod都需要一个PodSandbox来提供运行时环境,如果还没有创建,显然需要先创建PodSandbox PodSandbox可以理解为底层的Infra容器么?Pod存在多个PodSandbox,说明该Pod不正常,也需要重建PodSandbox的状态不处于Ready,那么需要重建该SandboxPod的网络空间不对,也需要重建SandboxPod的网络空间正确,但是PodSandbox没有分配IP,需要重建Sandboxfunc PodSandboxChanged(pod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, uint32, string) {// 如果没有找到PodSandbox,说明还没有创建,此时需要创建PodSandbox// PodSandbox可以认为是Container运行所需要的运行时环境,一次一个Pod的运行必须需要PodSandboxif len(podStatus.SandboxStatuses) == 0 {klog.V(2).InfoS("No sandbox for pod can be found. Need to start a new one", "pod", klog.KObj(pod))return true, 0, ""}// 统计目前已经处于Ready状态的PodSandboxreadySandboxCount := 0for _, s := range podStatus.SandboxStatuses {if s.State == runtimeapi.PodSandboxState_SANDBOX_READY {readySandboxCount++}}// 获取第一个PodSandbox,是不是可以理解为一般情况下,每个Pod只会有一个PodSandboxsandboxStatus := podStatus.SandboxStatuses[0]if readySandboxCount > 1 {// 从日志中可以看出来,每个Pod应该只会有一个PodSandbox,如果有多个就需要进行Reconcileklog.V(2).InfoS("Multiple sandboxes are ready for Pod. Need to reconcile them", "pod", klog.KObj(pod))return true, sandboxStatus.Metadata.Attempt + 1, sandboxStatus.Id}// 说明Sandbox还没有Ready,后续需要Kill然后ReCreateif sandboxStatus.State != runtimeapi.PodSandboxState_SANDBOX_READY {klog.V(2).InfoS("No ready sandbox for pod can be found. Need to start a new one", "pod", klog.KObj(pod))return true, sandboxStatus.Metadata.Attempt + 1, sandboxStatus.Id}// 如果Pod的网络空间不对,那么Sandbox也需要重建if sandboxStatus.GetLinux().GetNamespaces().GetOptions().GetNetwork() != NetworkNamespaceForPod(pod) {klog.V(2).InfoS("Sandbox for pod has changed. Need to start a new one", "pod", klog.KObj(pod))return true, sandboxStatus.Metadata.Attempt + 1, ""}// 如果Pod的网络属于Pod网络,并且IP没有分配,那么也需要重建PodSandboxif !kubecontainer.IsHostNetworkPod(pod) && sandboxStatus.Network != nil && sandboxStatus.Network.Ip == "" {klog.V(2).InfoS("Sandbox for pod has no IP address. Need to start a new one", "pod", klog.KObj(pod))return true, sandboxStatus.Metadata.Attempt + 1, sandboxStatus.Id}return false, sandboxStatus.Metadata.Attempt, sandboxStatus.Id
}
| findNextInitContainerToRun |
从这段代码中就可以看出来,Pod同步给Runtime的时候是如何处理Init Container的。我们都知道,Pod Init Contaier的执行顺序是安装定义先后顺序来的,先定义的限制性。而Init Contaienr并非一次就能执行成功,而哪些已经成功执行的Init Contaienr显然不能再执行一次。所以findNextInitContainerToRun函数的目的就是为了找到失败的那个Init Container,具体逻辑如下:
Init Container,直接退出Container,如果找到一个Container正在运行,说明Init Container已经全部运行成功。直接退出Init Container,如果找到一个Init Container处于Failed状态,那么就返回这个Init ContainerFailed的Init Container,那么就找到下一个还未运行的Init Containerfunc findNextInitContainerToRun(pod *v1.Pod, podStatus *kubecontainer.PodStatus) (status *kubecontainer.Status, next *v1.Container, done bool) {if len(pod.Spec.InitContainers) == 0 {return nil, nil, true}for i := range pod.Spec.Containers {container := &pod.Spec.Containers[i]status := podStatus.FindContainerStatusByName(container.Name)if status != nil && status.State == kubecontainer.ContainerStateRunning {return nil, nil, true}}for i := len(pod.Spec.InitContainers) - 1; i >= 0; i-- {container := &pod.Spec.InitContainers[i]status := podStatus.FindContainerStatusByName(container.Name)if status != nil && isInitContainerFailed(status) {return status, container, false}}for i := len(pod.Spec.InitContainers) - 1; i >= 0; i-- {container := &pod.Spec.InitContainers[i]status := podStatus.FindContainerStatusByName(container.Name)if status == nil {continue}// container is still running, return not done.if status.State == kubecontainer.ContainerStateRunning {return nil, nil, false}if status.State == kubecontainer.ContainerStateExited {// all init containers successfulif i == (len(pod.Spec.InitContainers) - 1) {return nil, nil, true}// all containers up to i successful, go to i+1return nil, &pod.Spec.InitContainers[i+1], false}}return nil, &pod.Spec.InitContainers[0], false
}
| SyncPod |
SyncPod用于同步Pod,可能是删除Pod,也可能是创建Pod,名字倒是取得比较合适。具体逻辑如下:
Pod的Sandbox以及container是否发生改变,如果PodSandbox发生改变,发送SandboxChanged事件Pod已经被Kill,那么先Kill当前Pod的所有Container,然后Stop这个Pod的Sandbox。Pod的Sandbox还需要被创建,那么移除当前Pod所有的InitContainer,由于后续PodSandbox起来之后,InitContainer需要一个一个执行,而具体执行哪一个InitContainer是通过底层容器运行的状态得出的,所以这里首先需要清理掉上一次的环境。Pod不需要被Kill,那么就把这个Pod中需要Kill的Container干掉InitContainerPodSandbox,那么: pod.Spec.SecurityContext,主要原因是sysctl的变量分隔符为.,如果是反斜杠就会报错RunPodSandbox接口运行PodSandbox,如果创建PodSandbox失败了,那么记录事件PodSandboxStatus接口查询Sandbox的状态,如果转台为空,直接返回Pod的网络不是Host网络,那么使用CRI上报上来的IP覆盖Pod的资源清单生成PodSandbox的配置Init ContaienrContainerfunc (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {podContainerChanges := m.computePodActions(pod, podStatus)klog.V(3).InfoS("computePodActions got for pod", "podActions", podContainerChanges, "pod", klog.KObj(pod))if podContainerChanges.CreateSandbox {ref, err := ref.GetReference(legacyscheme.Scheme, pod)if err != nil {klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod))}if podContainerChanges.SandboxID != "" {m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")} else {klog.V(4).InfoS("SyncPod received new pod, will create a sandbox for it", "pod", klog.KObj(pod))}}if podContainerChanges.KillPod {if podContainerChanges.CreateSandbox {klog.V(4).InfoS("Stopping PodSandbox for pod, will start new one", "pod", klog.KObj(pod))} else {klog.V(4).InfoS("Stopping PodSandbox for pod, because all other containers are dead", "pod", klog.KObj(pod))}killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)result.AddPodSyncResult(killResult)if killResult.Error() != nil {klog.ErrorS(killResult.Error(), "killPodWithSyncResult failed")return}if podContainerChanges.CreateSandbox {m.purgeInitContainers(pod, podStatus)}} else {for containerID, containerInfo := range podContainerChanges.ContainersToKill {klog.V(3).InfoS("Killing unwanted container for pod", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)result.AddSyncResult(killContainerResult)if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, nil); err != nil {killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())klog.ErrorS(err, "killContainer for pod failed", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))return}}}m.pruneInitContainersBeforeStart(pod, podStatus)var podIPs []stringif podStatus != nil {podIPs = podStatus.IPs}podSandboxID := podContainerChanges.SandboxIDif podContainerChanges.CreateSandbox {var msg stringvar err errorklog.V(4).InfoS("Creating PodSandbox for pod", "pod", klog.KObj(pod))metrics.StartedPodsTotal.Inc()createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))result.AddSyncResult(createSandboxResult)sysctl.ConvertPodSysctlsVariableToDotsSeparator(pod.Spec.SecurityContext)podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)if err != nil {if m.podStateProvider.IsPodTerminationRequested(pod.UID) {klog.V(4).InfoS("Pod was deleted and sandbox failed to be created", "pod", klog.KObj(pod), "podUID", pod.UID)return}metrics.StartedPodsErrorsTotal.Inc()createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)klog.ErrorS(err, "CreatePodSandbox for pod failed", "pod", klog.KObj(pod))ref, referr := ref.GetReference(legacyscheme.Scheme, pod)if referr != nil {klog.ErrorS(referr, "Couldn't make a ref to pod", "pod", klog.KObj(pod))}m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed to create pod sandbox: %v", err)return}klog.V(4).InfoS("Created PodSandbox for pod", "podSandboxID", podSandboxID, "pod", klog.KObj(pod))resp, err := m.runtimeService.PodSandboxStatus(podSandboxID, false)if err != nil {ref, referr := ref.GetReference(legacyscheme.Scheme, pod)if referr != nil {klog.ErrorS(referr, "Couldn't make a ref to pod", "pod", klog.KObj(pod))}m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedStatusPodSandBox, "Unable to get pod sandbox status: %v", err)klog.ErrorS(err, "Failed to get pod sandbox status; Skipping pod", "pod", klog.KObj(pod))result.Fail(err)return}if resp.GetStatus() == nil {result.Fail(errors.New("pod sandbox status is nil"))return}if !kubecontainer.IsHostNetworkPod(pod) {// Overwrite the podIPs passed in the pod status, since we just started the pod sandbox.podIPs = m.determinePodSandboxIPs(pod.Namespace, pod.Name, resp.GetStatus())klog.V(4).InfoS("Determined the ip for pod after sandbox changed", "IPs", podIPs, "pod", klog.KObj(pod))}}podIP := ""if len(podIPs) != 0 {podIP = podIPs[0]}// Get podSandboxConfig for containers to start.configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)result.AddSyncResult(configPodSandboxResult)podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)if err != nil {message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)klog.ErrorS(err, "GeneratePodSandboxConfig for pod failed", "pod", klog.KObj(pod))configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)return}start := func(typeName, metricLabel string, spec *startSpec) error {startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)result.AddSyncResult(startContainerResult)isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)if isInBackOff {startContainerResult.Fail(err, msg)klog.V(4).InfoS("Backing Off restarting container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))return err}metrics.StartedContainersTotal.WithLabelValues(metricLabel).Inc()if sc.HasWindowsHostProcessRequest(pod, spec.container) {metrics.StartedHostProcessContainersTotal.WithLabelValues(metricLabel).Inc()}klog.V(4).InfoS("Creating container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()if sc.HasWindowsHostProcessRequest(pod, spec.container) {metrics.StartedHostProcessContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()}startContainerResult.Fail(err, msg)switch {case err == images.ErrImagePullBackOff:klog.V(3).InfoS("Container start failed in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod), "containerMessage", msg, "err", err)default:utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))}return err}return nil}for _, idx := range podContainerChanges.EphemeralContainersToStart {start("ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))}// Step 6: start the init container.if container := podContainerChanges.NextInitContainerToStart; container != nil {// Start the next init container.if err := start("init container", metrics.InitContainer, containerStartSpec(container)); err != nil {return}// Successfully started the container; clear the entry in the failureklog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))}// Step 7: start containers in podContainerChanges.ContainersToStart.for _, idx := range podContainerChanges.ContainersToStart {start("container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))}return
}
| killPodWithSyncResult |
当前逻辑并不难,首先通过调用killContainersWithSyncResult干掉当前Pod的所有容器,然后干掉这个Pod的Sandbox
func (m *kubeGenericRuntimeManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) {// 把当前Pod的所有Container全部Kill,并且把结果返回killContainerResults := m.killContainersWithSyncResult(pod, runningPod, gracePeriodOverride)for _, containerResult := range killContainerResults {result.AddSyncResult(containerResult)}// 把当前Pod的所有Sandbox全部Stop,并且把结果返回killSandboxResult := kubecontainer.NewSyncResult(kubecontainer.KillPodSandbox, runningPod.ID)result.AddSyncResult(killSandboxResult)// Stop all sandboxes belongs to same podfor _, podSandbox := range runningPod.Sandboxes {if err := m.runtimeService.StopPodSandbox(podSandbox.ID.ID); err != nil && !crierror.IsNotFound(err) {killSandboxResult.Fail(kubecontainer.ErrKillPodSandbox, err.Error())klog.ErrorS(nil, "Failed to stop sandbox", "podSandboxID", podSandbox.ID)}}return
}
| killContainersWithSyncResult |
killContainersWithSyncResult的逻辑倒是不难理解,重点应该关注GenericRuntimeManager是如何杀掉一个容器的,也就是关注:killContainer方法。每个容器都启用一个协程去关闭指定容器(所以,底层运行时实现的CRI接口必须是线程安全的),当所有协程都停止容器成功之后,主协程携带所有容器的停止结果返回。
func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (syncResults []*kubecontainer.SyncResult) {containerResults := make(chan *kubecontainer.SyncResult, len(runningPod.Containers))wg := sync.WaitGroup{}wg.Add(len(runningPod.Containers))for _, container := range runningPod.Containers {go func(container *kubecontainer.Container) {defer utilruntime.HandleCrash()defer wg.Done()killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name)if err := m.killContainer(pod, container.ID, container.Name, "", reasonUnknown, gracePeriodOverride); err != nil {killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())// Use runningPod for logging as the pod passed in could be *nil*.klog.ErrorS(err, "Kill container failed", "pod", klog.KRef(runningPod.Namespace, runningPod.Name), "podUID", runningPod.ID,"containerName", container.Name, "containerID", container.ID)}containerResults <- killContainerResult}(container)}wg.Wait()close(containerResults)for containerResult := range containerResults {syncResults = append(syncResults, containerResult)}return
}
| purgeInitContainers |
purgeInitContainers方法就是为了把Pod的所有InitContainer全部移除
func (m *kubeGenericRuntimeManager) purgeInitContainers(pod *v1.Pod, podStatus *kubecontainer.PodStatus) {initContainerNames := sets.NewString()for _, container := range pod.Spec.InitContainers {initContainerNames.Insert(container.Name)}for name := range initContainerNames {count := 0for _, status := range podStatus.ContainerStatuses {if status.Name != name {continue}count++// Purge all init containers that match this container nameklog.V(4).InfoS("Removing init container", "containerName", status.Name, "containerID", status.ID.ID, "count", count)if err := m.removeContainer(status.ID.ID); err != nil {utilruntime.HandleError(fmt.Errorf("failed to remove pod init container %q: %v; Skipping pod %q", status.Name, err, format.Pod(pod)))continue}}}
}
| removeContainer |
我们来看看GenericRuntimeManager是如何移除一个容器的,逻辑如下:
InternalLifeCycle的PostStopCotainer去停止一个容器,可以看到移除容器的时间点应该发生在停止容器之后TopologyManager,那么调用TopologyManager的RemoveContainer方法,移除容器podFullname_ContainerName.logCRI规范的容器运行时的RemoveContainer接口移除容器func (m *kubeGenericRuntimeManager) removeContainer(containerID string) error {klog.V(4).InfoS("Removing container", "containerID", containerID)if err := m.internalLifecycle.PostStopContainer(containerID); err != nil {return err}if err := m.removeContainerLog(containerID); err != nil {return err}// Remove the container.return m.runtimeService.RemoveContainer(containerID)
}func (i *internalContainerLifecycleImpl) PostStopContainer(containerID string) error {if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {err := i.topologyManager.RemoveContainer(containerID)if err != nil {return err}}return nil
}func (m *kubeGenericRuntimeManager) removeContainerLog(containerID string) error {// Use log manager to remove rotated logs.err := m.logManager.Clean(containerID)if err != nil {return err}// 通过CRI接口调用底层运行时接口获取容器的状态resp, err := m.runtimeService.ContainerStatus(containerID, false)if err != nil {return fmt.Errorf("failed to get container status %q: %v", containerID, err)}status := resp.GetStatus()if status == nil {return remote.ErrContainerStatusNil}labeledInfo := getContainerInfoFromLabels(status.Labels)// 获取容器的日志路径,大致是legacySymlink=podFullName_ContainerName.loglegacySymlink := legacyLogSymlink(containerID, labeledInfo.ContainerName, labeledInfo.PodName,labeledInfo.PodNamespace)// 调用底层操作系统的接口移除日志if err := m.osInterface.Remove(legacySymlink); err != nil && !os.IsNotExist(err) {return fmt.Errorf("failed to remove container %q log legacy symbolic link %q: %v",containerID, legacySymlink, err)}return nil
}
| killContainer |
我们来看看kubelet是如何干掉一个容器的,具体逻辑如下:
pod非空,那么根据容器名获取容器的Spec,如果pod为空,那么通过CRI接口查询底层运行时逆向推出podKillingContainer事件Internal Lifecycle Hook的PreStopContainer方法,不过目前该方法是空的LifeCycle,并且LifeCycle.PreStop也指定了,那么执行该方法gracePeriod,把刚才Lifecycle.PreStop方法使用的时间去掉gracePeriodOverride参数,那么使用gracePeriodOverride参数覆盖gracePeriod参数,也就是说GenericRuntimeManager通过CRI接口调用底层的运行时停止一个容器的最长时间不能超过gracePeriodOverride所指定的时间func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, message string, reason containerKillReason, gracePeriodOverride *int64) error {var containerSpec *v1.Containerif pod != nil {if containerSpec = kubecontainer.GetContainerSpec(pod, containerName); containerSpec == nil {return fmt.Errorf("failed to get containerSpec %q (id=%q) in pod %q when killing container for reason %q",containerName, containerID.String(), format.Pod(pod), message)}} else {// Restore necessary information if one of the specs is nil.restoredPod, restoredContainer, err := m.restoreSpecsFromContainerLabels(containerID)if err != nil {return err}pod, containerSpec = restoredPod, restoredContainer}// From this point, pod and container must be non-nil.gracePeriod := setTerminationGracePeriod(pod, containerSpec, containerName, containerID, reason)if len(message) == 0 {message = fmt.Sprintf("Stopping container %s", containerSpec.Name)}m.recordContainerEvent(pod, containerSpec, containerID.ID, v1.EventTypeNormal, events.KillingContainer, message)// Run internal pre-stop lifecycle hookif err := m.internalLifecycle.PreStopContainer(containerID.ID); err != nil {return err}// Run the pre-stop lifecycle hooks if applicable and if there is enough time to run itif containerSpec.Lifecycle != nil && containerSpec.Lifecycle.PreStop != nil && gracePeriod > 0 {gracePeriod = gracePeriod - m.executePreStopHook(pod, containerID, containerSpec, gracePeriod)}// always give containers a minimal shutdown window to avoid unnecessary SIGKILLsif gracePeriod < minimumGracePeriodInSeconds {gracePeriod = minimumGracePeriodInSeconds}if gracePeriodOverride != nil {gracePeriod = *gracePeriodOverrideklog.V(3).InfoS("Killing container with a grace period override", "pod", klog.KObj(pod), "podUID", pod.UID,"containerName", containerName, "containerID", containerID.String(), "gracePeriod", gracePeriod)}klog.V(2).InfoS("Killing container with a grace period", "pod", klog.KObj(pod), "podUID", pod.UID,"containerName", containerName, "containerID", containerID.String(), "gracePeriod", gracePeriod)err := m.runtimeService.StopContainer(containerID.ID, gracePeriod)if err != nil && !crierror.IsNotFound(err) {klog.ErrorS(err, "Container termination failed with gracePeriod", "pod", klog.KObj(pod), "podUID", pod.UID,"containerName", containerName, "containerID", containerID.String(), "gracePeriod", gracePeriod)return err}klog.V(3).InfoS("Container exited normally", "pod", klog.KObj(pod), "podUID", pod.UID,"containerName", containerName, "containerID", containerID.String())return nil
}
| executePreStopHook |
可以看到,用户指定的Lifecycle.PreStop的处理流程不能太耗时,如果在gracePeriod的时间内,Lifecycle.PreStop还没有执行结束,就会强制返回
func (m *kubeGenericRuntimeManager) executePreStopHook(pod *v1.Pod, containerID kubecontainer.ContainerID, containerSpec *v1.Container, gracePeriod int64) int64 {klog.V(3).InfoS("Running preStop hook", "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", containerSpec.Name, "containerID", containerID.String())start := metav1.Now()done := make(chan struct{})go func() {defer close(done)defer utilruntime.HandleCrash()if msg, err := m.runner.Run(containerID, pod, containerSpec, containerSpec.Lifecycle.PreStop); err != nil {klog.ErrorS(err, "PreStop hook failed", "pod", klog.KObj(pod), "podUID", pod.UID,"containerName", containerSpec.Name, "containerID", containerID.String())m.recordContainerEvent(pod, containerSpec, containerID.ID, v1.EventTypeWarning, events.FailedPreStopHook, msg)}}()select {case <-time.After(time.Duration(gracePeriod) * time.Second):klog.V(2).InfoS("PreStop hook not completed in grace period", "pod", klog.KObj(pod), "podUID", pod.UID,"containerName", containerSpec.Name, "containerID", containerID.String(), "gracePeriod", gracePeriod)case <-done:klog.V(3).InfoS("PreStop hook completed", "pod", klog.KObj(pod), "podUID", pod.UID,"containerName", containerSpec.Name, "containerID", containerID.String())}return int64(metav1.Now().Sub(start.Time).Seconds())
}
| pruneInitContainersBeforeStart |
func (m *kubeGenericRuntimeManager) pruneInitContainersBeforeStart(pod *v1.Pod, podStatus *kubecontainer.PodStatus) {initContainerNames := sets.NewString()for _, container := range pod.Spec.InitContainers {initContainerNames.Insert(container.Name)}for name := range initContainerNames {count := 0for _, status := range podStatus.ContainerStatuses {if status.Name != name ||(status.State != kubecontainer.ContainerStateExited &&status.State != kubecontainer.ContainerStateUnknown) {continue}count++if count == 1 {continue}klog.V(4).InfoS("Removing init container", "containerName", status.Name, "containerID", status.ID.ID, "count", count)if err := m.removeContainer(status.ID.ID); err != nil {utilruntime.HandleError(fmt.Errorf("failed to remove pod init container %q: %v; Skipping pod %q", status.Name, err, format.Pod(pod)))continue}}}
}
| createPodSandbox |
来看看GenericRuntimeManager是如何创建PodSandbox的,具体逻辑如下:
Pod的资源清单,生成PodSandbox的配置Pod的日志目录Pod.Spec.RuntimeClassName找到RuntimeHandlerRunPodSandbox接口运行PodSandboxfunc (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)if err != nil {message := fmt.Sprintf("Failed to generate sandbox config for pod %q: %v", format.Pod(pod), err)klog.ErrorS(err, "Failed to generate sandbox config for pod", "pod", klog.KObj(pod))return "", message, err}// Create pod logs directoryerr = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)if err != nil {message := fmt.Sprintf("Failed to create log directory for pod %q: %v", format.Pod(pod), err)klog.ErrorS(err, "Failed to create log directory for pod", "pod", klog.KObj(pod))return "", message, err}runtimeHandler := ""if m.runtimeClassManager != nil {runtimeHandler, err = m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName)if err != nil {message := fmt.Sprintf("Failed to create sandbox for pod %q: %v", format.Pod(pod), err)return "", message, err}if runtimeHandler != "" {klog.V(2).InfoS("Running pod with runtime handler", "pod", klog.KObj(pod), "runtimeHandler", runtimeHandler)}}podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)if err != nil {message := fmt.Sprintf("Failed to create sandbox for pod %q: %v", format.Pod(pod), err)klog.ErrorS(err, "Failed to create sandbox for pod", "pod", klog.KObj(pod))return "", message, err}return podSandBoxID, "", nil
}
| SyncPod.func |
一起来看看SyncPod中这个匿名函数是怎么启动一个容器的,具体逻辑如下:
StartContainr启动容器,实际上是分两步执行的,第一步是调用容器运行时接口CreateContainer创建容器,然后调用容器运行时接口StartContainer启动容器 CpuManager, MemoryManager, TopologyManager, Pod.Lifecycle.PostStart start := func(typeName, metricLabel string, spec *startSpec) error {startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)result.AddSyncResult(startContainerResult)isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)if isInBackOff {startContainerResult.Fail(err, msg)klog.V(4).InfoS("Backing Off restarting container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))return err}metrics.StartedContainersTotal.WithLabelValues(metricLabel).Inc()if sc.HasWindowsHostProcessRequest(pod, spec.container) {metrics.StartedHostProcessContainersTotal.WithLabelValues(metricLabel).Inc()}klog.V(4).InfoS("Creating container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()if sc.HasWindowsHostProcessRequest(pod, spec.container) {metrics.StartedHostProcessContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()}startContainerResult.Fail(err, msg)switch {case err == images.ErrImagePullBackOff:klog.V(3).InfoS("Container start failed in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod), "containerMessage", msg, "err", err)default:utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))}return err}return nil}
| doBackOff |
从名字上来看,doBackOff是用于备份容器的,那么具体是如何备份的呢?
ContainerName从PodStatus中找到那个需要备份的ContainerBackOff缓存中,并且还未过期,说明正在备份func (m *kubeGenericRuntimeManager) doBackOff(pod *v1.Pod, container *v1.Container, podStatus *kubecontainer.PodStatus, backOff *flowcontrol.Backoff) (bool, string, error) {var cStatus *kubecontainer.Statusfor _, c := range podStatus.ContainerStatuses {if c.Name == container.Name && c.State == kubecontainer.ContainerStateExited {cStatus = cbreak}}if cStatus == nil {return false, "", nil}klog.V(3).InfoS("Checking backoff for container in pod", "containerName", container.Name, "pod", klog.KObj(pod))// Use the finished time of the latest exited container as the start point to calculate whether to do back-off.ts := cStatus.FinishedAt// backOff requires a unique key to identify the container.key := getStableKey(pod, container)if backOff.IsInBackOffSince(key, ts) {if containerRef, err := kubecontainer.GenerateContainerRef(pod, container); err == nil {m.recorder.Eventf(containerRef, v1.EventTypeWarning, events.BackOffStartContainer, "Back-off restarting failed container")}err := fmt.Errorf("back-off %s restarting failed container=%s pod=%s", backOff.Get(key), container.Name, format.Pod(pod))klog.V(3).InfoS("Back-off restarting failed container", "err", err.Error())return true, err.Error(), kubecontainer.ErrCrashLoopBackOff}backOff.Next(key, ts)return false, "", nil
}
| startContainer |
来看GenericRuntimeManager启动一个容器都做了哪些事情:
ImageManager拉取镜像PodStatus中查询容器的状态,如果状态为空,说明是第一次创建容器,那么生成容器的日志目录PodStatus中获取容器的IDinternal.Lifecycle的PreCreateContainer,主要是调用CpuManager设置cpuset,以及调用MemoryManager设置memorySetCreateContainer创建容器Internal.Lifecycle的PreStartContainer方法,主要是把创建好的容器交给CpuManager, MemoryManager, TopologyManagerStartContainer启动容器Lifecycle.PostStart,执行PostStart方法func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {container := spec.containerimageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)if err != nil {s, _ := grpcstatus.FromError(err)m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())return msg, err}restartCount := 0containerStatus := podStatus.FindContainerStatusByName(container.Name)if containerStatus != nil {restartCount = containerStatus.RestartCount + 1} else {logDir := BuildContainerLogsDirectory(pod.Namespace, pod.Name, pod.UID, container.Name)restartCount, err = calcRestartCountByLogDir(logDir)if err != nil {klog.InfoS("Log directory exists but could not calculate restartCount", "logDir", logDir, "err", err)}}target, err := spec.getTargetID(podStatus)if err != nil {s, _ := grpcstatus.FromError(err)m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())return s.Message(), ErrCreateContainerConfig}containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, podIPs, target)if cleanupAction != nil {defer cleanupAction()}if err != nil {s, _ := grpcstatus.FromError(err)m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())return s.Message(), ErrCreateContainerConfig}err = m.internalLifecycle.PreCreateContainer(pod, container, containerConfig)if err != nil {s, _ := grpcstatus.FromError(err)m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Internal PreCreateContainer hook failed: %v", s.Message())return s.Message(), ErrPreCreateHook}containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)if err != nil {s, _ := grpcstatus.FromError(err)m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())return s.Message(), ErrCreateContainer}err = m.internalLifecycle.PreStartContainer(pod, container, containerID)if err != nil {s, _ := grpcstatus.FromError(err)m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", s.Message())return s.Message(), ErrPreStartHook}m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, fmt.Sprintf("Created container %s", container.Name))err = m.runtimeService.StartContainer(containerID)if err != nil {s, _ := grpcstatus.FromError(err)m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", s.Message())return s.Message(), kubecontainer.ErrRunContainer}m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, fmt.Sprintf("Started container %s", container.Name))containerMeta := containerConfig.GetMetadata()sandboxMeta := podSandboxConfig.GetMetadata()legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,sandboxMeta.Namespace)containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {klog.ErrorS(err, "Failed to create legacy symbolic link", "path", legacySymlink,"containerID", containerID, "containerLogPath", containerLog)}}// Step 4: execute the post start hook.if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {kubeContainerID := kubecontainer.ContainerID{Type: m.runtimeName,ID: containerID,}msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)if handlerErr != nil {klog.ErrorS(handlerErr, "Failed to execute PostStartHook", "pod", klog.KObj(pod),"podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", reasonFailedPostStartHook, nil); err != nil {klog.ErrorS(err, "Failed to kill container", "pod", klog.KObj(pod),"podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())}return msg, ErrPostStartHook}}return "", nil
}
| PreCreateContainer、 |
设置CPU, Memory的限制
func (i *internalContainerLifecycleImpl) PreCreateContainer(pod *v1.Pod, container *v1.Container, containerConfig *runtimeapi.ContainerConfig) error {if i.cpuManager != nil {allocatedCPUs := i.cpuManager.GetCPUAffinity(string(pod.UID), container.Name)if !allocatedCPUs.IsEmpty() {containerConfig.Linux.Resources.CpusetCpus = allocatedCPUs.String()}}if i.memoryManager != nil {numaNodes := i.memoryManager.GetMemoryNUMANodes(pod, container)if numaNodes.Len() > 0 {var affinity []stringfor _, numaNode := range numaNodes.List() {affinity = append(affinity, strconv.Itoa(numaNode))}containerConfig.Linux.Resources.CpusetMems = strings.Join(affinity, ",")}}return nil
}
func (i *internalContainerLifecycleImpl) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error {if i.cpuManager != nil {i.cpuManager.AddContainer(pod, container, containerID)}if i.memoryManager != nil {i.memoryManager.AddContainer(pod, container, containerID)}if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {i.topologyManager.AddContainer(pod, container, containerID)}return nil
}
| generatePodSandboxConfig |
正如其名,generatePodSandboxConfig方法用于生成PodSandbox的配置,主要是获取DNS配置,获取主机名,获取日志目录,获取端口映射,计算资源配置
func (m *kubeGenericRuntimeManager) generatePodSandboxConfig(pod *v1.Pod, attempt uint32) (*runtimeapi.PodSandboxConfig, error) {podUID := string(pod.UID)podSandboxConfig := &runtimeapi.PodSandboxConfig{Metadata: &runtimeapi.PodSandboxMetadata{Name: pod.Name,Namespace: pod.Namespace,Uid: podUID,Attempt: attempt,},Labels: newPodLabels(pod),Annotations: newPodAnnotations(pod),}dnsConfig, err := m.runtimeHelper.GetPodDNS(pod)if err != nil {return nil, err}podSandboxConfig.DnsConfig = dnsConfigif !kubecontainer.IsHostNetworkPod(pod) {podHostname, podDomain, err := m.runtimeHelper.GeneratePodHostNameAndDomain(pod)if err != nil {return nil, err}podHostname, err = util.GetNodenameForKernel(podHostname, podDomain, pod.Spec.SetHostnameAsFQDN)if err != nil {return nil, err}podSandboxConfig.Hostname = podHostname}logDir := BuildPodLogsDirectory(pod.Namespace, pod.Name, pod.UID)podSandboxConfig.LogDirectory = logDirportMappings := []*runtimeapi.PortMapping{}for _, c := range pod.Spec.Containers {containerPortMappings := kubecontainer.MakePortMappings(&c)for idx := range containerPortMappings {port := containerPortMappings[idx]hostPort := int32(port.HostPort)containerPort := int32(port.ContainerPort)protocol := toRuntimeProtocol(port.Protocol)portMappings = append(portMappings, &runtimeapi.PortMapping{HostIp: port.HostIP,HostPort: hostPort,ContainerPort: containerPort,Protocol: protocol,})}}if len(portMappings) > 0 {podSandboxConfig.PortMappings = portMappings}lc, err := m.generatePodSandboxLinuxConfig(pod)if err != nil {return nil, err}podSandboxConfig.Linux = lcif runtime.GOOS == "windows" {wc, err := m.generatePodSandboxWindowsConfig(pod)if err != nil {return nil, err}podSandboxConfig.Windows = wc}// Update config to include overhead, sandbox level resourcesif err := m.applySandboxResources(pod, podSandboxConfig); err != nil {return nil, err}return podSandboxConfig, nil
}
| KillPod |
杀死容器的逻辑倒是不难,killPodWithSyncResult方法我们也在分析SyncPod方法 的过程中已经看过一遍了。实际上就是先删除Pod中的一个一个的Container,然后删除PodSandbox
func (m *kubeGenericRuntimeManager) KillPod(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) error {err := m.killPodWithSyncResult(pod, runningPod, gracePeriodOverride)return err.Error()
}
| GetPodStatus |
我们来看看GetPodStatus是如何获取Pod的状态的,逻辑如下:
ListPodSandbox,查询所有io.kubernetes.pod.uid为当前Pod.UID的所有SandboxPodSandboxStatus接口查询每个PodSandbox的状态ListContainer接口查询所有io.kubernetes.pod.uid为Pod.UID的ContainerContainerStatus接口查询每个Container的状态func (m *kubeGenericRuntimeManager) GetPodStatus(uid kubetypes.UID, name, namespace string) (*kubecontainer.PodStatus, error) {podSandboxIDs, err := m.getSandboxIDByPodUID(uid, nil)if err != nil {return nil, err}pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name,Namespace: namespace,UID: uid,},}podFullName := format.Pod(pod)klog.V(4).InfoS("getSandboxIDByPodUID got sandbox IDs for pod", "podSandboxID", podSandboxIDs, "pod", klog.KObj(pod))sandboxStatuses := []*runtimeapi.PodSandboxStatus{}podIPs := []string{}for idx, podSandboxID := range podSandboxIDs {resp, err := m.runtimeService.PodSandboxStatus(podSandboxID, false)if crierror.IsNotFound(err) {continue}if err != nil {klog.ErrorS(err, "PodSandboxStatus of sandbox for pod", "podSandboxID", podSandboxID, "pod", klog.KObj(pod))return nil, err}if resp.GetStatus() == nil {return nil, errors.New("pod sandbox status is nil")}sandboxStatuses = append(sandboxStatuses, resp.Status)// Only get pod IP from latest sandboxif idx == 0 && resp.Status.State == runtimeapi.PodSandboxState_SANDBOX_READY {podIPs = m.determinePodSandboxIPs(namespace, name, resp.Status)}}containerStatuses, err := m.getPodContainerStatuses(uid, name, namespace)if err != nil {if m.logReduction.ShouldMessageBePrinted(err.Error(), podFullName) {klog.ErrorS(err, "getPodContainerStatuses for pod failed", "pod", klog.KObj(pod))}return nil, err}m.logReduction.ClearID(podFullName)return &kubecontainer.PodStatus{ID: uid,Name: name,Namespace: namespace,IPs: podIPs,SandboxStatuses: sandboxStatuses,ContainerStatuses: containerStatuses,}, nil
}func (m *kubeGenericRuntimeManager) getSandboxIDByPodUID(podUID kubetypes.UID, state *runtimeapi.PodSandboxState) ([]string, error) {filter := &runtimeapi.PodSandboxFilter{LabelSelector: map[string]string{types.KubernetesPodUIDLabel: string(podUID)},}if state != nil {filter.State = &runtimeapi.PodSandboxStateValue{State: *state,}}sandboxes, err := m.runtimeService.ListPodSandbox(filter)if err != nil {klog.ErrorS(err, "Failed to list sandboxes for pod", "podUID", podUID)return nil, err}if len(sandboxes) == 0 {return nil, nil}// Sort with newest first.sandboxIDs := make([]string, len(sandboxes))sort.Sort(podSandboxByCreated(sandboxes))for i, s := range sandboxes {sandboxIDs[i] = s.Id}return sandboxIDs, nil
}
| getPodContainerStatuses |
具体逻辑如下:
ListContainer接口查询所有io.kubernetes.pod.uid为Pod.UID的ContainerContainerStatus接口查询每个Container的状态func (m *kubeGenericRuntimeManager) getPodContainerStatuses(uid kubetypes.UID, name, namespace string) ([]*kubecontainer.Status, error) {// Select all containers of the given pod.containers, err := m.runtimeService.ListContainers(&runtimeapi.ContainerFilter{LabelSelector: map[string]string{types.KubernetesPodUIDLabel: string(uid)},})if err != nil {klog.ErrorS(err, "ListContainers error")return nil, err}statuses := []*kubecontainer.Status{}// TODO: optimization: set maximum number of containers per container name to examine.for _, c := range containers {resp, err := m.runtimeService.ContainerStatus(c.Id, false)if crierror.IsNotFound(err) {continue}if err != nil {// Merely log this here; GetPodStatus will actually report the error out.klog.V(4).InfoS("ContainerStatus return error", "containerID", c.Id, "err", err)return nil, err}status := resp.GetStatus()if status == nil {return nil, remote.ErrContainerStatusNil}cStatus := toKubeContainerStatus(status, m.runtimeName)if status.State == runtimeapi.ContainerState_CONTAINER_EXITED {// Populate the termination message if needed.annotatedInfo := getContainerInfoFromAnnotations(status.Annotations)// If a container cannot even be started, it certainly does not have logs, so no need to fallbackToLogs.fallbackToLogs := annotatedInfo.TerminationMessagePolicy == v1.TerminationMessageFallbackToLogsOnError &&cStatus.ExitCode != 0 && cStatus.Reason != "ContainerCannotRun"tMessage, checkLogs := getTerminationMessage(status, annotatedInfo.TerminationMessagePath, fallbackToLogs)if checkLogs {tMessage = m.readLastStringFromContainerLogs(status.GetLogPath())}// Enrich the termination message written by the application is not emptyif len(tMessage) != 0 {if len(cStatus.Message) != 0 {cStatus.Message += ": "}cStatus.Message += tMessage}}statuses = append(statuses, cStatus)}sort.Sort(containerStatusByCreated(statuses))return statuses, nil
}
| GarbageCollect |
这里是通过Container Garbge Collect Manager来实现的,等分析GCManager的时候我们再来分析这里具体的逻辑
func (m *kubeGenericRuntimeManager) GarbageCollect(gcPolicy kubecontainer.GCPolicy, allSourcesReady bool, evictNonDeletedPods bool) error {return m.containerGC.GarbageCollect(gcPolicy, allSourcesReady, evictNonDeletedPods)
}
| UpdatePodCIDR |
func (m *kubeGenericRuntimeManager) UpdatePodCIDR(podCIDR string) error {klog.InfoS("Updating runtime config through cri with podcidr", "CIDR", podCIDR)return m.runtimeService.UpdateRuntimeConfig(&runtimeapi.RuntimeConfig{NetworkConfig: &runtimeapi.NetworkConfig{PodCidr: podCIDR,},})
}
| CheckpointContainer |
func (m *kubeGenericRuntimeManager) CheckpointContainer(options *runtimeapi.CheckpointContainerRequest) error {return m.runtimeService.CheckpointContainer(options)
}
| GetContainerLogs |
GetContainerLogs方法用于获取容器的日志,代码倒是不难理解。直接调用容器运行时的ContainerStatus接口查询容器状态,然后根据查询到的状态获取日志的路径,然后直接读取该路径下的日志文件即可
func (m *kubeGenericRuntimeManager) GetContainerLogs(ctx context.Context, pod *v1.Pod, containerID kubecontainer.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) {resp, err := m.runtimeService.ContainerStatus(containerID.ID, false)if err != nil {klog.V(4).InfoS("Failed to get container status", "containerID", containerID.String(), "err", err)return fmt.Errorf("unable to retrieve container logs for %v", containerID.String())}status := resp.GetStatus()if status == nil {return remote.ErrContainerStatusNil}return m.ReadLogs(ctx, status.GetLogPath(), containerID.ID, logOptions, stdout, stderr)
}func (m *kubeGenericRuntimeManager) ReadLogs(ctx context.Context, path, containerID string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer) error {// Convert v1.PodLogOptions into internal log options.opts := logs.NewLogOptions(apiOpts, time.Now())return logs.ReadLogs(ctx, path, containerID, opts, m.runtimeService, stdout, stderr)
}
| DeleteContainer |
DeleteContainer逻辑非常简单,直接委托给removeContainer方法来完成,这个方法我们在分析SyncPod的时候分析过,先调用Lifecycle.StopContainer,然后移除容器日志,最后调用容器运行时的RemoveContainer移除容器
// DeleteContainer removes a container.
func (m *kubeGenericRuntimeManager) DeleteContainer(containerID kubecontainer.ContainerID) error {return m.removeContainer(containerID.ID)
}func (m *kubeGenericRuntimeManager) removeContainer(containerID string) error {klog.V(4).InfoS("Removing container", "containerID", containerID)// Call internal container post-stop lifecycle hook.if err := m.internalLifecycle.PostStopContainer(containerID); err != nil {return err}// Remove the container log.// TODO: Separate log and container lifecycle management.if err := m.removeContainerLog(containerID); err != nil {return err}// Remove the container.return m.runtimeService.RemoveContainer(containerID)
}
| GetExec |
func (m *kubeGenericRuntimeManager) GetExec(id kubecontainer.ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) {req := &runtimeapi.ExecRequest{ContainerId: id.ID,Cmd: cmd,Tty: tty,Stdin: stdin,Stdout: stdout,Stderr: stderr,}resp, err := m.runtimeService.Exec(req)if err != nil {return nil, err}return url.Parse(resp.Url)
}
| GetAttach |
func (m *kubeGenericRuntimeManager) GetAttach(id kubecontainer.ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error) {req := &runtimeapi.AttachRequest{ContainerId: id.ID,Stdin: stdin,Stdout: stdout,Stderr: stderr,Tty: tty,}resp, err := m.runtimeService.Attach(req)if err != nil {return nil, err}return url.Parse(resp.Url)
}
| GetPortForward |
func (m *kubeGenericRuntimeManager) GetPortForward(podName, podNamespace string, podUID kubetypes.UID, ports []int32) (*url.URL, error) {sandboxIDs, err := m.getSandboxIDByPodUID(podUID, nil)if err != nil {return nil, fmt.Errorf("failed to find sandboxID for pod %s: %v", format.PodDesc(podName, podNamespace, podUID), err)}if len(sandboxIDs) == 0 {return nil, fmt.Errorf("failed to find sandboxID for pod %s", format.PodDesc(podName, podNamespace, podUID))}req := &runtimeapi.PortForwardRequest{PodSandboxId: sandboxIDs[0],Port: ports,}resp, err := m.runtimeService.PortForward(req)if err != nil {return nil, err}return url.Parse(resp.Url)
}
| AttachContainer |
func (m *MockAttacher) AttachContainer(id container.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {m.ctrl.T.Helper()ret := m.ctrl.Call(m, "AttachContainer", id, stdin, stdout, stderr, tty, resize)ret0, _ := ret[0].(error)return ret0
}
| RunInContainer |
func (m *kubeGenericRuntimeManager) RunInContainer(id kubecontainer.ContainerID, cmd []string, timeout time.Duration) ([]byte, error) {stdout, stderr, err := m.runtimeService.ExecSync(id.ID, cmd, timeout)return append(stdout, stderr...), err
}
| PullImage |
func (m *kubeGenericRuntimeManager) PullImage(image kubecontainer.ImageSpec, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {img := image.ImagerepoToPull, _, _, err := parsers.ParseImageName(img)if err != nil {return "", err}keyring, err := credentialprovidersecrets.MakeDockerKeyring(pullSecrets, m.keyring)if err != nil {return "", err}imgSpec := toRuntimeAPIImageSpec(image)creds, withCredentials := keyring.Lookup(repoToPull)if !withCredentials {klog.V(3).InfoS("Pulling image without credentials", "image", img)imageRef, err := m.imageService.PullImage(imgSpec, nil, podSandboxConfig)if err != nil {klog.ErrorS(err, "Failed to pull image", "image", img)return "", err}return imageRef, nil}var pullErrs []errorfor _, currentCreds := range creds {auth := &runtimeapi.AuthConfig{Username: currentCreds.Username,Password: currentCreds.Password,Auth: currentCreds.Auth,ServerAddress: currentCreds.ServerAddress,IdentityToken: currentCreds.IdentityToken,RegistryToken: currentCreds.RegistryToken,}imageRef, err := m.imageService.PullImage(imgSpec, auth, podSandboxConfig)// If there was no error, return successif err == nil {return imageRef, nil}pullErrs = append(pullErrs, err)}return "", utilerrors.NewAggregate(pullErrs)
}
| GetImageRef |
func (m *kubeGenericRuntimeManager) GetImageRef(image kubecontainer.ImageSpec) (string, error) {resp, err := m.imageService.ImageStatus(toRuntimeAPIImageSpec(image), false)if err != nil {klog.ErrorS(err, "Failed to get image status", "image", image.Image)return "", err}if resp.Image == nil {return "", nil}return resp.Image.Id, nil
}
| ListImages |
func (m *kubeGenericRuntimeManager) ListImages() ([]kubecontainer.Image, error) {var images []kubecontainer.ImageallImages, err := m.imageService.ListImages(nil)if err != nil {klog.ErrorS(err, "Failed to list images")return nil, err}for _, img := range allImages {images = append(images, kubecontainer.Image{ID: img.Id,Size: int64(img.Size_),RepoTags: img.RepoTags,RepoDigests: img.RepoDigests,Spec: toKubeContainerImageSpec(img),})}return images, nil
}
func (m *kubeGenericRuntimeManager) RemoveImage(image kubecontainer.ImageSpec) error {err := m.imageService.RemoveImage(&runtimeapi.ImageSpec{Image: image.Image})if err != nil {klog.ErrorS(err, "Failed to remove image", "image", image.Image)return err}return nil
}
| ImageStats |
func (m *kubeGenericRuntimeManager) ImageStats() (*kubecontainer.ImageStats, error) {allImages, err := m.imageService.ListImages(nil)if err != nil {klog.ErrorS(err, "Failed to list images")return nil, err}stats := &kubecontainer.ImageStats{}for _, img := range allImages {stats.TotalStorageBytes += img.Size_}return stats, nil
}
上一篇:软件工程选择与判断题
下一篇:开发过程中-辅助工具链接