首页 > 其他分享 >Rabbit MQ 入门

Rabbit MQ 入门

时间:2024-01-30 23:01:03浏览次数:19  
标签:connectionFactory 入门 rabbitmq MQ Rabbit import com channel String

简单案例:消息生产与消费

pom.xml 配置

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <!-- 3.6.5 是稳定版本 -->
    <version>3.6.5</version>
</dependency>

生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * <h1>简单案例:消息生产与消费</h1>
 * 消息生产者
 * Created by DHA on 2019/11/18.
 */
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1 创建一个 Connectionfactory,并进行设置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3 通过 connecion 创建一个 Channel
        Channel channel = connection.createChannel();

        //4 通过 chanel 发送数据
        for(int i=0;i<10;i++){
            String data="Hello!";
            channel.basicPublish("","test001",null,data.getBytes());
        }

        //5 关闭相关连接
        channel.close();
        connection.close();
    }
}

消费者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * <h1>简单案例:消息生产与消费</h1>
 * 消息生产者
 * Created by DHA on 2019/11/18.
 */
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1 创建一个 Connectionfactory,并进行设置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3 通过 connecion 创建一个 Channel
        Channel channel = connection.createChannel();

        //4 通过 chanel 发送数据
        for(int i=0;i<10;i++){
            String data="Hello!";
            channel.basicPublish("","test001",null,data.getBytes());
        }

        //5 关闭相关连接
        channel.close();
        connection.close();
    }
}

Direct Exchange

生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * <h1> Direct Exchange</h1>
 * 所有发送到 Direct Exchange 的消息被转发到 routing key 中指定的 Queue。
 * 消息生产者
 * Created by DHA on 2019/11/19.
 */
public class Producer4DirectExchange {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1 创建一个 Connectionfactory,并进行设置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3 通过 connecion 创建一个 Channel
        Channel channel = connection.createChannel();

        //4 声明
        //声明 exchange 名称
        String exchangeName="test_direct_exchange";
        String routingKey = "test.direct";

        //5 通过 chanel 发送数据
        String msg = "Hello World RabbitMQ 4  Direct Exchange Message ... ";
        channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());

        //6 关闭相关连接
        channel.close();
        connection.close();
    }
}

消费者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * <h1> Direct Exchange</h1>
 * 所有发送到 Direct Exchange 的消息被转发到 routing key 中指定的 Queue。
 * 消息消费者
 * Created by DHA on 2019/11/19.
 */
public class Consumer4DirectExchange {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        //1 创建一个 Connectionfactory,并进行设置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3 通过 connecion 创建一个 Channel
        Channel channel = connection.createChannel();

        //4 声明
        String exchangeName="test_direct_exchange";
        String exchangeType="direct";
        String queueName="test_direct_queue";
        String routingKey="test.direct";

        // 声明一个交换机
        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
        // 声明一个队列
        channel.queueDeclare(queueName,false,false,false,null);
        // 绑定:将一个队列绑定到一个交换机上
        channel.queueBind(queueName,exchangeName,routingKey);

        //5 创建消费者
        QueueingConsumer queueingConsumer=new QueueingConsumer(channel);

        //6 设置 channel
        channel.basicConsume(queueName,true,queueingConsumer);

        //7 获取数据
        while(true){
            QueueingConsumer.Delivery delivery=queueingConsumer.nextDelivery();
            String msg=new String(delivery.getBody());
            System.out.println("消费端:"+msg);
        }
    }
}

Topic Exchange

生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * <h1> Topic Exchange</h1>
 * Topic Exchange 将 routing key 与某 Topic 进行模糊匹配,此时队列需要绑定一个 Topic。
 * 消息生产者
 * Created by DHA on 2019/11/19.
 */
public class Producer4TopicExchange {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1 创建一个 Connectionfactory,并进行设置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3 通过 connecion 创建一个 Channel
        Channel channel = connection.createChannel();

        //4 声明
        //声明 exchange 名称
        String exchangeName="test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";

        //5 通过 chanel 发送数据
        String msg = "Hello World RabbitMQ 4  Topic Exchange Message ... ";
        channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
        channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
        channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());

        //6 关闭相关连接
        channel.close();
        connection.close();
    }
}

消费者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * <h1> Topic Exchange</h1>
 * Topic Exchange 将 routing key 与某 Topic 进行模糊匹配,此时队列需要绑定一个 Topic。
 * 消息消费者
 * Created by DHA on 2019/11/19.
 */
public class Consumer4TopicExchange {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        //1 创建一个 Connectionfactory,并进行设置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3 通过 connecion 创建一个 Channel
        Channel channel = connection.createChannel();

