首页 > 编程语言 >大数据-226 离线数仓 - Flume 优化配置 自定义拦截器 拦截原理 了 拦截器实现 Java

大数据-226 离线数仓 - Flume 优化配置 自定义拦截器 拦截原理 了 拦截器实现 Java

时间:2024-11-16 09:15:11浏览次数:3  
标签:flume Flume 拦截器 自定义 离线 a1 conf import event

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!

目前开始更新 MyBatis,一起深入浅出!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(正在更新…)

章节内容

上节我们完成了如下的内容:

  • 需求分析 指标口径
  • 日志数据采集 taildir source HDFS Sink Agent Flume
  • 优化配置

在这里插入图片描述

Flume的优化配置

Flume 是一种分布式、可靠且高效的数据收集、聚合和传输系统,广泛应用于大数据生态系统中。为了提升 Flume 的性能和稳定性,优化配置至关重要。

使用如下的指令,启动Agent进行测试:

flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs1.conf -name a1 -Dflum
e.roog.logger=INFO,console

启动后的截图如下所示:
在这里插入图片描述

查看刚才的Flume窗口:
在这里插入图片描述

查看HDFS的内容:
在这里插入图片描述

批量处理

  • 参数:batchSize
  • 作用:控制 Flume 在批量传输时每次传输的事件数量。
    配置建议:
  • Source 到 Channel:根据 Source 的吞吐量和 Channel 的吞吐能力调整,推荐值为 100-1000。
  • Channel 到 Sink:根据 Sink 的处理能力和目标系统的写入性能调整,推荐值为 500-5000。

压缩传输

  • 参数:compressionType
  • 作用:对事件进行压缩后传输,减少网络带宽消耗。
  • 支持的压缩类型:gzip、snappy、lz4 等。
  • 配置建议:根据目标系统是否支持解压缩功能选择合适的压缩类型。

Source 优化

Taildir Source

  • 参数:batchSize 和 fileHeader
  • batchSize:设置单次从文件中读取的事件数量。
  • fileHeader:是否在事件头部添加文件名,推荐开启以便于后续处理。

Kafka Source

  • 参数:kafka.consumer.timeout.ms 和 fetch.message.max.bytes
  • kafka.consumer.timeout.ms:设置 Kafka 消费者读取数据的超时时间,通常为 100-500ms。
  • fetch.message.max.bytes:设置每次读取的最大消息大小,默认值通常为 1MB,可以根据业务场景适当调整。

Channel 优化

Memory Channel

  • 参数:capacity 和 transactionCapacity
  • capacity:Channel 中允许的最大事件数。
  • transactionCapacity:单次事务中允许的最大事件数。

File Channel

  • 参数:checkpointDir 和 dataDirs
  • checkpointDir:存储 Channel 状态的目录。
  • dataDirs:存储事件数据的目录,建议设置多个磁盘路径以提升 IO 性能。
  • 配置建议:确保磁盘 IO 性能足够,避免瓶颈。

Flume报错解决

向 logs 目录中存放入日志文件,此时如果出现OOM的日志,是因为缺省情况下FlumeJVM的最大分配20M,这个值太小,需要调整。
我这里直接放入:

vim /opt/wzk/logs/start/test.log

2020-07-30 14:18:47.339 [main] INFO com.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"1","error_code":"0"},"time":1596111888529},"attr":{"area":"泰安","uid":"2F10092A9","app_v":"1.1.13","event_type":"common","device_id":"1FB872-9A1009","os_type":"4.7.3","channel":"DK","language":"chinese","brand":"iphone-9"}}

解决方案:
在 $FLUME_HOME/conf/flume-env.sh 中增加以下内容:

export JAVA_OPTS="-Xms4000m -Xmx4000m -
Dcom.sun.management.jmxremote"
# 要想使配置文件生效,还要在命令行中指定配置文件目录
flume-ng agent --conf flume-1.9/conf --conf-file flume-log2hdfs1.conf -name a1 -Dflume.root.logger=INFO,console

flume-ng agent --conf-file flume-log2hdfs1.conf -name a1 -Dflume.root.logger=INFO,console

Flume内存参数设置及优化:

  • 根据日志数据量大小,JVM堆一般要设置为4G或者更高
  • -Xms -Xmx最好设置一致,减少内存抖动带来的性能影响

自定义拦截器

前面FlumeAgent的配置使用了本地时间,可能导致数据存放的路径不正确。要解决上面的问题就需要使用自定义拦截器。
Agent用于测试自定义拦截器,source => logger sink
flumetest1.conf

# a1是agent的名称。source、channel、sink的名称分别为:r1 c1 k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = h122.wzk.icu
a1.sources.r1.port = 9999
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.CustomerInterceptor$Builder
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
# sink
a1.sinks.k1.type = logger
# source、channel、sink之间的关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

自定义拦截器原理

自定义拦截器的原理:

  • 自定义拦截器要集成 Flume 的 Interceptor
  • Event 分为 header 和 body (接收的字符串)
  • 获取 header 和 body
  • 从 body 中 获取 time,并将时间戳转换为字符串 yyyy-MM-dd
  • 将转换后的字符串放置到header中

自定义拦截器实现

自定义拦截器的实现:

  • 获取event的header
  • 获取event的body
  • 解析body获取json串
  • 解析json串获取时间戳
  • 将时间戳转换为字符串 yyyy-MM-dd
  • 将转换后的字符串放置header中
  • 返回event

导入依赖

<dependencies>
  <dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.1.23</version>
  </dependency>
</dependencies>

编写代码

