首页 > 其他分享 >基于ESP32的环境监测系统设计

基于ESP32的环境监测系统设计

时间:2024-12-12 11:01:55浏览次数:5  
标签:基于 SET ESP32 0x00 write 环境监测 import self def

最终效果

<iframe allowfullscreen="true" data-mediaembed="csdn" frameborder="0" id="5LIJKUnb-1733744395061" src="https://live.csdn.net/v/embed/437946"></iframe>

环境监测

项目介绍

该项目是“物联网实验室监测控制系统设计(仿智能家居)”项目中的“环境监测系统设计”子项目,前者还包括“物联网设计”、“门禁设计”、“家电控制设计”和“小程序设计”等内容。本文只介绍“环境监测”部分。

项目功能实现的大致思路为:单片机采集温度、湿度、光照强度、烟雾传感器传来的数据,将此环境信息实时显示到屏幕上,并通过WiFi将信息传至MQTT服务器(以供小程序调用)。当实验室发生火灾时,驱动警报模块向实验室管理员发送短信和来电提示。当收到语音指令时,播报相应环境信息。

硬件设计

接线

ESP32

DS18B20

DHT11

GY-302

MQ-2

OLED

LU-ASR01

SYN6288

A9G

3V3

VCC

VCC

VCC

VCC

VCC

5V

VDD

GND

GND

GND

GND

GND

GND

G

G

GND

D23

U1RX

D27

DATA

D13

DQ

D18

SCL

D19

SDA

D15

DO

D32

AO

D25

SCL

D26

SDA

D17

RXD

D16

TX

PCB设计

此电路板仅是为了代替杜邦线而已,上面只有引脚排座,而没有任何电子元件。

工程 - 立创开源硬件平台

成本

ESP32

DS18B20

DHT11

GY-302

MQ-2

OLED

LU-ASR01

SYN6288

A9G

14

3.9

5.3

7.5

9

14.5

47.5

49.7

51

其中共需200元左右来购买该项目所需的模块。此外还需2根数据线、焊接工具(电烙铁、锡丝、引脚排座)、PCB打板或若干杜邦线。

软件设计

本次的开发环境为Thonny IDE,使用MicroPython语言编写驱动程序,采用裸机编程。自编库文件关系如下:

主程序

在完成所有模块的初始化后,进入循环。循环期间ESP32先获取四个传感器测得的实验室环境数据,之后将此信息显示到屏幕上,并通过WiFi传至MQTT服务器。最后处理烟雾浓度高和用户语音问讯这两种特殊情况。

 在循环中,烟雾警报和语音播报环境信息较为特殊:

这其中,text_message_and_call_flag为发短信和打电话的标志位,初始值为0,发送过短信并打过电话后置1。目的是确保循环里最多发一次短信并打一次电话。这是因为只有当实验室烟雾浓度高于阈值时,系统才会向实验室管理员发短信和打电话。此时,管理员必须立刻亲自到现场处理实验室安全事故并重置系统。在此期间,系统不会反复向实验室管理员发短信和打电话。

smog_analog是MQ-2模块采用11dB衰减后的模拟信号引脚数值。经实验得出,当其值大于1500时,实验室烟雾浓度高。因此将烟雾阈值设为1500。

循环5次是因为本系统采用裸机编程,在不影响其他功能的前提下,为及时响应用户的语音指令,程序需在大部分时间段内处于等待和处理语音信号阶段。当程序不处于此阶段时,系统依然可以响应用户的语音指令。这是因为LU-ASR01语音识别模块发送的串口信号会先缓存在ESP32的寄存器中,等到程序运行至此阶段时,便会处理相应信号。

main.py

from machine import Pin
import time

#引入自编函数
from ConnectWiFi import connect_wifi
from MQTTPublish import mqtt_initial, mqtt_publish_information
from DS18B20 import ds18b20_initial, temperature_measure
from DHT11 import dht11_Initial, humidity_measure
from GY302 import gy_302_Initial, illumination_measure
from MQ2 import MQ_2_Initial, smog_measure
from OLED import oled_Initial, oled_display_environmental_information
from Voice import voice_Initial, voice_out
from A9G import a9g_Initial, text_message_and_call


connect_wifi()
mqtt = mqtt_initial()

ds18b20 = ds18b20_initial()
dht11 = dht11_Initial()
(gy302_addr, gy302_i2c) = gy_302_Initial()
(MQ_2_digital_pin, MQ_2_analog_pin) = MQ_2_Initial()

OLED = oled_Initial()
voice_uart = voice_Initial()

text_message_and_call_uart = a9g_Initial()
text_message_and_call_flag = 0	#记录是否发送过短信并打过电话,确保循环里最多发一次短信并打一次电话

