首页 > 编程语言 >Java实现Rabbitmq群发消息

Java实现Rabbitmq群发消息

时间:2024-01-30 17:58:07浏览次数:32  
标签:Java String 队列 Rabbitmq public springframework import queueName 群发

1. Rabbitmq简介

RabbitMQ是一个实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,用Erlang语言。是面向消息的中间件。

image

你可以把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ是一个邮箱、邮局、邮递员。RabbitMQ和邮局的主要区别是,它处理的不是纸,而是接收、存储和发送二进制的数据——消息。

主要流程:生产者(Producer)与消费者(Consumer)和 RabbitMQ 服务(Broker)建立连接, 然后生产者发布消息(Message)同时需要携带交换机(Exchange) 名称以及路由规则(Routing Key),这样消息会到达指定的交换机,然后交换机根据路由规则匹配对应的 Binding,最终将消息发送到匹配的消息队列(Quene),最后 RabbitMQ 服务将队列中的消息投递给订阅了该队列的消费者(消费者也可以主动拉取消息)。

image

2. Java实现

rabbit工具类

package com.example.studydemo.Rabbit;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Objects;

@Slf4j
@Component
public class RabbitUtil {

    @Autowired
    private AmqpAdmin amqpAdmin;

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * 创建交换机
     *
     * @param changeName 交换机名称
     */
    public void createExchange(String changeName) {
        TopicExchange exchange = new TopicExchange(changeName, true, false);
        amqpAdmin.declareExchange(exchange);
    }

    /**
     * 创建队列
     *
     * @param queueName 队列名称
     */
    public void createQueue(String queueName) {
        Queue queue = new Queue(queueName, true, false, false);
        amqpAdmin.declareQueue(queue);
    }

    /**
     * 交换机绑定
     *
     * @param changeName 交换机名称
     * @param routingKey 路由key
     * @param queueName  队列名称
     */
    public void bindExchange(String changeName, String routingKey, String queueName) {
        Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, changeName, routingKey, null);
        amqpAdmin.declareBinding(binding);
    }

    /**
     * 发送信息到交换机
     *
     * @param changeName 交换机名称
     * @param routingKey 路由key
     * @param message    消息
     */
    public void sendMessage(String changeName, String routingKey, String message) {
        rabbitTemplate.convertAndSend(changeName, routingKey, message);
    }

    public String receiveMessage(String queueName) {
        return Objects.requireNonNull(rabbitTemplate.receiveAndConvert(queueName)).toString();
    }
    /**
     * 删除交换机
     *
     * @param changeName 交换机名称
     */
    public void deleteExchange(String changeName) {
        amqpAdmin.deleteExchange(changeName);
    }

    /**
     * 删除队列
     *
     * @param queueName 队列名称
     */
    public void deleteQueue(String queueName) {
        amqpAdmin.deleteQueue(queueName);
    }


}

这个类原本是没有接收消息方法receiveMessage,是我加上的,不过经过测试,这个方法只会在队列中取数据,如果队列空了,就会报空指针异常,所以感觉接收这个设计不实用(sendMessage2次消息,队列中有两条数据,请求两次receiveMessage是没问题的,三次就报错了)

Controller类

package com.example.studydemo.Rabbit;




import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;


@RestController
@RequestMapping("/rabbits")
public class rabbitController {

    @Autowired
    private RabbitUtil rabbitUtil;

    @Value("星月天空")
    private String exchangeName;

    @Value("hello-queue")
    private String queueName;

    @Value("#.hello")
    private String routingKey;

    @GetMapping("/send")
    public String sendMessage(@RequestParam("message") String message){
        rabbitUtil.createExchange(exchangeName);
        rabbitUtil.createQueue(exchangeName);
        rabbitUtil.createQueue(queueName);
        //#.test采用通配符绑定
//        rabbitUtil.bindExchange(exchangeName,"#.test",queueName);
        rabbitUtil.bindExchange(exchangeName,routingKey,queueName);
        //只允许hello队列接收信息
        rabbitUtil.sendMessage(exchangeName,routingKey,message);
        return "发送成功";
    }

    @GetMapping("/receive")
    public String RabbitListener() {
        //接收test-queue,hello-queue队列信息,如果每个队列都有信息,那么就会执行多次方法
        //如果只想接收hello-queue队列信息 如: @RabbitListener(queues={"test-queue"})
        return rabbitUtil.receiveMessage(queueName);
    }
}

这里的sendMessage与RabbitListener都是一次性的,并非长连接,简而言之就是sendMessage是向队列中放一条数据,RabbitListener是从队列中取一条数据,如果队列为空则报空指针异常,个人觉得这个取数据不好,不过想来正常应用中应该是可以前端订阅,后端推送,后端要想持续给前端订阅接口,就是要走长连接。。。。。

