本文从概念到实战,以一个假想产品——”电子货架标签“(Electronic Shelf Label,以下简称ESL)为例,介绍基于阿里云IoT的物联网应用开发。

数据交互流程

以云端下发命令到最终收到应答为例(虚线表示异步):

  • LoRaWAN:ESL所采用的通讯协议;
  • LoRaWAN NS:LoRaWAN网络的中枢大脑,控制通讯参数、实现QoS、节点入网和迁移、数据加解密等。
  • MQTT:基于Pub/Sub范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计。
  • Link WAN:阿里云物联网络管理平台,可用它快速组建LoRaWAN网络;简单地说,它主要扮演了LoRaWAN NS的角色;
  • AliIoT:阿里云物联网平台,基于MQTT。处理设备层和业务层的数据交互;
  • AMQP:消息队列,设备异步应答返回的消息通过此消息队列传递到云端。(广义上说,AMQP是一个协议,RabbitMQ就是该协议的一个实现)

ESL和LoRa网关是通过LoRa协议通信,LoRa可以看做是物理层面的信息调制协议或通讯协议,没有TCP的概念。

注意,MQTT并不局限于LoRaWAN场景,阿里云也在平台上将二者作了不同入口,前者对应AliIoT,后者对应Link WAN。初次接触不免困惑(这也是阿里云一贯的作风),其实背后就是这个关系。我们可以设备直连AliIoT做IoT应用开发(参看10分钟物联网设备接入阿里云IoT平台);如果是LoRaWAN系统,也可以同时借助 Link WAN 做LoRaWAN的网络管理。

网关要接入Link WAN,需要移植阿里云提供的SDK到网关与通信模组上,并且购买Link WAN密钥安装,并登录阿里云物联网络管理平台控制台添加网关。云端开发人员只要关注AliIoT、AMQP及业务层即可。

