20240801
一、顺序消费MQ(FIFO先进先出)
介绍
发送顺序和消费顺序保持一致
默认情况消费方式是并发模式,会导致消息乱序。不管是单线程还是并发模式,都是轮询的模式。也不需要强求全部在一块,只要大局在一块就行。局部有序,不是全局。
所以,生产者把消息放到一个队列里面去,然后消费者是一个单线程模式。
简单来说流程为:
生产者,把需要顺序的小,都放在一个队列里面,使用send的三个参数的方法,根据某个属性进行规范
消费者,把并发模式修改为顺序模式,然后取拿信息
直接上代码吧,
public class FOrderlyTest {
private List<MsgModel> msgModels= Arrays.asList(
new MsgModel("qwer",1,"下单"),
new MsgModel("qwer",1,"短信"),
new MsgModel("qwer",1,"物流"),
new MsgModel("zxcv",2,"下单"),
new MsgModel("zxcv",2,"短信"),
new MsgModel("zxcv",2,"物流")
);
@Test
public void orderlyProducer()throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("orderly-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
//发送顺序消息 发送时要确保有序 并且要发到同一个队列下面去
msgModels.forEach(msgModel -> {
Message message = new Message("orderlyTopic", msgModel.toString().getBytes());
//发 相同的订单号去相同的队列
try {
producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
/* //在这里选择队列,最粗暴的方式
return mqs.get(0);//放到第一个队列中*/
//2%4=2 3%4=3 4%4=0 5%4=1 6%4=2 7%4=3 8%4=0 9%4=1 10%4=2 11%4=3
// 0 1 2 3 0 1 2 3 0 1 2 3 取余时周期函数,结果永远比模数小
int hashCode = arg.toString().hashCode();//相同的订单号取相同的hashcode,所以会放到同一队列中
int i = hashCode % mqs.size();//取模,放到第i个中
return mqs.get(i);//放到第i个队列中
}
}, msgModel.getOrderSn());//msgModel.getOrderSn获取之后会根据订单号去选择队列也就是上面的arg
} catch (Exception e) {
e.printStackTrace();
}
});
producer.shutdown();
System.out.println("发送完成");
}
@Test
public void orderlyConsumer()throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-producer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("orderlyTopic","*");
/* // MessageListenerConcurrently 默认是并发模式 多线程的 默认是20个线程,如果想实现顺序,就把线程设置为1。如果失败,会重试16次,还不成功,就放到死信队列。
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(new String(msgs.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});*/
//MessageListenerOrderly 顺序消费模式 单线程的。无线重试,Integer.MAX_VALUE A不走,B拿不到
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
System.out.println("线程id:"+Thread.currentThread().getId());
System.out.println(new String(msgs.get(0).getBody()));
// 顺序消费模式,如果消费失败,会自动重试 SUSPEND_CURRENT_QUEUE_A_MOMENT 表示当前队列暂停消费
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.in.read();
}
}
实体类
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MsgModel {
private String orderSn;
private Integer userId;
private String desc;//下单 短信 物流
}
二、一个小技巧,对于取模运算,用来在几以前进行随机选取,取模运算的周期性特征来将数据分组,
因为取模时周期函数,结果永远比模数小,所以如果我们定义为4,那我们给所有元素取哈希,相同的一定会被分到一块,因为统一元素的hash值是一样的。
当我们想在很多不同的订单中拿到相同的(少量的),可以对其进行取模,把相同的放到一块
如:
try {
producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
/* //在这里选择队列,最粗暴的方式
return mqs.get(0);//放到第一个队列中*/
//2%4=2 3%4=3 4%4=0 5%4=1 6%4=2 7%4=3 8%4=0 9%4=1 10%4=2 11%4=3
// 0 1 2 3 0 1 2 3 0 1 2 3 取余时周期函数,结果永远比模数小
int hashCode = arg.toString().hashCode();//相同的订单号取相同的hashcode,所以会放到同一队列中
int i = hashCode % mqs.size();//取模,放到第i个中
return mqs.get(i);//放到第i个队列中
}
}, msgModel.getOrderSn());//msgModel.getOrderSn获取之后会根据订单号去选择队列也就是上面的arg
} catch (Exception e) {
e.printStackTrace();
}
使用场景
这个小技巧使用了取模运算的周期性特征来将数据分组,这在许多实际应用场景中都很有用。以下是一些常见的应用场景:
- 负载均衡:
在分布式系统中,负载均衡是将请求分配到不同的服务器上。通过对请求的哈希值取模,可以将相同的请求分配到同一台服务器上,从而提高缓存效率和减少重复计算。 - 哈希表:
在哈希表中,取模运算用于将键值映射到表的索引位置。这样可以将相似的键值分配到相同的桶中,方便快速查找。 - 缓存:
在缓存系统中,可以使用取模运算将相同的缓存数据分配到相同的缓存区域。这有助于提高缓存命中率,并减少不必要的缓存重新加载。 - 分片存储:
在数据库系统中,数据分片是将数据分布到多个物理存储节点上的技术。通过对数据的哈希值取模,可以将相同的数据分配到同一个分片上,从而简化数据管理。 - 任务调度:
在任务调度系统中,取模运算可以将任务分配到不同的处理单元或队列中。通过这种方式,可以确保相同类型的任务集中在一个队列中,便于管理和处理。 - 负载均衡算法:
在一些应用中,使用取模运算来分配资源或任务。比如在分布式计算中,可以将相同的任务分配到相同的计算节点,以保证任务的连续性和一致性。
总之,利用取模运算的周期性和哈希的特性可以在多个场景中高效地分配和管理数据,从而提高系统的性能和稳定性。
对于取模会重复问题
取模运算可能会导致哈希冲突,即不同的输入值经过取模后得到相同的结果。这种情况通常发生在模数较小或者数据量较大时。在处理这些重复或冲突时,可以考虑以下几种方法来优化:
- 增大模数:
如果取模运算的模数较小,容易造成重复。可以考虑增大模数,这样可以降低冲突的概率。例如,将模数设置为更大的质数,可以减少不同数据项取模后结果相同的情况。 - 使用更复杂的哈希函数:
选择适当的哈希函数可以减少冲突的发生。好的哈希函数能够将不同的输入均匀地分布到哈希表中,即使在相同的模数下,也能尽量减少冲突。 - 链表法:
在哈希表中,如果多个元素的哈希值取模后相同,可以使用链表法解决冲突。每个哈希桶可以维护一个链表来存储所有哈希值相同的元素。 - 开放地址法:
另一种解决冲突的方式是开放地址法。当发生冲突时,寻找下一个可用的位置存储元素。常见的开放地址法有线性探测、二次探测和双重哈希等。 - 双重哈希:
双重哈希是一种解决冲突的方法。在第一次哈希之后,如果发生冲突,再进行第二次哈希运算,结合两个哈希值来确定最终的位置,从而减少冲突。 - 分布式哈希表(DHT):
在分布式系统中,可以使用分布式哈希表技术将数据均匀分布到多个节点上。DHT能更有效地处理数据冲突和负载均衡问题。 - 一致性哈希:
一致性哈希是一种改进的哈希技术,能有效减少节点变动对系统的影响,并降低数据冲突的几率。它常用于分布式系统中的负载均衡和数据分片。
通过这些方法,可以有效地管理哈希冲突,并提高系统的性能和稳定性。在实际应用中,可以根据具体需求选择合适的策略来解决取模运算带来的重复问题。
标签:取模,运算,相同,队列,FIFO,哈希,new,先进先出 From: https://blog.csdn.net/f552126506/article/details/140858409