php & RabbitMQ

回覆文章
yehlu
Site Admin
文章: 3245
註冊時間: 2004-04-15 17:20:21
來自: CodeCharge Support Engineer

php & RabbitMQ

文章 yehlu »

http://www.yuansir-web.com/2013/05/31/r ... llo-world/

send.php

代碼: 選擇全部

<?php

/**
 * PHP amqp(RabbitMQ) Demo-1
 * @author  yuansir <yuansir@live.cn/yuansir-web.com>
 */
$exchangeName = 'demo';
$queueName = 'hello';
$routeKey = 'hello';
$message = 'Hello World!';

$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
$connection->connect() or die("Cannot connect to the broker!\n");

try {
        $channel = new AMQPChannel($connection);
        $exchange = new AMQPExchange($channel);
        $exchange->setName($exchangeName);
        $queue = new AMQPQueue($channel);
        $queue->setName($queueName);
        $exchange->publish($message, $routeKey);
        var_dump("[x] Sent 'Hello World!'");
} catch (AMQPConnectionException $e) {
        var_dump($e);
        exit();
}
$connection->disconnect();
receive.php

代碼: 選擇全部

<?php

/**
 * PHP amqp(RabbitMQ) Demo-1
 * @author  yuansir <yuansir@live.cn/yuansir-web.com>
 */
$exchangeName = 'demo';
$queueName = 'hello';
$routeKey = 'hello';

$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
$connection->connect() or die("Cannot connect to the broker!\n");
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->declare();
$queue->bind($exchangeName, $routeKey);

var_dump('[*] Waiting for messages. To exit press CTRL+C');
while (TRUE) {
        $queue->consume('callback');
}
$connection->disconnect();

function callback($envelope, $queue) {
        $msg = $envelope->getBody();
        var_dump(" [x] Received:" . $msg);
        $queue->nack($envelope->getDeliveryTag());
}
yehlu
Site Admin
文章: 3245
註冊時間: 2004-04-15 17:20:21
來自: CodeCharge Support Engineer

install php amqp

文章 yehlu »

http://serverfault.com/questions/607993 ... untu-14-04

auto_phpamqp.sh

代碼: 選擇全部

#create a directory for sources
mkdir ~/kit
cd ~/kit

#download and install the rabbitmq c amqp lib
wget https://github.com/alanxz/rabbitmq-c/releases/download/v0.5.1/rabbitmq-c-0.5.1.tar.gz
tar -zxvf rabbitmq-c-0.5.1.tar.gz
cd rabbitmq-c-0.5.1/
./configure
make
sudo make install

cd ..

#download and compile the amqp
wget http://pecl.php.net/get/amqp-1.4.0.tgz
tar -zxvf amqp-1.4.0.tgz
cd amqp-1.4.0/
phpize && ./configure --with-amqp && make && sudo make install

#Add amqp extension to php mods-availabile directory
echo "extension=amqp.so" > /etc/php5/mods-available/amqp.ini

#Enabled it in cli
cd /etc/php5/cli/conf.d/
ln -s ../../mods-available/amqp.ini 20-amqp.ini
php -m | grep amqp

#Enabled it in cli
cd /etc/php5/apache2/conf.d/
ln -s ../../mods-available/amqp.ini 20-amqp.ini


#restart Apache and than check phpinfo on web
service apache2 restart
yehlu
Site Admin
文章: 3245
註冊時間: 2004-04-15 17:20:21
來自: CodeCharge Support Engineer

Re: php & RabbitMQ

文章 yehlu »

http://www.yuansir-web.com/2013/05/31/r ... %EF%BC%89/

new_task.php

代碼: 選擇全部

<?php

/**
 * PHP amqp(RabbitMQ) Demo-2
 * @author  yuansir <yuansir@live.cn/yuansir-web.com>
 */

$exchangeName = 'demo';
$queueName = 'task_queue';
$routeKey = 'task_queue';
$message = empty($argv[1]) ? 'Hello World!' : ' '.$argv[1];

$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
$connection->connect() or die("Cannot connect to the broker!\n");

$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declare();
$exchange->publish($message, $routeKey);
var_dump("[x] Sent $message");

$connection->disconnect();

worker.php

代碼: 選擇全部

<?php

/**
 * PHP amqp(RabbitMQ) Demo-2
 * @author  yuansir <yuansir@live.cn/yuansir-web.com>
 */
$exchangeName = 'demo';
$queueName = 'task_queue';
$routeKey = 'task_queue';

$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
$connection->connect() or die("Cannot connect to the broker!\n");
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declare();
$queue->bind($exchangeName, $routeKey);

var_dump('[*] Waiting for messages. To exit press CTRL+C');
while (TRUE) {
        $queue->consume('callback');
        $channel->qos(0,1);
}
$connection->disconnect();

