php and python 初探 rabbitmq

  • 本章使用 docker 部署 rabbitmq, 并且使用php 以及python 進(jìn)行連接和通訊

  1. 使用docker 部署 rabbitmq
    部署rabbitmq
    首先我們調(diào)查得知rabbitmq的端口號(hào)都有哪些
4369 -- erlang發(fā)現(xiàn)口

5672 --client端通信口

15672 -- 管理界面ui端口

25672 -- server間內(nèi)部通信口

生成docker 容器

docker pull rabbitmq #不帶ui管理界面
docker run -d --name my-rabbitmq  -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15672:15672 rabbitmq

可以通過(guò)15672這個(gè)端口訪問(wèn)ui管理,賬號(hào)密碼默認(rèn)為guest。
修改默認(rèn)帳密:
RABBITMQ_DEFAULT_USER and RABBITMQ_DEFAULT_PASS

docker pull rabbitmq-management #有ui管理界面
docker run -d  docker run -d --name my-rabbitmq-management  -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15672:15672 rabbitmq:management

進(jìn)入docker容器管理

docker exec -it my-rabbitmq bash

  1. 編寫代碼連接測(cè)試

php 代碼

在 php代碼編寫前需要 composer 引入 rabbitmq 的依賴
composer.json 代碼如下

{
    "require": {
        "php-amqplib/php-amqplib": ">=2.6.1"
    }
}
send 代碼 send.php :
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

require_once __DIR__ . '/vendor/autoload.php';

$conn_args = [
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost' => '/'
];

$conn = new AMQPStreamConnection($conn_args['host'], $conn_args['port'], $conn_args['login'], $conn_args['password']);

$channel = $conn->channel();
$channel->queue_declare('hello', false, false, false, false);

$msgData = new amqpData('zpc', '20');
$msg = new AMQPMessage($msgData->getData()); // 注意這里如果想要傳輸對(duì)象的話需要進(jìn)行序列化

//$msg = new AMQPMessage('test-msg-01');
$channel->basic_publish($msg, '', 'hello');

echo "[x] 'Sent Hello world!' \n";

$channel->close();
$conn->close();



class amqpData
{
    public $name;
    public $age;

    public function amqpData($name, $age)
    {
        $this->name = $name;
        $this->age = $age;
    }

    public function getData()
    {
        return json_encode($this);
    }
}
receive 代碼 receive.php:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
//    echo "'[X] Received' {$msg->body} \n";

    $body = $msg->body;
    // 如果傳輸?shù)氖切蛄谢瘮?shù)據(jù)的話這里進(jìn)行 反序列化
    $data = json_decode($body);

    echo '[X] Received ' . $data->name;  echo "\n";
};

$channel->basic_consume('hello', '',false,true,false,false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();
測(cè)試
php send.php  

[x] 'Sent Hello world!'

此時(shí)前往容器去查看 rabbitmq 的隊(duì)列信息你會(huì)發(fā)現(xiàn)這樣的信息,說(shuō)明hello這個(gè)隊(duì)列里面已經(jīng)進(jìn)去了一條數(shù)據(jù)

cd /sbin/
rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
hello   1

開(kāi)始接收

php receive.php  

[*] Waiting for messages. To exit press CTRL+C
[X] Received zpc
至此php代碼測(cè)試成功

python 代碼測(cè)試

首先先pip 安裝庫(kù)

pip install pika

由于 php 的發(fā)送rabbitmq 的代碼已經(jīng)寫好了,就直接使用 send.php 進(jìn)行測(cè)試,代碼大同小異,只是 python 的簡(jiǎn)潔性真的很棒

receive 代碼 receive.py:
import pika,json

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))

channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print ("[x] Received ")
    # print (body)
    
    data = json.loads(str(body, 'utf-8'))
    print (data['name'])

channel.basic_consume(callback, queue="hello", no_ack=True)

print ('[*] Waiting for messages. To exit press CTRL+C')

channel.start_consuming()

至此 python 和 php 的代碼測(cè)試完成了


可以同時(shí)開(kāi)啟多個(gè)消費(fèi)者來(lái)接收隊(duì)列內(nèi)容,

