Pages

Extending zmq::socket_t

It took me some time to work my way through the ØMQ LRU queue broker example. If I think how complex could be implementing the same functionality with a different framework, I won't complain much. Still, one could wonder if there is a way of simplifying the most tedious, and error prone, part of its coding. The 0MQ guys have an answer to this, a high level API for ZeroMQ, meaning a C binding built on top the basic, low level, C API. You can see it and get it from the czmq page on the ZeroMQ official site.

The czmq API is nice and good, but I would prefer to stick to the C++ light weight interface. It is not optimal, either, but it integrates better with my C++ code.

So, what I have done is extending the C++ zmq::socket_t class to provide multipart send() and receive(). It is not a big thing, but the impact on the resulting code, as I'll show you in a next post, is nice, making it easier to write and maintain. Notice that I have lazily written it only for 0MQ 2.x and for Windows by MSVC2010. I would consider it more a proof of concept than a stable piece of code.

You can see the full C++ source code for my zmq::Socket class on github (a new version for zmq::Socket is now available, follow the link for details). Here I have jotted down a few notes on what I think are the more interesting points:
// ...

namespace zmq
{
    typedef std::vector<std::string> Frames; // 1

    class Socket : public socket_t // 2
    {
    public:
        // ...

        bool send(const Frames& frames) // 3
        {
            if(!frames.size()) // 3a
                throw error_t();

            for(unsigned int i = 0; i < frames.size() - 1; ++i) // 3b
            {
                if(!send(frames[i], ZMQ_SNDMORE))
                    return false;
                if(!sendSeparator())
                    return false;
            }

            return send(frames.back()); // 3c
        }

        Frames blockingRecv(int n, bool checked =true) // 4
        {
            Frames frames;
            frames.reserve(n);

            int currentFrame = 1;
            do {
                zmq::message_t message;
                if(!socket_t::recv(&message, 0)) // 4a
                    throw error_t();

                if(!(currentFrame++ % 2)) // 4b
                {
                    if(message.size())
                        throw error_t();
                }
                else
                {
                    const char* base = static_cast<const char*>(message.data());
                    frames.push_back(std::string(base, base + message.size())); // 4c
                }
            } while(sockopt_rcvmore()); // 4d

            if(checked && frames.size() != n)
                throw error_t();

            return frames;
        }
        
        // ...

    private:
        // ...

        bool sockopt_rcvmore() // 5
        {
            int64_t rcvmore;
            size_t type_size = sizeof(int64_t);
            getsockopt(ZMQ_RCVMORE, &rcvmore, &type_size);
            return rcvmore ? true : false;
        }
    };
}
1. Instead of sending each frame in a multipart message as a different message, I am going to provide alternative methods that manage more frames in a single chunk. Frames is just a alias for a vector of strings representing a multipart message (stripped by the zero-length separator).
2. My Socket class IS-A zmq::socket_t, so it derives publicly from it.
3. Send for multipart messages. Remember that no separator (the empty frames between one "real" frame and the other) should be included in the vector of frames.
3a. It is expected at least a frame.
3b. All the frame but the last one should be managed in the same way. We send each of them followed by a separator, and all of them should be sent in "send more" mode. The actual job is done by a couple of methods, that prepare the zmq::message_t and then send it through the underlying socket. See full code for details (or follow the link for detail on its new version).
3c. Last frame is different. There is no separator after, and it should be send as "single/last" frame.
4. Receive method for multipart messages. For the current example, I needed just to implement blocking receive. A non blocking receive implementation would need some more planning, so I delayed its design to a better time. The function gets in input two parameters, the number of expected frames, and if a flag to specify what to do if actually there is a different number of frames. If we are positive on our request, we could specify that checked is true, meaning that in the mismatch case an exception is thrown. Otherwise the passed number of frames is considered just as a performance hint.
4a. Anything goes wrong this function throws an exception.
4b. I don't want any separator in the frame vectors, so I skip them, after ensuring that they are zero sized. Otherwise something weird should have happened.
4c. All the "real" frames are converted in std::string's and pushed in the container to be returned to the user.
4d. We go on looping till the message is not the last one in a multipart series, or it actually is a stand alone message. See (5) for details on how the check is done. Using a do-while loop should be the most natural choice, since we have to loop at least once, and the loop check has to be performed after the action (of receiving a frame, in this case) is executed.
5. Check the socket option "receive more" to see if the last received message has a next one to be fetched or not. Notice that this code has been designed for ZeroMQ version 2.x, so the option is a 64 bit object. For ZeroMQ version 3 we should have used a plain int instead.

No comments:

Post a Comment