首页 > 其他分享 >Rabbitmq的使用

Rabbitmq的使用

时间:2024-07-04 21:20:28浏览次数:15  
标签:Rabbitmq NAME 队列 param QUEUE 消息 使用 public

rabbitmq的使用

1. 使用场景及它的特点介绍

image

2. mq的5种常用消息模型

2.1 队列模型—-1 对 1

image

image

2.2 队列模型 — 1(生产者)对多(消费者)

特点:
	1.当有多个消费者时,无论消费者处理的性能是否相同,生产者的消费会平均分配给每一个消费者
	2.每个消费者处理的消息是否存在重复? 不会重复
	解释:为什么开启多个消费者时,会出现有的消费者虽然处理的慢,但是也会收到相同的消息的个数?
		rabbitmq有消息默认的分配机制:平均分配(有多少个消费者,都将平均分配要处理的消息数)
优化: 能者多劳
	在消费处理消息时,可以设置由队列每次分配给消费者的消息数量,不要一次性全分完

2.3 队列模式的代码实现

2.3.1 生产的核心代码

import cn.itsource.mq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
	private static final String QUEUE_NAME = "queue_workqueue";
	public static void main(String[] args) throws Exception {
		//1.创建连接
		Connection connection = ConnectionUtil.getConnection();
		//2.生产者与服务端之间建立通道
		Channel channel = connection.createChannel();
		for (int i = 0; i < 20; i++) {
			/**
			 * 发送消息到队列
			 * @param exchange 交换机名称
			 * @param routingKey 发送到哪个队列(这个参数很容易搞错:没有交换机时,这个参数必须填队列名称;有交换机的时候,就填路由)
			 * @param props 消息的其他属性
			 * @param body 消息体
			 */
 //在实际开发中,我们也会将发送的内容,以字符串进行传输。但是涉及到对象类型,会将其先转为json字符串。
			String message = "queue_workqueue: 这是一个消息!" + i;
			System.out.println(message);
			//3. 调用API进行消息的发送
			channel.basicPublish("",QUEUE_NAME,null,message.getBytes("utf-8"));
		}
		//5.关闭连接
		connection.close();
	}
}

2.3.2 消费者的代码实现

import cn.itsource.mq.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer01 {
	//队列的名称,必须要与接收的消息生产者,设置的队列名相同
	private static final String QUEUE_NAME = "queue_workqueue";
	public static void main(String[] args) throws Exception {
		//1.创建连接
		Connection connection = ConnectionUtil.getConnection();
		//2.生产者与服务端之间建立通道
		Channel channel = connection.createChannel();
		//3.声明队列:因为生产者那边已经声明过队列了,所以这边就不需要声明队列
		/**
		 * 3.声明队列
		 * @param queue 队列名称
		 * @param durable 是否持久化
		 * @param exclusive 是否为专用队列
		 * @param autoDelete 是否自动删除
		 * @param arguments 其他参数
		 */
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		//设置消费者每次预提取1个消息【这是一个提高消息处理效率的参数。表示每次接收几个消息】
		channel.basicQos(1);
		//4. 采用匿名内部类 写一个DefaultConsumer的子类,子类中重写handleDelivery方法
		DefaultConsumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				try {
					//接收消息
					String message = new String(body, "utf-8");
					Thread.sleep(500);
					System.out.println("消费者收到消息:" + message);
					long deliveryTag = envelope.getDeliveryTag();
					/**
					【如果采用默认的 自动确认ACK机制 ,则可省略】
					 * 正常情况下的手动回执
					 * @param deliveryTag 处理消息的标识
					 * @param multiple 是否自动批量处理(自动处理队列中当前消息,及之前的所有消息) false表示只处理当前消息
					 */
		  //注意:当ACK采用手动确认机制时,确认消息的成功发送的代码,一定要放在当前方法体的最后一行
					channel.basicAck(deliveryTag, false);
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		};
		/**
		 * 5.监听队列
		 *  一旦被监听的队列中有新的消息,就自动调用consumer对象的handleDelivery方法来接收消息
		 * @param queue 队列名称
		 * @param autoAck 是否自动回执 true表示自动回执,false表示手动回执
		 * @param callback 接收消息的回调方法Consumer
		 */
		channel.basicConsume(QUEUE_NAME, false, consumer);
	}
}

2.4 订阅模型的代码实现

2.4.1 订阅模型分3种

