首页 > 其他分享 >顺序消费rocketMQ(FIFO先进先出)和小技巧 取模运算的周期性特征来将数据分组。

顺序消费rocketMQ(FIFO先进先出)和小技巧 取模运算的周期性特征来将数据分组。

时间:2024-08-01 21:57:43浏览次数:14  
标签:取模 运算 相同 队列 FIFO 哈希 new 先进先出

20240801

一、顺序消费MQ(FIFO先进先出)

介绍

发送顺序和消费顺序保持一致

默认情况消费方式是并发模式,会导致消息乱序。不管是单线程还是并发模式,都是轮询的模式。也不需要强求全部在一块,只要大局在一块就行。局部有序,不是全局。
在这里插入图片描述
所以,生产者把消息放到一个队列里面去,然后消费者是一个单线程模式。

简单来说流程为:

生产者,把需要顺序的小,都放在一个队列里面,使用send的三个参数的方法,根据某个属性进行规范

消费者,把并发模式修改为顺序模式,然后取拿信息

直接上代码吧,

public class FOrderlyTest {

    private List<MsgModel> msgModels= Arrays.asList(
            new MsgModel("qwer",1,"下单"),
            new MsgModel("qwer",1,"短信"),
            new MsgModel("qwer",1,"物流"),

            new MsgModel("zxcv",2,"下单"),
            new MsgModel("zxcv",2,"短信"),
            new MsgModel("zxcv",2,"物流")

            );
    @Test
    public void  orderlyProducer()throws Exception{
        DefaultMQProducer producer = new DefaultMQProducer("orderly-producer-group");
        producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
        producer.start();
        //发送顺序消息  发送时要确保有序 并且要发到同一个队列下面去
        msgModels.forEach(msgModel -> {
            Message message = new Message("orderlyTopic", msgModel.toString().getBytes());
            //发 相同的订单号去相同的队列
            try {
                producer.send(message, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                      /*  //在这里选择队列,最粗暴的方式
                        return mqs.get(0);//放到第一个队列中*/
                        //2%4=2 3%4=3 4%4=0 5%4=1 6%4=2 7%4=3 8%4=0 9%4=1 10%4=2 11%4=3
                        // 0 1 2 3 0 1 2 3 0 1 2 3 取余时周期函数,结果永远比模数小
                        int hashCode = arg.toString().hashCode();//相同的订单号取相同的hashcode,所以会放到同一队列中
                        int i = hashCode % mqs.size();//取模,放到第i个中
                        return mqs.get(i);//放到第i个队列中
                    }
                }, msgModel.getOrderSn());//msgModel.getOrderSn获取之后会根据订单号去选择队列也就是上面的arg
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        producer.shutdown();
        System.out.println("发送完成");
    }

    @Test
    public void orderlyConsumer()throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-producer-group");
        consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
        consumer.subscribe("orderlyTopic","*");
      /*  // MessageListenerConcurrently 默认是并发模式 多线程的 默认是20个线程,如果想实现顺序,就把线程设置为1。如果失败,会重试16次,还不成功,就放到死信队列。
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(new String(msgs.get(0).getBody()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });*/
        //MessageListenerOrderly 顺序消费模式 单线程的。无线重试,Integer.MAX_VALUE A不走,B拿不到
        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                System.out.println("线程id:"+Thread.currentThread().getId());
                System.out.println(new String(msgs.get(0).getBody()));
                // 顺序消费模式,如果消费失败,会自动重试   SUSPEND_CURRENT_QUEUE_A_MOMENT 表示当前队列暂停消费
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.in.read();
    }
}

实体类

@Data
@NoArgsConstructor
@AllArgsConstructor
public class MsgModel {
    private String orderSn;
    private Integer userId;
    private String desc;//下单 短信 物流


}

二、一个小技巧,对于取模运算,用来在几以前进行随机选取,取模运算的周期性特征来将数据分组,

因为取模时周期函数,结果永远比模数小,所以如果我们定义为4,那我们给所有元素取哈希,相同的一定会被分到一块,因为统一元素的hash值是一样的。

