We have seen how to rewrite a Divide and Conquer application for ØMQ 3.1 in its three components: ventilator, worker, sink. We complained about the workers helplessly waiting for new messages from the ventilator at the end of the job, requiring an interrupt to end their agony, while we wanted them to shut down gracefully. From the previous post we know how to let a piece of code poll on several input sockets, and now we can use that technique to overcome the worker hanging issue.
This is a port of the solution I wrote in the past for the C++ interface to 0MQ 2.1. The common source of inspiration is the official ZGuide.
We want to change both the sink and the worker implementations. They will be connected by a PUB-SUB relation, where the sink is a PUB server that sends an (empty) message to all the client SUB workers to signal that the job is done.
The change in the sink is tiny. After the local ZeroMQ context is initialized, besides the PULL socket that handles the flow of messages coming from the workers, we add the PUB socket:
    // ...
    void* context = zmq_init(1);
    // ...
    void* terminator = zmq_socket(context, ZMQ_PUB);
    zmq_bind(terminator, "tcp://*:5559");
    // ...

Then all goes as shown in the original sink code, but after all the messages have been received, just before cleaning up the environment (in cauda venenum, as the Romans said), we send an empty message:
    // ...
    zmq_send(terminator, NULL, 0, 0);
    zmq_close(terminator);
    zmq_close(socket);
    zmq_term(context);

The worker requires a more substantial redesign. The first step is adding a SUB 0MQ socket that connects to the PUB one in the sink:
    void* context = zmq_init(1);
    // ...
    void* skController = zmq_socket(context, ZMQ_SUB);
    zmq_connect(skController, "tcp://localhost:5559");
    zmq_setsockopt(skController, ZMQ_SUBSCRIBE, NULL, 0);

    zmq_pollitem_t items[] =
    {
        { skPull, 0, ZMQ_POLLIN, 0 },
        { skController, 0, ZMQ_POLLIN, 0 }
    };
    // ...

The array of zmq_pollitem_t objects is used for polling. The worker has two input sockets, and we want to poll over both of them. We do that in the infinite loop. In the previous post we used a timed poll; here we set the timeout to -1, meaning that ZeroMQ blocks indefinitely on poll waiting for a message:
    while(true)
    {
        if(zmq_poll(items, 2, -1) < 1) // 1
        {
            std::cout << "Unexpected polling termination" << std::endl;
            break;
        }

        if(items[0].revents & ZMQ_POLLIN) // 2
        {
            // receiving the message on skPull ...
            // ...
        }

        if(items[1].revents & ZMQ_POLLIN) // 3
        {
            std::cout << " Kill!" << std::endl;
            break;
        }
    }
    zmq_close(skController); // 4
    // ...

1. Given that we are pending forever on zmq_poll() waiting for a message to arrive, a return value of 0 is unexpected, and -1 means something bad happened. In both cases, we just give some feedback to the user and break the loop.
2. The zeroth element in the zmq_pollitem_t array is the PULL socket connected to the ventilator, so if a message is pending on it, we go on running the code as it was written in the original implementation.
3. If we receive a message on the SUB socket, it is time to exit the loop. It is not even worth reading the pending message: we already know it is empty, and nothing more will follow.
4. But we should remember to close all the sockets, including the new one we just introduced.
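The decisions taken in points 1 to 3 can be isolated from ZeroMQ in a small pure function, which makes them easy to check on their own. This is only a sketch: PollDecision, decide() and kPollIn are hypothetical names that do not appear in the original code, and kPollIn is a local stand-in for ZMQ_POLLIN (defined as 1 in zmq.h) so that the snippet compiles without the library.

```cpp
#include <cassert>

// Local stand-in for ZMQ_POLLIN, so the sketch does not need zmq.h.
constexpr short kPollIn = 1;

// What the worker should do after one zmq_poll() call (hypothetical type).
struct PollDecision
{
    bool pollFailed;  // zmq_poll() returned 0 or -1 (point 1)
    bool processTask; // a task is pending on the PULL socket (point 2)
    bool stop;        // the loop has to be left (points 1 and 3)
};

// rc is the value returned by zmq_poll(); pullRevents and controlRevents
// are the revents fields of items[0] (PULL) and items[1] (SUB).
PollDecision decide(int rc, short pullRevents, short controlRevents)
{
    if(rc < 1)
    {
        // With an infinite timeout, 0 is unexpected and -1 is an error.
        return { true, false, true };
    }

    bool task = (pullRevents & kPollIn) != 0;
    bool kill = (controlRevents & kPollIn) != 0;

    // If both sockets are readable, the pending task is processed first
    // and then the loop ends, as in the worker loop shown above.
    return { false, task, kill };
}
```

In the worker loop it would be called as decide(zmq_poll(items, 2, -1), items[0].revents, items[1].revents), acting on the three flags in the order they are declared.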