最近公司的一部分业务需要解耦,上消息队列。
原本打算使用 RabbitMQ ,不过啃了一段时间,太难了。。。
先使用laravel自带的消息队列来实现吧。
准备
laravel的队列驱动器有好多种,比如 "sync", "database", "beanstalkd", "sqs", "redis" 啥的。
我们选取redis。并使用 Horizon 来管理我们队列。
装包包
composer require laravel/horizonphp artisan horizon:install //安装
php artisan queue:failed-table
php artisan migrate //数据迁移
配置
'local' => [
'supervisor-1' => [
'connection' => 'redis',
'queue' => ['high','default','low'],
'balance' => 'simple',
'processes' => 3, //进程数
'tries' => 1,
],]
Horizon 提供了三种均衡策略: simple , auto ,和 false 。默认的是 simple , 会将收到的任务均分给队列进程。
auto 策略会根据当前的工作量调整每个队列的工作进程任务数量。例如:如果 notifications 队列有 1000 个待执行任务,但是你的 render 队列是空的,Horizon 会分配更多的工作进程给 notifications 队列,直到 notifications 队列中所有任务执行完成。当配置项 balance 配置为 false ,Horizon 会使用 Laravel 默认执行行为,它将按照配置中列出的顺序处理队列任务。
当使用 auto 策略时,您可以定义 minProcesses 和 maxProcesses 配置选项,以控制最小和最大进程数范围应向上和向下扩展到:
'production' => [
'supervisor-1' => [
'connection' => 'redis',
'queue' => ['default'],
'balance' => 'auto',
'minProcesses' => 1,
'maxProcesses' => 10,
'tries' => 3,
],]
添加队列
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;
use Carbon\Carbon;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct()
{
//
}
/**
* Execute the job.
*
* @return void
*/
public function handle($time)
{
Log::info('info',array('context'=>Carbon::now()->toDateTimeString(),'time'=>$time));
}
}
添加队列任务
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
class test
{
public function index(){
ProcessPodcast::dispatch('延时执行')->delay(now()->addMinutes(1));
ProcessPodcast::dispatchNow('立即执行'); //立即执行 同步调用
ProcessPodcast::dispatch('优先级高')->onQueue('high');
}
}
启动
php artisan horizon //启动
nohub php artisan horizon & //挂后台启动
php artisan horizon:pause //暂停
php artisan horizon:continue //继续