首页 > 其他分享 >RabbitMQ学习五 springboot连接RabbitMQ

RabbitMQ学习五 springboot连接RabbitMQ

时间:2024-01-21 11:35:50浏览次数:21  
标签:BootMqConstant return springboot 队列 TOPICS RabbitMQ 连接 import public

一、入门

引入依赖

在springboot中引入spring-amqp-starter

<!--amqp的起步依赖-->
 <dependency>
 	<groupId>org.springframework.boot</groupId>
 	<artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

编写配置文件

spring:
  rabbitmq:
    addresses: xxx.xx.x.xxx # rabbitMQ地址
    port: 5672 # 连接端口号
    username: xxxx #用户名
    password: xxxxv #密码
    virtual-host: /xxxx  #要连接的虚拟机

编写常量类

package com.java.coder.constant;

public class BootMqConstant {
    /**
     * 交换机名称
     */
    public static final String TOPICS_EXCHANGE_NAME="topics_exchange";
    /**
     * 队列名称
     */
    public static final String TOPICS_QUEUE_01="topics_queue_01";
    /**
     * 队列名称
     */
    public static final String TOPICS_QUEUE_02="topics_queue_02";
    /**
     * 路由键
     */
    public static final String TOPICS_ROUTING_INFO_KEY="#.info.#";
    /**
     * 路由键
     */
    public static final String TOPICS_ROUTING_ERROR_KEY="#.error.#";
}

 

编写配置类

生产者端编写配置类

package com.java.coder.config;


import com.java.coder.constant.BootMqConstant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {



    //声明交换机
    @Bean("itemTopicExchange")
    public Exchange topicExchange(){
        return ExchangeBuilder.topicExchange(BootMqConstant.TOPICS_EXCHANGE_NAME)// 交换机名称
                .durable(true)// 是否持久话
                .build();
    }

    //声明队列
    @Bean("itemQueue01")
    public Queue itemQueue01(){
        return QueueBuilder.durable(BootMqConstant.TOPICS_QUEUE_01)// 持久话的队列
                .build();
    }

    //声明队列
    @Bean("itemQueue02")
    public Queue itemQueue02(){
        return QueueBuilder.durable(BootMqConstant.TOPICS_QUEUE_02)// 持久话的队列
                .build();
    }


    //声明队列
    @Bean("objectQueue")
    public Queue objectQueue(){
        return QueueBuilder.durable(BootMqConstant.TOPICS_OBJECT_QUEUE)// 持久话的队列
                .build();
    }
    /**
     * 绑定队列  topics_queue_01
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding itemQueueExchange01(@Qualifier("itemQueue01") Queue queue,
                                     @Qualifier("itemTopicExchange") Exchange exchange){
        return BindingBuilder.bind(queue)
                .to(exchange)
                .with(BootMqConstant.TOPICS_ROUTING_INFO_KEY)
                .noargs();
    }

    /**
     * 绑定队列  topics_queue_02
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding itemQueueExchange02(@Qualifier("itemQueue02") Queue queue,
                                       @Qualifier("itemTopicExchange") Exchange exchange){
        return BindingBuilder.bind(queue)
                .to(exchange)
                .with(BootMqConstant.TOPICS_ROUTING_ERROR_KEY)
                .noargs();
    }

    /**
     * 绑定队列  topics_object_queue
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding objectQueueExchange(@Qualifier("objectQueue") Queue queue,
                                       @Qualifier("itemTopicExchange") Exchange exchange){
        return BindingBuilder.bind(queue)
                .to(exchange)
                .with(BootMqConstant.TOPICS_ROUTING_OBJECT_KEY)
                .noargs();
    }
}

 

 

编写生产者代码

package com.java.coder.controller;

import com.java.coder.constant.BootMqConstant;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

@RestController
@RequestMapping("/produce")
public class RabbitMqProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendInfo")
    public String sendInfo(){
        rabbitTemplate.convertAndSend(BootMqConstant.TOPICS_EXCHANGE_NAME,"info.hahha","这是info级别的日志");
        return "ok";
    }

    @RequestMapping("/sendError")
    public String sendError(){
        rabbitTemplate.convertAndSend(BootMqConstant.TOPICS_EXCHANGE_NAME,"error.hahha","error级别的日志");
        return "ok";
    }

    
}

 

编写消费者代码

package com.java.coder.rabbitmq.listener;

import com.java.coder.constant.BootMqConstant;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class ConsumerListener {
    /**
     * 监听某个队列的消息
     * @param message 接收到的消息
     */
    @RabbitListener(queues = BootMqConstant.TOPICS_QUEUE_01)
    public void myListener01(String message){
        System.out.println("消费者接收到的消息为:" + message);
    }

    /**
     * 监听某个队列的消息
     * @param message 接收到的消息
     */
    @RabbitListener(queues = BootMqConstant.TOPICS_QUEUE_02)
    public void myListener02(String message){
        System.out.println("消费者接收到的消息为:" + message);
    }

   
}

 

