ØMQ 3.1 PUB-SUB Proxy

It is quite straightforward to implement the proxy pattern with ZeroMQ, and the changes between version 2.1 to 3.1 are minimal. So there is not much more to remark here than a couple of points.

The minimal configuration to see a proxy at work requires a PUB server and a SUB client. If the PUB runs without even notice that a proxy is among its clients, the SUB has to know its address, so to connect to it.

The proxy should be multipart-message-aware. Even if our PUB currently supports only single-part messages, it would be a good idea to support the extended messaging protocol. It doesn't add much complexity to the proxy code, and it makes much more robust.

More details on the matter on the ZGuide.

Here is a possible implementation for the proxy:
void* context = zmq_init(1);
void* frontend = zmq_socket(context, ZMQ_SUB);
zmq_connect(frontend, "tcp://localhost:50014"); // 1

void* backend = zmq_socket(context, ZMQ_PUB); // 2
zmq_bind(backend, "tcp://*:8100");
zmq_setsockopt(frontend, ZMQ_SUBSCRIBE, NULL, 0); // 3

bool terminator = false;
while(!terminator)
{
    int more;
    do {
        size_t size = sizeof(int);

        char message[MSG_SIZE];
        int len = zmq_recv(frontend, message, MSG_SIZE, 0);
        if(len == 0)
        {
            std::cout << "The broker detected a terminator!" << std::endl;
            terminator = true; // 5
        }
        zmq_getsockopt(frontend, ZMQ_RCVMORE, &more, &size);

        std::cout << "resending message" << std::endl;

        zmq_send(backend, message, len < MSG_SIZE ? len : MSG_SIZE, more ? ZMQ_SNDMORE: 0); // 6
    } while(more);
}

zmq_close(frontend);
zmq_close(backend);
zmq_term(context);
1. The proxy connects as a subscriber to the publisher we have already seen.
2. A subscriber could now connect directly to the original publisher, or to the proxy, specifying its address.
3. No filtering, as one would expect from a well behaving proxy.
4. Even if the server does not send currently any multipart message, the proxy is ready to manage them correctly.
5. Also the proxy follows the convention of terminating when it receives an empty message. But before terminating it forwards also the terminator.
6. The message length has as a limit the maximum buffer size, the SNDMORE flag is replicated, when required.

As I said, there is no change at all in the publisher, the subscriber changes in only one line, where the address that the socket has to use is specified:
void* socket = zmq_socket(context, ZMQ_SUB);
zmq_connect(socket, "tcp://localhost:8100");
zmq_setsockopt(socket, ZMQ_SUBSCRIBE, NULL, 0);

2 comments:

  1. Great Explanation. Another great article i recommend is:

    this

    ReplyDelete
    Replies
    1. Thanks for your suggestion, even if it is not much in line with the subject of this post. You talk there about the generic virtual proxy design pattern, using Java as implementation language. I moved all the Java stuff to another blog, Biting Code, if you visit it, please let me know what you think about it too.

      Delete