首页 > 其他分享 >Flume 自定义拦截器

Flume 自定义拦截器

时间:2022-11-15 17:00:43浏览次数:68  
标签:Flume body 拦截器 自定义 hostIP event

Apache Flume是一个分布式的、可靠和易用的日志收集系统,用于将大量日志数据从许多不同的源进行收集、聚合,最终移动到一个集中的数据中心进行存储。

Flume的使用不仅仅限于日志数据聚合,由于数据源是可定制的,Flume可以用于传输大量数据,包括但不限于网络流量数据、社交媒体生成的数据、电子邮件消息和几乎所有可能的数据源。

拦 截 器

Flume在拦截器的帮助下,可以修改或删除正在传送中的event。拦截器是一些实现org.apache.flume.interceptor.Interceptor接口的类,功能类似于Java Servlet中的FilterServlet。

Flume拦截器可以在配置文件的Source组件中进行设置,支持给一个Source组件设置多个拦截器,多个拦截器使用空格连接在一起,根据配置顺序依次执行。如果某个拦截器需要删除event,当event经过该拦截器时,该event会被过滤掉,不会返回给下一个拦截器。

自定义拦截器

对于一些特殊的业务,例如对日志数据进行清洗后存入数据库,或者需要重写event body的内容(Kafka只能收到Flume消息的body部分,不能收到header部分),Flume内置的拦截器往往不能满足我们的需求,这时就需要我们自定义拦截器。

依赖:

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.8.0</version>
</dependency>

编写拦截器类:

自定义拦截器类需要实现Interceptor接口并重写intercept()方法。

package com.xc.xcspringboot.test;

import org.apache.commons.codec.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;

/**
 * 自定义拦截器类,修改event body体,将本机IP添加到event body体的前面
 */
public class MyFlumeInterceptor implements Interceptor {

    private String hostIP = null;// 自定义属性 hostIP

    // 私有构造函数,仅在内部类MyBuilder中可以对其实例化
    private MyFlumeInterceptor(String hostIP) {
        this.hostIP = hostIP;
    }

    // 修改event的body体
    public Event intercept(Event event) {
        StringBuilder builder = new StringBuilder();
        // 获得body体字节数组
        byte[] byteBody = event.getBody();
        // 将body体转为字符串
        String body = new String(byteBody, Charsets.UTF_8);
        // 拼接IP与body体,形成新body
        builder.append("ip:" + hostIP);
        builder.append(";body:" + body);
        byte[] newBody = builder.toString().trim().getBytes();
        // 重新设置body体
        event.setBody(newBody);
        System.out.println("拼接后的body信息:" + builder.toString().trim());
        return event;
    }

    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    /**
     * 定义内部类MyBuilder,用于构建自定义拦截器类MyFlumeInterceptor的实例,
     * 并获取Flume配置文件中自定义的拦截器属性值,将值传给自定义类MyFlumeInterceptor
     */
    public static class MyBuilder implements Builder {

        private String hostIP = null;

        public void configure(Context context) {
            // 获取Flume配置文件中设置的自定义属性值,字符串“hostIP”需与配置文件中设置的属性hostIP一致
            String hostIP = context.getString("hostIP");
            this.hostIP = hostIP;
        }

        public Interceptor build() {
            // 实例化自定义拦截器类并传入自定义属性
            return new MyFlumeInterceptor(hostIP);
        }
    }

    public void close() {
    }

    public void initialize() {
    }

}

书籍:Hadoop大数据技术开发实战 12.6 拦 截 器

https://gitee.com/caoyeoo0/xc-springboot/blob/hadoopApi/src/main/java/com/xc/xcspringboot/test/MyFlumeInterceptor.java

 

标签:Flume,body,拦截器,自定义,hostIP,event
From: https://www.cnblogs.com/ooo0/p/16893001.html

相关文章

  • uniapp 实现小程序中自定义tabBar
    uniapp实现小程序中自定义tabBar的方法第一种方式:page.json中配置"tabBar":{"color":"#7A7E83","selectedColor":"#007AFF","borderStyle":"black",......
  • leaflet 加载高德地图自定义样式
    最近项目需求,需要使用leaflet封装成一个vue组件,涉及功能主要有高德自定义样式地图封装为leaflet底图图层、自定义坐标系、topjson省市区街道下钻、线面区域热力层、飞线、......
  • umi配置chainWebpack,使用自定义loader----jsx-px2rem
    前言虽然云谦大佬在github上说了,umi本身的配置已经很完善了,但是肯定满足不了所有人各种各样的奇葩需求。。。比如今天说的将jsx中的style里,将px转换为rem。 umi本身提......
  • java poi导出excel单元格设置自定义背景颜色(任意颜色)
    转自:http://t.csdn.cn/QHfUU//创建一个workbook对象Workbookworkbook=newXSSFWorkbook();//创建一个sheet对象Sheetsheet=workbook.createSheet();//创......
  • SAP ABAP FICO FAGLL03H CODING BLOCK新增自定义字段
    1、SGLPOS_N_GL_CT、SGLPOS_N_CT两个结构新增自定义字段  2、执行t-code:HDBVIEWS  3、实施增强 FAGL_LIB  4、使用selectdata方法 5、代码示例:......
  • 自定义一个python pip包
    新建一个目录mkdirexample基本说明文件README.rstLICENSEsetup.cfgsetup.pyMANIFEST.indocsexample/README.rst点击查看代码=====example=====Pollsi......
  • Vue 拦截器思路
    //数据响应拦截器,统一处理返回的数据逻辑axios.interceptors.response.use(res=>{if(res&&res.status==HTTP_STATUS.SUCCESS){returnres.data;}els......
  • 自定义定时器
    自定义定时器#include<iostream>#include<sys/epoll.h>#include<chrono>#include<functional>#include<memory>#include<set>int64_tgid{0};structNodeBa......
  • 【AGC】远程配置如何传入自定义属性
    ​背景:现在AGC远程配置端侧服务提供的SDK支持传入自定义属性获取和更新云端配置数据了。下面将通过一个demo集成远程配置SDK来实现这一功能。 集成准备1.在AGC创建工......
  • 1710. 卡车上的最大单元数 ----- 贪心算法,自定义sort排序
    请你将一些箱子装在一辆卡车上。给你一个二维数组boxTypes,其中boxTypes[i]=[numberOfBoxesi,numberOfUnitsPerBoxi]:numberOfBoxesi是类型i的箱子的数量。numb......