首页 > 编程语言 >RabbitMQ(三)Java客户端

RabbitMQ(三)Java客户端

时间:2024-08-05 21:56:34浏览次数:19  
标签:Java 消费者 work RabbitMQ queue 消息 msg hello 客户端

1.快速入门


在idea里面创建两个springboot项目,一个模块是consumer,一个是publisher

两者有自己的启动类,继承同一父工程的pom。

父工程的pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itcast.demo</groupId>
    <artifactId>mq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>publisher</module>
        <module>consumer</module>
    </modules>
    <packaging>pom</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.9.RELEASE</version>
        <relativePath/>
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--单元测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>

publisher的pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>mq-demo</artifactId>
        <groupId>cn.itcast.demo</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>publisher</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

</project>

consumer的pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>mq-demo</artifactId>
        <groupId>cn.itcast.demo</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>consumer</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

</project>

入门案例

直接省略交换机,生产者->队列->消费者(虚拟主机不能省略)

  • 利用控制台创建队列simple.queue

  • 在publisher服务中,利用SpringAMQP直接向simple.queue发送消息

  • 在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue队列

那么怎么发送消息呢?

SpringAMQP提供了RabbitTemplate工具类,方便我们发送信息。

@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void testSendMessage2Queue(){
        String queueName="simple.queue";
        String msg="hello,amqp!";
        rabbitTemplate.convertAndSend(queueName,msg);
    }
}


那么如何接收信息呢?

@Component
@Slf4j
public class MqListener {
    @RabbitListener(queues ="simple.queue" )
    public void listenSimpleQueue(String msg){//发送来的是string类型,所以我们也用string接收
        System.out.println("消费者收到了simple.queue的消息【"+msg+"】");
    }
}

然后用启动类启动(这个不是测试类)

控制台输出:消费者收到了simple.queue的消息【hello,amqp!】

此时,观察控制台发现,程序并没有停止,消费者始终处于监听的状态

然后publisher再次发送一个消息,consumer依然能够监听到

消费者收到了simple.queue的消息【hello,amqp!】

消费者收到了simple.queue的消息【hello,我是第二个消息】

最后,消息不存在队列当中。。。。。。。。。。。。。


2.WorkQueue

WorkQueue,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息.

案例

模拟workqueue,实现一个队列绑定多个消费者


  • 在RabbitMQ的控制台创建一个队列,名为work.queue
  • 在publisher服务中定义测试方法,在1秒内产生50条消息,发送到work.queue
  • 在consumer服务中定义两个消息监听者,都监听work.queue队列
  • 消费者1每秒处理50条消息,消费者2每秒处理5条消息

1.创建队列work.queue

2.publisher发送消息

@Test
void testWorkQueue() throws InterruptedException {
    for (int i = 0; i < 50; i++) {
        String queueName="work.queue";
        String msg="hello,我是第【"+i+"】条消息呀";
        rabbitTemplate.convertAndSend(queueName,msg);
        Thread.sleep(20);
    }
}

线程休眠20ms发送一个消息,这样能看到循序渐进的效果。

3.consumer两个消息监听者,共同监听队列

@RabbitListener(queues ="work.queue" )
public void listenSimpleQueue1(String msg) throws InterruptedException {
//发送来的是string类型,所以我们也用string接收
    System.out.println("消费者1 收到了work.queue的消息【"+msg+"】");
    Thread.sleep(20);
}

@RabbitListener(queues ="work.queue" )
public void listenSimpleQueue2(String msg) throws InterruptedException {
//发送来的是string类型,所以我们也用string接收  err.print是打印红色的
    System.err.println("消费者2 收到了work.queue的消息【"+msg+"】");
    Thread.sleep(200);
}

如果两个监听器不加线程休眠,那么性能是一样的:

消费者1 收到了work.queue的消息【hello,我是第【0】条消息呀】
消费者2 收到了work.queue的消息【hello,我是第【1】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【2】条消息呀】

消费者2 收到了work.queue的消息【hello,我是第【3】条消息呀】

-----------------------------------------

如果两个监听器加上不同的线程休眠,导致性能不一样:

消费者1 收到了work.queue的消息【hello,我是第【0】条消息呀】
消费者2 收到了work.queue的消息【hello,我是第【1】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【2】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【4】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【6】条消息呀】
消费者2 收到了work.queue的消息【hello,我是第【3】条消息呀】

