The servers that our new client is about to connect to, are the already seen ventilator, implementing a push socket, and the publisher. We need a couple of new sockets, a pull and a subscriber one:
zmq::context_t context(1); zmq::socket_t receiver(context, ZMQ_PULL); // 1 receiver.connect("tcp://localhost:5557"); zmq::socket_t subscriber(context, ZMQ_SUB); // 2 subscriber.connect("tcp://localhost:5556"); const char* filter = "1"; subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter)); zmq::pollitem_t items[] = { // 3 { receiver, 0, ZMQ_POLLIN, 0 }, { subscriber, 0, ZMQ_POLLIN, 0 } }; while(true) // 4 { zmq::message_t message; zmq::poll(items, 2); // 5 if(items[0].revents & ZMQ_POLLIN) // 6 { receiver.recv(&message); std::cout << "Processing message from receiver" << std::endl; items[0].revents = 0; } if(items[1].revents & ZMQ_POLLIN) // 7 { subscriber.recv(&message); std::cout << "Processing message from subscriber" << std::endl; items[1].revents = 0; } }1. This pull socket connects to the push one from the ventilator.
2. This subscriber socket connects to the publisher one. Remember that in this case we should also set a filter.
3. We put the sockets in an array of pollitem_t, see the documentation for more details, but basically what we care is about the socket we should poll (receiver/subscriber) and what is the event we interested in - ZMQ_POLLIN means a message received.
4. In this crude example we loop indefinitely
5. We poll on 2 elements of the passed item array. Notice that zmq::poll() calls zmq_poll() in this case passing to it as third parameter the default value of -1. That means that in case no message is found for any of the specified sockets, the call hangs indefinitely.
6. If poll signals a message in input for the first socket, we receive on it, and do the expected processing.
7. Same as (6) for the other socket.
This code should be in a try/catch block. Have a look at my previous posts on ØMQ for some other examples.
In the official Z-Guide you will find there the original C code on which I have based this C++ rewriting.
did you even test your code?
ReplyDeleteSure. Actually, I follow the TDD practice, so I'm used to write tests before the actual code.
DeleteCould you please elaborate why are you asking? If you have found a problem in the code above, please notice that the post has been written ten years before your comment, so you are probably using a quite different setup.