Our new client connects to two servers we have already seen: the ventilator, which implements a push socket, and the publisher. So we need a couple of new sockets, a pull one and a subscriber one:
zmq::context_t context(1);

zmq::socket_t receiver(context, ZMQ_PULL); // 1
receiver.connect("tcp://localhost:5557");

zmq::socket_t subscriber(context, ZMQ_SUB); // 2
subscriber.connect("tcp://localhost:5556");
const char* filter = "1";
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));

zmq::pollitem_t items[] =
{ // 3
    { receiver, 0, ZMQ_POLLIN, 0 },
    { subscriber, 0, ZMQ_POLLIN, 0 }
};

while(true) // 4
{
    zmq::message_t message;
    zmq::poll(items, 2); // 5

    if(items[0].revents & ZMQ_POLLIN) // 6
    {
        receiver.recv(&message);
        std::cout << "Processing message from receiver" << std::endl;
        items[0].revents = 0;
    }
    if(items[1].revents & ZMQ_POLLIN) // 7
    {
        subscriber.recv(&message);
        std::cout << "Processing message from subscriber" << std::endl;
        items[1].revents = 0;
    }
}
1. This pull socket connects to the push one from the ventilator.
2. This subscriber socket connects to the publisher one. Remember that in this case we should also set a filter.
3. We put the sockets in an array of pollitem_t. See the documentation for more details, but basically what we care about is the socket we should poll (receiver/subscriber) and the event we are interested in: ZMQ_POLLIN means that a message has been received.
4. In this crude example we loop indefinitely.
5. We poll on the 2 elements of the passed item array. Notice that zmq::poll() calls zmq_poll(), in this case passing the default value of -1 as its third parameter, the timeout. That means that if no message is pending on any of the specified sockets, the call blocks indefinitely.
6. If poll signals an incoming message on the first socket, we receive it and do the expected processing.
7. Same as (6) for the other socket.
This code should be in a try/catch block, since the C++ wrapper reports failures by throwing zmq::error_t exceptions. Have a look at my previous posts on ØMQ for some other examples.
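As a rough idea of what that guard could look like, here is a small sketch that does not depend on ØMQ at all. The helper run_guarded() is a name invented for this example; it relies only on the fact that zmq::error_t derives from std::exception, so a handler for std::exception catches it too.

```cpp
#include <cassert>
#include <exception>
#include <iostream>
#include <stdexcept>
#include <string>

// Run the passed callable inside a try/catch block (hypothetical helper).
// Returns an empty string on success, the exception message otherwise.
template <typename Body>
std::string run_guarded(Body body)
{
    try
    {
        body();
        return "";
    }
    catch(const std::exception& e)
    {
        // zmq::error_t would land here as well, being a std::exception
        std::cerr << "Client failed: " << e.what() << std::endl;
        return e.what();
    }
}
```

The idea is to pass the whole client code, from the context creation to the polling loop, as a lambda to run_guarded(), so that an exception thrown by any socket call is logged instead of terminating the program.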
In the official Z-Guide you will find the original C code on which I have based this C++ rewrite.
Did you even test your code?
Sure. Actually, I follow the TDD practice, so I'm used to writing tests before the actual code.
Could you please elaborate on why you are asking? If you have found a problem in the code above, please note that the post was written ten years before your comment, so you are probably using a quite different setup.