- 文件结构
- MQTT 客户端
# paho-mqtt supplies the underlying MQTT network client used by MQTTClient.
import paho.mqtt.client as mqtt
# Module-level mapping of MQTT client objects; the model code looks entries
# up by client name via ``client_dict.get(...)``.
client_dict = {}
import logging
# Logger named after this module.
_logger = logging.getLogger(__name__)
class MQTTClient(object):
    """Thin wrapper around a paho-mqtt ``Client`` bound to a single broker.

    The wrapper stores the connection parameters, creates the paho client,
    applies the credentials and routes incoming messages to
    :meth:`on_message`.
    """

    def __init__(self, client_id, protocol, broker, port, username, password):
        """Create the underlying paho client (no network I/O happens here).

        :param client_id: MQTT client identifier sent to the broker.
        :param protocol: MQTT protocol version constant understood by paho.
        :param broker: broker host name or IP address.
        :param port: broker TCP port.
        :param username: user name used for authentication.
        :param password: password used for authentication.
        """
        self.client_id = client_id
        self.protocol = protocol
        self.broker = broker
        self.port = port
        self.username = username
        self.password = password
        # Keyword arguments are required: in paho-mqtt 1.x the second
        # positional parameter of Client() is ``clean_session``, so passing
        # ``protocol`` positionally silently left the protocol at its default.
        self.client = mqtt.Client(client_id=client_id, protocol=protocol)
        self.client.username_pw_set(self.username, self.password)
        # Without this assignment the handler below is never invoked.
        self.client.on_message = self.on_message

    def connect(self, keepalive=60):
        """Connect to the broker and start paho's background network loop.

        :param keepalive: keepalive interval in seconds passed to paho.
        """
        self.client.connect(self.broker, self.port, keepalive)
        self.client.loop_start()

    def is_connected(self):
        """Return whatever the paho client reports as its connection state."""
        return self.client.is_connected()

    def disconnect(self):
        """Disconnect from the broker and stop the background network loop."""
        self.client.disconnect()
        self.client.loop_stop()

    def subscribe(self, topic, qos):
        """Subscribe to ``topic`` with the given QoS level."""
        self.client.subscribe(topic, qos)

    def publish(self, topic, data, qos):
        """Publish ``data`` on ``topic`` with the given QoS level.

        The outgoing message is written to the debug log first.
        """
        # Lazy %-style arguments: the message is only formatted when the
        # debug level is actually enabled.
        _logger.debug("%s>>>>>>>>>> 发送消息 >>>>>>> %s", topic, data)
        self.client.publish(topic=topic, payload=data, qos=qos)

    def on_message(self, client, userdata, msg):
        """Message callback: print the topic and the decoded payload.

        :param client: paho client instance that received the message.
        :param userdata: user data configured on the paho client.
        :param msg: received message exposing ``topic`` and ``payload``.
        """
        # errors="replace" keeps a payload that is not valid UTF-8 from
        # raising inside the network-loop callback.
        print("收到消息:", msg.topic, msg.payload.decode(errors="replace"))
- MQTT客户端模型
# -*- coding: utf-8 -*-
# &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
# DTCloud360
# 邮箱: 200570059@qq.com
# 作者:'ZYF.Yan'
# 公司网址: http://www.dtcloud360.com/
# Copyright 中亿丰数字科技集团有限公司
# 日期:2023/5/31 11:11
# &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
# -*- coding: utf-8 -*-
from . import mqtt
from ...system_base import datetime_tools
from dtcloud import models, fields
class MqttClient(models.Model):
    """MQTT client configuration record.

    Stores the broker connection settings and hands out the live
    ``mqtt.MQTTClient`` connection that belongs to this record.
    """

    _name = "mqtt.client"
    _rec_name = "name"
    _description = "Mqtt客户端"
    _order = "id"

    # Field option legend: string = label, copy = whether the value is copied,
    # required = mandatory, default = default value (False or a value of the
    # field's type), readonly = read-only, store = persisted in the database.
    name = fields.Char(string="客户端名称")
    broker = fields.Char(string="主机名", default="10.201.97.11")
    port = fields.Integer(string="端口号", default=1390)
    path = fields.Char(string="路由", default="/mqtt")
    user_name = fields.Char(string="用户")
    password = fields.Char(string="密码")
    keepalive = fields.Integer(string="生命周期", default=60)
    clean_start = fields.Boolean(string="是否清除会话", default=True)
    timeout = fields.Integer(string="话过期时间", help="会话过期时间,单位(秒)", default=0)
    tls = fields.Boolean(string="TLS", default=False)
    state = fields.Selection([("1", "正在连接"), ("2", "断开连接")], string="链接状态", help="True:链接成功 False:未连接")
    # The default must be a callable: calling now() directly in the field
    # definition is evaluated once at import time, so every new record would
    # receive the server start time instead of its own creation time.
    date = fields.Datetime(string="时间", default=lambda self: datetime_tools.now())
    mqtt_pub_ids = fields.One2many(comodel_name="mqtt.pub", inverse_name="client_id", string="发布")
    mqtt_sub_ids = fields.One2many(comodel_name="mqtt.sub", inverse_name="client_id", string="订阅")

    def get_mqtt_client(self):
        """Return a connected ``mqtt.MQTTClient`` for this record.

        A client registered in ``mqtt.client_dict`` under this record's name
        is reused while it reports being connected; otherwise a new client is
        created, connected and registered.

        :return: the ``mqtt.MQTTClient`` wrapper for this record.
        """
        cached = mqtt.client_dict.get(self.name)
        if cached:
            if cached.is_connected():
                return cached
            # Shut the stale client down before replacing it so its network
            # loop does not keep running next to the new connection.
            cached.disconnect()
        mqtt_client = mqtt.MQTTClient(self.name, 5, self.broker, self.port, self.user_name, self.password)
        mqtt_client.connect(keepalive=self.keepalive)
        # Register the client; without this the lookup above never finds
        # anything and every call opens another connection with the same id.
        mqtt.client_dict[self.name] = mqtt_client
        return mqtt_client

    def disconnect(self):
        """Close this record's MQTT connection, if one is registered.

        Does nothing when no client is registered, rather than opening a
        connection only to close it again.
        """
        mqtt_client = mqtt.client_dict.pop(self.name, None)
        if mqtt_client is not None:
            # MQTTClient.disconnect() already stops the network loop; the
            # wrapper exposes no separate loop_stop() method.
            mqtt_client.disconnect()
