4000-520-616
欢迎来到免疫在线!(蚂蚁淘生物旗下平台)  请登录 |  免费注册 |  询价篮
主营:原厂直采,平行进口,授权代理(蚂蚁淘为您服务)
咨询热线电话
4000-520-616
当前位置: 首页 > 新闻动态 >
新闻详情
记一次在deployment中添加灰度暂停功能_ythunder的博客-CSDN博客
来自 : CSDN技术社区 发布时间:2021-03-25
/* 处理obj中存储的需要处理的对象 1. 处理类型为sync/add/update时 如果对象存在于cache 则取出并更新。 如果不存在 则添加到cache。 处理类型为delete时 直接删除cache对象 2. 然后都调用distribute,把处理对象添加到sharedIndexInformer.sharedProcesser.listener数组中每个元素的addCh中func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { // from oldest to newest for _, d : range obj.(Deltas) { switch d.Type { case Sync, Added, Updated: isSync : d.Type Sync s.cacheMutationDetector.AddObject(d.Object) if old, exists, err : s.indexer.Get(d.Object); err nil exists { s.indexer.Update(d.Object) s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { err : s.indexer.Add(d.Object) s.processor.distribute(addNotification{newObj: d.Object}, isSync) case Deleted: s.indexer.Delete(d.Object) s.processor.distribute(deleteNotification{oldObj: d.Object}, false) return nil

在看step2. 执行启动mutationDetector.Run,以processorStopCh为退出标志
这个没太明白是什么

然后看step3 调用processor.Run()

func (p *sharedProcessor) run(stopCh -chan struct{}) { func() { for _, listener : range p.listeners { p.wg.Start(listener.run) p.wg.Start(listener.pop) p.listenersStarted true -stopCh// listener.run如下 将for循环获取processLister.nextCh的内容 并根据事件类型调用对应的回调函数func (p *processorListener) run() { wait.Until(func() { err : wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) { for next : range p.nextCh { switch notification : next.(type) { case updateNotification: p.handler.OnUpdate(notification.oldObj, notification.newObj) case addNotification: p.handler.OnAdd(notification.newObj) case deleteNotification: p.handler.OnDelete(notification.oldObj) default: utilruntime.HandleError(fmt.Errorf( unrecognized notification: %T , next)) return true, nil // the only way to get here is if the p.nextCh is empty and closed if err nil { close(stopCh) }, 1*time.Minute, stopCh)

最后是step4 s.controller.Run()。informer的主要运行逻辑都在这里

func (c *controller) Run(stopCh -chan struct{}) { // reflector存储了之前config中初始化的部分 r : NewReflector( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, c.config.FullResyncPeriod, c.reflector r // r.Run中主要是调用ListWatcher接口 也是单起协程来循环处理的 wg.StartWithChannel(stopCh, r.Run) // 主要从之前定义的队列中或者item并根据注册的处理方法处理 wait.Until(c.processLoop, time.Second, stopCh)// 关于r.Run 最终调用func (r *Reflector) ListAndWatch(stopCh -chan struct{}) // 单独看下c.ProcessLoop函数// 如下 将开启for循环 从queue中pop对象并对其执行c.config.Process函数(即初始化时注册的 HandleDeltas)func (c *controller) processLoop() { for { obj, err : c.config.Queue.Pop(PopProcessFunc(c.config.Process))

总结(以deploymentController为例)
kube-controller-manager启动为
1. 依次初始化各类型的controller controller中会向全局sharedInformerFactory注册一些关注的Informer 例如deploymentController启动时会注册DeploymentInformer、ReplicasSetInformer、PodInformer三种。 其他controller如果需要 则无需重复注册。
2. 启动controller 启动一个loop循环执行processNextWorkItem 即从deployment.queue中获取item 并调用syncHandler处理(syncHandler被初始化为syncDeployment函数)
3. 启动informer informer中包含两个重要部分
1) controller
启动reflector, 主要工作是调用List接口更新一次cache(在数据量大时 这里会做切片) 然后循环调用watcher获取对象变更信息经过hash处理后存入deltaqueue。继而启动controller.processLoop 主要工作从deltafifo拿出节点执行HandlerDeltas。HandlerDeltas一则将数据更新到cache, 二则将数据分发给processor
1) sharedProcessor
processor这边由addChannal接收来自controller分发的数据 processor中有用户注册多种类型Event的回调处理函数。启动prcessor.run中 将不断从addChannal中 获取数据 并添加到buffer中。 另一个select从buffer中取数据后 调用已注册的相应的回调函数。 这些回调函数基本都有一个共同的操作就是调用enqueueDeployment()将deployment对象信息入队到deploy.queue中 供第2部分逻辑pop执行sync.

syncDeployment 同步逻辑

syncDeployment代码阅读
(其中会讲到 滚动更新过程的步长计算逻辑)

如何在deploy中添加灰度暂停

看这里之前请读清楚上面内容
如上 deploymentController将对每个更新后的deployment对象执行syncDeployment 其中有代码

func (dc *DeploymentController) syncDeployment(key string) error { //暂停态时 执行sync同步状态 if d.Spec.Paused { return dc.sync(d, rsList) //根据两种发布策略检查并更新deployment到最新状态 switch d.Spec.Strategy.Type { case apps.RecreateDeploymentStrategyType: return dc.rolloutRecreate(d, rsList, podMap) case apps.RollingUpdateDeploymentStrategyType: return dc.rolloutRolling(d, rsList)

滚动更新是一个多次滚动的过程 一个deployment的滚动更新通常会被多次执行syncDeployment 由代码又知:如果遇到deployment.spec.paused标志 将执行return dc.sync()从而不会进行下一次步长更新。

所以这次的灰度暂停 设计思路为 用户通过deployment.annotation设置期望灰度值 在到达灰度期望值后 设置paused来阻止下一次步长更新。

初版设计及测试

灰度数量通过annotation指定 下面函数获取灰度值
pkg/controller/deployment/util/deployment_util.go中添加逻辑

func Canary(deployment apps.Deployment) int32 { //TODO 注释规范化 canaryStr支持数字和百分号 canaryStr : deployment.Annotations[ canary ] canary, _ : strconv.ParseInt(canaryStr, 10, 64) return int32(canary)

在计算扩容数量时加入下列代码 会同时参考灰度期望值 保证本次扩容数量不超过灰度期望值。
pkg/controller/deploy/rolling.go添加

//rolloutRolling函数添加if deploymentutil.IsCanaryComplete(d, allRSs, newRS) { if err : dc.CanaryPauseDeployment(d); err ! nil { return err//scaleDownOldReplicaSetsForRollingUpdate函数添加。如果newRs到达灰度值 那么需要调整maxUnavailable为0 防止旧实例缩容太多。(目的为 灰度暂停后 新实例 旧实例 期望实例数)newRs : deploymentutil.FindNewReplicaSet(deployment, allRSs); if newRs ! nil { if *(newRs.Spec.Replicas) deploymentutil.Canary(*deployment) { maxUnavailable 0

pkg/controller/deployment/util/deployment_util.go

//NewRSNewReplicas函数添加。计算newRs扩容数量时 参考灰度值。 // TODO(完成) canary和计算值间选择较小的 canary : Canary(*deployment) oldActiveRs, _ : FindOldReplicaSets(deployment, allRSs) if canary ! 0 len(oldActiveRs) 0 canary *(deployment.Spec.Replicas) *(newRS.Spec.Replicas) canary { scaleUpCount int32(integer.IntMin(int(scaleUpCount), int(canary-*(newRS.Spec.Replicas))))// 判断达到灰度完成条件func IsCanaryComplete(deployment *apps.Deployment, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet) bool { var isCanaryComplete false canaryCount : Canary(*deployment) if deployment.Spec.Strategy.Type apps.RollingUpdateDeploymentStrategyType canaryCount ! 0 { ReplicasCount : GetReplicaCountForReplicaSets(allRSs) if *(newRS.Spec.Replicas) canaryCount ReplicasCount *deployment.Spec.Replicas { isCanaryComplete true return isCanaryComplete

最后在达到灰度条件时 打暂停标志

pkg/controller/deployment/sync.go添加

//添加函数func (dc *DeploymentController) CanaryPauseDeployment(deployment *apps.Deployment) error { dpCopy : deployment.DeepCopy() dpCopy.Spec.Paused true _, err : dc.client.AppsV1().Deployments(dpCopy.Namespace).Update(dpCopy) if err nil { dc.eventRecorder.Eventf(deployment, v1.EventTypeNormal, CanaryPauseDeployment , Pause Deployment %s , deployment.Name) return err
遇到问题及原因(sync函数引起的比例扩缩问题)

在以上改动后 重新编译运行了kube-controller-manager组件 此时 kubectl edit deployment的模板信息使其滚动更新。

此时实例数为10 maxSurge为2 maxUavalible为1 灰度数量为3.
期望状态为 新实例为3 旧实例为7, deployment.spec.paused为true
实际状态为 新实例为3 旧实例为9 deployment.spec.paused为true

重读代码 发现是在暂停发起后

错误原因解决方案

pkg/controller/deployment/sync.go添加

//减去逻辑 scale函数allRSsReplicas : deploymentutil.GetReplicaCountForReplicaSets(allRSs) allowedSize : int32(0) if *(deployment.Spec.Replicas) 0 { allowedSize *(deployment.Spec.Replicas) deploymentutil.MaxSurge(*deployment)deploymentReplicasToAdd : allowedSize - allRSsReplicas//添加逻辑//TODO(完成) allowedSize设置为目前deployment数-replicas参考数 var deploymentReplicasToAdd int32 oldDesired, ok : deploymentutil.GetDesiredReplicasAnnotation(newRS) if ok { deploymentReplicasToAdd *(deployment.Spec.Replicas) - oldDesired } else { deploymentReplicasToAdd 0
statefulSet灰度对比 partation实现

本文链接: http://nextchempty.immuno-online.com/view-773092.html

发布于 : 2021-03-25 阅读(0)