首页 > 其他分享 >Kafka使用(自动化)

Kafka使用(自动化)

时间:2023-07-06 18:11:26浏览次数:49  
标签:producer 使用 kafka topic host str 自动化 Kafka consumer

self.request_topic='requestRemoteModelServer'

self.response_topic='responseRemoteModelServer'

self.producer = Biz_模型控制服务.kafka_producer_init()

self.consumer=Biz_模型控制服务.kafka_consumer_init(self.response_topic,10000)

 

Biz_模型控制服务.producer_exec(self.producer,self.request_topic,req_params,'上传模型')

msg=Biz_模型控制服务.consumer_exec(self.consumer,self.response_topic,'上传模型')

 

Biz_模型控制服务.py

def producer_exec(producer:KafkaProducer,req_topic:str,req_params:dict,action_chn:str):

  producer.send(req_topic,value=req_params)

  logging_kafka_req(action_chn,req_topic,req_params)

 

def consumer_exec(consumer:KafkaConsumer,res_topic:str,action_chn:str):

  msg={}

  for message in consumer:

    msg=json.loads(message.value.decode())

    logging_kafka_res(action_chn,res_topic,msg)

    break

  consumer.commit()

  time.sleep(2)

  return msg

 

def kafka_producer_init(host:str=None)

  kafka_host=host

  if kafka_host is None:

    kafka_host = env_c.get('kafka_host')

  producer = KafkaProducer(

    bootstrap_servers=[kafka_host],

    key_serializer=lambda k: json.dumps(k).encode(),

    value_serializer=lambda v: json.dumps(v).encode(),

  )

  return producer

 

def kafka_consumer_init(res_topic:str,time_out:int,host:str=None):

  kafka_host=host

  if kafka_host is None:

    kafka_host=env_c.get('kafka_host')

  consumer=KafkaConsumer(

    res_topic,

    bootstrap_servers=kafka_host,

    group_id='cloud-chm',

    consumer_timeout_ms=time_out

  )

  topic_partition=consumer.assignment()

  partitions=consumer.beginning_offsets(list(topic_partition))

  if partitions != {}:

    for message in consumer:

      pass

    consumer.commit()

  return consumer

 

def logging_kafka_req(api_name:str,request_topic:str,request_params:dict):

  logging.info(f'{api_name} request topic:{request_topic}')

  logging.info(f'{api_name} request params:{request_params}')

 

def logging_kafka_res(api_name:str,response_topic:str=None,response_result:dict=None):

  logging.info(f'{api_name} response topic:{response_topic}')

  logging.info(f'{api_name} response result:{response_result}')

标签:producer,使用,kafka,topic,host,str,自动化,Kafka,consumer
From: https://www.cnblogs.com/nullnullnull/p/17532957.html

相关文章

  • 阿里区块链开放联盟使用http方式对接
    using(HttpClienthttpClient=newHttpClient()){/*所有的步骤实例都得进行http请求,当前demo中我的请求放在最后,只是一步一步的流程进行开发*/SendEntityentity=newSendEntity();#region步骤一:调用合约接口,将数据上链操作成功实现合约调用的列子(目前注释......
  • Excle使用正则提取字符串
    1、Alt+F11打开VB编辑工具2、选择sheet创建新模块3、插入下列代码FunctionRegexExtract(ByValinputStringAsString)AsStringDimregExAsObjectSetregEx=CreateObject("VBScript.RegExp")WithregEx.Global=True.Pattern=......
  • (转)一文搞定 Containerd 的使用
    原文:https://www.cnblogs.com/lvzhenjiang/p/15147993.html在学习Containerd之前我们有必要对Docker的发展历史做一个简单的回顾,因为这里面牵涉到的组件实战是有点多,有很多我们会经常听到,但是不清楚这些组件到底是干什么用的,比如 libcontainer、runc、containerd、CRI、OCI......
  • 使用Arduino制作摩尔斯电码收发器
       摩尔斯电码通过不同的排列顺序来表达不同的英文字母、数字和标点符号等。在今天,国际摩尔斯电码依然被使用着。比如,摩尔斯电码最广为人知的用法发送求救信号SOS,SOS信号的组合方式为:。再比如,假设我们通过摩尔斯电码发送“Arduino”,组合方式为:“.-.-.-....-..-.—”......
  • 使用Mkdocs生成项目文档
    MkDocs是一个基于Python的静态站点生成器,它可以将Markdown格式的文档转换为漂亮的静态网站。MkDocs提供了一种简单而灵活的方式来创建文档,并支持多种主题和插件。下面是一个简单的示例代码,演示如何使用MkDocs创建一个文档站点:安装MkDocs可以使用pip命令安装MkDocs:pipinstall......
  • MegEngine 使用小技巧:如何使用 MegCC 进行模型编译
    MegEngine 作为一个训推一体的AI框架,为用户提供了模型训练以及部署的能力。但是在部署模型时,由于会存在对于部署的模型计算来说不必要的代码,导致SDK体积比较大。为了解决上述问题,我们提供了新的工具:AI编译器 MegCC。MegCC有以下特性:只生成完成输入模型计算的必要的ke......
  • Django restframwork中使用分页及实现自定义分页
    关于为何要分页以及如何在Django+Template架构中如何使用分页,可以参考之前的文章django自定义分页类和使用总结[1]DjangoRestFramework中分页限制今天开篇我们先不讲如何使用,我们先说Django+restframework实现前后端分离项目开发时,分页功能使用的限制?缘由是之前在开发运维......
  • 使用Power Shell/PortQuery检查打开(侦听)端口
    你可以使用Test-NetConnection检查远程计算机上的端口是否可用(打开)。您可以使用它检查远程服务器或网络服务的响应和可用性,测试TCP端口是否被防火墙阻止,检查ICMP可用性和路由。事实上,Test-NetConnection取代了一些流行的网络管理工具,如ping、tracert、telnet、pathping、TCP端口扫......
  • 前端使用protobuf进行传参
    一.proto的理解1.以.proto结尾的文件是protobuf文件,且.proto是一种传参规则的定义。2.常用的请求传参方式是json或xml,因为在大多数的语言中这两种轻量型语言都能被其他语言识别到(java、python、javascript、c++等等)。proto作为区别于前两者且类似于前两者的语言:*它是一种语言......
  • Mac使用docker安装Doris
    一、编译源码(1)拉取编译镜像dockerpullapache/incubator-doris:build-env-1.2(2)Mac电脑上拉取源码gitclonehttps://github.com/apache/incubator-doris.git切换到指定分支gitfetchoriginbranch-0.12:branch-0.12gitcheckout branch-0.12(3)建议使用Docker挂载Ma......