首页 > 编程语言 >用Python实现Cmpp协议的教程

用Python实现Cmpp协议的教程

时间:2024-07-21 15:56:10浏览次数:18  
标签:Cmpp CMPP 教程 header Python self bytes int id

引言&协议概述

(CMPP)是中国移动为实现短信业务而制定的一种通信协议,全称叫做China Mobile Point to Point,用于在客户端(SP,Service Provider)和中国移动短信网关之间传输短消息,有时也叫做移动梦网短信业务。CMPP3.0是该协议的第三个版本,相比于前两个版本,它增加了对长短信的支持、优化了数据结构等。本文对CMPP协议进行介绍,并给出Python实现CMPP协议栈的思路。

Python的asyncio模块提供了一套简洁的异步IO编程模型,非常适合用于实现协议栈。

CMPP协议基于客户端/服务端模型工作。由客户端(短信应用,如手机,应用程序等)先和ISMG(Internet Short Message Gateway 互联网短信网关)建立起TCP长连接,并使用CMPP命令与ISMG进行交互,实现短信的发送和接收。在CMPP协议中,无需同步等待响应就可以发送下一个指令,实现者可以根据自己的需要,实现同步、异步两种消息传输模式,满足不同场景下的性能要求。

连接成功,发送短信并查询短信发送成功

连接成功,从ISMG接收到短信

 

协议帧介绍

在CMPP协议中,每个PDU都包含两个部分:CMPP Header和CMPP Body。

image.png

CMPP Header

Header包含以下字段,大小长度都是4字节

  • Total Length:整个PDU的长度,包括Header和Body。
  • Command ID:用于标识PDU的类型(例如,Connect、Submit等)。
  • Sequence Id:序列号,用来匹配请求和响应。

用Python Asyncio实现CMPP协议栈里的建立连接

可以以本文的代码作为基础,很容易地在上面扩展。

代码结构组织如下:

.
├── LICENSE
├── README.md
├── cmpp
│   ├── __init__.py
│   ├── client.py
│   ├── protocol.py
│   └── utils.py
├── requirements.txt
├── setup.cfg
└── setup.py
  • cmpp/protocol.py:定义不同 CMPP 协议数据单元 (PDU) 的数据类,包括 CmppHeader、CmppConnect、CmppConnectResp、CmppSubmit 和 CmppSubmitResp
  • cmpp/client.py:该类处理与 ISMG(互联网短消息网关)的连接以及发送/接收 PDU。 主要 asyncio 进行异步 I/O 操作
  • cmpp/utils.py:定义 BoundAtomic 类,它是一种线程安全的方式来管理具有最小值和最大值的序列号。保证CMPP序列号在一定的范围内
  • setup.py:配置要分发的包,指定包名称、版本、作者和依赖项等元数据。

利用Python锁实现sequence_id

sequence_id是从1到0x7FFFFFFF的值

import threading

class BoundAtomic:
    def __init__(self, min_val: int, max_val: int):
        assert min_val <= max_val, "min must be less than or equal to max"
        self.min = min_val
        self.max = max_val
        self.value = min_val
        self.lock = threading.Lock()

    def next_val(self) -> int:
        with self.lock:
            if self.value >= self.max:
                self.value = self.min
            else:
                self.value += 1
            return self.value

在Python中定义CMPP PDU,篇幅有限,仅定义数个PDU

from dataclasses import dataclass
from typing import Union, List

@dataclass
class CmppHeader:
    total_length: int
    command_id: int
    sequence_id: int

@dataclass
class CmppConnect:
    source_addr: str
    authenticator_source: bytes
    version: int
    timestamp: int

@dataclass
class CmppConnectResp:
    status: int
    authenticator_ismg: str
    version: int

@dataclass
class CmppSubmit:
    msg_id: int
    pk_total: int
    pk_number: int
    registered_delivery: int
    msg_level: int
    service_id: str
    fee_user_type: int
    fee_terminal_id: str
    fee_terminal_type: int
    tp_pid: int
    tp_udhi: int
    msg_fmt: int
    msg_src: str
    fee_type: str
    fee_code: str
    valid_time: str
    at_time: str
    src_id: str
    dest_usr_tl: int
    dest_terminal_id: List[str]
    dest_terminal_type: int
    msg_length: int
    msg_content: bytes
    link_id: str

