解析 | openshift源码简析之pod网络配置(下)

【编者按】openshift底层是通过kubelet来管理pod,kubelet通过CNI插件来配置pod网络.openshift node节点在启动的时会在一个goroutine中启动kubelet, 由kubelet来负责pod的管理工作。

本文主要从源码的角度入手,简单分析在openshift环境下kubelet是如何通过调用openshift sdn插件来配置pod网络。

上一节分析了openshift-sdn插件是如何配置Pod网络的,本节分析openshift-sdn插件获取Pod IP时cniServer的处理流程。

CNIServer流程

在上面的分析中我们知道,openshift-sdn插件是通过方法doCNIServerAdd向cniserver来请求IP的,那cniserver是如何处理请求的呢?我们先来看cniServer的逻辑。
cniServer的定义位于openshit代码库的pkg/network/node/cniserver/cniserver.go文件,定义如下:

1type CNIServer struct {
2    http.Server
3    requestFunc cniRequestFunc
4    rundir      string
5    config      *Config
6}

它包括了一个http server,以及一个处理请求的handler cniRequestFunc, 还有一些配置相关的字段。
cniSever的构造器方法位于pkg/network/node/cniserver/cniserver.go#L120, 内容如下:

 1// Create and return a new CNIServer object which will listen on a socket in the given path
 2func NewCNIServer(rundir string, config *Config) *CNIServer {
 3    router := mux.NewRouter()
 4
 5    s := &CNIServer{
 6        Server: http.Server{
 7            Handler: router,
 8        },
 9        rundir: rundir,
10        config: config,
11    }
12    router.NotFoundHandler = http.HandlerFunc(http.NotFound)
13    router.HandleFunc("/", s.handleCNIRequest).Methods("POST")
14    return s
15}

从上面第13行的代码可以看出,该server只处理一条POST方法的路由,处理请求的handler是handleCNIRequest这个方法,该方法的定义位于                        pkg/network/node/cniserver/cniserver.go#L277,内容如下:

 1// Dispatch a pod request to the request handler and return the result to the
 2// CNI server client
 3func (s *CNIServer) handleCNIRequest(w http.ResponseWriter, r *http.Request) {
 4    req, err := cniRequestToPodRequest(r)
 5    if err != nil {
 6        http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
 7        return
 8    }
 9
10    glog.V(5).Infof("Waiting for %s result for pod %s/%s", req.Command, req.PodNamespace, req.PodName)
11    result, err := s.requestFunc(req)
12    if err != nil {
13        http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
14    } else {
15        // Empty response JSON means success with no body
16        w.Header().Set("Content-Type""application/json")
17        if _, err := w.Write(result); err != nil {
18            glog.Warningf("Error writing %s HTTP response: %v", req.Command, err)
19        }
20    }
21}

从第11行可以看出,该方法又是调用requestFunc这个方法来处理请求,请求结束后通过w.Write或者是http.Error返回调用者的response。requestFunc是在cniserver的Start的方法中传入的,传入的实际上是podManager的handleCNIRequest方法,该方法位于文件pkg/network/node/pod.go#L25,内容如下:

1// Enqueue incoming pod requests from the CNI server, wait on the result,
2// and return that result to the CNI client
3func (m *podManager) handleCNIRequest(request *cniserver.PodRequest) ([]byte, error) {
4    glog.V(5).Infof("Dispatching pod network request %v", request)
5    m.addRequest(request)
6    result := m.waitRequest(request)
7    glog.V(5).Infof("Returning pod network request %v, result %s err %v", request, string(result.Response), result.Err)
8    return result.Response, result.Err
9}

在第5行该方法先通过addRequest方法把请求放到一个队列里面,然后调用第6行的waitRequest等待请求执行完成。
addRequest定义位于pkg/network/node/pod.go#L240, 内容如下:

1// Add a request to the podManager CNI request queue
2func (m *podManager) addRequest(request *cniserver.PodRequest) {
3    m.requests <- request
4}

可以看出请求被放到了m.requests这个channel里面,也就是在这里用channel做的队列。
waitRequest是从一个channel里取出结果,定义位于pkg/network/node/pod.go#L245,内容如下:

1// Wait for and return the result of a pod request
2func (m *podManager) waitRequest(request *cniserver.PodRequest) *cniserver.PodResult {
3    return <-request.Result
4}

