深入浅出kubernetes之device-plugins

记得大学刚毕业那年看了侯俊杰的《深入浅出MFC》,就对深入浅出这四个字特别偏好,并且成为了自己对技术的要求标准——对于技术的理解要足够的深刻以至于可以用很浅显的道理给别人讲明白。以下内容为个人见解,如有雷同,纯属巧合,如有错误,烦请指正。

本文基于kubernetes1.11版本,后续会根据kubernetes版本更新及时更新文档,所有代码引用为了简洁都去掉了日志打印相关的代码,尽量只保留有价值的内容。

延续我个人风格,在个人发挥前,先看看官方对于device-plugins的定义是什么?

Starting in version 1.8, Kubernetes provides a device plugin framework for vendors to advertise their resources to the kubelet without changing Kubernetes core code. Instead of writing custom Kubernetes code, vendors can implement a device plugin that can be deployed manually or as a DaemonSet. The targeted devices include GPUs, High-performance NICs, FPGAs, InfiniBand, and other similar computing resources that may require vendor specific initialization and setup.

大概意思是:从kubernetes1.8版本开始,提供了设备插件框架,设备厂商无需修改kubernetes核心代码就可以将自己生产的设备的资源(kubernetes可管理的资源包括CPU、内存和存储资源)可以让kubelet使用(这一点与操作系统一样,所有设备厂商自己实现驱动)。设备厂商可以自己人工或者以DaemonSet方式部署,而不是定制kubernetes代码。目标设备包括GPU、高性能NIC(网络接口卡)、FPGA、InfiniBand以及其他类似的需要厂商指定初始化和安装的计算资源。上文引用自kubernetes官方文档,读者可以自行了解一下官方对于device-plugin的说明,如下图所示(用图比链接好,担心链接以后会变):

了解device-plugin的是什么了,接下来就是看看kubernetes是如何实现并工作的。我写这篇文章的核心目的是了解kubernetes如何管理GPU的,因为我的项目需要一个集群同时管理CPU和GPU,根据用户的需求选择合适的资源计算。所以,后面所有的分析都是以GPU为例,读者如果需要了解其他类型的设备根据本文的思路自行分析即可。

好了,我们可以进入正题了。让我们先忘记一部分内容,看看下面这个图:

 

如果我作为kubernetes开发者,思路是由kubelet汇总所有的资源,然后在汇总到管理端,kubernetes也就是apiserver。当创建Pod时,请求会发送给scheduler,scheduler根据节点状态选择一个最优的节点,最后由最优节点的kubelet创建这个Pod。嗯,这个思路应该没什么大毛病,至少我开发的一个分布式计算系统采用的就是这个方式,没问题!好,我们先假设这个想法是就是kubernetes的设计方案,此处我们不讲内存、CPU、存储这些资源是kubelet是怎么获取的,因为本文的重点是device-plugins,我们只说GPU这个kubelet是怎么获取的。

上面说到了,kubernetes有设备插件框架,那这个框架又是什么样的呢?说白了也很简单,就是kubernetes定义了一套机制和接口,各设备厂商按照协议开发就可以了,这个和Linux驱动原理是一样的,只是实现方式不一样而已。我们来看看kubernetes是怎么实现的,首先我们先说说机制:

  1. 厂商自行实现一个管理设备资源的程序,部署到相应的节点上,我们称之为插件;
  2. 插件需要向kubelet注册,注册内容要包含自己的endpoint(endpoint就是一个用于通信的地址)以及一些其他信息(后面会说明);
  3. kubelet连接插件的endpoint,就此kubelet和插件就建立了联系;
  4. kubelet监听/var/lib/kubelet/device-plugins/kubelet.sock(unix sockets)这个地址,插件监听的也是类似的地址,只是地址变成了/var/lib/kubelet/device-plugins/gpu.sock(举个例子)

以上是插件如何让kubernetes发现自己,接下来就是插件和kubelet之间的通信接口了,kubelet与插件采用grpc通信,通信接口定义在kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1/api.proto(不了解protobuf和grpc的同学请自行学习)中。文件中定义了两个用于通信的接口:

  service Registration {
 rpc Register(RegisterRequest) returns (Empty) {}
}
service DevicePlugin {
 rpc GetDevicePluginOptions(Empty) returns (DevicePluginOptions) {}
 rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {}
 rpc Allocate(AllocateRequest) returns (AllocateResponse) {}
 rpc PreStartContainer(PreStartContainerRequest) returns (PreStartContainerResponse) {}
}

