首页 > 其他分享 >Flume实战--Flume中的拦截器详解与操作

Flume实战--Flume中的拦截器详解与操作

时间:2024-09-29 13:48:10浏览次数:7  
标签:Flume hdfs 拦截器 sinks -- a1 type r1

        在处理大规模数据流时,Apache Flume 是一款功能强大的数据聚合工具,它可以通过拦截器在运行时对Event进行修改或丢弃。本文将详细讲解Flume中的拦截器,包括时间戳拦截器、Host添加拦截器、静态拦截器以及如何自定义拦截器。

拦截器

拦截器的作用

拦截器用于在事件流经Flume时,对其进行拦截、处理(比如修改、增强)、或者丢弃。

1. 时间戳拦截器

        时间戳拦截器是Flume中非常实用的一个功能,它会自动为每个Event的header添加一个当前的时间戳。这对于后续的数据存储和分析非常有用,尤其是在需要时间序列数据的场景中。

使用场景

        当数据需要带有时间戳存储到HDFS,并且HDFS的目录结构中包含时间转义字符时,这个拦截器非常有用。

配置示例

a1.sources.r1.interceptors = i1   
i1 是拦截器的名字,自己定义的
a1.sources.r1.interceptors.i1.type = timestamp   
指定i1 这个拦截器的类型

hdfs中,如果文件夹使用了时间相关的转义字符,比如

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

a1.sinks.k1.hdfs.useLocalTimeStamp =true

也可以使用时间戳拦截器


a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/data/c.txt

a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000

a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
# round 用于控制含有时间转义符的文件夹的生成规则
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# i1 是拦截器的名字,自己定义的
a1.sources.r1.interceptors = i1    
a1.sources.r1.interceptors.i1.type = timestamp

因为必须保证header中有时间戳timestamp这个KV键值对。

假如hdfs中使用了时间转义字符,此时必须指定时间,两种方案

1)使用本地时间戳

a1.sinks.k1.hdfs.useLocalTimeStamp =true

2)使用时间拦截器

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = timestamp

 

2. Host添加拦截器

        Host拦截器可以将数据来源的主机名或IP地址添加到Event的header中。这对于追踪数据来源和进行数据分析时识别数据源非常有用。

配置示例

a1.sources = r1  
a1.channels = c1
a1.sinks = s1

a1.sources.r1.type= http
a1.sources.r1.bind = bigdata01
a1.sources.r1.port = 6666

a1.sources.r1.interceptors=i2

a1.sources.r1.interceptors.i2.type=host
a1.sources.r1.interceptors.i2.preserveExisting=false
a1.sources.r1.interceptors.i2.useIP=true
a1.sources.r1.interceptors.i2.hostHeader=hostname
# hostname=192.168.233.128
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=/flume/%Y/%m/%d/%H%M
a1.sinks.s1.hdfs.filePrefix=%{hostname}
a1.sinks.s1.hdfs.fileSuffix=.log

a1.sinks.s1.hdfs.rollInterval=60

a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=10
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true

a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1


这个例子可以完美的说明host拦截器是干嘛的,拦截住之后 在Header中添加一个 hostname=192.168.32.128
在hdfs想使用hostname的时候,就可以通过%{hostname}

9870 hdfs的web访问端口
9820 是hdfs底层进行通信的端口

jps -ml 查看java进行,带详细信息

 测试

curl 起始就是通过命令模拟浏览器
curl http://www.baidu.com
curl http://bigdata01:9870/dfshealth.html#tab-overview
curl -X POST -d '[{"headers":{"state":"USER"},"body":"this my multiplex to c1"}]' http://bigdata01:6666

3. 静态拦截器

        静态拦截器允许我们在Event的header中添加自定义的静态键值对。这在需要为数据添加额外信息时非常有用,比如添加特定的标签或分类信息。

配置示例

a1.sources.r1.interceptors = i3
a1.sources.r1.interceptors.i3.type = static
a1.sources.r1.interceptors.i3.key = name
a1.sources.r1.interceptors.i3.value = zhangsan

自定义拦截器

        Flume支持自定义拦截器,这为处理特定的数据格式或进行复杂的数据转换提供了可能。通过编写Java代码实现自定义的拦截器,我们可以对接收到的数据进行任意形式的处理。

实现步骤

  1. 创建Maven项目:导入必要的Flume和JSON处理库(如Jackson或Fastjson)。
  2. 实现Interceptor接口:编写拦截器逻辑。
  3. 打包:将编写的拦截器打包成JAR文件,并放入Flume的lib目录下。
  4. 在Flume配置文件中配置自定义拦截器。

需求

