首页 > 其他分享 >大数据技术Flume框架详解

大数据技术Flume框架详解

时间:2022-08-29 23:01:40浏览次数:48  
标签:Flume channels sinks 框架 a1 sources 详解 c1 a2

Flume的概述

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日 志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。

  • 高可用(HA) flume框架(故障转移机制)
  • 高可靠 数据采集的可靠性
  • 分布式 分布式集群搭建

Flume的作用

最主要的作用:实时读取服务器本地磁盘的数据,将数据写到HDFS、Kafka

Flume的优点

可以和任意存储进程集成。

  • 支持不同的采集源
  • 支持多类型的目标源

输入的的数据速率大于写入目的存储的速率,flume会进行缓冲,减小 hdfs的压力

flume中的事务基于channel,使用了两个事务模型(sender + receiver),确保消息被可靠发送

Flume使用两个独立的事务分别负责从soucrce到channel,以及从 channel到sink的事件传递。一旦事务中所有的数据全部成功提交到 channel,那么source才认为该数据读取完成。同理,只有成功被sink 写出去的数据,才会从channel中移除。

Flume的组成结构

1、Flume组成架构

2、Agent

a、简介

Agent是一个JVM进程,它以事件的形式将数据从源头送至目的。Agent 主要有3个部分组成,Source、Channel、Sink。

b、Source

Source是负责接收数据到Flume Agent的组件。Source组件可以处理 各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、 spooling directory、netcat、sequence generator、syslog、 http、legacy。

c、Channel

Channel是位于Source和Sink之间的缓冲区。因此,Channel允许 Source和Sink运作在不同的速率上。Channel是线程安全的,可以同 时处理几个Source的写入操作和几个Sink的读取操作。 Flume自带两种Channel:Memory Channel和File Channel。 Memory Channel是内存中的队列。Memory Channel在不需要关心 数据丢失的情景下适用。如果需要关心数据丢失,那么Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数 据丢失。File Channel将所有事件写到磁盘。因此在程序关闭或机器宕 机的情况下不会丢失数据。

d、Sink

Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批 量写入到存储或索引系统、或者被发送到另一个Flume Agent。Sink 是完全事务性的。在从Channel批量删除数据之前,每个Sink用 Channel启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink就利用Channel提交事务。事务一旦被提交,该 Channel从自己的内部缓冲区删除事件。Sink组件目的地包括hdfs、 logger、avro、thrift、ipc、file、null、HBase、solr、自定义。

e、Event

传输单元,Flume数据传输的基本单元,以事件的形式将数据从源头送 至目的地。 Event由可选的header和载有数据的一个byte array 构成。Header是容纳了key-value字符串对的HashMap。

Flume agent的配置文件

单数据源单出口案例

这种模式是将多个flume给顺序连接起来了,从最初的source开始到最 终sink传送的目的存储系统。此模式不建议桥接过多的flume数量, flume数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume宕机,会影响整个传输系统。

flume实现监控端口数据案例:

用netcat工具向本机端口号:44444发送消息,flume监听

# Name the components on this agent
# r1:表示a1的输入源	a1:表示agent的名称
a1.sources = r1
# k1:表示a1的输出目的地
a1.sinks = k1
# c1:表示a1的缓冲区
a1.channels = c1

# Describe/configure the source
# 表示a1的输入源类型为netcat端口类型
a1.sources.r1.type = netcat
# 表示a1的监听主机
a1.soucres.r1.bind = localhost
# 表示a1的监听的端口号
a1.sources.r1.port = 44444

# Describe the sink
# 表示a1的输出目的地是控制台logger类型
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
# 表示a1的channel类型是memory内存型
a1.channels.c1.type = memory
# 表示a1的channel总容量1000个event
a1.channels.c1.capacity = 1000
# 表示a1的channel传输时收集到100条event以后再去提交事务
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 表示讲r1和c1链接起来
a1.sources.r1.channels = c1
# 表示将k1和c1链接起来
a1.sinks.k1.channel = c1

启动flume

  • 方法一:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf - Dflume.root.logger=INFO,console

  • 方法二:bin/flume-ng agent -c conf/ -n a1 –f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

参数说明

  • --conf conf/ :表示(conf)配置文件存储在conf/目录
  • --name a1 :表示给agent起名为a1
  • --conf-file job/flume-netcat.conf :flume本次启动读取的配置 文件是在job文件夹下的flume-telnet.conf文件。
  • -Dflume.root.logger==INFO,console :-D表示flume运行时动 态修改flume.root.logger参数属性值,并将控制台日志打印级别设 置为INFO级别。日志级别包括:log、info、warn、error。

实时采集文件到HDFS上案例

用flume实时监听某文件,当该文件的内容变化时,上传该数据到HDFS上。

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
# 定义数据源文件的类型
a2.sources.r2.type = exec
# 监听该目录下的access.log文件
a2.sources.r2.command = tail -F /home/hadoop/nginx/logs/access.log
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
# 上传文件的路径 %Y%m%d为时间戳,自动生成对应时间 年月日
a2.sinks.k2.hdfs.path = hdfs://192.168.137.128:9000/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 1000
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

实时读取目录文件到HDFS上案例

使用flume实时监听整个目录文件,当该目录文件新增时,上传该文件到HDFS上。

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
# 定义source类型为目录
a3.sources.r3.type = spooldir
# 定义监控目录
a3.sources.r3.spoolDir = /home/hadoop/bigdatasoftware/flume/upload
# 定义文件上传完的后缀名
a3.sources.r3.fileSuffix = .COMPLETED
# 是否有五年间头
a3.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://192.168.137.128:9000/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

单数据源多出口案例(选择器)

