首页 > 其他分享 >简单理解Flume之Channel和Sink

简单理解Flume之Channel和Sink

时间:2024-05-28 15:02:24浏览次数:14  
标签:Flume sinks s1 a1 sources k1 Sink Channel

Channel

Memory Channel

1,Memory Channel将数据临时存储的到内存队列

2,属性

属性

默认值解释
capacity100

队列容量,默认情况队列中最多临时存储100条数据,实际过程这个值一般被调节成30W~50W

transacCapacity100Put List向Channel发送的数据条数,实际中一般会调节成3000~5000

File Channel

1,File Channel将数据临时存储在磁盘文件,因此File Channel读写效率较低,但是可靠

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 6666

# 配置File Channel
a1.channels.c1.type = file
# 数据临时存储路径
a1.channels.c1.dataDirs = /opt/flume_data/data
# 检查点临时存储路径
a1.channels.c1.checkpointDir = /opt/flume_data/checkpoint

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

Sink

Logger Sink

1,Logger Sink:将Flume收集到的数据打印到控制台(console)或者文件(logfile)中

2,打印的时候为了防止过多数据将屏幕沾满,所以限制body部分只打印16个字节的数据,多出来的部分被舍弃

3,实际开发中,较少使用这个Sink

HDFS Sink

1,将数据收集到HDFS上

  

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 6666

a1.channels.c1.type = memory

# 配置HDFS Sink
a1.sinks.k1.type = hdfs
# 文件/数据在HDFS上的存储路径
a1.sinks.k1.hdfs.path = hdfs://hadoop01:8020/flume
# 文件滚动间隔时间
a1.sinks.k1.hdfs.rollInterval = 3600
# 文件滚动大小
a1.sinks.k1.hdfs.rollSize = 134217728
# 文件滚动条数
a1.sinks.k1.hdfs.rollCount = 1000000
# 文件类型
a1.sinks.k1.hdfs.fileType = DataStream

# 将Source和Channel绑定
a1.sources.s1.channels = c1
# 将Sink和Channel绑定
a1.sinks.k1.channel = c1

File Roll Sink

1,将Flume收集到的数据存储到本地磁盘

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 6666

a1.channels.c1.type = memory

# 配置File Roll Sink
a1.sinks.k1.type = file_roll
# 文件/数据在磁盘上的存储位置
a1.sinks.k1.sink.directory = /opt/flume_data
# 文件滚动间隔时间
a1.sinks.k1.sink.rollInterval = 3600

a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

Custom Sink

1,自定义Sink:当Flume提供的Sink不能满足业务需求的时候,此时就需要自定义Sink

2,自定义Sink和自定义Source的过程类似,需要定义一个类继承AbstractSink,实现SinkConfigurable接口

3,自定义Sink的时候需要注意事务问题

其他组件

Channel Selector

1,Channel Selector是Source的子组件之一,决定了数据的发送方式,即Source要把数据发送给哪个Channel

2,ChannelSelector提供了三种模式

  1. replicating:复制模式。当节点收到数据之后,会将这个数据复制之后分发给每一个节点,此时接收节点收到的数据是相同的

  2. multiplexing:路由模式。当节点收到数据之后,会根据这个数据的headers中的指定字段的值,来决定将数据发送给哪一个节点,此时接收节点的数据是不相同的

  3. load balancing:负载均衡模式。当节点收到数据之后,会及那个数据分发到某一个节点上,尽量保证接收节点之间的数据量是大致相等的。目前有两种负载均衡模式:round_robin(轮询),random(随机)。到目前为止,Flume提供的load balancing不好用

3,在扇出结构中,如果不指定,默认使用是replicating模式

4,multiplexing模式

a1.sources = s1
a1.channels = c1 c2
a1.sinks = k1 k2

a1.sources.s1.type = http
a1.sources.s1.port = 8090
# 配置Selector
a1.sources.s1.selector.type = multiplexing
# 要监听的字段名
a1.sources.s1.selector.header = kind
# 字段值,根据字段值不同发送到不同的节点上
a1.sources.s1.selector.mapping.music = c1
a1.sources.s1.selector.mapping.video = c2
a1.sources.s1.selector.default = c2

a1.channels.c1.type = memory
a1.channels.c2.type = memory

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02
a1.sinks.k1.port = 8090

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop03
a1.sinks.k2.port = 8090

a1.sources.s1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

Sink Processor

1,Sink Processor本质上是一个Sink Group(Sink 组),会将一个或多个Sink绑定到一个组,看作是一个整体

2,Sink Processor提供了三种模式

  1. default:默认模式,默认一个Sink就是一个组,有几个Sink就是几个Sink组

  2. failover:崩溃恢复模式。要求将多个Sink绑定到一个组中,给这个组中的每一个Sink需要指定一个优先级,当高优先级的Sink宕机,才会将数据发送给低优先级的Sink

  3. load_balancing:负载均衡模式。要求将多个Sink绑定到一个组中,实现数据的均衡,提供了两种方式:round_robinrandom。目前Flume提供的原生的load_balancing模式不好用!!!

3,failover模式

