T4L2

Time to learn for everything

0%

KubeEdge源码分析之EdgeController

EdgeController是KubeEdge云端部分比较重要的组件之一,主要负责边缘节点管理和应用状态元数据云边同步。

1. EdgeController.Start()

经过对CloudCore源码的整体分析可知,云端各个组件的重点在于各自的Start()函数。于是,我们进入edgecontroller.go文件,查看EdgeController的Start()函数。

1
func (ec *EdgeController) Start() {
2
    // 创建启动上行控制器
3
    upstream, err := controller.NewUpstreamController()
4
    if err != nil {
5
        klog.Errorf("new upstream controller failed with error: %s", err)
6
        os.Exit(1)
7
    }
8
    upstream.Start()
9
    // 创建启动下行控制器
10
    downstream, err := controller.NewDownstreamController()
11
    if err != nil {
12
        klog.Warningf("new downstream controller failed with error: %s", err)
13
        os.Exit(1)
14
    }
15
    downstream.Start()
16
}

Start()中只做了两件事:创建上行控制器upstream和下行控制器downstream并启动它们。我们从upstream的创建函数NewUpstreamController()看起。

2. 上行控制器

2.1. 上行控制器的结构

先确定一下上行控制器的结构:

1
type UpstreamController struct {
2
    kubeClient   *kubernetes.Clientset
3
    messageLayer messagelayer.MessageLayer
4
 
5
    // 消息信道
6
    nodeStatusChan            chan model.Message
7
    podStatusChan             chan model.Message
8
    secretChan                chan model.Message
9
    configMapChan             chan model.Message
10
    serviceChan               chan model.Message
11
    endpointsChan             chan model.Message
12
    persistentVolumeChan      chan model.Message
13
    persistentVolumeClaimChan chan model.Message
14
    volumeAttachmentChan      chan model.Message
15
    queryNodeChan             chan model.Message
16
    updateNodeChan            chan model.Message
17
}

可见,上行控制器结构体中,只有一个kubeClient和一个分发消息用的messageLayer,剩下的就是为各种类型的消息准备的信道。

2.2. 上行控制器的创建
1
Func NewUpstreamController() (*UpstreamController, error) {
2
    cli, err := utils.KubeClient()
3
    if err != nil {
4
        klog.Warningf("create kube client failed with error: %s", err)
5
        return nil, err
6
    }
7
    uc := &UpstreamController{
8
        kubeClient:   cli,
9
        messageLayer: messagelayer.NewContextMessageLayer(),
10
    }
11
    return uc, nil
12
}

创建上行控制器时,也只实例化了kubeClient和messageLayer,各个管道要在上行控制器的Start()函数中创建。这里值得注意的是,NewContextMessageLayer()实际上返回的是一个ContextMessageLayer类型,基于context并实现了messagelayer接口。因此后面调用它的Receive()时实际上是调用的ContextMessageLayer类型的Receive()方法,而这里面又是调用了beehiveContext.Receive(),实际还是通过beehiveContext与其他组件通信。

2.3. 上行控制器的启动
1
func (uc *UpstreamController) Start() error {
2
    klog.Info("start upstream controller")
3
 
4
    // 1、创建上行控制器中的各个信道
5
    uc.nodeStatusChan = make(chan model.Message, config.Config.Buffer.UpdateNodeStatus)
6
    // ...
7
    uc.updateNodeChan = make(chan model.Message, config.Config.Buffer.UpdateNode)
8
 
9
    // 2、启动消息分发的协程
10
    go uc.dispatchMessage()
11
    // 3、启动各个处理消息的协程,包括更新操作、请求操作和删除pod操作
12
    for i := 0; i < int(config.Config.Load.UpdateNodeStatusWorkers); i++ {
13
        go uc.updateNodeStatus()
14
    }
15
    // ...
16
    for i := 0; i < int(config.Config.Load.UpdateNodeWorkers); i++ {
17
        go uc.updateNode()
18
    }
19
    return nil
20
}

UpstramController.Start()中主要做了3件事:创建各个信道、启动消息分发协程和处理更新协程。消息分发函数dispatchMessage()如下:

