首页 > 其他分享 >一些关于flume的知识知识碎片

一些关于flume的知识知识碎片

时间:2024-07-03 19:58:24浏览次数:20  
标签:flume hdfs sinks 知识 碎片 a1 k1 c1 channels

Flume架构

Flume概述

flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。

支持在日志系统中定制各类数据发送方,用于收集数据;

同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。

flume的数据流由事件(Event)贯穿始终。

事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把event推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。

Event的概念:

flume的核心是把数据从数据源(source)收集过来,在将收集到的数据由目的地(sink)所拉取。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume再删除自己缓存的数据。
在整个数据的传输的过程中,流动的是event,即事务保证是在event级别进行的。那么什么是event呢?—–event将传输的数据进行封装,是flume传输数据的基本单位,如果是文本文件,通常是一行记录,event也是事务的基本单位。event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。

简单理解:event信息就是flume收集到的数据

Flume 运行的核心是 Agent。Flume以agent为最小的独立运行单位。一个agent就是一个JVM。

它是一个完整的数据收集工具,含有三个核心组件,分别是source、 channel、 sink,类似生产者、仓库、消费者的架构。

source:source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。
channel:source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。
sink:sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定义。

  1. Source

    Source是数据的收集端,负责将数据捕获后进行特殊的格式化,将数据封装到事件(event) 里,然后将事件推入Channel中。 Flume提供了很多内置的Source, 支持 Avro, log4j, syslog 和 http post(body为json格式)。可以让应用程序同已有的Source直接打交道,如AvroSource
    如果内置的Source无法满足需要, Flume还支持自定义Source

    支持的类型有

    • Avro Source
    • Thrift Source
    • Exec Source
    • JMS Source
    • Spooling Directory Source
    • Twitter 1% firehouse Source
    • Netcat Source
    • Sequence Generator Source
    • Syslog Source
    • HTTP Source
    • Legacy Source
  2. Channel

    Channel是连接Source和Sink的组件,大家可以将它看做一个数据的缓冲区(数据队列),它可以将事件暂存到内存中也可以持久化到本地磁盘(或支持jdbc的数据库中)上, 直到Sink处理完该事件。介绍两个较为常用的Channel, MemoryChannel和FileChannel。

    Channel支持的类型

    • Memory Channel
    • JDBC Channel
    • File Channel
    • Spillable Memory Channel
    • Pseudo Transaction Channel
    • Custom Channel
  3. Sink

    Sink从Channel中取出事件,然后将数据发到别处,可以向文件系统、数据库、 hadoop存数据, 也可以是其他agent的Source。在日志数据较少时,可以将数据存储在文件系统中,并且设定一定的时间间隔保存数据。

Flume运行机制

Flume 的核心是把数据从数据源收集过来,再送到目的地。为了保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,删除自己缓存的数据

值得注意的是,Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。

比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,

也就是说,多个agent可以协同工作。

Flume可靠性

Flume 使用事务性的方式保证传送Event整个过程的可靠性。 Sink 必须在Event 已经被传达到下一站agent里,又或者,已经被存入外部数据目的地之后,才能把 Event 从 Channel 中 remove 掉。这样数据流里的 event 无论是在一个 agent 里还是多个 agent 之间流转,都能保证可靠,因为以上的事务保证了 event 会被成功存储起来。比如 Flume支持在本地保存一份channel文件作为备份,而memory channel 将event存在内存 queue 里,速度快,但丢失的话无法恢复。

Flume的广义用法(多个agent顺序连接)

可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。这是最简单的情况,一般情况下,应该控制这种顺序连接的
Agent 的数量,因为数据流经的路径变长了,如果不考虑failover的话,出现故障将影响整个Flow上的Agent收集服务。

使用案例

这里举一些简单的案例

案例一、从控制台打入数据,在控制台显示

1、确定的使用类型分别是,netcat source,memory channel,logger sink。

2、编写conf文件

#声明各个组件
a.soueces=r1
a.channels=c1
a.sinks=k1

# 定义source类型,这里是试用netcat的类型
a.sources.r1.type=netcat
a.sources.r1.bind=192.168.254.100
a.sources.r1.prot=8888
# 定义sourcec发送的下游channel
a.sources.r1.channels=c1

# 定义channel
a.channels.c1.type=memory
#缓存的数据条数
a.channels.c1.capacity=1000
#事务数据量
a.channels.c1.transactionCapacity=1000

#定义sink的类型,确定上游channel
a.sinks.k1.type=logger
a.sinks.k1.channel=c1

3、开启服务

命令:注意 -n后面跟着的是你在conf文件中定义好的,-f后面跟着的是编写conf文件的路径

flume-ng agent -n a -c ./conf -f ./netcat2logger.conf -Dflume.root.logger=DEBUG,console

