MQTT 变更发布publish topic

问题遇到的现象和发生背景

想用代码实现MQTT动态变更话题
当订阅topic "request/qos" 和“request/delay” 发布消息时, 发布消息的topic“counter//" 的值会随之改变。

例如:"request/qos" 发布message “1”, “counter//" 收到消息后会变成“counter/1/”
当前代码只能实现更改一次, 没办法更改第二次, 可以帮忙看一下吗?

问题相关代码,请勿粘贴截图

import time
import paho.mqtt.client as paho
from paho import mqtt
import sys

def on_connect(client, userdata, flags, rc, properties=None):
print("CONNACK received with code %s." % rc)

def on_message(client, userdata, msg):
global pubtop, req1, req2
r = msg.payload.decode()
t = msg.topic
if t == req1:
if r != q:
pubtop = pubtop.replace(q, r)
else:
print('same QoS, nothing changed')
elif t == req2:
if r != d:
pubtop = pubtop.replace(d, r)
else:
print('same delay, nothing changed')
#client.publish()
print(f"Received {r} from {t} topic")

client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5)
client.on_connect = on_connect

client.connect("127.0.0.1", 1883)

client.on_message = on_message

time.sleep(1)

req1='request/qos'
req2='request/delay'
client.subscribe([(req1, 1),(req2, 1)])

client.loop_start()

counter = 0
c='counter'
q='qos'
d='delay'
s='/'
pubtop=c+s+q+s+d

while True:
try:
time.sleep(1)
msg = f"counter: {counter}"
#pubtop = 'counter/qos/delay'
result = client.publish(pubtop, msg, 1)
status = result[0]
if status == 0:
print(f"Send {msg} to topic {pubtop}")
counter += 1
except KeyboardInterrupt:
print("EXIT")
client.disconnect()
sys.exit()

运行结果及报错内容

img

更改只能成功一次, 无法一直变更代码

我的解答思路和尝试过的方法

应该是on_message 没有进入循环,求帮忙解答, 十分感谢

我想要达到的结果

需要代码可根据 topic "request/qos"的数据变更topic 'counter/qos/delay'中 qos 和delay的值

给个简单的解决方案,基于你的思路:
在代码顶部引入re,然后修改你的on_message函数,通过正则替换实现你的topic的修改,你试试看

import re
def on_message(client, userdata, msg):
    global pubtop, req1, req2
    r = msg.payload.decode()
    t = msg.topic
    if t == req1:
        pubtop = re.sub(r'/\w+/','/'+r+'/',pubtop)
    elif t == req2:
        pubtop = re.sub(r'/\w+$','/'+r, pubtop)
    print(f"Received{r} from {t} topic")

有没有可能是你的代码逻辑错了

counter = 0
c='counter'
q='qos'
d='delay'
s='/'
pubtop=c+s+q+s+d

感觉这个变量pubtop从头到尾都没有进行过变化。


·

while True:
    try:
        time.sleep(1)
        msg = f"counter: {counter}"
        pubtop = 'counter/qos/delay'    

改为:

while True:
    try:
        time.sleep(1)
        msg = f"counter: {counter}"
        pubtop = f'counter/{counter}/delay'

试下?


如有问题及时沟通

代码如下:

#!/usr/bin/env python
#coding:utf-8

import time
import json
import psutil
import random
from paho.mqtt import client as mqtt_client

broker = '127.0.0.1'  # mqtt代理服务器地址
port = 1883
keepalive = 60     # 与代理通信之间允许的最长时间段(以秒为单位)
client_id = f'python-mqtt-pub-{random.randint(0, 1000)}'  # 客户端id不能重复



def connect_mqtt():
    '''连接mqtt代理服务器'''
    def on_connect(client, userdata, flags, rc):
        '''连接回调函数'''
        # 响应状态码为0表示连接成功
        if rc == 0:
            print("Connected to MQTT OK!")
        else:
            print("Failed to connect, return code %d\n", rc)
    # 连接mqtt代理服务器,并获取连接引用
    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port, keepalive)
    return client


def publish(client, topic, msg):
    '''发布消息'''
    result = client.publish(topic, msg)
    status = result[0]
    if status == 0:
        print(f"Send `{msg}` to topic `{topic}`")
    else:
        print(f"Failed to send message to topic {topic}")


def subscribe(client: mqtt_client):

    # 订阅主题并接收消息
    def on_message(client, userdata, msg):
        '''订阅消息回调函数'''
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
        publish(client, f"/{msg.topic}/{msg.payload.decode()}",msg.payload.decode())
    req1 = 'request/qos'
    req2 = 'request/delay'
    client.subscribe([(req1, 1), (req2, 1)])
    # 订阅指定消息主题
    client.on_message = on_message




def run():

    client = connect_mqtt()
    # 运行一个线程来自动调用loop()处理网络事件, 非阻塞
    client.loop_start()
    # 运行订阅者
    subscribe(client)
    #  运行一个线程来自动调用loop()处理网络事件, 阻塞模式
    # client.loop_forever()


if __name__ == '__main__':
    run()


控制台执行结果如下:

Connected to MQTT OK!
Received `1` from `request/qos` topic
Send `1` to topic `/request/qos/1`
Received `2` from `request/qos` topic
Send `2` to topic `/request/qos/2`
Received `1` from `request/delay` topic
Send `1` to topic `/request/delay/1`
Received `2` from `request/delay` topic
Send `2` to topic `/request/delay/2`

命令行执行如下

(base) valley@192 ~ % mosquitto_pub -h 127.0.0.1 -t "request/qos" -m "1"
(base) valley@192 ~ % mosquitto_pub -h 127.0.0.1 -t "request/qos" -m "2"
(base) valley@192 ~ % mosquitto_pub -h 127.0.0.1 -t "request/delay" -m "1"
(base) valley@192 ~ % mosquitto_pub -h 127.0.0.1 -t "request/delay" -m "2"

若有帮助,谢谢采纳~

springboot+mqtt+apache apollo,监听信息并可以动态更改topic
https://blog.csdn.net/qq_39997707/article/details/120742813
SpringBoot2.0_MQTT消息订阅之动态Topic
https://blog.csdn.net/qq_41018959/article/details/90901992