php如何并发消费mq中大量请求?

有个php项目要增加高并发的能力
看了网上的教学,说是用队列,目前想用队列,把请求放到队列里
理论上能明白,但是操作上有个问题,比如说同时有100万个订单同时产生,如何能同时处理10万个订单?目前用的thinkPHP-queue,也只能加几个进程去处理而已。

增加消费者数量:消费者越多,能够同时处理的请求数量就越多。创建了10个消费者线程,同时从RabbitMQ队列中读取请求:

<?php
//连接到RabbitMQ服务器
$connection = new AMQPConnection(array('host' => 'localhost', 'port' => 5672, 'vhost' => '/', 'username' => 'guest', 'password' => 'guest'));
$connection->connect();

//创建通道
$channel = new AMQPChannel($connection);

//创建队列
$queue = new AMQPQueue($channel);
$queue->setName('requests');

//创建10个消费者线程
for ($i = 0; $i < 10; $i++) {
    $pid = pcntl_fork();
    if ($pid == -1) {
        die('could not fork');
    } else if ($pid) {
        // parent
    } else {
        // child
        $queue->consume(function ($envelope, $queue) {
            $request = $envelope->getBody();
            //处理请求
            //...
        });
        exit();
    }
}

//等待所有消费者线程结束
for ($i = 0; $i < 10; $i++) {
    $status = 0;
    $pid = pcntl_wait($status);
}

//关闭连接
$connection->disconnect();

使用异步消息处理:可以使用异步消息处理,在处理消息时不阻塞,而是立即返回。使用 RabbitMQ 的消息队列:

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 建立 RabbitMQ 连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 创建队列
$channel->queue_declare('async_queue', false, false, false, false);

// 发送消息
$message = new AMQPMessage('Hello World!');
$channel->basic_publish($message, '', 'async_queue');

// 异步消费消息
$callback = function($message) {
    echo " [x] Received ", $message->body, "\n";
};

$channel->basic_consume('async_queue', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

// 关闭连接
$channel->close();
$connection->close();

建立了 RabbitMQ 连接,然后创建了一个队列并发送了消息,然后消费者异步地处理消息,不会阻塞,而是立即返回,最后关闭连接。

分割请求:将大量请求拆分为小请求,以提高处理效率。

// 分割请求的代码示例
$requests = getRequests(); // 获取所有请求
$chunkSize = 1000; // 将请求拆分为1000个一组
$chunks = array_chunk($requests, $chunkSize);
foreach ($chunks as $chunk) {
    processRequests($chunk); // 并发处理每一组请求
}

缓存队列中的请求:通过缓存队列中的请求,以提高处理效率。使用memcached或redis作为缓存:

// 使用memcached存储请求队列
$mem = new Memcached();
$mem->addServer("localhost", 11211);
$queue_key = 'request_queue';

// 向队列中添加请求
$mem->append($queue_key, $request);

// 从队列中获取请求
$requests = $mem->get($queue_key);

// 处理请求
foreach ($requests as $request) {
    // process request
}

// 清空队列
$mem->delete($queue_key);

// 使用redis存储请求队列
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$queue_key = 'request_queue';

// 向队列中添加请求
$redis->lpush($queue_key, $request);

// 从队列中获取请求
$requests = $redis->lrange($queue_key, 0, -1);

// 处理请求
foreach ($requests as $request) {
    // process request
}

// 清空队列
$redis->del($queue_key);