rabbitmq
-
基础简介
- 由LShift提供的一个消息队列协议(AMQP)的开源实现,由以高性能、健壮以及可伸缩性出名的Erlang
- rabbitmq包含的关键字
- 消息队列使用过程
- 组成部分
- 场景:
- 系统解耦、异步通信和流量削峰,延迟通知、最终一致性保证、顺序消息、流式处理
-
原理
- AMQP的核心思想就是将生产者和消费者隔离开,生产者从不直接将消息发送给队列
-
Exchange的类型
- direct模式
- 所有发送给Direct Exchange的消息被转发给具有指定RouteKey的Queue(发送给与RouteKey绑定的队列)
- fanout
- 所有发送给Fanout Exchnage的消息被转发给与该Exchange绑定的所有Queue上(不需要处理RouteKey,只需要将队列绑定在Exchange上)
- topic(direct的扩展)
- 所有发送到Topic Exchange的消息被转发到能和Topic匹配的Queue上(对RouteKey通配符模糊匹配)
- header
- 根据发送的消息内容中的headers属性进行匹配
-
产生问题的情况有哪一些
- 为什么要创建信道而不是直接使用Tcp连接
- 创建tcp和销毁是耗费性能的,特别是数据量大的时候
- Rabbitmq采用类似Nio的做法采取Tcp连接复用
- 防止Rabbitmq数据丢失的方法
- 生产者消息确认
- tx(事务模式)
- 生产者发送消息之前,通过channel.txSelect开启一个事务并发送消息, 如果消息投递server失败,进行事务回滚channel.txRollback,然后重新发送, 如果server收到消息,就提交事务channel.txCommit
*注意点:同步操作,一条消息发送会是发送端阻塞等待rabbitmqserver回应,生产者生产消息的吞吐量和性能大大降低
- 生产者发送消息之前,通过channel.txSelect开启一个事务并发送消息, 如果消息投递server失败,进行事务回滚channel.txRollback,然后重新发送, 如果server收到消息,就提交事务channel.txCommit
- confirm
- 先将channel设置为confirm模式,生产者发送一个消息,rabbitmq如果接收到了发送的消息,就会回调生产者本地的接口,通知生产者收到发送的消息,如果在接收消息的时候出现异常,就会回调接口,通知生产者消息接受失败,可再次重发消息
- ConfirmSelect() 等待所有消息确认,如果所有的消息都被服务端成功接收返回true,只要有一条没有被成功接收就返回false
- WaitForConfirms()
- WaitForConfirmOrDie
- tx(事务模式)
- 消费者消息确认
- ack机制
- 自动ack:当RabbbitMQ将消息发送给消费者后,消费者端接收到消息后,不等待消息处理结束,立即自动回送一个确认回执
- 手动ack:自动确认可能会出现消息丢失的问题,如果可以让消费者在接收消息时不立即返回确认回执,等到消息处理完成后(或者完成一部分的逻辑)再返回确认回执
- ack机制
- Rabbitmq消息持久化
- 创建队列交换机的时候将其持久化,但不持久队列里面的数据(队列Exchange持久化)
- 发送消息持久化(消息持久化)
- 生产者消息确认
- 如何保证消息队列消费的幂等性
- 利用消息做唯一主键,或者用redis的set操作
- 生产消息给消息加全局唯一Id,消费者消费根据id去redis查询消息是否消费过
- 如何保证消息顺序消费
- 为什么要创建信道而不是直接使用Tcp连接
-
链接