首页 > 其他分享 >RabbitMQ消费消息方法basicConsume

RabbitMQ消费消息方法basicConsume

时间:2023-05-25 20:05:33浏览次数:36  
标签:消费 String consumerTag void RabbitMQ IOException basicConsume public channel


RabbitMQ-消费消息

 

Address[] addresses = new Address[] {new Address(IP_ADDRESS, PORT)};
        /**
         * 1.建立连接工厂
         */
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername(USER_NAME);
        connectionFactory.setPassword(PASSWORD);
        /**
         * 网络故障自动连接恢复
         */
        connectionFactory.setAutomaticRecoveryEnabled(true);
        
        /**
         * 2.创建连接 和生产者有一点不同
         */
        Connection connection = connectionFactory.newConnection(addresses);
        
        /**
         * 3.创建信道
         */
        final Channel channel = connection.createChannel();
        
        /**
         * 4.设置客户端最多接收示被ack的消息个数
         */
        channel.basicQos(64);
        
        
        
        Consumer consumer  =new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                System.out.println("接收消息 :   "+new String(body));
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
//消息确认
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        
        /**
         * 回调
         */
        channel.basicConsume(QUEUR_NAME, consumer);
        /**
         * 关闭资源
         */
        TimeUnit.SECONDS.sleep(5);
        channel.close();
        connection.close();

basicConsume方法

 

String basicConsume(String queue, Consumer callback) throws IOException;

 String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

 String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;


String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;

String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
  • queue 队列名
  • autoAck 是否自动确认消息,true自动确认,false 不自动要手动调用,建立设置为false

 

//消息确认
                channel.basicAck(envelope.getDeliveryTag(), false);
  • consumerTag 消费者标签,用来区分多个消费者
  • noLocal 设置为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
  • exclusive 是否排他
  • arguments 消费者的参数
  • callback 消费者 DefaultConsumer建立使用,重写其中的方法

 

@Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
  • 重写的方法

 

@Override
    public void handleConsumeOk(String consumerTag) {
        this._consumerTag = consumerTag;
    }

  @Override
    public void handleCancelOk(String consumerTag) {
        // no work to do
    }

 @Override
    public void handleCancel(String consumerTag) throws IOException {
        // no work to do
    }

   @Override
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
        // no work to do
    }

  @Override
    public void handleRecoverOk(String consumerTag) {
        // no work to do
    }
  • handleShutdownSignal方法 当Channel与Conenction关闭的时候会调用,
  • handleCancelOk方法会在其它方法之前调用,返回消费者标签
    *handleCancelOk与handleCancel消费者可以显式或者隐式的取水订单的时候调用,也可以通过
    channel.basicCancel方法来显式的取消一个消费者订阅
    会首先触发handleConsumeOk方法,之后触发handleDelivery方法,最后才触发handleCancelOk方法

channel.basicAck();确认消息

deliveryTag:该消息的index
multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。

 

标签:消费,String,consumerTag,void,RabbitMQ,IOException,basicConsume,public,channel
From: https://blog.51cto.com/chengzheng183/6350781

相关文章

  • RabbitMQ之消息确认机制
    RabbitMQ之消息确认机制标签(空格分隔):php,rabbitmq在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话......
  • 杭银消费金融有限公司招聘信息
    前言「CV技术指南」将持续带来人工智能相关在招职位信息,欢迎正在找工作与看新机会的朋友关注,也欢迎企业伙伴与我们联系合作,免费发布招聘信息,想给公司内推的朋友也可以联系。欢迎关注公众号CV技术指南,专注于计算机视觉的技术总结、最新技术跟踪、经典论文解读、CV招聘信息。CV......
  • docker安装rabbitMQ
    输入命令dockerpullrabbitmq:3.7.7-management  设置账号和密码dockerrun-d--namerabbitmq3.7.7-p5672:5672-p15672:15672-v`pwd`/data:/var/lib/rabbitmq--hostnamemyRabbit-eRABBITMQ_DEFAULT_VHOST=my_vhost-eRABBITMQ_DEFAULT_USER=admin-eRABBITM......
  • RabbitMQ系列-概念及安装
    1.消息队列消息队列是指利用队列这种数据结构进行消息发送、缓存、接收,使得进程间能相互通信,是点对点的通信而消息代理是对消息队列的扩展,支持对消息的路由,是发布-订阅模式的通信,消息的发送者并不清楚消息的接收者,消息可以被多个消费者接收。使用消息队列的作用如下异步:对于......
  • 【Kafka从入门到成神系列 六】Kafka 消费组及重平衡
    ......
  • 【Kafka从入门到成神系列 八】Kafka 多线程消费者及TCP连接
    ......
  • centos7安装erlang、rabbitmq以及php扩展
    centos7安装erlang、rabbitmq以及php扩展标签(空格分隔):liunx,php安装Erlang版本:el7erlang-20.3.8.25erlang-20.3.8.25-1.el7.x86_64.rpm1.下载wget--content-disposition"https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-20.3.8.25-1.el7.x86_64.rpm/d......
  • 基于Docker安装RabbitMQ
    安装步骤1.在线拉取dockerpullrabbitmq:3-management2.安装RabbitMQdockerrun\-eRABBITMQ_DEFAULT_USER=wzh\-eRABBITMQ_DEFAULT_PASS=1234\-vmq-plugins:/plugins\--namemq\--hostnamemq\-p15672:15672\-p5672:5672\-d\rabbitmq:3-m......
  • iOS GCD 和信号量 实现 生产者和消费者模式
    GCD提供两种方式支持dispatch队列同步,即dispatch组和信号量。一、dispatch组(dispatchgroup)1.创建dispatch组dispatch_group_tgroup=dispatch_group_create();2.启动dispatch队列中的block关联到group中dispatch_group_async(group,queue,^{//。。。});3.......
  • 云原生之使用Docker部署RabbitMQ消息中间件
    (云原生之使用Docker部署RabbitMQ消息中间件一、RabbitMQ介绍1.RabbitMQ简介RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。RabbitMQ是一个开源的消息代理的队列服务器,用来通过普通协议在完全不同的应用之间共享数据。2.RabbitMQ特点开源、性能优秀,稳定......