ThingsBoard discussion board: https://www.iotschool.com/topics/node8
Everyone is welcome to join the ThingsBoard secondary-development QQ group: 121202538
To receive MQTT connections from devices, ThingsBoard must contain an MQTT server; the class that creates it is MqttTransportService.
The MQTT server is based on Netty. It registers MqttTransportServerInitializer as the channel initializer, which adds Netty's MqttDecoder and MqttEncoder to the ChannelPipeline so that we can ignore the encoding and decoding of MQTT messages; most importantly, it adds MqttTransportHandler.
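To make the pipeline concrete, here is a minimal sketch of a Netty channel initializer in the spirit of MqttTransportServerInitializer; the class name, the max-payload constant, and the inline handler are illustrative only, not ThingsBoard's actual code:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttMessage;

public class MqttServerInitializerSketch extends ChannelInitializer<SocketChannel> {
    private static final int MAX_PAYLOAD_SIZE = 65536; // assumed limit

    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        // Netty's MQTT codec: frames raw bytes into MqttMessage objects and back
        pipeline.addLast("decoder", new MqttDecoder(MAX_PAYLOAD_SIZE));
        pipeline.addLast("encoder", MqttEncoder.INSTANCE);
        // In ThingsBoard this slot is MqttTransportHandler, which receives
        // the decoded CONNECT / PUBLISH / SUBSCRIBE messages
        pipeline.addLast("handler", new SimpleChannelInboundHandler<MqttMessage>() {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) {
                // process the decoded MQTT message here
            }
        });
    }
}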
For this example, we first create a tenant and a tenant administrator and add a device. Using MQTT Box to simulate the hardware device, copy the device's access token as MQTT Box's username and connect to our ThingsBoard backend.
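If you prefer code to MQTT Box, the same handshake can be reproduced with any MQTT client. A minimal sketch using Eclipse Paho (the broker URL and the token below are placeholders):

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

public class DeviceConnectSketch {
    public static void main(String[] args) throws Exception {
        MqttClient client = new MqttClient("tcp://localhost:1883", "my-device");
        MqttConnectOptions options = new MqttConnectOptions();
        // ThingsBoard authenticates the device by the CONNECT username:
        // the device's access token goes here, no password is needed
        options.setUserName("YOUR_DEVICE_ACCESS_TOKEN");
        client.connect(options);
        System.out.println("Connected: " + client.isConnected());
        client.disconnect();
    }
}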
Since SSL is not used, once the connect request is received the following method is called:
private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
    String userName = msg.payload().userName();
    log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName);
    if (StringUtils.isEmpty(userName)) {
        ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
        ctx.close();
    } else {
        // Take the username and wrap it in a protobuf message (easy to transmit and parse),
        // then hand it to transportService. Part 3 of this series, on DefaultTransportService,
        // explains how process() works; see ① below for details.
        transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),
                new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
                    @Override
                    public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
                        onValidateDeviceResponse(msg, ctx);
                    }

                    @Override
                    public void onError(Throwable e) {
                        log.trace("[{}] Failed to process credentials: {}", address, userName, e);
                        ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
                        ctx.close();
                    }
                });
    }
}
DefaultTransportService's process method builds an asynchronous task: on success it invokes the onSuccess consumer, on failure the onFailure consumer. The credential-validation work is handed to transportApiRequestTemplate.send:
public ListenableFuture<Response> send(Request request) {
    if (tickSize > maxPendingRequests) {
        return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
    }
    UUID requestId = UUID.randomUUID();
    request.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
    // As part 3 of this series showed, this topic is tb_transport.api.responses.<localHostName>
    request.getHeaders().put(RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic()));
    request.getHeaders().put(REQUEST_TIME, longToBytes(System.currentTimeMillis()));
    // See the basics in part 1: SettableFuture, from Google's Guava, is a future whose result can be set manually
    SettableFuture<Response> future = SettableFuture.create();
    ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);
    // Put the future into pendingRequests ②
    pendingRequests.putIfAbsent(requestId, responseMetaData);
    log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, request.getKey(), responseMetaData.expTime);
    // Send the message to the queue; the topic is tb_transport.api.requests
    requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {
        @Override
        public void onSuccess(TbQueueMsgMetadata metadata) {
            log.trace("[{}] Request sent: {}", requestId, metadata);
        }

        @Override
        public void onFailure(Throwable t) {
            pendingRequests.remove(requestId);
            future.setException(t);
        }
    });
    return future;
}
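The request/response correlation used here is worth isolating. Below is a minimal sketch of the pattern (my own reduction, not ThingsBoard code): the sender parks a SettableFuture in a map keyed by requestId, and whoever consumes the response topic completes exactly that future:

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class RequestResponseCorrelator<Response> {
    private final ConcurrentMap<UUID, SettableFuture<Response>> pending = new ConcurrentHashMap<>();

    // Called by the sender before publishing the request (step ② above)
    ListenableFuture<Response> register(UUID requestId) {
        SettableFuture<Response> future = SettableFuture.create();
        pending.put(requestId, future);
        return future;
    }

    // Called by the response consumer when a reply carrying this requestId arrives
    void complete(UUID requestId, Response response) {
        SettableFuture<Response> future = pending.remove(requestId);
        if (future != null) {
            future.set(response); // unblocks the original caller
        }
    }
}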
From the analysis of TbCoreTransportApiService, we find that DefaultTbQueueResponseTemplate's member requestTemplate, a consumer, subscribes to exactly the tb_transport.api.requests topic: ......
requests.forEach(request -> {
    long currentTime = System.currentTimeMillis();
    long requestTime = bytesToLong(request.getHeaders().get(REQUEST_TIME));
    if (requestTime + requestTimeout >= currentTime) {
        byte[] requestIdHeader = request.getHeaders().get(REQUEST_ID_HEADER);
        if (requestIdHeader == null) {
            log.error("[{}] Missing requestId in header", request);
            return;
        }
        // Read the response topic so the reply can go back where the request came from;
        // here the topic is tb_transport.api.responses.<localHostName>
        byte[] responseTopicHeader = request.getHeaders().get(RESPONSE_TOPIC_HEADER);
        if (responseTopicHeader == null) {
            log.error("[{}] Missing response topic in header", request);
            return;
        }
        UUID requestId = bytesToUuid(requestIdHeader);
        String responseTopic = bytesToString(responseTopicHeader);
        try {
            pendingRequestCount.getAndIncrement();
            // Let the handler process the message
            AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(request),
                    response -> {
                        pendingRequestCount.decrementAndGet();
                        response.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
                        // Send handler.handle's result back to the sender; the topic is tb_transport.api.responses.<localHostName>
                        responseTemplate.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, null);
                    },
                    e -> {
                        pendingRequestCount.decrementAndGet();
                        if (e.getCause() != null && e.getCause() instanceof TimeoutException) {
                            log.warn("[{}] Timeout to process the request: {}", requestId, request, e);
                        } else {
                            log.trace("[{}] Failed to process the request: {}", requestId, request, e);
                        }
                    },
                    requestTimeout,
                    timeoutExecutor,
                    callbackExecutor);
.......
@Override
public ListenableFuture<TbProtoQueueMsg<TransportApiResponseMsg>> handle(TbProtoQueueMsg<TransportApiRequestMsg> tbProtoQueueMsg) {
    TransportApiRequestMsg transportApiRequestMsg = tbProtoQueueMsg.getValue();
    // The protobuf-generated class tells us whether the validation block is present
    if (transportApiRequestMsg.hasValidateTokenRequestMsg()) {
        ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg();
        // validateCredentials looks up the DeviceInfo and hands the result to the second function for further processing
        return Futures.transform(validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor());
    }
......
DefaultTransportService's transportApiRequestTemplate subscribes to that response topic:

List<Response> responses = responseTemplate.poll(pollInterval);
if (responses.size() > 0) {
    log.trace("Polling responses completed, consumer records count [{}]", responses.size());
} else {
    continue;
}
responses.forEach(response -> {
    byte[] requestIdHeader = response.getHeaders().get(REQUEST_ID_HEADER);
    UUID requestId;
    if (requestIdHeader == null) {
        log.error("[{}] Missing requestId in header and body", response);
    } else {
        requestId = bytesToUuid(requestIdHeader);
        log.trace("[{}] Response received: {}", requestId, response);
        // See ② above: the validation future was put into pendingRequests; now take it back out by requestId
        ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
        if (expectedResponse == null) {
            log.trace("[{}] Invalid or stale request", requestId);
        } else {
            // Set the SettableFuture's result
            expectedResponse.future.set(response);
        }
    }
......
DefaultTransportService's process now has the result of the asynchronous request; the onSuccess callback fires, which calls MqttTransportHandler's onValidateDeviceResponse:

private void onValidateDeviceResponse(ValidateDeviceCredentialsResponseMsg msg, ChannelHandlerContext ctx) {
    if (!msg.hasDeviceInfo()) {
        ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
        ctx.close();
    } else {
        deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
        sessionInfo = SessionInfoProto.newBuilder()
                .setNodeId(context.getNodeId())
                .setSessionIdMSB(sessionId.getMostSignificantBits())
                .setSessionIdLSB(sessionId.getLeastSignificantBits())
                .setDeviceIdMSB(msg.getDeviceInfo().getDeviceIdMSB())
                .setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB())
                .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB())
                .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
                .setDeviceName(msg.getDeviceInfo().getDeviceName())
                .setDeviceType(msg.getDeviceInfo().getDeviceType())
                .build();
        // Build a SessionEvent.OPEN message and call sendToDeviceActor with the sessionInfo included
        transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() {
.......
protected void sendToDeviceActor(TransportProtos.SessionInfoProto sessionInfo, TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback<Void> callback) {
    // Build the TPI; a fixed partition id is chosen here, the topic is tb_core,
    // and the fullTopicName is tb_core.<int>, e.g. tb_core.1
    TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, getTenantId(sessionInfo), getDeviceId(sessionInfo));
    ......
    // Send it to the message queue with tbCoreMsgProducer, with toDeviceActorMsg set
    tbCoreMsgProducer.send(tpi,
            new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),
                    ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()), callback != null ?
                    new TransportTbQueueCallback(callback) : null);
}
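The idea behind partitionService.resolve can be sketched as follows (a simplification; ThingsBoard's actual hashing and partition-count configuration differ):

import java.util.UUID;

class PartitionSketch {
    // Map an entity id onto one of N partitions of a service topic
    static String resolve(String serviceTopic, UUID entityId, int partitions) {
        int partition = Math.abs(entityId.hashCode() % partitions);
        return serviceTopic + "." + partition; // fullTopicName, e.g. tb_core.1
    }

    public static void main(String[] args) {
        UUID deviceId = UUID.randomUUID();
        // The same device always lands on the same partition, so its messages
        // are consumed in order by the same tb_core consumer
        System.out.println(resolve("tb_core", deviceId, 10));
    }
}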
From the analysis of DefaultTbCoreConsumerService, we know its consumer subscribes to that topic:

try {
    ToCoreMsg toCoreMsg = msg.getValue();
    if (toCoreMsg.hasToSubscriptionMgrMsg()) {
        log.trace("[{}] Forwarding message to subscription manager service {}", id, toCoreMsg.getToSubscriptionMgrMsg());
        forwardToSubMgrService(toCoreMsg.getToSubscriptionMgrMsg(), callback);
    } else if (toCoreMsg.hasToDeviceActorMsg()) {
        log.trace("[{}] Forwarding message to device actor {}", id, toCoreMsg.getToDeviceActorMsg());
        // Handled by the method below
        forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
    }
forwardToDeviceActor's handling of the message:

private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, TbCallback callback) {
    if (statsEnabled) {
        stats.log(toDeviceActorMsg);
    }
    // Build a message of type TRANSPORT_TO_DEVICE_ACTOR_MSG and hand it to the AppActor
    actorContext.tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback));
}
Following point 3 of the summary in part 4, we can go straight to AppActor's doProcess method for this message type. Tracing it shows that the AppActor forwards the message to the TenantActor, and the TenantActor creates the DeviceActor and forwards the message to it.
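The forwarding chain can be pictured with a toy model (a sketch of the idea only; ThingsBoard's TbActor API is different): each level owns child actors keyed by id and creates them lazily before forwarding:

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

interface Actor {
    void tell(Object msg);
}

class DeviceMsg {
    final UUID deviceId;
    DeviceMsg(UUID deviceId) { this.deviceId = deviceId; }
}

class TenantActorSketch implements Actor {
    private final Map<UUID, Actor> deviceActors = new ConcurrentHashMap<>();

    @Override
    public void tell(Object msg) {
        if (msg instanceof DeviceMsg) {
            // Create the DeviceActor on first use, then forward, as the
            // TenantActor does for TRANSPORT_TO_DEVICE_ACTOR_MSG
            UUID deviceId = ((DeviceMsg) msg).deviceId;
            deviceActors.computeIfAbsent(deviceId, id -> new DeviceActorSketch()).tell(msg);
        }
    }
}

class DeviceActorSketch implements Actor {
    @Override
    public void tell(Object msg) {
        // doProcess(msg) — see the switch below
    }
}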
The DeviceActor receives this type of message and handles it as follows:

protected boolean doProcess(TbActorMsg msg) {
    switch (msg.getMsgType()) {
        case TRANSPORT_TO_DEVICE_ACTOR_MSG:
            // Wrapped as a TransportToDeviceActorMsgWrapper and handed to the processor,
            // which goes on to call processSessionStateMsgs
            processor.process(ctx, (TransportToDeviceActorMsgWrapper) msg);
            break;
        case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
......

The handling in processSessionStateMsgs:
private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
    UUID sessionId = getSessionId(sessionInfo);
    if (msg.getEvent() == SessionEvent.OPEN) {
        .....
        sessions.put(sessionId, new SessionInfoMetaData(new SessionInfo(SessionType.ASYNC, sessionInfo.getNodeId())));
        if (sessions.size() == 1) {
            // This will call pushRuleEngineMessage(stateData, CONNECT_EVENT);
            reportSessionOpen();
        }
        // This will call pushRuleEngineMessage(stateData, ACTIVITY_EVENT);
        systemContext.getDeviceStateService().onDeviceActivity(deviceId, System.currentTimeMillis());
        dumpSessions();
    }
....
Since CONNECT_EVENT and ACTIVITY_EVENT differ only in type, below we analyze only CONNECT_EVENT:
public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) {
    if (tenantId.isNullUid()) {
        if (entityId.getEntityType().equals(EntityType.TENANT)) {
            tenantId = new TenantId(entityId.getId());
        } else {
            log.warn("[{}][{}] Received invalid message: {}", tenantId, entityId, tbMsg);
            return;
        }
    }
    // Similar to point 7 above; the TPI's fullTopicName looks like tb_rule_engine.main.1
    TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
    log.trace("Pushing msg: {} to:{}", tbMsg, tpi);
    ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder()
            .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
            .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
            .setTbMsg(TbMsg.toByteString(tbMsg)).build();
    producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);
    toRuleEngineMsgs.incrementAndGet();
}
From the analysis in part 2, DefaultTbRuleEngineConsumerService subscribes to this topic (tb_rule_engine.main.1). On receiving the message, it calls the forwardToRuleEngineActor method, wraps the message as a QUEUE_TO_RULE_ENGINE_MSG, and hands it to the AppActor for dispatch.
The AppActor hands it to the TenantActor, the TenantActor to the root rule chain, and the RuleChainActor to the first rule node, i.e. one of the RuleNodeActors.
Open the Rule Chains page in the frontend and you will see that Message Type Switch is the first node receiving Input; in the database, the first_rule_node_id configured in the rule_chain table is in fact a TbMsgTypeSwitchNode.
Step into TbMsgTypeSwitchNode's onMsg method (in fact every rule node processes messages in onMsg): based on the messageType (here CONNECT_EVENT) it determines a relation type and calls ctx.tellNext(msg, relationType).
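A simplified sketch of that onMsg logic (the real TbMsgTypeSwitchNode covers many more message types; the relation-type strings below match the labels visible in the rule-chain UI):

@Override
public void onMsg(TbContext ctx, TbMsg msg) {
    String relationType;
    switch (msg.getType()) {
        case "POST_TELEMETRY_REQUEST":
            relationType = "Post telemetry";
            break;
        case "CONNECT_EVENT":
            relationType = "Connect Event";
            break;
        default:
            relationType = "Other";
    }
    // Forward to every node linked with this relation type; for a Connect Event
    // the default root chain has no such link, so processing stops in onTellNext below
    ctx.tellNext(msg, relationType);
}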
DefaultTbContext then creates a RuleNodeToRuleChainTellNextMsg of type RULE_TO_RULE_CHAIN_TELL_NEXT_MSG and hands it to the RuleChainActor.
Next we enter RuleChainActorMessageProcessor's onTellNext method:
private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set<String> relationTypes, String failureMessage) {
    try {
        checkActive(msg);
        // The message's originator
        EntityId entityId = msg.getOriginator();
        // Build a TPI; it may be used below
        TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId);
        // Find the related rule nodes: query the relation table for node ids related to this
        // originator whose relation_type matches the relationType defined in TbMsgTypeSwitchNode.
        // As noted above, a Connect Event finds no matching rule-node relation.
        List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream()
                .filter(r -> contains(relationTypes, r.getType()))
                .collect(Collectors.toList());
        int relationsCount = relations.size();
        // A Connect Event finds no matching rule-node id: the message has passed through the rule engine and processing is complete
        if (relationsCount == 0) {
            log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());
            if (relationTypes.contains(TbRelationTypes.FAILURE)) {
                RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId);
                if (ruleNodeCtx != null) {
                    msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf()));
                } else {
                    log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, originatorNodeId.getId());
                    msg.getCallback().onFailure(new RuleEngineException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]"));
                }
            } else {
                msg.getCallback().onSuccess();
            }
        // Example: a Post telemetry type does find a matching rule node, implemented by TbMsgTimeseriesNode,
        // so such a message would be handed to TbMsgTimeseriesNode
        } else if (relationsCount == 1) {
            for (RuleNodeRelation relation : relations) {
                log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
                pushToTarget(tpi, msg, relation.getOut(), relation.getType());
            }
        } else {
            MultipleTbQueueTbMsgCallbackWrapper callbackWrapper = new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback());
            log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relations);
            for (RuleNodeRelation relation : relations) {
                EntityId target = relation.getOut();
                putToQueue(tpi, msg, callbackWrapper, target);
            }
        }
    } catch (RuleNodeException rne) {
        msg.getCallback().onFailure(rne);
    } catch (Exception e) {
        msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage()));
    }
}
What's more:
As in the example above, telemetry data (Post telemetry) is processed further by TbMsgTimeseriesNode's onMsg: the data is stored and, if a WebSocket session exists, pushed out as an update over WebSocket, or other notifications are sent; we won't expand on the details here.
Handling an MQTT connection in fact walks the entire rule-engine pipeline; other message types, such as telemetry, attribute updates, and RPC requests and responses, follow much the same flow.
When tracing a message's path, always be clear about which topic it is subscribed or published on; that way we never lose our bearings.
The actor model dispatches each message by type, step by step starting from the AppActor, until a suitable RuleNode finally handles it.
Protobuf messages are easy to serialize, transmit, and parse, so ThingsBoard uses them heavily; the generated classes are not very readable, though, so reading the queue.proto file directly gives a better feel for the types.
Since the author's ability is limited and this only traces the broad flow, mistakes are inevitable; understanding and corrections are appreciated.