当我们想在很多不同的订单中拿到相同的(少量的),可以对其进行取模,把相同的放到一块
如:

try {
                producer.send(message, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                      /*  //在这里选择队列,最粗暴的方式
                        return mqs.get(0);//放到第一个队列中*/
                        //2%4=2 3%4=3 4%4=0 5%4=1 6%4=2 7%4=3 8%4=0 9%4=1 10%4=2 11%4=3
                        // 0 1 2 3 0 1 2 3 0 1 2 3 取余时周期函数,结果永远比模数小
                        int hashCode = arg.toString().hashCode();//相同的订单号取相同的hashcode,所以会放到同一队列中
                        int i = hashCode % mqs.size();//取模,放到第i个中
                        return mqs.get(i);//放到第i个队列中
                    }
                }, msgModel.getOrderSn());//msgModel.getOrderSn获取之后会根据订单号去选择队列也就是上面的arg
            } catch (Exception e) {
                e.printStackTrace();
            }
       

使用场景

这个小技巧使用了取模运算的周期性特征来将数据分组,这在许多实际应用场景中都很有用。以下是一些常见的应用场景:

  1. 负载均衡
    在分布式系统中,负载均衡是将请求分配到不同的服务器上。通过对请求的哈希值取模,可以将相同的请求分配到同一台服务器上,从而提高缓存效率和减少重复计算。
  2. 哈希表
    在哈希表中,取模运算用于将键值映射到表的索引位置。这样可以将相似的键值分配到相同的桶中,方便快速查找。
  3. 缓存
    在缓存系统中,可以使用取模运算将相同的缓存数据分配到相同的缓存区域。这有助于提高缓存命中率,并减少不必要的缓存重新加载。
  4. 分片存储
    在数据库系统中,数据分片是将数据分布到多个物理存储节点上的技术。通过对数据的哈希值取模,可以将相同的数据分配到同一个分片上,从而简化数据管理。
  5. 任务调度
    在任务调度系统中,取模运算可以将任务分配到不同的处理单元或队列中。通过这种方式,可以确保相同类型的任务集中在一个队列中,便于管理和处理。
  6. 负载均衡算法
    在一些应用中,使用取模运算来分配资源或任务。比如在分布式计算中,可以将相同的任务分配到相同的计算节点,以保证任务的连续性和一致性。

总之,利用取模运算的周期性和哈希的特性可以在多个场景中高效地分配和管理数据,从而提高系统的性能和稳定性。

对于取模会重复问题

取模运算可能会导致哈希冲突,即不同的输入值经过取模后得到相同的结果。这种情况通常发生在模数较小或者数据量较大时。在处理这些重复或冲突时,可以考虑以下几种方法来优化:

  1. 增大模数
    如果取模运算的模数较小,容易造成重复。可以考虑增大模数,这样可以降低冲突的概率。例如,将模数设置为更大的质数,可以减少不同数据项取模后结果相同的情况。
  2. 使用更复杂的哈希函数
    选择适当的哈希函数可以减少冲突的发生。好的哈希函数能够将不同的输入均匀地分布到哈希表中,即使在相同的模数下,也能尽量减少冲突。
  3. 链表法
    在哈希表中,如果多个元素的哈希值取模后相同,可以使用链表法解决冲突。每个哈希桶可以维护一个链表来存储所有哈希值相同的元素。
  4. 开放地址法
    另一种解决冲突的方式是开放地址法。当发生冲突时,寻找下一个可用的位置存储元素。常见的开放地址法有线性探测、二次探测和双重哈希等。
  5. 双重哈希
    双重哈希是一种解决冲突的方法。在第一次哈希之后,如果发生冲突,再进行第二次哈希运算,结合两个哈希值来确定最终的位置,从而减少冲突。
  6. 分布式哈希表(DHT)
    在分布式系统中,可以使用分布式哈希表技术将数据均匀分布到多个节点上。DHT能更有效地处理数据冲突和负载均衡问题。
  7. 一致性哈希
    一致性哈希是一种改进的哈希技术,能有效减少节点变动对系统的影响,并降低数据冲突的几率。它常用于分布式系统中的负载均衡和数据分片。

