首页 > 其他分享 >Rabbitmq实战

Rabbitmq实战

时间:2022-11-04 10:57:24浏览次数:41  
标签:实战 交换器 false 消费者 队列 Rabbitmq 消息 channel

broker 高可用

镜像队列

异步解耦削峰

 异步:不重要的业务场景,可以忍受延迟一致性    栗子;亿通行,全民K歌发布 好友不会第一时间看到。入库-订阅binlog-重建缓存 反正不影响。
 解耦:面向对象是说大家一起干一件事情,谁负责啥谁负责啥,解耦是说将他们在这之间抽出来,之后执行也行。
 削峰:大量消息调用系统可能会导致系统瘫痪,可以先进入消息队列,平稳处理。减轻服务器压力。

强一致性

配合一个msg数据库主从,放入队列前先insert一下,用状态来标识。

重要概念

Publisher 生产者
Message 消息
Exchange 交换器
BindingKey 绑定机
Routingkey
Queue
Connection
Channel 通道
Virtual Host 虚拟主机
Broker  消息队列服务器实体

常用交换器有哪些

Exchange type(交换器类型) Default pre-declared names(预声明默认名称)  
Direct exchange(直连交换器) (Empty string) and amq.direct  如果路由键完全匹配,消息就被投递到相应的队列
Fanout exchange(扇形交换器) amq.fanout  如果交换器收到消息,将会广播到所有绑定的队列上
    大规模多用户在线(MMO)游戏可以使用它来处理排行榜更新等全局事件
    体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端
    分发系统使用它来广播各种状态和配置更新
    在群聊的时候,它被用来分发消息给参与群聊的用户。(AMQP 没有内置 presence 的概念,因此
    XMPP 可能会是个更好的选择)
Topic exchange(主题交换器) amq.topic  可以使来自不同源头的消息能够到达同一个队列。使用topic交换器时,可以使用通配符
    使用案例
    分发有关于特定地理位置的数据,例如销售点
    由多个工作者(workers)完成的后台任务,每个工作者负责处理某些特定的任务
    股票价格更新(以及其他类型的金融数据更新)
    涉及到分类或者标签的新闻更新(例如,针对特定的运动项目或者队伍)
    云端的不同种类服务的协调
    分布式架构 / 基于系统的软件封装,其中每个构建者仅能处理一个特定的架构或者系统。
Headers exchange(头信息交换器) amq.match (and amq.headers in RabbitMQ)

总结:
Default 自动命名的直交换机
Direct
把消息路由到BindingKey和RoutingKey完全匹配的队列中,Routing Key==Binding
Key,严格匹配
Fanout 发送到该交换机的消息都会路由到与该交换机绑定的所有队列上,可以用来做广播
Topic
topic和direct类似,也是将消息发送到RoutingKey和BindingKey相匹配的队列中,
只不过可以模糊匹配
Headers 根据发送的消息内容中的 headers 属性进行匹配,性能差,基本不会使用

消息可靠性

持久化消息
      AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
      .deliveryMode(2) // 持久化消息
      .contentEncoding("UTF-8")
      .expiration("500") // TTL
      .build();
持久化队列 
      String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
持久化交换机
      String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
      channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, false, null);

方式二:事务消息
      
    // 开启发送方确认模式
    channel.confirmSelect();
    channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
      // 普通Confirm,发送一条,确认一条
    if (channel.waitForConfirms()) {
        System.out.println("消息发送成功");
    } else {
        System.out.println("消息发送失败");
    }

生产者 确认机制

publisher-confirm-type: correlated  消息到达交换机回调  可以做消息重试和落表 告警等
publisher-returns: true             消息路由失败回调