1
func (uc *UpstreamController) dispatchMessage() {
2
    for {
3
        select {
4
        case <-beehiveContext.Done():
5
            klog.Info("stop dispatchMessage")
6
            return
7
        default:
8
        }
9
        // 1、获取消息msg
10
        msg, err := uc.messageLayer.Receive()
11
        if err != nil {
12
            klog.Warningf("receive message failed, %s", err)
13
            continue
14
        }
15
 
16
        klog.Infof("dispatch message ID: %s", msg.GetID())
17
        klog.V(5).Infof("dispatch message content: %++v", msg)
18
        // 2、取得消息的资源类型和操作类型
19
        resourceType, err := messagelayer.GetResourceType(msg)
20
        if err != nil {
21
            klog.Warningf("parse message: %s resource type with error: %s", msg.GetID(), err)
22
            continue
23
        }
24
        klog.Infof("message: %s, resource type is: %s", msg.GetID(), resourceType)
25
        operationType := msg.GetOperation()
26
        klog.Infof("message: %s, operation type is: %s", msg.GetID(), operationType)
27
        // 3、消息的资源类型和操作类型分发到对应信道
28
        switch resourceType {
29
        case model.ResourceTypeNodeStatus:
30
            uc.nodeStatusChan <- msg
31
        // ...
32
        case common.ResourceTypeService:
33
            uc.serviceChan <- msg
34
        // ...
35
        case model.ResourceTypeNode:
36
            switch operationType {
37
            case model.QueryOperation:
38
                uc.queryNodeChan <- msg
39
            case model.UpdateOperation:
40
                uc.updateNodeChan <- msg
41
            default:
42
                klog.Errorf("message: %s, operation type: %s unsupported", msg.GetID(), operationType)
43
            }
44
        default:
45
            klog.Errorf("message: %s, resource type: %s unsupported", msg.GetID(), resourceType)
46
        }
47
    }
48
}

函数内首先是通过uc.messageLayer.Receive()接收到一个消息。经过分析Receive()函数我们已知,这里是通过beehiveContext接收到了其他组件给EdgeController的消息msg,然后从msg中取得消息资源类型,并送入控制器中对应的信道,再接收下一个消息,循环往复。

回到Start(),将消息送入对应信道后,每一个类型对应建立config中要求数量的处理协程,然后Start()就结束了。这里我们以updateNodeStatus()为例分析一下各个处理协程里都做了什么。

