ZMQ_SNDMORE-ZMQ_RCVMORE in ZeroMQ 3.1

Migrating from ØMQ 2.1 to 3.1, there is a change in the multipart message mechanism that can be a source of big headaches. Message options are now stored in a plain int instead of a fixed-size 64-bit integer type. If an int is a 64-bit type in your environment, nothing changes at all. Otherwise, code that used to work fine may now show unexpected behavior.

The example is a simple 0MQ PUB-SUB application, where the server sends a multipart message in four parts to the client:
void* context = zmq_init(1);
void* socket = zmq_socket(context, ZMQ_PUB);
zmq_bind(socket, "tcp://*:50014");

readyToSend();
std::stringstream ss;
for(int i = 0; i < 3; ++i)
{
    ss.str("");
    ss << "multi part message: " << i;
    std::string s = ss.str();

    std::cout << "Sending " << s << std::endl;
    zmq_send(socket, s.c_str(), s.length(), ZMQ_SNDMORE); // 1
}
const char* buffer = "That's all";
zmq_send(socket, buffer, strlen(buffer), 0); // 2

zmq_close(socket);
zmq_term(context);
There is not much to say about this code beyond what we have already seen when talking about PUB servers. Just keep an eye on the last parameter of zmq_send():
1. All the messages sent in the loop specify the ZMQ_SNDMORE option. This marks them as parts of a single multipart message, which is completed by the first subsequent message sent with no option (0).
2. This is that closing message. It is sent like a "normal" message, but since it follows a few ZMQ_SNDMORE ones, it is the last part of the multipart message.
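
The flag pattern described in the two notes above works for any number of parts: every part but the last carries ZMQ_SNDMORE, and the last one carries 0. Here is a minimal sketch of that rule. The helper send_multipart() and the SendFn callback type are hypothetical names, not part of ZeroMQ, and kSndMore is only a stand-in for ZMQ_SNDMORE so that the snippet can be tried without a socket:

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Stand-in for ZMQ_SNDMORE so the sketch compiles without zmq.h.
// Assumption: with the real library you would pass ZMQ_SNDMORE instead.
const int kSndMore = 2;

// Hypothetical callback type: same shape as zmq_send() minus the socket.
typedef std::function<int(const void*, std::size_t, int)> SendFn;

// Hypothetical helper: hands all the parts to `send` as one multipart message.
// Returns the number of parts sent, stopping at the first failure.
std::size_t send_multipart(const std::vector<std::string>& parts, const SendFn& send)
{
    std::size_t sent = 0;
    for(std::size_t i = 0; i < parts.size(); ++i)
    {
        // every part but the last announces that more will follow
        int flags = (i + 1 < parts.size()) ? kSndMore : 0;
        if(send(parts[i].data(), parts[i].size(), flags) < 0)
            break;
        ++sent;
    }
    return sent;
}
```

With a real socket, the callback could be a lambda that forwards its three arguments to zmq_send() on that socket.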

The SUB client is not much different from a normal ZeroMQ subscriber. But here we are expecting a single message from the server, possibly a multipart one:
void* context = zmq_init(1);
void* socket = zmq_socket(context, ZMQ_SUB);
zmq_connect(socket, "tcp://localhost:50014");
zmq_setsockopt(socket, ZMQ_SUBSCRIBE, NULL, 0);

while(true)
{
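    char buffer[MSG_SIZE];
    int len = zmq_recv(socket, buffer, MSG_SIZE - 1, 0); // leave room for the terminator
    if(len < 0) len = 0; // receive failed
    else if(len >= MSG_SIZE) len = MSG_SIZE - 1; // truncated: zmq_recv() reports the full message size
    buffer[len] = '\0';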

    std::cout << buffer << std::endl;

    int more;
    size_t size = sizeof(int); // 1
    zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &size); // 2
    if(more)
        std::cout << "Reading ... " << std::endl;
    else
    {
        std::cout << "Done" << std::endl;
        break;
    }
}

zmq_close(socket);
zmq_term(context);
1. This is the change in ZeroMQ version 3: the size of a plain int is specified.
2. The variable more is set to 1 if the current message is part of a multipart series (and is not its last part); otherwise it is set to 0.

If you have legacy code, you are compiling your 0MQ code for a platform where int is 32 bits wide (or even less), and you didn't adjust the size of "more", ZeroMQ would set only half (or less) of your "more" variable. And since "more" here is not initialized, the untouched part holds garbage, so we should expect it to almost always carry a true (in the sense of non-zero) value.
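
To see why the untouched half matters, here is a toy model of the problem. It is a sketch, not libzmq code: fake_get_rcvmore() is a hypothetical stand-in that, like the 3.x option getter described above, writes exactly sizeof(int) bytes, and the garbage parameter simulates whatever an uninitialized variable happens to contain. It assumes an int narrower than 64 bits.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical stand-in for reading ZMQ_RCVMORE in 3.x: it writes exactly
// sizeof(int) bytes, whatever the size of the caller's variable.
int fake_get_rcvmore(int value, void* optval, std::size_t* optlen)
{
    if(*optlen < sizeof(int))
        return -1; // caller's buffer is too small
    std::memcpy(optval, &value, sizeof(int));
    *optlen = sizeof(int);
    return 0;
}

// Legacy 2.1-style caller: a 64-bit variable, of which only a part is overwritten.
// `garbage` models the content of the uninitialized variable.
bool legacy_more(int real_more, std::int64_t garbage)
{
    std::int64_t more = garbage;
    std::size_t size = sizeof(more);
    fake_get_rcvmore(real_more, &more, &size);
    return more != 0;
}

// 3.x-style caller: a plain int matches what gets written.
bool fixed_more(int real_more)
{
    int more = 0;
    std::size_t size = sizeof(more);
    fake_get_rcvmore(real_more, &more, &size);
    return more != 0;
}
```

In this model, legacy_more(0, -1) reports that more parts are coming even though the real flag is 0, while fixed_more(0) does not.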

2 comments:

  1. this post saved my life! the examples show using an int64_t in version 2, which is the first google result when searching for zmq_rcvmore.

    1. I'm happy this post helped you :)
