首页 > 编程语言 >python并行之flask-socketio

python并行之flask-socketio

时间:2023-12-02 23:11:32浏览次数:40  
标签:__ socketio python self server flask client print def

1、服务器端

from flask import *
from flask_socketio import *
from flask_socketio import SocketIO
from nasbench_lib.nasbench_201 import NASBench201
import random
import subprocess
class Server:
    def __init__(self, gpu):
        self.app = Flask(__name__)
        self.socketio = SocketIO(self.app, ping_timeout=3600000,
                                 ping_interval=3600000,
                                 max_http_buffer_size=int(1e32))
        self.gpu = gpu
        self.MIN_NUM_WORKERS = 1
        self.current_round = -1  # -1 for not yet started
        self.NUM_CLIENTS_CONTACTED_PER_ROUND = 1
        self.ready_client_sids = set()
        self.nas = NASBench201()
        # 设置 SocketIO 事件处理程序
        self.register_handles()
    def check_client_resource(self):
        self.client_resource = {}
        client_sids_selected = random.sample(list(self.ready_client_sids), self.NUM_CLIENTS_CONTACTED_PER_ROUND)
        for rid in client_sids_selected:
            emit('check_client_resource', {
                'round_number': self.current_round,
            }, room=rid)
    def register_handles(self):
        @self.socketio.on('connect')
        def handle_connect():
            print(request.sid, "connected")

        @self.socketio.on('reconnect')
        def handle_reconnect():
            print(request.sid, "reconnected")

        @self.socketio.on('disconnect')
        def handle_disconnect():
            print(request.sid, "disconnected")
            if request.sid in self.ready_client_sids:
                self.ready_client_sids.remove(request.sid)

        @self.socketio.on('client_wake_up')
        def handle_wake_up():
            print(f"服务器端被客户端{request.sid}唤醒.")
            emit('init')

        @self.socketio.on('client_ready')
        def handle_client_ready():
            print(f"服务器收到客户端{request.sid}准备完毕。开始check资源。")
            self.ready_client_sids.add(request.sid)
            if len(self.ready_client_sids) >= self.MIN_NUM_WORKERS:
                self.check_client_resource()
            else:
                print("没有足够的客户端连接....")

    def handle_connect(self):
        print("Client connected")

    def handle_sample(self):
        return self.nas.generate_random_for_multiview(10)

    def get_slurm_allocated_gpus(self):
        result = subprocess.run([
            'nvidia-smi',
            '--query-gpu=memory.total,memory.used',
            '--format=csv,nounits,noheader'
        ],stdout=subprocess.PIPE)
        output = result.stdout.decode('utf-8').strip().split('\n')
        gpu_0_memory = output[self.gpu].split(',')
        total_memory = int(gpu_0_memory[0])
        used_memory = int(gpu_0_memory[1])
        free_memory = total_memory - used_memory
        return free_memory

    def run(self, host='0.0.0.0', port=5000):
        self.socketio.run(self.app, host=host, port=port)

if __name__ == '__main__':
    server = Server(0)
    server.run()

 

2、客户端

import socketio
class Worker:
    def __init__(self, server_host, server_port):
        self.sio = socketio.Client()
        self.server_url = f'http://{server_host}:{server_port}'
        self.register_handles()
        self.connect_to_server()
    def on_init(self):
        print('客户端进行初始化.')
        #加载模型
        print("客户端本地模型加载完毕.")
        # ready to be dispatched for training
        self.sio.emit('client_ready')
    def register_handles(self):
        @self.sio.event
        def connect():
            print('客户端请求连接...')
            self.sio.emit("client_wake_up")

        @self.sio.event
        def disconnect():
            print('Disconnected')

        @self.sio.event
        def reconnect():
            print('Reconnected')

        @self.sio.on('check_client_resource')
        def on_check_client_resource(*args):
            self.sio.emit('check_client_resource_done')

        self.sio.on('init', self.on_init)

    def connect_to_server(self):
        print("Connecting to server...")
        self.sio.connect(self.server_url)
        self.sio.wait()
    def request_sample_architecture(self):
        self.sio.emit('sample', callback=self.print_response)

    def request_evolve_architectures(self, architectures):
        self.sio.emit('evolve', architectures, callback=self.print_response)
    @staticmethod
    def print_response(data):
        print("Response from server:", data)

 

3、管理端

from client import Worker
import torch.multiprocessing as mp