刚才说了addRequest会把请求放到m.requests这个队列里面,那队列里的请求是如何被执行的呢?答案就是podManager在启动时会在一个gorotine里调用processCNIRequests这个方法,该方法会循环的从m.requests这个channel里面取出请求执行。processCNIRequests定义位于pkg/network/node/pod.go#L286,内容如下:

 1// Process all CNI requests from the request queue serially.  Our OVS interaction
 2// and scripts currently cannot run in parallel, and doing so greatly complicates
 3// setup/teardown logic
 4func (m *podManager) processCNIRequests() {
 5    for request := range m.requests {
 6        glog.V(5).Infof("Processing pod network request %v", request)
 7        result := m.processRequest(request)
 8        glog.V(5).Infof("Processed pod network request %v, result %s err %v", request, string(result.Response), result.Err)
 9        request.Result <- result
10    }
11    panic("stopped processing CNI pod requests!")
12}

可以看出该方法通过一个for循环不断的从m.requests里面取出请求,然后调用processRequest方法来处理请求,最后把处理的结果在放到request.Result里面由上面的waitRequest来获取。
我们来分析processRequest方法的执行逻辑,该方法定义位于pkg/network/node/pod.go#L296,内容如下:

 1func (m *podManager) processRequest(request *cniserver.PodRequest) *cniserver.PodResult {
 2    m.runningPodsLock.Lock()
 3    defer m.runningPodsLock.Unlock()
 4
 5    pk := getPodKey(request)
 6    result := &cniserver.PodResult{}
 7    switch request.Command {
 8    case cniserver.CNI_ADD:
 9        ipamResult, runningPod, err := m.podHandler.setup(request)
10        if ipamResult != nil {
11            result.Response, err = json.Marshal(ipamResult)
12            if err == nil {
13                m.runningPods[pk] = runningPod
14                if m.ovs != nil {
15                    m.updateLocalMulticastRulesWithLock(runningPod.vnid)
16                }
17            }
18        }
19        if err != nil {
20            PodOperationsErrors.WithLabelValues(PodOperationSetup).Inc()
21            result.Err = err
22        }
23    case cniserver.CNI_UPDATE:
24        vnid, err := m.podHandler.update(request)
25        if err == nil {
26            if runningPod, exists := m.runningPods[pk]; exists {
27                runningPod.vnid = vnid
28            }
29        }
30        result.Err = err
31    case cniserver.CNI_DEL:
32        if runningPod, exists := m.runningPods[pk]; exists {
33            delete(m.runningPods, pk)
34            if m.ovs != nil {
35                m.updateLocalMulticastRulesWithLock(runningPod.vnid)
36            }
37        }
38        result.Err = m.podHandler.teardown(request)
39        if result.Err != nil {
40            PodOperationsErrors.WithLabelValues(PodOperationTeardown).Inc()
41        }
42    default:
43        result.Err = fmt.Errorf("unhandled CNI request %v", request.Command)
44    }
45    return result
46}

可以看出该方法针对request.Command的三种不同取值有三部分逻辑来分别处理,我们重点分析Command等于cniserver.CNI_ADD时的逻辑,也就是前面调用openshift-sdn时传递ADD参数的处理逻辑。在Command等于cniserver.CNI_ADD部分的代码主要是调用第9行的podHandler的setup方法,该方法的定义位于pkg/network/node/pod.go#L497,内容如下:

 1// Set up all networking (host/container veth, OVS flows, IPAM, loopback, etc)
 2func (m *podManager) setup(req *cniserver.PodRequest) (cnitypes.Result, *runningPod, error) {
 3    defer PodOperationsLatency.WithLabelValues(PodOperationSetup).Observe(sinceInMicroseconds(time.Now()))
 4
 5    pod, err := m.kClient.Core().Pods(req.PodNamespace).Get(req.PodName, metav1.GetOptions{})
 6    if err != nil {
 7        return nilnil, err
 8    }
 9
10    ipamResult, podIP, err := m.ipamAdd(req.Netns, req.SandboxID)
11    if err != nil {
12        return nilnil, fmt.Errorf("failed to run IPAM for %v: %v", req.SandboxID, err)
13    }
14
15    // Release any IPAM allocations and hostports if the setup failed
16    var success bool
17    defer func() {
18        if !success {
19            m.ipamDel(req.SandboxID)
20            if mappings := m.shouldSyncHostports(nil); mappings != nil {
21                if err := m.hostportSyncer.SyncHostports(Tun0, mappings); err != nil {
22                    glog.Warningf("failed syncing hostports: %v", err)
23                }
24            }
25        }
26    }()
27
28    // Open any hostports the pod wants
29    var v1Pod v1.Pod
30    if err := kapiv1.Convert_core_Pod_To_v1_Pod(pod, &v1Pod, nil); err != nil {
31        return nilnil, err
32    }
33    podPortMapping := kubehostport.ConstructPodPortMapping(&v1Pod, podIP)
34    if mappings := m.shouldSyncHostports(podPortMapping); mappings != nil {
35        if err := m.hostportSyncer.OpenPodHostportsAndSync(podPortMapping, Tun0, mappings); err != nil {
36            return nilnil, err
37        }
38    }
39
40    vnid, err := m.policy.GetVNID(req.PodNamespace)
41    if err != nil {
42        return nilnil, err
43    }
44
45    if err := maybeAddMacvlan(pod, req.Netns); err != nil {
46        return nilnil, err
47    }
48
49    ofport, err := m.ovs.SetUpPod(req.SandboxID, req.HostVeth, podIP, vnid)
50    if err != nil {
51        return nilnil, err
52    }
53    if err := setupPodBandwidth(m.ovs, pod, req.HostVeth, req.SandboxID); err != nil {
54        return nilnil, err
55    }
56
57    m.policy.EnsureVNIDRules(vnid)
58    success = true
59    return ipamResult, &runningPod{podPortMapping: podPortMapping, vnid: vnid, ofport: ofport}, nil
60}

