首页 > 其他分享 >大数据技术之Flume应用案例(2)

大数据技术之Flume应用案例(2)

时间:2024-08-24 13:54:51浏览次数:10  
标签:Flume hdfs sinks conf k3 案例 a3 应用 flume

目录

 监控端口数据官方案例

步骤 1: 准备环境

步骤 2: 配置 Flume Agent

步骤 3: 启动 Flume Agent

步骤 4: 发送数据到 Flume

步骤 5: 查看 HDFS 中的数据

注意事项

示例说明

实时监控单个追加文件案例

需求分析

实现步骤

(1)确保环境变量配置正确

(2)创建 flume-file-hdfs.conf 文件

(3)运行 Flume

(4)开启 Hadoop 和 Hive 并操作 Hive 产生日志

(5)在 HDFS 上查看文件

实时监控目录下多个新文件案例

需求分析

实现步骤

(1)创建 flume-dir-hdfs.conf 文件

(2)启动监控文件夹命令

(3)向 upload 文件夹中添加文件

(4)查看 HDFS 上的数据

实时监控目录下的多个追加文件案例

需求分析

实现步骤

(1)创建 flume-taildir-hdfs.conf 文件

(2)启动监控文件夹命令

(3)向 files 文件夹中追加内容

(4)查看 HDFS 上的数据

Taildir Source 说明


 监控端口数据官方案例

Flume 可以用来监控网络端口数据,这对于收集来自不同系统的日志或数据非常有用。下面是一个使用 Flume 监控网络端口数据的官方示例,我们将使用 Flume 的 netcat source 来接收数据,并将其写入到 HDFS 中。

步骤 1: 准备环境

确保已经安装并配置好了 Flume 和 Hadoop。这里假设你已经在上一步中完成了 Flume 的安装。

步骤 2: 配置 Flume Agent

创建一个名为 flume-conf.properties 的配置文件,该文件将定义一个 Flume Agent 的配置。

配置文件 flume-conf.properties

# 定义 agent 名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 配置 source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# 配置 sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://localhost:9000/flume
a1.sinks.k1.hdfs.filePrefix = flume-logs
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 512
a1.sinks.k1.hdfs.rollCount = 20
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 配置 agent 的 source、channel 和 sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

步骤 3: 启动 Flume Agent

使用以下命令启动 Flume Agent:

$FLUME_HOME/bin/flume-ng agent --conf $FLUME_HOME/conf --conf-file ./flume-conf.properties --name a1 -Dflume.root.logger=INFO,console

这里 $FLUME_HOME 是 Flume 的安装目录。

步骤 4: 发送数据到 Flume

你可以使用 netcat 工具或其他类似工具发送数据到 Flume 监听的端口。例如,如果你在另一台机器上或同一台机器的不同终端窗口中,可以使用 netcat 发送数据:

echo "This is a test message" | nc localhost 44444

步骤 5: 查看 HDFS 中的数据

一旦数据被发送到 Flume,Flume 将其写入到 HDFS 中。你可以使用 Hadoop 命令来查看数据:

hadoop fs -ls /flume
hadoop fs -cat /flume/flume-logs-*

注意事项

  • 确保 Hadoop 的 hdfs-site.xml 和 core-site.xml 配置文件已经正确配置。
  • 如果你的 Hadoop 集群使用了安全模式,确保你已经配置了正确的 Kerberos 凭证。
  • 如果你使用的是分布式 Flume,确保所有的 Flume 节点都能够访问 HDFS。

示例说明

  • Netcat Source (a1.sources.r1):配置了 netcat source 来监听 localhost 的 44444 端口。
  • HDFS Sink (a1.sinks.k1):配置了 HDFS sink 将数据写入到 HDFS 的 /flume 目录下。
  • Memory Channel (a1.channels.c1):使用内存 channel 作为 source 和 sink 之间的缓冲区。

实时监控单个追加文件案例

