首页 > 其他分享 >13-RabbitMQ高级特性-Confirm确认消息

13-RabbitMQ高级特性-Confirm确认消息

时间:2022-10-04 23:22:06浏览次数:63  
标签:13 String Confirm com RabbitMQ import RabbitMQHelper channel

Confirm确认消息

理解Confirm消息确认机制

  • 消息的确认, 是指投递消息后, 如果Broker收到消息, 则会给我们生产者一个应答
  • 生产者进行接收应答用来确定这条消息是否正常的发送到Broker, 这种方式也是消息的可靠性投递的核心保障

Confirm确认消息流程解析

Confirm确认消息实现

  • 在Channel上开启确认模式: channel.confirmSelect()
  • 在Channel上添加监听: addConfirmListener, 监听成功和失败的返回结果, 根据具体的结果对消息进行重新发送, 记录日志或者等后续处理

代码实现

消费者

package com.dance.redis.mq.rabbit.confirm;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.*;
 
import java.io.IOException;
import java.util.concurrent.TimeUnit;
 
public class Receiver4ConfirmListener {
 
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQHelper.getChannel();
        String exchangeName = "test_confirmlistener_exchange";
        String queueName = "test_confirmlistener_queue";
        String routingKey = "confirm.#";
        RabbitMQHelper.exchangeDeclare(channel,exchangeName,RabbitMQHelper.EXCHANGE_TYPE_TOPIC);
        RabbitMQHelper.queueDeclare(channel,queueName);
        channel.queueBind(queueName, exchangeName, routingKey);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                System.out.println("receive message:" + new String(body) + ", RoutingKey: " + envelope.getRoutingKey());
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(queueName, consumer);
        //等待回调函数执行完毕之后,关闭资源。
        TimeUnit.SECONDS.sleep(50);
        channel.close();
        RabbitMQHelper.closeConnection();
    }
}

生产者

package com.dance.redis.mq.rabbit.confirm;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
 
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class Sender4ConfirmListener {
 
    
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQHelper.getChannel();
        String exchangeName = "test_confirmlistener_exchange";
        String routingKey1 = "confirm.save";
        String msg = "Hello World RabbitMQ 4 Confirm Listener Message ...";
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            // 处理失败
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("------- error ---------");
            }
            // 处理成功
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("------- ok ---------");
            }
        });
        channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
        TimeUnit.SECONDS.sleep(50);
        channel.close();
        RabbitMQHelper.closeConnection();
    }
    
}

测试

启动消费者

启动生产者

收到ok

查看消费者

消费成功

标签:13,String,Confirm,com,RabbitMQ,import,RabbitMQHelper,channel
From: https://www.cnblogs.com/flower-dance/p/16754802.html

相关文章

  • 04-基于CentOS7安装RabbitMQ3.10.7
    RabbitMQ安装与入门安装与启动我实在是找不到这么老的版本了,直接用最新版本的,按照道理来说,新版本是兼容老版本的官网地址https://www.rabbitmq.com/Erlang安......
  • 05-RabbitMQ控制台入门及其Java简单操作
    MQ控制台简单操作建立Exchange新建Exchange成功新建Queue新建Queue成功建立Exchange与Queue的关系建立关系成功路由键:就是指发送到Exchange的消息,通......
  • 06-RabbitMQ核心API-Exchange
    Exchange流程图接收消息,并根据路由键转发消息所绑定的队列Exchange属性属性含义name交换机名称type交换机类型[direct|topic|fanout......
  • 俗话说先下手为强,后动手遭殃。在军事13
    俗话说先下手为强,后动手遭殃。在军事http://ds.163.com/article/63372100880c710001936142/?2022/10/06_=2022/10/05http://ds.163.com/feed/63372100880c710001936142/?202......
  • 03-RabbitMQ核心概念[简介, AMQP协议, 整体架构, 消息流转]
    核心概念RabbitMQ简介RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是......
  • 13.设计模式-适配器模式-接口适配器
    //1.定义接口//2.定义抽象实现,通过抽象类来实现适配,//当存在这样一个接口,其中定义了N多的方法,而我们现在却只想使用其中的一个到几个方法,如果我们直接实现接口,那么我们......
  • 洛谷1351 -- 联合权值
      遍历一遍树,在遍历的同时,传入节点u的父亲和祖父,计算答案#include<iostream>#include<cstdio>#include<cstring>#include<algorithm>usingnamespacestd;......
  • 13张图让你百分百掌握kafka副本同步限流机制
    ​​......
  • 我们在埃埃厄岛火化并且安葬了埃尔朋诺13
    我们在埃埃厄岛火化并且安葬了埃尔朋诺http://ds.163.com/article/6338b349d3fdd000019877c7/?2022_1005=20221005uhttp://ds.163.com/feed/6338b349d3fdd000019877c7/?2022......
  • 洛谷 P1340 兽径管理
    题干 悲怆历程(主要还是因为自己作死)啊这个题,一眼就是克鲁斯卡尔最小生成树简介题意:$n$个点,添加$W$次边,每次添加边都询问最小生成树其中1<=n<=200,1<=......