首页 > 其他分享 >中间件学习

中间件学习

时间:2022-12-14 21:34:26浏览次数:39  
标签:QUEUE false String 中间件 学习 public channel NAME

==========Kafka、RocketMQ、RabbitMQ的比较总结=========

Kafka:高吞吐量

RocketMQ:可靠性高

RabittMQ:数据量没没那么大,小公司优先选择

==============RabbitMQ======================

一、介绍

RabittMQ支持多种语言,支持海量插件、自带图形化界面,延迟低

Kafka:最大有点高吞吐量

 

二、使用MQ的好处

1、流量削峰

2、应用解耦

3、异步处理

 

三、RabbitMQ架构

 

四、RabbitMQ通讯方式

1、Hello Wold 简单模式

一对一模式,只有一个消费者能接收到

 

 

2、Work queues工作 队列模式

多个消费者,你一个我一个分配消费消息,有预取机制,默认公平消费,可配置能者多劳模式,谁完成的快,谁多做一点

 

@Test
      public void consume2() throws Exception {
          //1. 获取连接对象
          Connection connection = RabbitMQConnectionUtil.getConnection();

          //2. 构建Channel
          Channel channel = connection.createChannel();

          //3. 构建队列
          channel.queueDeclare(Publisher.QUEUE_NAME,false,false,false,null);

          channel.basicQos(3);//设置消费的流控,消费者每次消费几个消息

          //4. 监听消息
          DefaultConsumer callback = new DefaultConsumer(channel){
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  try {
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  System.out.println("消费者2号-获取到消息:" + new String(body,"UTF-8"));
                  channel.basicAck(envelope.getDeliveryTag(),false);//手动ack,告诉rabbitmq消费完了
              }
          };
          channel.basicConsume(Publisher.QUEUE_NAME,false,callback);//false:表示手动ack
          System.out.println("开始监听队列");

          System.in.read();
      }

3、Publish/Subscribe发布订阅模式

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。
实现方式是加入了exchange(交换机),注意:交换机是不缓存消息的

public class Publisher {

    public static final String EXCHANGE_NAME = "pubsub";
    public static final String QUEUE_NAME1 = "pubsub-one";
    public static final String QUEUE_NAME2 = "pubsub-two";
    @Test
    public void publish() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();

        //2. 构建Channel
        Channel channel = connection.createChannel();

        //3. 构建交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        //4. 构建队列
        channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
        channel.queueDeclare(QUEUE_NAME2,false,false,false,null);

        //5. 绑定交换机和队列,使用的是FANOUT类型的交换机,绑定方式是直接绑定
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");

        //6. 发消息到交换机
        channel.basicPublish(EXCHANGE_NAME,"45jk6h645jk",null,"publish/subscribe!".getBytes());
        System.out.println("消息成功发送!");
    }
}

4、Routing 路由模式

使用的是Direct类型的交换机,会将接收到的消息根据规则路由到指定的Queue(队列),因此称为路由模式

 

public class Publisher {

    public static final String EXCHANGE_NAME = "routing";
    public static final String QUEUE_NAME1 = "routing-one";
    public static final String QUEUE_NAME2 = "routing-two";
    @Test
    public void publish() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();

        //2. 构建Channel
        Channel channel = connection.createChannel();

        //3. 构建交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //4. 构建队列
        channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
        channel.queueDeclare(QUEUE_NAME2,false,false,false,null);

        //5. 绑定交换机和队列
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"ORANGE");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"BLACK");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"GREEN");

        //6. 发消息到交换机
        channel.basicPublish(EXCHANGE_NAME,"ORANGE",null,"大橙子!".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"BLACK",null,"黑布林大狸子".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"WHITE",null,"小白兔!".getBytes());
        System.out.println("消息成功发送!");


    }

}

5、Topics 主题模式

topicExchange与directExchange类型,区别在于routingKey必须是多个单词的列表,并且以 . 分隔

*(代表通配符,任意一个字段)
#(号代表一个或多个字段)

 