第一个接口用于插件向kubelet注册,kubelet是服务端,插件是客户端;第二个接口是kubelet向插件索要支持,kubelet是客户端,插件是服务端,我们会在后面的代码分析中证明这一点。我们接下来看看插件注册的时候需要提供哪些信息(代码源于api.proto)?

 message RegisterRequest {
 string version = 1; // 版本信息
 string endpoint = 2; // 插件的endpoint
 string resource_name = 3; // 资源名称
 DevicePluginOptions options = 4; // 插件选项
}
// 那插件选项又包含什么呢?
message DevicePluginOptions {
 bool pre_start_required = 1; // 启动容器前是否调用DevicePlugin.PreStartContainer()
}

version、endpoint、resource_name都比较好理解,pre_start_required 不是很明确,其实就是启动容器前先通知插件做一下准备,多一个机制扩展性要好一些。接下来我们看看插件能为kubelet提供什么样的服务,亦或说kubelet需要插件提供什么样的功能。

1.rpc GetDevicePluginOptions(Empty) returns (DevicePluginOptions) {},这个和注册提供的信息是一样的,只是变成了kubelet可以再获取;

2.rpc PreStartContainer(PreStartContainerRequest) returns (PreStartContainerResponse) {},这个就是kubelet启动容器前调用的,其中PreStartContainerRequest,PreStartContainerResponse定义如下,非常简单:

