As they state in the ZGuide, talking about their implementation for that protocol based on czmq, the high level C wrapper to 0MQ, "Heartbeating is simple once it works, but quite difficult to invent." For this reason I thought it would be interesting to see first how heartbeating could work on its own, and then to integrate it in a paranoid pirate implementation.
I wrote the code for ZeroMQ version 2.2, developed on Windows + MSVC2010, using the standard 0MQ C++ wrapper with my zmq::Socket class that adds some features to the original zmq::socket_t. See previous posts, like this one, for details.
Since I am interested only in heartbeating, I have designed the application to be as simple as I could make it, to the point of looking quite meaningless.
We have a server (without clients) with one or no associated worker. The server has a ROUTER socket, and the worker a DEALER one. They exchange only status information, no real payload.
When a worker signals to the server that it is alive, the server stores its id, and then sends the worker a heartbeat to signal that the server itself is still alive. The worker does the same: it sends heartbeats to the server until it shuts down.
If the server stops getting heartbeats from the worker, it simply removes its id, and doesn't care about it anymore.
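The server-side bookkeeping described above can be sketched without any socket code. This is only an illustration, not the code from the actual example: WorkerSlot is a hypothetical name, the timeout value is up to the caller, and I use the C++11 <chrono> header here (an assumption, the real code targets MSVC2010 and boost).

```cpp
#include <chrono>
#include <string>

// Hypothetical helper, not part of the original example: it remembers the
// single worker known to the server and forgets it when heartbeats stop.
class WorkerSlot {
public:
    typedef std::chrono::steady_clock Clock;

    explicit WorkerSlot(std::chrono::milliseconds timeout) : timeout_(timeout) {}

    // Call whenever a heartbeat from the worker with this id arrives.
    void heartbeat(const std::string& id, Clock::time_point now) {
        id_ = id;
        expiry_ = now + timeout_;
    }

    // Call once per server loop iteration: drops the worker if it went silent.
    void check(Clock::time_point now) {
        if (!id_.empty() && now >= expiry_)
            id_.clear();
    }

    bool hasWorker() const { return !id_.empty(); }
    const std::string& id() const { return id_; }

private:
    std::chrono::milliseconds timeout_;
    std::string id_;            // empty means "no associated worker"
    Clock::time_point expiry_;  // moment after which the worker is considered gone
};
```

Passing the current time as a parameter, instead of reading the clock inside the class, is just a choice to keep the sketch easy to test.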
The worker is more caring. It retries the check on the server heartbeat a few times, and only when it has lost all hope does it consider the server lost. It then waits for a while, and creates a new socket, trying to establish a new connection.
Even with these minimal requirements, the resulting code is not trivial. For this reason I split the discussion into a few posts. After this introduction you can read about the router server and the dealer worker in the next two posts.
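The worker's patience can be modeled with a simple countdown. Again, this is a minimal sketch under my own assumptions: ServerLiveness is a hypothetical name, and the number of retries ("a few" in the description above) is left to the caller.

```cpp
// Hypothetical helper, not part of the original example: it counts how many
// consecutive polls went by without a heartbeat from the server.
class ServerLiveness {
public:
    explicit ServerLiveness(int retries) : retries_(retries), left_(retries) {}

    // A heartbeat arrived: the server is alive, restore the full budget.
    void heartbeatReceived() { left_ = retries_; }

    // A poll timed out with no heartbeat. Returns true when the budget is
    // exhausted, meaning the worker should wait and then reconnect.
    bool pollTimedOut() {
        if (left_ > 0)
            --left_;
        return left_ == 0;
    }

    // Call after the socket has been recreated.
    void reset() { left_ = retries_; }

private:
    int retries_;  // how many silent polls are tolerated
    int left_;     // how many are still available
};
```

In the worker loop, a true returned by pollTimedOut() would be the signal to sleep, discard the old socket, create a new one, and call reset().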
The main thread in this heartbeat testing application runs a couple of test cases:
    boost::thread_group threads;
    threads.create_thread(std::bind(server, INT_MAX)); // 1
    threads.create_thread(std::bind(worker, 'A', 6)); // 2
    threads.join_all();
1. The server() function expects an int parameter, representing how many iterations we want to run. Here I specify the largest int available, as defined in limits.h, meaning that I want to run it (almost) forever.
2. The worker() function needs to know which character should be used as seed for the worker id. Its second parameter is the number of heartbeats that the worker is going to send to the server before shutting down.
The expected result of this test case is that the server stays up until the worker has sent all its heartbeats, and then keeps waiting idle, since it is meant to run almost forever. The worker id should be "A".
Second test case:
    boost::thread_group threads;
    threads.create_thread(std::bind(server, 3)); // 1
    threads.create_thread(std::bind(worker, 'B', 7)); // 2
    boost::this_thread::sleep(boost::posix_time::seconds(10)); // 3
    threads.create_thread(std::bind(server, INT_MAX)); // 4
    threads.join_all();
1. This server is going to be short lived.
2. A worker like the one in the first test case, this time with 'B' as id seed and seven heartbeats to send.
3. Ensure enough time passes before restarting the server.
4. An almost-forever server is started.
Here we expect the worker to see the server going offline, to reconnect once the server has restarted, and to complete its job. The worker id should swap from "B" to "BB".
The full C++ code for the complete example is on github.