首页 > 其他分享 >消息队列MQ

消息队列MQ

时间:2024-08-27 20:51:31浏览次数:11  
标签:producer 队列 cmd Topic MQ 消息 RocketMQ

MQ概述

1.MQ简介

MQ,Message Queue,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生产、储、消费全过程API的软件系统。消息即数据。一般消息的体量不会很大。

2.MQ用途

限流削峰

MQ可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统被压垮。

异步解耦

上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度
太高。而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化,一般性做法就是,在这两层间添加一个MQ层。

数据收集

分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术通过MQ完成比类数据收集是最好的选择。


3.常见的MQ产品

ActiveMQ

ActiveMQ是使用Java语言开发一款MQ产品。早期很多公司与项目中都在使用。但现在的社区活跃度已经很低。现在的项目中已经很少使用了。


RabbitMQ

RabbitMQ是使用ErLang语言开发的一款MQ产品。其吞吐量较Kafka与RocketMQ要低,且由于其不是Java语言开发,所以公司内部对其实现定制化开发难度较大。


Kafka

Kafka是使用Scala/Java语言开发的一款MQ产品。其最大的特点就是高吞吐率,常用于大数据领域的实时计算、日志采集等场景。其没有遵循任何常见的MQ协议,而是使用自研协议。对于Spring CloudNetcix,其仅支持RabbitMq与Kafka.


 


RocketMQ

RocketMQ是使用Java语言开发的一款MQ产品。经过数年阿里双 11 的考验,性能与稳定性非常高。其没有遵循任何常见的MQ协议,而是使用自研协议。对于Spring Cloud Alibaba,其支持RabbitMQ、Kafka,但提倡使用RocketMQ.



RocketMQ的安装与启动

下载与安装

1.下载RocketMQ

https://rocketmq.apache.org/download/

2.windows下解压安装,配置环境变量ROCKETMQ_HOME

注意路径不要有中文或者空格等特殊字符


RocketMQ部署

1.启动nameserver

鼠标双击执行bin目录下的mqnamesrv.cmd 文件 或者 cmd 打开控制台,切换目录到rocket的bin目录下,执行命令:

start mqnamesrv.cmd

注:如果出现“找不到主类”的问题,需要将ROCKMQ_HOME和JAVA _HOME两个变量路径改掉,换成没有空格的路径


2.启动broker

执行bin目录下的 mqbroker.cmd 文件 或者 cmd 打开控制台,切换到bin目录下,执行命令:

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

默认端口是9876,autocreateTopicEnable=true表示允许自动创建topic

注:rocketMq运行环境中的内存不足会导致启动失败,可以修改配置文件
runserver.sh/runserver.cmd,runbroker.sh/runbroker.cmd中的配置参数


3.RocketMQ插件安装

访问 https://github.com/apache/rocketmq-externals/tags

下载rocketmq-console-1.0.0,并解压

在Eclipse中导入项目:

修改pom.xml文件中的java环境为当前环境1.8并执行compile指令

构建成功

在application.properties中设置地址namesrvAddr为localhost:9876

右键项目更新Maven

通过启动类App.java启动

在浏览器中打开


这里可以通过下面的指令将项目打包为jar包,失败可以尝试使用管理员身份运行

clean package -Dmaven.test.skip=true

打包成功后刷新项目可以在target文件夹下看到刚生成的jar包

此时复制jar包到到工作文件夹下通过cmd可以直接启动项目

java -jar rocketmq-console-ng-1.0.0.jar

同样可以在浏览器中打开


在bin目录下通过cmd输入指令mqadmin

mqadmin是一个命令行工具,用于管理Apache RocketMQ消息队列系统。它提供了一组命令,用于创建、删除、查询和管理消息队列和主题,以及监控和诊断RocketMQ集群的状态和性能。


基本概念

消息(Message)

消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于个主题。

主题(Topic)

Topic表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。topic:message 1:n message:topic 1:1

一个生产者可以同时发送多种Topic的消息;而一个消费者只对某种特定的Topic感兴趣,只可以订阅和消费一种Topic的消息。producer:topic 1:n consumer:topic 1:1

标签(Tag)

为消息设置的标签,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

Topic是消息的一级分类,Tag是消息的二级分类。

队列(Queue)

存储消息的物理实体。一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息。一个Topic的Queue也被称为一个Topic中消息的分区(Partition)。

一个Topic的Queue中的消息只能被一个消费者组中的一个消费者消费。一个Queue中的消息不允
许同一个消费者组中的多个消费者同时消费

消息标识(MessageId/Key)

RocketMQ中每个消息拥有唯一的MessageId,且可以携带具有业务标识的Key,以方便对消息的查询。

不过需要注意的是,MessageId有两个:在生产者send()消息时会自动生成一个Messaged(msgId),当消息到达Broker后,Broker也会自动生成一个Messageld(offsetMsgId)。

msgId、offsetMsgId与key都称为消息标识。

  • msgId:由producer端生成,其生成规则为:producerIp+进程pid+MessageClientIDSetter类的ClassLoader的hashCode +当前时间+AutomicInteger自增计数器
  • offsetMsgId:由broker端生成,其生成规则为:brokerIp+物理分区的offset(Queue中的偏移量)
  • key:由用户指定的业务相关的唯一标识

发送消息

同步发送

