首先要知道Flume中的Event是由Header + Body组成的。
Flume支持在运行时对Event进行修改或丢弃,可以通过拦截器来实现。Flume里面的拦截器是实现了 org.apache.flume.interceptor.Interceptor
接口的类。拦截器可以根据开发者的意图随意修改甚至丢弃Event, Flume也支持链式的拦截器执行方式,在配置文件里面配置多个拦截器就可以了。拦截器的顺序取决于它们被初始化的顺序(实际也就是配置的顺序),Event就这样按照顺序经过每一个拦截器,如果想在拦截器里面丢弃Event, 在传递给下一级拦截器的list里面把它移除就行了。如果想丢弃所有的Event,返回一个空集合就行了。
在Flume中,拦截器分为时间拦截器、Host添加拦截器、静态属性拦截器....并且支持自定义拦截器,具体看官网文档
Flume1.9用户手册中文版
下面演示通过JavaAPI实现Flume自定义拦截器
package com.pzb.flume.interceptor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* @author 海绵先生
* @Description TODO 关于flume 的时间拦截器
* @date 2022/12/26-9:58
*/
public class TimeStampInterceptor implements Interceptor {// 实现拦截器接口
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
//1. 取出body
// 通过getBody()方法就可以直接取出Event中Body的内容
String body = new String(event.getBody(), StandardCharsets.UTF_8);
//2. 将json字符串解析成对象
// 将字符串转换成Json格式,方便取数据
JSONObject jsonObject = JSON.parseObject(body);
//3. 从对象中获取ts
// 首先你要了解Body中的内容样子(就是消息的格式),然后通过getObject()方法,按key值提取,并转换为对应的属性[Object]
String ts = jsonObject.getString("ts");
//注意Flume中的Header里的时间戳是13位的数字(毫秒级),如果提取的ts小于13位数字,应主动转换,否则时间会出错
//4.将ts的值设置到event的header中.
// 通过getHeaders()获取Header的内容,并按键值对的格式将数据添加到Header中(后序的Flume配置文件可直接通过%{key}的格式获取value值)
event.getHeaders().put("timestamp",ts);
return event;
}
// 定义了Event列表的操作(Event在拦截器之间流动的时候是以集合的形式,并不是逐个Event传输的)
@Override
public List<Event> intercept(List<Event> list) {
for (Event event : list) {
intercept(event);
}
return list;
}
@Override
public void close() {
}
// 编写拦截器一定要实现构造器(通过构造器启动拦截器)
public static class MyBuilder implements Builder{
// 构造器调用TimeStampInterceptor类
@Override
public Interceptor build() {
return new TimeStampInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
写好拦截器后,就可以打包放到Flume的lib目录下了
如何在Flume配置文件中用自定义拦截器呢?
其实看官方文档应该都知道了
# 配置拦截器必要参数
a1.sources.r1.interceptors = i1
# 拦截器类型就直接指定刚刚打包的构造器路径(前提:已将jar包放入lib目录)
a1.sources.r1.interceptors.i1.type = com.pzb.flume.interceptor.TimeStampInterceptor$MyBuilder
这样拦截器就大功告成了
通过%{key}的方式就可以直接获取拦截器捕获的Header对应key的value值了
并且通过%Y-%m-%d
的形式,可以将拦截器里的时间戳timestamp
转换成yyyy-MM-dd的格式了,解决了零点漂移的问题。