RabbitMQ 是一个消息代理。它的核心原理非常简单:接收和发送消息。你可以把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ 是一个邮箱、邮局、邮递员。RabbitMQ 和邮局的主要区别是,它处理的不是纸,而是接收、存储和发送二进制的数据——_消息_。一般提到 RabbitMQ 和消息,都用到一些专有名词。

  • *生产(Producing)*意思就是发送。发送消息的程序就是一个*生产者(producer)*。我们一般用”P”来表示:
  • *队列(queue)*就是邮箱的名称。消息通过你的应用程序和 RabbitMQ 进行传输,它们能够只存储在一个*队列(queue)*中。 *队列(queue)*没有任何限制,你要存储多少消息都可以——基本上是一个无限的缓冲。多个*生产者(producers)*能够把消息发送给同一个队列,同样,多个*消费者(consumers)*也能攻从一个*队列(queue)*中获取数据。队列可以化城这样(图上是队列的名称):
  • *消费(Consuming)*和获取消息是一样的意思。一个*消费者(consumer)*就是一个等待获取消息的程序。我们把它画作”C”:

Hello World!

(使用 pika 0.9.5 Python 客户端)

我们的“Hello world”不会很复杂——仅仅发送一个消息,然后获取它并输出到屏幕。这样以来我们需要两个程序,一个用作发送消息,另一个接受消息并打印消息内容

我们大体的设计是这样的:

生产者(Producer)把消息发送到一个名为“hello”的队列中。消费者(consumer)从这个队列中获取消息。

RabbitMQ 库

RabbitMQ 使用的是 AMQP 协议。要使用她你就必须需要一个使用同样协议的库。几乎所有的编程语言都有可选择的库。python 也是一样,可以从以下几个库中选择:

  • py-amqplib
  • txAMQP
  • pika

在这一系列教程中,我们打算使用PHPAMQP扩展。详细教程请查看:

mac os 下 RabbitMq 以及 PHP amqp 扩展安装记录

发送消息

我们第一个程序 send.php 会发送一个消息到队列中。首先要做的事情就是建立一个到 RabbitMQ 服务器的连接。

$connection = new AMQPConnection(array('host' =>'127.0.0.1', 'port' =>'5672', 'vhost' =>'/', 'login' =>'guest', 'password' => 'guest'));

现在我们已经连接上服务器了,那么,在发送消息之前我们需要确认队列是存在的。如果我们把消息发送到一个不存在的队列,RabbitMQ 会丢弃这条消息。我门先创建一个名为*hello*的队列,然后把消息发送到这个队列中。

$queue = new AMQPQueue($channel);
$queue->setName($queueName);

这时候我们就可以发送消息了,我们第一条消息只包含了 *Hello World!*字符串,我们打算把它发送到我们的*hello*队列。

在 RabbitMQ 中,消息是不能直接发送到队列,它需要发送到*交换器(exchange)*中。我们不打算在这里深入讨论它——你可以通过教程的第三部分了解更多。现在我们所需要了解的是如何使用默认的交换器(exchange),它使用一个空字符串来标识。交换器允许我们指定某条消息需要投递到哪个队列,$$routeKey 参数必须指定为队列的名称:

$exchange->publish($message, $routeKey);
var_dump("[x] Sent 'Hello World!'");

在退出程序之前,我们需要确认网络缓冲已经被刷写、消息已经投递到 RabbitMQ。完成这些事情(正确的关闭连接)是很简单的。

$connection->disconnect();

获取数据

我们的第二个程序 receive.php,将会从队列中获取消息并打印消息。

这次我们还是先要连接到 RabbitMQ 服务器。连接服务器的代码和之前是一样的。

下一步也和之前一样,我们需要确认队列是存在的。使用$queue->declare()创建一个队列——我们可以运行这个命令很多次,但是只有一个队列会创建。

$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->declare();

你也许要问为什么重复声明了队列——我们已经在前面的代码中声明了它。如果我们确定了队列是已经存在的,那么我们可以不这么做。比如先运行 send.php 程序。可是我们并不确定哪个程序先运行,这种情况的话再程序中重复声明是好的做法。