message PreStartContainerRequest{

message PreStartContainerRequest

{    

repeated string devicesIDs = 1; // 需要使用设备的所有ID,数组形式}
message PreStartContainerResponse { // 什么也没有

}

3.rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {},kubelet监听设备变化,一旦有设备更新,插件就会通知kubelet,注意此处返回是stream类型。我们再来看看插件返回的结果都有什么?

 message ListAndWatchResponse {
 repeated Device devices = 1; //设备数组
}
message Device {
 string ID = 1; // 设备唯一ID
 string health = 2; // 设备健康情况,就是是好的还是坏的
}
// 内容非常少,其实内容越少弹性越大,内容越多约束越多。

4.rpc Allocate(AllocateRequest) returns (AllocateResponse) {},这个就是kubelet向插件申请资源的接口了,申请资源需要提供如下信息:

message AllocateRequest {message AllocateRequest {    
repeated ContainerAllocateRequest container_requests = 1;
}
message ContainerAllocateRequest {    
repeated string devicesIDs = 1; // 设备ID数组
} 

从上面的类型可以看出申请资源接口可以同时为多个容器申请资源,AllocateRequest .container_requests代表的是多个容器对于资源的需求,ContainerAllocateRequest.devicesIDs是一个容器对于资源的需求。我们再来看看插件返回给kubelet什么信息?

 message AllocateResponse {
 repeated ContainerAllocateResponse container_responses = 1;
}
message ContainerAllocateResponse {
 map<string, string> envs = 1; // 环境变量,需要为容器添加这些环境变量
 repeated Mount mounts = 2; // 挂载信息
 repeated DeviceSpec devices = 3; // 设备信息
 map<string, string> annotations = 4; // 需要加入到容器的annotations字段
}
message Mount {
 string container_path = 1; // 设备在容器中的路径
 string host_path = 2; // 设备在宿主机上的路径
 bool read_only = 3; // 是否只读
}
message DeviceSpec {
 string container_path = 1; // 设备在容器中的路径
 string host_path = 2; // 设备在宿主机上的路径
 string permissions = 3; // 访问设备需要的权限
}

和AllocateRequest一样,返回的申请结果也是多容器的。综合以上信息,我们要把图调整一下:

接下来我们就要探索一下kubelet是如何管理者插件以及相关资源的,开启我们的代码分析之旅啦,我个人的理解会用注释混合在代码中!我们先从kubernetes/pkg/kubelet/cm/devicemanager包开始,不要问我为什么从这里开始,反正从字面意思看这里是设备管理器就是了(cm是容器管理器的意思)~首先我们来看看kubernetes对于设备管理器的定义是什么(源于kubernetes/pkg/kubelet/cm/devicemanager/types.go)?

 type Manager interface {
 Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error
 Devices() map[string][]pluginapi.Device
 Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error
 Stop() error
 GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error)
 GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
}

Manager是个interface类型,而实现在kubernetes/pkg/kubelet/cm/devicemanager/manager.go文件中的ManagerImpl,这名字起得非常易懂!知道实现的地方了,我们就要用代码证明前面的说法,至少先找到kubelet监听/var/lib/kubelet/device-plugins/kubelet.sock的地方吧?

 func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
 ......
 // 创建socket目录,也就是/var/lib/kubelet/device-plugins/
 socketPath := filepath.Join(m.socketdir, m.socketname)
 os.MkdirAll(m.socketdir, 0755)
 // 删除socket目录下的所有文件(文件夹除外)
 if err := m.removeContents(m.socketdir); err != nil {
 glog.Errorf("Fail to clean up stale contents under %s: %+v", m.socketdir, err)
 }
 // 监听socket文件
 s, err := net.Listen("unix", socketPath)
 if err != nil {
 glog.Errorf(errListenSocket+" %+v", err)
 return err
 }
 // 创建grpc的server端
 m.wg.Add(1)
 m.server = grpc.NewServer([]grpc.ServerOption{}...)
 // 将ManagerImpl注册为Registration这个grpc接口的服务端处理器
 pluginapi.RegisterRegistrationServer(m.server, m)
 // 开启协程启动grpc服务端
 go func() {
 defer m.wg.Done()
 m.server.Serve(s)
 }()
 return nil
}

上面代码足以证明kubelet是通过devicemanager.Manager实现的插件注册服务,其中m.socketdir和m.socketname是如何赋值的请看下面的代码:

 //以下代码来自kubernetes/pkg/kubelet/cm/devicemanager/manager.go
func NewManagerImpl() (*ManagerImpl, error) {
 return newManagerImpl(pluginapi.KubeletSocket)
}
func newManagerImpl(socketPath string) (*ManagerImpl, error) {
 if socketPath == "" || !filepath.IsAbs(socketPath) {
 return nil, fmt.Errorf(errBadSocket+" %v", socketPath)
 }
 dir, file := filepath.Split(socketPath)
 manager := &ManagerImpl{
 endpoints: make(map[string]endpoint),
 socketname: file,
 socketdir: dir,
 healthyDevices: make(map[string]sets.String),
 unhealthyDevices: make(map[string]sets.String),
 allocatedDevices: make(map[string]sets.String),
 pluginOpts: make(map[string]*pluginapi.DevicePluginOptions),
 podDevices: make(podDevices),
 }
 ......
}
//以下代码来自kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1/constants.go
const (
 Healthy = "Healthy"
 Unhealthy = "Unhealthy"
 Version = "v1beta1"
 DevicePluginPath = "/var/lib/kubelet/device-plugins/"
 KubeletSocket = DevicePluginPath + "kubelet.sock"
 KubeletPreStartContainerRPCTimeoutInSecs = 30
)

上面的代码不需要我多说,非常简单,在实例化ManagerImpl的时候把socket路径传进去了,关键是要找到代码的位置。socket路径已经通过常数方式定义了,所以想通过配置选项调整也是不可能了,除非自己修改代码。下面就是分析当插件注册后Manager是如何处理的,位置也非常好找,只要找到ManagerImpl的Register函数就可以了。因为在启动grpc服务的时候将ManagerImpl注册为插件注册服务的处理器,那么ManagerImpl就必须要有相应的注册函数。下面是相应的代码:

 // 代码来自kubernetes/pkg/kubelet/cm/devicemanager/manager.go
func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
 metrics.DevicePluginRegistrationCount.WithLabelValues(r.ResourceName).Inc()
 // 检测插件是否是兼容的版本,此处就用到了api.proto中定义的注册信息中的版本
 // 从代码中可以看出,kubelet是可以兼容多个版本的,当前只兼容一个版本
 // var SupportedVersions = [...]string{"v1beta1"},这段代码定义在
 // kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1/constants.go文件中
 var versionCompatible bool
 for _, v := range pluginapi.SupportedVersions {
 if r.Version == v {
 versionCompatible = true
 break
 }
 }
 // 如果版本不兼容报错,因为插件和kubelet是两个独立的工程,版本校验非常重要
 if !versionCompatible {
 errorString := fmt.Sprintf(errUnsupportedVersion, r.Version, pluginapi.SupportedVersions)
 return &pluginapi.Empty{}, fmt.Errorf(errorString)
 }
 // 校验资源名称的合法性,乍一看以为是判断是不是扩展资源(Is ExtendResource Name)的名称呢,但是分析逻辑是
 // 错误的,是扩展资源的名字反而报错了。其实这里就是判断资源名称的合法性(Is Extend ResourceName),
 // kubernetes为资源定义了格式,vendor/device,比如nvidia.com/gpu
 // kubernetes的native资源格式kubernetes.io/name此处资源名称不能以kubernetes.io/开头
 // 同时也不能以requests.开头,因为这是kubernetes默认的资源请求前缀
 if !v1helper.IsExtendedResourceName(v1.ResourceName(r.ResourceName)) {
 errorString := fmt.Sprintf(errInvalidResourceName, r.ResourceName)
 return &pluginapi.Empty{}, fmt.Errorf(errorString)
 }
 // 开启协程添加插件,此处为什么要开协程?我猜是需要创建与插件的连接,一个协程内
 // 形成rpc的调用者被调用的环形操作,容易形成死锁
 go m.addEndpoint(r)
 return &pluginapi.Empty{}, nil
}

