c语言sscanf函数的用法是什么
269
2022-12-01
RocketMQ:深入理解Broker如何接收Producer生产消息请求?
一、前言
今天原打算写一篇关于消息在Broker端如何存储的文档,刚梳理好整体文章脉络,想到一个问题:Broker端是如何接收生产消息请求的?印象中是通过Netty接收的,由于对具体的实现比较好奇,也就有了今天的故事。
二、源码分析
以下所有分析相关的源码注释请见GitHub中的release-4.8.0分支:类型请求命令的Processor为SendMessageProcessor。
没啥特别的,接着往下跟,进入到同名方法asyncProcessRequest()中。
由于我们请求的Code不是CONSUMER_SEND_MSG_BACK,所以会进入到Switch代码块的default中;接着我们进入到:asyncSendMessage()方法中,看一下生产单个消息是怎么做的?
这个方法是在准备要发送消息的数据,主要工作如下:
如果没有指定queue,就随机选一个queue;一般情况下也不会指定queue,除非顺序生产消息时。构建MessageExtBrokerInner对象,给消息添加一些额外信息,比如:queue、topic、事务消息标识、发送地址、发送时间等。根据是否是事务消息,决定发送事务消息还是普通消息。
后面MessageStore登场,进行消息的持久化操作,这一部分内容,我们放在下一篇文章中探讨。
三、原理总结
Broker端接收Producer生产消息的流程如下:
在Broker启动时,即BrokerController#start()方法中,初始化接收消息的Netty服务NettyRemotingServer。在NettyRemotingServer中pipeline下的NettyServerHandler这个ChannelHandler负责处理消息生产请求;接着调用到NettyRemotingAbstract的processMessageReceived()方法中根据请求的code调用对应的事件处理器处理请求。对于消息生产而言,最终会调用到SendMessageProcessor#asyncProcessRequest()方法处理消息生产请求。然后调用SendMessageProcessor#asyncSendMessage()方法封装消息、填充queue、topic信息,假如我们不指定queue进行消息生产,broker会随机选择一个queue 就是在这里做的;最后调用MessageStore#asyncPutMessage()方法持久化消息,最终将消息写到CommitLog中。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~