首页 > 其他分享 >RabbitMQ

RabbitMQ

时间:2023-05-06 17:57:41浏览次数:55  
标签:false String factory RabbitMQ import com channel

RabbitMQ

RabbitMQ使用

安装RabbitMQ

RabbitMQ官网: https://www.rabbitmq.com/

RabbitMQ是使用Erlang开发,需要先安装Erlang。RabbitMQ与Erlang的版本对照表如下:

image

启动RabbitMQ

..\RabbitMQ Server\rabbitmq_server-3.11.13\sbin目录下打开CMD,输入rabbitmq-server.bat,如下所示即表明启动成功。

image

打开RabbitMQ Management:http://localhost:15672/

初始用户名:guest

初始密码:guest

登录成功后如下所示:

image

新建用户

image

新建成功后如图所示:

image

新建虚拟主机

image

设置用户可访问的虚拟主机

image

image

image

image

RabbitMQ工作模式

导入依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
</dependency>

simple简单模式

在这里插入图片描述

生产者:

package com.example.producer;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 * rabbit生产者:发送消息
 */
public class RabbitProducer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置参数
        // IP地址,默认是localhost
        factory.setHost("localhost");
        // 端口,http端口:15672;amqp端口:5672,clustering端口:25672
        factory.setPort(5672);
        // 虚拟机,默认值 /
        factory.setVirtualHost("/test");
        // 用户名,默认值:guest
        factory.setUsername("root");
        // 密码,默认值:guest
        factory.setPassword("123456");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建channel
        Channel channel = connection.createChannel();
        /**
         * 创建队列
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         * queue:队列名称
         * durable:是否持久化,持久化后,当rabbitmq重启后队列还在
         * exclusive:
         *   - 是否独占,只能有一个消费者监听这个队列
         *   - 当Connection关闭时,是否删除队列
         * autoDelete:是否自动删除。当没有Consumer时自动删除
         * arguments:参数设置。
         * 如果没有一个名字叫test的队列,则会创建该队列,如果有则不会创建
         */
        channel.queueDeclare("test",false,false,false,null);
        /**
         * 发送信息
         * basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
         * exchange:交换机。简单模式下交换机会使用默认的 ""
         * routingKey:路由名称
         * props:配置信息
         * body:发送消息数据
         */
        String body = "hello rabbitmq ~~~";
        channel.basicPublish("","test",null,body.getBytes());
        // 释放资源
        channel.close();
        connection.close();
    }
}

消费者:

package com.example.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 * rabbit消费者:接收信息
 */
public class RabbitConsumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("test",false,false,false,null);
        /**
         * 接收消息
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue:队列名称
         * autoAck:是否自动确认
         * callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后会自动执行该方法
             * @param consumerTag 标识
             * @param envelope 获取一些信息,如交换机(exchange)、路由key(routingKey)...
             * @param properties 配置信息
             * @param body 数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag: "+consumerTag);
                System.out.println("envelope: "+envelope);
                System.out.println("properties: "+properties);
                System.out.println("body: "+new String(body));
            }
        };
        channel.basicConsume("test",true,consumer);
    }
}

work工作模式

在这里插入图片描述

生产者:

package com.example.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 * RabbitMQ工作模式:WorkQueues
 */
public class ProducerInWorkQueues {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置参数
        // IP地址,默认是localhost
        factory.setHost("localhost");
        // 端口,http端口:15672;amqp端口:5672,clustering端口:25672
        factory.setPort(5672);
        // 虚拟机,默认值 /
        factory.setVirtualHost("/test");
        // 用户名,默认值:guest
        factory.setUsername("root");
        // 密码,默认值:guest
        factory.setPassword("123456");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建channel
        Channel channel = connection.createChannel();
        /**
         * 创建队列
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         * queue:队列名称
         * durable:是否持久化,持久化后,当rabbitmq重启后队列还在
         * exclusive:
         *   - 是否独占,只能有一个消费者监听这个队列
         *   - 当Connection关闭时,是否删除队列
         * autoDelete:是否自动删除。当没有Consumer时自动删除
         * arguments:参数设置。
         * 如果没有一个名字叫test的队列,则会创建该队列,如果有则不会创建
         */
        channel.queueDeclare("work_queues",false,false,false,null);
        /**
         * 发送信息
         * basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
         * exchange:交换机。简单模式下交换机会使用默认的 ""
         * routingKey:路由名称
         * props:配置信息
         * body:发送消息数据
         */
        for (int i = 0; i < 10; i++) {
            String body = "hello rabbitmq --- "+i;
            channel.basicPublish("","work_queues",null,body.getBytes());
        }
        // 释放资源
        channel.close();
        connection.close();
    }
}

消费者1:

