密码学专题训练
实 验 报 告
实验二 signal协议
2.1简介
Signal是一种私人通信平台,它使用Signal Protocol作为加密协议来保护用户的信息安全和隐私。Signal Protocol是一种端到端的加密通信协议,具有极高的安全性。它被认为是世界上最安全的通信协议之一。该协议的基本原理是在通信双方之间进行终端加密,这意味着只有发送者和接收者能够解密消息,即使Signal自身也无法查看或存储消息的内容。
2.2基础知识
迪菲-赫尔曼密钥交换协议(Diffie–Hellman key exchange)
允许通信两方在没有安全预共享密钥的情况下建立共享秘密,并可以用于加密通信。DH协议的基本思想是通过模数取幂的方式得出一个共享的秘密值,而这个结果对外界来说是不可知的。
拟定Alice和Bob要确定一个消息密钥
1、Alice和Bob各自创建符合DH协议的密钥对,假设Alice密钥对为(私钥A,公钥A),Bob密钥对为(私钥B,公钥B)
2、双方发送自己的公钥给对方,即使有黑客监听,他只能得到公钥A,公钥B
3、Alice用自己的私钥和Bob的公钥计算消息密钥为S,即DH (私钥A,公钥B) =密钥S
4、Bob用自己的私钥和Alice的公钥计算消息密钥也为S;即DH (私钥B,公钥A) =密钥S
5、双方同时确定了协商密钥S,后续可以通过S衍生出消息密钥,进行加密通讯。
6、黑客只知道公钥A和公钥B,因为不知道任意一方的私钥,所以无法计算出密钥S。
该实验中没有第四次
2.3双棘轮算法
棘轮:棘轮就是一种特殊的齿轮,他只能往一个方向转下去,而不能往回转。
单棘轮算法:只向一个方向转动,不能往回转,保证前向安全或后向安全。
KDF算法也是一种棘轮算法,KDF算法导出的KDF链只能往后面派生,而不能计算出前向的密钥,这就保证了,如果某一轮的密钥被破解出来,但前面的密钥是无法计算出来的,也就是前面的消息无法被解密。
如果再加上一个棘轮算法,就可以再前向安全的基础上保障后向安全,即一条消息的密钥被破解,之前和之后的消息密钥都无法推算,这种算法被称为“双棘轮算法”signal protocol协议双棘轮加密算法为:“KDF链棘轮”+“DH棘轮”。以保证消息的前向安全和后向安全。
- 棘轮一
DH棘轮,DH棘轮运作在EK变换的DH链上
步进条件:
当一方角色转换时,即角色在发送者和接收者之间出现转换时
步进方式:
①情况一:当接收者转为发送者时,生成自己新的EK对,利用自己新EK私钥,步进棘轮
②情况二:当发送者转为接收者时,收到对方新的EK公钥,利用对方新EK公钥,步进棘轮
- 棘轮二
KDF棘轮(也称为对称密钥棘轮),KDF棘轮运作在根链,接收链,发送链这三条链上
步进条件:
当要发送一条新信息(即需要新的加密密钥),或者要接收一条新信息(即需要新的解密密钥)时
步进方式:
①情况一:当要发送加密信息或接收解密信息时,DH棘轮未出现步进时。直接在当前的发送或接收链上往前做KDF步进
②情况二:当要发送加密信息或接收解密信息时,DH棘轮出现步进时。需要先步进DH棘轮,之后在新的DH棘轮的衍生下,做崭新的KDF接收链或发送链步进
2.4数学定理证明
假设,用户Alice有一个秘密整数a=6,用户Bob有一个秘密整数b=15。Alice和Bob提前协商了两数字,它们分别是素数p=23及其原根g=5(设m是正整数,a是整数,若a mod m的阶等于φ(m),则称a为模m的一个原根。其中φ(m)表示m的欧拉函数)。Alice通过a计算出公共整数A=g^a mod p=5^6 mod 23=8,Bob通过b计算出公共整数B=g^b mod p=5^15 mod 23=19。通过公共网络,Alice和Bob交换了公共整数A和B。然后,A使用自己的秘密整数a和对方的公共整数B计算得到协商数字s=B^a mod p=19^6 mod 23=2,而B也使用自己的秘密整数b和对方的公共整数A计算得到协商整数s=A^b mod p=8^15 mod 23=2。此时,Alice和Bob成功地获得了相同的协商整数s。而自始至终,Alice和Body只通过公共网络获取了对方的公共整数,即便有人也得到了公共整数,也无法计算出同样的协商整数s。
Windows 11
Python Python 3.10.11
Pycharm 2023
四、实验内容与步骤
该项目分为server.py、client.py、utils.py、gui_client1.py、gui_client2.py、double_ratchet_offline.py
4.1Utils.py:
import base64
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import x25519
def b64_encode(msg):
# base64 encoding helper function
return base64.b64encode(msg)
def b64_decode(msg):
# base64 decoding helper function
return base64.b64decode(msg)
def pad(msg):
# pkcs7 padding
msg = bytes(msg,'ascii')
num = 16 - (len(msg) % 16)
return msg + bytes([num] * num)
def unpad(msg):
# remove pkcs7 padding
return msg[:-msg[-1]]
def hkdf(inp, length):
# use HKDF on an input to derive a key
hkdf = HKDF(algorithm=hashes.SHA256(), length=length, salt=b'',
info=b'', backend=default_backend())
return hkdf.derive(inp)
def pk_to_bytes(pk_obj):
return pk_obj.public_bytes(encoding=serialization.Encoding.Raw,format=serialization.PublicFormat.Raw)
def byte_to_pk(byte):
return x25519.X25519PublicKey.from_public_bytes(byte)
这段 Python 代码包含了几个函数,每个函数都执行不同的加密、编码或解码操作。下列是每个函数的作用:
1、b64_encode(msg) 和 b64_decode(msg) 函数是用来执行 Base64 编码和解码的操作。Base64 是一种编码方式,用于将二进制数据转换为可打印字符,便于在文本协议中传输。b64_encode 接受一个消息 msg 并对其进行 Base64 编码,而 b64_decode 则对 Base64 编码的消息进行解码。
2、pad(msg) 函数执行的是 PKCS7 填充。PKCS7 是一种填充方案,用于填充消息,使其长度能够被块大小整除。在这个函数中,消息 msg 首先被转换为 ASCII 编码的字节序列,然后计算需要填充的字节数,接着在末尾添加相应数量的字节以达到填充要求。
3、unpad(msg) 函数执行的是 PKCS7 的反向操作,即移除填充。根据 PKCS7 规则,函数通过检查最后一个字节的值确定了需要移除的填充字节数,然后将这些字节从消息中移除。
4、hkdf(inp, length) 函数使用 HKDF(HMAC-based Extract-and-Expand Key Derivation Function)算法对输入 inp 执行密钥派生,生成指定长度 length 的密钥。HKDF 是一种密钥派生函数,用于从输入生成一个或多个密钥。
5、pk_to_bytes(pk_obj) 和 byte_to_pk(byte) 函数涉及公钥的序列化和反序列化。pk_to_bytes 函数接受一个公钥对象 pk_obj,将其转换为原始的字节表示。byte_to_pk 函数接受字节表示的公钥,并将其转换回公钥对象。
4.2double_ratchet_offline.py
在此Python 代码实现了端到端加密协议,基于Double Ratchet算法。这个算法在实现端到端加密时非常有用,比如在即时通讯应用中确保消息的安全传输。在这个代码中Alice和Bob通过X3DH协议进行密钥交换,然后使用Double Ratchet算法来进行端到端加密的通信。
主要部分:
1.密钥协商和初始化
使用了X25519曲线进行密钥交换。
X3DH方法用于执行四个Diffie-Hellman密钥交换,得到一个共享的密钥sk。init_ratchet方法初始化了根密钥链和发送/接收密钥链。
2. Double Ratchet
SymmRatchet类包含了状态以及生成新的对称密钥的方法next基于输入来改变状态并生成新的密钥和IV。send方法用于发送加密消息,生成密文并将其发送给对方,同时传递当前的DH公钥。recv方法用于接收加密消息,解密消息并打印解密后的内容。
3.加密和解密
采用了对称加密算法AES-CBC。
pad和unpad方法用于填充和去除填充,采用了PKCS7标准填充方案。
4.实例化Alice和Bob
Alice和Bob分别执行X3DH密钥交换,并初始化各自的密钥链。Alice和Bob可以通过send和recv方法进行加密消息的发送和接收。
5.流程:
(1)Alice和Bob执行X3DH密钥交换。
(2)Alice初始化密钥链并使用Bob的DH公钥进行ratchet。
(3)Alice向Bob发送加密消息,同时发送她的新DH公钥。
(4)Bob接收Alice的消息,使用她的DH公钥进行ratchet并向Alice回复加密消息。
这个流程会持续下去,每次发送消息时,发送方都会更新密钥链,并且接收方会使用新的密钥链来解密消息。这保证了消息的安全性和前向保密。
4.3使用流程:
首先启动server.py, 程序自动设置服务器的主机和端口:
host = 127.0.0.1 port = 7976
将主机地址和端口绑定到Socket并开始监听连接
运行gui_client1.py
在弹出的窗口输中入Alice点击continue
运行gui_client2.py
在弹出的两个对话框中可以任意输入,对方可以接受消息
实验代码:
double_ratchet_offline.py
import base64
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey, Ed25519PrivateKey
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.backends import default_backend
from Cryptodome.Cipher import AES
def b64(msg):
# base64 encoding helper function
return base64.encodebytes(msg).decode('utf-8').strip()
def hkdf(inp, length):
# use HKDF on an input to derive a key
hkdf = HKDF(algorithm=hashes.SHA256(), length=length, salt=b'',
info=b'', backend=default_backend())
return hkdf.derive(inp)
class SymmRatchet(object):
def __init__(self, key):
self.state = key
def next(self, inp=b''):
# turn the ratchet, changing the state and yielding a new key and IV
print("state", self.state)
print("inp", inp)
output = hkdf(self.state + inp, 80)
print("output", output)
self.state = output[:32]
outkey, iv = output[32:64], output[64:]
return outkey, iv
class Bob(object):
def __init__(self):
# generate Bob's keys
self.IKb = X25519PrivateKey.generate()
self.SPKb = X25519PrivateKey.generate()
self.OPKb = X25519PrivateKey.generate()
self.DHratchet = X25519PrivateKey.generate()
def x3dh(self, alice):
# perform the 4 Diffie Hellman exchanges (X3DH)
dh1 = self.SPKb.exchange(alice.IKa.public_key())
dh2 = self.IKb.exchange(alice.EKa.public_key())
dh3 = self.SPKb.exchange(alice.EKa.public_key())
dh4 = self.OPKb.exchange(alice.EKa.public_key())
# the shared key is KDF(DH1||DH2||DH3||DH4)
self.sk = hkdf(dh1 + dh2 + dh3 + dh4, 32)
print(f"sk:: {self.sk} \n\n")
print('[Bob]\tShared key:', b64(self.sk))
def init_ratchets(self):
# initialise the root chain with the shared key
self.root_ratchet = SymmRatchet(self.sk)
# initialise the sending and recving chains
self.recv_ratchet = SymmRatchet(self.root_ratchet.next()[0])
self.send_ratchet = SymmRatchet(self.root_ratchet.next()[0])
def dh_ratchet(self, alice_public):
# perform a DH ratchet rotation using Alice's public key
dh_recv = self.DHratchet.exchange(alice_public)
shared_recv = self.root_ratchet.next(dh_recv)[0]
# use Alice's public and our old private key
# to get a new recv ratchet
self.recv_ratchet = SymmRatchet(shared_recv)
print('[Bob]\tRecv ratchet seed:', b64(shared_recv))
# generate a new key pair and send ratchet
# our new public key will be sent with the next message to Alice
self.DHratchet = X25519PrivateKey.generate()
dh_send = self.DHratchet.exchange(alice_public)
shared_send = self.root_ratchet.next(dh_send)[0]
self.send_ratchet = SymmRatchet(shared_send)
print('[Bob]\tSend ratchet seed:', b64(shared_send))
def send(self, alice, msg):
key, iv = self.send_ratchet.next()
cipher = AES.new(key, AES.MODE_CBC, iv).encrypt(pad(msg))
print('[Bob]\tSending ciphertext to Alice:', b64(cipher))
# send ciphertext and current DH public key
alice.recv(cipher, self.DHratchet.public_key())
def recv(self, cipher, alice_public_key):
# receive Alice's new public key and use it to perform a DH
self.dh_ratchet(alice_public_key)
key, iv = self.recv_ratchet.next()
# decrypt the message using the new recv ratchet
msg = unpad(AES.new(key, AES.MODE_CBC, iv).decrypt(cipher))
print('[Bob]\tDecrypted message:', msg)
class Alice(object):
def __init__(self):
# generate Alice's keys
self.IKa = X25519PrivateKey.generate()
self.EKa = X25519PrivateKey.generate()
self.DHratchet = None
def x3dh(self, bob):
# perform the 4 Diffie Hellman exchanges (X3DH)
dh1 = self.IKa.exchange(bob.SPKb.public_key())
dh2 = self.EKa.exchange(bob.IKb.public_key())
dh3 = self.EKa.exchange(bob.SPKb.public_key())
dh4 = self.EKa.exchange(bob.OPKb.public_key())
# the shared key is KDF(DH1||DH2||DH3||DH4)
self.sk = hkdf(dh1 + dh2 + dh3 + dh4, 32)
print('[Alice]\tShared key:', b64(self.sk))
def init_ratchets(self):
# initialise the root chain with the shared key
self.root_ratchet = SymmRatchet(self.sk)
# initialise the sending and recving chains
self.send_ratchet = SymmRatchet(self.root_ratchet.next()[0])
self.recv_ratchet = SymmRatchet(self.root_ratchet.next()[0])
def dh_ratchet(self, bob_public):
# perform a DH ratchet rotation using Bob's public key
if self.DHratchet is not None:
# the first time we don't have a DH ratchet yet
dh_recv = self.DHratchet.exchange(bob_public)
shared_recv = self.root_ratchet.next(dh_recv)[0]
# use Bob's public and our old private key
# to get a new recv ratchet
self.recv_ratchet = SymmRatchet(shared_recv)
print('[Alice]\tRecv ratchet seed:', b64(shared_recv))
# generate a new key pair and send ratchet
# our new public key will be sent with the next message to Bob
self.DHratchet = X25519PrivateKey.generate()
dh_send = self.DHratchet.exchange(bob_public)
shared_send = self.root_ratchet.next(dh_send)[0]
self.send_ratchet = SymmRatchet(shared_send)
print('[Alice]\tSend ratchet seed:', b64(shared_send))
def send(self, bob, msg):
key, iv = self.send_ratchet.next()
cipher = AES.new(key, AES.MODE_CBC, iv).encrypt(pad(msg))
print('[Alice]\tSending ciphertext to Bob:', b64(cipher))
# send ciphertext and current DH public key
bob.recv(cipher, self.DHratchet.public_key())
def recv(self, cipher, bob_public_key):
# receive Bob's new public key and use it to perform a DH
self.dh_ratchet(bob_public_key)
key, iv = self.recv_ratchet.next()
# decrypt the message using the new recv ratchet
msg = unpad(AES.new(key, AES.MODE_CBC, iv).decrypt(cipher))
print('[Alice]\tDecrypted message:', msg)
def pad(msg):
# pkcs7 padding
num = 16 - (len(msg) % 16)
return msg + bytes([num] * num)
def unpad(msg):
# remove pkcs7 padding
return msg[:-msg[-1]]
alice = Alice()
bob = Bob()
# Alice performs an X3DH while Bob is offline, using his uploaded keys
alice.x3dh(bob)
# Bob comes online and performs an X3DH using Alice's public keys
bob.x3dh(alice)
# Initialize their symmetric ratchets
alice.init_ratchets()
bob.init_ratchets()
# Initialise Alice's sending ratchet with Bob's public key
alice.dh_ratchet(bob.DHratchet.public_key())
# Alice sends Bob a message and her new DH ratchet public key
alice.send(bob, b'Hello Bob!')
# Bob uses that information to sync with Alice and send her a message
bob.send(alice, b'Hello to you too, Alice!')
# # Alice sends Bob a message and her new DH ratchet public key
# alice.send(bob, b'asfd Bob!')
# # Bob uses that information to sync with Alice and send her a message
# bob.send(alice, b'Heasdfo, Alice!')
# # Alice sends Bob a message and her new DH ratchet public key
# alice.send(bob, b'Hasdfllo Bob!')
# # Bob uses that information to sync with Alice and send her a message
# bob.send(alice, b'Hello asdfasd, Alice!')
Sever.Py
import socket
import threading
import json
import time
print("Server starting....")
time.sleep(2)
host = '127.0.0.1' #LocalHost
port = 7976 #Choosing unreserved port
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #socket initialization
server.bind((host, port)) #binding host and port to socket
server.listen()
print(f"Server Listening at: {host, port}")
clients = []
nicknames = []
pub_key_bundle = {}
def broadcast(message): #broadcast function declaration
for client in clients:
client.send(message)
def handle(client):
while True:
try: #recieving valid messages from client
message = client.recv(2048).decode('utf-8')
print(message)
try:
act_msg = message.split(":")[1]
# print(act_msg)
except:
act_msg = message
if act_msg in nicknames:
# print(True)
send_pk = pub_key_bundle[act_msg]
# print(send_pk)
client.send(str(send_pk).encode('utf-8'))
else:
for i in clients:
if(i != server and i != client):
i.sendall(message.encode('utf-8'))
# broadcast(message.encode('utf-8'))
except: #removing clients
index = clients.index(client)
clients.remove(client)
client.close()
nickname = nicknames[index]
broadcast('{} left!'.format(nickname).encode('utf-8'))
nicknames.remove(nickname)
break
def receive(): #accepting multiple clients
while True:
client, address = server.accept()
print("Connected with {}".format(str(address)))
#send initial message to client
client.send('NICKNAME'.encode('utf-8'))
#receive nickname and pk from client
nickname = client.recv(2048).decode('utf-8')
client_pk = client.recv(2048)
pub_key_bundle[nickname] = client_pk
nicknames.append(nickname)
clients.append(client)
print("Nickname is {}".format(nickname))
print(f'[{nickname}]: PK received')
broadcast("{} joined!".format(nickname).encode('utf-8'))
client.send('Connected to server!'.encode('utf-8'))
#sending public key bundle to client
pk_bundle_endoded = "\nAvailable Users\n" + str(nicknames)
client.send(pk_bundle_endoded.encode('utf-8'))
# time.sleep(0.5)
# client.send("Talk".encode('utf-8'))
thread = threading.Thread(target=handle, args=(client,))
thread.start()
receive()
Cilint客户端
from logging import raiseExceptions
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
from cryptography.hazmat.primitives.asymmetric import x25519
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey, Ed25519PrivateKey
from Cryptodome.Cipher import AES
import json
import socket, threading
from utils import *
import ast
import time
ROOT_KEY = b"o\x99\xa1\xdd@#\xc0\x0b \xec\xf5\x80GI\xbf\xca\x8b\x16}L;j\x02f\x07'\x88\x8f\x816e4"
nickname = input("Choose your nickname: ")
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #socket initialization
client.connect(('127.0.0.1', 7976)) #connecting client to server
class SymmetricRatchet(object):
def __init__(self, key) -> None:
self.state = key
def next(self, inp=b''):
# print("state",self.state)
output = hkdf(self.state + inp, 80)
# print(output)
self.state = output[:32]
outkey = output[32:64]
iv = output[64:]
return outkey, iv
class Client(object):
def __init__(self) -> None:
self.DHratchet = X25519PrivateKey.generate()
self.sk = ROOT_KEY
def init_ratchets(self):
self.root_ratchet = SymmetricRatchet(self.sk)
self.recv_ratchet = SymmetricRatchet(self.root_ratchet.next()[0])
self.send_ratchet = SymmetricRatchet(self.root_ratchet.next()[0])
def dh_ratchet(self, alice_pk):
self.DHratchet = X25519PrivateKey.generate()
dh_send = self.DHratchet.exchange(alice_pk)
shared_send = self.root_ratchet.next(dh_send)[0]
self.send_ratchet = SymmetricRatchet(shared_send)
print('Send ratchet seed:', b64_encode(shared_send))
def receive_ratchet(self,alice_pk):
dh_recv = self.DHratchet.exchange(alice_pk)
shared_recv = self.root_ratchet.next(dh_recv)[0]
self.recv_ratchet = SymmetricRatchet(shared_recv)
print('Recv ratchet seed:', b64_encode(shared_recv))
def enc(self, msg):
key, iv = self.send_ratchet.next()
cipher = AES.new(key, AES.MODE_CBC, iv).encrypt(pad(msg))
return cipher, self.DHratchet.public_key()
def dec(self, cipher, alice_pk):
self.receive_ratchet(alice_pk)
key, iv = self.recv_ratchet.next()
msg = unpad(AES.new(key, AES.MODE_CBC, iv).decrypt(cipher))
print(str(msg,'utf-8'))
alice = Client()
alice.init_ratchets()
pk_obj = alice.DHratchet.public_key()
init_pk = pk_obj.public_bytes(encoding=serialization.Encoding.Raw,format=serialization.PublicFormat.Raw)
'''
Initial Client is alice, subsequent client is BOB
Here Bob tries to communicate with Alice first
'''
def receive():
while True: #making valid connection
try:
message = client.recv(2048).decode('utf-8')
if message == 'NICKNAME': #hello message received from the server.
print("## ##")
print("# Registration Phase #")
print("## ##\n")
init_server_msg = nickname
client.send(init_server_msg.encode('utf-8'))
client.send(init_pk)
print("Public Key Sent to user")
elif message[0:1] == "[": ## receiving pk bundle from server i.e a list.
available_users = ast.literal_eval(message) # convert list
print("## ##")
print("# Active Users #")
print("## ##\n")
print(available_users)
print("Who would you like to talk to??")
elif message[0:4] == "Talk":
print("Who would you like to talk to??")
elif message[0:2] == "b'" or message[0:2] == 'b"': ## if message is pubkey then starts with b(byte)
global alice_pk
if message[-2] == "=":
# print("received_msg_nPK", alice_pk)
byte_msg = ast.literal_eval(message)
decode_msg = b64_decode(byte_msg)
# print("decoded msg:", decode_msg)
alice.dec(decode_msg, alice_pk)
else:
mes = ast.literal_eval(message)
# print(len(mes))
alice_pk = x25519.X25519PublicKey.from_public_bytes(mes)
print("PK received")
else:
print(message)
except Exception as e: #case on wrong ip/port details
print(e)
print("An error occured!")
client.close()
break
def write():
count = 0
while True: #message layout
message = '{}:{}'.format(nickname, input(''))
count += 1
try:
if alice_pk:
count = 2
except:
pass
if count > 1:
alice.dh_ratchet(alice_pk)
cipher, pk = alice.enc(message)
pk_byte = pk_to_bytes(pk)
client.send(str(pk_byte).encode('utf-8'))
time.sleep(0.5)
c = b64_encode(cipher)
client.send(str(c).encode('utf-8'))
else:
client.send(str(message).encode('utf-8'))
receive_thread = threading.Thread(target=receive) #receiving multiple messages
receive_thread.start()
write_thread = threading.Thread(target=write) #sending messages
write_thread.start()
实验心得:
在本次实验中,我成功设计并实现了基于signal协议的双棘轮端到端通信程序。通过该程序,我实现了安全的、实时的通信,并保护了通信数据的机密性和完整性。
了解了signal的双棘轮的原理,学习了一些python库的使用。
标签:协议,signal,Alice,send,ratchet,key,msg,self From: https://www.cnblogs.com/maqun/p/18521659