首页 > 其他分享 >粘包问题、socketserver模块实现并发

粘包问题、socketserver模块实现并发

时间:2024-06-22 19:53:44浏览次数:21  
标签:socketserver res len 粘包 并发 client recv conn size

TCP协议------------黏包现象

 1 1.服务端连续执行三次recv
 2 2.客户端连续执行三次send
 3 问题:服务端一次性接收到了客户端三次的消息 该现象称为"黏包现象"
 4 --------------------------------------
 5 黏包现象产生的原因:
 6     1.收消息的时候,不知道每次接收的数据到底有多大!!!!!!
 7             recv()括号里面规定了每次收消息的字节数,所以如果客户端连续发几次消息,但是总字节数小于括号里面规定收消息的字节数
 8     2.TCP也称为流式协议:数据像水流一样,没有间隔。
 9 TCP里面有一个算法会针对数据量较小且发送间隔较短的多条数据一次性合并打包发送,服务端就会一次性全都接收到!!!
10 ---------------
11 避免黏包现象的核心思路\关键点:
12     如何明确即将接收的数据具体的字节数有多大!!!!!!
13 ---------------
14 ps:如何将长度变化的数据全部制作成固定长度的数据??????

解决粘包问题办法一:

 1 服务端:
 2 
 3 import subprocess
 4 from socket import *
 5 import struct
 6 
 7 server = socket(AF_INET, SOCK_STREAM)
 8 server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
 9 server.bind(('127.0.0.1', 8891))
10 server.listen(5)
11 
12 # 服务端应该做两件事:
13 # 1.循环的从半链接池中取出链接请求与其建立双向链接,拿到链接对象
14 while True:
15     conn, client_addr = server.accept()
16     print('Connected by', client_addr)
17     # 2.拿到链接对象,与其进行通信循环
18     while True:
19         try:
20             res = conn.recv(1024)
21             if len(res) == 0: break
22             obj = subprocess.Popen(res.decode('utf-8')
23                                    , shell=True,
24                                    stdout=subprocess.PIPE,
25                                    stderr=subprocess.PIPE)
26             stdout_res = obj.stdout.read()
27             stderr_res = obj.stderr.read()
28             total_size = len(stdout_res) + len(stderr_res)
29             # print(len(stderr_res)+len(stdout_res))
30             #  把执行结果返回给客户端(错误和正确都返回)
31 
32             # 1.先发头信息(固定长度的bytes):对数据进行描述
33             # int--->固定长度的bytes(使用import struct模块),组包
34             header = struct.pack('i', total_size)
35             conn.send(header)
36 
37             # 2.再发真实的数据
38             conn.send(stdout_res)
39             conn.send(stderr_res)
40             # with open("1.mp4",mode='rb') as f:
41             #     for lin in f:
42             #         conn.send(lin)
43         except Exception:
44             break
45 
46     conn.close()
47 
48 
49 ==============================================
50 客户端
51 
52 from socket import *
53 import struct
54 
55 #  创建socket对象
56 client = socket(AF_INET, SOCK_STREAM)
57 client.connect(('127.0.0.1', 8891))
58 
59 while True:
60     msg = input('>>:').strip()
61     if len(msg) == 0: continue
62     client.send(msg.encode('utf-8'))
63     # 解决粘包问题的思路:
64     # 1.先收固定长度的头,解析出数据的描述信息,包括数据的总大小,total_size
65     header = client.recv(4)
66     #  解析头信息,获取数据的总大小struct.unpack
67     total_size = struct.unpack('i', header)[0]
68     print('total_size:', total_size)
69     # 2.根据解析出的描述信息,接收真实的数据:recv_size=0,循环接收,每接收一次,recv_size+=接收的长度
70     # 3.当recv_size==total_size时,说明接收完毕
71     recv_size = 0
72     while recv_size < total_size:
73         recv_data = client.recv(1024)
74         recv_size += len(recv_data)
75         print(recv_data.decode('gbk'), end='')
76     else:
77         print()

 

代码实现终极解决粘包问题

 1 服务端
 2 
 3 import subprocess
 4 from socket import *
 5 import struct
 6 import json
 7 
 8 server = socket(AF_INET, SOCK_STREAM)
 9 server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
