首页 > 编程语言 >Python可控制线程与TCP服务单元编程

Python可控制线程与TCP服务单元编程

时间:2024-08-29 14:58:29浏览次数:16  
标签:__ socket Python self sock 程与 TCP ._ def

大家好,我之前在我的CSDN博客上面发了一条《C++可控制线程》的文章,里面介绍了在线程中植入类似状态机的东西,进而将多线程编程进化为“服务单元”编程的技术。

接下来我们来看一个基于此项目的Python的TCP服务单元代码。

---------------------------------------------------------------------------------------

创建状态机类与服务单元类

首先,我们利用python构建一个可用于继承的,构建状态机的东西。

然后,我们继承这个类以及线程类,以构建服务单元类(cThread)

#cThread.py
#服务节点(原可控线程)类的定义

import time
import threading

class StatusMachine(object):
    #状态机

    def __init__(self):
        '''初始态静止'''
        self._stat = 0
        self._lock = threading.Lock()
        self._signal = threading.Semaphore(0)

    def _check(self):
        '''
        -1(终止)
         0(停止)
         1(唤醒)
         2(运行)
        '''
        with self._lock:
            return self._stat

    def _shut(self):
        self._lock.acquire()
        self._stat = -1
        self._signal.release()
        self._lock.release()

    def _stop(self):
        with self._lock:
            self._stat = 0
            self._signal.release()

    def _wake(self):
        with self._lock:
            self._stat = 1
            self._signal.release()
	    

    def _run(self):
        '''在唤醒事件中使用'''
        with self._lock:
            self._stat = 2
            self._signal.release()

    def _suspend(self):
        self._signal.acquire()
 

def thread_main(th):
    '''线程的主函数'''
    while th._check() != -1:

        while th._check() == 0:
            th.StopEvent()

        if th._check() == 1:
            th.WakeEvent()

        while th._check() == 2:
            th._signal.release()
            th.LoopEvent()

        th.ClearEvent()

class cThread(StatusMachine, threading.Thread):
    '''可控制线程'''

    def __init__(self):
        '''构造函数,先执行父类的构造'''
        StatusMachine.__init__(self)
        threading.Thread.__init__(self, target=thread_main, args=(self,))
        #启动
        self.start()

class TestThread(cThread):
    '''测试线程'''
    def __init__(self):
        cThread.__init__(self)

    def StopEvent(self):
        print('I am stopped.')
        self._suspend()

    def WakeEvent(self):
        print('Wake up.')
        self._run()

    def LoopEvent(self):
        print('I am running.')
        time.sleep(1)

    def ClearEvent(self):
        print('Clear');

if __name__ == '__main__':
    test = TestThread()
    time.sleep(2)
    test._wake()
    time.sleep(2)
    test._shut()

---------------------------------------------------------------------------------------

创建可接收任意长度消息的通信类

接下来根据我的另一篇博客,创建一个可接受任意长度的通信方法类。

具体实现方式是先发送消息的长度,接收方根据长度信息进行接收。

#这个文件包含了通信类,里面有根据套接字发送和接收数据静态类
import time
import json
import socket

class Communication:
    LENGTH = 1024
    
    def send(sock, msg):
        
        first = json.dumps({"length":len(msg.encode())}).encode()
        sock.sendall(first)
        
        recvmsg = sock.recv(Communication.LENGTH)
        assert json.loads(recvmsg.decode())['response'] == '1ok'
            
        sock.sendall(msg.encode())
           
        #recvmsg = sock.recv(Communication.LENGTH)
        #assert json.loads(recvmsg.decode())['response'] == '2ok'

    def recv(sock):
            
        recvmsg = sock.recv(Communication.LENGTH)
        d = json.loads(recvmsg.decode())
        l = d['length']
        
        first = json.dumps({"response":"1ok"}).encode()
        sock.sendall(first)
        
        recvmsg = sock.recv(l)
        assert len(recvmsg) == l
        
        #second = json.dumps({"response":"2ok"}).encode()
        #sock.sendall(second)
        
        return recvmsg.decode()
        
            
