首页 > 其他分享 >RabbitMQ初探

RabbitMQ初探

时间:2023-02-19 14:56:06浏览次数:43  
标签:amqp spring RabbitMQ hello 初探 msg public String

RabbitMQ初探

结合SpringAMQP,讨论RabbitMQ的几种消息模型

RabbitMQ Tutorials

工程结构

├─consumer
└─publisher

父工程pom.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.9.RELEASE</version>
    <relativePath/>
</parent>

<dependencies>
    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!--单元测试-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>

各微服务application.yaml

spring:
  rabbitmq:
    host: 192.168.8.114 # rabiitMQ 服务 ip
    port: 5672          # rabiitMQ 服务 port
    username: root      # rabiitMQ 用户名
    password: root      # rabiitMQ 密码
    virtual-host: /     # rabiitMQ 要使用的虚拟主机

AMQP发送消息

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    
    // SpringAMQP 的模板类
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendMsgToSimpleQueue() {

        // 要投递的队列名(String)-simple.queue
        String queueName = "simple.queue";

        // 要投递的消息(Object)
        String msg = "hello spring amqp!!!!!!";

        // 投递消息
        rabbitTemplate.convertAndSend(queueName, msg);
    }
}

AMQP接收消息

@Component
public class SpringRabbitListener {
    // 要接收的队列(String[])
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) { // msg 为String, 为取消息体
        System.out.println("msg:" + msg);
    }
}

Hello,World

单队列,单生产者,单消费者

image-20230219104513243

是基本案例,这里跳过

Work Queues

多消费者绑定同一队列,加速消息的提取和处理,同一消息只会被一个消费者处理

image-20230219104939850

RabbitMQ在这种模式下,默认使用了“预取”机制,即所有消费者依次获取一个消息,直到消息消费完成

“预取”案例

P

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendMsgToSWorkQueue() throws InterruptedException {

        String queueName = "simple.queue";

        String msg = "hello spring amqp__";

        for (int i = 0; i < 10; i++) { // 1s内发送10条消息
            Thread.sleep(20);
            rabbitTemplate.convertAndSend(queueName, msg + i);
        }
    }
}

C1+C2

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueueMessage1(String msg) throws InterruptedException {
        System.out.println("consumer[1]msg: " + msg + ";" + LocalTime.now());
        Thread.sleep(20); // 50/s
    }


    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueueMessage2(String msg) throws InterruptedException {
        System.err.println("consumer[2]msg: " + msg + ";" + LocalTime.now());
        Thread.sleep(100); // 10/s
    }

}

结果

可见低速的C2,依旧接收到了一半的消息(此处为奇数)耗费了较长时间

consumer[1]msg: hello spring amqp__0;10:58:28.552860100
consumer[2]msg: hello spring amqp__1;10:58:28.556858500
consumer[1]msg: hello spring amqp__2;10:58:28.588858800
consumer[1]msg: hello spring amqp__4;10:58:28.621858500
consumer[2]msg: hello spring amqp__3;10:58:28.658858100
consumer[1]msg: hello spring amqp__6;10:58:28.661859100
consumer[1]msg: hello spring amqp__8;10:58:28.702858400
consumer[2]msg: hello spring amqp__5;10:58:28.760866
consumer[2]msg: hello spring amqp__7;10:58:28.861865900
consumer[2]msg: hello spring amqp__9;10:58:28.963866700

克服“预取”,适应消费者的能力

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次取1消息,处理完成后再取

改进后的结果

按能力接受,耗费时间较优

consumer[2]msg: hello spring amqp__1;10:59:01.193174400
consumer[1]msg: hello spring amqp__0;10:59:01.193174400
consumer[1]msg: hello spring amqp__2;10:59:01.222183900
consumer[1]msg: hello spring amqp__3;10:59:01.261184400
consumer[1]msg: hello spring amqp__4;10:59:01.287184800
consumer[2]msg: hello spring amqp__5;10:59:01.297183800
consumer[1]msg: hello spring amqp__6;10:59:01.311184200
consumer[1]msg: hello spring amqp__7;10:59:01.339185
consumer[1]msg: hello spring amqp__8;10:59:01.370183900
consumer[1]msg: hello spring amqp__9;10:59:01.395184200

Publish/Subscribe

允许将一条消息送达到多个队列中,发布订阅的核心由exchange实现

image-20230219131019885

Fanout Exchange

将接收到的消息路由到每一个与其绑定的queue

交换机不能缓存消息,路由失败则消息丢失

案例

image-20230219132959128

Exchange+Queue

// Bean形式
@Configuration
public class FanoutConfig {

    // Exchange
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout.exchange");
    }

    // Queue
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1");
    }

    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout.queue2");
    }

    // Bind
    @Bean
    public Binding fanoutBinding1(
            Queue fanoutQueue1,
            FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(fanoutQueue1)
                .to(fanoutExchange);
    }

    @Bean
    public Binding fanoutBinding2(
            Queue fanoutQueue2,
            FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(fanoutQueue2)
                .to(fanoutExchange);
    }

}

