首页 > 其他分享 >RabbitMQ

RabbitMQ

时间:2022-11-01 22:44:50浏览次数:35  
标签:String 队列 void RabbitMQ 消息 message public

RabbitMQ

一、MQ

1.同步调用的优缺点

同步调用的优点:

  • 时效性较强,可以立即得到结果

同步调用的问题:

  • 耦合度高
  • 性能和吞吐能力下降
  • 有额外的资源消耗
  • 有级联失败问题

2.异步调用

异步调用常见实现就是事件驱动模式

image-20220212221646769

好处:

  • 吞吐量提升:无需等待订阅者处理完成,响应更快速

  • 故障隔离:服务没有直接调用,不存在级联失败问题

  • 调用间没有阻塞,不会造成无效的资源占用

  • 耦合度极低,每个服务都可以灵活插拔,可替换

  • 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

缺点:

  • 架构复杂了,业务没有明显的流程线,不好管理
  • 需要依赖于Broker的可靠、安全、性能

3.MQ实现

MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。

几种常见MQ的对比:

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般 一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

二、Linux安装RabbitMQ

1.1.下载镜像

方式一:在线拉取

docker pull rabbitmq:3-management

方式二:从本地加载

在课前资料已经提供了镜像包:

image-20220212222300774

上传到虚拟机中后,使用命令加载镜像即可:

docker load -i mq.tar

1.2.安装MQ

执行下面的命令来运行MQ容器:

docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management

注意5672是MQ消息通信的端口,15672是ui端口

三、RabbitMQ

1.种类

大致分为两种

1.**无交换机的 **基本消息队列 和 工作消息队列

image-20220212222723087

2.有交换机的发布订阅,路由,主题

image-20220212222757450

2.基于SpringAMQP使用

2.1基础消息队列

image-20220212230528538

  1. 先使用测试类生成队列

    public class ConsumerTest {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1.建立连接
            ConnectionFactory factory = new ConnectionFactory();
            // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
            factory.setHost("116.62.32.68");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("itcast");
            factory.setPassword("123321");
            // 1.2.建立连接
            Connection connection = factory.newConnection();
    
            // 2.创建通道Channel
            Channel channel = connection.createChannel();
    
            // 3.创建队列
            String queueName = "simple.queue";
            channel.queueDeclare(queueName, false, false, false, null);
    
            // 4.订阅消息
            channel.basicConsume(queueName, true, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 5.处理消息
                    String message = new String(body);
                    System.out.println("接收到消息:【" + message + "】");
                }
            });
            System.out.println("等待接收消息。。。。");
        }
    }
    
  2. 引入依赖

    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  3. 生产者编写yaml文件的MQ地址,端口号,用户名密码

    spring:
      rabbitmq:
        host: 192.168.150.101 # 主机名
        port: 5672 # 端口
        virtual-host: / # 虚拟主机
        username: itcast # 用户名
        password: 123321 # 密码
    
  4. 编写生产者代码

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringAmqpTest {
        //类似于redisTemplate
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testSimpleQueue(){
            //队列名
            String queueName = "simple.queue";
            //消息
            String message = "hello,SpringAmqp";
            //发送消息到队列
            rabbitTemplate.convertAndSend(queueName,message);
        }
    
    }
    
  5. 消费者编写yaml文件的MQ地址,端口号,用户名密码

    同上

  6. 编写消费者代码

    @Component
    public class SpringRabbitListener {
    
        @RabbitListener(queues = "simple.queue")
        public void listenSimpleQueueMessage(String msg) throws InterruptedException {
            System.out.println("spring 消费者接收到消息:【" + msg + "】");
        }
    }
    

2.2工作消息队列

image-20220212230455466

消费者:

//工作队列
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1Message(String msg) throws InterruptedException {
    System.out.println("spring 消费者1接收到消息:【" + msg + "】");
}
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue2Message(String msg) throws InterruptedException {
    System.out.println("spring 消费者2接收到消息:【" + msg + "】");
}
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host:  # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: itcast # 用户名
    password: 123321 # 密码
    listener:
      direct:
        prefetch: 1 # 消息预期,之前把所有消息预期导致平分消息,现在是处理一个取一个

2.3广播Fanout

image-20221101120053099

多了一个交换机,交换机决定将生产者的消息发送给哪个队列,这决定就是交换机种类不同规则不同,因此分为三种,广播、路由和主题。

广播:只要队列绑定了交换机,就发送消息到队列

1.编写消费者:使用注解声明队列和交换机(同时绑定消费者的队列和交换机)