while True:
    temperature = temperature_measure(ds18b20)
    humidity = humidity_measure(dht11)
    illumination = illumination_measure(gy302_addr, gy302_i2c)
    (smog_digital, smog_analog, smog_voltage) = smog_measure(MQ_2_digital_pin, MQ_2_analog_pin)
    
    print("温度:%.2f℃  湿度:%d%%  光照值:%dLx  烟雾模拟信号值:%d" %(temperature, humidity, illumination, smog_analog))
    
    oled_display_environmental_information(OLED, temperature, humidity, illumination, smog_analog)
    
    mqtt_publish_information(mqtt, temperature, humidity, illumination, smog_analog)
    
    if text_message_and_call_flag == 0 and smog_analog > 1500:
        text_message_and_call(text_message_and_call_uart)
        text_message_and_call_flag = 1
    
    for i in range(5):		#使程序大部分时间都用来等待语音信号。其实也可用中断实现
        voice_out(voice_uart, temperature, humidity, illumination, smog_analog)
        time.sleep_ms(200)

控制器连接WiFi

该功能实现的原理及流程可参考:ESP32连接wifi (MicroPython)-CSDN博客

ESP32连接WiFi需设置WiFi模式,并提供WiFi名称和密码:

ConnectWiFi.py

#esp32会自动尝试连接之前连接过的WiFi(比如路由器重启)
#若想连接其他WiFi,重启esp32(按EN键)后再执行新程序
#若未重启就运行连接其他WiFi的新程序,esp32有可能连接之前的WiFi

import network, time

def connect_wifi():
    wlan = network.WLAN(network.STA_IF)		# 无线终端模式
    wlan.active(True)       # activate the interface

    #print(wlan.scan())		#打印出扫描到的所有WiFi信号

    if not wlan.isconnected():		#如果WiFi未连接
        print('connecting to network...')
            
        wlan.connect("DOILMSBOIOT", "doilmsboiot")		#连接WiFi,名称:DOILMSBOIOT,密码:doilmsboiot
            
        i = 1
        while not wlan.isconnected():
            print("正在链接...{}".format(i))
            i += 1
            time.sleep(1)
            
        print("WiFi连接成功")	#打印提示信息
        print(wlan.config('mac'))      # 打印 the interface's MAC address
        print(wlan.ifconfig())      # 打印 the interface's IP/netmask/gw/DNS addresses

ESP32连接MQTT服务器并向指定主题发布指定内容

该功能实现的原理及流程可参考:ESP32向MQTT服务器发送消息(MicroPython)-CSDN博客

首先设置客户端名称和MQTT服务器域名,创建MQTT对象;随后连接服务器;接着定义发布主题、内容及相应属性;最后将消息发布至指定主题。

MQTTPublish.py

import network
from umqttsimple import MQTTClient

def mqtt_initial():
    # 创建MQTT,参数1:客户端名称,参数2:MQTT服务器
    mqtt = MQTTClient("esp32_{}".format(network.WLAN().config('mac')), "broker.emqx.io")  
    # 连接MQTT服务器
    mqtt.connect()
    return mqtt
    
def mqtt_publish_information(mqtt, temperature, humidity, illumination, smog):
    # 发布消息
    publishTopic = b"environmentalMonitoring1"
    publishContent = b"{\"temperature\":%.2f,  \"humidity\":%d,  \"illumination\":%d,  \"smog\":%d }" %(temperature, humidity, illumination, smog)
    mqtt.publish(publishTopic, publishContent, retain=True, qos=0)
    print('已向MQTT服务器发布消息')

umqttsimple.py

import usocket as socket
import ustruct as struct
from ubinascii import hexlify
 
 
class MQTTException(Exception):
    pass
 
 