消费者 手动确认(生产不可能用自动 这里不再说) 多线程消费

   /**
         * channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
         * ack表示确认消息。multiple:false只确认该delivery_tag的消息,true确认该delivery_tag的所有消息
         */
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

   /**
         * channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
         * ack表示确认消息。multiple:false只确认该delivery_tag的消息,true确认该delivery_tag的所有消息
         */
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    /**
         * channel.basicReject(msg.getMessageProperties().getDeliveryTag(),false);
         * Reject表示拒绝消息。requeue:false表示被拒绝的消息是丢弃;true表示重回队列
         */
       channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

  ****注意消息重试次数 和 时间间隔 如果因为消费者问题 就会一直重试 导致系统压力增加和消息堆积。

  消息队列:rabbltmq
  消费者1:
  消费者2:
  消费者3:
  注意问题和现象: 消费者1启动时消费消息报错(没有捕获异常) 会一直堵塞当前队列所有消息,新消息进入9条 
  消费者2 3 可以各自消费三条 消费这一继续堵塞所有消息消费,重启消费者1 消息会由消  费者 2 3 全部消费。
  消费者1消费出现问题:重新放回队列,消息可以被消费者2 3 消费,则不影响正常业务。 
  如果所有消费者都消费失败,可以整合redis
  消息消费失败记录  hash 命令 hset delay_queue messageid retry重试次数  支持可配置重试次数。
  超过重试次数不能处理的消息责进入死信队列或者异步落表,手动ack并且抛弃出队列。可以异步告警。


  多线程消费:这里提交偏移量会有一定问题 假如你批量1000条消息,999条成功,一条失败,返回队列就会全部重新发送。
  可以单个消费者注册多个RabbitmqListener 来提升性能。

消息堆积问题解决

确定是什么原因,消息生产者过快,消费者拖拉。
横向扩展消费者,会自动消费消息。提升处理速率。

消息幂等=重复消费=唯一ID=业务主键。

顺序性消息

一个queue对应一个consumer , 性能上可以根据不同业务分为不同的queue。提升性能
consumer采用多线程批量,交给内存高性能队列分发给不同的worker。

标签:实战,交换器,false,消费者,队列,Rabbitmq,消息,channel
From: https://www.cnblogs.com/li-xiaotian/p/16856977.html

相关文章

  • 这可能是你需要的React实战技巧
    一、父组件通过Ref调用子组件中的方法这里同时演示使用函数组件和类组件的父子组件如何编写子组件React.forwardRefReact.useImperativeHandlepublic、private、pr......
  • 《ASP.NET Core技术内幕与项目实战》精简集-高级组件4.3:请求数据校验
    本节内容,涉及到8.3(P269-P272),以WebApi说明为主。主要NuGet包:内置命名空间:System.ComponentModel.DataAnnotationsFluentValidation.AspNetCore(数据检验框架)  一、请......
  • Git实战
    分四步走:1.进入你要管理的文件夹2.初始化3.管理当前文件下的文件4.生成版本信息 gitinit ---初始化  gitstatus ---检测当前文件夹下面文件的状......
  • 【2022-11-03】luffy项目实战(一)
    一、企业项目类型#1面向互联网用户:商城类项目 -微信小程序商城#2面向互联网用户:二手交易类的-咸鱼-转转#3公司内部项目:python写的重点......
  • 记一次详细的实战渗透
    前言一次授权的渗透测试,过程比较详细,充满了巧合,也算比较有意思直接记录一下,图片打码比较严重,应该是不影响阅读!!!!前端RCE信息搜集拿到的资产,通过序列化实现的RCE,但是这里只......
  • zk系列二:zookeeper实战之分布式统一配置获取
    前面介绍了zk的一些基础知识,这篇文章主要介绍下如何在java环境下获取zk的配置信息;主要基于zk的监听器以及回调函数通过响应式编程的思想将核心代码糅合成一个工具类,几乎做......
  • TensorFlow 最佳学习资源大全(含课程、书籍、博客、实战项目)
    人工智能大数据与深度学习 公众号:datayxTensorFlow从0到1学习资料大全,含入门指南、在线博客、视频教程、书籍推荐和实战项目等注:♥资料获取方式关注微信公众号datayx ......
  • Laravel入门与实战示例代码----数据库和Eloquent
    示例8-1数据库默认连接列表'connections'=>['sqlite'=>['driver'=>'sqlite','database'=>database_path('database.sqlite'),'prefix'......
  • React实战篇——二、代码分片
    二、代码分片默认情况下,npmrunbuild时,create-react-app内部使用webpack将src路径下的所有代码打包成一个js文件和一个css文件。对于一个大型应用,如果还把代码打包到一个......
  • 盘点一个使用Python实现Excel数理统计的实战问题
    大家好,我是皮皮。一、前言前几天在小小明大佬的Python交流群中遇到一个粉丝问了一个使用Python实现Excel数理统计的实战问题,觉得还挺有用的,这里拿出来跟大家一起分享下。......