Let's think to a three-step multithreaded application. The main thread creates a worker thread, and sits waiting for it to signaling the job is done. The worker creates a sub-worker, does some job, than puts its result together with the one from its sub, and signals the main thread that it could go on.
No locks and mutexes are used in the application, with the noticeable exception of printing to standard output, resource shared among all the threads:
boost::mutex mio; void print(const char* message) { boost::lock_guard<boost::mutex> lock(mio); std::cout << boost::this_thread::get_id() << ": " << message << std::endl; }Here is the code to be executed by the main thread:
print("main: start"); void* context = zmq_init(1); print("main: create socket for answer"); void* skMain = zmq_socket(context, ZMQ_PAIR); // 1 zmq_bind(skMain, "inproc://main"); // 2 print("main: create thread for worker"); boost::thread t2(doWork, context); // 3 print("main: do some preparation job"); boost::this_thread::sleep(boost::posix_time::milliseconds(200)); // 4 print("main: wait for worker result"); zmq_recv(skMain, NULL, 0, 0); // 5 print("main: message received"); t2.join(); // 6 zmq_close(skMain); zmq_term(context); print("main: done");1. Even though this socket is used only a few lines down, it has to be created here, before the worker thread. The reason lies in the nature of pair sockets, they have no automatic reconnection capability, so the server socket must be up before its client try to connect.
2. This is a server socket in a pair connection, it uses an in process protocol, and it is identified by the name "main".
3. The worker is started, see below the code for doWork(), but pay attention to the fact that we are passing to it the 0MQ context.
4. Simulating some work.
5. We don't want to get any data from the worker, so we get an empty message. Here a bit of error handling would be a good idea, for a production code.
6. Wait the worker to terminate and then shut down the socket and the context.
Here is the worker in all its splendor:
void doWork(void* context) // 1 { print("worker: part 1 cooperating with another thread"); void* skSub = zmq_socket(context, ZMQ_PAIR); // 2 zmq_bind(skSub, "inproc://sub"); print("worker: create thread for subworker"); boost::thread t(doSub, context); // 3 print("worker: do something"); boost::this_thread::sleep(boost::posix_time::milliseconds(150)); print("worker: waiting for subworker"); zmq_recv(skSub, NULL, 0, 0); // 4 print("worker: subworker is done"); t.join(); zmq_close(skSub); // --- print("worker: part 2, answering to main"); void* skMain = zmq_socket(context, ZMQ_PAIR); // 5 zmq_connect(skMain, "inproc://main"); print("worker: doing something else"); boost::this_thread::sleep(boost::posix_time::milliseconds(150)); print("worker: signal back to main"); zmq_send(skMain, NULL, 0, 0); print("worker: done"); zmq_close(skMain); }1. The ZMQ context is the only thread-safe object in this structure, so it could, and should, be passed around threads that want to be coordinated.
2. Another pair socket used to connect the worker to a its sub-worker.
3. See below the code for the sub-worker function.
4. Again, this thread wait a communication from its relative worker, and then close the socket.
5. This socket is used to connect this thread with the main one, it expects the server socket for this connection to be already up.
The code for the sub-worker should be no surprise:
void doSub(void* context) { print("sub: start"); boost::this_thread::sleep(boost::posix_time::milliseconds(50)); void* skSub = zmq_socket(context, ZMQ_PAIR); // 1 zmq_connect(skSub, "inproc://sub"); boost::this_thread::sleep(boost::posix_time::milliseconds(50)); print("sub: tell to worker we're ready"); zmq_send(skSub, NULL, 0, 0); // 2 print("sub: done"); zmq_close(skSub); }1. Pair socket back to the worker.
2. An empty message to say that it is done.
I have written the code in this post expressly for ZeroMQ 3.1, and the standard C interface. It is basically a porting of my previous post that I wrote for the C++ official interface while reading the ZGuide (that currently still refers to version 2.1.x).
This is a great post, thanks...
ReplyDeleteThank you, I'm happy to see you find it useful :)
Delete