        //4 声明
        String exchangeName="test_topic_exchange";
        String exchangeType="topic";
        String queueName="test_topic_queue";
        String routingKey="user.*";

        // 声明一个交换机
        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
        // 声明一个队列
        channel.queueDeclare(queueName,false,false,false,null);
        // 绑定:将一个队列绑定到一个交换机上
        channel.queueBind(queueName,exchangeName,routingKey);

        //5 创建消费者
        QueueingConsumer queueingConsumer=new QueueingConsumer(channel);

        //6 设置 channel
        channel.basicConsume(queueName,true,queueingConsumer);

        //7 获取数据
        while(true){
            QueueingConsumer.Delivery delivery=queueingConsumer.nextDelivery();
            String msg=new String(delivery.getBody());
            System.out.println("消费端:"+msg);
        }
    }
}

Fanout Exchange

生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * <h1> Fanout Exchange</h1>
 * Fanout Exchange 不处理 routing key,只需要简单的将队列绑定到交换机上,发送到交换机的消息都会被转发到交换机绑定的所有队列上。
 * 消息生产者
 * Created by DHA on 2019/11/19.
 */
public class Producer4FanoutExchange {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1 创建一个 Connectionfactory,并进行设置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3 通过 connecion 创建一个 Channel
        Channel channel = connection.createChannel();

        //4 声明
        //声明 exchange 名称
        String exchangeName="test_fanout_exchange";

        //5 通过 chanel 发送数据
        for(int i = 0; i < 10; i ++) {
            String msg = "Hello World RabbitMQ 4 Fanout Exchange Message ...";
            channel.basicPublish(exchangeName, "", null , msg.getBytes());
        }

        //6 关闭相关连接
        channel.close();
        connection.close();
    }
}

消费者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * <h1> Fanout Exchange</h1>
 * Fanout Exchange 不处理 routing key,只需要简单的将队列绑定到交换机上,发送到交换机的消息都会被转发到交换机绑定的所有队列上。
 * 消息消费者
 * Created by DHA on 2019/11/19.
 */
public class Consumer4FanoutExchange {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        //1 创建一个 Connectionfactory,并进行设置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3 通过 connecion 创建一个 Channel
        Channel channel = connection.createChannel();

        //4 声明
        String exchangeName="test_fanout_exchange";
        String exchangeType="fanout";
        String queueName="test_fanout_queue";
        String routingKey="";

        // 声明一个交换机
        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
        // 声明一个队列
        channel.queueDeclare(queueName,false,false,false,null);
        // 绑定:将一个队列绑定到一个交换机上
        channel.queueBind(queueName,exchangeName,routingKey);

        //5 创建消费者
        QueueingConsumer queueingConsumer=new QueueingConsumer(channel);

        //6 设置 channel
        channel.basicConsume(queueName,true,queueingConsumer);

        //7 获取数据
        while(true){
            QueueingConsumer.Delivery delivery=queueingConsumer.nextDelivery();
            String msg=new String(delivery.getBody());
            System.out.println("消费端:"+msg);
        }
    }
}

设置消息属性

生产者

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * <h1>消息属性设置</h1>
 * 消息生产者
 * Created by DHA on 2019/11/18.
 */
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1 创建一个 Connectionfactory,并进行设置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3 通过 connecion 创建一个 Channel
        Channel channel = connection.createChannel();

        // 设置自定义属性
        Map<String, Object> headers = new HashMap<>();
        headers.put("attr1", "111");
        headers.put("attr2", "222");

        //4 设置消息属性
        AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
                .deliveryMode(2)  // 2 表示持久化的投递
                .contentEncoding("UTF-8") // 设置内容编码
                .expiration("10000") // 设置过期时间为 10 秒
                .headers(headers) // 自定义属性
                .build();

        //5 通过 chanel 发送数据
        for(int i=0;i<5;i++){
            String data="Hello!";
            channel.basicPublish("","test001",properties,data.getBytes());
        }

        //6 关闭相关连接
        channel.close();
        connection.close();
    }
}

消费者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * <h1>消息属性设置</h1>
 * 消息消费者
 * Created by DHA on 2019/11/18.
 */
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        //1 创建一个 Connectionfactory,并进行设置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3 通过 connecion 创建一个 Channel
        Channel channel = connection.createChannel();

        //4 声明一个队列
        String queueName="test001";
        channel.queueDeclare(queueName,true,false,false,null);

        //5 创建消费者
        QueueingConsumer queueingConsumer=new QueueingConsumer(channel);

        //6 设置 channel
        channel.basicConsume(queueName,true,queueingConsumer);

        //7 获取数据
        while(true){
            Delivery delivery=queueingConsumer.nextDelivery();
            String msg=new String(delivery.getBody());
            System.out.println("消费端:"+msg);
            // 获取自定义属性数据
            Map<String,Object> headers=delivery.getProperties().getHeaders();
            System.err.println("headers get attribute attr1 value: " + headers.get("attr1"));
        }
    }
}

