想用代码实现MQTT动态变更话题
当订阅topic "request/qos" 和“request/delay” 发布消息时, 发布消息的topic“counter//" 的值会随之改变。
例如:"request/qos" 发布message “1”, “counter//" 收到消息后会变成“counter/1/”
当前代码只能实现更改一次, 没办法更改第二次, 可以帮忙看一下吗?
import time
import paho.mqtt.client as paho
from paho import mqtt
import sys
def on_connect(client, userdata, flags, rc, properties=None):
print("CONNACK received with code %s." % rc)
def on_message(client, userdata, msg):
global pubtop, req1, req2
r = msg.payload.decode()
t = msg.topic
if t == req1:
if r != q:
pubtop = pubtop.replace(q, r)
else:
print('same QoS, nothing changed')
elif t == req2:
if r != d:
pubtop = pubtop.replace(d, r)
else:
print('same delay, nothing changed')
#client.publish()
print(f"Received {r}
from {t}
topic")
client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5)
client.on_connect = on_connect
client.connect("127.0.0.1", 1883)
client.on_message = on_message
time.sleep(1)
req1='request/qos'
req2='request/delay'
client.subscribe([(req1, 1),(req2, 1)])
client.loop_start()
counter = 0
c='counter'
q='qos'
d='delay'
s='/'
pubtop=c+s+q+s+d
while True:
try:
time.sleep(1)
msg = f"counter: {counter}"
#pubtop = 'counter/qos/delay'
result = client.publish(pubtop, msg, 1)
status = result[0]
if status == 0:
print(f"Send {msg}
to topic {pubtop}
")
counter += 1
except KeyboardInterrupt:
print("EXIT")
client.disconnect()
sys.exit()
更改只能成功一次, 无法一直变更代码
应该是on_message 没有进入循环,求帮忙解答, 十分感谢
需要代码可根据 topic "request/qos"的数据变更topic 'counter/qos/delay'中 qos 和delay的值
给个简单的解决方案,基于你的思路:
在代码顶部引入re,然后修改你的on_message函数,通过正则替换实现你的topic的修改,你试试看
import re
def on_message(client, userdata, msg):
global pubtop, req1, req2
r = msg.payload.decode()
t = msg.topic
if t == req1:
pubtop = re.sub(r'/\w+/','/'+r+'/',pubtop)
elif t == req2:
pubtop = re.sub(r'/\w+$','/'+r, pubtop)
print(f"Received{r} from {t} topic")
有没有可能是你的代码逻辑错了
counter = 0
c='counter'
q='qos'
d='delay'
s='/'
pubtop=c+s+q+s+d
感觉这个变量pubtop
从头到尾都没有进行过变化。
·
while True:
try:
time.sleep(1)
msg = f"counter: {counter}"
pubtop = 'counter/qos/delay'
改为:
while True:
try:
time.sleep(1)
msg = f"counter: {counter}"
pubtop = f'counter/{counter}/delay'
试下?
如有问题及时沟通
代码如下:
#!/usr/bin/env python
#coding:utf-8
import time
import json
import psutil
import random
from paho.mqtt import client as mqtt_client
broker = '127.0.0.1' # mqtt代理服务器地址
port = 1883
keepalive = 60 # 与代理通信之间允许的最长时间段(以秒为单位)
client_id = f'python-mqtt-pub-{random.randint(0, 1000)}' # 客户端id不能重复
def connect_mqtt():
'''连接mqtt代理服务器'''
def on_connect(client, userdata, flags, rc):
'''连接回调函数'''
# 响应状态码为0表示连接成功
if rc == 0:
print("Connected to MQTT OK!")
else:
print("Failed to connect, return code %d\n", rc)
# 连接mqtt代理服务器,并获取连接引用
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port, keepalive)
return client
def publish(client, topic, msg):
'''发布消息'''
result = client.publish(topic, msg)
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
def subscribe(client: mqtt_client):
# 订阅主题并接收消息
def on_message(client, userdata, msg):
'''订阅消息回调函数'''
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
publish(client, f"/{msg.topic}/{msg.payload.decode()}",msg.payload.decode())
req1 = 'request/qos'
req2 = 'request/delay'
client.subscribe([(req1, 1), (req2, 1)])
# 订阅指定消息主题
client.on_message = on_message
def run():
client = connect_mqtt()
# 运行一个线程来自动调用loop()处理网络事件, 非阻塞
client.loop_start()
# 运行订阅者
subscribe(client)
# 运行一个线程来自动调用loop()处理网络事件, 阻塞模式
# client.loop_forever()
if __name__ == '__main__':
run()
控制台执行结果如下:
Connected to MQTT OK!
Received `1` from `request/qos` topic
Send `1` to topic `/request/qos/1`
Received `2` from `request/qos` topic
Send `2` to topic `/request/qos/2`
Received `1` from `request/delay` topic
Send `1` to topic `/request/delay/1`
Received `2` from `request/delay` topic
Send `2` to topic `/request/delay/2`
命令行执行如下
(base) valley@192 ~ % mosquitto_pub -h 127.0.0.1 -t "request/qos" -m "1"
(base) valley@192 ~ % mosquitto_pub -h 127.0.0.1 -t "request/qos" -m "2"
(base) valley@192 ~ % mosquitto_pub -h 127.0.0.1 -t "request/delay" -m "1"
(base) valley@192 ~ % mosquitto_pub -h 127.0.0.1 -t "request/delay" -m "2"
若有帮助,谢谢采纳~
springboot+mqtt+apache apollo,监听信息并可以动态更改topic
https://blog.csdn.net/qq_39997707/article/details/120742813
SpringBoot2.0_MQTT消息订阅之动态Topic
https://blog.csdn.net/qq_41018959/article/details/90901992