PHP如何获取kafka消费信息

在知道kafka服务器和topic的情况下,php如何链接获取并输出json?
kafka服务器:n9-kkt.test001.com:30902
topic:test.info.tst

要在PHP中获取Kafka消费信息,你需要使用一个Kafka的客户端库,例如php-rdkafka。

首先,确保你已经安装了librdkafka和php-rdkafka扩展。你可以通过以下命令来安装:

# 安装librdkafka
$ pecl install rdkafka

# 在php.ini文件中添加扩展
extension=rdkafka.so

接下来,你可以使用以下代码来连接到Kafka服务器并获取消费信息并输出为JSON格式:

<?php

// Kafka服务器和Topic信息
$brokers = "n9-kkt.test001.com:30902";
$topic = "test.info.tst";

// 创建Kafka消费者实例
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', $brokers);
$consumer = new RdKafka\KafkaConsumer($conf);

// 订阅Topic
$consumer->subscribe([$topic]);

// 从Topic中读取消息
while (true) {
    $message = $consumer->consume(120 * 1000); // 超时时间为2分钟

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            // 消费成功
            $payload = $message->payload;
            $json = json_encode($payload, JSON_UNESCAPED_UNICODE);
            echo $json;
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            // 已经到达分区末尾
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            // 超时等待消息
            break;
        default:
            // 其他错误
            echo "消费失败: " . $message->errstr() . "\n";
            break;
    }
}

// 关闭Kafka消费者
$consumer->close();

这段代码使用了RdKafka\KafkaConsumer类来创建一个Kafka消费者实例,并订阅了指定的Topic。然后,通过循环调用consume()方法来从Topic中读取消息。如果消费成功,将消息转为JSON格式并输出;如果已到达分区末尾或超时等待消息,则继续下一次循环;如果发生其他错误,则打印出错误信息。

请注意,上述代码仅提供了一个基本的示例,实际应用可能需要根据具体情况进行适当的调整和错误处理。

学会使用bin*.sh或bin\windows*.bat


<?php

// 创建Kafka消费者 
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'http://n9-kkt.test001.com:30902');
$consumer = new RdKafka\KafkaConsumer($conf);

// 订阅Topic
$consumer->subscribe(['test.info.tst']);

// 消费消息
while (true) {
  $message = $consumer->consume(120*1000);
  echo $message->payload . "\n";
}

// 关闭消费者
$consumer->close();

?>