但是你会发现,依旧是02468--13579????

消费者消息推送限制

默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但是并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。

因此我们需要修改application.yml,设置preFetch的值为1,确保同一时刻最多投递给消费者1条消息。

消费者2 收到了work.queue的消息【hello,我是第【0】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【1】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【2】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【3】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【4】条消息呀】
消费者2 收到了work.queue的消息【hello,我是第【5】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【6】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【7】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【8】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【9】条消息呀】
消费者2 收到了work.queue的消息【hello,我是第【10】条消息呀】

总结

Work模型的使用:
  • 多个消费者绑定到一个队列中,可以加快消息处理速度
  • 同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者领取的消息数量,处理完一条再处理下一条,实现能者多劳

3.交换机

真正的生产环境都会经过交换机来发送消息,交换机有路由功能,而不是直接发送到队列,交换机的类型有三种:

  • Fanout广播
  • Direct定向
  • Topic话题
1.Fanout交换机

Fanout Exchange会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式

案例---利用SpringAMQP演示FanoutExchange的使用

实现思路如下:

1.在控制台中,声明队列fanout.queue1和fanout.queue2

2.在控制台中,声明交换机hmall.fanout,将两个队列与其绑定

3.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

4.在publisher中编写测试方法,向hmall.fanout发送消息




现在Fanout广播交换机和两个队列已经就绪,

在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

@RabbitListener(queues ="fanout.queue1" )
    public void listenFanoutQueue1(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收
        System.out.println("消费者1 收到了fanout.queue1的消息【"+msg+"】");
    }

    @RabbitListener(queues ="fanout.queue2" )
    public void listenFanoutQueue2(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收
        System.out.println("消费者2 收到了fanout.queue2的消息【"+msg+"】");
    }

在publisher中编写测试方法,向hmall.fanout发送消息

@Test
    void testSendFanout(){
        String exchangeName="hmall.fanout";
        String msg="hello,everyone";
        rabbitTemplate.convertAndSend(exchangeName,null,msg);
    }

消费者2 收到了fanout.queue2的消息【hello,everyone】
消费者1 收到了fanout.queue1的消息【hello,everyone】

2.Direct交换机

Direct Exchange会将接收到的消息根据规则路由到指定的queue,因此叫定向路由。

  • 每一个Queue都与Exchage设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

案例

1.在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2

2.在RabbitMQ控制台中,声明交换机hmall.direct,将两个队列与其绑定

3.在consumer服务里面,编写两个消费者方法,分别监听direct.queue1和direct.queue2

4.在publisher中编写测试方法,利用不同的RoutingKey向hmall.direct发送消息


消费者监听:

@RabbitListener(queues ="direct.queue1" )
public void listenDirectQueue1(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收
    System.out.println("消费者1 收到了direct.queue1的消息【"+msg+"】");
}

@RabbitListener(queues ="direct.queue2" )
public void listenDirectQueue2(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收
    System.out.println("消费者2 收到了direct.queue2的消息【"+msg+"】");
}

publisher中编写测试方法,利用不同的RoutingKey向hmall.direct发送消息:

@Test
void testSendDirect(){
    String exchangeName="hmall.direct";
    String msg="红色红色";
    rabbitTemplate.convertAndSend(exchangeName,"red",msg);
}

结果:

消费者1 收到了direct.queue1的消息【红色红色】
消费者2 收到了direct.queue2的消息【红色红色】

3.Topic交换机

案例:


消费者接收:
@RabbitListener(queues ="topic.queue1" )
public void listenTopicQueue1(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收
    System.out.println("消费者1 收到了topic.queue1的消息【"+msg+"】");
}

@RabbitListener(queues ="topic.queue2" )
public void listenTopicQueue2(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收
    System.out.println("消费者2 收到了topic.queue2的消息【"+msg+"】");
}

publisher发送:

@Test
void testSendTopic(){
    String exchangeName="hmall.topic";
    String msg="小日本的新闻";
    rabbitTemplate.convertAndSend(exchangeName,"japan.news",msg);
}

控制台:

消费者2 收到了topic.queue2的消息【小日本的新闻】
 

4.声明队列和交换机的方式(新建队列&交换机)

