RocketMQ之DefaultPushConsumer

发布于:2024-05-23 ⋅ 阅读:(140) ⋅ 点赞:(0)

DefaultMQPushConsumer消息链路

  1. DefaultMQpushConsumer#start方法调用DefaultMQpushConsumerImpl#start方法,接着内部调用MQClientInstance#start方法,接着调用RebalanceService#start方法。

  2. RebalanceService#start方法开启一个线程,执行本类中的runnable#run方法。run方法中调用MQClientInstance#doRebalance方法,这个方法阻塞20秒,循环调用。

  3. MQClientInstance#doRebalance方法中接着调用DefaultMQpushConsumerImpl#doRebalance方法。

  4. DefaultMQpushConsumerImpl#doRebalance中接着调用RebalanceImpl#doRebalance方法。

  5. RebalanceImpl#doRebalance方法接着调用本类方法rebalanceByTopic,根据负载均衡分配策略获取当前消费者组对应的messageQueues,接着调用本类方法updateProcessQueueTableInRebalance。
    在这里插入图片描述

  6. RebalanceImpl#updateProcessQueueTableInRebalance方法中。①内部属性processQueueTable维护messageQueue与processQueue的关系。②调用broker获取messagQueue的消息处理偏移量。每个队列封装参数为PullRequest(里面包含消费者组名称,下一个消息的偏移量,messageQueue,processQueue),调用本地方法dispatchPullRequest传入参数PullRequests。

  7. RebalanceImpl#dispatchPullRequest方法。这是一个抽象方法,实现在RebanlancePushImpl中。在RebanlancePushImpl#dispatchPullRequest中,for循环遍历PullRequests,调用DefaultMQpushConsumerImpl#executePullRequestImmediately(pullRequest)方法执行。
    在这里插入图片描述

  8. DefaultMQpushConsumerImpl#executePullRequestImmediately方法中调用MQClientInstance的属性对象pullMessageService的方法executePullRequestImmediately,将pullReuqest存到到pullMessageServcie的内部属性队列pullMessageQueue中,这是一个LinkedBlockingQueue队列。
    在这里插入图片描述在这里插入图片描述

  9. PullMessageServcie#start方法,开启线程执行本类中的run方法。run方法中①通过take获取pullMessageQueue中的pullRequest,②调用本地的pullMessage方法进行处理。

在这里插入图片描述
在这里插入图片描述
10. PullMessageServcie#pullMessage方法,通过MQClientInstance获取到consumer,我这里用的是DefaultMQpushConsumerImpl,调用DefaultMQpushConsumerImpl#pullMessage(pullRequest)方法。
11. DefaultMQpushConsumerImpl#pullMessage方法。①获取pullRequest中的messageQueue的主题关联的订阅者。②定义回调方法PullCallback。回到方法中,如果从broker拉取消息成功,会将消息放入到processQueue的msgTreeMap中③调用pullAPIWrapper#pullKernelImpl方法拉取消息。
在这里插入图片描述
12. PullAPIWrapper#pullKernelImpl方法,调用MQClientInstance的属性类MQClientAPIImpl的pullMessage方法。
在这里插入图片描述
13. MQClientAPIImpl#pullMessage方法。调用remotingCLient发送netty消息到broker。拿到消息后,调用步骤11中定义的PullCallback方法。
14. DefaultMQpushConsumerImpl#PullCallback方法,将消息封装到ConsumeRequest,提交到MessageListenerConcurrently的任务线程池中consumerExecutor。
在这里插入图片描述
15. ConsumeRequest本身实现runnable接口,实现run方法。ConsumeRequest是MessageListenerConcurrently的内部类。①调用消费者的监听器,将消息传入。②根据监听器类处理响应status,处理响应结果。
在这里插入图片描述 在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到