处理数据样例:
log='{
"host":"www.baidu.com",
"user_id":"13755569427",
"items":[
    {
        "item_type":"eat",
        "
        
        
        active_time":156234
    },
    {
        "item_type":"car",
        "active_time":156233
    }
 ]
}'

结果样例:
[{"active_time":156234,"user_id":"13755569427","item_type":"eat","host":"www.baidu.com"},
{"active_time":156233,"user_id":"13755569427","item_type":"car","host":"www.baidu.com"}]

创建一个maven项目

导入jar包

 xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.bigdata</groupId>
    <artifactId>MyInterceptor</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.48</version>
        </dependency>
    </dependencies>

    <!--可以使用maven中的某些打包插件,不仅可以帮助我们打包代码还可以打包所依赖的jar包-->

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <configuration>
                    <!-- 禁止生成 dependency-reduced-pom.xml-->
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <relocations>
                                <relocation>
                                    <!-- 解决包冲突 进行转换-->
                                    <pattern>com.google.protobuf</pattern>
                                    <shadedPattern>shaded.com.google.protobuf</shadedPattern>
                                </relocation>
                            </relocations>
                            <artifactSet>
                                <excludes>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <!-- 某些jar包含具有相同文件名的其他资源(例如属性文件)。 为避免覆盖,您可以选择通过将它们的内容附加到一个文件中来合并它们-->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>mainclass</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>


</project>

打包插件: 

builder --> plugins --> plugin

