首页 > 编程语言 >php kafka生产者,消费者操作

php kafka生产者,消费者操作

时间:2023-04-03 18:36:02浏览次数:37  
标签:生产者 len KAFKA RD OFFSET kafka php rk

 

php7.2  kafka7.8.1

生产者代码

复制代码
<?php
$conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});

$rk = new RdKafka\Producer($conf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");

$cf = new RdKafka\TopicConf();
// -1必须等所有brokers同步完成的确认 1当前服务器确认 0不确认,这里如果是0回调里的offset无返回,如果是1和-1会返回offset
// 我们可以利用该机制做消息生产的确认,不过还不是100%,因为有可能会中途kafka服务器挂掉
$cf->set('request.required.acks', 0);
$topic = $rk->newTopic("test", $cf);


//RD_KAFKA_PARTITION_UA自动选择分区
//$option可选
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "kafka123456789", 'kafka');
//kafka队列中的长度 

$len = $rk->getOutQLen(); while ($len > 0) { $len = $rk->getOutQLen(); var_dump($len); $rk->poll(50); }
复制代码

 

 这里我并没有开启集群,只是单机操作。发送 成功

 

 

消费者代码

复制代码
<?php

/**
 * 消费者消费消息
 *
 * 实现的例子来源于:
 *
 * https://github.com/arnaud-lb/php-rdkafka#examples
 */

$objRdKafka = new RdKafka\Consumer();
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("127.0.0.1:9092");

$oObjTopic = $objRdKafka->newTopic('test');

/**
 * consumeStart
 *   第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息
 *   第二个参数标识从什么位置开始拉取消息,可选值为
 *     RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息
 *     RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息
 *     RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样
 */
$oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END);

while (true) {
    // 第一个参数是分区,第二个参数是超时时间
    $oMsg = $oObjTopic->consume(0, 1000);

    // 没拉取到消息时,返回NULL
    if (!$oMsg) {
        usleep(10000);
        continue;
    }

    if ($oMsg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $oMsg->payload, "\n";
    }
}
复制代码

消费者获取到消息

标签:生产者,len,KAFKA,RD,OFFSET,kafka,php,rk
From: https://www.cnblogs.com/yzl042349/p/17283964.html

相关文章

  • crontab + ThinkPHP6 配合使用
    crontab+ThinkPHP6配合使用1:命令行执行phpthinkmake:commandHellohellophpthinkmake:command控制器名方法名2:console配置3:测试执行phpthinkhello控制台得到结果hello4:配置到linuxcrontab中/usr/local/bin/php/www/web/path/thinkhel......
  • php swoft 中的数据分层
        不仅仅局限于MVC。将数据在model这一个层面剖析开,优雅的处理数据 逻辑,缓存,业务,数据库操作的烦恼。 这个思路也适用于thinkphp,hyperf,imi等框架。不再简单的实现controller->model->view的处理过程。  简化代码,每一层清晰地定义相应处理的数据。......
  • php 扩展 rabbitmq popt
     首先是rabbitmq-c-master.tar.gz包,可以访问https://github.com/alanxz/rabbitmq-c去下载最新的wgethttps://github.com/alanxz/rabbitmq-c.gitwgethttps://github.com/alanxz/rabbitmq-c/archive/v0.10.0.tar.gz  0.8.0这个版本 对popt 要求低一些,如果你遇......
  • php架构之路,phper进阶,学习路线
     鉴于最近跟小伙伴聊了很多PHP架构发展方向的问题,相关技术整理了一下,也顺便规划了一下自己下年。【我的学习路线】一.常用的设计模式以及使用场景    以下是我用到过的  工厂,单例,策略,注册,适配,观察者,原型,装饰器,facade,loc,pipeline二.阅读一个框架源码 例如:laravel......
  • kafka 安装
     安装前期准备:1,准备三个节点(根据自己需求决定)2,三个节点上安装好zookeeper(也可以使用kafka自带的zookeeper)3,关闭防火墙chkconfig iptablesoff Kafka官网下载安装包 http://kafka.apache.org/downloads.html   1源码安装编译2已经编译好的,直接使用 我......
  • php 如何实现 git diff
    无意间想到这个问题,如何用php来实现gitdiff,如果实现了这个功能,岂不是能够使用php对在线编辑文件的功能做更进一步的优化和提升?查了一下还真有这样的库,话不多说,开始执行composerrequire--devsebastian/diff得到结果Infofromhttps://repo.packagist.org:#StandW......
  • PHPExcel 中文使用手册详解
     1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848......
  • linux使用php动态安装模块mysqli.so(ext/mysqlnd/mysqlnd.h: 没有那个文件或目录)
     由于我先安装的php,再安装的mysql!正常过程: 1、安装mysql 2、安装phpconfigure时带–with-mysql参数现在我不想重装,因此使用phpize动态安装mysqli,php版本为php-7.2.13 1先查看php下phpize路径  得知路劲为/usr/local/php/bin/phpize2切换到php源码包目录php-7......
  • 如何在PHP7中扩展mysql,先安装php7.2。后安装mysql
     相对与PHP5,PHP7的最大变化之一是移除了mysql扩展,推荐使用mysqli或者pdo_mysql,实际上在PHP5.5开始,PHP就着手开始准备弃用mysql扩展,如果你使用mysql扩展,可能看到过这样的提示”Deprecated:mysql_connect():Themysqlextensionisdeprecatedandwillberemovedinthefu......
  • Kafka在本地环境下不消费数据
    packagecom.test.core.condition;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;importorg.springframework.stereotype.Comp......