Pages

Async 0mq application - server

This sample ZeroMQ async application is described in a previous post, here I am commenting the server part of it, that is connected to the already seen clients by DEALER/ROUTER sockets, and to a few workers that we'll seen next, by DEALER/DEALER sockets.
The AsynchronousCS constructor creates a thread running on this function, part of the private section of the same class:
void server(int nWorkers) // 1
{
    zmq::context_t context(1);
    zmq::Socket skFrontend(context, ZMQ_ROUTER); // 2
    skFrontend.bind(SK_SRV_ADDR);

    zmq::Socket skBackend(context, ZMQ_DEALER); // 3
    skBackend.bind(SK_BCK_ADDR);

    boost::thread_group threads;
    for(int i = 0; i < nWorkers; ++i)
        threads.create_thread(std::bind(&AsynchronousCS::worker, this, std::ref(context))); // 4

    zmq_pollitem_t items [] =
    {
        { skFrontend, 0, ZMQ_POLLIN, 0 },
        { skBackend,  0, ZMQ_POLLIN, 0 }
    };

    while(true)
    {
        if(zmq_poll(items, 2, 3000 * 1000) < 1) // 5
            break;

        if(items[0].revents & ZMQ_POLLIN) // 6
        {
            zmq::Frames frames = skFrontend.blockingRecv(2);
            skBackend.send(frames);
            items[0].revents = 0; // cleanup
        }
        if(items[1].revents & ZMQ_POLLIN) // 7
        {
            zmq::Frames frames = skBackend.blockingRecv(2);
            skFrontend.send(frames);
            items[1].revents = 0; // cleanup
        }
    }

    for(int i = 0; i < nWorkers; ++i) // 8
        skBackend.send("");

    threads.join_all(); // 9
}
1. The number of workers used by the server is set by the user.
2. zmq::Socket is an extension of the zmq::socket_t as found in the default 0MQ C++ binding. This server socket is used to keep a connection the client (DEALER) sockets.
3. This second socket is used to keep a DEALER/DEALER connection with each worker.
4. A new thread is created for each worker (its code is discussed in the next post), notice that a reference to the ZeroMQ context is passed to each worker, so that all sockets on the server are sharing the same 0MQ context.
5. As a simple way of terminating the server execution, I set the limit of three seconds on the polling. If nothing is received on either socket in such a time frame, the indefinite loop is interrupted.
6. Input received on the frontend, the multipart message is read and passed to the backend.
7. Same as (6), but the other way round.
8. When the clients are not sending anymore messages, it is time to send a terminator to each worker. As a convention, a single empty message would be interpreted by the worker as a terminator.
9. Give time to the workers to terminate, and then the server is down.

Full source C++ code for this sample application is on github, in the same repository you can find also the zmq::Socket source code.

No comments:

Post a Comment