package icu.wzk;


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CustomerInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        // 这里是逐条处理
        String eventBody = new String(event.getBody(), StandardCharsets.UTF_8);
        // 获取Event的Header
        Map<String, String> headerMap = event.getHeaders();
        // 解析Body获取JSON字符串
        String[] bodyArr = eventBody.split("\\s+");
        try {
            String jsonStr = bodyArr[6];
            // 解析JSON字符串获取时间戳
            JSONObject jsonObject = JSON.parseObject(jsonStr);
            String timestampStr = jsonObject.getJSONObject("app_active").getString("time");
            // 将时间戳转换字符串 yyyy-MM-dd
            // 将字符串转换为Long
            long timestampLong = Long.parseLong(timestampStr);
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
            Instant instant = Instant.ofEpochMilli(timestampLong);
            LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
            String date = formatter.format(localDateTime);
            // 将转换后的字符串放置header中
            headerMap.put("logtime", date);
            event.setHeaders(headerMap);
        } catch (Exception e) {
            headerMap.put("logtime", "Unknown");
            event.setHeaders(headerMap);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> lstEvent = new ArrayList<>();
        for (Event event : list) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                lstEvent.add(outEvent);
            }
        }
        return lstEvent;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new CustomerInterceptor();
        }
        @Override
        public void configure(Context context) {
        }
    }

}

标签:flume,Flume,拦截器,自定义,离线,a1,conf,import,event
From: https://blog.csdn.net/w776341482/article/details/143811819

相关文章

  • 大数据-224 离线数仓 - 数仓 技术选型 版本选型 系统逻辑架构 数据库命名规范
    点一下关注吧!!!非常感谢!!持续更新!!!目前已经更新到了:Hadoop(已更完)HDFS(已更完)MapReduce(已更完)Hive(已更完)Flume(已更完)Sqoop(已更完)Zookeeper(已更完)HBase(已更完)Redis(已更完)Kafka(已更完)Spark(已更完)Flink(已更完)ClickHouse(已更完)Kudu(已更完)Druid(已更完)Kylin(已更完)Elasticsearch(已更完......
  • 鸿蒙NEXT自定义组件:太极Loading
     【引言】(完整代码在最后面)本文将介绍如何在鸿蒙NEXT中创建一个自定义的“太极Loading”组件,为你的应用增添独特的视觉效果。【环境准备】电脑系统:windows10开发工具:DevEcoStudioNEXTBeta1BuildVersion:5.0.3.806工程版本:API12真机:mate60pro语言:ArkTS、ArkUI......
  • Axios 拦截器示例(JWT 登录与自动刷新)
    1.安装axios首先,确保你已经安装了axios:npminstallaxios2.设置Axios拦截器importaxiosfrom'axios';//创建一个axios实例constaxiosInstance=axios.create({baseURL:'http://localhost:8000/',//后端API地址timeout:10000,//设置超时时间......
  • 自定义注解进行数据脱敏
    前言有些时候,我们可能对输出的某些字段要做特殊的处理在输出到前端,比如:身份证号,电话等信息,在前端展示的时候我们需要进行脱敏处理,这时候通过自定义注解就非常的有用了。在Jackson中要自定义注解,我们可以通过@JacksonAnnotationsInside注解来实现,如下示例:一、自定义注解import......
  • 鸿蒙Navigation拦截器实现页面跳转登录鉴权方案
    我们在进行页面跳转时,很多情况下都得考虑登录状态问题,比如进入个人信息页面,下单交易页面等等。在这些场景下,通常在页面跳转前,会先判断下用户是否已经登录,若已登录,则跳转到相应的目标页面,若没有登录,则先跳转到登录页面,然后等着获取登录状态,若登录页面关闭时,能获取到已登录,则继续跳......
  • go fiber: 抛出自定义异常
    一,代码:1,自定义错误类:packageconfigimport("fmt")//定义错误代码和错误信息typeMyErrorstruct{CodeintMsgstring}//需要定义通用的Error()方法func(eMyError)Error()string{returnfmt.Sprintf("Code:%d,Msg:%s",e.Code,e.M......
  • ubuntu22.04离线部署k8s1.28.2(随笔)
    一、准备环境(所有服务器)1.各服务器设置hosts192.168.137.100k8s-master192.168.137.101k8s-node1192.168.137.102k8s-node2hostnameset-hostname k8s-masterhostnameset-hostname k8s-node1hostnameset-hostname k8s-node12。时间同步自己想办法3.lvs环境(暂定)mo......
  • SpringBoot中监听器、过滤器、拦截器和AOP详解
    SpringBoot中监听器、过滤器、拦截器和AOP详解在构建SpringBoot应用程序时,监听器(Listener)、过滤器(Filter)、拦截器(Interceptor)和面向切面编程(AOP)是四种常用的机制,它们各自有不同的用途和执行时机。本文将详细介绍这四种技术的执行时机和区别,并附上示例代码帮助理解。1.......
  • 自动化测试环境配置-selenium库和谷歌浏览器版(离线安装)
    环境下载链接:https://pan.baidu.com/s/1acJJrA087zf_e02at3hoUg?pwd=f83d提取码:f83d 第一步,取消谷歌浏览器的自动升级 再去控制面板卸载原来的谷歌浏览器 第二步,安装谷歌浏览器80版本 通过设置查看版本号,该版本是不会自动升级的 第三步:直接使用下载好的驱动......
  • 织梦自定义图片字段报错 Call to a member function GetInnerText()
    问题:添加自定义图片字段时,前台打开当前栏目列表出现 Fatalerror:CalltoamemberfunctionGetInnerText()onstring 错误。解决方法:修改 customfields.func.php 文件:打开 /include/customfields.func.php 文件,搜索:  $fvalue=trim($ntag->GetInnerTe......