工作队列

第一篇教程中,我们已经写了一个从已知队列中发送和获取消息的程序。在这篇教程中,我们将创建一个工作队列(Work Queue),它会发送一些耗时的任务给多个工作者(Works )。

工作队列*(又称:任务队列——Task Queues)是为了避免等待一些占用大量资源、时间的操作。*当我们把*任务(Task)*当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。

这个概念在网络应用中是非常有用的,它可以在短暂的 HTTP 请求中处理一些复杂的任务。

准备

之前的教程中,我们发送了一个包含“Hello World!”的字符串消息。现在,我们将发送一些字符串,把这些字符串当作复杂的任务。我们没有真是的例子,例如图片缩放、pdf 文件转换。所以使用 sleep()函数来模拟这种情况。我们在字符串中加上点号(.)来表示任务的复杂程度,一个点(.)将会耗时 1 秒钟。比 如”Hello…”就会耗时 3 秒钟。

我们对之前教程的*send.php*做些简单的调整,以便可以发送随意的消息。这个程序会按照计划发送任务到我们的工作队列中。我们把它命名为 new_task.php:

$message = empty($argv[1]) ? 'Hello World!' : ' '.$argv[1];
$exchange->publish($message, $routeKey);
var_dump("[x] Sent $message");

我们的旧脚本(_receive.php_)同样需要做一些改动:它需要为消息体中每一个点号(.)模拟 1 秒钟的操作。它会从队列中获取消息并执行,我们把它命名为 worker.php:

function callback($envelope, $queue) {
$msg = $envelope->getBody();
var_dump(" [x] Received:" . $msg);
sleep(substr_count($msg,'.'));
$queue->ack($envelope->getDeliveryTag());
}

轮询分发

使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。

首先,我们先同时运行两个 worker.php 脚本,它们都会从队列中获取消息,到底是不是这样呢?我们看看。

你需要打开三个终端,两个用来运行 worker.php 脚本,这两个终端就是我们的两个消费者(consumers)—— C1 和 C2。

shell1

$php worker.php
 [*] Waiting for messages. To exit press CTRL+C

shell2

$ php worker.php
 [*] Waiting for messages. To exit press CTRL+C

第三个终端,我们用来发布新任务。你可以发送一些消息给消费者(consumers):

shell3

$ php new_task.php First message.

shell3

$ php new_task.php Second message..

shell3

$ php new_task.php Third message...

shell3

$ php new_task.php Fourth message....

shell3

$ php new_task.php Fifth message.....

看看到底发送了什么给我们的工作者(workers):

shell1

$ php worker.php
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'

shell2

$ php worker.php
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

默认来说,RabbitMQ 会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)。试着添加三个或更多得工作者(workers)。

消息响应

当处理一个比较耗时得任务的时候,你也许想知道消费者(consumers)是否运行到一半就挂掉。当前的代码中,当消息被 RabbitMQ 发送给 消费者(consumers)之后,马上就会在内存中移除。这种情况,你只要把一个工作者(worker)停止,正在处理的消息就会丢失。同时,所有发送 到这个工作者的还没有处理的消息都会丢失。

我们不想丢失任何任务消息。如果一个工作者(worker)挂掉了,我们希望任务会重新发送给其他的工作者(worker)。

为了防止消息丢失,RabbitMQ 提供了消息*响应(acknowledgments)*。消费者会通过一个 ack(响应),告诉 RabbitMQ 已经收到并处理了某条消息,然后 RabbitMQ 就会释放并删除这条消息。

如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ 就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,及时工作者(workers)偶尔的挂掉,也不会丢失消息。

消息是没有超时这个概念的;当工作者与它断开连的时候,RabbitMQ 会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。

消息是没有超时这个概念的;当工作者与它断开连的时候,RabbitMQ 会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。 之前的例子中我们使用$queue->ack()。当工作者(worker)完成了任务,就发送一个响应。

function callback($envelope, $queue) {
    $msg = $envelope->getBody();
    var_dump(" [x] Received:" . $msg);
    sleep(substr_count($msg,'.'));
    $queue->ack($envelope->getDeliveryTag());
}
$queue->consume('callback');

运行上面的代码,我们发现即使使用 CTRL+C 杀掉了一个工作者(worker)进程,消息也不会丢失。当工作者(worker)挂掉这后,所有没有响应的消息都会重新发送。

忘了响应

一个很容易犯的错误就是忘了 basic_ack,后果很严重。消息在你的程序退出之后就会重新发送,如果它不能够释放没响应的消息,RabbitMQ 就会占用越来越多的内存。

