ThingsBoard Secondary Development: Source Code Analysis 5

thingskit · September 29, 2020

ThingsBoard Community

ThingsBoard topic discussion board: https://www.iotschool.com/topics/node8

Everyone is welcome to join the ThingsBoard secondary-development discussion group: 121202538

ThingsBoard Source Code Analysis 5: How MQTT Connections Are Received

1. MQTT Server

To accept MQTT connections from devices, ThingsBoard naturally needs an MQTT server; the class that creates it is MqttTransportService.

The MQTT server is built on Netty. It registers the MqttTransportServerInitializer handler, which adds Netty's MqttDecoder and MqttEncoder to the ChannelPipeline so that we do not have to encode and decode MQTT packets ourselves; most importantly, it also adds MqttTransportHandler.
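A simplified sketch of that pipeline wiring is shown below; the class name, payload limit, and constructor are illustrative placeholders, not the actual MqttTransportServerInitializer source:

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;

public class MqttPipelineSketch extends ChannelInitializer<SocketChannel> {

    private static final int MAX_PAYLOAD_SIZE = 65536; // placeholder limit

    private final ChannelHandler mqttTransportHandler; // in ThingsBoard: MqttTransportHandler

    public MqttPipelineSketch(ChannelHandler mqttTransportHandler) {
        this.mqttTransportHandler = mqttTransportHandler;
    }

    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        // Netty's built-in MQTT codec: spares us hand-written encoding/decoding of MQTT packets
        pipeline.addLast("decoder", new MqttDecoder(MAX_PAYLOAD_SIZE));
        pipeline.addLast("encoder", MqttEncoder.INSTANCE);
        // The handler that actually processes CONNECT / PUBLISH / SUBSCRIBE messages
        pipeline.addLast("handler", mqttTransportHandler);
    }
}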

2. MqttTransportHandler Handles the Connection

For this example, we first create a tenant and a tenant administrator, add a device, and use MQTT Box to simulate the hardware: copy the device's access token, use it as the MQTT Box username, and connect to our ThingsBoard backend.
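If you prefer a scripted client to MQTT Box, the same connection can be made with, for example, the Eclipse Paho Java client. This is only an illustrative sketch; the broker address and token are placeholders:

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;

public class ConnectDemo {
    public static void main(String[] args) throws MqttException {
        // The ThingsBoard MQTT transport listens on port 1883 by default
        MqttClient client = new MqttClient("tcp://localhost:1883", MqttClient.generateClientId());
        MqttConnectOptions options = new MqttConnectOptions();
        // The device access token goes into the MQTT username; no password is needed
        options.setUserName("YOUR_DEVICE_ACCESS_TOKEN"); // placeholder
        client.connect(options);
        System.out.println("Connected: " + client.isConnected());
        client.disconnect();
    }
}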


Since SSL is not used, once the connect request is received the following method is invoked:

private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
    String userName = msg.payload().userName();
    log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName);
    if (StringUtils.isEmpty(userName)) {
        ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
        ctx.close();
    } else {
        // Take the username, build a protobuf message (easy to transmit and parse) and hand it to transportService.
        // See the analysis of DefaultTransportService in part 3 of this series for how process() works; details at ① below.
        transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),
                new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
                    @Override
                    public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
                        onValidateDeviceResponse(msg, ctx);
                    }
                    @Override
                    public void onError(Throwable e) {
                        log.trace("[{}] Failed to process credentials: {}", address, userName, e);
                        ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
                        ctx.close();
                    }
                });
    }
}
  1. DefaultTransportService's process method builds an asynchronous task: on success it invokes the onSuccess consumer, on failure the onFailure consumer (a minimal sketch of this callback wiring follows the send() listing below).

  2. The credential-validation request is handed to transportApiRequestTemplate.send:

public ListenableFuture<Response> send(Request request) {
    if (tickSize > maxPendingRequests) {
        return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
    }
    UUID requestId = UUID.randomUUID();
    request.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
    // From the analysis in part 3, this topic is tb_transport.api.responses.<localHostName>
    request.getHeaders().put(RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic()));
    request.getHeaders().put(REQUEST_TIME, longToBytes(System.currentTimeMillis()));
    // See the basics in part 1: SettableFuture (from Guava) is a future whose result can be set explicitly
    SettableFuture<Response> future = SettableFuture.create();
    ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);
    // Put the future into pendingRequests ②
    pendingRequests.putIfAbsent(requestId, responseMetaData);
    log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, request.getKey(), responseMetaData.expTime);
    // Publish the message to the queue; the topic is tb_transport.api.requests
    requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {
        @Override
        public void onSuccess(TbQueueMsgMetadata metadata) {
            log.trace("[{}] Request sent: {}", requestId, metadata);
        }
        @Override
        public void onFailure(Throwable t) {
            pendingRequests.remove(requestId);
            future.setException(t);
        }
    });
    return future;
}
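As noted in point 1, process() essentially bridges the ListenableFuture returned by send() to the onSuccess/onFailure consumers. A minimal sketch of that pattern using Guava follows; the names are simplified (the real wiring lives in DefaultTransportService and AsyncCallbackTemplate, and the actual future carries a TbProtoQueueMsg that is unwrapped first):

// Sketch only: route the asynchronous result of send() to the two consumers.
ListenableFuture<ValidateDeviceCredentialsResponseMsg> future = transportApiRequestTemplate.send(requestMsg);
Futures.addCallback(future, new FutureCallback<ValidateDeviceCredentialsResponseMsg>() {
    @Override
    public void onSuccess(ValidateDeviceCredentialsResponseMsg result) {
        onSuccessConsumer.accept(result);   // ends up in MqttTransportHandler.onValidateDeviceResponse
    }

    @Override
    public void onFailure(Throwable t) {
        onFailureConsumer.accept(t);        // ends up refusing the MQTT connection
    }
}, callbackExecutor);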
  3. From the analysis of TbCoreTransportApiService in part 3, we find that the requestTemplate consumer inside DefaultTbQueueResponseTemplate is exactly the one subscribed to the tb_transport.api.requests messages:
......
requests.forEach(request -> {
    long currentTime = System.currentTimeMillis();
    long requestTime = bytesToLong(request.getHeaders().get(REQUEST_TIME));
    if (requestTime + requestTimeout >= currentTime) {
        byte[] requestIdHeader = request.getHeaders().get(REQUEST_ID_HEADER);
        if (requestIdHeader == null) {
            log.error("[{}] Missing requestId in header", request);
            return;
        }
        // Read the response topic so the reply can go back to where the request came from;
        // here the topic is tb_transport.api.responses.<localHostName>
        byte[] responseTopicHeader = request.getHeaders().get(RESPONSE_TOPIC_HEADER);
        if (responseTopicHeader == null) {
            log.error("[{}] Missing response topic in header", request);
            return;
        }
        UUID requestId = bytesToUuid(requestIdHeader);
        String responseTopic = bytesToString(responseTopicHeader);
        try {
            pendingRequestCount.getAndIncrement();
            // Let the handler process the message
            AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(request),
                    response -> {
                        pendingRequestCount.decrementAndGet();
                        response.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
                        // Send the result of handler.handle back to the sender; the topic is tb_transport.api.responses.<localHostName>
                        responseTemplate.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, null);
                    },
                    e -> {
                        pendingRequestCount.decrementAndGet();
                        if (e.getCause() != null && e.getCause() instanceof TimeoutException) {
                            log.warn("[{}] Timeout to process the request: {}", requestId, request, e);
                        } else {
                            log.trace("[{}] Failed to process the request: {}", requestId, request, e);
                        }
                    },
                    requestTimeout,
                    timeoutExecutor,
                    callbackExecutor);
          .......
  4. The actual validation logic:
@Override
public ListenableFuture<TbProtoQueueMsg<TransportApiResponseMsg>> handle(TbProtoQueueMsg<TransportApiRequestMsg> tbProtoQueueMsg) {
    TransportApiRequestMsg transportApiRequestMsg = tbProtoQueueMsg.getValue();
    // The protobuf-generated class lets us check whether the request contains the validation block
    if (transportApiRequestMsg.hasValidateTokenRequestMsg()) {
        ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg();
        // validateCredentials looks up the DeviceInfo and hands the result to the second function for further processing
        return Futures.transform(validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor());
    }
  ......
  5. Once the DeviceInfo is found via the device's access token, it is published back through the message queue on topic tb_transport.api.responses.<localHostName>. From the analysis in part 3, the transportApiRequestTemplate of DefaultTransportService subscribes to exactly this topic:
List<Response> responses = responseTemplate.poll(pollInterval);
if (responses.size() > 0) {
    log.trace("Polling responses completed, consumer records count [{}]", responses.size());
} else {
    continue;
}
responses.forEach(response -> {
    byte[] requestIdHeader = response.getHeaders().get(REQUEST_ID_HEADER);
    UUID requestId;
    if (requestIdHeader == null) {
        log.error("[{}] Missing requestId in header and body", response);
    } else {
        requestId = bytesToUuid(requestIdHeader);
        log.trace("[{}] Response received: {}", requestId, response);
        // See ② above: the validation future was put into pendingRequests; now retrieve it by requestId
        ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
        if (expectedResponse == null) {
            log.trace("[{}] Invalid or stale request", requestId);
        } else {
            // Set the result of the SettableFuture
            expectedResponse.future.set(response);
        }
    }
......
  6. The asynchronous request made in DefaultTransportService's process method now has its result, so the onSuccess callback fires, which calls MqttTransportHandler's onValidateDeviceResponse:
private void onValidateDeviceResponse(ValidateDeviceCredentialsResponseMsg msg, ChannelHandlerContext ctx) {
    if (!msg.hasDeviceInfo()) {
        ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
        ctx.close();
    } else {
        deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
        sessionInfo = SessionInfoProto.newBuilder()
                .setNodeId(context.getNodeId())
                .setSessionIdMSB(sessionId.getMostSignificantBits())
                .setSessionIdLSB(sessionId.getLeastSignificantBits())
                .setDeviceIdMSB(msg.getDeviceInfo().getDeviceIdMSB())
                .setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB())
                .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB())
                .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
                .setDeviceName(msg.getDeviceInfo().getDeviceName())
                .setDeviceType(msg.getDeviceInfo().getDeviceType())
                .build();
        // Build a SessionEvent.OPEN message containing sessionInfo and call sendToDeviceActor
        transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() {
           .......
  7. The implementation of sendToDeviceActor:
protected void sendToDeviceActor(TransportProtos.SessionInfoProto sessionInfo, TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback<Void> callback) {
    // Build the tpi; a fixed partition id is chosen, the topic is tb_core and the fullTopicName is tb_core.<int>, e.g. tb_core.1
    TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, getTenantId(sessionInfo), getDeviceId(sessionInfo));
......
    // Publish to the message queue with tbCoreMsgProducer, setting toDeviceActorMsg
    tbCoreMsgProducer.send(tpi,
            new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),
                    ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()), callback != null ?
                    new TransportTbQueueCallback(callback) : null);
}
  8. From the analysis of DefaultTbCoreConsumerService in part 2, we know that its consumer subscribes to this topic:
try {
    ToCoreMsg toCoreMsg = msg.getValue();
    if (toCoreMsg.hasToSubscriptionMgrMsg()) {
        log.trace("[{}] Forwarding message to subscription manager service {}", id, toCoreMsg.getToSubscriptionMgrMsg());
        forwardToSubMgrService(toCoreMsg.getToSubscriptionMgrMsg(), callback);
    } else if (toCoreMsg.hasToDeviceActorMsg()) {
        log.trace("[{}] Forwarding message to device actor {}", id, toCoreMsg.getToDeviceActorMsg());
        // Handled by the method below
        forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
    }
  9. How forwardToDeviceActor handles the message:
private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, TbCallback callback) {
    if (statsEnabled) {
        stats.log(toDeviceActorMsg);
    }
    // Build a message of type TRANSPORT_TO_DEVICE_ACTOR_MSG and hand it to the AppActor
    actorContext.tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback));
}
  10. From summary point 3 in part 4, we can go straight to AppActor's doProcess method to see how this message type is handled; tracing through, the AppActor forwards the message to the TenantActor, and the TenantActor creates the DeviceActor and forwards the message to it.

  11. When the DeviceActor receives a message of this type, it handles it as follows:

    protected boolean doProcess(TbActorMsg msg) {
        switch (msg.getMsgType()) {
            case TRANSPORT_TO_DEVICE_ACTOR_MSG:
                // Wrap as TransportToDeviceActorMsgWrapper and hand to the processor, which goes on to call processSessionStateMsgs
                processor.process(ctx, (TransportToDeviceActorMsgWrapper) msg);
                break;
            case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
    
  12. The handling in processSessionStateMsgs:

    private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
        UUID sessionId = getSessionId(sessionInfo);
        if (msg.getEvent() == SessionEvent.OPEN) {
         .....
            sessions.put(sessionId, new SessionInfoMetaData(new SessionInfo(SessionType.ASYNC, sessionInfo.getNodeId())));
            if (sessions.size() == 1) {
                // This will call pushRuleEngineMessage(stateData, CONNECT_EVENT)
                reportSessionOpen();
            }
            // This will call pushRuleEngineMessage(stateData, ACTIVITY_EVENT)
            systemContext.getDeviceStateService().onDeviceActivity(deviceId, System.currentTimeMillis());
            dumpSessions();
        }
    ....
    
  13. Since CONNECT_EVENT and ACTIVITY_EVENT differ only in the message type, only CONNECT_EVENT is analysed below:

    public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) {
        if (tenantId.isNullUid()) {
            if (entityId.getEntityType().equals(EntityType.TENANT)) {
                tenantId = new TenantId(entityId.getId());
            } else {
                log.warn("[{}][{}] Received invalid message: {}", tenantId, entityId, tbMsg);
                return;
            }
        }
        // Similar to point 7: the fullTopicName of the created tpi looks like tb_rule_engine.main.1
        TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
        log.trace("PUSHING msg: {} to:{}", tbMsg, tpi);
        ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder()
                .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
                .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
                .setTbMsg(TbMsg.toByteString(tbMsg)).build();
        producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);
        toRuleEngineMsgs.incrementAndGet();
    }
    
  14. From the analysis in part 2, DefaultTbRuleEngineConsumerService subscribes to this topic (tb_rule_engine.main.1). After receiving the message it calls forwardToRuleEngineActor, wraps it in a QUEUE_TO_RULE_ENGINE_MSG message, and hands it to the AppActor for dispatch.

  15. The AppActor hands the message to the TenantActor, the TenantActor to the root rule chain's RuleChainActor, and the RuleChainActor to the first rule node, i.e. some RuleNodeActor.

  16. If you open the Rule Chains page in the UI, you will see that Message Type Switch is the first node receiving Input; in the database, the first_rule_node_id configured in the rule_chain table indeed points to the TbMsgTypeSwitchNode.

  17. Stepping into TbMsgTypeSwitchNode's onMsg method (in fact every rule node processes messages through its onMsg method), we see that it derives a relationType from the messageType (here CONNECT_EVENT) and calls ctx.tellNext(msg, relationType); a condensed sketch of this is shown after the onTellNext listing below.

  18. DefaultTbContext then creates a RuleNodeToRuleChainTellNextMsg of type RULE_TO_RULE_CHAIN_TELL_NEXT_MSG and hands it to the RuleChainActor.

  19. Next we enter RuleChainActorMessageProcessor's onTellNext method:

    private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set<String> relationTypes, String failureMessage) {
        try {
            checkActive(msg);
            // The originator of the message
            EntityId entityId = msg.getOriginator();
            // Build a tpi that may be used later
            TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId);
            // Query the related rule nodes: essentially a lookup in the relation table, using the originator id, for node ids
            // whose relation_type matches the relationType defined in TbMsgTypeSwitchNode; for a Connect Event no matching RuleNodeId exists
            List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream()
                    .filter(r -> contains(relationTypes, r.getType()))
                    .collect(Collectors.toList());
            int relationsCount = relations.size();
            // For a Connect Event no related RuleNodeId is found, so the message has finished its trip through the rule engine
            if (relationsCount == 0) {
                log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());
                if (relationTypes.contains(TbRelationTypes.FAILURE)) {
                    RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId);
                    if (ruleNodeCtx != null) {
                        msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf()));
                    } else {
                        log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, originatorNodeId.getId());
                        msg.getCallback().onFailure(new RuleEngineException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]"));
                    }
                } else {
                    msg.getCallback().onSuccess();
                }
            // Example: for Post Telemetry a matching rule node is found (implemented by TbMsgTimeseriesNode), so the message is handed to that node
            } else if (relationsCount == 1) {
                for (RuleNodeRelation relation : relations) {
                    log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
                    pushToTarget(tpi, msg, relation.getOut(), relation.getType());
                }
            } else {
                MultipleTbQueueTbMsgCallbackWrapper callbackWrapper = new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback());
                log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relations);
                for (RuleNodeRelation relation : relations) {
                    EntityId target = relation.getOut();
                    putToQueue(tpi, msg, callbackWrapper, target);
                }
            }
        } catch (RuleNodeException rne) {
            msg.getCallback().onFailure(rne);
        } catch (Exception e) {
            msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage()));
        }
    }
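For reference, the relation-type selection described in point 17 boils down to something like the following condensed sketch of TbMsgTypeSwitchNode's onMsg (only a few message types are shown here; the real node covers many more):

@Override
public void onMsg(TbContext ctx, TbMsg msg) {
    String relationType;
    // The outbound relation type is derived purely from the message type
    switch (msg.getType()) {
        case "POST_TELEMETRY_REQUEST":
            relationType = "Post telemetry";
            break;
        case "CONNECT_EVENT":
            relationType = "Connect Event";
            break;
        case "ACTIVITY_EVENT":
            relationType = "Activity Event";
            break;
        default:
            relationType = "Other";
    }
    // Hand the message on; RuleChainActorMessageProcessor.onTellNext then looks up relations of this type
    ctx.tellNext(msg, relationType);
}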
    

    What's more:

    As in the example above, a Post Telemetry message would be processed further by TbMsgTimeseriesNode's onMsg, for example storing the data and then pushing updates over WebSocket (if a WebSocket session exists) or sending other notifications; we won't expand on that in detail here.

Summary:

  1. Handling an MQTT connection actually walks through the entire rule-engine pipeline; other message types, such as telemetry data, attribute updates, and RPC requests/responses, follow broadly the same flow.

  2. When tracing how messages flow, always be clear about which topic is subscribed to or published to; that way you will not lose your bearings.

  3. The actor model dispatches each message step by step, starting from the AppActor and according to the message type, until it reaches the appropriate rule node for processing.

  4. Protobuf messages are easy to serialize, transmit, and parse, so they are used heavily in ThingsBoard; however, the generated classes are not very readable, so you may prefer to read the queue.proto file directly to get an intuitive picture of those classes.

Since the author's knowledge is limited, this article only sketches the general flow; mistakes are inevitable, and your understanding and corrections are appreciated.


[Reply] Where in the code does the backend, after receiving the data, push it to the frontend?
