首页 > 编程语言 >RocketMQ生产者启动源码

RocketMQ生产者启动源码

时间:2024-04-28 15:25:01浏览次数:25  
标签:Start 生产者 DefaultMQProducer topic start 源码 new RocketMQ

核心代码

初始化Default生产者

DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);

设置NameAddr地址

producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);

producer.start();

分析new DefaultMQProducer(PRODUCER_GROUP)

public DefaultMQProducer(final String producerGroup) {
  this.producerGroup = producerGroup;

  //初始化
  defaultMQProducerImpl = new DefaultMQProducerImpl(this, null);

  //
  produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}

DefaultMQProducerImpl

  • 初始化发送线程池:队列LinkedBlockingQueue<>(50000)、核心(最大)线程数据为cpu核数
  • semaphoreAsyncSendNum:生产者并发数,默认10
  • semaphoreAsyncSendSize:生产者“处理中”的消息最大数量,默认1024*1024
  • serviceDetector:获取topic信息,获取topic对应的queue的最大位移
  • MQFaultStrategy:MQ客户端故障策略,生产者出现故障后,根据brokerName重新获取DefaultMQProducerImpl对象

核心变量

  • DefaultMQProducer defaultMQProducer;
  • ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =new ConcurrentHashMap<>();
  • ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<>();
  • ArrayList<EndTransactionHook> endTransactionHookList = new ArrayList<>();
  • RPCHook rpcHook;
  • BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
  • ExecutorService defaultAsyncSenderExecutor:
  • BlockingQueue<Runnable> checkRequestQueue;
  • ExecutorService checkExecutor;
  • ServiceState serviceState = ServiceState.CREATE_JUST;
  • MQClientInstance mQClientFactory;
  • MQFaultStrategy mqFaultStrategy;

MQClientManager.getInstance().getOrCreateProduceAccumulator

单例获取MQClientManager.getInstance

获取clientId

根据clientId获取ProduceAccumulator

ProduceAccumulator作用:为每个client提供一个生产者具体类,内部封装了批量消息、发送消息线程等

AggregateKey作用:包括topic、queue、tag、waitStoreMsgOK(等待存储确认boolean变量)

MessageAccumulation:批次消息存储实例,用于存储对应topic、tag对应的生产者堆积实例,支持add添加消息

GuardForSyncSendService作用:同步发送消息服务线程,继承Thread,AggregateKey对MessageAccumulation:1对1,Map

GuardForAsyncSendService作用:异步发送消息服务,继承Thread,AggregateKey对MessageAccumulation:1对1,Map




分析producer.start()

对下面两个核心实例进行start

defaultMQProducerImpl.start()

produceAccumulator.start()

 defaultMQProducerImpl.start()

根据ServiceState状态枚举处理,实际是对CREATE_JUST枚举(服务仅创建场景未启动)的处理

配置检测

创建MQClientInstance实例(netty处理):

  • 为每个clientId分配一个MQClientInstance实例
  • 底层使用netty的channel监听各种事件,核心事件为onChannelActive,在onChannelActive会记性与broken之间的心跳检测,进行负载处理
  • 最终创建MQClientAPIImpl:把各种检测处理注册为NettyRequestProcessor

把defaultMQProducer注册到MQClientInstance中,这样就可以使用netty处理消息的具体内容

mQClientFactory.start():

// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

initTopicRoute():从defaultMQProducer读取所有topic,如果没有还未加载topic路由信息,则nameServer中加载topic相关信息(TopicPublishInfo)

mqFaultStrategy.startDetector():启动broker检测服务

mQClientFactory.sendHeartbeatToAllBrokerWithLock():没什么说的,遍历brokerAddrTable,向每个broker发送心跳

RequestFutureHolder.getInstance().startScheduledTask(this):启动request过期扫描定时任务,把过期请求从requestFutureTable中移除

 

produceAccumulator.start() 