package com.example.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 */
public class ConsumerInWorkQueuesOne {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work_queues",false,false,false,null);
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后会自动执行该方法
             * @param consumerTag 标识
             * @param envelope 获取一些信息,如交换机(exchange)、路由key(routingKey)...
             * @param properties 配置信息
             * @param body 数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: "+new String(body));
            }
        };
        /**
         * 接收消息
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * @param queue 队列名称
         * @param autoAck 自动确认
         * @param callback 回调方法
         */
        channel.basicConsume("work_queues",true,consumer);
    }
}

消费者2:

package com.example.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 */
public class ConsumerInWorkQueuesTwo {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work_queues",false,false,false,null);
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后会自动执行该方法
             * @param consumerTag 标识
             * @param envelope 获取一些信息,如交换机(exchange)、路由key(routingKey)...
             * @param properties 配置信息
             * @param body 数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: "+new String(body));
            }
        };
        /**
         * 接收消息
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * @param queue 队列名称
         * @param autoAck 自动确认
         * @param callback 回调方法
         */
        channel.basicConsume("work_queues",true,consumer);
    }
}

publish/subscribe发布订阅模式

在这里插入图片描述

生产者:

package com.example.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 * RabbitMQ工作模式:PubSub
 */
public class ProducerInPubSub {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置参数
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建channel
        Channel channel = connection.createChannel();
        // 创建交换机
        /**
         * exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
         * @param exchange 交换机名称
         * @param type 交换机类型:direct(定向)、fanout(扇形【广播】)、topic(通配符的方式)、headers(参数匹配)
         * @param durable 是否持久化
         * @param autoDelete 是否自动删除
         * @param internal 是否内部使用,一般为false
         * @param arguments 参数列表
         */
        channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT,false,false,false,null);
        //创建队列
        channel.queueDeclare("fanout_queue1",false,false,false,null);
        channel.queueDeclare("fanout_queue2",false,false,false,null);
        //绑定队列和交换机
        /**
         * queueBind(String queue, String exchange, String routingKey)
         * @param queue 队列名称
         * @param exchange 交换机名称
         * @param routingKey 路由键,绑定规则。如果交换机类型为fanout,routingKey设置为"",即将消息分发给每一个绑定的queue。
         */
        channel.queueBind("fanout_queue1","fanout_exchange","");
        channel.queueBind("fanout_queue2","fanout_exchange","");
        // 发送消息
        String body = "欢迎来到我的世界";
        channel.basicPublish("fanout_exchange","",null,body.getBytes());
        // 释放资源
        channel.close();
        connection.close();
    }
}

消费者1:

package com.example.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 */
public class ConsumerInPubSubOne {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: "+new String(body));
            }
        };
        channel.basicConsume("fanout_queue1",true,consumer);
    }
}

消费者2:

package com.example.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 */
public class ConsumerInPubSubTwo {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: "+new String(body));
            }
        };
        channel.basicConsume("fanout_queue2",true,consumer);
    }
}

routing路由模式

在这里插入图片描述

生产者:

package com.example.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 */
public class ProducerInRouting {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 创建交换机
        channel.exchangeDeclare("direct_exchange", BuiltinExchangeType.DIRECT,false,false,null);
        // 创建队列
        channel.queueDeclare("direct_queue1",false,false,false,null);
        channel.queueDeclare("direct_queue2",false,false,false,null);
        // 绑定队列和交换机
        // 队列1绑定 error
        channel.queueBind("direct_queue1","direct_exchange","error");
        // 队列2绑定 info error warning
        channel.queueBind("direct_queue2","direct_exchange","info");
        channel.queueBind("direct_queue2","direct_exchange","error");
        channel.queueBind("direct_queue2","direct_exchange","warning");
        // 消息内容
        String body = "欢迎来到我的世界";
        // 发送消息
        channel.basicPublish("direct_exchange","error",null,body.getBytes());
        // 释放资源
        channel.close();
        connection.close();
    }
}

消费者1:

package com.example.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 */
public class ConsumerInRoutingOne {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        channel.basicConsume("direct_queue1",true,consumer);
    }
}

消费者2:

package com.example.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 */
public class ConsumerInRoutingTwo {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        channel.basicConsume("direct_queue2",true,consumer);
    }
}

topic主题模式

在这里插入图片描述

生产者:

package com.example.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 */
public class ProducerInTopic {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 创建交换机
        channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC,false,false,false,null);
        // 创建队列
        channel.queueDeclare("topic_queue1",false,false,false,null);
        channel.queueDeclare("topic_queue2",false,false,false,null);
        // 绑定交换机和队列
        // routingKey 系统名称.日志级别。需求:error级别和order系统的绑定到queue1
        // *:1个及以上单词;#:0个或多个单词
        channel.queueBind("topic_queue1","topic_exchange","#.error");
        channel.queueBind("topic_queue1","topic_exchange","order.#");
        // 所有信息绑定到queue2
        channel.queueBind("topic_queue2","topic_exchange","*.*");
        String body = "欢迎来到我的世界";
        // 发送消息
        channel.basicPublish("topic_exchange","goods.error",null,body.getBytes());
        // 释放资源
        channel.close();
        connection.close();
    }
}

