thingsboard 话题讨论区:https://www.iotschool.com/topics/node8
欢迎大家加入 thingsboard 二次开发讨论群:121202538
# ThingsBoard 源码分析 3 - 启动分析 2
以下的分析环境基于内存消息队列和默认配置
分析 `defaulttransportservice` 的初始化方法:
// Startup hook (runs via @postconstruct). In order it: optionally validates the rate-limit
// configuration, creates the executors, schedules the periodic activity check, builds the
// transport API request template and the message producers, subscribes the notifications
// consumer, and starts the notification polling loop.
@postconstruct
public void init() {
// Create the rate limits only when rate limiting is enabled in the configuration
if (ratelimitenabled) {
//just checking the configuration parameters
new tbratelimits(pertenantlimitsconf);
new tbratelimits(perdeviceslimitsconf);
}
this.schedulerexecutor = executors.newsinglethreadscheduledexecutor(thingsboardthreadfactory.forname("transport-scheduler"));
this.transportcallbackexecutor = executors.newworkstealingpool(20);
// Run checkinactivityandreportactivity every sessionreporttimeout milliseconds; the initial
// delay is a random value below sessionreporttimeout.
this.schedulerexecutor.scheduleatfixedrate(this::checkinactivityandreportactivity, new random().nextint((int) sessionreporttimeout), sessionreporttimeout, timeunit.milliseconds);
// See analysis ① below for how transportapirequesttemplate is created. It contains
// a producer, producertemplate (requesttemplate), topic: tb_transport.api.requests ②
// and a consumer, consumertemplate (responsetemplate), topic: tb_transport.api.responses.localhostname ③
transportapirequesttemplate = queueprovider.createtransportapirequesttemplate();
// The producerprovider bean is created according to the configuration file:
// tbqueueproducerprovider has three implementations, selected with the conditionalonexpression
// annotation, which reads service.type (default: monolith). The implementation used here is
// therefore tbcorequeueproducerprovider; its @postconstruct init() method initializes the
// following variables:
// 1. totbcore                   topic: tb_core
// 2. totransport                topic: tb_transport.notifications
// 3. toruleengine               topic: tb_rule_engine
// 4. toruleenginenotifications  topic: tb_rule_engine
// 5. totbcorenotifications      topic: tb_core
ruleenginemsgproducer = producerprovider.getruleenginemsgproducer();
tbcoremsgproducer = producerprovider.gettbcoremsgproducer();
transportnotificationsconsumer = queueprovider.createtransportnotificationsconsumer();
// fulltopic = tb_transport.notifications.localhostname
topicpartitioninfo tpi = partitionservice.getnotificationstopic(servicetype.tb_transport, serviceinfoprovider.getserviceid());
transportnotificationsconsumer.subscribe(collections.singleton(tpi));
// See analysis ④
transportapirequesttemplate.init();
// Polling loop: runs on mainconsumerexecutor until the stopped flag is set
mainconsumerexecutor.execute(() -> {
while (!stopped) {
try {
list<tbprotoqueuemsg<totransportmsg>> records = transportnotificationsconsumer.poll(notificationspollduration);
if (records.size() == 0) {
continue;
}
// A failure on one record is logged and does not stop the rest of the batch
records.foreach(record -> {
try {
processtotransportmsg(record.getvalue());
} catch (throwable e) {
log.warn("failed to process the notification.", e);
}
});
// Commit once the whole batch has been handed to processtotransportmsg
transportnotificationsconsumer.commit();
} catch (exception e) {
// Errors raised while shutting down (stopped == true) are ignored
if (!stopped) {
log.warn("failed to obtain messages from queue.", e);
try {
// Wait one poll interval before the next attempt
thread.sleep(notificationspollduration);
} catch (interruptedexception e2) {
log.trace("failed to wait until the server has capacity to handle new requests", e2);
}
}
}
}
});
}
① `createtransportapirequesttemplate`(位于 `inmemorytbtransportqueuefactory`):因为我们没有启用相应的消息队列中间件,所以这里分析 `inmemorytbtransportqueuefactory` 中的实现:
public tbqueuerequesttemplate<tbprotoqueuemsg<transportapirequestmsg>, tbprotoqueuemsg<transportapiresponsemsg>> createtransportapirequesttemplate() {
//根据配置文件值queue.transport_api.requests_topic获取到的topic是tb_transport.api.requests创建了生产者
inmemorytbqueueproducer<tbprotoqueuemsg<transportapirequestmsg>> producertemplate =
new inmemorytbqueueproducer<>(transportapisettings.getrequeststopic());
//根据配置文件值queue.transport_api.responses_topic获取到的topic是tb_transport.api.responses
//加上serviceid(我们在第二篇分析中提到,本机的hostname作为serviceid,其topic就是tb_transport.api.responses.localhostname
inmemorytbqueueconsumer<tbprotoqueuemsg<transportapiresponsemsg>> consumertemplate =
new inmemorytbqueueconsumer<>(transportapisettings.getresponsestopic() "." serviceinfoprovider.getserviceid());
//使用建造者模式返回了tbqueuerequesttemplate实例,其中包含了一个消费者和一个生产者
defaulttbqueuerequesttemplate.defaulttbqueuerequesttemplatebuilder
<tbprotoqueuemsg<transportapirequestmsg>, tbprotoqueuemsg<transportapiresponsemsg>> templatebuilder = defaulttbqueuerequesttemplate.builder();
templatebuilder.queueadmin(new tbqueueadmin() {
@override
public void createtopicifnotexists(string topic) {}
@override
public void destroy() {}
});
templatebuilder.requesttemplate(producertemplate);
templatebuilder.responsetemplate(consumertemplate);
templatebuilder.maxpendingrequests(transportapisettings.getmaxpendingrequests());
templatebuilder.maxrequesttimeout(transportapisettings.getmaxrequeststimeout());
templatebuilder.pollinterval(transportapisettings.getresponsepollinterval());
return templatebuilder.build();
}
④ `defaulttbqueuerequesttemplate` 中的 `init()` 方法:
// Initializes the request template: ensures the response topic exists, initializes the
// producer, subscribes the response consumer, and submits the response polling loop.
// (Excerpt: the remainder of the loop is elided in this listing.)
public void init() {
queueadmin.createtopicifnotexists(responsetemplate.gettopic());
// Each queue middleware implements its own init; for the in-memory queue the method body is empty
this.requesttemplate.init();
tickts = system.currenttimemillis();
// See ③: the subscribed topic is tb_transport.api.responses.localhostname
responsetemplate.subscribe();
executor.submit(() -> {
long nextcleanupms = 0l;
while (!stopped) {
try {
// Fetch messages from the message queue
list<response> responses = responsetemplate.poll(pollinterval);
...........
`tbcoretransportapiservice` 中 `@postconstruct` 注解的方法:
@postconstruct
public void init() {
this.transportcallbackexecutor = executors.newworkstealingpool(maxcallbackthreads);
//topic是配置文件queue.transport_api.responses_topic的值默认为:tb_transport.api.responses ⑤
tbqueueproducer<tbprotoqueuemsg<transportapiresponsemsg>> producer = tbcorequeuefactory.createtransportapiresponseproducer();
//topic是配置文件queue.transport_api.requests_topic的值,默认为:tb_transport.api.requests ⑥
tbqueueconsumer<tbprotoqueuemsg<transportapirequestmsg>> consumer = tbcorequeuefactory.createtransportapirequestconsumer();
defaulttbqueueresponsetemplate.defaulttbqueueresponsetemplatebuilder
<tbprotoqueuemsg<transportapirequestmsg>, tbprotoqueuemsg<transportapiresponsemsg>> builder = defaulttbqueueresponsetemplate.builder();
builder.requesttemplate(consumer);
builder.responsetemplate(producer);
builder.maxpendingrequests(maxpendingrequests);
builder.requesttimeout(requesttimeout);
builder.pollinterval(responsepollduration);
builder.executor(transportcallbackexecutor);
builder.handler(transportapiservice);
transportapitemplate = builder.build();
`@eventlistener(applicationreadyevent.class)` 注解的方法调用了 `transportapitemplate.init(transportapiservice);`。`transportapitemplate` 即上一步创建的 `defaulttbqueueresponsetemplate` 对象,其 `init()` 方法为:
@override
// Initializes the response template: initializes the producer, subscribes the request
// consumer, and submits the request polling loop.
// (Excerpt: the remainder of the loop is elided in this listing.)
public void init(tbqueuehandler<request, response> handler) {
// Each queue middleware implements its own init; for the in-memory queue the method body is empty
this.responsetemplate.init();
// See ⑥: the subscribed topic is tb_transport.api.requests
requesttemplate.subscribe();
loopexecutor.submit(() -> {
while (!stopped) {
try {
// Do not poll while the number of pending requests is at or above maxpendingrequests;
// sleep one poll interval and check again
while (pendingrequestcount.get() >= maxpendingrequests) {
try {
thread.sleep(pollinterval);
} catch (interruptedexception e) {
log.trace("failed to wait until the server has capacity to handle new requests", e);
}
}
list<request> requests = requesttemplate.poll(pollinterval);
...........
`defaulttransportservice` 和 `tbcoretransportapiservice` 的启动过程并不复杂,我们需要将主要的关注点放在这两个 bean 初始化消费者和生产者时使用的 topic 上面。ThingsBoard 使用消息中间件将消息解耦,如果按照传统的调试方法很容易找不到消息的流向,此时我们将 topic 作为关键的切入点,方便后面对整个数据流的分析。