想使用多协程的方式 在多个生产者中同步抓取kafka数据 每个协程里面都从一个不重复的 AIOKafkaConsumer 中抓取并且输出 下面是我写的代码 从配置文件抓取 不同的配置 msg_kafka 然后通过for group in msg_kafka[1] 循环进行多个协程的分解 在配置文件传参无问题的情况下 程序无输出
import asyncio
from aiokafka import AIOKafkaConsumer
from GeneralMethod.GeneralMethod import GeneralMethod
async def consume(topic, bootstrap_servers="localhost:9092", group_id="my-group"):
"""消费处理逻辑"""
# 创建consumer实例并运行事件循环来启动它
consumer = AIOKafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
group_id=group_id,
auto_offset_reset='latest',
)
await consumer.start()
try:
async for msg in consumer:
print(msg.topic, msg.partition, msg.offset,
msg.key, msg.value, msg.timestamp)
finally:
await consumer.stop()
msg_kafka = GeneralMethod().read_kafka_config()
async def main():
tasks = []
for group in msg_kafka[1]:
topic = msg_kafka[1][group]['topic']
group_id = msg_kafka[1][group]['group_id']
servers = msg_kafka[1][group]['servers']
print('group', group, 'topic:', topic, 'group_id:', group_id, 'servers:', servers)
task = asyncio.create_task(consume(topic, servers, group_id))
tasks.append(task)
await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(main())
group.id
:消费者组,消费者隶属的消费者组名称,如果为空会报异常,一般而言,这个参数要有一定的业务意义。
props.put("group.id", "test");// 消费者组
一定要用python抓吗 ,用flink java代码写回比较容易抓取kafka数据,1.12版本之后自带了kafkasource