Flume 是一个日志数据抽取工具
Agent : 是Flume中的基本单位,一个Flume配置文件,可以有多个Agent. 每一个Agent中有三个组件组成,缺一不可:
1、Source 来源,数据过来的地方
2、channel 通道 传递数据用的通道,一般比较的长,一个Channel中可以存储多个数据
3、Sink 数据下沉的地方
4、event 一个event就是一个数据单元(一条数据)
在flume文件夹下创建一个文件夹 myconf,用于存放我们写好的文件
若在我们指定的/opt/installs/flume/conf/myconf 下创建配置文件时,
使用了touch 命令 创建文件后,再使用notepote++ 打开的话 ,会将linux的文件变为windows的文件,造成错误
而使用vi 创建文件
一、Flume的案例展示
1、Exec + Memory + HDFS
经过一个执行的命令,将数据通过内存,上传至hdfs
使用tail -f ...... 不断的查看这个文件,常用于查看日志文件
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/data/c.txt
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
# round 用于控制含有时间转义符的文件夹的生成规则
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# 使用本地时间,否则会报时间戳是null的异常
a1.sinks.k1.hdfs.useLocalTimeStamp=true
假如hdfs中使用了时间转义字符,配置文件中必须二选一:
1)useLocalTimeStamp=true
2)使用时间戳拦截器
原因:
时间需要转义,没有时间无法翻译为具体的值 %d 就无法翻译为 日期
启动下面命令即可:
flume-ng agent -c ./ -f exec-memory-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
此时就可以看到 我们的bigdata01的集群中有了我们传的数据
2、tailDir + Memory + HDFS
tailDir 是用来监控多个文件夹下的多个文件的,只要文件内容发生变化,就会再次的进行数据的抽取
在/home/hivedata/下创建一个test目录,用于测试tailDir ,接着在test下创建a,b,c,d四个文件
创建
tailDir 采集一个不断变化的文件 a.txt 不断的有数据产生,就不断的上传至hdfs
二、Flume中的拦截器
1、时间戳拦截器
我们使用时间戳的时候,需要将本地时间设置为true
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp =true
还可以使用时间戳拦截器的形式达到一样的效果
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/data/c.txt
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
# round 用于控制含有时间转义符的文件夹的生成规则
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# i1 是拦截器的名字,自己定义的
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
2、Host添加拦截器
如果event被这个拦截器拦住后,就会在heder中添加 主机名或者IP地址。
a1.sources = r1
a1.channels = c1
a1.sinks = s1
a1.sources.r1.type= http
a1.sources.r1.bind = bigdata01
a1.sources.r1.port = 6666
a1.sources.r1.interceptors=i2
a1.sources.r1.interceptors.i2.type=host
a1.sources.r1.interceptors.i2.preserveExisting=false
a1.sources.r1.interceptors.i2.useIP=true
a1.sources.r1.interceptors.i2.hostHeader=hostname
# hostname=192.168.233.128
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=/flume/%Y/%m/%d/%H%M
a1.sinks.s1.hdfs.filePrefix=%{hostname}
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=10
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
这个例子可以完美的说明host拦截器是干嘛的,拦截住之后 在Header中添加一个 hostname=192.168.32.128
在hdfs想使用hostname的时候,就可以通过%{hostname}
9870 hdfs的web访问端口
9820 是hdfs底层进行通信的端口
jps -ml 查看java进行,带详细信息
测试:
curl 起始就是通过命令模拟浏览器
curl http://www.baidu.com
curl http://bigdata01:9870/dfshealth.html#tab-overview
curl -X POST -d '[{"headers":{"state":"USER"},"body":"this my multiplex to c1"}]' http://bigdata01:6666
3、静态拦截器
Header中的数据是可以被其他地方使用的,时间戳和Host拦截器只能指定固定的KV键值对。
假如我现在想指定 name=zhangsan , 也就是说如果想在Header中指定自定义的KV键值对,可以使用静态拦截器。
三、自定义拦截器(重要)
需要处理的数据如下:
处理数据样例:
log='{
"host":"www.baidu.com",
"user_id":"13755569427",
"items":[
{
"item_type":"eat",
"
active_time":156234
},
{
"item_type":"car",
"active_time":156233
}
]
}'
结果样例:
[{"active_time":156234,"user_id":"13755569427","item_type":"eat","host":"www.baidu.com"},
{"active_time":156233,"user_id":"13755569427","item_type":"car","host":"www.baidu.com"}]
首先, 既然是自定义拦截器,就需要使用java
1) 创建maven项目,导入相关jar包
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.bigdata</groupId>
<artifactId>MyInterceptor</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.48</version>
</dependency>
</dependencies>
<!--可以使用maven中的某些打包插件,不仅可以帮助我们打包代码还可以打包所依赖的jar包-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<configuration>
<!-- 禁止生成 dependency-reduced-pom.xml-->
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<relocations>
<relocation>
<!-- 解决包冲突 进行转换-->
<pattern>com.google.protobuf</pattern>
<shadedPattern>shaded.com.google.protobuf</shadedPattern>
</relocation>
</relocations>
<artifactSet>
<excludes>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
</excludes>
</filter>
</filters>
<transformers>
<!-- 某些jar包含具有相同文件名的其他资源(例如属性文件)。 为避免覆盖,您可以选择通过将它们的内容附加到一个文件中来合并它们-->
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>mainclass</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
json --> java 代码解析 json --> 实体 ,实体-->json 字符串 都需要使用工具
jackson、fastjson(阿里巴巴)
我们这次使用的是fastjson
可以先创建一个测试类,用来测试能不能获取到json对象中的值 :
package com.bigdata;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class Main {
public static void main(String[] args) {
// 将需要转换的数据指定出来
String log = "{\n" +
"\"host\":\"www.baidu.com\",\n" +
"\"user_id\":\"13755569427\",\n" +
"\"items\":[\n" +
" {\n" +
" \"item_type\":\"eat\",\n" +
" \"active_time\":156234\n" +
" },\n" +
" {\n" +
" \"item_type\":\"car\",\n" +
" \"active_time\":156233\n" +
" }\n" +
" ]\n" +
"}";
//解析
JSONObject jsonObject = JSON.parseObject(log);
// 获取json里面的值
String host = jsonObject.getString("host");
String userId = jsonObject.getString("user_id");
System.out.println(host);
System.out.println(userId);
// items 是一个集合,所以需要创建集合获取
JSONArray items = jsonObject.getJSONArray("items");
//创建list集合 泛型可以指定为map集合,或者再创建一个items的实体类
List list = new ArrayList<Map<String, String>>();
for (Object item : items) {
//创建map集合
Map map = new HashMap<String,String>();
map.put("host",host);
map.put("user_id",userId);
// 将list转换为字符串类型
String itemStr = item.toString();
JSONObject jsonItem = JSON.parseObject(itemStr);
//获取值
String itemType = jsonItem.getString("item_type");
String activeTime = jsonItem.getString("active_time");
System.out.println(itemType);
System.out.println(activeTime);
map.put("item_type",itemType);
map.put("active_time",activeTime);
list.add(map);
}
String jsonString = JSON.toJSONString(list);
System.out.println(jsonString);
}
}
完整代码如下:
package com.bigdata;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DemoInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
// 解析 文本数据变为另一种格式
String body = new String(event.getBody());
// 将json字符串转为json类型
JSONObject jsonObject = JSON.parseObject(body);
// 获取json里面的值
String host = jsonObject.getString("host");
String userId = jsonObject.getString("user_id");
System.out.println(host);
System.out.println(userId);
// items 是一个集合,所以需要创建集合获取
JSONArray items = jsonObject.getJSONArray("items");
//创建list集合 泛型可以指定为map集合,或者再创建一个items的实体类
List list = new ArrayList<Map<String, String>>();
for (Object item : items) {
//创建map集合
Map map = new HashMap<String,String>();
map.put("host",host);
map.put("user_id",userId);
// 将list转换为字符串类型
String itemStr = item.toString();
JSONObject jsonItem = JSON.parseObject(itemStr);
//获取值
String itemType = jsonItem.getString("item_type");
String activeTime = jsonItem.getString("active_time");
System.out.println(itemType);
System.out.println(activeTime);
map.put("item_type",itemType);
map.put("active_time",activeTime);
list.add(map);
}
String jsonString = JSON.toJSONString(list);
event.setBody(jsonString.getBytes());
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
ArrayList<Event> events = new ArrayList<>();
for (Event event : list) {
Event newEvent = intercept(event);
events.add(newEvent);
}
return events;
}
@Override
public void close() {
}
// 作用只有一个,就是new 一个自定义拦截器的类
public static class BuilderEvent implements Builder{
@Override
public Interceptor build() {
return new DemoInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
代码编写完毕后,将文件打包成jar包之前,要确定,是否使用了打包插件,如果没有使用的话,要确定是否jar包完全导入进去到了集群中
最后 打包,将jar包传入集群中
启动flume
三、自动容灾(故障转移)
其实就是高可用
在 bigdata01 创建脚本文件 failover.conf:
#list names
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
# source
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = bigdata01
a1.sources.r1.port = 10086
# channel
a1.channels.c1.type = memory
# sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata02
a1.sinks.k1.port = 10087
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata03
a1.sinks.k2.port = 10088
#设置sink组
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
# 此处是设置的权重,权重越多,就工作
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.maxpenalty = 10000
bigdata02 failover2.conf:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# source
a1.sources.r1.type = avro
a1.sources.r1.bind = bigdata02
a1.sources.r1.port = 10087
# channel
a1.channels.c1.type = memory
# sink
a1.sinks.k1.type = logger
在bigdata03上,编写failover3.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# source
a1.sources.r1.type = avro
a1.sources.r1.bind = bigdata03
a1.sources.r1.port = 10088
# channel
a1.channels.c1.type = memory
# sink
a1.sinks.k1.type = logger
先在bigdata02,bigdata03 上面启动flume脚本命令
bigdata02上:
flume-ng agent -n a1 ../ -f failover2.conf -Dflume.root.logger=INFO,console
bigdata03上:
flume-ng agent -n a1 ../ -f failover3.conf -Dflume.root.logger=INFO,console
bigdata02,bigdata03 上面启动flume脚本 无异常后,
再在bigdata01 上面启动
在bigdata01 上,发送消息:
echo "wei,wei,wei" | nc bigdata01 10086
这时,bigdata02 就有了反应
为了测试高可用,这时,再把bigdata02 flume服务关闭,营造一种flume挂掉的状态,再在bigdata01上发送消息
这时,数据就会传入到了bigdata03 中
自动容灾(高可用完成)
标签:Flume,数仓,sinks,hdfs,a1,sources,k1,工具,r1 From: https://blog.csdn.net/qq_62984376/article/details/141998176