1
func (uc *UpstreamController) updateNodeStatus() {
2
    for {
3
        select {
4
        case <-beehiveContext.Done():
5
            klog.Warning("stop updateNodeStatus")
6
            return
7
        // 1、从上面的信道里接收到消息
8
        case msg := <-uc.nodeStatusChan:
9
            klog.Infof("message: %s, operation is: %s, and resource is %s", msg.GetID(), msg.GetOperation(), msg.GetResource())
10
            // 2-1、将消息转换成json格式
11
            var data []byte
12
            switch msg.Content.(type) {
13
            case []byte:
14
                data = msg.GetContent().([]byte)
15
            default:
16
                var err error
17
                data, err = json.Marshal(msg.GetContent())
18
                if err != nil {
19
                    klog.Warningf("message: %s process failure, marshal message content with error: %s", msg.GetID(), err)
20
                    continue
21
                }
22
            }
23
            // 2-2、获取消息的命名空间和资源名
24
            namespace, err := messagelayer.GetNamespace(msg)
25
            if err != nil {
26
                klog.Warningf("message: %s process failure, get namespace failed with error: %s", msg.GetID(), err)
27
                continue
28
            }
29
            name, err := messagelayer.GetResourceName(msg)
30
            if err != nil {
31
                klog.Warningf("message: %s process failure, get resource name failed with error: %s", msg.GetID(), err)
32
                continue
33
            }
34
            // 2-3、判断消息的操作类型
35
            switch msg.GetOperation() {
36
            // 2-3-1、插入新节点的操作,有已存在和创建成功两种正确结果,无论成败都要回复ACK消息
37
            case model.InsertOperation:
38
                _, err := uc.kubeClient.CoreV1().Nodes().Get(name, metaV1.GetOptions{})
39
                if err == nil {
40
                    klog.Infof("node: %s already exists, do nothing", name)
41
                    uc.nodeMsgResponse(name, namespace, "OK", msg)
42
                    continue
43
                }
44
 
45
                if !errors.IsNotFound(err) {
46
                    klog.Errorf("get node %s info error: %v , register node failed", name, err)
47
                    uc.nodeMsgResponse(name, namespace, "", msg)
48
                    continue
49
                }
50
 
51
                node := &v1.Node{}
52
                err = json.Unmarshal(data, node)
53
                if err != nil {
54
                    klog.Errorf("message: %s process failure, unmarshal marshaled message content with error: %s", msg.GetID(), err)
55
                    uc.nodeMsgResponse(name, namespace, "", msg)
56
                    continue
57
                }
58
 
59
                if _, err = uc.createNode(name, node); err != nil {
60
                    klog.Errorf("create node %s error: %v , register node failed", name, err)
61
                    uc.nodeMsgResponse(name, namespace, "", msg)
62
                    continue
63
                }
64
 
65
                uc.nodeMsgResponse(name, namespace, "OK", msg)
66
            // 2-3-2、更新节点状态的操作
67
            
68
            case model.UpdateOperation:
69
                // 2-3-2-1、将json数据解码为nodeStatusRequest结构,包括uid、节点状态和扩展资源
70
                // 从k8s中获取节点getNode
71
                nodeStatusRequest := &edgeapi.NodeStatusRequest{}
72
                err := json.Unmarshal(data, nodeStatusRequest)
73
                if err != nil {
74
                    klog.Warningf("message: %s process failure, unmarshal marshaled message content with error: %s", msg.GetID(), err)
75
                    continue
76
                }
77
 
78
                getNode, err := uc.kubeClient.CoreV1().Nodes().Get(name, metaV1.GetOptions{})
79
                if errors.IsNotFound(err) {
80
                    klog.Warningf("message: %s process failure, node %s not found", msg.GetID(), name)
81
                    continue
82
                }
83
 
84
                if err != nil {
85
                    klog.Warningf("message: %s process failure with error: %s, namespaces: %s name: %s", msg.GetID(), err, namespace, name)
86
                    continue
87
                }
88
                // 2-3-2-2、在非EdgeSite状态下,如果元数据服务中存储的状态过期,自动更新心跳
89
                if !config.Config.EdgeSiteEnable {
90
                    for i := range nodeStatusRequest.Status.Conditions {
91
                        if time.Now().Sub(nodeStatusRequest.Status.Conditions[i].LastHeartbeatTime.Time) > time.Duration(config.Config.NodeUpdateFrequency)*time.Second {
92
                            nodeStatusRequest.Status.Conditions[i].LastHeartbeatTime = metaV1.NewTime(time.Now())
93
                        }
94
 
95
                        if time.Now().Sub(nodeStatusRequest.Status.Conditions[i].LastTransitionTime.Time) > time.Duration(config.Config.NodeUpdateFrequency)*time.Second {
96
                            nodeStatusRequest.Status.Conditions[i].LastTransitionTime = metaV1.NewTime(time.Now())
97
                        }
98
                    }
99
                }
100
                // 2-3-2-3、设置节点注释,将扩展资源(N卡要特殊处理)加入注释
101
                if getNode.Annotations == nil {
102
                    klog.Warningf("node annotations is nil map, new a map for it. namespace: %s, name: %s", getNode.Namespace, getNode.Name)
103
                    getNode.Annotations = make(map[string]string)
104
                }
105
                for name, v := range nodeStatusRequest.ExtendResources {
106
                    if name == constants.NvidiaGPUScalarResourceName {
107
                        var gpuStatus []types.NvidiaGPUStatus
108
                        for _, er := range v {
109
                            gpuStatus = append(gpuStatus, types.NvidiaGPUStatus{ID: er.Name, Healthy: true})
110
                        }
111
                        if len(gpuStatus) > 0 {
112
                            data, _ := json.Marshal(gpuStatus)
113
                            getNode.Annotations[constants.NvidiaGPUStatusAnnotationKey] = string(data)
114
                        }
115
                    }
116
                    data, err := json.Marshal(v)
117
                    if err != nil {
118
                        klog.Warningf("message: %s process failure, extend resource list marshal with error: %s", msg.GetID(), err)
119
                        continue
120
                    }
121
                    getNode.Annotations[string(name)] = string(data)
122
                }
123
                // 2-3-2-4、VolumesAttached由kube-controller-manager维护,与k8s保持一致
124
                nodeStatusRequest.Status.VolumesAttached = getNode.Status.VolumesAttached
125
                getNode.Status = nodeStatusRequest.Status
126
                // 2-3-2-5、更新k8s中的节点状态
127
                node, err := uc.kubeClient.CoreV1().Nodes().UpdateStatus(getNode)
128
                if err != nil {
129
                    klog.Warningf("message: %s process failure, update node failed with error: %s, namespace: %s, name: %s", msg.GetID(), err, getNode.Namespace, getNode.Name)
130
                    continue
131
                }
132
                // 2-3-2-6、更新成功后,回复OK的ACK消息
133
                resMsg := model.NewMessage(msg.GetID())
134
                resMsg.SetResourceVersion(node.ResourceVersion)
135
                resMsg.Content = "OK"
136
                nodeID, err := messagelayer.GetNodeID(msg)
137
                if err != nil {
138
                    klog.Warningf("Message: %s process failure, get node id failed with error: %s", msg.GetID(), err)
139
                    continue
140
                }
141
                resource, err := messagelayer.BuildResource(nodeID, namespace, model.ResourceTypeNode, name)
142
                if err != nil {
143
                    klog.Warningf("Message: %s process failure, build message resource failed with error: %s", msg.GetID(), err)
144
                    continue
145
                }
146
                resMsg.BuildRouter(constants.EdgeControllerModuleName, constants.GroupResource, resource, model.ResponseOperation)
147
                if err = uc.messageLayer.Response(*resMsg); err != nil {
148
                    klog.Warningf("Message: %s process failure, response failed with error: %s", msg.GetID(), err)
149
                    continue
150
                }
151
 
152
                klog.V(4).Infof("message: %s, update node status successfully, namespace: %s, name: %s", msg.GetID(), getNode.Namespace, getNode.Name)
153
 
154
            default:
155
                klog.Warningf("message: %s process failure, node status operation: %s unsupported", msg.GetID(), msg.GetOperation())
156
            }
157
            klog.V(4).Infof("message: %s process successfully", msg.GetID())
158
        }
159
    }
160
}

