T4L2

Time to learn for everything

0%

KubeEdge源码分析之CloudHub(二)

1. CloudHub服务之InitHandler()

在上一篇文章中,CloudHub已经把云端其他组件发来的消息分发入了信道消息队列中目标边缘节点对应的nodeQueue和nodeStore当中,下面需要将这些消息发送给EdgeHub,并处理EdgeHub发来的消息。

1
func StartCloudHub(messageq *channelq.ChannelMessageQueue) {
2
    handler.InitHandler(messageq)
3
    // start websocket server
4
    if hubconfig.Config.WebSocket.Enable {
5
        go startWebsocketServer()
6
    }
7
    // start quic server
8
    if hubconfig.Config.Quic.Enable {
9
        go startQuicServer()
10
    }
11
}

CloudHub启动主要做了3件事:

  • 为websocket和quic服务注册handler
  • 启动websocket服务
  • 启动quic服务

本文主要分析handler部分。

1.1. 初始化handler
1
func InitHandler(eventq *channelq.ChannelMessageQueue) {
2
    once.Do(func() {
3
        CloudhubHandler = &MessageHandle{
4
            KeepaliveInterval: int(hubconfig.Config.KeepaliveInterval),
5
            WriteTimeout:      int(hubconfig.Config.WriteTimeout),
6
            MessageQueue:      eventq,
7
            NodeLimit:         int(hubconfig.Config.NodeLimit),
8
        }
9
 
10
        CloudhubHandler.KeepaliveChannel = make(map[string]chan struct{})
11
        CloudhubHandler.Handlers = []HandleFunc{ // 添加3个处理Loop
12
            CloudhubHandler.KeepaliveCheckLoop, // 判断边缘节点是否连接着
13
            CloudhubHandler.MessageWriteLoop, // 处理所有写请求
14
            CloudhubHandler.ListMessageWriteLoop, // 处理所有list资源的写请求
15
        } // 上面后两个Loop分别管理他们对应的消息队列
16
        
17
        CloudhubHandler.initServerEntries()
18
    })
19
}

这里创建了一个CloudhubHandler,它实际上是MessageHandle类型的,用于处理云边之间传递的消息,其结构如下:

1
// MessageHandle 处理云边消息
2
type MessageHandle struct {
3
    KeepaliveInterval int // 用于判断边缘节点存活的时间
4
    WriteTimeout      int // 写超时,在handleMessage中设置超时
5
    Nodes             sync.Map // nodeID-bool 这三个都是服务启动,节点注册后存进来的
6
    nodeConns         sync.Map // nodeID-hubio.CloudHubIO
7
    nodeLocks         sync.Map // nodeID-&sync.Mutex{}
8
    MessageQueue      *channelq.ChannelMessageQueue //前面提到的信道消息队列
9
    Handlers          []HandleFunc // Loop的集合
10
    NodeLimit         int // 边缘节点数量限制,超出限制会在HandleServer报警
11
    KeepaliveChannel  map[string]chan struct{} // nodeID-channel的map,用于判断边缘存活
12
    MessageAcks       sync.Map // msgID-ackChan
13
}

接着:

1
func (mh *MessageHandle) initServerEntries() {
2
    mux.Entry(mux.NewPattern("*").Op("*"), mh.HandleServer)
3
}

到这里,奇怪的知识就多了起来,需要一些解释。

首先,我们了解一下mux包,它的全称是HTTP request multiplexer,即“HTTP请求多路复用器”,用于将传入的请求和各自的处理程序进行匹配。mux.Entry函数中做了下列事情:

1) 通过NewMessageMux()函数创建了一个MessageMux对象,其结构如下:

1
type MessageMux struct {
2
   filter   *filter.MessageFilter
3
   muxEntry []*MessageMuxEntry
4
}

filter是一个消息过滤器,muxEntry就是存放具体的路由信息的地方了。MessageMuxEntry的结构如下:

1
type MessageMuxEntry struct {
2
	pattern    *MessagePattern
3
	handleFunc HandlerFunc
4
}

这里面的两个对象,就是Entry函数的两个参数。

2) 调用MessageMux.Entry(),将传入的参数组合成上面的MessageMuxEntry对象,并放入muxEntry数组中。这样,处理函数HandleServer()就被注册到MessageMux.muxEntry数组中的MessageMuxEntry对象中了。

1.2. 消息处理函数HandleServer

关于三个HandleFunc和HandleServer函数之间的关系,前者是Loop循环,是信道消息队列的管理者,而后者是消息处理者。其实从后面的分析我们可以发现,前者负责的是云到边的消息,后者负责的是边到云的消息。那么让我们具体看一下HandleServer:

