一、前言
Kubernetes是当前事实上的容器编排引擎,强大的容器编排能力依靠其内置的诸多Workload(工作负载,可以理解为一个应用服务实体,通过这个实体可以定义服务实例数,服务的相关配置等),比如Deployment,StatefulSet,DaemonSet,Job等,每种Workload适用于不同的场景。
尽管Kubernetes定义了不同的Workload以满足不同的需求,但企业内部的服务通过Kubernetes在生产环境部署和运维时,还是遇到了一些当前Kubernetes并不支持的需求,比如Pod的原地重启,指定Pod实例删除/更新,sidecar容器的管理等。面对新的需求,企业和社区开发者尝试通过Kubernetes Operator扩展机制去解决这些问题,OpenKruise就是其中一个比较优秀的项目。它是阿里开源的一个项目,定义了多种CRD用来替换原生的Workload,或者增强原生的Workload的特性,或者实现其它的需求。SidecarSet是OpenKruise当中的一种CRD,用来对sidecar容器进行管理,为企业在业务容器化过程中对sidecar容器的管理提供了一种高效便捷的手段,本文我将重点和大家分享下关于SidecarSet的一些实现细节。
在Kubernetes当中,Pod是调度容器资源的基本单元,一个Pod当中可以有多个容器,这些容器共享网络命名空间(network namespace),它们访问彼此的服务就像在本地访问一样。在这一组容器中,通常会有一个业务容器,即这个Pod对外提供的服务或实现了其它能力(比如:Web程序、HttpServer服务等);此外,为了更好的管理业务容器的日志、满足服务注册等需求,我们会在同一个Pod当中部署其它的容器去实现特定的功能,这种其它的容器就被称作sidecar容器,平时交流中,也有同学称它们为旁路容器、边车容器等,其实都是直译过来的。本文我们统一称为sidecar容器。
这种sidecar容器的功能是不是可以封装到业务容器里,用另外一个进程(sidecar进程)去实现呢?答案是可以的,但是强烈不建议这么做,因为这会带来如下问题:
升级困难。sidecar进程的代码更新后,业务容器的镜像必须重新制作;升级时业务容器必须重启。
如果sidecar进程实现的是一个通用功能,当它的代码有变更时,所有业务都会受影响。
将sidecar进程和业务进程放在同一个容器当中,不是云原生的最佳实践方式,这种方式与在宿主机上部署多个进程是一样的。
基于此,在一个Pod当中部署sidecar容器去辅助业务容器完成一些功能,是在容器化场景下合理的做法。sidecar进程与业务进程分别部署在不同的容器当中,资源隔离,互不影响;通过共享网络命名空间,相互通信又如在本地;升级时只需升级sidecar容器镜像或者配置,不影响业务容器。
在业务的容器化过程中,对sidecar容器的需求是多种多样的。例如服务的注册,有些是向consul注册,有些是向Nacos注册;再如日志的处理:有些是通过logtail采集,有些是通过fluent-bit采集。这些不同的需求,对sidecar容器的部署管理需求是不一样的,所以,如何在业务全面容器化过程中,可靠高效地管理这些sidecar容器也是一件重要的事情。
SidecarSet通过Kubernetes的Admission Webhook(准入控制器)机制可自动为符合条件的新建Pod注入sidecar容器,无需用户在新建各种Workload的.spec信息时就必须考虑好需要注入sidecar容器信息。用户可以通过创建一个SidecarSet资源对象,来指定需要匹配哪些Pod,需要注入什么样的sidecar容器。SidecarSet将sidecar容器的定义和生命周期管理与业务容器的做了解耦,它主要特性有:
注入过程不改变Pod所属Workload的.spec信息(比如:deployment、cloneset等)。
具备对匹配的Pod 通过原地升级方式升级其中已经注入的 sidecar 容器的能力(原地升级条件:sidecar容器的.spec中只有image信息发生了变化)
支持对所有匹配的Pod按照滚动升级的方式进行sidecar容器的升级。
支持对所有匹配的Pod按照灰度的策略更新sidecar容器。
支持暂停更新。暂停更新时,对于新建的Pod依旧会实现注入能力,已经更新的Pod会保持更新后的版本不动,还没有更新的Pod会暂停更新。
支持热升级。普通的升级方式(冷升级)会先停止旧版本的容器,然后创建新版本的容器。这种方式适合不影响业务服务可用性的sidecar容器,例如日志处理sidecar容器。但对于一些代理sidecar容器,例如Istio Envoy,这种升级方法有损服务质量,热升级方式适合这种场景。
SidecarSet是一种CR(Kubernetes当中的自定义资源),这种资源定义了需要给指定Pod注入的sidecar容器的信息,它对应的控制器是sidecarset-controller,负责对SidecarSet CR做处理。下图是sidecarset-controller对SidecarSet CR调谐的基本流程图。
调谐主入口还是熟悉的Reconcile()函数,这个函数里面在拿到SidecarSet CR之后,直接进入到了UpdateSidecarSet()函数,从这个函数的命名不难看出sidecarset-controller调谐的主要工作就是去更新SidecarSet的状态信息,这似乎不是前面介绍的SidecarSet的主要功能做sidecar容器注入?对的,注入不是在这里做的,如前所述,sidecar容器的注入主要是通过Webook机制去实现的,下节我们会详细介绍,本节的目的还是主要去梳理清楚SidecarSet CR资源的管理过程。我们继续来看。
// 调谐逻辑主要做事的函数
func (p *Processor) UpdateSidecarSet(sidecarSet *appsv1alpha1.SidecarSet) (reconcile.Result, error) {
......
// 1. get matching pods with the sidecarSet
pods, err := p.getMatchingPods(sidecarSet)
// 2. calculate SidecarSet status based on pods
status := calculateStatus(control, pods)
......
// 3. If sidecar container hot upgrade complete, then set the other one(empty sidecar container) image to HotUpgradeEmptyImage
if isSidecarSetHasHotUpgradeContainer(sidecarSet) {
var podsInHotUpgrading []*corev1.Pod
for _, pod := range pods {
// flip other hot sidecar container to empty, in the following:
// 1. the empty sidecar container image isn't equal HotUpgradeEmptyImage
// 2. all containers with exception of empty sidecar containers is updated and consistent
// 3. all containers with exception of empty sidecar containers is ready
// don't contain sidecar empty containers
sidecarContainers := sidecarcontrol.GetSidecarContainersInPod(sidecarSet)
for _, sidecarContainer := range sidecarSet.Spec.Containers {
if sidecarcontrol.IsHotUpgradeContainer(&sidecarContainer) {
_, emptyContainer := sidecarcontrol.GetPodHotUpgradeContainers(sidecarContainer.Name, pod)
sidecarContainers.Delete(emptyContainer)
}
}
if isPodSidecarInHotUpgrading(sidecarSet, pod) && control.IsPodUpdatedConsistently(pod, sidecarContainers) &&
isHotUpgradingReady(sidecarSet, pod) {
podsInHotUpgrading = append(podsInHotUpgrading, pod)
}
}
if err := p.flipHotUpgradingContainers(control, podsInHotUpgrading); err != nil {
return reconcile.Result{}, err
}
}
// 4. SidecarSet upgrade strategy type is NotUpdate
if !isSidecarSetNotUpdate(sidecarSet) {
return reconcile.Result{}, nil
}
// 5. sidecarset already updates all matched pods, then return
if isSidecarSetUpdateFinish(status) {
klog.V(3).Infof("sidecarSet(%s) matched pods(number=%d) are latest, and don't need update", sidecarSet.Name, len(pods))
return reconcile.Result{}, nil
}
// 6. Paused indicates that the SidecarSet is paused to update matched pods
if sidecarSet.Spec.UpdateStrategy.Paused {
klog.V(3).Infof("sidecarSet is paused, name: %s", sidecarSet.Name)
return reconcile.Result{}, nil
}
// 7. upgrade pod sidecar
if err := p.updatePods(control, pods); err != nil {
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}
上面是UpdateSidecarSet()函数的摘要。可以看到,该函数进来以后:
首先,找到该SidecarSet CR匹配到的Pod列表,然后根据这些Pod的状态,计算出SidecarSet的状态(有多少Pod更新了,还有多少没更新,更新的Pod有多少Ready了等信息,详细信息可以从SidecarSet的.status字段信息当中看到)。
接着,判断该SidecarSet是否包含需要热升级的容器。如果包含,先找到具体做热升级的容器,然后检查该容器是否完成热升级,如果完成了,则需要将原sidecar容器的镜像替换为一个空镜像(hotUpgradeEmptyImage值表示的镜像),也就是flipPodSidecarContainerDo()函数的功能。这里需要注意的,这里只是完成了热升级3个步骤当中的最后一步(Reset);第1步的注入是在Admission Webhook当中做,第2步的流量迁移是由PostStart脚本及业务进程配合完成。
// 热升级完成后,将原sidecar容器的镜像替换为empty容器的镜像
func flipPodSidecarContainerDo(control sidecarcontrol.SidecarControl, pod *corev1.Pod) {
sidecarSet := control.GetSidecarset()
containersInPod := make(map[string]*corev1.Container)
for i := range pod.Spec.Containers {
container := &pod.Spec.Containers[i]
containersInPod[container.Name] = container
}
var changedContainer []string
for _, sidecarContainer := range sidecarSet.Spec.Containers {
if sidecarcontrol.IsHotUpgradeContainer(&sidecarContainer) {
workContainer, emptyContainer := sidecarcontrol.GetPodHotUpgradeContainers(sidecarContainer.Name, pod)
if containersInPod[emptyContainer].Image == sidecarContainer.UpgradeStrategy.HotUpgradeEmptyImage {
continue
}
// flip the empty sidecar container image
containerNeedFlip := containersInPod[emptyContainer]
klog.V(3).Infof("try to reset %v/%v/%v from %s to empty(%s)", pod.Namespace, pod.Name, containerNeedFlip.Name,
containerNeedFlip.Image, sidecarContainer.UpgradeStrategy.HotUpgradeEmptyImage)
// 这里做镜像的替换
containerNeedFlip.Image = sidecarContainer.UpgradeStrategy.HotUpgradeEmptyImage
changedContainer = append(changedContainer, containerNeedFlip.Name)
// update pod sidecarSet version annotations
pod.Annotations[sidecarcontrol.GetPodSidecarSetVersionAnnotation(containerNeedFlip.Name)] = "0"
pod.Annotations[sidecarcontrol.GetPodSidecarSetVersionAltAnnotation(workContainer)] = "0"
}
}
// record the updated container status, to determine if the update is complete
control.UpdatePodAnnotationsInUpgrade(changedContainer, pod)
}
处理完热升级的情况后,接着做了SidecarSet是否更新、SiecarSet是否完成升级操作两个判断。根据sidecarSet.Spec.UpdateStrategy.Paused字段决定是否暂停调谐。
以上所有逻辑的操作,都是围绕着“是否修改Pod的.spec信息”的目的去做的。最后,就是去更新一把Pod的信息了,我们可以看到随着p.Client.Update()函数的调用,调谐的逻辑基本告一段落。
前面多次提到,sidecar容器的注入是基于Kubernetes的Admission Webhook机制实现的。什么是Webhook呢?它是一种 Http 回调机制,在某些条件下会触发一个Http Post 请求,通过Http Post发送简单事件通知。Admission Webhook是Kubernetes当中,支持用户自定义基于Webhook机制对资源对象做变更(Mutating)和有效性校验(Validating)的扩展机制,下图展示了Admission Webhook在Kubernetes API的生命周期中的作用过程。
Admission Webhook是一段代码,它会在请求通过认证和授权之后、对象被持久化之前拦截到达api-server的请求。Admission Webhook分为2种,一种是Mutating Admission Webhook,负责对资源对象做默认值设置、属性值更改等操作;另一种是Validating Admission Webhook,负责对资源对象的属性值做合法性校验。一个Kubernetes集群中,可以有多个Mutating Admission Webhook和Validating Admission Webhook,要想让Kubernetes集群发现这些Webhook,只需要配置mutatingwebhookconfigurations和validatingwebhookconfigurations这两种资源即可。对Pod进行sidecar容器的注入,就是通过Mutating Admission Webhook去做的。
# kruise-controller部署时创建的mutatingwebhookconfigurations资源,以便Kubernetes集群发现kruise的Mutating Admission Webhook,进而实现对Pod注入sidecar容器的功能
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
name: kruise-mutating-webhook-configuration
webhooks:
- admissionReviewVersions:
- v1beta1
clientConfig:
caBundle: LS0tLS1CRUdJTkkxx=
service:
name: kruise-webhook-service
namespace: kruise-system
path: /mutate-pod
port: 443
name: mpod.kb.io
rules:
- apiGroups:
- ""
apiVersions:
- v1
operations:
- CREATE
resources:
- pods
scope: '*'
timeoutSeconds: 30
下面我们详细分析下kruise的Mutating Admission Webhook当中对Pod注入sidecar容器的流程,下图是基于源码梳理的流程图。
// 根据SidecarSet对象修改Pod的属性
func (h *PodCreateHandler) sidecarsetMutatingPod(ctx context.Context, req admission.Request, pod *corev1.Pod) error {
......
var oldPod *corev1.Pod
var isUpdated bool
//when Operation is update, decode older object
if req.AdmissionRequest.Operation == admissionv1beta1.Update {
isUpdated = true
oldPod = new(corev1.Pod)
if err := h.Decoder.Decode(
admission.Request{AdmissionRequest: admissionv1beta1.AdmissionRequest{Object: req.AdmissionRequest.OldObject}},
oldPod); err != nil {
return err
}
}
......
for _, sidecarSet := range sidecarsetList.Items {
if matched, err := sidecarcontrol.PodMatchedSidecarSet(pod, sidecarSet); err != nil {
return err
} else if !matched {
continue
}
// check whether sidecarSet is active
// when sidecarSet is not active, it will not perform injections and upgrades process.
control := sidecarcontrol.New(sidecarSet.DeepCopy())
if !control.IsActiveSidecarSet() {
continue
}
matchedSidecarSets = append(matchedSidecarSets, control)
}
if len(matchedSidecarSets) == 0 {
return nil
}
// 判断当前是否为Pod的Update事件
if isUpdated {
if !matchedSidecarSets[0].IsPodAvailabilityChanged(pod, oldPod) {
klog.V(3).Infof("pod(%s.%s) availability unchanged for sidecarSet, and ignore", pod.Namespace, pod.Name)
return nil
}
}
// 获取待注入的Container、Volume等信息
klog.V(3).Infof("[sidecar inject] begin to operation(%s) pod(%s/%s) resources(%s) subResources(%s)",
req.Operation, req.Namespace, req.Name, req.Resource, req.SubResource)
//build sidecar containers, sidecar initContainers, sidecar volumes, annotations to inject into pod object
sidecarContainers, sidecarInitContainers, volumesInSidecar, injectedAnnotations, err := buildSidecars(isUpdated, pod, oldPod, matchedSidecarSets)
if err != nil {
return err
} else if len(sidecarContainers) == 0 && len(sidecarInitContainers) == 0 {
......
// 执行注入动作
// 1. inject init containers, sort by their name, after the original init containers
sort.SliceStable(sidecarInitContainers, func(i, j int) bool {
return sidecarInitContainers[i].Name < sidecarInitContainers[j].Name
})
for _, initContainer := range sidecarInitContainers {
pod.Spec.InitContainers = append(pod.Spec.InitContainers, initContainer.Container)
}
// 2. inject containers
pod.Spec.Containers = mergeSidecarContainers(pod.Spec.Containers, sidecarContainers)
// 3. inject volumes
pod.Spec.Volumes = util.MergeVolumes(pod.Spec.Volumes, volumesInSidecar)
// 4. apply annotations
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
for k, v := range injectedAnnotations {
pod.Annotations[k] = v
}
klog.V(4).Infof("[sidecar inject] after mutating: %v", util.DumpJSON(pod))
return nil
}
首先,会通过PodMatchedSidecarSet()找到该Pod是否有匹配的SidecarSet对象,如果没有则结束整个流程;如果有,继续检查该请求的事件是Pod的Create事件还是Update事件,如果是Update事件,则会通过IsPodAvailabilityChanged(pod, oldPod)来判断该Update事件是否改变了Pod本身的Availability(可用性/有效性),如果Pod的可用性没有被改变,则不需要再做任何sidecar注入动作,否则的话走正常注入逻辑。
当前kruise当中的IsPodAvailabilityChanged(pod, oldPod)是直接返回false的,相当于直接忽略了Pod的Update事件。为何多此一举呢?笔者认为这里是做了一些超前设计的考虑,以便支持后续我们在Pod层面可以做更多的事情。比如:某一种Workload支持Pod重新编排它的所有容器,这个编排不是新建Pod,而是更新Pod,所以此时是Pod Update事件,而不是Create事件,那么这时候就需要为这个Pod注入sidecar容器。
// 直接返回false,相当于直接忽略了Pod的Update事件
func (c *commonControl) IsPodAvailabilityChanged(pod, oldPod *v1.Pod) bool {
return false
}
如果通过上面的流程判断该Pod是需要被注入sidecar容器的,接着就会进入到buildSidecars()函数,打开去看,逻辑还是比较清晰的,自上而下依次从SidecarSet当中解析出来需要设置给Pod的Containers(ENV信息是在Container对象里的)、InitContainers、volumes和Annotations对象信息。这里需要注意2个Annotation:
kruise.io/sidecarset-hash:记录了该Pod匹配到的所有SidecarSet的hash值,便于调谐过程中判断该Pod的sidecar容器是否需要更新。
kruise.io/sidecarset-hash-without-image:记录了剔除image属性外的SidecarSet的hash值,用于判断是否可以对sidecar容器做原地升级。
如果我们需要根据业务场景,对注入的sidecar做一些通用的定制化设置,在buildSidecars()函数里做个patch修改再合适不过了。当然如果SidecarSet的.spec支持的话,在SidecarSet CR中设置也是可以的,但是对于通用的设置,比如得物的所有容器都必须设置某一个环境变量,就必须在所有的SidecarSet CR中都去设置,难免会有遗漏,相比之下在这里做修改更合适。
buildSidecars()函数实现流程中,还需要关注下IsHotUpgradeContainer()这个函数,它是为sidecar容器的热升级做的一个处理。如果一个sidecar容器名叫dw-agent,它是需要热升级的,这里会给Pod注入名称分别为dw-agent-1和dw-agent-2的两个容器,其中dw-agent-2容器的image会被设置为SidecarSet当中指定的hotUpgradeEmptyImage的值,比如:empty:v1。这个dw-agent-2容器,用于在升级这个sidecar容器时,先把它的镜像由empty:v1替换为真正的新版本镜像,比如agent:v2,待完成流量迁移以后,会把dw-agent-1的image由旧版本(agent:v1)的镜像替换为empty:v1。
// pkg/webhook/pod/mutating/sidecarset.go:284
func buildSidecars(isUpdated bool, pod *corev1.Pod, oldPod *corev1.Pod, matchedSidecarSets []sidecarcontrol.SidecarControl) (
sidecarContainers, sidecarInitContainers []*appsv1alpha1.SidecarContainer,
volumesInSidecars []corev1.Volume, injectedAnnotations map[string]string, err error) {
......
// 处理热升级的情况
if sidecarcontrol.IsHotUpgradeContainer(sidecarContainer) {
hotContainers, annotations := injectHotUpgradeContainers(pod, sidecarContainer)
sidecarContainers = append(sidecarContainers, hotContainers...)
for k, v := range annotations {
injectedAnnotations[k] = v
}
} else {
sidecarContainers = append(sidecarContainers, sidecarContainer)
}
......
}
在拿到需要注入的对象信息之后,调用mergeSidecarContainers()、MergeVolumes()函数执行了对Pod的.spec信息的修改。修改Container时会根据在SidecarSet当中指定的注入顺序对Container按序注入;修改Volume时,会做去重操作。至此,kruise-webhook-service服务会对api-server发送的POST /mutate-pod请求做出响应,返回修改后的Pod对象,该对象会被持久化保存到ETCD中。
sidecar容器的升级方式分为冷升级和热升级两种升级方式,这里的冷/热方式主要是针对流量而言的。冷升级的方式是指先停掉旧的容器,再用新的镜像拉起新的容器;热升级方式是指先用新版本的镜像拉起新的容器,然后将流量从旧的容器迁移到新的容器(流量迁移过程需要业务代码支持,业务方需要提供相应的迁移脚本/接口),最后再停掉旧的容器,热升级的方式对流量做了优雅处理。冷/热方式通过SidecarSet的.spec.containers[].upgradeStrategy.upgradeType字段指定。
apiVersion: apps.kruise.io/v1alpha1
kind: SidecarSet
metadata:
name: hotupgrade-sidecarset
spec:
selector:
matchLabels:
app: hotupgrade
containers:
- name: sidecar
image: openkruise/sidecar:v1
imagePullPolicy: Always
lifecycle:
postStart:
exec:
command:
- /bin/sh
- /run.sh
upgradeStrategy:
# 冷升级方式
upgradeType: ColdUpgrade
# 热升级方式
# upgradeType: HotUpgrade
apiVersion: apps.kruise.io/v1alpha1
kind: SidecarSet
metadata:
name: sidecarset
spec:
# 在这里设置对SidecarSet匹配到的所有Pod的更新策略
updateStrategy:
type: RollingUpdate
maxUnavailable: 20%
冷升级方式下,sidecar容器的升级可以通过Pod的重建,也可以在Pod不重建的情况下原地重启,不过原地重启是有条件的:只能是容器的image发生了变化。
Pod重建时的sidecar容器升级,正常走Mutating Admission Webhook当中的逻辑即可完成,其实就是Pod重建时被注入的sidecar容器就是新版本的。
如果sidecar容器只是image发生了变化,就会走原地升级的逻辑,这个逻辑是在SidecarSet CR的调谐逻辑中去实现的。让我们继续回到SidecarSet Controller的基本调谐原理分析的那块代码,这次我们重点看下GetNextUpgradePods()这个函数。
// 按照一定策略查找待更新的Pod列表
func (p *spreadingStrategy) GetNextUpgradePods(control sidecarcontrol.SidecarControl, pods []*corev1.Pod) (upgradePods []*corev1.Pod) {
......
//1. 查找的依据是:Pod的并没有更新到最新版本的sidecar,同时只有容器的image字段发生了变化
for index, pod := range pods {
isUpdated := sidecarcontrol.IsPodSidecarUpdated(sidecarset, pod)
if !isUpdated && isSelected(pod) && control.IsSidecarSetUpgradable(pod) {
waitUpgradedIndexes = append(waitUpgradedIndexes, index)
}
}
//2. 对Pod做排序并做打散策略
waitUpgradedIndexes = SortUpdateIndexes(strategy, pods, waitUpgradedIndexes)
//3. 计算更新步长(多少Pod需要更新)
needToUpgradeCount := calculateUpgradeCount(control, waitUpgradedIndexes, pods)
if needToUpgradeCount < len(waitUpgradedIndexes) {
waitUpgradedIndexes = waitUpgradedIndexes[:needToUpgradeCount]
}
//4. 从预选的Pod列表当中拿到指定数量的Pod
for _, idx := range waitUpgradedIndexes {
upgradePods = append(upgradePods, pods[idx])
}
return
}
// pkg/controller/sidecarset/sidecarset_strategy.go:115
func calculateUpgradeCount(coreControl sidecarcontrol.SidecarControl, waitUpdateIndexes []int, pods []*corev1.Pod) int {
......
var needUpgradeCount int
for _, i := range waitUpdateIndexes {
// If pod is not ready, then not included in the calculation of maxUnavailable
if !coreControl.IsPodReady(pods[i]) {
needUpgradeCount++
continue
}
// 决定更新步长
if upgradeAndNotReadyCount >= maxUnavailable {
break
}
upgradeAndNotReadyCount++
needUpgradeCount++
}
return needUpgradeCount
}
如前所述,热升级对流量而言是先启动新的容器,然后将流量从旧容器迁移到新的容器,最后再停止掉旧的容器。热升级时sidecar容器的注入和更新过程如下框图所示,以本文前面提到的dw-agent这个sidecar容器为例说明。
当Pod创建时,Kruise-Mutating-Admission-Webhook组件会发现dw-agent这个sidecar容器是需要做热升级的,所以会给该Pod注入名为dw-agent-1和dw-agent-2的两个容器,前者使用正常的镜像,后者使用SidecarSet当中指定的hotUpgradeEmptyImage镜像。此时,当Pod被调度,容器被创建之后,dw-agent-1是正常的sidecar容器,接受流量;而dw-agent-2容器不做任何事情,它的创建完全是为升级做准备的。
// buildSidecars()当中处理热升级容器的逻辑
func buildSidecars(isUpdated bool, pod *corev1.Pod, oldPod *corev1.Pod, matchedSidecarSets []sidecarcontrol.SidecarControl) (
sidecarContainers, sidecarInitContainers []*appsv1alpha1.SidecarContainer,
volumesInSidecars []corev1.Volume, injectedAnnotations map[string]string, err error) {
......
if sidecarcontrol.IsHotUpgradeContainer(sidecarContainer) {
// 针对热升级容器,需要注入两个sidecar容器
hotContainers, annotations := injectHotUpgradeContainers(pod, sidecarContainer)
sidecarContainers = append(sidecarContainers, hotContainers...)
for k, v := range annotations {
injectedAnnotations[k] = v
}
} else {
sidecarContainers = append(sidecarContainers, sidecarContainer)
}
......
}
// 为Pod注入了两个sidecar容器,以及Annotation(这当中记录了两个容器的版本号)
func injectHotUpgradeContainers(pod *corev1.Pod, sidecarContainer *appsv1alpha1.SidecarContainer) (
sidecarContainers []*appsv1alpha1.SidecarContainer, injectedAnnotations map[string]string) {
injectedAnnotations = make(map[string]string)
hotUpgradeWorkContainer := sidecarcontrol.GetPodHotUpgradeInfoInAnnotations(pod)
// container1 is current worked container
// container2 is empty container, and don't work now
container1, container2 := generateHotUpgradeContainers(sidecarContainer)
sidecarContainers = append(sidecarContainers, container1)
sidecarContainers = append(sidecarContainers, container2)
//mark sidecarset.version in annotations
// "1" indicates sidecar container is first injected into pod, and not upgrade process
injectedAnnotations[sidecarcontrol.GetPodSidecarSetVersionAnnotation(container1.Name)] = "1"
injectedAnnotations[sidecarcontrol.GetPodSidecarSetVersionAltAnnotation(container1.Name)] = "0"
// "0" indicates sidecar container is hot upgrade empty container
injectedAnnotations[sidecarcontrol.GetPodSidecarSetVersionAnnotation(container2.Name)] = "0"
injectedAnnotations[sidecarcontrol.GetPodSidecarSetVersionAltAnnotation(container2.Name)] = "1"
// used to mark which container is currently working, first is container1
// format: map[container.name] = pod.spec.container[x].name
hotUpgradeWorkContainer[sidecarContainer.Name] = container1.Name
// store working HotUpgrade container in pod annotations
by, _ := json.Marshal(hotUpgradeWorkContainer)
injectedAnnotations[sidecarcontrol.SidecarSetWorkingHotUpgradeContainer] = string(by)
return sidecarContainers, injectedAnnotations
}
升级过程分为3步,官方原理示意图如下,供参考。
// 调谐流程中更新Pod的Container(升级旧版容器)
func updatePodSidecarContainer(control sidecarcontrol.SidecarControl, pod *corev1.Pod) {
sidecarset := control.GetSidecarset()
var changedContainers []string
for _, sidecarContainer := range sidecarset.Spec.Containers {
......
var changedContainer string
if sidecarcontrol.IsHotUpgradeContainer(&sidecarContainer) {
// 对热更新的Container做处理
// 这里会对镜像为hotUpgradeEmptyImage值的那个容器做image字段的修改(其实就是把旧版本的容器做下升级)
changedContainer = updateHotUpgradeContainerInPod(&sidecarContainer, control, pod)
} else {
changedContainer = updateColdUpgradeContainerInPod(&sidecarContainer, control, pod)
}
if changedContainer != "" {
changedContainers = append(changedContainers, changedContainer)
}
}
// update pod information in upgrade
control.UpdatePodAnnotationsInUpgrade(changedContainers, pod)
return
}
// pkg/controller/sidecarset/sidecarset_hotupgrade.go:154
// 更新Pod当中需要热升级的sidecar容器
func updateHotUpgradeContainerInPod(sidecarContainer *appsv1alpha1.SidecarContainer, control sidecarcontrol.SidecarControl, pod *corev1.Pod) (changedContainer string) {
sidecarSet := control.GetSidecarset()
containerInPods := make(map[string]corev1.Container)
for _, containerInPod := range pod.Spec.Containers {
containerInPods[containerInPod.Name] = containerInPod
}
// 找到那个旧版本的容器
nameToUpgrade := findContainerToHotUpgrade(sidecarContainer, pod, control)
containerToUpgrade := containerInPods[nameToUpgrade]
// 替换镜像
newContainer := control.UpdateSidecarContainerToLatest(sidecarContainer.Container, containerToUpgrade)
// new hot upgrade sidecar container specification
afterContainerSpec := util.DumpJSON(newContainer)
// older hot upgrade sidecar container specification
var beforeContainerSpec string
// older sidecar container
var olderSidecar string
name1, name2 := sidecarcontrol.GetHotUpgradeContainerName(sidecarContainer.Name)
if nameToUpgrade == name1 {
beforeContainerSpec = util.DumpJSON(containerInPods[name2])
olderSidecar = name2
} else {
beforeContainerSpec = util.DumpJSON(containerInPods[name1])
olderSidecar = name1
}
// sidecarToUpgrade: sidecarSet.Spec.Container[x].name -> sidecar container in pod
// for example: mesh -> mesh-1, envoy -> envoy-2...
sidecarToUpgrade := make(map[string]string)
// pod.container definition changed, then update container spec in pod
if beforeContainerSpec != afterContainerSpec {
......
// 对相关的Annotation做更新
pod.Annotations[sidecarcontrol.GetPodSidecarSetVersionAnnotation(newContainer.Name)] = sidecarSet.ResourceVersion
pod.Annotations[sidecarcontrol.GetPodSidecarSetVersionAltAnnotation(newContainer.Name)] = pod.Annotations[sidecarcontrol.GetPodSidecarSetVersionAnnotation(olderSidecar)]
pod.Annotations[sidecarcontrol.GetPodSidecarSetVersionAltAnnotation(olderSidecar)] = sidecarSet.ResourceVersion
changedContainer = newContainer.Name
}
return
}
旧的sidecar容器的镜像被替换为新版本之后,容器会按照正常流程启动,启动之后会调用执行lifecycle.postStart脚本,该脚本配合业务进程自身对流量的迁移处理逻辑,完成流量从旧实例到新实例的迁移。在处理流程过程中,有两个环境变量至关重要:
SIDECARSET_VERSION:当前容器的业务代码版本。
SIDECARSET_VERSION_ALT:对侧(另外一个/影子)容器的业务代码版本。
通过这两个环境变量,业务进程可以感知到自己是新版本还是旧版本(比如:可以通过对比这俩环境变量的值的大小去判断),以便决定是否执行流量热处理逻辑。那么这两个环境变量的值是如何更新的呢?如果直接Patch Pod的container[].env字段,会导致Pod无法原地重启的,所以这里用了Kubernetes的Downward API机制,这是一种可以将Pod的一些信息传递给Container自身,但又不需要和Kubernetes做交互的方式。在Webhook的注入源码中,我们可以看到这个设置过程。
// pkg/webhook/pod/mutating/sidecarset_hotupgrade.go:56
// 生成需要热升级的sidecar容器的对象
func generateHotUpgradeContainers(container *appsv1alpha1.SidecarContainer) (*appsv1alpha1.SidecarContainer, *appsv1alpha1.SidecarContainer) {
name1, name2 := sidecarcontrol.GetHotUpgradeContainerName(container.Name)
container1, container2 := container.DeepCopy(), container.DeepCopy()
container1.Name = name1
container2.Name = name2
// set the non-working hot upgrade container image to empty, first is container2
container2.Container.Image = container.UpgradeStrategy.HotUpgradeEmptyImage
// set sidecarset.version in container env
setSidecarContainerVersionEnv(&container1.Container)
setSidecarContainerVersionEnv(&container2.Container)
return container1, container2
}
// 使用DownwardAPI方式设置环境变量,可以看到环境变量的值是从Annotation当中拿到的
func setSidecarContainerVersionEnv(container *corev1.Container) {
// inject SIDECARSET_VERSION
container.Env = append(container.Env, corev1.EnvVar{
Name: sidecarcontrol.SidecarSetVersionEnvKey,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: fmt.Sprintf("metadata.annotations['%s']", sidecarcontrol.GetPodSidecarSetVersionAnnotation(container.Name)),
},
},
})
// inject SIDECARSET_VERSION_ALT
container.Env = append(container.Env, corev1.EnvVar{
Name: sidecarcontrol.SidecarSetVersionAltEnvKey,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: fmt.Sprintf("metadata.annotations['%s']", sidecarcontrol.GetPodSidecarSetVersionAltAnnotation(container.Name)),
},
},
})
}
流量迁移完成之后,SidecarSet调谐流程中会通过对flipPodSidecarContainerDo()函数的调用,将旧版容器的image替换为hotUpgradeEmptyImage指定的镜像,相当于下线旧版服务。至此,sidecar容器的热升级流程结束,后续版本的升级过程会重复以上三个步骤。
SidecarSet将sidecar容器的管理做了标准化处理,避免了在容器平台管控层面需要根据不同需求向Pod的.spec中提前注入不同的sidecar容器的问题。本文通过对SidecarSet的源码按照SidecarSet CR的调谐过程、sidecar容器的注入过程和sidecar容器的升级过程 这三个模块做了分析,三个模块的处理流程有交叉,所以文中介绍时会有看到后面时又得回过头去看前面的现象,如果阅读过程中有不理解或者不正确之处,欢迎联系笔者再做详细交流。此外,阅读SidecarSet的源码时,建议结合SidecarSet的工作场景去阅读分析,这样更便于理解。
虽然SidecarSet实现的功能不是太复杂,但是它的功能涉及到的几个重要场景(Webhook注入、sidecar容器升级、流量迁移、滚动更新等)对我们基于Kubernetes去实现一些定制化的平台功能具有较大的借鉴意义。
参考链接:
【1】https://github.com/openkruise/samples/tree/master/hotupgrade
*文/王伟东