就比如说刚才我们声明一个新的队列和交换机,都是在RabbitMQ的控制台里面生成的,并不是在JAVA代码里面生成的。


方式一:

在消费者里面建立一个config包:

@Configuration
public class FanoutConfiguration {

    //交换机
    @Bean
    public FanoutExchange fanoutExchange() {
//  ExchangeBuilder.fanoutExchange("hmall.fanout2").build();
        return new FanoutExchange("hmall.fanout2");
    }

    //队列    durable持久的
    @Bean
    public Queue fanoutQueue3() {
//        QueueBuilder.durable("").build();
        return new Queue("fanout.queue3");
    }


    @Bean
    public Queue fanoutQueue4() {
        return new Queue("fanout.queue4");
    }

    //绑定
    @Bean
    public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);

    }

    @Bean
    public Binding fanoutBinding4(Queue fanoutQueue4, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue4).to(fanoutExchange);
        //后面加上 .with  就是routingKey
    }

//    在configuration中的bean会被直接动态代理,而在service里
//    方法在不注入自己的情况下调用自己的方法会使事物无效,原因就是没有动态代理,只有用到该类时才会代理
}

方式二:基于注解声明

SpringAMQP还提供了基于@RabbitListener注解来声明队列和交换机的方式:

在消费者的监听器里面:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1",durable = "true"),
        exchange=@Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
        key={"red","blue"}
))
public void listenDirectQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1 收到了direct.queue1的消息【"+msg+"】");
}

5.消息转换器

@Test
    void testSendObject(){
        Map<String, Object>msg=new HashMap<>(2);
        msg.put("name","jack");
        msg.put("aga",21);
        rabbitTemplate.convertAndSend("object.queue",msg);
    }



 


        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
            <version>2.17.1</version>
        </dependency>

在发布者启动类里配置:

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
@SpringBootApplication
public class PublisherApplication {
    public static void main(String[] args) {
        SpringApplication.run(PublisherApplication.class);
    }
    @Bean
    public MessageConverter jacksonMessageConvertor() {
        return new Jackson2JsonMessageConverter();
    }
}

成功!!!!!!!!!

接收和以前一样!!!!!!!


END-业务改造

↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓

消费者监听器里面:

@Component
@RequiredArgsConstructor
public class PayStatusListener {

    private final IOrderService orderService;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "mark.order.pay.queue", durable = "true"),
            exchange = @Exchange(name = "pay.topic", type = ExchangeTypes.TOPIC),
            key = "pay.success"
    ))
    public void listenOrderPay(Long orderId) {
        //标记订单状态为已经支付
        orderService.markOrderPaySuccess(orderId);
    }

}

@RequiredArgsConstructor和@Autowired

这两种方式在实现依赖注入的方式和效果上有一些区别,主要体现在以下几个方面:

  1. 生成方式

    • @RequiredArgsConstructor 是 Lombok 提供的功能,它会为所有被 final 修饰的字段生成一个构造函数,并将这些字段作为参数进行注入。
    • @Autowired 是 Spring 提供的注解,用来标注依赖注入的位置(字段、构造函数、或者方法)。
  2. 字段可变性

    • @RequiredArgsConstructor 要求依赖字段必须是 final 的,因为它生成的构造函数会使用 final 字段作为参数,并在构造函数中进行赋值。
    • @Autowired 不要求字段必须是 final 的,可以是普通的私有字段或者通过 setter 方法注入。
  3. 灵活性和推荐性

    • @RequiredArgsConstructor 通常适用于希望保持类中依赖字段不变(immutable)的情况,例如通过构造函数注入一次性初始化这些依赖,而后不再修改。
    • @Autowired 则更为灵活,可以在字段或者方法中使用,依赖字段可以是 final 也可以不是,更适合在需要动态变化依赖或者在单元测试中替换依赖时使用。
  4. 使用场景

    • 如果你的类设计为依赖在初始化后不再改变,并且希望利用 final 字段确保其不可变性,那么使用 @RequiredArgsConstructor 是一个很好的选择。
    • 如果你希望在类的生命周期中动态更改依赖,或者依赖字段不一定是 final 的,那么使用 @Autowired 更为灵活。

 

示例比较:

使用 @RequiredArgsConstructor 的示例:

import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class MyService {
    
    private final SomeOtherService otherService;
    
    // 其他方法
}