1
func (mh *MessageHandle) HandleServer(container *mux.MessageContainer, writer mux.ResponseWriter) {
2
	nodeID := container.Header.Get("node_id")
3
	projectID := container.Header.Get("project_id")
4
5
	if mh.GetNodeCount() >= mh.NodeLimit {
6
		klog.Errorf("Fail to serve node %s, reach node limit", nodeID)
7
		return
8
	}
9
10
	if container.Message.GetOperation() == model.OpKeepalive {
11
		klog.Infof("Keepalive message received from node: %s", nodeID)
12
		mh.KeepaliveChannel[nodeID] <- struct{}{}
13
		return
14
	}
15
16
	// handle the reponse from edge
17
	if VolumeRegExp.MatchString(container.Message.GetResource()) {
18
		beehiveContext.SendResp(*container.Message)
19
		return
20
	}
21
22
	// handle the ack message from edge
23
	if container.Message.Router.Operation == beehiveModel.ResponseOperation {
24
		if ackChan, ok := mh.MessageAcks.Load(container.Message.Header.ParentID); ok {
25
			close(ackChan.(chan struct{}))
26
			mh.MessageAcks.Delete(container.Message.Header.ParentID)
27
		}
28
		return
29
	}
30
31
	err := mh.PubToController(&model.HubInfo{ProjectID: projectID, NodeID: nodeID}, container.Message)
32
	if err != nil {
33
		// if err, we should stop node, write data to edgehub, stop nodify
34
		klog.Errorf("Failed to serve handle with error: %s", err.Error())
35
	}
36
}

HandleServer处理所有边缘节点发出的请求,里面主要做了下列处理:

  • 判断节点数是否达到上限
  • 判断边缘节点是否保持连接
  • 处理response类型的消息,将消息送入beehiveContext中信道上下文的annoChannels中对应的信道
  • 处理ack类型的消息,关闭并删除MessageAcks中对应的信道
  • 将消息分发给控制器

深入分析最后这一步发现,它其实还是通过beehiveContext的SendToGroup()函数实现的消息分发,属于组件间消息传递,毋庸赘言。

到这里,我们可以发现,HandleServer主要负责处理边缘节点发往云端的消息,并保持连接。

1.3. 信道消息队列的管理者Loop

既然HandleServer负责了边到云的消息传送,那么Loop是否就负责云到边的消息传送了呢?MessageHandle中一共有三个Loop,分别负责保持连接和处理两种写请求,我们逐一分析:

1
func (mh *MessageHandle) KeepaliveCheckLoop(hi hubio.CloudHubIO, info *model.HubInfo, stopServe chan ExitCode, stopSendMsg chan struct{}) {
2
    keepaliveTicker := time.NewTimer(time.Duration(mh.KeepaliveInterval) * time.Second)
3
    for {
4
        select {
5
        case _, ok := <-mh.KeepaliveChannel[info.NodeID]:
6
            if !ok {
7
                return
8
            }
9
            klog.Infof("Node %s is still alive", info.NodeID)
10
            keepaliveTicker.Reset(time.Duration(mh.KeepaliveInterval) * time.Second)
11
        case <-keepaliveTicker.C:
12
            klog.Warningf("Timeout to receive heart beat from edge node %s for project %s",
13
                info.NodeID, info.ProjectID)
14
            stopServe <- nodeDisconnect
15
            close(stopSendMsg)
16
            return
17
        }
18
    }
19
}

KeepaliveCheckLoop主要通过相应的信道检查边缘节点是否保持连接,并在断开链接后通知停止消息传输。

1
func (mh *MessageHandle) MessageWriteLoop(hi hubio.CloudHubIO, info *model.HubInfo, stopServe chan ExitCode, stopSendMsg chan struct{}) {
2
    nodeQueue, err := mh.MessageQueue.GetNodeQueue(info.NodeID)
3
    if err != nil {
4
        klog.Errorf("Failed to get nodeQueue for node %s: %v", info.NodeID, err)
5
        stopServe <- messageQueueDisconnect
6
        return
7
    }
8
    nodeStore, err := mh.MessageQueue.GetNodeStore(info.NodeID)
9
    if err != nil {
10
        klog.Errorf("Failed to get nodeStore for node %s: %v", info.NodeID, err)
11
        stopServe <- messageQueueDisconnect
12
        return
13
    }
14
 
15
    for {
16
        select {
17
        case <-stopSendMsg:
18
            klog.Errorf("Node %s disconnected and stopped sending messages", info.NodeID)
19
            return
20
        default:
21
            mh.handleMessage(nodeQueue, nodeStore, hi, info, stopServe, "message")
22
        }
23
    }
24
}