10 server.bind(('127.0.0.1', 8891))
11 server.listen(5)
12 
13 # 服务端应该做两件事:
14 # 1.循环的从半链接池中取出链接请求与其建立双向链接,拿到链接对象
15 while True:
16     conn, client_addr = server.accept()
17     print('Connected by', client_addr)
18     # 2.拿到链接对象,与其进行通信循环
19     while True:
20         try:
21             res = conn.recv(1024)
22             if len(res) == 0: break
23             obj = subprocess.Popen(res.decode('utf-8')
24                                    , shell=True,
25                                    stdout=subprocess.PIPE,
26                                    stderr=subprocess.PIPE)
27             stdout_res = obj.stdout.read()
28             stderr_res = obj.stderr.read()
29             total_size = len(stdout_res) + len(stderr_res)
30             # print(len(stderr_res)+len(stdout_res))
31             # 1.文件真实存在:制作头(头信息)
32             header_dic = {
33                 'filename': 'a.txt',
34                 'total_size': total_size,
35                 'md5': '123456'
36             }
37             # conn.send(json.dumps(header_dic).encode('utf-8'))
38             json_str = json.dumps(header_dic)
39             json_str_bytes = json_str.encode('utf-8')
40 
41             # 2.先发头的长度信息(固定长度的bytes):对数据进行描述
42             header_size = struct.pack('i', len(json_str_bytes))
43             conn.send(header_size)
44             # 3.发头信息
45             conn.send(json_str_bytes)
46 
47             # 4.再发真实的数据
48             conn.send(stdout_res)
49             conn.send(stderr_res)
50 
51 
52         except Exception:
53             break
54 
55     conn.close()
56 
57 
58 ================================================
59 客户端
60 
61 from socket import *
62 import struct
63 import json
64 
65 #  创建socket对象
66 client = socket(AF_INET, SOCK_STREAM)
67 client.connect(('127.0.0.1', 8891))
68 
69 while True:
70     msg = input('>>:').strip()
71     if len(msg) == 0: continue
72     client.send(msg.encode('utf-8'))
73     # 接收端
74     # 1.先收4个字节,从中提取接下来要收的头的长度
75     header_size = client.recv(4)
76     header_len = struct.unpack('i', header_size)[0]
77     # 2.接收头,并解析
78     json_str_bytes = client.recv(header_len)
79     json_str = json_str_bytes.decode('utf-8')
80     header_dic = json.loads(json_str)
81     print(header_dic)
82     total_size = header_dic['total_size']
83     # 提取信息:total_size=header_dic['total_size'],filename,md5
84     # 3.接收真实的数据
85     recv_size = 0
86     while recv_size < total_size:
87         recv_data = client.recv(1024)
88         recv_size += len(recv_data)
89         print(recv_data.decode('gbk'), end='')
90     else:
91         print()

 

PS补充:

 

【socketserver模块实现并发】

(基于tcp)

 1 服务端
 2 
 3 import socketserver
 4 
 5 
 6 class MyRequestHandler(socketserver.BaseRequestHandler):  # 继承BaseRequestHandler类
 7     def handle(self):
 8         # print(self.request)  # 如果tcp协议,self.request==>conn
 9         print(self.client_address)
10 
11         while True:
12             try:
13                 msg = self.request.recv(1024)
14                 if len(msg) == 0: break
15                 self.request.send(msg.upper())
16             except Exception:
17                 break
18 
19         self.request.close()
20 
21 
22 # 1.创建一个服务端对象,绑定端口,启动服务
23 s = socketserver.ThreadingTCPServer(('127.0.0.1', 8081), MyRequestHandler)
24 s.serve_forever()
25 # 等同于while True:
26 # conn, client_addr = server.accept()
27 # 启动一个线程(conn,client_addr)
28 
29 # 2.拿到链接对象,与其进行通信循环====》handle
30 
31 
32 =================================================
33 客户端
34 
35 from socket import *
36 
37 client = socket(AF_INET, SOCK_STREAM)
38 client.connect(('127.0.0.1', 8081))
39 
40 while True:
41     msg = input('>>:').strip()
42     if len(msg) == 0: continue
43     client.send(msg.encode('utf-8'))
44     res = client.recv(1024)
45     print(res.decode('utf-8'))

 

(基于udp)

 1 import socketserver
 2 
 3 
 4 class MyRequestHandler(socketserver.BaseRequestHandler):
 5     def handle(self):
 6         client_data = self.request[0]
 7         server = self.request[1]
 8         client_address = self.client_address
 9         print('客户端发来的数据%s' % client_data)