def lci_print(msg, addr="127.0.0.1", port=5200, pfunc=print):

        pfunc(':Send to %s:%d' % (addr, port))

        try:

            data = {
                "communication": {
                    "instruction": msg
                }
            }
            json_string = json.dumps(data)

            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(1)
            sock.connect((addr, port))

            Communication.send(sock, json_string)
            s = Communication.recv(sock)

            pfunc(':connection successed.')
        except ConnectionRefusedError as e:
            pfunc(':connection refused')
        except ConnectionResetError as e:
            pfunc(':connection reset')
        except socket.timeout as e:
            pfunc(':time out')
#        except Exception as e:
#            return '连接失败 :' + str(e)

def lci_recv(socket):

    s = Communication.recv(socket)
    json_object = json.loads(s)
    Communication.send(socket, 'allok')
    return json_object['communication']['instruction']


if __name__ == '__main__':

    while True:
        lci_print("我已上线。")

---------------------------------------------------------------------------------------

构建TCP服务单元

#tcpserv.py
#这个文件包含了服务器节点的定义,所谓服务器节点就是一个单独的可控制线程

import time
import select
import socket
from cThread import cThread
from Comm import lci_recv

class tcpserver(cThread):
    '''这个类在状态改变为非运行态时,关闭套接子。'''
    #------------------------------------------------------
    def __init__(self, port=5200, pfunc=print):
        cThread.__init__(self)
        self.port = port
        self.socket = None
        self.pfunc = pfunc

    #------------------------------------------------------
    '''
    def _shut(self):
        with self._lock:
            self._stat = -1
            self._signal.release()

            # 关闭套接字
            if self.socket: self.socket.close()

    def _stop(self):
        with self._lock:
            self._stat = 0
            self._signal.release()

            # 关闭套接字
            if self.socket: self.socket.close()

    def _wake(self):
        with self._lock:
            self._stat = 1
            self._signal.release()

            # 关闭套接字,因为套接字成员只可能在唤醒后有值
            if self.socket: self.socket.close()
    '''
    #------------------------------------------------------
    def StopEvent(self):
        self._suspend()

    def WakeEvent(self):

        self.pfunc('tcpserver wake up, port %d' % self.port)
        # 创建套接字对象
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # 获取本地主机名
        #host = socket.gethostname()
        host = '0.0.0.0'
        port = self.port

        # 设置为非阻塞模式  
        self.socket.setblocking(False)
        #self.socket.settimeout(0.2)

        # 绑定并监听
        self.socket.bind((host, port))
        self.socket.listen(10)

        self.inputs = [self.socket]
        self.outputs = []

        #线程启动
        self._run()

    def LoopEvent(self):

        try:
            #使用select等待至少一个套接字准备好
            readable, writable, exceptional = select.select(self.inputs, self.outputs, self.inputs, 0.1)
            '''
            print(readable)
            print(writable)
            print(exceptional)
            print('')
            '''
            for s in readable:
                if s is self.socket:
                    #处理新的连接
                    connection, client_address = s.accept()
                    #加上此句会导致堵塞
                    #self.pfunc(':connection from' + str(client_address))
                    connection.settimeout(1)
                    self.inputs.append(connection)
                else:
                    #交流
                    #...
                    self.pfunc(':' + lci_recv(s))

                    self.inputs.remove(s)
                    s.close()

            for s in exceptional:  
                self.pfunc(':handling exceptional condition for' + str(s.getpeername()))  
                self.inputs.remove(s)  
                s.close()  

        except OSError as e: self.pfunc(':Socket is already closed.')  # 套接字已经关闭,读取了关闭的套接字
        finally: pass


    def ClearEvent(self):
        for s in self.inputs:
            s.close()
        #self.pfunc(':tcpserver, clear.')


