0. SyncController and KubeEdge v1.2
SyncController is one of the four major components on the cloud side of KubeEdge, newly added in KubeEdge v1.2. It periodically checks the sync state of each edge node, compares it against the resource data in K8s, and syncs any inconsistent state down to the edge, ensuring eventual consistency between cloud and edge.
One of the headline features of KubeEdge v1.2 is more reliable cloud-edge transmission. First, ACK messages were added as an acknowledgement mechanism, which shows up in the analysis of the other modules. Second, the cloud now records in real time the latest message version (ResourceVersion) successfully synced to each edge node, persisting it in K8s in the form of CRDs (ClusterObjectSync and ObjectSync). In edge scenarios this guarantees the ordering and continuity of message delivery after a cloud-side failure or after an edge node goes offline and restarts, avoiding the cloud-edge inconsistency that re-sending stale messages would cause. Third is the new module itself: SyncController.
As of this version the ACK-based delivery mechanism is still incomplete, and an unstable cloud-edge network causes connections to drop frequently. If cloudcore or edgecore restarts or stays offline for a while, messages bound for the edge can be lost, leaving cloud and edge inconsistent. SyncController was born to solve exactly this problem.
1. The structure of SyncController
The structure of SyncController is as follows:
// SyncController use beehive context message layer
type SyncController struct {
	enable bool

	// informers, which provide the sync-check and list interfaces used below
	podInformer               coreinformers.PodInformer
	configMapInformer         coreinformers.ConfigMapInformer
	secretInformer            coreinformers.SecretInformer
	serviceInformer           coreinformers.ServiceInformer
	endpointInformer          coreinformers.EndpointsInformer
	nodeInformer              coreinformers.NodeInformer
	deviceInformer            deviceinformer.DeviceInformer
	clusterObjectSyncInformer syncinformer.ClusterObjectSyncInformer
	objectSyncInformer        syncinformer.ObjectSyncInformer

	// synced funcs, which report whether each SharedInformer's store has synced
	podSynced               cache.InformerSynced
	configMapSynced         cache.InformerSynced
	secretSynced            cache.InformerSynced
	serviceSynced           cache.InformerSynced
	endpointSynced          cache.InformerSynced
	nodeSynced              cache.InformerSynced
	deviceSynced            cache.InformerSynced
	clusterObjectSyncSynced cache.InformerSynced
	objectSyncSynced        cache.InformerSynced

	// listers, used to look up the resources that ObjectSyncs refer to
	podLister               corelisters.PodLister
	configMapLister         corelisters.ConfigMapLister
	secretLister            corelisters.SecretLister
	serviceLister           corelisters.ServiceLister
	endpointLister          corelisters.EndpointsLister
	nodeLister              corelisters.NodeLister
	deviceLister            devicelister.DeviceLister
	clusterObjectSyncLister synclister.ClusterObjectSyncLister
	objectSyncLister        synclister.ObjectSyncLister
}
As shown, SyncController mainly contains three kinds of members:
- informer: the Informer interface implementation for each resource type, providing access to that resource's Informer and Lister functions
- synced: InformerSynced functions used to determine whether an informer, and hence its cache, has finished syncing. As the registration code below shows, each is in fact the HasSynced method of the corresponding informer's Informer
- lister: the Lister from each informer, which can list all resources of that type
Besides native K8s resource types such as Pod, three more resource types are involved:
- device: a device instance
- clusterObjectSync: stores cluster-scoped objects; not yet actually used in this version
- objectSync: stores namespace-scoped objects, such as pods and services
The structure of ClusterObjectSync is as follows (ObjectSync is essentially identical):
type ClusterObjectSync struct {
	v1.TypeMeta   `json:",inline"`
	v1.ObjectMeta `json:"metadata,omitempty"`
	Spec          ObjectSyncSpec   `json:"spec,omitempty"`
	Status        ObjectSyncStatus `json:"status,omitempty"`
}
Spec stores the details of the object persisted to the edge, including its APIVersion, Kind, and Name; Status stores the message version (resourceVersion) of the object persisted to the edge.
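The two embedded types are not excerpted here; based on the description above and on the fields used later in saveSuccessPoint, they plausibly look like the following sketch (the exact JSON tags are assumptions):

// ObjectSyncSpec identifies the object that has been persisted to the edge.
type ObjectSyncSpec struct {
	// ObjectAPIVersion is the API version of the persisted object, e.g. "v1"
	ObjectAPIVersion string `json:"objectAPIVersion,omitempty"`
	// ObjectKind is the kind of the persisted object, e.g. "pod"
	ObjectKind string `json:"objectKind,omitempty"`
	// ObjectName is the name of the persisted object
	ObjectName string `json:"objectName,omitempty"`
}

// ObjectSyncStatus records the resourceVersion most recently ACKed by the edge.
type ObjectSyncStatus struct {
	ObjectResourceVersion string `json:"objectResourceVersion,omitempty"`
}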
2. The registration function
The overall walkthrough of the CloudCore source showed that the focus of each cloud component is its own Start() function. SyncController is different: its members are all instantiated at registration time, via the newSyncController function.
func newSyncController(enable bool) *SyncController {
	config, err := buildConfig()
	if err != nil {
		klog.Errorf("Failed to build config, err: %v", err)
		os.Exit(1)
	}
	kubeClient := kubernetes.NewForConfigOrDie(config)
	crdClient := versioned.NewForConfigOrDie(config)
	// Create SharedInformerFactories for K8s resources and for CRDs; they instantiate the informers
	kubeSharedInformers := informers.NewSharedInformerFactory(kubeClient, 0)
	crdFactory := crdinformerfactory.NewSharedInformerFactory(crdClient, 0)
	// Instantiate the informer for each resource type
	podInformer := kubeSharedInformers.Core().V1().Pods()
	configMapInformer := kubeSharedInformers.Core().V1().ConfigMaps()
	secretInformer := kubeSharedInformers.Core().V1().Secrets()
	serviceInformer := kubeSharedInformers.Core().V1().Services()
	endpointInformer := kubeSharedInformers.Core().V1().Endpoints()
	nodeInformer := kubeSharedInformers.Core().V1().Nodes()
	deviceInformer := crdFactory.Devices().V1alpha1().Devices()
	clusterObjectSyncInformer := crdFactory.Reliablesyncs().V1alpha1().ClusterObjectSyncs()
	objectSyncInformer := crdFactory.Reliablesyncs().V1alpha1().ObjectSyncs()

	sctl := &SyncController{
		enable: enable,
		// the informer for each resource type
		podInformer:               podInformer,
		configMapInformer:         configMapInformer,
		secretInformer:            secretInformer,
		serviceInformer:           serviceInformer,
		endpointInformer:          endpointInformer,
		nodeInformer:              nodeInformer,
		deviceInformer:            deviceInformer,
		clusterObjectSyncInformer: clusterObjectSyncInformer,
		objectSyncInformer:        objectSyncInformer,
		// the synced func for each resource type, taken from the informer's HasSynced
		podSynced:               podInformer.Informer().HasSynced,
		configMapSynced:         configMapInformer.Informer().HasSynced,
		secretSynced:            secretInformer.Informer().HasSynced,
		serviceSynced:           serviceInformer.Informer().HasSynced,
		endpointSynced:          endpointInformer.Informer().HasSynced,
		nodeSynced:              nodeInformer.Informer().HasSynced,
		deviceSynced:            deviceInformer.Informer().HasSynced,
		clusterObjectSyncSynced: clusterObjectSyncInformer.Informer().HasSynced,
		objectSyncSynced:        objectSyncInformer.Informer().HasSynced,
		// the lister for each resource type, i.e. the informer's Lister method
		podLister:               podInformer.Lister(),
		configMapLister:         configMapInformer.Lister(),
		secretLister:            secretInformer.Lister(),
		serviceLister:           serviceInformer.Lister(),
		endpointLister:          endpointInformer.Lister(),
		nodeLister:              nodeInformer.Lister(),
		clusterObjectSyncLister: clusterObjectSyncInformer.Lister(),
		objectSyncLister:        objectSyncInformer.Lister(),
	}

	return sctl
}
As we can see, of SyncController's three kinds of members, the synced and lister members are simply method values taken from their respective informers. (Note in passing that deviceLister is never assigned in this version.)
3. The Start() function
Now open synccontroller.go and look at SyncController's Start() function.
func (sctl *SyncController) Start() {
	// Start the SharedInformer goroutine for each resource type, backing the synced and lister members
	go sctl.podInformer.Informer().Run(beehiveContext.Done())
	go sctl.configMapInformer.Informer().Run(beehiveContext.Done())
	go sctl.secretInformer.Informer().Run(beehiveContext.Done())
	go sctl.serviceInformer.Informer().Run(beehiveContext.Done())
	go sctl.endpointInformer.Informer().Run(beehiveContext.Done())
	go sctl.nodeInformer.Informer().Run(beehiveContext.Done())
	go sctl.deviceInformer.Informer().Run(beehiveContext.Done())
	go sctl.clusterObjectSyncInformer.Informer().Run(beehiveContext.Done())
	go sctl.objectSyncInformer.Informer().Run(beehiveContext.Done())

	// Wait for the caches to sync: keep polling each resource's synced func
	// until they all return true, an error occurs, or beehiveContext is closed
	if !cache.WaitForCacheSync(beehiveContext.Done(),
		sctl.podSynced,
		sctl.configMapSynced,
		sctl.secretSynced,
		sctl.serviceSynced,
		sctl.endpointSynced,
		sctl.nodeSynced,
		sctl.deviceSynced,
		sctl.clusterObjectSyncSynced,
		sctl.objectSyncSynced,
	) {
		klog.Errorf("unable to sync caches for sync controller")
		return
	}
	// Call the reconcile function every 5 seconds until beehiveContext is closed
	go wait.Until(sctl.reconcile, 5*time.Second, beehiveContext.Done())
}
SharedInformers are basic Kubernetes machinery and won't be covered here, so the heart of SyncController is the reconcile function.
3.1. The reconcile function
func (sctl *SyncController) reconcile() {
	// 1. List all ClusterObjectSyncs
	allClusterObjectSyncs, err := sctl.clusterObjectSyncLister.List(labels.Everything())
	if err != nil {
		klog.Errorf("Filed to list all the ClusterObjectSyncs: %v", err)
	}
	// 2. (empty function) Compare the cluster-scoped objects already persisted to the edge
	// against those in K8s, and generate update/delete events for the edge
	sctl.manageClusterObjectSync(allClusterObjectSyncs)

	// 3. List all ObjectSyncs
	allObjectSyncs, err := sctl.objectSyncLister.List(labels.Everything())
	if err != nil {
		klog.Errorf("Filed to list all the ObjectSyncs: %v", err)
	}
	// 4. Compare the namespace-scoped objects already persisted to the edge
	// against those in K8s, and generate update/delete events for the edge
	sctl.manageObjectSync(allObjectSyncs)
	// 5. Re-create here the objects that failed to persist to the edge the first time;
	// only pods, services, and endpoints need to be considered
	sctl.manageCreateFailedObject()
}
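manageCreateFailedObject is not excerpted here. Judging from the comment above, it finds objects whose initial insert never made it to the edge (no corresponding ObjectSync exists) and re-triggers the insert event. A simplified sketch of the pod case, assuming the nodeName+UID naming rule described in section 4.4 and the same helpers used elsewhere in this article:

// Sketch, pod case only: re-send insert events for pods that have no ObjectSync yet.
func (sctl *SyncController) manageCreateFailedPods() {
	pods, err := sctl.podLister.List(labels.Everything())
	if err != nil {
		klog.Errorf("Failed to list pods: %v", err)
		return
	}
	for _, pod := range pods {
		// An unscheduled pod cannot belong to an edge node yet
		if pod.Spec.NodeName == "" {
			continue
		}
		syncName := BuildObjectSyncName(pod.Spec.NodeName, string(pod.UID))
		if _, err := sctl.objectSyncLister.ObjectSyncs(pod.Namespace).Get(syncName); apierrors.IsNotFound(err) {
			// No ObjectSync means the edge never ACKed the insert, so send it again
			msg := buildEdgeControllerMessage(pod.Spec.NodeName, pod.Namespace,
				model.ResourceTypePod, pod.Name, model.InsertOperation, pod)
			beehiveContext.Send(commonconst.DefaultContextSendModuleName, *msg)
		}
	}
}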
ClusterObjectSync is not yet implemented, so let's first look at what the manageObjectSync function does:
func (sctl *SyncController) manageObjectSync(syncs []*v1alpha1.ObjectSync) {
	for _, sync := range syncs {
		switch sync.Spec.ObjectKind {
		case model.ResourceTypePod:
			sctl.managePod(sync)
		case model.ResourceTypeConfigmap:
			sctl.manageConfigMap(sync)
		case model.ResourceTypeSecret:
			sctl.manageSecret(sync)
		case commonconst.ResourceTypeService:
			sctl.manageService(sync)
		case commonconst.ResourceTypeEndpoints:
			sctl.manageEndpoint(sync)
		// TODO: add device here
		default:
			klog.Errorf("Unsupported object kind: %v", sync.Spec.ObjectKind)
		}
	}
}
Each ObjectSync corresponds to one resource object, and the matching manage function runs according to that object's type. Take managePod as the example:
func (sctl *SyncController) managePod(sync *v1alpha1.ObjectSync) {
	// 1. Look up this pod object on the cloud side
	pod, err := sctl.podLister.Pods(sync.Namespace).Get(sync.Spec.ObjectName)

	nodeName := getNodeName(sync.Name)
	// 2. If it is not found, build a stub pod object
	if err != nil {
		if apierrors.IsNotFound(err) {
			pod = &v1.Pod{
				ObjectMeta: metav1.ObjectMeta{
					Name:      sync.Spec.ObjectName,
					Namespace: sync.Namespace,
					UID:       types.UID(getObjectUID(sync.Name)),
				},
			}
		} else {
			klog.Errorf("Failed to manage pod sync of %s in namespace %s: %v", sync.Name, sync.Namespace, err)
			return
		}
	}
	// 3. Compare cloud and edge, and send the appropriate event to the edge
	sendEvents(err, nodeName, sync, model.ResourceTypePod, pod.ResourceVersion, pod)
}
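getNodeName and getObjectUID, used above, are not excerpted either. Under the nodeName+UID naming rule for ObjectSyncs described in section 4.4, minimal sketches would look like this (assuming "." as the separator, and that node names contain no dots):

import "strings"

// BuildObjectSyncName joins a node name and an object UID into an ObjectSync name.
func BuildObjectSyncName(nodeName, uid string) string {
	return nodeName + "." + uid
}

// getNodeName extracts the node-name part of an ObjectSync name.
func getNodeName(syncName string) string {
	return strings.Split(syncName, ".")[0]
}

// getObjectUID extracts the UID part of an ObjectSync name.
func getObjectUID(syncName string) string {
	return strings.Split(syncName, ".")[1]
}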
Continuing into the sendEvents function:
func sendEvents(err error, nodeName string, sync *v1alpha1.ObjectSync, resourceType string,
	objectResourceVersion string, obj interface{}) {
	// 1. The resource was not found on the cloud side: send a delete event to the edge
	if err != nil && apierrors.IsNotFound(err) {
		//trigger the delete event
		klog.Infof("%s: %s has been deleted in K8s, send the delete event to edge", resourceType, sync.Spec.ObjectName)
		msg := buildEdgeControllerMessage(nodeName, sync.Namespace, resourceType, sync.Spec.ObjectName, model.DeleteOperation, obj)
		beehiveContext.Send(commonconst.DefaultContextSendModuleName, *msg)
		return
	}
	// 2. The recorded message version is empty: the ObjectSync was created but no ACK
	// has been received yet, so there is no event to send; just return
	if sync.Status.ObjectResourceVersion == "" {
		klog.Errorf("The ObjectResourceVersion is empty in status of objectsync: %s", sync.Name)
		return
	}
	// 3. If the cloud-side resource's message version is newer than the edge's, send an update event.
	// The first argument is the pod's resourceVersion in K8s, i.e. the latest state the cloud wants;
	// the second is the resourceVersion recorded in the pod's objectSync, which is updated only
	// after an ACK from the edge, i.e. the latest state the edge is known to have.
	if CompareResourceVersion(objectResourceVersion, sync.Status.ObjectResourceVersion) > 0 {
		// trigger the update event
		klog.Infof("The resourceVersion: %s of %s in K8s is greater than in edgenode: %s, send the update event", objectResourceVersion, resourceType, sync.Status.ObjectResourceVersion)
		msg := buildEdgeControllerMessage(nodeName, sync.Namespace, resourceType, sync.Spec.ObjectName, model.UpdateOperation, obj)
		beehiveContext.Send(commonconst.DefaultContextSendModuleName, *msg)
	}
}
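CompareResourceVersion is also not excerpted. Since resourceVersions are, in practice, decimal integers, a plausible sketch simply parses and compares them numerically:

import "strconv"

// CompareResourceVersion returns 1, 0, or -1 as rva is newer than, equal to,
// or older than rvb (this sketch panics on a malformed resourceVersion).
func CompareResourceVersion(rva, rvb string) int {
	a, err := strconv.ParseUint(rva, 10, 64)
	if err != nil {
		panic(err)
	}
	b, err := strconv.ParseUint(rvb, 10, 64)
	if err != nil {
		panic(err)
	}
	switch {
	case a > b:
		return 1
	case a < b:
		return -1
	}
	return 0
}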
To sum up, the cloud-side SyncController continuously compares resource state between cloud and edge and syncs the differences, acting as an after-the-fact check.
4. ObjectSyncController
SyncController, however, never shows where ObjectSyncs get created. The analysis of the other cloud components reveals that ObjectSyncs are created in CloudHub: the very first step of CloudHub's Start() function is to create an ObjectSyncController.
4.1. The structure of ObjectSyncController
CloudHub's Start() function creates an ObjectSyncController as a member of the channel message queue; its structure is as follows:
// ObjectSyncController use beehive context message layer
type ObjectSyncController struct {
	CrdClient versioned.Interface

	// informer
	ClusterObjectSyncInformer syncinformer.ClusterObjectSyncInformer
	ObjectSyncInformer        syncinformer.ObjectSyncInformer

	// synced
	ClusterObjectSyncSynced cache.InformerSynced
	ObjectSyncSynced        cache.InformerSynced

	// lister
	ClusterObjectSyncLister synclister.ClusterObjectSyncLister
	ObjectSyncLister        synclister.ObjectSyncLister
}
Its structure is similar to SyncController's, apart from the extra CrdClient interface.
4.2. Creating the ObjectSyncController
The creation function of ObjectSyncController is as follows:
func newObjectSyncController() *hubconfig.ObjectSyncController {
	config, err := buildConfig()
	if err != nil {
		klog.Errorf("Failed to build config, err: %v", err)
		os.Exit(1)
	}
	// 1. Instantiate the crdClient and use it to create the crdFactory, which instantiates the informers
	crdClient := versioned.NewForConfigOrDie(config)
	crdFactory := crdinformerfactory.NewSharedInformerFactory(crdClient, 0)
	// 2. Instantiate the two informers
	clusterObjectSyncInformer := crdFactory.Reliablesyncs().V1alpha1().ClusterObjectSyncs()
	objectSyncInformer := crdFactory.Reliablesyncs().V1alpha1().ObjectSyncs()
	// 3. Instantiate the controller
	sc := &hubconfig.ObjectSyncController{
		CrdClient: crdClient,

		ClusterObjectSyncInformer: clusterObjectSyncInformer,
		ObjectSyncInformer:        objectSyncInformer,

		ClusterObjectSyncSynced: clusterObjectSyncInformer.Informer().HasSynced,
		ObjectSyncSynced:        objectSyncInformer.Informer().HasSynced,

		ClusterObjectSyncLister: clusterObjectSyncInformer.Lister(),
		ObjectSyncLister:        objectSyncInformer.Lister(),
	}
	// 4. Start the two SharedInformers
	go sc.ClusterObjectSyncInformer.Informer().Run(beehiveContext.Done())
	go sc.ObjectSyncInformer.Informer().Run(beehiveContext.Done())

	return sc
}
So ObjectSyncController closely resembles SyncController in both structure and construction; what is it actually for? A quick check shows it is used mainly in two files, channelq.go and messagehandler.go.
4.3. Message dispatching
In channelq.go it is used in the addMessageToQueue function:
func (q *ChannelMessageQueue) addMessageToQueue(nodeID string, msg *beehiveModel.Message) {
	// 1. If the message has no message version and is not a delete operation, don't send it
	// (presumably because such a message cannot be version-checked)
	if msg.GetResourceVersion() == "" && !isDeleteMessage(msg) {
		return
	}
	// 2. Fetch the node's message queue, message store, message key, and message body
	nodeQueue, err := q.GetNodeQueue(nodeID)
	if err != nil {
		klog.Errorf("fail to get nodeQueue for Node: %s", nodeID)
		return
	}
	nodeStore, err := q.GetNodeStore(nodeID)
	if err != nil {
		klog.Errorf("fail to get nodeStore for Node: %s", nodeID)
		return
	}
	messageKey, err := getMsgKey(msg)
	if err != nil {
		klog.Errorf("fail to get message key for message: %s", msg.Header.ID)
		return
	}
	item, exist, _ := nodeStore.GetByKey(messageKey)
	// 3. Inspect non-delete messages
	if !isDeleteMessage(msg) {
		// 3-1. If the message is not in the nodeStore, either nothing was sent before,
		// or a message was sent and its ACK arrived. Compare with the message version
		// recorded in the corresponding objectSync: in the latter case that version
		// exists and may even be newer. If the objectSync's version exists and is not
		// older, there is no need to send.
		if !exist {
			resourceNamespace, _ := edgemessagelayer.GetNamespace(*msg)
			resourceUID, err := GetMessageUID(*msg)
			if err != nil {
				klog.Errorf("fail to get message UID for message: %s", msg.Header.ID)
				return
			}
			objectSync, err := q.ObjectSyncController.ObjectSyncLister.ObjectSyncs(resourceNamespace).Get(synccontroller.BuildObjectSyncName(nodeID, resourceUID))
			if err == nil && msg.GetResourceVersion() <= objectSync.ResourceVersion {
				return
			}
		}
		// 3-2. If the message is already in the nodeStore with a version that is not
		// older, there is no need to send either
		if exist {
			msgInStore := item.(*beehiveModel.Message)
			if msg.GetResourceVersion() <= msgInStore.GetResourceVersion() ||
				isDeleteMessage(msgInStore) {
				return
			}
		}
	}

	nodeStore.Add(msg)
	nodeQueue.Add(messageKey)
}
During message dispatch, this function adds non-list messages, i.e. the messages that require an ACK, to the corresponding node's message queue. As the analysis shows, what it mainly adds is the message-version comparison that avoids re-sending stale messages, acting as a before-the-fact check.
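The two small helpers used above are not excerpted. isDeleteMessage presumably just checks the message's operation, and getMsgKey builds the nodeStore key from the message's resource fields; plausible sketches (the exact key layout is an assumption):

// isDeleteMessage reports whether the message carries a delete operation.
func isDeleteMessage(msg *beehiveModel.Message) bool {
	return msg.GetOperation() == beehiveModel.DeleteOperation
}

// getMsgKey builds a nodeStore key that uniquely identifies the object a
// resource message is about.
func getMsgKey(obj interface{}) (string, error) {
	msg, ok := obj.(*beehiveModel.Message)
	if !ok {
		return "", fmt.Errorf("object is not a beehive message")
	}
	if msg.GetGroup() == edgeconst.GroupResource {
		resourceType, _ := edgemessagelayer.GetResourceType(*msg)
		resourceNamespace, _ := edgemessagelayer.GetNamespace(*msg)
		resourceName, _ := edgemessagelayer.GetResourceName(*msg)
		return strings.Join([]string{resourceType, resourceNamespace, resourceName}, "/"), nil
	}
	return "", fmt.Errorf("unsupported message group: %s", msg.GetGroup())
}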
4.4. Message handling
In messagehandler.go, ObjectSyncController appears in the saveSuccessPoint function, which is reached along MessageWriteLoop/ListMessageWriteLoop -> handleMessage -> sendMsg. From the earlier analysis of CloudHub, the two loops handle messages sent from the cloud to the edge, and sendMsg sends the non-list messages, i.e. the ones that require an ACK. The sendMsg function deserves a closer look:
func (mh *MessageHandle) sendMsg(hi hubio.CloudHubIO, info *model.HubInfo, msg, copyMsg *beehiveModel.Message, nodeStore cache.Store) {
	// Create the channel that will receive the ACK, and store it in MessageAcks
	ackChan := make(chan struct{})
	mh.MessageAcks.Store(msg.GetID(), ackChan)

	// Initialize the timer and retry counter for sending this message
	var (
		retry                       = 0
		retryInterval time.Duration = 5
	)
	ticker := time.NewTimer(retryInterval * time.Second)
	mh.send(hi, info, msg)

LOOP:
	for {
		select {
		// The ACK arrived: all is well
		case <-ackChan:
			mh.saveSuccessPoint(copyMsg, info, nodeStore)
			break LOOP
		// Timeout: retry, and give up after too many retries
		case <-ticker.C:
			if retry == 4 {
				break LOOP
			}
			mh.send(hi, info, msg)
			retry++
			ticker.Reset(time.Second * retryInterval)
		}
	}
}
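Note that sendMsg only waits on ackChan; it is the side of CloudHub that reads messages from the edge that fires it. The function below is purely illustrative (its name and exact signaling are assumptions): when an ACK arrives, the stored channel is looked up by the acked message's ID, which the ACK carries as its parent ID, and closed to unblock sendMsg:

// Illustrative sketch, not the actual CloudHub code.
func (mh *MessageHandle) onAckReceived(ack *beehiveModel.Message) {
	if ch, exist := mh.MessageAcks.Load(ack.GetParentID()); exist {
		// Closing wakes the receive in sendMsg even if it has already given up
		close(ch.(chan struct{}))
		mh.MessageAcks.Delete(ack.GetParentID())
	}
}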
As shown above, saveSuccessPoint is called after the edge has successfully received and persisted a message, so that the cloud can record that fact. In detail:
func (mh *MessageHandle) saveSuccessPoint(msg *beehiveModel.Message, info *model.HubInfo, nodeStore cache.Store) {
	// 1. Messages from the edge controller
	if msg.GetGroup() == edgeconst.GroupResource {
		// 1-1. From the earlier copy of the message, get its namespace, name, type, and UID
		resourceNamespace, _ := edgemessagelayer.GetNamespace(*msg)
		resourceName, _ := edgemessagelayer.GetResourceName(*msg)
		resourceType, _ := edgemessagelayer.GetResourceType(*msg)
		resourceUID, err := channelq.GetMessageUID(*msg)
		if err != nil {
			return
		}
		// 1-2. Get the name of the corresponding objectSync; the current naming rule is nodeName+UID
		objectSyncName := synccontroller.BuildObjectSyncName(info.NodeID, resourceUID)
		// 1-3. For a delete operation, the ACK means the deletion succeeded,
		// so drop the message copy and the corresponding objectSync
		if msg.GetOperation() == beehiveModel.DeleteOperation {
			nodeStore.Delete(msg)
			mh.deleteSuccessPoint(resourceNamespace, objectSyncName)
			return
		}
		// 1-4. Otherwise, get the corresponding objectSync via the ObjectSyncController
		objectSync, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).Get(objectSyncName, metav1.GetOptions{})
		// 1-4-1. Got it: update the latest message version
		if err == nil {
			objectSync.Status.ObjectResourceVersion = msg.GetResourceVersion()
			_, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).UpdateStatus(objectSync)
			if err != nil {
				klog.Errorf("Failed to update objectSync: %v, resourceType: %s, resourceNamespace: %s, resourceName: %s",
					err, resourceType, resourceNamespace, resourceName)
			}
		// 1-4-2. Not found: build this objectSync
		} else if err != nil && apierrors.IsNotFound(err) {
			objectSync := &v1alpha1.ObjectSync{
				ObjectMeta: metav1.ObjectMeta{
					Name: objectSyncName,
				},
				Spec: v1alpha1.ObjectSyncSpec{
					ObjectAPIVersion: "",
					ObjectKind:       resourceType,
					ObjectName:       resourceName,
				},
			}
			_, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).Create(objectSync)
			if err != nil {
				klog.Errorf("Failed to create objectSync: %s, err: %v", objectSyncName, err)
				return
			}
			// After creating it, update the latest message version
			objectSyncStatus, err := mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).Get(objectSyncName, metav1.GetOptions{})
			if err != nil {
				klog.Errorf("Failed to get objectSync: %s, err: %v", objectSyncName, err)
			}
			objectSyncStatus.Status.ObjectResourceVersion = msg.GetResourceVersion()
			mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).UpdateStatus(objectSyncStatus)
		}
	}

	// TODO: save device info
	if msg.GetGroup() == deviceconst.GroupTwin {
	}
	klog.Infof("saveSuccessPoint successfully for message: %s", msg.GetResource())
}
So its main job is, for every message successfully delivered to and persisted at the edge, to update the status of the corresponding objectSync (chiefly its message version).
5. Summary
In summary, KubeEdge's reliable message delivery currently manifests on the cloud side in two places: SyncController, one of the four major components, and the ObjectSyncController inside CloudHub. The former continuously checks, for each ObjectSync, whether cloud and edge agree on the resource's message version and on whether it has been deleted; the latter checks messages before dispatch so that stale ones are not sent, and updates the ObjectSync's message version and related data once an ACK is received.