注解声明交换机和队列

除了上述在配置文件中声明交换机、队列、绑定交换机和队列 之外,还可以在消费者端通过@RabbitListener 来配置。

 

/**
     * 监听某个队列的消息
     * @param map 接收到的消息
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "${队列名称}"),
            exchange = @Exchange(name = "${交换机名称}",type = ExchangeTypes.DIRECT),
            key = {"red","blue"}
    ))
    public void myListener(Map<String,Object> map ){
        System.out.println("消费者接收到的消息为:" + map);
    }

 

二、消息转换器

发送对象消息

 @RequestMapping("/sendObject")
    public String sendObject(){
        Map<String,Object> map=new HashMap<>();
        map.put("name","张三");
        map.put("address","浙江杭州");
        rabbitTemplate.convertAndSend(BootMqConstant.TOPICS_EXCHANGE_NAME,"object.hahha",map);
        return "ok";
    }

队列存储的结果

只有两个属性的数据,序列化之后消息占用123byte大小。

spring-amqp是采用Message这个类代表要发送的消息,我们传递给convertAndSend方法的消息内容最终都要封装成Message这个类。如果我们没有指定消息转换器,那么默认的消息转换器就是SimpleMessageConverter。 因为消息内容还需要在网络上传输,因此需要对消息内容进行序列化,序列化方式的代码如下:

protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        byte[] bytes = null;
        if (object instanceof byte[]) {
            bytes = (byte[])((byte[])object);
            messageProperties.setContentType("application/octet-stream");
        } else if (object instanceof String) {
            try {
                bytes = ((String)object).getBytes(this.defaultCharset);
            } catch (UnsupportedEncodingException var6) {
                throw new MessageConversionException("failed to convert to Message content", var6);
            }

            messageProperties.setContentType("text/plain");
            messageProperties.setContentEncoding(this.defaultCharset);
        } else if (object instanceof Serializable) {
            try {
                bytes = SerializationUtils.serialize(object);
            } catch (IllegalArgumentException var5) {
                throw new MessageConversionException("failed to convert to serialized Message content", var5);
            }

            messageProperties.setContentType("application/x-java-serialized-object");
        }

        if (bytes != null) {
            messageProperties.setContentLength((long)bytes.length);
            return new Message(bytes, messageProperties);
        } else {
            throw new IllegalArgumentException(this.getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
        }
    }

 

如果对象的化,会走到object instanceof Serializable,此时的序列化方式是jdk ObjectOutputStream方式的序列化,这种方式序列化,存在以下问题:

1、安全风险

2、消息太大

3、可读性差

我们采用json的序列化方式。

json序列化

引入依赖

		 <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
        </dependency>

在生产者和消费者中都配置消息转换器。

 @Bean
    public MessageConverter jacksonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }

 

再次对象消息,RabbitMq内存储的消息内如下:

 

消费者接收

发送的是什么类型的消息,消费者就接收什么样的消息

 /**
     * 监听某个队列的消息
     * @param map 接收到的消息
     */
    @RabbitListener(queues = BootMqConstant.TOPICS_OBJECT_QUEUE)
    public void myListenerObject(Map<String,Object> map ){
        System.out.println("消费者接收到的消息为:" + map);
    }

 

 

 