ManagerImpl通过注册信息添加endpoint,一个endpoint就代表一个插件,下面是添加插件的代码:

 // 代码源自kubernetes/pkg/kubelet/cm/devicemanager/manager.go
func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
 // 申请内存用于存储已经存在的设备
 existingDevs := make(map[string]pluginapi.Device)
 m.mutex.Lock()
 // 看看插件是不是已经注册过了,判断方法就是资源名称(唯一)
 old, ok := m.endpoints[r.ResourceName]
 if ok && old != nil {
 // 如果是已经注册过的插件,获取插件所有的设备,并且标记为不健康状态
 evices := make(map[string]pluginapi.Device)
 for _, device := range old.getDevices() {
 device.Health = pluginapi.Unhealthy
 devices[device.ID] = device
 }
 // 这些已经注册过的插件的设备存储在临时变量中,作为已经存在的设备
 existingDevs = devices
 }
 m.mutex.Unlock()
 // 在ManagerImpl中用endpointImpl表一个插件,要为新注册的插件创建一个
 // 新的endpointImpl对象,创建endpointImpl用到了已经存在的设备,后面会有详细说明
 socketPath := filepath.Join(m.socketdir, r.Endpoint)
 e, err := newEndpointImpl(socketPath, r.ResourceName, existingDevs, m.callback)
 if err != nil {
 return
 }
 m.mutex.Lock()
 // 存储插件的选项,前面在proto中有说明选项有什么意义
 if r.Options != nil {
 m.pluginOpts[r.ResourceName] = r.Options
 }
 // 这段代码肯定有一些人懵逼,前面不是已经赋值给old了么,现在有获取一次并且判断
 // 和old是否相同,这个重点在于这个函数锁的位置,上面代码创建endpointImpl的时候
 // 已经解锁了,再加锁存在一种可能就是插件又注册了一次,并且被其他协程处理完了
 // 为了安全起见,多一次判断是非常必要的。如果别的协程处理完了,此处就要停止
 // 已经创建的endpointImpl。
 ext := m.endpoints[r.ResourceName]
 if ext != old {
 m.mutex.Unlock()
 e.stop()
 return
 }
 // 添加到endpointImpl的map中,基本算是注册。
 m.endpoints[r.ResourceName] = e
 m.mutex.Unlock()
 // 如果同名的插件存在,那么就把老的插件停止掉
 if old != nil {
 old.stop()
 }
 // 开启协程运行endpointImpl,可以看出来ManagerImpl为每个插件都开一个协程与之交互
 go func() {
 e.run() // 这个函数一直运行直到没法从插件获取设备状态或者被停止
 e.stop() // 退出后就停止的endpointImpl
 m.mutex.Lock()
 // 这里要判断一次endpointImpl是不是自己创建的,如果是就把插件资源标记为不健康
 // 如果不是说明在别的协程中已经把这个插件处理过了,为什么这么做,还是锁定问题
 // 代码写的还是非常漂亮的,赞一个!
 if old, ok := m.endpoints[r.ResourceName]; ok && old == e {
 m.markResourceUnhealthy(r.ResourceName)
 }
 m.mutex.Unlock()
 }()
}

这里我们要做一些简单的总结:

  1. ManagerImpl有两个map,endpoints和pluginOpts,他们的key都是资源名称,也就是插件注册时提供的RegisterRequest.resource_name,定义在api.proto中,你会发现proto定义的名称在经过编译输出的go代码却变成了RegisterRequest.ResourceName,这是因为go语言的大小写约束,同时go可以在定义类型的时候指定序列化和反序列化变量的名称,这一点注意一下,否则就对应不上了;
  2. endpoints的value类型是endpoint,用于存储插件在ManagerImpl的实例对象的,也就是说一个插件在ManagerImpl里面对应一个endpoint对象,其中endpoint是interface类型,endpointImpl是endpoint的实现类,所有对于插件的操作都是通过endpointImpl实现的,可以断定接口DevicePlugin的客户端放在了endpointImpl里面;
  3. pluginOpts里面记录了哪些插件支持“选项”特性;

