推送报文格式为:

{"a":value,"b":value}

需要将里面的a、b的值存入MySQL数据库,并将接受时间保存进数据库。

简化方案

在服务器上,模拟一台设备,订阅需要接收的主题,当接收到该主题发送来的数据的时候,将报文里面的两个数值保存到MySQL数据库里面。

1、创建数据库

三个字段,分别是是时间、a、b

CREATE TABLE `mqtt_test`.`test` ( `get_time` DATETIME NOT NULL , `a` INT NOT NULL , `b` INT NOT NULL ) ENGINE = InnoDB;

2、安装paho库

paho库用于完成mqtt通讯

anaconda下或云服务器下,在prompt下安装paho库:

pip install paho-mqtt

image-20210828174131514

3、安装pymysql库

pymysql库用于完成mysql操作

pip install pymysql

image-20210828175539710

4、使用python连接MQTT服务器

# -*- coding: utf-8 -*-

#!/usr/bin/python
# -*- coding: utf-8 -*

import paho.mqtt.client as mqtt
import json
import pymysql
import time

def gettime():
    time1=time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
    return time1

# 服务器地址
host = 'MQTT服务器地址'
# 通信端口 默认端口1883
port = 1883


username = 'username '
password = 'password '

# 订阅主题名
topic = 'test'


# 连接后事件
def on_connect(client, userdata, flags, respons_code):
    if respons_code == 0:
        # 连接成功
        print('Connection Succeed!')
    else:
        # 连接失败并显示错误代码
        print('Connect Error status {0}'.format(respons_code))
    # 订阅信息
    client.subscribe(topic)


# 接收到数据后事件
def on_message(client, userdata, msg):
    # 打印订阅消息主题
    # print("topic", msg.topic)
    # 打印消息数据
    jsondata=json.loads(msg.payload)
    print("msg payload", jsondata)

def main():
    client = mqtt.Client()
    # 注册事件
    client.on_connect = on_connect
    client.on_message = on_message
    # 设置账号密码(如果需要的话)
    client.username_pw_set(username, password=password)
    # 连接到服务器
    client.connect(host, port=port, keepalive=60)
    # 守护连接状态
    client.loop_forever()


if __name__ == '__main__':
    main()




运行效果:

打开MQTT.FX,连接至MQTT服务器,并向test主题发送json信息,可以看到程序能够正确接收到报文。

image-20210828175301948

5. 将数据写入MySQL数据库

收到信息后,将载荷信息按照json解析,然后储存如数据库中

# -*- coding: utf-8 -*-

#!/usr/bin/python
# -*- coding: utf-8 -*

import paho.mqtt.client as mqtt
import json
import pymysql
import time

def gettime():
    time1=time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
    return time1

# 服务器地址
host = 'MQTT服务器地址'
# 通信端口 默认端口1883
port = 1883


username = 'username'
password = 'password'

# 订阅主题名
topic = 'test'

# 连接后事件
def on_connect(client, userdata, flags, respons_code):
    if respons_code == 0:
        # 连接成功
        print('Connection Succeed!')
    else:
        # 连接失败并显示错误代码
        print('Connect Error status {0}'.format(respons_code))
    # 订阅信息
    client.subscribe(topic)


# 接收到数据后事件
def on_message(client, userdata, msg):
    global dddd
    # 打印订阅消息主题
    # print("topic", msg.topic)
    # 打印消息数据
    jsondata=json.loads(msg.payload)
    print("msg payload", jsondata)
    sqlsave(jsondata)

def main():
    client = mqtt.Client()
    # 注册事件
    client.on_connect = on_connect
    client.on_message = on_message
    # 设置账号密码(如果需要的话)
    client.username_pw_set(username, password=password)
    # 连接到服务器
    client.connect(host, port=port, keepalive=60)
    # 守护连接状态
    client.loop_forever()

#MySQL保存
def sqlsave(jsonData):
      # 打开数据库连接
    db = pymysql.connect(host="host_ip",user="user_name",password="password",database="mqtt_test",charset='utf8')
    # 使用cursor()方法获取操作游标 
    cursor = db.cursor()
    # SQL 插入语句
    try:
        sql = "INSERT INTO test (get_time,a,b)  VALUES ('%s','%s','%s');" %(gettime(),jsonData['a'],jsonData['b'])
        cursor.execute(sql)
        db.commit()
        print("数据库保存成功!")
    except:
        pass
    # 关闭数据库连接
    db.close()


if __name__ == '__main__':
    main()




运行结果如图所示:

image-20210828182458711

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