MessageWriteLoop和ListMessageWriteLoop的结构大体是相似的,前面都从信道消息队列中获取到了节点的nodeQueue和nodeStore,根据前面的分析可知里面存着要发往边缘节点的消息ID和消息。重点在于handelMessage这个函数,显然,它是处理云往边发送消息的重点。

1
func (mh *MessageHandle) handleMessage(nodeQueue workqueue.RateLimitingInterface,
2
    nodeStore cache.Store, hi hubio.CloudHubIO,
3
    info *model.HubInfo, stopServe chan ExitCode, msgType string) {
4
    key, quit := nodeQueue.Get()
5
    if quit {
6
        klog.Errorf("nodeQueue for node %s has shutdown", info.NodeID)
7
        return
8
    }
9
    obj, exist, _ := nodeStore.GetByKey(key.(string))
10
    if !exist {
11
        klog.Errorf("nodeStore for node %s doesn't exist", info.NodeID)
12
        return
13
    }
14
 
15
    msg := obj.(*beehiveModel.Message)
16
    // 判断消息能否顺利传输
17
    if model.IsNodeStopped(msg) {
18
        klog.Infof("node %s is stopped, will disconnect", info.NodeID)
19
        stopServe <- nodeStop
20
        return
21
    }
22
    if !model.IsToEdge(msg) {
23
        klog.Infof("skip only to cloud event for node %s, %s, content %s", info.NodeID, dumpMessageMetadata(msg), msg.Content)
24
        return
25
    }
26
    klog.V(4).Infof("event to send for node %s, %s, content %s", info.NodeID, dumpMessageMetadata(msg), msg.Content)
27
    // 对于非list消息需要接收ack并保存副本
28
    copyMsg := deepcopy(msg)
29
    // 为消息设置resources和operation
30
    trimMessage(msg)
31
    err := hi.SetWriteDeadline(time.Now().Add(time.Duration(mh.WriteTimeout) * time.Second))
32
    if err != nil {
33
        klog.Errorf("SetWriteDeadline error, %s", err.Error())
34
        stopServe <- hubioWriteFail
35
        return
36
    }
37
    if msgType == "listMessage" {
38
        mh.send(hi, info, msg)
39
        nodeStore.Delete(msg)
40
    } else {
41
        mh.sendMsg(hi, info, msg, copyMsg, nodeStore)
42
    }
43
 
44
    nodeQueue.Done(key)
45
}

首先取出之前存进去的messagekey和msg,quit和exist用于判断nodeQueue和nodeStore是否存在。然后判断边缘节点是否正常连接,判断消息是否是发往边缘节点的,判断完毕一切正常后开始发送。

首先,对消息做一个深拷贝,得到一个copyMsg,它主要用于非list消息,需要保存副本。然后为消息设置resource和operation。最后,如果是list类型的消息,直接通过hubIO发送到边缘,并删除云端nodeStore中存储的消息。而对于非list型的消息,我们需要为其在MessageAcks中创建一个信道,然后也通过hubIO发送到边缘。而边缘发回的ack消息,就由前面提到的HandleServer来处理了。

2. 启动通信服务

分析完消息处理函数,接下来便是通信服务。

1
func startWebsocketServer() {
2
   tlsConfig := createTLSConfig(hubconfig.Config.Ca, hubconfig.Config.Cert, hubconfig.Config.Key)
3
   svc := server.Server{
4
      Type:       api.ProtocolTypeWS, // 协议类型:WebSocket
5
      TLSConfig:  &tlsConfig,
6
      AutoRoute:  true,
7
      ConnNotify: handler.CloudhubHandler.OnRegister, // 注册连接
8
      Addr:       fmt.Sprintf("%s:%d", hubconfig.Config.WebSocket.Address, hubconfig.Config.WebSocket.Port),
9
      ExOpts:     api.WSServerOption{Path: "/"},
10
   }
11
   klog.Infof("Startting cloudhub %s server", api.ProtocolTypeWS)
12
   svc.ListenAndServeTLS("", "")
13
}

由于KubeEdge在部署的时候就准备了ca证书,我们可以使用安全的TSL通信。Server中比较关键的是ConnNotify这个成员,通过它就和上一篇文章中的CloudhubHandler联系起来了,它用于注册边缘到云端的连接,主要做了以下几件事:

  • 在KeepaliveChannel中建立自己的Channel来保持连接
  • 获取IO
  • 开启一个该连接的服务goroutine,将这个节点注册到信道消息队列和MessageHandler的三个Map中去;开启3个Loop的goroutine,这样,云到边的消息处理部分就准备好了

最后,启动监听,并将监听到的消息给HandleServer处理。

QuicServer在功能上和WebSocketServer没有什么本质的区别,代码结构也基本一致,在此略过,有兴趣的读者可以自行分析。