首页 > 其他分享 >Flume 整合 Kafka_flume 到kafka 配置【转】

Flume 整合 Kafka_flume 到kafka 配置【转】

时间:2024-04-15 13:55:23浏览次数:22  
标签:Flume flume -- Kafka a1 kafka

1.背景
先说一下,为什么要使用 Flume + Kafka?

以实时流处理项目为例,由于采集的数据量可能存在峰值和峰谷,假设是一个电商项目,那么峰值通常出现在秒杀时,这时如果直接将 Flume 聚合后的数据输入到 Storm 等分布式计算框架中,可能就会超过集群的处理能力,这时采用 Kafka 就可以起到削峰的作用。Kafka 天生为大数据场景而设计,具有高吞吐的特性,能很好地抗住峰值数据的冲击。

 

 

2.整合流程
Flume 发送数据到 Kafka 上主要是通过 `KafkaSink` 来实现的,主要步骤如下:

1. 启动Zookeeper和Kafka

这里启动一个单节点的 Kafka 作为测试:

# 启动Zookeeper
zkServer.sh start

# 启动kafka
bin/kafka-server-start.sh config/server.properties
2. 创建主题

创建一个主题 `flume-kafka`,之后 Flume 收集到的数据都会发到这个主题上:

# 创建主题
bin/kafka-topics.sh --create \
--zookeeper hadoop001:2181 \
--replication-factor 1 \
--partitions 1 --topic flume-kafka

# 查看创建的主题
bin/kafka-topics.sh --zookeeper hadoop001:2181 --list
3. 启动kafka消费者

启动一个消费者,监听我们刚才创建的 `flume-kafka` 主题:

# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic flume-kafka
4. 配置Flume

新建配置文件 `exec-memory-kafka.properties`,文件内容如下。这里我们监听一个名为 `kafka.log` 的文件,当文件内容有变化时,将新增加的内容发送到 Kafka 的 `flume-kafka` 主题上。

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type=exec
a1.sources.s1.command=tail -F /tmp/kafka.log
a1.sources.s1.channels=c1

#设置Kafka接收器
a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
#设置Kafka地址
a1.sinks.k1.brokerList=hadoop001:9092
#设置发送到Kafka上的主题
a1.sinks.k1.topic=flume-kafka
#设置序列化方式
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k1.channel=c1

a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100
5. 启动Flume

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/exec-memory-kafka.properties \
--name a1 -Dflume.root.logger=INFO,console
6. 测试

向监听的 `/tmp/kafka.log ` 文件中追加内容,查看 Kafka 消费者的输出:

 

可以看到 `flume-kafka` 主题的消费端已经收到了对应的消息:

 

转自

Flume 整合 Kafka_flume 到kafka 配置-CSDN博客
https://blog.csdn.net/shangjg03/article/details/133870099

 

标签:Flume,flume,--,Kafka,a1,kafka
From: https://www.cnblogs.com/paul8339/p/18135802

相关文章

  • Kafka消息可视化工具:Offset Explorer(原名kafka Tool)的使用方法【转】
    OffsetExplorer(以前称为KafkaTool)是一个用于管理和使用ApacheKafka®集群的GUI应用程序。它提供了一个直观的界面,允许用户快速查看Kafka集群中的对象以及集群主题中存储的消息。它包含面向开发人员和管理员的功能。一些主要功能包括:快速查看所有Kafka集群,包括其代理,主题和......
  • kafka
    高性能之道Kafka的特性之一就是高吞吐率,但是Kafka的消息是保存或缓存在磁盘上的,一般认为在磁盘上读写数据是会降低性能的,但是Kafka即使是普通的服务器,Kafka也可以轻松支持每秒百万级的写入请求,超过了大部分的消息中间件,这种特性也使得Kafka在日志处理等海量数据场景广泛应用。Kaf......
  • Kafka如何保证消息的顺序性
    Kafka发布模式通过一系列机制来确保消息的顺序性,特别是在分区内部。以下是关键要点:1.分区机制:Kafka的核心机制之一是分区(Partition)。每个主题(Topic)可以被分割成多个分区,而消息在发布时会被追加到特定的分区中。在每个分区内部,消息是按照它们被追加的顺序来存储的,因此保证了分区......
  • 【ALL】Kafka从抬脚到入门
    一、Kafka简介1.1、定义旧定义Kafka是一个分布式的基于发布/订阅模式的消息队列。新定义Kafka是一个开源的分布式事件流平台,用于数据管道、流分析、数据集成和关键任务的应用。1.2、使用场景主要用于大数据实时处理领域。缓冲:有助于控制和优化数据流经过系统的速度......
  • Kafka做消息队列的原理
    Kafka作为消息队列的实现原理主要基于其分布式架构和日志式存储机制。以下是Kafka作为消息队列工作的核心原理:1.分布式架构与分区:Kafka采用分布式架构,将数据分布存储在多个节点(称为Broker)上,以实现数据的水平扩展和并行处理。Kafka中的消息流被组织成主题(Topic),每个主题可以包......
  • docker-compose部署kafka
    docker-compose.ymlversion:'2'services:zookeeper:image:develop-harbor.geostar.com.cn/3rd/zookeeper:3.5.5ports:-"2181:2181"kafka:image:develop-harbor.geostar.com.cn/3rd/wurstmeister/kafka:2.12-2.2.1......
  • kafka集群
    对于运维需要掌握的kafka基础操作,读写管理掌握后,下一步就是集群部署搭建了。1.kafka天然支持集群2.kafka将集群状态写入zookeeper。集群部署1.确保zk启动[devops03root/opt/kafka_2.11-2.4.0]#netstat-tunlp|grep2181tcp600:::2181......
  • kafka基础
    1.流处理平台2.消息队列企业要求掌握kafka,工作里使用1.api原理2.项目实战配置3.kafka面试题学习目标1.从零熟练的掌握kafka2.学习核心API以及底层原理3.结合微信小程序,微服务完成kafka实战特性本课适合需要掌握kafka消息传递系统,以及维护大数据架构的专业......
  • Kafka 进阶
    1、为什么有消息系统解耦合异步处理例如电商平台,秒杀活动。一般流程会分为:1:风险控制、2:库存锁定、3:生成订单、4:短信通知、5:更新数据通过消息系统将秒杀活动业务拆分开,将不急需处理的业务放在后面慢慢处理;流程改为:1:风险控制、2:库存锁定、3:消息系统、4:生成订单、5:短信通知......
  • linux环境安装——kafka安装复习
    需要安装jdk、zk;然后才是kafka  kafka版本:kafka_2.13-3.2.3.tgz                             [root@iZf8zi6zcbssmm6c2nrhapZ/]#ls-alttotal84drwxrwxrwt.9rootroot4096Apr914:42tmpdrw......