RocketMQ:深入理解Broker如何接收Producer生产消息请求?

网友投稿 269 2022-12-01

RocketMQ:深入理解Broker如何接收Producer生产消息请求?

一、前言

今天原打算写一篇关于消息在Broker端如何存储的文档,刚梳理好整体文章脉络,想到一个问题:Broker端是如何接收生产消息请求的?印象中是通过Netty接收的,由于对具体的实现比较好奇,也就有了今天的故事。

二、源码分析

以下所有分析相关的源码注释请见GitHub中的release-4.8.0分支:​​类型请求命令的Processor为​​SendMessageProcessor​​。

没啥特别的,接着往下跟,进入到同名方法​​asyncProcessRequest()​​中。

由于我们请求的Code不是​​CONSUMER_SEND_MSG_BACK​​​,所以会进入到Switch代码块的default中;接着我们进入到:​​asyncSendMessage()​​方法中,看一下生产单个消息是怎么做的?

这个方法是在准备要发送消息的数据,主要工作如下:

如果没有指定queue,就随机选一个queue;一般情况下也不会指定queue,除非顺序生产消息时。构建​​MessageExtBrokerInner​​​对象,给消息添加一些额外信息,比如:​​queue​​​、​​topic​​​、​​事务消息标识​​、发送地址、发送时间等。根据是否是​​事务消息​​,决定发送事务消息还是普通消息。

后面MessageStore登场,进行消息的持久化操作,这一部分内容,我们放在下一篇文章中探讨。

三、原理总结

Broker端接收Producer生产消息的流程如下:

在Broker启动时,即BrokerController#start()方法中,初始化接收消息的Netty服务NettyRemotingServer。在​​NettyRemotingServer​​​中​​pipeline​​​下的​​NettyServerHandler​​​这个​​ChannelHandler​​负责处理消息生产请求;接着调用到​​NettyRemotingAbstract​​​的​​processMessageReceived()​​方法中根据请求的code调用对应的事件处理器处理请求。对于消息生产而言,最终会调用到​​SendMessageProcessor#asyncProcessRequest()​​方法处理消息生产请求。然后调用​​SendMessageProcessor#asyncSendMessage()​​​方法封装消息、填充queue、topic信息,假如我们不指定queue进行消息生产,broker会​​随机选择一个queue​​​ 就是在这里做的;最后调用​​MessageStore#asyncPutMessage()​​​方法持久化消息,最终将消息写到​​CommitLog​​中。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:JPA 加锁机制及@Version版本控制方式
下一篇:为什么要用LRU算法?Java怎么实现?
相关文章

 发表评论

暂时没有评论,来抢沙发吧~