Pages

Async 0MQ app - worker

If you have already read the sort of informal design for this sample asynchronous application, then the post on the client code, and even the one on the server, you are ready to see how I have implemented the worker code.

In brief: this is a C++ ZeroMQ application, based on version 2.2, developed on Windows for MSVC2010, using the 0MQ C++ binding improved by defining a zmq::Socket class that extends the standard zmq::socket_t to simplify multipart messages management.

We have seen in the previous post that the server creates a (variable) number of worker threads and then expects these workers to be ready to work on the messages coming from the client. Here is that function:
void worker(zmq::context_t& context) // 1
{
    zmq::Socket skWorker(context, ZMQ_DEALER); // 2
    skWorker.connect(SK_BCK_ADDR);

    while(true)
    {
        zmq::Frames frames = skWorker.blockingRecv(2, false);
        if(frames.size() == 1) // 3
            break;

        int replies = rand_.getValue(); // 4
        for(int i =0; i < replies; ++i)
        {
            boost::this_thread::sleep(boost::posix_time::millisec(100 * rand_.getValue()));
            dumpId("worker reply");
            skWorker.send(frames);
        }
    }
}
1. Server and workers share the same context.
2. Each worker has its own DEALER socket that is connected (next line) to the DEALER socket on the server. SK_BCK_ADDR is a C-string specifying the inproc address used by the socket.
3. The "real" messages managed by the workers should be composed by two frames, the address of the client and the actual message payload. A mono-framed message is interpreted by the worker as a request from the server to shutdown.
4. To check the asynchronicity of the application, the worker has this strange behavior: it sends back to the client a random number of copies of the original message, each of them is sent with a random delay.

Full source C++ code for this sample application is on github, in the same repository you can find also the zmq::Socket source code.

No comments:

Post a Comment