参考:

标签:connectionFactory,入门,rabbitmq,MQ,Rabbit,import,com,channel,String
From: https://www.cnblogs.com/i9code/p/17998177

相关文章

  • RabbitMQ 高级特性
    消息100%可靠性投递的解决方案生产端可靠性投递保障消息成功发出保障MQ节点的成功接收发送端收到MQ节点(Broker)确认应答完善的消息补偿机制解决方案1:消息落库消息落库,对消息状态进行打标。解决方案2:二次确认,回调检查消息的延迟投递,做二次确认,回调检查。消费端幂......
  • Kafka 和 RabbitMQ 比较
    从以下几个方面比较Kafka和RabbitMQ:吞吐量Kafka:十万数量级,高吞吐量RabbitMQ:万数量级Topic数量对吞吐量影响Kafka的Topic可达百/千级,吞吐量下降幅度小,在同等机器下,可以支撑大量的Topic。RabbitMQ无Topic概念。时效性Kafka毫秒级;RabbitMQ微秒级可用性......
  • Java 编程指南:入门,语法与学习方法
    Java是什么?Java是一种流行的编程语言,诞生于1995年。由Oracle公司拥有,运行在超过30亿台设备上。Java可以用于:移动应用程序(尤其是Android应用)桌面应用程序网络应用程序网络服务器和应用程序服务器游戏数据库连接等等!为什么使用Java?Java拥有以下优势:跨平......
  • RocketMQ应用-基金购买秒杀实现
    架构支持根据实际业务场景,分析集群分流的具体处理方案,假设基金购买接口单次处理时间为500ms,tomcat使用默认线程数200,则单个tomcat处理基金购买接口的QPS=1000/500*200=400。场景1-4000QPS要求实现4000QPS的并发量,可以部署10个tomcat集群应用,使用nginx做负载均衡,轮询分配到tomc......
  • springboot集成mqtt
    SpringBoot集成MQTT(简单版)一、docker安装emqx环境(Linux系统)emqx:mqtt服务器(broker)version:'3'services:emqx:image:emqx/emqxcontainer_name:emqxrestart:alwaysports:-8001:18083-8002:1883-8003:8083-8004......
  • CS231N Assignment3 入门笔记(Q4 GANs)
    斯坦福2023年春季CS231N课程第三次作业(最后一次)解析、笔记与代码,作为初学者入门学习。在这项作业中,将实现语言网络,并将其应用于COCO数据集上的图像标题。然后将训练生成对抗网络,生成与训练数据集相似的图像。最后,将学习自我监督学习,自动学习无标签数据集的视觉表示。本作业的......
  • Java实现Rabbitmq群发消息
    1.Rabbitmq简介RabbitMQ是一个实现了AMQP(AdvancedMessageQueuingProtocol)高级消息队列协议的消息队列服务,用Erlang语言。是面向消息的中间件。你可以把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ是一个邮箱、邮局、邮递员......
  • Qt QCustomPlot 入门教程
    简述QCustomPlot是一个基于QtC++的图形库,用于绘制和数据可视化-制作漂亮的2D图-曲线图、趋势图、坐标图、柱状图等,并为实时可视化应用程序提供高性能服务。它没有进一步的依赖关系,并有着良好的文档记录。QCustomPlot可以导出为各种格式,比如:PDF文件和位图(如:PNG、JPG......
  • 7000字详解Spring Boot项目集成RabbitMQ实战以及坑点分析
    本文给大家介绍一下在SpringBoot项目中如何集成消息队列RabbitMQ,包含对RibbitMQ的架构介绍、应用场景、坑点解析以及代码实战。最后文末有免费领取龙年红包封面以及腾讯云社区答题领奖福利,欢迎大家领取。我将使用waynboot-mall项目作为代码讲解,项目地址:https://github.co......
  • RocketMQ应用-消费幂等性问题解决
    重复消费产生原因生产者多次投递-投递时服务端接收后客户端网络原因确认失败,重新投递消费者扩容重试-消费者扩容导致正在消费的消息没有正常应答,服务端重新推送重复消费解决方案给消息增加唯一key,消费时校验key是否已经消费过消费者控制消息的幂等性(多次同样的操作结果一......