//sync这里略有不同//如果操作类型为sync 则会标识该数据来源于indexer本地存储 如果indexer中的资源对象已被删除 则直接返回// Resync will send a sync event for each itemfunc (f *DeltaFIFO) Resync() error { f.lock.Lock() defer f.lock.Unlock() if f.knownObjects nil { return nil keys : f.knownObjects.ListKeys() for _, k : range keys { if err : f.syncKeyLocked(k); err ! nil { return err return nil
从事件类型以及入队列方法可以看出 这是一个带有业务功能的队列 并不是单纯的“先入先出” 入队列方法中有两个非常精巧的设计
入队列的事件会先判断该资源是否存在未被消费的事件 然后适当处理
如果 list 方法时发现该资源已经被删除了 则不再处理
pop作为消费者方法使用 从deltafifo的头部去除最早进入队列的对象数据 pod需要传入process函数 用于接收并处理对象的回调方法
//直到将一个项目添加到队列中 然后将其返回。 如果多个项目已准备就绪 它们将按其顺序返回 添加/更新的时候 要先从queue store中把对应的item删除 因此 如果未成功处理 则需要使用AddIfNotPresent 重新添加//处理功能在锁定状态下调用 因此可以安全地更新数据结构 其中需要与队列同步 例如 knownKeys 。 PopProcessFunc可能返回带有嵌套错误的ErrRequeue实例以指示当前应该重新排队该项目 等同于在锁下调用AddIfNotPresent 。// Pop返回“ Deltas” 其中包含所有内容的完整列表当对象 队列 在队列中时发生的事情。func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) 0 { // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the f.closed is set and the condition is broadcasted. // Which causes this loop to continue and return from the Pop(). if f.IsClosed() { return nil, ErrFIFOClosed //如果队列中没有数据 就阻塞等待 只有收到cond.Broadcast() 解除阻塞 f.cond.Wait() //当队列不为空 取出第一个元素 id : f.queue[0] f.queue f.queue[1:] if f.initialPopulationCount 0 { f.initialPopulationCount-- item, ok : f.items[id] if !ok { // Item may have been deleted subsequently. continue delete(f.items, id) //通过上层传入的process 做回调函数 如果出错 则再次放入队列 err : process(item) if e, ok : err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err e.Err // Don t need to copyDeltas here, because we re transferring // ownership to the caller. return item, errtype PopProcessFunc func(interface{}) error四、Resync机制
Resync机制会将indexer本地存储中的资源对象同步指DeltaFIFO中并且设置为Sync的操作类型 Resync函数在Reflector中定时执行 他的执行周期由NewReflector函数传入的resyncPeriod参数设定。Resync syncKeyLocked
func (f *DeltaFIFO) syncKeyLocked(key string) error { //是indexer本地存储对象 通过该对象可以获取存储的所有资源 //indexer对象在NewDeltaFIFO函数实例化DeltaFIFO对象的时候传入 obj, exists, err : f.knownObjects.GetByKey(key) if err ! nil { klog.Errorf( Unexpected error %v during lookup of key %v, unable to queue object for sync , err, key) return nil } else if !exists { klog.Infof( Key %v does not exist in known objects store, unable to queue object for sync , key) return nil // If we are doing Resync() and there is already an event queued for that object, // we ignore the Resync for it. This is to avoid the race, in which the resync // comes with the previous value of object (since queueing an event for the object // doesn t trigger changing the underlying store knownObjects . id, err : f.KeyOf(obj) if err ! nil { return KeyError{obj, err} if len(f.items[id]) 0 { return nil if err : f.queueActionLocked(Sync, obj); err ! nil { return fmt.Errorf( couldn t queue object: %v , err) return nil五、感悟
type DeltaFIFO struct { items map[string]Deltas queue []string
queueActionLocked(actionType DeltaType, obj interface{})1.生产者处理
在生产者方法中 queueActionLocked 内部通过keyof拿到对应obj的一个id items是一个map[string]Deltas 的一个map 那么就以这个id为map的key value([]Delta)的话 初始化一个[]Delta 切片并且以f.items[id]为初始切片 再添加一个Delta{actionType, obj} 然后使用dedupDeltas对newDeltas做去重操作 然后我们会对f.items[id] items这个map中是否存在这个key(id)。如果不存在就在队列中添加一个id 添加完之后将 newDeltas 赋值给items[id]
并且调用cond.Broadcast()通知所有消费者并且解除阻塞
//这里queue负责存放的是id 这里主要是负责先进先出 //items存放的是对应id的 同一个obj的多种操作如图所示 objkey1中对应obj1的先added然后updated if _, exists : f.items[id]; !exists { f.queue append(f.queue, id)f.items[id] newDeltasf.cond.Broadcast()
如图所示
这里cond.Wait() 如果队列中没有数据那么就等待 当队列中有了数据之后从queue 里面拿出来第0个id 并且从队列 queue 中删除第0个元素 然后从items中做key的有效判断 如果不存在则跳过 存在的话那么我们就需要从map中把这个key给删除掉 这里主要是为了防止比如items中obj1 有added 有updated 忽然从生产者加了一个deleted 这里删除之后即使加deleted也是新的key会重新从队列中等待 这是对应objkey1的[]delta 中只有一个Deleted的操作 然后调用传入的回调函数process 做操作
... f.cond.Wait() id : f.queue[0] f.queue f.queue[1:] if f.initialPopulationCount 0 { f.initialPopulationCount-- item, ok : f.items[id] if !ok { // Item may have been deleted subsequently. continue delete(f.items, id) err : process(item) if e, ok : err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err e.Err3.回调函数process
handleDeltas函数作为proess回调函数 当资源对象的操作类型为Added、Updated、Deleted时 将该资源对象存储值indexer 并发安全的存储 并通过distribute函数将资源对象分发至Sharedinformer distribute函数将资源对象分发到该事件的处理函数中 详情见4
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest //这里循环的目的是因为我们传入的obj是一个Deltas 也就是一个[]Deltas 那么就是一个obj的多个操作 详情见上文图一个obj1 可能有added updated for _, d : range obj.(Deltas) { switch d.Type { case Sync, Added, Updated: isSync : d.Type Sync s.cacheMutationDetector.AddObject(d.Object) if old, exists, err : s.indexer.Get(d.Object); err nil exists { if err : s.indexer.Update(d.Object); err ! nil { return err s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err : s.indexer.Add(d.Object); err ! nil { return err s.processor.distribute(addNotification{newObj: d.Object}, isSync) case Deleted: if err : s.indexer.Delete(d.Object); err ! nil { return err s.processor.distribute(deleteNotification{oldObj: d.Object}, false) return nil4.distribute函数处理
这段代码表示了通过区分是否是sync操作 通过遍历listener 并且调用add方法把obj写入了一个addch的管道中
func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() if sync { for _, listener : range p.syncingListeners { listener.add(obj) } else { for _, listener : range p.listeners { listener.add(obj)type sharedProcessor struct { listenersStarted bool listenersLock sync.RWMutex listeners []*processorListener syncingListeners []*processorListener clock clock.Clock wg wait.Groupfunc (p *processorListener) add(notification interface{}) { p.addCh - notification
processor 是 sharedIndexInformer 中一个非常有趣的组件 Controller Manager 通过一个 Informer 单例工厂来保证不同的 Controller 共享了同一个 Informer 但是不同的 Controller 对该共享的 Informer 注册的 Handler 不同
sharedProcessor 的工作核心是围绕着 listeners 这个 Listener 切片展开的。
当我们注册一个 Handler 到 Informer 时 最终会被转换为一个名为 processorListener
该实例主要包含两个 channel 和外面注册的 Handler 方法。而此处被实例化的 processorListener 对象最终会被添加到 sharedProcessor.listeners 列表中
type processorListener struct { nextCh chan interface{} addCh chan interface{}
看到addCh这里不得不说一下processorListener 的pop以及run
1 poplistener 之所以包含了两个 channel addCh 和 nextCh 是因为 Informer 无法预知 listener.handler 的事件消费的速度是否大于事件生产的速度 因此添加了一个名为 pendingNotifications 的缓冲队列来保存未来得及消费的事件是否存在 buffer 如果存在则把事件添加到 buffer 中 如果不存在则尝试推给 nextCh。而另一方面 会判断 buffer 中是否还有事件 如果还有存量 则不停的传递给 nextCh。pop 方法实现了一个带 buffer 的分发机制 使得事件可以源源不断的从 addCh 到 nextCh。但是问题来了 那 addCh 的事件从哪来呢。其实来源非常简单 listener 有一个 add 方法 入参是一个事件 该方法会将新事件推入 addCh 中。而调用该 add 方法的是管理所有 listener 的 sharedProcessor
func (p *processorListener) pop() { defer utilruntime.HandleCrash() defer close(p.nextCh) // Tell .run() to stop var nextCh chan - interface{} var notification interface{} for { select { case nextCh - notification: // Notification dispatched var ok bool notification, ok p.pendingNotifications.ReadOne() if !ok { // Nothing to pop nextCh nil // Disable this select case case notificationToAdd, ok : -p.addCh: if !ok { return if notification nil { // No notification to pop (and pendingNotifications is empty) // Optimize the case - skip adding to pendingNotifications notification notificationToAdd nextCh p.nextCh } else { // There is already a notification waiting to be dispatched p.pendingNotifications.WriteOne(notificationToAdd)2 run
因为 listener 包含了 Controller 注册进来的 Handler 方法 因此 listener 最重要的职能就是当事件发生时来触发这些方法 而 listener.run 就是不停的从 nextCh 这个 channel 中拿到事件并执行对应的 handler可以看到 listener.run 不停的从 nextCh 这个 channel 中拿到事件 但是 nextCh 这个 channel 里的事件又是从哪来的呢 listener.pop 的职责便是将事件放入 nextCh 中
func (p *processorListener) run() { // this call blocks until the channel is closed. When a panic happens during the notification // we will catch it, **the offending item will be skipped!**, and after a short delay (one second) // the next notification will be attempted. This is usually better than the alternative of never // delivering again. stopCh : make(chan struct{}) wait.Until(func() { // this gives us a few quick retries before a long pause and then a few more quick retries err : wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) { for next : range p.nextCh { switch notification : next.(type) { case updateNotification: p.handler.OnUpdate(notification.oldObj, notification.newObj) case addNotification: p.handler.OnAdd(notification.newObj) case deleteNotification: p.handler.OnDelete(notification.oldObj) default: utilruntime.HandleError(fmt.Errorf( unrecognized notification: %T , next)) // the only way to get here is if the p.nextCh is empty and closed return true, nil // the only way to get here is if the p.nextCh is empty and closed if err nil { close(stopCh) }, 1*time.Minute, stopCh)3 addch总结
从图中可以看出当有一个新的event 这里新的event指的是从delta队列中通过distribute分发给所有Listener通过调用add 来对addch有一个写入的操作 然后通过缓存判定 也就是为了防止listener.handler生产速度大于消费速度 这里加了一层缓存层 如果没有缓存直接写入nextch 有缓存就写入缓存还会对缓存的剩余量做判断 并执行对应的 handler
总体来看 DeltaFIFO 的入队列方法 会先判断该资源是否已经在 items 中 如果已经存在 说明该资源还没有被消费 还在 queue 中排队 则直接将事件 append 到 items[resource_id] 中即可。如果发现不在 items 中 便会创建 items[resource_id] 并将资源 id append 到 queue 中。
而 DeltaFIFO 出队列方法 会从 queue 中拿到队列最前面的资源 id 然后从 items 中拿走该资源所有的事件 最后调用 Pop 方法传入的 PopProcessFunc 类型的处理函数。
因此 DeltaFIFO 的特点在于 入队列的是 资源的 事件 而出队列时是拿到的是最早入队列的资源的所有事件。这样的设计保证了不会因为有某个资源疯狂的制造事件 导致其他资源没有机会被处理而产生饥饿
本文链接: http://nextchempty.immuno-online.com/view-773095.html