PUB-SUB coordination by REQ-REP: the subscriber

In the previous post we have seen a publisher that is coordinated to its subscribers, now it is time to show how the subscriber works.

Let's see the main part of the subscriber function:
void* context = zmq_init(1);

void* skSub = zmq_socket(context, ZMQ_SUB);
zmq_connect(skSub, "tcp://localhost:5561");
zmq_setsockopt(skSub, ZMQ_SUBSCRIBE, NULL, 0); // 1

zmq_recv(skSub, NULL, 0, 0); // 2
acknowledge(context); // 3

//...
while(true)
{
    int value;
    int len = zmq_recv(skSub, &value, sizeof(int), 0); // 4
    if(len != sizeof(int)) // 5
    {
        // ...
        break;
    }
    if(value == PING_FLAG) // 6
    {
        // ...
        continue;
    }
    // 7
}

// ...
1. No filter on the subscription.
2. The subscriber waits till it sees the publisher.
3. See below, the subscriber lets the publisher know about it.
4. Receive the value, expected to be an int.
5. If it is not an int (typically, we got an error, or the empty message used as terminator) we terminate the loop.
6. Dummy value detected, the publisher is still looping waiting for more subscribers. The value is discarded, and we get to the next iteration.
7. The value is real, let's process it.

A request socket is used to communicate back to the publisher, so that it would know about a new subscriber connected to it:
void acknowledge(void* context)
{
    void* skSync = zmq_socket(context, ZMQ_REQ);
    zmq_connect(skSync, "tcp://localhost:5562");
    zmq_send(skSync, NULL, 0, 0);
    zmq_recv(skSync, NULL, 0, 0);
    zmq_close(skSync);
}
It is just a matter of sending and receiving an empty message. The socket is used just here, so we create and close it in the function body.

No comments:

Post a Comment