function callback($envelope, $queue) {
        $msg = $envelope->getBody();
        var_dump(" [x] Received:" . $msg);
        sleep(substr_count($msg,'.'));
        $queue->ack($envelope->getDeliveryTag());
}

yehlu
Site Admin
文章: 3245
註冊時間: 2004-04-15 17:20:21
來自: CodeCharge Support Engineer

Re: php & RabbitMQ

文章 yehlu »

http://www.yuansir-web.com/2013/05/31/r ... ishsubscr/

emit_log.php

代碼: 選擇全部

<?php

/**
 * PHP amqp(RabbitMQ) Demo-3
 * @author  yuansir <yuansir@live.cn/yuansir-web.com>
 */

$exchangeName = 'logs';
$message = empty($argv[1]) ? 'info:Hello World!' : ' '.$argv[1];

$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
$connection->connect() or die("Cannot connect to the broker!\n");

$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_FANOUT);
$exchange->declare();

$exchange->publish($message, '');
var_dump("[x] Sent $message");

$connection->disconnect();
receive_logs.php

代碼: 選擇全部

<?php

/**
 * PHP amqp(RabbitMQ) Demo-3
 * @author  yuansir <yuansir@live.cn/yuansir-web.com>
 */
$exchangeName = 'logs';

$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
$connection->connect() or die("Cannot connect to the broker!\n");
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_FANOUT);
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setFlags(AMQP_EXCLUSIVE);
$queue->declare();
$queue->bind($exchangeName, '');

var_dump('[*] Waiting for messages. To exit press CTRL+C');
while (TRUE) {
        $queue->consume('callback');
}
$connection->disconnect();

function callback($envelope, $queue) {
        $msg = $envelope->getBody();
        var_dump(" [x] Received:" . $msg);
        $queue->nack($envelope->getDeliveryTag());
}
yehlu
Site Admin
文章: 3245
註冊時間: 2004-04-15 17:20:21
來自: CodeCharge Support Engineer

Re: php & RabbitMQ

文章 yehlu »

http://www.yuansir-web.com/2013/05/31/r ... B1routing/

emit_log_direct.php

代碼: 選擇全部

<?php

/**
 * PHP amqp(RabbitMQ) Demo-4
 * @author  yuansir <yuansir@live.cn/yuansir-web.com>
 */
$severity = count($argv) > 2 ? $argv[1] : 'info';
$message = empty($argv[2]) ? 'Hello World!' : ' ' . $argv[2];

$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
$connection->connect() or die("Cannot connect to the broker!\n");

$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('direct_logs');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();

$exchange->publish($message, $severity);
var_dump("[x] Sent $message");

$connection->disconnect();

receive_logs_direct.php

代碼: 選擇全部

<?php

/**
 * PHP amqp(RabbitMQ) Demo-4
 * @author  yuansir <yuansir@live.cn/yuansir-web.com>
 */
$exchangeName = 'direct_logs';

$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
$connection->connect() or die("Cannot connect to the broker!\n");
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setFlags(AMQP_EXCLUSIVE);
$queue->declare();

$severities = $argv;
$file = $severities[0];
unset($severities[0]);
if (!$severities) {
        var_dump("Usage:$file [info] [warning] [error]");
        exit();
} else {
        foreach ($severities as $item) {
                $queue->bind($exchangeName, $item);
        }
}

var_dump('[*] Waiting for messages. To exit press CTRL+C');
while (TRUE) {
        $queue->consume('callback');
}
$connection->disconnect();

function callback($envelope, $queue) {
        $msg = $envelope->getBody();
        var_dump('[x]' . $envelope->getRoutingKey() . ':' . $msg);
        $queue->nack($envelope->getDeliveryTag());
}

yehlu
Site Admin
文章: 3245
註冊時間: 2004-04-15 17:20:21
來自: CodeCharge Support Engineer

Re: php & RabbitMQ

文章 yehlu »

http://www.coctec.com/subject/about/73097.html

四、Yupoo的消息系統

由於PHP的單線程模型,Yupoo把耗時較久的運算和I/O操作從HTTP請求周期中分離出來, 交給由Python實現的任務進程來完成,以保證請求響應速度。這些任務主要包括:郵件發送、數據索引、數據聚合和好友動態推送等等。PHP通過消息隊列 (Yupoo用的是RabbitMQ)來觸發任務執行。這些任務的主要特點為:

由用戶或者定時觸發的
耗時比較長的
需要非同步執行的
整個任務系統主要分為以消息分發、進程管理和工作進程組成。

圖檔
yehlu
Site Admin
文章: 3245
註冊時間: 2004-04-15 17:20:21
來自: CodeCharge Support Engineer

rabbitmq/rabbitmq-tutorials 官方範例程式碼

文章 yehlu »

https://github.com/rabbitmq/rabbitmq-tu ... master/php

PHP code for RabbitMQ tutorials
Here you can find PHP code examples from RabbitMQ tutorials.

