We have used the ZeroMQ request/reply pattern to create a very simple echo application that connects synchronously a client, that
asks for an echo of its message, and a server, that
provides the service.
Now we want to extend that application, so that we could have many server and many clients, and even have a way to change their number dynamically.
How could we do that? Adding something in between that take cares of the extra complexity we want. This something is called broker and it does the dirty job of getting a request from a client, sending it to a server, and then doing the same, but the other way round, with the reply.
The changes in the client code are minimal. The just have to ensure the request socket is now connecting to the broker. So, if it is on localhost at port 5559 we'll have this:
socket.connect("tcp://localhost:5559");
More relevant are the changes in the service that provides a reply. Actually it is not anymore a proper server, it is now seen as a broker client that connects to it in the REP role. So its socket would be created an setup like this:
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_REP);
socket.connect("tcp://localhost:5560");
Besides, for testing purpose makes sense to add in the code a delay between receiving and sending back the message, maybe using the Boost sleep() function:
// some time expensive job
boost::this_thread::sleep(boost::posix_time::seconds(1));
The fun stuff, as one could expect, is in the broker. It can't use REQ and REP sockets, since we want it to be asynchronous, so it uses ROUTER and DEALER sockets, that interfaces with request and reply, but leaving us the flexibility of a non-blocking interface.
Here is a C++ possible implementation:
try
{
zmq::context_t context(1);
zmq::socket_t frontend(context, ZMQ_ROUTER); // 1
frontend.bind("tcp://*:5559");
zmq::socket_t backend(context, ZMQ_DEALER); // 2
backend.bind("tcp://*:5560");
const int NR_ITEMS = 2;
zmq_pollitem_t items[NR_ITEMS] =
{
{ frontend, 0, ZMQ_POLLIN, 0 },
{ backend, 0, ZMQ_POLLIN, 0 }
};
while(true)
{
zmq::poll(items, NR_ITEMS); // 3
if(items[0].revents & ZMQ_POLLIN) // frontend sent a message
receiveAndSend(frontend, backend);
if(items[1].revents & ZMQ_POLLIN) // backend sent a message
receiveAndSend(backend, frontend);
}
}
catch(const zmq::error_t& ze)
{
std::cout << "Exception: " << ze.what() << std::endl;
}
1. This ROUTER socket connects to the REQ socket, accepting its request.
2. The DEALER socket is used to connect to the REP socket, to forward to it the client request.
3. The broker hangs waiting for messages from both sides of its connections.
The heart of the broker is the receiveAndSend() function that acts as a mediator between the REQ and REP sockets. Let see how I have implemented it:
namespace
{
size_t size = sizeof(__int64);
void receiveAndSend(zmq::socket_t& from, zmq::socket_t& to)
{
__int64 more;
do {
zmq::message_t message;
from.recv(&message);
from.getsockopt(ZMQ_RCVMORE, &more, &size);
to.send(message, more ? ZMQ_SNDMORE : 0);
} while(more);
}
}
Even though our REQ/REP message exchange is meant to involve only non-multipart messages, our broker should actually manage them. The fact is that the ROUTER/DEALER protocol has to keep track in some way of the origin/destination of the traffic, and this is done adding a frame in the message - we don't care what it is actually in it, at least for the moment, but we should manage it correctly, if we want our application to work.
More details on broker router/dealer sockets in the official Z-Guide, where you could also find the original C code on which I have based the C++ code have seen above.
Go to the full post