4000-520-616
欢迎来到免疫在线!(蚂蚁淘生物旗下平台)  请登录 |  免费注册 |  询价篮
主营:原厂直采,平行进口,授权代理(蚂蚁淘为您服务)
咨询热线电话
4000-520-616
当前位置: 首页 > 新闻动态 >
新闻详情
Kubernetes informer 原理分析 - SegmentFault 思否
来自 : segmentfault.com/a/11900000226 发布时间:2021-03-25
\"segmentfault\"/注册登录首页问答专栏资讯课程活动发现立即登录免费注册首页专栏kubernetes文章详情0Kubernetes informer 原理分析

\"\"朱伟发布于 2020-05-14

熟悉 openstack 的人都知道在 openstack 的模块内部的不同进程间是通过 rabbitmq 消息队列来传递消息的,在不同模块之间靠 http 协议交换信息,kubernetes 中 informer 机制实在太重要了,基本承担了整个 kubernetes 中的资源信息传递,不仅仅在 kubernetes 内部,当我们写一些 k8s 扩展时,只要涉及到资源信息比如 pod、node、serviceaccount等等,都逃离不了 informer 这个框架。没有它我们获取到 k8s中资源会很麻烦。有了 informer 框架可以让 k8s 各个模块至之间协作起来很优雅。

informer 框架作用

k8s 中的 informer 框架可以很方便的让每个子模块以及扩展程序拿到 k8s 中的资源信息。

informer 框架的使用

先来看一下 informer 的整体工作原理图
\"k8s
reflector:reflector 用来直接和 kuber api server 通信,内部实现了 listwatch 机制,listwatch 就是用来监听资源变化的,一个listwatch 只对应一个资源,这个资源可以是 k8s 中内部的资源也可以是自定义的资源,当收到资源变化时(创建、删除、修改)时会将资源放到 Delta Fifo 队列中。
informer:informer 是我们要监听的资源的一个代码抽象,在 controller 的驱动下运行,能够将 delta filo 队列中的数据弹出,然后保存到本地缓存也就是图中的步骤5,同时将数据分发到自定义controller 中进行事件处理也就是图中的步骤6。
indexer: indexer 能够基于一些索引函数以及对象的标签计算出索引存储到本地缓存,在自定义 controller 中处理对象时就是基于对象的索引在本地缓存将对象查询出来进行处理。