列出所有队列

你也许希望查看 RabbitMQ 由哪些队列、有多少消息在队列中。你可以使用 rabbitmqctl 工具(使用有权限的用户):

> $ sudo rabbitmqctl list_queues
> Listing queues ...
> hello    0
> ...done.
> ```
>
> (omit sudo on Windows)
>
> (在 Windows 中不需要 sudo 命令)

从队列中获取消息相对来说稍显复杂。需要为队列定义一个回调(callback)函数。当我们获取到消息的时候,~~Pika 库~~就会调用这个回调(callback)函数。我们的这个回调函数将会但因消息的内容到屏幕上。

```php
function callback($envelope, $queue) {
        $msg = $envelope->getBody();
        var_dump(" [x] Received:" . $msg);
        $queue->ack($envelope->getDeliveryTag());
}

下一步,我们需要告诉 RabbitMQ 这个回调函数将会从*hello*队列中接收消息:

$queue->consume('callback');

要成功运行这些命令,我们必须保证队列是存在的,我们已经能够保证——我们之前已经使用创建了一个队列 queue_declare。

$queue->nack()//函数稍后会介绍。

最后,我们输入一个无限循环来等待消息数据并确运行回调函数。

var_dump('[*] Waiting for messages. To exit press CTRL+C');
while (TRUE) {
        $queue->consume('callback');
}

整合

send.php 的全部代码:

<?php

/**
 * PHP amqp(RabbitMQ) Demo-1
 * @author  yuansir <yuansir@live.cn/yuanxuxu.com>
 */
$exchangeName = 'demo';
$queueName = 'hello';
$routeKey = 'hello';
$message = 'Hello World!';

$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
$connection->connect() or die("Cannot connect to the broker!\n");

try {
        $channel = new AMQPChannel($connection);
        $exchange = new AMQPExchange($channel);
        $exchange->setName($exchangeName);
        $queue = new AMQPQueue($channel);
        $queue->setName($queueName);
        $exchange->publish($message, $routeKey);
        var_dump("[x] Sent 'Hello World!'");
} catch (AMQPConnectionException $e) {
        var_dump($e);
        exit();
}
$connection->disconnect();

receive.py 的全部代码:

<?php

/**
 * PHP amqp(RabbitMQ) Demo-1
 * @author  yuansir <yuansir@live.cn/yuanxuxu.com>
 */
$exchangeName = 'demo';
$queueName = 'hello';
$routeKey = 'hello';

$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
$connection->connect() or die("Cannot connect to the broker!\n");
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->declare();
$queue->bind($exchangeName, $routeKey);

var_dump('[*] Waiting for messages. To exit press CTRL+C');
while (TRUE) {
        $queue->consume('callback');
}
$connection->disconnect();

function callback($envelope, $queue) {
        $msg = $envelope->getBody();
        var_dump(" [x] Received:" . $msg);
        $queue->ack($envelope->getDeliveryTag());
}

现在就可以在终端中运行我们的程序了。首先,用 send.php 重续发送一条消息:

php send.php
string(23) "[x] Sent 'Hello World!'"</pre>

生产者(producer)程序 send.php 每次运行之后就会停止。现在我们就来接收消息:

php receive.php
string(46) "[*] Waiting for messages. To exit press CTRL+C"
string(26) " [x] Received:Hello World!"</pre>

成功了!我们已经通过 RabbitMQ 发送第一条消息。你也许已经注意到了,receive.py 程序并没有退出。它一直在准备获取消息,你可以通过 Ctrl-C 来终端它。

试下在新的终端中再次运行 send.php。

我们已经学会如何发送消息到一个已知队列中并接收消息。是时候移步到第二部分了,我们将会建立一个简单的*工作队列(work queue)*。

转载请注明: 转载自Ryan 是菜鸟 | LNMP 技术栈笔记

如果觉得本篇文章对您十分有益,何不 打赏一下

谢谢打赏

本文链接地址: RabbitMQ 官方中文入门教程(PHP 版) 第一部分:Hello World

知识共享许可协议 本作品采用知识共享署名-非商业性使用 4.0 国际许可协议进行许可