LRU Queue Broker version two

With the extended zmq socket I introduced in the previous post, writing a ZeroMQ application like the LRU queue device previously described gets easier.

OK, the comparison is not 100% fair, because when rewriting the example I have not just used my brand new zmq::Socket, I have also changed the application logic a bit. Now the client does not send an integer used by the worker to do some job (actually, sleeping), but just a string (its ID) that is echoed back. It wouldn't be difficult to reintroduce the original behavior in this version, but I found the example more readable this way. Besides, this version has almost no error handling. This does not differ much from the previous version, still, you don't want to keep such a wishful attitude in your production code.

[After a while, I changed my Socket class to manage multipart messages differently, and this had an impact on this code. See my post where I say something more on the broker changes. Still, the general structure of the code shown here is the same.]

That said, here is how the client function looks now. Compare it with the original LRU device client:
void client(zmq::context_t& context)
{
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id());
    zmq::Socket skClient(context, ZMQ_REQ, id); // 1
    skClient.connect(SK_ADDR_FRONTEND);
    dump(id, "client is up");

    skClient.send(id); // 2
    dump(id, "client request sent");

    std::string reply = skClient.recvAsString(); // 3
    dump(reply, "received by client");
}
1. Create a ØMQ request socket with the specified ID.
2. Send a std::string on the socket.
3. Receive a std::string on the socket.

The code is slimmer, terser, easier to write, read, and modify.

If you liked the impact of zmq::Socket on the client, you are going to love what it does to the LRU device worker:
void worker(zmq::context_t& context)
{
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id());
    zmq::Socket skWorker(context, ZMQ_REQ, id);
    skWorker.connect(SK_ADDR_BACKEND);

    zmq::Frames frames(2); // 1
    while(true)
    {
        skWorker.send(frames); // 2

        frames = skWorker.blockingRecv(2, false); // 3
        if(frames.size() != 2)
        {
            dump(id, "terminating");
            return;
        }
        dump(frames[0], "client id");
        dump(frames[1], "payload");
    }
}
1. Remember that Frames is a std::vector of std::string. Here we create it with two empty elements.
2. The first time, we send a multipart message containing just empty frames (as expected); after that, the received multipart message is sent back to the caller.
3. The call returns a dummy Frames (for terminating) or a Frames with the client ID and the payload. If your C++ compiler is recent and implements C++11 move semantics, this assignment is going to be as cheap as moving a few pointers around.
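To make the remark on move semantics in note 3 more concrete, here is a minimal sketch that does not use ZeroMQ at all. The typedef stands in for zmq::Frames (described above as a std::vector of std::string), while makeReply() and moveKeepsBuffer() are hypothetical helpers written only for this illustration. The check relies on the usual behavior of std::vector with the default allocator, where a move assignment hands over the heap buffer instead of copying the strings.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Stand-in for zmq::Frames, assumed here to be a plain vector of strings.
typedef std::vector<std::string> Frames;

// Hypothetical helper: builds a two-frame message (client ID, payload),
// the shape the worker expects to get back from its receive call.
Frames makeReply(const std::string& clientId, const std::string& payload)
{
    Frames result;
    result.reserve(2);
    result.push_back(clientId);
    result.push_back(payload);
    return result;
}

// Hypothetical helper: move-assigns source into target and reports whether
// the element buffer was handed over rather than copied.
bool moveKeepsBuffer(Frames& target, Frames& source)
{
    assert(&target != &source); // a self-move would make the check meaningless
    const std::string* before = source.data();
    target = std::move(source); // same kind of assignment as in the worker loop
    return target.data() == before;
}
```

In the worker the right-hand side is a temporary returned by the receive call, so no explicit std::move is needed there: the compiler picks the move assignment by itself.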

The device job of receiving on back- and frontend gets cleaner too:
void receivingOnBackend()
{
    zmq::Frames input = backend_.blockingRecv(3, false); // 1

    dump(input[0], "registering worker");
    qWorkers_.push(input[0]);

    if(input.size() == 3)
    {
        zmq::Frames output(input.begin() + 1, input.end()); // 2
        frontend_.send(output);
    }
}

void receivingOnFrontend()
{
    zmq::Frames input = frontend_.blockingRecv(2); // 3

    dump(input[0], "client id received on frontend");
    dump(input[1], "payload received on frontend");

    std::string id = qWorkers_.front();
    qWorkers_.pop();
    dump(id, "selected worker on frontend");

    zmq::Frames output;
    output.reserve(3); // 4
    output.push_back(id);
    output.insert(output.end(), input.begin(), input.end());

    backend_.send(output);
    dump(id, "message sent to worker");
}
1. The backend socket can receive two different kinds of messages: just the worker ID, signalling that the worker is ready, or the worker ID followed by the client ID and the actual message payload.
2. If the backend sent a "real" message, the device should simply discard the worker ID (first element in the input Frames) and send the resulting multipart message to the frontend.
3. A message on the frontend is always made of two frames: a client ID and the payload.
4. The device puts a worker ID at the beginning and then forwards the multipart message, as received from the client, to the worker.
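The four notes above can be tried out without any socket. What follows is just a sketch under stated assumptions: BrokerModel, onBackend() and onFrontend() are hypothetical names that do not exist in the real code, Frames is again assumed to be a std::vector of std::string, and the functions return the frames that the real device would send on the other socket.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <vector>

// Stand-in for zmq::Frames, assumed here to be a plain vector of strings.
typedef std::vector<std::string> Frames;

// Hypothetical in-memory model of the device: it keeps only the LRU queue
// of worker IDs and returns the frames instead of sending them.
struct BrokerModel
{
    std::queue<std::string> workers;

    // Backend side: the sender is always registered as available again.
    // Returns the frames for the frontend, or an empty Frames if the
    // worker was only signalling that it is ready.
    Frames onBackend(const Frames& input)
    {
        assert(!input.empty());
        workers.push(input[0]);
        if(input.size() == 3)
            return Frames(input.begin() + 1, input.end()); // drop the worker ID
        return Frames();
    }

    // Frontend side: pick the least recently used worker and put its ID
    // in front of the client ID and the payload.
    Frames onFrontend(const Frames& input)
    {
        assert(input.size() == 2);
        assert(!workers.empty());
        std::string id = workers.front();
        workers.pop();

        Frames output;
        output.reserve(3);
        output.push_back(id);
        output.insert(output.end(), input.begin(), input.end());
        return output;
    }
};
```

Since the worker ID is pushed back on the queue each time that worker writes to the backend, the worker at the front of the queue is always the one that has been idle for the longest time, and that is where the LRU in the device name comes from.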

The full C++ code for this example is on github. I developed it for ZeroMQ 2.2.0 on MSVC 2010. Notice that the head version on github differs from the code reported here: managing separators in multipart messages is no longer a Socket responsibility, they are now managed in the user code, that is, here.
