关于 thingsboard 如何监控连接状态的问题 · 物联网平台-威尼斯人最新

yangzhang-iotschool · 2021年08月27日 · 最後由 於 2022年10月14日回覆 · 357 次閱讀

当我通过 mqtt 客户端(l 例如 mqtt box)填写好 host,port,username(username 指此设备的 token)连接到 thingsboard 平台端指定 username 的设备时,不需要发送任何说明此设备已经连接上的 topic,thingsboard 平台端就可以监控到此设备已经连接上了,这是怎么做到了?

当设备通过协议(http、mqtt、coap)连接到 thingsboard 平台后,

  1. 进行设备认证 以 http 为例, defaulttransportservice:
public void process(devicetransporttype transporttype, transportprotos.validatedevicetokenrequestmsg msg,
                    transportservicecallback<validatedevicecredentialsresponse> callback) {
    log.trace("processing msg: {}", msg);
    //封装设备access token信息
    tbprotoqueuemsg<transportapirequestmsg> protomsg = new tbprotoqueuemsg<>(uuid.randomuuid(),
            transportapirequestmsg.newbuilder().setvalidatetokenrequestmsg(msg).build());
    doprocess(transporttype, protomsg, callback);
}

通过 defaulttransportapiservice 验证 access token 的合法性

// 验证access token的合法性
      if (transportapirequestmsg.hasvalidatetokenrequestmsg()) {
          validatedevicetokenrequestmsg msg = transportapirequestmsg.getvalidatetokenrequestmsg();
          result = validatecredentials(msg.gettoken(), devicecredentialstype.access_token);
  1. 将消息转发到规则引擎执行 defaulttbruleengineconsumerservice:
private void forwardtoruleengineactor(string queuename, tenantid tenantid, toruleenginemsg toruleenginemsg, tbmsgcallback callback) {
       tbmsg tbmsg = tbmsg.frombytes(queuename, toruleenginemsg.gettbmsg().tobytearray(), callback);
       queuetoruleenginemsg msg;
       protocolstringlist relationtypeslist = toruleenginemsg.getrelationtypeslist();
       set<string> relationtypes = null;
       if (relationtypeslist != null) {
           if (relationtypeslist.size() == 1) {
               relationtypes = collections.singleton(relationtypeslist.get(0));
           } else {
               relationtypes = new hashset<>(relationtypeslist);
           }
       }
       msg = new queuetoruleenginemsg(tenantid, tbmsg, relationtypes, toruleenginemsg.getfailuremessage());
       actorcontext.tell(msg);
   }
  1. 进行设备状态、最后活动时间的更新 当前两步操作完成后, 进行 reportactivity 的回调,更新设备状态。 defaultdevicestateservice:
void updateactivitystate(deviceid deviceid, devicestatedata statedata, long lastreportedactivity) {
        log.trace("updateactivitystate - fetched state {} for device {}, lastreportedactivity {}", statedata, deviceid, lastreportedactivity);
        if (statedata != null) {
            save(deviceid, last_activity_time, lastreportedactivity);
            devicestate state = statedata.getstate();
            state.setlastactivitytime(lastreportedactivity);
            if (!state.isactive()) {
                state.setactive(true);
                save(deviceid, activity_state, true);
                pushruleenginemessage(statedata, activity_event);
            }
        } else {
            log.debug("updateactivitystate - fetched state in null for device {}, lastreportedactivity {}", deviceid, lastreportedactivity);
            cleanupdevicestatemap(deviceid);
        }
    }
需要 登錄 後方可回應,如果你還沒有帳號按這裡 註冊
网站地图