为了排除这种错误,你可以使用 rabbitmqctl 命令,输出 messages_unacknowledged 字段:

> $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
> Listing queues ...
> hello    0       0
> ...done.
> ```

## 消息持久化

如果你没有特意告诉 RabbitMQ,那么在它退出或者崩溃的时候,它将会流失所有的队列和消息。为了确保信息不会丢失,有两个事情是需要注意的:我们必须把“队列”和“消息”设为持久化。

首先,为了不让队列丢失,需要把它声明为*持久化(durable)*:

```php
$queue->setFlags(AMQP_DURABLE);

尽管这行代码本身是正确的,但是仍然不会正确运行。因为我们已经定义过一个叫 hello 的非持久化队列。RabbitMq 不允许你使用不同的参数重新定义一个队列,它会返回一个错误。但我们现在使用一个快捷的解决方法——用不同的名字,例如 task_queue。

$queue->setName('task_queue');
$queue->setFlags(AMQP_DURABLE);
$queue->declare();

这个$queue->declare();必须在生产者(producer)和消费者(consumer)对应的代码中修改。

这时候,我们就可以确保在 RabbitMq 重启之后 queue_declare 队列不会丢失。

注意:消息持久化

将消息设为持久化并不能完全保证不会丢失。以上代码只是告诉了 RabbitMq 要把消息存到硬盘,但从 RabbitMq 收到消息到保存之间还是有一 个很小的间隔时间。因为 RabbitMq 并不是所有的消息都使用 fsync(2)——它有可能只是保存到缓存中,并不一定会写到硬盘中。并不能保证真正的 持久化,但已经足够应付我们的简单工作队列。如果你一定要保证持久化,你需要改写你的代码来支持事务(transaction)。

公平分发

你应该已经发现,它仍旧没有按照我们期望的那样进行分发。比如有两个工作者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松。然而 RabbitMQ 并不知道这些,它仍然一如既往的派发消息。

这时因为 RabbitMQ 只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。它盲目的把第 n-th 条消息发给第 n-th 个消费者。

我们可以使用$channel->qos();方法,并设置 prefetch_count=1。这样是告诉 RabbitMQ,再同一时刻,不要发送超过 1 条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ 就会把消息分发给下一个空闲的工作者(worker)。

$channel->qos(0,1);

关于队列大小

如果所有的工作者都处理繁忙状态,你的队列就会被填满。你需要留意这个问题,要么添加更多的工作者(workers),要么使用其他策略。

整合

new_task.py 的完整代码:

<?php

/**
 * PHP amqp(RabbitMQ) Demo-2
 * @author  yuansir <yuansir@live.cn/yuanxuxu.com>
 */

$exchangeName = 'demo';
$queueName = 'task_queue';
$routeKey = 'task_queue';
$message = empty($argv[1]) ? 'Hello World!' : ' '.$argv[1];

$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
$connection->connect() or die("Cannot connect to the broker!\n");

$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declare();
$exchange->publish($message, $routeKey);
var_dump("[x] Sent $message");

$connection->disconnect();

我们的 worker:

<?php

/**
 * PHP amqp(RabbitMQ) Demo-2
 * @author  yuansir <yuansir@live.cn/yuanxuxu.com>
 */
$exchangeName = 'demo';
$queueName = 'task_queue';
$routeKey = 'task_queue';

$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
$connection->connect() or die("Cannot connect to the broker!\n");
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declare();
$queue->bind($exchangeName, $routeKey);

var_dump('[*] Waiting for messages. To exit press CTRL+C');
while (TRUE) {
        $queue->consume('callback');
        $channel->qos(0,1);
}
$connection->disconnect();

function callback($envelope, $queue) {
        $msg = $envelope->getBody();
        var_dump(" [x] Received:" . $msg);
        sleep(substr_count($msg,'.'));
        $queue->ack($envelope->getDeliveryTag());
}

使用消息响应和 prefetch_count 你就可以搭建起一个工作队列了。这些持久化的选项使得在 RabbitMQ 重启之后仍然能够恢复。

现在我们可以移步教程 3 学习如何发送相同的消息给多个消费者(consumers)

转载请注明: 转载自Ryan 是菜鸟 | LNMP 技术栈笔记

如果觉得本篇文章对您十分有益,何不 打赏一下

谢谢打赏

本文链接地址: RabbitMQ 官方中文入门教程(PHP 版) 第二部分:工作队列(Work queues)

知识共享许可协议 本作品采用知识共享署名-非商业性使用 4.0 国际许可协议进行许可