首页 > 其他分享 >kafka

kafka

时间:2023-03-24 11:58:09浏览次数:33  
标签:-- 192.168 server sh kafka properties

kafka使用场景

消息传递Messaging

  • 消息传递就是发送数据,作为 TCP HTTP 或者 RPC 的替代方案,可以实现异步、 解耦、削峰(RabbitMQ 和 RocketMQ 能做的事情,它也能做)。因为 Kafka 的吞吐 量更高,在大规模消息系统中更有优势。

Website Activity Tracking 网站活动跟踪

  • 把用户活动发布到数据管道中,可以用来做监控、实时处理、报表等等,比如社交网站的行为跟踪,购物网站的行为跟踪,这样可以实现更加精准的内容推荐。

Log Aggregation日志聚合

  • 又比如用 Kafka 来实现日志聚合。这样就不用把日志记录到本地磁盘或者数据库, 实现分布式的日志聚合。

Metrics应用指标监控

  • 还可以用来记录运营监控数据。举例,对于贷款公司,需要监控贷款的业务数据: 今天放出去多少笔贷款,放出去的总金额,用户的年龄分布、地区分布、性别分布等等。
  • 或者对于运维数据的监控,CPU、内存、磁盘、网络连接的使用情况,可以实现告警。

数据集成+流计算

  • 数据集成指的是把 Kafka 的数据导入 Hadoop、HBase 等离线数据仓库,实现数 据分析。
  • 第三块是流计算。什么是流(Stream)?它不是静态的数据,而是没有边界的、源源不断的产生的数据,就像水流一样。流计算指的就是 Stream 对做实时的计算。

搭建集群

安装kafka

  • 和单机版安装一样,分别 192.168.8.144、192.168.8.145、192.168.8.146 三台机器安装好Kafka 环境。具体安装过程,参考《Kafka 应用场景及架构设计详解》预习资料 。

修改配置文件

  • 分别修改 192.168.8.144、192.168.8.145、192.168.8.146 这 2 台机器上的配置文件

cd /usr/local/kafka/config

vim server.properties

  • 修改配置文件中的 broker.id 分别为 144、145、146

broker.id=144

  • listeners 分 别 设 置 为 PLAINTEXT://192.168.8.144:9092 、PLAINTEXT://192.168.8.145:9092 、PLAINTEXT://192.168.8.146:9092

listeners=PLAINTEXT://192.168.8.144:9092

  • 三台机器的 zookeeper.connect 都修改为以下内容:

zookeeper.connect=192.168.8.144:2181,192.168.8.145:2181,192.168.8.146:2181

启动3个服务

  • 分别启动 ZK。

nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties >> zookeeper.nohup &

  • 再分别启动 kafka。

cd ../bin

./kafka-server-start.sh -daemon ../config/server.properties

集群下创建topic

  • 在 bin 目录下,

创建一个名为 gptest 的 topic,只有一个副本,一个分区:

sh kafka-topics.sh --create --topic gptest --bootstrap-server 192.168.8.144:9092

  • 查看已经创建的 topic:

sh kafka-topics.sh --bootstrap-server 192.168.8.144:9092 --describe --topic gptest

集群下启动Consumer

  • 在一个新的远程窗口中:

cd /usr/local/kafka/bin

sh kafka-console-consumer.sh --bootstrap-server 192.168.64.113:9092,192.168.64.72:9092,192.168.64.119:9092 --topic gptest

集群下启动Producer

  • 打开一个新的窗口,在 kafka 解压目录下:

sh kafka-console-producer.sh --broker-list 192.168.64.113:9092,192.168.64.72:9092,192.168.64.119:9092 --topic gptest

集群下Producer窗口发送消息

在生产者窗口输入hello world 回车

kafka单机伪集群安装

注意: 单机的 kafka 和集群的 kafka 不要混用一个 zk,否则会出现数据混乱的问题。

下载解压缩kafka

cd /usr/local/

wget https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz

tar -xzvf kafka_2.13-3.2.0.tgz

mv kafka_2.13-3.2.0 kafka cd kafka

修改配置文件

复制 3 个配置文件

cd config

cp server.properties server1.properties

cp server.properties server2.properties

cp server.properties server3.properties

修改配置文件中的 broker.id 分别为 1、2、3 listeners 这一行取消注释,端口号分别为 9093、9094、9095 log.dirs 分别设置为 kafka-logs1、kafka-logs2、kafka-logs3(先创建)

mkdir -p /tmp/kafka-logs1 /tmp/kafka-logs2 /tmp/kafka-logs3

server1.properties 的配置:

broker.id=1

listeners=PLAINTEXT://192.168.8.147:9093

log.dirs=/tmp/kafka-logs1

server2.properties 的配置:

broker.id=2

listeners=PLAINTEXT://192.168.8.147:9093

log.dirs=/tmp/kafka-logs2

server3.properties 的配置:

broker.id=3

listeners=PLAINTEXT://192.168.8.147:9093

log.dirs=/tmp/kafka-logs3

启动3个伪集群服务

第一步: 启动 ZK。 再启动 kafka。

cd ../bin

./kafka-server-start.sh -daemon ../config/server1.properties

./kafka-server-start.sh -daemon ../config/server2.properties

./kafka-server-start.sh -daemon ../config/server3.properties

PS:如果遇到 zk node exists 的问题,先把 brokers 节点删掉(临时解决方案)。

伪集群下创建Topic

在 bin 目录下,

创建一个名为 gptest 的 topic,只有一个副本,一个分区:

sh kafka-topics.sh --create --topic gptest --bootstrap-server 192.168.8.147:9092

查看已经创建的 topic:

sh kafka-topics.sh --bootstrap-server 192.168.8.147:9092 --describe --topic gptest

伪集群下启动Consumer

在一个新的远程窗口中:

kafka-console-consumer.sh --bootstrap-server 192.168.8.147:9093,192.168.8.147:9094,192.168.8.147:9095 --topic gptest --from-beginning

伪集群下启动 Producer

打开一个新的窗口,在 kafka 解压目录下:

kafka-console-producer.sh --broker-list 192.168.8.147:9093,192.168.8.147:9094,192.168.8.147:9095 --topic gptest

伪集群下 Producer

窗口发送消息 在生产者窗口输入 hello world 回车

 

标签:--,192.168,server,sh,kafka,properties
From: https://www.cnblogs.com/kevinzhao321/p/17251035.html

相关文章

  • kafka集群原理及部署
    官方地址https://kafka.apache.org/概述Kakfa起初是由LinkedIn公司开发的一个分布式的消息系统,后成为Apache的一部分,它使用Scala编写,以可水平扩展和高吞吐率而被广泛使......
  • Apache Kafka JNDI注入(CVE-2023-25194)漏洞复现浅析
    关于ApacheKafka是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和任务关键型应用程序。影响版本2.4.0<=Apachekafka<=3.2.2环境......
  • 初识Kafka
    介绍KafkaKafka是一款基于发布与订阅的消息系统。用生产者客户端API向Kafka生产消息,用消费者客户端API从Kafka读取这些消息。Kafka使用Zookeeper保存元数......
  • kafka 性能优化与常见问题优化处理方案
    本文为博主原创,未经允许不得转载:1.  JVM参数优化设置kafka是scala语言开发,运行在JVM上,需要对JVM参数合理设置,修改bin/kafka-start-server.sh中的jvm设置exportKAFKA_HEAP......
  • kafka消息堆积,consumer掉线
    注:本文转自:https://www.toutiao.com/article/7160323779812983296/?log_from=5abd712547149_1679497545032线上kafka消息堆积,所有consumer全部掉线,到底怎么回事?最近处理......
  • Linux 部署:kafka(虚拟机集群)
    参考文档:https://blog.csdn.net/wt334502157/article/details/116518259目录1.节点规划2.部署kafka集群3.修改配置4.附录1.节点规划节点ipvm8110.99.0.8......
  • kafka的基本概念
    1BrokerKafka集群包含一个或多个服务器,服务器节点称为broker。如图,我们有2个broker,6个partition,则会均分;如果只有1个partition,那么另一个broker会闲置。理想情况,我们......
  • golang解决kafka消息重复发送和重复消费
    1、解决消息重复发送当使用Kafka生产者发送消息时,可以设置消息的Key,使用Key来保证相同Key的消息不会被重复发送。在发送消息时,可以使用带Key的消息发送方式,如下所示:msg......
  • 解决Kafka总是丢消息的方法和原理
    注:本文转自:https://www.toutiao.com/article/7210953985497678347/?log_from=f0ecce317abb8_1679450040551引入MQ消息中间件最直接的目的:系统解耦以及流量控制(削峰填谷)......
  • python处理kafka数据
    1、程序作用:从多个topic中读取数据--处理数据--写入新的kafkatopic中pip3installkafka-pythonimportjsonfromkafkaimportKafkaProducerfromkafkaimportKafk......