Kubernetes API 分析 ( Kube-apiserver )

kubernetes 概览

以下是 k8s 的整体架构,在 master 节点上主要是 kube-apiserver(整合了 kube-aggregator),还有 kube-scheduler,以及 kube-controller-manager,包括后端存储 etcd。

其中 kube-apiserver 是一个比较关键的部分,而且前期写得坑很多,导致这一部分虽然看起来是一个 API server 其实代码很复杂,特别冗余,而且目前对 kube-apiserver 还要做拆分,能够支持插入第三方的 apiserver,也就是又一个 aggregated apiserver 的 feature,也是和 kube-apiserver 和里面包的一层 genericserver 揉合在一起了,感觉一个大的系统 API server 越写越挫是一个通病,还好现在 k8s 迷途知返正在调整。

kube-apiserver

Kube-apiserver 可以是认为在 generic server 上封装的一层官方默认的 apiserver,有第三方需要的情况下,自己也可以在 generic server 上封装一层加入到集成模式中,这里主要介绍 kube-apiserver 的结构。

restful API

kube-apiserver 是一个 restful 服务,请求直接通过 HTTP 请求发送,例如创建一个 ubuntu 的 pod,用以下的 pod.yaml 文件。

apiVersion: v1
kind: Pod
metadata:
  name: ubuntu1
  labels:
    name: ubuntu1
spec:
  containers:
  - name: ubuntu1
    image: ubuntu
    command: ["sleep", "1d"]

执行命令 kubectl create -f ./pod.yaml -v=8,可以看到对应的 POST 请求如下。

Request Body: {"apiVersion":"v1","kind":"Pod","metadata":{"labels":{"name":"ubuntu1"},"name":"ubuntu1","namespace":"default"},"spec":{"containers":[{"command":["sleep","1d"],"image":"ubuntu","name":"ubuntu1"}],"schedulerName":"default-scheduler"}}
curl -k -v -XPOST  -H "Content-Type: application/json" -H "Accept: application/json" -H "User-Agent: kubectl/v1.7.5 (linux/amd64) kubernetes/17d7182" https://localhost:6443/api/v1/namespaces/default/pods
POST https://localhost:6443/api/v1/namespaces/default/pods 201 Created in 6 milliseconds
Response Headers:
    Content-Type: application/json
    Content-Length: 1208
    Date: Wed, 18 Oct 2017 15:04:17 GMT
Response Body: {"kind":"Pod","apiVersion":"v1","metadata":{"name":"ubuntu1","namespace":"default","selfLink":"/api/v1/namespaces/default/pods/ubuntu1","uid":"9c9af581-b415-11e7-8033-024d1ba659e8","resourceVersion":"486154","creationTimestamp":"2017-10-18T15:04:17Z","labels":{"name":"ubuntu1"}},"spec":{"volumes":[{"name":"default-token-p0980","secret":{"secretName":"default-token-p0980","defaultMode":420}}],"containers":[{"name":"ubuntu1","image":"ubuntu","command":["sleep","1d"],"resources":{},"volumeMounts":[{"name":"default-token-p0980","readOnly":true,"mountPath":"/var/run/secrets/kubernetes.io/serviceaccount"}],"terminationMessagePath":"/dev/termination-log","terminationMessagePolicy":"File","imagePullPolicy":"Always"}],"restartPolicy":"Always","terminationGracePeriodSeconds":30,"dnsPolicy":"ClusterFirst","serviceAccountName":"default","serviceAccount":"default","securityContext":{},"schedulerName":"default-scheduler","tolerations":[{"key":"node.kubernetes.io/not-ready","operator":"Exists","effect":"NoExecute","tolerationSeconds":300},{"key":"node.alpha.kubernetes.io/unreachable","operator":"Exists","effect":"NoExecute","tolerationSeconds":300}]},"status":{"phase":"Pending","qosClass":"BestEffort"}}

从 url path 里面可以看到几个划分,path 的分类大概有下面这几种。

路径上整体分成 group, version, resource, 作为核心 API group 的 core(包括 pod, node 之类的 resource),不带 group,直接接在 /api/ 后面,其他的 api group 则接在 /apis 后面。以 pod 为例,pod 对应的数据类型如下,这个数据结构和 POST 请求中的结构的参数是一致的。

如果是 job 的话则是在,pkg/apis/batch/v2alpha1/types.go,和 API 路径是对应的。例子当中 kubectl 加上 level 大于 8 的 log 就会打印请求和相应的 body,可以看到 request body 和上面的数据结构是一致的。这个请求会发送到 apiserver 进行处理并且返回存储之后的 pod。