To successfully use the examples you will need a running RabbitMQ server.

Requirements

Additionally you need PHP 5.3 and php-amqplib. To get these dependencies on Ubuntu type:

sudo apt-get install git-core php5-cli
Then you can install php-amqplib using Composer.

To do that install Composer and add it to your path, then run the following command inside this project folder:

composer.phar install
Code

Tutorial one: "Hello World!":

php send.php
php receive.php
Tutorial two: Work Queues:

php new_task.php "A very hard task which takes two seconds.."
php worker.php
Tutorial three: Publish/Subscribe

php receive_logs.php
php emit_log.php "info: This is the log message"
Tutorial four: Routing:

php receive_logs_direct.php info
php emit_log_direct.php info "The message"
Tutorial five: Topics:

php receive_logs_topic.php "*.rabbit"
php emit_log_topic.php red.rabbit Hello
Tutorial six: RPC:

php rpc_server.php
php rpc_client.php
yehlu
Site Admin
文章: 3245
註冊時間: 2004-04-15 17:20:21
來自: CodeCharge Support Engineer

Re: php & RabbitMQ 進階範例

文章 yehlu »

http://www.sitepoint.com/php-rabbitmq-a ... -examples/

PHP and RabbitMQ: Advanced Examples

Miguel Ibarra Romero Miguel Ibarra Romero
Published October 17, 2014

Tweet
Subscribe
This entry is part 2 of 2 in the series Message Queues in PHP with RabbitMQ
Message Queues in PHP with RabbitMQ

How to use RabbitMQ with PHP
PHP and RabbitMQ: Advanced Examples
In part 1 we covered the theory and a simple use case of the AMQP protocol in PHP with RabbitMQ as the broker. Now, let’s dive into some more advanced examples.

Example 1: send request to process data asynchronously among several workers

In the example of the previous part, we had one producer, one consumer. If the consumer died, messages would continue to stack up in the queue until the consumer started again. It would then process all the messages, one by one.

This can be less than ideal in a concurrent user environment with a fair amount of requests per minute. Fortunately, scaling the consumers is super easy, but let’s implement another example.

enter image description here

Let’s say we have an invoice generation service, where the users just need to provide the invoice number, and the system will automatically generate a pdf file and email it to the user. Generating and sending the email could take even several seconds if the server on which the generation process runs is resource limited. Now suppose we are required to support several transactions per second, how do we accomplish this without overwhelming the server?

We need to implement the following pattern:

enter image description here

Let’s look at our producer class:

代碼: 選擇全部

<?php
namespace Acme\AmqpWrapper;
 
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;
 
class WorkerSender
{
    /* ... SOME OTHER CODE HERE ... */
     
    /**
     * Sends an invoice generation task to the workers
     * 
     * @param int $invoiceNum
     */
    public function execute($invoiceNum)
    {
        $connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();
         
        $channel->queue_declare(
            'invoice_queue',    #queue - Queue names may be up to 255 bytes of UTF-8 characters
            false,              #passive - can use this to check whether an exchange exists without modifying the server state
            true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
            false,              #exclusive - used by only one connection and the queue will be deleted when that connection closes
            false               #auto delete - queue is deleted when last consumer unsubscribes
            );
             
        $msg = new AMQPMessage(
            $invoiceNum,
            array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
            );
             
        $channel->basic_publish(
            $msg,               #message 
            '',                 #exchange
            'invoice_queue'     #routing key (queue)
            );
             
        $channel->close();
        $connection->close();
    }
}
The WorkerSender::execute() method will receive an invoice number. Next we create a connection, channel and queue as usual.

代碼: 選擇全部

<?php
/* ... SOME CODE HERE ... */
 
        $msg = new AMQPMessage(
            $invoiceNum,
            array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
            );
 
/* ... SOME CODE HERE ... */
Please notice that this time, while creating the message object, the constructor receives a second parameter: array('delivery_mode' => 2). In this case we want to state that the message should not be lost if the RabbitMQ server crashes. Please be aware that in order for this to work, the queue has to be declared durable, too.

The following code can be used to receive the form data and execute the producer:

代碼: 選擇全部

<?php
chdir(dirname(__DIR__));
require_once('vendor/autoload.php');
 
use Acme\AmqpWrapper\WorkerSender;
 
$inputFilters = array(
    'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
);
$input = filter_input_array(INPUT_POST, $inputFilters);
$sender = new WorkerSender();
$sender->execute($input['invoiceNo']);
Please use whichever input sanitization/validation you feel comfortable with.

Things get a little bit interesting on the consumer side:

代碼: 選擇全部

<?php
namespace Acme\AmqpWrapper;
 
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;
 
class WorkerReceiver
{
    /* ... SOME OTHER CODE HERE ... */
     