消费者1:

package com.example.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 */
public class ConsumerInTopicOne {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        channel.basicConsume("topic_queue1",true,consumer);
    }
}

消费者2:

package com.example.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 */
public class ConsumerInTopicTwo {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        channel.basicConsume("topic_queue2",true,consumer);
    }
}

标签:false,String,factory,RabbitMQ,import,com,channel
From: https://www.cnblogs.com/yts-helloworld/p/17378134.html

相关文章

  • RabbitMQ - 消息中间件
    RabbitMQ-消息中间件目录RabbitMQ-消息中间件1消息队列Rabbitmq介绍1.0什么是消息队列1.1rabbitmq介绍1.2MQ解决的问题1.3常见的消息队列及比较2RabbitMQ介绍安装2.1下载2.2安装(1)window安装(2)linux下安装rabbitmq(3)docker安装2.3配置web管理插件(1)windows配置(2)centos7......
  • 消息队列Rabbitmq介绍、rabbitmq安装、基于queue实现生产者消费者、基本使用、消息安
    目录1消息队列Rabbitmq介绍2rabbitmq安装3基于queue实现生产者消费者4基本使用4.1发送者4.2消费者5消息安全(详见笔记)6持久化(详见笔记)7闲置消费(详见笔记)8发布订阅(详见笔记)9发布订阅高级之Routing(按关键字匹配)(详见笔记)1消息队列Rabbitmq介绍#消息队列 -......
  • Linux安装rabbitMQ常用命令
    1.拉取最新的rabbitMQdockerpullrabbitmq:management2.容器启动rabbitMQdockerrun-d--hostnamemy-rabbit--namerabbit-p15672:15672-p5672:5672rabbitmq:management其中:     --hostname:指定容器主机名称     --name:        指定容器名称  ......
  • Rabbitmq 介绍 、安装、基于Queue实现生产者消费者模型、基本使用、消息安全之ack、du
    师承老刘llnb一、消息队列介绍1.1介绍消息队列就是基础数据结构中的“先进先出”的一种数据机构。想一下,生活中买东西,需要排队,先排的人先买消费,就是典型的“先进先出”1.2MQ解决什么问题MQ是一直存在,不过随着微服务架构的流行,成了解决微服务之间问题的常用工具。应用解耦......
  • rabbitmq 使用
    今日内容1消息队列Rabbitmq介绍---------------------------------------------#消息队列也叫消息队列中间件celery中使用redis做过消息队列来用换Rabbitmq做消息队列,就只需要把broker的连接地址换成Rabbitmq的连接地址就行了---------------------------......
  • RabbitMq
    1.消息队列1.1MQ的相关概念1.1.1什么是MQMQ(messagequeue),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使......
  • Docker 安装 RabbitMQ
    系统环境系统:Debian-10.2内核:Linux4.19.0-21-amd64x86_64(通过uname-r命令查看内核版本)RabbitMQ版本:rabbitmq:3.11-management安装教程访问RabbitMQ官方文档,查看官方安装教程,直接运行:dockerrun-it--rm--namerabbitmq-p5672:5672-p15672:15672rabbitmq......
  • RabbitMQ安装Delayed Message 插件
    在官网:https://www.rabbitmq.com/community-plugins.html点击:下载好之后就是一个解压好的文件:然后在将这个文件复制到rabiitmq/plugins里面:cp/Users/sixcandy/Downloads/rabbitmq_delayed_message_exchange-3.10.2.ez/opt/homebrew/Cellar/rabbitmq/3.10.2/plugins1进入rabi......
  • rabbitmq 延迟队列_Delayed Message 插件实现 RabbitMQ 延迟队列
    延迟队列是为了存放那些延迟执行的消息,待消息过期之后消费端从队列里拿出来执行。作者简介:五月君,NodejsDeveloper,慕课网认证作者,热爱技术、喜欢分享的90后青年,欢迎关注Nodejs技术栈(id:NodejsRoadmap)和Github开源项目 https://www.nodejs.redDLX+TTL方式存在的时序问......
  • RabbitMQ linux安装流程
    1.在根目录创建文件夹rabbitMQcd/mkdirrabbitMQ2.下载rabbitMQram安装包和对应版本的Erlang(我这里用的3.11.2的rabbitMQ就需要对应的25.1的Erlang)参考地址:RabbitMQErlangVersionRequirements—RabbitMQrabbitmqIndexofrabbitmq-server-local/v3.10.2(huaweic......