一.在Linux中搭建mqtt服务
环境:Linux 版本Ubuntu 18.04.1 LTS
1.进入https://www.emqx.com/zh/try?product=broker下载开源版本 EMQX
此处选择zip格式
2.下载后将emqx-4.4.14-otp24.3.4.2-1-ubuntu18.04-amd64.zip文件通过FileZilla工具上传到linux中 /home/emqx目录下
3.进入到 /home/emqx 使用unzip命令解压文件
cd /home/emqx
unzip emqx-4.4.14-otp24.3.4.2-1-ubuntu18.04-amd64.zip
4.进入到 /home/emqx/emqx/bin目录下启动服务,启动后 broker代理 访问地址为本机,默认端口为1883,端口可修改
./emqx start
到此已完成了简单的linux mqtt服务搭建
二.使用python 实现 Subscribe(订阅者)
server:
import json import sys import logging import os import time import paho.mqtt.client as mqtt filename = str(os.path.basename(__file__).split('.')[0]) + ".log" logger = logging.getLogger() logger.setLevel(logging.INFO) push_log = logging.FileHandler(filename, 'a', encoding='utf-8') push_log.setLevel(logging.INFO) logger.addHandler(push_log) def on_connect(client, userdata, flags, rc): """ 订阅主题 :param client: :param userdata: :param flags: :param rc: :return: """ if not rc: print("ready go!") client.subscribe([("/data/#", 2), ("test", 2)]) else: print("连接失败") def on_message(client, userdata, msg): """ 接收客户端发送的消息 :param client: 连接信息 :param userdata: :param msg: 客户端返回的消息 :return: """ info1 = "---------接收客户端信息时间" + str(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())) + "---------" print(info1) logger.info(info1) payload = json.loads(msg.payload.decode('utf-8')) print(payload) logger.info(payload) def server_connect(client): client.on_connect = on_connect # 启用订阅模式 client.on_message = on_message # 接收消息 info2 = "-----------服务启动时间" + str(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())) + "------------" print(info2) logger.info(info2) client.connect("111.111.111.111", 1883, 60) # 连接到broker # client.loop_start() # 以start方式运行,需要启动一个守护线程,让服务端运行,否则会随主线程死亡 client.loop_forever() # 以forever方式阻塞运行。 def server_stop(client): client.loop_stop() # 停止服务端 sys.exit(0) def server_main(): client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time())) client = mqtt.Client(client_id, transport='tcp') server_connect(client) if __name__ == '__main__': # 启动监听 server_main()
三.使用python 实现 Publish(发布者)
client:
import json import paho.mqtt.client as mqtt import time import schedule client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time())) client = mqtt.Client(client_id, transport='tcp') client.connect("111.111.111.111", 1883, 60) # 连接到broker client.loop_start() def client_main(message: str): """ 客户端发布消息 :param message: 消息体 :return: """ time_now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) payload = {"msg": "200 OK", "data": "{}".format(message), "send_time": "{}".format(time_now)} client.publish("test", json.dumps(payload, ensure_ascii=False)) # publish(主题, 消息体), 即test 为主题, # json.dumps(payload, ensure_ascii=False)为消息体 print("{}".format(time_now) + "成功发送") return True if __name__ == '__main__': msg = str({'label': 'packageName', 'score': '0.73040956', 'textrecog': 'YJ3EZ-S'}) schedule.every(4).seconds.do(client_main, msg) # 每4s执行一次 while True: schedule.run_pending()
运行结果:
server端(Subscribe):
client端(publish):
标签:__,python,import,param,mqtt,client,time,服务器 From: https://www.cnblogs.com/mian-1122/p/17106560.html