0%

MQTT协议学习

MQTT协议

一、MQTT协议基本原理

1、MQTT是一个基于客户端-服务器的消息发布、订阅传输协议。

2、三重身份:发布者(publish),代理(broker)(服务器),订阅者(subscribe)。

  • 消息的发布者和订阅者是客户端;
  • 消息的代理是服务器;
  • 消息的发布者可以同时是订阅者;

3、消息分为:主题(topic),负载(payload)。

  • 主题是消息的类型;
  • 负载是消息的内容;

例如:在平台122.112.244.31中,主题topic=“$creq/(deviceId)/+”, (其中:“+”表示通配一个层级)。
设备可以接受如下三种主题消息:

  • $creq/(deviceId)/cmd:表示平台下发的远程控制指令主题;
  • $creq/(deviceId)/upgrade:表示设备推送的固件升级指令主题, 平台发送puback消息后,平台会以$creq/(deviceId)/firmware发送固件二进制流数据到设备;
  • $creq/(deviceId)/firmware:表示平台发送固件数据的主题;

4、对传输消息有三种服务质量(QoS)(最多一次,至多一次,只有一次)。

二、Python模拟设备MQTT登录与数据收发

负载数据两种类型:实时数据、历史数据

(1)实时数据:

1
2
3
4
5
6
7
Byte 1  binary  数据格式类型:1          
Byte 2 binary 以下JSON字符串数据包大小 高位字节
Byte 3 低位字节
Byte 4 string 多类型:{"484021": {"L2_LF_1": 37.5, "L2_YL_1": 0}}
Byte... string 单类型:{"484021": {"L2_LF_1": 18.2}}
Byte n
Payload = (Byte 1 + Byte 2 + … + Byte n >> 二进制)

(2)历史数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Byte 1  binary  数据格式类型:2          
Byte 2 binary 以下JSON字符串数据包大小 高位字节
Byte 3 低位字节
Byte 4 string {
"484021": {
"L2_LF_1": {
"2018-08-02T08:52:32.449Z或1533199952449": 11.2,
"2018-08-02T09:52:32.449Z或1533203552449": 11.2,
"2018-08-02T10:52:32.449Z或1533207152449": 10.9,
},
"L2_LF_2": {
"2018-08-02T09:02:32.449Z":36.5,
}
}
}
Byte... string
Byte n
Payload = (Byte 1 + Byte 2 + … + Byte n >> 二进制)

流程如下图所示:

MQTT

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import paho.mqtt.client as mqtt
import random
import time
from datetime import datetime
import Tools.JSON_Bin as JSTB


# 回调函数,在客户端成功连接时订阅特定的主题
def on_connect(client, userdata, flags, rc):
"""
:param client:客户端实例
:param userdata:用户数据
:param flags:连接标志
:param rc:连接结果码
0: 连接成功
1: 连接拒绝,不可接受的协议版本
2: 连接拒绝,客户端标识符无效
3: 连接拒绝,服务器不可用
4: 连接拒绝,无效的用户名或密码
5: 连接拒绝,未授权
:return:None
"""
print("Connected with result code " + str(rc))
if rc == 0:
# 连接成功后订阅主题
client.subscribe("$creq/39763643/+")
else:
print(f"Connection failed with result code {rc}")


# 当客户端收到PUBLISH消息时的回调函数
def on_message(client, userdata, msg):
print(f"Message received: Topic={msg.topic}, Payload={msg.payload.decode()}")


broker = '122.112.244.31' # 定义MQTT代理地址
port = 1883 # MQTT代理端口
client_id = '39763643' # 客户端ID
device_id = '39763643' # 产品ID
device_key = 'aYxqONjMSeCHhOQsJmkRTw' # 密码
data_format_type = 1 # 数据格式类型(1,2)

# 创建MQTT客户端实例
client = mqtt.Client(client_id)

# 设置用户名和密码
client.username_pw_set(device_id, device_key)

# 分配回调函数
client.on_connect = on_connect
client.on_message = on_message

# 连接到MQTT代理
client.connect(broker, 1883, 60)

# 上传数据内容
"""
单个或者多个检测类型的检测数据实时上传:
Byte 1 binary 数据格式类型:1
Byte 2 binary 以下json字符串数据包大小 高位字节
Byte 3 低位字节
Byte 4 string 多类型:{"484021": {"L2_LF_1": 37.5, "L2_YL_1": 0}}
Byte... string 单类型:{"484021": {"L2_LF_1": 18.2}}
Byte n

单个或多个监测类型的历史数据上传:
Byte 1 binary 数据格式类型:2
Byte 2 binary 以下json字符串数据包大小 高位字节
Byte 3 低位字节
Byte 4 string {
"484021": {
"L2_LF_1": {
"2018-08-02T08:52:32.449Z或1533199952449": 11.2,
"2018-08-02T09:52:32.449Z或1533203552449": 11.2,
"2018-08-02T10:52:32.449Z或1533207152449": 10.9,
},
"L2_LF_2": {
"2018-08-02T09:02:32.449Z":36.5,
}
}
}
Byte... string
Byte n
"""
payload = {
"39763643": {
"L1_JS_1": "30.7,-142.5,294.5,1"
}
}

while True:
random_numbers = [random.uniform(-100, 100) for _ in range(3)]
payload = {
"39763643": {
"L1_JS_1": "{},{},{},1".format(*random_numbers)
}
}

binary_data_packet = JSTB.JSON_To_Bin(payload)
"""
$dp为系统上传数据点的指令
$dr为设备回复平台下发的指令
"""
client.publish("$dp", binary_data_packet, qos=1, retain=False)

# 获取当前时间
current_time = datetime.now()

print(current_time.strftime("%Y-%m-%d %H:%M:%S"), "Published message with retain=True")

# 每隔120秒发送一次数据
time.sleep(30)

# 启动循环保持连接并处理接收到的消息(如果一直在写入数据则不需要这句代码)
# client.loop_forever()