首页 > 其他分享 >关于上报数据量大,且需要剔除一些数据时 --kafka

关于上报数据量大,且需要剔除一些数据时 --kafka

时间:2024-11-15 15:31:40浏览次数:1  
标签:key -- Kafka Streams 数据量 streams kafka 数据

假如有些数据是异常大量上报的,此时我需要屏蔽禁止这样的数据进来。能不能在消息上报到kafka时,还没消费消息之前就判断哪些是大量上报的垃圾数据,直接去除掉

在使用Kafka对数据进行处理时,通常我们是在数据进入Kafka之后,在消费阶段进行过滤、转换等处理。但如果你的目标是在数据进入Kafka之前进行过滤,以阻止大量的垃圾数据进入系统,你可以采用以下几种策略:

1. 消息预处理层
在数据发送到Kafka之前,可以设置一个预处理层,这个层负责初步的数据审核和过滤。这可以是一组微服务、API网关或者是一个简单的消息队列,专门用于初步处理数据。

  • 实现步骤:
    数据收集 :所有上报的数据首先被发送到预处理层。
    数据审核 :在这一层对数据进行检查,识别出异常大量的上报数据。
    基于规则:如对特定来源的请求频率进行限制。
    使用机器学习模型:识别可能的异常模式。
    数据过滤 :根据审核结果决定是否将数据发送到Kafka。
    反馈信息 :如果数据被拒绝,可以反馈给数据提供者。

2. 使用Kafka Connector自定义
如果使用Kafka Connect来导入外部系统数据,可以开发自定义的Kafka Connector,利用其transform功能,在数据进入Kafka之前做过滤。

  • 实现步骤:
    自定义Transforms:创建一个或多个Transformer,用于在数据进入Kafka前进行处理。
    配置Connector:在Connector的配置文件中指明使用这些自定义Transforms。

3. 数据来源处限制
在数据发送到Kafka的源头就进行控制。比如,如果数据是由前端应用上报的,可以在应用层面增加限流、监控和数据验证的逻辑。

  • 实现步骤:
    限流 :使用如Token Bucket、Leaky Bucket或其他算法限制数据上报频率。
    数据校验 :在数据发送前进行格式、大小、频率等方面的校验。
    监控与警报 :监控数据上报模式,一旦检测到异常可以立即采取措施。

4. Kafka Broker配置
虽然这不是直接在Kafka之前过滤,但可以在Kafka层面上使用配置来部分控制数据质量:
消息最大尺寸 :通过设置message.max.bytes限制可以接收的消息最大大小。
客户端配额 :使用client.id配置来限制单个客户端的带宽和请求率。

预处理代码的体现

1. 使用缓存和速率限制

实现步骤:
缓存记录 :为每个数据点或数据来源配置一个缓存条目,例如使用Redis。
频率检查 :每次数据上报时检查缓存中相应的频率值。
速率限制 :如果某数据点在设定的时间窗口内达到了限制阈值,则触发警报或直接拒绝处理数据。

示例代码(Java + Redis用于速率限制):
假设你已经有一个Redis服务器运行。

添加依赖到 pom.xml:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>YourProjectVersion</version> </dependency>
Redis配置和速率检测逻辑:

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;

import java.util.concurrent.TimeUnit;

public class RateLimiter {

    private RedisTemplate<String, String> redisTemplate;