该方法的主要逻辑有两个,一是第10行调用m.ipamAdd获取IP,这里涉及到IPAM,后面单独分析;另一个是第49行调用ovs.SetUpPod设置OVS规则,后面也会单独分析。

至此,openshfit-sdn请求IP时cniServer的处理流程分析结束,下节我们分析cniServer如何调用IPAM插件来管理IP。

上面分析了openshfit-sdn请求IP时cniServer的处理流程,这一节我们分析cniServer调用IPAM插件来管理IP的逻辑。

IPAM

cniServer是调用IPAM插件host-local来做IP管理的,该插件位于/opt/cni/bin目录,是一个预编译的二进制可执行程序。本节将从IP的分配和释放两方面来分析cniServer跟host-local的交互流程。

IP分配

前面章节说了cniServer是调用了podManager的ipamAdd方法来获取IP的,那它又是如何同host-local插件交互的呢,我们来展开分析。
ipamAdd方法的定义位于pkg/network/node/pod.go#L422, 内容如下:

 1// Run CNI IPAM allocation for the container and return the allocated IP address
 2func (m *podManager) ipamAdd(netnsPath string, id string) (*cni020.Result, net.IP, error) {
 3    if netnsPath == "" {
 4        return nilnil, fmt.Errorf("netns required for CNI_ADD")
 5    }
 6
 7    args := createIPAMArgs(netnsPath, m.cniBinPath, cniserver.CNI_ADD, id)
 8    r, err := invoke.ExecPluginWithResult(m.cniBinPath+"/host-local", m.ipamConfig, args)
 9    if err != nil {
10        return nilnil, fmt.Errorf("failed to run CNI IPAM ADD: %v", err)
11    }
12
13    // We gave the IPAM plugin 0.2.0 config, so the plugin must return a 0.2.0 result
14    result, err := cni020.GetResult(r)
15    if err != nil {
16        return nilnil, fmt.Errorf("failed to parse CNI IPAM ADD result: %v", err)
17    }
18    if result.IP4 == nil {
19        return nilnil, fmt.Errorf("failed to obtain IP address from CNI IPAM")
20    }
21
22    return result, result.IP4.IP.IP, nil
23}

上面代码第7行先通过createIPAMArgs方法构建一个参数变量args,变量定义如下:

1struct {
2    Command string
3    ContainerID string
4    NetNS string
5    PluginArgs [][2]string
6    PluginArgsStr string
7    IfName string
8    Path string
9}

构建后的变量的Command的值是“ADD”,这样在调用host-local时就会执行ADD相关的操作。
第8行通过invoke.ExecPluginWithResult来调用执行host-local插件,传入了上面创建的参数变量args,同时传入了一个变量ipamConfig,ipamConfig里面包含了pod所在node的子网相关配置以及一些host-local插件的配置,内容类似如下:

 1{
 2    "cniVersion":"0.3.1",
 3    "name":"examplenet",
 4    "ipam":{
 5        "type":"host-local",
 6        "ranges":[
 7            [
 8                {
 9                    "subnet":"203.0.113.0/24"
10                }
11            ]
12        ],
13        "dataDir":"/tmp/cni-example"
14    }
15}

