最终效果
<iframe allowfullscreen="true" data-mediaembed="csdn" frameborder="0" id="5LIJKUnb-1733744395061" src="https://live.csdn.net/v/embed/437946"></iframe>环境监测
项目介绍
该项目是“物联网实验室监测控制系统设计(仿智能家居)”项目中的“环境监测系统设计”子项目,前者还包括“物联网设计”、“门禁设计”、“家电控制设计”和“小程序设计”等内容。本文只介绍“环境监测”部分。
项目功能实现的大致思路为:单片机采集温度、湿度、光照强度、烟雾传感器传来的数据,将此环境信息实时显示到屏幕上,并通过WiFi将信息传至MQTT服务器(以供小程序调用)。当实验室发生火灾时,驱动警报模块向实验室管理员发送短信和来电提示。当收到语音指令时,播报相应环境信息。
硬件设计
接线
ESP32 | DS18B20 | DHT11 | GY-302 | MQ-2 | OLED | LU-ASR01 | SYN6288 | A9G |
3V3 | VCC | VCC | VCC | VCC | VCC | 5V | VDD | |
GND | GND | GND | GND | GND | GND | G | G | GND |
D23 | U1RX | |||||||
D27 | DATA | |||||||
D13 | DQ | |||||||
D18 | SCL | |||||||
D19 | SDA | |||||||
D15 | DO | |||||||
D32 | AO | |||||||
D25 | SCL | |||||||
D26 | SDA | |||||||
D17 | RXD | |||||||
D16 | TX |
PCB设计
此电路板仅是为了代替杜邦线而已,上面只有引脚排座,而没有任何电子元件。
成本
ESP32 | DS18B20 | DHT11 | GY-302 | MQ-2 | OLED | LU-ASR01 | SYN6288 | A9G |
14 | 3.9 | 5.3 | 7.5 | 9 | 14.5 | 47.5 | 49.7 | 51 |
其中共需200元左右来购买该项目所需的模块。此外还需2根数据线、焊接工具(电烙铁、锡丝、引脚排座)、PCB打板或若干杜邦线。
软件设计
本次的开发环境为Thonny IDE,使用MicroPython语言编写驱动程序,采用裸机编程。自编库文件关系如下:
主程序
在完成所有模块的初始化后,进入循环。循环期间ESP32先获取四个传感器测得的实验室环境数据,之后将此信息显示到屏幕上,并通过WiFi传至MQTT服务器。最后处理烟雾浓度高和用户语音问讯这两种特殊情况。
在循环中,烟雾警报和语音播报环境信息较为特殊:
这其中,text_message_and_call_flag为发短信和打电话的标志位,初始值为0,发送过短信并打过电话后置1。目的是确保循环里最多发一次短信并打一次电话。这是因为只有当实验室烟雾浓度高于阈值时,系统才会向实验室管理员发短信和打电话。此时,管理员必须立刻亲自到现场处理实验室安全事故并重置系统。在此期间,系统不会反复向实验室管理员发短信和打电话。
smog_analog是MQ-2模块采用11dB衰减后的模拟信号引脚数值。经实验得出,当其值大于1500时,实验室烟雾浓度高。因此将烟雾阈值设为1500。
循环5次是因为本系统采用裸机编程,在不影响其他功能的前提下,为及时响应用户的语音指令,程序需在大部分时间段内处于等待和处理语音信号阶段。当程序不处于此阶段时,系统依然可以响应用户的语音指令。这是因为LU-ASR01语音识别模块发送的串口信号会先缓存在ESP32的寄存器中,等到程序运行至此阶段时,便会处理相应信号。
main.py
from machine import Pin
import time
#引入自编函数
from ConnectWiFi import connect_wifi
from MQTTPublish import mqtt_initial, mqtt_publish_information
from DS18B20 import ds18b20_initial, temperature_measure
from DHT11 import dht11_Initial, humidity_measure
from GY302 import gy_302_Initial, illumination_measure
from MQ2 import MQ_2_Initial, smog_measure
from OLED import oled_Initial, oled_display_environmental_information
from Voice import voice_Initial, voice_out
from A9G import a9g_Initial, text_message_and_call
connect_wifi()
mqtt = mqtt_initial()
ds18b20 = ds18b20_initial()
dht11 = dht11_Initial()
(gy302_addr, gy302_i2c) = gy_302_Initial()
(MQ_2_digital_pin, MQ_2_analog_pin) = MQ_2_Initial()
OLED = oled_Initial()
voice_uart = voice_Initial()
text_message_and_call_uart = a9g_Initial()
text_message_and_call_flag = 0 #记录是否发送过短信并打过电话,确保循环里最多发一次短信并打一次电话
while True:
temperature = temperature_measure(ds18b20)
humidity = humidity_measure(dht11)
illumination = illumination_measure(gy302_addr, gy302_i2c)
(smog_digital, smog_analog, smog_voltage) = smog_measure(MQ_2_digital_pin, MQ_2_analog_pin)
print("温度:%.2f℃ 湿度:%d%% 光照值:%dLx 烟雾模拟信号值:%d" %(temperature, humidity, illumination, smog_analog))
oled_display_environmental_information(OLED, temperature, humidity, illumination, smog_analog)
mqtt_publish_information(mqtt, temperature, humidity, illumination, smog_analog)
if text_message_and_call_flag == 0 and smog_analog > 1500:
text_message_and_call(text_message_and_call_uart)
text_message_and_call_flag = 1
for i in range(5): #使程序大部分时间都用来等待语音信号。其实也可用中断实现
voice_out(voice_uart, temperature, humidity, illumination, smog_analog)
time.sleep_ms(200)
控制器连接WiFi
该功能实现的原理及流程可参考:ESP32连接wifi (MicroPython)-CSDN博客
ESP32连接WiFi需设置WiFi模式,并提供WiFi名称和密码:
ConnectWiFi.py
#esp32会自动尝试连接之前连接过的WiFi(比如路由器重启)
#若想连接其他WiFi,重启esp32(按EN键)后再执行新程序
#若未重启就运行连接其他WiFi的新程序,esp32有可能连接之前的WiFi
import network, time
def connect_wifi():
wlan = network.WLAN(network.STA_IF) # 无线终端模式
wlan.active(True) # activate the interface
#print(wlan.scan()) #打印出扫描到的所有WiFi信号
if not wlan.isconnected(): #如果WiFi未连接
print('connecting to network...')
wlan.connect("DOILMSBOIOT", "doilmsboiot") #连接WiFi,名称:DOILMSBOIOT,密码:doilmsboiot
i = 1
while not wlan.isconnected():
print("正在链接...{}".format(i))
i += 1
time.sleep(1)
print("WiFi连接成功") #打印提示信息
print(wlan.config('mac')) # 打印 the interface's MAC address
print(wlan.ifconfig()) # 打印 the interface's IP/netmask/gw/DNS addresses
ESP32连接MQTT服务器并向指定主题发布指定内容
该功能实现的原理及流程可参考:ESP32向MQTT服务器发送消息(MicroPython)-CSDN博客
首先设置客户端名称和MQTT服务器域名,创建MQTT对象;随后连接服务器;接着定义发布主题、内容及相应属性;最后将消息发布至指定主题。
MQTTPublish.py
import network
from umqttsimple import MQTTClient
def mqtt_initial():
# 创建MQTT,参数1:客户端名称,参数2:MQTT服务器
mqtt = MQTTClient("esp32_{}".format(network.WLAN().config('mac')), "broker.emqx.io")
# 连接MQTT服务器
mqtt.connect()
return mqtt
def mqtt_publish_information(mqtt, temperature, humidity, illumination, smog):
# 发布消息
publishTopic = b"environmentalMonitoring1"
publishContent = b"{\"temperature\":%.2f, \"humidity\":%d, \"illumination\":%d, \"smog\":%d }" %(temperature, humidity, illumination, smog)
mqtt.publish(publishTopic, publishContent, retain=True, qos=0)
print('已向MQTT服务器发布消息')
umqttsimple.py
import usocket as socket
import ustruct as struct
from ubinascii import hexlify
class MQTTException(Exception):
pass
class MQTTClient:
def __init__(
self,
client_id,
server,
port=0,
user=None,
password=None,
keepalive=0,
ssl=False,
ssl_params={},
):
if port == 0:
port = 8883 if ssl else 1883
self.client_id = client_id
self.sock = None
self.server = server
self.port = port
self.ssl = ssl
self.ssl_params = ssl_params
self.pid = 0
self.cb = None
self.user = user
self.pswd = password
self.keepalive = keepalive
self.lw_topic = None
self.lw_msg = None
self.lw_qos = 0
self.lw_retain = False
def _send_str(self, s):
self.sock.write(struct.pack("!H", len(s)))
self.sock.write(s)
def _recv_len(self):
n = 0
sh = 0
while 1:
b = self.sock.read(1)[0]
n |= (b & 0x7F) << sh
if not b & 0x80:
return n
sh += 7
def set_callback(self, f):
self.cb = f
def set_last_will(self, topic, msg, retain=False, qos=0):
assert 0 <= qos <= 2
assert topic
self.lw_topic = topic
self.lw_msg = msg
self.lw_qos = qos
self.lw_retain = retain
def connect(self, clean_session=True):
self.sock = socket.socket()
addr = socket.getaddrinfo(self.server, self.port)[0][-1]
self.sock.connect(addr)
if self.ssl:
import ussl
self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
premsg = bytearray(b"\x10\0\0\0\0\0")
msg = bytearray(b"\x04MQTT\x04\x02\0\0")
sz = 10 + 2 + len(self.client_id)
msg[6] = clean_session << 1
if self.user is not None:
sz += 2 + len(self.user) + 2 + len(self.pswd)
msg[6] |= 0xC0
if self.keepalive:
assert self.keepalive < 65536
msg[7] |= self.keepalive >> 8
msg[8] |= self.keepalive & 0x00FF
if self.lw_topic:
sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
msg[6] |= self.lw_retain << 5
i = 1
while sz > 0x7F:
premsg[i] = (sz & 0x7F) | 0x80
sz >>= 7
i += 1
premsg[i] = sz
self.sock.write(premsg, i + 2)
self.sock.write(msg)
# print(hex(len(msg)), hexlify(msg, ":"))
self._send_str(self.client_id)
if self.lw_topic:
self._send_str(self.lw_topic)
self._send_str(self.lw_msg)
if self.user is not None:
self._send_str(self.user)
self._send_str(self.pswd)
resp = self.sock.read(4)
assert resp[0] == 0x20 and resp[1] == 0x02
if resp[3] != 0:
raise MQTTException(resp[3])
return resp[2] & 1
def disconnect(self):
self.sock.write(b"\xe0\0")
self.sock.close()
def ping(self):
self.sock.write(b"\xc0\0")
def publish(self, topic, msg, retain=False, qos=0):
pkt = bytearray(b"\x30\0\0\0")
pkt[0] |= qos << 1 | retain
sz = 2 + len(topic) + len(msg)
if qos > 0:
sz += 2
assert sz < 2097152
i = 1
while sz > 0x7F:
pkt[i] = (sz & 0x7F) | 0x80
sz >>= 7
i += 1
pkt[i] = sz
# print(hex(len(pkt)), hexlify(pkt, ":"))
self.sock.write(pkt, i + 1)
self._send_str(topic)
if qos > 0:
self.pid += 1
pid = self.pid
struct.pack_into("!H", pkt, 0, pid)
self.sock.write(pkt, 2)
self.sock.write(msg)
if qos == 1:
while 1:
op = self.wait_msg()
if op == 0x40:
sz = self.sock.read(1)
assert sz == b"\x02"
rcv_pid = self.sock.read(2)
rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
if pid == rcv_pid:
return
elif qos == 2:
assert 0
def subscribe(self, topic, qos=0):
assert self.cb is not None, "Subscribe callback is not set"
pkt = bytearray(b"\x82\0\0\0")
self.pid += 1
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
# print(hex(len(pkt)), hexlify(pkt, ":"))
self.sock.write(pkt)
self._send_str(topic)
self.sock.write(qos.to_bytes(1, "little"))
while 1:
op = self.wait_msg()
if op == 0x90:
resp = self.sock.read(4)
# print(resp)
assert resp[1] == pkt[2] and resp[2] == pkt[3]
if resp[3] == 0x80:
raise MQTTException(resp[3])
return
# Wait for a single incoming MQTT message and process it.
# Subscribed messages are delivered to a callback previously
# set by .set_callback() method. Other (internal) MQTT
# messages processed internally.
def wait_msg(self):
res = self.sock.read(1)
self.sock.setblocking(True)
if res is None:
return None
if res == b"":
raise OSError(-1)
if res == b"\xd0": # PINGRESP
sz = self.sock.read(1)[0]
assert sz == 0
return None
op = res[0]
if op & 0xF0 != 0x30:
return op
sz = self._recv_len()
topic_len = self.sock.read(2)
topic_len = (topic_len[0] << 8) | topic_len[1]
topic = self.sock.read(topic_len)
sz -= topic_len + 2
if op & 6:
pid = self.sock.read(2)
pid = pid[0] << 8 | pid[1]
sz -= 2
msg = self.sock.read(sz)
self.cb(topic, msg)
if op & 6 == 2:
pkt = bytearray(b"\x40\x02\0\0")
struct.pack_into("!H", pkt, 2, pid)
self.sock.write(pkt)
elif op & 6 == 4:
assert 0
# Checks whether a pending message from server is available.
# If not, returns immediately with None. Otherwise, does
# the same processing as wait_msg.
def check_msg(self):
self.sock.setblocking(False)
return self.wait_msg()
测量温度
该功能实现的原理及流程可参考:Thonny IDE + MicroPython + ESP32 + DS18B20 测量环境温度_esp32用thonny编程的语法-CSDN博客
DS18B20.py
from machine import Pin
import onewire, ds18x20
def ds18b20_initial():
ds_pin = Pin(13)
ds18b20 = ds18x20.DS18X20(onewire.OneWire(ds_pin))
return ds18b20
def temperature_measure(ds18b20):
rom = ds18b20.scan()
ds18b20.convert_temp()
temperature = ds18b20.read_temp(rom[0])
temperature = round(temperature, 2)
return temperature
测量湿度
该功能实现的原理及流程可参考:Thonny IDE + MicroPython + ESP32 + DHT11 测量环境温湿度_esp32+micropython 检测土壤湿度 代码-CSDN博客
DHT11.py
from machine import Pin
import dht
def dht11_Initial():
dht11 = dht.DHT11(Pin(27))
return dht11
def humidity_measure(dht11):
dht11.measure()
humidity = dht11.humidity()
return humidity
测量光照强度
该功能实现的原理及流程可参考:
Thonny IDE + MicroPython + ESP32 + GY-302 测量环境中的光照强度_esp32 micropython 光照强度-CSDN博客
GY302.py
from machine import Pin,I2C
#import time
def gy_302_Initial():
gy302_addr = 0x23 # 光线传感器I2C地址
gy302_i2c = I2C(0, freq = 1_000_000) #初始化IIC0,使用默认引脚“SCL=18、SDA=19”,传输速度:100 Kbps
#print(hex(i2c.scan()[0])) # 打印器件I2C地址
gy302_i2c.writeto(gy302_addr, chr(0x01)) # 发送上电命令
#i2c.writeto(gy302_addr, chr(0x07)) # 发送复位命令
gy302_i2c.writeto(gy302_addr, chr(0x10)) # 发送高分辨率连续测量命令 分辨率:1Lx 测量时间:120ms
#time.sleep_ms(180) # 保证通讯
return (gy302_addr, gy302_i2c)
def illumination_measure(gy302_addr, gy302_i2c):
gy = gy302_i2c.readfrom(gy302_addr, 2) # 从gy302_addr地址设备读取2位数据
gy302 = float(gy[0] << 8 | gy[1]) / 1.2 # 左移,可以理解为 gy[0]*0xff
return gy302
测量烟雾浓度
该功能实现的原理及流程可参考:Thonny IDE + MicroPython + ESP32 +MQ-2 测量环境中的烟雾浓度_micropython+esp32+mq2传感器-CSDN博客
MQ2.py
#对于模拟信号,浓度越高,输出电压越高。
#对于数字信号,高于阈值,为0,LED亮;低于阈值,为1,LED灭。
#通电后,需要先预热约60s后测量的数据才稳定。通电后出现正常的轻度发热现象,因为内部有电热丝。
#顺时针旋转,阈值升高;逆时针旋转,阈值降低
from machine import Pin, ADC
def MQ_2_Initial():
MQ_2_digital_pin = Pin(15)
MQ_2_analog_pin = ADC(32)
MQ_2_analog_pin.atten(ADC.ATTN_11DB) #11dB衰减,使测量范围大致变为:0 ~ 3.3V
return (MQ_2_digital_pin, MQ_2_analog_pin)
def smog_measure(MQ_2_digital_pin, MQ_2_analog_pin):
#print("数字信号值:",MQ_2_digital_pin.value())
#print("模拟信号值:",MQ_2_analog_pin.read()," ","输出电压:",round(MQ_2_analog_pin.read_uv()/1000000,2),"V")
return (MQ_2_digital_pin.value(), MQ_2_analog_pin.read(), round(MQ_2_analog_pin.read_uv()/1000000,2))
屏幕显示环境信息
该功能实现的原理及流程可参考:Thonny IDE + MicroPython + ESP32 + 0.96寸OLED(IIC) 显示任意字符_thonny esp32 oled-CSDN博客
OLED.py
from machine import Pin, I2C
from SSD1306 import SSD1306_I2C
def oled_Initial():
i2c = I2C(1) # 初始化IIC1,使用默认引脚“SCL=25、SDA=26”,传输速度:400 Kbps
OLED = SSD1306_I2C(128, 64, i2c) # 屏幕长度:128像素;屏幕宽度:64像素
return OLED
# 用到的UTF-8编码字库
fonts = {
0xE6B8A9: # “温”的UTF-8编码
[0x00,0x23,0x12,0x12,0x83,0x42,0x42,0x13,0x10,0x27,0xE4,0x24,0x24,0x24,0x2F,0x00,
0x00,0xF8,0x08,0x08,0xF8,0x08,0x08,0xF8,0x00,0xFC,0xA4,0xA4,0xA4,0xA4,0xFE,0x00], #温
0xE6B9BF:
[0x00,0x27,0x14,0x14,0x87,0x44,0x44,0x17,0x11,0x21,0xE9,0x25,0x23,0x21,0x2F,0x00,
0x00,0xF8,0x08,0x08,0xF8,0x08,0x08,0xF8,0x20,0x20,0x24,0x28,0x30,0x20,0xFE,0x00], #湿
0xE5BAA6:
[0x01,0x00,0x3F,0x22,0x22,0x3F,0x22,0x22,0x23,0x20,0x2F,0x24,0x42,0x41,0x86,0x38,
0x00,0x80,0xFE,0x20,0x20,0xFC,0x20,0x20,0xE0,0x00,0xF0,0x10,0x20,0xC0,0x30,0x0E], #度
0xE58589:
[0x01,0x21,0x11,0x09,0x09,0x01,0xFF,0x04,0x04,0x04,0x04,0x08,0x08,0x10,0x20,0xC0,
0x00,0x08,0x08,0x10,0x20,0x00,0xFE,0x40,0x40,0x40,0x40,0x42,0x42,0x42,0x3E,0x00], #光
0xE785A7:
[0x00,0x7D,0x44,0x44,0x44,0x44,0x7D,0x44,0x44,0x44,0x44,0x7C,0x00,0x48,0x44,0x84,
0x00,0xFC,0x44,0x44,0x44,0x94,0x08,0xFC,0x84,0x84,0x84,0xFC,0x00,0x88,0x44,0x44], #照
0xE7839F:
[0x10,0x13,0x12,0x16,0x5A,0x52,0x53,0x92,0x12,0x12,0x12,0x2A,0x27,0x42,0x43,0x82,
0x00,0xFE,0x02,0x22,0x22,0x22,0xFE,0x22,0x22,0x52,0x4A,0x8A,0x02,0x02,0xFE,0x02], #烟
0xE99BBE:
[0x00,0x3F,0x01,0x7F,0x49,0x01,0x1D,0x08,0x1F,0x68,0x07,0x7A,0x0F,0x04,0x18,0x00,
0x00,0xF8,0x00,0xFC,0x24,0x00,0x70,0x00,0xF0,0x20,0xC0,0x3C,0xE0,0x20,0xC0,0x00], #雾
0xE28483:
[0x00,0x00,0x00,0x00,0x20,0x33,0x04,0x08,0x08,0x08,0x08,0x07,0x00,0x00,0x00,0x00,
0x00,0x00,0x00,0x00,0x00,0x80,0x40,0x00,0x00,0x00,0x40,0x80,0x00,0x00,0x00,0x00], #℃
}
# 先找出字符的utf-8编码,再在fonts中找到对应的点阵,最后将点阵输出到OLED上,从而实现在OLED上显示汉字和特殊字符
def oled_utf8(ch_str, x_axis, y_axis, OLED):
offset_ = 0
for k in ch_str:
code = 0x00 # 将中文转成16进制编码
data_code = k.encode("utf-8")
code |= data_code[0] << 16
code |= data_code[1] << 8
code |= data_code[2]
byte_data = fonts[code]
for y in range(0, 16):
a_ = bin(byte_data[y]).replace('0b', '')
while len(a_) < 8:
a_ = '0'+ a_
b_ = bin(byte_data[y+16]).replace('0b', '')
while len(b_) < 8:
b_ = '0'+ b_
for x in range(0, 8):
OLED.pixel(x_axis + offset_ + x, y+y_axis, int(a_[x]))
OLED.pixel(x_axis + offset_ + x +8, y+y_axis, int(b_[x]))
offset_ += 16
def oled_acsii(ch_str, x_axis, y_axis, OLED):
OLED.text(ch_str, x_axis, y_axis)
def oled_display_environmental_information(OLED, temperature, humidity, illumination, smog):
OLED.fill(0) # 清屏
oled_utf8('温度', 0, 0, OLED)
oled_acsii(':', 32, 5, OLED)
oled_acsii('{:.2f}'.format(temperature), 40, 5, OLED)
oled_utf8('℃', 40+8*len(str('{:.2f}'.format(temperature))), 0, OLED)
oled_utf8('湿度', 0, 16, OLED)
oled_acsii(':', 32, 21, OLED)
oled_acsii(str(humidity), 40, 21, OLED)
oled_acsii('%', 40+8*len(str(humidity)), 21, OLED)
oled_utf8('光照', 0, 32, OLED)
oled_acsii(':', 32, 37, OLED)
oled_acsii('{:.0f}'.format(illumination), 40, 37, OLED)
oled_acsii('LX', 40+8*len(str('{:.0f}'.format(illumination))), 37, OLED)
oled_utf8('烟雾', 0, 48, OLED)
oled_acsii(':', 32, 53, OLED)
oled_acsii(str(smog), 40, 53, OLED)
OLED.show() # 显示上述信息
SSD1306.py
#MicroPython SSD1306 OLED driver, I2C and SPI interfaces created by Adafruit
import time
import framebuf
# register definitions
SET_CONTRAST = const(0x81)
SET_ENTIRE_ON = const(0xa4)
SET_NORM_INV = const(0xa6)
SET_DISP = const(0xae)
SET_MEM_ADDR = const(0x20)
SET_COL_ADDR = const(0x21)
SET_PAGE_ADDR = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP = const(0xa0)
SET_MUX_RATIO = const(0xa8)
SET_COM_OUT_DIR = const(0xc0)
SET_DISP_OFFSET = const(0xd3)
SET_COM_PIN_CFG = const(0xda)
SET_DISP_CLK_DIV = const(0xd5)
SET_PRECHARGE = const(0xd9)
SET_VCOM_DESEL = const(0xdb)
SET_CHARGE_PUMP = const(0x8d)
class SSD1306:
def __init__(self, width, height, external_vcc):
self.width = width
self.height = height
self.external_vcc = external_vcc
self.pages = self.height // 8
# Note the subclass must initialize self.framebuf to a framebuffer.
# This is necessary because the underlying data buffer is different
# between I2C and SPI implementations (I2C needs an extra byte).
self.poweron()
self.init_display()
def init_display(self):
for cmd in (
SET_DISP | 0x00, # off
# address setting
SET_MEM_ADDR, 0x00, # horizontal
# resolution and layout
SET_DISP_START_LINE | 0x00,
SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0
SET_MUX_RATIO, self.height - 1,
SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0
SET_DISP_OFFSET, 0x00,
SET_COM_PIN_CFG, 0x02 if self.height == 32 else 0x12,
# timing and driving scheme
SET_DISP_CLK_DIV, 0x80,
SET_PRECHARGE, 0x22 if self.external_vcc else 0xf1,
SET_VCOM_DESEL, 0x30, # 0.83*Vcc
# display
SET_CONTRAST, 0xff, # maximum
SET_ENTIRE_ON, # output follows RAM contents
SET_NORM_INV, # not inverted
# charge pump
SET_CHARGE_PUMP, 0x10 if self.external_vcc else 0x14,
SET_DISP | 0x01): # on
self.write_cmd(cmd)
self.fill(0)
self.show()
def poweroff(self):
self.write_cmd(SET_DISP | 0x00)
def contrast(self, contrast):
self.write_cmd(SET_CONTRAST)
self.write_cmd(contrast)
def invert(self, invert):
self.write_cmd(SET_NORM_INV | (invert & 1))
def show(self):
x0 = 0
x1 = self.width - 1
if self.width == 64:
# displays with width of 64 pixels are shifted by 32
x0 += 32
x1 += 32
self.write_cmd(SET_COL_ADDR)
self.write_cmd(x0)
self.write_cmd(x1)
self.write_cmd(SET_PAGE_ADDR)
self.write_cmd(0)
self.write_cmd(self.pages - 1)
self.write_framebuf()
def fill(self, col): # col = 1,屏幕全亮;col = 0,屏幕全灭,用于清屏
self.framebuf.fill(col)
def pixel(self, x, y, col):
self.framebuf.pixel(x, y, col)
def scroll(self, dx, dy):
self.framebuf.scroll(dx, dy)
def text(self, string, x, y, col=1):
self.framebuf.text(string, x, y, col)
class SSD1306_I2C(SSD1306):
def __init__(self, width, height, i2c, addr=0x3c, external_vcc=False):
self.i2c = i2c
self.addr = addr
self.temp = bytearray(2)
# Add an extra byte to the data buffer to hold an I2C data/command byte
# to use hardware-compatible I2C transactions. A memoryview of the
# buffer is used to mask this byte from the framebuffer operations
# (without a major memory hit as memoryview doesn't copy to a separate
# buffer).
self.buffer = bytearray(((height // 8) * width) + 1)
self.buffer[0] = 0x40 # Set first byte of data buffer to Co=0, D/C=1
self.framebuf = framebuf.FrameBuffer1(memoryview(self.buffer)[1:], width, height)
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.temp[0] = 0x80 # Co=1, D/C#=0
self.temp[1] = cmd
self.i2c.writeto(self.addr, self.temp)
def write_framebuf(self):
# Blast out the frame buffer using a single I2C transaction to support
# hardware I2C interfaces.
self.i2c.writeto(self.addr, self.buffer)
def poweron(self):
pass
class SSD1306_SPI(SSD1306):
def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):
self.rate = 10 * 1024 * 1024
dc.init(dc.OUT, value=0)
res.init(res.OUT, value=0)
cs.init(cs.OUT, value=1)
self.spi = spi
self.dc = dc
self.res = res
self.cs = cs
self.buffer = bytearray((height // 8) * width)
self.framebuf = framebuf.FrameBuffer1(self.buffer, width, height)
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs.high()
self.dc.low()
self.cs.low()
self.spi.write(bytearray([cmd]))
self.cs.high()
def write_framebuf(self):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs.high()
self.dc.high()
self.cs.low()
self.spi.write(self.buffer)
self.cs.high()
def poweron(self):
self.res.high()
time.sleep_ms(1)
self.res.low()
time.sleep_ms(10)
self.res.high()
语音识别与播报
语音识别
该功能实现的原理及流程可参考:Thonny IDE + MicroPython + ESP32 + LU-ASR01语音识别-CSDN博客
仅使用LU-ASR01语音识别开发板的语音识别功能。当语音识别模块识别到特定语音时,通过串口输出特定字符。此时,ESP32的串口会接收到这个特定字符,从而得知用户说出的特定语音。待识别语音和输出字符之间的对应关系如下表所示:
待识别语音 | 输出字符 |
温度 | t |
湿度 | h |
光照 | i |
烟雾 | s |
语音播报
该功能实现的原理及流程可参考:Thonny IDE + MicroPython + ESP32 + SYN6288 输出语音-CSDN博客、SYN6288语音合成模块使用说明(MicroPython、STM32、Arduino)-CSDN博客
驱动SYN6288语音合成模块进行中文语音播报的流程如下:
Voice.py
from machine import UART
import time
from VoiceDelay import voice_delay
from integer2code import code_int
def voice_Initial():
voice_uart = UART(2, 9600) # 初始化串口, TX:17 RX:16, 波特率9600
return voice_uart
def voice_out(voice_uart, temperature, humidity, illumination, smog):
if voice_uart.any(): # 如果串口收到了数据
received_data = voice_uart.read() # 读取所有收到的信息
print('接收到的串口消息为:{}'.format(received_data))
if received_data == b't':
if temperature < 0:
voice_uart.write(bytes([253, 0, 7, 1, 0, 193, 227, 207, 194, 212])) # 零下
time.sleep_ms(750)
voice_uart.write(bytes(code_int(round(temperature))))
time.sleep_ms(voice_delay(round(temperature)))
voice_uart.write(bytes([253, 0, 9, 1, 0, 201, 227, 202, 207, 182, 200, 164])) # 摄氏度
if received_data == b'h':
voice_uart.write(bytes([253, 0, 9, 1, 0, 176, 217, 183, 214, 214, 174, 133])) # #百分之
time.sleep_ms(1100)
voice_uart.write(bytes(code_int(humidity)))
if received_data == b'i':
voice_uart.write(bytes(code_int(round(illumination))))
time.sleep_ms(voice_delay(round(illumination)))
voice_uart.write(bytes([253, 0, 9, 1, 0, 192, 213, 191, 203, 203, 185, 230])) # 勒克斯
if received_data == b's':
voice_uart.write(bytes(code_int(smog)))
VoiceDelay.py
def get_num(num):
numbers = {
1: '一', 2: '二', 3: '三', 4: '四', 5: '五',
6: '六', 7: '七', 8: '八', 9: '九', 0: '零'
}
return numbers.get(num)
def get_danwei(danwei): #断位
danweis = {
1: 'temp', 2: '十', 3: '百', 4: '千',
5: '万', 6: '十', 7: '百', 8: '千',
9: '亿', 10: '十'
}
return danweis.get(danwei)
'''
得到一个整数对应的中文读音,如:12345读作一万两千三百四十五
但此函数不完善,存在少数错误,
如:输入0,输出None; 输入1002003, 输出['\u4e00', '\u767e', '\u96f6', '\u4e8c', '\u5343', '\u96f6', '\u4e09'](一百零二千零三)
但仍可勉强用于判断一个整数对应的中文读音的音节数(作为voice_delay(num)函数的参数)
'''
def num2Chinese(num):
n = list((' '.join(str(abs(num)))).split())
out = []
danwei = 1
zero = 0
for i in n[::-1]:
num = get_num(int(i))
danwei_out = get_danwei(danwei)
if i == '0':
if zero != 0:
zero = 0
if not danwei == 1 or danwei == 5 or danwei == 9: # 1、5、9时候不输出0
out.insert(0, num) # 其余情况输出0
# 因为是倒着来的,所以插入要插到第一个
danwei += 1
else:
out.insert(0, danwei_out)
out.insert(0, num)
danwei += 1
else:
if danwei == 1 or danwei == 5 or danwei == 9:
out.insert(0, danwei_out)
danwei += 1
else:
danwei += 1
continue
else:
if danwei == 6:
if i == '1':
out.insert(0, danwei_out)
zero = 1
danwei += 1
else:
out.insert(0, danwei_out)
out.insert(0, num)
zero = 1
danwei += 1
else:
out.insert(0, danwei_out)
out.insert(0, num)
zero = 1
danwei += 1
temp = [i for i,j in enumerate(out) if j == 'temp']
while len(temp) != 0:
out.pop(temp[0])
temp = [i for i,j in enumerate(out) if j == 'temp']
#print(' '.join(out))
return out
def voice_delay(num):
milliseconds = len(num2Chinese(num))*375
return milliseconds
integer2code.py
# 输入:一个整数
# 输出:这个整数的位数
# 功能:计算给定整数是几位数
def num_of_digits(num):
return len(str(abs(num))) # 将整数转换为字符串,然后计算字符串的长度
# 输入:一个整数
# 输出:一个存有这个整数每一位数字的列表
# 功能:整数分解
def decompose_integer(num):
digits = [int(d) for d in str(abs(num))] # 将整数转换为字符串,然后将每个字符转换为整数并存储在列表中
return digits
# 输入:一个1位数
# 输出:该数字对应的ASCII码值
def get_ascii_of_unit_digit(single_digit):
return ord(str(single_digit))
# 输入:一个整数
# 输出:这个整数的绝对值对应的语音编码
# 功能:通过串口发送此编码,SYN6288语音播报模块可以读出这个整数的绝对值
def code_int(num):
digits = decompose_integer(num) # 得到一个存有这个整数每一位数字的列表
i = num_of_digits(num) #计算给定整数的位数
code = [0] * 12 # 最多存12位,即num最多为6位数.
code[0] = 253
code[1] = 0
code[2] = i + 3
code[3] = 1
code[4] = 0
for r in range(i):
code[4 + (r + 1)] = get_ascii_of_unit_digit(digits[r]) # 得到给定的1位数对应的ASCII码
check = 0
check_code = [0, 0, 0, 0, 0, 0, 0, 0]
code_copy = code[:i + 5]
for m in range(8):
for r in range(i + 5):
check_code[m] += (code_copy[r] & 1)
code_copy[r] >>= 1
check += (check_code[m] & 1) * (2 ** m)
code[4 + i + 1] = check # 这位存校验码
return code
发送短信和来电提示
该功能实现的原理及流程可参考:A9G模块实现发短信和打电话的流程说明-CSDN博客、Thonny IDE + MicroPython + ESP32 + A9G 发短信打电话-CSDN博客
A9G.py
from machine import UART # 导入串口模块
# import time
'''
UART0 UART1 UART2
TX 1 10 17
RX 3 9 16
UART0已被上位机占用
UART1的默认引脚用于外接flash
'''
def a9g_Initial():
text_message_and_call_uart = UART(1, baudrate=115200, tx=23,) # 初始化串口1,波特率115200,tx=23,不使用rx
return text_message_and_call_uart
def text_message_and_call(text_message_and_call_uart):
'''
AT_instruction = "AT\r\n" # 查询是否与模块建立联系
text_message_and_call_uart.write(AT_instruction)
time.sleep_ms(100)
AT_instruction = "AT+CSQ\r\n" # 查询信号强度 第一个参数为信号强度值
text_message_and_call_uart.write(AT_instruction)
time.sleep_ms(100)
AT_instruction = "AT+CCID\r\n" # 获取SIM卡的序列号,用来检测是否有SIM卡
text_message_and_call_uart.write(AT_instruction)
time.sleep_ms(100)
'''
AT_instruction = "AT+CMGF=1\r\n" # 文本方式发送短信
text_message_and_call_uart.write(AT_instruction)
AT_instruction = "AT+CMGS=\"18601652379\"\r\n" # 向指定号码发送短信
text_message_and_call_uart.write(AT_instruction)
AT_instruction = "The smoke concentration in the laboratory is high, please deal with it as soon as possible.\r\n" # 短信的内容
text_message_and_call_uart.write(AT_instruction)
AT_instruction = [0x1a] # 结束符
text_message_and_call_uart.write(bytes(AT_instruction))
AT_instruction = "ATD18601652379\r\n" # 向指定号码打电话
text_message_and_call_uart.write(AT_instruction)
错误和注意事项
- 按ESP32的EN复位后,DHT11模块会报错,导致其无法上电后自动运行。但使用Thonny编译器的开始按钮(软起动)可正常执行程序。
- 当环境信息的数值为0时,语音播报出错,原因见VoiceDelay.py的num2Chinese函数。
- A9G模块需要更高的电压、更大的电流,要用microUSB独立供电(插手机充电器上)。其microUSB口不要和其他模块共用同一个拓展坞,电脑的usb也带不动它。
- MQ-2模块上电后要等3分钟使其稳定。MQ-2模块会发热,不要靠近温湿度模块。
参考
Quick reference for the ESP32 — MicroPython latest documentation
Python+ESP32 快速上手(持续更新中)【 通俗易懂 】_哔哩哔哩_bilibili
标签:基于,SET,ESP32,0x00,write,环境监测,import,self,def From: https://blog.csdn.net/qq_44955826/article/details/144359532