Multipart messages

ZeroMQ lets us work with simple messages, as we have seen in the previous posts, and with multipart ones. It is worth stressing that ØMQ handles a multipart message atomically, so a client gets either all of it or nothing at all.

Let's modify the publisher we have already seen so that it sends a multipart message spanning three frames. The changes are confined to the for loop. Originally it was designed to send simple messages; now we change it in this way:
for(int i = 0; i < 100; ++i)
{
    readyToSend();

    for(int j = 0; j < 3; ++j) // 1
    {
        ss.str("");
        ss << i%2 << ':' << i*42 << ':' << i << char('A' + j);
        s = ss.str();

        zmq::message_t message(s.length());
        memcpy(message.data(), s.c_str(), s.length());

        std::cout << "Sending " << s << std::endl;
        publisher.send(message, (j != 2 ? ZMQ_SNDMORE : 0)); // 2
    }
}

1. The inner for loop sends the three frames of each multipart message.
2. The key part of sending a multipart message is here. The ZMQ_SNDMORE flag is passed to send() to signal that at least one more frame follows; the last frame is sent with no flag, which closes the message.
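The framing logic of the loop above can be checked without a running socket. What follows is a minimal sketch that assumes nothing beyond the standard library: Frame and makeFrames are hypothetical names introduced here, and the boolean "more" stands in for passing ZMQ_SNDMORE to send().

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical stand-in for one frame of a multipart message.
struct Frame
{
    std::string payload;
    bool more; // true where the publisher would pass ZMQ_SNDMORE
};

// Builds the frames that the publisher loop sends for iteration i.
std::vector<Frame> makeFrames(int i, int frameCount = 3)
{
    assert(frameCount > 0 && frameCount <= 26); // suffixes run from 'A' to 'Z'

    std::vector<Frame> frames;
    for(int j = 0; j < frameCount; ++j)
    {
        std::ostringstream ss;
        ss << i % 2 << ':' << i * 42 << ':' << i << char('A' + j);

        Frame frame;
        frame.payload = ss.str();
        frame.more = (j != frameCount - 1); // every frame but the last one
        frames.push_back(frame);
    }
    return frames;
}
```

Calling makeFrames(3) gives the payloads 1:126:3A, 1:126:3B and 1:126:3C, and only the last frame has "more" set to false.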

The client requires a bit more work. Let's modify the subscriber we have already written so that it reads just one (possibly multipart) message and terminates. After creating a SUB socket, connecting it to the PUB, and setting the filter on the message stream, we call recv(). But now we check the ZMQ_RCVMORE socket option to see whether the current frame belongs to a multipart message and is not its last one:
while(true) // 1
{
    zmq::message_t message;
    subscriber.recv(&message);

    __int64 more; // 2
    size_t size = sizeof(__int64);
    subscriber.getsockopt(ZMQ_RCVMORE, &more, &size); // 3
    if(more) // 4
        std::cout << "Reading ..." << std::endl;
    else
    {
        std::cout << "Done" << std::endl;
        break;
    }
}

1. We loop until we get the last frame of the current multipart message.
2. The option is stored in a 64-bit integer. This code is compiled with VC++, so it uses the non-standard __int64 type. On Linux you would probably use int64_t. If you want to compile the same code on different platforms, you have to rely on conditional compilation here. Quite a nuisance, indeed.
3. Here is how we get a socket option.
4. If the "more" flag is set, we are reading a multipart message and we haven't reached the final frame yet.
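The same loop can collect the frames instead of just printing a trace. Here is a rough sketch that runs without ZeroMQ: Frame, FakeSocket and recvMultipart are hypothetical names made up for this example, and FakeSocket::recv() hands back the "more" flag directly, where the real code would query ZMQ_RCVMORE through getsockopt().

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

// Hypothetical frame: a payload plus the flag that ZMQ_RCVMORE would report.
struct Frame
{
    std::string payload;
    bool more;
};

// Hypothetical in-memory replacement for the SUB socket.
class FakeSocket
{
public:
    void push(const std::string& payload, bool more)
    {
        Frame f = { payload, more };
        queue_.push_back(f);
    }

    // Stands in for recv() followed by getsockopt(ZMQ_RCVMORE, ...).
    Frame recv()
    {
        assert(!queue_.empty()); // a real socket would block here instead
        Frame f = queue_.front();
        queue_.pop_front();
        return f;
    }

    bool empty() const { return queue_.empty(); }

private:
    std::deque<Frame> queue_;
};

// Reads exactly one (possibly multipart) message and returns its frames.
std::vector<std::string> recvMultipart(FakeSocket& socket)
{
    std::vector<std::string> parts;
    while(true)
    {
        Frame frame = socket.recv();
        parts.push_back(frame.payload);
        if(!frame.more) // final frame reached, the message is complete
            break;
    }
    return parts;
}
```

The caller gets the whole message as a vector of strings, and whatever follows it in the queue is left untouched for the next call.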


Publish-subscribe proxy server

In ØMQ jargon, a device is a component that acts as a mediator between groups. It is used to keep the system architecture simple while still giving it a good degree of flexibility.

Here we are about to see a device in action, one designed to extend a pub-sub system by giving a group of clients a different way to access the original publisher.

The implementation is pretty simple. We have a new component, the proxy, which from the point of view of the original server is just a subscriber like all the others. The interesting fact is that it is also a publisher, which passes on to its own subscribers the messages it gets from its publisher.
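The core of such a proxy is a forwarding loop. As a hedged sketch of the idea only, with no ZeroMQ calls: Frame, Channel and forwardOne are hypothetical names, and the two channels stand in for the proxy's SUB socket (facing the original publisher) and its PUB socket (facing its own subscribers). The one detail worth showing is that each frame is passed on with the same "more" flag it arrived with, so multipart messages reach the downstream clients intact.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical frame: a payload plus its "more frames follow" flag.
struct Frame
{
    std::string payload;
    bool more;
};

// Hypothetical in-memory channel standing in for a socket.
typedef std::deque<Frame> Channel;

// Forwards one complete message from upstream to downstream and
// returns the number of frames that were passed on.
std::size_t forwardOne(Channel& upstream, Channel& downstream)
{
    assert(&upstream != &downstream); // the proxy sits between two distinct sides

    std::size_t forwarded = 0;
    while(!upstream.empty())
    {
        Frame frame = upstream.front();
        upstream.pop_front();

        downstream.push_back(frame); // the "more" flag travels with the payload
        ++forwarded;

        if(!frame.more) // end of the current message
            break;
    }
    return forwarded;
}
```

In a real proxy the upstream side would be a SUB socket connected to the original publisher and the downstream side a PUB socket that the new clients connect to.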

Have a look at the official Z-Guide for more information.