- MQTT消息发布
# -*- coding: utf-8 -*-
# &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
# DTCloud360
# 邮箱: 200570059@qq.com
# 作者:'ZYF.Yan'
# 公司网址: http://www.dtcloud360.com/
# Copyright 中亿丰数字科技集团有限公司
# 日期:2023/5/31 11:11
# &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
# -*- coding: utf-8 -*-
from ...system_base import datetime_tools
from dtcloud import models, fields
class MqttPub(models.Model):
    """MQTT publication: a topic, payload and QoS bound to an ``mqtt.client``."""

    _name = "mqtt.pub"
    _rec_name = "name"
    _description = "Mqtt发布"
    _order = "id"

    # ``string`` is the field label.
    client_id = fields.Many2one("mqtt.client", "客户端")
    name = fields.Char(string="主题", default="testtopic/1")
    qos = fields.Integer(string="QoS", default=0)
    text = fields.Text(string="发布消息", default='{"msg": "hello"}')
    retain = fields.Boolean(string="保留", default=False)
    # The default must be a callable: calling now() directly in the field
    # definition is evaluated once at import time, so every new record would
    # receive the server start time instead of its own creation time.
    date = fields.Datetime(string="时间", default=lambda self: datetime_tools.now())
    mqtt_pub_lines = fields.One2many(comodel_name="mqtt.pub.line", inverse_name="mqtt_pub_id", )

    def publish(self, topic_append=None):
        """Publish this record's message and log it as an ``mqtt.pub.line``.

        :param topic_append: optional suffix appended to the record's topic.
        """
        mqtt_client = self.client_id.get_mqtt_client()
        topic = self.name + topic_append if topic_append else self.name
        # NOTE: ``retain`` is stored on the log line but not forwarded to the
        # client; this call passes only topic, data and qos.
        mqtt_client.publish(topic=topic, data=self.text, qos=self.qos)
        # Keep a history line of what was sent.
        self.mqtt_pub_lines.create({
            "name": topic,
            "qos": self.qos,
            "text": self.text,
            "retain": self.retain,
            "date": self.date,
            "mqtt_pub_id": self.id,
        })
class MqttPubLine(models.Model):
    """History line recording one message published from an ``mqtt.pub`` record."""

    _name = "mqtt.pub.line"
    _rec_name = "name"
    _description = "Mqtt发布明细"
    _order = "id"

    mqtt_pub_id = fields.Many2one("mqtt.pub", "Mqtt发布")
    name = fields.Char(string="主题", default="testtopic/1")
    qos = fields.Integer(string="QoS", default=0)
    text = fields.Text(string="发布消息", default='{"msg": "hello"}')
    retain = fields.Boolean(string="保留", default=False)
    # Fixes the misspelled ``datesstime_tools`` (NameError at import). The
    # default is a callable so it is evaluated per record, not once at import.
    date = fields.Datetime(string="时间", default=lambda self: datetime_tools.now())
标签:集成, string, fields, self, mqtt, MQTT, client, Odoo, name
From: https://www.cnblogs.com/DTCLOUD/p/17460356.html
作者:燕春