以上总结是对ManagerImpl一部分成员变量的说明,如果上来就把ManagerImpl的定义代码解释一遍,估计很多人不好理解,我们通过“相对合理”的流程逐一把ManagerImpl的成员变量过一边也就自然非常容易的理解ManagerImpl的原理与实现了。接下来很多人肯定会想后面的内容就是endpointImpl的定义是什么了,这是普遍的“深度优先”的思路,这里我打算采用“广度优先”的思路分析。我们暂且知道有endpointImpl这个东西就行了,我们来看看ManagerImpl是如何实现Manager这个interface的其他接口函数的。我们先看GetCapacity() (v1.ResourceList, v1.ResourceList, []string)这个函数,为什么我会选择这个函数呢?那就要先说一些题外话:因为我一直在想kubelet通过插件的方式扩展了资源,那么资源是如何汇总到apiserver的,同时又是如何被scheduler分配的,所以我就找kubernetes/pkg/apis/core/types.go里面对于Node的定义(Node是kubelet和apiserver之间同步节点状态的数据类型,扩展资源应该存储在这里),有一个NodeStatus类型,里面有Capacity(资源总容量)和Allocatable(资源分配量)两个成员变量,类型都是ResourceList。看到了么这两个变量的类型和ManagerImpl.GetCapacity()返回的类型是一样的,他们肯定有着什么关系,只是少ManagerImpl.GetCapacity()是获取节点上所有扩展资源(通过插件方式提供)的容量的。通过这条线我们就可以找到这些扩展资源是如何管理的了。在分析ManagerImpl.GetCapacity()前,我们先看看ResourceList的定义:

// 源自kubernetes/pkg/apis/core/types.go,内嵌的类型读者需要了解细节自行跳转

type ResourceName string

type Quantity struct { // kubernetes/staging/src/k8s.io/apimachinery/pkg/api/resource/quantity.go

    i int64Amount

    d infDecAmount

    s string

    Format

}

type ResourceList map[ResourceName]resource.Quantity

// 此处我们不用关心细节,只要知道资源列表是一个map,key是资源名,value是资源量,资源量是可以采用多种方式表达的,这道这些就可以了。

现在我们可以开始看看ManagerImpl.GetCapacity()的代码了:

 

// 代码源自kubernetes/pkg/kubelet/cm/devicemanager/manager.go

func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {

    needsUpdateCheckpoint := false

    // 临时变量,用于存储资源总容量,可分配资源量和已经删除的资源名称

    var capacity = v1.ResourceList{}

    var allocatable = v1.ResourceList{}

    deletedResources := sets.NewString()

    m.mutex.Lock()

    // 遍历健康的设备,ManagerImpl用healthyDevices成员变量存储着全部的健康设备

    // ManagerImpl.healthyDevices是一个map,key是资源名称,value是设备设备名称集合

    // ManagerImpl.healthyDevices按照资源名称分类,每个分类下面是所有的设备名称

    for resourceName, devices := range m.healthyDevices {

        e, ok := m.endpoints[resourceName]

        // 插件不存在或者插件长时间过期?

        if (ok && e.stopGracePeriodExpired()) || !ok {

            delete(m.endpoints, resourceName) // 删除插件

            delete(m.healthyDevices, resourceName) // 删除这个资源名称的健康设备

            deletedResources.Insert(resourceName) // 记录所有已删除的资源名称

            needsUpdateCheckpoint = true

        } else {

            // 把健康的设备添加到总资源容量和可分配资源容量两个列表中

            // 前面没有深究资源容量定义,看看这里采用了固定的单位

            // DecimalSI是10进制,精度为一百万(M),这个和CPU资源定义是一样的

            // 设备数*1000000就是节点上改资源名称的资源量

            capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)

            allocatable[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)

        }

    }

    // 遍历不健康的设备

    for resourceName, devices := range m.unhealthyDevices {

        e, ok := m.endpoints[resourceName]

        // 此处的处理方式和处理健康设备方式一样,不赘述

        if (ok && e.stopGracePeriodExpired()) || !ok {

            delete(m.endpoints, resourceName)

            delete(m.unhealthyDevices, resourceName)

            deletedResources.Insert(resourceName)

            needsUpdateCheckpoint = true

        } else {

            // 获取健康的资源数,因为上面先统计的是将康设备

            capacityCount := capacity[v1.ResourceName(resourceName)]

            // 计算不健康的资源数

            unhealthyCount := *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)

            // 该资源总量是健康+不讲康的总和

            capacityCount.Add(unhealthyCount)

            // 更新资源总容量

            capacity[v1.ResourceName(resourceName)] = capacityCount

        }

    }

    m.mutex.Unlock()

    if needsUpdateCheckpoint {

        m.writeCheckpoint()

    }

    // 这里可以到该函数返回的是扩展资源总量、可分配资源总量以及删除的资源

    return capacity, allocatable, deletedResources.UnsortedList()

}

