LRU Pattern - worker

This is the second step of the LRU pattern example implementation. After the client, it is now time to write the worker. I used ZeroMQ 2.2.0, the brand new release; the implementation language is C++, in its MSVC 2010 flavor, on the Windows operating system. The complete C++ source code is on GitHub.

This is a multithreaded application: the client creates a thread for each worker, and each thread executes this function:
const char* SOCK_ADDR = "inproc://workers";

// ...

void worker(zmq::context_t& context)
{
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 1
    zmq::socket_t worker(context, ZMQ_REQ); // 2
    zmq_setsockopt(worker, ZMQ_IDENTITY, id.c_str(), id.length());
    worker.connect(SOCK_ADDR);

    int processed = 0; // 3
    while(true)
    {
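        zmq::message_t msg(sizeof(int)); // own buffer: "processed" changes while the message may still be in flight
        memcpy(msg.data(), &processed, sizeof(int)); // needs <cstring>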
        worker.send(msg); // 4
        zmq::message_t payload;
        if(worker.recv(&payload) == false) // 5
        {
            dumpMessage(id, "error receiving message");
            return;
        }
        if(payload.size() != sizeof(int)) // 6
        {
            dumpMessage(id, "terminating");
            return;
        }

        int value = *(int*)payload.data(); // 7
        dumpMessage(id, value);

        boost::this_thread::sleep(boost::posix_time::millisec(value)); // 8
        ++processed;
    }
}
1. I used the thread id as identity for the REQ socket. Converting a Boost thread id to a string requires a lexical cast.
2. The REQ socket is created on the context passed by the main thread: threads in a 0MQ multithreaded application that talk to each other over inproc must share the same context.
3. This counter keeps track of the messages processed in this thread, so that we can pass this information back to the client.
4. Sending a message through the socket. The REQ socket silently prepends an empty delimiter frame and, since this socket has an identity set, the routing (ROUTER) socket at the other end sees a multipart message of three frames: the identity, the empty delimiter, and the number of messages processed so far.
5. Receiving the reply to our request. The identity and the empty delimiter are stripped automatically, so we only have to take care of the actual payload.
6. The error handling is very poor in this example. We expect an int as message; anything with a different size is treated like a zero-sized message, which I use by convention as a terminator.
7. Extract the int contained in the message ...
8. ... and use it to do some job, in this case just sleeping for a while.
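
Steps 6 and 7 can be tried out without any socket. What follows is only a minimal sketch, not code from the example on GitHub: Frame, encodeInt() and decodePayload() are hypothetical names introduced here, and a plain byte vector stands in for the data of a zmq::message_t. It shows the size check that separates a job from a terminator, and it copies the int out with memcpy instead of casting the pointer.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical stand-in for the data of a message frame: a byte buffer.
typedef std::vector<unsigned char> Frame;

// Build a frame holding a copy of one int, the layout the worker expects.
Frame encodeInt(int value)
{
    Frame frame(sizeof(int));
    std::memcpy(&frame[0], &value, sizeof(int));
    return frame;
}

// Returns true and fills 'value' when the buffer holds exactly one int.
// Any other size, zero included, is treated as the terminator and
// leaves 'value' untouched.
bool decodePayload(const void* data, std::size_t size, int& value)
{
    assert(data != NULL || size == 0); // a non-empty payload needs a buffer
    if(size != sizeof(int))
        return false;
    std::memcpy(&value, data, sizeof(int)); // copy out, no pointer cast
    return true;
}
```

In the worker, a call like decodePayload(payload.data(), payload.size(), value) could take the place of the size check and the cast, with a false result meaning that it is time to terminate.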

All the synchronization among threads is done through messages sent on ZeroMQ sockets. The exception is the dumpMessage() functions, which access a resource (std::cout) shared among all threads. We'll see how to deal with this in the next post.
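
To give an idea of the problem, one common way to protect a shared stream is to guard every write with a mutex. This is just a sketch under a few assumptions, and not necessarily the solution adopted in the next post: lockedDump(), runWriters() and dumpMutex are hypothetical names, the output stream is passed as a parameter instead of being std::cout, and std::mutex and std::thread require a C++11 standard library (with MSVC 2010 and Boost, boost::mutex and boost::lock_guard play the same role).

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Hypothetical lock shared by every caller of lockedDump().
std::mutex dumpMutex;

// Hypothetical variant of dumpMessage(): one whole line is written
// while holding the lock, so lines from different threads cannot mix.
template <typename T>
void lockedDump(std::ostream& os, const std::string& id, const T& value)
{
    std::lock_guard<std::mutex> lock(dumpMutex); // released at scope exit
    os << id << ": " << value << '\n';
}

// Start 'threads' writers, each emitting 'lines' lines, and wait for them.
void runWriters(std::ostream& os, int threads, int lines)
{
    assert(threads >= 0 && lines >= 0);
    std::vector<std::thread> pool;
    for(int t = 0; t < threads; ++t)
    {
        pool.emplace_back([&os, t, lines]
        {
            std::ostringstream id;
            id << "worker-" << t;
            for(int i = 0; i < lines; ++i)
                lockedDump(os, id.str(), i);
        });
    }
    for(std::size_t i = 0; i < pool.size(); ++i)
        pool[i].join(); // 'os' must outlive every writer
}
```

Calling runWriters(std::cout, 4, 50) would print 200 complete lines in an unpredictable order. The lock only guarantees that each line stays in one piece, not that the workers take turns.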
