0. 引言
在KubeEdge中,mapper组件是边缘设备与边缘节点之间的桥梁。Mapper从边缘设备中采集设备数据,并通过Mqtt的形式将数据传递给Mqtt Broker,并通过EventBus进入边缘节点。因此,对于不同的边缘设备,也需要不同的mapper进行数据采集。社区目前已经提供了两种比较通用的mapper组件,分别支持蓝牙协议和modbus协议,这都是边缘设备中比较常见的协议。本文将对支持蓝牙协议的mapper进行分析,并结合社区CC2560 Sensortag的例子加以说明。
注:由于BluetoothMapper和demo都有些时间没有维护了,笔者在实践过程中也有许多不解之处,如有疑问欢迎与我交流。
1. 整体分析
Bluetooth Mapper主要包含以下5个组件:
- Action Manager 动作管理器
- Scheduler 调度器
- Watcher 监视器
- Controller 控制器
- Data Converter 数据转换器
其中控制器起到统领全局的作用,可以控制其他组件。我们将在下面遇到的时候加以介绍。
2. Main()函数
首先要从mapper的main函数看起:
1 | func main() { |
2 | // 1、初始化日志等杂项 |
3 | klog.InitFlags(nil) |
4 | pflag.CommandLine.AddGoFlagSet(flag.CommandLine) |
5 | pflag.Parse() |
6 | |
7 | // 2、初始化配置文件 |
8 | BleConfig := configuration.BLEConfig{} |
9 | err := BleConfig.Load() |
10 | if err != nil { |
11 | klog.Errorf("Error in loading configuration: %s", err) |
12 | os.Exit(1) |
13 | } |
14 | // 3、初始化控制器 |
15 | bleController := controller.Config{ |
16 | Watcher: BleConfig.Watcher, |
17 | ActionManager: BleConfig.ActionManager, |
18 | Scheduler: BleConfig.Scheduler, |
19 | Converter: BleConfig.Converter, |
20 | Device: BleConfig.Device, |
21 | Mqtt: BleConfig.Mqtt, |
22 | } |
23 | // 4、启动控制器 |
24 | bleController.Start() |
25 | } |
main()函数里主要做的就是将相关配置保存到一个BleConfig里,基于其来实例化bleController控制器并启动。Mapper即可连接设备、接收数据并传输给Mqtt Broker。
2.1. BleConfig结构
首先来看BleConfig的结构,它和blecontroller的成员是一致的:
1 | type BLEConfig struct { |
2 | Mqtt Mqtt `yaml:"mqtt"` |
3 | Device Device `yaml:"device"` |
4 | Watcher watcher.Watcher `yaml:"watcher"` |
5 | Scheduler scheduler.Scheduler `yaml:"scheduler"` |
6 | ActionManager actionmanager.ActionManager `yaml:"action-manager"` |
7 | Converter dataconverter.Converter `yaml:"data-converter"` |
8 | } |
其中,BLE(Bluetooth Low Energy)指的是低功耗蓝牙的意思,在边缘设备中比较常用。
2.1.1. ActionManager
1 | type ActionManager struct { |
2 | Actions []Action `yaml:"actions"` |
3 | } |
4 | |
5 | type Action struct { |
6 | PerformImmediately bool `yaml:"perform-immediately" json:"perform-immediately"` |
7 | Name string `yaml:"name" json:"name"` |
8 | Operation Operation `yaml:"operation" json:"operation"` |
9 | } |
10 | |
11 | type Operation struct { |
12 | Action string `yaml:"action" json:"action"` |
13 | CharacteristicUUID string `yaml:"characteristic-uuid" json:"characteristic-uuid"` |
14 | Value []byte `yaml:"value" json:"value"` |
15 | } |
ActionManager组件主要用于管理“动作”。其中Action主要用来定义动作,每个设备可以有多个这样的动作,每个动作对应设备的一组读/写操作。一个蓝牙设备可以通过向其物理寄存器写入特定值来控制,并从特定寄存器来读取数据。这些寄存器通过特征值(characteristic values)标示分辨,并以UUID的形式公开这些特征值。这些动作都需要通过config文件提供给ActionManage,而且这些在配置文件里的初始值可以通过Mqtt发布修改。各成员含义如下:
- ActionManager.Actions:动作数组,其中的每个动作都可以由ActionManager控制执行,或者由schedular和watcher调用
- Action.Name:动作名称,每个动作名称需要唯一
- Action.PerformImmediately:立即执行,为true则ActionManager会立即执行动作一次
- Action.Operation.Action:真正执行的动作,分为READ和WRITE
- Action.Operation.CharacteristicUUID:唯一确定要进行读写操作的设备寄存器
- Action.Operation.Value:读到的值或要写入的值
2.1.2. Scheduler
1 | type Scheduler struct { |
2 | Schedules []Schedule `yaml:"schedules" json:"schedules"` |
3 | } |
4 | |
5 | type Schedule struct { |
6 | Name string `yaml:"name" json:"name"` |
7 | Interval int `yaml:"interval" json:"interval"` |
8 | OccurrenceLimit int `yaml:"occurrence-limit" json:"occurrence-limit"` |
9 | Actions []string `yaml:"actions"` |
10 | } |
Schedular组件用于控制mapper按固定时间间隔执行一个或一组动作,属于可选模块。用户可以通过配置文件提供数组来定义多个schedular。schedular字段的具体各个字段如下:
- Schedule.Name:名称,要求唯一
- Schedule.Interval:时间间隔,以ms为单位
- Schedule.OccurrenceLimit:执行次数,0值则无限执行
- Schedule.Actions:即计划执行的动作序列名称,有序执行
2.1.3. Watcher
1 | type Watcher struct { |
2 | DeviceTwinAttributes []Attribute `yaml:"device-twin-attributes" json:"device-twin-attributes"` |
3 | } |
4 | |
5 | type Attribute struct { |
6 | Name string `yaml:"device-property-name" json:"device-property-name"` |
7 | Actions []string `yaml:"actions" json:"actions"` |
8 | } |
Watcher组件的指职责主要是:扫描蓝牙设备并连接到正确的设备;监视设备的双属性预期状态,并执行使实际状态与预期相等的动作;将双属性的实际状态报告给云端。这也是一个可选组件,watcher字段的具体各个字段如下:
- Attribute.Name:设备属性名称(
注:请注意property和attribute两个“属性”的区别,前者是我们需要读写的设备属性,后者是指蓝牙协议中一条带有标签的、可以被寻址的数据),创建设备时给定的设备双属性名称。观察者使用此名称监视预期状态的任何更改 - Attribute.Actions:动作数组,通过它们将实际状态转化为预期状态,有序
另外,以上三者的配置在运行时都可以通过向mqtt发布消息来进行更改,具体消发布格式可见官方文档。
2.1.4. Converter
1 | //Converter是保存数据转换特定配置的结构 |
2 | type Converter struct { |
3 | DataWrite DataWrite `yaml:"write"` |
4 | DataRead DataRead `yaml:"read"` |
5 | } |
数据转换器的结构中主要包含两部分,数据的写入和读取。其中数据写入部分的结构如下:
1 | //dataWrite结构保存特定的数据写入的配置信息 |
2 | type DataWrite struct { |
3 | Attributes []WriteAttribute `yaml:"attributes"` |
4 | } |
5 | |
6 | //WriteAttribute保存属性的名称以及要写入的值的DataMap |
7 | type WriteAttribute struct { |
8 | Name string `yaml:"name"` |
9 | Operations map[string]DataMap `yaml:"operations"` |
10 | } |
11 | |
12 | //DataMap 结构保存期望值和要写入设备字节值的map |
13 | type DataMap struct { |
14 | DataMapping map[string][]byte `yaml:"data-map"` |
15 | } |
数据读取部分的结构如下:
1 | //dataRead结构保存特定的数据读取的配置信息 |
2 | type DataRead struct { |
3 | Actions []ReadAction `yaml:"actions"` |
4 | } |
5 | |
6 | //ReadAction保存动作名称以及读取数据的转换操作 |
7 | type ReadAction struct { |
8 | ActionName string `yaml:"action-name"` |
9 | ConversionOperation ReadOperation `yaml:"conversion-operation"` |
10 | } |
11 | |
12 | //ReadOperation指定如何将从设备接收的数据转换为有意义的数据 |
13 | type ReadOperation struct { |
14 | StartIndex int `yaml:"start-index"` |
15 | EndIndex int `yaml:"end-index"` |
16 | ShiftLeft uint `yaml:"shift-left"` |
17 | ShiftRight uint `yaml:"shift-right"` |
18 | Multiply float64 `yaml:"multiply"` |
19 | Divide float64 `yaml:"divide"` |
20 | Add float64 `yaml:"add"` |
21 | Subtract float64 `yaml:"subtract"` |
22 | OrderOfExecution []string `yaml:"order-of-execution"` |
23 | } |
2.1.5. Device
1 | type Device struct { |
2 | ID string `yaml:"id"` |
3 | Name string `yaml:"name"` |
4 | } |
Device结构比较简单,仅保存了设备的ID和名称。
2.1.6. Mqtt
1 | type Mqtt struct { |
2 | Mode int `yaml:"mode"` |
3 | InternalServer string `yaml:"internal-server"` |
4 | Server string `yaml:"server"` |
5 | } |
Mqtt(消息队列遥测传输)是ISO 标准下基于发布/订阅范式的消息协议,本文不对Mqtt多加介绍。Mqtt协议有两种模式,分别是internal模式和external模式,分别用于内部通信和设备连接。
2.2. Load函数(一)
接下来看Load函数,该函数非常长,主要是实例化BleConfig的各个成员,除了ActionManager以外的成员的初始化都比较直接:
1 | //Load is used to consolidate the information loaded from the configuration file and the configmaps |
2 | func (b *BLEConfig) Load() error { |
3 | // 1、从目录中读取配置文件和configmap文件,后者运行时才创建 |
4 | readConfigFile := ReadConfigFile{} |
5 | readConfigMap := DeviceProfile{} |
6 | err := readConfigFile.ReadFromConfigFile() |
7 | if err != nil { |
8 | return errors.New("Error while reading from configuration file " + err.Error()) |
9 | } |
10 | err = readConfigMap.ReadFromConfigMap() |
11 | if err != nil { |
12 | return errors.New("Error while reading from config map " + err.Error()) |
13 | } |
14 | // 2-1、根据配置文件初始化BleConfig的Mqtt、Schedular和Watcher |
15 | b.Mqtt = readConfigFile.Mqtt |
16 | b.Scheduler = readConfigFile.Scheduler |
17 | b.Watcher = readConfigFile.Watcher |
18 | // 2-2、根据configmap初始化BleConfig的Device |
19 | for _, device := range readConfigMap.DeviceInstances { |
20 | if strings.EqualFold(device.Model, readConfigFile.DeviceModelName) { |
21 | b.Device.ID = device.ID |
22 | b.Device.Name = device.Model |
23 | } |
24 | } |
25 | // 2-3、初始化BleConfig的ActionManager(重点) |
26 | for _, actionConfig := range readConfigFile.ActionManager.Actions {...} |
27 | Config = b |
28 | return nil |
29 | } |
最后的Config是一个全局变量。在看初始化ACtionManager的代码前,先看一下这里面用到的两个文件路径:
1 | //ConfigFilePath contains the location of the configuration file |
2 | var ConfigFilePath = "configuration/config.yaml" |
3 | |
4 | //ConfigMapPath contains the location of the configuration file |
5 | var ConfigMapPath = "/opt/kubeedge/deviceProfile.json" |
前者,配置文件,对于每一个设备,用户都要自己构建一个config.yaml,放到bluetoothmapper中用于创建对应设备mapper的Dockerfile,用来定义动作、调度器和监视器。我们以CC2650为例来看一下:
1 | mqtt: |
2 | mode: 0 # 0 -internal mqtt broker 1 - external mqtt broker |
3 | server: tcp://127.0.0.1:1883 # external mqtt broker url. |
4 | internal-server: tcp://127.0.0.1:1884 # internal mqtt broker url. |
5 | device-model-name: cc2650-sensortag |
6 | action-manager: |
7 | actions: |
8 | - name: IRTemperatureConfiguration |
9 | perform-immediately: true |
10 | device-property-name: temperature-enable #property-name defined in the device model |
11 | - name: IRTemperatureData |
12 | perform-immediately: false |
13 | device-property-name: temperature #property-name defined in the device model |
14 | - name: IOConfigurationInitialize |
15 | perform-immediately: true |
16 | device-property-name: io-config-initialize #property-name defined in the device model |
17 | - name: IODataInitialize |
18 | perform-immediately: true |
19 | device-property-name: io-data-initialize #property-name defined in the device model |
20 | - name: IOConfiguration |
21 | perform-immediately: true |
22 | device-property-name: io-config #property-name defined in the device model |
23 | - name: IOData |
24 | perform-immediately: false |
25 | device-property-name: io-data #property-name defined in the device model |
26 | scheduler: |
27 | schedules: |
28 | - name: temperature |
29 | interval: 3000 |
30 | occurrence-limit: 10 # if it is 0, then the event will execute infinitely |
31 | actions: |
32 | - IRTemperatureData # Action name defined in the action-manager section |
33 | watcher: |
34 | device-twin-attributes : |
35 | - device-property-name: io-data # the twin attribute name defined while creating device |
36 | actions: # list of action names, defined in the action-manager section, to be executed on the device |
37 | - IOConfigurationInitialize |
38 | - IODataInitialize |
39 | - IOConfiguration |
40 | - IOData |
其首先定义了mqtt的属性和对应的设备模型,即使用cc2650-sensortag的设备模型,接着定义了各个动作,并且由调度器每3s读取红外温度1次,由监视器控制io-data,即CC2650的红灯、绿灯和蜂鸣器,具体怎么监控后面来讲。
后者,configmap文件,是运行后创建的,比照数据可见其是依据device和devicemodel创建的,具体来说,当device创建时,设备控制器的下行控制器会将其各个属性保存在一个名为device-profile-config-<nodename>的ConfigMap中,而mapper就是通过读取这个configmap来获取到这些信息的。我们可以通过如下命令进入容器来查看这个文件:
# docker exec -it <docker id> /bin/sh
# cat /opt/kubeedge/deviceProfile.json
这里以笔者边缘节点的deviceProfile.json文件为例,连接了CC2650-Sensortag和温度传感器两个设备:
1 | { |
2 | "deviceInstances": [{ |
3 | "id": "temperature", |
4 | "name": "temperature", |
5 | "model": "temperature-model" |
6 | }, { |
7 | "id": "sensor-tag-instance-01", |
8 | "name": "sensor-tag-instance-01", |
9 | "protocol": "bluetooth-sensor-tag-instance-01", |
10 | "model": "cc2650-sen" |
11 | }], |
12 | "deviceModels": [{ |
13 | "name": "temperature-model", |
14 | "properties": [{ |
15 | "name": "temperature-status", |
16 | "dataType": "string", |
17 | "description": "Temperature collected from the edge device", |
18 | "accessMode": "ReadOnly", |
19 | "defaultValue": "" |
20 | }] |
21 | }, { |
22 | "name": "cc2650-sen", |
23 | "properties": [{ |
24 | "name": "temperature", |
25 | "dataType": "int", |
26 | "description": "temperature in degree celsius", |
27 | "accessMode": "ReadOnly", |
28 | "defaultValue": 0, |
29 | "maximum": 100, |
30 | "unit": "degree celsius" |
31 | }, { |
32 | "name": "temperature-enable", |
33 | "dataType": "string", |
34 | "description": "enable data collection of temperature sensor", |
35 | "accessMode": "ReadWrite", |
36 | "defaultValue": "ON" |
37 | }, { |
38 | "name": "io-config-initialize", |
39 | "dataType": "int", |
40 | "description": "initialize io-config with value 0", |
41 | "accessMode": "ReadWrite", |
42 | "defaultValue": 0 |
43 | }, { |
44 | "name": "io-data-initialize", |
45 | "dataType": "int", |
46 | "description": "initialize io-data with value 0", |
47 | "accessMode": "ReadWrite", |
48 | "defaultValue": 0 |
49 | }, { |
50 | "name": "io-config", |
51 | "dataType": "int", |
52 | "description": "register activation of io-config", |
53 | "accessMode": "ReadWrite", |
54 | "defaultValue": 1 |
55 | }, { |
56 | "name": "io-data", |
57 | "dataType": "int", |
58 | "description": "data field to control io-control", |
59 | "accessMode": "ReadWrite", |
60 | "defaultValue": 0 |
61 | }] |
62 | }], |
63 | "protocols": [{ |
64 | "name": "bluetooth-sensor-tag-instance-01", |
65 | "protocol": "bluetooth", |
66 | "protocol_config": {} |
67 | }], |
68 | "propertyVisitors": [{ |
69 | "name": "temperature", |
70 | "propertyName": "temperature", |
71 | "modelName": "cc2650-sen", |
72 | "protocol": "bluetooth", |
73 | "visitorConfig": { |
74 | "characteristicUUID": "f000aa0104514000b000000000000000", |
75 | "dataConverter": { |
76 | "startIndex": 1, |
77 | "shiftRight": 2, |
78 | "orderOfOperations": [{ |
79 | "operationType": "Multiply", |
80 | "operationValue": 0.03125 |
81 | }] |
82 | } |
83 | } |
84 | }, { |
85 | "name": "temperature-enable", |
86 | "propertyName": "temperature-enable", |
87 | "modelName": "cc2650-sen", |
88 | "protocol": "bluetooth", |
89 | "visitorConfig": { |
90 | "characteristicUUID": "f000aa0204514000b000000000000000", |
91 | "dataWrite": { |
92 | "OFF": "AA==", |
93 | "ON": "AQ==" |
94 | }, |
95 | "dataConverter": {} |
96 | } |
97 | }, { |
98 | "name": "io-config-initialize", |
99 | "propertyName": "io-config-initialize", |
100 | "modelName": "cc2650-sen", |
101 | "protocol": "bluetooth", |
102 | "visitorConfig": { |
103 | "characteristicUUID": "f000aa6604514000b000000000000000", |
104 | "dataConverter": {} |
105 | } |
106 | }, { |
107 | "name": "io-data-initialize", |
108 | "propertyName": "io-data-initialize", |
109 | "modelName": "cc2650-sen", |
110 | "protocol": "bluetooth", |
111 | "visitorConfig": { |
112 | "characteristicUUID": "f000aa6504514000b000000000000000", |
113 | "dataConverter": {} |
114 | } |
115 | }, { |
116 | "name": "io-config", |
117 | "propertyName": "io-config", |
118 | "modelName": "cc2650-sen", |
119 | "protocol": "bluetooth", |
120 | "visitorConfig": { |
121 | "characteristicUUID": "f000aa6604514000b000000000000000", |
122 | "dataConverter": {} |
123 | } |
124 | }, { |
125 | "name": "io-data", |
126 | "propertyName": "io-data", |
127 | "modelName": "cc2650-sen", |
128 | "protocol": "bluetooth", |
129 | "visitorConfig": { |
130 | "characteristicUUID": "f000aa6504514000b000000000000000", |
131 | "dataWrite": { |
132 | "Buzzer": "BA==", |
133 | "BuzzerGreen": "Bg==", |
134 | "BuzzerRed": "BQ==", |
135 | "BuzzerRedGreen": "Bw==", |
136 | "Green": "Ag==", |
137 | "Red": "AQ==", |
138 | "RedGreen": "Aw==" |
139 | }, |
140 | "dataConverter": {} |
141 | } |
142 | }] |
143 | } |
读取上述文件后,根据配置文件,可以直接初始化mqtt、Schedular和Watcher;根据configmap文件也可以初始化设备的ID和名称(两者可以同值)。到此为止,BleConfig中就剩下了ActionManager和Converter还没有初始化了。
2.3. Load函数(二)
2.2中省略部分的代码如下:
1 | // 1、分别初始化ActionManager中的每一个动作 |
2 | for _, actionConfig := range readConfigFile.ActionManager.Actions { |
3 | // 1-1、创建action,直接初始化action.Name和action.PerformImmediately |
4 | action := actionmanager.Action{} |
5 | action.Name = actionConfig.Name |
6 | action.PerformImmediately = actionConfig.PerformImmediately |
7 | // 1-2、根据configmap文件里的propertyVistor(属性访问)字段进行初始化 |
8 | for _, propertyVisitor := range readConfigMap.PropertyVisitors { |
9 | if strings.EqualFold(propertyVisitor.ModelName, b.Device.Name) && strings.EqualFold(propertyVisitor.PropertyName, actionConfig.PropertyName) && strings.ToUpper(propertyVisitor.Protocol) == ProtocolName { |
10 | propertyVisitorBytes, err := json.Marshal(propertyVisitor.VisitorConfig) |
11 | if err != nil { |
12 | return errors.New("Error in marshalling data property visitor configuration: " + err.Error()) |
13 | } |
14 | bluetoothPropertyVisitor := VisitorConfigBluetooth{} |
15 | err = json.Unmarshal(propertyVisitorBytes, &bluetoothPropertyVisitor) |
16 | if err != nil { |
17 | return errors.New("Error in unmarshalling data property visitor configuration: " + err.Error()) |
18 | } |
19 | // 1-2-1、初始化action.Operation.CharacteristicUUID |
20 | action.Operation.CharacteristicUUID = bluetoothPropertyVisitor.CharacteristicUUID |
21 | newBluetoothVisitorConfig := VisitorConfigBluetooth{} |
22 | if !reflect.DeepEqual(bluetoothPropertyVisitor.BluetoothDataConverter, newBluetoothVisitorConfig.BluetoothDataConverter) { |
23 | // 1-2-2、初始化Converter.ReadAction中的各个字段 |
24 | readAction := dataconverter.ReadAction{} |
25 | readAction.ActionName = actionConfig.Name |
26 | readAction.ConversionOperation.StartIndex = bluetoothPropertyVisitor.BluetoothDataConverter.StartIndex |
27 | readAction.ConversionOperation.EndIndex = bluetoothPropertyVisitor.BluetoothDataConverter.EndIndex |
28 | readAction.ConversionOperation.ShiftRight = bluetoothPropertyVisitor.BluetoothDataConverter.ShiftRight |
29 | readAction.ConversionOperation.ShiftLeft = bluetoothPropertyVisitor.BluetoothDataConverter.ShiftLeft |
30 | for _, readOperations := range bluetoothPropertyVisitor.BluetoothDataConverter.OrderOfOperations { |
31 | readAction.ConversionOperation.OrderOfExecution = append(readAction.ConversionOperation.OrderOfExecution, readOperations.BluetoothOperationType) |
32 | switch strings.ToUpper(readOperations.BluetoothOperationType) { |
33 | case strings.ToUpper(BluetoothAdd): |
34 | readAction.ConversionOperation.Add = readOperations.BluetoothOperationValue |
35 | case strings.ToUpper(BluetoothSubtract): |
36 | readAction.ConversionOperation.Subtract = readOperations.BluetoothOperationValue |
37 | case strings.ToUpper(BluetoothMultiply): |
38 | readAction.ConversionOperation.Multiply = readOperations.BluetoothOperationValue |
39 | case strings.ToUpper(BluetoothDivide): |
40 | readAction.ConversionOperation.Divide = readOperations.BluetoothOperationValue |
41 | } |
42 | } |
43 | b.Converter.DataRead.Actions = append(b.Converter.DataRead.Actions, readAction) |
44 | } |
45 | if bluetoothPropertyVisitor.DataWriteToBluetooth != nil { |
46 | // 1-2-3、初始化Converter.WriteAttribute中的各个字段,数据转换器初始化完成 |
47 | writeAttribute := dataconverter.WriteAttribute{} |
48 | writeAttribute.Operations = make(map[string]dataconverter.DataMap, 1) |
49 | dataMap := dataconverter.DataMap{} |
50 | dataMap.DataMapping = bluetoothPropertyVisitor.DataWriteToBluetooth |
51 | writeAttribute.Operations[actionConfig.Name] = dataMap |
52 | writeAttribute.Name = propertyVisitor.PropertyName |
53 | b.Converter.DataWrite.Attributes = append(b.Converter.DataWrite.Attributes, writeAttribute) |
54 | } |
55 | } |
56 | } |
57 | // 1-3、根据configmap文件中的设备模型进行初始化 |
58 | for _, deviceModel := range readConfigMap.DeviceModels { |
59 | if strings.EqualFold(deviceModel.Name, b.Device.Name) { |
60 | for _, property := range deviceModel.Properties { |
61 | if strings.EqualFold(property.Name, actionConfig.PropertyName) { |
62 | if property.AccessMode == READWRITE { |
63 | action.Operation.Action = "Write" |
64 | if strings.ToUpper(property.DataType) == "INT" { |
65 | value := string(int(property.DefaultValue.(float64))) |
66 | action.Operation.Value = []byte(value) |
67 | } else if strings.ToUpper(property.DataType) == "STRING" { |
68 | for _, converterAttribute := range b.Converter.DataWrite.Attributes { |
69 | if strings.EqualFold(converterAttribute.Name, actionConfig.PropertyName) { |
70 | for operationName, dataMap := range converterAttribute.Operations { |
71 | if action.Name == operationName { |
72 | if _, ok := dataMap.DataMapping[property.DefaultValue.(string)]; ok { |
73 | action.Operation.Value = dataMap.DataMapping[property.DefaultValue.(string)] |
74 | } |
75 | } |
76 | } |
77 | } |
78 | } |
79 | } |
80 | } else if property.AccessMode == READ { |
81 | action.Operation.Action = "Read" |
82 | } |
83 | } |
84 | } |
85 | } |
86 | } |
87 | b.ActionManager.Actions = append(b.ActionManager.Actions, action) |
88 | } |
对于动作的初始化,名称和立即执行字段都可以直接初始化。
在第一个for循环中,根据PropertyVisitors字段初始化特征值UUID和数据转换器。特征UUID对应设备模型字段spec.propertyVisitors.bluetooth.characteristicUUID,数据转换,则对应设备模型字段spec.propertyVisitors.bluetooth.dataConverter。
在第二个for循环中,根据configmap文件的DeviceModels.deviceModel.property属性字段,初始化action.Operation.Action字段(Read和ReadWrite两种值)和action.Operation.Value字段(INT/STRING两种格式)。
最后,将初始化好的action加入到BleConfig的ActionManager的动作数组。待所有的action都放入后,将其作为一个全局变量。
接下来,回到main()函数,根据BleConfig初始化bleController并将其启动。
2.4. 启动bleController
2.4.1. 总览
启动函数如下:
1 | //Start starts the controller of the mapper |
2 | func (c *Config) Start() { |
3 | // 1、初始化TopicMap,就是初始化Mqtt对某些Topic的处理函数,用于通过Mqtt发布来在运行时更新配置 |
4 | c.initTopicMap() |
5 | // 2、通过配置文件信息连接到Mqtt并订阅1里注册的所有Topic |
6 | helper.MqttConnect(c.Mqtt.Mode, c.Mqtt.InternalServer, c.Mqtt.Server) |
7 | subscribeAllTopics() |
8 | // *这一句用了sync.Waitgroup,在Start()函数尾等待所有任务完结 |
9 | helper.ControllerWg.Add(1) |
10 | // 3、通过开源的gatt新建一个外设 |
11 | device, err := gatt.NewDevice(option.DefaultClientOptions...) |
12 | if err != nil { |
13 | klog.Fatalf("Failed to open device, err: %s\n", err) |
14 | return |
15 | } |
16 | // 4、通过之前的配置文件将这个外设初始化到watcher |
17 | go c.Watcher.Initiate(device, c.Device.Name, c.Device.ID, c.ActionManager.Actions, c.Converter) |
18 | // 5、当外设连接上之后,如果需要立即执行就立即执行一次 |
19 | <-watcher.DeviceConnected |
20 | for _, action := range c.ActionManager.Actions { |
21 | if action.PerformImmediately { |
22 | action.PerformOperation(c.Converter.DataRead) |
23 | } |
24 | } |
25 | // 6、按时执行 |
26 | for _, schedule := range c.Scheduler.Schedules { |
27 | helper.ControllerWg.Add(1) |
28 | go schedule.ExecuteSchedule(c.ActionManager.Actions, c.Converter.DataRead, c.Device.ID) |
29 | } |
30 | helper.ControllerWg.Wait() |
31 | } |
第一步初始化了下列处理函数:
1 | // initTopicMap initializes topics to their respective handler functions |
2 | func (c *Config) initTopicMap() { |
3 | topicMap[MapperTopicPrefix+c.Device.ID+WatcherTopicSuffix] = c.handleWatchMessage |
4 | topicMap[MapperTopicPrefix+c.Device.ID+SchedulerCreateTopicSuffix] = c.handleScheduleCreateMessage |
5 | topicMap[MapperTopicPrefix+c.Device.ID+SchedulerDeleteTopicSuffix] = c.handleScheduleDeleteMessage |
6 | topicMap[MapperTopicPrefix+c.Device.ID+ActionManagerCreateTopicSuffix] = c.handleActionCreateMessage |
7 | topicMap[MapperTopicPrefix+c.Device.ID+ActionManagerDeleteTopicSuffix] = c.handleActionDeleteMessage |
8 | } |
包括监视、调度器的创建和删除、动作的创建和删除。
第二步根据mqtt的配置建立连接,并订阅了topicmap中的所有主题,并对ControllerWg + 1。
第三步中,利用了github上的gatt包,它是用于编写Ble外围设备和中央设备的协议。其中外围设备,在此处也就是边缘设备,可以创建服务,特征和描述符,发布,接受连接以及处理请求;中央设备,在此处也就是边缘节点,可以扫描,连接,发现服务并提出请求。本文不对其多加分析,可以参考https://github.com/paypal/gatt。这里就和设备建立起了连接。此处可参考3.2. Bug总结。
第四步,将设备初始化到监视器,具体如下:
1 | //Initiate initiates the watcher module |
2 | func (w *Watcher) Initiate(device gatt.Device, nameOfDevice, idOfDevice string, actions []actionmanager.Action, converter dataconverter.Converter) { |
3 | deviceID = idOfDevice |
4 | deviceName = nameOfDevice |
5 | actionManager = actions |
6 | dataConverter = converter |
7 | // 注册处理程序 |
8 | device.Handle( |
9 | gatt.PeripheralConnected(w.onPeripheralConnected), |
10 | gatt.PeripheralDisconnected(onPeripheralDisconnected), |
11 | gatt.PeripheralDiscovered(onPeripheralDiscovered), |
12 | ) |
13 | // 注册边缘设备变化时执行的操作 |
14 | device.Init(onStateChanged) |
15 | <-done |
16 | klog.Infof("Watcher Done") |
17 | } |
监视器给设备注册了三个处理函数,分别处理连接建立、停止连接和设备发现事件,并且注册了边缘设备变化时执行的操作,用于扫描设备和停止扫描。
2.4.2. 设备发现
设备发现事件处理函数:
1 | func onPeripheralDiscovered(p gatt.Peripheral, a *gatt.Advertisement, rssi int) { |
2 | if strings.EqualFold(a.LocalName, strings.Replace(deviceName, "-", " ", -1)) { |
3 | klog.Infof("Device: %s found !!!! Stop Scanning for devices", deviceName) |
4 | // Stop scanning once we've got the peripheral we're looking for. |
5 | p.Device().StopScanning() |
6 | klog.Infof("Connecting to %s", deviceName) |
7 | p.Device().Connect(p) |
8 | } |
9 | } |
这里是通过设备的本地名称和配置名称进行对比确认是否是要找的设备的,参考3.2. Bug总结。
2.4.3. 连接建立
设备连接建立事件处理函数:
1 | //onPeripheralConnected contains the operations to be performed as soon as the peripheral device is connected |
2 | func (w *Watcher) onPeripheralConnected(p gatt.Peripheral, err error) { |
3 | actionmanager.GattPeripheral = p |
4 | ss, err := p.DiscoverServices(nil) |
5 | if err != nil { |
6 | klog.Errorf("Failed to discover services, err: %s\n", err) |
7 | os.Exit(1) |
8 | } |
9 | for _, s := range ss { |
10 | // Discovery characteristics |
11 | cs, err := p.DiscoverCharacteristics(nil, s) |
12 | if err != nil { |
13 | klog.Errorf("Failed to discover characteristics for service %s, err: %v\n", s.Name(), err) |
14 | continue |
15 | } |
16 | actionmanager.CharacteristicsList = append(actionmanager.CharacteristicsList, cs...) |
17 | } |
18 | DeviceConnected <- true |
19 | for { |
20 | newWatcher := &Watcher{} |
21 | if !reflect.DeepEqual(w, newWatcher) { |
22 | err := w.EquateTwinValue(deviceID) |
23 | if err != nil { |
24 | klog.Errorf("Error in watcher functionality: %s", err) |
25 | } |
26 | } |
27 | } |
28 | } |
在成功建立连接后,扫描设备所有的service,并将特征值存储到动作管理器中的特征列表,这个特征值包含了UUID。并且如果watcher非空,会运行EquateTwinValue函数,将设备实际状态与期望状态同步,并同步给云端:
1 | //EquateTwinValue is responsible for equating the actual state of the device to the expected state that has been set and syncing back the result to the cloud |
2 | func (w *Watcher) EquateTwinValue(deviceID string) error { |
3 | //1-1、构建设备twin更新消息 |
4 | var updateMessage helper.DeviceTwinUpdate |
5 | updatedActualValues := make(map[string]string) |
6 | //1-2、Wg+1,会在TwinSubscribe中完成 |
7 | helper.Wg.Add(1) |
8 | klog.Infof("Watching on the device twin values for device with deviceID: %s", deviceID) |
9 | //1-3、订阅twin的消息,topic = $hw/events/device/<deviceID>/twin/get/result,接收到后将消息放到helper.TwinResult,将每一个twin放到helper.TwinAttributes数组 |
10 | go helper.TwinSubscribe(deviceID) |
11 | //1-4、将updateMessage通过topic = $hw/events/device/<deviceID>/twin/get发布 |
12 | helper.GetTwin(updateMessage, deviceID) |
13 | //1-5、等待订阅到twin消息 |
14 | helper.Wg.Wait() |
15 | twinUpdated := false |
16 | //2、对于初始化的watcher中的twinAttribute |
17 | for _, twinAttribute := range w.DeviceTwinAttributes { |
18 | //2-1、如果包含在1-3中接收到的twin消息中 |
19 | if helper.TwinResult.Twin[twinAttribute.Name] != nil { |
20 | //2-1-1、如果期望非空实际为空或者期望值与实际值不符 |
21 | if helper.TwinResult.Twin[twinAttribute.Name].Expected != nil && ((helper.TwinResult.Twin[twinAttribute.Name].Actual == nil) && helper.TwinResult.Twin[twinAttribute.Name].Expected != nil || (*helper.TwinResult.Twin[twinAttribute.Name].Expected.Value != *helper.TwinResult.Twin[twinAttribute.Name].Actual.Value)) { |
22 | klog.Infof("%s Expected Value : %s", twinAttribute.Name, *helper.TwinResult.Twin[twinAttribute.Name].Expected.Value) |
23 | if helper.TwinResult.Twin[twinAttribute.Name].Actual == nil { |
24 | klog.Infof("%s Actual Value: %v", twinAttribute.Name, helper.TwinResult.Twin[twinAttribute.Name].Actual) |
25 | } else { |
26 | klog.Infof("%s Actual Value: %s", twinAttribute.Name, *helper.TwinResult.Twin[twinAttribute.Name].Actual.Value) |
27 | } |
28 | klog.Infof("Equating the actual value to expected value for: %s", twinAttribute.Name) |
29 | //2-1-1-1、对应的每一个动作,先判断动作管理器中是否存在该动作,再找到符合期望的写入值作为动作的值,执行这一动作 |
30 | for _, watcherAction := range twinAttribute.Actions { |
31 | actionExists := false |
32 | for _, action := range actionManager { |
33 | if strings.EqualFold(action.Name, watcherAction) { |
34 | actionExists = true |
35 | for _, converterAttribute := range dataConverter.DataWrite.Attributes { |
36 | if strings.EqualFold(converterAttribute.Name, twinAttribute.Name) { |
37 | for operationName, dataMap := range converterAttribute.Operations { |
38 | if action.Name == operationName { |
39 | expectedValue := helper.TwinResult.Twin[twinAttribute.Name].Expected.Value |
40 | if _, ok := dataMap.DataMapping[*expectedValue]; ok { |
41 | action.Operation.Value = dataMap.DataMapping[*expectedValue] |
42 | } |
43 | } |
44 | action.PerformOperation() |
45 | } |
46 | } |
47 | } |
48 | } |
49 | } |
50 | if !actionExists { |
51 | return errors.New("The action: " + watcherAction + " does not exist for this device") |
52 | } |
53 | } |
54 | //2-1-1-2、准备同步、更新 |
55 | updatedActualValues[twinAttribute.Name] = *helper.TwinResult.Twin[twinAttribute.Name].Expected.Value |
56 | twinUpdated = true |
57 | } |
58 | } else { |
59 | return errors.New("The attribute: " + twinAttribute.Name + " does not exist for this device") |
60 | } |
61 | } |
62 | //3、如果需要更新 |
63 | if twinUpdated { |
64 | //3-1、创建twin更新消息,包含twin实际值 |
65 | updateMessage = helper.CreateActualUpdateMessage(updatedActualValues) |
66 | //3-2、将twin更新消息发布到topic = $hw/events/device/<deviceID>/twin/update |
67 | helper.ChangeTwinValue(updateMessage, deviceID) |
68 | time.Sleep(2 * time.Second) |
69 | klog.Infof("Syncing to cloud.....") |
70 | //3-3、将twin更新消息发布到topic = $hw/events/device/<deviceID>/twin/cloud_updated |
71 | helper.SyncToCloud(updateMessage, deviceID) |
72 | } else { |
73 | klog.Infof("Actual values are in sync with Expected value") |
74 | } |
75 | return nil |
76 | } |
这里主要是对于期望值实际值不符的twin进行更新。
断开连接的处理函数和设备扫描不再细讲,可以参考paypal/gatt项目,至此,Watcher初始化完成。
2.4.4. 动作执行
第五步,当主函数接收到设备连接的消息后,扫描该设备的每一个动作,如果PerformImmediately字段为真,就立即执行一次。具体如下:
1 | //PerformOperation executes the operation |
2 | func (action *Action) PerformOperation(readConverter ...dataconverter.DataRead) { |
3 | klog.Infof("Performing operations associated with action: %s", action.Name) |
4 | // 1、获取要读/写的UUID |
5 | characteristic, err := FindCharacteristic(action.Operation.CharacteristicUUID) |
6 | if err != nil { |
7 | klog.Errorf("Error in finding characteristics: %s", err) |
8 | } |
9 | // 2、如果是读操作,读取寄存器里的值,如果需要转换的话则进行转换 |
10 | if strings.ToUpper(action.Operation.Action) == ActionRead { |
11 | readValue, err := ReadCharacteristic(GattPeripheral, characteristic) |
12 | if err != nil { |
13 | klog.Errorf("Error in reading characteristic: %s", err) |
14 | return |
15 | } |
16 | converted := false |
17 | for _, conversionAction := range readConverter[0].Actions { |
18 | if strings.EqualFold(conversionAction.ActionName, action.Name) { |
19 | convertedValue := fmt.Sprintf("%f", conversionAction.ConversionOperation.ConvertReadData(readValue)) |
20 | action.Operation.Value = []byte(convertedValue) |
21 | converted = true |
22 | } |
23 | } |
24 | if !converted { |
25 | action.Operation.Value = readValue |
26 | } |
27 | klog.Info("Read Successful") |
28 | // 3、如果是写操作,则将待写入的值写入到寄存器 |
29 | } else if strings.ToUpper(action.Operation.Action) == ActionWrite { |
30 | if action.Operation.Value == nil { |
31 | klog.Errorf("Please provide a value to be written") |
32 | return |
33 | } |
34 | err := WriteCharacteristic(GattPeripheral, characteristic, action.Operation.Value) |
35 | if err != nil { |
36 | klog.Errorf("Error in writing characteristic: %s", err) |
37 | return |
38 | } |
39 | klog.Info("Write Successful") |
40 | } |
41 | } |
2.4.5.
最后,调度器将需要按时执行的动作创建协程定时执行,并且在所有调度器执行完成之前不会停止(可能无限执行):
1 | // ExecuteSchedule is responsible for scheduling the operations |
2 | func (schedule *Schedule) ExecuteSchedule(actionManager []actionmanager.Action, dataConverter dataconverter.DataRead, deviceID string) { |
3 | klog.Infof("Executing schedule: %s", schedule.Name) |
4 | if schedule.OccurrenceLimit != 0 { |
5 | for iteration := 0; iteration < schedule.OccurrenceLimit; iteration++ { |
6 | schedule.performScheduleOperation(actionManager, dataConverter, deviceID) |
7 | } |
8 | } else { |
9 | for { |
10 | schedule.performScheduleOperation(actionManager, dataConverter, deviceID) |
11 | } |
12 | } |
13 | helper.ControllerWg.Done() |
14 | } |
函数中根据之前配置文件中的occurrence-limit字段判断执行次数,具体的执行函数如下:
1 | // performScheduleOperation is responsible for performing the operations associated with the schedule |
2 | func (schedule *Schedule) performScheduleOperation(actionManager []actionmanager.Action, dataConverter dataconverter.DataRead, deviceID string) { |
3 | var scheduleResult ScheduleResult |
4 | actionExists := false |
5 | for _, actionName := range schedule.Actions { |
6 | for _, action := range actionManager { |
7 | if strings.EqualFold(action.Name, actionName) { |
8 | actionExists = true |
9 | klog.Infof("Performing scheduled operation: %s", action.Name) |
10 | action.PerformOperation(dataConverter) |
11 | scheduleResult.EventName = actionName |
12 | scheduleResult.TimeStamp = time.Now().UnixNano() / 1e6 |
13 | scheduleResult.EventResult = fmt.Sprintf("%s", action.Operation.Value) |
14 | publishScheduleResult(scheduleResult, deviceID) |
15 | } |
16 | } |
17 | if schedule.Interval == 0 { |
18 | schedule.Interval = defaultEventFrequency |
19 | } |
20 | if !actionExists { |
21 | klog.Errorf("Action %s does not exist. Exiting from schedule !!!", actionName) |
22 | break |
23 | } |
24 | time.Sleep(time.Duration(time.Duration(schedule.Interval) * time.Millisecond)) |
25 | } |
26 | } |
执行函数对于调度器中的所有动作执行一次,并将结果发布到了topic = $ke/device/bluetooth-mapper/<deviceID>/scheduler/result,下面是一个结果消息的例子:{"EventName":"IRTemperatureData","TimeStamp":1590715568239,"EventResult":"511.968750"}
。
这样,BlueToothMapper各部分的分析就完成了。
3. 小结
3.1. 消息流总结
在蓝牙Mapper中,有三个主要的消息流:第一,用户可以通过发布Mqtt主题的方式,增删监视器、调度器和动作,而Controller订阅了这几个主题的mqtt消息,当收到消息后就可以根据用户的需求调用处理函数进行修改;第二,Watcher消息流,它主要监视twin消息,当twin消息的期望值与实际值不符时,执行相应的写动作调整到一致,并发布更新消息和同步消息;第三,Scheduler消息,定时执行动作,并将结果发布。
3.2. Bug总结
目前,由于BluetoothMapper很久没更新了,加之其引用的paypal/gatt包过于老旧,可能引起一些难以预料的bug,笔者就遇到了以下问题:
- 在paypal/gatt包的adv.go文件99行,由于缺少判断,可能出现
d := b[2 : 1]
的问题,如果遇到的话,需要添加对l
值的判断,防止切片错误 - 由于BluetoothMappe要通过设备的Local Name判断是否是目标设备,在连接前需要先判断设备的Local Name是否符合预期,笔者的CentOS虚拟机会自动将较长的设备Local Name剪短,导致和预期不匹配无法连接。可以通过
hcitool lescan
先找到自己的蓝牙设备,判断Local Name
3.3. 问题总结
在实践过程中,笔者也遇到了一些问题,欢迎大家一起探讨:
- 根据源码分析,对于只读类型的值,比如温度传感器监测到的温度,理论上应该在Scheduler拿到数据后发布给Eventbus,但是$ke/device/bluetooth-mapper/<deviceID>/scheduler/result这个topic及对应的消息格式似乎并没有被任何组件接收