class MQTTClient:
    def __init__(
        self,
        client_id,
        server,
        port=0,
        user=None,
        password=None,
        keepalive=0,
        ssl=False,
        ssl_params={},
    ):
        if port == 0:
            port = 8883 if ssl else 1883
        self.client_id = client_id
        self.sock = None
        self.server = server
        self.port = port
        self.ssl = ssl
        self.ssl_params = ssl_params
        self.pid = 0
        self.cb = None
        self.user = user
        self.pswd = password
        self.keepalive = keepalive
        self.lw_topic = None
        self.lw_msg = None
        self.lw_qos = 0
        self.lw_retain = False
 
    def _send_str(self, s):
        self.sock.write(struct.pack("!H", len(s)))
        self.sock.write(s)
 
    def _recv_len(self):
        n = 0
        sh = 0
        while 1:
            b = self.sock.read(1)[0]
            n |= (b & 0x7F) << sh
            if not b & 0x80:
                return n
            sh += 7
 
    def set_callback(self, f):
        self.cb = f
 
    def set_last_will(self, topic, msg, retain=False, qos=0):
        assert 0 <= qos <= 2
        assert topic
        self.lw_topic = topic
        self.lw_msg = msg
        self.lw_qos = qos
        self.lw_retain = retain
 
    def connect(self, clean_session=True):
        self.sock = socket.socket()
        addr = socket.getaddrinfo(self.server, self.port)[0][-1]
        self.sock.connect(addr)
        if self.ssl:
            import ussl
 
            self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
        premsg = bytearray(b"\x10\0\0\0\0\0")
        msg = bytearray(b"\x04MQTT\x04\x02\0\0")
 
        sz = 10 + 2 + len(self.client_id)
        msg[6] = clean_session << 1
        if self.user is not None:
            sz += 2 + len(self.user) + 2 + len(self.pswd)
            msg[6] |= 0xC0
        if self.keepalive:
            assert self.keepalive < 65536
            msg[7] |= self.keepalive >> 8
            msg[8] |= self.keepalive & 0x00FF
        if self.lw_topic:
            sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
            msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
            msg[6] |= self.lw_retain << 5
 
        i = 1
        while sz > 0x7F:
            premsg[i] = (sz & 0x7F) | 0x80
            sz >>= 7
            i += 1
        premsg[i] = sz
 
        self.sock.write(premsg, i + 2)
        self.sock.write(msg)
        # print(hex(len(msg)), hexlify(msg, ":"))
        self._send_str(self.client_id)
        if self.lw_topic:
            self._send_str(self.lw_topic)
            self._send_str(self.lw_msg)
        if self.user is not None:
            self._send_str(self.user)
            self._send_str(self.pswd)
        resp = self.sock.read(4)
        assert resp[0] == 0x20 and resp[1] == 0x02
        if resp[3] != 0:
            raise MQTTException(resp[3])
        return resp[2] & 1
 
    def disconnect(self):
        self.sock.write(b"\xe0\0")
        self.sock.close()
 
    def ping(self):
        self.sock.write(b"\xc0\0")
 
    def publish(self, topic, msg, retain=False, qos=0):
        pkt = bytearray(b"\x30\0\0\0")
        pkt[0] |= qos << 1 | retain
        sz = 2 + len(topic) + len(msg)
        if qos > 0:
            sz += 2
        assert sz < 2097152
        i = 1
        while sz > 0x7F:
            pkt[i] = (sz & 0x7F) | 0x80
            sz >>= 7
            i += 1
        pkt[i] = sz
        # print(hex(len(pkt)), hexlify(pkt, ":"))
        self.sock.write(pkt, i + 1)
        self._send_str(topic)
        if qos > 0:
            self.pid += 1
            pid = self.pid
            struct.pack_into("!H", pkt, 0, pid)
            self.sock.write(pkt, 2)
        self.sock.write(msg)
        if qos == 1:
            while 1:
                op = self.wait_msg()
                if op == 0x40:
                    sz = self.sock.read(1)
                    assert sz == b"\x02"
                    rcv_pid = self.sock.read(2)
                    rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
                    if pid == rcv_pid:
                        return
        elif qos == 2:
            assert 0
 
    def subscribe(self, topic, qos=0):
        assert self.cb is not None, "Subscribe callback is not set"
        pkt = bytearray(b"\x82\0\0\0")
        self.pid += 1
        struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
        # print(hex(len(pkt)), hexlify(pkt, ":"))
        self.sock.write(pkt)
        self._send_str(topic)
        self.sock.write(qos.to_bytes(1, "little"))
        while 1:
            op = self.wait_msg()
            if op == 0x90:
                resp = self.sock.read(4)
                # print(resp)
                assert resp[1] == pkt[2] and resp[2] == pkt[3]
                if resp[3] == 0x80:
                    raise MQTTException(resp[3])
                return
 
    # Wait for a single incoming MQTT message and process it.
    # Subscribed messages are delivered to a callback previously
    # set by .set_callback() method. Other (internal) MQTT
    # messages processed internally.
    def wait_msg(self):
        res = self.sock.read(1)
        self.sock.setblocking(True)
        if res is None:
            return None
        if res == b"":
            raise OSError(-1)
        if res == b"\xd0":  # PINGRESP
            sz = self.sock.read(1)[0]
            assert sz == 0
            return None
        op = res[0]
        if op & 0xF0 != 0x30:
            return op
        sz = self._recv_len()
        topic_len = self.sock.read(2)
        topic_len = (topic_len[0] << 8) | topic_len[1]
        topic = self.sock.read(topic_len)
        sz -= topic_len + 2
        if op & 6:
            pid = self.sock.read(2)
            pid = pid[0] << 8 | pid[1]
            sz -= 2
        msg = self.sock.read(sz)
        self.cb(topic, msg)
        if op & 6 == 2:
            pkt = bytearray(b"\x40\x02\0\0")
            struct.pack_into("!H", pkt, 2, pid)
            self.sock.write(pkt)
        elif op & 6 == 4:
            assert 0
 
    # Checks whether a pending message from server is available.
    # If not, returns immediately with None. Otherwise, does
    # the same processing as wait_msg.
    def check_msg(self):
        self.sock.setblocking(False)
        return self.wait_msg()

测量温度

该功能实现的原理及流程可参考:Thonny IDE + MicroPython + ESP32 + DS18B20 测量环境温度_esp32用thonny编程的语法-CSDN博客

DS18B20.py

from machine import Pin
import onewire, ds18x20

def ds18b20_initial():
    ds_pin = Pin(13)
    ds18b20 = ds18x20.DS18X20(onewire.OneWire(ds_pin))
    return ds18b20
    
def temperature_measure(ds18b20):
    rom = ds18b20.scan()
    ds18b20.convert_temp()
    temperature = ds18b20.read_temp(rom[0])
    temperature = round(temperature, 2)
    return temperature