@dataclass
class CmppSubmitResp:
    msg_id: int
    result: int

@dataclass
class CmppPdu:
    header: CmppHeader
    body: Union[CmppHeader, CmppConnectResp, CmppSubmit, CmppSubmitResp]

实现编解码方法

@dataclass
class CmppConnect:
    source_addr: str
    authenticator_source: bytes
    version: int
    # MMDDHHMMSS format
    timestamp: int

    def encode(self) -> bytes:
        source_addr_bytes = self.source_addr.encode('utf-8').ljust(6, b'\x00')
        version_byte = self.version.to_bytes(1, 'big')
        timestamp_bytes = self.timestamp.to_bytes(4, 'big')
        return source_addr_bytes + self.authenticator_source + version_byte + timestamp_bytes

@dataclass
class CmppConnectResp:
    status: int
    authenticator_ismg: str
    version: int

    @staticmethod
    def decode(data: bytes) -> 'CmppConnectResp':
        status = int.from_bytes(data[0:4], 'big')
        authenticator_ismg = data[4:20].rstrip(b'\x00').decode('utf-8')
        version = data[20]
        return CmppConnectResp(status=status, authenticator_ismg=authenticator_ismg, version=version)

@dataclass
class CmppPdu:
    header: CmppHeader
    body: Union[CmppConnect, CmppConnectResp, CmppSubmit, CmppSubmitResp]

    def encode(self) -> bytes:
        body_bytes = self.body.encode()
        self.header.total_length = len(body_bytes) + 12
        header_bytes = (self.header.total_length.to_bytes(4, 'big') +
                        self.header.command_id.to_bytes(4, 'big') +
                        self.header.sequence_id.to_bytes(4, 'big'))
        return header_bytes + body_bytes

    @staticmethod
    def decode(data: bytes) -> 'CmppPdu':
        header = CmppHeader(total_length=int.from_bytes(data[0:4], 'big'),
                            command_id=int.from_bytes(data[4:8], 'big'),
                            sequence_id=int.from_bytes(data[8:12], 'big'))

        body_data = data[12:header.total_length]
        if header.command_id == CONNECT_RESP_ID:
            body = CmppConnectResp.decode(body_data)
        else:
            raise NotImplementedError("not implemented yet.")

        return CmppPdu(header=header, body=body)

asyncio tcp流相关代码

class CmppClient:
    def __init__(self, host: str, port: int):
        self.host = host
        self.port = port
        self.sequence_id = BoundAtomic(1, 0x7FFFFFFF)
        self.reader = None
        self.writer = None

    async def connect(self):
        self.reader, self.writer = await asyncio.open_connection(self.host, self.port)

    async def close(self):
        if self.writer:
            self.writer.close()

实现同步的connect_ismg方法

    async def connect_ismg(self, request: CmppConnect):
        if self.writer is None or self.reader is None:
            raise ConnectionError("Client is not connected")
        sequence_id = self.sequence_id.next_val()
        header = CmppHeader(0, command_id=CONNECT_ID, sequence_id=sequence_id)
        pdu: CmppPdu = CmppPdu(header=header, body=request)
        self.writer.write(pdu.encode())
        await self.writer.drain()

        length_bytes = await self.reader.readexactly(4)
        response_length = int.from_bytes(length_bytes)

        response_data = await self.reader.readexactly(response_length)

        return CmppPdu.decode(response_data)

运行example,验证连接成功

async def main():
    client = CmppClient(host='localhost', port=7890)

    await client.connect()
    print("Connected to ISMG")

    connect_request = CmppConnect(
        source_addr='source_addr',
        authenticator_source=b'authenticator_source',
        version=0,
        timestamp=1122334455,
    )

    connect_response = await client.connect_ismg(connect_request)
    print(f"Connect response: {connect_response}")

    await client.close()
    print("Connection closed")

asyncio.run(main())

image.png

总结

