Ventilator - ØMQ push

We are creating a toy divide and conquer ØMQ application to show the push/pull pattern in action.

Here we are showing the code for the ventilator, the process that is delegated to split the original problem in a number of tiny sub-problems that could be passed to instances of the worker module to be solved using a parallel processing schema.

Besides it usage of the ZMQ_PUSH, this code is kind of interesting for its need for a random number - the fake length of each small sub-problem; and for sending data to the client that is not a stream of characters - here we send an integer.

To generate a pseudo-random number here we use the Boost library, creating this small class:
#include <boost/random.hpp>

class VentiRand
boost::random::mt19937 generator_; // 1
boost::random::uniform_int_distribution<> random_; // 2
VentiRand(int low, int hi) : random_(low, hi) {}

int getValue() { return random_(generator_); }


1. As generator we use the Mersenne twister, in its more commonly used 32 bit version.
2. We use a uniform distribution. Notice there is no type name inside the angular brackets since it is defaulted to int.

And this is the code for the ventilator:
zmq::context_t context(1); // 1
zmq::socket_t sender(context, ZMQ_PUSH); // 2
sender.bind("tcp://*:5557"); // 3

std::cout << "Press Enter when the workers are ready ";
std::string input;
std::getline(std::cin, input);
std::cout << "Sending tasks to workers" << std::endl;

int workload = 0;
zmq::message_t flag(sizeof(int));
memcpy(, &workload, sizeof(int)); // 4

sender.send(flag); // 5
} // 6

VentiRand vr(1, 100); // 7
int total = 0; // 8
for(int task_nbr = 0; task_nbr < 100; ++task_nbr)
int workload = vr.getValue();
total += workload;

zmq::message_t message(sizeof(int));
memcpy(, &workload, sizeof(int)); // 9
sender.send(message); // 10
} // 11

std::cout << "Total expected cost: " << total << " msec" << std::endl;

boost::this_thread::sleep(boost::posix_time::seconds(1)); // 12
} // 13
catch(const zmq::error_t& ze)
std::cout << "Exception: " << ze.what() << std::endl;

1. A ØMQ context is created, calling the API function zmq_init()
2. The socket used here, created calling zmq_socket(), is of type ZMQ_PUSH. It is similar to the publisher in the sense that allows many clients to connect, but it behaves differently, sending a single message to just one client, and not a copy to all the clients as the publisher does.
3. Calling zmq_bind() we bind the socket to a specific protocol and port.
4. We create a message, setting as data an int value (in this case zero).
5. This zero valued message is sent by zmq_send(), signaling the start of batch.
6. Going out of scope, the message is closed - zmq_msg_close().
7. The random generator is created. We want a number in the range [1..100], representing the time cost in milliseconds for the current task.
8. We keep track of the total expected cost for the tasks, so to measure the efficiency of our job.
9. A message is created, setting as data the random int value just calculated.
10. The message is sent by zmq_send().
11. Exiting its scope, the message is deleted, calling zmq_msg_close() for it.
12. Let this process sleep a while, to give ØMQ time to do its job
13. Context and socket go out of scope, zmq_close() and zmq_term() are called.

This example is basically a rewrite in C++, using Boost to make it more portable, of the code you find in the official Z-Guide.

No comments:

Post a Comment