首页 > 编程语言 >kafka.php

kafka.php

时间:2023-05-24 21:45:14浏览次数:35  
标签:set kafka topicConf offset message php payload

setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: // echo "Assign: "; // var_dump($partitions); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: // echo "Revoke: "; // var_dump($partitions); $kafka->assign(NULL); break; default: throw new \Exception($err); } }); // 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息, // 所以同一个组内的消费者数量如果订阅了一个topic, // 那么消费者进程的数量多于 多于这个topic 分区的数量是没有意义的。 $conf->set('group.id', $group_id); // 添加 kafka集群服务器地址 $conf->set('metadata.broker.list', $host); //'localhost:9092,localhost:9093,localhost:9094,localhost:9095' // 针对低延迟进行了优化的配置。这允许PHP进程/请求尽快发送消息并快速终止 $conf->set('socket.timeout.ms', 5000); $conf->set('fetch.wait.max.ms', 2000); //多进程和信号 if (function_exists('pcntl_sigprocmask')) { pcntl_sigprocmask(SIG_BLOCK, array(SIGIO)); $conf->set('internal.termination.signal', SIGIO); } else { $conf->set('queue.buffering.max.ms', 1); } $topicConf = new \RdKafka\TopicConf(); // 在interval.ms的时间内自动提交确认、建议不要启动, 1是启动,0是未启动 // $topicConf->set('auto.commit.enable', 1); $topicConf->set('auto.commit.interval.ms', 100); //smallest:简单理解为从头开始消费,largest:简单理解为从最新的开始消费 $topicConf->set('auto.offset.reset', 'earliest'); // 设置offset的存储为broker //$topicConf->set('offset.store.method', 'broker'); // 设置offset的存储为file //$topicConf->set('offset.store.method', 'file'); // 设置offset的存储路径 // $topicConf->set('offset.store.path', 'kafka_offset.log'); //$topicConf->set('offset.store.path', __DIR__); $conf->setDefaultTopicConf($topicConf); $consumer = new \RdKafka\KafkaConsumer($conf); // 更新订阅集(自动分配partitions ) $consumer->subscribe([$topic]); // 指定topic分配partitions使用那个分区 // $consumer->assign([ // new \RdKafka\TopicPartition("zzy8", 0), // new \RdKafka\TopicPartition("zzy8", 1), // ]); while (true) { // 设置120s为超时 $message = $consumer->consume(3 * 1000); if (!empty($message)) { switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: // print_r($message); // 打印消息 // 拆解对象为数组,并根据业务需求处理数据 $payload = json_decode($message->payload,true); // $key = $message->key; $action = $payload['action']; unset($payload['action']); $mongoId = $payload['mongoId']??0; unset($payload['mongoId']); $esId = $payload['esId']??0; unset($payload['esId']); if ($action == 'insert') { mongoInsert($mongoManager,$payload); esUpInsert($esClient,$payload); }elseif ($action == 'update') { mongoUpdate($mongoManager,$payload,$mongoId); esUpInsert($esClient,$payload,$esId); }elseif ($action == 'delete') { mongoDelete($mongoManager,$mongoId); esDelete($esClient,$esId); } break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: // echo "Timed out\n"; break; default: var_dump("nothing"); throw new \Exception($message->errstr(), $message->err); break; } } else { var_dump('this is empty obj!!!'); } } } $mongoManager = mongoConnect(); $esClient = esConnect(); kafka($mongoManager,$esClient);

标签:set,kafka,topicConf,offset,message,php,payload
From: https://www.cnblogs.com/xivzhou/p/17429611.html

相关文章

  • es.php
    setHosts(['192.168.133.131'])->build(); //如果es设置了密码 //$es=\Elasticsearch\ClientBuilder::create()->setHosts(['http://username:[email protected]:9200'])->build() return$es;}functiongetIndicesSetting(){ $......
  • mongo.php
    "root",'password'=>"123456",'db'=>"admin"]);return$manager;}functionmongoQuery($manager){//查询$filter=['age'=>['$gt'=>5]......
  • 打一个适合自己主机的nginx和php的包
    创建适配主机的nginx和php的包##先m01上配置一遍wordpress#安装nginx,php包#官方源安装vim/etc/yum.repos.d/nginx.repo[nginx-stable]name=nginxstablerepobaseurl=http://nginx.org/packages/centos/$releasever/$basearch/gpgcheck=1enabled=1gpgkey=https://ng......
  • php中日期时间字符串可以直接比较大小
    php中的日期时间字符串可以直接比较大小很多朋友还不知道,一直使用strtotime转换字符串为时间戳再比较大小。$time1='09:00';$time2='12:51';$time3='22:00';self::assertTrue($time2>$time1);self::assertTrue($time2<$time3);$date1='2021-02-03';$......
  • php实现占位符模板替换
    php实现占位符模板替换对接过微信模板消息,或者阿里大于接口的,应该都知道,三方会给你一些模板,模板里有一些占位符,你只需要按照模板里的占位符填充参数即可。demo,实现一个地址跳转系统用户提供域名和路径参数,系统配置模板即可,用户传参即可替换。//用户提供的参数$patterns='u......
  • [PHP](MD5、sha1)比较漏洞-笔记
    PhP(MD5、sha1)比较漏洞(弱比较、强比较、强碰撞)弱比较md5和sha1弱比较都是利用php解析哈希值以“0E”开头的特性,结果都为0符合参数1的字符串值和参数2的字符串值不相等,但md5值相等。如:240610708,aabg7XSs,aabC9RqS,s878926199a这四段字符串MD5编码后结果分别对应240610708:0E462097......
  • 关于PHP正则表达式这回事
    ......
  • php特性第一天
    目录899091929394959697989989preg_match()返回pattern的匹配次数。它的值将是0次(不匹配)或1次,因为preg_match()在第一次匹配后将会停止搜索。preg_match_all()不同于此,它会一直搜索subject直到到达结尾。如果发生错误preg_match()返回FALSE。get方式传入num参数,然后通......
  • php特性第二天
    99array_push()函数向第一个参数的数组尾部添加一个或多个元素(入栈),然后返回新数组的长度。该函数等于多次调用$array[]=$value。in_array()函数搜索数组中是否存在指定的值。(注意:in_array()函数有漏洞没有设置第三个参数就可以形成自动转换)file_put_contents()函......
  • 【Kafka从入门到成神系列 一】Kafka基本概述和架构
    ......