Pages

Async 0mq application - DEALER/ROUTER client

Follow the link for a view on the asynchronous ZeroMQ application I am describing in these posts. Here I won't care of the big picture but I am focusing on the function that is executed by each thread running as a client.

Each client thread is created passing as argument the address of this function, defined as private in the class AsynchronousCS:
void client()
{
    zmq::context_t context(1); // 1
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 2
    zmq::Socket skClient(context, ZMQ_DEALER, id); // 3
    skClient.connect(SK_CLI_ADDR);

    zmq_pollitem_t items[] = { { skClient, 0, ZMQ_POLLIN, 0 } };

    std::string message("message ");
    for(int i =0; i < 6; ++i) // 4
    {
        for(int heartbeat = 0; heartbeat < 100; ++heartbeat) // 5
        {
            zmq_poll(items, 1, 10 * 1000); // polls each 10 millis
            if(items[0].revents & ZMQ_POLLIN)
            {
                std::string msg = skClient.recvAsString(); // 6
                dumpId("client receives", msg); // 7

                items[0].revents = 0;
            }
        }
        dumpId("client sends", message);
        skClient.send(message);
        message += 'x';
    }

    while(zmq_poll(items, 1, 1000 * 1000) > 0) // 8
    {
        if(items[0].revents & ZMQ_POLLIN)
        {
            std::string msg = skClient.recvAsString();
            dumpId("coda", msg);
            items[0].revents = 0; // cleanup
        }
    }
}
1. Even if the client threads run in the same process as the server, each of them has its own ZeroMQ context, this makes it easy to refactor the application to more common solution where each client runs in its own process.
2. I am going to use the thead id as socket id.
3. zmq::Socket is my extension of the zmq::socket_t as found in the default 0MQ C++ binding. This constructor calls zmq_setsockopt() to set its socket ZMQ_IDENTITY to the passed id. This DEALER client socket is going to connect to a ROUTER server socket.
4. The client is going to send a few messages to the server.
5. We loop 100 times, polling on the socket with a limit of 10 millis. Expecting only a few messages in input, we could assume that this loop will take not much less than a second to be executed.
6. This zmq::Socket function takes care of receiving, on a zmq::message_t and converting it to a std::string.
7. Accessing the shared resource std::cout to dump a message requires a mutex/lock protection, all of that is taken care in this function.
8. As described in the previous post, as a way to keep the code simple, I have implemented no way for the server to tell to the client that the stream of messages is terminated, on the contrary, is the client itself that wait a "reasonable" amount of time (1 sec) after the last input message, before deciding that no more messages are going to come. This is an high unreliable solution, but for such a simple example should suffice.

Full source C++ code for this sample application is on github, where you can find the source for zmq::Socket too.

No comments:

Post a Comment