We have three different kind of messages in our application: normal, control, terminator. We don't care much of real data exchange, and setting the text in a ZeroMQ message is a bore - so we bypass it in an hackish way just setting the length of the message, and not the real data. To make it a bit more readable to the casual reader, here is a few constants that we are about to use:
const int MSG_MESSAGE = 0;
const int MSG_HELLO = 1;
const int MSG_END = 2;
First part of the server code is the usual ZeroMQ setup, a context and the required sockets are created:
zmq::context_t context(1);
zmq::socket_t publisher(context, ZMQ_PUB);
publisher.bind("tcp://*:5561");
zmq::socket_t sync(context, ZMQ_REP);
sync.bind("tcp://*:5562");
Then we have the first task of the server, detecting when all the expected subscribers are connected:
int currentSubscribers = 0;
while(currentSubscribers < SUB) // 1
{
sayHello(publisher); // 2
if(checkSubscribers(sync)) // 3
++currentSubscribers;
}
1. SUB is the expected number of subscribers.
2. sayHello() send an hello message on the PUB socket, see below.
3. checkSubscribers() returns true if a client shows up, sending a message to the REP socket, see below.
void sayHello(zmq::socket_t& socket)
{
boost::this_thread::sleep(boost::posix_time::seconds(1));
zmq::message_t hello(MSG_HELLO);
socket.send(hello);
}
bool checkSubscribers(zmq::socket_t& socket)
{
zmq::message_t ack;
if(socket.recv(&ack, ZMQ_NOBLOCK)) // 1
{
socket.send(ack);
return true;
}
return false;
}
1. Could be interesting remarking that we make here a non blocking call to recv() because we don't want to hang on the socket till it receives a message, we just check if there is any message pending, if not we simply return false.
Now we can send the messages, followed by a terminator, to the subscribers:
for(int i = 0; i < 100000; ++i)
{
zmq::message_t message(MSG_MESSAGE);
publisher.send(message);
}
zmq::message_t message(MSG_END);
publisher.send(message);
Post written while reading the Z-Guide.
No comments:
Post a Comment