4、在另一个客户端输入命令

注意:这里的master和8888是在conf文件中设置的ip地址和端口

telnet master 8888

案例二、从本地指定路径中打入数据到HDFS

1、确定类型,spooldir source。memory channel,hdfs sink

2、编写conf文件

#创建目标表
create external table flume_tb1
(
    id bigint,
    name string,
    age int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/bigdata30/flumeout2/log_s';
#声明各个组件
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#指定spooldir的属性
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = .../flumedata
#时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

#指定channel
a1.channels.c1.type = memory
#暂存的条数
a1.channels.c1.capacity = 10000
#每次sink取的条数
a1.channels.c1.transactionCapacity = 1000

#指定sink的类型
a1.sinks.k1.type = hdfs
#指定hdfs的集群地址和路径,路径如果没有创建会自动创建
a1.sinks.k1.hdfs.path = hdfs://master:9000/flumeout/log_s
#指定hdfs路径下生成的文件的前缀
a1.sinks.k1.hdfs.filePrefix = log_%Y-%m-%d
#手动指定hdfs最小备份
a1.sinks.k1.hdfs.minBlockReplicas=1
#设置数据传输类型
a1.sinks.k1.hdfs.fileType = DataStream
#如果参数为0,不按照条数生成文件。如果参数为n,就是按照n条生成一个文件
a1.sinks.k1.hdfs.rollCount = 100
#这个参数是hdfs下文件sink的数据size。每sink 32MB的数据,自动生成一个文件
a1.sinks.k1.hdfs.rollSize =0
#每隔n 秒 将临时文件滚动成一个目标文件。如果是0,就不按照时间进行生成目标文件。
a1.sinks.k1.hdfs.rollInterval =0
a1.sinks.k1.hdfs.idleTimeout=0

#组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3、 开启服务

4、将文件复制到指定到的目录下

案例三、从java代码中进行捕获打入到hdfs

1、确定类型,avro source,memory channel,hdfs sink

2、编写java代码

import org.apache.log4j.Logger;

import java.text.SimpleDateFormat;
import java.util.Date;

public class CreateLogger {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个logger对象
        Logger logger = Logger.getLogger(CreateLogger.class.getName());

        // 创建一个日期格式化对象
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        // 写死循环
        while (true){
            Date date = new Date();
            logger.info("helloworld:"+sdf.format(date));
            //让线程休息一会
            Thread.sleep(1000);
        }
    }
}

3、编写conf文件

#定义agent名, source、channel、sink的名称
a.sources = r1
a.channels = c1
a.sinks = k1

#具体定义source
a.sources.r1.type = avro
a.sources.r1.bind = 192.168.160.100
a.sources.r1.port = 9999

#具体定义channel
a.channels.c1.type = memory
a.channels.c1.capacity = 10000
a.channels.c1.transactionCapacity = 10

#具体定义sink
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path =hdfs://master:9000/flume_hdfs_avro2
a.sinks.k1.hdfs.filePrefix = events-
a.sinks.k1.hdfs.minBlockReplicas=1
a.sinks.k1.hdfs.fileType = DataStream
#不按照条数生成文件
a.sinks.k1.hdfs.rollCount = 10
a.sinks.k1.hdfs.batchSize = 10
a.sinks.k1.hdfs.rollSize = 0
#每隔N s将临时文件滚动成一个目标文件
a.sinks.k1.hdfs.rollInterval =0
a.sinks.k1.hdfs.idleTimeout=0 
#组装source、channel、sink
a.sources.r1.channels = c1
a.sinks.k1.channel = c1

5、开启服务

6、运行java代码

案例四、监控Hbase日志到Hbase表中(这里可以换成其他组件日志监控)

1、监控日志

提前建好表

 create 'log','cf1'

编写conf文件

a.sources = r1
a.sinks = k1 
a.channels = c1

#指定spooldir的属性
a.sources.r1.type = exec 
a.sources.r1.command = tail -F .../test.txt

#指定channel
a.channels.c1.type = memory 
a.channels.c1.capacity = 10000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transactionCapacity = 100

#指定sink的类型
a.sinks.k1.type = hbase2
a.sinks.k1.table = log
a.sinks.k1.columnFamily = cf1
a.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer

# 组装
a.sources.r1.channels = c1 
a.sinks.k1.channel = c1

2、监控自定义的文件

确保目标表在hbase中已经存在

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F .../data.txt

# Describe the sink
a1.sinks.k1.type = hbase
a1.sinks.k1.table = test_idoall_org
a1.sinks.k1.columnFamily = name
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动flume agent

产生数据

echo "hello idoall.org from flume" >> data.txt

案例五、flume监控http source

