A very simple REQ-ROUTER example

This is the simplest ZeroMQ REQ-ROUTER example I could conceive. I have written it for ØMQ 2.2 on Windows for MSVC, using the C++ built-in wrapper, with an extension to manage easier multipart messages (you can find it as an include file on github). I have made use of STL, Boost, and some C++11 support, but nothing impressive.

The application runs on just one process, on which a few client threads are created, each of them using its own REQ socket, all of them connected to the same ROUTER socket running on another thread. The protocol used is tcp, so we can easily port the example to a multiprocess setting.

Each client sends a single message to the server. The server gets all the messages, it knows in advance how many clients are going to connect to it, and then it sends back them all through the sockets.

The code will make all clearer. Here is an excerpt of the main function:
boost::thread_group threads;
for(int i = 0; i < nReq; ++i) // 1
    threads.create_thread(req4router); // 2
threads.create_thread(std::bind(router, nReq)); // 3
1. nReq is the number of REQ clients that we want the application to have. You should get it from the user and ensure it is at least one, and "not too big", accordingly to what "big" is in your environment.
2. Each REQ client runs its own copy of the req4router() function, see below.
3. The router runs on the router() function. It needs to know how many clients to expect.

As said, I used the tcp protocol, and this is how I defined the socket addresses:
const char* SK_ADDR_CLI = "tcp://localhost:5380";
const char* SK_ADDR_SRV = "tcp://*:5380";
Both client and server make use of an utility function named dumpId() that I won't show in this post, please have a look on the previous posts to have an idea how it should work, but I guess you can figure it out by yourself. It just print to std::cout the strings are passed to it. It takes care of locking on a mutex, since the console is a shared resource we need to protect it from concurrent accesses. Besides it also print the thread id, for debugging purpose.

Here is the function executed by each client:
void req4router()
    dumpId("REQ startup");
    zmq::context_t context(1);
    zmq::Socket skReq(context, ZMQ_REQ); // 1
    skReq.connect(SK_ADDR_CLI); // 2
    dumpId("REQ CLI on", SK_ADDR_CLI);

    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 3
    skReq.send(id); // 4
    std::string msg = skReq.recvAsString();
1. zmq::Socket is my zmq::socket_t subclass.
2. Let's connect this REQ socket as a client to the ROUTER socket, that plays as a server.
3. To check if everything works as expected, we should send a unique message. The thread id looks a good candidate for this job. To convert a Boost thread id to a string we need to explicitly cast it through lexical_cast.
4. Send and receiving on a socket go through specialized methods in zmq::Socket that take care of the dirty job of converting from standard string to ZeroMQ buffers.

And this is the server:
void router(int nReq)
    dumpId("ROUTER startup");
    zmq::context_t context(1);

    zmq::Socket skRouter(context, ZMQ_ROUTER); // 1
    dumpId("ROUTER SRV on", SK_ADDR_SRV);

    std::vector<zmq::Frames> msgs; // 2
    for(int i = 0; i < nReq; ++i)
        msgs.push_back(skRouter.blockingRecv(3)); // 3
    dumpId("all received");

    std::for_each(msgs.begin(), msgs.end(), [&skRouter](zmq::Frames& frames)
        skRouter.send(frames); // 4

    dumpId("ROUTER done");
1. This is the ROUTER socket accepting the connection from the REQ client sockets.
2. Frames is actually just a typedef for a vector of strings. In msgs we store all the messages coming from the clients.
3. zmq::Socket::blockingRecv() gets the specified number of frames (three, in this case) from the socket, and returns them as a zmq::Frames object. That object is pushed in the buffer vector. The first frame contains the address of the sender, the second one is just a separator, and the third is the actual payload.
4. Just echo back to each client the message it has sent. The lambda function passed to for_each() needs to know what skRouter is, that is way it is specified in the capture clause, as parameter.

No comments:

Post a Comment