下载zookeeper
Kafka 依赖 Zookeeper 进行分布式协调,所以需要下载Zookeeper ,当然你也可以使用kafka包里自带的一个默认配置的 Zookeeper。这里我们单独下载一个
- 访问Zookeeper官方下载页面
- 在页面中找到最新的稳定版本,点击相应的下载链接,下载 Zookeeper 的压缩包文件
- 解压 Zookeeper,将下载的压缩包文件解压到一个目录,例如 D:\zookeeper。解压后,目录结构类似于:
D:\zookeeper
├── bin
├── conf
├── lib
├── logs
└── ...
- 配置 Zookeeper,打开D:\zookeeper\conf 目录,将 zoo_sample.cfg 文件复制并重命名为 zoo.cfg。使用文本编辑器(如 Notepad++)打开 zoo.cfg 文件,并检查以下配置:
tickTime=2000
dataDir=D:/zookeeper/data
clientPort=2181
- tickTime 是 Zookeeper 服务器和客户端之间的心跳时间(以毫秒为单位)。
- dataDir 是 Zookeeper 存储数据的目录,确保路径是有效的并且存在。
- clientPort 是 Zookeeper 服务的端口,默认是 2181。
确保 D:\zookeeper\data 目录存在,如果不存在,请手动创建。
- 启动 Zookeeper,打开命令提示符(win+R输入cmd),切换到D盘,进入zookeeper\bin,执行zkServer.cmd
- 在bin目录下,执行zkServer.cmd status可以看到 Zookeeper 的运行状态,
执行zkCli.cmd -server localhost:2181可以连接到 Zookeeper 客户端
下载kafka
- 访问Kafka 官方下载页面下载最新稳定版本的 Kafka 二进制文件
- 解压 Kafka
将下载的 Kafka 压缩包解压到一个目录,例如 D:\kafka。 - 启动 Kafka 服务器,打开一个新的命令提示符窗口,进入 Kafka 的 bin\windows 目录:
cd D:\kafka\bin\windows
使用以下命令启动 Kafka 服务器:
kafka-server-start.bat ..\..\config\server.properties
kafka的启动需要加载config\server.properties的配置。
窗口不要关闭,至此,你已启动了zookeeper和kafka。当然你也可以用kafka自带的zookeeper。自带的启动方式为:
打开命令提示符,进入 Kafka 的 bin\windows 目录:
cd D:\kafka\bin\windows
zookeeper-server-start.bat ..\..\config\zookeeper.properties
注意,如果你已经启动了单独下载安装的zookeeper就不要再启动kafka自带的zookeeper了,否则可能出现端口被占用的情况,如果出现端口被占用,请杀死对应的进程。(如果失败请检查2181端口是否被占用,netstat -ano | findstr :2181,如果有被占用,结束进程taskkill /F /PID 1234。。这里的1234pid要写实际的pid)
安装PHP 的 Kafka 扩展
为了让 PHP 能够与 Kafka 交互,你需要安装 rdkafka 扩展,并且可以选择合适的 Kafka PHP 客户端库,如 longlang/phpkafka。
- 安装 rdkafka 扩展
访问 PECL: rdkafka 并下载适用于你 PHP 版本的 php_rdkafka.dll 文件。
点击查看自己对应的版本,下载对应的包。确定php的版本号,操作系统位数,nts或者ts。如果不清楚,可以使用phpinfo来查看自己的信息。或者直接使用php -v
- 解压文件,主要用到2个dll文件,将librdkafka.dll放在php的安装目录下,比如D:\phpstudy_pro\Extensions\php\php7.4.3nts,将php_rdkafka.dll放在ext中,比如
D:\phpstudy_pro\Extensions\php\php7.4.3nts\ext - 编辑 php.ini 文件,开启rdkafka扩展
extension=php_rdkafka.dll
或者
extension=rdkafka
保存文件,重启php
安装 Kafka PHP 客户端库
进入你的php项目,使用composer安装longlang/phpkafka,或者nmred/kafka-php
longlang
composer require longlang/phpkafka
nmred
composer require nmred/kafka-php
如果你的composer报错,比如laravel框架中,laravel/horizon的错误,是缺少PCNTL扩展,这个扩展windows下不支持,可以忽略平台使用
composer require longlang/phpkafka --ignore-platform-reqs
nmred
composer require nmred/kafka-php --ignore-platform-reqs
以下使用nmred 举例
配置和使用 Kafka
创建kafka的配置文件
laravel 中在config新增一个kafka.php
<?php
return [
'brokers' => env('KAFKA_BROKERS', 'localhost:9092'),
'consumer_group_id' => env('KAFKA_CONSUMER_GROUP_ID', 'laravel-consumer-group'),
'topics' => [
'test-topic' => [
'partition' => 0,
'replica' => 1,
],
],
];
创建 Kafka 生产者
创建一个生成类
<?php
namespace App\Http\Controllers;
use RdKafka\Producer;
class KafkaProducerController extends Controller
{
public function produce()
{
$conf = new \RdKafka\Conf();
$producer = new Producer($conf);
$producer->addBrokers(config('kafka.brokers'));
$topic = $producer->newTopic('test-topic');
$topic->produce(RD_KAFKA_PARTITION_UA, 0, 'Hello Kafka');
$producer->flush(10000);
return 'Message produced successfully';
}
}
配置好对应的api路由后,访问一次就是生产一条消息
创建kafka消费者
这里使用的是laravel框架,需要在app/console/commands下创建一个定时任务,
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use RdKafka\Consumer;
use RdKafka\TopicConf;
class KafkaConsumer extends Command
{
protected $signature = 'kafka:consume';
protected $description = 'Consume messages from Kafka';
public function handle()
{
$conf = new \RdKafka\Conf();
$conf->set('group.id', config('kafka.consumer_group_id'));
// 开启自动提交偏移量
$conf->set('enable.auto.commit', 'true');
$consumer = new Consumer($conf);
$consumer->addBrokers(config('kafka.brokers'));
$topicConf = new TopicConf();
$topicConf->set('auto.offset.reset', 'latest');
$topic = $consumer->newTopic('test-topic', $topicConf);
// 开始从指定分区消费消息,注意不再从头开始。
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
// 消费消息,等待时间为1000ms。
$message = $topic->consume(0, 1000);
// 检查是否成功消费到消息,并且没有错误。
if ($message && $message->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
// 输出消息的内容。
$this->info($message->payload);
}
}
}
}
已经拿到消息了,居然的kafka的操作这里不细讲,只注重windows下本地开发环境kafka的安装。记得在消费时,开启你的zookeeper和kafka。