目录
(4)开启 Hadoop 和 Hive 并操作 Hive 产生日志
(1)创建 flume-taildir-hdfs.conf 文件
监控端口数据官方案例
Flume 可以用来监控网络端口数据,这对于收集来自不同系统的日志或数据非常有用。下面是一个使用 Flume 监控网络端口数据的官方示例,我们将使用 Flume 的 netcat source 来接收数据,并将其写入到 HDFS 中。
步骤 1: 准备环境
确保已经安装并配置好了 Flume 和 Hadoop。这里假设你已经在上一步中完成了 Flume 的安装。
步骤 2: 配置 Flume Agent
创建一个名为 flume-conf.properties
的配置文件,该文件将定义一个 Flume Agent 的配置。
配置文件 flume-conf.properties
# 定义 agent 名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 配置 source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 配置 sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://localhost:9000/flume
a1.sinks.k1.hdfs.filePrefix = flume-logs
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 512
a1.sinks.k1.hdfs.rollCount = 20
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 配置 agent 的 source、channel 和 sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
步骤 3: 启动 Flume Agent
使用以下命令启动 Flume Agent:
$FLUME_HOME/bin/flume-ng agent --conf $FLUME_HOME/conf --conf-file ./flume-conf.properties --name a1 -Dflume.root.logger=INFO,console
这里 $FLUME_HOME
是 Flume 的安装目录。
步骤 4: 发送数据到 Flume
你可以使用 netcat 工具或其他类似工具发送数据到 Flume 监听的端口。例如,如果你在另一台机器上或同一台机器的不同终端窗口中,可以使用 netcat 发送数据:
echo "This is a test message" | nc localhost 44444
步骤 5: 查看 HDFS 中的数据
一旦数据被发送到 Flume,Flume 将其写入到 HDFS 中。你可以使用 Hadoop 命令来查看数据:
hadoop fs -ls /flume
hadoop fs -cat /flume/flume-logs-*
注意事项
- 确保 Hadoop 的
hdfs-site.xml
和core-site.xml
配置文件已经正确配置。 - 如果你的 Hadoop 集群使用了安全模式,确保你已经配置了正确的 Kerberos 凭证。
- 如果你使用的是分布式 Flume,确保所有的 Flume 节点都能够访问 HDFS。
示例说明
- Netcat Source (
a1.sources.r1
):配置了 netcat source 来监听localhost
的44444
端口。 - HDFS Sink (
a1.sinks.k1
):配置了 HDFS sink 将数据写入到 HDFS 的/flume
目录下。 - Memory Channel (
a1.channels.c1
):使用内存 channel 作为 source 和 sink 之间的缓冲区。
实时监控单个追加文件案例
需求分析
- 实时读取本地文件到HDFS案例
- Hive日志文件位于
/opt/module/hive/logs/hive.log
- Flume监控该文件
- 数据最终存储到HDFS
实现步骤
(1)确保环境变量配置正确
确认 /etc/profile.d/my_env.sh
文件中包含以下内容:
JAVA_HOME=/opt/module/jdk1.8.0_212
HADOOP_HOME=/opt/module/ha/hadoop-3.1.3
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export PATH JAVA_HOME HADOOP_HOME
(2)创建 flume-file-hdfs.conf 文件
创建文件 flume-file-hdfs.conf
,并添加如下内容:
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c
# Configure the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop12:9000/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = logs-
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 1000
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 60
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
# Configure the channel
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
(3)运行 Flume
[lzl@hadoop12 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
(4)开启 Hadoop 和 Hive 并操作 Hive 产生日志
[lzl@hadoop12 hadoop-2.7.2]$ sbin/start-dfs.sh
[lzl@hadoop13 hadoop-2.7.2]$ sbin/start-yarn.sh
[lzl@hadoop12 hive]$ bin/hive
(5)在 HDFS 上查看文件
hadoop fs -ls /flume
实时监控目录下多个新文件案例
需求分析
- 使用 Flume 监听整个目录的文件,并上传至 HDFS
- 被监控的目录位于
/opt/module/flume/upload
实现步骤
(1)创建 flume-dir-hdfs.conf 文件
创建文件 flume-dir-hdfs.conf
,并添加如下内容:
# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Configure the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop12:9000/flume/upload/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 60
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0
# Configure the channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
(2)启动监控文件夹命令
[lzl@hadoop12 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
(3)向 upload 文件夹中添加文件
[lzl@hadoop12 flume]$ mkdir upload
[lzl@hadoop12 upload]$ touch lzl.txt
[lzl@hadoop12 upload]$ touch lzl.tmp
[lzl@hadoop12 upload]$ touch lzl.log
(4)查看 HDFS 上的数据
hadoop fs -ls /flume/upload
实时监控目录下的多个追加文件案例
需求分析
- 使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS
- 被监控的目录位于
/opt/module/flume/files
实现步骤
(1)创建 flume-taildir-hdfs.conf 文件
创建文件 flume-taildir-hdfs.conf
,并添加如下内容:
# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*
# Configure the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop12:9000/flume/upload2/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 60
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0
# Configure the channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
(2)启动监控文件夹命令
[lzl@hadoop12 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
(3)向 files 文件夹中追加内容
- 在
/opt/module/flume
目录下创建files
文件夹
[lzl@hadoop12 flume]$ mkdir files
- 向
files
文件夹中添加文件
[lzl@hadoop12 files]$ echo hello >> file1.txt
[lzl@hadoop12 files]$ echo lzl>> file2.txt
(4)查看 HDFS 上的数据
hadoop fs -ls /flume/upload2
Taildir Source 说明
- Position File: Taildir Source 维护了一个 JSON 格式的
positionFile
,它会定期地往positionFile
中更新每个文件读取到的最新位置,因此能够实现断点续传。 - Position File 格式:
{ "inode": 2496272, "pos": 12, "file": "/opt/module/flume/files/file1.txt" } { "inode": 2496275, "pos": 12, "file": "/opt/module/flume/files/file2.txt" }
- Note: Linux 中存储文件元数据的区域称为
inode
,每个inode
都有一个编号,操作系统用inode
编号来识别不同的文件。Unix/Linux 系统内部不使用文件名,而是使用inode
编号来识别文件。