public class Publisher {

    public static final String EXCHANGE_NAME = "topic";
    public static final String QUEUE_NAME1 = "topic-one";
    public static final String QUEUE_NAME2 = "topic-two";
    @Test
    public void publish() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();

        //2. 构建Channel
        Channel channel = connection.createChannel();

        //3. 构建交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        //4. 构建队列
        channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
        channel.queueDeclare(QUEUE_NAME2,false,false,false,null);

        //5. 绑定交换机和队列,
        // TOPIC类型的交换机在和队列绑定时,需要以aaa.bbb.ccc..方式编写routingkey
        // 其中有两个特殊字符:*(相当于占位符),#(相当通配符)
        channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"*.orange.*");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"*.*.rabbit");
        channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"lazy.#");

        //6. 发消息到交换机
        channel.basicPublish(EXCHANGE_NAME,"big.orange.rabbit",null,"大橙兔子!".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"small.white.rabbit",null,"小白兔".getBytes());
        channel.basicPublish(EXCHANGE_NAME,"lazy.dog.dog.dog.dog.dog.dog",null,"懒狗狗狗狗狗狗".getBytes());
        System.out.println("消息成功发送!");

    }
}

6、RPC方式

 

客户端:
package com.mashibing.rpc;

import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;
import java.util.UUID;

/**
 * @author zjw
 * @description
 * @date 2022/2/8 20:03
 */
public class Publisher {

    public static final String QUEUE_PUBLISHER = "rpc_publisher";
    public static final String QUEUE_CONSUMER = "rpc_consumer";

    @Test
    public void publish() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();

        //2. 构建Channel
        Channel channel = connection.createChannel();

        //3. 构建队列
        channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);
        channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);

        //4. 发布消息
        String message = "Hello RPC!";
        String uuid = UUID.randomUUID().toString();
        AMQP.BasicProperties props = new AMQP.BasicProperties()
                .builder()
                .replyTo(QUEUE_CONSUMER)
                .correlationId(uuid)
                .build();
        channel.basicPublish("",QUEUE_PUBLISHER,props,message.getBytes());

        channel.basicConsume(QUEUE_CONSUMER,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String id = properties.getCorrelationId();
                if(id != null && id.equalsIgnoreCase(uuid)){
                    System.out.println("接收到服务端的响应:" + new String(body,"UTF-8"));
                }
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
        System.out.println("消息发送成功!");

        System.in.read();
    }


}
```

服务端:
package com.mashibing.rpc;

import com.mashibing.helloworld.Publisher;
import com.mashibing.util.RabbitMQConnectionUtil;
import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;

/**
 * @author zjw
 * @description
 * @date 2022/1/24 23:02
 */
public class Consumer {

    public static final String QUEUE_PUBLISHER = "rpc_publisher";
    public static final String QUEUE_CONSUMER = "rpc_consumer";

    @Test
    public void consume() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQConnectionUtil.getConnection();

        //2. 构建Channel
        Channel channel = connection.createChannel();

        //3. 构建队列
        channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);
        channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);


        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者获取到消息:" + new String(body,"UTF-8"));
                String resp = "获取到了client发出的请求,这里是响应的信息";
                String respQueueName = properties.getReplyTo();
                String uuid = properties.getCorrelationId();
                AMQP.BasicProperties props = new AMQP.BasicProperties()
                        .builder()
                        .correlationId(uuid)
                        .build();
                channel.basicPublish("",respQueueName,props,resp.getBytes());
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(QUEUE_PUBLISHER,false,callback);
        System.out.println("开始监听队列");

        System.in.read();
    }
}

五、RabbitMQ保证消息的可靠性

1、确保消息发送到Exchange

Confirm机制

//4. 开启confirms
channel.confirmSelect();

//5. 设置confirms的异步回调
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("消息成功的发送到Exchange!");
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("消息没有发送到Exchange,尝试重试,或者保存到数据库做其他补偿操作!");
    }
});

2、确保消息路由到队列

Return机制:消息没有到达Queue时候会回调handleReturn方法

//6. 设置Return回调,确认消息是否路由到了Queue
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("消息没有路由到指定队列,做其他的补偿措施!!");
    }
});
//7. 在发送消息时,将basicPublish方法参数中的mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return回调
```

