List-Watch是kubernetes的核心机制。组件kubelet、kube-controller-manager、kube-scheduler需要监控各种资源(pod、service等)的变化,当这些对象发生变化时(add、delete、update),kube-apiserver会主动通知这些组件。这个过程类似一个发布-订阅系统。本文章将从代码角度探究一下list-watch的实现方式。
本次分析是基于kubernetes tag v1.9.0
kube-apiserver对etcd的List-watch机制
构建PodStorage
pkg/registry/core/pod/storage.go NewStorage()
- kube-apiserver针对每一类资源(pod、service、endpoint、replication controller、depolyments),都会创建Storage对象,如:PodStorage。PodStorage.Pod.Store封装了对etcd的操作;
- RESTOptions包括Decorator对象,它是针对storage的一个装饰器;
创建StorageDecorator
apiserver/registry/storage_factory.go
- 通过StorageFactory工厂模式创建Cacher对象,返回StorageDecorator,StorageDecorator即为带有cache的storage;
创建Cacher
apiserver/pkg/storage/cacher.go NewCacherFromConfig()
- Cacher对象,主要的数据成员:storage、watchCache、reflector、watchers及incoming channel;
- watchCache是一个cache,用来存储apiserver从etcd那里watch到的对象;
- watchers是一个map,map的值类型为cacheWatcher,当kubelet、kube-scheduler需要watch某类资源时,他们会向kube-apiserver发起watch请求,kube-apiserver就会生成一个cacheWatcher,cacheWatcher负责将watch的资源通过http从apiserver传递到kubelet、kube-scheduler;
- Reflector对象,主要数据成员:ListerWatcher,ListerWatcher是接口对象,包括方法List()和Watch();listerWatcher包装了Storage,主要是将watch到的对象存到watchCache中;
- incoming channel接收watchCacheEvent;
- 注册cacher.processEvent方法,协程调用cacher.dispatchEvents();
- 协程调用cacher.startCaching();
StartCaching
client-go/tools/cache/reflector.go ListAndWatch()
- ListAndWatch()方法
- 首先,创建watchCache对象和cacheListerWatcher对象,cacheListWatcher对象是ListerWatcher接口实现,实现了List()和Watch()方法;
- 之后,执行cacheListerWatcher的List()方法和Watch()方法;
- 最后,调用reflector的watchHandler()方法;
List/Watch
apiserver/pkg/storage/cacher.go List()和Watch();
- List()方法将调用storage.List()方法;
- Watch()方法将调用storage.watch()方法;
Storage List/Watch
apiserver/pkg/storage/etcd/etcd_helper.go
- etcdHelper对象是Storage接口对象的实现;
- etcdHelper的List()方法:
- 获取etcd的对象,包括resourceVersion信息;
- etcdHelper的WatchList()方法:
- 创建etcdWatcher;
- etcdWatcher对象,实现了Watch接口;
- etcdWatcher对象,主要的数据成员是etcdIncoming channel和outgoing channel;
- 协程执行etcdWatcher.translate()
- 最后,协程运行etcdWatcher.etcdWatch()
etcdWatcher.etcdWatch
apiserver/pkg/storage/etcd/etcd_watcher.go etcdWatch()
- 如果resourceVersion==0, 运行etcdGetInitialWatchState(),获取所有的pods,并将结果输入到etcdIncoming channel;
- 之后,不停的调用watcher.Next(),并将结果输入到etcdIncoming channel;
etcdWatcher.translate
apiserver/pkg/storage/etcd/etcd_watcher.go translate()
- 读取etcdIncoming channel信息;
- 调用etcdWatcher.sendResult()进行转化;
- 输入到outgoing channel;
reflector.watchHandler
client-go/tools/cache/reflector.go watchHandler()
- 读取outgoing channel信息,更新watchCache;
更新watchCache
apiserver/pkg/storage/watch_cache.go Add()/Delete()/Get()/Update()
处理事件processEvent
apiserver/pkg/storage/watch_cache.go processEvent()
- 创建watchCacheEvent
- 调用watchCache.updateCache(),更新cache;
kube-apiserver的watch功能
kube-apiserver的watch功能是作为一个restful api提供给其他组件(kubelet、kube-controller-manager、kube-scheduler、kube-proxy)。watch的处理流程和PUT、DELETE、GET等REST API处理流程类似。
registerResourceHandlers
apiserver/pkg/endpoints/installer.go registerResourceHandlers()
ListResource
apiserver/pkg/endpoints/handlers/rest.go ListResource()
1.调用rest.watcher.watch()方法,这里将会调用Store.watch();
2.调用serveWatch()方法;
Store.watch
apiserver/pkg/registry/generic/registry/store.go watch()/watchPredicate()
- 会调用Storage.Watch()方法,这里将会调用Cacher.watch();
Cacher.watch
apiserver/pkg/storage/cacher.go watch()/watchList()
- 首先,调用newCacheWatcher生成一个watcher,并将watcher插入到cacher.watchers中去;
- 协程调用cacheWatcher.process()方法,此方法将会操作cacheWatcher的input channel的消息;
3. 会将watchCacheEvent通过add()添加到cacheWatcher的input channel中;
操作input channel
apiserver/pkg/storage/cacher.go process()
- 读取input channel的信息,并调用sendWatchCacheEvent()方法;
sendWatchCacheEvent
apiserver/pkg/storage/cacher.go sendWatchCacheEvent()
- kube-apiserver的watch会带过滤的功能;
- 对watchCacheEvent进行Filter,输出到cacheWatcher的result channel中;
serveWatch
apiserver/pkg/endpoints/handlers/rest.go serveWatch()
对result Channel信息进行序列化,并发送给调用者;
相关阅读:
1. http://licyhust.com/
登录后评论
立即登录 注册