处理函数很长,分析可知,里面循环工作的主要内容如下:

  • 接收消息msg,将content即消息内容统一编码成json字符串data
  • 获取消息的namespace和资源名name
  • 判断消息是对资源进行什么操作,对于更新节点状态来说,只有插入和更新两种操作
  • 对于插入节点的操作,首先在kubeClient中查找节点是否存在,如果已存在就跳过,否则新建一个node节点,将data再还原回原来的结构,然后通过uc.createNode(name, node)实例化这个节点(内部实际上还是通过kubeClient创建的这个节点),最后,根据成败回应消息,实际调用的还是beehiveContext的方法,当EdgeSiteEnable开启时,调用SendResp(),否则,调用Send()。
  • 对于更新节点操作,同理,云端更新节点信息后,根据成败回应消息。

这里通过Response函数回复ACK消息,在这里面,当EdgeSite模式开启的时候,调用的是SendResp,而当EdgeSite模式关闭的时候,调用的是Send。原因可能是EdgeSite模式下边缘不会接收ACK消息。

到1.2.1版本目前为止,上行操作包含:

  • 更新Update:如节点状态更新所示,更新k8s中对应资源,更新成功则回复OK的ACK消息给边缘
  • 请求Query:从k8s中取得请求的资源,作为ACK消息的内容回复给边缘
  • 删除Delete:目前只有删除pod一种操作,删除k8s中相应的pod,不回复

