consumer.php
$logger = new Logger('my_logger');
// Now add some handlers
// $logger->pushHandler(new StdoutHandler());
$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setGroupId('test');
$config->setBrokerVersion('0.10.2.1');
$config->setTopics(array('test'));
$config->setOffsetReset('earliest');
$consumer = new \Kafka\Consumer();
$consumer->setLogger($logger);
$consumer->start(function($topic, $part, $message) {
var_dump($message);
});
producer.php
$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setBrokerVersion('0.10.0.1');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer(function() {
return array(
array(
'topic' => 'test',
'value' => 'dgjll90',
'key' => '',
),
);
});
$producer->setLogger($logger);
$producer->success(function($result) {
var_dump($result);
});
$producer->error(function($errorCode) {
var_dump($errorCode);
});
$producer->send(true);
生产者是没有问题的,能正常发送
但是消费者(consumer)始终得不到数据,查看日志,发现,到最后一直循环
my_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: HeartbeatRequest ApiVersion: 0 [] []
这句话
但有的时候又能正常返回数据,但一般几率很小,差不多10几次一次能正常返回,其他时候就是一直循环上面那句话
求解答
这个问题可能是由于 Kafka 服务没有正确启动或配置不正确导致的。您可以尝试以下步骤来解决问题:
确认 Kafka 服务已经正确启动,并且可以通过您的 PHP 代码连接到 Kafka 服务。
检查您的 Kafka 配置是否正确,特别是元数据刷新间隔、元数据代理列表、代理版本、主题、偏移重置等参数是否设置正确。
检查您的 PHP 代码是否正确,特别是消费者和生产者的配置是否正确,以及日志记录器和回调函数是否正确设置。
如果您使用的是较旧版本的 Kafka,可以尝试升级到最新版本,以获得更好的稳定性和性能。