thingsboard 之 mqtt 协议介绍 · 物联网平台-威尼斯人最新

thingskit · 2020年03月03日 · 最后由 回复于 2022年08月19日 · 583 次阅读
本帖已被设为精华帖!

可复制:121202538

中文社区:

tb 的 mqtt 设备协议
tb 澳门人威尼斯3966官网:

tb github:

tb 提供的体验地址:

by thingsboard team

以下内容是在原文基础上演绎的译文。除非另行注明,页面上所有内容采用知识共享-[署名(cc by 2.5 au)协议]
() 共享。

原文地址: thingsboard api 参考:

视频演示:

mqtt 基础知识

mqtt 是一种轻量级的发布 - 订阅消息传递协议,可能使其最适合各种物联网设备。您可以在此处找到有关 mqtt 的更多信息。
thingsboard 服务器节点充当 mqtt broker,支持 qos 级别 0(最多一次)和 1(至少一次)以及一组预定义主题。

客户端库设置

您可以在 web 上找到大量 mqtt 客户端库。本文中的示例将基于,和,要设置其中一个工具。

客户端库设置

您可以在 web 上找到大量 mqtt 客户端库。本文中的示例将基于 mosquitto,mqtt.js 和 paho,要设置其中一个工具。

键值格式

默认情况下,thingsboard 支持 json 中的键值内容。key 始终是一个字符串,而 value 可以是 string,boolean,double 或 long。也可以使用自定义二进制格式或某些序列化框架。有关详细信息,请参阅物模型。例如:

{"stringkey":"value1", "booleankey":true, "doublekey":42.0, "longkey":73}

遥测上传 api

为了将遥测数据发布到 thingsboard 服务器节点,请将 publish 消息发送到以下主题:

v1/devices/me/telemetry

最简单的支持数据格式是:

{"key1":"value1", "key2":"value2"}

要么

[{"key1":"value1"}, {"key2":"value2"}]

请注意,在这种情况下,服务器端时间戳将分配给上传的数据!
如果您的设备能够获取客户端时间戳,您可以使用以下格式:

{"ts":1451649600512, "values":{"key1":"value1", "key2":"value2"}}

在上面的示例中,我们假设 “1451649600512” 是具有毫秒精度的 unix 时间戳。例如,值'1451649600512'对应于'fri,2016年1月1日12:00:00.512 gmt'

属性 api

thingsboard 属性 api 允许设备
将客户端设备属性上载到服务器。
将属性更新发布到服务器
要将客户端设备属性发布到 thingsboard 服务器节点,请将 publish 消息发送到以下主题:

v1/devices/me/attributes

thingsboard 的 mqtt 传输协议架构

因为 thingsboard 最新 release,是基于微服务架构,不利用单独理解代码。
thingsboard 源代码:
本文基于上面源代码后,剔除相关的安全验证和处理之后搭建简易的讲解项目:

mqtt 框架

因为 thingsboard 是一个 jvm 技术栈的 paas 平台,所以使用的是基于 java 通讯框架的 netty,如果有对 netty 不太熟悉的同学,可以参考我之前搭建的 netty 实践学习案例:

项目结构

├── iot-guide-mqtt.iml
├── pom.xml
└── src
    └── main
        └── java
            └── com
                └── sanshengshui
                    └── mqtt
                        ├── adapter
                        │   └── jsonmqttadaptor.java // mqtt json转换器,在跟thingsboard学习iot-物模型有所讲解
                        ├── iotmqttserver.java // mqtt服务
                        ├── mqtttopicmatcher.java
                        ├── mqtttopics.java 
                        ├── mqtttransporthandler.java //mqtt处理类
                        └── mqtttransportserverinitializer.java

项目代码讲解

iotmqttserver

private static final int port = 1884;
   private static final string leakdetectorlevel = "disabled";
   private static final integer bossgroupthreadcount = 1;
   private static final integer workergroupthreadcount = 12;
   private static final integer maxpayloadsize = 65536;
   public static void main(string[] args) throws exception {
       resourceleakdetector.setlevel(resourceleakdetector.level.valueof(leakdetectorlevel.touppercase()));
       eventloopgroup bossgroup = new nioeventloopgroup(bossgroupthreadcount);
       eventloopgroup workergroup = new nioeventloopgroup(workergroupthreadcount);
       try {
           serverbootstrap b = new serverbootstrap();
           b.group(bossgroup,workergroup)
                   .channel(nioserversocketchannel.class)
                   .handler(new logginghandler(loglevel.info))
                   .childhandler(new mqtttransportserverinitializer(maxpayloadsize));
           channelfuture f = b.bind(port);
           f.channel().closefuture().sync();
       } finally {
           bossgroup.shutdowngracefully();
           workergroup.shutdowngracefully();
       }
   }

