rocketmq client端源码分析(1)-consumer实现

发布于:2022-11-28 ⋅ 阅读:(670) ⋅ 点赞:(0)

rocketmq客户端实现如果集成了spring-boot则写一个监听就可以实现业务逻辑。这个流程是这样的呢。

首先我们实现了监听接口RocketMQListener或者RocketMQReplyListener,将业务代码放入其onMessage即可。

在springboot ,这个接口在获得到rocketmq client的通信结果实体之后会执行以下代码

private void handleMessage(
        MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
        if (rocketMQListener != null) {
            rocketMQListener.onMessage(doConvertMessage(messageExt));
        } else if (rocketMQReplyListener != null) {
            Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
            Message<?> message = MessageBuilder.withPayload(replyContent).build();

            org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
            DefaultMQProducer producer = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
            producer.setSendMsgTimeout(replyTimeout);
            producer.send(replyMessage, new SendCallback() {
                @Override public void onSuccess(SendResult sendResult) {
                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                        log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
                    } else {
                        log.debug("Consumer replies message success.");
                    }
                }

                @Override public void onException(Throwable e) {
                    log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
                }
            });
        }
    }

springboot 这一层就是通过底层回调其container的handlerMessage方法调用到我们的业务监听器。

那么底层rocketmq-client是如何将messageExt交给container,并由container调用我们的listener的呢?

我们知道rocektmq的消息消费分两种,一种是没有顺序的并发执行模式,需要执行接口MessageListenerConcurrently

;一种是有顺序的执行模式,需要执行接口MessageListenerOrderly

这两个接口的实现方法consumeMessage 是底层rocketmq-client提交给上层的入口。我们能够在集成的DefaultRocketMQListenerContainer里面看到这两个接口的实现,最终都是调用了handleMessage调用到了我们的业务监听器。

那么MessageListenerConcurrently和MessageListenerOrderly的consumeMessage方法又是如何从底层调到的呢?

还要从一个重要的实体说起。

PullRequest

这个类里面包括以下内容

    private String consumerGroup; //消息组信息
    private MessageQueue messageQueue; //包括消息topic、brokername、消息队列queueId
    private ProcessQueue processQueue;//接受broker消息处理类
    private long nextOffset;//消费偏移量
    private boolean previouslyLocked = false;//之前消息锁

这里面包括了所有提交到broker进行消息拉取的全部信息。那么它是哪里来的呢?

一个是RebalanceService ,一个是消息结果回调之后的PullCallback实例。

线程RebalanceService

该线程是consumer.start()的一个步骤。

consumer.start()启动的过程如下:

1、offsetStore从本地或者远程获取每一个消息队列的Offset

2、将本consumer放置到MQClientInstance的consumerTable当中

3、将namesrvAddr从配置文件当中放放置到consumerTable的namesrvAddr字段中。

4、启动netty client,跟name server进行三次握手 tcp连接。

5、启动定时任务

        5.1、启动定时获取name server服务器地址服务(2分钟一次)

        5.2、启动定时更新topic路由信息定时任务(默认30秒一次,可配置)

        5.3、启动定时清除下线服务器地址、发送心跳给borker定时任务(默认30秒一次,可配置)

        5.4、持久化队列消费偏移量信息(默认5秒一次,可配置)

        5.5、调整线程池任务(1分钟一次,不可配置)

        5.6、启动后台线程PullMessageService

        5.7、启动后台线程RebalanceService

        5.8、启动group为CLIENT_INNER_PRODUCER的producer实例,并注册到MAClientInstance里面的producerTable当中。将topicKey=TBW102放置到该Producer的topicPublishInfoTable里面。

我们重点看下这个RebalanceService任务

该任务每隔20秒执行一次,具体流程如下:

1、获取consumer里面的的consumer和MQConsumerInner的关系。调用DefaultMQPushConsumerImpl的doRebalance方法。

2、DefaultMQPushConsumerImpl的doRebalance调用RebalanceImpl里面的doRebalance方法。该方法里面维护了topic和SubscriptionData的关系,

3、获取该topic下的MessageQueue信息

