POLLIN: a client SUB polling on two PUBs

We have already seen a ØMQ client connecting to more than one server: in the Divide and Conquer example, the worker connects as PULL to the ventilator that sends messages, and as PUSH to the sink that waits for them. A trickier case is a client receiving messages from two different servers. The point is that it has to poll for input on all the connected sockets, so it can read from whichever server has something ready.

I have redesigned the example I originally wrote in C++ for ØMQ 2.1, porting it to the C API of ZeroMQ 3.1 and trying to make it a bit clearer. There are now two PUB-SUB connections, where the servers are processes running this code:
void serverPoll(const char* message, bool flag) // 1
{
    const char* address = flag ? "tcp://*:60020" : "tcp://*:60021";

    void* context = zmq_init(1);
    void* socket = zmq_socket(context, ZMQ_PUB); // 2
    zmq_bind(socket, address);

    for(int i = 0; i < 20; ++i)
    {
        std::cout << message << ' ';
        zmq_send(socket, message, strlen(message), 0); // 3
        boost::this_thread::sleep(boost::posix_time::seconds(1));
    }

    zmq_close(socket);
    zmq_term(context);
}
1. The message parameter is whatever you want sent to the client. One process should call this function with the flag parameter set to true, the other with it set to false.
2. We are going to have two PUB servers.
3. The input message is sent to the client 20 times, at a rate of one per second.

The client code is more interesting:
// ...
const int MSG_SIZE = 64;
const char* addresses[] = {"tcp://localhost:60020", "tcp://localhost:60021"};

// ...

void* context = zmq_init(1);
void* sockets[2];
for(int i = 0; i < 2; ++i)
{
    sockets[i] = zmq_socket(context, ZMQ_SUB); // 1
    zmq_connect(sockets[i], addresses[i]);
    zmq_setsockopt(sockets[i], ZMQ_SUBSCRIBE, "", 0); // 2
}

zmq_pollitem_t items [] = {
    { sockets[0], 0, ZMQ_POLLIN, 0 }, { sockets[1], 0, ZMQ_POLLIN, 0 } // 3
};

while(zmq_poll(items, 2, 5000) > 0) // 4
{
    char buffer[MSG_SIZE];
    for(int i = 0; i < 2; ++i)
    {
        if(items[i].revents & ZMQ_POLLIN) // 5
        {
            int size = zmq_recv(sockets[i], buffer, MSG_SIZE - 1, 0); // longer messages are truncated
            if(size < 0)
                continue; // receive error, skip this socket
            buffer[size < MSG_SIZE - 1 ? size : MSG_SIZE - 1] = '\0';
            std::cout << buffer << ' ';
        }
    }
    std::cout << std::endl;
}

std::cout << "Terminating" << std::endl;
for(int i = 0; i < 2; ++i)
    zmq_close(sockets[i]);
zmq_term(context);
1. Both sockets are SUB, connected as clients to the PUB sockets defined in the servers. By the way, have you ever wondered what happens if you create a SUB socket and then pass it a server-style address, with a star instead of the machine IP address? I hadn't. But a wild copy and paste showed me: I got a perplexing unhandled exception in _callthreadstartex() (I am developing on MSVC) and a console message saying there was an "Invalid argument (..\..\..\src\tcp_connecter.cpp:63)". At that line there is an assert on errno after a call to the member function set_address() in the zmq::tcp_connecter_t constructor. Once you think about it for a while, it becomes clear: I passed an invalid argument to the address-setting function in the TCP connecter constructor. Right: put localhost there, not a star.
2. Remember that a SUB socket needs a filter to be set; here we use an empty filter, meaning "no filter": anything is read.
3. This is the tricky part. To poll we need an array of zmq_pollitem_t, each element specifying a socket to poll on and which events we are interested in. ZMQ_POLLIN means we are polling for input.
4. The ZeroMQ 3.1 version of zmq_poll() is very flexible. Here we say that we poll on the first two elements of the passed zmq_pollitem_t array, waiting up to 5 seconds (the timeout is in milliseconds) for anything to come in. If nothing happens in that time, it returns 0. We could ask it to hang forever by passing -1, or not to wait at all by passing 0. It returns -1 in case of error, otherwise the number of items with pending events.
5. If the revents field of a zmq_pollitem_t has the ZMQ_POLLIN bit high, a message is waiting to be read, so we can call zmq_recv() without risking an indefinite block waiting for a message that is not coming.
