首页 > 系统相关 >如何在CentOS 7上安装Apache Kafka

如何在CentOS 7上安装Apache Kafka

时间:2022-11-17 15:24:04浏览次数:64  
标签:zookeeper CentOS sudo kafka 复制 Apache 服务器 Kafka

介绍

Apache Kafka是一种流行的分布式消息代理,旨在有效处理大量实时数据。Kafka集群不仅具有高度可扩展性和容错性,而且与其他消息代理(如ActiveMQ和RabbitMQ相比,它还具有更高的吞吐量。虽然它通常用作发布/订阅消息传递系统,但许多组织也将其用于日志聚合,因为它为已发布的消息提供持久存储。

发布/订阅消息传递系统允许一个或多个生成器发布消息,而不考虑comsumer的数量或他们将如何处理消息。将自动通知已订阅的客户端有关更新和新消息的创建。与客户端定期轮询以确定新消息是否可用的系统相比,此系统更高效且可扩展。

在本教程中,您将在CentOS 7上安装和使用Apache Kafka 1.1.0。

准备

要继续,您将需要:

  • 一个CentOS 7服务器和一个具有sudo权限的非root用户。没有服务器的同学可以在这里购买,不过我个人更推荐您使用免费的腾讯云开发者实验室进行试验,学会安装后再购买服务器
  • 服务器上至少有4GB的RAM。没有这么多RAM的安装可能导致Kafka服务失败,Java虚拟机(JVM)在启动期间抛出“Out Of Memory”异常。
  • 在您的服务器上安装OpenJDK 8。Kafka是用Java编写的,所以它需要一个JVM; 但是,它的启动shell脚本有一个版本检测错误,导致它无法启动8以上的JVM版本。

第1步 - 为Kafka创建用户

由于Kafka可以通过网络处理请求,因此您应该为其创建专用用户。如果Kafka服务器受到损害,这可以最大限度地减少对CentOS机器的损害。我们将在此步骤中创建一个专用的kafka用户,但是您应该创建一个不同的非root用户,以便在完成Kafka设置后在此服务器上执行其他任务。

以具有sudo权限的非root 用户身份登录,使用useradd命令创建名为kafka的用户:

sudo useradd kafka -m

-m标志确保将为用户创建主目录。此主目录/home/kafka将充当我们的工作区目录,用于执行以下部分中的命令。

使用passwd以设置密码:

sudo passwd kafka

使用adduser命令将kafka用户添加到wheel组中,以便它具有安装Kafka依赖项所需的权限:

sudo usermod -aG wheel kafka

您的kafka用户现已准备就绪。使用su方式登录此帐户:

su -l kafka

现在我们已经创建了特定于Kafka的用户,我们可以继续下载和解压缩Kafka二进制文件。

第2步 - 下载和提取Kafka二进制文件

让我们将kafka二进制文件下载并解压缩到我们kafka用户主目录中的专用文件夹中。

首先,在/home/kafka中创建一个目录Downloads以存储您的下载:

mkdir ~/Downloads

使用curl下载 Kafka 的二进制文件:

curl "http://www-eu.apache.org/dist/kafka/1.1.0/kafka_2.12-1.1.0.tgz" -o ~/Downloads/kafka.tgz

创建一个名为kafka的目录并更改为此目录。这将是Kafka安装的基本目录:

mkdir ~/kafka && cd ~/kafka

使用tar命令提取下载的存档:

tar -xvzf ~/Downloads/kafka.tgz --strip 1

我们指定--strip 1标志以确保存档的内容~/kafka/本身被提取而不是在其内部的另一个目录(例如~/kafka/kafka_2.12-1.1.0/)中提取。

现在我们已经成功下载并解压缩了二进制文件,我们可以继续配置Kafka以允许删除主题。

第3步 - 配置Kafka服务器

Kafka的默认行为将不允许我们删除可以发布消息的主题,类别,组或订阅源名称。要修改它,让我们编辑配置文件。

Kafka的配置选项在server.properties中指定。用vi或你喜欢的编辑器打开这个文件:

vi ~/kafka/config/server.properties

让我们添加一个允许我们删除Kafka主题的设置。按下i以插入文本,并将以下内容添加到文件的底部:

delete.topic.enable = true

完成后,按ESC退出插入模式并按:wq将更改写入文件并退出。现在我们已经配置了Kafka,我们可以继续创建systemd单元文件,以便在启动时运行并启用它。

第4步 - 创建系统单元文件并启动Kafka服务器

在本节中,我们将为Kafka服务创建systemd单元文件。这将帮助我们执行常见的服务操作,例如以与其他Linux服务一致的方式启动,停止和重新启动Kafka。

Zookeeper是Kafka用于管理其集群状态和配置的服务。它通常在许多分布式系统中用作不可或缺的组件。如果您想了解更多信息,请访问官方Zookeeper文档

zookeeper创建单位文件:

sudo vi /etc/systemd/system/zookeeper.service

在文件中输入以下单位定义:

[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
​
[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
​
[Install]
WantedBy=multi-user.target

[Unit]部分指定Zookeeper在启动之前需要网络并且文件系统准备就绪。

[Service]部分指定systemd应使用zookeeper-server-start.shzookeeper-server-stop.shshell文件来启动和停止服务。它还指定Zookeeper如果异常退出则应自动重启。

接下来,为kafka创建systemd服务文件:

sudo vi /etc/systemd/system/kafka.service

在文件中输入以下单位定义:

[Unit]
Requires=zookeeper.service
After=zookeeper.service
​
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
​
[Install]
WantedBy=multi-user.target

[Unit]部分指定此单元文件依赖于zookeeper.service。这将确保zookeeperkafa服务启动时自动启动。

[Service]部分指定systemd应使用kafka-server-start.shkafka-server-stop.shshell文件来启动和停止服务。它还指定如果Kafka异常退出则应自动重启。

现在已经定义了单元,使用以下命令启动Kafka:

sudo systemctl start kafka

要确保服务器已成功启动,请检查kafka单位的日志:

journalctl -u kafka

您应该看到类似于以下内容的输出:

Jul 17 18:38:59 kafka-centos systemd[1]: Started kafka.service.

您现在有一个Kafka服务器侦听端口9092

虽然我们已启动该kafka服务,但如果我们要重新启动服务器,它将不会自动启动。要kafka在服务器启动时启用,请运行:

sudo systemctl enable kafka

现在我们已经启动并启用了服务,让我们检查安装。

第5步 - 测试安装

让我们发布并使用“Hello World”消息以确保Kafka服务器正常运行。在Kafka中发布消息需要:

  • 一个producer,它能够将记录和数据发布到主题中。
  • 一个consumer,它读取主题消息和数据。

首先,通过键入以下命名创建一个主题TutorialTopic

~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

您可以使用kafka-console-producer.sh脚本从命令行创建生成器。它期望Kafka服务器的主机名,端口和主题名称作为参数。

键入以下内容将字符串"Hello, World"发布到TutorialTopic主题:

echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

接下来,您可以使用kafka-console-consumer.sh脚本创建Kafka使用者。它期望ZooKeeper服务器的主机名和端口,以及主题名称作为参数。

以下命令使用来自TutorialTopic的消息。请注意--from-beginning标志的使用,该标志允许消费在comsumer启动之前发布的消息:

~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning

如果没有配置问题,您应该在终端中看到Hello, World

Hello, World

该脚本将继续运行,等待更多消息发布到该主题。随意打开一个新的终端并启动一个生产者发布更多的消息。您应该能够在comsumer的输出中看到它们。

完成测试后,按CTRL+C以停止使用者脚本。现在我们已经测试了安装,让我们继续安装KafkaT。

第6步 - 安装KafkaT(可选)

KafkaT是Airbnb的一款工具,可让您更轻松地查看有关Kafka群集的详细信息,并从命令行执行某些管理任务。因为它是一个Ruby gem,所以你需要Ruby才能使用它。您还需要ruby-devel和构建相关的软件包(例如makegcc)来构建其依赖的其他gem。使用yum安装它们:

sudo yum install ruby ruby-devel make gcc patch

您现在可以使用gem命令安装KafkaT:

sudo gem install kafkat

KafkaT使用.kafkatcfg作为配置文件来确定Kafka服务器的安装和日志目录。它还应该有一个条目将KafkaT指向您的ZooKeeper实例。

创建一个名为.kafkatcfg的新文件:

vi ~/.kafkatcfg

添加以下行以指定有关Kafka服务器和Zookeeper实例的必需信息:

{
  "kafka_path": "~/kafka",
  "log_path": "/tmp/kafka-logs",
  "zk_path": "localhost:2181"
}

您现在可以使用KafkaT了。首先,您可以使用它来查看有关所有Kafka分区的详细信息:

kafkat partitions

您将看到以下输出:

Topic                 Partition   Leader      Replicas        ISRs    
TutorialTopic         0             0         [0]             [0]
__consumer_offsets    0             0         [0]                           [0]
...
...

你会看到TutorialTopic__consumer_offsets,它们是用于存储客户端相关信息所使用Kafka的内部话题。您可以放心地忽略以__consumer_offsets开头的行。

步骤7 - 设置多节点群集(可选)

如果要使用更多CentOS 7计算机创建多代理群集,则应在每台新计算机上重复步骤1,步骤4和步骤5。此外,您应该在server.properties文件中作以下更改:

  • 应更改broker.id属性的值,使其在整个群集中是唯一的。此属性唯一标识集群中的每个服务器,并且可以将任何字符串作为其值。例如,"server1""server2"等。
  • 应更改zookeeper.connect属性的值,以便所有节点都指向同一个ZooKeeper实例。此属性指定Zookeeper实例的地址并遵循<HOSTNAME/IP_ADDRESS>:<PORT>格式。例如"203.0.113.0:2181""203.0.113.1:2181"等等。

如果要为群集设置多个ZooKeeper实例,则每个节点上的zookeeper.connect属性值应该是一个相同的逗号分隔字符串,其中列出了所有ZooKeeper实例的IP地址和端口号。

第8步 - 限制Kafka用户

现在所有安装都已完成,您可以删除kafka用户的管理员权限。在执行此操作之前,请注销并以任何其他非root sudo用户身份重新登录。如果您仍在运行相同的shell会话,那么只需键入exit即可启动本教程。

从sudo组中删除kafka用户:

sudo gpasswd -d kafka wheel

要进一步提高Kafka服务器的安全性,请使用该命令锁定kafka用户的密码passwd。这可以确保没有人可以使用此帐户直接登录服务器:

sudo passwd kafka -l

此时,只有root或sudo用户可以通过键入kafka命令登录:

sudo su - kafka

将来,如果要解锁,请使用passwd-u选项:

sudo passwd kafka -u

您现在已成功限制kafka用户的管理员权限。

结论

您现在可以在CentOS服务器上安全地运行Apache Kafka。您可以使用Kafka客户端(可用于大多数编程语言)创建Kafka生产者和使用者,从而在项目中使用它。

想要了解更多关于安装Apache Kafka的相关教程,请前往腾讯云+社区学习更多知识。

标签:zookeeper,CentOS,sudo,kafka,复制,Apache,服务器,Kafka
From: https://www.cnblogs.com/gaoyanbing/p/16899578.html

相关文章

  • Kafka启动报错:/bin/kafka-run-class.sh: line 258: exec: java: not found
    Kafka启动报错处理:/opt/module/kafka/bin/kafka-run-class.sh:第258行:exec:java:未找到今天在安装kafka后启动的时候出现了报错:/software/kafka_2.11-0.11.0.0/bin......
  • LINUX CENTOS7 部署步骤 EMQX
    0. MQTT服务器开源https://www.emqx.io/zh1.官方安装教程https://www.emqx.io/zh/downloads?os=CentOS2.搬运curl-shttps://assets.emqx.com/scripts/install-e......
  • centos7 php 服务重启
    apache服务启动systemctlstarthttpd停止systemctlstophttpd重启systemctlrestarthttpdmysql服务启动systemctlstartmysqld停止systemctlstopmysq......
  • [SpringBoot-Dubbo] 报错:NoClassDefFoundError: org/apache/curator/framework/recipe
    NoClassDefFoundError:org/apache/curator/framework/recipes/cache/NodeCacheListener缺少curator依赖<dependency><groupId>org.apache.curator</groupId><ar......
  • intellij springmvc项目报错:[RMI TCP Connection(3)-127.0.0.1] org.apache.jasper.se
    idea部署项目报错信息:[RMITCPConnection(3)-127.0.0.1]org.apache.jasper.servlet.TldScanner.scanJarsAtleastoneJARwasscannedforTLDsyetcontainednoTLDs.......
  • 14_Kafka高级_生产者ISR
    生产者数据的可靠性保证。刚才已经讲完了,数据发到哪个分区,但是,数据生产者已经发过去了,但是kafka有没有接到,那就是另一个问题。现在就准备讲,生产者数据的可靠性保证怎么实现......
  • 12_Kafka高级_文件存储
    实际上能看到的就是topicName+partitionID这个文件。这个文件下有两个东西很重要。.log结尾的文件是真正存储数据的。.index是存放索引的。先看看.log文件:默认会存储7......
  • 11_Kafka高级_工作流程
    kafka的工作流程和文件存储机制:当生产者往一个不存在的主题发送数据的时候,也可以发送。自己会帮你创建主题,一个分区,一个副本。是server.properties文件里面配置的。这是默......
  • 13_Kafka高级_生产者分区策略
    刚才主要讲的是存储的内容,主要的index和.log两个文件。kafka的生产者:有个分区策略:分区的原因:1、可以以partition为单位进行读写2、提高集群的负载能力。生产者分区的原......
  • 09_Kafka入门_数据日志分离
    关闭kafka但是会有延时,关闭的比较慢。之后再看一下就没了,需要等待一下。我们删除每台机器的logs文件夹:我们希望把数据和logs分开来放。这时候,就相当于kafka新装的一样......