
Handling multiple sockets

When required, a client can connect to more than one ØMQ socket. This is quite easy to implement: just create two different instances of zmq::socket_t (I am writing code in C++ using the official ZeroMQ version 2.x wrapper) and poll on them for any message delivered. The tricky part is managing the polling, but an example should make it clear.

The servers that our new client is about to connect to are the ventilator we have already seen, which implements a push socket, and the publisher. We need a couple of new sockets, a pull one and a subscriber one:
// Fragment: needs <zmq.hpp>, <cstring> (for strlen) and <iostream>.
zmq::context_t context(1);

zmq::socket_t receiver(context, ZMQ_PULL); // 1
receiver.connect("tcp://localhost:5557");

zmq::socket_t subscriber(context, ZMQ_SUB); // 2
subscriber.connect("tcp://localhost:5556");
const char* filter = "1";
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));

zmq::pollitem_t items[] =
{ // 3
   { receiver, 0, ZMQ_POLLIN, 0 },
   { subscriber, 0, ZMQ_POLLIN, 0 }
};
while(true) // 4
{
   zmq::message_t message;
   zmq::poll(items, 2); // 5

   if(items[0].revents & ZMQ_POLLIN) // 6
   {
      receiver.recv(&message);
      std::cout << "Processing message from receiver" << std::endl;
      items[0].revents = 0;
   }
   if(items[1].revents & ZMQ_POLLIN) // 7
   {
      subscriber.recv(&message);
      std::cout << "Processing message from subscriber" << std::endl;
      items[1].revents = 0;
   }
}
1. This pull socket connects to the push one from the ventilator.
2. This subscriber socket connects to the publisher one. Remember that in this case we should also set a filter.
3. We put the sockets in an array of pollitem_t. See the documentation for more details, but basically what we care about is the socket we should poll (receiver/subscriber) and the event we are interested in: ZMQ_POLLIN means that a message has been received. The last field, revents, is filled in by the poll call.
4. In this crude example we loop indefinitely.
5. We poll on the 2 elements of the item array we pass. Notice that here zmq::poll() calls zmq_poll() passing -1, the default value, as its third parameter. That means that if no message is available on any of the specified sockets, the call blocks indefinitely.
6. If poll signals an incoming message on the first socket, we receive it and do the expected processing.
7. Same as (6) for the other socket.
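
Note 5 says that the default timeout of -1 blocks forever. If you prefer the call to return after a while, you can pass a finite value as third parameter, but be careful with the unit: as far as I know, zmq_poll() counts microseconds in the 2.x series and milliseconds from 3.x on. Here is a minimal sketch of a conversion helper. Its name, poll_timeout(), is my own invention and not part of ZeroMQ, and the unit rule it encodes is the assumption just stated. The zguide examples multiply by the ZMQ_POLL_MSEC macro for the same purpose, so check whether your zmq.h defines it before rolling your own.

```cpp
// Hypothetical helper, not part of ZeroMQ: converts a timeout given in
// milliseconds into the unit zmq_poll() expects for a given major version.
// Assumption: 2.x counts microseconds, 3.x and later count milliseconds.
// A negative value is passed through as -1, which means "block forever".
long poll_timeout(long milliseconds, int zmq_major_version)
{
    if (milliseconds < 0)
        return -1;
    return zmq_major_version < 3 ? milliseconds * 1000 : milliseconds;
}

// Intended use inside the loop above (it needs <zmq.hpp>, so it is shown
// here only as a comment):
//
//   int ready = zmq::poll(items, 2, poll_timeout(500, ZMQ_VERSION_MAJOR));
//   if (ready == 0)
//   {
//       // no socket became readable within half a second
//   }
```

With a finite timeout the loop gets a chance to do some housekeeping, or to check a termination flag, even when neither server is sending anything.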

This code should be in a try/catch block, since the C++ wrapper reports failures by throwing a zmq::error_t exception. Have a look at my previous posts on ØMQ for some other examples.
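
Just to give an idea of what such a try/catch could look like, here is a rough sketch. It relies on the fact that zmq::error_t derives from std::exception, so catching std::exception is enough and the helper itself does not need the ZeroMQ headers. The name run_guarded() is hypothetical, and so is the message it prints.

```cpp
#include <exception>
#include <functional>
#include <iostream>

// Hypothetical helper: runs a polling loop (or any other callable) and reports
// an exception instead of letting it terminate the program.
// Returns true if the callable finished normally, false if it threw.
bool run_guarded(const std::function<void()>& body)
{
    try
    {
        body();
        return true;
    }
    catch (const std::exception& e) // zmq::error_t derives from std::exception
    {
        std::cerr << "ZeroMQ client stopped: " << e.what() << std::endl;
        return false;
    }
}
```

Assuming the socket setup and the polling loop above were moved into a function of your own, say client_loop(), calling run_guarded(client_loop) would log the error text and return false in case of trouble.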

In the official Z-Guide you will find the original C code on which I have based this C++ rewrite.

2 comments:

  1. did you even test your code?

    Replies
    1. Sure. Actually, I follow the TDD practice, so I'm used to writing tests before the actual code.

      Could you please elaborate on why you are asking? If you have found a problem in the code above, please note that the post was written ten years before your comment, so you are probably using a quite different setup.
