消息队列

什么是消息队列?

消息队列,一般我们会简称它为MQ(Message Queue),队列是一种先进先出的数据结构。可以简单理解为:把要传输的数据放在队列中。

目前使用较多的消息队列中间件有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。 主要解决应用解耦,异步消息,流量削锋等问题。

消息队列中有四个重要的角色:

  • 消息:可被其他业务使用的数据

  • 队列:用于存储消息

  • 生产者:生产消息发送到消息队列中

  • 消费者:从消息队列中取消息

这些角色在不同的消息中间件中,有不同的实现方式,使用方式也不同,好在 Laravel 队列为不同的后台队列服务提供统一的 API,例如 RabbitMQ ,Amazon SQS,Redis,甚至其他基于关系型数据库的队列。

配置

与队列相关的配置都放在 config/queue.php 中。

connections 这个选项给 RabbitMQ ,Beanstalk,或者 Redis 这样的后端服务定义了一个特有的连接。每个连接可以多多个 队列

需要注意的是 config/queue.php 中,每个连接都包含了一个 queue 属性。队列任务被发给指定连接的时候会被分发到 queue 属性和指定连接相同的队列中。换句话说,如果你分发任务的时候没有定义分配到哪个队列,那么它就会被放到连接配置中 queue 属性所定义的默认队列中

default 选项定义了默认的是哪个连接。

创建任务类

所有的任务类,都是放在每个 Bundle 的 Jobs 目录下。任务类需要继承 Illuminate\Contracts\Queue\ShouldQueue 接口,这意味着这个任务将会被推送到队列中,而不是同步执行。

任务类包含两个方法和一些属性,下面看下产品中的一个例子:

<?php
namespace MembersBundle\Jobs;

use CompanysBundle\Services\CompanysService;
use PromotionsBundle\Services\SmsDriver\ShopexSmsClient;
use PromotionsBundle\Services\SmsService;
use EspierBundle\Jobs\Job;

class GroupSendSms extends Job
{
    public $smsData;
    public function __construct($smsData)
    {
        $this->smsData = $smsData;
    }

    public function handle()
    {
        $smsData = $this->smsData;
        try {
            $companyId = $smsData['company_id'];
            $mobiles = $smsData['send_to_phones'];
            $content = $smsData['sms_content'];

            app('log')->debug('短信群发1: fan-out =>'.$companyId);
            $companysService = new CompanysService();
            $shopexUid = $companysService->getPassportUidByCompanyId($companyId);

            app('log')->debug('短信群发2: fan-out =>'.$shopexUid);
            $smsService = new SmsService(new ShopexSmsClient($companyId, $shopexUid));

            $smsService->sendContent($companyId, $mobiles, $content, 'fan-out');
        } catch ( \Exception $e) {
            app('log')->debug('短信群发失败: fan-out =>'.var_export($e->getMessage(),1));
        }
    }
}
  • __construct: 构造方法用来初始化任务所需要的数据

  • handle:队列执行时所调用的方法

在队列处理任务时,会调用 handle 方法,而这里我们也可以通过 handle 方法的参数类型提示,让 Laravel服务容器 自动注入依赖对象。

比如,上面例子中的 handle 方法中有这样一段代码 $companysService = new CompanysService(); 我们可以这样改造:

public function handle(CompanysService $companysService)
    {
        $smsData = $this->smsData;
        try {
            $companyId = $smsData['company_id'];
            $mobiles = $smsData['send_to_phones'];
            $content = $smsData['sms_content'];

            app('log')->debug('短信群发1: fan-out =>'.$companyId);
            $shopexUid = $companysService->getPassportUidByCompanyId($companyId);

            app('log')->debug('短信群发2: fan-out =>'.$shopexUid);
            $smsService = new SmsService(new ShopexSmsClient($companyId, $shopexUid));

            $smsService->sendContent($companyId, $mobiles, $content, 'fan-out');
        } catch ( \Exception $e) {
            app('log')->debug('短信群发失败: fan-out =>'.var_export($e->getMessage(),1));
        }
    }

这样写 handle(CompanysService $companysService) 在执行任务时 Laravel 会自动初始化 CompanysService 类,并注入 handle 中。

分发任务

分发任务分为两步:先初始化任务类,然后再使用辅助方法dispatch()分发任务。

    public function smsSends(Request $request)
    {
        $inputdata = $request->all('mobile', 'sms_content');

        $memberSmsLogService = new MemberSmsLogService();
        $params['company_id'] = app('auth')->user()->get('company_id');
        $params['operator'] = '管理员';
        $params['send_to_phones'] = $inputdata['mobile'];
        $params['sms_content'] = $inputdata['sms_content'];
        $result = $memberSmsLogService->create($params);
        //分发队列
        $job = (new GroupSendSms($params))->onQueue('sms');
        dispatch($job);

        return $this->response->array($result);
    }

dispatch() 辅助方法的具体实现如下:

    function dispatch($job)
    {
        return app(Illuminate\Contracts\Bus\Dispatcher::class)->dispatch($job);
    }

同步调度

如果您想立即(同步)执行队列任务,可以使用 dispatchNow 方法。 使用此方法时,队列任务将不会排队,并立即在当前进程中运行:

app(Illuminate\Contracts\Bus\Dispatcher::class)->dispatchNow($job);

运行队列处理器

创建完成任务类,dispatch任务后,任务会在队列中,要想真正执行任务还要执行队列处理器 Laravel 包含了一个队列处理器以将推送到队列中的任务执行。你可以使用 queue:work Artisan 命令运行处理器。 注意一旦 queue:work 命令开始执行,它会一直运行直到它被手动停止或终端被关闭。

php artisan queue:work

Tip:要使 queue:work 进程一直在后台运行,你应该使用进程管理器比如 Supervisor 来确保队列处理器不会停止运行

记住,队列处理器是一个常驻的进程并且在内存中保存着已经启动的应用状态。因此,它们并不会在启动后注意到你代码的更改。所以,在你的重新部署过程中,请记得 重启你的队列处理器。

思考

上面我们通过创建任务类,分发任务,运行队列处理器我们的任务就可以异步执行了。在文章开头我们提到的消息队列中有四个重要的角色:消息、队列、生产者、消费者,而在 Laravel 的封装下只有任务类、任务分发(dispatch)、队列处理器。它们之间的对应关系可以这样理解:

  • 消息 是 初始化的任务类。

  • 队列 是我们在config/queue.php中配置的连接所对应的queue,消息被分发后它们可能会保存在RabbitMQ、Redis或者数据库中的队列中。

  • 生产者dispatch

  • 消费者队列处理器+任务类的 handle 方法

一个消息从产出到消费的整个流程可以分三步理解:

第一步:通过定义任务类的属性,定义消息结构体。同时在任务类中定义了消息的处理方法handle

第二步:我们根据业务初始化任务类然后调用 dispatch 方法。dispatch 方法会根据规则将任务类格式化为字符串格,然后将字符串放入队列中,此时 dispatch 完成生产者的职责。

第三步:消息被投递到队列之后,队列处理器就会从队列中获取字符串消息,根据规则将字符串消息初始化为对应的任务类,然后再调用任务的 handle 方法,完成消息的消费。

Last updated