需求分析

  • 实时读取本地文件到HDFS案例
  • Hive日志文件位于 /opt/module/hive/logs/hive.log
  • Flume监控该文件
  • 数据最终存储到HDFS

实现步骤

(1)确保环境变量配置正确

确认 /etc/profile.d/my_env.sh 文件中包含以下内容:

JAVA_HOME=/opt/module/jdk1.8.0_212
HADOOP_HOME=/opt/module/ha/hadoop-3.1.3
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export PATH JAVA_HOME HADOOP_HOME
(2)创建 flume-file-hdfs.conf 文件

创建文件 flume-file-hdfs.conf,并添加如下内容:

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c

# Configure the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop12:9000/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = logs-
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 1000
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 60
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0

# Configure the channel
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
(3)运行 Flume
[lzl@hadoop12 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
(4)开启 Hadoop 和 Hive 并操作 Hive 产生日志
[lzl@hadoop12 hadoop-2.7.2]$ sbin/start-dfs.sh
[lzl@hadoop13 hadoop-2.7.2]$ sbin/start-yarn.sh
[lzl@hadoop12 hive]$ bin/hive
(5)在 HDFS 上查看文件
hadoop fs -ls /flume

实时监控目录下多个新文件案例

需求分析

  • 使用 Flume 监听整个目录的文件,并上传至 HDFS
  • 被监控的目录位于 /opt/module/flume/upload

实现步骤

(1)创建 flume-dir-hdfs.conf 文件

创建文件 flume-dir-hdfs.conf,并添加如下内容:

# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Configure the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop12:9000/flume/upload/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 60
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0

# Configure the channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
(2)启动监控文件夹命令
[lzl@hadoop12 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
(3)向 upload 文件夹中添加文件
[lzl@hadoop12 flume]$ mkdir upload
[lzl@hadoop12 upload]$ touch lzl.txt
[lzl@hadoop12 upload]$ touch lzl.tmp
[lzl@hadoop12 upload]$ touch lzl.log
(4)查看 HDFS 上的数据
hadoop fs -ls /flume/upload

 

实时监控目录下的多个追加文件案例

需求分析

  • 使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS
  • 被监控的目录位于 /opt/module/flume/files

实现步骤

(1)创建 flume-taildir-hdfs.conf 文件

创建文件 flume-taildir-hdfs.conf,并添加如下内容:

# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*

# Configure the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop12:9000/flume/upload2/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 60
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0

# Configure the channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
(2)启动监控文件夹命令
[lzl@hadoop12 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
(3)向 files 文件夹中追加内容
  • 在 /opt/module/flume 目录下创建 files 文件夹
[lzl@hadoop12 flume]$ mkdir files
  • 向 files 文件夹中添加文件
[lzl@hadoop12 files]$ echo hello >> file1.txt
[lzl@hadoop12 files]$ echo lzl>> file2.txt
(4)查看 HDFS 上的数据
hadoop fs -ls /flume/upload2
Taildir Source 说明
  • Position File: Taildir Source 维护了一个 JSON 格式的 positionFile,它会定期地往 positionFile 中更新每个文件读取到的最新位置,因此能够实现断点续传。
  • Position File 格式:
    {
      "inode": 2496272,
      "pos": 12,
      "file": "/opt/module/flume/files/file1.txt"
    }
    {
      "inode": 2496275,
      "pos": 12,
      "file": "/opt/module/flume/files/file2.txt"
    }
  • Note: Linux 中存储文件元数据的区域称为 inode,每个 inode 都有一个编号,操作系统用 inode 编号来识别不同的文件。Unix/Linux 系统内部不使用文件名,而是使用 inode 编号来识别文件。

标签:Flume,hdfs,sinks,conf,k3,案例,a3,应用,flume
From: https://blog.csdn.net/qq_45115959/article/details/141498450

相关文章

  • Python文件管理器:一个基于wxPython的桌面应用
    在当今的软件开发世界中,管理大量的源代码文件变得越来越重要。无论是个人项目还是大型团队协作,有一个强大而灵活的文件管理工具都可以大大提高工作效率。今天,我们要介绍一个基于Python和wxPython构建的文件管理器,它专门用于管理.py文件。C:\pythoncode\new\managefiles.py......
  • Gradio.NET支持 .NET 8 简化 Web 应用开发
    目录前言Gradio.NETGradio.NET使用1、创建项目2、安装Gradio.Net3、示例代码Gradio.NET示例1、Layout2、Form3、Media4、Chatbot5、ProgressGradio.NET应用项目地址总结最后前言Gradio.NET是Gradio在.NET平台上的移植版本。Gradio是一个开源的......
  • 【HarmonyOS NEXT应用开发】案例69:基于原生能力的压缩与解压缩能力
    一、场景描述概览、常用图片编码格式比对及系统支持情况压缩格式简介系统支持/使用方式zip普及率高,适用范围也最广,压缩速度相比rar快一些ArkTs支持,可通过zlib实现,jsziprarrar格式比zip更能够提供较好的压缩率,但压缩速度也相对慢一些三方库支持,通过Unrar实现......
  • 6大主流的威胁情报源及应用特点分析
    威胁情报源(Threatintelligencefeed)是一种提供关于最新网络威胁和攻击信息的数据流,其中涉及漏洞、恶意软件、网络钓鱼以及其他恶意攻击活动。这些数据由安全研究人员、行业监管机构以及专业安全厂商所共同创建,通常采用STIX/TAXII等标准格式,可以与EDR、SIEM、防火墙、威胁......
  • Android开发 - BroadcastReceiver 类处理系统或应用内部发送的广播消息解析
    什么是BroadcastReceiverBroadcastReceiver类是一个非常重要的组件,用于处理系统或应用内部发送的广播消息。广播消息可以是系统发出的(比如电池电量低、网络连接变化等)或者是应用内部发出的(比如某个任务完成了)。BroadcastReceiver可以在应用的后台或前台接收到这些消息,并作出......
  • JNPF:一文详解可视化低代码开发平台的研究与应用
    低代码开发平台的兴起 随着信息技术的迅猛发展,企业对软件开发的需求不断攀升,传统的软件开发模式已经无法适应快速变化的市场需求。在这种背景下,低代码开发平台(Low-CodeDevelopmentPlatform,LCDP)应运而生,它通过提供一个可视化的开发环境,极大地简化了软件开发过程,使得非专......
  • 外网爆火的LLM应用手册来了!内行人都在学的大模型黑书,豆瓣评分高达9.9!!!
    Transformer模型介绍Transformer是工业化、同质化的后深度学习模型,其设计目标是能够在高性能计算机(超级计算机)上以并行方式进行计算。通过同质化,一个Transformer模型可以执行各种任务,而不需要微调。Transformer使用数十亿参数在数十亿条原始未标注数据上进行自监督学......
  • 栅格布局在 HarmonyOS 中的应用及扩展
    栅格布局作为一种经典的布局方式,广泛应用于不同类型的用户界面设计,尤其是在移动设备和响应式设计中,它表现出了强大的适应性。本文将深入探讨如何在HarmonyOS中使用栅格布局组件GridRow和GridCol,并通过多种示例来展示栅格布局的灵活性及扩展性。栅格布局的核心优势1.......
  • 构建Spring Boot应用的微服务服务监控与告警
    构建SpringBoot应用的微服务服务监控与告警大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!随着微服务架构的普及,服务监控与告警成为了保障系统稳定性的关键环节。本文将探讨如何在SpringBoot应用中构建微服务的监控与告警机制。一、微服务监控的......
  • 构建Spring Boot应用的微服务服务降级策略
    构建SpringBoot应用的微服务服务降级策略大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!微服务架构中的服务降级在微服务架构中,服务间的依赖关系复杂,任何一个服务的故障都可能影响到整个系统的稳定性。服务降级是一种应对策略,当某个服务不可用或响......