a1.sources = s1
a1.channels = c1 c2
a1.sinks = k1 k2

# 给SinkGroup起名
a1.sinkgroups = g1
# 给这个SinkGroup绑定Sink
a1.sinkgroups.g1.sinks = k1 k2
# 指定SinkGroup的类型
a1.sinkgroups.g1.processor.type = failover
# 给每一个Sink指定优先级
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 2
# 指定等待时间
a1.sinkgroups.g1.processor.maxpenalty = 100000

a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090

a1.channels.c1.type = memory
a1.channels.c2.type = memory

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02
a1.sinks.k1.port = 8090

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop03
a1.sinks.k2.port = 8090

a1.sources.s1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

Interceptors

1,Interceptors本身已是Source的组件之一,可以对数据进行修改,并且每一个Source上可以添加多个Interceptor,构成拦截器链

2,Timestamp Interceptor:时间戳拦截器,在Event的headers中添加一个timestamp字段,用于标记数据被收集的时间

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type = netcat
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 6666
# 给Interceptor起名
a1.sources.s1.interceptors = i1
# 配置Timestamp Interceptor
a1.sources.s1.interceptors.i1.type = timestamp

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

标签:Flume,sinks,s1,a1,sources,k1,Sink,Channel
From: https://blog.csdn.net/m0_63130425/article/details/139261315

相关文章

  • 简单理解Flume之Source
    SourceAVROSource1,AVROSource监听指定端口,接收被AVRO序列化之后的数据2,结合AVROSink可以实现多级扇入扇出流动a1.sources=s1a1.channels=c1a1.sinks=k1#配置AVROSourcea1.sources.s1.type=avro#要监听的主机名或者IP地址a1.sources.s1.bind=hadoop0......
  • 如何使用 Channel 类来创建一个生产者-消费者模型
    如何使用Channel类来创建一个生产者-消费者模型.NET中Channel类简单使用 Channel是干什么的TheSystem.Threading.Channelsnamespaceprovidesasetofsynchronizationdatastructuresforpassingdatabetweenproducersandconsumersasynchronously.Theli......
  • Netty ChannelHandler的生命周期
    ChannelHandler方法的执行是有顺序的,而这个执行顺序可以被称为ChannelHandler的生命周期。 LifeCyCleTestHandlerimportio.netty.channel.ChannelInboundHandlerAdapter;importio.netty.channel.ChannelHandlerContext;publicclassLifeCyCleTestHandlerextendsChan......
  • .NET 中 Channel 类(内存级消息队列)简单使用
    Channel是干什么的#TheSystem.Threading.Channelsnamespaceprovidesasetofsynchronizationdatastructuresforpassingdatabetweenproducersandconsumersasynchronously.Thelibrarytargets.NETStandardandworksonall.NETimplementations.Channelsa......
  • 第一层flume采集脚本
    #!/bin/bash#1、判断参数是否传入if[$#-lt1]then echo"必须输入参数...." exitfi#2、根据参数匹配执行case$1in"start") forhostinhadoop102hadoop103 do echo"===========启动$host第一层flume采集==============" ssh$host"nohup/op......
  • 【Halcon】示例程序学习——append_channel / tile_channels
    Name:1、append_channel——将其他矩阵(通达)附加到图像2、tile_channels——多张图像平铺成一个大图像signature:1、append_channel(MultiChannelImage,Image:ImageExtended::)2、tile_channels(Image:TiledImage:NumColumns,TileOrder:)Description:1、运算符ap......
  • go channel ->同步
    通道并非用来取代锁,各有不同使用场景。通道解决高级别逻辑层次并发架构,锁则用来保护低级别局部代码安全。●竟态条件:多线程同时读写共享资源(竟态资源)。●临界区:读写竟态资源的代码片段。●互斥锁:同一时刻,只有一个线程能进入临界区。●读写锁:写独占(其他读写均被阻塞),读共享。●信号......
  • golang channel 封装
    对于closed或nil通道,规则如下:无论收发,nil通道都会阻塞。不能关闭nil通道。重复关闭通道,引发panic!向已关闭通道发送数据,引发panic!从已关闭通道接收数据,返回缓冲数据或零值。nil通道是指没有make的变量。鉴于通道关闭后,所有基于此的阻塞都被解除,可用作通知。没......
  • Go语言系列——Go协程、信道(channel)、缓冲信道和工作池、Select、Mutex、结构体取代类
    文章目录21-Go协程Go协程是什么?Go协程相比于线程的优势如何启动一个Go协程?启动多个Go协程22-信道(channel)什么是信道?信道的声明通过信道进行发送和接收发送与接收默认是阻塞的信道的代码示例信道的另一个示例死锁单向信道关闭信道和使用forrange遍历信道23-缓冲信......
  • RocketMQLog:WARN No appenders could be found for logger (io.netty.channel.nio.Ni
    springBoot集成rocketMq启动的时候报RocketMQLog:WARNNoappenderscouldbefoundforlogger(io.netty.channel.nio.NioEventLoop). RocketMQLog:WARNPleaseinitializetheloggersystemproperly. 原因是pom中的rocket的依赖版本太高了。<dependency><groupI......