Synchronized publisher

We are about to write a C++ server for a synchronized ØMQ PUB/SUB application. The previous post gives more details on the design; here we focus on the code itself.

We have three different kinds of messages in our application: normal, control, and terminator. We don't care much about real data exchange, and setting the text in a ZeroMQ message is a bore, so we bypass it in a hackish way: we set only the length of the message, not its actual data. Each constant below is therefore the size in bytes of the corresponding message. To make the code a bit more readable to the casual reader, here are a few constants that we are about to use:
const int MSG_MESSAGE = 0;
const int MSG_HELLO = 1;
const int MSG_END = 2;
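
Since the length is the only information our messages carry, a receiver has to look at the size to tell them apart. Here is a minimal sketch of how that could look. Note that describeMessage() is a hypothetical helper, not part of the original code, and it takes a plain size instead of a real ZeroMQ message:

```cpp
#include <cstddef>
#include <string>

// Same values as the constants in the post: the message length is the message kind.
const int MSG_MESSAGE = 0;
const int MSG_HELLO = 1;
const int MSG_END = 2;

// Hypothetical helper (an assumption, not in the original code):
// map the size of a received message to a readable name.
std::string describeMessage(std::size_t size)
{
    switch(static_cast<int>(size))
    {
    case MSG_MESSAGE: return "normal";
    case MSG_HELLO:   return "control";
    case MSG_END:     return "terminator";
    default:          return "unknown";
    }
}
```

With a real socket we would pass the size of the received message to it, but any size works for a quick check.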

The first part of the server code is the usual ZeroMQ setup, where we create a context and the required sockets:
zmq::context_t context(1);

zmq::socket_t publisher(context, ZMQ_PUB);
publisher.bind("tcp://*:5561");

zmq::socket_t sync(context, ZMQ_REP);
sync.bind("tcp://*:5562");

Then we have the first task of the server, detecting when all the expected subscribers are connected:

int currentSubscribers = 0;
while(currentSubscribers < SUB) // 1
{
    sayHello(publisher); // 2
    if(checkSubscribers(sync)) // 3
        ++currentSubscribers;
}

1. SUB is the expected number of subscribers.
2. sayHello() sends a hello message on the PUB socket, see below.
3. checkSubscribers() returns true if a client has shown up by sending a message to the REP socket, see below.
void sayHello(zmq::socket_t& socket)
{
    // wait a second between one hello message and the next
    boost::this_thread::sleep(boost::posix_time::seconds(1));
    zmq::message_t hello(MSG_HELLO);
    socket.send(hello);
}

bool checkSubscribers(zmq::socket_t& socket)
{
    zmq::message_t ack;
    if(socket.recv(&ack, ZMQ_NOBLOCK)) // 1
    {
        // a REP socket has to reply before it can receive again
        socket.send(ack);
        return true;
    }
    return false;
}

1. It is worth remarking that we make a non-blocking call to recv() here, because we don't want to hang on the socket until it receives a message. We just check whether any message is pending; if not, we simply return false.
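
To see the non-blocking idea in isolation, here is a small sketch that does not use ZeroMQ at all. It assumes a plain queue as a stand-in for the REP socket, and both PendingRequests and pollOnce() are hypothetical names made up for this illustration:

```cpp
#include <queue>

// Hypothetical stand-in for the REP socket: each entry is one pending request.
typedef std::queue<int> PendingRequests;

// Non-blocking check: it never waits. It reports whether a request
// was pending and, if so, consumes it.
bool pollOnce(PendingRequests& pending)
{
    if(pending.empty())
        return false;   // nothing to read, return immediately
    pending.pop();      // consume the request (the real code also sends a reply)
    return true;
}
```

The caller is free to do something else between one check and the next, which is exactly what our loop does when it sends another hello message.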

Now we can send the messages, followed by a terminator, to the subscribers:
for(int i = 0; i < 100000; ++i)
{
    zmq::message_t message(MSG_MESSAGE);
    publisher.send(message);
}
zmq::message_t message(MSG_END);
publisher.send(message);
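
On the other side, a subscriber could use the message sizes to decide what to do with this stream. The sketch below is only an assumption about how such a client might behave, not the actual subscriber code: countMessages() is a hypothetical function, and a vector of sizes stands in for the messages read from the SUB socket.

```cpp
#include <cstddef>
#include <vector>

// Same values as the constants in the post, typed as sizes for easy comparison.
const std::size_t MSG_MESSAGE = 0;
const std::size_t MSG_HELLO = 1;
const std::size_t MSG_END = 2;

// Hypothetical subscriber-side logic: 'sizes' holds the lengths of the
// received messages, in arrival order.
int countMessages(const std::vector<std::size_t>& sizes)
{
    int received = 0;
    for(std::size_t i = 0; i < sizes.size(); ++i)
    {
        if(sizes[i] == MSG_END)
            break;          // terminator: stop reading
        if(sizes[i] == MSG_MESSAGE)
            ++received;     // normal message: count it
        // anything else (for instance MSG_HELLO) is ignored
    }
    return received;
}
```

For the sequence hello, hello, normal, normal, normal, terminator this function returns 3, and anything arriving after the terminator is not considered.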


Post written while reading the Z-Guide.