    /**
     * Process incoming request to generate pdf invoices and send them through 
     * email.
     */
    public function listen()
    {
        $connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();
         
        $channel->queue_declare(
            'invoice_queue',    #queue
            false,              #passive
            true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs
            false,              #exclusive - queues may only be accessed by the current connection
            false               #auto delete - the queue is deleted when all consumers have finished using it
            );
             
        /**
         * don't dispatch a new message to a worker until it has processed and 
         * acknowledged the previous one. Instead, it will dispatch it to the 
         * next worker that is not still busy.
         */
        $channel->basic_qos(
            null,   #prefetch size - prefetch window size in octets, null meaning "no specific limit"
            1,      #prefetch count - prefetch window in terms of whole messages
            null    #global - global=null to mean that the QoS settings should apply per-consumer, global=true to mean that the QoS settings should apply per-channel
            );
         
        /**
         * indicate interest in consuming messages from a particular queue. When they do 
         * so, we say that they register a consumer or, simply put, subscribe to a queue.
         * Each consumer (subscription) has an identifier called a consumer tag
         */
        $channel->basic_consume(
            'invoice_queue',        #queue
            '',                     #consumer tag - Identifier for the consumer, valid within the current channel. just string
            false,                  #no local - TRUE: the server will not send messages to the connection that published them
            false,                  #no ack, false - acks turned on, true - off.  send a proper acknowledgment from the worker, once we're done with a task
            false,                  #exclusive - queues may only be accessed by the current connection
            false,                  #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method
            array($this, 'process') #callback
            );
             
        while(count($channel->callbacks)) {
            $this->log->addInfo('Waiting for incoming messages');
            $channel->wait();
        }
         
        $channel->close();
        $connection->close();
    }
     
    /**
     * process received request
     * 
     * @param AMQPMessage $msg
     */
    public function process(AMQPMessage $msg)
    {
        $this->generatePdf()->sendEmail();
         
        /**
         * If a consumer dies without sending an acknowledgement the AMQP broker 
         * will redeliver it to another consumer or, if none are available at the 
         * time, the broker will wait until at least one consumer is registered 
         * for the same queue before attempting redelivery
         */
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }
     
    /**
     * Generates invoice's pdf
     * 
     * @return WorkerReceiver
     */
    private function generatePdf()
    {
        /**
         * Mocking a pdf generation processing time.  This will take between
         * 2 and 5 seconds
         */
        sleep(mt_rand(2, 5));
        return $this;
    }
     
    /**
     * Sends email
     * 
     * @return WorkerReceiver
     */
    private function sendEmail()
    {
        /**
         * Mocking email sending time.  This will take between 1 and 3 seconds
         */
        sleep(mt_rand(1,3));
        return $this;
    }
}
As usual, we have to create a connection, derive a channel and declare a queue (the queue’s parameters have to be de same as the producer).

代碼: 選擇全部

<?php
/* ... SOME CODE HERE ... */
 
        $channel->basic_qos(
            null,   #prefetch size - prefetch window size in octets, null meaning "no specific limit"
            1,      #prefetch count - prefetch window in terms of whole messages
            null    #global - global=null to mean that the QoS settings should apply per-consumer, global=true to mean that the QoS settings should apply per-channel
            );
             
/* ... SOME CODE HERE ... */
In order to have worker behavior (dispatch messages among several proceses), we have to declare the Quality of Service (qos) parameters with $channel->basic_qos():

Prefetch size: no specific limit, we could have as many workers as we need
Prefetch count: how many messages to retrieve per worker before sending back an acknowledgement. This will make the worker to process 1 message at a time.
Global: null means that above settings will apply to this consumer only
Next, we will start consuming, with a key difference in the parameters. We will turn off automatic ack’s, since we will tell the RabbitMQ server when we have finished processing the message and be ready to receive a new one.

Now, how do we send that ack? Please take a look at the WorkerReceiver::process() method (which is declared as a callback method when a message is received). The calls to generatedPdf() and sendEmail() methods are just dummy methods that will simulate the time spent to accomplish both tasks. The $msg parameter not only contains the payload sent from the producer, it also contains information about the objects used by the producer. We can extract information about the channel used by the producer with $msg->delivery_info['channel'] (which is the same object type to the channel we opened for the consumer with $connection->channel();). Since we need to send the producer’s channel an acknowledgement that we have completed the process, we will use its basic_ack() method, sending as a parameter the delivery tag ($msg->delivery_info['delivery_tag']) RabbitMQ automatically generated in order to associate correctly to which message the ack belongs to.

How do we fire up the workers? Just create a file like the following, invoking the WorkerReceiver::listen() method:

代碼: 選擇全部

<?php
chdir(dirname(__DIR__));
 
require_once('vendor/autoload.php');
 
use Acme\AmqpWrapper\WorkerReceiver;
 
$worker = new WorkerReceiver();
 
