ThingsBoard Secondary Development: Source Code Analysis 3

thingskit · September 29, 2020 · last replied at August 19, 2022 · 1181 hits
This post has been marked as a featured post!

ThingsBoard community hub

ThingsBoard discussion board: https://www.iotschool.com/topics/node8

Everyone is welcome to join the ThingsBoard secondary development discussion group: 121202538

#ThingsBoard Source Code Analysis 3: Startup Analysis 2

The analysis below assumes the in-memory message queue and the default configuration.

1. DefaultTransportService

Analysis of the initialization method:

@PostConstruct
public void init() {
    // Create the rate limits only if the configuration enables them
    if (rateLimitEnabled) {
        //Just checking the configuration parameters
        new TbRateLimits(perTenantLimitsConf);
        new TbRateLimits(perDevicesLimitsConf);
    }
    this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("transport-scheduler"));
    this.transportCallbackExecutor = Executors.newWorkStealingPool(20);
    this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
    // See analysis ① below for how transportApiRequestTemplate is created. It contains
    // a producer, producerTemplate (requestTemplate),     topic: tb_transport.api.requests   ②
    // and a consumer, consumerTemplate (responseTemplate), topic: tb_transport.api.responses.localhostname ③
    transportApiRequestTemplate = queueProvider.createTransportApiRequestTemplate();
    // The producerProvider bean is created according to the configuration file. TbQueueProducerProvider has
    // three implementations selected with the @ConditionalOnExpression annotation, which reads the value of
    // service.type (default: monolith), so the implementation used here is TbCoreQueueProducerProvider.
    // Its @PostConstruct init() method initializes the following fields:
    // 1. toTbCore                    topic: tb_core
    // 2. toTransport                 topic: tb_transport.notifications
    // 3. toRuleEngine                topic: tb_rule_engine
    // 4. toRuleEngineNotifications   topic: tb_rule_engine
    // 5. toTbCoreNotifications       topic: tb_core
    ruleEngineMsgProducer = producerProvider.getRuleEngineMsgProducer();
    tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer();
    transportNotificationsConsumer = queueProvider.createTransportNotificationsConsumer();
    // fullTopic = tb_transport.notifications.localhostname
    TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, serviceInfoProvider.getServiceId());
    transportNotificationsConsumer.subscribe(Collections.singleton(tpi));
    // See analysis ④
    transportApiRequestTemplate.init();
    mainConsumerExecutor.execute(() -> {
        while (!stopped) {
            try {
                List<TbProtoQueueMsg<ToTransportMsg>> records = transportNotificationsConsumer.poll(notificationsPollDuration);
                if (records.size() == 0) {
                    continue;
                }
                records.forEach(record -> {
                    try {
                        processToTransportMsg(record.getValue());
                    } catch (Throwable e) {
                        log.warn("Failed to process the notification.", e);
                    }
                });
                transportNotificationsConsumer.commit();
            } catch (Exception e) {
                if (!stopped) {
                    log.warn("Failed to obtain messages from queue.", e);
                    try {
                        Thread.sleep(notificationsPollDuration);
                    } catch (InterruptedException e2) {
                        log.trace("Failed to wait until the server has capacity to handle new requests", e2);
                    }
                }
            }
        }
    });
}
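
The consumer loop at the end of init() follows a poll, process, commit pattern in which each record is handled inside its own try/catch, so one bad notification does not discard the rest of the batch. Below is a minimal sketch of that pattern. It assumes a plain BlockingQueue standing in for the queue consumer; PollLoopSketch, pollBatch and processBatch are hypothetical names for illustration, not ThingsBoard APIs, and the sketch catches RuntimeException where the original catches Throwable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch; none of these names are ThingsBoard APIs.
class PollLoopSketch {

    // Wait up to timeoutMs for a first message, then take whatever else is already queued.
    static <T> List<T> pollBatch(BlockingQueue<T> queue, long timeoutMs) {
        List<T> batch = new ArrayList<>();
        try {
            T first = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                queue.drainTo(batch);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return what we have (an empty batch).
            Thread.currentThread().interrupt();
        }
        return batch;
    }

    // Handle every record in isolation and report how many of them failed.
    static <T> int processBatch(List<T> batch, Consumer<T> handler) {
        int failures = 0;
        for (T record : batch) {
            try {
                handler.accept(record);
            } catch (RuntimeException e) {
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("a");
        queue.add("bad");
        queue.add("c");

        List<String> handled = new ArrayList<>();
        List<String> batch = pollBatch(queue, 25);
        int failures = processBatch(batch, msg -> {
            if (msg.equals("bad")) {
                throw new IllegalStateException("cannot process " + msg);
            }
            handled.add(msg);
        });
        System.out.println("handled=" + handled + " failures=" + failures);
    }
}
```

In the sketch the failing record is counted and skipped while the records around it are still handled, which mirrors the log.warn inside the forEach of the listing above.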

① createTransportApiRequestTemplate in InMemoryTbTransportQueueFactory: because no message queue middleware is enabled, the implementation to analyze is InMemoryTbTransportQueueFactory:

public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiRequestTemplate() {
    // The producer is created on the topic read from queue.transport_api.requests_topic, i.e. tb_transport.api.requests
    InMemoryTbQueueProducer<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate =
            new InMemoryTbQueueProducer<>(transportApiSettings.getRequestsTopic());
    // The topic read from queue.transport_api.responses_topic is tb_transport.api.responses.
    // The serviceId is appended (as noted in part 2 of this series, the local hostname is used as the serviceId),
    // so the consumer topic is tb_transport.api.responses.localhostname
    InMemoryTbQueueConsumer<TbProtoQueueMsg<TransportApiResponseMsg>> consumerTemplate =
            new InMemoryTbQueueConsumer<>(transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId());
    // The builder pattern returns a TbQueueRequestTemplate instance that holds one consumer and one producer
    DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
            <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder();
    templateBuilder.queueAdmin(new TbQueueAdmin() {
        @Override
        public void createTopicIfNotExists(String topic) {}
        @Override
        public void destroy() {}
    });
    templateBuilder.requestTemplate(producerTemplate);
    templateBuilder.responseTemplate(consumerTemplate);
    templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests());
    templateBuilder.maxRequestTimeout(transportApiSettings.getMaxRequestsTimeout());
    templateBuilder.pollInterval(transportApiSettings.getResponsePollInterval());
    return templateBuilder.build();
}
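
The factory method above derives the consumer topic by appending the serviceId to the configured responses topic. A minimal sketch of why that suffix matters follows, assuming a hypothetical in-memory registry that maps each topic name to its own queue; TopicSketch, responsesTopic, send and poll are illustrative names, not ThingsBoard APIs. A consumer only sees messages sent to exactly the topic string it was created with.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch; none of these names are ThingsBoard APIs.
class TopicSketch {

    // One independent queue per topic name.
    private static final Map<String, BlockingQueue<String>> TOPICS = new ConcurrentHashMap<>();

    // Same concatenation as in the factory method: base topic + "." + serviceId.
    static String responsesTopic(String baseTopic, String serviceId) {
        return baseTopic + "." + serviceId;
    }

    static void send(String topic, String msg) {
        TOPICS.computeIfAbsent(topic, t -> new LinkedBlockingQueue<>()).add(msg);
    }

    // Returns the next message on the topic, or null if nothing is queued there.
    static String poll(String topic) {
        BlockingQueue<String> queue = TOPICS.get(topic);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        String perService = responsesTopic("tb_transport.api.responses", "localhostname");
        send(perService, "response for localhostname");

        // Nothing was sent to the base topic, so this prints null.
        System.out.println(poll("tb_transport.api.responses"));
        // The per-service topic holds the message.
        System.out.println(poll(perService));
    }
}
```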

④ init() in DefaultTbQueueRequestTemplate:

public void init() {
    queueAdmin.createTopicIfNotExists(responseTemplate.getTopic());
    // Each middleware implements its own initialization; for in-memory this method body is empty
    this.requestTemplate.init();
    tickTs = System.currentTimeMillis();
    // See ③: the subscribed topic is tb_transport.api.responses.localhostname
    responseTemplate.subscribe();
    executor.submit(() -> {
        long nextCleanupMs = 0L;
        while (!stopped) {
            try {
                // Fetch messages from the message queue
                List<Response> responses = responseTemplate.poll(pollInterval);
                ...........

2. TbCoreTransportApiService

  • The @PostConstruct-annotated method:
@PostConstruct
public void init() {
    this.transportCallbackExecutor = Executors.newWorkStealingPool(maxCallbackThreads);
    // The topic is the value of queue.transport_api.responses_topic in the configuration file; default: tb_transport.api.responses ⑤
    TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> producer = tbCoreQueueFactory.createTransportApiResponseProducer();
    // The topic is the value of queue.transport_api.requests_topic in the configuration file; default: tb_transport.api.requests ⑥
    TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> consumer = tbCoreQueueFactory.createTransportApiRequestConsumer();
    DefaultTbQueueResponseTemplate.DefaultTbQueueResponseTemplateBuilder
            <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> builder = DefaultTbQueueResponseTemplate.builder();
    builder.requestTemplate(consumer);
    builder.responseTemplate(producer);
    builder.maxPendingRequests(maxPendingRequests);
    builder.requestTimeout(requestTimeout);
    builder.pollInterval(responsePollDuration);
    builder.executor(transportCallbackExecutor);
    builder.handler(transportApiService);
    transportApiTemplate = builder.build();
}
  • The method annotated with @EventListener(ApplicationReadyEvent.class) calls transportApiTemplate.init(transportApiService). transportApiTemplate is the DefaultTbQueueResponseTemplate object created in the previous step, and its init() method is:
@Override
public void init(TbQueueHandler<Request, Response> handler) {
    // Each middleware implements its own initialization; for in-memory this method body is empty
    this.responseTemplate.init();
    // See ⑥: the subscribed topic is tb_transport.api.requests
    requestTemplate.subscribe();
    loopExecutor.submit(() -> {
        while (!stopped) {
            try {
                while (pendingRequestCount.get() >= maxPendingRequests) {
                    try {
                        Thread.sleep(pollInterval);
                    } catch (InterruptedException e) {
                        log.trace("Failed to wait until the server has capacity to handle new requests", e);
                    }
                }
                List<Request> requests = requestTemplate.poll(pollInterval);
                ...........
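
The inner while loop in the listing above is a simple form of back-pressure: the thread does not poll for new requests while the number of in-flight requests is at or above maxPendingRequests. A minimal sketch of that idea follows, assuming an AtomicInteger counter as in the original; BackPressureSketch and awaitCapacity are hypothetical names, and unlike the original the sketch stops waiting when the thread is interrupted.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; none of these names are ThingsBoard APIs.
class BackPressureSketch {

    // Sleeps in pollIntervalMs steps while pending >= maxPending; returns how many times it slept.
    static int awaitCapacity(AtomicInteger pending, int maxPending, long pollIntervalMs) {
        int sleeps = 0;
        while (pending.get() >= maxPending) {
            try {
                Thread.sleep(pollIntervalMs);
                sleeps++;
            } catch (InterruptedException e) {
                // Sketch-only choice: restore the flag and give up waiting.
                Thread.currentThread().interrupt();
                break;
            }
        }
        return sleeps;
    }

    public static void main(String[] args) {
        AtomicInteger pending = new AtomicInteger(2);

        // Simulates a request handler that finishes one in-flight request after 50 ms.
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ignored) {
                // Finish the request anyway; this is only a demo.
            }
            pending.decrementAndGet();
        });
        worker.start();

        awaitCapacity(pending, 2, 10);
        System.out.println("capacity available, pending=" + pending.get());
    }
}
```

The polling wait is cheap to reason about but adds up to one poll interval of latency after capacity frees up, which is a reasonable trade-off for a loop that polls on the same interval anyway.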

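Summary

Starting DefaultTransportService and TbCoreTransportApiService is not very complicated. The main thing to focus on is the topics of the consumers and producers that these two beans initialize. ThingsBoard uses middleware to decouple messages, so with a traditional debugging approach it is easy to lose track of where a message goes. Taking the topic as the key entry point makes the later analysis of the whole data flow much easier.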
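
To illustrate following a message by its topic, here is a minimal sketch of one request/response round trip between the transport side and the core side, recording every topic the message crosses. It uses the default topic names from this post; everything else (TopicTraceSketch, roundTrip, the trace list) is hypothetical. In particular, carrying the reply topic inside the request is an assumption of the sketch: this post does not show how the real response template chooses the per-service response topic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch; none of these names are ThingsBoard APIs.
class TopicTraceSketch {
    static final String REQUESTS = "tb_transport.api.requests";
    static final String RESPONSES = "tb_transport.api.responses";

    // topic name -> queued messages; each message is {payload, replyTopic}
    final Map<String, Queue<String[]>> topics = new HashMap<>();
    // Human-readable record of every send and receive, in order.
    final List<String> trace = new ArrayList<>();

    void send(String who, String topic, String payload, String replyTopic) {
        topics.computeIfAbsent(topic, t -> new ArrayDeque<>()).add(new String[]{payload, replyTopic});
        trace.add(who + " -> " + topic);
    }

    String[] poll(String who, String topic) {
        Queue<String[]> queue = topics.get(topic);
        String[] msg = queue == null ? null : queue.poll();
        if (msg != null) {
            trace.add(topic + " -> " + who);
        }
        return msg;
    }

    // Transport sends a request, core answers on the per-service topic, transport reads the answer.
    String roundTrip(String serviceId, String payload) {
        String replyTopic = RESPONSES + "." + serviceId;
        send("transport", REQUESTS, payload, replyTopic);        // assumption: reply topic travels with the request
        String[] request = poll("core", REQUESTS);
        send("core", request[1], "reply to " + request[0], null);
        String[] response = poll("transport", replyTopic);
        return response[0];
    }

    public static void main(String[] args) {
        TopicTraceSketch sketch = new TopicTraceSketch();
        System.out.println(sketch.roundTrip("localhostname", "validate-credentials"));
        sketch.trace.forEach(System.out::println);
    }
}
```

Reading the trace top to bottom gives the message path as a list of topics, which is the same view the summary recommends when stepping through the real code.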