def BaseTest():
    '''基本的测试, 测试是否能够关闭'''

    server = tcpserver(5200)
    server._wake()
    #time.sleep(1)
    #server._shut()

if __name__ == '__main__':
    BaseTest()

标签:__,socket,Python,self,sock,程与,TCP,._,def
From: https://blog.csdn.net/gapcuda/article/details/141678547

相关文章

  • 【Python】将网格数据写入到VTK文件
    1.vtk文件格式根据官网进行总结vtk文件组成:5个部分.第一部分,第一行:表明文件版本.写"#vtkDataFileVersion2.0"就行第二部分,第二行:表明标题(title).随便写.第三部分,第三行:ASCII或者BINARY第四部分,开始定义datasetstructure.这部分用于描述数据集的几何和拓扑......
  • Python模块之functools.partial
    在Python编程中,functools.partial是一个强大的工具,它提供了一种部分应用函数的方式,能够在创建新函数时固定部分参数,从而在后续调用中减少需要传递的参数数量。本文将深入介绍functools.partial的基本概念、使用方法,并通过丰富的示例代码演示其在不同场景中的实际应用。什么是func......
  • Python模块之subprocess
    转载:https://www.cnblogs.com/goldsunshine/p/17558075.html subprocess是Python中执行操作系统级别的命令的模块,所谓系级级别的命令就是如ls/etc/userifconfig等和操作系统有关的命令。subprocess创建子进程来执行相关命令,并连接它们的输入、输出和错误管道,获取它们的......
  • Python 中常用的人工智能库和工具
    在Python中,有许多强大的人工智能库和工具,它们为开发各种人工智能应用提供了有力的支持。以下是一些常用的人工智能库和工具介绍:一、机器学习库Scikit-learnScikit-learn是一个广泛应用于机器学习的Python库。它提供了各种经典的机器学习算法,包括分类、回归、聚类等......
  • 如何使用python实现批量ping,附代码
    以下为使用Python实现批量ping的多种方法及代码示例:方法一:importsubprocesswithopen('hosts.txt','r')asf:hosts=f.readlines()forhostinhosts:result=subprocess.check_output(('ping','-c','1',host.stri......
  • python+flask计算机毕业设计大学生闲置物品交易网站(程序+开题+论文)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着互联网技术的飞速发展,电子商务已成为人们日常生活不可或缺的一部分,极大地促进了商品交易的便捷性与效率。然而,在大学校园内,一个普遍存......
  • python+flask计算机毕业设计二手车交易管理系统的设计与开发(程序+开题+论文)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着汽车消费市场的日益成熟与车辆更新换代速度的加快,二手车交易已成为汽车市场不可或缺的一部分。然而,传统二手车交易过程中存在信息不对......
  • python+flask计算机毕业设计基于的零食销售商城(程序+开题+论文)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着互联网的飞速发展,电子商务已成为现代商业不可或缺的一部分,深刻改变着人们的消费习惯。零食作为日常消费的重要组成部分,其市场潜力巨大......
  • LeetCode-Python-1539. 第 k 个缺失的正整数(二分)
    给你一个 严格升序排列 的正整数数组 arr 和一个整数 k 。请你找到这个数组里第 k 个缺失的正整数。示例1:输入:arr=[2,3,4,7,11],k=5输出:9解释:缺失的正整数包括[1,5,6,8,9,10,12,13,...]。第5个缺失的正整数为9。示例2:输入:arr=[1,2,3,4],k=2......
  • 探索 AI Agents:从理念到 Python 实际运用
    作者:老余捞鱼原创不易,转载请标明出处及原作者。写在前面的话:    本文主要介绍了如何利用人工智能代理(AIAgents)从概念到Python中的实际应用,以及如何构建一个内容创作工作流程,通过多个代理协作完成从视频分析到博客撰写的复杂任务,完成后也许这会改变你对人工智能......