$worker->listen();
Now use the php command (e.g. php worker.php or whichever name you have given to above file) to fire up the worker. But wait, the purpose was to have two or more workers, wasn’t it? No problem, fire up more workers in the same way in order to have multiple processes of the same file, and RabbitMQ will register the consumers and distribute work among them according to the QoS parameters.

Example 2: send RPC requests and expect a reply

So far, we have been sending messages to the RabbitMQ server without the user having to wait for a reply. This is ok for asynchronous processes that might take more time than the user is willing to spend just to see an ‘OK’ message. But what if we actually need a reply? Let’s say some result from a complex calculation, so we can show it to the user?

enter image description here

Let’s say we have a centralized login server (single sign on) that will work as an authentication mechanism isolated from the rest of our application(s). The only way to reach this server is through RabbitMQ. We need to implement a way to send the login credentials to this server and wait for a grant/deny access response.

We need to implement the following pattern:
enter image description here

As usual, let’s look at the producer first:

代碼: 選擇全部

<?php
namespace Acme\AmqpWrapper;
 
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;
 
class RpcSender
{
    private $response;
 
    /**
     * @var string
     */
    private $corr_id;
 
    /* ...SOME OTHER CODE HERE... */
 
    /**
     * @param array $credentials
     * @return string
     */
    public function execute($credentials)
    {
        $connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();
         
        /*
         * creates an anonymous exclusive callback queue
         * $callback_queue has a value like amq.gen-_U0kJVm8helFzQk9P0z9gg
         */
        list($callback_queue, ,) = $channel->queue_declare(
            "",     #queue
            false,  #passive
            false,  #durable
            true,   #exclusive
            false   #auto delete
            );
             
        $channel->basic_consume(
            $callback_queue,            #queue
            '',                         #consumer tag
            false,                      #no local
            false,                      #no ack
            false,                      #exclusive
            false,                      #no wait
            array($this, 'onResponse')  #callback
            );
             
        $this->response = null;
         
        /*
         * $this->corr_id has a value like 53e26b393313a
         */
        $this->corr_id = uniqid();
        $jsonCredentials = json_encode($credentials);
 
        /*
         * create a message with two properties: reply_to, which is set to the 
         * callback queue and correlation_id, which is set to a unique value for 
         * every request
         */
        $msg = new AMQPMessage(
            $jsonCredentials,    #body
            array('correlation_id' => $this->corr_id, 'reply_to' => $callback_queue)    #properties
            );
             
        /*
         * The request is sent to an rpc_queue queue.
         */
        $channel->basic_publish(
            $msg,       #message 
            '',         #exchange
            'rpc_queue' #routing key
            );
         
        while(!$this->response) {
            $channel->wait();
        }
         
        $channel->close();
        $connection->close();
         
        return $this->response;
    }
 
    /**
     * When a message appears, it checks the correlation_id property. If it
     * matches the value from the request it returns the response to the
     * application.
     *
     * @param AMQPMessage $rep
     */
    public function onResponse(AMQPMessage $rep) {
        if($rep->get('correlation_id') == $this->corr_id) {
            $this->response = $rep->body;
        }
    }
}
Looking at RpcSender::execute method, please be aware that the $credentials parameter is an array in the form of ['username'=>'x', 'password'=>'y']. Again, we open a new connection and create a channel as usual.

代碼: 選擇全部

<?php
//...
        list($callback_queue, ,) = $channel->queue_declare(
            "",     #queue
            false,  #passive
            false,  #durable
            true,   #exclusive
            false   #auto delete
            );
//...  
The first difference comes from declaring a queue. First notice that we are using the list() construct to catch the result from $channel->queue_declare(). This is because we do not explicitly send a queue name while declaring it so we need to find out how this queue is identified. We are only interested in the first element of the result array, which will be an unique identifier of the queue (something like amq.gen-_U0kJVm8helFzQk9P0z9gg). The second change is that we need to declare this queue as exclusive, so there is no mix up in the results from other concurrent processes.

Another big change is that the producer will be a consumer of a queue too, when executing $channel->basic_consume() please notice that we are providing the $callback_queue value we got while declaring the queue. And like every consumer, we will declare a callback to execute when the process receives a response.

代碼: 選擇全部

<?php
//...
        /*
         * $this->corr_id has a value like 53e26b393313a
         */
        $this->corr_id = uniqid();
//...
Next, we have to create a correlation id for the message, this is nothing more than a unique identifier for each message. In the example we are using uniqid()’s output, but you can use whichever mechanism you prefer (as long as it does not create a race condition, does not need to be a strong, crypto-safe RNG).

代碼: 選擇全部

<?php
//...
        $msg = new AMQPMessage(
            $jsonCredentials,    #body
            array('correlation_id' => $this->corr_id, 'reply_to' => $callback_queue)    #properties
            );
//...
Now let’s create a message, which has important changes compared to what we were used to in the first 2 examples. Aside from assigning a json-encoded string containing the credentials we want to authenticate, we have to provide to the AMQPMessage constructor an array with two properties defined:

