Paranoid Pirate Heartbeating - worker

The worker described here is the companion to the ZeroMQ router server described in the previous post. Another post gives an overview of this simple heartbeat example. The full C++ source code is available on GitHub.

The app controller spawns a new thread for the worker, running this function:
void worker(char id, int lifespan) // 1
{
    const int PATIENCE = 3;
    HeartbeatWorker worker(id); // 2

    int patience = 0;
    int cycle = 0;
    int sent = 0;
    int iteration = 1;

    while(true)
    {
        uint8_t control = worker.recv(); // 3
        if(control == PHB_NOTHING)
        {
            if(cycle++ == PATIENCE)
            {
                if(patience++ == PATIENCE)
                    break; // 4

                int factor = static_cast<int>(std::pow(2.0, patience));
                boost::this_thread::sleep(bp::millisec(factor * BASE_INTERVAL)); // 5

                worker.reset();
                cycle = 0;
            }

            if(iteration++ == lifespan) // 6
                break;
        }
        else
            cycle = patience = 0; // 7
        worker.heartbeat(); // 8
    }
    worker.shutdown(); // 9
}
1. The worker is identified by a single character, which is used to generate a unique id for each socket created here. Each time the worker loses contact with the server, the existing socket is closed and replaced by a new one. Every new socket must have a different id, so that the server is not confused. The second parameter passed to the worker function is its lifespan, expressed as a number of idle polling cycles.
2. A HeartbeatWorker class, shown below, is used to make the code more readable.
3. HeartbeatWorker::recv() combines a poll() and a recv() call on the worker socket. If no message is pending, the PHB_NOTHING value is returned.
4. If no heartbeat is received from the server for a while, the worker shuts itself down.
5. But before committing suicide, the worker tries to reconnect to the server: it waits for a (growing) while, then replaces its socket with a new one that has a new identity but points to the same endpoint.
6. When we reach the limit imposed by the caller, even if the server is still alive, we terminate the worker.
7. If there was actually something on the socket, we reset both the cycle and the patience counters.
8. In any case, send a heartbeat to the server.
9. Gracefully terminate the worker.
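
To make points 1, 4 and 5 more concrete, here is a minimal sketch that replays the give-up logic against a server that never answers, with no sockets involved. The names reconnectSchedule and Reconnect are mine, introduced only for illustration, and baseInterval stands in for BASE_INTERVAL, whose actual value is not shown in this post. The identity rule (append the root character at each reset) is the one used by HeartbeatWorker::reset().

```cpp
#include <string>
#include <vector>

// One reconnection attempt: how long the worker sleeps, and the identity
// of the socket it opens afterwards. (Hypothetical helper type.)
struct Reconnect
{
    int delayMs;
    std::string identity;
};

// Replays the patience logic of worker() assuming the server stays silent.
// baseInterval is an assumed stand-in for BASE_INTERVAL, in milliseconds.
std::vector<Reconnect> reconnectSchedule(char idRoot, int baseInterval, int maxPatience)
{
    std::vector<Reconnect> schedule;
    std::string identity(1, idRoot); // identity of the first socket
    int patience = 0;

    while (patience++ != maxPatience) // same post-increment test as in worker()
    {
        int factor = 1 << patience;   // 2, 4, 8, ... equivalent to pow(2.0, patience)
        identity += idRoot;           // each reset appends the root character
        schedule.push_back({factor * baseInterval, identity});
    }
    return schedule; // once the loop ends, the worker gives up
}
```

Calling reconnectSchedule('A', 1000, 3) yields three attempts: 2000 ms then "AA", 4000 ms then "AAA", 8000 ms then "AAAA". After the third attempt the patience is exhausted and the worker terminates.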

This is my HeartbeatWorker implementation:
class HeartbeatWorker
{
private:
    zmq::context_t context_;
    std::unique_ptr<zmq::Socket> sk_; // 1
    std::string id_; // 2
    char idRoot_;
    bp::ptime heartbeat_; // 3
    zmq::pollitem_t items_[1];

public:
    HeartbeatWorker(char id) : context_(1), idRoot_(id),
        heartbeat_(bp::microsec_clock::local_time() +  bp::millisec(BASE_INTERVAL))
    {
        reset();

        items_[0].fd = 0;
        items_[0].events = ZMQ_POLLIN;
        items_[0].revents = 0;
    }

    void reset()
    {
        id_ += idRoot_; // 4
        sk_.reset(new zmq::Socket(context_, ZMQ_DEALER, id_)); // 5
        sk_->setLinger(0); // 6
        sk_->connect(SKA_BACKEND_CLI);
        items_[0].socket = *sk_.get(); // 7

        sk_->send(PHB_READY); // 8
    }

    void heartbeat() // 9
    {
        if(bp::microsec_clock::local_time() > heartbeat_)
        {
            heartbeat_ = bp::microsec_clock::local_time() + bp::millisec(BASE_INTERVAL);
            sk_->send(PHB_HEARTBEAT);
        }
    }

    void shutdown()
    {
        sk_->send(PHB_DOWN);
    }

    uint8_t recv() // 10
    {
        if(zmq::poll(items_, 1, BASE_INTERVAL * 1000) < 1)
            return PHB_NOTHING;

        uint8_t res = PHB_NOTHING;
        if(items_[0].revents & ZMQ_POLLIN)
            res = sk_->recvAsByte();
        items_[0].revents = 0;
        
        return res;
    }
};
1. Any time we lose the connection to the server, we kill the socket and create a new one. For this reason, I use a (smart) pointer instead of an object on the stack.
2. Socket id. It is based on the idRoot_ character (defined in the next line).
3. Time at which the next heartbeat is due to be sent to the server. The bp namespace is defined as a synonym for boost::posix_time.
4. The socket id is changed any time a reset occurs.
5. The unique_ptr::reset() takes care of deleting the previous socket (if set), which closes it.
6. We don't want to wait for pending messages to be delivered when we close the socket. The default is to linger indefinitely, which is no good here: we could be sending messages to a server that is not there anymore, and we want to close the socket (and create a new one) even if those messages are never delivered.
7. A tricky line. We store a pointer to the socket, but the zmq::pollitem_t structure requires the socket itself among its members, so dereferencing is required.
8. Sends a "ready" message to the server.
9. The heartbeat message is sent to the server only if its time has arrived.
10. HeartbeatWorker::recv() combines zmq::poll() and the actual receive on the socket. We expect as input a single-part message containing a single byte.
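
The timing rule in point 9 can be tried out in isolation. The sketch below is not part of the original class: HeartbeatTimer is a hypothetical name, it uses std::chrono instead of boost::posix_time, and the current time is passed in as a parameter (rather than read from the clock) only so that the behavior is easy to check.

```cpp
#include <chrono>

// Decides when a heartbeat is due, following the same rule as
// HeartbeatWorker::heartbeat(): fire only when "now" is past the deadline,
// then move the deadline one interval ahead of "now".
class HeartbeatTimer
{
public:
    using Clock = std::chrono::steady_clock;

    HeartbeatTimer(Clock::time_point now, std::chrono::milliseconds interval)
        : interval_(interval), next_(now + interval) {}

    // Returns true if a heartbeat should be sent at time "now".
    bool due(Clock::time_point now)
    {
        if (now > next_)
        {
            next_ = now + interval_; // schedule the following heartbeat
            return true;
        }
        return false;
    }

private:
    std::chrono::milliseconds interval_;
    Clock::time_point next_;
};
```

Note that the next deadline is computed from the moment the heartbeat is actually sent, not from the previous deadline, so a late heartbeat pushes all the following ones forward as well.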