guardThreadForSyncSend.start();  
guardThreadForAsyncSend.start();

 

标签:Start,生产者,DefaultMQProducer,topic,start,源码,new,RocketMQ
From: https://www.cnblogs.com/use-D/p/18163625

相关文章

  • Docker安装RocketMQ
    https://blog.csdn.net/qq_43600166/article/details/136187969 前提条件需要安装dockerhttps://yeasy.gitbook.io/docker_practice/install/centos NameServer1.拉取容器dockerpullrocketmqinc/rocketmq2.创建NameServer容器创建一个新的容器并指定RocketMQ的镜像......
  • DRF源码汇总
    DRF源码汇总【一】三大认证【1】认证【2】权限【3】频率【3.1】SimpleRateThrottle源码分析【二】JWT【1】simple-jwt【1.1】登录【1.2】认证......
  • ubuntu18源码安装postgresql15.2数据库
    由于官方的源只能安装到pg10这个版本,整了好一会没有成功就改为源码安装了。下载源代码源码并解压wgethttps://ftp.postgresql.org/pub/source/v15.2/postgresql-15.2.tar.gztar-xfpostgresql-15.2.tar.gzcdpostgresql-15.2/安装C++相关开发库和编译工具aptinst......
  • JDK源码分析-Vector
    概述Vector是Java集合中线程安全的动态数组,它也可以根据需要进行扩容和缩容,与ArrayList类似。但有一个重要的区别,Vector是同步的,也就是它的操作是线程安全的,在某些特定场景下是可以保证线程安全的,但同时也会带来性能损耗,因此在单线程环境通常还是推荐使用ArrayList。类图......
  • Apache RocketMQ ACL 2.0 全新升级
    作者:徒钟引言RocketMQ作为一款流行的分布式消息中间件,被广泛应用于各种大型分布式系统和微服务中,承担着异步通信、系统解耦、削峰填谷和消息通知等重要的角色。随着技术的演进和业务规模的扩大,安全相关的挑战日益突出,消息系统的访问控制也变得尤为重要。然而,RocketMQ现有的AC......
  • Nginx 源码安装
     Nginx官网:https://nginx.org参考:Nginx配置常用参数梳理https://www.jb51.net/server/285538k8k.htmnginx配置参数详解https://blog.csdn.net/u013286192/article/details/136418472Nginx配置详解https://www.runoob.com/w3cnote/nginx-setup-intro.html查看nginx开启......
  • RocketMQLog:WARN No appenders could be found for logger (io.netty.channel.nio.Ni
    springBoot集成rocketMq启动的时候报RocketMQLog:WARNNoappenderscouldbefoundforlogger(io.netty.channel.nio.NioEventLoop). RocketMQLog:WARNPleaseinitializetheloggersystemproperly. 原因是pom中的rocket的依赖版本太高了。<dependency><groupI......
  • springBoot源码(一)
    构造函数运行代码publicConfigurableApplicationContextrun(String...args){ Startupstartup=Startup.create(); if(this.registerShutdownHook){ SpringApplication.shutdownHook.enableShutdownHookAddition(); } DefaultBootstrapContextbootstrapConte......
  • Windows下RocketMQ的启动
    下载地址:下载|RocketMQ 解压后   一、修改runbroker.cmd修改 bin目录下的runbroker.cmdset"JAVA_OPT=%JAVA_OPT%-server-Xms2g-Xmx2g"set"JAVA_OPT=%JAVA_OPT%-XX:MaxDirectMemorySize=15g"set"JAVA_OPT=%JAVA_OPT%-cp%CLASSPATH%"分别改为 s......
  • RocketMQ顺序消息
    什么是顺序消息顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。默认情况下生产者会把消息以RoundRobin轮询方式发送到不同的Queue分区队列;而消费消息时会从多个Queue上拉取消息,这种情况下的发送和消费是不能保证顺序的。但是如果控制发送的顺序消息只依次发送到......