You can find the complete C++ code for this example on github, and in the previous two posts some comments on the client side and on the worker.
There is still a tiny bit of code I should talk about, the dumpMessage() functions:
boost::mutex mio; // 1 void dumpMessage(const std::string& id, int value) // 2 { boost::lock_guard<boost::mutex> lock(mio); // 3 std::cout << id << ' ' << value << std::endl; } void dumpMessage(const std::string& id, const char* msg) { boost::lock_guard<boost::mutex> lock(mio); std::cout << id << ' ' << msg << std::endl; }1. In the rest of the code, all the synchronization among threads is done by 0MQ message exchanges, so we don't need mutexes and locks. But here we have to deal with many threads competing on the same shared resource, namely the standard output console. Ruling its access with a mutex is the most natural solution, I reckon.
2. I provide two overloads for the printing function, one for printing the socket/thread id plus its int payload, and one for id plus a message from the code to the user.
3. If there is already a thread running on the next line, we patiently wait here for our turn. The lock would be released by the dtor.
All the printing in the application is done through this couple of functions, with a notable exception in the client main function:
void lru(int nWorkers) { // ... boost::thread_group threads; // ... threads.join_all(); std::cout << "Number of processed messages: " << processed << std::endl; }But at that point we have already executed a "join all" on all the worker threads spawned by the client. We are back in the condition where only one thread is running for this process. So we don't have to worry about competing for that shared resource.
The design of this example should be clear. We enter the lru() function, create as many worker threads as required by the caller (actually, in real code it would be worthy to perform a check on that number), have a data exchange between the client and the workers, retrieve a result from all this job, and terminate the run.
From the ZeroMQ point of view, the interesting part is in the REQ-ROUTER pattern. Each worker has a request socket, so it would communicate to the client socket (a router) when it is ready for another run. When the router has no more job to be done, it would simply send a terminator to each worker asking for new feed, till no one of them is around anymore.
No comments:
Post a Comment