标签:BootMqConstant,return,springboot,队列,TOPICS,RabbitMQ,连接,import,public
From: https://www.cnblogs.com/cplinux/p/17977630

相关文章

  • 面试官:SpringBoot如何实现缓存预热?
    缓存预热是指在SpringBoot项目启动时,预先将数据加载到缓存系统(如Redis)中的一种机制。那么问题来了,在SpringBoot项目启动之后,在什么时候?在哪里可以将数据加载到缓存系统呢?实现方案概述在SpringBoot启动之后,可以通过以下手段实现缓存预热:使用启动监听事件实现缓存预热。使......
  • springboot多模块项目(微服务项目)正确打包(jar)方式
    大致步骤新建一个springboot项目名称为父亲添加父快捷方式。新建子模块,子模块同时插入新建springboot的项目,依次创建enty和web模块(关键是并配置好pom文件)web模块依赖于entiy模块中的实体类,创建测试控制器,先测试项目没问题再开始打包(jar)开始打包测试jar是否有用创建项目注意点:子模......
  • springBoot项目正确认识打war包方式(含打包代码链接)
    一:新建一个普普通通的springBoot项目二:并且编写测试controller@RequestMapping@RestControllerpublicclassController{@RequestMapping("/zzh")publicStringtoString(){return"zzh666";}}三:改造启动类(重点)主要就是继承SpringBootServletInitiali......
  • springboot整合springSecurity入门案例(实现登录,记住我等常用标签使用)
    一,整合进依赖每个依赖都标了注释,大家可以按照自己需要的来添加,置于配置问件啥的,大家可以参考springboot+mybatisplus+redis整合(附上脚手架完整代码)<!--主要就是加了这个依赖--><dependency><groupId>org.springframework.security</groupId><artifact......
  • springboot+mybatis-plus+redis整合(附上脚手架完整代码)
    首先新建一个springboot项目next到这里的时候,我们可以选择用jdk几,还有就是Group,这个一般就是com.公司名字了,artifact就是项目名字。个人开发我还是喜欢用com.名字前缀哈。到了这一步的话,如果对这个项目有什么别的需求,比如需要用到mybatis啥的可以勾相应的选项。其实就是idea自动帮......
  • springboot项目结合filter,jdk代理实现敏感词过滤(简单版)
    我们对getParameter()这个方法得到的参数进行敏感词过滤。实现思路:利用过滤器拦截所有的路径请求同时在在过滤器执行的时候对getParameter得到的value值进行过滤。最后呢,到我们自己的实现的逻辑中呢?这个value值就被我们做过处理了。1:自定义的过滤配置文件把文件位置放在resource下的......
  • 正确理解springboot国际化简易运行流程
    看源码可以看出–》大致原理localeResolver国际化视图(默认的就是根据请求头带来的区域信息获取Locale进行国际化)返回的本地解析是根据响应头来决定的)接着按住ctrl点localeresolver可知localeresolver是一个接口于是有了这些我们只需通过继承LocaleResolver来自定义我们自己的Loca......
  • springboot中优雅的个性定制化错误页面+源码解析
    boot项目的优点就是帮助我们简化了配置,并且为我们提供了一系列的扩展点供我们使用,其中不乏错误页面的个性化开发。理解错误响应流程我们来到org.springframework.boot.autoconfigure.web.servlet.error下的ErrorMvcAutoConfiguration这里面配置了错误响应的规则。主要介绍里面注册......
  • 数据库的内连接和外连接
    数据库的内连接和外连接内连接:两个或两个以上的表进行关联查询时,查询的结果集中返回所有满足连接条件的行。外连接:两个或两个以上的表进行关联查询时,查询的结果集中除了返回满足连接条件的行以外,还返回左(或右)表中不满足条件的行,这种连接称为左(或右)外连接。如果是左外连接,则连......
  • jetson nano ssh远程连接控制
    jetsonorinnanossh远程连接准备:好用的网线一根,jetsonorinnano一台,将网线两端连接nano的网口以及当作主机的笔记本的网口PS:确保双方网线连接成功,网线设置不用更改默认即可step11.执行nmap10.42.0/24//调用查看nano的ip地址可能情况-bash:nmap:commandnotfoun......