ZeroMQ快速入門

本文主要參考guotianqing的CSDN博客:ZeroMQ基礎(chǔ)入門官方文檔

ZeroMQ是一個(gè)輕量級(jí)消息通信庫,擴(kuò)展傳統(tǒng)的標(biāo)準(zhǔn)socket接口。提供了異步消息隊(duì)列的抽象,能夠?qū)崿F(xiàn)消息過濾,能夠無縫對(duì)接多種傳輸協(xié)議。其中MQ是消息隊(duì)列(Message Queue)的縮寫。其官方介紹如下:

ZeroMQ (also known as ?MQ, 0MQ, or zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry atomic messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fan-out, pub-sub, task distribution, and request-reply. It's fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems.

ZMQ支持常見的三種基本通信模式,分別是Pub/Sub、Req/Rep、Push/Pull。比較重要的類型有:zmq::context_t,zmq::socket_tzmq::message_t。比較重要的方法則包括:socket_t::send(message_t),socket_t::recv(&message_t)socket_t::setsockopt()等。

Req/Rep

Request-Reply

服務(wù)端(響應(yīng)端ZMQ_REP):

//
//  Hello World server in C++
//  Binds REP socket to tcp://*:5555
//  Expects "Hello" from client, replies with "World"
//
#include <zmq.hpp>
#include <string>
#include <iostream>
#ifndef _WIN32
#include <unistd.h>
#else
#include <windows.h>

#define sleep(n)    Sleep(n)
#endif

int main () {
    //  Prepare our context and socket
    zmq::context_t context (1);
    zmq::socket_t socket (context, ZMQ_REP);
    socket.bind ("tcp://*:5555");

    while (true) {
        zmq::message_t request;

        //  Wait for next request from client
        socket.recv (&request);
        std::cout << "Received Hello" << std::endl;

        //  Do some 'work'
        sleep(1);

        //  Send reply back to client
        zmq::message_t reply (5);
        memcpy (reply.data (), "World", 5);
        socket.send (reply);
    }
    return 0;
}

客戶端(請(qǐng)求端ZMQ_REQ):

//
//  Hello World client in C++
//  Connects REQ socket to tcp://localhost:5555
//  Sends "Hello" to server, expects "World" back
//
#include <zmq.hpp>
#include <string>
#include <iostream>

int main ()
{
    //  Prepare our context and socket
    zmq::context_t context (1);
    zmq::socket_t socket (context, ZMQ_REQ);

    std::cout << "Connecting to hello world server…" << std::endl;
    socket.connect ("tcp://localhost:5555");

    //  Do 10 requests, waiting each time for a response
    for (int request_nbr = 0; request_nbr != 10; request_nbr++) {
        zmq::message_t request (5);
        memcpy (request.data (), "Hello", 5);
        std::cout << "Sending Hello " << request_nbr << "…" << std::endl;
        socket.send (request);

        //  Get the reply.
        zmq::message_t reply;
        socket.recv (&reply);
        std::cout << "Received World " << request_nbr << std::endl;
    }
    return 0;
}

對(duì)于字符串,需要注意的一點(diǎn)是,ZMQ不會(huì)關(guān)心發(fā)送消息的內(nèi)容,只要知道它所包含的字節(jié)數(shù)。這意味著,ZMQ的字符串是有長度的,且傳送時(shí)不加結(jié)束符。當(dāng)使用c語言接收時(shí),應(yīng)注意申請(qǐng)比長度多一個(gè)字節(jié)的存儲(chǔ)空間,并置位結(jié)束符’/0’,否則在打印字符串時(shí)可能得到奇怪的結(jié)果。

Pub/Sub

Publish-Subscribe

服務(wù)端(發(fā)布端ZMQ_PUB):

//  Weather update server in C++
//  Binds PUB socket to tcp://*:5556
//  Publishes random weather updates
//
//  Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
//
#include <zmq.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#if (defined (WIN32))
#include <zhelpers.hpp>
#endif

#define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))

int main () {

    //  Prepare our context and publisher
    zmq::context_t context (1);
    zmq::socket_t publisher (context, ZMQ_PUB);
    publisher.bind("tcp://*:5556");
    publisher.bind("ipc://weather.ipc");                // Not usable on Windows.

    //  Initialize random number generator
    srandom ((unsigned) time (NULL));
    while (1) {

        int zipcode, temperature, relhumidity;

        //  Get values that will fool the boss
        zipcode     = within (100000);
        temperature = within (215) - 80;
        relhumidity = within (50) + 10;

        //  Send message to all subscribers
        zmq::message_t message(20);
        snprintf ((char *) message.data(), 20 ,
            "%05d %d %d", zipcode, temperature, relhumidity);
        publisher.send(message);

    }
    return 0;
}

客戶端(訂閱端ZMQ_SUB):

//  Weather update client in C++
//  Connects SUB socket to tcp://localhost:5556
//  Collects weather updates and finds avg temp in zipcode
//
//  Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
//
#include <zmq.hpp>
#include <iostream>
#include <sstream>

int main (int argc, char *argv[])
{
    zmq::context_t context (1);

    //  Socket to talk to server
    std::cout << "Collecting updates from weather server…\n" << std::endl;
    zmq::socket_t subscriber (context, ZMQ_SUB);
    subscriber.connect("tcp://localhost:5556");

    //  Subscribe to zipcode, default is NYC, 10001
    const char *filter = (argc > 1)? argv [1]: "10001 ";
    subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));

    //  Process 100 updates
    int update_nbr;
    long total_temp = 0;
    for (update_nbr = 0; update_nbr < 100; update_nbr++) {

        zmq::message_t update;
        int zipcode, temperature, relhumidity;

        subscriber.recv(&update);

        std::istringstream iss(static_cast<char*>(update.data()));
        iss >> zipcode >> temperature >> relhumidity ;

        total_temp += temperature;
    }
    std::cout     << "Average temperature for zipcode '"<< filter
                <<"' was "<<(int) (total_temp / update_nbr) <<"F"
                << std::endl;
    return 0;
}

需要注意的是,在使用SUB套接字時(shí),必須使用setsockopt()方法來設(shè)置訂閱的內(nèi)容。該方法可以用于設(shè)置消息過濾器,但如果你不設(shè)置訂閱內(nèi)容,那將什么消息都收不到。訂閱信息可以是任何字符串,可以設(shè)置多次。只要消息滿足其中一條訂閱信息,SUB套接字就會(huì)收到。訂閱者可以選擇不接收某類消息,也是通過setsockopt()方法實(shí)現(xiàn)的。

Push/Pull

push/pull

其他參考

https://github.com/booksbyus/zguide/tree/master/examples/C%2B%2B

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容