通过这些方法,可以有效地管理哈希冲突,并提高系统的性能和稳定性。在实际应用中,可以根据具体需求选择合适的策略来解决取模运算带来的重复问题。

标签:取模,运算,相同,队列,FIFO,哈希,new,先进先出
From: https://blog.csdn.net/f552126506/article/details/140858409

相关文章

  • FPGA FIFO IP核(3)- 仿真
    仿真思路如何在写入标志信号(写入请求信号)有效时将数据写入到FIFO中?在调用模块代码中,pi_flag每四个时钟周期产生一个有效信号,即写请求信号。每次当pi_data检测到pi_flag信号有效时加1,从0~255循环变化。实现效果就是在pi_flag有效时将pi_data写入到FIFO中。读请求信号rdreq......
  • 大数相乘取模
    https://www.cnblogs.com/shuaihui520/p/9619322.html记一下a∗bmodp=a∗b−⌊a∗bp⌋∗pa∗bmodp=a∗b−⌊a∗bp⌋∗p用longdouble来计算⌊a∗bp⌋⌊a∗bp⌋,误差很小,因为longdouble的特性是存不下就舍弃低位,再把它转成longlong。直接用longlong来计算。longlong爆掉了......
  • 取模+组合数
    jiangly的板子//------取模机------//usingi64=longlong;template<classT>constexprTpower(Ta,i64b){Tres{1};for(;b;b/=2,a*=a){if(b%2){res*=a;}}returnres;}//快速幂constexpri64......
  • FIFO最小深度计算
    1、读写没有空闲周期。(fA>fB)    例如,fA=80MHz;fB=50MHz;BurstLength=120;读写之间没有空闲周期,连续读写一个突发长度。解法:        写一个数据需要的时间=1/80MHz=12.5ns;        写一个突发需要的时间=120*12.5ns=1500ns;   ......
  • 将 python 脚本的 stdin 重定向到 fifo 会导致 RuntimeError: input():lost sys.stdin
    我有这个python脚本,它的作用是充当服务器,它从重定向到fifo的stdin读取命令:test.py:whileTrue:try:line=input()exceptEOFError:breakprint(f'Received:{line}')在bash中运行命令:mkfifotestfifotest.py<testfifo......
  • 旋转链表-灵活运用取模
    题目描述:个人题解:        记给定链表的长度为n,注意到当向右移动的次数k≥n时,我们仅需要向右移动k%n次即可。因为每n次移动都会让链表变为原状。这样我们可以知道,新链表的最后一个节点为原链表的第(n−1)−(k%n)个节点(从0开始计数)。这样,我们可以先将给定......
  • 数论知识(取模运算)
    若amodk=xa......
  • FPGA必修课—FIFO
    FIFO基本概念FIFO,全称为“FirstInFirstOut”,译为“先进先出”。这是一种常见的数据存储和处理原则,其基本含义在于数据的存取顺序:最先进入的数据将最先被取出。FIFO可以被视为一种特殊类型的数据缓冲区,它按照元素到达的顺序进行数据的存取操作。学习FIFO的重要性在于它在......
  • 【LeetCode 0232】【设计】用FILO栈实现FIFO队列
    ImplementQueueusingStacksImplementafirstinfirstout(FIFO)queueusingonlytwostacks.Theimplementedqueueshouldsupportallthefunctionsofanormalqueue(push,peek,pop,andempty).ImplementtheMyQueueclass:*voidpush(intx)Pushes......
  • 队列:先进先出的数据结构
    1.队列的概念及结构队列:只允许在一端进行插入数据操作,在另一端进行删除数据操作的特殊线性表,队列具有先进先出FIFO(FirstInFirstOut)入队列:进行插入操作的一端称为队尾出队列:进行删除操作的一端称为队头2.队列的实现队列可以通过多种方式实现,包括数组、链表等。数......