测量湿度

该功能实现的原理及流程可参考:Thonny IDE + MicroPython + ESP32 + DHT11 测量环境温湿度_esp32+micropython 检测土壤湿度 代码-CSDN博客

DHT11.py

from machine import Pin
import dht 

def dht11_Initial():
    dht11 = dht.DHT11(Pin(27))
    return dht11

def humidity_measure(dht11):
    dht11.measure()
    humidity = dht11.humidity()
    return humidity

测量光照强度

该功能实现的原理及流程可参考:

Thonny IDE + MicroPython + ESP32 + GY-302 测量环境中的光照强度_esp32 micropython 光照强度-CSDN博客

 

GY302.py

from machine import Pin,I2C
#import time
 
def gy_302_Initial():
    gy302_addr = 0x23  # 光线传感器I2C地址
    gy302_i2c = I2C(0, freq = 1_000_000)		#初始化IIC0,使用默认引脚“SCL=18、SDA=19”,传输速度:100 Kbps
    #print(hex(i2c.scan()[0]))  # 打印器件I2C地址
    gy302_i2c.writeto(gy302_addr, chr(0x01)) 		# 发送上电命令 
    #i2c.writeto(gy302_addr, chr(0x07)) 	# 发送复位命令
    gy302_i2c.writeto(gy302_addr, chr(0x10)) 		# 发送高分辨率连续测量命令  分辨率:1Lx  测量时间:120ms
    #time.sleep_ms(180)		# 保证通讯 
    return (gy302_addr, gy302_i2c)
    
def illumination_measure(gy302_addr, gy302_i2c):
    gy = gy302_i2c.readfrom(gy302_addr, 2) 		# 从gy302_addr地址设备读取2位数据
    gy302 = float(gy[0] << 8 | gy[1]) / 1.2 		# 左移,可以理解为 gy[0]*0xff 
    return gy302

测量烟雾浓度

该功能实现的原理及流程可参考:Thonny IDE + MicroPython + ESP32 +MQ-2 测量环境中的烟雾浓度_micropython+esp32+mq2传感器-CSDN博客

MQ2.py

#对于模拟信号,浓度越高,输出电压越高。
#对于数字信号,高于阈值,为0,LED亮;低于阈值,为1,LED灭。
#通电后,需要先预热约60s后测量的数据才稳定。通电后出现正常的轻度发热现象,因为内部有电热丝。
#顺时针旋转,阈值升高;逆时针旋转,阈值降低

from machine import Pin, ADC

def MQ_2_Initial():
    MQ_2_digital_pin = Pin(15)	
    MQ_2_analog_pin = ADC(32)	
    MQ_2_analog_pin.atten(ADC.ATTN_11DB)	 #11dB衰减,使测量范围大致变为:0 ~ 3.3V
    return (MQ_2_digital_pin, MQ_2_analog_pin)

def smog_measure(MQ_2_digital_pin, MQ_2_analog_pin):
    #print("数字信号值:",MQ_2_digital_pin.value())
    #print("模拟信号值:",MQ_2_analog_pin.read()," ","输出电压:",round(MQ_2_analog_pin.read_uv()/1000000,2),"V")
    return	(MQ_2_digital_pin.value(), MQ_2_analog_pin.read(), round(MQ_2_analog_pin.read_uv()/1000000,2))

屏幕显示环境信息

该功能实现的原理及流程可参考:Thonny IDE + MicroPython + ESP32 + 0.96寸OLED(IIC) 显示任意字符_thonny esp32 oled-CSDN博客

OLED.py

from machine import Pin, I2C
from SSD1306 import SSD1306_I2C

def oled_Initial():
    i2c = I2C(1)	# 初始化IIC1,使用默认引脚“SCL=25、SDA=26”,传输速度:400 Kbps
    OLED = SSD1306_I2C(128, 64, i2c)		# 屏幕长度:128像素;屏幕宽度:64像素
    return OLED