1.新建Maven项目并在pom.xml中导入依赖

	<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
	<dependency>
	    <groupId>org.apache.rocketmq</groupId>
	    <artifactId>rocketmq-client</artifactId>
	    <version>5.3.0</version>
	</dependency>

2.创建测试类同步发送消息

public class EasyA {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("easygroup");

        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        //发送消息
        Message message = new Message("easytopic","rocketmq message".getBytes());
        SendResult result = producer.send(message);
        System.out.println(result);
        
        producer.shutdown();
    }
}

发送成功,在RocketMQ中,默认情况下一个Topic会创建4个队列。


异步发送

public class EasyB {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
        DefaultMQProducer producer=new DefaultMQProducer("easygroup");

        producer.setNamesrvAddr("127.0.0.1:9876");

        Message message=new Message("easytopic","Helloworld".getBytes());

        producer.start();

        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
                System.out.println("OK");
            }
            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
                System.out.println("Error");
            }
        });

        System.out.println("发送了一个消息");
        Thread.sleep(3000);
        producer.shutdown();
    }
}

标签:producer,队列,cmd,Topic,MQ,消息,RocketMQ
From: https://blog.csdn.net/qq_63161848/article/details/141550931

相关文章

  • rabbitmq实现用户关系绑定信息推送
    1.MQ三大要点交换机队列Key2.交换机  交换机是消息队列系统中的一个核心组件,主要用于将消息路由到一个或多个队列中。交换机通过不同的路由规则来决定消息的去向。根据不同的类型,交换机可以有不同的路由策略:直连交换机(DirectExchange):根据消息的路由键(RoutingKey......
  • prometheus学习笔记之PromQL
    一、PromQL语句简介官方文档:https://prometheus.io/docs/prometheus/latest/querying/basics/Prometheus提供⼀个函数式的表达式语⾔PromQL(PrometheusQueryLanguage),可以使⽤户实时地查找和聚合时间序列数据,表达式计算结果可以在图表中展示,也可以在Prometheus表达式浏览器中......
  • 算法与数据结构——队列
    队列队列(queue)是一种遵循先入先出规则的线性数据结构。队列模拟了排队现象,即新来的人不断加入队列尾部,而队列头部的人逐个离开。如图所示,我们将队列头部称为“队首”,尾部称为“队尾”,将把元素加入队列尾部的操作称为“入队”,删除队首元素的操作称为“出队”。队列常用操作......
  • (五)焊缝检测之--测量间距并用ROS消息包发送
    前言在上一小节记录了矩形参照物的检测。这一小节将介绍基于识别到的圆形焊缝和矩形参照物的坐标,读出两点之间在x和y方向上的间距,并通过话题通信的方式发送初期一、测量圆心和焊缝间距在之前小节中已经分别通过霍夫圆检测拿到了圆心坐标和矩形参照物角点坐标,分别将其记......
  • bfs+双端队列
    算法介绍\(bfs+\)双端队列是一种单源最短路算法,适用于边权为\(0\)或\(1\)的图中。时间复杂度为\(O(n)\)。算法原理分析算法的整体框架与普通\(bfs\)求最短路类似,只是根据边权做了分类讨论,如果边权为\(1\),则将邻居节点压到队列尾部,反之,压到队列首部。当每个节点第一次出......
  • 代码随想录训练营day29|134.加油站,135. 分发糖果,860.柠檬水找零,406.根据身高重建队列
    加油站想法:暴力遍历?好吧第一遍写的时候读错题意了,以为是比较gas[i]与cost[i+1]的值,shit。intsum1=0,sum2=0;for(intg:gas)sum1+=g;for(intc:cost)sum2+=c;if(sum1<sum2)return-1;//如果gas没cost多intyouliang=0;intn=gas.size()......
  • 搭建多协议的串口服务器流程:RS-232、RS-485和TCP/IP、MQTT网络协议(代码示例)
    一、项目概述在物联网(IoT)和自动化控制的快速发展中,串口通信作为一种经典的通信方式,依然发挥着重要作用。本项目旨在构建一个支持多种协议的串口服务器,能够通过串口接收和发送数据,并通过网络协议(如TCP/IP、MQTT等)与其他设备和系统进行交互。项目的目标和用途本项目的目标......
  • 谷粒商城实战笔记-259-商城业务-消息队列-可靠投递-发送端确认
    文章目录一,确认机制简介二,ConfirmCallback三,returnCallback事务消息的问题一,确认机制简介RabbitMQ的消息确认机制主要包括以下几种:发布者确认(PublisherConfirm):在发布者和代理之间建立一个确认协议。当发布者发送一条消息到代理时,代理会返回一个确认信息给发布者......
  • 谷粒商城实战笔记-260-商城业务-消息队列-可靠投递-消费端确认
    文章目录一,Ack消息确认机制简介1,简介2,两个常用的Api二,消费者端消息确认实战三,RabbitMQ可靠性保障总结1,生产者2,消费者一,Ack消息确认机制简介消费者端的确认机制(ACK/NACK)是RabbitMQ中一种重要的特性,它允许消费者告知Broker它们是否成功处理了接收到的......
  • RabbitMQ 入门教程
    RabbitMQ入门教程1.引言RabbitMQ是一个开源的消息代理和队列服务器,实现高级消息队列协议(AMQP)。它能帮助开发者实现应用程序间的解耦、异步处理、流量削峰等需求。2.安装与配置2.1安装RabbitMQ2.1.1Ubuntu```bashsudoapt-getupdatesudoapt-getinstallrabb......