class GPUManager(object):
    def __init__(self):
        self.p_count = 10
        self.available_gpus = 6
        self.port_list = [
            78901,
            78902,
        ]

    def run_client(self):
        TIMEOUT = 48 * 3600
        proc = list()
        for i in range(self.p_count):
            worker = Worker("127.0.0.1", 5000)
            p = mp.Process(
                target=worker.connect_to_server,
            )
            proc.append(p)
        for p in proc:
            p.start()
        for p in proc:
            p.join(timeout=TIMEOUT)
        for p in proc:
            if p.is_alive():
                p.terminate()
                p.join()
        for p in proc:
            p.close()

    def run(self):
        worker = Worker("127.0.0.1", 5000)
        worker.connect_to_server()


if __name__ == '__main__':
    GPUManager().run()

 

标签:__,socketio,python,self,server,flask,client,print,def
From: https://www.cnblogs.com/zhangxianrong/p/17872438.html

相关文章

  • python task4
    python(四)循环for循环和循环范围for循环的特点基于提供的范围,重复执行特定次数的操作for_inrange(m,n+1)range(x,y)是左闭右开区间,包含x,不包含y(如果省略第一个参数,range起始范围默认为0)(如果添加第三个参数,第三个参数为“步长”step)或者defsumF......
  • 聪明办法学python task5/6
    循环for语句range()以指定数字开始并指定不同的增量(甚至可以是负数,有时这也叫做'步长')(前闭后开)for...else语句用于在循环结束后执行一段代码如果在循环过程中遇到了break语句,则会中断循环,此时不会执行else子句。while循环用CTRL+C来退出当前的无限循环。......
  • 聪明办法学python第四次打卡
    循环for循环的特点基于提供的范围,重复执行特定次数的操作for循环嵌套#下面的代码将输出二维坐标defprintCoordinates(xMax,yMax):forxinrange(1,xMax+1):foryinrange(1,yMax+1):print(f"({x},{y})",end="")print()printCoordinat......
  • Python学习 CHAPTER 5循环和CHAPTER 6字符串
    CHAPTER5循环一、FOR循环的特点基于提供的范围,重复执行特定次数的操作#注意:range(x,y)是左闭右开区间,包含x,不包含y 如果我们省略第一个参数会发生什么?for × in range(n+1) #range起始范围默认为0那如果我们添加第三个参数呢? #第三个参数为“步长”step for×in ra......
  • SMTP操作使用详解并通过python进行smtp邮件发送示例
    转载请注明出处:1.SMTP     SMTP 的全称是“SimpleMailTransferProtocol”,即简单邮件传输协议。它是一组用于从源地址到目的地址传输邮件的规范,通过它来控制邮件的中转方式。SMTP协议属于TCP/IP协议簇,它帮助每台计算机在发送或中转信件时找到下一个目的地。SMTP服......
  • 聪明办法学python(4)
    聪明办法学python(4)TASK06循环LOOP1.1for循环forxinrange(m,n+1)#range(x,y)是左闭右开区间1.2rangerange(n+1)#起始范围默认为0range(m,n,k)#第三个参数为步长stepforxinrange(5,20,7)==(5+12+19)eg:只对从m到n的奇数求和defsum(m,n):......
  • 聪明学python【5】+【6】
    聪明学python【5】+【6】【5】循环for本质(功能):基于提供的范围,重复执行特定次数的操作(注意:区分C语言和python的“重复”和“循环”:C语言更倾向于用循环数字来实现重复,Python可以直接实现重复,在需要时使用数字循环)格式:#格式1————循环xforxinrange(m,n+1):#格......
  • pythonDay19
    文件的两种用途from  import导入模块(都在同一个路径下) from  import案例 循环导入模块产生的问题 导入模块的优先级 如何解决第一次导入模块以后第二次再导入时候的路径来源 解决导入的模块路径来源不在同一个路径 包被导入是要注意的三点 相对......
  • python第四次笔记
    循环和字符串循环for循环forxinrange(x,y)forxinrange(1,10,2):print(int(x))表示遍历1到9之间的数,步长为2即打印13579while循环while(条件):执行行为whileTrue:print("nihao")这个代码会一直打印nihaocontinuebreakpasscontinue用于跳出本......
  • 聪明办法学Python_task4_11.28-12.2
    聪明办法学Python_task4_11.28-12.2聪明办法学Python_task4_11.28-12.21.task06循环1.1while循环1.2for循环1.3循环控制语句1.4range()函数2.task07字符串2.1字符串构成2.2字符串操作2.2.1字符串运算2.2.2索引&切片2.2.3相关函数1.task06......