3、保证Queue可以持久化消息

DeliveryMode设置消息持久化 DeliveryMode设置为2代表持久化,如果设置为1,就代表不会持久化。
//7. 设置消息持久化
AMQP.BasicProperties props = new AMQP.BasicProperties()
    .builder()
    .deliveryMode(2)
    .build();

//7. 发布消息
channel.basicPublish("","confirms",true,props,message.getBytes());

4、保证消费者可以正常消费消息

手动Ack机制

六、RabbitMQ死信队列&延迟交换机

1、死信&死信队列

 

 

2、延迟交换机

死信队列实现延迟消费时,如果延迟时间比较复杂,比较多,直接使用死信队列时,需要创建大量的队列还对应不同的时间,可以采用延迟交换机来解决这个问题。 延迟交换机当消息发送到交换机,rabbitmq宕机消息会丢失

 

 

 

 

 

 

 

 

 

标签:QUEUE,false,String,中间件,学习,public,channel,NAME
From: https://www.cnblogs.com/come-on-come-on/p/16977011.html

相关文章

  • python学习笔记整理03(函数)
    1语法:1.1基本语法:#1.定义函数#使用def(define)关键字定义函数,且函数命名方法要遵循标识符规则deffunc1():#定义函数内的代码称为函数体print('函......
  • 网络流学习笔记
    网络流初步一个网络\(G=(V,E)\)是一张有向图,图中每条有向边\((x,y)\inE\)都有一个给定的权值\(c(x,y)\),称为边的容量。图中还有两个节点\(S\)和\(T\),源点和汇点。网络......
  • 【系统架构设计师】软考高级职称,一次通过,倾尽所有,【系统架构设计师】备考学习资料
    【系统架构设计师】软考高级职称,一次通过,倾尽所有,看完这篇就够了,学习方法和技巧这里全都有。下文中的网盘连接,可能触发到百度网盘风控规则,导致连接失效,非本人原因,如需要,可以......
  • JavaScript学习--Item1 严格模式
    一、概述除了正常运行模式,ECMAscript5添加了第二种运行模式:“严格模式”(strictmode)。顾名思义,这种模式使得Javascript在更严格的条件下运行。设立”严格模式”的目的,主要......
  • 【机器学习】李宏毅——机器学习任务攻略
    这一节的主要内容是当测试数据的准确度不够高的时候应该怎么做。首先一定要检查你的训练数据集的误差,如果发现是你的训练数据集误差也比较大,那么就有两种可能:模型过于简......
  • #yyds干货盘点# react笔记之学习之state注意事项
    前言我是歌谣我有个兄弟巅峰的时候排名c站总榜19叫前端小歌谣曾经我花了三年的时间创作了他现在我要用五年的时间超越他今天又是接近兄弟的一天人生难免坎坷大不了从......
  • Go语言学习--接受和处理Web请求之接收请求(更新中......)
    GoWeb服务器请求和响应的流程如下:1.客户端发送请求2.服务器端的多路复用器收到请求3.多路复用器根据请求的URL找到注册的处理器,将请求交由处理器处理......
  • 12.14学习总结
    缓存更新策略缓存更新是redis为了节约内存而设计出来的一个东西,主要是因为内存数据宝贵,当我们向redis插入太多数据,此时就可能会导致缓存中的数据过多,所以redis会对部分数......
  • 从9月开始每天坚持在力扣上刷题,不停学习
       ......
  • SpringMVC学习
    SpringMVC学习1.回顾MVC1.1什么是MVCMVC是模型(Model)、视图(View)、控制器(Controller)的简写,是一种软件设计规范。是将业务逻辑、数据、显示分离的方法来组织代码......