消息確認(rèn)

執(zhí)行任務(wù)可能需要幾秒鐘。您可能想知道如果其中一個(gè)消費(fèi)者開(kāi)始執(zhí)行長(zhǎng)任務(wù)并且僅在部分完成時(shí)死亡會(huì)發(fā)生什么。使用我們當(dāng)前的代碼,一旦RabbitMQ向客戶發(fā)送消息,它立即將其標(biāo)記為刪除。在這種情況下,如果你殺死一個(gè)工人,我們將丟失它剛剛處理的消息。我們還將丟失分發(fā)給這個(gè)特定工作者但尚未處理的所有消息。

但我們不想失去任何任務(wù)。如果工人死亡,我們希望將任務(wù)交付給另一名工人。

為了確保消息永不丟失,RabbitMQ支持 消息確認(rèn)。消費(fèi)者發(fā)回ack(nowledgement)告訴RabbitMQ已收到,處理了特定消息,RabbitMQ可以自由刪除它。

如果消費(fèi)者死亡(其通道關(guān)閉,連接關(guān)閉或TCP連接丟失)而不發(fā)送確認(rèn),RabbitMQ將理解消息未完全處理并將重新排隊(duì)。如果其他消費(fèi)者同時(shí)在線,則會(huì)迅速將其重新發(fā)送給其他消費(fèi)者。這樣你就可以確保沒(méi)有消息丟失,即使工人偶爾會(huì)死亡。

沒(méi)有任何消息超時(shí); 當(dāng)消費(fèi)者死亡時(shí),RabbitMQ將重新發(fā)送消息。即使處理消息需要非常長(zhǎng)的時(shí)間,也沒(méi)關(guān)系。

默認(rèn)情況下,消息確認(rèn)已關(guān)閉?,F(xiàn)在是時(shí)候通過(guò)設(shè)置第四個(gè)參數(shù)來(lái)打開(kāi)它們basic_consume (true表示沒(méi)有ACK),并從工作人員發(fā)送適當(dāng)?shù)拇_認(rèn),

修改我們的receive.php代碼

<?php

$callback = function ($msg) {
    ...
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

//$channel->basic_consume('hello', '',false,true,false,false, $callback);
$channel->basic_consume('hello', '',false,false,false,false, $callback);

使用此代碼,我們可以確定即使您在處理消息時(shí)使用CTRL + C殺死一名工作人員,也不會(huì)丟失任何內(nèi)容。工人死后不久,所有未經(jīng)確認(rèn)的消息將被重新傳遞。

確認(rèn)必須在收到的交付的同一信道上發(fā)送。嘗試使用不同的通道進(jìn)行確認(rèn)將導(dǎo)致通道級(jí)協(xié)議異常。有關(guān)確認(rèn)文檔指南,請(qǐng)參閱了解更多信息。

被遺忘的確認(rèn)
錯(cuò)過(guò)ack是一個(gè)常見(jiàn)的錯(cuò)誤。這是一個(gè)簡(jiǎn)單的錯(cuò)誤,但后果是嚴(yán)重的。當(dāng)您的客戶端退出時(shí),消息將被重新傳遞(這可能看起來(lái)像隨機(jī)重新傳遞),但RabbitMQ將會(huì)占用越來(lái)越多的內(nèi)存,因?yàn)樗鼰o(wú)法釋放任何未經(jīng)處理的消息。

為了調(diào)試這種錯(cuò)誤,您可以使用rabbitmqctl 來(lái)打印messages_unacknowledged字段:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在Windows上,刪除sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久性

我們已經(jīng)學(xué)會(huì)了如何確保即使消費(fèi)者死亡,任務(wù)也不會(huì)丟失。但是如果RabbitMQ服務(wù)器停止,我們的任務(wù)仍然會(huì)丟失。

當(dāng)RabbitMQ退出或崩潰時(shí),它將忘記隊(duì)列和消息,除非你告訴它不要。確保消息不會(huì)丟失需要做兩件事:我們需要將隊(duì)列和消息都標(biāo)記為持久。

