python读取kafka数据

想使用多协程的方式 在多个生产者中同步抓取kafka数据 每个协程里面都从一个不重复的 AIOKafkaConsumer 中抓取并且输出 下面是我写的代码 从配置文件抓取 不同的配置 msg_kafka 然后通过for group in msg_kafka[1] 循环进行多个协程的分解 在配置文件传参无问题的情况下 程序无输出

import asyncio

from aiokafka import AIOKafkaConsumer
from GeneralMethod.GeneralMethod import GeneralMethod


async def consume(topic, bootstrap_servers="localhost:9092", group_id="my-group"):
    """消费处理逻辑"""
    # 创建consumer实例并运行事件循环来启动它
    consumer = AIOKafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        auto_offset_reset='latest',
    )
    await consumer.start()


    try:
        async for msg in consumer:
            print(msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        await consumer.stop()


msg_kafka = GeneralMethod().read_kafka_config()


async def main():
    tasks = []

    for group in msg_kafka[1]:
        topic = msg_kafka[1][group]['topic']
        group_id = msg_kafka[1][group]['group_id']
        servers = msg_kafka[1][group]['servers']
        print('group', group, 'topic:', topic, 'group_id:', group_id, 'servers:', servers)
        task = asyncio.create_task(consume(topic, servers, group_id))
        tasks.append(task)
    await asyncio.gather(*tasks)

if __name__ == '__main__':
    asyncio.run(main())
  • 这篇博客: kafka消费者-实例参数配置中的 group.id 部分也许能够解决你的问题, 你可以仔细阅读以下内容或跳转源博客中阅读:
  • group.id:消费者组,消费者隶属的消费者组名称,如果为空会报异常,一般而言,这个参数要有一定的业务意义。

    props.put("group.id", "test");// 消费者组
    

一定要用python抓吗 ,用flink java代码写回比较容易抓取kafka数据,1.12版本之后自带了kafkasource