ZeroMQ是一個(gè)輕量級(jí)消息通信庫,擴(kuò)展傳統(tǒng)的標(biāo)準(zhǔn)socket接口。提供了異步消息隊(duì)列的抽象,能夠?qū)崿F(xiàn)消息過濾,能夠無縫對(duì)接多種傳輸協(xié)議。其中MQ是消息隊(duì)列(Message Queue)的縮寫。其官方介紹如下:
ZeroMQ (also known as ?MQ, 0MQ, or zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry atomic messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fan-out, pub-sub, task distribution, and request-reply. It's fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems.
ZMQ支持常見的三種基本通信模式,分別是Pub/Sub、Req/Rep、Push/Pull。比較重要的類型有:zmq::context_t,zmq::socket_t和zmq::message_t。比較重要的方法則包括:socket_t::send(message_t),socket_t::recv(&message_t)和socket_t::setsockopt()等。
Req/Rep

服務(wù)端(響應(yīng)端ZMQ_REP):
//
// Hello World server in C++
// Binds REP socket to tcp://*:5555
// Expects "Hello" from client, replies with "World"
//
#include <zmq.hpp>
#include <string>
#include <iostream>
#ifndef _WIN32
#include <unistd.h>
#else
#include <windows.h>
#define sleep(n) Sleep(n)
#endif
int main () {
// Prepare our context and socket
zmq::context_t context (1);
zmq::socket_t socket (context, ZMQ_REP);
socket.bind ("tcp://*:5555");
while (true) {
zmq::message_t request;
// Wait for next request from client
socket.recv (&request);
std::cout << "Received Hello" << std::endl;
// Do some 'work'
sleep(1);
// Send reply back to client
zmq::message_t reply (5);
memcpy (reply.data (), "World", 5);
socket.send (reply);
}
return 0;
}
客戶端(請(qǐng)求端ZMQ_REQ):
//
// Hello World client in C++
// Connects REQ socket to tcp://localhost:5555
// Sends "Hello" to server, expects "World" back
//
#include <zmq.hpp>
#include <string>
#include <iostream>
int main ()
{
// Prepare our context and socket
zmq::context_t context (1);
zmq::socket_t socket (context, ZMQ_REQ);
std::cout << "Connecting to hello world server…" << std::endl;
socket.connect ("tcp://localhost:5555");
// Do 10 requests, waiting each time for a response
for (int request_nbr = 0; request_nbr != 10; request_nbr++) {
zmq::message_t request (5);
memcpy (request.data (), "Hello", 5);
std::cout << "Sending Hello " << request_nbr << "…" << std::endl;
socket.send (request);
// Get the reply.
zmq::message_t reply;
socket.recv (&reply);
std::cout << "Received World " << request_nbr << std::endl;
}
return 0;
}
對(duì)于字符串,需要注意的一點(diǎn)是,ZMQ不會(huì)關(guān)心發(fā)送消息的內(nèi)容,只要知道它所包含的字節(jié)數(shù)。這意味著,ZMQ的字符串是有長度的,且傳送時(shí)不加結(jié)束符。當(dāng)使用c語言接收時(shí),應(yīng)注意申請(qǐng)比長度多一個(gè)字節(jié)的存儲(chǔ)空間,并置位結(jié)束符’/0’,否則在打印字符串時(shí)可能得到奇怪的結(jié)果。
Pub/Sub

服務(wù)端(發(fā)布端
ZMQ_PUB):
// Weather update server in C++
// Binds PUB socket to tcp://*:5556
// Publishes random weather updates
//
// Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
//
#include <zmq.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#if (defined (WIN32))
#include <zhelpers.hpp>
#endif
#define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))
int main () {
// Prepare our context and publisher
zmq::context_t context (1);
zmq::socket_t publisher (context, ZMQ_PUB);
publisher.bind("tcp://*:5556");
publisher.bind("ipc://weather.ipc"); // Not usable on Windows.
// Initialize random number generator
srandom ((unsigned) time (NULL));
while (1) {
int zipcode, temperature, relhumidity;
// Get values that will fool the boss
zipcode = within (100000);
temperature = within (215) - 80;
relhumidity = within (50) + 10;
// Send message to all subscribers
zmq::message_t message(20);
snprintf ((char *) message.data(), 20 ,
"%05d %d %d", zipcode, temperature, relhumidity);
publisher.send(message);
}
return 0;
}
客戶端(訂閱端ZMQ_SUB):
// Weather update client in C++
// Connects SUB socket to tcp://localhost:5556
// Collects weather updates and finds avg temp in zipcode
//
// Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
//
#include <zmq.hpp>
#include <iostream>
#include <sstream>
int main (int argc, char *argv[])
{
zmq::context_t context (1);
// Socket to talk to server
std::cout << "Collecting updates from weather server…\n" << std::endl;
zmq::socket_t subscriber (context, ZMQ_SUB);
subscriber.connect("tcp://localhost:5556");
// Subscribe to zipcode, default is NYC, 10001
const char *filter = (argc > 1)? argv [1]: "10001 ";
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));
// Process 100 updates
int update_nbr;
long total_temp = 0;
for (update_nbr = 0; update_nbr < 100; update_nbr++) {
zmq::message_t update;
int zipcode, temperature, relhumidity;
subscriber.recv(&update);
std::istringstream iss(static_cast<char*>(update.data()));
iss >> zipcode >> temperature >> relhumidity ;
total_temp += temperature;
}
std::cout << "Average temperature for zipcode '"<< filter
<<"' was "<<(int) (total_temp / update_nbr) <<"F"
<< std::endl;
return 0;
}
需要注意的是,在使用SUB套接字時(shí),必須使用setsockopt()方法來設(shè)置訂閱的內(nèi)容。該方法可以用于設(shè)置消息過濾器,但如果你不設(shè)置訂閱內(nèi)容,那將什么消息都收不到。訂閱信息可以是任何字符串,可以設(shè)置多次。只要消息滿足其中一條訂閱信息,SUB套接字就會(huì)收到。訂閱者可以選擇不接收某類消息,也是通過setsockopt()方法實(shí)現(xiàn)的。
Push/Pull
其他參考
https://github.com/booksbyus/zguide/tree/master/examples/C%2B%2B