AliIoT控制台准备

  1. 公共实例-》创建产品。产品名称“电子货架标签”;节点类型表示该产品下设备的类型,选择直连设备(LoRa有IP的概念?),然后连网方式选择LoRaWAN;因为ESL设备收发的数据为未编码的字节数组,数据格式选择透传/自定义,后续需要提供数据解析脚本,将上行的自定义格式的数据转换为Alink JSON格式,将下行的Alink JSON格式数据解析为设备自定义格式,设备才能与云端进行通信。产品创建完毕获得ProductKey。

  2. 管理产品-》功能定义,即定义所谓的物模型。功能分为属性、服务、事件三种类型(同定义一个类一样,有属性、方法、事件)。一个产品可以定义多个物模型,即一个产品下面可以有提供不同功能的多种设备。这里我们为ESL定义——

    • 属性:shelfNo,所属货架,数据类型text。示例A.05.02,A区5排2号货架;
    • 服务:show,显示货品名称和对应价格,入参有productName:text,price:float,调用方式选择异步;
    • 事件:heart,心跳,我们可以定义一些输出参数如电池电量batteryLevel:int32,固件版本firmwareVersion:text,如此每次回报时这些信息也传给云端。

    这样,云端就可以下发查询电池电量和设置货品名称和对应价格的两种命令,同时也可以被动接收设备返回的心跳消息。当然,物模型只是定义了接口,具体实现需要设备端和云端共同完成。

    物模型中服务调用方式可设置同步或者异步。同步方式:物联网平台直接使用RRPC同步方式下行推送请求,设备返回RRPC响应消息。RRPC使用详情,请参见什么是RRPC。异步方式:物联网平台采用异步方式下行推送请求,设备采用异步方式返回结果。

  3. 管理产品-》数据解析。上面说到,设备和云端的交互数据需要中间的解析(序列化/反序列化)过程(发生在上图第1步之后和第4步之前)。以JavaScript脚本为例:

    var ALINK_EVENT_HEART_POST_METHOD = 'thing.event.heart.post'; //与云端绑定的topic相关,下同。设备心跳包上报
    var ALINK_EVENT_ACK_POST_METHOD = 'thing.event.ack.post'; //设备服务应答上报
    var ALINK_PROP_REPORT_METHOD = 'thing.event.property.post'; //设备属性上报
    var ALINK_PROP_SET_METHOD = 'thing.service.property.set'; //云端下发属性控制指令到设备端。
    var ALINK_PROP_SET_REPLY_METHOD = 'thing.service.property.set'; //设备上报属性设置的结果到云端。
    var ALINK_SERVICE_SHOW_METHOD = 'thing.service.show'; //云端调用设备show服务
    
    /**
     *  将Alink协议的数据转换为设备能识别的格式数据,物联网平台给设备下发数据时调用
     *  入参:jsonObj,对象,不能为空。
     *  出参:rawData,byte[]数组,不能为空。
     * 
     * 示例数据:
     * 云端下发属性设置指令:
     * 传入参数:
     *     {"method":"thing.service.property.set","id":"12345","version":"1.0","params":{"shelfNo":"A.05.02"}}
     * 注意:云端只下发{"shelfNo":"A.05.02"},其余结构是AliIoT封装的。
     */
    function protocolToRawData(jsonObj) {
        var method = jsonObj['method'];
        var params = json['params'];
        //按照自定义协议格式拼接 rawData
        var rawdata = [0x5d, 0x64, 0x00];
        if (method == ALINK_PROP_SET_METHOD) { //设置属性
            rawdata = rawdata.concat(textToByteArray(params['shelfNo']));
        } else if (method == ALINK_SERVICE_SHOW_METHOD) { //调用服务
            var productName = params['productName'];
            var price = params['price'];
            rawdata = rawdata.concat(textToByteArray(productName));
            rawdata = rawdata.concat(floatToByteArray(price));
        }
    
        //other commands ...
    
        return rawdata;
    }
    
    /**
     * 将设备的自定义格式数据转换为Alink协议的数据,设备上报数据到物联网平台时调用。
     * 入参:rawData,byte[]数组,不能为空。
     * 出参:jsonObj,对象,不能为空。
     * 
     * 示例数据:
     * 设备心跳上报:
     * 传入参数:
     *     0xFF1020010005
     * 输出结果:
     *     {"method":"thing.event.heart.post","id":"12345678","params":{"batteryLevel":32,"firmwareVersion":"1.0.5"},"version":"1.0"}
     */
    function rawDataToProtocol(rawData) {
        var uint8Array = new Uint8Array(rawData.length);
        for (var i = 0; i < bytes.length; i++) {
            uint8Array[i] = bytes[i] & 0xff;
        }
        var dataView = new DataView(uint8Array.buffer, 0);
        var jsonObj = new Object();
        var params = {};
        
        var head = uint8Array.slice(0, 2).join(); //自定义协议包头
        if (head[0] == 0xFF && head[1] == 0x10) {
            params['batteryLevel'] = dataView.getInt8(2);
            params['firmwareVersion'] = `${dataView.getInt8(3)}.${dataView.getInt8(4)}.${dataView.getInt8(5)}`;
            jsonObj['method'] = ALINK_EVENT_HEART_POST_METHOD;
        } else {
            //其它数据包转换
        }
        
        jsonObj['version'] = '1.0'; //ALink JSON格式,协议版本号固定字段。
        jsonObj['id'] = '12345678' //ALink JSON格式,标示该次请求id值。
        jsonObj['params'] = params;
    
        return jsonObj;
    }
    
    /**
     * 处理自定义Topic,本示例不涉及
     */
    function transformPayload(topic, rawData) {
        var jsonObj = {}
        return jsonObj;
    }
    

    数据解析的前提之一是设备收发的数据格式要确定好。

    设备端发布/订阅的topic(阿里云控制台的产品Topic类列表中设置)和云端处理的topic不一样,云端处理的topic是由Alink协议定义的,和method值有关。详见Alink协议

    上述脚本将业务数据和字节数组进行了转换,若是担心数据协议外泄[给阿里云?],这部分工作也可以放在云端,脚本文件只用来进行字节数组的转发(这种情况下,物模型所有功能的出参入参都只需要一个,数据格式为int32array)。

  4. 管理产品-》服务端订阅。创建AMQP订阅,AMQP会将消息推送给列表中的所有消费组,一个消费组可看做是一个消息队列,云端作为客户端连接某队列得到设备上报消息。我们新建名称为“电子货架标签-Q1”的消费组,得到一串自动生成的消费组ID。