调用host-local类似如下命令:

1echo '{ "cniVersion""0.3.1""name""examplenet""ipam": { "type""host-local""ranges": [ [{"subnet""203.0.113.0/24"}]], "dataDir""/tmp/cni-example"  } }' | CNI_COMMAND=ADD CNI_CONTAINERID=example CNI_NETNS=/proc/48776/ns/net CNI_IFNAME=eth0 CNI_PATH=/opt/cni/bin /opt/cni/bin/host-local

调用返回的resut的值类似:

1{
2    "ips":[
3        {
4            "version":"4",
5            "address":"203.0.113.2/24",
6            "gateway":"203.0.113.1"
7        }
8    ]
9}

获取的IP信息以及网关信息在上面代码的第22行返回给调用者,也就是第三节中分析的podManager的setup方法的第10行。

IP释放

当cniServer接收到释放IP的请求时,会调用podManager的ipamDel方法,定义位于pkg/network/node/pod.go#L445,内容如下:

1// Run CNI IPAM release for the container
2func (m *podManager) ipamDel(id string) error {
3    args := createIPAMArgs("", m.cniBinPath, cniserver.CNI_DEL, id)
4    err := invoke.ExecPluginWithoutResult(m.cniBinPath+"/host-local", m.ipamConfig, args)
5    if err != nil {
6        return fmt.Errorf("failed to run CNI IPAM DEL: %v", err)
7    }
8    return nil
9}

该方法的逻辑跟ipamAdd一样,都是通过调用host-local插件来完成相应的操作,不同的是该方法在调用时传入了一个Command等于CNI_DEL的args,这样在调用host-local时就会执行IP释放的相关操作。

host-local会把所有已经分配过的IP记录到本地,也就是ipamConfig配置的dataDir目录下,在openshit环境下是记录到/var/lib/cni/networks/openshift-sdn目录下。目录下的内容类似如下:

1[root@master227 ~]# ls /var/lib/cni/networks/openshift-sdn
210.128.0.114  10.128.0.116  last_reserved_ip.0
3[root@master227 ~]#

上面列出的每一个以ip命名的文件都代表一个已经分配的IP,它的内容是该IP所在的pod的ID. 内容类似如下:

1[root@master227 ~]# cat /var/lib/cni/networks/openshift-sdn/10.128.0.114
27a1c2e242c2a2d750382837b81283952ad9878ae496195560f9854935d7e4d31[root@master227 ~]#

当分配IP时,host-local会在该目录下添加一条记录,释放IP时会删除相应的记录。

关于host-local的逻辑不再作分析,后面会有单独的章节来分析,有兴趣的可以看看源码,位于https://github.com/containernetworking/plugins/tree/master/plugins/ipam/host-local代码库下。

至此,IPAM的逻辑分析结束,下一节我们分析cniServer是如何调用ovs controller来设置Pod ovs规则。

上面我们分析了cniServer是如何通过IPAM插件来管理IP,本节主要分析cniServer是如何通过ovs controller设置pod相关的ovs规则。

OVS规则设置

openshift底层的网络用的是ovs, 那么在配置好pod IP之后,又是如何设置跟pod相关的ovs规则的呢?下面作一分析。
openshift node在启动时会创建一个ovs controller,由它来完成ovs网络配置的各种操作。在第三节我们分析过,cniServer是通过调用ovs controller的SetUpPod方法来设置pod ovs规则,调用的代码位于: pkg/network/node/pod.go#L544, 内容如下:

1ofport, err := m.ovs.SetUpPod(req.SandboxID, req.HostVeth, podIP, vnid)

SetUpPod的定义位于pkg/network/node/ovscontroller.go#L267,内容如下:

1func (oc *ovsController) SetUpPod(sandboxID, hostVeth string, podIP net.IP, vnid uint32) (int, error) {
2    ofport, err := oc.ensureOvsPort(hostVeth, sandboxID, podIP.String())
3    if err != nil {
4        return -1, err
5    }
6    return ofport, oc.setupPodFlows(ofport, podIP, vnid)
7}

在上面代码的第2行,SetUpPod又调用了ensureOvsPort这个方法,该方法的定义位于pkg/network/node/ovscontroller.go#L227,内容如下:

