
Paranoid Pirate Heartbeating - server

As described in the previous post, I have written a bare-bones ZeroMQ C++ application in which a router and a dealer exchange nothing but heartbeats. It should help in understanding the basics of the process.

The reason for writing such an example is that, as they say in the ZGuide, "as for the Paranoid Pirate queue, the heartbeating is quite tricky to get right". I hope that a stripped-down example helps to clarify the concept.

In its simplest configuration, this heartbeat example is meant to run as a single-process, multithreaded application. The main thread spawns two new threads, one for the server and one for a worker, and they are left alone to interact until their natural termination.

We could even run the server alone (or, as we'll see in the next post, the worker alone). It would not get any message from a worker and, after a while, it would shut down.

Here is the server function. It uses a custom class, HeartbeatServer, which I comment on below:
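
To see why a lonely server eventually stops, here is a minimal sketch of the two counters that the server class shown further down keeps, with all the socket handling stripped away. LivenessCounter, onPoll() and pollsUntilIdleShutdown() are names I made up for this illustration; only the BUSY value and the isAlive() test come from the actual class.

```cpp
#include <climits>

// Hypothetical stand-in for the liveness bookkeeping, no sockets involved.
class LivenessCounter
{
    enum { BUSY = 5 };  // same value as in the server class of this post
    int lifespan_;      // polls left before the planned shutdown
    int busy_;          // empty polls left before the idle shutdown
public:
    explicit LivenessCounter(int lifespan = INT_MAX) : lifespan_(lifespan), busy_(BUSY) {}

    bool isAlive() const { return busy_ > 0 && lifespan_ > 0; }

    // Call once per poll; gotMessage tells whether the poll returned something.
    void onPoll(bool gotMessage)
    {
        --lifespan_;
        if (gotMessage)
            busy_ = BUSY;   // traffic seen, restore the idle allowance
        else
            --busy_;        // nothing there, one step closer to shutdown
    }
};

// Counts how many empty polls a server survives when no worker shows up.
inline int pollsUntilIdleShutdown(int lifespan = INT_MAX)
{
    LivenessCounter counter(lifespan);
    int polls = 0;
    while (counter.isAlive())
    {
        counter.onPoll(false);
        ++polls;
    }
    return polls;
}
```

Under these assumptions, a server that never hears from a worker gives up after BUSY empty polls, each lasting at most one polling interval, unless its lifespan is shorter and runs out first.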
void server(int lifespan) // 1
{
    HeartbeatServer server(lifespan); // 2

    while(server.isAlive()) // 3
    {
        zmq::Frames input = server.recv(); // 4
        if(!input.empty())
        {
            uint8_t control = (input.size() != 2 || input[1].empty()) ? PHB_UNEXPECTED : input[1].at(0);

            switch(control)
            {
            case PHB_READY:
                server.pushWorker(input[0]); // 5
                break;
            case PHB_HEARTBEAT:
                server.refreshWorker(input[0]); // 6
                break;
            case PHB_DOWN:
                server.dropWorker(input[0]); // 7
                break;
            default: // 8
                break;
            }
        }
        server.heartbeat(); // 9
    }
}
1. The server function takes a parameter stating how long it should run at most (as we'll see below, each call to recv() consumes one unit of it). Passing INT_MAX (as defined in limits.h) means that we'd like the server to run (almost) forever.
2. Create a HeartbeatServer, see below for details.
3. HeartbeatServer::isAlive() returns false when the server is ready to shut down.
4. HeartbeatServer::recv() combines a poll and a receive on the server socket. If a (multipart) message is pending on the socket, it is received and returned as a deque of strings (if you wonder why, have a look at this post). We expect a two-part message, with the worker id as first element and the payload (a single byte) in second position. Anything else can be happily discarded.
5. If the payload is a "ready" message, the worker id is stored in the server.
6. We received a "heartbeat" from the worker, so we refresh its status. Remember that a worker should be seen as active by the server only if it sends a heartbeat at least once in a while. After a longish silence from its side, we should consider it as lost in space.
7. If the worker ends its life gracefully, it sends a message to notify the server, so that the server can remove it on the spot.
8. We received something weird. This is the place where we could let the user know; here the message is simply ignored.
9. If there is a worker pending on the server, send a heartbeat to it.
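
The validation in point 4 can be tried out in isolation. What follows is just a sketch: the numeric values of the PHB_* constants are my assumption (this post does not show their definitions), Frames is a plain typedef standing in for zmq::Frames, and controlOf() is a hypothetical helper that wraps the conditional expression used in the server function.

```cpp
#include <cstdint>
#include <deque>
#include <string>

// Assumed values: the post does not show how the PHB_* constants are defined.
const uint8_t PHB_READY = 1;
const uint8_t PHB_HEARTBEAT = 2;
const uint8_t PHB_DOWN = 3;
const uint8_t PHB_UNEXPECTED = 0xFF;

// Stand-in for zmq::Frames, described in the post as a deque of strings.
typedef std::deque<std::string> Frames;

// Returns the control byte of a well-formed [worker id, control] message,
// or PHB_UNEXPECTED for anything else (wrong frame count, empty payload).
inline uint8_t controlOf(const Frames& input)
{
    if (input.size() != 2 || input[1].empty())
        return PHB_UNEXPECTED;
    return static_cast<uint8_t>(input[1][0]);
}
```

Keeping the check in one small function makes it easy to feed it hand-built messages, for instance a two-frame message carrying PHB_HEARTBEAT, or a three-frame one that should be rejected, without any socket around.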

And this is the server class:
class HeartbeatServer
{
private:
    zmq::context_t context_;
    zmq::Socket backend_;
    zmq::pollitem_t items_[1];

    std::string wid_; // 1
    bp::ptime expiry_; // 2

    int lifespan_; // 3
    int busy_; // 4
    bp::ptime heartbeat_; // 5
    enum { INTERVAL = 1000, BUSY = 5 };
public:
    HeartbeatServer(int lifespan = INT_MAX) : context_(1), backend_(context_, ZMQ_ROUTER), lifespan_(lifespan),
        busy_(BUSY), heartbeat_(bp::microsec_clock::local_time() + bp::millisec(INTERVAL))
    {
        backend_.bind(SKA_BACKEND_SRV);
        backend_.setLinger(0); // 6

        items_[0].socket = backend_; // 7
        items_[0].fd = 0;
        items_[0].events = ZMQ_POLLIN;
        items_[0].revents = 0;
    }

    bool isAlive() // 8
    {
        return busy_ > 0 && lifespan_ > 0;
    }

    zmq::Frames recv() // 9
    {
        --lifespan_;

        zmq::Frames res;
        if(zmq::poll(items_, 1, INTERVAL * 1000) > 0)
        {
            busy_ = BUSY;

            if(items_[0].revents & ZMQ_POLLIN)
                res = backend_.blockingRecv();
            items_[0].revents = 0;

            return res;
        }
        else
            --busy_;

        return res; // no message
    }

    void heartbeat() // 10
    {
        if(wid_.empty())
            return;

        // if it's time, send heartbeat
        if(bp::microsec_clock::local_time() > heartbeat_)
        {
            std::string control(&PHB_HEARTBEAT, &PHB_HEARTBEAT + 1);
            backend_.send(wid_, control);
            heartbeat_ = bp::microsec_clock::local_time() + bp::millisec(BASE_INTERVAL);
        }

        // if the worker is expired, remove its id 
        if(expiry_ < bp::microsec_clock::local_time())
            wid_ = "";
    }

    void pushWorker(const std::string& id) // 11
    {
        wid_ = id;
    }

    void refreshWorker(const std::string& id)
    {
        if(wid_ == id)
            expiry_ = bp::microsec_clock::local_time() + bp::millisec(CHECK_FACTOR * BASE_INTERVAL);
    }

    void dropWorker(const std::string& id)
    {
        if(wid_ == id)
            wid_ = "";
    }
};
1. The worker id pending on the server. In this minimal implementation we can have only one worker. If no worker is available, the string is empty.
2. Keep track of when the worker identified by (1) is going to expire. The namespace bp is a synonym for boost::posix_time.
3. Expected server lifespan.
4. If the server is idle for a while, this counter goes to zero, and then we signal that the server should shut down.
5. When the server should send the next heartbeat.
6. The sockets in this app have a troubled life: it often happens that one sends a message to the other when that one is already gone. Besides, the worker has a custom identity, so it is marked as persistent. To avoid the server socket hanging on termination, it is better to specify that we don't want it to linger. This function calls zmq_setsockopt() for the ZMQ_LINGER option on the underlying socket.
7. Initialize the array of zmq::pollitem_t to poll on the socket.
8. To be considered alive, the server should both be in the time frame defined by the user and busy.
9. Each time recv() is called, the server life expectancy decreases. If there is actually anything on the socket, the busy counter is restored to its maximum value, otherwise it decreases too.
10. A heartbeat is sent to the pending worker only if a worker is actually there, and it is actually time to send it. After sending it (if required), the worker expiration time is checked, to see if it is time to remove it from the server.
11. In this trivial implementation, pushing, refreshing, dropping a worker on the server is very easy. We have just one (or none) of them, so we just have to ensure nothing unexpected (bad worker id) happens.
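
The single-worker bookkeeping of points 1, 2, 10 and 11 can also be exercised without ZeroMQ. Here is a rough sketch, with a few liberties that I want to make explicit: WorkerSlot is a hypothetical class, std::chrono replaces boost::posix_time so that only the standard library is needed, the current time is passed in by the caller to make the behavior reproducible, and push() also starts the expiry timer, which the pushWorker() above does not do.

```cpp
#include <chrono>
#include <string>

// Hypothetical single-worker registry mirroring wid_ and expiry_.
class WorkerSlot
{
public:
    typedef std::chrono::steady_clock Clock;
    typedef Clock::time_point TimePoint;

    // ttl: how long a worker may stay silent before it is considered lost.
    explicit WorkerSlot(std::chrono::milliseconds ttl) : ttl_(ttl) {}

    // A "ready" message registers the worker and (in this sketch) starts its timer.
    void push(const std::string& id, TimePoint now)
    {
        wid_ = id;
        expiry_ = now + ttl_;
    }

    // A heartbeat from the registered worker postpones its expiry.
    void refresh(const std::string& id, TimePoint now)
    {
        if (wid_ == id)
            expiry_ = now + ttl_;
    }

    // A "down" message from the registered worker frees the slot at once.
    void drop(const std::string& id)
    {
        if (wid_ == id)
            wid_.clear();
    }

    // Frees the slot when the worker has been silent for longer than ttl.
    void expire(TimePoint now)
    {
        if (!wid_.empty() && expiry_ < now)
            wid_.clear();
    }

    bool hasWorker() const { return !wid_.empty(); }

private:
    std::chrono::milliseconds ttl_;
    std::string wid_;     // empty when no worker is registered
    TimePoint expiry_;    // meaningful only while wid_ is not empty
};
```

As in the real class, messages carrying an id different from the registered one are ignored, so a stray heartbeat cannot keep a silent worker alive.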

The full C++ code for the application is on GitHub.