本文简单对CMPP协议进行了介绍,并尝试用python实现协议栈,但实际商用发送短信往往更加复杂,面临诸如流控、运营商对接、传输层安全等问题,可以选择华为云消息&短信(Message & SMS)服务通过http协议接入华为云短信服务是华为云携手全球多家优质运营商和渠道,为企业用户提供的通信服务。企业调用API或使用群发助手,即可使用验证码、通知短信服务。

 

标签:Cmpp,CMPP,教程,header,Python,self,bytes,int,id
From: https://blog.csdn.net/chengxuyuanlaow/article/details/140587720

相关文章

  • 黑苹果macOS系统U盘版/恢复版基础安装教程
    因分为两种安装方式,本文主要介绍两种安装方式:U盘安装,以及在Windows下使用镜像恢复软件安装的方式。本文的操作方法支持Windows和macOS分别使用不同硬盘的安装方法。如果要安装成单个硬盘多系统的方式,注意你的分区结构。两种方法列举如下(OpenCore同样适用): U盘安装法:16GU......
  • Python 请求库无法成功完成 POST(Nanotec 电机控制器)
    我有一个运行RESTWeb服务的电机控制器,我想使用Python对其进行控制。我能够使用Python请求库成功执行GET请求。但是我无法执行POST请求。它给了我以下错误:requests.exceptions.ConnectionError:('Connectionaborted.',RemoteDisconnected('Remoteendclosed......
  • 为什么 exitonclick 在我的 Python Turtle 图形程序中不起作用?
    我正在开发一个PythonTurtle图形程序,我正在尝试使用exitonclick方法在单击窗口时关闭窗口。但是,它似乎不起作用。fromturtleimportTurtle,Screenrem=Turtle()screen=Screen()rem.fd(70)defclear():screen.clearscreen()screen.listen()s......
  • 服务器和本地主机上对相同请求(curl、python aiohttp)的不同响应
    我有一个用Python编写的解析器(aiohttp、bs4)。解析器的功能之一是通过链接访问文件(例如:https://modsfire.com/d/Mwv01aESgj73zx7)。importaiohttpimportyarlimportasynciofrompprintimportpprintMODSFIRE_URL="https://modsfire.com/"COOKIES={......
  • C++ PDF PoDoFo库使用教程
    #include<podofo/podofo.h>#include<iostream>//AllPoDoFoclassesarememberofthePoDoFonamespace.//usingnamespacestd;usingnamespacePoDoFo;PdfFont*getFont(PdfDocument&doc);//Base14+othernon-Base14fontsforcomparis......
  • python 中两体问题的集成
    我正在尝试使用python和pygame创建一个二体Sim作为更大项目目标的第一阶段,以在屏幕上显示对象。我目前的主要问题是,轨道卫星在目标行星周围倾斜时它应该处于稳定的320公里圆形轨道上。我为四种不同的集成制作了四种不同的功能。Euler、Leapfrog、Verlet和RK4。......
  • 如何保护Python代码不被用户读取?
    我正在用Python开发一个软件,该软件将分发给我雇主的客户。我的雇主希望使用有时间限制的许可证文件来限制软件的使用。如果我们分发.py文件甚至.pyc文件,则很容易(反编译并)删除检查另一个方面是我的雇主不希望我们的客户读取代码,担心代码可能被窃取或......
  • 无法更新 Python MySQL 中的tinyint
    我无法使用以下代码更新我的Tinyint值(valid_state):importmysql.connectormydb=mysql.connector.connect(host="localhost",user="mahdi",passwd="Abcd@1324",database="mycustomers")mycursor=mydb.cursor()sql="UPDATE`......
  • Cython:将 2D 数组从 Python 传递到 C 并检索它
    我正在尝试使用Cython用C语言构建相机驱动程序的包装器。我是Cython的新手(两周前开始)。经过一番努力,我可以成功开发结构体、一维数组的包装器,但现在我陷入了二维数组的困境。相机的CAPI之一采用2D数组指针作为输入,并将捕获的图像分配给它。该函数需要从Python调......
  • 如何使用python将大文件上传到Onedrive
    importrequestsfrommsalimportPublicClientApplicationfromdotenvimportload_dotenvimportos#Loadenvironmentvariablesload_dotenv()#ConfigurationfromenvironmentvariablesCLIENT_ID=os.getenv('CLIENT_ID')#TENANT_IDisnotused......