dh_server.py:
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
import socket
def ecdh_generater(received_public_key):
    """Run this side of an ECDH key agreement on the SECP256R1 curve.

    A fresh private key is generated on every call and combined with the
    peer's public key to derive the shared key.

    Args:
        received_public_key: The peer's public key as PEM-encoded data,
            in a form accepted by ``serialization.load_pem_public_key``.

    Returns:
        A 3-tuple ``(serialized_public_key, private_key, shared_key)``:
        this side's public key serialized as PEM (SubjectPublicKeyInfo),
        the newly generated private key object, and the result of the
        ECDH exchange with the peer's public key.
    """
    # New key pair for this exchange.
    own_private = ec.generate_private_key(ec.SECP256R1())

    # Parse the peer's PEM blob and derive the shared key from it.
    peer_public = serialization.load_pem_public_key(received_public_key)
    shared_secret = own_private.exchange(ec.ECDH(), peer_public)

    # Our public half, in the same PEM format the peer has to parse.
    own_public_pem = own_private.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    return own_public_pem, own_private, shared_secret
if __name__ == '__main__':
    # Server side of the demo: accept one connection on 127.0.0.1:11221,
    # read the peer's PEM public key, derive the shared key, then connect
    # back to the peer on 127.0.0.1:11220 and send our own public key.
    #
    # Every socket is managed by a ``with`` block so it is closed even when
    # key parsing or the connect-back step raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        # Listen on the server port.
        server_socket.bind(("127.0.0.1", 11221))
        server_socket.listen()

        # Wait for the peer to connect (exactly one exchange is handled).
        client_socket, addr = server_socket.accept()
        with client_socket:
            # Print the peer address.
            print(f"Connection from {addr[0]}:{addr[1]}")

            # Receive the peer's public key.
            # NOTE(review): a single recv() is assumed to deliver the whole
            # PEM blob; TCP does not guarantee that. Reading until EOF is
            # not an option here because the peer keeps its sending socket
            # open while it waits for our reply.
            data = client_socket.recv(1024)

            # Generate our key pair and derive the shared key.
            serialized_public_key, _, shared_key = ecdh_generater(data)
            print(shared_key.hex())

            # Connect back to the peer's listening port and send our
            # public key; the outgoing socket is created only now, when
            # it is actually needed.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connect_socket:
                connect_socket.connect(("127.0.0.1", 11220))
                connect_socket.sendall(serialized_public_key)
dh_client.py:
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
import socket
def ecdh_generater():
    """Create a new SECP256R1 key pair for an ECDH exchange.

    Returns:
        A 2-tuple ``(serialized_public_key, private_key)``: the public key
        serialized as PEM (SubjectPublicKeyInfo), ready to be sent to the
        other party, and the private key object to keep for the exchange.
    """
    key_pair = ec.generate_private_key(ec.SECP256R1())

    # PEM-encode the public half so it can be sent to the other party.
    public_pem = key_pair.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    return public_pem, key_pair
if __name__ == '__main__':
    # Client side of the demo: listen on 127.0.0.1:11220 for the server's
    # reply, send our PEM public key to the server on 127.0.0.1:11221, then
    # read the server's public key from the connect-back and derive the
    # shared key.
    #
    # Every socket is managed by a ``with`` block so it is closed even when
    # the connection attempt or key parsing raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        # Start listening BEFORE sending our key: the server connects back
        # to this port as soon as it has processed the key.
        server_socket.bind(("127.0.0.1", 11220))
        server_socket.listen()

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connect_socket:
            # Connect to the server.
            connect_socket.connect(("127.0.0.1", 11221))

            # Generate our public/private key pair.
            serialized_public_key, private_key = ecdh_generater()

            # Send our public key.
            connect_socket.sendall(serialized_public_key)

            # Wait for the server to connect back (one exchange only).
            client_socket, addr = server_socket.accept()
            with client_socket:
                # Print the peer address.
                print(f"Connection from {addr[0]}:{addr[1]}")

                # Receive the server's public key.
                # NOTE(review): a single recv() is assumed to deliver the
                # whole PEM blob; TCP does not guarantee that.
                data = client_socket.recv(1024)

                # Decode the PEM public key.
                received_public_key = serialization.load_pem_public_key(data)

                # Derive the shared key.
                shared_key = private_key.exchange(ec.ECDH(), received_public_key)
                print(shared_key.hex())
标签:__,socket,Python,ecdh,private,秘钥,key,server,public
From: https://www.cnblogs.com/desireroot7/p/17573550.html