package mainimport ( \"k8s.io/client-go/informers\" )func main() { // client 是 kube api server 客户端,因为要从 kube api // server 端拉取数据, resyncpersiod 是重新拉取周期,后面会细说 sharedInformers := informers.NewSharedInformerFactory(client,ResyncPeriod) // 监听 pod 资源 podInformer := sharedInformers.Core().V1().Pods() // 监听 service 资源 servicesInformer := sharedInformers.Core().V1().Services() podLister = podInformer.Lister() servicesLister = servicesInformer.Lister() sListerSynced = sInformer.Informer().HasSynced dc.podListerSynced = podInformer.Informer().HasSynced // 启动各个资源 informer sharedInformers.Start(stopChannel)}

这这么几步就实现了资源的获取,初始化 informer 工厂实例, 设置需要监听的资源。

informer 框架的原理

就利用上面的那一段代码看一下 informer 内部是如何工作的,

informer 的启动

vender/k8s.io/client-go/informers/factory.go

// NewSharedInformerFactory constructs a new instance of sharedInformerFactory for all namespaces.func NewSharedInformerFactory(clientkubernetes.Interface, defaultResync time.Duration) SharedInformerFactory { return NewSharedInformerFactoryWithOptions(client, defaultResync)}// NewFilteredSharedInformerFactory constructs a new instance of sharedInformerFactory.// Listers obtained via this SharedInformerFactory will be subject to the same filters// as specified here.// Deprecated: Please use NewSharedInformerFactoryWithOptions insteadfunc NewFilteredSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory { return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions))}// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory { // 主要在初始化这个数据结构 factory := sharedInformerFactory{ client: client, namespace: v1.NamespaceAll, defaultResync: defaultResync, informers: make(map[reflect.Type]cache.SharedIndexInformer), startedInformers: make(map[reflect.Type]bool), customResync: make(map[reflect.Type]time.Duration), } // Apply all options for _, opt := range options { factory = opt(factory) } return factory}// Start initializes all requested informers.func (f *sharedInformerFactory) Start(stopCh -chan struct{}) { f.lock.Lock() defer f.lock.Unlock() // 迭代 informers 中的元素将其启动,informers 就是我们监听的资源, 比如pod、services for informerType, informer := range f.informers { if !f.startedInformers[informerType] { go informer.Run(stopCh) f.startedInformers[informerType] = true } }}
informer 注册

看完上面的几段代码缺了点什么?informers 中的元素什么时候注册进来的?

sListerSynced = sInformer.Informer().HasSynced

这一行代码就将一个要监听的资源注册到了 informer 框架中,看一下这一句代码背后做了什么
kubernetes/pkg/controller/service.go
下面这段代码是 service controller 的 Run 方法

func (s *Controller) Run(stopCh -chan struct{}, workers int) { defer runtime.HandleCrash() defer s.queue.ShutDown() klog.Info(\"Starting service controller\") defer klog.Info(\"Shutting down service controller\") // 等待第一次同步的触发 if !cache.WaitForNamedCacheSync(\"service\", stopCh, s.serviceListerSynced, s.nodeListerSynced) { return } for i := 0; i workers; i++ { go wait.Until(s.worker, time.Second, stopCh) } go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, stopCh) -stopCh}

client-go/informers/core/v1/service.go
每一个资源都有一个文件有着类似的代码结构

type ServiceInformer interface { Informer() cache.SharedIndexInformer Lister() v1.ServiceLister}type serviceInformer struct { factory internalinterfaces.SharedInformerFactory tweakListOptions internalinterfaces.TweakListOptionsFunc namespace string}// NewServiceInformer constructs a new informer for Service type.// Always prefer using an informer factory to get a shared informer instead of getting an independent// one. This reduces memory footprint and number of connections to the server.func NewServiceInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { return NewFilteredServiceInformer(client, namespace, resyncPeriod, indexers, nil)}// NewFilteredServiceInformer constructs a new informer for Service type.// Always prefer using an informer factory to get a shared informer instead of getting an independent// one. This reduces memory footprint and number of connections to the server.func NewFilteredServiceInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { return cache.NewSharedIndexInformer( cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { if tweakListOptions != nil { tweakListOptions( options) } return client.CoreV1().Services(namespace).List(options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if tweakListOptions != nil { tweakListOptions( options) } return client.CoreV1().Services(namespace).Watch(options) }, }, corev1.Service{}, resyncPeriod, indexers, )}func (f *serviceInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return NewFilteredServiceInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)}// 这个段代码将 service 资源注册到了 informers 列表中func (f *serviceInformer) Informer() cache.SharedIndexInformer { return f.factory.InformerFor( corev1.Service{}, f.defaultInformer)}func (f *serviceInformer) Lister() v1.ServiceLister { return v1.NewServiceLister(f.Informer().GetIndexer())}
informer 运行

注册完后看一下 informer 的启动做了什么

func (s *sharedIndexInformer) Run(stopCh -chan struct{}) { defer utilruntime.HandleCrash() fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer) // 初始化 controller 的配置,为接下来的运行做好准备 cfg := Config{ Queue: fifo, // delta fifo 队列 ListerWatcher: s.listerWatcher, ObjectType: s.objectType, FullResyncPeriod: s.resyncCheckPeriod, RetryOnError: false, ShouldResync: s.processor.shouldResync, Process: s.HandleDeltas, } func() { s.startedLock.Lock() defer s.startedLock.Unlock() s.controller = New(cfg) s.controller.(*controller).clock = s.clock s.started = true }() // Separate stop channel because Processor should be stopped strictly after controller processorStopCh := make(chan struct{}) var wg wait.Group defer wg.Wait() // Wait for Processor to stop defer close(processorStopCh) // Tell Processor to stop wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) // 在运行 controller 之前运行了一个 processor 的协程 // informer 是在controller 的驱动下运行的,controller 会是的数据从 delta fifo 队列中弹出,弹出来要被处理的, 可以理解 controller 和 delta fifo queue 结合成为了生产者, 那么这里的 processor 就是消费者 wg.StartWithChannel(processorStopCh, s.processor.run) defer func() { s.startedLock.Lock() defer s.startedLock.Unlock() s.stopped = true // Don\'t want any new listeners }() // 启动controller s.controller.Run(stopCh)}

看一下 processor 这个消费者做了哪些工作,消费者是我自己命的名理解成资源处理器也可以

func (p *sharedProcessor) run(stopCh -chan struct{}) { func() { p.listenersLock.RLock() defer p.listenersLock.RUnlock() for _, listener := range p.listeners { // 可以看到在这里又启动了两个协程 p.wg.Start(listener.run) p.wg.Start(listener.pop) } p.listenersStarted = true }() -stopCh p.listenersLock.RLock() defer p.listenersLock.RUnlock() for _, listener := range p.listeners { close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop } p.wg.Wait() // Wait for all .pop() and .run() to stop}
func (p *processorListener) run() { // this call blocks until the channel is closed. When a panic happens during the notification // we will catch it, **the offending item will be skipped!**, and after a short delay (one second) // the next notification will be attempted. This is usually better than the alternative of never // delivering again. stopCh := make(chan struct{}) wait.Until(func() { // this gives us a few quick retries before a long pause and then a few more quick retries err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) { // 会阻塞在这里, 等待数据对象发送过来 for next := range p.nextCh { // 根据注册的回调函数进行事件处理 switch notification := next.(type) { case updateNotification: p.handler.OnUpdate(notification.oldObj, notification.newObj) case addNotification: p.handler.OnAdd(notification.newObj) case deleteNotification: p.handler.OnDelete(notification.oldObj) default: utilruntime.HandleError(fmt.Errorf(\"unrecognized notification: %T\", next)) } } // the only way to get here is if the p.nextCh is empty and closed return true, nil }) // the only way to get here is if the p.nextCh is empty and closed if err == nil { close(stopCh) } }, 1*time.Minute, stopCh)}
func (p *processorListener) pop() { defer utilruntime.HandleCrash() defer close(p.nextCh) // Tell .run() to stop var nextCh chan - interface{} var notification interface{} for { select { // 在往 nextCh channle 中发送数据, 上面的 run 就会被触发执行 case nextCh - notification: // Notification dispatched var ok bool notification, ok = p.pendingNotifications.ReadOne() if !ok { // Nothing to pop // 没有数据时关闭通道 nextCh = nil // Disable this select case } case notificationToAdd, ok := -p.addCh: if !ok { return } if notification == nil { // No notification to pop (and pendingNotifications is empty) // Optimize the case - skip adding to pendingNotifications notification = notificationToAdd nextCh = p.nextCh // 有数据到来打开通道接收 } else { // There is already a notification waiting to be dispatched p.pendingNotifications.WriteOne(notificationToAdd) } } }}

回头看一下 controller 在启动后做了什么

func (c *controller) Run(stopCh -chan struct{}) { defer utilruntime.HandleCrash() go func() { -stopCh c.config.Queue.Close() }() r := NewReflector( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, c.config.FullResyncPeriod, ) r.ShouldResync = c.config.ShouldResync r.clock = c.clock c.reflectorMutex.Lock() c.reflector = r c.reflectorMutex.Unlock() var wg wait.Group defer wg.Wait() // 运行了 reflect, 通过informer的架构图可以知道 reflect 要和 kube api server 通信的 使用 listwatch 方式监听资源的变动事件的发生,然后同步到 delta fifo queue 中 wg.StartWithChannel(stopCh, r.Run) // 从 delta fifo queu 中弹出数据,然后交由 informer 处理 wait.Until(c.processLoop, time.Second, stopCh)}
// 通过函数名就可以看出来是处理 delta fifo queue 的func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest for _, d := range obj.(Deltas) { // 执行 informer 架构图中的 5,6两步 switch d.Type { case Sync, Added, Updated: isSync := d.Type == Sync s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil exists { if err := s.indexer.Update(d.Object); err != nil { return err } s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } s.processor.distribute(addNotification{newObj: d.Object}, isSync) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil}

通过 informer 的架构图可以知道在自定义 controller 处理资源变动的事件时是通过 workerqueue 来获取数据的,所以在本地是存在两极缓存,workerqueue 只存储了资源的索引,在真正对资源进行处理时,需要根据资源索引去 thread safe store 这个缓存中获取数据的,
文章中提到了两个 controller 概念, 其中一处提到的是自定义 controller,另一处只是使用了 controller, 自定义controller值得的开发者可以写 k8s 的 controller 扩展,通常和 CRU(自定义资源)配合,同时也只 k8s 中的 deployment、service、deamon等 controller,另一个 controller 是指 informer 框架中的 controller

参考文章

https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md

kubernetes原理controller阅读 1.8k发布于 2020-05-14 赞收藏分享本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议

\"avatar\"朱伟1 声望0 粉丝关注作者0 条评论得票时间提交评论

\"avatar\"朱伟1 声望0 粉丝关注作者宣传栏 目录▲产品热门问答热门专栏热门课程最新活动技术圈酷工作移动客户端课程Java 开发课程PHP 开发课程Python 开发课程前端开发课程移动开发课程资源每周精选用户排行榜徽章帮助中心声望与权限社区服务中心合作关于我们广告投放职位发布讲师招募联系我们合作伙伴关注产品技术日志社区运营日志市场运营日志团队日志社区访谈条款服务条款隐私政策下载 App

Copyright © 2011-2021 SegmentFault. 当前呈现版本 21.03.25

浙ICP备15005796号-2浙公网安备33010602002000号ICP 经营许可 浙B2-20201554

杭州堆栈科技有限公司版权所有

本文链接: http://nextchempty.immuno-online.com/view-773096.html

发布于 : 2021-03-25 阅读(0)