插件
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.1.1</version>
  <configuration>
    <!-- 禁止生成 dependency-reduced-pom.xml-->
    <createDependencyReducedPom>false</createDependencyReducedPom>
  </configuration>
  <executions>
    <!-- Run shade goal on package phase -->
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <!-- 解决包冲突 进行转换-->
            <pattern>com.google.protobuf</pattern>
            <shadedPattern>shaded.com.google.protobuf</shadedPattern>
          </relocation>
        </relocations>
        <artifactSet>
          <excludes>
            <exclude>log4j:*</exclude>
          </excludes>
        </artifactSet>
        <filters>
          <filter>
            <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
            <artifact>*:*</artifact>
            <excludes>
              <exclude>META-INF/*.SF</exclude>
            </excludes>
          </filter>
        </filters>
        <transformers>
          <!-- 某些jar包含具有相同文件名的其他资源(例如属性文件)。 为避免覆盖,您可以选择通过将它们的内容附加到一个文件中来合并它们-->
          <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
            <resource>reference.conf</resource>
          </transformer>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass>mainclass</mainClass>
          </transformer>
        </transformers>
      </configuration>
    </execution>
  </executions>
            </plugin>
示例代码

先测试一下:

json --> java 代码解析 json --> 实体 ,实体-->json 字符串 都需要使用工具

jackson、fastjson(阿里巴巴)

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TestJson {

    public static void main(String[] args) {
        String log="{\n" +
        "                 'host':'www.baidu.com',\n" +
        "                 'user_id':'13755569427',\n" +
        "                 'items':[\n" +
        "                     {\n" +
        "                         'item_type':'eat',\n" +
        "                         'active_time':156234\n" +
        "                        },\n" +
        "                     {\n" +
        "                         'item_type':'car',\n" +
        "                         'active_time':156233\n" +
        "                     }\n" +
        "                  ]\n" +
        "}";

        JSONObject jsonObject = JSON.parseObject(log);
        String host = jsonObject.getString("host");
        String user_id = jsonObject.getString("user_id");
        System.out.println(host);
        System.out.println(user_id);
        JSONArray items = jsonObject.getJSONArray("items");

        List list =new ArrayList<Map<String,String>>();



        for (Object item : items) {
            // {"active_time":156234,"item_type":"eat"}
            Map map = new HashMap<String,String>();
            String itemStr = item.toString();
            JSONObject jsonItem = JSON.parseObject(itemStr);
            String active_time = jsonItem.getString("active_time");
            String item_type = jsonItem.getString("item_type");
            System.out.println(active_time);
            System.out.println(item_type);
            map.put("active_time",active_time);
            map.put("user_id",user_id);
            map.put("item_type",item_type);
            map.put("host",host);
            list.add(map);
        }

        /**
         *  需要转化为:
         *      *      * [{"active_time":156234,"user_id":"13755569427","item_type":"eat","host":"www.baidu.com"},
         *      *      * {"active_time":156233,"user_id":"13755569427","item_type":"car","host":"www.baidu.com"}]
         */


        String jsonString = JSON.toJSONString(list);
        System.out.println(jsonString);
    }
}
package com.bigdata;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class DemoInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    // 只需要关注 这个方法的写法
    /**
     *  需求:
     *  log='{
     * "host":"www.baidu.com",
     * "user_id":"13755569427",
     * "items":[
     *     {
     *         "item_type":"eat",
     *         "active_time":156234
     *     },
     *     {
     *         "item_type":"car",
     *         "active_time":156233
     *     }
     *  ]
     * }'
     *
     * 需要转化为:
     * [{"active_time":156234,"user_id":"13755569427","item_type":"eat","host":"www.baidu.com"},
     * {"active_time":156233,"user_id":"13755569427","item_type":"car","host":"www.baidu.com"}]
     */
    @Override
    public Event intercept(Event event) {
        // 解析 文本数据变为另一种格式
        byte[] body = event.getBody();
        String content = new String(body);
        /**
         *  {
         *      "host":"www.baidu.com",
         *      "user_id":"13755569427",
         *      "items":[
         *          {
         *              "item_type":"eat",
         *              "active_time":156234
         *          },
         *          {
         *              "item_type":"car",
         *              "active_time":156233
         *          }
         *       ]
         *   }
         */
        // 将一个json字符串变为 json 对象
        JSONObject jsonObject = JSON.parseObject(content);
        // 通过对象 获取 json 中的值
        String host = jsonObject.getString("host");
        String user_id = jsonObject.getString("user_id");
        // 通过对象获取json 数组
        JSONArray items = jsonObject.getJSONArray("items");

        // 定义一个集合,集合中是map
        ArrayList<HashMap<String, String>> list = new ArrayList<>();

        for (Object object: items) {

            String obj = object.toString();
            JSONObject jobj = JSON.parseObject(obj);
            String item_type = jobj.getString("item_type");
            String active_time = jobj.getString("active_time");

            HashMap<String, String> map = new HashMap<>();
            map.put("active_time",active_time);
            map.put("item_type",item_type);
            map.put("host",host);
            map.put("user_id",user_id);

            list.add(map);
        }

        // 将对象变为字符串
        String s = JSON.toJSONString(list);
        event.setBody(s.getBytes());

        return event;
    }

    // 这个方法可以调取 上面这个方法
    @Override
    public List<Event> intercept(List<Event> list) {

        for (int i=0;i<list.size();i++) {
            Event oldEvent = list.get(i);
            Event newEvent = intercept(oldEvent);
            list.set(i,newEvent);
        }
        return list;
    }

    @Override
    public void close() {

    }

    // 作用只有一个,就是new 一个自定义拦截器的类
    public static class BuilderEvent implements Builder{

        @Override
        public Interceptor build() {
            return new DemoInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

 

打包,上传至 flume 下的lib 下。

测试

编写一个flume脚本文件

testInter.conf

a1.sources = s1
a1.channels = c1
a1.sinks = r1

a1.sources.s1.type = TAILDIR

#以空格分隔的文件组列表。每个文件组表示要跟踪的一组文件
a1.sources.s1.filegroups = f1
#文件组的绝对路径
a1.sources.s1.filegroups.f1=/home/b.log

#使用自定义拦截器
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = com.bigdata.DemoInterceptor$BuilderEvent

a1.channels.c1.type = file
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


a1.sinks.r1.type = hdfs
a1.sinks.r1.hdfs.path = /flume/202409
a1.sinks.r1.hdfs.fileSuffix= .log

# 将上传的数据格式使用text类型,便于查看
a1.sinks.r1.hdfs.fileType=DataStream
a1.sinks.r1.hdfs.writeFormat=Text


a1.sources.s1.channels = c1
a1.sinks.r1.channel = c1
运行该脚本
flume-ng agent -c ./ -f testInterceptor.conf -n a1 -Dflume.root.logger=INFO,console

 出现错误

此时,说明我们打包的时候没有将这个jar包打包到自定义的jar包,可以通过手动的提交的方式解决这个问题。

将fast-json.jar 放入到 flume/lib

通过网盘分享的文件:fastjson-1.2.48.jar

假如你使用了打包插件,已经将这个 fast-json 打入了你的 jar 包中,无需该操作。

{"host":"www.baidu.com","user_id":"13755569427","items":[{"item_type":"eat","active_time":156234},{"item_type":"car","active_time":156233}]}

接着开始进行测试,必须先启动flume

编写一个脚本,模拟 b.log 中不断的产生json数据的场景。

#!/bin/bash
log='{
"host":"www.baidu.com",
"user_id":"13755569427",
"items":[
{
"item_type":"eat",
"active_time":156234
},
{
"item_type":"car",
"active_time":156233
}
]
}'
echo $log >> /home/b.log

保存,并且赋予权限:

chmod 777 createJson.sh

执行这个脚本,就可以模拟不断的向  b.log中传输数据了
./createJson.sh

 视频链接

10-时间戳拦截器_哔哩哔哩_bilibili

11-host拦截器_哔哩哔哩_bilibili

12-静态拦截器_哔哩哔哩_bilibili

13-自定义拦截器一_哔哩哔哩_bilibili

14-自定义拦截器二_哔哩哔哩_bilibili

标签:Flume,hdfs,拦截器,sinks,--,a1,type,r1
From: https://blog.csdn.net/weixin_64726356/article/details/142424360

相关文章

  • 分享C++程序员面试八股文(十三)
    以下是C++常见八股文(十三):一、C++中的命名空间和模块的高级用法(AdvancedUsageofNamespacesandModules)解释命名空间别名和嵌套命名空间的作用及使用场景命名空间别名:作用:命名空间别名可以为一个较长或复杂的命名空间提供一个更简洁的名称,提高代码的可读性和可......
  • 数据结构(顺序表)
    无论你期望或者不期望,清晨依旧来临。--谏山创《进击的巨人》线性表的概念1.线性表是n个具有相同特性的数据元素的有限序列。2.线性表在逻辑上是线性结构,但是在物理结构上并不⼀定是连续的。3.线性表在物理上存储时,通常以数组和链式结构的形式存储。顺序表的概念......
  • Go语言项目组织架构
    目录Go目录/cmd/internal/pkg/vendor服务应用程序目录/apiWeb应用程序目录/web通用应用目录/configs/init/scripts/build/deployments/test其他目录/docs/tools/examples/third_party/githooks/assets/websitehttps://github.com/golang-standards/project-layoutGo目录/cmd......
  • 哥斯拉流量特征
    .php:eval_xor_base64加密格式下连接时会发送三个数据包:首包post数据挺大,返回包内容为空。且包中的eval函数和base64加密函数挺明显剩下两个包请求体小,一个返回数据为加密后的ok,最后一个是返回加密后目标的基本环境信息。发送的数据包请求体特征为两个参数pass=和key=,这两个参......
  • 格式规范
    一、字体格式1.文章标题:方正小标宋简体二号2.摘要:楷体三号3.正文各级小标题示例(尽量不要自动编号)一、一级标题:黑体三号(一)二级标题:楷体三号1.三级标题:仿宋三号加粗(注意此处是全角点“.”不是顿号“、”)(1)四级标题:仿宋三号 二、页面设置1.纸张:A42.页边距:上下边距3.7cm,3.6c......
  • 学期2024-2025-1 学号 20241317 《计算机基础与程序设计》第1周学习总结
    这个作业属于哪个课程 https://edu.cnblogs.com/campus/besti/2024-2025-1-CFAP这个作业要求在哪里 https://www.cnblogs.com/rocedu/p/9577842.html#WEEK01这个作业的目标 1.基于VirtualBox虚拟机安装Ubuntu和安装Linux系统2.快速浏览一遍教材计算机科学概论(第七版),课本每章提......
  • 第一届“物天杯”球类赛事规则
    一、 篮球赛制、赛程和规则(一)赛制男子五人制全场,女子三人制半场。以下赛制男子、女子组相同,根据参赛队伍数量进行赛制选择。1.参赛队伍数量为12支及以上比赛共分两个阶段进行,第一阶段为小组循环赛,第二阶段为淘汰赛及排位赛。(1)小组赛:参赛班级抽签分成四小组(A、B、C、D组,小组......
  • 小主机虚拟化平台搭建记录
    小主机搭建虚拟化的一些记录:1,如果主机是Intel平台,目前建议还是使用VMwareEsxi6.7,如果你的主机网卡驱动没有包含在ESXI官方安装包内,去恩山找封装了网卡驱动的版本https://www.right.com.cn/FORUM/thread-7881507-1-1.html 2,如果你的主机是AMD平台,并且是ZEN系列,那么优先选......
  • 2024-2025-1 20241301 《计算机基础与程序设计》第4周学习总结
    这个作业属于哪个课程<[2024-2025-1-计算机基础与程序设计](https://edu.cnblogs.com/campus/besti/2024-2025-1-CFAP)>这个作业要求在哪里<2024-2025-1计算机基础与程序设计第一周作业>这个作业的目标<总结一周内的学习内容,巩固所学,加深对计算机学科的理解,提升思考能......
  • 变压器磁芯的基础知识介绍
     一、Core磁芯磁芯具有高磁导率,高的磁导率意味着导磁能力很强,也可以说磁芯对产生磁通的阻力很小。磁芯使得变压器的初级绕组可以较小的励磁电流产生较大的磁感应强度(又叫磁通密度或磁力线密度)。磁芯为磁力线提供了“比较顺畅的通路”,使得大多数的磁力线被约束在磁芯内,使得磁......