composer安装
composer require topthink/think-queue
配置目录config/queue.php如下:
return [
//'default' => 'sync', //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动//或其他自定义的完整的类名
'default' => 'redis',
'connections' => [
'sync' => [
'type' => 'sync',
],
'database' => [
'type' => 'database',
'queue' => 'default',
'table' => 'jobs',
'connection' => null,
],
'redis' => [
'type' => 'redis',
//'queue' => 'default', //默认
'queue' => 'contract_queue',
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'select' => 0,
'timeout' => 0,
'persistent' => false,
],
],
'failed' => [
'type' => 'none',
'table' => 'failed_jobs',
],
];
创建app\job命名空间文件,如果是单模块就是创建app\job,多模块的话,需要创建类似app\admin\job的文件位置,具体创建方法可自行百度
我是因为是多应用且是接口,所以位置在app\api\job,代码如下:
<?php
namespace app\api\Job;
use app\api\controller\ToolController;
use think\facade\Db;
use think\queue\Job;
class ProcessJob
{
public function fire(Job $job, $data)
{
if(!$this->check_send_sms($data)){
$job->delete();
echo '==========不在范围内,不发送信息==========';
return;
}
//发送短信返回true或者false表示成功与否
$res = ToolController::sendSms($data['phone'], $data['msg']);
if($res){
$job->delete();
echo '==========在范围内,发送信息成功==========';
return;
}else{
if($job->attempts()>3){
$job->delete();
echo '==========发送失败重试超过3次,号码->'.$job->getJobId().'==========';
//$job->release(5); //延迟5秒执行
return;
}
}
}
public function failed(Job $job, $data){
// ...任务达到最大重试次数后,失败了
$job->delete();
echo '任务最大次数失败==============次数是'.$job->attempts().'========数据是'.json_encode($data).'=============';
}
public function check_send_sms($data)
{
$res = Db::name('table_name')->where('id', $data['insertGetId'])->find();
//当符合一定条件时
if(empty($res['data_name1']) || empty($res['data_name2'])){
return true;
}
return false;
}
}
在接口中调用如下:
<?php
namespace app\api\controller;
use app\api\Job\ProcessJob;
use think\facade\Queue;
use think\Request;
//具体方法,创建数据时触发队列发送消息
public function createContract()
{
$post = $this->request->post();
//处理业务
$insertGetId = Db::name('contract')->insertGetId($post);
if($post['type'] == 'MORNING'){ //当符合条件时
//两种方式发送消息
//队列发消息,立即执行
Queue::push(ProcessJob::class, ['data'=>$insertGetId]);
Queue::later(30, \app\api\Job\ProcessJob::class, [
'insertGetId'=>$insertGetId,
'phone'=>'xxxxxxxxxxx',
'msg'=>'需要发送短信的数据'
]); //延时队列发消息,隔30秒执行(30秒倒计时)
}
return true;
}
最后运行
php think queue:listen 或者 php think queue:work
think-queue官方文档:
