Flume基础
netcat(linux系统轻量级通信工具)安装
#桌面端可能自带 环境centos7
sudo yum install -y nc
#开启服务端(端口9999)
nc -lk 9999
#另一个窗口客户端监听 9999
nc localhost 9999
fluem官方案例测试
#判断 44444 端口是否被占用
sudo netstat -nlp | grep 44444
#创建 Flume Agent 配置文件 flume-netcat-logger.conf
[flume]$ mkdir job
[flume]$ cd job/
#在 job 文件夹下创建 Flume Agent 配置文件 flume-netcat-logger.conf
[job]$ vim flume-netcat-logger.conf
在 flume-netcat-logger.conf 文件中添加如下内容:
# Name the components on this agent
# 给当前agent命名组件 agent名字(唯一):a1
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost #绑定节点
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000 #总容量1000个event
#表示a1的channel传输时收集到了100条event以后再去提交事务
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
# r1可以和多个c1绑定
a1.sources.r1.channels = c1 #表示将r1和c1连接起来
a1.sinks.k1.channel = c1 #表示将k1和c1连接起来
启动flume命令,开启一个agent
#-n agent的名字
#-c 配置文件
#-f flume 本次启动读取的agent的配置文件 job/net-flume-logger.conf
# -Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、error
$ bin/flume-ng agent -n $agent_name -c conf -f job/flume-conf.properties.template -Dflume.root.logger=INFO,console
另一个窗口监听44444端口
nc localhost 44444
实时监控的单个追加文件
需求:实时监控Hive日志,并上传到HDFS
一,Hadoop和java环境正确
二,创建fluem-file-hdfs.conf
#创建文件
[job]$ vim flume-file-hdfs.conf
# Name the components on this agent
# 给当前agent命名组件 agent名字(唯一):a1
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000 #总容量1000个event
#表示a1的channel传输时收集到了100条event以后再去提交事务
a1.channels.c1.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:8020/flume/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a1.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 30 # s
#设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217700 #接近128M
#文件的滚动与 Event 数量无关
a1.sinks.k1.hdfs.rollCount = 0
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
先启动hdfs(已启动) 在启动flume的agent
bin/flume-ng agent -n a1 -c conf -f job/flume-file-hdfs.conf
hive.log存放初始位置为/tmp/本机用户/hive.log
开启hive
不支持断点续传
实时监控目录下多个新文件
案例需求:使用flume监听整个目录的文件,并上传至hdfs
一,创建配置文件flume-dir-hdfs.conf
vim flume-dir-hdfs.conf
# 添加内容
# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp 结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://master:8020/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 20
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
启动flume的agent命令
bin/flume-ng agent -n a3 -c conf/ -f job/flume-dir-hdfs.conf
文件创建后再次写入数据无法监控
文件更名后会再次监控
实时监控目录下的多个追加文件(重要)
案例需求:使用Flume监听整个目录的实时追加文件,并上传到hdfs
一,创建配置文件 flume-taildir-hdfs.conf
vim flume-taildir-hdfs.conf
#添加内容
# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json #保存读取的配置信息实现断点续传
a3.sources.r3.filegroups = f1 f2 #文件组名字随意,f1,f2相当于一个变量
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path =
hdfs://master:8020/flume/upload2/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
启动flume的agent
bin/flume-ng agent -c conf/ -n a3 -f job/flume-taildir-hdfs.conf
源码修改使得flume任务可以一直监控而非改名后就不能监控的缺点
应用于hive日志的新一天会把当天的hive.log 变为hive-日期.log,若是在前一天的11:30宕机后指导另一天才修复,造成最后半小时的日志无法获取的问题
修改fluem-taildir-hdfs的源码配置,修改使用一个标准(inode)判断文件的是否新建
flume-taildir-source-1.9.0.jar
Flume进阶
一,事务
Put事务流程
•doCommit:检查channel内存队列是否足够合并。
•doRollback:channel内存队列空间不足,回滚数据
Take事务
•doTake:将数据取到临时缓冲区takeList,并将数据发送到HDFS
•doCommit:如果数据全部发送成功,则清除临时缓冲区takeList
•doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区t
akeList中的数据归还给channel内存队列。
二,内部原理
重要组件:
1)ChannelSelector
ChannelSelector 的作用就是选出 Event 将要被发往哪个 Channel。其共有两种类型,
分别是 Replicating(复制)和 Multiplexing(多路复用)。
ReplicatingSelector 会将同一个 Event 发往所有的 Channel,Multiplexing 会根据相
应的原则,将不同的 Event 发往不同的 Channel。
2)SinkProcessor
SinkProcessor
共 有 三 种 类 型 , 分 别 是 DefaultSinkProcessor 、LoadBalancingSinkProcessor 和 FailoverSinkProcessor
DefaultSinkProcessor 对 应 的 是 单 个 的 Sink ,
LoadBalancingSinkProcessor 和FailoverSinkProcessor 对应的是 Sink Group,
LoadBalancingSinkProcessor 可以实现负载均衡的功能,
FailoverSinkProcessor 可以错误恢复的功能
三,拓扑结构
-
简单串联
是将多个 flume 顺序连接起来了,从最初的 source 开始到最终 sink 传送的目的存储系统。此模式不建议桥接过多的 flume 数量, flume 数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统
-
复制和多路复用
Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的地。
-
负载均衡和故障转移
Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能
-
聚合
这种模式是我们最常见的,也非常实用,日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用 flume 的这种组合方式能很好的解决这一问题,每台服务器部署一个 flume 采集日志,传送到一个集中收集日志的 flume,再由此 flume 上传到 hdfs、hive、hbase 等,进行日志分析
Flume开发案例
复制和多路复用
需求:使用flume-1监控文件变动,flume-1将变动内容传递给Flume-2,Flume-2负责存储到HDFS。同时Flume-1将变动内容传递给Flume-3,Flume-3负责输出到LocalFileSystem。
在/opt/module/flume/job 目录下创建 group1 文件夹
[job]$ mkdir group1
[job]$ cd group1/
#创建三个flume文件
#第一个文件flume,配置 1 个接收日志文件的 source 和两个 channel、两个 sink,分别输送给 flume-flumehdfs 和 flume-flume-dir
[group1]$ vim flume-file-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有 channel(默认为replication)
a1.sources.r1.selector.type = replicating
# Describe/configure the source(若没有改hive日志则在tmp/hive.log)
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
# sink 端的 avro 是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = master
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
#创建 flume-flume-hdfs.conf,配置上级 Flume 输出的 Source,输出是到 HDFS 的 Sink
[group1]$ vim flume-flume-hdfs.conf
# Name the components on this agent(agent名字必须不同)
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
# source 端的 avro 是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = master
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://master:8020/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k1.hdfs.rollCount = 0
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
#创建 flume-flume-dir.conf,配置上级 Flume 输出的 Source,输出是到本地目录的 Sink
[group1]$ vim flume-flume-dir.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = master
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3
#输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目
录
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
#新建保存本地数据的目录
[data]$ mkdir flume3
分别启动配置文件(先启动服务端(a2,a3),再开启客户端a1)
flume-flume-hdfs.conf
bin/flume-ng agent -c conf -n a2 -f job/group1/flume-flume-hdfs.conf
flume-flume-dir.conf
bin/flume-ng agent -c conf -n a2 -f job/group1/flume-flume-dir.conf
flume-file-flume.conf
bin/flume-ng agent -c conf -n a1 -f job/group1/flume-file-flume.conf
启动hadoop和hive
检查数据
负载均衡和故障转移
需求:使用Flume1监控一个端口,其sink组中的sink分别对接Flume2和Flume3,采用FailoverSinkProcessor,实现故障转移的功能
在/opt/module/flume/job 目录下创建 group2 文件夹
[job]$ mkdir group2
[job]$ cd group2/
#创建 flume-netcat-flume.conf,配置 1 个 netcat source 和 1 个 channel、1 个 sink group(2 个 sink),分别输送给flume-flume-console1 和 flume-flume-console2
[group2]$ vim flume-netcat-flume.conf
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#优先级设置priority k2更高
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = master
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
#创建 flume-flume-console1.conf,配置上级 Flume 输出的 Source,输出是到本地控制台
[group2]$ vim flume-flume-console1.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = master
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = logger
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
#创建 flume-flume-console2.conf,配置上级 Flume 输出的 Source,输出是到本地控制台
[group2]$ vim flume-flume-console2.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = master
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
分别启动配置文件(服务端先启动a2,a3)
分别开启对应配置文件:flume-flume-console2,flume-flume-console1,flume-netcat-flume
#启动a2
bin/flume-ng agent -c conf -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
#启动a3
bin/flume-ng agent -c conf -n a3 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
#启动a1
bin/flume-ng agent -c conf -n a1 -f job/group2/flume-netcat-flume.conf
使用 netcat 工具向本机的 44444 端口发送内容
nc localhost 44444
查看Flume2和Flume3的控制台日志
kill 掉Flume2 ,观察Flume3控制台打印情况
使用 jps -ml 查看 Flume 进程
故障转移改负载均衡
将flume-netcat-flume.conf复制一份为flume-netcat-flume.conf并修改sink group
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#优先级设置priority k2更高
a1.sinkgroups.g1.processor.type = load balance
a1.sinkgroups.g1.processor.selector = round_robin
#退避算法backoff,两个sink拉取数据,若当前一个sink未拉去数据则选择此sink一段时间内不要去拉去数据
a1.sinkgroups.g1.processor.backoff = true
#指数增长
a1.sinkgroups.g1.processor.selector.maxTimeOut = 30000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = master
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
聚合
需求:master上的Flume-1监控文件/opt/module/group.log
slave1上的Flume-2监控某一个端口的数据流
Flume-1与Flume-2将数据发送给slave2上的Flume-3,Flume-3将最终数据打印到控制台
分发Flume
[module]$ xsync flume
在 master、slave1 以及 slave2 的/opt/module/flume/job 目录下创建一个
group3 文件夹
[master job]$ mkdir group3
[slave1 job]$ mkdir group3
[slave2 job]$ mkdir group3
创建 flume1-netcat-flume.conf,配置 Source 监控端口 44444 数据流,配置 Sink 数据到下一级 Flume
[master group3]$ vim flume1-netcat-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = slave2
a1.sinks.k1.port = 4141
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#创建 flume2-logger-flume.conf,配置 Source 用于监控 hive.log 文件,配置 Sink 输出数据到下一级 Flume
[slave1 group3]$ vim flume2-logger-flume.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = exec
a2.sources.r1.command = tail -F /opt/module/group.log
a2.sources.r1.shell = /bin/bash -c
# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = slave2
a2.sinks.k1.port = 4141
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
#创建 flume3-flume-logger.conf ,配置 source 用于接收 flume1 与 flume2 发送过来的数据流,最终合并后 sink 到控制台
[slave2 group3]$ vim flume3-flume-logger.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = slave2
a3.sources.r1.port = 4141
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
分别启动配置文件(服务端先启动a3)
分别开启对应配置文件:flume3-flume-logger.conf,flume1-netcat-flume.conf,flume2-logger-flume.conf
[slave2 flume]$ bin/flume-ng agent --conf conf/ --name
a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
[slave1 flume]$ bin/flume-ng agent --conf conf/ --name
a1 --conf-file job/group3/flume2-logger-flume.conf
[master flume]$ bin/flume-ng agent --conf conf/ --name
a2 --conf-file job/group3/flume1-netcat-flume.conf
在 slave1 上向/opt/module 目录下的 group.log 追加内容
[slave1 module]$ echo 'hello' > group.log
在 master 上向 44444 端口发送数据
[master flume]$ telnet master 44444
[master flume]$ nc master 44444
检查 slave2上数据
自定义 Interceptor
需求:使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统
创建Java-maven项目实现Flume的自定义
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
</dependencies>
package com.hadoop.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @author H
* @create 2022/11/20 19:47
*/
public class TypeInterceptor implements Interceptor {
//声明集合用于处理存放拦截器处理后的事件
private List<Event> addHeaderEvents;
@Override
public void initialize() {
//初始化集合用于处理存放拦截器处理后的事件
addHeaderEvents = new ArrayList<>();
}
//单个事件处理方法
@Override
public Event intercept(Event event) {
//1.获取header和body
Map<String, String> headers = event.getHeaders();
String body = new String(event.getBody());
//2.根据body中是否包含hadoop字符串添加不同的头信息
if (body.contains("hadoop")){
headers.put("type","hadoop");
}else{
headers.put("type","other");
}
//3.返回数据
return event;
}
//批量事件处理方法
@Override
public List<Event> intercept(List<Event> list) {
//1.清空集合
addHeaderEvents.clear();
//2.遍历events
for (Event event : list) {
addHeaderEvents.add(intercept(event));
}
//3.返回数据
return addHeaderEvents;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new TypeInterceptor();//用于构建对象
}
@Override
public void configure(Context context) {
}
}
}
打包将jar包导入到Flume的lib目录
编辑flume配置文件
#为 master 上的 Flume1 配置 1 个 netcat source,1 个 sink group(2 个 avro sink),并配置相应的 ChannelSelector 和 interceptor
[master job]$ mkdir group4
[master group4]$ vim flume1.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hadoop.interceptor.TypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.hadoop = c1
a1.sources.r1.selector.mapping.other = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = slave1
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = slave2
a1.sinks.k2.port = 4242
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
#为 slave1 上的 Flume4 配置一个 avro source 和一个 logger sink
[slave1 group4]$ vim flume2.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = slave1
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
#为 slave2 上的 Flume3 配置一个 avro source 和一个 logger sink
[slave2 group4]$ vim flume3.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = slave2
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
分别启动slave1,slave2 和master上启动flume进程
[slave1 flume]$ bin/flume-ng agent -c conf -n a1 -f job/group4/flume2.conf -Dflume.root.logger=INFO,console
[slave2 flume]$ bin/flume-ng agent -c conf -n a1 -f job/group4/flume3.conf -Dflume.root.logger=INFO,console
[master flume]$ bin/flume-ng agent -c conf -n a1 -f job/group4/flume1.conf
在 master使用 netcat 向 localhost:44444 发送字母和数字
观察 slave1和 slave2 打印的日志
slave1只接收带有hadoop字符的字符串
其他不符合的都由slave2接收
自定义source
Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence ,generator、syslog、http、legacy。官方提供的 source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source
官方也提供了自定义 source 的接口:https://flume.apache.org/FlumeDeveloperGuide.html#source
根据官方说明自定义MySource 需要继承AbstractSource 类并实现 Configurable 和 PollableSource 接口
实现相应方法:
getBackOffSleepIncrement() //backoff 步长
getMaxBackOffSleepInterval()//backoff 最长时间
configure(Context context)//初始化 context(读取配置文件内容)
process()//获取数据封装成 event 并写入 channel,这个方法将被循环调用。
使用场景:读取 MySQL 数据或者其他文件系统
需求:使用flume接受数据,并给每条数据添加前缀,输出到控制台,前缀可以从flume配置文件中配置
自定义jar包
package com.hadoop.source;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import java.util.HashMap;
/**
* @author H
* @create 2022/11/21 9:55
*/
public class Mysource extends AbstractSource implements Configurable, PollableSource {
//声明数据的前后缀
private String perfix;//前缀
private String subfix;//后缀
private Long delay;
@Override
public void configure(Context context) {
//context会获取配置文件
perfix = context.getString("per","per-");
subfix = context.getString("sub");
delay = context.getLong("delay",2000L);
}
@Override
public Status process() throws EventDeliveryException {
//1.声明事件
Event event = new SimpleEvent();
HashMap<String, String> header = new HashMap<>();
//2.循环创建事件信息,传给channel
try {
for (int i = 0; i < 5; i++) {
event.setHeaders(header);
event.setBody((perfix + "hadoop:" + i + subfix).getBytes());
getChannelProcessor().processEvent(event);
}
Thread.sleep(delay);
return Status.READY;
} catch (Exception e) {
e.printStackTrace();
return Status.BACKOFF;
}
}
@Override
public long getBackOffSleepIncrement() {
return 0;
}
@Override
public long getMaxBackOffSleepInterval() {
return 0;
}
}
在集群配置flume配置文件
[master job]$ vim mysource.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.hadoop.source.Mysource
#a1.sources.r1.pre =
#a1.sources.r1.sub =
#a1.sources.r1.delay = 3000
#a1.sources.r1.field = atguigu
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
开启任务
bin/flume-ng agent -c conf -n a1 -f job/mysource.conf -Dflume.root.logger=INFO,console
自定义sink
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。
Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的 Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Sink。
官方也提供了自定义 sink 的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#sink 根据官方说明自定义
MySink 需要继承 AbstractSink 类并实现 Configurable 接口。
实现相应方法:
configure(Context context)//初始化 context(读取配置文件内容)
process()//从 Channel 读取获取数据(event),这个方法将被循环调用。
使用场景:读取 Channel 数据写入 MySQL 或者其他文件系统
需求:使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置
jar包代码
package com.hadoop.sink;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author H
* @create 2022/11/21 15:23
*/
public class Mysink extends AbstractSink implements Configurable {
//声明数据的前后缀
private String perfix;//前缀
private String subfix;//后缀
//创建Logger对象
private Logger logger = LoggerFactory.getLogger(Mysink.class);
@Override
public void configure(Context context) {
//context会获取配置文件
perfix = context.getString("per","per-");
subfix = context.getString("sub");
}
@Override
public Status process() throws EventDeliveryException {
//1.获取channel并开启事务
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
transaction.begin();
//2.从channel中抓取数据打印到控制台
try {
//2.1抓取数据
Event event;
while (true){
event = channel.take();
if (event != null){
break;
}
}
//2.2处理数据
logger.info(perfix + new String(event.getBody()) + subfix);
//2.3提交事务
transaction.commit();
return Status.READY;
} catch (ChannelException e) {
transaction.rollback();
return Status.BACKOFF;
} finally {
transaction.close();
}
}
}
创建配置文件
[master job]$ vim mysink.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = com.hadoop.sink.Mysink
#a1.sinks.k1.per = hadoop-
a1.sinks.k1.sub = -bs
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动服务
bin/flume-ng agent -c conf -n a1 -f job/mysink.conf -Dflume.root.logger=INFO,console
nc localhost 44444
jps -l
kill -9 杀死进程
Flume数据流监控
Ganglia安装与部署
Ganglia 由 gmond、gmetad 和 gweb 三部分组成。
gmond(Ganglia Monitoring Daemon)是一种轻量级服务,安装在每台需要收集指标数据的节点主机上。使用 gmond,你可以很容易收集很多系统指标数据,如 CPU、内存、磁盘、网络和活跃进程的数据等。
gmetad(Ganglia Meta Daemon)整合所有信息,并将其以 RRD 格式存储至磁盘的服务。
gweb(Ganglia Web)Ganglia 可视化工具,gweb 是一种利用浏览器显示 gmetad 所存储数据的 PHP 前端。在 Web 界面中以图表方式展现集群的运行状态下收集的多种不同指标数据
安装gnaglia
部署:
master : web gmetad gmod
slave1 : gmod
slave2 : gmod
在master slave1 slave2 分别安装epel-release
sudo yum -y install epel-release
在master安装
sudo yum -y install ganglia-gmetad
sudo yum -y install ganglia-web
sudo yum -y install ganglia-gmond
slave1 slave2 安装
sudo yum -y install ganglia-gmond
在 master 修改配置文件/etc/httpd/conf.d/ganglia.conf
vim /etc/httpd/conf.d/ganglia.conf
# Ganglia monitoring system php web frontend
#
Alias /ganglia /usr/share/ganglia
<Location /ganglia>
Order deny,allow
Require all granted
#Deny from all
#Allow from 127.0.0.1
#Allow from ::1
# Allow from .example.com
</Location>
在 master 修改配置文件/etc/ganglia/gmetad.conf
sudo vim /etc/ganglia/gmetad.conf
data_source "my cluster" master
在master slave1 slave2 修改配置文件/etc/ganglia/gmond.conf
sudo vim /etc/ganglia/gmond.conf
修改为:
cluster {
name = "my cluster"
owner = "unspecified"
latlong = "unspecified"
url = "unspecified"
}
udp_send_channel {
#bind_hostname = yes # Highly recommended, soon to be default.
# This option tells gmond to use a source
address
# that resolves to the machine's hostname.
Without
# this, the metrics may appear to come from
any
# interface and the DNS names associated with
# those IPs will be used to create the RRDs.
# mcast_join = 239.2.11.71
# 数据发送给 master
host = master
port = 8649
ttl = 1
}
udp_recv_channel {
# mcast_join = 239.2.11.71
port = 8649
# 接收来自任意连接的数据
bind = 0.0.0.0
retry_bind = true
# Size of the UDP buffer. If you are handling lots of metrics
you really
# should bump it up to e.g. 10MB or even higher.
# buffer = 10485760
}
在 master 修改配置文件/etc/selinux/config
sudo vim /etc/selinux/config
修改为:
# This file controls the state of SELinux on the system.
# SELINUX= can take one of these three values:
# enforcing - SELinux security policy is enforced.
# permissive - SELinux prints warnings instead of enforcing.
# disabled - No SELinux policy is loaded.
SELINUX=disabled
# SELINUXTYPE= can take one of these two values:
# targeted - Targeted processes are protected,
# mls - Multi Level Security protection.
SELINUXTYPE=targeted
selinux 生效需要重启,如果此时不想重启,可以临时生效之:
[master flume]$ sudo setenforce 0
启动ganglia
在master slave1 slave2启动
sudo systemctl start gmond
在master启动
sudo systemctl start httpd
sudo systemctl start gmetad
sudo systemctl stop gmond
sudo systemctl stop httpd
sudo systemctl stop gmetad
打开网页浏览ganglia页面
如果完成以上操作依然出现权限不足错误,请修改/var/lib/ganglia 目录的权限
[master flume]$ sudo chmod -R 777 /var/lib/ganglia
操作 Flume 测试监控
启动Flume任务
bin/flume-ng agent \
-c conf/ \
-n a1 \
-f job/flume-netcat-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=hadoop102:8649
发送数据观察ganglia检测图
nc localhost 44444
标签:Flume,flume,sinks,sources,实践,a1,案例,c1,channels
From: https://www.cnblogs.com/blwx/p/16916546.html