Python中使用matplotlib绘制mqtt数据实时图像功能

目录

  • 效果图
  • mqtt发布
  • mqtt订阅
  • matplotlib绘制动态图
  • matplotlib绘制mqtt数据实时图像

效果图

mqtt发布 本代码中publish是一个死循环,数据一直往外发送。
import randomimport timefrom paho.mqtt import client as mqtt_clientimport jsonfrom datetime import datetimebroker = 'broker.emqx.io'port = 1883topic = "/python/mqtt/li"client_id = f'python-mqtt-{random.randint(0, 1000)}'# 随机生成客户端iddef connect_mqtt():def on_connect(client, userdata, flags, rc):if rc == 0:print("Connected to MQTT Broker!")else:print("Failed to connect, return code %d\n", rc)client = mqtt_client.Client(client_id)client.on_connect = on_connectclient.connect(broker, port)return clientdef publish(client):while True:time.sleep(0.01)msg = json.dumps({"MAC": "0123456789","samplerate": 12,"sampletime": str(datetime.utcnow().strftime('%Y/%m/%d-%H:%M:%S.%f')[:-3]),"battery": 0.5,"acc": [[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],[random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],]})result = client.publish(topic, msg)status = result[0]if status == 0:print(f"Send `{msg}` to topic `{topic}`")else:print(f"Failed to send message to topic {topic}")def run():client = connect_mqtt()client.loop_start()publish(client)if __name__ == '__main__':run()


mqtt订阅
from paho.mqtt import client as mqtt_clientimport timeimport osbroker = 'broker.emqx.io'port = 1883topic = "/python/mqtt/li"def connect_mqtt(client_id):"""MQTT 连接函数。"""def on_connect(client, userdata, flags, rc):"""连接回调函数在客户端连接后被调用,在该函数中可以依据 rc 来判断客户端是否连接成功。"""if rc == 0:print("Connected to MQTT Broker! return code %d" % rc)else:print("Failed to connect, return code %d\n", rc)client = mqtt_client.Client(client_id)# client.username_pw_set('uname', 'upwd')# 链接mqtt所需的用户名和密码,没有可不写client.on_connect = on_connectclient.connect(broker , port)return clientdef subscribe(client: mqtt_client, a_topic):"""订阅消息"""def on_message(client, userdata, msg):"""消息回调函数在客户端从 MQTT Broker 收到消息后被调用,在该函数中我们将打印出订阅的 topic 名称以及接收到的消息内容。* 这里可添加自定义数据处理程序"""print('From topic : %s\n\tmsg : %s' % (msg.topic, msg.payload.decode()))client.subscribe(topic)client.on_message = on_messagedef run(client_id, topic):client = connect_mqtt(client_id)subscribe(client, topic)client.loop_forever()if __name__ == '__main__':run('test_eartag-003-python-li', 'zk100/gw/#')


matplotlib绘制动态图
import matplotlib.pyplot as pltimport numpy as npcount = 100# 图中最多数据量ax = list(range(count))# 保存图1数据ay = [0] * 100bx = list(range(count))# 保存图2数据by = [0] * 100num = count# 计数plt.ion()# 开启一个画图的窗口进入交互模式,用于实时更新数据plt.rcParams['figure.figsize'] = (10, 10)# 图像显示大小plt.rcParams['font.sans-serif'] = ['SimHei']# 防止中文标签乱码,还有通过导入字体文件的方法plt.rcParams['axes.unicode_minus'] = Falseplt.rcParams['lines.linewidth'] = 0.5# 设置曲线线条宽度plt.tight_layout()while True:plt.clf()# 清除刷新前的图表,防止数据量过大消耗内存plt.suptitle("总标题", fontsize=30)# 添加总标题,并设置文字大小g1 = np.random.random()# 生成随机数画图# 图表1ax.append(num)# 追加x坐标值ay.append(g1)# 追加y坐标值agraphic = plt.subplot(2, 1, 1)agraphic.set_title('子图表标题1')# 添加子标题agraphic.set_xlabel('x轴', fontsize=10)# 添加轴标签agraphic.set_ylabel('y轴', fontsize=20)plt.plot(ax[-count:], ay[-count:], 'g-')# 等于agraghic.plot(ax,ay,'g-')# 图表2bx.append(num)by.append(g1)bgraghic = plt.subplot(2, 1, 2)bgraghic.set_title('子图表标题2')bgraghic.plot(bx[-count:], by[-count:], 'r^')plt.pause(0.001)# 设置暂停时间,太快图表无法正常显示num = num + 1

【Python中使用matplotlib绘制mqtt数据实时图像功能】
matplotlib绘制mqtt数据实时图像
  • 单线程
先启动mqtt订阅服务
mqtt订阅中有阻塞,更新数据后因订阅服务没有结束,导致绘图程序无法绘图
先启动绘图程序
绘图程序本身也是个循环,拿不到mqtt的实时数据,图像无法更新
  • 两个服务加入协程,也不行。具体原因还不知道,容后补充。
  • mqtt作为线程启动,可解决上述问题
