A ZeroMQ broker is meant to expand a simple REQ-REP message pattern in a much more flexible structure. While a REQ-REP connection is synchronous, the broker uses a DEALER-ROUTER pair of sockets that allows to manage asynchronously the message exchanges that go through it. We add a layer between the client and the service that lets us to hide one side to the other. The benefit of this is that we can can easily change the system's configuration modifying just the broker, while the other components are unaware of what is going on.

I have already done a post on the same matter, but using the C++ interface for 0MQ version 2.1.x, here the code is based on the still beta 3.1 version, and referring to the standard C API. Besides, you could get more details on the Z-Guide, but be aware that currently it is still based on ZMQ version 2.1.x.

The client code has just a tiny change: instead of sending directly its request to the reply service, it goes now to the broker. So I reused the code seen for the Hello REQ-REP client shown previously, changing port address in the connection:
zmq_connect(socket, "tcp://localhost:5559");
The REP side of the story changes more dramatically. It is not acting anymore as a server, but it is just a client of the broker providing to it a service:
void* context = zmq_init(1);
void* socket = zmq_socket(context, ZMQ_REP);
zmq_connect(socket,"tcp://localhost:5560"); // 1
    char buffer[MSG_SIZE];
    int size = zmq_recv(socket, buffer, MSG_SIZE, 0); // 2
    if(size < 1 || size > MSG_SIZE) // 3
        std::cout << "Terminating (" << size << ")" << std::endl;

    dump("Received", buffer, size); // 4
    boost::this_thread::sleep(boost::posix_time::seconds(1)); // 5
    zmq_send(socket, buffer, size, 0); // 6
1. Not a server anymore!
2. Receive a message from the broker. Remember that ZeroMQ version 3 uses raw byte array for storing the message.
3. Quite a crude error handling. Any error, and even messages bigger than expected (the user defined constant MSG_SIZE) are considered as messages of zero length, here conventionally seen as a command to shutdown the system.
4. We'll see below the simple dump() utility function that dump to the standard output console the 0MQ message with the passed header.
5. Some time expensive job emulated with a Boost sleep.
6. And finally the same message we got in input is sent back to the caller.

A 0MQ message is nothing more than a sequence of byte, no null terminator is expected, and so it can't be managed as c-string. A simple way to print a sequence of byte is this:
void dump(const char* header, const char* buffer, size_t size)
    std::cout << header << ": ";
    std::for_each(buffer, buffer + size, [](char c){ std::cout << c;}); // 1
    std::cout << std::endl;
1. for_each() is an STL algorithm, and its third argument is a C++11 lambda function. Much more on this topic in other posts in this same blog.

The most interesting component in this messaging pattern is the broker itself:
void* context = zmq_init(1);

void* frontend = zmq_socket(context, ZMQ_ROUTER); // 1
zmq_bind(frontend, "tcp://*:5559");

void* backend = zmq_socket(context, ZMQ_DEALER); // 2
zmq_bind(backend, "tcp://*:5560");

const int NR_ITEMS = 2;
zmq_pollitem_t items[NR_ITEMS] =
    { frontend, 0, ZMQ_POLLIN, 0 },
    { backend, 0, ZMQ_POLLIN, 0 }

    zmq_poll(items, NR_ITEMS, -1); // 3

    if(items[0].revents & ZMQ_POLLIN) // 4
        if(receiveAndSend(frontend, backend))
            break; // terminator!
    if(items[1].revents & ZMQ_POLLIN) // 5
        receiveAndSend(backend, frontend);
1. The router accepts connection from the REQ clients.
2. The dealer is in the server role for connections from REP clients.
3. The broker polls on its sockets to check for incoming messages. If it is not clear to you how the polling mechanism works, you could have a look to a couple of other posts. POLLIN on PUB-SUB should work as an introduction, Killing the workers is meant as a next example.
4. The frontend has sent a message. The broker should receive from it and send to the backend. The REQ client has the chance to shutdown the entire system sending an empty message. If the utility function receiveAndSend(), shown below, detects this condition it returns true, and the broker run is terminated.
5. The backend has sent a message. Same as (4), but swapping the terms.

The receiveAndSend() function could turn out to be more interesting than one would have expected:
    size_t sockOptSize = sizeof(int); // 1

    bool receiveAndSend(void* skFrom, void* skTo)
        bool terminator = false;
        int more; // 2
        do {
            char message[MSG_SIZE];
            int len = zmq_recv(skFrom, message, MSG_SIZE, 0);
            zmq_getsockopt(skFrom, ZMQ_RCVMORE, &more, &sockOptSize);

            std::cout << "(" << more << ") ";
            if(more == 0) // 3
                if(terminator = len == 0) // 4
                    std::cout << "Terminator!" << std::endl;
                    dump("Received", message, len);
                std::cout << std::endl;

            zmq_send(skTo, message, len, more ? ZMQ_SNDMORE : 0);
        } while(more);

        return terminator;
1. In ZeroMQ version 3 the socket options are stored in a bare int value, and not anymore in a fixed non-standard 64 bit integer type.
2. This application works with simple messages, but the broker manages multipart messages not for being prepared for future extensions but because it has to. In some way the broker should know who is the requester associated to a reply, when it comes back, and this is done by the ROUTER-DEALER pattern adding a prologue to the message. We don't care at all of what is in it, but we should know that it has been added to our original message.
3. No "more" means this is our actual message (warning! this assumes that the REQ client sends simple messages).
4. If the length of the message is zero, we have received a shutdown request. We are still passing this last message to the REP component, so that it would shutdown too.

No comments:

Post a Comment