kubernetes1.9源码阅读 List-Watch及Reflector机制

List-Watch是kubernetes的核心机制。组件kubelet、kube-controller-manager、kube-scheduler需要监控各种资源(pod、service等)的变化,当这些对象发生变化时(add、delete、update),kube-apiserver会主动通知这些组件。这个过程类似一个发布-订阅系统。本文章将从代码角度探究一下list-watch的实现方式。

本次分析是基于kubernetes tag v1.9.0

kube-apiserver对etcd的List-watch机制

构建PodStorage

pkg/registry/core/pod/storage.go NewStorage()

  1. kube-apiserver针对每一类资源(pod、service、endpoint、replication controller、depolyments),都会创建Storage对象,如:PodStorage。PodStorage.Pod.Store封装了对etcd的操作;
  2. RESTOptions包括Decorator对象,它是针对storage的一个装饰器;

创建StorageDecorator

apiserver/registry/storage_factory.go

  1. 通过StorageFactory工厂模式创建Cacher对象,返回StorageDecorator,StorageDecorator即为带有cache的storage;

创建Cacher

apiserver/pkg/storage/cacher.go NewCacherFromConfig()

  1. Cacher对象,主要的数据成员:storage、watchCache、reflector、watchers及incoming channel;
    1. watchCache是一个cache,用来存储apiserver从etcd那里watch到的对象;
    2. watchers是一个map,map的值类型为cacheWatcher,当kubelet、kube-scheduler需要watch某类资源时,他们会向kube-apiserver发起watch请求,kube-apiserver就会生成一个cacheWatcher,cacheWatcher负责将watch的资源通过http从apiserver传递到kubelet、kube-scheduler;
    3. Reflector对象,主要数据成员:ListerWatcher,ListerWatcher是接口对象,包括方法List()和Watch();listerWatcher包装了Storage,主要是将watch到的对象存到watchCache中;
    4. incoming channel接收watchCacheEvent;
  2. 注册cacher.processEvent方法,协程调用cacher.dispatchEvents();
  3. 协程调用cacher.startCaching();

StartCaching

client-go/tools/cache/reflector.go ListAndWatch()

  1. ListAndWatch()方法
    1. 首先,创建watchCache对象和cacheListerWatcher对象,cacheListWatcher对象是ListerWatcher接口实现,实现了List()和Watch()方法;
    2. 之后,执行cacheListerWatcher的List()方法和Watch()方法;
    3. 最后,调用reflector的watchHandler()方法;

List/Watch

apiserver/pkg/storage/cacher.go List()和Watch();

  1. List()方法将调用storage.List()方法;
  2. Watch()方法将调用storage.watch()方法;

Storage List/Watch

apiserver/pkg/storage/etcd/etcd_helper.go

  1. etcdHelper对象是Storage接口对象的实现;
  2. etcdHelper的List()方法:
    1. 获取etcd的对象,包括resourceVersion信息;
  3. etcdHelper的WatchList()方法:
    1. 创建etcdWatcher;
    2. etcdWatcher对象,实现了Watch接口;
    3. etcdWatcher对象,主要的数据成员是etcdIncoming channel和outgoing channel;
  4. 协程执行etcdWatcher.translate()
  5. 最后,协程运行etcdWatcher.etcdWatch()

etcdWatcher.etcdWatch

apiserver/pkg/storage/etcd/etcd_watcher.go etcdWatch()

  1. 如果resourceVersion==0, 运行etcdGetInitialWatchState(),获取所有的pods,并将结果输入到etcdIncoming channel;
  2. 之后,不停的调用watcher.Next(),并将结果输入到etcdIncoming channel;

etcdWatcher.translate

apiserver/pkg/storage/etcd/etcd_watcher.go translate()

  1. 读取etcdIncoming channel信息;
  2. 调用etcdWatcher.sendResult()进行转化;
  3. 输入到outgoing channel;

reflector.watchHandler

client-go/tools/cache/reflector.go watchHandler()

  1. 读取outgoing channel信息,更新watchCache;

更新watchCache

apiserver/pkg/storage/watch_cache.go Add()/Delete()/Get()/Update()

处理事件processEvent

apiserver/pkg/storage/watch_cache.go processEvent()

  1. 创建watchCacheEvent
  2. 调用watchCache.updateCache(),更新cache;

kube-apiserver的watch功能

kube-apiserver的watch功能是作为一个restful api提供给其他组件(kubelet、kube-controller-manager、kube-scheduler、kube-proxy)。watch的处理流程和PUT、DELETE、GET等REST API处理流程类似。

registerResourceHandlers

apiserver/pkg/endpoints/installer.go registerResourceHandlers()

ListResource

apiserver/pkg/endpoints/handlers/rest.go ListResource()

1.调用rest.watcher.watch()方法,这里将会调用Store.watch()

2.调用serveWatch()方法;

Store.watch

apiserver/pkg/registry/generic/registry/store.go watch()/watchPredicate()

  1. 会调用Storage.Watch()方法,这里将会调用Cacher.watch()

Cacher.watch

apiserver/pkg/storage/cacher.go watch()/watchList()

  1. 首先,调用newCacheWatcher生成一个watcher,并将watcher插入到cacher.watchers中去;
  2. 协程调用cacheWatcher.process()方法,此方法将会操作cacheWatcher的input channel的消息;

3. 会将watchCacheEvent通过add()添加到cacheWatcher的input channel中;

操作input channel

apiserver/pkg/storage/cacher.go process()

  1. 读取input channel的信息,并调用sendWatchCacheEvent()方法;

sendWatchCacheEvent

apiserver/pkg/storage/cacher.go sendWatchCacheEvent()

  1. kube-apiserver的watch会带过滤的功能;
  2. 对watchCacheEvent进行Filter,输出到cacheWatcher的result channel中;

serveWatch

apiserver/pkg/endpoints/handlers/rest.go serveWatch()

对result Channel信息进行序列化,并发送给调用者;

 

相关阅读:

1. http://licyhust.com/

K8S中文社区微信公众号

评论 抢沙发

登录后评论

立即登录