使用 @Autowired 的示例:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MyService {
    @Autowired
    private SomeOtherService otherService;
    
    
    
    
    // 其他方法
}

在这两个示例中,第一个示例利用 @RequiredArgsConstructor 自动生成了构造函数,并使用 final 字段 otherService 进行依赖注入。第二个示例则显式地使用了 @Autowired 注解在构造函数中进行依赖注入,没有要求 otherServicefinal 字段。

综上所述,主要区别在于字段的 final 要求、代码生成方式以及推荐的使用场景。选择哪种方式取决于你的设计需求和项目的规范。

 发消息者:

 

 


 


 异步修改订单状态,尽量用try、catch

 


 

标签:Java,消费者,work,RabbitMQ,queue,消息,msg,hello,客户端
From: https://blog.csdn.net/weixin_73253843/article/details/140865342

相关文章

  • 基于java+springboot+vue的高校实习管理系统
    ......
  • JavaEE 图书管理系统
    基于阿里巴巴的fastjson框架搭建的JavaEE版本的图书管理系统,项目架构如下:fastjson包的阿里云下载镜像如下:CentralRepository:com/alibaba/fastjson2/fastjson2/2.0.8 运行效果:BeanBook.javapackageBean;publicclassBook{ privateStringtitle;//书名 pr......
  • 日撸Java三百行(day14:栈)
    目录一、栈的基本知识1.栈的概念2.栈的功能3.栈的实现二、栈的代码实现1.栈的基本属性与方法2.栈的遍历3.入栈实现4.出栈实现5.数据测试6.完整的程序代码总结一、栈的基本知识1.栈的概念根据百度百科,我们知道“栈”是存储货物或供旅客住宿的地方,可引申为仓库......
  • 日撸Java三百行(day13:链表)
    目录一、链表的基础知识二、链表的代码实现1.链表创建2.链表遍历3.链表定位查找4.链表插入5.链表删除6.数据测试7.完整的程序代码总结一、链表的基础知识在之前顺序表的学习中,我们其实提到过链表。链表它是线性表在不同的物理存储方式下派生出来的,链表不像顺序表......
  • 【报错提示】java.lang.RuntimeException: Can't create handler inside thread
    ​报错提示遇到一个报错: java.lang.RuntimeException:Can'tcreatehandlerinsidethreadThread[OkHttphttps://a.fxltsbl.com/...]thathasnotcalledLooper.prepare() 分析 1.这个报错提示是在一个没有调用Looper.prepare()的线程中尝试创建一个Handler对象......
  • JAVA变量类型
    一个类可以包含以下类型变量:局部变量:在方法、构造方法或者语句块中定义的变量被称为局部变量。变量声明和初始化都是在方法中,方法结束后,变量就会自动销毁。成员变量:成员变量是定义在类中,方法体之外的变量。这种变量在创建对象的时候实例化。成员变量可以被类中方法、构造方法......
  • java8-常用类型(包装类,BigDecimal,Date等)
    1.包装类1.1包装类简介java语言是面向对象的语言,但是其中的八大基本数据类型不符合面向对象的特征。因此java为了弥补这样的缺点,为这八种基本数据类型专门设计了八种符合面向对象特征的的类型,这八种具有面向对象特征的类型,统称为包装类,英文单词:wrapperclass。包装类,就是......
  • java9-泛型
    1.泛型的简介1.1什么是泛型        泛型是一种特殊的数据类型。它是Java的一个高级特性。在Mybatis、Hibernate这种持久化框架,泛型更是无处不在。在这之前,不管我们在定义成员变量时,还是方法的形参时,都要规定他们的具体类型。所以提出猜想,有没有一种可能,一次声......
  • JSON parse error: Cannot deserialize instance of `java.lang.Long` out of START_O
    这个问题的实际原因就是:    后端id(Long类型)用的雪花算法生成主键id    后端生成id位:1820397662671867904    前端查询id的结果为:1820397662671868000产生的原因:    后端生成为19位,前端接受并展示,使用的类型是number类型是16位   ......
  • java计算字符数量
      publicclassCharCountExample{publicstaticvoidmain(String[]args){Stringtext="Hello,world!Howareyou?";chartargetChar='o';intcount=countChar(text,targetChar);System.out.p......