首页 > 编程语言 >Java_消息队列_RocktMQ

Java_消息队列_RocktMQ

时间:2023-11-08 20:45:09浏览次数:39  
标签:存储 Java RocktMQ broker Kafka 队列 消息 Broker RocketMQ

RocketMQ 安装

 RocketMQ 的安装包分为两种,二进制包和源码包
   sudo apt-get install default-jdk
   sudo apt-get install maven
 
 解耦,异步,削峰填谷
    异步消息可以作为解耦消息的生产和处理的一种解决方案
 
 部署:
    包括 NameServer、Broker、Proxy 组件 NameServer需要先于Broker启动
	  nohup sh mqnamesrv &

RocketMQ基本概念和组件

生产者,消费者,broker,nameServer四个重要组成部分
 Producer Producer Group
 Message-消息
     RocketMQ中每个消息拥有唯一的MessageId,且可以携带具有业务标识的Key,以方便对消息的查询
    MessageId(msgId)	 MessageId(offsetMsgId)  
	msgId:由producer端生成,其生成规则为: producerIp + 进程pid + MessageClientIDSetter类的ClassLoader的hashCode + 当前时间 + AutomicInteger自增计数器 
    offsetMsgId:由broker端生成,其生成规则为:brokerIp + 物理分区的offset(Queue中的偏移量) 
     key:由用户指定的业务相关的唯一标识

 Topic表示一类消息的集,每条消息只能属于一个主题
 Tag 标签为消息设置的标签,用于同一主题下区分不同类型的消息
 Queue: 存储消息的物理实体。 一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息。 
         一个Topic的Queue也被称为一个Topic中消息的分区(Partition
	在RocketMQ中,分片指的是存放相应Topic的Broker。每个分片中会创建出相应数量的分区,即Queue,每个Queue的大小都是相同的。	 
  Consumer  Consumer Group
  Broker
  NameServer是一个Broker与Topic路由的注册中心: Broker管理 路由注册  Client Manager   Store Service HA Service: Index Service

RocketMQ 工作流程

  启动 NameServer,NameServer 启动后监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。

相关概念

 RocketMQ 事务消息(Transactional Message)  定时消息  回溯消息	  
   Push和Pull模式
 用消费端主动拉取的方式,即 Consumer 轮询从 Broker 拉取消息。
  JMS是定义了统一的接口,来对消息操作进行统一 Java Message Service
发送模式:
   单条同步发送,批量同步发送,异步发送	

设计思路不同

1.消息通信
	Kafka   :采用一套自行设计的TCP协议,完成producer到broker和broker到consume的通信
	Rocketmq:采用Netty进行通信将消息封装成RemotingCommand、主从之间的消息同步使用的是单独的TCP连接
	Rabbitmq:采用AMQP协议

2.生产者发送消息到队列
    Kafka、Rocketmq:如果指定了对应的分区/队列那就发送到指定地方,如果没有就有负载均衡算法均匀发送到队列中。
	Rabbitmq:指定对应路由器发送,根据消息附带标签发送到对应的队列。

3.消息在broker中存储方式
    Kafka:每个partition对应一个文件夹的文件和一个索引文件、支持大量堆积
    Rabbitmq:将消息持久化在文件中也是会有一个索引、支持少量堆积
    Rocketmq:一个broker下所有的topic下的队列的消息都存储在一个commitlog的文件中、
              每个topic下都有consumerqueue来映射该topic下的消息在commitlog中的位置(相当于一个索引文件)

4.主备方式
     Kafka:备份以partition为单位,备份在同个topic下的不同broker中且不提供服务。当Leader挂掉之后从Follower中选举一个新的Leader
     Rocketmq:以broker为单位进行备份,Master挂掉后只能等这个master上线才能开启写服务不过其它slave的读服务还存在
	 rabbitMQ支持miror的queue,主queue失效,miror queue接管
	   普通模式集群 + 镜像模式集群

5.消费者消费信息
     Kafka:offset存储在broker中并由zookeeper管理,消费者以拉取的方式从broker读取
	         消费者通过维护分区的偏移(或者说索引)来顺序的读出消息,然后消费消息
	 Rocketmq:如果是cluster模式就是offset由broker管理、如果是broadcasting模式就是本地存储offset。发送方式本质上是consumer从broker中拉取消息。
     Rabbitmq:broker发送给消费者,如果一个队列有多个消费者消费会有特定的策略如轮询


6.zookeeper/NameServer
      Kafka,使用zookeeper来对于集群、消费者、生产者进行管理。
	    在Kafka 2.8之后,引入了基于Raft协议的 KRaft模式,支持取消对Zookeeper的依赖
		Controller节点:即控制器节点,是集群中的特殊节点,负责储存和管理整个集群元数据和状态,它能够监控整个集群中的 Broker,在需要时还能够进行平衡操作
	 rocketmq中使用nameserver这个轻量级的工具完成管理,nameserver通过存储topic和broker之间的map表、存储了broker的地址表等等,
	     生产者消费者broker通过向nameserver发送消息来获取这些信息来维护自己内部的结构	
7.存储
    Kafka 的存储层是使用分区事务日志来实现的
   数据存储方式:Kafka使用磁盘存储,RabbitMQ和RocketMQ使用内存存储基于队列和交换器的 RabbitMQ

事务机制,Confirm机制

 RabbitMQ 使用消息交换器来实现发布/订阅模式 RabbitMQ 支持临时和持久两种订阅类型。
 RabbitMQ 是一个消息代理,但是 Apache Kafka 是一个分布式流式系统
    在消息路由和过滤方面,RabbitMQ 提供了更好的支持 消费者成功消费消息之后,RabbitMQ 就会把对应的消息从存储中删除
  RabbitMQ 会给我们提供诸如交付重试和死信交换器(DLX)来处理消息处理故障
	  保证消息的唯一性,就算是多次传输,不要让消息的多次消费带来影响,保证消息幂等性;
 Kafka 会给每个主题配置超时时间,只要没有达到超时时间的消息都会保留下来。
    在消息留存方面,Kafka 仅仅把它当做消息日志来看待,并不关心消费者的消费状态。	

安装RocketMQ

 RocketMQ主要有四大核心组成部分:NameServer、Broker、Producer以及Consumer四部分

客户端api开发

环境
1、安装rocketmq-client-python
   pip install rocketmq-client-python

2、安装 librocketmq 
   wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm
   sudo rpm -ivh rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm
   find / -name librocketmq.so
   ln -s /*****/librocketmq.so /usr/lib

RocketMQ

 RocketMQ自己实现了namesrv 服务发现
from rocketmq.client import Producer, Message
from rocketmq.client import PushConsumer, ConsumeStatus	 

参考

 Kafka、RocketMQ、RabbitMQ多维度对比  https://zhuanlan.zhihu.com/p/149819268

标签:存储,Java,RocktMQ,broker,Kafka,队列,消息,Broker,RocketMQ
From: https://www.cnblogs.com/ytwang/p/17809415.html

相关文章

  • java基础学习:二进制,八进制,十六进制
      ......
  • 如何将我的Java程序转换为.exe文件?
    内容来自DOChttps://q.houxu6.top/?s=如何将我的Java程序转换为.exe文件?如果我有一个Java源文件(*.java)或一个类文件(*.class),我如何将其转换为一个.exe文件?我还需要一个程序的安装程序。javapackagerJavaPackager工具用于编译、打包和准备Java和JavaFX应用程序进行分发。......
  • java 获取resources下文件的路径 使用 ClassLoader类 获取路径,使用流的方式读取
    java获取resources下文件的路径使用ClassLoader类,使用流的方式读取Java获取resources下文件的路径在Java开发中,我们经常需要读取resources目录下的文件,例如配置文件、模板文件等。本文将介绍如何获取resources下文件的路径,并提供相应的代码示例。1.resources目录在Java项......
  • Java登陆第一天——Mysql安装
    MySQL是一种开源、免费的关系型数据库官网https://www.mysql.com/由于是外网国内下载很慢可以使用国内镜像阿里云镜像站:https://mirrors.aliyun.com/mysql/搜狐开源镜像站:http://mirrors.sohu.com/mysql/建议下载压缩包。下载解压文件夹文件夹放在自己想放的目录下配......
  • DataGrip连接MySql数据库失败:dataGrip java.net.ConnectException: Connection refuse
    1.问题报错:dataGripjava.net.ConnectException:Connectionrefused:connect.详细错误:[08S01]CommunicationslinkfailureThelastpacketsentsuccessfullytotheserverwas0millisecondsago.Thedriverhasnotreceivedanypacketsfromtheserver.Communica......
  • Java 中时区转换的方法有哪些?
    1、使用java.util.TimeZone类进行时区转换。可以使用TimeZone类的静态方法获取某个时区的实例,例如TimeZone.getTimeZone("Asia/Shanghai"),然后使用SimpleDateFormat进行时间格式化,将时间从一个时区转换为另一个时区。示例代码:SimpleDateFormatformatter=newSimpleDateFo......
  • Java线程池
      ......
  • setTimeout 是 DOM 提供的函数,不是JavaScript的全局函数
    JavaScript中包含以下7个全局函数,用于完成一些常用的功能(以后的章节中可能会用到):escape()、unescape()、eval()、isFinite()、isNaN()、parseFloat()、parseInt()函数描述decodeURI()解码某个编码的URI。decodeURIComponent()解码一个编码的URI组件。......
  • 每天5道Java面试题(第5天)
    1. 如何将字符串反转?先把字符串转换成StringBuilder或者stringBuffer然后再用reverse()方法即可。2. String类的常用方法都有那些?indexOf():返回指定字符的索引。charAt():返回指定索引处的字符。replace():字符串替换。trim():去除字符串两端空白。split():分割字符串,返回一个分......
  • Java中的Runnable、Callable、Future、FutureTask的区别与示例
    Java中存在Runnable、Callable、Future、FutureTask这几个与线程相关的类或者接口,在Java中也是比较重要的几个概念,我们通过下面的简单示例来了解一下它们的作用于区别。Runnable其中Runnable应该是我们最熟悉的接口,它只有一个run()函数,用于将耗时操作写在其中,该函数没有返回值。然......