
PUB-SUB with ZeroMQ 3.1: client

We have seen the publisher; now it is time to write the subscriber, which completes the minimal structure of an application implementing the ØMQ PUB-SUB messaging pattern.

One point to stress about the SUB socket is that we have to remember to set a filter on it, to specify which subset of the messages sent by the PUB we are interested in. The filter is a prefix that is compared with the beginning of each message. A bit counterintuitively, if we do not set one, ZeroMQ assumes we don't want to get any message at all.
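
To make the filter rule concrete, here is a small model of it in plain C++. It is not ZeroMQ code: isDelivered() is a hypothetical function that only mimics the prefix test a SUB socket applies to each message, assuming text messages such as "1:20:3".

```cpp
#include <string>
#include <vector>

// Illustrative model only, not a ZeroMQ call: a message gets through
// when it starts with at least one of the subscribed prefixes.
bool isDelivered(const std::string& message, const std::vector<std::string>& prefixes)
{
    for (const std::string& prefix : prefixes)
    {
        // Compare the first prefix.size() bytes of the message with the prefix.
        // An empty prefix compares equal to the start of any message.
        if (message.compare(0, prefix.size(), prefix) == 0)
            return true;
    }
    return false; // no matching subscription (or no subscription at all)
}
```

With an empty list of prefixes the function rejects every message, which mirrors the default behaviour described above. With the empty prefix it accepts everything, empty messages included, while with the prefix "1" an empty message is rejected.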

This client reads all the messages sent by the server until it receives an empty message or an error occurs. It extracts a value from each message and accumulates it, finally showing the average to the user:
// Needs <zmq.h> and <iostream>; MSG_SIZE is a constant defined elsewhere.
void* context = zmq_init(1);
void* socket = zmq_socket(context, ZMQ_SUB); // 1
zmq_connect(socket, "tcp://localhost:50014");

zmq_setsockopt(socket, ZMQ_SUBSCRIBE, NULL, 0); // 2

long total = 0;
int counter = 0;
while(true)
{
    char buffer[MSG_SIZE]; // 3
    int size = zmq_recv(socket, buffer, MSG_SIZE, 0);
    if(size > 0) // 4
    {
        buffer[size < MSG_SIZE ? size : MSG_SIZE - 1] = '\0'; // 5

        total += getValue(buffer); // 6
        counter++;
    }
    else
    {
        std::cout << "Terminating" << std::endl;
        break;
    }
}
if(counter > 0) // avoid dividing by zero when no message arrived
    std::cout << "Average temperature: " << total / counter << std::endl; // 7
zmq_close(socket);
zmq_term(context);
1. We tell ZeroMQ that this socket is going to be used by the subscriber in a PUB-SUB pattern.
2. As we said, we need to set a filter on the subscriber. Here we want to get every message, so we pass an empty filter, which matches everything. Here is how to get only the messages starting with the character '1':
const char* filter = "1";
zmq_setsockopt(socket, ZMQ_SUBSCRIBE, filter, strlen(filter));
But in this case the client won't receive the empty message, because an empty message cannot start with '1', so we should rewrite the code to leave the while loop in some other way.
3. In ZeroMQ we use a raw byte buffer to exchange messages, so we need to explicitly allocate a chunk of memory. If MSG_SIZE is smaller than the biggest message we get, that message is truncated. So, choose its value carefully.
4. We do something with a message only if it is a "good" one: zmq_recv() returns -1 on error and 0 for an empty message.
5. The raw byte array is converted to a C-string by putting a terminator at its end.
6. The getValue() function, described below, extracts the value we are interested in from the buffer.
7. Assuming that the value stored in each message is a temperature, we report its average to the user.
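
Points 3 and 5 can be isolated in a small helper, sketched here under a few assumptions. terminateMessage() is a hypothetical name, not a ZeroMQ function; it expects the size reported by the receive call, which may be larger than the buffer when the message did not fit.

```cpp
#include <cstddef>

// Hypothetical helper, not part of ZeroMQ: turns the received bytes into a
// C-string. 'received' is the size reported by the receive call, 'capacity'
// is the size of 'buffer'. Returns true only if the whole message fits in
// the buffer together with its terminator.
bool terminateMessage(char* buffer, int received, std::size_t capacity)
{
    if (buffer == nullptr || capacity == 0 || received < 0)
        return false; // nothing sensible to terminate

    std::size_t length = static_cast<std::size_t>(received);
    bool complete = length < capacity;

    // Same rule as in the loop above: when there is no room left,
    // the last byte of the buffer is sacrificed for the terminator.
    buffer[complete ? length : capacity - 1] = '\0';
    return complete;
}
```

A caller could use the return value to log a warning, or to skip a message that was cut short instead of parsing a partial value.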

The implementation that I show here for the getValue() function is based on sscanf(), the infamous unsafe C string scan. It has just one quality: it is very simple. I guess I'll write another post to show a safer way to do the same, but for the time being let's live with it:
int getValue(const char* buffer)
{
    int code = 0, value = 0, index = 0; // defaults, in case the scan fails
    sscanf(buffer, "%d:%d:%d", &code, &value, &index);
    std::cout << code << " [" << index << "]: " << value << std::endl;

    return value;
}
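
As a rough idea of what a safer version could look like, here is a minimal sketch based on std::istringstream. The name parseValue() and its signature are made up for this example, and it assumes the same "code:value:index" layout used above. Unlike the sscanf() version, it tells the caller whether the message was well formed.

```cpp
#include <sstream>
#include <string>

// Hypothetical alternative to getValue(): extracts the middle field of a
// "code:value:index" message. Returns false, leaving 'value' untouched,
// when the message does not follow that layout.
bool parseValue(const std::string& message, int& value)
{
    std::istringstream in(message);
    int code = 0, parsed = 0, index = 0;
    char sep1 = '\0', sep2 = '\0';

    // Each extraction sets the stream in a failed state on bad input,
    // including numbers that do not fit in an int.
    if (!(in >> code >> sep1 >> parsed >> sep2 >> index))
        return false;
    if (sep1 != ':' || sep2 != ':')
        return false;

    value = parsed;
    return true;
}
```

In the receiving loop this could replace the call to getValue(), adding to the total and incrementing the counter only when the function returns true.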
