kubernetes1.9源码阅读 replication controller的Informer机制

replication controller是kube-controller-manager中一个重要的控制器,主要是rs进行控制,确保pods的数量恰好和rs的规定一致。因此replication controller主要对这两类进行watch,一类是replicationset,另一类是pods。本文是replication controller的源码阅读笔记,会包括client-go的Informer机制,希望帮助开始阅读kubernetes源码的小伙伴们,更希望与对kubernetes源码阅读感兴趣的小伙伴儿们交流,有错误的地方也希望能指出,共同进步。

入口程序

cmd/kube-controller-manager/controller-manager.go main

1. 调用options.NewCMServer,构建CMServer;

2. 调用app.Run方法,运行CMServer;

启动CMServer

cmd/kube-controller-manager/app/controllermanager.go Run

1. 调用createClients, 创建apiserver客户端,通过REST方式访问APIserver提供的API服务;

2. 启动协程调用go startHTTP,运行http Server;

3. 调用record.NewBroadcaster,创建eventBroadcaster对象,接收EventBroadcaster发送的event,输出到logging中,并输出到EventSink,并使用recorder记录”controller-mananger”的事件;

4. 调用CreateControllerContext,在CreateControllerContext方法中,会调用informers.NewSharedInformerFactory,创建sharedInformerFactory(client-go/informers/factory.go)对象;

5. 调用StartControllers,启动Controllers;

6. 调用ctx.InformerFactory.Start,在这里是调用sharedInformerFactory.start(client-go/informers/factory.go);

7. saTokenControllerInitFunc和NewControllerInitializers定义了controllers的InitFunc;

startReplicationController

cmd/kube-controller-manager/app/core.go startReplicationController

1. 协程启动调用replicationcontroller.NewReplicationManager,构建ReplicationManager;

(1) 调用ctx.InformerFactory.Core().V1().Pods(),这里调用sharedInformerFactory(client-go/informers/factory.go)对象的Core().V1().Pods()方法,将会构建PodInformer对象,

(2) 以此方式,创建ReplicationControllersInformer;

2. 运行ReplicationManager;

NewReplicationManager

pkg/controller/replication/replication_set.go NewReplicationManager

1. 在NewReplicationManager中,

(1) 首先,调用record.NewBroadcaster,创建eventBroadcaster对象,调用eventBroadcaster.StartLogging,接收EventBroadcaster发送的event,输出到logging中;调用 eventBroadcaster.StartRecordingToSink,event输出到EventSink,并调用eventBroadcaster.NewRecorder记录”replication-controller”的事件;

(2) 将调用NewBaseController;

2. 在NewBaseController方法中,

(1) 构建ReplicaSetController对象,包括了podControl,它定义了对Pod的操作,是由RealPodControl去调用apiserver完成创建实现;

(2) 将调用 rsInformer.Informer().AddEventHandler,这将调用rsInformer的构造函数NewReplicaSetInformer,rsInformer将event handler包装成listerner,然后添加到s.processor.listeners中,并定义对象处理的回调函数AddFunc、UpdateFunc、DeleteFunc;

(3) 同时,调用rsInformer的Lister方法;

(4) 最后,调用rsInformer.Informer().HasSynced,判断是否缓存完成;

(5) 以此方式,调用podInformer.Informer().AddEventHandler、podInformer的Lister方法及podInformer.Informer().HasSynced;

(6) 设置rsc.syncHandler;syncHandler负责pod与rc的同步,确保Pod副本数与rc规定的相同;

PodInformer

client-go/informers/core/v1/pod.go NewPodInformer

1. 构建cache.listWatch对象,定义了ListFunc和WatchFunc;

2. 调用cache.NewSharedIndexInformer;

NewSharedIndexInformer

client-go/tools/cache/shared_informer.go NewSharedIndexInformer

运行ReplicationManager

pkg/controller/replicaset/replica_set.go Run

1. 调用controller.WaitForCacheSync方法,在controller.WaitForCacheSync中,将调用ca che.WaitForCacheSync;

2. 调用rsc.worker, 将启动workers调用rsc.syncHandler,syncHandler负责pod与rc的同步,确保Pod副本数与rc规定的相同;

sharedInformerFactory.Start

client-go/informers/factory.go Start

1. 调用Informer.Run,这里调用SharedIndexInformer.Run;

SharedIndexInformer.Run

client-go/tools/cache/shared_informer.go Run

1. 调用NewDeltaFIFO,创建queue;

2. 定义Deltas处理函数s.HandleDeltas;

3. 调用New(cfg),构建sharedIndexInformer的controller;

4. 调用s.cacheMutationDetector.Run,检查缓存对象是否变化;

5. 调用s.processor.run,将调用sharedProcessor.run,会调用Listener.run和Listener.pop,执行处理queue的函数;

6. 调用s.controller.Run,构建Reflector,进行对etcd的缓存;

sharedIndexedInformer.controller.Run

client-go/tools/cache/controller.go Run

1. 调用NewReflector,构建Reflector;

(1) Reflector对象,包括ListerWatcher、ObjectType、Queue、FullResyncPeriod;

2. 调用r.run,将调用reflector.ListAndWatch,执行r.List、r.watch、r.watchHandler,进行对etcd的缓存;

3. 调用c.processLoop,reflector向queue里面添加数据,processLoop会不停去消费这里这些数据;

controller.processLoop

client-go/tools/cache/controller.go processLoop

1. cache.PopProcessFunc(c.config.Process)将前面Process函数传递进去;

DeltaFIFO.Pop

client-go/tools/cache/delta_fifo.go Pop

1. 主要从f.items取出object,然后调用process函数进行处理;

处理DeltaFIFO

client-go/tools/cache/shared_informer.go HandleDeltas

1. 调用s.process.distribute,将调用Listener.add,负责将watch的资源传到listener;

Listener.add/pop/run

client-go/tools/cache/shared_informer.go sharedProcessor.run/add/pop;

1. listenser的add函数负责将notify装进pendingNotifications;

2. pop函数取出pendingNotifications的第一个nofify,输出到nextCh channel;

3. run函数则负责取出notify,然后根据notify的类型(增加、删除、更新)触发相应的处理函数,这些函数在ReplicaSetController注册,分别是:rsc.addPod、rsc.updatePod、rsc.deletePod、rsc.enqueueReplicaSet、rsc.updateRS、rsc.enqueueReplicaSet;

rsc.addPod

pkg/controller/replicaset/replica_set.go addPod

1. 首先会根据pod返回rc,当pod不属于任何rc时,则返回。找到rc以后,更新rm.expectations.CreationObserved这个rc的期望值,也就是假如一个rc有4个pod,现在检测到创建了一个pod,则会将这个rc的期望值减少,变为3。然后将这个rc放入队列;

2. 调用rsc.enqueueReplicaSet,将调用rsc.queue.Add;

rsc.worker

pkg/controller/replicaset/replica_set.go worker()

1. 调用rsc.syncHandler,这里会调用rsc.syncReplicaSet,syncReplicaSet负责pod与rc的同步,确保Pod副本数与rc规定的相同;

K8S中文社区微信公众号

评论 抢沙发

登录后评论

立即登录