Flume支持将事件流向一个或者多个目的地。这种模式将数据源复制到 多个channel中,每个channel都有相同的数据,sink可以选择传送的 不同的目的地。

flume1监控文件的变动,并将变动的内容传递给flume2和flume3。

flume2负责输出到HDFS上

flume3负责输出到本地上

三个flume在同一台设备上

flume1:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有channel
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/bigdatasoftware/nginx/logs/access.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# sink端的avro是一个数据发送者
a1.sinks.k1.type = avro
# 设置其中一个flume接收的地址
a1.sinks.k1.hostname = 192.168.137.128
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
# 设置另一个flume的接收地址
a1.sinks.k2.hostname = 192.168.137.128
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

flume2:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# source端的avro是一个数据接收服务
a2.sources.r1.type = avro
# 设置本机地址,注意端口号
a2.sources.r1.bind = 192.168.137.128
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path =
hdfs://192.168.137.128:9000/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume3:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = 192.168.137.128
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

注意:接收方与发送方的地址和端口号要对应

单数据源多出口案例(Sink组)

Flume支持使用将多个sink逻辑上分到一个sink组,flume将数据发送 到不同的sink,主要解决负载均衡和故障转移问题

配置1个接收日志文件的source和1个channel、两个sink,分别输送给flume-flume-console1和flume-flume-console2。

flume1:

a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 22222

#定义一个sink组
#一个channel对应多个sink时要设置一个sinkgroups
a1.sinkgroups = g1
#指明sink组中的sink实例
a1.sinkgroups.g1.sinks = k1 k2
#设置sinkProcessor的类型(负载均衡)
a1.sinkgroups.g1.processor.type = load_balance
#①random-随机分配  ②round_robin-轮循
a1.sinkgroups.g1.processor.selector = random


a1.channels.c1.type = memory


a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.137.128
a1.sinks.k1.port = 33333

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.137.129
a1.sinks.k2.port = 44444


a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

flume2:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.137.128
a1.sources.r1.port = 33333

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

flume3:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.137.129
a1.sources.r1.port = 44444

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

a1.sinks.k1.type = logger


a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

多数据源汇总

这种模式是我们最常见的,也非常实用,日常web应用通常分布在上百 个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也 非常麻烦。用flume的这种组合方式能很好的解决这一问题,每台服务 器部署一个flume采集日志,传送到一个集中收集日志的flume,再由 此flume上传到hdfs、hive、hbase、jms等,进行日志分析

flume1监控一个文件的变动

flume2监控一个端口的数据

flume1和flume2将数据发送给flume3,flume3最终将数据打印到控制台。

flume1:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/nginx/logs/access.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.137.129
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

flume2:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = 198.168.137.128
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = 192.168.137.129
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume3:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = 192.168.137.129
a3.sources.r1.port = 4141

# Describe the sink
# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

标签:Flume,channels,sinks,框架,a1,sources,详解,c1,a2
From: https://www.cnblogs.com/Mr-Sponge/p/16637703.html

相关文章

  • 服务器TIME_WAIT和CLOSE_WAIT详解和解决办法
    服务器TIME_WAIT和CLOSE_WAIT详解和解决办法-悟寰轩-叶秋-博客园 https://www.cnblogs.com/sunxucool/p/3449068.html昨天解决了一个HttpClient调用错误导致的服务......
  • 第二章、框架设计的核心要素
    1.提升用户的开发体验开发体验是衡量一个框架的指标之一关于快速定位问题和打印警告信息和其他重要信息 Vue.js3  源码中使用 init......
  • APICloud AVM框架 封装车牌号输入键盘组件
    AVM(Application-View-Model)前端组件化开发模式基于标准WebComponents组件化思想,提供包含虚拟DOM和Runtime的编程框架avm.js以及多端统一编译工具,完全兼容WebComponents标......
  • 如何将本地化添加到 Django REST 框架
    如何将本地化添加到DjangoREST框架Django在本文中,我将为您提供有关如何使用I18N和DjangoRestFramework本地化DjangoRestAPI的指南。在开始本教程之前,我将......
  • (转)SNMP学习笔记之SNMPWALK 安装与使用详解
    0x00 简介snmpwalk是SNMP的一个工具,它使用SNMP的GETNEXT请求查询指定OID(SNMP协议中的对象标识)入口的所有OID树信息,并显示给用户。通过snmpwalk也可以查看支持SNMP协议(可......
  • 数位DP详解
    数位dp一般针对统计某个区间符合一个或多个条件的数的数量,因为算法名字叫数位dp,所以我们需要对数位进行枚举思路考虑,传统做法,例如统计[L,R]之间有多少个数,该数至少含......
  • 详解 OpenDAL |Data Infra 研究社第三期
    你是否对 OpenDAL的设计和使用还有不解,急需一个系统的解释去深入了解呢?对于 OpenDAL在Databend中的应用是否了解?本次直播我们会携手旋涡老师一起为大家答疑解惑,学习......
  • 【三维地图】开发攻略 —— 详解“GeoJSON”技术和应用场景
    GeoJSON,一个用于存储地理信息的数据格式。GoeJSON对象可以表示几何、特征或特征集合,支持:点、线、面、多点、多线、多面和几何集合。在基于平面地图,三维地图中都需要用到的......
  • Logstash配置详解
    转自:https://blog.csdn.net/hushukang/article/details/844231841.InputPlugin1.1从文件输入从文件读取数据,如常见的日志文件。文件读取通常要解决几个问题:序号......
  • “轻松搞定CMake”系列之find_package用法详解
    本文是“轻松搞定CMake”系列博客中的一篇,该篇文章的主要目的是详细讲解一下CMake中搜包命令find_package的使用和原理。其他更多文章请参考:“轻松搞定CMake”系列博客......