1. fanout类型 : 1.不需要设置routekey,生产者的消息,会统一分别发给每一个消费者
2. direct : 1. 设置routekey,且生产者在发送消息时,也要指定routekey,且消息在过滤时,需要完全匹配生产指定的routekey
3. topic  : 1. 在设置toutekey时,可以引用【通配符】 ;2.通配符分2种:*:单个匹配;#:多个匹配
  • fanout模型的效果图
    image

  • direct效果图
    image

  • topic效果图
    image

2.4.2 生产者的代码实现

import cn.itsource.mq.utils.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
	private static final String EXCHANGE_NAME = "direct_exchange";
	public static void main(String[] args) throws Exception {
		//1.创建连接
		Connection connection = ConnectionUtil.getConnection();
		//2.生产者与服务端之间建立通道
		Channel channel = connection.createChannel();
		/**
		 * 3.声明交换机
		 * @param exchange 交换机名称
		 * @param type 交换机类型
		 * @param durable 是否持久化
		 */
		channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
		/**
		 * 4.发送消息到队列
		 * @param exchange 交换机名称
		 * @param routingKey 发送到哪个队列(这个参数很容易搞错:没有交换机时,这个参数必须填队列名称;有交换机的时候,就填路由)
		 * @param props 消息的其他属性
		 * @param body 消息体
		 */
		String message = "这是一个消息!" + System.currentTimeMillis();
		System.out.println(message);
		//要指定 路由key  : routekey。设置后,对应的消费者,只要在监听指定的路由key的消息,才会收取到 
		channel.basicPublish(EXCHANGE_NAME,"email",null,message.getBytes("utf-8"));
		//5.关闭连接
		connection.close();
	}
}

2.4.3 消费者的代码实现

import cn.itsource.mq.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerEMAIL {
	private static final String QUEUE_NAME_EMAIL = "queue_direct_email";
	private static final String EXCHANGE_NAME = "direct_exchange";
	public static void main(String[] args) throws Exception {
		//1.创建连接
		Connection connection = ConnectionUtil.getConnection();
		//2.生产者与服务端之间建立通道
		Channel channel = connection.createChannel();
		/**
		 * 3.声明队列
		 * @param queue 队列名称
		 * @param durable 是否持久化
		 * @param exclusive 是否为专用队列
		 * @param autoDelete 是否自动删除
		 * @param arguments 其他参数
		 */
		channel.queueDeclare(QUEUE_NAME_EMAIL,true,false,false,null);
		/**
			在绑定到 指定的交换机时,要同时指定接收什么类型的 routekey消息
		 * 4.将队列绑定到交换机
		 * @param queue 队列名称
		 * @param exchange 交换机名称
		 * @param routingKey 路由设置
		 * @param arguments 其他参数
		 */
		channel.queueBind(QUEUE_NAME_EMAIL,EXCHANGE_NAME,"email",null);
		//设置消费者每次预提取1个消息
		channel.basicQos(1);
		//采用匿名内部类 写一个DefaultConsumer的子类,子类中重写handleDelivery方法
		DefaultConsumer consumer = new DefaultConsumer(channel){
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				try {
					//接收消息
					String message = new String(body,"utf-8");
					Thread.sleep(2000);
					System.out.println("消费者收到消息:"+message);
					/**
					 * 正常情况下的手动回执
					 * @param deliveryTag 处理消息的标识
					 * @param multiple 是否自动批量处理(自动处理队列中当前消息,及之前的所有消息) false表示只处理当前消息
					 */
					channel.basicAck(envelope.getDeliveryTag(),false);
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		};
		/**
		 * 4.监听队列
		 *  一旦被监听的队列中有新的消息,就自动调用consumer对象的handleDelivery方法来接收消息
		 * @param queue 队列名称
		 * @param autoAck 是否自动回执 true表示自动回执,false表示手动回执
		 * @param callback 接收消息的回调方法Consumer
		 */
		channel.basicConsume(QUEUE_NAME_EMAIL, false, consumer);
	}
}

3. springboot整合mq

  • springboot整合mq时,在企业开发中,都会将生产者和消费者分开集成到 2个工程中

3.1 整合生产者

3.1.1 导入pom依赖

<!--spirngboot集成rabbitmq-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.1.2 配置yml

spring:
  rabbitmq:
	  host: 127.0.0.1
	  port: 5672
	  username: guest
	  password: guest
	  virtualHost: /
	  listener:
		simple:
		  acknowledge-mode: manual #手动签收
		  prefetch: 1     #预提取1条消息
	  publisher-confirms: true #消息发送到交换机失败回调
	  publisher-returns: true  #消息发送到队列失败回调
	  template:
		mandatory: true # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃

3.1.3 配置启动类的注解

  • 不需要在启动类添加开启的注解,但是需要添加几个@Bean的配置

  • 配置bean

      public static final String EXCHANGE_NAME = "springboot-rabbitmq-exchange";
      public static final String QUEUE_NAME_SMS = "springboot-rabbitmq-queue-sms";
      public static final String QUEUE_NAME_EMAIL = "springboot-rabbitmq-queue-email";
      /**
       * 声明交换机
       * @return
       */
      @Bean(EXCHANGE_NAME)
      public Exchange EXCHANGE_NAME(){
      	return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
      }
      /**
       * 声明队列:sms
       * @return
       */
      @Bean(QUEUE_NAME_SMS)
      public Queue QUEUE_NAME_SMS(){
      	return QueueBuilder.durable(QUEUE_NAME_SMS).build();
      }
      /**
       * 声明队列:email
       * @return
       */
      @Bean(QUEUE_NAME_EMAIL)
      public Queue QUEUE_NAME_EMAIL(){
      	return QueueBuilder.durable(QUEUE_NAME_EMAIL).build();
      }
      /**
       * sms队列绑定到交换机
       *  需要参数有两个办法:
       *      1)直接在方法内部调用其他方法获取对象
       *      2)直接方法参数中写变量,Spring会自动从Spring容器取出对象进行依赖注入
       * @param queue
       * @param exchange
       * @return
       */
      @Bean
      public Binding BINDING_QUEUE_NAME_SMS(@Qualifier(QUEUE_NAME_SMS)Queue queue, Exchange exchange){
      	return BindingBuilder.bind(queue).to(exchange).with("user.#.sms").noargs();
      }
      /**
       * email队列绑定到交换机
       * @param queue
       * @param exchange
       * @return
       */
      @Bean
      public Binding BINDING_QUEUE_NAME_EMAIL(@Qualifier(QUEUE_NAME_EMAIL)Queue queue, Exchange exchange){
      	return BindingBuilder.bind(queue).to(exchange).with("user.#.email").noargs();
      }
    

3.1.4 测试

  • 先定义一个controller,调用RabbitmqTemplate方法。

在浏览器中,调用一次下面的消息发送的方法,就到 rabbitmq服务器中,检查是否生成了对应的exchange和queue的内容

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
	@Autowired
	private RabbitTemplate rabbitTemplate;
	@GetMapping("/sendMsg")
	public void sendMsg(String msg) {
		//发送一个消息给mq服务器
		rabbitTemplate.convertAndSend(Contants.EXCHANGE_NAME, "user.email", msg);
	}
}
  • 检查rabbitmq服务器,是否会生成对应的exchange和queue的数据

