首页 > 编程语言 >python实现kafka收到消息然后在通过websockt发送给其他服务器的方法(异步调用并且收到其中一个的消息在转发)

python实现kafka收到消息然后在通过websockt发送给其他服务器的方法(异步调用并且收到其中一个的消息在转发)

时间:2023-05-22 14:57:24浏览次数:70  
标签:websocket python websockt kafka 收到 start import loop asyncio

import asyncio
import threading
from kafka import KafkaConsumer
import websockets

connected = set()

async def handler(websocket, path):
    connected.add(websocket)
    while True:
        await asyncio.sleep(1)

def start_kafka():
    consumer = KafkaConsumer(
                '123456789abcd_device',
                bootstrap_servers="192.168.49.27" + ':9092',
            )
    for message in consumer:
        data = f"Received message from Kafka: {message}"
        print(data)
        for websocket in connected:
            asyncio.run_coroutine_threadsafe(websocket.send(data), loop)

def start_websocket():
    global loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    start_server = websockets.serve(handler, "localhost", 8000)
    loop.run_until_complete(start_server)
    loop.run_forever()

def start_kafka_and_websocket():
    t1 = threading.Thread(target=start_kafka)
    t2 = threading.Thread(target=start_websocket)

    t1.start()
    t2.start()

    t1.join()
    t2.join()

start_kafka_and_websocket()

 

标签:websocket,python,websockt,kafka,收到,start,import,loop,asyncio
From: https://www.cnblogs.com/xiaoruirui/p/17420591.html

相关文章

  • 【PYTHON】pandas字符替换
    处理文本数据时,常见的存储格式为textfile格式,对应行分隔符为"\n",列分隔符为"\t"。而大家往往不会直接使用txt格式文件进行日常操作,Excel更为简便通用。因此,如果我们需要处理的Excel数据中,某个取值内出现了"\t"或"\n"或"\r\n"符号,转为txt格式文件处理将出现数据错位的情况......
  • python学习笔记32:操作sqlite数据库
    importsqlite3#1.创建数据库连接#如果test.db存在,则建立连接,返回connect对象#如果test.db不存在,则新建数据库,再建立连接,返回connect对象conn=sqlite3.connect(database='test.db')#2.创建cursor对象cursor=conn.cursor()#SQL指令sql='''......
  • Python基础知识一
    1:print输出信息  例子: ( 所有的标点符号都要是英文状态下输入,要不然会报错)print(“helloworld”)注意:python和python32:ipython在python前加i,此命令拥有和python类似的功能,但同时拥有linux下执行命令的功能ipython或者ipython33:注释在python中“#”右边的字符为注释,......
  • Python爬虫以及数据可视化分析之某站热搜排行榜信息爬取分析
    目录前言一,确定目标二,发送请求三,解析数据四,保存数据pyecharts进行可视化“某站”数据排名前10视频类型“某站”标题标签可视化“某站”喜欢视频分类概况总结前言本项目将会对“某站”热搜排行的数据进行网页信息爬取以及数据可视化分析本教程仅供学习参考!首先,准备好相关库requ......
  • Python学习
    3-13字符串类型字符串类型:str   1.定义格式:       变量='内容'           打印一行       变量="内容"           打印一行       变量='''内容'''或者三引号           可以通过回车的方式换行,且打印出......
  • Python自动化运维
    2-27在命令行窗口中启动的Python解释器中实现在Python自带的IDLE中实现print("Helloworld")编码规范每个import语句只导入一个模块,尽量避免一次导入多个模块不要在行尾添加分号“:”,也不要用分号将两条命令放在同一行建议每行不超过80个字符使用必要的空行可以增加代码的可读性运......
  • python 基本数据类型以及内置方法(有这一篇就够了)
    一、数据类型介绍在Python中,数据类型是区分数据的种类和存储方式的标识符。它定义了数据的取值范围、占用空间大小、可操作特性等。Python中常见的数据类型包括数字、字符串、列表、元组、集合和字典等。数据类型在编程中的作用主要有以下几个方面:内存空间的管理:不同的数据......
  • python的守护线程(简介、作用及代码实例)
    转载:(14条消息)python的守护线程(简介、作用及代码实例)_python守护线程的作用_HXH.py的博客-CSDN博客python守护线程简介守护线程的理解:如果当前python线程是守护线程,那么意味着这个线程是“不重要”的,“不重要”意味着如果他的主进程结束了但该守护线程没有运行完,守护进程就会被......
  • 机器学习数据顺序随机打乱:Python实现
      本文介绍基于Python语言,实现机器学习、深度学习等模型训练时,数据集打乱的具体操作。1为什么要打乱数据集  在机器学习中,如果不进行数据集的打乱,则可能导致模型在训练过程中出现具有“偏见”的情况,降低其泛化能力,从而降低训练精度。例如,如果我们做深度学习的分类,其中初始......
  • python 办公常用一:从文本文件中提取手机号码
    python办公常用一、从文本文件中提取手机号码给定一个文本文件从中提取所有手机号码importredefmain():withopen(path,encoding="utf-8")asf:data=f.read()res=re.findall(r'(?:13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-3......