C1+C2

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1Message(String msg) {
    System.out.println("fanout.queue1:" + msg);
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2Message(String msg) {
    System.out.println("fanout.queue2:" + msg);
}

image-20230219133445268

P

@Test
public void sendMsgToFanoutExchange() {

    String exchangeName = "fanout.exchange";

    String msg = "hello spring amqp clients!";

    // 交换机名称, RoutingKey, 消息
    rabbitTemplate.convertAndSend(exchangeName, "", msg);

}

结果

fanout.queue1:hello spring amqp clients!
fanout.queue2:hello spring amqp clients!

Direct Exchange

根据路由规则,将消息发送到指定的Queue,称为路由模式(routes)

每一个Queue都与Exchange设置一个BindingKey(RoutingKey)

发布者发布消息,也应当指定BindingKey(RoutingKey)

案例

image-20230219134318634

// @RabbitListener形式声明
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    // @Exchange.type 默认是 direct
    exchange = @Exchange(name = "direct.exchange"),
    key = {"red", "blue"}
))
public void listenDirectQueue1Msg(String msg) {
    System.out.println("direct.queue1:" + msg);
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2Msg(String msg) {
    System.out.println("direct.queue2:" + msg);
}

image-20230219135218384

P

@Test
public void sendMsgToDirectExchange() {

    String exchangeName = "direct.exchange";

    String msgBlue = "blue!";
    String msgYellow = "yellow!";
    String msgRed = "red!";

    rabbitTemplate.convertAndSend(exchangeName, "blue", msgBlue);
    rabbitTemplate.convertAndSend(exchangeName, "yellow", msgYellow);
    rabbitTemplate.convertAndSend(exchangeName, "red", msgRed);

}

结果

direct.queue1:blue!
direct.queue2:yellow!
direct.queue2:red!
direct.queue1:red!

Topic Exchange

与Direct Exchange相似,区别在于:RoutingKey必须是多个单词的列表,并且以 . 分隔

使用通配符,让BindingKey(RoutingKey)更高效

  • #指0个或多个单词
  • *指一个单词

案例

image-20230219140724384

E+C

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
    key = "#.sports"
))
public void listenTopicQueue1Msg(String msg) {
    System.out.println("topic.queue1:" + msg);
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
    key = "asia.#"
))
public void listenTopicQueue2Msg(String msg) {
    System.out.println("topic.queue2:" + msg);
}

image-20230219141113533

# queue1: #.sports
# queue2: asia.#
topic.queue1:asia.sports msg
topic.queue2:asia.sports msg
topic.queue1:europe.sports msg
topic.queue2:asia.weather msg

对象的序列化

前面代码中,发送的消息都是String类型,而Spring AMQP支持发送Object类型

这一过程牵扯到序列化,Spring默认的序列化器使用JDK的序列化,体积大

通常使用Jackson序列化器

(发送方和接收方一定要使用相同的序列化器)

<!--jackson-->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

Bean

// jackson serializer
@Bean
public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
}

模拟发送Object

Queue

@Bean
public Queue objectQueue() {
    return new Queue("object.queue");
}

P

@Test
public void sendMsgWithObject() {
    String queueName = "object.queue";
    HashMap<String, Object> msg = new HashMap<>();
    msg.put("name", "柳岩");
    msg.put("age", 18);
    rabbitTemplate.convertAndSend(queueName, msg);
}

image-20230219144115926

添加C

@RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String, Object> msg) {
    System.err.println("object.queue:" + msg);
}

输出

object.queue:{name=柳岩, age=18}

标签:amqp,spring,RabbitMQ,hello,初探,msg,public,String
From: https://www.cnblogs.com/jentreywang/p/17134726.html

相关文章

  • 将古老的ASP项目转换为PHP初探
    ASP是一种服务器端脚本语言,主要用于开发动态Web应用程序。ASP可以在服务器上执行代码,并将结果返回给客户端浏览器,实现动态生成Web页面的功能。ASP代码通常包含在<%......
  • 96、商城业务---消息队列---RabbitMQ工作流程&概念
    流程如下:生产者先跟消息代理建立一条长连接,在长连接里开辟很多通道(channel),然后通过通道发送消息Message,其中消息必须指定路由键route-key。消息代理里面有很多交换......
  • 95、商城业务---消息队列---RabbitMQ简介
    其中点对点式指只能有一个发送者,但是可有有多个接收者并且只能由一个接收者可以获得消息JMS和AMQP是两大规范......
  • rabbitMq客户端连接超时
    rabbitMq客户端连接超时rabbitmq有两个端口号:15672,用户web页面的http连接;5672用户客户端的tcp长连接。用腾讯云搭建时:需要在防火墙策略处将两个端口都打开。......
  • 安装rabbitmq
    1、创建文件夹mkdirrabbitmqcdrabburmq2、安装Erlanga、在安装RabbitMQ之前,必须安装受支持的Erlang/OTP 版本。在基于RPM的发行版中,Erlang软件包有三种常......
  • RabbitMQ工作原理及应用
    工作模式https://www.rabbitmq.com/getstarted.html上图,列出了RabbitMQ的使用模式,学习上面的模式,对理解所有消息队列都很重要。名词解释名词说明server服务......
  • RabbitMQ的基础安装与使用
    安装主机部署http://www.rabbitmq.com/install-rpm.html选择RPM包下载,选择对应平台,本次安装在CentOS7,其它平台类似。由于使用了erlang语言开发,所以需要erlang的包。er......
  • RabbitMQ 服务器启用 SSL/TLS
    为客户端和服务器生成自签名证书为了启用TLS/SSL,我们需要证书/密钥对。这可以借助OpenSSL为客户端和服务器生成自签名证书。生成自签名CA证书我们现在将使用OpenSSL创......
  • 为 RabbitMQ 服务器启用 SSL/TLS
    为RabbitMQ服务器启用SSL/TLS目录为RabbitMQ服务器启用SSL/TLS为客户端和服务器生成自签名证书在RabbitMQ服务器中启用TLS/SSL支持使用RabbitMQAssistant连......
  • reactor rabbitmq 实现RPC远程调用
    照着官方文档上写,最后发现在消费端怎么也返回不了数据。在文档中也找不到怎么返回数据,查看官方demo也没有案例,各种搜索都找不到。最后在源码中发现有一个RpcServer类,经过......