首先,我們需要確保RabbitMQ永遠(yuǎn)不會(huì)丟失我們的隊(duì)列。為此,我們需要聲明它是持久的。為此,我們將第三個(gè)參數(shù)傳遞給queue_declare為true:

$channel->queue_declare('hello2', false, true, false, false);

此時(shí)我們確信即使RabbitMQ重新啟動(dòng),task_queue隊(duì)列也不會(huì)丟失?,F(xiàn)在我們需要將消息標(biāo)記為持久性 - 通過(guò)設(shè)置delivery_mode = 2消息屬性,AMQPMessage將其作為屬性數(shù)組的一部分。

$msg = new AMQPMessage('test-msg' . $i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

有關(guān)消息持久性的注釋

將消息標(biāo)記為持久性并不能完全保證消息不會(huì)丟失。雖然它告訴RabbitMQ將消息保存到磁盤,但是當(dāng)RabbitMQ接受消息并且尚未保存消息時(shí),仍然有一個(gè)短時(shí)間窗口。此外,RabbitMQ不會(huì)為每條消息執(zhí)行fsync(2) - 它可能只是保存到緩存而不是真正寫入磁盤。持久性保證不強(qiáng),但對(duì)于我們簡(jiǎn)單的任務(wù)隊(duì)列來(lái)說(shuō)已經(jīng)足夠了。如果您需要更強(qiáng)的保證,那么您可以使用 發(fā)布者確認(rèn)

公平派遣

您可能已經(jīng)注意到調(diào)度仍然無(wú)法完全按照我們的意愿運(yùn)行。例如,在有兩個(gè)工人的情況下,當(dāng)所有奇怪的消息都很重,甚至消息很輕時(shí),一個(gè)工人將經(jīng)常忙碌而另一個(gè)工作人員幾乎不會(huì)做任何工作。好吧,RabbitMQ對(duì)此一無(wú)所知,仍然會(huì)均勻地發(fā)送消息。

發(fā)生這種情況是因?yàn)镽abbitMQ只是在消息進(jìn)入隊(duì)列時(shí)調(diào)度消息。它不會(huì)查看消費(fèi)者未確認(rèn)消息的數(shù)量。它只是盲目地向第n個(gè)消費(fèi)者發(fā)送每個(gè)第n個(gè)消息。
為了打敗我們可以使用basic_qos方法和 prefetch_count = 1設(shè)置。這告訴RabbitMQ不要一次向一個(gè)worker發(fā)送一條消息?;蛘撸瑩Q句話說(shuō),在處理并確認(rèn)前一個(gè)消息之前,不要向工作人員發(fā)送新消息。相反,它會(huì)將它發(fā)送給下一個(gè)仍然不忙的工人。

#receive.php
$channel->basic_qos(null, 1, null);

參考資料:
rabbitmq官方的php 代碼實(shí)例:http://www.rabbitmq.com/tutorials/tutorial-one-php.html
rabbitmq官方的python代碼實(shí)例:http://www.rabbitmq.com/tutorials/tutorial-one-python.html
rabbitmq 的 docker 倉(cāng)庫(kù): https://hub.docker.com/_/rabbitmq/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Awesome PHP 一個(gè)PHP資源列表,內(nèi)容包括:庫(kù)、框架、模板、安全、代碼分析、日志、第三方庫(kù)、配置工具、W...
    guanguans閱讀 6,115評(píng)論 0 47
  • Composer Repositories Composer源 Firegento - Magento模塊Comp...
    零一間閱讀 4,017評(píng)論 1 66
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,533評(píng)論 19 139
  • awesome-php 收集整理一些常用的PHP類庫(kù), 資源以及技巧. 以便在工作中迅速的查找所需... 這個(gè)列表...
    guanguans閱讀 4,759評(píng)論 0 34
  • 每天早晨,南營(yíng)小區(qū)門口,買賣各類蔬菜,水果,水產(chǎn)品的人,烏泱泱一大片。兒子太小,一般買菜什么的,都去超市,干凈不說(shuō)...
    風(fēng)起云湮閱讀 613評(píng)論 7 5

友情鏈接更多精彩內(nèi)容