最近在写一个数据量比较大的项目时候,需要使用flume将kafka中的数据传输到HDFS上进行存储,方便后续的数仓搭建,但是flume在传输数据中却报错如下日志
org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:129)
at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:311)
at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
at java.lang.Thread.run(Thread.java:748)
其实关于这个问题,大家需要先知道flume的传输原理,flume是有三大核心组件的,分别是:
(一)Source(数据源)
- 作用:Source是Flume的输入端,负责从数据源收集数据
- 类型:Flume提供了多种内置的Source,如Avro Source、NetCat Source、Spooling Directory Source等,也支持自定义Source。
- 工作原理:Source将收集到的数据传递给Channel,供后续处理。
(二)Channel(通道)
作用:Channel是Source和Sink之间的缓冲区,用于存储Source收集到的数据,并平衡Source收集和Sink读取数据的速度。
类型:Flume提供了多种内置的Channel实现,如Memory Channel、File Channel、Kafka Channel等。
- Memory Channel:读写速度快,但存储数据量小,适用于内存资源充足且不关心数据丢失的场景。
- File Channel:存储容量大,无数据丢失风险,适用于对数据可靠性要求较高的场景。
工作原理:Channel线程安全且具有事务性,支持source写失败重复写和sink读失败重复读等操作。它存储的是Source收集并且没有被Sink读取的Event,这些Event可以被视为Flume内部的消息队列。
(三)Sink(数据汇)
作用:Sink是Flume的输出端,负责将数据从Channel中取出,并传递给目标系统或存储。
类型:Flume提供了多种内置的Sink,如HDFS Sink、HBase Sink、Kafka Sink等,也支持自定义Sink。不同的Sink可以将数据发送到不同的目的地,例如File Sink将数据存储到文件中,HDFS Sink将数据存储到Hadoop分布式文件系统中,Kafka Sink将数据发送到Kafka消息队列中等。
工作原理:Sink从Channel中获取数据,并将数据传递到目标系统或存储中,完成数据的持久化存储或传输到其他系统中以便进一步处理和分析。
Flume中的Source、Channel和Sink三个组件相互协作,共同完成了数据的采集、传输和存储任务。它们各自承担着不同的作用,共同构成了Flume这一强大而灵活的海量日志采集、聚合和传输系统。
简而言之,这个报错的原因就是source在拿到数据,往channel里面写的时候,没地方了,中间的通道,比作池子,流入太快,流出慢,然后,慢慢的,池子就满了,再流就溢出了。
既然知道原因了,那么就要去解决这些问题
1、流入和流出的时间差
2、增大池子的容量,延缓池子流满的时间
先检查一下flume的配置,是不是source的batchSize比sink的batchSize大,改改让流入慢点,给流出争取时间。
如果这样还不行,那么我建议大家不要再使用Memory Channel管道了,因为这个本事就不适用于数据量很大的场景,更换为File Channel管道,我这里换成Flie之后,数据就成功进入HDFS了
标签:flume,Flume,java,Source,传输数据,报错,Sink,Channel From: https://blog.csdn.net/xq_123dd/article/details/143068167