一起聊聊thinkphp6使用think-queue实现普通队列和延迟队列

本篇文章给大家带来了关于thinkphp的相关知识,其中主要介绍了关于使用think-queue来实现普通队列和延迟队列的相关内容,think-queue是thinkphp官方提供的一个消息队列服务,下面一起来看一下,希望对大家有帮助。

一起聊聊thinkphp6使用think-queue实现普通队列和延迟队列

推荐学习:《PHP视频教程》

###TP6 队列

TP6 中使用 think-queue 可以实现普通队列和延迟队列。

立即学习“PHP免费学习笔记(深入)”;

think-queue 是thinkphp 官方提供的一个消息队列服务,它支持消息队列的一些基本特性:

消息的发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制等队列的多队列, 内存限制 ,启动,停止,守护等消息队列可降级为同步执行

消息队列实现过程

1、通过生产者推送消息到消息队列服务中

2、消息队列服务将收到的消息存入redis队列中(zset)

3、消费者进行监听队列,当监听到队列有新的消息时,获取队列第一条

4、处理获取下来的消息调用业务类进行处理相关业务

5、业务处理后,需要从队列中删除消息

composer 安装 think-queue

composer require topthink/think-queue

配置文件

安装完 think-queue 后会在 config 目录中生成 queue.php,这个文件是队列的配置文件。

tp6中提供了多种消息队列的实现方式,默认使用sync,我这里选择使用Redis。

return [    'default'     => 'redis',    'connections' => [        'sync'     => [            'type' => 'sync',        ],        'database' => [            'type'       => 'database',            'queue'      => 'default',            'table'      => 'jobs',            'connection' => null,        ],        'redis'    => [            'type'       => 'redis',            'queue'      => 'default',            'host'       => env('redis.host', '127.0.0.1'),            'port'       => env('redis.port', '6379'),            'password'   => env('redis.password','123456'),            'select'     => 0,            'timeout'    => 0,            'persistent' => false,        ],    ],    'failed'      => [        'type'  => 'none',        'table' => 'failed_jobs',    ],];

创建目录及队列消费类文件

在 app 目录下创建 queue 目录,然后在该目录下新建一个抽象类 Queue.php 文件,作为基础类

序列猴子开放平台 序列猴子开放平台

具有长序列、多模态、单模型、大数据等特点的超大规模语言模型

序列猴子开放平台 0 查看详情 序列猴子开放平台

getJobId(); // 队列的数据库id或者redis key        // $jobClassName = $job->getName(); // 队列对象类        // $queueName = $job->getQueue(); // 队列名称        // 如果已经执行中或者执行完成就不再执行了        if (!$this->checkJob($jobId, $data)) {            $job->delete();            Cache::store('redis')->delete($jobId);            return ;        }        // 执行业务处理        if ($this->execute($data)) {            Log::record(sprintf('[%s][%s] 队列执行成功', __CLASS__, __FUNCTION__));            $job->delete(); // 任务执行成功后删除            Cache::store('redis')->delete($jobId); // 删除redis中的缓存        } else {            // 检查任务重试次数            if ($job->attempts() > 3) {                Log::error(sprintf('[%s][%s] 队列执行重试次数超过3次,执行失败', __CLASS__, __FUNCTION__));                 // 第1种处理方式:重新发布任务,该任务延迟10秒后再执行;也可以不指定秒数立即执行                //$job->release(10);                 // 第2种处理方式:原任务的基础上1分钟执行一次并增加尝试次数                //$job->failed();                   // 第3种处理方式:删除任务                $job->delete(); // 任务执行后删除                Cache::store('redis')->delete($jobId); // 删除redis中的缓存            }        }    }    /**     * 消息在到达消费者时可能已经不需要执行了     * @param  string  $jobId     * @param $message     * @return bool 任务执行的结果     * @throws PsrSimpleCacheInvalidArgumentException     */    protected function checkJob(string $jobId, $message): bool    {        // 查询redis        $data = Cache::store('redis')->get($jobId);        if (!empty($data)) {            return false;        }        Cache::store('redis')->set($jobId, $message);        return true;    }    /**     * @describe: 根据消息中的数据进行实际的业务处理     * @param $data 数据     * @return bool 返回结果     */    abstract protected function execute($data): bool;}

所有真正的消费类继承基础抽象类

<?phpnamespace appqueuetest;use appqueueQueue;class Test extends Queue{    protected function execute($data): bool    {       // 具体消费业务逻辑    }}

生产者逻辑

use thinkfacadeQueue;// 普通队列生成调用方式Queue::push($job, $data, $queueName);// 例:Queue::push(Test::class, $data, $queueName);// 延时队列生成调用方式Queue::later($delay, $job, $data, $queueName);// 例如使用延时队列 10 秒后执行:Queue::later(10 , Test::class, $data, $queueName);

开启进程监听任务并执行

php think queue:listenphp think queue:work

命令模式介绍

命令模式

queue:work 命令

work 命令: 该命令将启动一个 work 进程来处理消息队列。

php think queue:work --queue TestQueue

queue:listen 命令

listen 命令: 该命令将会创建一个 listen 父进程 ,然后由父进程通过 proc_open(‘php think queue:work’) 的方式来创建一个work 子 进程来处理消息队列,且限制该work进程的执行时间。

php think queue:listen --queue TestQueue

命令行参数

Work 模式

php think queue:work --daemon            //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出--queue  helloJobQueue  //要处理的队列的名称--delay  0         //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0--force            //系统处于维护状态时是否仍然处理任务,并未找到相关说明--memory 128       //该进程允许使用的内存上限,以 M 为单位--sleep  3         //如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式)--tries  2          //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0

Listen 模式

php think queue:listen --queue  helloJobQueue    //监听的队列的名称--delay  0          //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0--memory 128        //该进程允许使用的内存上限,以 M 为单位--sleep  3          //如果队列中无任务,则多长时间后重新检查,daemon模式下有效--tries  0          //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0--timeout 60         //创建的work子进程的允许执行的最长时间,以秒为单位

可以看到 listen 模式下,不包含 --deamon 参数,原因下面会说明

消息队列的开始,停止与重启

开始一个消息队列:

php think queue:work

停止所有的消息队列:

php think queue:restart

重启所有的消息队列:

php think queue:restart php think queue:work

推荐学习:《PHP视频教程》

以上就是一起聊聊thinkphp6使用think-queue实现普通队列和延迟队列的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/226914.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年11月3日 19:30:06
下一篇 2025年11月3日 19:31:18

相关推荐

发表回复

登录后才能评论
关注微信