重要结构体

Config

父结构,主要的配置内容,其中有一个结构 RESTOptionsGetter genericregistry.RESTOptionsGetter 是和 API 初始化相关的,这个接口的实现是在 k8s.io/apiserver/pkg/server/options/etcd.go 中的 storageFactoryRestOptionsFactory 实现的,对应的实现函数是

func (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
    storageConfig, err := f.StorageFactory.NewConfig(resource)
    if err != nil {
        return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
    }

    ret := generic.RESTOptions{
        StorageConfig:           storageConfig,
        Decorator:               generic.UndecoratedStorage,
        DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
        EnableGarbageCollection: f.Options.EnableGarbageCollection,
        ResourcePrefix:          f.StorageFactory.ResourcePrefix(resource),
    }
    if f.Options.EnableWatchCache {
        sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
        if err != nil {
            return generic.RESTOptions{}, err
        }
        cacheSize, ok := sizes[resource]
        if !ok {
            cacheSize = f.Options.DefaultWatchCacheSize
        }
        ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
    }

    return ret, nil
}

APIGroupInfo

APIGroupInfo 主要定义了一个 API 组的相关信息,观察一下 APIGroupInfo 是如何初始化的。

在 k8s.io/pkg/master/master.go 当中,每个 Resource 都要提供自己的 Provider,比如说 storagerest 就在 k8s.io/kubernetes/pkg/registry/storage/rest/storage_storage.go 定义了 NewRESTStorage 方法。而默认的 resource 的 legacy provider 单独处理。

    if c.ExtraConfig.APIResourceConfigSource.AnyResourcesForVersionEnabled(apiv1.SchemeGroupVersion) {
        legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
            StorageFactory:       c.ExtraConfig.StorageFactory,
            ProxyTransport:       c.ExtraConfig.ProxyTransport,
            KubeletClientConfig:  c.ExtraConfig.KubeletClientConfig,
            EventTTL:             c.ExtraConfig.EventTTL,
            ServiceIPRange:       c.ExtraConfig.ServiceIPRange,
            ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
            LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
        }
        m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider)
    }

然后通过调用 k8s.io/kubernetes/pkg/registry/core/rest.LegacyRESTStorageProvider 的 NewLegacyRESTStorage 来初始化基础对象的 apigroup info,比如初始化 podStorage,serviceStorage 和 nodeStorage 等等。legacy ApiGrouInfo 的 Scheme, ParamaterCodec, NegotiatedSerializer 都是用 “k8s.io/kubernetes/pkg/api” 包下的全局变量初始化的。

        Scheme:                      api.Scheme,
        ParameterCodec:              api.ParameterCodec,
        NegotiatedSerializer:        api.Codecs,

然后合并成一个 restStorage 存入 apiGroupInfo 中。

    restStorageMap := map[string]rest.Storage{
        "pods":             podStorage.Pod,
        "pods/attach":      podStorage.Attach,
        "pods/status":      podStorage.Status,
        "pods/log":         podStorage.Log,
        "pods/exec":        podStorage.Exec,
        "pods/portforward": podStorage.PortForward,
        "pods/proxy":       podStorage.Proxy,
        "pods/binding":     podStorage.Binding,
        "bindings":         podStorage.Binding,
    ...

举个例子 podStorage 就是用的 genericregistry.Store,这是一个通用的 etc 辅助结构,把 etcd 抽象成存储结构。

// REST implements a RESTStorage for pods
type REST struct {
    *genericregistry.Store
    proxyTransport http.RoundTripper
}

serialization

pkg/api.Codecs 是全局默认的 codec 来自下面这段代码。

func NewCodecFactory(scheme *runtime.Scheme) CodecFactory {
    serializers := newSerializersForScheme(scheme, json.DefaultMetaFactory)
    return newCodecFactory(scheme, serializers)
}

默认具体定义了这几种 serilizer。

func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory) []serializerType {
    jsonSerializer := json.NewSerializer(mf, scheme, scheme, false)
    jsonPrettySerializer := json.NewSerializer(mf, scheme, scheme, true)
    yamlSerializer := json.NewYAMLSerializer(mf, scheme, scheme)
    ...

而且标准库的 json 有很严重的性能问题,换用了 json-iter 但是有很多标准库不兼容的问题,性能提升了大概 20% 但是没办法和进主线,我尝试在上面工作的了一段时间,改了两个问题还是有错,由于时间关系,暂时放弃了这个工作,相关的 issue 在这里

filters

首先通过 ./staging/src/k8s.io/apiserver/pkg/server/config.go 下的 DefaultBuildHandlerChain 构建 filters。

func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
    handler := genericapifilters.WithAuthorization(apiHandler, c.RequestContextMapper, c.Authorizer, c.Serializer)
    handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.RequestContextMapper, c.LongRunningFunc)
    handler = genericapifilters.WithImpersonation(handler, c.RequestContextMapper, c.Authorizer, c.Serializer)
    if utilfeature.DefaultFeatureGate.Enabled(features.AdvancedAuditing) {
        handler = genericapifilters.WithAudit(handler, c.RequestContextMapper, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
    } else {
        handler = genericapifilters.WithLegacyAudit(handler, c.RequestContextMapper, c.LegacyAuditWriter)
    }
    failedHandler := genericapifilters.Unauthorized(c.RequestContextMapper, c.Serializer, c.SupportsBasicAuth)
    if utilfeature.DefaultFeatureGate.Enabled(features.AdvancedAuditing) {
        failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.RequestContextMapper, c.AuditBackend, c.AuditPolicyChecker)
    }
    handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, c.Authenticator, failedHandler)
    handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
    handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc, c.RequestTimeout)
    handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver, c.RequestContextMapper)
    handler = apirequest.WithRequestContext(handler, c.RequestContextMapper)
    handler = genericfilters.WithPanicRecovery(handler)
    return handler
}
panic recover

