首页 > 编程语言 >用扩展的方式在 PHP 中使用 Kafka

用扩展的方式在 PHP 中使用 Kafka

时间:2023-05-13 14:32:22浏览次数:40  
标签:php 扩展 Kafka topicConf kafka message PHP rk

前言:

        由于之前在 PHP 中使用 Kafka 是通过 composer 包的方式,由于 nmred/kafka-php 很久没有维护,并且网上相关问题的文章也比较少。所以我这次换成 PHP 扩展 RdKafka 继续使用,主要介绍扩展安装和这种方式的基本操作。

 

安装:

1. 下载

2. 目录

       由于 php-rdkafka 依赖 librdkafka,linux 就需要先安装 librdkafka 后安装 php-rdkafka,而 windows 版本是如下几个文件,安装方法如下:

(1). 将 librdkafka.dll 和 librdkafka.pdb 放入 PHP 安装的根目录下,而 php_rdkafka.dll 和 php_rdkafka.pdb 放入 PHP 安装目录的 ext 下。

(2). php.ini 配置文件添加 extensinotallow=php_rdkafka.dll,最后重启 PHP。

(3). php-m 或这 phpinfo (); 就可以查看到扩展了。

通过 get_declared_classes() 也可以查看到扩展里预设的函数了。

使用:

1. 生产

public function kafkaTest()
{
	$rk = new \RdKafka\Producer();
	$rk->addBrokers("127.0.0.1:9092");
	$topic = $rk->newTopic("shop");

	$ret = [];
	for ($i = 0; $i < 5; $i++) {
		$content = "第" . $i . "次发送失败";
		$message = ["mobile" => "15623652142", "content" => $content];
		$payload = json_encode($message);

		// 指定向0号partition生产数据
		$ret[]['produce_res'] = $topic->produce(0, 0, $payload, "sms_$i");

		// 随机选择partition
		//$topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
		if ($rk->getOutQLen() > 0) {
			$ret[]['produce_poll'] = $rk->poll(500);
		} else {
			$ret[]['produce_poll'] = $rk->poll(0);
		}
	}

	dump($ret);
}

2. 消费(从指定的 partition 消费)

protected function execute(Input $input, Output $output)
{
	$output->writeln("!!!hello kafka!!!");

	$conf = new \RdKafka\Conf();
	$conf->set('group.id', 'sms-consumer-group');

	$rk = new \RdKafka\Consumer($conf);
	$rk->addBrokers("127.0.0.1:9092");

	$topicConf = new \RdKafka\TopicConf();
	$topicConf->set('auto.commit.interval.ms', 100);
	$topicConf->set('offset.store.method', 'file');
	$topicConf->set('offset.store.path', sys_get_temp_dir());
	$topicConf->set('auto.offset.reset', 'smallest');
	$topic = $rk->newTopic("shop", $topicConf);
	$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

	while(true) {
		// 设置消费时的时间间隔,单位毫秒,以下表示5秒消费一个
		$message = $topic->consume(0, 5000);
		if ($message) {
			echo "读取到消息\n\r";

			// 消息对象,包括消息主题,消息创建时间戳,消息分区编号,消息主体,消息键名,消息长度等
			var_dump($message);

			switch ($message->err) {
				case RD_KAFKA_RESP_ERR_NO_ERROR:
					echo "读取消息成功:\n\r";
					var_dump($message->payload);
					break;
				case RD_KAFKA_RESP_ERR__PARTITION_EOF:
					echo "读取消息失败\n\r";
					break;
				case RD_KAFKA_RESP_ERR__TIMED_OUT:
					echo "请求超时\n\r";
					break;
				default:
					throw new \Exception($message->errstr(), $message->err);
					break;
			}
		} else {
			echo "未读取到消息\n\r";
		}
	}

	$output->writeln("!!!the end!!!");
}

 

附加:

        在执行消费过程中,发现 kafka 停止服务,抛出的异常:ERROR Shutdown broker because all log dirs in /tmp/kafka-logs have failed。

 