//3.广播(绑定队列,同时绑定交换机)
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "fanout.queue1"),
        exchange = @Exchange(name = "fanout.exange",type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue1(String message){
    System.out.println("广播方式队列1接受消息:"+message);
}
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "fanout.queue2"),
        exchange = @Exchange(name = "fanout.exange",type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue2(String message){
    System.out.println("广播方式队列2接受消息:"+message);
}

2.生产者

//广播类型交换机
@Test
public void testFanoutExchangeQueue(){
    //交换机名称
    String exchangeName = "fanout.exange";
    //消息
    String message = "广播消息";
    //发送,第二个参数是队列的key,在路由类型中可以使用到
    rabbitTemplate.convertAndSend(exchangeName,"",message);
}

2.4路由Route

image-20220212234608495

和广播相比,在队列中多了一个key属性。

交换机通过消息的key来对应路由到相同key的队列上。

1.消费者:

/**
    4.路由
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1"),
        exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),
        key = {"red","blue"}
))
public void listenDirectQueue1(String message){
    System.out.println("路由红、蓝队列接受消息:"+message);
}
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue2"),
        exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),
        key = {"red","green"}
))
public void listenDirectQueue2(String message){
    System.out.println("路由红、绿队列接受消息:"+message);
}

2.生产者

//路由类型交换机
@Test
public void testDirectExchangeQueue(){
    //交换机名称
    String exchangeName = "direct.exchange";
    //消息
    String message = "路由消息";
    //发送,第二个参数是队列的key,在路由类型中可以使用到
    rabbitTemplate.convertAndSend(exchangeName,"red",message);
}

2.5主题Topic

image-20220212234557232

key不再是一组单词,而是一个规则

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

item.#:能够匹配item.spu.insert 或者 item.spu

item.*:只能匹配item.spu

1.消费者

/**
 * 主题
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue1"),
        exchange = @Exchange(name = "topic.exchange",type = ExchangeTypes.TOPIC),
        key = "china.#"
))
public void listenTopicQueue1(String message){
    System.out.println("中国队列接受消息:"+message);
}
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue2"),
        exchange = @Exchange(name = "topic.exchange",type = ExchangeTypes.TOPIC),
        key = "#.news"
))
public void listenTopicQueue2(String message){
    System.out.println("新闻队列接受消息:"+message);
}

2.生产者

//主题类型交换机
@Test
public void testTopicExchangeQueue(){
    //交换机名称
    String exchangeName = "topic.exchange";
    //消息
    String message = "china.lala消息";
    //发送,第二个参数是队列的key,在路由类型中可以使用到
    rabbitTemplate.convertAndSend(exchangeName,"china.news",message);
}

3.消息转换器

Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

因此需要配置一个Json转换器

步骤

  1. 在publisher和consumer两个服务中都引入依赖:
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>
  1. 配置消息转换器。在启动类中添加一个Bean即可:
@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
}

标签:String,队列,void,RabbitMQ,消息,message,public
From: https://www.cnblogs.com/Gao-yubo/p/16849444.html

相关文章

  • 三 docker安装rabbitMQ之springboot集成stomp,实现前端主动刷新
    一 场景分析对于一些需要数据同步的场景中,例如后台数据有变化,需要程序主动刷新前端界面的数据显示,这样能提供更好的用户数据交互,能第一时间了解到资源信息的变化,而不是......
  • RabbitMq消息的百分百投递
    RabbitMq消息的百分百投递​ 在RabbitMq作为消息中间件的时候,存在消息丢失的情况;在大部分业务中是不允许的;尤其是在使用中间件来保证数据的最终一致性时,消息丢失则无......
  • RabbitMq
    RabbitMq什么是MQ:是一种存放消息的队列;还是一种跨进程的通讯机制,用于两个微服务之间的消息通讯;消息中间件作用于分布式系统之间的通讯且必须是异步处理的场景提......
  • RabbitMQ
    我这边的使用场景:多名用户进行同一操作。(具体一点:我这边的系统哦,数据库用的是mysql,操作是导入excel,可以这么说百分之六十的页面都有批量导入,而且一个页面不止一个批量导入......
  • 某 .NET RabbitMQ SDK 有采集行为,你怎么看?
    一:背景1.讲故事前几天有位朋友在微信上找到我,说他的一个程序上了生产之后,被运维监控定位到这个程序会向一个网址为:http://m.365ey.net上不定期打数据,而且还是加密的格......
  • Spring Cloud Stream RabbitMQ消费消息失败
    丢弃消息这种场景生产环境应该不怎么用。入死信队列cloud:stream:bindings:hking:destination:hkingcontent-type:application/j......
  • .Net Core&RabbitMQ限制循环消费
    前言当消费者端接收消息处理业务时,如果出现异常或是拒收消息将消息又变更为等待投递再次推送给消费者,这样一来,则形成循环的条件。循环场景生产者发送100条消息到Rabbit......
  • RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群
    一、为什么要搭建RabbitMQ集群?未部署集群的缺点如果RabbitMQ集群只有一个broker节点,那么该节点的失效将导致整个服务临时性的不可用,并且可能会导致message的丢失(尤其是在非......
  • Python RabbitMQ pika的安装及direct路由模式的使用
    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件,RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有......
  • java-rabbitmq-官网实例02
    java-rabbitmq-官网实例02描述:  1.定义持久化队列,发送持久化消息,消息接受者需要手动应答,MQ才会删除队列中的消息 2.使用channel.basicQos......