thingsboard3.5 kafka 错误 after 60000 ms

你好,请教一个问题,thingsboard3.5想把时序数据发送到kafka里,发生了错误。
(org.apache.kafka.common.errors.TimeoutException: Topic kafka-es not present in metadata after 60000 ms.)

如下图:

img

img

img

img

这个错误由于Kafka集群中没有创建名为"kafka-es"的主题所致。请确保在Kafka集群中创建了该主题。
可以使用以下命令在Kafka集群中创建主题:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka-es
  • 以下回答由chatgpt基于相关博客总结生成:

    对于问题中的错误,根据提供的参考资料,我们可以结合现有的知识给出以下解决方案:

    首先,根据参考资料中的段落1,我们需要创建一个Kafka主题集合。根据提供的示例,主题集合的元素可以表示为{(x,y)|x∈A∧y∈B},其中A是项目集合,B是数据意义集合。我们可以使用Python来创建这个主题集合。

    import itertools
    
    # 定义项目集合A和数据意义集合B
    project_set = ['A', 'B']
    data_set = ['x', 'y']
    
    # 使用itertools的product函数生成笛卡尔积
    topic_set = list(itertools.product(project_set, data_set))
    
    print(topic_set)
    

    输出结果为:

    [('A', 'x'), ('A', 'y'), ('B', 'x'), ('B', 'y')]
    

    接下来,根据参考资料中的段落0,我们需要构建一个ConcurrentLinkedQueue对象,并在两个线程中使用它,一个线程用于Kafka消费消息,另一个线程用于向队列中插入新的订阅信息。 以下是一个使用Python的简单示例来模拟这个过程:

    import threading
    import concurrent.futures
    import time
    from queue import ConcurrentLinkedQueue
    
    # 创建ConcurrentLinkedQueue对象
    topic_queue = ConcurrentLinkedQueue()
    
    # 模拟消费消息的线程
    def consumer():
        while True:
            # 调用KafkaConsumer.poll(timeout)来消费消息
            # 假设这里的getMessage()函数从Kafka中获取消息,并返回一个消息对象
            message = getMessage()
    
            # 尝试获取ConcurrentLinkedQueue中的新内容
            new_topics = topic_queue.poll()
            if new_topics:
                print("订阅列表发生变化")
                # 进行响应逻辑,比如更新订阅主题
    
            # 其他处理消息的逻辑...
            print("消费消息:", message)
            time.sleep(1)
    
    # 模拟插入订阅信息的线程
    def topic_updater():
        while True:
            # 使用另一个线程向ConcurrentLinkedQueue中插入新的订阅信息
            # 假设这里的getNewTopic()函数从其他途径获取新的订阅信息
            new_topic = getNewTopic()
    
            # 插入新的订阅信息到ConcurrentLinkedQueue中
            topic_queue.put(new_topic)
    
            # 等待一段时间后再插入新的订阅信息
            time.sleep(5)
    
    # 启动两个线程
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(consumer)
        executor.submit(topic_updater)
    

    通过上述代码,我们可以模拟出两个线程,一个线程用于消费消息,另一个线程用于向队列中插入新的订阅信息。

    这些解决方案是基于提供的参考资料和现有的知识,希望可以帮助到你。如果有任何问题,请随时提问。