【python】使用Python将MQTT数据写入MySQL
文章目录简化方案1、创建数据库2、安装paho库3、安装pymysql库4、使用python连接MQTT服务器5. 将数据写入MySQL数据库推送报文格式为:{"a":value,"b":value}需要将里面的a、b的值存入MySQL数据库,并将接受时间保存进数据库。简化方案在服务器上,模拟一台设备,订阅需要接收的主题,当接收到该主题发送来的数据的时候,将报文里面的两个数值保存到MySQL数据库
·
推送报文格式为:
{"a":value,"b":value}
需要将里面的a、b的值存入MySQL数据库,并将接受时间保存进数据库。
简化方案
在服务器上,模拟一台设备,订阅需要接收的主题,当接收到该主题发送来的数据的时候,将报文里面的两个数值保存到MySQL数据库里面。
1、创建数据库
三个字段,分别是是时间、a、b
CREATE TABLE `mqtt_test`.`test` ( `get_time` DATETIME NOT NULL , `a` INT NOT NULL , `b` INT NOT NULL ) ENGINE = InnoDB;
2、安装paho库
paho库用于完成mqtt通讯
anaconda下或云服务器下,在prompt下安装paho库:
pip install paho-mqtt
3、安装pymysql库
pymysql库用于完成mysql操作
pip install pymysql
4、使用python连接MQTT服务器
# -*- coding: utf-8 -*-
#!/usr/bin/python
# -*- coding: utf-8 -*
import paho.mqtt.client as mqtt
import json
import pymysql
import time
def gettime():
time1=time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
return time1
# 服务器地址
host = 'MQTT服务器地址'
# 通信端口 默认端口1883
port = 1883
username = 'username '
password = 'password '
# 订阅主题名
topic = 'test'
# 连接后事件
def on_connect(client, userdata, flags, respons_code):
if respons_code == 0:
# 连接成功
print('Connection Succeed!')
else:
# 连接失败并显示错误代码
print('Connect Error status {0}'.format(respons_code))
# 订阅信息
client.subscribe(topic)
# 接收到数据后事件
def on_message(client, userdata, msg):
# 打印订阅消息主题
# print("topic", msg.topic)
# 打印消息数据
jsondata=json.loads(msg.payload)
print("msg payload", jsondata)
def main():
client = mqtt.Client()
# 注册事件
client.on_connect = on_connect
client.on_message = on_message
# 设置账号密码(如果需要的话)
client.username_pw_set(username, password=password)
# 连接到服务器
client.connect(host, port=port, keepalive=60)
# 守护连接状态
client.loop_forever()
if __name__ == '__main__':
main()
运行效果:
打开MQTT.FX,连接至MQTT服务器,并向test主题发送json信息,可以看到程序能够正确接收到报文。
5. 将数据写入MySQL数据库
收到信息后,将载荷信息按照json解析,然后储存如数据库中
# -*- coding: utf-8 -*-
#!/usr/bin/python
# -*- coding: utf-8 -*
import paho.mqtt.client as mqtt
import json
import pymysql
import time
def gettime():
time1=time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
return time1
# 服务器地址
host = 'MQTT服务器地址'
# 通信端口 默认端口1883
port = 1883
username = 'username'
password = 'password'
# 订阅主题名
topic = 'test'
# 连接后事件
def on_connect(client, userdata, flags, respons_code):
if respons_code == 0:
# 连接成功
print('Connection Succeed!')
else:
# 连接失败并显示错误代码
print('Connect Error status {0}'.format(respons_code))
# 订阅信息
client.subscribe(topic)
# 接收到数据后事件
def on_message(client, userdata, msg):
global dddd
# 打印订阅消息主题
# print("topic", msg.topic)
# 打印消息数据
jsondata=json.loads(msg.payload)
print("msg payload", jsondata)
sqlsave(jsondata)
def main():
client = mqtt.Client()
# 注册事件
client.on_connect = on_connect
client.on_message = on_message
# 设置账号密码(如果需要的话)
client.username_pw_set(username, password=password)
# 连接到服务器
client.connect(host, port=port, keepalive=60)
# 守护连接状态
client.loop_forever()
#MySQL保存
def sqlsave(jsonData):
# 打开数据库连接
db = pymysql.connect(host="host_ip",user="user_name",password="password",database="mqtt_test",charset='utf8')
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# SQL 插入语句
try:
sql = "INSERT INTO test (get_time,a,b) VALUES ('%s','%s','%s');" %(gettime(),jsonData['a'],jsonData['b'])
cursor.execute(sql)
db.commit()
print("数据库保存成功!")
except:
pass
# 关闭数据库连接
db.close()
if __name__ == '__main__':
main()
运行结果如图所示:
更多推荐
所有评论(0)