后端订阅类

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "hello-queue")
public class SimpleHelloReceiver {

    @RabbitHandler
    public void handle(String in) {
        System.out.println("我收到了消息:" + in);
    }
}

原理应该是通过管道监听器实现的,还是那句话,要想用这个的话,前端要不定时任务,要不长连接。。。。。。。。。。。。
参考:
https://blog.csdn.net/weixin_44905972/article/details/131683234
https://blog.csdn.net/qq_48721706/article/details/125194646

标签:Java,String,队列,Rabbitmq,public,springframework,import,queueName,群发
From: https://www.cnblogs.com/beijie/p/17997648

相关文章

  • 深入了解Java中的Map.size方法
    本文转载自:https://www.python100.com/html/54940.html在Java中,Map是一个非常重要和常用的数据结构,它用于存储键值对映射的集合。在Map中,size()这个方法是用来获取集合大小的,我们经常使用它来获取键值对映射的数量。但是,你知道吗?实际上,Map.size()的实现和使用也是有许多细节和注......
  • 【Java】Java版本升级,找不到符号 问题处理
    哈喽,各位早上/中午/晚上好呀!JDK21出了也有一段时间了,有的小伙伴也按捺不住开始将项目升级到JDK21了吧。既然升级,大概多多少少都会遇到“找不到符号”的问题,英文原文是“cannotfindsymbol”。如何解决呢?以下方式不止针对于升级到JDK21,其它版本的升级配置方法相同。其实Idea......
  • Java并发基础:一文讲清util.concurrent包的作用
    java.util.concurrent包是Java中用于并发编程的重要工具集,提供了线程池、原子变量、并发集合、同步工具类、阻塞队列等一系列高级并发工具类,使用这些工具类可以极大地简化并发编程的难度,减少出错的可能性,提高程序的效率和可维护性。官方文档地址:https://docx.iamqiang.com/jd......
  • Java开发的SRM供应商、在线询价、招投标采购一体化系统源码功能解析
    前言:随着全球化和信息化的发展,企业采购管理面临越来越多的挑战。传统的采购方式往往涉及到多个繁琐的步骤,包括供应商筛选、询价、招投标等,这些过程不仅耗时,而且容易出错。为了解决这些问题,供应商、询价、招投标一体化系统应运而生。该系统通过集成供应商管理、询价管理、招投标......
  • 7000字详解Spring Boot项目集成RabbitMQ实战以及坑点分析
    本文给大家介绍一下在SpringBoot项目中如何集成消息队列RabbitMQ,包含对RibbitMQ的架构介绍、应用场景、坑点解析以及代码实战。最后文末有免费领取龙年红包封面以及腾讯云社区答题领奖福利,欢迎大家领取。我将使用waynboot-mall项目作为代码讲解,项目地址:https://github.co......
  • Java 系统学习 | Springboot 数据验证
    本篇使用Springboot3框架,IDEA2022编辑器,java17版本。在上一篇的基础上进行优化添加依赖在pom.xml中添加依赖,记得更新maven<!--validation依赖--><dependency><groupId>org.springframework.boot</groupId><artifactI......
  • JAVA之浮点数的比较
    一、浮点数比较概述由于计算机内部浮点数精度的原因,使得本来应该相等的两个浮点数可能存在微小的误差,所以对于浮点数之间浮点数之间的等值判断,我们不能用==来进行比较。通常情况下,对于浮点数比较,我们通常指定一个误差范围,两个浮点数的差值在此范围之内,则认为是相等的。二、JS......
  • (转)Java中equals和==、hashcode的区别
    https://www.cnblogs.com/lixuwu/p/5676207.htmlhttps://www.cnblogs.com/lixuwu/p/10662234.htmlhttps://timzhouyes.github.io/2020/02/27/Java%E7%89%B9%E7%A7%8D%E5%85%B5/https://blog.csdn.net/a745233700/article/details/83186808https://www.cnblogs.com/dolphin......
  • Java连接kubernates集群最优雅的两种方式
    创建maven工程,pom.xml中引入连接k8s的客户端jar包:<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><fabric.io.version>6.10.0</fabric.io.version></properties......
  • Java微服务框架开发
    Swagger常见问题:Swagger与高版本SpringBoot不兼容问题  分析源码查找问题解决springboot2.6和swagger冲突的四种方法解决方法:按如下配置修改策略,如仍然不需,需按照上述四种方法第四种添加Beanspring:mvc:path-match:matching-strategy:ant_p......