Synchronized publisher

We are about to write a C++ server for a synchronized ØMQ PUB/SUB application. The previous post covers the design in more detail; here we focus on the code itself.

We have three different kinds of messages in our application: normal, control, and terminator. We don't care much about the real data exchanged, and setting the text in a ZeroMQ message is a bore, so we bypass it in a hackish way: we set only the length of the message, not its content, and let the length identify the kind of message. To make this a bit more readable to the casual reader, here are the constants that we are about to use:
const int MSG_MESSAGE = 0;
const int MSG_HELLO = 1;
const int MSG_END = 2;
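
The length-as-type trick can be sketched without ZeroMQ at all. In the snippet below, sizeFor() and kindOf() are hypothetical helpers, not part of the original server: the first gives the size to pass to the message constructor, the second reads the kind back from the size of a received message. The constants are repeated only so that the snippet compiles on its own.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

const int MSG_MESSAGE = 0;
const int MSG_HELLO = 1;
const int MSG_END = 2;

// Hypothetical helper: the size to give a message so that it carries `kind`.
std::size_t sizeFor(int kind) {
    assert(kind >= MSG_MESSAGE && kind <= MSG_END); // only the three known kinds
    return static_cast<std::size_t>(kind);
}

// Hypothetical helper: read the kind back from the size of a received message.
std::string kindOf(std::size_t size) {
    switch(static_cast<int>(size)) {
    case MSG_MESSAGE: return "message";
    case MSG_HELLO:   return "hello";
    case MSG_END:     return "end";
    default:          return "unknown";
    }
}
```

The price of this shortcut is that no real payload can travel with the message, which is fine here because we only want to exercise the synchronization.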

The first part of the server code is the usual ZeroMQ setup, where a context and the required sockets are created:
zmq::context_t context(1);

zmq::socket_t publisher(context, ZMQ_PUB);

zmq::socket_t sync(context, ZMQ_REP);

Then we have the first task of the server, detecting when all the expected subscribers are connected:

int currentSubscribers = 0;
while(currentSubscribers < SUB) { // 1
    sayHello(publisher); // 2
    if(checkSubscribers(sync)) // 3
        ++currentSubscribers;
}

1. SUB is the expected number of subscribers.
2. sayHello() sends a hello message on the PUB socket, see below.
3. checkSubscribers() returns true if a client has shown up by sending a message to the REP socket, see below.
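void sayHello(zmq::socket_t& socket) {
    zmq::message_t hello(MSG_HELLO);
    socket.send(hello);
}

bool checkSubscribers(zmq::socket_t& socket) {
    zmq::message_t ack;
    if(socket.recv(&ack, ZMQ_NOBLOCK)) { // 1
        socket.send(ack); // a REP socket must reply before it can receive again
        return true;
    }
    return false;
}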

1. It is worth remarking that we make a non-blocking call to recv() here, because we don't want to hang on the socket until it receives a message. We just check whether a message is pending and, if not, simply return false.
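
To see why the non-blocking check matters, here is a small simulation of the handshake loop that does not use ZeroMQ. FakeRepSocket, tryRecv() and hellosUntilSynchronized() are made-up names for this sketch; the assumption is only that a non-blocking receive returns false immediately when nothing is pending, so the loop is free to keep sending hello messages while it waits.

```cpp
#include <cassert>
#include <cstddef>
#include <numeric>
#include <queue>
#include <vector>

// Stand-in for the REP socket: holds the acks that subscribers have sent so far.
struct FakeRepSocket {
    std::queue<int> pending;

    // Mimics a non-blocking receive: returns false at once when nothing is queued.
    bool tryRecv() {
        if(pending.empty())
            return false;
        pending.pop();
        return true;
    }
};

// Same shape as the server loop. arrivalsPerStep[i] is the number of subscribers
// that send their ack during iteration i. Returns how many hellos were sent
// before `expected` subscribers were counted.
int hellosUntilSynchronized(const std::vector<int>& arrivalsPerStep, int expected) {
    // Precondition: enough subscribers eventually show up, or the loop never ends.
    assert(std::accumulate(arrivalsPerStep.begin(), arrivalsPerStep.end(), 0) >= expected);

    FakeRepSocket sync;
    int current = 0;
    int hellos = 0;
    for(std::size_t step = 0; current < expected; ++step) {
        ++hellos; // stands for sayHello(publisher)
        if(step < arrivalsPerStep.size())
            for(int i = 0; i < arrivalsPerStep[step]; ++i)
                sync.pending.push(1);
        if(sync.tryRecv()) // stands for checkSubscribers(sync)
            ++current;
    }
    return hellos;
}
```

With no arrivals in the first iteration and two in the second, the loop sends three hellos before both subscribers are counted, since each iteration acknowledges at most one of them. A blocking receive would instead stop the hellos at the first iteration.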

Now we can send the messages, followed by a terminator, to the subscribers:
for(int i = 0; i < 100000; ++i) {
    zmq::message_t message(MSG_MESSAGE);
    publisher.send(message);
}
zmq::message_t message(MSG_END);
publisher.send(message);
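
The terminator is what lets a subscriber know when to stop reading. As a rough illustration, again without ZeroMQ, the sketch below builds the sequence of message kinds that this last phase produces and counts them the way a client might. publishedKinds() and countUntilEnd() are hypothetical names, and the client behavior shown is an assumption, not the actual subscriber code.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

const int MSG_MESSAGE = 0;
const int MSG_HELLO = 1;
const int MSG_END = 2;

// Kinds sent in the final phase: n normal messages, then one terminator.
std::vector<int> publishedKinds(int n) {
    assert(n >= 0);
    std::vector<int> kinds(static_cast<std::size_t>(n), MSG_MESSAGE);
    kinds.push_back(MSG_END);
    return kinds;
}

// Hypothetical subscriber: counts normal messages until the terminator shows up.
// Returns -1 if the stream runs out before any terminator is seen.
int countUntilEnd(const std::vector<int>& kinds) {
    int received = 0;
    for(int kind : kinds) {
        if(kind == MSG_END)
            return received; // terminator: stop reading
        if(kind == MSG_MESSAGE)
            ++received;      // normal message
        // late MSG_HELLO messages are simply ignored
    }
    return -1;
}
```

Comparing the returned count with the number of messages sent is a cheap way for a client to check whether it missed anything after the synchronization.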

Post written while reading the Z-Guide.