image

image

3.2 整合消费者

3.2.1 导入pom依赖

<!--spirngboot集成rabbitmq-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.2.2 配置yml

spring:
  rabbitmq:
	  host: 127.0.0.1
	  port: 5672
	  username: guest
	  password: guest
	  virtualHost: /
	  listener:
		simple:
		  acknowledge-mode: manual #手动签收
		  prefetch: 1     #预提取1条消息
	  publisher-confirms: true #消息发送到交换机失败回调
	  publisher-returns: true  #消息发送到队列失败回调
	  template:
		mandatory: true # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃

3.2.3 配置启动注解或bean

3.2.4 测试

  • 消费者的核心代码

      import com.rabbitmq.client.Channel;
      import org.springframework.amqp.core.Message;
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.stereotype.Component;
      import java.io.IOException;
      @Component
      public class ConsumerListener {
      	public static final String QUEUE_NAME_SMS = "springboot-rabbitmq-queue-sms";
      	public static final String QUEUE_NAME_EMAIL = "springboot-rabbitmq-queue-email";
      	/**
      	 * 监听器:监听一个或者多个队列
      	 *  被监听的队列中一旦有了新的消息,就自动执行此方法来处理消息
      	 * @param msg
      	 * @param message
      	 * @param channel
      	 */
      	@RabbitListener(queues = {QUEUE_NAME_SMS})
      	public void accept_sms(String msg, Message message, Channel channel){
      		try {
      			System.out.println("SMS消费者收到消息:" + msg);
      			//成功接收消息
      			channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
      		} catch (IOException e) {
      			e.printStackTrace();
      		}
      	}
      	/**
      	 * 监听器:监听一个或者多个队列
      	 *  被监听的队列中一旦有了新的消息,就自动执行此方法来处理消息
      	 * @param msg
      	 * @param message
      	 * @param channel
      	 */
      	@RabbitListener(queues = {QUEUE_NAME_EMAIL})
      	public void accept_email(String msg, Message message, Channel channel){
      		try {
      			System.out.println("EMAIL消费者收到消息:" + msg);
      			//成功接收消息
      			channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
      		} catch (IOException e) {
      			e.printStackTrace();
      		}
      	}
      }
    
  • 启动消费者的工程后,不需要做任何事,只要生产者发送成功一条消息,消费者就应该能接收到 消息内容,如果接收不到 ,说明 环境 配置失败