云端开发

以Java/Kotlin为例,先引入SDK:

//下发命令依赖
implementation("com.aliyun:aliyun-java-sdk-core:4.5.22")
implementation("com.aliyun:aliyun-java-sdk-iot:7.27.0")
//获取应答依赖
implementation("org.apache.qpid:qpid-jms-client:0.59.0")
implementation("commons-codec:commons-codec:1.15")

下发show命令:

@Service
class AliIoTDemo {
    @Autowired
    lateinit var config: AliIoTConfig

    private lateinit var client: IAcsClient

    @PostConstruct
    fun init() {
        val profile =
            DefaultProfile.getProfile(config.regionId, config.accessKeyId, config.accessKeySecret)
        client = DefaultAcsClient(profile)
    }

    /**
    * loraId: 设备编号,对应AliIoT的DeviceName
    */
    fun show(loraId: String) {
        val gson = GsonInstance.get()
        val jo = JsonObject()    
        jo.addProperty("productName", "康师傅方便面")
        jo.addProperty("price", 3.50)

        val request = InvokeThingServiceRequest().apply {
            productKey = config.productKey //创建物联网产品时得到ProductKey
            deviceName = loraId
            identifier = "show" //物模型定义的服务名称
            args = gson.toJson(jo) //{"productName": "康师傅方便面", "price": 3.50}
        }

        client.doAction(request)
    }
}

代码中的client.doAction是无法得到应答的,所以我们还要写一个AMQP客户端去异步获得应答消息,具体参看官方示例Java SDK接入示例 - 阿里云物联网平台

多条异步命令顺序执行

如果一个事务只要下发一条命令,那就等着拿结果就好了;但是有多条异步命令需要顺序执行的话,就稍微有点麻烦了,我们要考虑上下文的挂起和恢复、超时取消等机制。以下为简单示例:

//保存各事务对应的等待发送的命令队列,命令一旦发送则须从队列中移除
//key为设备编号,二元组第一项表示事务开始时间,用于超时判断
private val cmdSetMap = ConcurrentHashMap<String, Pair<Long, Queue<InvokeThingServiceRequest>>>()

internal fun putInvokeThingServiceRequest(deviceNo: String, requests: Queue<InvokeThingServiceRequest>) {
    //同样设备之前的命令不再执行,移除
    cmdSetMap.remove(deviceNo)
    if (requests.size == 1) { //只有一条命令则直接发送
        client.doAction(requests.poll())
    } else {
        val request = requests.poll() //先发送第一条
        cmdSetMap[deviceNo] = Pair(System.currentTimeMillis(), requests)  //其余的存入待发送列表
        client.doAction(request)
    }
}

//...

{
//应答消息抵达后,若应答OK则执行下一条命令
    val request = cmdSetMap[deviceName]!!.second.poll()
    try { 
        client.doAction(request)
    } catch (ex: Exception) {
        logger.error(ex)
        // 发生错误 通知客户端
    }
    if (cmdSetMap[deviceName]!!.second.size == 0) cmdSetMap.remove(deviceName)
}

//每分钟清理过时事务
@Scheduled(cron = "0 * * * * *")
fun removeTimeoutCmd() {
    //...
}

在语言层面,不管是以前的回调地狱还是后来兴起的async/await、suspend、Promise等,都能处理这种场景。本质上,异步回调是指令寻址、变量出入栈的过程,有时还涉及到线程上下文的切换,各种语言/框架都帮我们考虑并且做了,我们只要按照既定语法编写业务代码即可。

为什么业务端不能直接订阅对应的topic呢,这样不就能直接拿到数据了吗?AliIoT似乎也没有提供业务层直接订阅 AliIoT topic 的入口。不过MQTT协议是基于PUB/SUB的异步通信模式,就算业务端能直接接收到应答,也要处理应答消息转发到对应的上下文、上下文挂起恢复等问题。