
KubeEdge Source Code Analysis: SyncController

0. SyncController and KubeEdge v1.2

SyncController is one of the four major cloud-side components of KubeEdge, newly added in v1.2. It periodically checks the sync status of each edge node, compares it against the resource data in K8s, and pushes any inconsistent state down to the edge, ensuring eventual consistency between cloud and edge.

One of the new features of KubeEdge v1.2 is more reliable cloud-edge message delivery. First, ACK messages were added as an acknowledgement mechanism, which shows up in the analysis of other modules. Second, the cloud now records in real time, for every edge node, the latest successfully synced resource version (ResourceVersion) of each object, persisting it in K8s in the form of CRDs (ClusterObjectSync and ObjectSync). This guarantees ordered, continuous message delivery after a cloud failure or an edge node going offline and restarting, avoiding the cloud-edge inconsistencies caused by resending stale messages. Third is the new module itself: SyncController.

As of the current version, the ACK-based delivery mechanism is still incomplete, and unstable cloud-edge networking causes connections to drop frequently. If cloudcore or edgecore restarts or stays offline for a while, messages bound for the edge may be lost, leaving cloud and edge inconsistent. SyncController was born to solve exactly this problem.

1. The structure of SyncController

The SyncController struct is defined as follows:

// SyncController use beehive context message layer
type SyncController struct {
   enable bool

   // informers, providing the sync checks and resource listers used below
   podInformer               coreinformers.PodInformer
   configMapInformer         coreinformers.ConfigMapInformer
   secretInformer            coreinformers.SecretInformer
   serviceInformer           coreinformers.ServiceInformer
   endpointInformer          coreinformers.EndpointsInformer
   nodeInformer              coreinformers.NodeInformer
   deviceInformer            deviceinformer.DeviceInformer
   clusterObjectSyncInformer syncinformer.ClusterObjectSyncInformer
   objectSyncInformer        syncinformer.ObjectSyncInformer

   // synced funcs, reporting whether each SharedInformer's store has synced
   podSynced               cache.InformerSynced
   configMapSynced         cache.InformerSynced
   secretSynced            cache.InformerSynced
   serviceSynced           cache.InformerSynced
   endpointSynced          cache.InformerSynced
   nodeSynced              cache.InformerSynced
   deviceSynced            cache.InformerSynced
   clusterObjectSyncSynced cache.InformerSynced
   objectSyncSynced        cache.InformerSynced

   // listers, used to look up the resources that ObjectSyncs refer to
   podLister               corelisters.PodLister
   configMapLister         corelisters.ConfigMapLister
   secretLister            corelisters.SecretLister
   serviceLister           corelisters.ServiceLister
   endpointLister          corelisters.EndpointsLister
   nodeLister              corelisters.NodeLister
   deviceLister            devicelister.DeviceLister
   clusterObjectSyncLister synclister.ClusterObjectSyncLister
   objectSyncLister        synclister.ObjectSyncLister
}

As shown above, SyncController mainly contains three kinds of members:

  • informer: the Informer interface implementation for each resource type, giving access to that resource's Informer() and Lister() functions
  • synced: InformerSynced functions used to determine whether an informer, and therefore its cache, has synced. As we will see later, these are actually the HasSynced methods of each informer's Informer()
  • lister: the Lister() of each informer, which can list all resources of that type

Besides built-in K8s resource types such as pods, three additional resource types appear:

  • device: device instances
  • clusterObjectSync: persists cluster-scoped objects; not yet actually used in the current version
  • objectSync: persists namespace-scoped objects, such as pods and services

ClusterObjectSync is defined as follows (ObjectSync is essentially identical):

type ClusterObjectSync struct {
    v1.TypeMeta   `json:",inline"`
    v1.ObjectMeta `json:"metadata,omitempty"`
    Spec              ObjectSyncSpec   `json:"spec,omitempty"`
    Status            ObjectSyncStatus `json:"status,omitempty"`
}

Spec stores the details of the object persisted to the edge, including its APIVersion, Kind, and Name; Status stores the resource version (resourceVersion) of the object persisted to the edge.
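Based on the fields used by the sendEvents and saveSuccessPoint code quoted later in this article, the Spec and Status types can be sketched roughly as below. This is an inferred sketch for illustration, not verbatim KubeEdge source; the field set comes from where the fields appear in the quoted code.

```go
package main

import "fmt"

// ObjectSyncSpec describes which object was persisted to the edge
// (fields inferred from the saveSuccessPoint code quoted later).
type ObjectSyncSpec struct {
	ObjectAPIVersion string `json:"objectAPIVersion,omitempty"`
	ObjectKind       string `json:"objectKind,omitempty"`
	ObjectName       string `json:"objectName,omitempty"`
}

// ObjectSyncStatus records the resourceVersion the edge has ACKed
// (field inferred from the sendEvents code quoted later).
type ObjectSyncStatus struct {
	ObjectResourceVersion string `json:"objectResourceVersion,omitempty"`
}

func main() {
	spec := ObjectSyncSpec{ObjectKind: "pod", ObjectName: "nginx"}
	status := ObjectSyncStatus{ObjectResourceVersion: "12345"}
	fmt.Println(spec.ObjectKind, spec.ObjectName, status.ObjectResourceVersion)
}
```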

2. The registration function

From the overall analysis of the CloudCore source, the heart of each cloud component is its Start() function. SyncController is different: all of its members are instantiated at registration time, in the newSyncController function.

func newSyncController(enable bool) *SyncController {
   config, err := buildConfig()
   if err != nil {
      klog.Errorf("Failed to build config, err: %v", err)
      os.Exit(1)
   }
   kubeClient := kubernetes.NewForConfigOrDie(config)
   crdClient := versioned.NewForConfigOrDie(config)
   // create SharedInformerFactories for K8s resources and CRDs, used to instantiate informers
   kubeSharedInformers := informers.NewSharedInformerFactory(kubeClient, 0)
   crdFactory := crdinformerfactory.NewSharedInformerFactory(crdClient, 0)
   // instantiate an informer for each resource type
   podInformer := kubeSharedInformers.Core().V1().Pods()
   configMapInformer := kubeSharedInformers.Core().V1().ConfigMaps()
   secretInformer := kubeSharedInformers.Core().V1().Secrets()
   serviceInformer := kubeSharedInformers.Core().V1().Services()
   endpointInformer := kubeSharedInformers.Core().V1().Endpoints()
   nodeInformer := kubeSharedInformers.Core().V1().Nodes()
   deviceInformer := crdFactory.Devices().V1alpha1().Devices()
   clusterObjectSyncInformer := crdFactory.Reliablesyncs().V1alpha1().ClusterObjectSyncs()
   objectSyncInformer := crdFactory.Reliablesyncs().V1alpha1().ObjectSyncs()

   sctl := &SyncController{
      enable: enable,
      // the informer for each resource type
      podInformer:               podInformer,
      configMapInformer:         configMapInformer,
      secretInformer:            secretInformer,
      serviceInformer:           serviceInformer,
      endpointInformer:          endpointInformer,
      nodeInformer:              nodeInformer,
      deviceInformer:            deviceInformer,
      clusterObjectSyncInformer: clusterObjectSyncInformer,
      objectSyncInformer:        objectSyncInformer,
      // the synced func for each resource type, i.e. the informer's HasSynced
      podSynced:               podInformer.Informer().HasSynced,
      configMapSynced:         configMapInformer.Informer().HasSynced,
      secretSynced:            secretInformer.Informer().HasSynced,
      serviceSynced:           serviceInformer.Informer().HasSynced,
      endpointSynced:          endpointInformer.Informer().HasSynced,
      nodeSynced:              nodeInformer.Informer().HasSynced,
      deviceSynced:            deviceInformer.Informer().HasSynced,
      clusterObjectSyncSynced: clusterObjectSyncInformer.Informer().HasSynced,
      objectSyncSynced:        objectSyncInformer.Informer().HasSynced,
      // the lister for each resource type, i.e. the informer's Lister()
      podLister:               podInformer.Lister(),
      configMapLister:         configMapInformer.Lister(),
      secretLister:            secretInformer.Lister(),
      serviceLister:           serviceInformer.Lister(),
      endpointLister:          endpointInformer.Lister(),
      nodeLister:              nodeInformer.Lister(),
      clusterObjectSyncLister: clusterObjectSyncInformer.Lister(),
      objectSyncLister:        objectSyncInformer.Lister(),
   }

   return sctl
}

Note that of SyncController's three kinds of members, the synced and lister entries are simply taken from member functions of the corresponding informer.

3. The Start() function

Now open synccontroller.go and look at SyncController's Start() function.

func (sctl *SyncController) Start() {
   // start a SharedInformer goroutine per resource type, backing the synced and lister members
   go sctl.podInformer.Informer().Run(beehiveContext.Done())
   go sctl.configMapInformer.Informer().Run(beehiveContext.Done())
   go sctl.secretInformer.Informer().Run(beehiveContext.Done())
   go sctl.serviceInformer.Informer().Run(beehiveContext.Done())
   go sctl.endpointInformer.Informer().Run(beehiveContext.Done())
   go sctl.nodeInformer.Informer().Run(beehiveContext.Done())
   go sctl.deviceInformer.Informer().Run(beehiveContext.Done())
   go sctl.clusterObjectSyncInformer.Informer().Run(beehiveContext.Done())
   go sctl.objectSyncInformer.Informer().Run(beehiveContext.Done())

   // wait for the caches to sync: each resource's synced func is polled
   // until it returns true, an error occurs, or beehiveContext is closed
   if !cache.WaitForCacheSync(beehiveContext.Done(),
      sctl.podSynced,
      sctl.configMapSynced,
      sctl.secretSynced,
      sctl.serviceSynced,
      sctl.endpointSynced,
      sctl.nodeSynced,
      sctl.deviceSynced,
      sctl.clusterObjectSyncSynced,
      sctl.objectSyncSynced,
   ) {
      klog.Errorf("unable to sync caches for sync controller")
      return
   }
   // call reconcile every 5 seconds until beehiveContext is closed
   go wait.Until(sctl.reconcile, 5*time.Second, beehiveContext.Done())
}

SharedInformers are basic K8s machinery and will not be covered here, so the heart of SyncController is the reconcile function.

3.1. The reconcile function

func (sctl *SyncController) reconcile() {
   // 1. list all ClusterObjectSyncs
   allClusterObjectSyncs, err := sctl.clusterObjectSyncLister.List(labels.Everything())
   if err != nil {
      klog.Errorf("Filed to list all the ClusterObjectSyncs: %v", err)
   }
   // 2. (currently a no-op) compare the cluster-scoped objects already persisted
   // to the edge with those in K8s, generating update/delete events for the edge
   sctl.manageClusterObjectSync(allClusterObjectSyncs)

   // 3. list all ObjectSyncs
   allObjectSyncs, err := sctl.objectSyncLister.List(labels.Everything())
   if err != nil {
      klog.Errorf("Filed to list all the ObjectSyncs: %v", err)
   }
   // 4. compare the namespace-scoped objects already persisted to the edge
   // with those in K8s, generating update/delete events for the edge
   sctl.manageObjectSync(allObjectSyncs)
   // 5. objects that failed to persist to the edge the first time are re-created here;
   // only pods, services and endpoints need attention
   sctl.manageCreateFailedObject()
}

Since ClusterObjectSync is not yet in actual use, let us look at what manageObjectSync does first:

func (sctl *SyncController) manageObjectSync(syncs []*v1alpha1.ObjectSync) {
   for _, sync := range syncs {
      switch sync.Spec.ObjectKind {
      case model.ResourceTypePod:
         sctl.managePod(sync)
      case model.ResourceTypeConfigmap:
         sctl.manageConfigMap(sync)
      case model.ResourceTypeSecret:
         sctl.manageSecret(sync)
      case commonconst.ResourceTypeService:
         sctl.manageService(sync)
      case commonconst.ResourceTypeEndpoints:
         sctl.manageEndpoint(sync)
      // TODO: add device here
      default:
         klog.Errorf("Unsupported object kind: %v", sync.Spec.ObjectKind)
      }
   }
}

Each ObjectSync corresponds to one resource object, and the matching manage function is invoked based on the object's kind. Take managePod as the example:

func (sctl *SyncController) managePod(sync *v1alpha1.ObjectSync) {
   // 1. look up this pod on the cloud side
   pod, err := sctl.podLister.Pods(sync.Namespace).Get(sync.Spec.ObjectName)

   nodeName := getNodeName(sync.Name)
   // 2. if not found, construct a stub pod object
   if err != nil {
      if apierrors.IsNotFound(err) {
         pod = &v1.Pod{
            ObjectMeta: metav1.ObjectMeta{
               Name:      sync.Spec.ObjectName,
               Namespace: sync.Namespace,
               UID:       types.UID(getObjectUID(sync.Name)),
            },
         }
      } else {
         klog.Errorf("Failed to manage pod sync of %s in namespace %s: %v", sync.Name, sync.Namespace, err)
         return
      }
   }
   // 3. compare cloud and edge, then send the appropriate event to the edge
   sendEvents(err, nodeName, sync, model.ResourceTypePod, pod.ResourceVersion, pod)
}
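The getNodeName and getObjectUID helpers recover the node name and object UID from the ObjectSync's name. Per the naming rule mentioned later for BuildObjectSyncName, the name combines the node name with the object UID; the helpers below are an illustrative sketch that assumes a "." separator (the actual separator in the KubeEdge helpers may differ):

```go
package main

import (
	"fmt"
	"strings"
)

// buildObjectSyncName combines node name and object UID into an ObjectSync
// name (the "." separator is an assumption of this sketch).
func buildObjectSyncName(nodeName, objectUID string) string {
	return nodeName + "." + objectUID
}

// getNodeName recovers the node name from an ObjectSync name.
func getNodeName(syncName string) string {
	return strings.SplitN(syncName, ".", 2)[0]
}

// getObjectUID recovers the object UID from an ObjectSync name.
func getObjectUID(syncName string) string {
	parts := strings.SplitN(syncName, ".", 2)
	if len(parts) < 2 {
		return ""
	}
	return parts[1]
}

func main() {
	name := buildObjectSyncName("edge-node-1", "88e4-uid")
	fmt.Println(name)               // edge-node-1.88e4-uid
	fmt.Println(getNodeName(name))  // edge-node-1
	fmt.Println(getObjectUID(name)) // 88e4-uid
}
```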

Next, the sendEvents function:

func sendEvents(err error, nodeName string, sync *v1alpha1.ObjectSync, resourceType string,
   objectResourceVersion string, obj interface{}) {
   // 1. the resource was not found on the cloud: send a delete event to the edge
   if err != nil && apierrors.IsNotFound(err) {
      //trigger the delete event
      klog.Infof("%s: %s has been deleted in K8s, send the delete event to edge", resourceType, sync.Spec.ObjectName)
      msg := buildEdgeControllerMessage(nodeName, sync.Namespace, resourceType, sync.Spec.ObjectName, model.DeleteOperation, obj)
      beehiveContext.Send(commonconst.DefaultContextSendModuleName, *msg)
      return
   }
   // 2. an empty resource version means no ACK has arrived since the ObjectSync
   // was created; there is no event to send, so return
   if sync.Status.ObjectResourceVersion == "" {
      klog.Errorf("The ObjectResourceVersion is empty in status of objectsync: %s", sync.Name)
      return
   }
   // 3. if the cloud resource's version is newer than the edge's, send an update event;
   // the first argument is the pod's resourceVersion in K8s, i.e. the latest state the cloud requires,
   // the second is the resourceVersion recorded in this pod's ObjectSync, updated only after
   // the edge's ACK arrives, i.e. the edge's latest state
   if CompareResourceVersion(objectResourceVersion, sync.Status.ObjectResourceVersion) > 0 {
      // trigger the update event
      klog.Infof("The resourceVersion: %s of %s in K8s is greater than in edgenode: %s, send the update event", objectResourceVersion, resourceType, sync.Status.ObjectResourceVersion)
      msg := buildEdgeControllerMessage(nodeName, sync.Namespace, resourceType, sync.Spec.ObjectName, model.UpdateOperation, obj)
      beehiveContext.Send(commonconst.DefaultContextSendModuleName, *msg)
   }
}

In summary, the cloud-side SyncController continuously compares cloud and edge resource state and reconciles them: an after-the-fact check.
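sendEvents relies on CompareResourceVersion to decide whether one resourceVersion string is newer than another. resourceVersions are opaque to clients but are in practice decimal integers issued by etcd, so the comparison can be sketched as follows (an illustrative implementation; the exact KubeEdge helper may differ):

```go
package main

import (
	"fmt"
	"strings"
)

// compareResourceVersion returns >0 if rvA is newer than rvB, assuming both
// are decimal integer strings without leading zeros: a longer string is the
// larger number, and equal lengths fall back to lexicographic order.
func compareResourceVersion(rvA, rvB string) int {
	if len(rvA) != len(rvB) {
		if len(rvA) > len(rvB) {
			return 1
		}
		return -1
	}
	return strings.Compare(rvA, rvB)
}

func main() {
	fmt.Println(compareResourceVersion("100", "99")) // cloud newer -> 1
	fmt.Println(compareResourceVersion("88", "88"))  // in sync -> 0
}
```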

4. ObjectSyncController

However, SyncController itself never shows where ObjectSyncs are created. Analysis of the other cloud components reveals that they are created in CloudHub: the very first step of CloudHub's Start function creates an ObjectSyncController.

4.1. The structure of ObjectSyncController

CloudHub's Start() function creates an ObjectSyncController as a member of the channel message queue, with the following structure:

// ObjectSyncController use beehive context message layer
type ObjectSyncController struct {
   CrdClient versioned.Interface

   // informer
   ClusterObjectSyncInformer syncinformer.ClusterObjectSyncInformer
   ObjectSyncInformer        syncinformer.ObjectSyncInformer

   // synced
   ClusterObjectSyncSynced cache.InformerSynced
   ObjectSyncSynced        cache.InformerSynced

   // lister
   ClusterObjectSyncLister synclister.ClusterObjectSyncLister
   ObjectSyncLister        synclister.ObjectSyncLister
}

Its structure mirrors SyncController, except for the extra CrdClient interface.

4.2. Creating the ObjectSyncController

Its constructor is as follows:

func newObjectSyncController() *hubconfig.ObjectSyncController {
   config, err := buildConfig()
   if err != nil {
      klog.Errorf("Failed to build config, err: %v", err)
      os.Exit(1)
   }
   // 1. instantiate crdClient and, from it, crdFactory, which instantiates the informers
   crdClient := versioned.NewForConfigOrDie(config)
   crdFactory := crdinformerfactory.NewSharedInformerFactory(crdClient, 0)
   // 2. instantiate the two informers
   clusterObjectSyncInformer := crdFactory.Reliablesyncs().V1alpha1().ClusterObjectSyncs()
   objectSyncInformer := crdFactory.Reliablesyncs().V1alpha1().ObjectSyncs()
   // 3. instantiate the controller
   sc := &hubconfig.ObjectSyncController{
      CrdClient: crdClient,

      ClusterObjectSyncInformer: clusterObjectSyncInformer,
      ObjectSyncInformer:        objectSyncInformer,

      ClusterObjectSyncSynced: clusterObjectSyncInformer.Informer().HasSynced,
      ObjectSyncSynced:        objectSyncInformer.Informer().HasSynced,

      ClusterObjectSyncLister: clusterObjectSyncInformer.Lister(),
      ObjectSyncLister:        objectSyncInformer.Lister(),
   }
   // 4. start both SharedInformers
   go sc.ClusterObjectSyncInformer.Informer().Run(beehiveContext.Done())
   go sc.ObjectSyncInformer.Informer().Run(beehiveContext.Done())

   return sc
}

From structure to construction, ObjectSyncController is strikingly similar to SyncController. So what is it for? A quick search shows it is used mainly in two files: channelq.go and messagehandler.go.

4.3. Message dispatch

In channelq.go it appears in the addMessageToQueue function:

func (q *ChannelMessageQueue) addMessageToQueue(nodeID string, msg *beehiveModel.Message) {
   // 1. if the message has no resource version and is not a delete, do not send it
   // (exact rationale: to be analyzed)
   if msg.GetResourceVersion() == "" && !isDeleteMessage(msg) {
      return
   }
   // 2. fetch the node's message queue, message store, message key and stored item
   nodeQueue, err := q.GetNodeQueue(nodeID)
   if err != nil {
      klog.Errorf("fail to get nodeQueue for Node: %s", nodeID)
      return
   }
   nodeStore, err := q.GetNodeStore(nodeID)
   if err != nil {
      klog.Errorf("fail to get nodeStore for Node: %s", nodeID)
      return
   }
   messageKey, err := getMsgKey(msg)
   if err != nil {
      klog.Errorf("fail to get message key for message: %s", msg.Header.ID)
      return
   }
   item, exist, _ := nodeStore.GetByKey(messageKey)
   // 3. examine non-delete messages
   if !isDeleteMessage(msg) {
      // 3-1. the message being absent from nodeStore means either nothing was sent before,
      // or something was sent and its ACK arrived; compare against the resource version in
      // the matching ObjectSync: in the latter case that version exists and may be newer,
      // and if it is equal or newer there is no need to send
      if !exist {
         resourceNamespace, _ := edgemessagelayer.GetNamespace(*msg)
         resourceUID, err := GetMessageUID(*msg)
         if err != nil {
            klog.Errorf("fail to get message UID for message: %s", msg.Header.ID)
            return
         }
         objectSync, err := q.ObjectSyncController.ObjectSyncLister.ObjectSyncs(resourceNamespace).Get(synccontroller.BuildObjectSyncName(nodeID, resourceUID))
         if err == nil && msg.GetResourceVersion() <= objectSync.ResourceVersion {
            return
         }
      }
      // 3-2. the message being in nodeStore with an equal or newer stored version
      // also means there is no need to send
      if exist {
         msgInStore := item.(*beehiveModel.Message)
         if msg.GetResourceVersion() <= msgInStore.GetResourceVersion() ||
            isDeleteMessage(msgInStore) {
            return
         }
      }
   }

   nodeStore.Add(msg)
   nodeQueue.Add(messageKey)
}

During message dispatch, this function adds non-List messages, i.e. those requiring an ACK, to the corresponding node's message queue. Its key addition is the resource-version comparison, which avoids resending stale messages: a before-the-fact check.
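The gating logic above can be condensed into a small decision function (an illustrative sketch; the real code also consults the per-node store and queue, and compares against whichever version source applies):

```go
package main

import "fmt"

// shouldEnqueue reproduces the skip conditions of addMessageToQueue in
// simplified form: a non-delete message is dropped when it carries no
// resourceVersion, or when the edge already acknowledged an equal or
// newer version.
func shouldEnqueue(msgRV, ackedRV string, isDelete bool) bool {
	if isDelete {
		return true // delete events always go out
	}
	if msgRV == "" {
		return false // nothing to reconcile without a resourceVersion
	}
	// the quoted source compares resourceVersions as strings, which only
	// orders correctly while both have the same number of digits
	return msgRV > ackedRV
}

func main() {
	fmt.Println(shouldEnqueue("12", "11", false)) // newer -> true
	fmt.Println(shouldEnqueue("11", "12", false)) // stale -> false
	fmt.Println(shouldEnqueue("", "12", true))    // delete -> true
}
```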

4.4. Message handling

In messagehandler.go, ObjectSyncController appears in the saveSuccessPoint function, which is reached via MessageWriteLoop/ListMessageWriteLoop -> handleMessage -> sendMsg. From the earlier CloudHub analysis, these two loops process messages sent from the cloud to the edge, and sendMsg handles the non-list messages, i.e. those requiring an ACK. sendMsg deserves a closer look:

func (mh *MessageHandle) sendMsg(hi hubio.CloudHubIO, info *model.HubInfo, msg, copyMsg *beehiveModel.Message, nodeStore cache.Store) {
   // create the channel that receives the ACK and store it in MessageAcks
   ackChan := make(chan struct{})
   mh.MessageAcks.Store(msg.GetID(), ackChan)

   // initialize the timer and retry counter for sending
   var (
      retry                       = 0
      retryInterval time.Duration = 5
   )
   ticker := time.NewTimer(retryInterval * time.Second)
   mh.send(hi, info, msg)

LOOP:
   for {
      select {
      // ACK received: all good
      case <-ackChan:
         mh.saveSuccessPoint(copyMsg, info, nodeStore)
         break LOOP
      // timed out: retry, giving up after too many attempts
      case <-ticker.C:
         if retry == 4 {
            break LOOP
         }
         mh.send(hi, info, msg)
         retry++
         ticker.Reset(time.Second * retryInterval)
      }
   }
}

So saveSuccessPoint records, on the cloud side, that the edge has successfully received and persisted a message. In detail:

func (mh *MessageHandle) saveSuccessPoint(msg *beehiveModel.Message, info *model.HubInfo, nodeStore cache.Store) {
   // 1. the message belongs to the edge controller group
   if msg.GetGroup() == edgeconst.GroupResource {
      // 1-1. extract the namespace, name, type and UID from the earlier copy of the message
      resourceNamespace, _ := edgemessagelayer.GetNamespace(*msg)
      resourceName, _ := edgemessagelayer.GetResourceName(*msg)
      resourceType, _ := edgemessagelayer.GetResourceType(*msg)
      resourceUID, err := channelq.GetMessageUID(*msg)
      if err != nil {
         return
      }
      // 1-2. derive the matching ObjectSync's name; the current naming rule is nodeName + UID
      objectSyncName := synccontroller.BuildObjectSyncName(info.NodeID, resourceUID)
      // 1-3. for a delete operation, the ACK means the deletion succeeded:
      // remove the message copy and the matching ObjectSync
      if msg.GetOperation() == beehiveModel.DeleteOperation {
         nodeStore.Delete(msg)
         mh.deleteSuccessPoint(resourceNamespace, objectSyncName)
         return
      }
      // 1-4. otherwise fetch the matching ObjectSync through the ObjectSyncController
      objectSync, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).Get(objectSyncName, metav1.GetOptions{})
      // 1-4-1. found: record the latest resource version
      if err == nil {
         objectSync.Status.ObjectResourceVersion = msg.GetResourceVersion()
         _, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).UpdateStatus(objectSync)
         if err != nil {
            klog.Errorf("Failed to update objectSync: %v, resourceType: %s, resourceNamespace: %s, resourceName: %s",
               err, resourceType, resourceNamespace, resourceName)
         }
      // 1-4-2. not found: create the ObjectSync
      } else if err != nil && apierrors.IsNotFound(err) {
         objectSync := &v1alpha1.ObjectSync{
            ObjectMeta: metav1.ObjectMeta{
               Name: objectSyncName,
            },
            Spec: v1alpha1.ObjectSyncSpec{
               ObjectAPIVersion: "",
               ObjectKind:       resourceType,
               ObjectName:       resourceName,
            },
         }
         _, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).Create(objectSync)
         if err != nil {
            klog.Errorf("Failed to create objectSync: %s, err: %v", objectSyncName, err)
            return
         }
         // after creating it, record the latest resource version
         objectSyncStatus, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).Get(objectSyncName, metav1.GetOptions{})
         if err != nil {
            klog.Errorf("Failed to get objectSync: %s, err: %v", objectSyncName, err)
         }
         objectSyncStatus.Status.ObjectResourceVersion = msg.GetResourceVersion()
         mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).UpdateStatus(objectSyncStatus)
      }
   }

   // TODO: save device info
   if msg.GetGroup() == deviceconst.GroupTwin {
   }
   klog.Infof("saveSuccessPoint successfully for message: %s", msg.GetResource())
}

In short, for a message successfully delivered to and persisted at the edge, this function updates the matching ObjectSync's status, mainly its resource version.

5. Summary

In conclusion, KubeEdge's reliable message delivery mechanism currently rests on two cloud-side pieces: SyncController, one of the four major components, and the ObjectSyncController inside CloudHub. The former continuously checks whether the cloud and edge resource versions (and existence) recorded by each ObjectSync agree; the latter both filters stale messages before dispatch and, once an ACK arrives, updates the ObjectSync's resource version and related data.