1、确定类型,http source,memory channel,logger sink

2、编写conf文件

a1.sources=r1
a1.sinks=k1
a1.channels=c1
 
a1.sources.r1.type=http
a1.sources.r1.port=50000
a1.sources.r1.channels=c1
 
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1
 
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
# 表示sink每次会从channel里取多少数据
a1.channels.c1.transactionCapacity=100

3、启动服务

4、开启窗口进行打印数据

curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"hello world"}]'  http://192.168.160.100:50000

标签:flume,hdfs,sinks,知识,碎片,a1,k1,c1,channels
From: https://www.cnblogs.com/yulugoat/p/18282464

相关文章

  • 初学vue3, 全是黑盒子,vue3知识点汇总
    学习Vue.js应该像学习一门编程语言一样,首先要熟练掌握常用的知识,而对于不常用的内容可以简单了解一下。先对整个框架和语言有一个大致的轮廓,然后再逐步补充细节。千万不要像学习算法那样,一开始就钻牛角尖。前序:vueAPI的风格分为:选项式和组合式,vue2中一般用选项式,所以文章......
  • Plugin开发基本知识点 Plugin Pipeline Pre-Validation, Pre-Operation, Post-Operati
    在MicrosoftDynamics365插件开发中,插件可以注册在不同的事件管道阶段,这些阶段决定了插件的执行时机。常见的三个阶段是预验证(Pre-Validation)、预操作(Pre-Operation)和后操作(Post-Operation)。每个阶段都有其特定的用途和执行顺序。以下是对这三个阶段的详细解释及其用法:1.Pre-Va......
  • Plugin开发基本知识点 IPluginExecutionContext, iOrganization Service
    IPluginExecutionContext`IPluginExecutionContext`接口在MicrosoftDynamics365插件开发中用于获取有关当前插件执行上下文的信息。它提供了丰富的属性和方法,帮助开发者在插件执行时获取与当前操作相关的各种数据和元数据。以下是`IPluginExecutionContext`的一些主要功能和属......
  • .Net知识技能大全
    .Net知识技能大全更多请见https://www.dotnetshare.comC#常见运算符一元运算符(+、-、!、~、++、--)算术运算符(*、/、%、+、–)移位运算符(<<、>>)关系和类型测试运算符(==、!=、<、>、<=、>=、is和as)逻辑运算符(&、^和|)条件逻辑运算符(&&和||)空合并运算符(??)条件运......
  • 释放阅读潜能,让AI成为您的知识加速器
    在当今快节奏的信息化社会中,效率和知识管理成为了个人和企业成功的关键。"包阅AI",由上海知否知否信息科技有限公司倾力打造的智能AI阅读助手,正是一款致力于提升用户阅读效率和信息处理能力的创新工具。"包阅AI"以其超凡的阅读和理解能力,能够即时提炼和总结各类文档,无论是学术......
  • S1200通讯网络知识和应用
    ......
  • AI与音乐:终极对决,机械混音师将扬弃人类知识!
    AI与音乐一.引言1.1AI在音乐创作中的应用1.2AI在音乐表演与演奏中的应用二.AI在音乐创作中的应用2.1AI在音乐创作中的应用技术2.1.1深度学习2.1.2遗传算法2.1.3神经网络2.2不同AI算法在音乐创作中的应用2.2.1使用LSTM神经网络模型生成新的音乐2.2.2使用基于......
  • 线段树的基本知识和初级运用
    前言线段树绝对是出题人最爱考的高级数据结构了。它快、灵活、码量也大,相当考验OIer的综合能力。所以好好学习一下线段树是相当必要的。基础线段树是基于二叉树的。通过为二叉树的每个节点赋予线段的意义,线段树可以维护很多的区间信息,包括但不限于区间和、区间最大值、区间第......
  • 90后懒人带娃的福音,小学生古诗词的神器——ChatMoney全能知识库AI软件
    本文由ChatMoney团队出品因为工作需要,浅尝辄止般的用了一下ChatMoney全能知识库AI软件。一些长期积存的问题迎刃而解,但这只是一款企业级大模型功能的冰山一角。让自己尤为惊奇的是里面涵盖的各类资料系统,居然能助力宝妈宝爸带娃脱困。只要带过娃,懂的都懂。小学生内卷程度不亚......
  • 小孩子的好老师:ChatMoney全能知识库AI软件
    本文由ChatMoney团队出品因为工作需要,浅尝辄止般的用了一下ChatMoney全能知识库AI软件。一些长期积存的问题迎刃而解,但这只是一款企业级大模型功能的冰山一角。让自己尤为惊奇的是里面涵盖的各类资料系统,居然能助力宝妈宝爸带娃脱困。只要带过娃,懂的都懂。小学生内卷程度不亚......