T4L2

Time to learn for everything

0%

KubeEdge源码分析之CloudHub(一)

1. CloudHub.Start()

CloudHub是cloudcore中比较重要的组件之一,负责与边端EdgeHub进行通信。经过对CloudCore源码的整体分析可知,云端各个组件的重点在于各自的Start()函数。于是,我们进入cloudhub.go文件,查看CloudHub的Start()函数。

1
func (a *cloudHub) Start() {
2
    objectSyncController := newObjectSyncController()
3
 
4
    if !cache.WaitForCacheSync(beehiveContext.Done(),
5
        objectSyncController.ClusterObjectSyncSynced,
6
        objectSyncController.ObjectSyncSynced,
7
    ) {
8
        klog.Errorf("unable to sync caches for objectSyncController")
9
        os.Exit(1)
10
    }
11
    
12
    // 创建一个通道消息队列
13
    messageq := channelq.NewChannelMessageQueue(objectSyncController)
14
 
15
    // start dispatch message from the cloud to edge node
16
    go messageq.DispatchMessage()
17
 
18
    servers.StartCloudHub(messageq)
19
 
20
    if hubconfig.Config.UnixSocket.Enable {
21
        // The uds server is only used to communicate with csi driver from kubeedge on cloud.
22
        // It is not used to communicate between cloud and edge.
23
        go udsserver.StartServer(hubconfig.Config.UnixSocket.Address)
24
    }
25
}

Start()函数中首先创建了一个同步控制器,这是cloudcore的四大组件之一,我们将在另一片文章中单独介绍。下面,Start()函数主要做了4件事:

  • 创建了一个信道消息队列messageq
  • 创建了消息分发的goroutine
  • 启动了CloudHub服务
  • 启动了UDS Server(可选)

2. 信道消息队列

那么,让我们首先来看一下信道消息队列的创建:

1
func NewChannelMessageQueue(objectSyncController *hubconfig.ObjectSyncController) *ChannelMessageQueue {
2
    return &ChannelMessageQueue{
3
        ObjectSyncController: objectSyncController,
4
    }
5
}

可见,messageq是一个ChannelMessageQueue类型,具体如下:

1
type ChannelMessageQueue struct {
2
    queuePool sync.Map // k:nodeID,v:NodeQueue
3
    storePool sync.Map // k:nodeID,v:NodeStore
4
    // 队列和存储都是在节点注册的时候分配给他们的,在那两个loop中处理
5
    listQueuePool sync.Map // k:nodeID,v:NodeListQueue
6
    listStorePool sync.Map // k:nodeID,v:NodeListStore
7
    // 同步控制器
8
    ObjectSyncController *hubconfig.ObjectSyncController
9
}

可见,该类型中除了前面创建的同步控制器,就是4个Map类型,它们的key都是nodeID,value就是各自名称代表的东西,例如queuePool的value就是NodeQueue。

3. 消息分发

1
func (q *ChannelMessageQueue) DispatchMessage() {
2
    for {
3
        // select语句类似于switch语句,在这里先检测beehiveContext有没有被cancel
4
        select {
5
        case <-beehiveContext.Done():
6
            klog.Warning("Cloudhub channel eventqueue dispatch message loop stoped")
7
            return
8
        default:
9
        }
10
11
        // Receive实际上是接收Cannel的消息,这个channel是cloudhub专门用来接收消息的
12
        msg, err := beehiveContext.Receive(model.SrcCloudHub)
13
        if err != nil {
14
            klog.Info("receive not Message format message")
15
            continue
16
        }
17
        nodeID, err := GetNodeID(&msg)
18
        if nodeID == "" || err != nil {
19
            klog.Warning("node id is not found in the message")
20
            continue
21
        }
22
        // 判断资源类型是不是podlist等
23
        if isListResource(&msg) {
24
            q.addListMessageToQueue(nodeID, &msg)
25
        } else {
26
            q.addMessageToQueue(nodeID, &msg)
27
        }
28
    }
29
}

在消息分发函数DispatchMessage()中,Select和Channel的关系可以参见我的博文“Go基础之Goroutines和Channels”,在这里的主要作用是监测beehiveContext有没有被Cancel。通过beehiveContext接收其他组件发给CloudHub的消息msg,beehiveContext是KubeEdge组件间通信的基础,将单独一篇博客介绍。最后,将接收到的消息存到前面的信道消息队列的各个Map中去,其中,queue存放messagekey,也就是msg.Header.ID,store存放msg本身。至于ListMessage和Message有什么区别呢?从后面的写操作中我们可以看到,一般的Message在发送后要求边缘回复一个ack消息,而ListMessage发送后就直接删除了云端该消息的缓存,也不需要ack消息,详情可见CloudHub的第二篇博文。

“在KubeEdge 1.2的更新中,云端发送状态同步消息到边缘时,边缘在接收并且持久化成功后,会回复状态同步成功的 ACK 消息给云端。如果云端未收到边缘状态同步成功的消息回复,则由业务层代码触发重传机制,重新进行状态同步。”

可见,这里的Message应该就是需要ack消息的状态同步消息,而ListMessage则不需要。

判断消息是否是list消息的函数如下:

1
func isListResource(msg *beehiveModel.Message) bool {
2
   msgResource := msg.GetResource()
3
   if strings.Contains(msgResource, beehiveModel.ResourceTypePodlist) ||
4
      strings.Contains(msgResource, commonconst.ResourceTypeServiceList) ||
5
      strings.Contains(msgResource, commonconst.ResourceTypeEndpointsList) ||
6
      strings.Contains(msgResource, "membership") ||
7
      strings.Contains(msgResource, "twin/cloud_updated") {
8
      return true
9
   }
10
11
   if msg.GetOperation() == beehiveModel.ResponseOperation {
12
      content, ok := msg.Content.(string)
13
      if ok && content == "OK" {
14
         return true
15
      }
16
   }
17
18
   if msg.GetSource() == edgeconst.EdgeControllerModuleName {
19
      resourceType, _ := edgemessagelayer.GetResourceType(*msg)
20
      if resourceType == beehiveModel.ResourceTypeNode {
21
         return true
22
      }
23
   }
24
25
   return false
26
}

可见,list消息有以下几种:

  • resource字段中含有”podlist”、”servicelist”、”endpointslist”、”membership”、”twin/cloud_updated”的
  • 内容为OK的ACK消息
  • EdgeController发出的node事件的消息

到此为止,CloudHub已经做了一件事,那就是云端其他组件发来的消息存到信道消息队列中去。