1. 设备控制器概览
DeviceController是KubeEdge云端部分比较重要的组件之一,主要负责接入和管理边缘设备、设备元数据云边协同。在开始之前,首先介绍一下两种资源:
- 设备模板抽象(DeviceModel):包含一类设备的通用属性字段(properties)以及属性访问方式字段(propertyVisitors)
- 设备实例定义(DeviceInstance):包含从设备模板继承的属性字段(deviceModelRef)、访问协议字段(protocol)、设备关联节点信息字段(nodeSelector)以及从设备获取的属性字段(status.twins)
经过对CloudCore源码的整体分析可知,云端各个组件的重点在于各自的Start()函数。于是,我们进入devicecontroller.go文件,查看DeviceController的Start()函数。
1 | func (dc *DeviceController) Start() { |
2 | downstream, err := controller.NewDownstreamController() |
3 | if err != nil { |
4 | klog.Errorf("New downstream controller failed with error: %s", err) |
5 | os.Exit(1) |
6 | } |
7 | upstream, err := controller.NewUpstreamController(downstream) |
8 | if err != nil { |
9 | klog.Errorf("new upstream controller failed with error: %s", err) |
10 | os.Exit(1) |
11 | } |
12 | |
13 | downstream.Start() |
14 | time.Sleep(1 * time.Second) |
15 | upstream.Start() |
16 | } |
DeviceController.Start()结构类似于EdgeController.Start(),都是创建并启动了下行控制器和上行控制器,唯一的区别在于下行控制器要在上行控制器启动之前启动完毕,因为下行控制器是上行控制器的一个成员。因此,我们先从下行控制器看起。
2. 下行控制器
相关流程:用户在k8s上使用相关命令:kubectl create/patch/delete device, device model
,下行控制器通过api-server监听这些设备模板和设备实例的更新,主要有以下几种:
- 新建设备模板,在云端新建即可
- 新建设备实例,控制器会新建一个configMap来存储设备模板中定义的设备属性和属性访问方式,configMap存在etcd中。边缘控制器会将configMap同步到边缘,这样边缘的mapper容器可以获取到更新的设备属性并访问设备。下行控制器还会将device twin元数据同步给边缘节点
- 更新设备-节点关联关系到边缘
- 更新device twin到边缘
- 删除设备实例,控制器发送device twin删除事件,删除所有关联的device twin和configMap,并同步到边缘。mapper会停止设备的操作
2.1. 下行控制器的结构
1 | type DownstreamController struct { |
2 | // kubeClient,用于与k8s进行交互 |
3 | kubeClient *kubernetes.Clientset |
4 | // 消息分发,用于将下行消息发给cloudhub |
5 | messageLayer messagelayer.MessageLayer |
6 | // 设备实例管理器,包含一个信道和一个Map |
7 | deviceManager *manager.DeviceManager |
8 | // 设备模板管理器,包含一个信道和一个Map |
9 | deviceModelManager *manager.DeviceModelManager |
10 | // configMap管理器,只有一个sync.Map成员,存nodeID - configMap |
11 | // configMap存储设备实例的配置,但是目前的代码里好像还没有用处 |
12 | configMapManager *manager.ConfigMapManager |
13 | // crdClient,当上行控制器收到设备状态更新时,也会通过其将设备状态更新给k8s |
14 | crdClient *rest.RESTClient |
15 | } |
下行控制器的结构如上所示。
为什么要使用configMap来存储设备属性和属性字段的访问方式呢?
因为只有边缘节点上的mapper才会需要这些元数据,从而可以连接到设备并收集数据。如果Mapper作为容器,就可以将这些属性作为configMap加载。下行控制器会监听云端对属性、访问方式等字段的增添、删除和修改,并更新etcd中的config map。如果mapper想知道一个设备支持哪些属性,它可以从设备实例获取设备模板信息;它也可以从设备实例获取协议信息来连接到设备。为了访问到某个属性,mapper需要获取属性的访问方式信息,这可以从propertyVisitors字段中检索到。最后,mapper可以通过visitorsConfig来读写属性相关的数据。
2.2. 下行控制器的创建
1 | func NewDownstreamController() (*DownstreamController, error) { |
2 | // 创建KubeClient,实际是clientSet类型 |
3 | cli, err := utils.KubeClient() |
4 | if err != nil { |
5 | klog.Warningf("Create kube client failed with error: %s", err) |
6 | return nil, err |
7 | } |
8 | // 创建KubeConfig,实际是rest.Config类型,仅用于创建restClient |
9 | config, err := utils.KubeConfig() |
10 | if err != nil { |
11 | klog.Warningf("Get kubeConfig error: %v", err) |
12 | return nil, err |
13 | } |
14 | // 创建一个restClient,添加了device crds,仅用于创建下面两个Manager |
15 | crdcli, err := utils.NewCRDClient(config) |
16 | if err != nil { |
17 | klog.Warningf("Failed to create crd client: %s", err) |
18 | return nil, err |
19 | } |
20 | // 创建设备实例管理器 |
21 | deviceManager, err := manager.NewDeviceManager(crdcli, v1.NamespaceAll) |
22 | if err != nil { |
23 | klog.Warningf("Create device manager failed with error: %s", err) |
24 | return nil, err |
25 | } |
26 | // 创建设备模板管理器 |
27 | deviceModelManager, err := manager.NewDeviceModelManager(crdcli, v1.NamespaceAll) |
28 | if err != nil { |
29 | klog.Warningf("Create device manager failed with error: %s", err) |
30 | return nil, err |
31 | } |
32 | |
33 | dc := &DownstreamController{ |
34 | kubeClient: cli, |
35 | deviceManager: deviceManager, |
36 | deviceModelManager: deviceModelManager, |
37 | messageLayer: messagelayer.NewContextMessageLayer(), |
38 | configMapManager: manager.NewConfigMapManager(), // 返回了一个空Map |
39 | } |
40 | return dc, nil |
41 | } |
下行控制器的创建中做了如下几件事:
- 创建与k8s交互用的kubeClient
- 创建kubeConfig,仅用于创建下面的crdClient
- 创建crdClient,仅用于在下面的设备实例管理器和设备模板管理器中创建ListWatch
- 创建设备实例管理器和设备模板管理器
- 实例化下行控制器,同时实例化了messageLayer和configMap管理器,返回下行控制器
其中kubeClient不必多说;kubeConfig证书是在开启了TSL的集群中用于与集群交互时的身份认证;crdClient是为device crds创建的一个restClient,用于访问k8s中的device crd资源。
设备实例管理器结构如下:
1 | // DeviceManager 监听设备实例改变事件 |
2 | type DeviceManager struct { |
3 | // api-server的事件信道,ListWatch监听到的事件会发到这里 |
4 | events chan watch.Event |
5 | // device.Name - *v1alpha1.Device{} |
6 | Device sync.Map |
7 | } |
里面只有两个成员:事件信道和设备实例Map。
设备实例管理器的创建函数如下:
1 | func NewDeviceManager(crdClient *rest.RESTClient, namespace string) (*DeviceManager, error) { |
2 | // 创建ListWatch,用于创建SharedInformer |
3 | lw := cache.NewListWatchFromClient(crdClient, "devices", namespace, fields.Everything()) |
4 | // 创建设备实例控制器中的成员:设备事件信道 |
5 | events := make(chan watch.Event, config.Config.Buffer.DeviceEvent) |
6 | // 创建SharedInformer中的处理程序 |
7 | rh := NewCommonResourceEventHandler(events) |
8 | // 创建SharedInformer |
9 | si := cache.NewSharedInformer(lw, &v1alpha1.Device{}, 0) |
10 | si.AddEventHandler(rh) |
11 | |
12 | pm := &DeviceManager{events: events} |
13 | |
14 | stopNever := make(chan struct{}) |
15 | go si.Run(stopNever) |
16 | |
17 | return pm, nil |
18 | } |
显然,创建好的设备实例管理器可以从自己的事件信道里获取SharedInformer协程监听到的设备实例相关事件。
设备模板管理器的结构和创建函数与设备实例管理器基本一致。至于configMap管理器,这里只返回了一个空的Map,要在后面才会用到。
2.3. 下行控制器的启动
1 | // 启动下行控制器,启动了同步device model和同步device的两个协程 |
2 | func (dc *DownstreamController) Start() error { |
3 | klog.Info("Start downstream devicecontroller") |
4 | |
5 | go dc.syncDeviceModel() |
6 | // 等添加完所有的设备模板 |
7 | // TODO need to think about sync |
8 | time.Sleep(1 * time.Second) |
9 | go dc.syncDevice() |
10 | |
11 | return nil |
12 | } |
下行控制器实际上就是启动了同步设备实例和设备模板两种资源的两个协程,具体如下。
2.3.1. 设备模板的添加/删除/更新
1 | func (dc *DownstreamController) syncDeviceModel() { |
2 | for { |
3 | select { |
4 | case <-beehiveContext.Done(): |
5 | klog.Info("stop syncDeviceModel") |
6 | return |
7 | case e := <-dc.deviceModelManager.Events(): |
8 | deviceModel, ok := e.Object.(*v1alpha1.DeviceModel) |
9 | if !ok { |
10 | klog.Warningf("object type: %T unsupported", deviceModel) |
11 | continue |
12 | } |
13 | switch e.Type { |
14 | case watch.Added: |
15 | dc.deviceModelAdded(deviceModel) // 简单存到map里 |
16 | case watch.Deleted: |
17 | dc.deviceModelDeleted(deviceModel) // 从map里删掉,将来还需要删掉所有相关设备 |
18 | case watch.Modified: |
19 | dc.deviceModelUpdated(deviceModel) |
20 | default: |
21 | klog.Warningf("deviceModel event type: %s unsupported", e.Type) |
22 | } |
23 | } |
24 | } |
25 | } |
同步设备模板时,从下行控制器的信道里取得设备模板,根据事件类型进行下一步操作:如果是添加(Added)或删除(Deleted),只操作设备模板控制器的Map即可(删除还需要删掉所有相关设备,但是还没有写这部分代码);如果是更新(Modified)就相对复杂一些:
1 | func (dc *DownstreamController) deviceModelUpdated(deviceModel *v1alpha1.DeviceModel) { |
2 | // 先判断这个设备模板是否已经存在于设备模板管理器的Map,若存在,保存下旧的设备模板value |
3 | value, ok := dc.deviceModelManager.DeviceModel.Load(deviceModel.Name) |
4 | // 将更新的设备模板存进到Map |
5 | dc.deviceModelManager.DeviceModel.Store(deviceModel.Name, deviceModel) |
6 | // 如果原本存在,即进行了更新 |
7 | if ok { |
8 | cachedDeviceModel := value.(*v1alpha1.DeviceModel) |
9 | if isDeviceModelUpdated(cachedDeviceModel, deviceModel) { // 新旧对比判断是否有更新 |
10 | dc.updateAllConfigMaps(deviceModel) // 这个函数是空的,官方还没写 |
11 | } |
12 | // 如果原本不存在的话就等同于添加,通过添加函数再存一遍(实际上和上面重复了,不过对于Map无影响) |
13 | } else { |
14 | dc.deviceModelAdded(deviceModel) |
15 | } |
16 | } |
更新设备模板不仅要更新设备模板控制器里的Map,还要更新下行控制器中的configMap,而这个函数官方暂时还没有写。
2.3.2. 设备实例的添加
下面是同步设备实例的函数:
1 | func (dc *DownstreamController) syncDevice() { |
2 | for { |
3 | select { |
4 | case <-beehiveContext.Done(): |
5 | klog.Info("Stop syncDevice") |
6 | return |
7 | case e := <-dc.deviceManager.Events(): |
8 | device, ok := e.Object.(*v1alpha1.Device) |
9 | if !ok { |
10 | klog.Warningf("Object type: %T unsupported", device) |
11 | continue |
12 | } |
13 | switch e.Type { |
14 | case watch.Added: |
15 | dc.deviceAdded(device) // 创建device,存map,发到边缘 |
16 | case watch.Deleted: |
17 | dc.deviceDeleted(device) // 从map中删掉,向边缘发送删除消息 |
18 | case watch.Modified: |
19 | // 更新map并确认。 |
20 | // 如果nodeSelector更新了,删掉旧节点上的device,创建在新节点上 |
21 | // 如果twin更新了,向边缘发送更新请求 |
22 | dc.deviceUpdated(device) |
23 | default: |
24 | klog.Warningf("Device event type: %s unsupported", e.Type) |
25 | } |
26 | } |
27 | } |
28 | } |
可见,其于设备模板同步函数结构基本一致,但设备实例是需要云边同步的,因此操作要更复杂一些。我们先来看添加Added操作。
1 | // deviceAdded creates a device, adds in deviceManagers map, send a message to edge node if node selector is present. |
2 | func (dc *DownstreamController) deviceAdded(device *v1alpha1.Device) { |
3 | // 首先还是存入设备实例管理器的Map |
4 | dc.deviceManager.Device.Store(device.Name, device) |
5 | // 设备实例中有关联节点字段时才做进一步处理 |
6 | if len(device.Spec.NodeSelector.NodeSelectorTerms) != 0 && len(device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions) != 0 && len(device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions[0].Values) != 0 { |
7 | // 将设备实例添加到configMap,具体来说: |
8 | // 1、通过节点id在configMap管理器中获取节点对应的config,没有的话为其创建一个 |
9 | // 2、将设备配置文件添加到configMap |
10 | // 3、将configMap存入configMap管理器 |
11 | dc.addToConfigMap(device) |
12 | // 根据v1alpha1.Device创建types.Device,用于构建message.content |
13 | edgeDevice := createDevice(device) |
14 | // 要发给边缘节点的消息 |
15 | msg := model.NewMessage("") |
16 | // 构建消息 |
17 | resource, err := messagelayer.BuildResource(device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions[0].Values[0], "membership", "") |
18 | if err != nil { |
19 | klog.Warningf("Built message resource failed with error: %s", err) |
20 | return |
21 | } |
22 | msg.BuildRouter(constants.DeviceControllerModuleName, constants.GroupTwin, resource, model.UpdateOperation) |
23 | |
24 | content := types.MembershipUpdate{AddDevices: []types.Device{ |
25 | edgeDevice, |
26 | }} |
27 | content.EventID = uuid.NewV4().String() |
28 | content.Timestamp = time.Now().UnixNano() / 1e6 |
29 | msg.Content = content |
30 | // 将消息发给cloudhub |
31 | err = dc.messageLayer.Send(*msg) |
32 | if err != nil { |
33 | klog.Errorf("Failed to send device addition message %v due to error %v", msg, err) |
34 | } |
35 | } |
36 | } |
对于设备实例的添加,首先也要添加到设备实例管理器的Map中,不过如果设备实例中含有关联节点相关字段时,要添加到configMap并发消息到边缘节点。
2.3.3. 设备实例的删除
相应的,删除设备实例的函数如下:
1 | func (dc *DownstreamController) deviceDeleted(device *v1alpha1.Device) { |
2 | dc.deviceManager.Device.Delete(device.Name) |
3 | dc.deleteFromConfigMap(device) |
4 | edgeDevice := createDevice(device) |
5 | msg := model.NewMessage("") |
6 | |
7 | if len(device.Spec.NodeSelector.NodeSelectorTerms) != 0 && len(device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions) != 0 && len(device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions[0].Values) != 0 { |
8 | resource, err := messagelayer.BuildResource(device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions[0].Values[0], "membership", "") |
9 | msg.BuildRouter(constants.DeviceControllerModuleName, constants.GroupTwin, resource, model.UpdateOperation) |
10 | |
11 | content := types.MembershipUpdate{RemoveDevices: []types.Device{ |
12 | edgeDevice, |
13 | }} |
14 | content.EventID = uuid.NewV4().String() |
15 | content.Timestamp = time.Now().UnixNano() / 1e6 |
16 | msg.Content = content |
17 | if err != nil { |
18 | klog.Warningf("Built message resource failed with error: %s", err) |
19 | return |
20 | } |
21 | err = dc.messageLayer.Send(*msg) |
22 | if err != nil { |
23 | klog.Errorf("Failed to send device addition message %v due to error %v", msg, err) |
24 | } |
25 | } |
26 | } |
整体上与添加设备实例一致,删掉了管理器和configMap中对应的内容,并发Delete消息到边缘节点。
2.3.4. 设备实例的更新
更新设备实例的函数如下:
1 | // deviceUpdated updates the map, check if device is actually updated. |
2 | // If nodeSelector is updated, call add device for newNode, deleteDevice for old Node. |
3 | // If twin is updated, send twin update message to edge |
4 | func (dc *DownstreamController) deviceUpdated(device *v1alpha1.Device) { |
5 | // 首先保存下旧设备实例value,如果有的话 |
6 | value, ok := dc.deviceManager.Device.Load(device.Name) |
7 | // 设备实例管理器存入更新的设备实例 |
8 | dc.deviceManager.Device.Store(device.Name, device) |
9 | // 如果存在旧设备实例,就是更新操作 |
10 | if ok { |
11 | cachedDevice := value.(*v1alpha1.Device) |
12 | // 如果新旧设备实例不同,需要进行相应的更新 |
13 | if isDeviceUpdated(cachedDevice, device) { |
14 | // 如果nodeSelector更新了,需要在新节点添加设备实例,在旧节点删除设备实例 |
15 | if isNodeSelectorUpdated(cachedDevice.Spec.NodeSelector, device.Spec.NodeSelector) { |
16 | dc.deviceAdded(device) |
17 | deletedDevice := &v1alpha1.Device{ObjectMeta: cachedDevice.ObjectMeta, |
18 | Spec: cachedDevice.Spec, |
19 | Status: cachedDevice.Status, |
20 | TypeMeta: device.TypeMeta, |
21 | } |
22 | dc.deviceDeleted(deletedDevice) |
23 | // 如果Protocol更新了,更新configMap中的设备配置文件 |
24 | // Protocols字段,删掉旧属性,添加新属性 |
25 | // DeviceInstances字段,更新该设备实例的Protocol字段(实际就是上面那个新属性的名字) |
26 | } else if isProtocolConfigUpdated(&cachedDevice.Spec.Protocol, &device.Spec.Protocol) { |
27 | dc.updateProtocolInConfigMap(device) |
28 | // 如果twin更新了,建立一个MsgTwin的map,将更新的twin和删除的twin加进去 |
29 | // 该map将是消息的一部分 |
30 | } else { |
31 | // TODO: add an else if condition to check if DeviceModelReference has changed, if yes whether deviceModelReference exists |
32 | twin := make(map[string]*types.MsgTwin) |
33 | addUpdatedTwins(device.Status.Twins, twin, device.ResourceVersion) |
34 | addDeletedTwins(cachedDevice.Status.Twins, device.Status.Twins, twin, device.ResourceVersion) |
35 | msg := model.NewMessage("") |
36 | |
37 | resource, err := messagelayer.BuildResource(device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions[0].Values[0], "device/"+device.Name+"/twin/cloud_updated", "") |
38 | if err != nil { |
39 | klog.Warningf("Built message resource failed with error: %s", err) |
40 | return |
41 | } |
42 | msg.BuildRouter(constants.DeviceControllerModuleName, constants.GroupTwin, resource, model.UpdateOperation) |
43 | content := types.DeviceTwinUpdate{Twin: twin} |
44 | content.EventID = uuid.NewV4().String() |
45 | content.Timestamp = time.Now().UnixNano() / 1e6 |
46 | msg.Content = content |
47 | // 构建消息并发送给cloudhub |
48 | err = dc.messageLayer.Send(*msg) |
49 | if err != nil { |
50 | klog.Errorf("Failed to send deviceTwin message %v due to error %v", msg, err) |
51 | } |
52 | } |
53 | } |
54 | } else { |
55 | // 如果设备实例原本不存在于设备实例控制器的话,更新就等同于添加 |
56 | dc.deviceAdded(device) |
57 | } |
58 | } |
更新设备实例时,同样首先将新设备实例存到设备实例管理器。如果该设备实例原本不存在,等同于添加设备实例;如果原本存在的话,且新旧设备实例不同,即确实进行了更新操作,则根据情况进行更新:
- 如果是NodeSelector更新了,要在新节点上添加设备实例,在旧节点上删除设备实例,通过前面的设备实例的添加/删除函数
- 如果是Protocol更新了,更新下行控制器configMap管理器中的节点对应configMap中的设备配置文件
- 如果是Twin更新了,将更新情况构建message并发送到cloudhub
综上所述,下行控制器主要同步设备模板和设备实例。其中设备模板主要缓存在自己的Map中,不过就现有代码来看,后期可能也需要缓存在设备控制器的configMap中。设备实例还要将对设备实例的操作送到对应的边缘节点上,更新边缘的设备实例。
3. 上行控制器
相关流程:边缘的mapper监听设备更新,并通过MQTT broker报告给event bus。event bus将更新发送到device twin,其将在本地更新并在云端同步。上行控制器通过cloudhub监听并接收状态更新。
3.1. 上行控制器的结构
上行控制器的结构如下:
1 | type UpstreamController struct { |
2 | crdClient *rest.RESTClient |
3 | messageLayer messagelayer.MessageLayer |
4 | // 设备状态消息信道 |
5 | deviceStatusChan chan model.Message |
6 | |
7 | // 下行控制器用于更新缓存中的设备状态 |
8 | dc *DownstreamController |
9 | } |
上行控制器主要传递边缘设备的状态,因此有一个信道接收cloudhub发来的消息,同时设备状态也要在下行控制器中进行更新。
3.2. 上行控制器的创建
1 | // NewUpstreamController create UpstreamController from config |
2 | func NewUpstreamController(dc *DownstreamController) (*UpstreamController, error) { |
3 | config, err := utils.KubeConfig() |
4 | if err != nil { |
5 | klog.Warningf("Failed to create kube client: %s", err) |
6 | return nil, err |
7 | } |
8 | |
9 | crdcli, err := utils.NewCRDClient(config) |
10 | if err != nil { |
11 | klog.Warningf("Failed to create crd client: %s", err) |
12 | return nil, err |
13 | } |
14 | |
15 | uc := &UpstreamController{ |
16 | crdClient: crdcli, |
17 | messageLayer: messagelayer.NewContextMessageLayer(), |
18 | dc: dc, |
19 | } |
20 | return uc, nil |
21 | } |
上行控制器创建中的内容在下行控制器的创建中都已经分析过了。
3.3. 上行控制器的启动
1 | // Start UpstreamController |
2 | func (uc *UpstreamController) Start() error { |
3 | klog.Info("Start upstream devicecontroller") |
4 | |
5 | uc.deviceStatusChan = make(chan model.Message, config.Config.Buffer.UpdateDeviceStatus) |
6 | go uc.dispatchMessage() |
7 | |
8 | for i := 0; i < int(config.Config.Buffer.UpdateDeviceStatus); i++ { |
9 | go uc.updateDeviceStatus() |
10 | } |
11 | return nil |
12 | } |
上行控制器启动时,主要做了以下几件事:
- 实例化自己接收cloudhub消息的信道
- 启动消息分发协程,通过beehiveContext接收cloudhub发来的消息,转发到自己的设备状态消息信道
- 启动多个更新设备状态协程
其中设备状态的更新函数如下:
1 | func (uc *UpstreamController) updateDeviceStatus() { |
2 | for { |
3 | select { |
4 | case <-beehiveContext.Done(): |
5 | klog.Info("Stop updateDeviceStatus") |
6 | return |
7 | // 收到消息 |
8 | case msg := <-uc.deviceStatusChan: |
9 | klog.Infof("Message: %s, operation is: %s, and resource is: %s", msg.GetID(), msg.GetOperation(), msg.GetResource()) |
10 | // 获取到消息里的msgTwin和设备id |
11 | msgTwin, err := uc.unmarshalDeviceStatusMessage(msg) |
12 | if err != nil { |
13 | klog.Warningf("Unmarshall failed due to error %v", err) |
14 | continue |
15 | } |
16 | deviceID, err := messagelayer.GetDeviceID(msg.GetResource()) |
17 | if err != nil { |
18 | klog.Warning("Failed to get device id") |
19 | continue |
20 | } |
21 | // 通过设备id获得下行控制器设备实例管理器中对应的设备实例device |
22 | device, ok := uc.dc.deviceManager.Device.Load(deviceID) |
23 | if !ok { |
24 | klog.Warningf("Device %s does not exist in downstream controller", deviceID) |
25 | continue |
26 | } |
27 | // cacheDevice是未更新的设备实例 |
28 | cacheDevice, ok := device.(*v1alpha1.Device) |
29 | if !ok { |
30 | klog.Warning("Failed to assert to CacheDevice type") |
31 | continue |
32 | } |
33 | // deviceStatus是旧的设备实例状态,根据msgTwin进行更新 |
34 | deviceStatus := &DeviceStatus{Status: cacheDevice.Status} |
35 | for twinName, twin := range msgTwin.Twin { |
36 | for i, cacheTwin := range deviceStatus.Status.Twins { |
37 | if twinName == cacheTwin.PropertyName && twin.Actual != nil && twin.Actual.Value != nil { |
38 | reported := v1alpha1.TwinProperty{} |
39 | reported.Value = *twin.Actual.Value |
40 | reported.Metadata = make(map[string]string) |
41 | if twin.Actual.Metadata != nil { |
42 | reported.Metadata["timestamp"] = strconv.FormatInt(twin.Actual.Metadata.Timestamp, 10) |
43 | } |
44 | if twin.Metadata != nil { |
45 | reported.Metadata["type"] = twin.Metadata.Type |
46 | } |
47 | deviceStatus.Status.Twins[i].Reported = reported |
48 | break |
49 | } |
50 | } |
51 | } |
52 | |
53 | // 更新cacheDevice的状态到最新,存回下行控制器设备实例管理器的Map |
54 | cacheDevice.Status = deviceStatus.Status |
55 | uc.dc.deviceManager.Device.Store(deviceID, cacheDevice) |
56 | // 设备状态数据json化为body |
57 | body, err := json.Marshal(deviceStatus) |
58 | if err != nil { |
59 | klog.Errorf("Failed to marshal device status %v", deviceStatus) |
60 | continue |
61 | } |
62 | // 更新k8s中设备实例的状态 |
63 | result := uc.crdClient.Patch(MergePatchType).Namespace(cacheDevice.Namespace).Resource(ResourceTypeDevices).Name(deviceID).Body(body).Do() |
64 | if result.Error() != nil { |
65 | klog.Errorf("Failed to patch device status %v of device %v in namespace %v", deviceStatus, deviceID, cacheDevice.Namespace) |
66 | continue |
67 | } |
68 | klog.Infof("Message: %s process successfully", msg.GetID()) |
69 | } |
70 | } |
71 | } |
主要将边缘的设备状态更新到下行控制器设备实例管理器Map和k8s中去。
4. 总结
综上所述,下行控制器主要负责将云端对设备实例的操作本地更新并同步给对应的边缘节点,将云端对设备模板的操作更新在设备模板管理器中;上行控制器主要负责将设备的状态实时同步给云端,并更新下行控制器里的设备实例。目前还没有发现configMap管理器具体的作用。