那么ManagerImpl.GetCapacity()是被谁调用的呢?在哪里调用就有可能在哪里找到资源管理方法,那利用比较好的IDE查找一下,就会发现kubelet有一个ContainerManager的类型(前面介绍devicemanager包的时候提到过cm是什么,就是ContainerManager的缩写),ContainerManager有一个接口函数定义为:

 // 代码源自kubernetes/pkg/kubelet/cm/container_manager.go
type ContainerManager interface {
 ......
 // 看看这个函数的返回值和ManagerImpl.GetCapacity()一样
 // 而且函数名称更加直观了,就是获取设备插件的资源容量
 GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string)
 ......
}
// ContainerManager是interface类型,具体实现在别的文件,本文引用linux系统的实现
// 代码源自kubernetes/pkg/kubelet/cm/container_manager_linux.go
func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
 return cm.deviceManager.GetCapacity()
}
// containerManagerImpl是通过deviceManager成员变量返回的,该变量是在下面代码初始化的
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) {
 ......
 if devicePluginEnabled {
 // 就是这里,我们前面分析的ManagerImpl在这里被构造
 cm.deviceManager, err = devicemanager.NewManagerImpl()
 } else {
 // 从1.10开始默认设备插件是使能的,所以这里我们就不关注了
 cm.deviceManager, err = devicemanager.NewManagerStub()
 }
 ......
}

前后可以联系起来了,containerManagerImpl是通过devicemanager.ManagerImpl实现设备插件资源管理的。而ContainerManager是Kubelet这个类的成员变量(这个读者自行查找,此处不列举相关代码了),基本找到根儿了,因为Kubelet类基本代表了kubelet这个程序的全部。此处可以推出Kubelet利用ContainerManager获取插件资源,然后再将资源汇报给apiserver。我们继续用IDE的查找功能看看GetDevicePluginResourceCapacity()这个函数在哪里引用?

 //代码源自kubernetes/pkg/kubelet/kubelet_node_status.go
func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
 ......
 // 函数返回一个函数数组,每个函数用于设置一种节点状态,这种实现方式挺好玩儿
 var setters []func(n *v1.Node) error
 setters = append(setters,
 ......
 // 这里可以看到插件资源容量作为节点MachineInfo的一部分
 nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore,
 kl.GetCachedMachineInfo, kl.containerManager.GetCapacity,
 kl.containerManager.GetDevicePluginResourceCapacity, 
 kl.containerManager.GetNodeAllocatableReservation, kl.recordEvent),
 ......)
 ......
}
// 那就有必要看看MachineInfo里面是怎么使用插件资源的?
// 代码源于kubernetes/pkg/kubelet/nodestatus/setters.go
func MachineInfo(nodeName string, maxPods int, podsPerCore int, 
 machineInfoFunc func() (*cadvisorapiv1.MachineInfo, error), 
 capacityFunc func() v1.ResourceList, 
 devicePluginResourceCapacityFunc func() (v1.ResourceList, v1.ResourceList, []string), 
 nodeAllocatableReservationFunc func() v1.ResourceList, Kubelet.containerManager.GetNodeAllocatableReservation
 recordEventFunc func(eventType, event, message string), ) Setter {
 // 从函数名来看就是用来获取机器信息的,这个函数返回一个匿名函数设置节点信息
 // 这个函数传入了好多获取各种分类信息的函数,此函数算是统一完成节点信息的地方
 return func(node *v1.Node) error {
 // 申请内存用于设置节点状态中的资源容量列表
 if node.Status.Capacity == nil {
 node.Status.Capacity = v1.ResourceList{}
 }
 var devicePluginAllocatable v1.ResourceList
 var devicePluginCapacity v1.ResourceList
 var removedDevicePlugins []string
 
 // 不是我们关心的重点,不做过多注释
 info, err := machineInfoFunc()
 if err != nil {
 node.Status.Capacity[v1.ResourceCPU] = *resource.NewMilliQuantity(0, resource.DecimalSI)
 node.Status.Capacity[v1.ResourceMemory] = resource.MustParse("0Gi")
 node.Status.Capacity[v1.ResourcePods] = *resource.NewQuantity(int64(maxPods), resource.DecimalSI)
 } else {
 ......
 // 调用了我们上面提到的ContainerManager.GetDevicePluginResourceCapacity()
 devicePluginCapacity, devicePluginAllocatable, removedDevicePlugins = devicePluginResourceCapacityFunc()
 // 算是把所有插件的资源列表拷贝了一遍吧
 if devicePluginCapacity != nil {
 for k, v := range devicePluginCapacity {
 node.Status.Capacity[k] = v
 }
 }
 // 把已经删除的设备也记录在资源容量列表中,只是资源量为0,此处的目的暂不知
 for _, removedResource := range removedDevicePlugins {
 node.Status.Capacity[v1.ResourceName(removedResource)] = *resource.NewQuantity(int64(0), resource.DecimalSI)
 }
 }
 ......
 // 拷贝可分配设备资源列表
 if devicePluginAllocatable != nil {
 for k, v := range devicePluginAllocatable {
 node.Status.Allocatable[k] = v
 }
 }
 ......
 return nil
 }
}

