使用背景
通过 POST 请求向物联网平台发送指令,可实现指令的批量压测以及数据爬取功能。
调用库
1 2 3 4 5
| import requests import json import time from loguru import logger import time
|
保存日志
1
# Also write log records to a UTF-8 file; loguru replaces "{time}" in the
# file name when the sink is created.
logger.add(
    "11111111_{time}.log",
    encoding="utf-8",
)
|
构建类
1 2 3 4 5
class instruction:
    """Holds the login credentials and target device id used to send commands."""

    def __init__(self):
        """Initialise placeholder credentials and the device id."""
        # Placeholder credentials (a single space each); fill in real values.
        self.username, self.password = " ", " "
        # Id of the device that commands are addressed to.
        self.deviceId = 11111111
|
尝试请求
1 2 3 4 5 6 7 8 9 10
# Method of the `instruction` class: indent one level inside the class body.
def get_post_html(self, url, payload, headers):
    """POST ``payload`` to ``url``, making up to 3 attempts.

    Each attempt uses a 20-second timeout. An attempt is retried only when
    ``requests`` raises a ``RequestException``; any response that is
    received (whatever its status code) is returned immediately.

    Args:
        url: Target URL.
        payload: Request body, passed to ``requests`` as ``data``.
        headers: Mapping of HTTP headers to send.

    Returns:
        The ``requests.Response`` of the first attempt that completes, or
        ``None`` when all attempts raised ``RequestException``.
    """
    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        try:
            return requests.request(
                "POST", url, headers=headers, data=payload, timeout=20
            )
        except requests.exceptions.RequestException as exc:
            # Record the failure instead of swallowing it silently, so a
            # later ``None`` result can be traced back to its cause.
            logger.warning(
                "POST {} failed (attempt {}/{}): {}",
                url, attempt, max_attempts, exc,
            )
    # All attempts failed; callers must handle the missing response.
    return None
|
获取access_token
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# Method of the `instruction` class: indent one level inside the class body.
def hc_access_token_get(self, username, password):
    """Log in with the given credentials and return the access token.

    Sends the credentials as a JSON body through ``self.get_post_html`` and
    reads ``data.access_token`` from the JSON reply.

    Args:
        username: Login name sent in the request body.
        password: Password sent in the request body.

    Returns:
        The value found at ``["data"]["access_token"]`` in the reply.

    Raises:
        ConnectionError: ``self.get_post_html`` returned ``None`` (no
            response was obtained).
        ValueError: The reply is not valid JSON (raised by ``json.loads``)
            or does not contain ``data.access_token``.
    """
    url = "http://122.112.244.31:8080/chcnavAuthThings/iot/admin/login"
    payload = json.dumps({
        "username": username,
        "password": password,
    })
    headers = {
        'Content-Type': 'application/json',
    }
    response = self.get_post_html(url, payload, headers)
    if response is None:
        # The retry helper yields None when every attempt failed; fail with
        # a clear message instead of an AttributeError on ``None.text``.
        raise ConnectionError("login request got no response from " + url)

    reply = json.loads(response.text)
    try:
        return reply["data"]["access_token"]
    except (KeyError, TypeError) as exc:
        # E.g. a reply whose "data" is missing or null.
        raise ValueError(
            "login response has no data.access_token: " + response.text
        ) from exc
|
发送指令
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
# Method of the `instruction` class: indent one level inside the class body.
def hc_command_send(self):
    """Log in, send command 6 to this instance's device, and log the reply.

    Obtains an access token via ``self.hc_access_token_get`` using the
    instance's ``username``/``password``, then POSTs a command request
    addressed to ``self.deviceId`` with an empty argument list. The raw
    response text is written to the log at INFO level.

    Raises:
        requests.exceptions.RequestException: The command request failed
            or timed out (it is attempted once, without retries).
    """
    lsm_token = self.hc_access_token_get(self.username, self.password)

    url = "http://122.112.244.31:8080/chcnavThings/iot/device/command/send"
    payload = json.dumps({
        "commandId": 6,
        "deviceIds": [self.deviceId],
        "commandArgs": [],
    })
    headers = {
        'access_token': lsm_token,
        'Content-Type': 'application/json',
    }

    # requests never times out by default; use the same 20-second limit as
    # get_post_html so a stalled server cannot block the run indefinitely.
    response = requests.request(
        "POST", url, headers=headers, data=payload, timeout=20
    )
    logger.info(response.text)
|
主函数
1 2
# Script entry: build the client, then log in and send the command once.
send_instruction = instruction()
send_instruction.hc_command_send()
|