1. CloudHub.Start()
CloudHub是cloudcore中比较重要的组件之一,负责与边端EdgeHub进行通信。经过对CloudCore源码的整体分析可知,云端各个组件的重点在于各自的Start()函数。于是,我们进入cloudhub.go文件,查看CloudHub的Start()函数。
1 | func (a *cloudHub) Start() { |
2 | objectSyncController := newObjectSyncController() |
3 | |
4 | if !cache.WaitForCacheSync(beehiveContext.Done(), |
5 | objectSyncController.ClusterObjectSyncSynced, |
6 | objectSyncController.ObjectSyncSynced, |
7 | ) { |
8 | klog.Errorf("unable to sync caches for objectSyncController") |
9 | os.Exit(1) |
10 | } |
11 | |
12 | // 创建一个通道消息队列 |
13 | messageq := channelq.NewChannelMessageQueue(objectSyncController) |
14 | |
15 | // start dispatch message from the cloud to edge node |
16 | go messageq.DispatchMessage() |
17 | |
18 | servers.StartCloudHub(messageq) |
19 | |
20 | if hubconfig.Config.UnixSocket.Enable { |
21 | // The uds server is only used to communicate with csi driver from kubeedge on cloud. |
22 | // It is not used to communicate between cloud and edge. |
23 | go udsserver.StartServer(hubconfig.Config.UnixSocket.Address) |
24 | } |
25 | } |
Start()函数中首先创建了一个同步控制器,这是cloudcore的四大组件之一,我们将在另一片文章中单独介绍。下面,Start()函数主要做了4件事:
- 创建了一个信道消息队列messageq
- 创建了消息分发的goroutine
- 启动了CloudHub服务
- 启动了UDS Server(可选)
2. 信道消息队列
那么,让我们首先来看一下信道消息队列的创建:
1 | func NewChannelMessageQueue(objectSyncController *hubconfig.ObjectSyncController) *ChannelMessageQueue { |
2 | return &ChannelMessageQueue{ |
3 | ObjectSyncController: objectSyncController, |
4 | } |
5 | } |
可见,messageq是一个ChannelMessageQueue类型,具体如下:
1 | type ChannelMessageQueue struct { |
2 | queuePool sync.Map // k:nodeID,v:NodeQueue |
3 | storePool sync.Map // k:nodeID,v:NodeStore |
4 | // 队列和存储都是在节点注册的时候分配给他们的,在那两个loop中处理 |
5 | listQueuePool sync.Map // k:nodeID,v:NodeListQueue |
6 | listStorePool sync.Map // k:nodeID,v:NodeListStore |
7 | // 同步控制器 |
8 | ObjectSyncController *hubconfig.ObjectSyncController |
9 | } |
可见,该类型中除了前面创建的同步控制器,就是4个Map类型,它们的key都是nodeID,value就是各自名称代表的东西,例如queuePool的value就是NodeQueue。
3. 消息分发
1 | func (q *ChannelMessageQueue) DispatchMessage() { |
2 | for { |
3 | // select语句类似于switch语句,在这里先检测beehiveContext有没有被cancel |
4 | select { |
5 | case <-beehiveContext.Done(): |
6 | klog.Warning("Cloudhub channel eventqueue dispatch message loop stoped") |
7 | return |
8 | default: |
9 | } |
10 | |
11 | // Receive实际上是接收Cannel的消息,这个channel是cloudhub专门用来接收消息的 |
12 | msg, err := beehiveContext.Receive(model.SrcCloudHub) |
13 | if err != nil { |
14 | klog.Info("receive not Message format message") |
15 | continue |
16 | } |
17 | nodeID, err := GetNodeID(&msg) |
18 | if nodeID == "" || err != nil { |
19 | klog.Warning("node id is not found in the message") |
20 | continue |
21 | } |
22 | // 判断资源类型是不是podlist等 |
23 | if isListResource(&msg) { |
24 | q.addListMessageToQueue(nodeID, &msg) |
25 | } else { |
26 | q.addMessageToQueue(nodeID, &msg) |
27 | } |
28 | } |
29 | } |
在消息分发函数DispatchMessage()中,Select和Channel的关系可以参见我的博文“Go基础之Goroutines和Channels”,在这里的主要作用是监测beehiveContext有没有被Cancel。通过beehiveContext接收其他组件发给CloudHub的消息msg,beehiveContext是KubeEdge组件间通信的基础,将单独一篇博客介绍。最后,将接收到的消息存到前面的信道消息队列的各个Map中去,其中,queue存放messagekey,也就是msg.Header.ID,store存放msg本身。至于ListMessage和Message有什么区别呢?从后面的写操作中我们可以看到,一般的Message在发送后要求边缘回复一个ack消息,而ListMessage发送后就直接删除了云端该消息的缓存,也不需要ack消息,详情可见CloudHub的第二篇博文。
“在KubeEdge 1.2的更新中,云端发送状态同步消息到边缘时,边缘在接收并且持久化成功后,会回复状态同步成功的 ACK 消息给云端。如果云端未收到边缘状态同步成功的消息回复,则由业务层代码触发重传机制,重新进行状态同步。”
可见,这里的Message应该就是需要ack消息的状态同步消息,而ListMessage则不需要。
判断消息是否是list消息的函数如下:
1 | func isListResource(msg *beehiveModel.Message) bool { |
2 | msgResource := msg.GetResource() |
3 | if strings.Contains(msgResource, beehiveModel.ResourceTypePodlist) || |
4 | strings.Contains(msgResource, commonconst.ResourceTypeServiceList) || |
5 | strings.Contains(msgResource, commonconst.ResourceTypeEndpointsList) || |
6 | strings.Contains(msgResource, "membership") || |
7 | strings.Contains(msgResource, "twin/cloud_updated") { |
8 | return true |
9 | } |
10 | |
11 | if msg.GetOperation() == beehiveModel.ResponseOperation { |
12 | content, ok := msg.Content.(string) |
13 | if ok && content == "OK" { |
14 | return true |
15 | } |
16 | } |
17 | |
18 | if msg.GetSource() == edgeconst.EdgeControllerModuleName { |
19 | resourceType, _ := edgemessagelayer.GetResourceType(*msg) |
20 | if resourceType == beehiveModel.ResourceTypeNode { |
21 | return true |
22 | } |
23 | } |
24 | |
25 | return false |
26 | } |
可见,list消息有以下几种:
- resource字段中含有”podlist”、”servicelist”、”endpointslist”、”membership”、”twin/cloud_updated”的
- 内容为OK的ACK消息
- EdgeController发出的node事件的消息
到此为止,CloudHub已经做了一件事,那就是云端其他组件发来的消息存到信道消息队列中去。