我们已经跟踪了好多代码,之后如何再调用defaultNodeStatusFuncs的分析读者用一个IDE基本就能搞定,不再继续分析了,是时候做一些总结性思考了,:

  1. Node.Status.Capacity和Node.Status.Allocatable记录了节点全部资源,包括可用于分配的资源,其中Node是类型,定义在kubernetes/staging/src/k8s.io/api/core/v1/types.go中,Status、Capacity、Allocatable是成员变量名称,这样描述利于读者理解;
  2. Kubelet.ContainerManager.DeviceManager是用于获取所有插件化资源信息的3个类型,其中ContainerManager和DeviceManager是interface,他们的实现分别为containerManagerImpl和deviceManagerImpl,注意ContainerManager的实现会根据操作系统的类型有不同的实现;
  3. 资源列定义为type ResourceList map[ResourceName]resource.Quantity,就是一个资源名称和资源量组合的map,所以必须要求所有插件资源名称要唯一,这也是为什么nvidia定义的gpu资源是nvidia.com/gpu,cpu资源是kubernetes.io/cpu,kubernetes.io/前缀是内部预定义的资源,不可以使用
  4. 从官方介绍插件式设备的文档来看,没有看到关于调度相关的内容,只有介绍怎么将设备加入到系统中。资源是名字唯一的,如果不修改核心代码就可以用的话,新扩展进来的资源scheduler只能通过名字匹配的方式找到拥有该资源的节点,比如通过yaml文件中spec.containers[i].resources.limits.nvidia.com/gpu:2;

上面代码的分析大概可以总结为下图:

注:T为类型,M为成员,+为基类

接下来我们再来看看Scheduler是如何使用这些资源的, 由于我会专门针对kubernetes的Scheduler的原理与实现写一篇文章,所以此处不详细说明Scheduler。先直接奔重点,看下面的代码:

 // 代码源于kubernetes/pkg/scheduler/cache/node_info.go
type Resource struct {
 MilliCPU int64 // CPU,单位为1/百万
 Memory int64 // 内存,单位为字节
 EphemeralStorage int64 // 存储,单位为字节
 AllowedPodNumber int // 允许的pod数量
 ScalarResources map[v1.ResourceName]int64 // 扩展资源,这个和ResourceList没差
}
 
// 通过ResourceList构造Resource 对象
func NewResource(rl v1.ResourceList) *Resource {
 r := &Resource{} // 创建资源对象
 r.Add(rl) // 添加资源
 return r // 返回资源对象
}
// 向Resource 对象添加资源
func (r *Resource) Add(rl v1.ResourceList) {
 if r == nil {
 return
 }
 // 遍历所有资源
 for rName, rQuant := range rl {
 switch rName {
 case v1.ResourceCPU: // 累加CPU资源
 r.MilliCPU += rQuant.MilliValue()
 case v1.ResourceMemory: // 累加内存资源
 r.Memory += rQuant.Value()
 case v1.ResourcePods: // 累加pod数量
 r.AllowedPodNumber += int(rQuant.Value())
 case v1.ResourceEphemeralStorage: //累加存储资源
 r.EphemeralStorage += rQuant.Value()
 default: // 其他的全部归属为扩展资源
 if v1helper.IsScalarResourceName(rName) {
 r.AddScalar(rName, rQuant.Value())
 }
 }
 }
}
// 添加扩展资源
func (r *Resource) AddScalar(name v1.ResourceName, quantity int64) {
 r.SetScalar(name, r.ScalarResources[name]+quantity)
}
// 设置扩展资源
func (r *Resource) SetScalar(name v1.ResourceName, quantity int64) {
 // 扩展资源如果没有初始化就新建map
 if r.ScalarResources == nil {
 r.ScalarResources = map[v1.ResourceName]int64{}
 }
 r.ScalarResources[name] = quantity
}

