为了测试树莓派的GPIO接口,并尝试发出数据,做此实验并记录。


实验实现了以下内容: 

  1. 通过树莓派GPIO来连接并控制DHT11传感器,获得温度,湿度数据;
  2. 使用MQTT将树莓派所获取的温度与湿度数据广播;
  3. PC端通过订阅MQTT服务器的广播获取湿度与温度数据,并存入数据库。

1.树莓派连接控制DHT11传感器,获得温度,湿度数据并用MQTT广播

  1. 在树莓派未通电情况下连接GPIO引脚与DHT11;

启动树莓派,并根据所连接引脚进行编码。

# 此代码块在树莓派端运行
import time
import board
import adafruit_dht  # 读取DHT11数据使用的库,该库由官方提供下载
from datetime import datetime
import paho.mqtt.client as mqtt  # mqtt库
import json 

# mqtt 服务器地址/端口
broker = "192.168.137.57"
port = 1883
keepalive = 60
# 需要发布的主题
topic = "DHT11_message"

# 实例化并连接
client = mqtt.Client()
client.connect(broker, port)

Pin = board.D4  # 使用GPIO(BCM),注意根据官方提供的GPIO引脚图进行连接
dhtDevice = adafruit_dht.DHT11(Pin)

while True:
    try:
        # 获取时间
        recDate = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        # 获取传感器数值
        temperature_c = dhtDevice.temperature
        humidity = dhtDevice.humidity
        # mqtt 传输
        msg = json.dumps({
            'Tem':temperature_c,
            'Hum':humidity,
            'Date':recDate
        })
        result = client.publish(topic, msg)
        status = result[0]
        if status == 0:
            print(f"Send `{msg}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")
        print(msg)

    except RuntimeError as error:
        # Errors happen fairly often, DHT's are hard to read, just keep going
        print(error.args[0])
        time.sleep(2.0)
        continue
    except Exception as error:
        dhtDevice.exit()
        raise error
    time.sleep(2.0)

2.订阅MQTT服务器的广播获取湿度与温度数据,并存入数据库

本实验中我将MQTT服务器搭建在树莓派上,所以MQTT服务器的地址就是树莓派目前的IP。当然也可以在PC端搭建MQTT服务器

'''
PC端DHT11数据接收,并写入数据库中
'''
import paho.mqtt.client as mqtt
import time
import json
import pymysql

# 连接数据库用参数
DBHOST = 'localhost'
DBUSER = 'root'
DBPASS = '123456'  # 数据库登录密码
DBNAME = 'DHT11'   # 数据库 库名

# 连接数据库并创建实例
try:
    # 连接数据库,所需参数(主机名,用户名,密码,数据库名)
    db = pymysql.connect(host=DBHOST, user=DBUSER, password=DBPASS, database=DBNAME,charset='utf8')
    print('连接成功')
except pymysql.Error as e:
    print('连接数据库失败'+str(e))

cur = db.cursor()

# MQTT服务器地址及端口
HOST = "192.168.000.000"  # 服务器ip地址
MQTT_PORT = 1883  # 服务器端口(基本默认为1883)

# 访问mqtt数据部分
def on_connect(client, userdata, flags, rc):
    rc_status = ["连接成功", "协议版本错误", "无效的客户端标识", "服务器无法使用", "用户密码错误", "无授权"]
    print("connect:", rc_status[rc])

def on_message(client, userdata, msg):
    print("主题:", msg.topic)
    res = msg.payload.decode("UTF-8")
    data = json.loads(res)
    print("消息:", data)
    write_mysql(data)

#写入数据库
def write_mysql(data):
    sql ="insert into sensordata (nowtime,tem,hum) values (%s,%s,%s)"
    params = (data['Date'],data['Tem'],data['Hum'])
    db.ping(reconnect=True)
    cur.execute(sql,params)
    db.commit()
    print("数据库插入成功")

def main():
    client = mqtt.Client()
    client.on_connect = on_connect  # 返回连接状态的回调函数
    client.connect(HOST, MQTT_PORT, keepalive=600)  # 连接服务器
    client.on_message = on_message  # 定义回调函数
    client.subscribe('DHT11_message', qos=0)  # 订阅主题
    client.loop_forever()           

# 执行
main()


Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