可复制:121202538
中文社区:
tb 的 mqtt 设备协议
tb 澳门人威尼斯3966官网:
tb github:
tb 提供的体验地址:
by thingsboard team
以下内容是在原文基础上演绎的译文。除非另行注明,页面上所有内容采用知识共享-[署名(cc by 2.5 au)协议]
() 共享。
原文地址: thingsboard api 参考:
mqtt 是一种轻量级的发布 - 订阅消息传递协议,可能使其最适合各种物联网设备。您可以在此处找到有关 mqtt 的更多信息。
thingsboard 服务器节点充当 mqtt broker,支持 qos 级别 0(最多一次)和 1(至少一次)以及一组预定义主题。
您可以在 web 上找到大量 mqtt 客户端库。本文中的示例将基于,和,要设置其中一个工具。
您可以在 web 上找到大量 mqtt 客户端库。本文中的示例将基于 mosquitto,mqtt.js 和 paho,要设置其中一个工具。
默认情况下,thingsboard 支持 json 中的键值内容。key 始终是一个字符串,而 value 可以是 string,boolean,double 或 long。也可以使用自定义二进制格式或某些序列化框架。有关详细信息,请参阅物模型。例如:
{"stringkey":"value1", "booleankey":true, "doublekey":42.0, "longkey":73}
为了将遥测数据发布到 thingsboard 服务器节点,请将 publish 消息发送到以下主题:
v1/devices/me/telemetry
最简单的支持数据格式是:
{"key1":"value1", "key2":"value2"}
要么
[{"key1":"value1"}, {"key2":"value2"}]
请注意,在这种情况下,服务器端时间戳将分配给上传的数据!
如果您的设备能够获取客户端时间戳,您可以使用以下格式:
{"ts":1451649600512, "values":{"key1":"value1", "key2":"value2"}}
在上面的示例中,我们假设 “1451649600512” 是具有毫秒精度的 unix 时间戳。例如,值'1451649600512'对应于'fri,2016年1月1日12:00:00.512 gmt'
thingsboard 属性 api 允许设备
将客户端设备属性上载到服务器。
将属性更新发布到服务器
要将客户端设备属性发布到 thingsboard 服务器节点,请将 publish 消息发送到以下主题:
v1/devices/me/attributes
因为 thingsboard 最新 release,是基于微服务架构,不利用单独理解代码。
thingsboard 源代码:
本文基于上面源代码后,剔除相关的安全验证和处理之后搭建简易的讲解项目:
因为 thingsboard 是一个 jvm 技术栈的 paas 平台,所以使用的是基于 java 通讯框架的 netty,如果有对 netty 不太熟悉的同学,可以参考我之前搭建的 netty 实践学习案例:
├── iot-guide-mqtt.iml
├── pom.xml
└── src
└── main
└── java
└── com
└── sanshengshui
└── mqtt
├── adapter
│ └── jsonmqttadaptor.java // mqtt json转换器,在跟thingsboard学习iot-物模型有所讲解
├── iotmqttserver.java // mqtt服务
├── mqtttopicmatcher.java
├── mqtttopics.java
├── mqtttransporthandler.java //mqtt处理类
└── mqtttransportserverinitializer.java
iotmqttserver
private static final int port = 1884;
private static final string leakdetectorlevel = "disabled";
private static final integer bossgroupthreadcount = 1;
private static final integer workergroupthreadcount = 12;
private static final integer maxpayloadsize = 65536;
public static void main(string[] args) throws exception {
resourceleakdetector.setlevel(resourceleakdetector.level.valueof(leakdetectorlevel.touppercase()));
eventloopgroup bossgroup = new nioeventloopgroup(bossgroupthreadcount);
eventloopgroup workergroup = new nioeventloopgroup(workergroupthreadcount);
try {
serverbootstrap b = new serverbootstrap();
b.group(bossgroup,workergroup)
.channel(nioserversocketchannel.class)
.handler(new logginghandler(loglevel.info))
.childhandler(new mqtttransportserverinitializer(maxpayloadsize));
channelfuture f = b.bind(port);
f.channel().closefuture().sync();
} finally {
bossgroup.shutdowngracefully();
workergroup.shutdowngracefully();
}
}
第 8 行,设置服务端 netty 内存读写泄漏级别,缺省条件下为:disabled
第 10 行和第 11 行,设置 boss 线程组和 work 线程组的线程数量。默认情况下,boss 线程组的线程数量为 1,work 线程组的数量为运行服务机器内核数量的 2 倍。
第 15 行,通过创建 serverbootstrap 对象,在第 16 行设置使用 eventloopgroup。
在 17 和 19 行,设置要被实例化的 nioserversockerchannel 类,并设置最大的负载内容数量。
最后我们通过 shutdowgracefully() 函数优雅的关闭 bossgroup 和 workgroup。
mqtttransporthandler#processmqttmsg()
private void processmqttmsg(channelhandlercontext ctx, mqttmessage msg) {
address = (inetsocketaddress) ctx.channel().remoteaddress();
if (msg.fixedheader() == null) {
processdisconnect(ctx);
return;
}
switch (msg.fixedheader().messagetype()) {
case connect:
processconnect(ctx, (mqttconnectmessage) msg);
break;
case publish:
processpublish(ctx, (mqttpublishmessage) msg);
break;
case subscribe:
processsubscribe(ctx, (mqttsubscribemessage) msg);
break;
case unsubscribe:
processunsubscribe(ctx, (mqttunsubscribemessage) msg);
break;
case pingreq:
if (checkconnected(ctx)) {
ctx.writeandflush(new mqttmessage(new mqttfixedheader(pingresp,false,at_most_once, false, 0)));
}
break;
case disconnect:
if (checkconnected(ctx)) {
processdisconnect(ctx);
}
break;
default:
break;
}
}
第3行,通过判断消息的固定头部是否为空,如果空;则通过 processdisconnect(ctx) 将设备连接关闭。
processdisconnect(channelhandlercontext ctx)
private void processdisconnect(channelhandlercontext ctx) {
ctx.close(); // 关闭socket通道
}
第 8 行,通过判断固定头部的 mqtt 消息类型,针对不同消息做相应的处理。
mqtttransporthandler#publishdevicepublish
以下是对发布消息进行相关的解读,更多消息类型的处理类,大家请参考我上面的 iot-guide-mqtt 进行阅读。
private void processdevicepublish(channelhandlercontext ctx, mqttpublishmessage mqttmsg, string topicname, int msgid) {
try {
if (topicname.equals(mqtttopics.device_telemetry_topic)) { //如果主题为v1/devices/me/attributes
jsonmqttadaptor.converttomsg(post_telemetry_request, mqttmsg);
} else if(topicname.equals(device_attributes_topic)) {
jsonmqttadaptor.converttomsg(post_attributes_request, mqttmsg);
} else if(topicname.equals(mqtttopics.device_attributes_request_topic_prefix)) {
jsonmqttadaptor.converttomsg(get_attributes_request, mqttmsg);
}
} catch (adaptorexception e) {
}
}
我上面的代码仅是对消息的主题进行判断,然后对主题内的内容进行物模型的解析,得到相关属性或者遥测数据的获得。
我们通过 paho 或者 mqtt.js 和服务进行连接,发布消息到以下主题:
v1/devices/me/telemetry
简易的数据格式如下:
{"key1":"value1", "key2":"value2"}
paho 图示:
服务器控制台打印数据:
七月 24, 2019 1:37:18 下午 io.netty.handler.logging.logginghandler channelregistered
信息: [id: 0xf2bfb3a8] registered
七月 24, 2019 1:37:18 下午 io.netty.handler.logging.logginghandler bind
信息: [id: 0xf2bfb3a8] bind: 0.0.0.0/0.0.0.0:1884
七月 24, 2019 1:37:18 下午 io.netty.handler.logging.logginghandler channelactive
信息: [id: 0xf2bfb3a8, l:/0:0:0:0:0:0:0:0:1884] active
七月 24, 2019 1:37:22 下午 io.netty.handler.logging.logginghandler channelread
信息: [id: 0xf2bfb3a8, l:/0:0:0:0:0:0:0:0:1884] received: [id: 0xe08abd12, l:/127.0.0.1:1884 - r:/127.0.0.1:48816]
key= 1563946708305
属性名=temperature 属性值=38
属性名=humidity 属性值=60
如上所示,希望大家对 thingsboard 的 iot 架构-mqtt 设备协议这块有所了解!
本文作者: 穆书伟
本文链接: 物联网时代
澳门人威尼斯3966的版权声明: 本博客所有文章除特别声明外,均采用 by-nc-sa 许可协议。转载请注明出处!