首页 > 其他分享 >07-RabbitMQ核心API-Direct Exchange

07-RabbitMQ核心API-Direct Exchange

时间:2022-10-04 23:24:42浏览次数:52  
标签:String Exchange Direct final static RabbitMQHelper public channel 07

Direct Exchange

简介

所有发送到direct exchange 的消息被转发到Routekey中指定的Queue

注意: Direct模式可以使用RabbitMQ自带的Exchange(default exchange), 所以不需要将Exchange进行任何Binding操作, 消息传递时RouteKey必须完全匹配才会被队列接收, 否则该消息会被丢弃

代码实现

RabbitMQHelper

package com.dance.redis.mq.rabbit;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class RabbitMQHelper {

    private static final String HOST = "192.168.247.142";
    private static final Integer PORT = 5672;
    private static final String VH = "/";
    private static final String USERNAME = "root";
    private static final String PASSWORD = "123456";
    private static final Boolean ARE = true;
    private static final Integer NRI = 3000;

    public static final String EXCHANGE_TYPE_DIRECT = "direct";
    public static final String EXCHANGE_TYPE_TOPIC = "topic";
    public static final String EXCHANGE_TYPE_FANOUT = "fanout";
    public static final String EXCHANGE_TYPE_HEADERS = "headers";

    private static Connection connection = null;

    public static Channel getChannel() throws IOException, TimeoutException {
        if (null == connection) {
            // 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(HOST);
            connectionFactory.setPort(PORT);
            connectionFactory.setVirtualHost(VH);
            connectionFactory.setUsername(USERNAME);
            connectionFactory.setPassword(PASSWORD);
            connectionFactory.setAutomaticRecoveryEnabled(ARE);
            connectionFactory.setNetworkRecoveryInterval(NRI);
            connection = connectionFactory.newConnection();
        }
        return connection.createChannel();
    }

    public static AMQP.Exchange.DeclareOk exchangeDeclare(Channel channel, String exchangeName, String exchangeType) throws IOException {
        return channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    }
    public static AMQP.Queue.DeclareOk queueDeclare(Channel channel, String queueName) throws IOException {
        return channel.queueDeclare(queueName, false, false, false, null);
    }

    public static void closeConnection() throws IOException {
        connection.close();
    }

    public static Consumer buildConsumer(Channel channel){
        return new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
            throws IOException {
                System.out.println("recvive message:" + new String(body));
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
    }

}

创建这个帮助类, 是为了减少一些通用代码的编写, 后续也会使用

消费者

package com.dance.redis.mq.rabbit.direct;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;

import java.util.concurrent.TimeUnit;
 
public class RabbitMQ4DirectExchangeConsumer {
 
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQHelper.getChannel();
        String exchangeName = "test_direct_exchange";
        String queueName = "test_direct_queue";
        String routingKey = "test_direct_routingKey";
        RabbitMQHelper.exchangeDeclare(channel,exchangeName,RabbitMQHelper.EXCHANGE_TYPE_DIRECT);
        RabbitMQHelper.queueDeclare(channel,queueName);
        channel.queueBind(queueName, exchangeName, routingKey);
        channel.basicQos(64);//设置客户端最多接收未被ack的消息个数
        Consumer consumer = RabbitMQHelper.buildConsumer(channel);
        channel.basicConsume(queueName, consumer);
        //等待回调函数执行完毕之后,关闭资源。
        TimeUnit.SECONDS.sleep(50);
        channel.close();
        RabbitMQHelper.closeConnection();
    }
}

生产者

package com.dance.redis.mq.rabbit.direct;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.Channel;
 
public class RabbitMQ4DirectExchangeProducer {
 
    
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQHelper.getChannel();
        String exchangeName = "test_direct_exchange";
        String routingKey = "test_direct_routingKey";
        String msg = "Hello World RabbitMQ 4  Direct Exchange Message ... ";
        channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
    }
    
}

测试

先启动消费者

然后启动生产者

查看消费者

已经收到了生产者的消息

标签:String,Exchange,Direct,final,static,RabbitMQHelper,public,channel,07
From: https://www.cnblogs.com/flower-dance/p/16754780.html

相关文章

  • 08-RabbitMQ核心API-Topic Exchange
    TopicExchange简介所有发送到TopicExchange的消息被转发到所有关心RouteKey中指定Topic的Queue上Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个T......
  • 09-RabbitMQ核心API-Fanout Exchange
    FanoutExchange简介不处理路由键,只需要简单的将队列绑定到交换机上发送到交换机的消息都会被转发到与该交换机绑定的所有队列上Fanout交换机转发消息是最快的......
  • 06-RabbitMQ核心API-Exchange
    Exchange流程图接收消息,并根据路由键转发消息所绑定的队列Exchange属性属性含义name交换机名称type交换机类型[direct|topic|fanout......
  • 07_音频录制01_命令行
    终于要开始进行FFmpeg实战了,一起来感受一下FFmpeg的强大吧。命令简介FFmpeg的bin目录中提供了3个命令(可执行程序),可以直接在命令行上使用。ffmpegffmpeg的主要作用:对......
  • 0766-6.3.3-如何实现Kafka跨网络访问
    文档说明在使用Kafka时会遇到内外网的场景,即Kafka集群使用内网搭建,在内网和外网均有客户端需要消费Kafka的消息,同时在集群内由于使用内网环境通信,因此不必太过考虑通信的加......
  • 03-Active Directory的使用与验证
    温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。Fayson的github:​​https://github.com/fayson/cdhproject​​提示:代码块部分可......
  • 07 导师不敢和你说的水论文隐藏技巧,毕业论文,小论文和综述的区别,三者怎么进行换汤不换
    博客配套视频链接:​​https://www.bilibili.com/video/BV11g41127Zn/?spm_id_from=333.788&vd_source=b1ce52b6eb3a9e6c2360a4b7172edf5a​​b站直接看如果大家有什么问......
  • 【Linux】学习-07-Linux防火墙端口
    firewall-cmd--list-ports:查看当前开启的端口情况firewall-cmd--zoon=public--add-port=9000/tcp-parmanent:开启9000端口firewall-cmd--reload:重启防火墙,上......
  • 007.注入集合对象
           ......
  • 多点DLT (Direct Linear Transformation) 算法
    阅读前可以先参看上一篇代数视觉博客:四点DLT(DierctLinearTransformation)算法对于大于4个点的数据点来进行DLT算法变换,如果数据点的标注都十分准确,那么将所有......