1func (oc *ovsController) ensureOvsPort(hostVeth, sandboxID, podIP string) (int, error) {
2    return oc.ovs.AddPort(hostVeth, -1,
3        fmt.Sprintf(`external-ids=sandbox="%s",ip="%s"`, sandboxID, podIP),
4    )
5}

如代码所示,该方法又调用了ovs的AddPort方法,我们再来分析AddPort方法。该方法的定义位于pkg/util/ovs/ovs.go#L31,内容如下:

 1func (ovsif *ovsExec) AddPort(port string, ofportRequest int, properties ...string) (int, error) {
 2    args := []string{"--may-exist""add-port", ovsif.bridge, port}
 3    if ofportRequest > 0 || len(properties) > 0 {
 4        args = append(args, "--""set""Interface", port)
 5        if ofportRequest > 0 {
 6            args = append(args, fmt.Sprintf("ofport_request=%d", ofportRequest))
 7        }
 8        if len(properties) > 0 {
 9            args = append(args, properties...)
10        }
11    }
12    _, err := ovsif.exec(OVS_VSCTL, args...)
13    if err != nil {
14        return -1, err
15    }
16    ofport, err := ovsif.GetOFPort(port)
17    if err != nil {
18        return -1, err
19    }
20    if ofportRequest > 0 && ofportRequest != ofport {
21        return -1, fmt.Errorf("allocated ofport (%d) did not match request (%d)", ofport, ofportRequest)
22    }
23    return ofport, nil
24}

分析上面的代码你会发现,AddPort实际上是调用了底层的ovs-vsctl命令将pod的host端的虚拟网卡加入到了ovs网桥br0上,这样br0上的流量就可以通过该网卡进入pod了。该方法的调用类似于下面的命令行,假设pod host端的网卡是veth3258a5e2:

1ovs-vsctl --may-exist add-port br0 veth3258a5e2

接着回到SetUpPod方法,在第6行中调用了setupPodFlows来设置pod IP的ovs规则,该方法的定义位于pkg/network/node/ovscontroller.go#L233,内容如下:

 1func (oc *ovsController) setupPodFlows(ofport int, podIP net.IP, vnid uint32) error {
 2    otx := oc.ovs.NewTransaction()
 3
 4    ipstr := podIP.String()
 5    podIP = podIP.To4()
 6    ipmac := fmt.Sprintf("00:00:x:x:x:x/00:00:ff:ff:ff:ff", podIP[0], podIP[1], podIP[2], podIP[3])
 7
 8    // ARP/IP traffic from container
 9    otx.AddFlow("table=20, priority=100, in_port=%d, arp, nw_src=%s, arp_sha=%s, actions=load:%d->NXM_NX_REG0[], goto_table:21", ofport, ipstr, ipmac, vnid)
10    otx.AddFlow("table=20, priority=100, in_port=%d, ip, nw_src=%s, actions=load:%d->NXM_NX_REG0[], goto_table:21", ofport, ipstr, vnid)
11    if oc.useConnTrack {
12        otx.AddFlow("table=25, priority=100, ip, nw_src=%s, actions=load:%d->NXM_NX_REG0[], goto_table:30", ipstr, vnid)
13    }
14
15    // ARP request/response to container (not isolated)
16    otx.AddFlow("table=40, priority=100, arp, nw_dst=%s, actions=output:%d", ipstr, ofport)
17
18    // IP traffic to container
19    otx.AddFlow("table=70, priority=100, ip, nw_dst=%s, actions=load:%d->NXM_NX_REG1[], load:%d->NXM_NX_REG2[], goto_table:80", ipstr, vnid, ofport)
20
21    return otx.Commit()
22}

在上面代码的第9行到第19行,分别调用了AddFlow来设置各种ovs规则,第9行到第10行设置了从pod出去的ARP/IP流量的规则,第16行设置了进入POD的ARP流量规则,第19行设置了进入POD的IP流量规则。 AddFlow实际上是调用了命令行工具ovs-ofctl来设置各种ovs规则。关于这些规则的详细内容不再作分析,感兴趣的同学可以自行研究。

至此,ovs规则的设置流程分析完毕,openshit pod网络配置的流程也全部分析完毕。

K8S中文社区微信公众号

评论 1

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
  1. #1

    你好,利用这一原理解析,能否实现Pod IP的固定?在调用IPAM时,传入一个Pod的标识,返回指定的IP地址?

    cgz2个月前 (09-10)回复