标签:Rabbitmq,NAME,队列,param,QUEUE,消息,使用,public
From: https://www.cnblogs.com/xw-01/p/18276664

相关文章

  • Python学习笔记28:进阶篇(十七)常见标准库使用之质量控制中的代码质量与风格第二部分
    前言本文是根据python官方教程中标准库模块的介绍,自己查询资料并整理,编写代码示例做出的学习笔记。根据模块知识,一次讲解单个或者多个模块的内容。教程链接:https://docs.python.org/zh-cn/3/tutorial/index.html质量控制质量控制(QualityControl,QC),主要关注于提高......
  • osg使用整理(12):SSAO屏幕空间环境光遮蔽
    一、基础概念1、SSAO:通过将褶皱、孔洞和非常靠近墙面变暗的方法,近似模拟间接光照。SSAO称为屏幕空间环境光遮蔽,使用屏幕空间场景的深度而不是真实的几何体数据来确定遮蔽量,速度快效果好。2、实现原理:根据物体表面法线方向生成一个半球随机深度采样,主要看物体周围深度值大小,通......
  • 使用go语言实现快速排序、归并排序、插入排序、冒泡排序、选择排序
    冒泡排序(BubbleSort):原理:比较相邻的元素,如果前一个比后一个大,就交换它们。这个过程会使得每一轮最大的元素“冒泡”到数组的末尾。时间复杂度:O(n^2)稳定性:稳定//BubbleSort函数使用冒泡排序算法对数组进行排序funcBubbleSort(arr[]int){ n:=len(arr) fori:=0......
  • springboot 如何使用MongoDB集成 shedlock-spring
    ShedLock是一个用于防止在分布式环境中任务重复执行的库。它允许多个节点共享一个任务调度器,并确保同一时间只有一个节点能够执行某个任务。SpringBoot项目中可以通过集成shedlock-spring来实现这一功能。下面是一个完整的集成指南:1.添加依赖首先,需要在pom.xml中添加sh......
  • ROS学习笔记(三、ros节点使用)
    对于ros节点的理解部分:节点(nodes)是ros中一个很重要的部分,一个节点等价于一个可执行文件。通俗理解就是:我们所有写的代码,脚本都是需要执行的,因此需要将我们写的代码等转化成一个ros中可以执行的文件,这个可执行文件在ros中称为节点。一个节点可以通过ros与其他节点进行一个通......
  • JDK中有直接可以使用的阻塞队列
    是的,Java标准库(JDK)中提供了多个阻塞队列,可以直接使用。这些阻塞队列位于java.util.concurrent包中。阻塞队列是一种支持在某些操作无法立即完成时等待的队列,例如在队列为空时执行的take操作,或者在队列已满时执行的put操作。以下是JDK中几种常见的阻塞队列及其特点:1.ArrayBlocki......
  • 使用 Tampermonkey 在页面加载完5秒后监听特定页面元素的点击事件并修改 API 返回的数
    示例 //==UserScript==//@nameNewUserscript//@namespacehttp://tampermonkey.net///@version2024-07-04//@descriptiontrytotakeovertheworld!//@authorYou//@matchhttps://a.x.com/*//@iconhttps://www.......
  • 当使用 PowerShell 管理 Active Directory(AD)域用户时,以下是一些初级的示例和操作:Power
    使用PowerShell管理ActiveDirectory(AD)域用户时,以下是一些常见的命令和示例:1.新增域用户powershellCopyCodeNew-ADUser-Name"JohnDoe"-GivenName"John"-Surname"Doe"-SamAccountName"johndoe"-UserPrincipalName"[email protected]......
  • flutter状态管理 provider使用
    provider是flutter官方推荐的状态管理插件,是基于InheritedWidget实现的。下面我们来讲一个provider的使用方法。1.在pubspec.yaml文件中添加provider:^6.1.2开发文档:https://pub-web.flutter-io.cn/packages/provider可以查看使用方法和最新版本号。添加完成后,进行保......
  • golang 中 Jwt 的验证及续期使用
    创建Utils的Jwt文件,用于创建JwtToken 和  验证并继期packageutilsimport("errors""fmt""github.com/dgrijalva/jwt-go""strings""time")//生成JwtToken//@ParamsecretKey表示jwtsecretKey【***......