0%

物联网平台模拟下发指令

使用背景

通过post请求,向物联网平台发送指令,可实现指令批量压测以及数据爬取功能。

调用库

1
2
3
4
5
import requests
import json
import time
from loguru import logger
import time

保存日志

1
logger.add("11111111_{time}.log", encoding='utf-8')

构建类

1
2
3
4
5
class instruction:
def __init__(self):
self.username = " "
self.password = " "
self.deviceId = 11111111

尝试请求

1
2
3
4
5
6
7
8
9
10
def get_post_html(self, url, payload, headers):
"""尝试3次请求"""
i = 0
while i < 3:
try:
# 模拟请求下发,并接收响应
response = requests.request("POST", url, headers=headers, data=payload, timeout=20)
return response
except requests.exceptions.RequestException:
i += 1

获取access_taken

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def hc_access_token_get(self, username, password):
"""请求登录access_token"""
# 请求数据准备
url = "http://122.112.244.31:8080/chcnavAuthThings/iot/admin/login" # 正式平台
# url = "http://121.37.131.118:8080/chcnavAuthThings/iot/admin/login"#测试平台
payload = json.dumps({
"username": username,
"password": password
})
headers = {
'Content-Type': 'application/json'
}
# 模拟数据请求下发,并接受响应
response = self.get_post_html(url, payload, headers)
list1 = json.loads(response.text)
access_token = list1["data"]["access_token"]
return access_token

发送指令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def hc_command_send(self):
lsm_token = self.hc_access_token_get(self.username, self.password)
url = "http://122.112.244.31:8080/chcnavThings/iot/device/command/send"

# tcp查询指令
payload = json.dumps({
"commandId": 6,
"deviceIds": [self.deviceId],
"commandArgs": []
})

headers = {
'access_token': lsm_token,
'Content-Type': 'application/json'
}

response = requests.request("POST", url, headers=headers, data=payload)
logger.info(response.text)

主函数

1
2
send_instruction = instruction()
send_instruction.hc_command_send()