第 8 行,设置服务端 netty 内存读写泄漏级别,缺省条件下为:disabled

第 10 行和第 11 行,设置 boss 线程组和 work 线程组的线程数量。默认情况下,boss 线程组的线程数量为 1,work 线程组的数量为运行服务机器内核数量的 2 倍。

第 15 行,通过创建 serverbootstrap 对象,在第 16 行设置使用 eventloopgroup。

在 17 和 19 行,设置要被实例化的 nioserversockerchannel 类,并设置最大的负载内容数量。

最后我们通过 shutdowgracefully() 函数优雅的关闭 bossgroup 和 workgroup。

mqtttransporthandler#processmqttmsg()

private void processmqttmsg(channelhandlercontext ctx, mqttmessage msg) {
        address = (inetsocketaddress) ctx.channel().remoteaddress();
        if (msg.fixedheader() == null) {
            processdisconnect(ctx);
            return;
        }
        switch (msg.fixedheader().messagetype()) {
            case connect:
                processconnect(ctx, (mqttconnectmessage) msg);
                break;
            case publish:
                processpublish(ctx, (mqttpublishmessage) msg);
                break;
            case subscribe:
                processsubscribe(ctx, (mqttsubscribemessage) msg);
                break;
            case unsubscribe:
                processunsubscribe(ctx, (mqttunsubscribemessage) msg);
                break;
            case pingreq:
                if (checkconnected(ctx)) {
                    ctx.writeandflush(new mqttmessage(new mqttfixedheader(pingresp,false,at_most_once, false, 0)));
                }
                break;
            case disconnect:
                if (checkconnected(ctx)) {
                    processdisconnect(ctx);
                }
                break;
            default:
                break;
        }
    }

第3行,通过判断消息的固定头部是否为空,如果空;则通过 processdisconnect(ctx) 将设备连接关闭。

processdisconnect(channelhandlercontext ctx)
private void processdisconnect(channelhandlercontext ctx) {
        ctx.close();  // 关闭socket通道
    }

第 8 行,通过判断固定头部的 mqtt 消息类型,针对不同消息做相应的处理。
mqtttransporthandler#publishdevicepublish
以下是对发布消息进行相关的解读,更多消息类型的处理类,大家请参考我上面的 iot-guide-mqtt 进行阅读。

private void processdevicepublish(channelhandlercontext ctx, mqttpublishmessage mqttmsg, string topicname, int msgid) {
        try {
            if (topicname.equals(mqtttopics.device_telemetry_topic)) { //如果主题为v1/devices/me/attributes
                jsonmqttadaptor.converttomsg(post_telemetry_request, mqttmsg);
            } else if(topicname.equals(device_attributes_topic)) {
                jsonmqttadaptor.converttomsg(post_attributes_request, mqttmsg);
            } else if(topicname.equals(mqtttopics.device_attributes_request_topic_prefix)) {
                jsonmqttadaptor.converttomsg(get_attributes_request, mqttmsg);
            }
        } catch (adaptorexception e) {
        }
    }

我上面的代码仅是对消息的主题进行判断,然后对主题内的内容进行物模型的解析,得到相关属性或者遥测数据的获得。

演示效果

我们通过 paho 或者 mqtt.js 和服务进行连接,发布消息到以下主题:

v1/devices/me/telemetry

简易的数据格式如下:

{"key1":"value1", "key2":"value2"}

paho 图示:

服务器控制台打印数据:

七月 24, 2019 1:37:18 下午 io.netty.handler.logging.logginghandler channelregistered
信息: [id: 0xf2bfb3a8] registered
七月 24, 2019 1:37:18 下午 io.netty.handler.logging.logginghandler bind
信息: [id: 0xf2bfb3a8] bind: 0.0.0.0/0.0.0.0:1884
七月 24, 2019 1:37:18 下午 io.netty.handler.logging.logginghandler channelactive
信息: [id: 0xf2bfb3a8, l:/0:0:0:0:0:0:0:0:1884] active
七月 24, 2019 1:37:22 下午 io.netty.handler.logging.logginghandler channelread
信息: [id: 0xf2bfb3a8, l:/0:0:0:0:0:0:0:0:1884] received: [id: 0xe08abd12, l:/127.0.0.1:1884 - r:/127.0.0.1:48816]
key= 1563946708305
属性名=temperature 属性值=38
属性名=humidity 属性值=60

如上所示,希望大家对 thingsboard 的 iot 架构-mqtt 设备协议这块有所了解!

本文作者: 穆书伟
本文链接: 物联网时代
澳门人威尼斯3966的版权声明: 本博客所有文章除特别声明外,均采用 by-nc-sa 许可协议。转载请注明出处!

thingskit 将本帖设为了精华贴 03月03日 03:46
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册
网站地图