10         server.sendto(client_data.upper(), client_address)
11 
12 
13 s = socketserver.ThreadingUDPServer(('127.0.0.1', 8089), MyRequestHandler)
14 s.serve_forever()
15 
16 # 相当于:只负责循环的收
17 # while True:
18 #     data,client_addr=server.recvfrom(1024)
19 #       启动一个线程处理后续的事情
20 #     server.sendto(data.upper(),client_addr)
21 
22 
23 ==============================================
24 客户端
25 
26 import socket
27 
28 client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
29 while True:
30     msg = input('>>:').strip()
31     client.sendto(msg.encode('utf-8'), ('127.0.0.1', 8089))
32     data, server_addr = client.recvfrom(1024)
33     print('Receive from server:', data.decode('utf-8'))
34 
35 client.close()

 

 

标签:socketserver,res,len,粘包,并发,client,recv,conn,size
From: https://www.cnblogs.com/liuliu1/p/18262669

相关文章

  • ConcurrentHashMap(并发工具类)
    并发工具类在JDK的并发包里提供了几个非常有用的并发容器和并发工具类。供我们在多线程开发中进行使用。5.1ConcurrentHashMap5.1.1概述以及基本使用在集合类中HashMap是比较常用的集合对象,但是HashMap是线程不安全的(多线程环境下可能会存在问题)。为了保证数据的安全性我......
  • 基于Redis和openresty实现高并发缓存架构
    目录概述缓存架构设计实践代码路由业务封装redis效果概述  本文是对项目中QPS高并发相关问题的一种解决方案,利用Nginx与Redis的高并发、超低延迟响应,结合Canal进行实现。openrestry官网  当程序需要提供较高的并发访问时,往往需要在程序中引入缓存......
  • Java 的多线程和并发处理,在项目中是如何使用它?
    在Java开发中,多线程和并发是我们经常需要处理的问题。它们能够让我们的应用在完成一些耗时任务的同时,仍然保持对用户的响应,提高了应用的性能和用户体验。接下来,让我们来详细地了解一下Java中的多线程和并发处理。Java中的多线程在说到多线程之前,我们首先要了解什么是线程。......
  • 并发编程的三大特性
    并发编程的三大特性一、原子性1.1什么是并发编程的原子性JMM(JavaMemoryModel)。不同的硬件和不同的操作系统在内存上的操作有一定差异的。Java为了解决相同代码在不同操作系统上出现的各种问题,用JMM屏蔽掉各种硬件和操作系统带来的差异。让Java的并发编程可以做到跨平......
  • 一文读懂Java多线程并发之内存模型
     什么是内存模型?Java内存模型(JavaMemoryModel)描述了Java编程语言中的线程如何与内存进行交互,是和多线程相关的一组规范,需要各个JVM的实现来遵守JMM规范,以便于开发者可以利用这些规范,更方便地开发多线程程序。有了这些规范,即便同一个程序在不同操作系统的虚拟机上运行......
  • 并发业务使用redis分布式锁
    伴随着业务体量的上升,我们的qps与并发问题越来越明显,这时候就需要用到让代码一定情况下进行串行执行的工具:锁1.业务场景代码@Override@Transactional(rollbackFor=Exception.class)publicObjecttestBatch(Useruser){LambdaQueryWrapper<Us......
  • Golang并发
    Sync.MutexMutex结构typeMutexstruct{ stateint32 semauint32}Sync.Mutex由两个字段构成,state用来表示当前互斥锁处于的状态,sema用于控制锁状态的信号量互斥锁state(32bit)主要记录了如下四种状态:waiter_num(29bit):记录了当前等待这个锁的goroutine数量starving(......
  • Tcp粘包半包问题(现实场景举例帮助理解)
    理解粘包问题时,我们可以将这个过程想象得更加生活化一些。想象你正在经营一家水果拼装店,你的任务是接收来自不同客户的水果订单,并将这些水果按照订单要求重新组装起来。每份订单中的水果都事先被切成了便于快递的“水果片”,并通过同一条传送带送过来。现在,你收到了两份订单,一......
  • CompletableFuture多线程并发处理
    CompletableFuture多线程并发处理   概要  一个接口可能需要调用N个其他服务的接口,这在项目开发中还是挺常见的。举个例子:用户请求获取订单信息,可能需要调用用户信息、商品详情、物流信息、商品推荐等接口,  如果是串行(按顺序依次执行每个任务)执行的话,接口的响应速......
  • MVCC多版本并发控制
    MVCC(MultiVersionConcurrencyControl)多版本并发控制,是指在使用READCOMMITTED、REPEATABLEREAD这两种隔离级别的事务执行SELECT操作时访问记录的版本链的过程,使不同事务的读写操作能够并发执行,提升系统性能。MVCC机制的核心是在做SELECT操作前会生产一个ReadView,READCO......