# 用到的UTF-8编码字库
fonts = {
    0xE6B8A9:	# “温”的UTF-8编码
    [0x00,0x23,0x12,0x12,0x83,0x42,0x42,0x13,0x10,0x27,0xE4,0x24,0x24,0x24,0x2F,0x00,
    0x00,0xF8,0x08,0x08,0xF8,0x08,0x08,0xF8,0x00,0xFC,0xA4,0xA4,0xA4,0xA4,0xFE,0x00],  	#温
    
    0xE6B9BF:
    [0x00,0x27,0x14,0x14,0x87,0x44,0x44,0x17,0x11,0x21,0xE9,0x25,0x23,0x21,0x2F,0x00,
    0x00,0xF8,0x08,0x08,0xF8,0x08,0x08,0xF8,0x20,0x20,0x24,0x28,0x30,0x20,0xFE,0x00],	#湿
    
    0xE5BAA6:
    [0x01,0x00,0x3F,0x22,0x22,0x3F,0x22,0x22,0x23,0x20,0x2F,0x24,0x42,0x41,0x86,0x38,
    0x00,0x80,0xFE,0x20,0x20,0xFC,0x20,0x20,0xE0,0x00,0xF0,0x10,0x20,0xC0,0x30,0x0E],	#度
    
    0xE58589:
    [0x01,0x21,0x11,0x09,0x09,0x01,0xFF,0x04,0x04,0x04,0x04,0x08,0x08,0x10,0x20,0xC0,
    0x00,0x08,0x08,0x10,0x20,0x00,0xFE,0x40,0x40,0x40,0x40,0x42,0x42,0x42,0x3E,0x00],	#光
    
    0xE785A7:
    [0x00,0x7D,0x44,0x44,0x44,0x44,0x7D,0x44,0x44,0x44,0x44,0x7C,0x00,0x48,0x44,0x84,
    0x00,0xFC,0x44,0x44,0x44,0x94,0x08,0xFC,0x84,0x84,0x84,0xFC,0x00,0x88,0x44,0x44],	#照

    0xE7839F:
    [0x10,0x13,0x12,0x16,0x5A,0x52,0x53,0x92,0x12,0x12,0x12,0x2A,0x27,0x42,0x43,0x82,
    0x00,0xFE,0x02,0x22,0x22,0x22,0xFE,0x22,0x22,0x52,0x4A,0x8A,0x02,0x02,0xFE,0x02],	#烟
    
    0xE99BBE:
    [0x00,0x3F,0x01,0x7F,0x49,0x01,0x1D,0x08,0x1F,0x68,0x07,0x7A,0x0F,0x04,0x18,0x00,
     0x00,0xF8,0x00,0xFC,0x24,0x00,0x70,0x00,0xF0,0x20,0xC0,0x3C,0xE0,0x20,0xC0,0x00],	#雾
    
    0xE28483:  
    [0x00,0x00,0x00,0x00,0x20,0x33,0x04,0x08,0x08,0x08,0x08,0x07,0x00,0x00,0x00,0x00,
    0x00,0x00,0x00,0x00,0x00,0x80,0x40,0x00,0x00,0x00,0x40,0x80,0x00,0x00,0x00,0x00],	#℃
}

# 先找出字符的utf-8编码,再在fonts中找到对应的点阵,最后将点阵输出到OLED上,从而实现在OLED上显示汉字和特殊字符
def oled_utf8(ch_str, x_axis, y_axis, OLED): 
   offset_ = 0 
   for k in ch_str: 
       code = 0x00  # 将中文转成16进制编码 
       data_code = k.encode("utf-8")
       code |= data_code[0] << 16
       code |= data_code[1] << 8
       code |= data_code[2]
       byte_data = fonts[code]
       for y in range(0, 16):
           a_ = bin(byte_data[y]).replace('0b', '')
           while len(a_) < 8:
               a_ = '0'+ a_
           b_ = bin(byte_data[y+16]).replace('0b', '')
           while len(b_) < 8:
               b_ = '0'+ b_
           for x in range(0, 8):
               OLED.pixel(x_axis + offset_ + x,    y+y_axis, int(a_[x]))   
               OLED.pixel(x_axis + offset_ + x +8, y+y_axis, int(b_[x]))   
       offset_ += 16
    
def oled_acsii(ch_str, x_axis, y_axis, OLED):
    OLED.text(ch_str, x_axis, y_axis)

def oled_display_environmental_information(OLED, temperature, humidity, illumination, smog):
    OLED.fill(0)	# 清屏
    
    oled_utf8('温度', 0, 0, OLED)
    oled_acsii(':', 32, 5, OLED)
    oled_acsii('{:.2f}'.format(temperature), 40, 5, OLED)
    oled_utf8('℃', 40+8*len(str('{:.2f}'.format(temperature))), 0, OLED)

    oled_utf8('湿度', 0, 16, OLED)
    oled_acsii(':', 32, 21, OLED)
    oled_acsii(str(humidity), 40, 21, OLED)
    oled_acsii('%', 40+8*len(str(humidity)), 21, OLED)

    oled_utf8('光照', 0, 32, OLED)
    oled_acsii(':', 32, 37, OLED)
    oled_acsii('{:.0f}'.format(illumination), 40, 37, OLED)
    oled_acsii('LX', 40+8*len(str('{:.0f}'.format(illumination))), 37, OLED)
    
    oled_utf8('烟雾', 0, 48, OLED)
    oled_acsii(':', 32, 53, OLED)
    oled_acsii(str(smog), 40, 53, OLED)

    OLED.show()		# 显示上述信息

SSD1306.py

#MicroPython SSD1306 OLED driver, I2C and SPI interfaces created by Adafruit
 
import time
import framebuf
 