综上所述,上行控制器的工作就是不断从cloudhub中接收消息,根据消息中的操作通过kubeClient联系k8s进行相应的处理,并回复ACK消息给边缘。上行控制器与cloudhub之间的消息传递乃至各个组件间的消息传递都是通过beehiveContext实现的,详情可见我的博客“KubeEdge概念一览”。

3. 下行控制器

3.1. 下行控制器的结构

下面来看看下行控制器的结构。

1
type DownstreamController struct {
2
	kubeClient   *kubernetes.Clientset
3
	messageLayer messagelayer.MessageLayer
4
  // 各类资源管理器
5
	podManager *manager.PodManager
6
	configmapManager *manager.ConfigMapManager
7
	secretManager *manager.SecretManager
8
	nodeManager *manager.NodesManager
9
	serviceManager *manager.ServiceManager
10
	endpointsManager *manager.EndpointsManager
11
  // 缓存,用于快速查询边缘各种资源的状态或进行某些判断,例如判断一个node是不是edge node
12
	lc *manager.LocationCache
13
}

下行控制器中也有kubeClient和messageLayer,不过没有信道,而是有各类资源的管理器以及本地缓存LocationCache。LocationCache的结构如下:

1
type LocationCache struct {
2
	// EdgeNodes is a map, key is nodeName, value is Status
3
	EdgeNodes sync.Map
4
	// configMapNode is a map, key is namespace/configMapName, value is nodeName
5
	configMapNode sync.Map
6
	// secretNode is a map, key is namespace/secretName, value is nodeName
7
	secretNode sync.Map
8
	// services is a map, key is namespace/serviceName, value is v1.Service
9
	services sync.Map
10
	// endpoints is a map, key is namespace/endpointsName, value is v1.endpoints
11
	endpoints sync.Map
12
	// servicePods is a map, key is namespace/serviceName, value is []v1.Pod
13
	servicePods sync.Map
14
}

可见,它是由存储各资源类型的Map构成的。

管理器manager用于监听资源的改变,以PodManager为例:

1
type PodManager struct {
2
    // events from watch kubernetes api server
3
    realEvents chan watch.Event
4
 
5
    // events merged
6
    mergedEvents chan watch.Event
7
 
8
    // pods, key is UID, value is *v1.Pod
9
    pods sync.Map
10
}

里面有两个event类型信道和一个UID-pod类型的Map,我们将在Start()中进行具体的介绍。注意:除了PodManager,其他的管理器都只有一个Event信道,且没有Map。

3.2. 下行控制器的创建

先看看下行控制器是如何创建的:

1
func NewDownstreamController() (*DownstreamController, error) {
2
    // 1、创建本地缓存
3
    lc := &manager.LocationCache{}
4
    // 2、创建kubeClient
5
    cli, err := utils.KubeClient()
6
    if err != nil {
7
        klog.Warningf("create kube client failed with error: %s", err)
8
        return nil, err
9
    }
10
    // 3、节点名称,在edgesite模式下由edgesite的config提供
11
    var nodeName = ""
12
    if config.Config.EdgeSiteEnable {
13
        if config.Config.NodeName == "" {
14
            return nil, fmt.Errorf("kubeEdge node name is not provided in edgesite controller configuration")
15
        }
16
        nodeName = config.Config.NodeName
17
    }
18
    // 4、创建各个资源管理器
19
    podManager, err := manager.NewPodManager(cli, v1.NamespaceAll, nodeName)
20
    if err != nil {
21
        klog.Warningf("create pod manager failed with error: %s", err)
22
        return nil, err
23
    }
24
    configMapManager, err := manager.NewConfigMapManager(cli, v1.NamespaceAll)
25
    if err != nil {
26
        klog.Warningf("create configmap manager failed with error: %s", err)
27
        return nil, err
28
    }
29
    secretManager, err := manager.NewSecretManager(cli, v1.NamespaceAll)
30
    if err != nil {
31
        klog.Warningf("create secret manager failed with error: %s", err)
32
        return nil, err
33
    }
34
    nodesManager, err := manager.NewNodesManager(cli, v1.NamespaceAll)
35
    if err != nil {
36
        klog.Warningf("Create nodes manager failed with error: %s", err)
37
        return nil, err
38
    }
39
    serviceManager, err := manager.NewServiceManager(cli, v1.NamespaceAll)
40
    if err != nil {
41
        klog.Warningf("Create service manager failed with error: %s", err)
42
        return nil, err
43
    }
44
    endpointsManager, err := manager.NewEndpointsManager(cli, v1.NamespaceAll)
45
    if err != nil {
46
        klog.Warningf("Create endpoints manager failed with error: %s", err)
47
        return nil, err
48
    }
49
    // 5、实例化下行控制器
50
    dc := &DownstreamController{
51
        kubeClient:       cli,
52
        podManager:       podManager,
53
        configmapManager: configMapManager,
54
        secretManager:    secretManager,
55
        nodeManager:      nodesManager,
56
        serviceManager:   serviceManager,
57
        endpointsManager: endpointsManager,
58
        messageLayer:     messagelayer.NewContextMessageLayer(),
59
        lc:               lc,
60
    }
61
    if err := dc.initLocating(); err != nil {
62
        return nil, err
63
    }
64
 
65
    return dc, nil
66
}

