- 本章使用 docker 部署 rabbitmq, 并且使用php 以及python 進(jìn)行連接和通訊
- 使用docker 部署 rabbitmq
部署rabbitmq
首先我們調(diào)查得知rabbitmq的端口號(hào)都有哪些
4369 -- erlang發(fā)現(xiàn)口
5672 --client端通信口
15672 -- 管理界面ui端口
25672 -- server間內(nèi)部通信口
生成docker 容器
docker pull rabbitmq #不帶ui管理界面
docker run -d --name my-rabbitmq -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15672:15672 rabbitmq
可以通過(guò)15672這個(gè)端口訪問(wèn)ui管理,賬號(hào)密碼默認(rèn)為guest。
修改默認(rèn)帳密:
RABBITMQ_DEFAULT_USER and RABBITMQ_DEFAULT_PASS
docker pull rabbitmq-management #有ui管理界面
docker run -d docker run -d --name my-rabbitmq-management -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15672:15672 rabbitmq:management
進(jìn)入docker容器管理
docker exec -it my-rabbitmq bash
- 編寫代碼連接測(cè)試
php 代碼
在 php代碼編寫前需要 composer 引入 rabbitmq 的依賴
composer.json 代碼如下
{
"require": {
"php-amqplib/php-amqplib": ">=2.6.1"
}
}
send 代碼 send.php :
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
require_once __DIR__ . '/vendor/autoload.php';
$conn_args = [
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
];
$conn = new AMQPStreamConnection($conn_args['host'], $conn_args['port'], $conn_args['login'], $conn_args['password']);
$channel = $conn->channel();
$channel->queue_declare('hello', false, false, false, false);
$msgData = new amqpData('zpc', '20');
$msg = new AMQPMessage($msgData->getData()); // 注意這里如果想要傳輸對(duì)象的話需要進(jìn)行序列化
//$msg = new AMQPMessage('test-msg-01');
$channel->basic_publish($msg, '', 'hello');
echo "[x] 'Sent Hello world!' \n";
$channel->close();
$conn->close();
class amqpData
{
public $name;
public $age;
public function amqpData($name, $age)
{
$this->name = $name;
$this->age = $age;
}
public function getData()
{
return json_encode($this);
}
}
receive 代碼 receive.php:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
// echo "'[X] Received' {$msg->body} \n";
$body = $msg->body;
// 如果傳輸?shù)氖切蛄谢瘮?shù)據(jù)的話這里進(jìn)行 反序列化
$data = json_decode($body);
echo '[X] Received ' . $data->name; echo "\n";
};
$channel->basic_consume('hello', '',false,true,false,false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
測(cè)試
php send.php
[x] 'Sent Hello world!'
此時(shí)前往容器去查看 rabbitmq 的隊(duì)列信息你會(huì)發(fā)現(xiàn)這樣的信息,說(shuō)明hello這個(gè)隊(duì)列里面已經(jīng)進(jìn)去了一條數(shù)據(jù)
cd /sbin/
rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
hello 1
開(kāi)始接收
php receive.php
[*] Waiting for messages. To exit press CTRL+C
[X] Received zpc
至此php代碼測(cè)試成功
python 代碼測(cè)試
首先先pip 安裝庫(kù)
pip install pika
由于 php 的發(fā)送rabbitmq 的代碼已經(jīng)寫好了,就直接使用 send.php 進(jìn)行測(cè)試,代碼大同小異,只是 python 的簡(jiǎn)潔性真的很棒
receive 代碼 receive.py:
import pika,json
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print ("[x] Received ")
# print (body)
data = json.loads(str(body, 'utf-8'))
print (data['name'])
channel.basic_consume(callback, queue="hello", no_ack=True)
print ('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
至此 python 和 php 的代碼測(cè)試完成了
可以同時(shí)開(kāi)啟多個(gè)消費(fèi)者來(lái)接收隊(duì)列內(nèi)容,
消息確認(rèn)
執(zhí)行任務(wù)可能需要幾秒鐘。您可能想知道如果其中一個(gè)消費(fèi)者開(kāi)始執(zhí)行長(zhǎng)任務(wù)并且僅在部分完成時(shí)死亡會(huì)發(fā)生什么。使用我們當(dāng)前的代碼,一旦RabbitMQ向客戶發(fā)送消息,它立即將其標(biāo)記為刪除。在這種情況下,如果你殺死一個(gè)工人,我們將丟失它剛剛處理的消息。我們還將丟失分發(fā)給這個(gè)特定工作者但尚未處理的所有消息。
但我們不想失去任何任務(wù)。如果工人死亡,我們希望將任務(wù)交付給另一名工人。
為了確保消息永不丟失,RabbitMQ支持 消息確認(rèn)。消費(fèi)者發(fā)回ack(nowledgement)告訴RabbitMQ已收到,處理了特定消息,RabbitMQ可以自由刪除它。
如果消費(fèi)者死亡(其通道關(guān)閉,連接關(guān)閉或TCP連接丟失)而不發(fā)送確認(rèn),RabbitMQ將理解消息未完全處理并將重新排隊(duì)。如果其他消費(fèi)者同時(shí)在線,則會(huì)迅速將其重新發(fā)送給其他消費(fèi)者。這樣你就可以確保沒(méi)有消息丟失,即使工人偶爾會(huì)死亡。
沒(méi)有任何消息超時(shí); 當(dāng)消費(fèi)者死亡時(shí),RabbitMQ將重新發(fā)送消息。即使處理消息需要非常長(zhǎng)的時(shí)間,也沒(méi)關(guān)系。
默認(rèn)情況下,消息確認(rèn)已關(guān)閉?,F(xiàn)在是時(shí)候通過(guò)設(shè)置第四個(gè)參數(shù)來(lái)打開(kāi)它們basic_consume (true表示沒(méi)有ACK),并從工作人員發(fā)送適當(dāng)?shù)拇_認(rèn),
修改我們的receive.php代碼
<?php
$callback = function ($msg) {
...
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
//$channel->basic_consume('hello', '',false,true,false,false, $callback);
$channel->basic_consume('hello', '',false,false,false,false, $callback);
使用此代碼,我們可以確定即使您在處理消息時(shí)使用CTRL + C殺死一名工作人員,也不會(huì)丟失任何內(nèi)容。工人死后不久,所有未經(jīng)確認(rèn)的消息將被重新傳遞。
確認(rèn)必須在收到的交付的同一信道上發(fā)送。嘗試使用不同的通道進(jìn)行確認(rèn)將導(dǎo)致通道級(jí)協(xié)議異常。有關(guān)確認(rèn)的文檔指南,請(qǐng)參閱了解更多信息。
被遺忘的確認(rèn)
錯(cuò)過(guò)ack是一個(gè)常見(jiàn)的錯(cuò)誤。這是一個(gè)簡(jiǎn)單的錯(cuò)誤,但后果是嚴(yán)重的。當(dāng)您的客戶端退出時(shí),消息將被重新傳遞(這可能看起來(lái)像隨機(jī)重新傳遞),但RabbitMQ將會(huì)占用越來(lái)越多的內(nèi)存,因?yàn)樗鼰o(wú)法釋放任何未經(jīng)處理的消息。
為了調(diào)試這種錯(cuò)誤,您可以使用rabbitmqctl 來(lái)打印messages_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在Windows上,刪除sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久性
我們已經(jīng)學(xué)會(huì)了如何確保即使消費(fèi)者死亡,任務(wù)也不會(huì)丟失。但是如果RabbitMQ服務(wù)器停止,我們的任務(wù)仍然會(huì)丟失。
當(dāng)RabbitMQ退出或崩潰時(shí),它將忘記隊(duì)列和消息,除非你告訴它不要。確保消息不會(huì)丟失需要做兩件事:我們需要將隊(duì)列和消息都標(biāo)記為持久。
首先,我們需要確保RabbitMQ永遠(yuǎn)不會(huì)丟失我們的隊(duì)列。為此,我們需要聲明它是持久的。為此,我們將第三個(gè)參數(shù)傳遞給queue_declare為true:
$channel->queue_declare('hello2', false, true, false, false);
此時(shí)我們確信即使RabbitMQ重新啟動(dòng),task_queue隊(duì)列也不會(huì)丟失?,F(xiàn)在我們需要將消息標(biāo)記為持久性 - 通過(guò)設(shè)置delivery_mode = 2消息屬性,AMQPMessage將其作為屬性數(shù)組的一部分。
$msg = new AMQPMessage('test-msg' . $i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
有關(guān)消息持久性的注釋
將消息標(biāo)記為持久性并不能完全保證消息不會(huì)丟失。雖然它告訴RabbitMQ將消息保存到磁盤,但是當(dāng)RabbitMQ接受消息并且尚未保存消息時(shí),仍然有一個(gè)短時(shí)間窗口。此外,RabbitMQ不會(huì)為每條消息執(zhí)行fsync(2) - 它可能只是保存到緩存而不是真正寫入磁盤。持久性保證不強(qiáng),但對(duì)于我們簡(jiǎn)單的任務(wù)隊(duì)列來(lái)說(shuō)已經(jīng)足夠了。如果您需要更強(qiáng)的保證,那么您可以使用 發(fā)布者確認(rèn)。
公平派遣
您可能已經(jīng)注意到調(diào)度仍然無(wú)法完全按照我們的意愿運(yùn)行。例如,在有兩個(gè)工人的情況下,當(dāng)所有奇怪的消息都很重,甚至消息很輕時(shí),一個(gè)工人將經(jīng)常忙碌而另一個(gè)工作人員幾乎不會(huì)做任何工作。好吧,RabbitMQ對(duì)此一無(wú)所知,仍然會(huì)均勻地發(fā)送消息。
發(fā)生這種情況是因?yàn)镽abbitMQ只是在消息進(jìn)入隊(duì)列時(shí)調(diào)度消息。它不會(huì)查看消費(fèi)者未確認(rèn)消息的數(shù)量。它只是盲目地向第n個(gè)消費(fèi)者發(fā)送每個(gè)第n個(gè)消息。
為了打敗我們可以使用basic_qos方法和 prefetch_count = 1設(shè)置。這告訴RabbitMQ不要一次向一個(gè)worker發(fā)送一條消息?;蛘撸瑩Q句話說(shuō),在處理并確認(rèn)前一個(gè)消息之前,不要向工作人員發(fā)送新消息。相反,它會(huì)將它發(fā)送給下一個(gè)仍然不忙的工人。
#receive.php
$channel->basic_qos(null, 1, null);
參考資料:
rabbitmq官方的php 代碼實(shí)例:http://www.rabbitmq.com/tutorials/tutorial-one-php.html
rabbitmq官方的python代碼實(shí)例:http://www.rabbitmq.com/tutorials/tutorial-one-python.html
rabbitmq 的 docker 倉(cāng)庫(kù): https://hub.docker.com/_/rabbitmq/