# register definitions
SET_CONTRAST        = const(0x81)
SET_ENTIRE_ON       = const(0xa4)
SET_NORM_INV        = const(0xa6)
SET_DISP            = const(0xae)
SET_MEM_ADDR        = const(0x20)
SET_COL_ADDR        = const(0x21)
SET_PAGE_ADDR       = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP       = const(0xa0)
SET_MUX_RATIO       = const(0xa8)
SET_COM_OUT_DIR     = const(0xc0)
SET_DISP_OFFSET     = const(0xd3)
SET_COM_PIN_CFG     = const(0xda)
SET_DISP_CLK_DIV    = const(0xd5)
SET_PRECHARGE       = const(0xd9)
SET_VCOM_DESEL      = const(0xdb)
SET_CHARGE_PUMP     = const(0x8d)
 
 
class SSD1306:
    def __init__(self, width, height, external_vcc):
        self.width = width
        self.height = height
        self.external_vcc = external_vcc
        self.pages = self.height // 8
        # Note the subclass must initialize self.framebuf to a framebuffer.
        # This is necessary because the underlying data buffer is different
        # between I2C and SPI implementations (I2C needs an extra byte).
        self.poweron()
        self.init_display()
 
    def init_display(self):
        for cmd in (
            SET_DISP | 0x00, # off
            # address setting
            SET_MEM_ADDR, 0x00, # horizontal
            # resolution and layout
            SET_DISP_START_LINE | 0x00,
            SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0
            SET_MUX_RATIO, self.height - 1,
            SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0
            SET_DISP_OFFSET, 0x00,
            SET_COM_PIN_CFG, 0x02 if self.height == 32 else 0x12,
            # timing and driving scheme
            SET_DISP_CLK_DIV, 0x80,
            SET_PRECHARGE, 0x22 if self.external_vcc else 0xf1,
            SET_VCOM_DESEL, 0x30, # 0.83*Vcc
            # display
            SET_CONTRAST, 0xff, # maximum
            SET_ENTIRE_ON, # output follows RAM contents
            SET_NORM_INV, # not inverted
            # charge pump
            SET_CHARGE_PUMP, 0x10 if self.external_vcc else 0x14,
            SET_DISP | 0x01): # on
            self.write_cmd(cmd)
        self.fill(0)
        self.show()
 
    def poweroff(self):
        self.write_cmd(SET_DISP | 0x00)
 
    def contrast(self, contrast):
        self.write_cmd(SET_CONTRAST)
        self.write_cmd(contrast)
 
    def invert(self, invert):
        self.write_cmd(SET_NORM_INV | (invert & 1))
 
    def show(self):
        x0 = 0
        x1 = self.width - 1
        if self.width == 64:
            # displays with width of 64 pixels are shifted by 32
            x0 += 32
            x1 += 32
        self.write_cmd(SET_COL_ADDR)
        self.write_cmd(x0)
        self.write_cmd(x1)
        self.write_cmd(SET_PAGE_ADDR)
        self.write_cmd(0)
        self.write_cmd(self.pages - 1)
        self.write_framebuf()
 
    def fill(self, col):	# col = 1,屏幕全亮;col = 0,屏幕全灭,用于清屏
        self.framebuf.fill(col)
 
    def pixel(self, x, y, col):
        self.framebuf.pixel(x, y, col)
 
    def scroll(self, dx, dy):
        self.framebuf.scroll(dx, dy)
 
    def text(self, string, x, y, col=1):
        self.framebuf.text(string, x, y, col)
 
 