这个创建函数很好理解,其中nodeName默认为空,也就会以ListWatch的方式监听所有节点的相关事件,否则会通过选择器监听特定节点的相关事件。对于管理器,我们以NewPodManager()为例:

1
func NewPodManager(kubeClient *kubernetes.Clientset, namespace, nodeName string) (*PodManager, error) {
2
	var lw *cache.ListWatch
3
	if "" == nodeName {
4
		lw = cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespace, fields.Everything())
5
	} else {
6
		selector := fields.OneTermEqualSelector("spec.nodeName", nodeName)
7
		lw = cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespace, selector)
8
	}
9
	realEvents := make(chan watch.Event, config.Config.Buffer.PodEvent)
10
	mergedEvents := make(chan watch.Event, config.Config.Buffer.PodEvent)
11
	rh := NewCommonResourceEventHandler(realEvents)
12
	si := cache.NewSharedInformer(lw, &v1.Pod{}, 0)
13
	si.AddEventHandler(rh)
14
15
	pm := &PodManager{realEvents: realEvents, mergedEvents: mergedEvents}
16
17
	stopNever := make(chan struct{})
18
	go si.Run(stopNever)
19
	go pm.merge()
20
21
	return pm, nil
22
}

这里面创建了两个goroutine,分别运行SharedInformer和merge。需要注意的是,由于其他资源类型的管理器只有一个信道,所以只运行了SharedInformer。SharedInformer是基于k8s的云原生应用绕不开的,它创建了一个ListWatch的新实例,注册事件监听,后面会单一篇博文讲它。那么podManager独有的merge()函数做了什么呢?

1
func (pm *PodManager) merge() {
2
	for re := range pm.realEvents {
3
    // 1、从realevent事件中获取要操作的pod
4
		pod := re.Object.(*v1.Pod)
5
		switch re.Type {
6
    // 2、根据不同操作,更新管理器本地Map,将事件发送给mergedEvents信道
7
		case watch.Added:
8
			pm.pods.Store(pod.UID, &CachePod{ObjectMeta: pod.ObjectMeta, Spec: pod.Spec})
9
			if pod.DeletionTimestamp == nil {
10
				pm.mergedEvents <- re
11
			} else {
12
				re.Type = watch.Modified
13
				pm.mergedEvents <- re
14
			}
15
		case watch.Deleted:
16
			pm.pods.Delete(pod.UID)
17
			pm.mergedEvents <- re
18
		case watch.Modified:
19
			value, ok := pm.pods.Load(pod.UID)
20
			pm.pods.Store(pod.UID, &CachePod{ObjectMeta: pod.ObjectMeta, Spec: pod.Spec})
21
			if ok {
22
				cachedPod := value.(*CachePod)
23
				if pm.isPodUpdated(cachedPod, pod) {
24
					pm.mergedEvents <- re
25
				}
26
			} else {
27
				pm.mergedEvents <- re
28
			}
29
		default:
30
			klog.Warningf("event type: %s unsupported", re.Type)
31
		}
32
	}
33
}