correlation_id: a tag for the message
reply_to: the queue identifier generated while declaring it
After publishing the message, we will evaluate the response, which will be empty at the beginning. While the response value remains empty, we will wait for a response from the channel with $channel->wait();.

Once we receive a response from the channel, the callback method will be invoked (RpcSender::onResponse()). This method will match the received correlation id against the one generated, and if they are the same, will set the response body, thus breaking the while loop.

What about the RPC consumer? Here it is:

代碼: 選擇全部

<?php
namespace Acme\AmqpWrapper;
 
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;
 
class RpcReceiver
{
    /* ... SOME OTHER CODE HERE... */
 
    /**
     * Listens for incoming messages
     */
    public function listen()
    {
        $connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();
         
        $channel->queue_declare(
            'rpc_queue',    #queue 
            false,          #passive
            false,          #durable
            false,          #exclusive
            false           #autodelete
            );
         
        $channel->basic_qos(
            null,   #prefetch size
            1,      #prefetch count
            null    #global
            );
             
        $channel->basic_consume(
            'rpc_queue',                #queue
            '',                         #consumer tag
            false,                      #no local
            false,                      #no ack
            false,                      #exclusive
            false,                      #no wait
            array($this, 'callback')    #callback
            );
             
        while(count($channel->callbacks)) {
            $channel->wait();
        }
         
        $channel->close();
        $connection->close();
    }
 
    /**
     * Executes when a message is received.
     *
     * @param AMQPMessage $req
     */
    public function callback(AMQPMessage $req) {
         
        $credentials = json_decode($req->body);
        $authResult = $this->auth($credentials);
 
        /*
         * Creating a reply message with the same correlation id than the incoming message
         */
        $msg = new AMQPMessage(
            json_encode(array('status' => $authResult)),            #message
            array('correlation_id' => $req->get('correlation_id'))  #options
            );
     
        /*
         * Publishing to the same channel from the incoming message
         */
        $req->delivery_info['channel']->basic_publish(
            $msg,                   #message
            '',                     #exchange
            $req->get('reply_to')   #routing key
            );
             
        /*
         * Acknowledging the message
         */
        $req->delivery_info['channel']->basic_ack(
            $req->delivery_info['delivery_tag'] #delivery tag
            );
    }
 
    /**
     * @param \stdClass $credentials
     * @return bool
     */
    private function auth(\stdClass $credentials) {
        if (($credentials->username == 'admin') && ($credentials->password == 'admin')) {
            return true;
        } else {
            return false;
        }
    }
}
Same old connection and channel creation :)

Same as declaring the queue, however this queue will have a predefined name (‘rpc_queue‘). We will define the QoS parameters since we will deactivate automatic acks, so we can notify when we are finished verifying the credentials and have a result.

代碼: 選擇全部

<?php
//...
        $msg = new AMQPMessage(
            json_encode(array('status' => $authResult)),            #message
            array('correlation_id' => $req->get('correlation_id'))  #options
            );
//...
The magic comes from within the declared callback. Once we are done authenticating the credentials (yes, I know the process is done against static username/password values, this tutorial is not about how to authenticate credentials ;) ), we have to create the return message with the same correlation id the producer created. We can extract this from the request message with $req->get('correlation_id'), passing this value the same way we did it in the producer.

代碼: 選擇全部

<?php
//...
        $req->delivery_info['channel']->basic_publish(
            $msg,                   #message
            '',                     #exchange
            $req->get('reply_to')   #routing key
            );
//...
Now we have to publish this message to the same queue that was created in the producer (the one with the ‘random’ name). We extract the queue’s name with $req->get('reply_to') and use it as the routing key in basic_publish().

Once we published the message, we have to send the ack notice to the channel with $req->delivery_info['channel']->basic_ack(), using the delivery tag in $req->delivery_info['delivery_tag'] so the producer can stop waiting.

Again, fire up a listening process and you are ready to go. You can even combine examples 2 and 3 to have a multi-worker rpc process to perform the authentication requests than can be scaled just by firing up several workers.

There is far more to be said about RabbitMQ and AMQP, like virtual hosts, exchange types, server administration, etc… you can find more application patterns (like routing, topics) here and at the documentation page. There is also a command line tool to manage RabbitMQ, as well as a web based interface.

If you liked this tutorial series and would like to see more about MQ and more real world use cases, please let us know in the comments below!
yehlu
Site Admin
文章: 3245
註冊時間: 2004-04-15 17:20:21
來自: CodeCharge Support Engineer

Re: php & RabbitMQ 進階範例-2

文章 yehlu »

http://www.binpress.com/tutorial/gettin ... in-php/164

Getting Started with RabbitMQ in PHP