解决方法:

       删除 kafka-logs 下的所有日志,再重新启动 Kafaka, kafka-server-start.bat ..\..\config\server.properties &

 


标签:php,扩展,Kafka,topicConf,kafka,message,PHP,rk
From: https://blog.51cto.com/u_11161174/6273705

相关文章

  • 如何在业务代码中使用 ThinkPHP5.1 封装的容器内反射方法
    invokeClass用法:可以不传命名空间实例化(通过反射实例化)$obj=Container::getInstance()->invokeClass(InvokerTest::class);var_dump($obj->invokerNews());die;-----------------------------------------------------------------------invokeMethod用法:传入带命名空间的类和......
  • 浅谈Kafka2.8+在Windows下的搭建与使用
    前言:    周末空闲时间无意找到了一套个性化推荐的源码,整体项目运用了SSH,HDFS,Flume,Hive,Kafka,Spark,Scala等。运行时,本来通过spark计算业务埋点数据时,却发现本地没有Kafka。因为我一直也没使用过Kafka,所以也作为新人,浅谈以下Kafka的环境安装与分别在PHP,Scala中的使用。 对比:1......
  • PHP对接ESP8266
    前言:    众所周知,在APP开发中,C/S是基本的开发结构。客户端负责应用的展示,交互与网络请求,服务端负责数据的传输,处理,存储,输出等等。而物联网除了之前介绍的MQTT实现H5实时控制舵机旋转角度来完成开关外,同样也可以通过HTTP的请求方式来和任何一门语言的服务端进行交互。  ......
  • 正确使用php开发系列:判断数组的key是否存在
    背景:我们习惯上使用!empty($data['data']['list']判断数组$data里有没有key为list的元素,正确判断key是否存在的方式应该使用array_key_exists 为什么不要使用!empty($data['data']['list'],因为当list不存在时,会报错!......
  • 基于FPGA的CAN通信,FPGA驱动SJA1000T芯片代码,实现标准帧与扩展帧的通信驱动,已上板调通,
    基于FPGA的CAN通信,FPGA驱动SJA1000T芯片代码,实现标准帧与扩展帧的通信驱动,已上板调通,价格美丽,欢迎咨询。品牌型号CANSJA1000T与世面上的不同,代码不是SJA1000T芯片代码,而是驱动该芯片的代码。ID:36100649782444620......
  • PHP发送文件到JAVA项目
    https://blog.csdn.net/u012685554/article/details/126995307亲测有用。php代码$file=$_FILES['order_upload']['tmp_name'];////var_dump($file);exit();////判断文件是否存在if(!file_exists($file)){......
  • php imagick圆角
    $watermark=new\Imagick();$watermark->readImage($avatarpath);$watermark->scaleImage(160,160,true);$watermark->setFormat('png');$mask=new\Imagick();$mask->newImage($watermark......
  • 浅谈一下ThinkPHP5.1实现事务嵌套的特性
    前言:       在我们平时做的一个项目中,线上环境突然发现数据库被锁住。导致很多有关数据插入和修改的接口全都瘫痪,项目基于ThinkPHP5.1。报错的时候,我们发现了一条sql错误日志,如下。   根据错误信息提示,是说有一个事务回滚时没有找到savepoint的暂存点。所以问题应该......
  • php 异步形式调取导出数据
    php部分ajax请求此部分functionaysncexec(){$lock_file='filelock.lock';if(file_exists($lock_file)){exit(json_encode(array('code'=>0)));}$url=base_url().'execcmd';......
  • 欧姆龙CP1H标准程序,一共控制五个伺本体四个+一个轴扩展包 含轴点动,回零,相对与绝对定位,
    欧姆龙CP1H标准程序,一共控制五个伺本体四个+一个轴扩展包含轴点动,回零,相对与绝对定位,整个项目的模块都有:主控程序,复位程序,手动,只要弄明白这个程序,就可以非常了解整个项目的程序如何去编写,从哪里开始下手ID:6321667928553930......