首页 > 其他分享 >stream

stream

时间:2023-07-11 19:56:45浏览次数:27  
标签:消费 消费者 stream 队列 消息 id 读取

stream

由来

为了抢消息队列的饭碗

原理

1 Message Content 消息内容
2 Consumer group 消费组,通过XGROUP CREATE 命令创建,同一个消费组可以有多个消费者
3 Last_delivered_id 游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
4 Consumer 消费者,消费组中的消费者
5 Pending_ids 消费者会有一个状态变量,用于记录被当前消费已读取但未ack的消息Id,如果客户端没有ack,这个变量里面的消息ID会越来越多,一旦某个消息被ack它就开始减少。这个pending_ids变量在Redis官方被称之为 PEL(Pending Entries List),记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符),它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理

特殊符号

  • -/+ 最小和最大可能出现的id
  • $ $表示只消费新的消息,当前流中最大的id,可用于将要到来的消息
  • > 用于用于XREADGROUP命令,表示迄今还没有发送给组中使用者的信息,会更新消费者组的最后ID
  • * 用于XADD命令中,让系统自动生成id

流操作

  1. XADD(* 会让系统自动生成ID)(没有这个流,会自动创建)
  2. Xrange 获取消息队列(可指定范围),忽略删除的消息
    1. start 表示开始值 -表示最小值
    2. end 表示结束值, + 代表最大值
    3. count代表最多获取多少个值
  3. xrevrange 与range区别在,获取消息列表元素的方向是相反的,end在前,start在后
  4. XDEL 删除某个流的某条消息
  5. XLEN 获取stream队列的消息的长度
  6. XTRIM 对Stream 的长度进行截取
    1. MAXLEN 允许的最大长度,对流进行修剪限制长度(留下日期较新的消息)
    2. MINID 允许的最小id,从某个id值开始比该id值小的会被抛弃

消费组

  1. XGROUP CREATE 创建消费者组
  2. XREADGROUP GROUP
    1. > 表示从第一条尚未被消费的消息开始读取
    2. 同一个信息只能被一个消费组的人读取,其他人读取不到
    3. 不同消费组的人可以消费同一条消息
    4. 消费组目的:让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的
  3. XPENDING
    1. 查看每个消费组内所有消费者[已读取,未确认]的消息
    2. 查看某个消费者具体读取了哪些数据
  4. XACK 向消息队列确认消息处理已完成,对应的pendlist会减少对应的记录
1问题 基于 Stream 实现的消息队列,如何保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息?
2 Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息保底措施,直到消费者使用 XACK 命令通知 Streams“消息已经处理完成”。
3 消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 XACK 命令确认消息已经被消费完成

  • XINFO 打印Stream\consumer\group的详细信息

标签:消费,消费者,stream,队列,消息,id,读取
From: https://www.cnblogs.com/seamount3/p/17545767.html

相关文章

  • Streamlit 入门介绍
    Streamlit入门介绍Streamlit是一个PythonWeb应用框架。但和常规Web框架,如Flask/Django的不同之处在于,它不需要你去编写任何客户端代码(HTML/CSS/JS),只需要编写普通的Python模块,就可以在很短的时间内创建美观并具备高度交互性的界面,从而快速生成数据分析或者机器学习的结......
  • 【NSSCTF逆向】【2023题目】《stream》《a_cup_of_tea》
    总览streamRC4base64exe解包pyc反编译a_cup_of_teatea题目stream解法拿到题目是一个exe,先用exeinfo打开看看可以看到这个程序使用python写的,又是exe。所以就是常规的思路,exe解包到pyc文件,再对pyc文件进行反编译先是解包用pyinstxtractor.py里面找到它再来反编......
  • 关于Stream流的一些常用方法
    前言在这里先说明一下,这个重在应用,本文就不对概念跟描述做过多赘述。应用1)提取对象数组中的某一个字段(带去重)List<String>orderIdList=orderList.stream().map(e->e.getOrderId()).distinct().collect(Collectors.toList());//收集全部orderIdSetthirdCategoryI......
  • Spark 以及 spark streaming 核心原理及实践 - (2)
    SparkStreaming运行原理spark程序是使用一个spark应用实例一次性对一批历史数据进行处理,sparkstreaming是将持续不断输入的数据流转换成多个batch分片,使用一批spark应用实例进行处理。从原理上看,把传统的spark批处理程序变成streaming程序,spark需要构建什么?需要构建4个东西:一个静......
  • 从头学Java17-Stream API(二)结合Record、Optional
    StreamAPIStreamAPI是按照map/filter/reduce方法处理内存中数据的最佳工具。本系列教程由Record讲起,然后结合Optional,讨论collector的设计。使用Record对不可变数据进行建模Java语言为您提供了几种创建不可变类的方法。可能最直接的是创建一个包含final字段的final类。......
  • ubuntu20.04 卸载已有的显卡驱动,安装安装 NVIDIA 驱动程序 525.85.12(主要是搭建deepst
    nvidia安装deepstream官网:  https://docs.nvidia.com/metropolis/deepstream/dev-guide/text/DS_Quickstart.html一、首先卸载本机的显卡驱动:参考连接(https://blog.csdn.net/gongdiwudu/article/details/128850792)1、卸载驱动库sudoapt-get--purgeremovenvidia*sudoapt......
  • Flink DataStream API
    Flink的DataSet和DataStream的API,并模拟了实时计算的场景。说好的流批一体呢现状Flink很重要的一个特点是“流批一体”,然而事实上Flink并没有完全做到所谓的“流批一体”,即编写一套代码,可以同时支持流式计算场景和批量计算的场景。目前截止1.10版本依然采用了DataS......
  • 从头学Java17-Stream API(一)
    StreamAPIStreamAPI是按照map/filter/reduce方法处理内存中数据的最佳工具。本系列中的教程包含从基本概念一直到collector设计和并行流。在流上添加中继操作将一个流map为另一个流map流是使用函数转换其元素。此转换可能会更改该流处理的元素的类型,但您也可以在不更改......
  • centos-9stream安装zabbix-all
    centos-9stream安装zabbix-all下载Zabbix1.zabbix是一个基于web界面的提供分布式系统监控以及网络监控功能的企业级开源解决方案。zabbix能监控各种网络参数,保证服务器系统的安全运行,并且能够提供灵活的通知报警机制让系统管理员能快速的发现问题,定位问题,解决问题.2.zabbix除了支持......
  • 18、【SparkStreaming】object not serializable (class: org.apache.kafka.clients.c
    背景:当SparkStream连接kafka,消费数据时,报错:objectnotserializable(class:org.apache.kafka.clients.consumer.ConsumerRecord,value:ConsumerRecord分析:消费者的消费记录序列化出现了问题,需要正确的进行序列化。措施:在设置sparkconf的时候,指定序列化方式就可以解......