
ZeroMQ 3.1 multithreading reviewed

I have already written a post on how to approach multithreading with ØMQ. In that case I used 0MQ version 2.1 and the C++ wrapper API; here I am using the 3.1 release, still in beta, and the standard C interface.

But I guess this is not the most interesting point of this post; I found it more challenging to think of a way to gracefully shut down the worker threads managed by the server. The idea is to take advantage of how polling reacts when the other side of the connection shuts down. But let's start from the beginning.

We want to write something like the REQ-REP DEALER-ROUTER broker we have just seen, but here the broker does not connect the client to services running in different processes; instead, it connects it to workers running in other threads of the broker's own process.

The client code changes only because I changed the requirements, to add a bit of fun. Now the client sends integers, and the server feeds the workers with these values, which are used for a mysterious job - actually, just sleeping. From the client's point of view, the change is limited to the message buffer:
void* context = zmq_init(1);
void* socket = zmq_socket(context, ZMQ_REQ);
zmq_connect(socket, "tcp://localhost:5559");

for(int i = 0; i != 10; ++i)
{
    std::cout << "Sending " << i << std::endl;
    zmq_send(socket, &i, sizeof(int), 0); // 1

    int buffer;
    int len = zmq_recv(socket, &buffer, sizeof(int), 0);
    if(len != sizeof(int)) // 2
        std::cout << "Unexpected answer (" << len << ") discarded" << std::endl;
    else
        std::cout << "Received " << buffer << std::endl;
}
zmq_send(socket, NULL, 0, 0); // 3

zmq_close(socket);
zmq_term(context);
1. The message now is an int, and its size is, well, the size of an int.
2. Very rough error handling: if the size of the message is not the expected one (or zmq_recv() returned an error), we just print a message. A slightly stricter check is sketched after this list.
3. I'm going on with the convention of sending an empty message as a command to shut down the server.
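
As hinted in note 2, the error handling could be made a bit stricter by telling an actual receive error apart from a message of unexpected size. A minimal sketch, using zmq_errno() and zmq_strerror() from the standard C interface, could be:
int len = zmq_recv(socket, &buffer, sizeof(int), 0);
if(len == -1) // zmq_recv() signals failure with -1 and sets the ZeroMQ errno
    std::cout << "Receive failed: " << zmq_strerror(zmq_errno()) << std::endl;
else if(len != sizeof(int)) // wrong payload size, discard it
    std::cout << "Unexpected answer (" << len << ") discarded" << std::endl;
else
    std::cout << "Received " << buffer << std::endl;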

The server needs to change a bit more: it should create a thread for each service we want to use for this run of the application. The number of threads is passed to the function as a parameter (a possible way of calling it is sketched after the notes below).
void mtServer(int nt)
{
    boost::thread_group threads; // 1

    void* context = zmq_init(1);
    void* frontend = zmq_socket(context, ZMQ_ROUTER); // 2
    zmq_bind(frontend, "tcp://*:5559");

    void* backend = zmq_socket(context, ZMQ_DEALER); // 3
    zmq_bind(backend, "inproc://workers"); // 4

    for(int i = 0; i < nt; ++i)
        threads.create_thread(std::bind(&doWork, context)); // 5

    const int NR_ITEMS = 2; // 6
    zmq_pollitem_t items[NR_ITEMS] = 
    {
        { frontend, 0, ZMQ_POLLIN, 0 },
        { backend, 0, ZMQ_POLLIN, 0 }
    };

    dump("Server is ready");
    while(true)
    {
        zmq_poll(items, NR_ITEMS, -1); // 7

        if(items[0].revents & ZMQ_POLLIN) // 8
            if(receiveAndSend(frontend, backend))
                break;
        if(items[1].revents & ZMQ_POLLIN) // 9
            receiveAndSend(backend, frontend);
    }

    dump("Shutting down");
    zmq_close(frontend);
    zmq_close(backend);
    zmq_term(context);

    threads.join_all(); // 10
}
1. We need to manage a group of threads, and this Boost utility class is just right for the job.
2. This router socket connects the server to the client request socket.
3. This dealer socket connects the server to all the worker sockets.
4. The transport used for thread-to-thread communication is "inproc".
5. A number of threads is created; each of them runs the function doWork(), shown below, and receives as input the ZeroMQ context, which is thread-safe.
6. The broker pattern requires an array of poll items, each one specifying a socket to be polled and the events we are interested in.
7. Polling on the items, waiting indefinitely for a message coming from either the frontend or the backend.
8. The frontend sent a message: receive from it and send to the backend. If receiveAndSend() returns true, we received a terminator from the client, and it is time to exit the while loop.
9. The other way round: a message coming from the backend is received and sent back to the frontend.
10. Wait for all the worker threads to terminate before leaving the function. Note that the sockets have already been closed and the context terminated at this point; as we'll see in the worker code, terminating the context is exactly what unblocks the workers and lets them exit.
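
Just to show how the function could be driven, here is a minimal, hypothetical entry point of mine; the worker count is an arbitrary value, not something coming from the code above:
int main()
{
    const int nrWorkers = 4; // arbitrary choice: one REP worker thread per service
    mtServer(nrWorkers); // returns only after a client sends the empty terminator
}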

Writing a multithreaded application with ZeroMQ requires designing the code differently from the standard techniques. Synchronization among threads is ruled by message exchange, so we usually don't use mutexes and locks. Here I break this rule, and I'd say it makes sense: we have a shared resource, the output console, and we need to regulate access to it:
boost::mutex mio;

void dump(const char* header, int value)
{
    boost::lock_guard<boost::mutex> lock(mio);
    std::cout << boost::this_thread::get_id() << ' ' << header << ": " << value << std::endl;
}

void dump(const char* mss)
{
    boost::lock_guard<boost::mutex> lock(mio);
    std::cout << boost::this_thread::get_id() << ": " << mss << std::endl;
}
The receiveAndSend() function has not changed much. Even if the messages now carry an int, we don't make any assumption here about what passes through. In any case we should remember that we have to manage multipart messages, as required by the router pattern (a sketch showing the envelope follows the note below):
const int MSG_SIZE = 64;
size_t sockOptSize = sizeof(int); // 1

bool receiveAndSend(void* skFrom, void* skTo)
{
    int more;
    do {
        char message[MSG_SIZE]; // a plain byte buffer matching the size passed to zmq_recv()
        int len = zmq_recv(skFrom, message, MSG_SIZE, 0);
        zmq_getsockopt(skFrom, ZMQ_RCVMORE, &more, &sockOptSize);

        if(more == 0 && len == 0)
        {
            dump("Terminator!");
            return true;
        }
        zmq_send(skTo, message, len, more ? ZMQ_SNDMORE : 0);
    } while(more);

    return false;
}
1. The variable storing the size of an int can't be const, because zmq_getsockopt() takes it as an in/out parameter and could change it.
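
To make the multipart traffic more tangible, here is a throwaway diagnostic of mine (dumpEnvelope() is not part of the original program) that just prints the size of each frame reaching a socket. For a message coming from a REQ client through the ROUTER we would expect three frames: the client identity, an empty delimiter, and the int payload:
void dumpEnvelope(void* socket) // diagnostic only: it consumes the message instead of forwarding it
{
    int more;
    size_t moreSize = sizeof(more);
    do {
        char frame[MSG_SIZE];
        int len = zmq_recv(socket, frame, MSG_SIZE, 0);
        zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &moreSize);
        dump("Frame size", len); // expect: identity, empty delimiter, payload
    } while(more);
}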

Finally, the most interesting piece of code, the worker:
void doWork(void* context) // 1
{
    void* socket = zmq_socket(context, ZMQ_REP); // 2
    zmq_connect(socket, "inproc://workers");

    zmq_pollitem_t items[1] = { { socket, 0, ZMQ_POLLIN, 0 } }; // 3

    while(true)
    {
        if(zmq_poll(items, 1, -1) < 1) // 4
        {
            dump("Terminating worker");
            break;
        }

        int buffer;
        int size = zmq_recv(socket, &buffer, sizeof(int), 0); // 5
        if(size < 1 || size > sizeof(int))
        {
            dump("Unexpected termination!");
            break;
        }

        dump("Received", buffer);
        zmq_send(socket, &buffer, size, 0);

        boost::this_thread::sleep(boost::posix_time::seconds(buffer));
    }
    zmq_close(socket);
}
1. The 0MQ context is thread-safe, so we can safely pass it around among the threads.
2. Each working thread has its own ZeroMQ reply socket connected to the backend socket in the main thread of the server by inproc protocol.
3. It looks a bit strange to poll on just one item, but it is exactly what we need here.
4. Polling indefinitely on the socket. If zmq_poll() returns an error (a return value of zero is not expected with an infinite timeout), we can safely assume that the main thread has terminated the context, and we can stop waiting for a message. A sketch making this check more explicit follows this list.
5. We know a message is waiting to be received; we check that it has the size of an int, and then we send the same message back before sleeping for the number of seconds passed by the client.
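
The error the worker sees when the main thread terminates the context should be ETERM. A minimal sketch of a more explicit check in the polling step, using zmq_errno() and zmq_strerror() (my addition, not part of the original worker), could be:
if(zmq_poll(items, 1, -1) < 1)
{
    if(zmq_errno() == ETERM) // context terminated by the main thread: expected shutdown
        dump("Terminating worker");
    else // any other failure is unexpected and worth reporting
        dump(zmq_strerror(zmq_errno()));
    break;
}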
