当设备通过协议(HTTP、MQTT、CoAP)连接到 ThingsBoard 平台后,平台会依次执行以下步骤:
- 进行设备认证
以 HTTP 为例,DefaultTransportService:
/**
 * Submits a device access-token validation request for processing.
 *
 * <p>The incoming message is placed into a transport API request, paired with a
 * freshly generated random UUID inside a queue message, and handed to
 * {@code doprocess} together with the transport type and the callback.
 *
 * @param transporttype transport type, passed through to {@code doprocess}
 * @param msg           token validation request carrying the device access token
 * @param callback      callback passed through to {@code doprocess}
 */
public void process(devicetransporttype transporttype, transportprotos.validatedevicetokenrequestmsg msg,
        transportservicecallback<validatedevicecredentialsresponse> callback) {
    log.trace("processing msg: {}", msg);
    // Generate the UUID that accompanies the queue message.
    uuid requestid = uuid.randomuuid();
    // Wrap the device access-token information into a transport API request.
    transportapirequestmsg apirequest = transportapirequestmsg.newbuilder()
            .setvalidatetokenrequestmsg(msg)
            .build();
    tbprotoqueuemsg<transportapirequestmsg> queuedrequest = new tbprotoqueuemsg<>(requestid, apirequest);
    doprocess(transporttype, queuedrequest, callback);
}
通过 defaulttransportapiservice 验证 access token 的合法性
// Validate the access token.
// When the transport API request carries a token validation message, extract it
// and validate the token as an ACCESS_TOKEN credential; the outcome is stored in
// `result`. (Excerpt only: the enclosing method and the closing brace of this
// `if` are not shown here.)
if (transportapirequestmsg.hasvalidatetokenrequestmsg()) {
validatedevicetokenrequestmsg msg = transportapirequestmsg.getvalidatetokenrequestmsg();
result = validatecredentials(msg.gettoken(), devicecredentialstype.access_token);
- 将消息转发到规则引擎执行
defaulttbruleengineconsumerservice:
/**
 * Rebuilds the rule-engine message from its serialized form and sends it to the
 * actor context.
 *
 * <p>The relation types carried by the incoming message are copied into a set:
 * a singleton set when there is exactly one entry, a {@code hashset} otherwise.
 * If the list itself is {@code null}, {@code null} is forwarded instead.
 *
 * @param queuename       queue name passed to {@code tbmsg.frombytes}
 * @param tenantid        tenant the forwarded message is created for
 * @param toruleenginemsg source message providing the payload bytes, the relation
 *                        types and the failure message
 * @param callback        callback passed to {@code tbmsg.frombytes}
 */
private void forwardtoruleengineactor(string queuename, tenantid tenantid, toruleenginemsg toruleenginemsg, tbmsgcallback callback) {
    tbmsg decoded = tbmsg.frombytes(queuename, toruleenginemsg.gettbmsg().tobytearray(), callback);

    protocolstringlist rawtypes = toruleenginemsg.getrelationtypeslist();
    set<string> relationtypes;
    if (rawtypes == null) {
        relationtypes = null;
    } else if (rawtypes.size() == 1) {
        // Single entry: use a singleton set instead of allocating a hash set.
        relationtypes = collections.singleton(rawtypes.get(0));
    } else {
        relationtypes = new hashset<>(rawtypes);
    }

    queuetoruleenginemsg outgoing =
            new queuetoruleenginemsg(tenantid, decoded, relationtypes, toruleenginemsg.getfailuremessage());
    actorcontext.tell(outgoing);
}
- 进行设备状态、最后活动时间的更新
当前面两步操作完成后,会回调 reportActivity,更新设备的状态和最后活动时间。
defaultdevicestateservice:
/**
 * Records a reported activity timestamp for a device and marks the device active
 * if it was not already.
 *
 * <p>When {@code statedata} is {@code null}, nothing is saved; the condition is
 * logged and {@code cleanupdevicestatemap} is called for the device. Otherwise the
 * last activity time is saved and set on the device state. If the state was not
 * active, it is switched to active, the activity state is saved, and an activity
 * event is pushed via {@code pushruleenginemessage}.
 *
 * @param deviceid             device whose activity is being reported
 * @param statedata            previously fetched state data, may be {@code null}
 * @param lastreportedactivity reported activity timestamp to store
 */
void updateactivitystate(deviceid deviceid, devicestatedata statedata, long lastreportedactivity) {
    log.trace("updateactivitystate - fetched state {} for device {}, lastreportedactivity {}", statedata, deviceid, lastreportedactivity);

    // No state available for this device: log it and clean up the state map.
    if (statedata == null) {
        log.debug("updateactivitystate - fetched state in null for device {}, lastreportedactivity {}", deviceid, lastreportedactivity);
        cleanupdevicestatemap(deviceid);
        return;
    }

    save(deviceid, last_activity_time, lastreportedactivity);
    devicestate current = statedata.getstate();
    current.setlastactivitytime(lastreportedactivity);

    // Already active: only the last activity time needed updating.
    if (current.isactive()) {
        return;
    }

    // Inactive -> active transition: save the new flag and push the activity event.
    current.setactive(true);
    save(deviceid, activity_state, true);
    pushruleenginemessage(statedata, activity_event);
}