首页 > 编程语言 >通过Java编写Flume拦截器

通过Java编写Flume拦截器

时间:2023-02-18 21:57:14浏览次数:48  
标签:Flume 拦截器 Java Event public import event

首先要知道Flume中的Event是由Header + Body组成的。

Flume支持在运行时对Event进行修改或丢弃,可以通过拦截器来实现。Flume里面的拦截器是实现了 org.apache.flume.interceptor.Interceptor 接口的类。拦截器可以根据开发者的意图随意修改甚至丢弃Event, Flume也支持链式的拦截器执行方式,在配置文件里面配置多个拦截器就可以了。拦截器的顺序取决于它们被初始化的顺序(实际也就是配置的顺序),Event就这样按照顺序经过每一个拦截器,如果想在拦截器里面丢弃Event, 在传递给下一级拦截器的list里面把它移除就行了。如果想丢弃所有的Event,返回一个空集合就行了。
在Flume中,拦截器分为时间拦截器、Host添加拦截器、静态属性拦截器....并且支持自定义拦截器,具体看官网文档
Flume1.9用户手册中文版

下面演示通过JavaAPI实现Flume自定义拦截器

package com.pzb.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * @author 海绵先生
 * @Description TODO 关于flume 的时间拦截器
 * @date 2022/12/26-9:58
 */
public class TimeStampInterceptor implements Interceptor {// 实现拦截器接口
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        //1. 取出body
        // 通过getBody()方法就可以直接取出Event中Body的内容
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        
        //2. 将json字符串解析成对象
        // 将字符串转换成Json格式,方便取数据
        JSONObject jsonObject = JSON.parseObject(body);

        //3. 从对象中获取ts
        // 首先你要了解Body中的内容样子(就是消息的格式),然后通过getObject()方法,按key值提取,并转换为对应的属性[Object]
        String ts = jsonObject.getString("ts");
        //注意Flume中的Header里的时间戳是13位的数字(毫秒级),如果提取的ts小于13位数字,应主动转换,否则时间会出错
        
        //4.将ts的值设置到event的header中.
        // 通过getHeaders()获取Header的内容,并按键值对的格式将数据添加到Header中(后序的Flume配置文件可直接通过%{key}的格式获取value值)
        event.getHeaders().put("timestamp",ts);
        return event;
    }

    // 定义了Event列表的操作(Event在拦截器之间流动的时候是以集合的形式,并不是逐个Event传输的)
    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {

    }

    // 编写拦截器一定要实现构造器(通过构造器启动拦截器)
    public static class MyBuilder implements Builder{

        // 构造器调用TimeStampInterceptor类
        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

写好拦截器后,就可以打包放到Flume的lib目录下了

如何在Flume配置文件中用自定义拦截器呢?
其实看官方文档应该都知道了

# 配置拦截器必要参数
a1.sources.r1.interceptors = i1
# 拦截器类型就直接指定刚刚打包的构造器路径(前提:已将jar包放入lib目录)
a1.sources.r1.interceptors.i1.type = com.pzb.flume.interceptor.TimeStampInterceptor$MyBuilder

这样拦截器就大功告成了
通过%{key}的方式就可以直接获取拦截器捕获的Header对应key的value值了
并且通过%Y-%m-%d的形式,可以将拦截器里的时间戳timestamp转换成yyyy-MM-dd的格式了,解决了零点漂移的问题。

标签:Flume,拦截器,Java,Event,public,import,event
From: https://www.cnblogs.com/Mr-Sponge/p/17133725.html

相关文章

  • 算法刷题-字符串分隔-JAVA
    0x00引言为获取一个良好的算法思维,以及不再成为一个脚本小子,争取每天一道算法题,培养自己的逻辑思维,温顾各类型语言语法知识。题解只写自己理解的解法,其他解法不再增加。......
  • PAT-basic-1023 组个最小数 java
    一、题目给定数字0-9各若干个。你可以以任意顺序排列这些数字,但必须全部使用。目标是使得最后得到的数尽可能小(注意0不能做首位)。例如:给定两个0,两个1,三个5,一个8,......
  • PAT-basic-1021 个位数统计 java
    一、题目给定一个 k 位整数 N=dk−1​10k−1+⋯+d1​101+d0​ (0≤di​≤9, i=0,⋯,k−1, dk−1​>0),请编写程序统计每种不同的个位数字出现的次数。例如:给定 N=1......
  • PAT-basic-1022 D进制的A+B java
    一、题目输入两个非负10进制整数 A 和 B (≤230−1),输出 A+B 的 D (1<D≤10)进制数。输入格式:输入在一行中依次给出3个整数 A、B 和 D。输出格式:输出......
  • Java文章抓取
    @PostMapping("/grab")@ApiOperationSupport(order=9)@ApiOperation(value="抓取",notes="传入grabUrl")publicRgrabe(@ApiParam(value="抓取",required=true......
  • Java正则匹配域名白名单
    在上一篇文章《通用正则表达式开源工具,为开源绵尽薄力》中,我们介绍了很多正则表达式的实例,工作中大家也经常遇到新的规则需要匹配,今天就看一下检测域名白名单的几种方式......
  • JAVA多线程(二)--线程池
    JAVA多线程(二)--线程池一、线程池概念顾名思义,线程池是管理线程的池子。使用线程池有以下优点:降低线程创建和销毁的开销。提高响应速度。用到时创建和直接使用已创建......
  • Java代码工具快速生成词云图(强烈建议收藏)
    “词云”一词最早是由美国西北大学新闻学副教授、新媒体专业主任里奇戈登(RichGordon)提出的。词云(WordCloud),又称文字云、标签云(TagCloud)、关键词云(KeywordCloud),是对文本......
  • Java 只有值传递
    实参:传递给方法的参数,必须有确定的值。形参:定义方法的参数,接收实参,不需要有确定的值值传递:方法接收的是实参值的拷贝,会创建副本。引用传递:方法接收的是实参所引用的......
  • 算法刷题-计算某字符出现次数-JAVA
    0x00引言为获取一个良好的算法思维,以及不再成为一个脚本小子,争取每天一道算法题,培养自己的逻辑思维,温顾各类型语言语法知识。题解只写自己理解的解法,其他解法不再增加。......