Pages

Sink - ØMQ pull

Third and last part of the divide and conquer ØMQ application. Now we take care of the sink, the process that is the server, in a pull role, to which the workers send a message, using a push socket, when they complete their task as assigned by the ventilator.

This is the resulting C++ code:
try
{
zmq::context_t context(1);
zmq::socket_t receiver(context, ZMQ_PULL); // 1
receiver.bind("tcp://*:5558"); // 2

{ // 3
zmq::message_t flag;
if(receiver.recv(&flag))
std::cout << '.';
else
std::cout << '!';
} // 4

boost::posix_time::ptime start = boost::posix_time::microsec_clock::local_time(); // 5

for(int task_nbr = 0; task_nbr < 100; task_nbr++) // 6
{
zmq::message_t tick;
if(receiver.recv(&tick))
std::cout << (task_nbr % 10 ? '.' : ':');
else
std::cout << '!';
}
std::cout << std::endl;

boost::posix_time::ptime end = boost::posix_time::microsec_clock::local_time(); // 7
std::cout << "Total time: " << end - start << std::endl;
}
catch(const zmq::error_t& ze)
{
std::cout << "Exception: " << ze.what() << std::endl;
}

1. We are about to use a pull socket.
2. It is set as a server using the TCP protocol.
3. We wait for the first message, signalling the start of batch.
4. Exiting its scope, the message is destroyed, and its cleanup is implicitely performed by zmq_msg_close().
5. Keep track of the current time, to do some statistics check.
6. One hundred "real" messages are expected, let's loop on all of them.
7. Calculate and report duration of batch

If you read the official Z-Guide you will find there the original C code that I have rewritten in C++ with some Boost to have some fun (!)

No comments:

Post a Comment