首页 > 编程语言 >RabbitMQ 04 直连模式-Java操作

RabbitMQ 04 直连模式-Java操作

时间:2023-03-26 15:36:22浏览次数:43  
标签:直连 Java 04 队列 factory Connection 消息 false channel

使用Java原生的方式使用RabbitMQ现在已经较少,但这是基础,还是有必要了解的。

  1. 引入依赖。

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.16.0</version>
    </dependency>
    
  2. 实现生产者。

    // 使用ConnectionFactory创建连接
    ConnectionFactory factory = new ConnectionFactory();
    // 设置连接信息
    factory.setHost("127.0.0.1");
    // 注意这里写5672,是amqp协议端口
    factory.setPort(5672);
    factory.setUsername("admin");
    factory.setPassword("admin");
    factory.setVirtualHost("/test");
    
    // 创建Connection
    Connection connection = factory.newConnection();
    // 通过Connection创建新的Channel
    Channel channel = connection.createChannel();
    try (connection; channel) {
        // 声明队列,如果此队列不存在,会自动创建
        channel.queueDeclare("test_java", false, false, false, null);
        // 将队列绑定到交换机
        channel.queueBind("test_java", "amq.direct", "test_java_key");
        // 转换为byte[],发布消息
        channel.basicPublish("amq.direct", "test_java_key", null, "Hello World!".getBytes());
    }
    
    • queueDeclare方法参数如下:

      • queue:队列的名称(默认创建后routingKey和队列名称一致)

      • durable:是否持久化。

      • exclusive:是否排他。

        如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。

        排他队列是基于Connection可见,同一个Connection的不同Channel是可以同时访问同一个连接创建的排他队列。

        如果一个Connection已经声明了一个排他队列,其他的Connection是不允许建立同名的排他队列的。

        即使该队列是持久化的,一旦Connection关闭或者客户端退出,该排他队列都会自动被删除。

      • autoDelete:是否自动删除。

      • arguments:设置队列的其他一些参数,这里暂时不需要。

    • queueBind方法参数如下:

      • queue:需要绑定的队列名称。
      • exchange:需要绑定的交换机名称。
      • routingKey:路由标识。
    • basicPublish方法参数如下:

      • exchange:对应的Exchange名称,这里就使用第二个直连交换机。
      • routingKey:这里填写绑定时指定的routingKey,其实和之前在管理页面操作一样。
      • props:其他的配置。
      • body:消息本体。
        执行完成后,可以在管理页面中看到刚刚创建好的消息队列:

    并且此消息队列已经成功与amq.direct交换机进行绑定:

  3. 实现消费者。

    // 使用ConnectionFactory创建连接
    ConnectionFactory factory = new ConnectionFactory();
    // 设置连接信息
    factory.setHost("127.0.0.1");
    // 注意这里写5672,是amqp协议端口
    factory.setPort(5672);
    factory.setUsername("admin");
    factory.setPassword("admin");
    factory.setVirtualHost("/test");
    
    // 这里不使用try-with-resource,因为消费者是一直等待新的消息到来,然后按照设定的逻辑进行处理,所以这里不能在定义完成之后就关闭连接
    // 创建Connection
    Connection connection = factory.newConnection();
    // 通过Connection创建新的Channel
    Channel channel = connection.createChannel();
    // 创建一个基本的消费者
    channel.basicConsume("test_java", false, (s, delivery) -> {
        
        System.out.println(new String(delivery.getBody()));
        /*
        确认应答
        参数 1:当前的消息标签
        参数 2:是否批量处理消息队列中所有的消息,如果为false表示只处理当前消息
         */
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        /*
        拒绝应答
        参数 1:当前的消息标签
        参数 2:倍数
        参数 3:是否将当前消息放回队列,如果为false,那么消息就会被丢弃
         */
    //            channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
        /*
        拒绝应答
        参数 1:当前的消息标签
        参数 2:是否将当前消息放回队列,如果为false,那么消息就会被丢弃
         */
    //            channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
    }, s -> {
    });
    

    basicConsume方法参数如下:

    • queue:消息队列名称,直接指定。

    • autoAck:自动应答。

      消费者从消息队列取出数据后,需要跟服务器进行确认应答,当服务器收到确认后,会自动将消息删除,如果开启自动应答,那么消息发出后会直接删除。

    • deliver:消息接收后的函数回调。

      可以在回调中对消息进行处理,处理完成后,需要给服务器确认应答。

    • cancel:当消费者取消订阅时进行的函数回调,这里暂时用不到。
      执行上述代码后,控制台输出:Hello World!,说明消息消费成功。


  • 环境
    • JDK 17.0.6
    • Maven 3.6.3
    • amqp-client 5.16.0

标签:直连,Java,04,队列,factory,Connection,消息,false,channel
From: https://www.cnblogs.com/codesail/p/17258750.html

相关文章

  • RabbitMQ 05 直连模式-Spring Boot操作
    SpringBoot操作SpringBoot集成RabbitMQ是现在主流的操作RabbitMQ的方式。官方文档:https://docs.spring.io/spring-amqp/docs/current/reference/html/引入依赖。<d......
  • JAVA面试小结之基础篇
    最近,一直忙于业务,发现自己的很多基础知识没有使用的话,都快忘了,感觉很虚。此外,最近经常面试一些同学,有些东西还得自己复习下,才好意思。这里整理一份JAVA面试小结,一来可以持续......
  • javaSE学习Day1之Object类的使用
    Object类的使用Object类Object类中的方法可以在网上搜索得到Object类是所有java类的父类如果类在声明中未使用extends关键字指明其父类,则默认父类为java.lang.Obje......
  • java----内存模型
    内存模型一个对象的内存图方法区开始运行,先找到Main函数,然后将这个方法入栈.new将会在堆空间中开辟空间,里面有成员变量和成员方法(注意,成员方法保存的是方法区的成员......
  • java——spring boot集成kafka——broker、主题、分区、副本——概念理解
    一、代理商Broker 在之前我们已经为大家介绍了生产者向消息队列中投递消息,消费者从消息队列中拉取数据。 在kafka消息队列中有一个非常重要的概念就是代理Broker,大家......
  • 找不到符号 java
    尝试项目如果有多个Maven,出现找不到符号的问题可以进行一个项目全局的packager,对root进行package打包utf-8编码-Djps.track.ap.dependencies=false清除编码工具缓存......
  • Javascript之V8内存和垃圾回收讲解
    目录1Javascript内存1.1Javascript引擎1.2V8内存模型1.2.1栈1.2.2堆1.3内存生命周期1.3.1栈内对象生命周期1.3.2堆内对象生命周期2Javascript垃圾回收2.1引言2.2......
  • javaSE学习Day1之多态及向下转型
    向下转型的使用Java的多态性:父类指向子类的声明Animalanimal=newDog()//Dog()重写了父类Animal有了对象的多态性以后,内存实际上加载的是子类的属性和方法,但是由......
  • java----对象的的创建和使用
    对象的的创建和使用创建一个学生类publicclassStudent{/*创建一个学生类*///下面是成员变量Stringname;//intage;intweight;//下......
  • 什么是 RUM JavaScript
    RUMJavaScript指的是一种用于网页性能监测的JavaScript代码,它能够在用户访问网站时记录用户的行为和页面性能数据,并将这些数据发送给RUM(RealUserMonitoring,实时用户......