thinkphp6+workerman如何执行异步任务

当前有一项目,需要生成10万数据,但是执行时间比较长,现想通过使用workerman的异步任务让另一进程执行,不需要等待此程序结束而可以直接进行下步操作

任务进程服务端

use Workerman\Worker;

require_once __DIR__ . '/Workerman/Autoloader.php';

// task worker,使用Text协议

$task_worker = new Worker('Text://0.0.0.0:12345');

// task进程数可以根据需要多开一些

$task_worker->count = 100;

$task_worker->name = 'TaskWorker';

//只有php7才支持task->reusePort,可以让每个task进程均衡的接收任务

//$task->reusePort = true;

$task_worker->onMessage = function($connection, $task_data)

{

     // 假设发来的是json数据

     $task_data = json_decode($task_data, true);

     // 根据task_data处理相应的任务逻辑.... 得到结果,这里省略....

     $task_result = ......

     // 发送结果

     $connection->send(json_encode($task_result));

};

Worker::runAll();

 

在workerman中调用


use Workerman\Worker;

use \Workerman\Connection\AsyncTcpConnection;

require_once __DIR__ . '/Workerman/Autoloader.php';

// websocket服务

$worker = new Worker('websocket://0.0.0.0:8080');

$worker->onMessage = function($ws_connection, $message)

{

    // 与远程task服务建立异步连接,ip为远程task服务的ip,如果是本机就是127.0.0.1,如果是集群就是lvs的ip

    $task_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');

    // 任务及参数数据

    $task_data = array(

        'function' => 'send_mail',

        'args'       => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),

    );

    // 发送数据

    $task_connection->send(json_encode($task_data));

    // 异步获得结果

    $task_connection->onMessage = function($task_connection, $task_result)use($ws_connection)

    {

         // 结果

         var_dump($task_result);

         // 获得结果后记得关闭异步连接

         $task_connection->close();

         // 通知对应的websocket客户端任务完成

         $ws_connection->send('task complete');

    };

    // 执行异步连接

    $task_connection->connect();

}

Worker::runAll();