在知道kafka服务器和topic的情况下,php如何链接获取并输出json?
kafka服务器:n9-kkt.test001.com:30902
topic:test.info.tst
要在PHP中获取Kafka消费信息,你需要使用一个Kafka的客户端库,例如php-rdkafka。
首先,确保你已经安装了librdkafka和php-rdkafka扩展。你可以通过以下命令来安装:
# 安装librdkafka
$ pecl install rdkafka
# 在php.ini文件中添加扩展
extension=rdkafka.so
接下来,你可以使用以下代码来连接到Kafka服务器并获取消费信息并输出为JSON格式:
<?php
// Kafka服务器和Topic信息
$brokers = "n9-kkt.test001.com:30902";
$topic = "test.info.tst";
// 创建Kafka消费者实例
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', $brokers);
$consumer = new RdKafka\KafkaConsumer($conf);
// 订阅Topic
$consumer->subscribe([$topic]);
// 从Topic中读取消息
while (true) {
$message = $consumer->consume(120 * 1000); // 超时时间为2分钟
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
// 消费成功
$payload = $message->payload;
$json = json_encode($payload, JSON_UNESCAPED_UNICODE);
echo $json;
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
// 已经到达分区末尾
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
// 超时等待消息
break;
default:
// 其他错误
echo "消费失败: " . $message->errstr() . "\n";
break;
}
}
// 关闭Kafka消费者
$consumer->close();
这段代码使用了RdKafka\KafkaConsumer类来创建一个Kafka消费者实例,并订阅了指定的Topic。然后,通过循环调用consume()方法来从Topic中读取消息。如果消费成功,将消息转为JSON格式并输出;如果已到达分区末尾或超时等待消息,则继续下一次循环;如果发生其他错误,则打印出错误信息。
请注意,上述代码仅提供了一个基本的示例,实际应用可能需要根据具体情况进行适当的调整和错误处理。
学会使用bin*.sh或bin\windows*.bat
<?php
// 创建Kafka消费者
$conf = new RdKafka\Conf();
$conf->set('bootstrap.servers', 'http://n9-kkt.test001.com:30902');
$consumer = new RdKafka\KafkaConsumer($conf);
// 订阅Topic
$consumer->subscribe(['test.info.tst']);
// 消费消息
while (true) {
$message = $consumer->consume(120*1000);
echo $message->payload . "\n";
}
// 关闭消费者
$consumer->close();
?>