该程序通过 NB-IoT DTU 连接到 MQTT 服务器,订阅主题 SubTopic,并在收到消息时打印主题和消息内容。
按下按钮 A 时,将随机生成的整数发布到主题 PubTopic。
from m5stack import *
from m5ui import *
from uiflow import *
from base.DTU_NB import DTU_NB
# Module-level state shared between the MQTT callback, the button handler
# and the main loop. All three start out unset:
#   nb_topic - topic of the most recently received message
#   nb_msg   - payload of the most recently received message
#   Rand     - latest random number produced by the main loop
nb_topic = nb_msg = Rand = None
import random
def nbiot_mqtt_cb(nb_mq_topic, nb_mq_payload):
    """Record and print an incoming MQTT message.

    Stores the topic and payload in the module-level ``nb_topic`` and
    ``nb_msg`` variables, then prints both, topic first.

    Args:
        nb_mq_topic: Topic handed to this callback by its caller.
        nb_mq_payload: Payload handed to this callback by its caller.
    """
    # Only the two names assigned here need a ``global`` declaration.
    global nb_topic, nb_msg
    nb_topic = nb_mq_topic
    nb_msg = nb_mq_payload
    print(nb_topic)
    print(nb_msg)
def buttonA_wasPressed():
    """Publish the current random number to 'PubTopic' on a button A press.

    Reads the module-level ``Rand`` and sends its string form through the
    module-level ``dtu_nb`` object. While ``Rand`` is still ``None`` (no
    number has been generated yet) the press is ignored, so the literal
    text 'None' is never published.
    """
    if Rand is None:
        return
    # The trailing 0 is presumably the QoS level — confirm against the
    # DTU_NB documentation.
    dtu_nb.mqtt_publish('PubTopic', str(Rand), 0)
# Route button A presses to the publish handler.
btnA.wasPressed(buttonA_wasPressed)

# Bring up the NB-IoT DTU, connect to the broker and subscribe. The results
# of the connection check and of the subscribe call are printed as-is.
dtu_nb = DTU_NB()
dtu_nb.mqtt_to_connect('mqtt.m5stack.com', 1883, 'm5mqtt9', '', '', 120)
print(dtu_nb.mqtt_check_connection())
print(dtu_nb.mqtt_subscribe('SubTopic', nbiot_mqtt_cb, 0))

# Main loop: refresh the six-digit random number that button A publishes,
# then poll the DTU for MQTT traffic.
while True:
    # NOTE(review): the return type of mqtt_check_connection() is not visible
    # here; if it returns a bool rather than None, this test is always true
    # — confirm.
    if dtu_nb.mqtt_check_connection() is not None:
        Rand = random.randint(100000, 999999)
    dtu_nb.mqtt_poll()
    wait_ms(2)
# --- DTU_NB / Modbus call listing -------------------------------------------
# NOTE(review): the statements below look like a catalogue of the calls
# exposed by DTU_NB and its Modbus helper, written with placeholder arguments
# (empty strings, zeros), rather than a program meant to run top to bottom
# — confirm before executing them in sequence.

# Create the DTU object and obtain a Modbus helper from it.
dtu_nb = DTU_NB()
# presumably (tx pin, rx pin, baud rate, ...) — TODO confirm argument meanings
modbus = dtu_nb.modbus_init(23, 33, 115200, 1, 1)
modbus.function_init(1, 0, 0)
# Called on the helper's private UART attribute directly.
modbus._mdbus_uart.any()

# Status / registration queries on the DTU; return values are discarded here.
dtu_nb.get_gprs_network_registration()
dtu_nb.get_network_registration()
dtu_nb.get_single_quality()
dtu_nb.check_status()

# CoAP calls: connect to a fixed host/port, destroy, then GET/POST/PUT with
# empty payloads and content_format=0.
dtu_nb.coap_to_connect('120.77.157.90', 5683)
dtu_nb.coap_destroy()
dtu_nb.coap_get('/m5stack-get')
dtu_nb.coap_post('/m5stack-post', '', content_format=0)
dtu_nb.coap_put('/m5stack-put', '', content_format=0)

# NOTE(review): the next line is not an API call. As Python it is only the
# discarded tuple expression (1 - 6, 15 - 16); it looks like stray residue
# (perhaps a line-range list) — confirm and remove.
1-6,15-16

# Bare attribute references (no call parentheses): the values are evaluated
# and discarded, so these lines do nothing visible unless the attributes are
# properties with side effects — TODO confirm.
modbus.find_address
modbus.find_function
modbus.find_quantity

# Modbus request/response helpers, called with placeholder values.
modbus.receive_req_create_pdu()
modbus.create_slave_response(1)
modbus.update_process(1, 0, 0, [0, 0, 0])

# MQTT calls: check, connect (empty client id / credentials), disconnect,
# poll, publish and subscribe with empty topic strings.
dtu_nb.mqtt_check_connection()
dtu_nb.mqtt_to_connect('mqtt.m5stack.com', 1883, '', '', '', 120)
dtu_nb.mqtt_disconnect()
dtu_nb.mqtt_poll()
dtu_nb.mqtt_publish('', '', 0)
dtu_nb.mqtt_subscribe('', nbiot_mqtt_cb, 0)
def nbiot_mqtt_cb(nb_mq_topic, nb_mq_payload):
    """Store an incoming MQTT message in module-level variables.

    Copies the topic into ``nb_topic`` and the payload into ``nb_msg``;
    nothing is printed or returned.

    Args:
        nb_mq_topic: Topic handed to this callback by its caller.
        nb_mq_payload: Payload handed to this callback by its caller.
    """
    global nb_topic, nb_msg
    nb_topic = nb_mq_topic
    nb_msg = nb_mq_payload
# NOTE(review): the calls below use placeholder arguments (empty strings,
# zeros) and read like a listing of available DTU_NB / Modbus calls rather
# than a sequence meant to run in order — confirm before executing.

# Unsubscribe from an (empty) topic, then power the DTU off.
dtu_nb.mqtt_unsubscribe('')
dtu_nb.poweroff()

# Reads issued on the Modbus helper's private UART attribute: everything,
# one line, and a read with argument 10.
modbus._mdbus_uart.read()
modbus._mdbus_uart.readline()
modbus._mdbus_uart.read(10)

# Modbus read calls; return values are discarded here.
# presumably (slave address, start address, quantity[, signed]) — TODO confirm
modbus.read_coils(1, 1, 0)
modbus.read_discrete_inputs(1, 1, 0)
modbus.read_holding_registers(1, 1, 0, True)
modbus.read_input_registers(1, 1, 0, True)

# DTU reset and command-echo setting (argument 0).
dtu_nb.reset()
dtu_nb.set_command_echo_mode(0)

# Writes on the private UART attribute: an empty string, then an empty string
# followed by CR LF.
modbus._mdbus_uart.write('')
modbus._mdbus_uart.write(''+"\r\n")

# Modbus write calls with placeholder values.
modbus.write_multiple_coils(1, 1, 0)
modbus.write_multiple_registers(1, 1, 0, True)
# Write of three zero bytes on the private UART attribute.
modbus._mdbus_uart.write(bytes([0, 0, 0]))
modbus.write_single_coil(1, 1, 0)
modbus.write_single_register(1, 1, 0, True)