import jsonimport randomfrom paho.mqtt import client as mqtt_clientimport timeimport datetimefrom math import ceil, floorimport matplotlib.pyplot as pltimport _thread# 公共变量broker = 'broker.emqx.io'topic = "/python/mqtt/li"port = 1883client_id = f'python-mqtt-li-{random.randint(0, 100)}'show_num = 300x_num = [-1]# 计数acc1 = []acc2 = []acc3 = []acc4 = []acc5 = []acc6 = []stime = []"""mqtt subscribe topic"""def str_microsecond_datetime2int_13timestamp(str_microsecond_datetime):"""将字符串型【毫秒级】格式化时间 转为 【13位】整型时间戳"""datetime_obj = datetime.datetime.strptime(str_microsecond_datetime, "%Y/%m/%d-%H:%M:%S.%f")obj_stamp = int(time.mktime(datetime_obj.timetuple()) * 1000.0 + datetime_obj.microsecond / 1000.0) / 1000.0return obj_stampdef int2datetime(int_float_timestamp):"""有小数点:分离小数点,整数转为格式化时间,小数点直接跟在后面无小数点:从第10位进行分离,所以本函数只适用于时间戳整数位数大于9且小于11."""if '.' in str(int_float_timestamp):int_float = str(int_float_timestamp).split('.')date = time.localtime(int(int_float[0]))tempDate = time.strftime("%Y/%m/%d-%H:%M:%S", date)secondafter = '.' + str(int_float[1])return str(tempDate) + secondafterdef parse_mqttmsg(msg):"""解析mqt头数据MAC samplerate sampletime battery acc"""content = json.loads(msg.payload.decode())span = 1000 / content['samplerate'] * 10time_span = [ceil(span) / 10 / 1000, floor(span) / 10 / 1000]sampletime = content['sampletime']sampletime_int = str_microsecond_datetime2int_13timestamp(sampletime)acc = content['acc']for i in range(len(acc)):x_num.append(x_num[-1] + 1)acc1.append(acc[i][0])acc2.append(acc[i][1])acc3.append(acc[i][2])acc4.append(acc[i][3])acc5.append(acc[i][4])acc6.append(acc[i][5])if i != 0:sampletime_int += time_span[i % 2]stime.append(int2datetime(round(sampletime_int * 1000, 0) / 1000))else:stime.append(sampletime)print(x_num[-1], stime[-1], acc1[-1], acc2[-1], acc3[-1], acc4[-1], acc5[-1], acc6[-1])def connect_mqtt():def on_connect(client, userdata, flags, rc):if rc == 0:print("Connected to MQTT Broker!")else:print("Failed to connect, return code %d\n", rc)passclient = mqtt_client.Client(client_id)client.on_connect = on_connectclient.connect(broker, port)return clientdef subscribe(client: mqtt_client):def on_message(client, userdata, msg):# print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")parse_mqttmsg(msg)client.subscribe(topic)client.on_message = on_messagedef run():client = connect_mqtt()subscribe(client)client.loop_forever()""" draw figures """def draw_figure():plt.ion()# 开启一个画图的窗口进入交互模式,用于实时更新数据plt.rcParams['figure.figsize'] = (10, 10)# 图像显示大小plt.rcParams['font.sans-serif'] = ['SimHei']# 防止中文标签乱码,还有通过导入字体文件的方法plt.rcParams['axes.unicode_minus'] = Falseplt.rcParams['lines.linewidth'] = 0.5# 设置曲线线条宽度count = 0while True:plt.clf()# 清除刷新前的图表,防止数据量过大消耗内存plt.suptitle("总标题", fontsize=30)# 添加总标题,并设置文字大小plt.tight_layout()# 图表1agraphic = plt.subplot(2, 1, 1)agraphic.set_title('子图表标题1')# 添加子标题agraphic.set_xlabel('x轴', fontsize=10)# 添加轴标签agraphic.set_ylabel('y轴', fontsize=20)plt.plot(x_num[1:][-show_num:], acc1[-show_num:], 'g-')try:xtricks = list(range(len(acc1) - show_num, len(acc1), 10))# **1**xlabels = [stime[i] for i in xtricks]# **2**plt.xticks(xtricks, xlabels, rotation=15)except:pass# 图表2bgraghic = plt.subplot(2, 1, 2)bgraghic.set_title('子图表标题2')bgraghic.set_xlabel('x轴', fontsize=10)# 添加轴标签bgraghic.set_ylabel('y轴', fontsize=20)bgraghic.plot(x_num[1:][-show_num:], acc2[-show_num:], 'r^')plt.pause(0.001)# 设置暂停时间,太快图表无法正常显示count = count + 1if __name__ == '__main__':# 多线程_thread.start_new_thread(run, ())draw_figure()

到此这篇关于Python中使用matplotlib绘制mqtt数据实时图像的文章就介绍到这了,更多相关matplotlib绘制mqtt数据实时图像内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

    推荐阅读