genericfilters.WithPanicRecovery 在 handler 的最外层对出现的 panic 恢复,并且打印每次请求的 log,所以你想观察 API 请求的情况可以 grep wrap.go 就能看到。

request context

apirequest.WithRequestContext 给 request 绑定一个 Context

RequestInfo

跟路 url 提取后续请求需要的 group, version, namespace, verb, resource 等信息。

WithTimeoutForNonLongRunningRequests

限制 API 调用时间,超时处理提前终止 write。

WithCORS

允许跨域访问。

authentication

在 k8s.io/apiserver/pkg/endpoints/filters/authentication.go 下。WithAuthentication 插入鉴权信息,例如证书鉴权,token 鉴权等,并且从鉴权信息当中获取 user 信息(可能是 service account 也可能是外部用户)user 身份是由 里面的几种方式确认的

authorization

检查是否有权限进行对应资源的操作。一种是 RBAC 一种是 Node。具体这两种方式可以看这个介绍,RBAC 主要是针对服务的,而 Node 模式主要是针对 kubelet 的。

impersonation

让用户伪装成其他用户,比如 admin 可以用普通用户的身份创建资源。

路由

通过 genericapiserver 的 InstallLegacyAPIGroup 就注册到路由当中。具体的做法就是根据 version, resource, sub resource, verb 等信息构造路由,然后用 go-restful 注册处理函数。比如说 GET

            route := ws.GET(action.Path).To(handler).
                Doc(doc).
                Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
                Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
                Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
                Returns(http.StatusOK, "OK", producedObject).
                Writes(producedObject)

handler 里面做的内容就是序列化,然后根据具体的要求(GET DELETE 等)到 etcd 中操作,当然本身还有一层缓存,这取决于 API 的 options 是希望更新还是直接读缓存(缓存会比 etcd 旧一些),比如对于 kubelet 会不断查询 node 信息,但是 kubelet 本身并不需要最新的信息,这个时候就会从缓存中读取。

性能调优

开启代理 kubectl proxy,就可以通过 localhost 直接访问 kube-apiserver HTTP 服务。然后执行 go tool pprof http://localhost:8001/debug/pprof/profile 可以获得 profile 结果,下图红色的部分就是调用耗时最多的部分。

prof.png

除此之外,kube-apiserver 本身也暴露了很多 prometheus 的 metrics 但是往上现在没有现成的模板,只能根据自己的需求来在 prometheus 当作做 query。可以在 k8s.io/apiserver/pkg/endpoints/metrics/metrics.go 里面看到。

之前也说过,超时间调用时会打 log 的,在代码中保存了一些 trace 日志,可以通过 grep Trace来过滤。Trace[%d] 这样开头, %d 是一个 id 可以看到具体的 trace 信息。

原文:https://ggaaooppeenngg.github.io/zh-CN/2017/11/05/Kubernetes-API-分析/