ADD COMMENT WAWern Ancheta Oct 28, 2014
PHPPhpRabbitmq
In this tutorial I'll walk you through using message broker software RabbitMQ with PHP. It acts as a middleman between a producer and a consumer. A producer being the data that we want to pass, and consumer being the entity that we want to pass it to. RabbitMQ uses a queue, which you can think of as a mailbox where you drop your letters. RabbitMQ then takes the letters and delivers them to their destination.

Installing RabbitMQ
In Ubuntu and other Debian-based operating systems you can install RabbitMQ by executing the following commands from your terminal:

echo "deb http://www.rabbitmq.com/debian/ testing main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list > /dev/null
sudo wget http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
sudo apt-key add rabbitmq-signing-key-public.asc
sudo apt-get update
sudo apt-get install rabbitmq-server -y
sudo service rabbitmq-server start
sudo rabbitmq-plugins enable rabbitmq_management
sudo service rabbitmq-server restart
The first command appends the RabbitMQ source to the software sources list. Next, we download the RabbitMQ signing key using wget and add it to Ubuntu. Then we call apt-get update to update the software sources list. Afterwards, we install the RabbitMQ server, start it and enable its management plugin. This provides an HTTP-based API management for monitoring your RabbitMQ server. Finally, we restart the RabbitMQ server so that changes will take effect.

The default username and password is guest. And the default port in which it runs is 5672.

If you're on another operating system, you can find how to install RabbitMQ for your specific operating system here: Downloading and Installing RabbitMQ.

Working with RabbitMQ
Once you're done installing RabbitMQ, we can now install the AMQP library for PHP, which implements the Advanced Messaging Queue Protocol. Start by creating a new directory, which is where we'll put all the files for testing RabbitMQ. Next, create a composer.json file and add the following:

{
"require": {
"videlalvaro/php-amqplib": "2.2.*"
}
}
Open up your terminal and cd into the directory you created earlier, then execute composer install to install the AMQP library.

Before we move on, lets also install Swiftmailer. You can do that by executing the following command from your terminal. This also adds an entry to Swiftmailer to your composer.json:

composer require swiftmailer/swiftmailer @stable
If you haven't figured it out by now, we'll be using Swiftmailer for the sample app we're creating, which will send emails to our imaginary users. Normally, emails takes a few seconds before they're sent. Adding an attachment to an email also adds to that time. In the real world we don't really want our users to wait. What we want to do is make them believe that we've already sent the email by showing a message that it's been sent.

This is where RabbitMQ comes in. We'll use it as some sort of a mailbox that multiple users can just drop their messages in. RabbitMQ will then take care of sending the messages in the background.

Sending Messages
First let's create the form to be used for sending emails. It'll accept the name and email address of the sender, the email address of the receiver and then the subject and message. Name the file form.php:

<?php
if(!empty($_GET['sent'])){
?>
<div>
Your message was sent!
</div>
<?php
}
?>
<form action="mailer.php" method="POST">
<div>
<label for="from">From</label>
<input type="text" name="from" id="from">
</div>
<div>
<label for="from_email">From Email</label>
<input type="text" name="from_email" id="from_email">
</div>
<div>
<label for="to_email">To Email</label>
<input type="text" name="to_email" id="to_email">
</div>
<div>
<label for="subject">Subject</label>
<input type="text" name="subject" id="subject">
</div>
<div>
<label for="message">Message</label>
<textarea name="message" id="message" cols="30" rows="10"></textarea>
</div>
<div>
<button type="submit">Send</button>
</div>
</form>
Next, create the file that will push the message into the queue, and name it file sender.php.

Require the autoload.php file so that our dependencies will be automatically loaded by PHP. Then use the AMQPConnection and AMQPMessage from the AMQP library. AMQPConnection allows us to create a new connection to the RabbitMQ server and AMQPMessage allows us to create messages that we can push to the queue.

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('email_queue', false, false, false, false);

$data = json_encode($_POST);

$msg = new AMQPMessage($data, array('delivery_mode' => 2));
$channel->basic_publish($msg, '', 'email_queue');

header('Location: form.php?sent=true');
Breaking it down, first we create a fresh connection by creating a new instance of the AMQPConnection class. This requires the following arguments:

host - The host that the RabbitMQ server is running on. In this case we've installed RabbitMQ on the same computer we're running the script in. So it should be localhost. Note that in the real world we'll install RabbitMQ on another server, different from the one were using to serve our websites. So instead of localhost we'd use the public IP address of that server.
port - The port that the RabbitMQ server is running on.
user - The username for logging into the server. By default, the username is set to guest.
password - The password for logging into the server. By default, the password is set to guest.
Now, we'll create a channel. We can do that by calling the channel() method from the connection that we've just declared.

$channel = $connection->channel();
Then we'll declare the queue to be used by calling the queue_declare method.

$channel->queue_declare('email_queue', false, false, false, false);
The queue_declare method takes up the following arguments:

queue name - A name that you want to use for the queue. You can supply anything for this.
passive - A boolean value for specifying whether to check for an existing exchange.
durable - A boolean value for specifying whether the RabbitMQ holds on to a queue when the server crashes.
exclusive - A boolean value for specifying whether the queue is used by only one connection.
auto-delete - A boolean value for specifying whether the queue is deleted when the last subscriber unsubscribes.
Next we convert the POST data that we receive from the form to a JSON string. We can only pass strings as a message so we'll have to convert this later on into an array on the receiver's end.

$data = json_encode($_POST);
Next we create a new message. This accepts 2 arguments: the data and an array of options. For the array of options we specify the delivery_mode to 2 which means that the message is persistent. This means that it isn't lost when the server crashes or an error occurs.

$msg = new AMQPMessage($data, array('delivery_mode' => 2));
Next we'll publish the message by calling the basic_publish() method on the channel. This accepts 3 arguments: the message, the exchange and the name of the queue. If you're wondering why we set the value of exchange to an empty string, that's because we don't really need it. The exchange is commonly used for pub-sub patterns. What were using here is just basic publish.

$channel->basic_publish($msg, '', 'email_queue');
Finally, we just redirect the user to the form.

header('Location: form.php?sent=true');
Receiving Messages
Now we're ready to write the code that will receive the messages sent by users. Name the file receiver.php. Here're the full contents of the file:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('email_queue', false, false, false, false);

echo ' * Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg){

echo " * Message received", "\n";
$data = json_decode($msg->body, true);

$from = $data['from'];
$from_email = $data['from_email'];
$to_email = $data['to_email'];
$subject = $data['subject'];
$message = $data['message'];

$transporter = Swift_SmtpTransport::newInstance('smtp.gmail.com', 465, 'ssl')
->setUsername('YOUR_GMAIL_EMAIL')
->setPassword('YOUR_GMAIL_PASSWORD');

$mailer = Swift_Mailer::newInstance($transporter);

$message = Swift_Message::newInstance($transporter)
->setSubject($subject)
->setFrom(array($from_email => $from))
->setTo(array($to_email))
->setBody($message);

$mailer->send($message);

echo " * Message was sent", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('email_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
$channel->wait();
}
Breaking it down, the first 5 lines of code are basically the same as the one we have on the sender.php file. Then we just output a message saying how we can stop the file from running. We need to run this file from the terminal, so to stop it we just hit CTRL + C.

Now we'll declare a named function used for processing the message that we passed from the sender. The first thing it does is output that the message was received. Then we use json_decode() to convert the JSON string back to an array.

$callback = function($msg){
echo " * Message received", "\n";
$data = json_decode($msg->body, true);
};
Next we'll extract the data and assign them to each of their own variables:

$from = $data['from'];
$from_email = $data['from_email'];
$to_email = $data['to_email'];
$subject = $data['subject'];
$message = $data['message'];
Then we declare a new transporter to be used by Swiftmailer, which allows us to use a gmail account for sending emails. Declaring a new instance accepts 3 arguments: the host, port and the encryption. Then we set the username and password.

$transporter = Swift_SmtpTransport::newInstance('smtp.gmail.com', 465, 'ssl')
->setUsername('YOUR_GMAIL_EMAIL')
->setPassword('YOUR_GMAIL_PASSWORD');
Our next order of business is declaring a new mailer instance, and supplying the transporter as an argument.

$mailer = Swift_Mailer::newInstance($transporter);
Then we'll create a new message, which also takes up the transporter as its argument. We then set the subject, from field, to field, and body of the message.

$message = Swift_Message::newInstance($transporter)
->setSubject($subject)
->setFrom(array($from_email => $from))
->setTo(array($to_email))
->setBody($message);
Finally, we send the message and output that the message was sent. The last line basically tells RabbitMQ that the sending of the message has indeed been successful.

$mailer->send($message);
echo " * Message was sent", "\n";

$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
Running the program
You can now run the receiver by going to your terminal and executing the following command:

php receiver.php
Once it's running, go to your browser and access the sender.php file. Enter the details of your message and click on send. You're instantly greeted by a 'Your message was sent!' text, but if you immediately check your email account its not there yet. If it's not there then the queue is still processing it. Check the output displayed on the terminal window where you executed the receiver. You should see a 'Message was sent' output if the email was already sent.

Conclusion
RabbitMQ is a nice way for implementing messaging applications such as the one we created in this tutorial. We've barely scratch the surface of what's possible with RabbitMQ with this tutorial. I recommend that you check out the getting started guides and the documentation if you'd like to dive deeper.
yehlu
Site Admin
文章: 3245
註冊時間: 2004-04-15 17:20:21
來自: CodeCharge Support Engineer

Re: php & RabbitMQ

文章 yehlu »

yehlu
Site Admin
文章: 3245
註冊時間: 2004-04-15 17:20:21
來自: CodeCharge Support Engineer

Re: php & RabbitMQ

文章 yehlu »

回覆文章

回到「PHP」