class SSD1306_I2C(SSD1306):
    def __init__(self, width, height, i2c, addr=0x3c, external_vcc=False):
        self.i2c = i2c
        self.addr = addr
        self.temp = bytearray(2)
        # Add an extra byte to the data buffer to hold an I2C data/command byte
        # to use hardware-compatible I2C transactions.  A memoryview of the
        # buffer is used to mask this byte from the framebuffer operations
        # (without a major memory hit as memoryview doesn't copy to a separate
        # buffer).
        self.buffer = bytearray(((height // 8) * width) + 1)
        self.buffer[0] = 0x40  # Set first byte of data buffer to Co=0, D/C=1
        self.framebuf = framebuf.FrameBuffer1(memoryview(self.buffer)[1:], width, height)
        super().__init__(width, height, external_vcc)
 
    def write_cmd(self, cmd):
        self.temp[0] = 0x80 # Co=1, D/C#=0
        self.temp[1] = cmd
        self.i2c.writeto(self.addr, self.temp)
 
    def write_framebuf(self):
        # Blast out the frame buffer using a single I2C transaction to support
        # hardware I2C interfaces.
        self.i2c.writeto(self.addr, self.buffer)
 
    def poweron(self):
        pass
 
 
class SSD1306_SPI(SSD1306):
    def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):
        self.rate = 10 * 1024 * 1024
        dc.init(dc.OUT, value=0)
        res.init(res.OUT, value=0)
        cs.init(cs.OUT, value=1)
        self.spi = spi
        self.dc = dc
        self.res = res
        self.cs = cs
        self.buffer = bytearray((height // 8) * width)
        self.framebuf = framebuf.FrameBuffer1(self.buffer, width, height)
        super().__init__(width, height, external_vcc)
 
    def write_cmd(self, cmd):
        self.spi.init(baudrate=self.rate, polarity=0, phase=0)
        self.cs.high()
        self.dc.low()
        self.cs.low()
        self.spi.write(bytearray([cmd]))
        self.cs.high()
 
    def write_framebuf(self):
        self.spi.init(baudrate=self.rate, polarity=0, phase=0)
        self.cs.high()
        self.dc.high()
        self.cs.low()
        self.spi.write(self.buffer)
        self.cs.high()
 
    def poweron(self):
        self.res.high()
        time.sleep_ms(1)
        self.res.low()
        time.sleep_ms(10)
        self.res.high()

语音识别与播报

语音识别

该功能实现的原理及流程可参考:Thonny IDE + MicroPython + ESP32 + LU-ASR01语音识别-CSDN博客

仅使用LU-ASR01语音识别开发板的语音识别功能。当语音识别模块识别到特定语音时,通过串口输出特定字符。此时,ESP32的串口会接收到这个特定字符,从而得知用户说出的特定语音。待识别语音和输出字符之间的对应关系如下表所示:

待识别语音

输出字符

温度

t

湿度

h

光照

i

烟雾

s

语音播报

该功能实现的原理及流程可参考:Thonny IDE + MicroPython + ESP32 + SYN6288 输出语音-CSDN博客SYN6288语音合成模块使用说明(MicroPython、STM32、Arduino)-CSDN博客

驱动SYN6288语音合成模块进行中文语音播报的流程如下:

Voice.py

from machine import UART 
import time

from VoiceDelay import voice_delay
from integer2code import code_int

def voice_Initial():
    voice_uart = UART(2, 9600)       # 初始化串口, TX:17 RX:16, 波特率9600
    return voice_uart
    
def voice_out(voice_uart, temperature, humidity, illumination, smog):    
    if voice_uart.any():				# 如果串口收到了数据
        received_data = voice_uart.read()			# 读取所有收到的信息
        print('接收到的串口消息为:{}'.format(received_data))

        if received_data == b't':
            if temperature < 0:
                 voice_uart.write(bytes([253, 0, 7, 1, 0, 193, 227, 207, 194, 212]))	# 零下
                 time.sleep_ms(750)
            voice_uart.write(bytes(code_int(round(temperature))))
            time.sleep_ms(voice_delay(round(temperature)))
            voice_uart.write(bytes([253, 0, 9, 1, 0, 201, 227, 202, 207, 182, 200, 164]))	# 摄氏度
            
        if received_data == b'h':
            voice_uart.write(bytes([253, 0, 9, 1, 0, 176, 217, 183, 214, 214, 174, 133]))	# #百分之
            time.sleep_ms(1100)
            voice_uart.write(bytes(code_int(humidity)))
            
        if received_data == b'i':
            voice_uart.write(bytes(code_int(round(illumination))))
            time.sleep_ms(voice_delay(round(illumination)))
            voice_uart.write(bytes([253, 0, 9, 1, 0, 192, 213, 191, 203, 203, 185, 230]))	# 勒克斯
        
        if received_data == b's':
            voice_uart.write(bytes(code_int(smog)))

VoiceDelay.py

def get_num(num):
    numbers = {
               1: '一', 2: '二', 3: '三', 4: '四', 5: '五',
               6: '六', 7: '七', 8: '八', 9: '九', 0: '零'
              }
    return numbers.get(num)

def get_danwei(danwei):		#断位
    danweis = {
               1: 'temp', 2: '十', 3: '百', 4: '千',
               5: '万', 6: '十', 7: '百', 8: '千',
               9: '亿', 10: '十'
               }
    return danweis.get(danwei)

'''
得到一个整数对应的中文读音,如:12345读作一万两千三百四十五
但此函数不完善,存在少数错误,
如:输入0,输出None;   输入1002003, 输出['\u4e00', '\u767e', '\u96f6', '\u4e8c', '\u5343', '\u96f6', '\u4e09'](一百零二千零三)
但仍可勉强用于判断一个整数对应的中文读音的音节数(作为voice_delay(num)函数的参数)
'''
def num2Chinese(num):
    n = list((' '.join(str(abs(num)))).split())

    out = []
    danwei = 1
    zero = 0
    for i in n[::-1]:
        num = get_num(int(i))
        danwei_out = get_danwei(danwei)
        
        if i == '0':
            if zero != 0:
                zero = 0
                if not danwei == 1 or danwei == 5 or danwei == 9: # 1、5、9时候不输出0
                    out.insert(0, num)  # 其余情况输出0
                                        # 因为是倒着来的,所以插入要插到第一个
                    danwei += 1
                else:
                    out.insert(0, danwei_out)
                    out.insert(0, num)
                    danwei += 1
            else:
                if danwei == 1 or danwei == 5 or danwei == 9:
                    out.insert(0, danwei_out)
                    danwei += 1
                else:
                    danwei += 1
                    continue
        else:
            if danwei == 6:
                if i == '1':
                    out.insert(0, danwei_out)
                    zero = 1
                    danwei += 1
                else:
                    out.insert(0, danwei_out)
                    out.insert(0, num)
                    zero = 1
                    danwei += 1
            else:
                out.insert(0, danwei_out)
                out.insert(0, num)
                zero = 1
                danwei += 1

    temp = [i for i,j in enumerate(out) if j == 'temp']
    while len(temp) != 0:
        out.pop(temp[0])
        temp = [i for i,j in enumerate(out) if j == 'temp']
    
    #print(' '.join(out))
    
    return out  

def voice_delay(num):
    milliseconds = len(num2Chinese(num))*375
    return milliseconds

integer2code.py

# 输入:一个整数
# 输出:这个整数的位数
# 功能:计算给定整数是几位数
def num_of_digits(num):		 
    return len(str(abs(num)))	# 将整数转换为字符串,然后计算字符串的长度

# 输入:一个整数
# 输出:一个存有这个整数每一位数字的列表
# 功能:整数分解
def decompose_integer(num):		
    digits = [int(d) for d in str(abs(num))]	# 将整数转换为字符串,然后将每个字符转换为整数并存储在列表中
    return digits

# 输入:一个1位数
# 输出:该数字对应的ASCII码值
def get_ascii_of_unit_digit(single_digit):
    return ord(str(single_digit))

# 输入:一个整数
# 输出:这个整数的绝对值对应的语音编码
# 功能:通过串口发送此编码,SYN6288语音播报模块可以读出这个整数的绝对值
def code_int(num):
    digits = decompose_integer(num)		# 得到一个存有这个整数每一位数字的列表
    i = num_of_digits(num)	#计算给定整数的位数
    
    code = [0] * 12  # 最多存12位,即num最多为6位数.
    code[0] = 253
    code[1] = 0
    code[2] = i + 3
    code[3] = 1
    code[4] = 0
    
    for r in range(i):
        code[4 + (r + 1)] = get_ascii_of_unit_digit(digits[r])	# 得到给定的1位数对应的ASCII码
    
    
    check = 0
    check_code = [0, 0, 0, 0, 0, 0, 0, 0]
    code_copy = code[:i + 5]

    for m in range(8):
        for r in range(i + 5):
            check_code[m] += (code_copy[r] & 1)
            code_copy[r] >>= 1
        check += (check_code[m] & 1) * (2 ** m)
    
    code[4 + i + 1] = check  # 这位存校验码
    
    
    return code

发送短信和来电提示

该功能实现的原理及流程可参考:A9G模块实现发短信和打电话的流程说明-CSDN博客Thonny IDE + MicroPython + ESP32 + A9G 发短信打电话-CSDN博客

A9G.py

from machine import UART     # 导入串口模块
# import time


'''
    UART0	UART1	UART2
TX	  1		 10		 17
RX	  3		 9		 16

UART0已被上位机占用
UART1的默认引脚用于外接flash
'''


def a9g_Initial():
    text_message_and_call_uart = UART(1, baudrate=115200, tx=23,)       # 初始化串口1,波特率115200,tx=23,不使用rx
    return text_message_and_call_uart

def text_message_and_call(text_message_and_call_uart):
    '''
    AT_instruction = "AT\r\n"	# 查询是否与模块建立联系
    text_message_and_call_uart.write(AT_instruction)
    time.sleep_ms(100)

    AT_instruction = "AT+CSQ\r\n"	# 查询信号强度 第一个参数为信号强度值
    text_message_and_call_uart.write(AT_instruction)
    time.sleep_ms(100)

    AT_instruction = "AT+CCID\r\n"	# 获取SIM卡的序列号,用来检测是否有SIM卡
    text_message_and_call_uart.write(AT_instruction)
    time.sleep_ms(100)
    '''

    AT_instruction = "AT+CMGF=1\r\n"	# 文本方式发送短信  
    text_message_and_call_uart.write(AT_instruction)

    AT_instruction = "AT+CMGS=\"18601652379\"\r\n"	# 向指定号码发送短信  
    text_message_and_call_uart.write(AT_instruction)

    AT_instruction = "The smoke concentration in the laboratory is high, please deal with it as soon as possible.\r\n"	 # 短信的内容
    text_message_and_call_uart.write(AT_instruction)

    AT_instruction = [0x1a]		# 结束符
    text_message_and_call_uart.write(bytes(AT_instruction))
    
    AT_instruction = "ATD18601652379\r\n"	#   向指定号码打电话
    text_message_and_call_uart.write(AT_instruction)

错误和注意事项

  • 按ESP32的EN复位后,DHT11模块会报错,导致其无法上电后自动运行。但使用Thonny编译器的开始按钮(软起动)可正常执行程序。
  • 当环境信息的数值为0时,语音播报出错,原因见VoiceDelay.py的num2Chinese函数。
  • A9G模块需要更高的电压、更大的电流,要用microUSB独立供电(插手机充电器上)。其microUSB口不要和其他模块共用同一个拓展坞,电脑的usb也带不动它。
  • MQ-2模块上电后要等3分钟使其稳定。MQ-2模块会发热,不要靠近温湿度模块。

参考

Quick reference for the ESP32 — MicroPython latest documentation

Python+ESP32 快速上手(持续更新中)【 通俗易懂 】_哔哩哔哩_bilibili

标签:基于,SET,ESP32,0x00,write,环境监测,import,self,def
From: https://blog.csdn.net/qq_44955826/article/details/144359532

相关文章