从上面的代码可以看出,Scheduler对于资源重新进行了定义,这个很正常,两个不同的模块(Scheduler和Kubelet)对于同一个事物的看的角度不同,定义自然不同。对于Scheduler来说主要调度的就是CPU、内存这些资源,专门定义一个变量存储,这样访问效率要比每次都用名字从map中获取高很多,当然也有历史实现的原因。

知道Scheduler如何定义资源后,我们就要看Scheduler在调度的时候怎么使用这些资源。如下代码所示:

 // 代码源自kubernetes/pkg/scheduler/algorithms/predicates/predicates.go
var (
 predicatesOrdering = []string{CheckNodeConditionPred, CheckNodeUnschedulablePred,
 GeneralPred, HostNamePred, PodFitsHostPortsPred,
 MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
 PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,
 CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred,
 MaxAzureDiskVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
 CheckNodeMemoryPressurePred, CheckNodePIDPressurePred, CheckNodeDiskPressurePred, MatchInterPodAffinityPred}
)
//上面的全局变量定义了predicates的顺序,我们只关注GeneralPred,其他的这里不说明
func GeneralPredicates(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
 var predicateFails []algorithm.PredicateFailureReason
 fit, reasons, err := noncriticalPredicates(pod, meta, nodeInfo) // 下面有说明
 ......
}
func noncriticalPredicates(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
 var predicateFails []algorithm.PredicateFailureReason
 fit, reasons, err := PodFitsResources(pod, meta, nodeInfo) // 下面有说明
 ......
}
// 上面的代码就是为了展示调用顺序,下面的才是主要内容,这个函数用于判断
// 节点资源是否匹配,从这点可以看出调用这个函数肯定是用一个pod对于资源的请求
// 来遍历所有的节点
func PodFitsResources(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
 // 节点不存在返回错误
 node := nodeInfo.Node() 
 if node == nil {
 return false, nil, fmt.Errorf("node not found")
 }
 // 节点允许的Pod数量是否超过上限
 var predicateFails []algorithm.PredicateFailureReason
 allowedPodNumber := nodeInfo.AllowedPodNumber()
 if len(nodeInfo.Pods())+1 > allowedPodNumber {
 predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
 }
 
 // 此处不是讨论重点,不做过多介绍,就是获取Pod的资源请求
 ignoredExtendedResources := sets.NewString()
 var podRequest *schedulercache.Resource
 if predicateMeta, ok := meta.(*predicateMetadata); ok {
 podRequest = predicateMeta.podRequest
 if predicateMeta.ignoredExtendedResources != nil {
 ignoredExtendedResources = predicateMeta.ignoredExtendedResources
 }
 } else {
 podRequest = GetResourceRequest(pod)
 }
 
 // 如果pod没有任何资源请求,那就直接返回,这里的资源请求就是我们在yaml里写的
 if podRequest.MilliCPU == 0 && podRequest.Memory == 0 && 
 podRequest.EphemeralStorage == 0 && len(podRequest.ScalarResources) == 0 {
 return len(predicateFails) == 0, predicateFails, nil
 }
 // 看到没有,这里就在用我们全文都在提到的资源,主要判断的就是可用于分配的资源
 allocatable := nodeInfo.AllocatableResource()
 // 是否有足够的CPU资源?
 if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
 predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
 }
 // 是否有足够的内存资源?
 if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
 predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
 }
 // 是否有足够的存储资源
 if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
 predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))
 }
 // 对于扩展资源的判断就在这里了,遍历pod所有扩展资源的需求
 for rName, rQuant := range podRequest.ScalarResources {
 // 判断资源名称是否合法,这个在前面提到过了
 if v1helper.IsExtendedResourceName(rName) {
 if ignoredExtendedResources.Has(string(rName)) {
 continue
 }
 }
 // 如果可用的扩展资源不足(包括不存在)则失败,就这么简单应该符合大部分读者的预期
 if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
 predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))
 }
 }
 return len(predicateFails) == 0, predicateFails, nil
}

以上是Scheduler调度的第一阶段predicate,满足条件的Node还会进入第二阶段priorities,第一阶段主要用于过滤,第二阶段主要用于计算最优。我没有在第二阶段找到关于扩展资源相关的代码,所以我猜测Scheduler在调度扩展资源的时候不会计算扩展设备的负载率,这一点在有些场合可能不符合预期,开发人员你可以自行设计更优的调度策略。

至此,我们把插件化的设备资源如何加入到kubernetes中,并且如何调度使用的基本讲完了,关于DeviceManager和Plugin设备的交互细节暂时不做说明,读者应该可以看明白,等有时间我再补充上,毕竟这些内容的缺失不影响读者对本文的理解。

K8S中文社区微信公众号

评论 抢沙发

登录后评论

立即登录