4、根据topic和group获取broker名字,第一步从MqClientInstance的topicRouteTable

里面获取,如果获取不到从nameserver获取,

        4.1、如果地址有变更则更新brokerAddrTable

        4.2、维护DefaultMQProducerImpl的topicPublishInfoTable

        4.3、维护到rebalanceImpl的topicSubscribeInfoTable

        4.4、维护到MqClientInstance的topicRouteTable

5、获取所有客户端id (cidAll)

6、根据AllocateMessageQueueStrategy策略给所有的客户端id分配消费队列(包括brokerName和topic queueId).

7、consumeFromWhere默认是CONSUME_FROM_LAST_OFFSET,则客户端请求broker获取boker对应topic和queueId一致的的brokerOffset。

        7.1、将brokerOffset和MessageQueue维护到RebalanceImpl当中的processQueueTable去。

        7.2、如果之前processQueueTable维护过则跳过7.3

        7.3、包装一个PullRequest,将其放置到PullMessageService的pullRequestQueue当中去。PullMessageService

是启动consumer的步骤5.6,是一个后台线程。一旦放置到这里面该线程就会拉取这个PullRequest,并找到PullRequest中groupname对应的DefaultMQPushConsumerImpl实现类,并调用其pullMessage方法。

pullMessage方法具体如下:

1、检查PullRequest的ProcessQueue是否已经被抛弃,如果被抛弃直接返回。不合法的offset值、更新topic的messageQueue都会导致某个ProcessQueue被废弃。

2、更新processQueue的lastPullTimestamp为当前时间。

3、检查consumer service是否被关闭,如果有异常被关闭则不会再去拉取broker

4、检查consumer是否被暂停,当broker发送RESET_CONSUMER_CLIENT_OFFSET命令的时候,客户端会暂停响应consumer.错误信息:consumer was paused, execute pull request later

5、计算当前缓存message个数是否超过了限流字段pullThresholdForQueue的值,超过了会间隔50毫秒再次执行。如果限流次数超过1000,会报错the cached message count exceeds the threshold

6、计算当前缓存message的数据大小是否超过pullThresholdSizeForQueue(默认100M),超过就会间隔50秒再次提交,如果限流次数超过1000,会报错the cached message count exceeds the threshold

7、针对非顺序性消息,还有一个并发请求度,也就是msgTreeMap的最大offSet和最小offset之间的差值,如果该值超过consumeConcurrentlyMaxSpan(默认100),超过就会间隔50秒再次提交,如果限流次数超过1000,会报错the queue's messages, span too long, so do flow control

8、获取processQueue锁,这块考虑到了多线程并发情况。获取不到锁的线程会在pullTimeDelayMillsWhenException(默认1秒)时间后重新发起。

9、获取远程broker当前队列的获取确认的offSet值和本地需要拉去的nextOffset的值,如果broker的offSet小于nextOffset,说明broker过于忙碌。设置pullRequest的lockedFirst为true,并将pullRequest的nextOffset设置成远程的offset.

10、获取topic对应的RebalanceImpl的subscriptionInner对象。

包装PullCallback对象,如果从broker顺利获取到了结果,其做两件事儿,1、计算rt,2、重新包装offset为下一个值的pullRequest,将其放置到PullMessageService的pullRequestQueue队列当中,继续下一次请求。

生成PullSysFlag,该flag里面是按照位标识字段的

11、提交到PullAPIWrapper的pullKernelImpl的方法当中提交远程。

提交过程下文分解。

今天的文章主要想跟大家呈现以下这个图,customer这一侧是如何消费远程broker消息的呢?

首先是自身有分配队列算法,能够根据算法计算出当前结点被分配了哪几个队列,并根据队列包装pullRequest,将其放入消息拉去任务PullMessageService的队列pullRequestQueue当中去。PullMessageService线程会拉去pullRequestQueue的pullRequest请求,找到其对应的MQconsumerInner实例去拉去远程broker信息,如果正确拉取到,会回调会pullCallback继续将新的pullRequest放置到pullRequestQueue队列当中去。这就形成了闭环。

 

本文含有隐藏内容,请 开通VIP 后查看