还记得podManager独有的两个信道吗?它会将监听到的pod事件从信道realEvents中取出,根据pod事件更新自己的podMap缓存,再将事件发到mergedEvents中。因此,这里的mergedEvents信道就相当于其他manager中的单独存在的那个信道的作用。到此为止,各个资源的管理器就将监听到的资源事件送到下行控制器中各自对应的管理器内的信道中了。

3.3. 下行控制器的启动

下面来看下行控制器的Start()函数。

1
func (dc *DownstreamController) Start() error {
2
	klog.Info("start downstream controller")
3
	// pod
4
	go dc.syncPod()
5
	// configmap
6
	go dc.syncConfigMap()
7
	// secret
8
	go dc.syncSecret()
9
	// nodes
10
	go dc.syncEdgeNodes()
11
	// service
12
	go dc.syncService()
13
	// endpoints
14
	go dc.syncEndpoints()
15
16
	return nil
17
}

下行控制器启动后,开启了同步各个资源的协程。以syncPod()为例分析:

1
func (dc *DownstreamController) syncPod() {
2
	for {
3
		select {
4
		case <-beehiveContext.Done():
5
			klog.Warning("Stop edgecontroller downstream syncPod loop")
6
			return
7
    // 1、从信道收到事件
8
		case e := <-dc.podManager.Events():
9
      // 2、取得事件中的pod
10
			pod, ok := e.Object.(*v1.Pod)
11
			if !ok {
12
				klog.Warningf("object type: %T unsupported", pod)
13
				continue
14
			}
15
			if !dc.lc.IsEdgeNode(pod.Spec.NodeName) {
16
				continue
17
			}
18
      // 3、构建下行消息
19
			msg := model.NewMessage("")
20
			msg.SetResourceVersion(pod.ResourceVersion)
21
			resource, err := messagelayer.BuildResource(pod.Spec.NodeName, pod.Namespace, model.ResourceTypePod, pod.Name)
22
			if err != nil {
23
				klog.Warningf("built message resource failed with error: %s", err)
24
				continue
25
			}
26
			msg.Content = pod
27
      // 4、根据操作类型构建消息的Router,并在下行控制器的本地缓存中更新pod到最新
28
			switch e.Type {
29
			case watch.Added:
30
				msg.BuildRouter(constants.EdgeControllerModuleName, constants.GroupResource, resource, model.InsertOperation)
31
				dc.lc.AddOrUpdatePod(*pod)
32
			case watch.Deleted:
33
				msg.BuildRouter(constants.EdgeControllerModuleName, constants.GroupResource, resource, model.DeleteOperation)
34
			case watch.Modified:
35
				msg.BuildRouter(constants.EdgeControllerModuleName, constants.GroupResource, resource, model.UpdateOperation)
36
				dc.lc.AddOrUpdatePod(*pod)
37
			default:
38
				klog.Warningf("pod event type: %s unsupported", e.Type)
39
			}
40
      // 5、发送消息到边缘
41
			if err := dc.messageLayer.Send(*msg); err != nil {
42
				klog.Warningf("send message failed with error: %s, operation: %s, resource: %s", err, msg.GetOperation(), msg.GetResource())
43
			} else {
44
				klog.Infof("send message successfully, operation: %s, resource: %s", msg.GetOperation(), msg.GetResource())
45
			}
46
		}
47
	}
48
}

看过了上行控制器,这个函数也就很好理解了,它将事件从信道中取出,构建对应的Message类型消息,并更新管理器中的缓存,最后,还是利用beehiveContext.Send()将消息发送出去,理论上,也就是发送给cloudhub。

4. 总结

这样一来,EdgeControllrt的作用也就明晰了。上行控制器从cloudhub中接收消息,通过kubeClient与k8s通信进行更新,并将回应消息发回cloudhub。下行控制器监听k8s事件,并构建Message类型的消息发送给cloudhub,进而发送到边缘节点,进行相应的资源更新。