composer安装
composer require topthink/think-queue
配置目录config/queue.php如下:
return [ //'default' => 'sync', //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动//或其他自定义的完整的类名 'default' => 'redis', 'connections' => [ 'sync' => [ 'type' => 'sync', ], 'database' => [ 'type' => 'database', 'queue' => 'default', 'table' => 'jobs', 'connection' => null, ], 'redis' => [ 'type' => 'redis', //'queue' => 'default', //默认 'queue' => 'contract_queue', 'host' => '127.0.0.1', 'port' => 6379, 'password' => '', 'select' => 0, 'timeout' => 0, 'persistent' => false, ], ], 'failed' => [ 'type' => 'none', 'table' => 'failed_jobs', ], ];
创建app\job命名空间文件,如果是单模块就是创建app\job,多模块的话,需要创建类似app\admin\job的文件位置,具体创建方法可自行百度
我是因为是多应用且是接口,所以位置在app\api\job,代码如下:
<?php namespace app\api\Job; use app\api\controller\ToolController; use think\facade\Db; use think\queue\Job; class ProcessJob { public function fire(Job $job, $data) { if(!$this->check_send_sms($data)){ $job->delete(); echo '==========不在范围内,不发送信息=========='; return; } //发送短信返回true或者false表示成功与否 $res = ToolController::sendSms($data['phone'], $data['msg']); if($res){ $job->delete(); echo '==========在范围内,发送信息成功=========='; return; }else{ if($job->attempts()>3){ $job->delete(); echo '==========发送失败重试超过3次,号码->'.$job->getJobId().'=========='; //$job->release(5); //延迟5秒执行 return; } } } public function failed(Job $job, $data){ // ...任务达到最大重试次数后,失败了 $job->delete(); echo '任务最大次数失败==============次数是'.$job->attempts().'========数据是'.json_encode($data).'============='; } public function check_send_sms($data) { $res = Db::name('table_name')->where('id', $data['insertGetId'])->find(); //当符合一定条件时 if(empty($res['data_name1']) || empty($res['data_name2'])){ return true; } return false; } }
在接口中调用如下:
<?php namespace app\api\controller; use app\api\Job\ProcessJob; use think\facade\Queue; use think\Request; //具体方法,创建数据时触发队列发送消息 public function createContract() { $post = $this->request->post(); //处理业务 $insertGetId = Db::name('contract')->insertGetId($post); if($post['type'] == 'MORNING'){ //当符合条件时 //两种方式发送消息 //队列发消息,立即执行 Queue::push(ProcessJob::class, ['data'=>$insertGetId]); Queue::later(30, \app\api\Job\ProcessJob::class, [ 'insertGetId'=>$insertGetId, 'phone'=>'xxxxxxxxxxx', 'msg'=>'需要发送短信的数据' ]); //延时队列发消息,隔30秒执行(30秒倒计时) } return true; }
最后运行
php think queue:listen 或者 php think queue:work
think-queue官方文档: