首页 > 编程语言 >python使用mqtt

python使用mqtt

时间:2023-08-03 11:36:27浏览次数:55  
标签:python mem mqtt client connect 使用 cpu def

一、安装mqtt服务器

安装对应的软件:https://www.emqx.io/zh/downloads

推荐使用docker安装

默认账号和密码:admin、public

 

二、编写代码

消息发布程序

import time
import json
import psutil
import random
from paho.mqtt import client as mqtt_client

broker = '127.0.0.1'  # mqtt代理服务器地址
port = 1883
keepalive = 60  # 与代理通信之间允许的最长时间段(以秒为单位)
topic = "/python/mqtt"  # 消息主题
client_id = f'python-mqtt-pub-{random.randint(0, 1000)}'  # 客户端id不能重复


def to_M(n):
    '''将B转换为M'''
    u = 1024 * 1024
    m = round(n / u, 2)
    return m


def get_info():
    '''获取系统硬件信息:cpu利用率,cpu个数,系统负载,内存信息等'''
    cpu_percent = psutil.cpu_percent(interval=1)
    cpu_count = psutil.cpu_count()
    sys_loadavg = [round(x / psutil.cpu_count() * 100, 2) for x in psutil.getloadavg()]
    mem = psutil.virtual_memory()
    mem_total, men_free = to_M(mem.total), to_M(mem.free)
    mem_percent = mem.percent
    info = {
        'cpu_percent': cpu_percent,
        'cpu_count': cpu_count,
        'sys_loadavg': sys_loadavg,
        'mem_total': mem_total,
        'mem_percent': mem_percent,
        'men_free': men_free
    }
    # mqtt只能传输字符串数据
    return json.dumps(info)


def connect_mqtt():
    '''连接mqtt代理服务器'''

    def on_connect(client, userdata, flags, rc):
        '''连接回调函数'''
        # 响应状态码为0表示连接成功
        if rc == 0:
            print("Connected to MQTT OK!")
        else:
            print("Failed to connect, return code %d\n", rc)

    # 连接mqtt代理服务器,并获取连接引用
    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port, keepalive)
    return client


def publish(client):
    '''发布消息'''
    while True:
        '''每隔4秒发布一次服务器信息'''
        time.sleep(4)
        msg = get_info()
        result = client.publish(topic, msg)
        status = result[0]
        if status == 0:
            print(f"Send `{msg}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")


def run():
    '''运行发布者'''
    client = connect_mqtt()
    # 运行一个线程来自动调用loop()处理网络事件, 非阻塞
    client.loop_start()
    publish(client)


if __name__ == '__main__':
    run()

 

消息订阅程序

import random
from paho.mqtt import client as mqtt_client

broker = '127.0.0.1'  # mqtt代理服务器地址
port = 1883
keepalive = 60  # 与代理通信之间允许的最长时间段(以秒为单位)
topic = "/python/mqtt"  # 消息主题
client_id = f'python-mqtt-sub-{random.randint(0, 1000)}'  # 客户端id不能重复


def connect_mqtt():
    '''连接mqtt代理服务器'''

    def on_connect(client, userdata, flags, rc):
        '''连接回调函数'''
        # 响应状态码为0表示连接成功
        if rc == 0:
            print("Connected to MQTT OK!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port, keepalive)
    return client


def subscribe(client: mqtt_client):
    '''订阅主题并接收消息'''

    def on_message(client, userdata, msg):
        '''订阅消息回调函数'''
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

    # 订阅指定消息主题
    client.subscribe(topic)
    client.on_message = on_message


def run():
    # 运行订阅者
    client = connect_mqtt()
    subscribe(client)
    #  运行一个线程来自动调用loop()处理网络事件, 阻塞模式
    client.loop_forever()


if __name__ == '__main__':
    run()

 

标签:python,mem,mqtt,client,connect,使用,cpu,def
From: https://www.cnblogs.com/fyiyy/p/17602819.html

相关文章

  • 记录使用uview的tabs组件初始化渲染下划线移位问题解决
    问题描述:初始化渲染后tabs的下划线没有居中对其,出现异位。问题分析: 网上很多大佬分析过出现原因了记录下解决的过程: 在各个论坛搜集到解决方案都暂时无效 有使用v-if重新渲染的  有给类赋值偏移值的 有强行转换px的因为各种原因这些方法在自己身上没有奏效所以记......
  • Jemeter安装与简单使用
    安装Jemeter1.下载官网地址下载完成之后进行解压安装2.下载MySQL数据库驱动然后将数据库驱动放在jemeter安装目录的bin目录下面。3.启动点击bin目录下的jmeter.bat设置语言为中文使用Jemeter对MySQL进行压力测试1.添加线程组选择线程组配置线程组信息......
  • RPA开发复杂流程-为什么使用编码自动化而不是低代码?
    答:编码自动化可以让任何熟悉编码或脚本的人都能体验到更高的生产力、更好的复杂性管理、更高的协作和可审查性、改进的可读性和更高的性能。 ......
  • Linux环境下,使用远程连接工具过程中终端无法弹出图形窗口
    Linux操作系统,请通过管理网口/VNC等远程连接工具登录服务器,使用终端执行操作。如果在使用过程中无法弹出图形窗口,请执行以下操作。场景一:使用SSH连接工具登录LINUX服务器确保工具支持远程图形显示。SSH连接工具需要支持远程图形显示,才能弹出图形窗口。putty、SecureCRT默认......
  • 【银河麒麟】Python3.9的安装
    国产银河麒麟原装python3.5,版本较为落后,经过多次尝试+百度各种方法,现将安装python3.9的过程记录如下:1.安装依赖环境(打开终端)sudoaptupdatesudoapt-getinstallbuild-essentialzlib1g-devlibbz2-1.0libssl-devlibncurses5-devlibsqlite3-devlibreadline-devtk-de......
  • kratos项目中使用kafka实现延迟队列
    项目地址https://gitee.com/huoyingwhw/kratos_kafkaB站视频地址B站视频地址——kratos项目中使用kafka实现延迟队列......
  • LinphoneSDK v 5.2.94 使用方法
    前提vs2022 wpfLinphoneSDK的获取途径有两种1 下载 linphonesdk.5.2.94.nupkghttps://gitlab.linphone.org/BC/public/linphone-sdk/-/packages/然后引用 这里是没有dll的,只是引用了LinphoneWrapper.cs2 下载zip包https://download.linphone.org/releases/windows/sd......
  • linux mqtt 安装配置
    安装sudoaptinstallmosquitto配置密码用户sudomkdir-p/etc/mosquitto/configsudotouch/etc/mosquitto/config/pwfile.confsudomosquitto_passwd-b/etc/mosquitto/config/pwfile.confqq123456配置文件qtimes@AIBox-01-01-m:~$cat/etc/mosquitto/mosquitto.......
  • MAC设置使用命令行通过vs code打开目录
    未设置之前,我们需要先打开vscode,然后再command+o打开对应目录或文件,设置之后,就可以使用命令行直接打开目录或文件设置方式:打开vscode,shift+command+p打开命令面板输入shell,选择“在PATH中安装code命令”确认即可然后就可以通过命令行打开目录啦code.......
  • python教程 入门学习笔记 第5天 format函数拼接 两种打印方法 转义字符
    2)format函数拼接#format函数拼接s1="统计={0}{1}{2}".format("张三","工资",3400)#占位符{}中可以填写数字编号print(s1)s2="统计={}{}{}".format("李四","工资",4500)#用占位符{}拼接,占位符要与字符串数量一致print(s2)s3="统计={a}{b}{c}".forma......