    public RateLimiter(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public boolean shouldBlock(String key, long maxAllowed, long windowInSeconds) {
        ValueOperations<String, String> ops = redisTemplate.opsForValue();
        // 使用自增和设置过期时间来跟踪次数
        long count = ops.increment(key);
        if (count == 1) {
            // 设置过期时间窗口
            redisTemplate.expire(key, windowInSeconds, TimeUnit.SECONDS);
        }
        return count > maxAllowed;
    }
}

这个 RateLimiter 类使用Redis来限制指定时间窗口内的数据上报次数。如果到达或超过上限,shouldBlock 方法返回 true。然后再调用这个方法时就可以知道需不需要保留该相关数据

2. 使用流处理技术

如Apache Kafka的Kafka Streams或Apache Flink可以用来实现复杂的事件处理和分析,包括基于时间窗口的聚合。

Kafka Streams实现:
在Kafka Streams应用程序中,你可以定义一个时间窗口来处理流入的数据,并对窗口内的事件计数。

添加依赖到 pom.xml:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.8.0</version> **</dependency>
Kafka Streams处理逻辑:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

StreamsBuilder builder = new StreamsBuilder();
KGroupedStream<String, String> groupedStream = builder
    .stream("incoming-data-topic")
    .groupBy((key, value) -> value);  // 根据数据内容分组

groupedStream
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10)))
    .count(Materialized.as("counts-store"))
    .toStream()
    .foreach((Windowed<String> window, Long count) -> {
        if (count > threshold) {
            System.out.println("Block data with key " + window.key() + " due to high frequency");
        }
    });

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

这个实现创建了一个窗口化流,其中统计了特定时间窗口内的事件计数。如果计数超过了设定的阈值,可以执行相应的阻止或警报逻辑

3.Kafka Streams的一些内容

在某些情况下,当你使用 Kafka Streams 等强大的流处理框架时,可能不再需要使用 Redis 来单独计数。Kafka Streams 自身提供了状态管理和窗口计算的功能,可以高效地处理时间窗口内的计数和其他聚合操作。这种内置功能减少了在架构中加入多个技术栈的复杂性,同时减少管理和同步不同系统的开销。

Kafka Streams 的适用功能

1. 时间窗口 (Time Windowing)

Kafka Streams 支持多种时间窗口操作,如滑动窗口、跳跃窗口和会话窗口,这可以直接用于进行时间范围内的计数等聚合操作。

2. 状态管理 (Stateful Processing)

Kafka Streams 允许开发者定义状态,这些状态可以是本地化的也可以是分布式的。状态的存储可用于记录计数器和其他必要的度量。

3. 处理效率

由于 Kafka Streams 是为与 Kafka 集成而优化的,使用它处理数据流自然比引入外部系统更为高效。它可以减少数据在系统之间传输的延迟和开销。

4. 容错性 (Fault Tolerance)

Kafka Streams 提供了强大的容错性特性,包括状态的故障恢复和消息的重新处理机制,这对于保证实时数据处理的准确性至关重要。

使用 Kafka Streams 替代 Redis 的考虑因素

虽然 Kafka Streams 提供了强大的功能,但决定是否完全替代 Redis 需要考虑以下几点:

1. 特定的性能需求

尽管 Kafka Streams 在流处理中非常高效,但在某些高速写入和读取场景中,Redis 可能仍然有其独特的优势,特别是在需要极低延迟的情况下。

2. 功能复杂性和开发成本

Kafka Streams 的使用和维护可能要比 Redis 复杂,特别是在涉及复杂的状态管理和时间窗口操作时。需要评估团队的技能和资源来决定是否采用。

3. 系统资源和成本

Kafka Streams 运行可能需要更多的系统资源,如CPU和内存,特别是在管理大规模状态时。对于资源受限制的环境,或在云环境中成本敏感的应用,这一点需要特别注意。

示例代码

下面是简化版的 Kafka Streams 使用代码,演示如何进行基于时间窗口的设备事件计数:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> deviceEvents = builder.stream("device-events");

// 定义一个1天的滑动窗口,保留计数
KTable<Windowed<String>, Long> deviceCounts = deviceEvents
    .groupBy((key, value) -> key)
    .windowedBy(TimeWindows.of(Duration.ofDays(1)))
    .count(Materialized.as("DeviceCounts"));

// 将结果输出到另一个主题
deviceCounts.toStream()
    .map((key, value) -> new KeyValue<>(key.key(), "Count: " + value))
    .to("device-counts-output", Produced.with(Serdes.String(), Serdes.String()));

KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(...));
streams.start();

标签:key,--,Kafka,Streams,数据量,streams,kafka,数据
From: https://www.cnblogs.com/jhfnewstart/p/18548064

相关文章

  • 【英语】短篇学习一
    【音频】第一章外宾接待1.1-英语听力-单曲-网易云音乐(163.com)【英文原文】A:Excuseme,areyouMrs.GreenfromEngland打扰了,请问您是来自英格兰的格林女士吗?B:Yes,Iam是的,我是。A:I'mpleasedtomeetyou,Mrs.Green.MynameisMyra.IworkinBeijing......
  • 自定义注解进行数据脱敏
    前言有些时候,我们可能对输出的某些字段要做特殊的处理在输出到前端,比如:身份证号,电话等信息,在前端展示的时候我们需要进行脱敏处理,这时候通过自定义注解就非常的有用了。在Jackson中要自定义注解,我们可以通过@JacksonAnnotationsInside注解来实现,如下示例:一、自定义注解import......
  • openVAS安装记
    项目需要使用openVAS安装步骤我这里使用的是Ubuntu最新版,因为Ubuntu和debian可通过官网仓库进行安装,因改名为gvm后续直接上操作#安装sudoaptinstallgvm-y#初始化(可能时间比较长,台会去下载数据库)sudogvm-setup#开机自启服务sudosystemctlenablenotus-scannerg......
  • WSL系统迁移Ubuntu
    一、通过MicrisoftStore下载Ubuntu二、运行终端或者PowerShell三、运行如下命令找到你安装的Ubuntu名字wsl-l-v四、关闭wslwsl--shutdown五、导出映像到指定盘,我这里想从C盘导入E盘wsl--exportUbuntuE:\WSL\Ubuntu.tar六、卸载磁盘wsl--unregisterUbunt......
  • css动态检测视口屏幕的尺寸
    <!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><metaname="viewport"content="width=device-width,initial-scale=1.0"><title>css动态检测屏幕的视口尺寸</title&g......
  • IpAddressServiceImplTest的一些准备
    AllIpAddressCheckRequest类只有一个属性,ListipAddressList,AllIpAddressCheckResponse类有两个属性,Booleanresult和HashMap<String,Boolean>map,RespUtils定义如下publicclassRespUtils{privatestaticfinalLoggerlog=LoggerFactory.getLogger(RespUtils.class);pr......
  • QuantConnect/Lean学习(1)
    QuantConnect/Lean学习一、什么是Lean?LeanEngine是一个完全开源的算法交易引擎,由QuantConnect平台开发。它可以帮助我们轻松完成策略研究、策略回测、实盘接入。它的核心使用C#编写,但它在Linux、MacOs、Windows操作系统上都可以运行。它支持Python3.11或C#编写策略。二......
  • 轮廓线DP
    更新日志概念类似于状态压缩DP,但我们储存的是轮廓线上的状态。有些时候,也不需要进行状态压缩,而可以用某一点的状态代表一个区域的状态。思路轮廓线就是已经决策的与尚未决策的部分的分界线,我们储存分界线上已经决策过的所有节点的状态。借图OI-wiki:图中最粗的那一条就......
  • Nuxt.js 应用中的 schema:written 事件钩子详解
    title:Nuxt.js应用中的schema:written事件钩子详解date:2024/11/15updated:2024/11/15author:cmdragonexcerpt:schema:written钩子是Vite提供的一种生命周期钩子,在模式写入完成后调用。通过这个钩子,开发者可以在配置被正式应用之后执行一些后续操作,比如记录日志、......
  • 线程间通信
    使用锁+信号量+队列,可以实现线程间通信。 下面是一个生产者,消费者的例子。 #include<iostream>#include<queue>#include<thread>#include<mutex>#include<condition_variable>#include<chrono>//定义一个消息类型structMessage{intdata;};......