大家好,我之前在我的CSDN博客上面发了一条《C++可控制线程》的文章,里面介绍了在线程中植入类似状态机的东西,进而将多线程编程进化为“服务单元”编程的技术。
接下来我们来看一个基于此项目的Python的TCP服务单元代码。
---------------------------------------------------------------------------------------
创建状态机类与服务单元类
首先,我们利用python构建一个可用于继承的,构建状态机的东西。
然后,我们继承这个类以及线程类,以构建服务单元类(cThread)
#cThread.py
#服务节点(原可控线程)类的定义
import time
import threading
class StatusMachine(object):
#状态机
def __init__(self):
'''初始态静止'''
self._stat = 0
self._lock = threading.Lock()
self._signal = threading.Semaphore(0)
def _check(self):
'''
-1(终止)
0(停止)
1(唤醒)
2(运行)
'''
with self._lock:
return self._stat
def _shut(self):
self._lock.acquire()
self._stat = -1
self._signal.release()
self._lock.release()
def _stop(self):
with self._lock:
self._stat = 0
self._signal.release()
def _wake(self):
with self._lock:
self._stat = 1
self._signal.release()
def _run(self):
'''在唤醒事件中使用'''
with self._lock:
self._stat = 2
self._signal.release()
def _suspend(self):
self._signal.acquire()
def thread_main(th):
'''线程的主函数'''
while th._check() != -1:
while th._check() == 0:
th.StopEvent()
if th._check() == 1:
th.WakeEvent()
while th._check() == 2:
th._signal.release()
th.LoopEvent()
th.ClearEvent()
class cThread(StatusMachine, threading.Thread):
'''可控制线程'''
def __init__(self):
'''构造函数,先执行父类的构造'''
StatusMachine.__init__(self)
threading.Thread.__init__(self, target=thread_main, args=(self,))
#启动
self.start()
class TestThread(cThread):
'''测试线程'''
def __init__(self):
cThread.__init__(self)
def StopEvent(self):
print('I am stopped.')
self._suspend()
def WakeEvent(self):
print('Wake up.')
self._run()
def LoopEvent(self):
print('I am running.')
time.sleep(1)
def ClearEvent(self):
print('Clear');
if __name__ == '__main__':
test = TestThread()
time.sleep(2)
test._wake()
time.sleep(2)
test._shut()
---------------------------------------------------------------------------------------
创建可接收任意长度消息的通信类
接下来根据我的另一篇博客,创建一个可接受任意长度的通信方法类。
具体实现方式是先发送消息的长度,接收方根据长度信息进行接收。
#这个文件包含了通信类,里面有根据套接字发送和接收数据静态类
import time
import json
import socket
class Communication:
LENGTH = 1024
def send(sock, msg):
first = json.dumps({"length":len(msg.encode())}).encode()
sock.sendall(first)
recvmsg = sock.recv(Communication.LENGTH)
assert json.loads(recvmsg.decode())['response'] == '1ok'
sock.sendall(msg.encode())
#recvmsg = sock.recv(Communication.LENGTH)
#assert json.loads(recvmsg.decode())['response'] == '2ok'
def recv(sock):
recvmsg = sock.recv(Communication.LENGTH)
d = json.loads(recvmsg.decode())
l = d['length']
first = json.dumps({"response":"1ok"}).encode()
sock.sendall(first)
recvmsg = sock.recv(l)
assert len(recvmsg) == l
#second = json.dumps({"response":"2ok"}).encode()
#sock.sendall(second)
return recvmsg.decode()
def lci_print(msg, addr="127.0.0.1", port=5200, pfunc=print):
pfunc(':Send to %s:%d' % (addr, port))
try:
data = {
"communication": {
"instruction": msg
}
}
json_string = json.dumps(data)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
sock.connect((addr, port))
Communication.send(sock, json_string)
s = Communication.recv(sock)
pfunc(':connection successed.')
except ConnectionRefusedError as e:
pfunc(':connection refused')
except ConnectionResetError as e:
pfunc(':connection reset')
except socket.timeout as e:
pfunc(':time out')
# except Exception as e:
# return '连接失败 :' + str(e)
def lci_recv(socket):
s = Communication.recv(socket)
json_object = json.loads(s)
Communication.send(socket, 'allok')
return json_object['communication']['instruction']
if __name__ == '__main__':
while True:
lci_print("我已上线。")
---------------------------------------------------------------------------------------
构建TCP服务单元
#tcpserv.py
#这个文件包含了服务器节点的定义,所谓服务器节点就是一个单独的可控制线程
import time
import select
import socket
from cThread import cThread
from Comm import lci_recv
class tcpserver(cThread):
'''这个类在状态改变为非运行态时,关闭套接子。'''
#------------------------------------------------------
def __init__(self, port=5200, pfunc=print):
cThread.__init__(self)
self.port = port
self.socket = None
self.pfunc = pfunc
#------------------------------------------------------
'''
def _shut(self):
with self._lock:
self._stat = -1
self._signal.release()
# 关闭套接字
if self.socket: self.socket.close()
def _stop(self):
with self._lock:
self._stat = 0
self._signal.release()
# 关闭套接字
if self.socket: self.socket.close()
def _wake(self):
with self._lock:
self._stat = 1
self._signal.release()
# 关闭套接字,因为套接字成员只可能在唤醒后有值
if self.socket: self.socket.close()
'''
#------------------------------------------------------
def StopEvent(self):
self._suspend()
def WakeEvent(self):
self.pfunc('tcpserver wake up, port %d' % self.port)
# 创建套接字对象
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 获取本地主机名
#host = socket.gethostname()
host = '0.0.0.0'
port = self.port
# 设置为非阻塞模式
self.socket.setblocking(False)
#self.socket.settimeout(0.2)
# 绑定并监听
self.socket.bind((host, port))
self.socket.listen(10)
self.inputs = [self.socket]
self.outputs = []
#线程启动
self._run()
def LoopEvent(self):
try:
#使用select等待至少一个套接字准备好
readable, writable, exceptional = select.select(self.inputs, self.outputs, self.inputs, 0.1)
'''
print(readable)
print(writable)
print(exceptional)
print('')
'''
for s in readable:
if s is self.socket:
#处理新的连接
connection, client_address = s.accept()
#加上此句会导致堵塞
#self.pfunc(':connection from' + str(client_address))
connection.settimeout(1)
self.inputs.append(connection)
else:
#交流
#...
self.pfunc(':' + lci_recv(s))
self.inputs.remove(s)
s.close()
for s in exceptional:
self.pfunc(':handling exceptional condition for' + str(s.getpeername()))
self.inputs.remove(s)
s.close()
except OSError as e: self.pfunc(':Socket is already closed.') # 套接字已经关闭,读取了关闭的套接字
finally: pass
def ClearEvent(self):
for s in self.inputs:
s.close()
#self.pfunc(':tcpserver, clear.')
def BaseTest():
'''基本的测试, 测试是否能够关闭'''
server = tcpserver(5200)
server._wake()
#time.sleep(1)
#server._shut()
if __name__ == '__main__':
BaseTest()
标签:__,socket,Python,self,sock,程与,TCP,._,def
From: https://blog.csdn.net/gapcuda/article/details/141678547