Publish-subscribe proxy server

In ØMQ jargon, a device is a component that acts as a mediator between groups. Devices keep the system architecture simple while still giving it a good degree of flexibility.

Here we see in action a device designed to extend a pub-sub system: it gives a group of clients another way to reach the original publisher.

The implementation is pretty simple. We have a new component, the proxy, which from the point of view of the original server is just a subscriber like all the others. The interesting fact is that it is also a publisher, one that passes on to its own subscribers every message it gets from its publisher. So a subscriber can connect to the proxy and get exactly the same stream of messages it would receive by connecting to the original publisher.

Notice that the proxy must handle multipart messages correctly, otherwise it would irreparably corrupt the message stream delivered to its clients.
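To see why the "more" flag matters, here is a small library-free sketch of the forwarding step. It does not use ØMQ at all: Frame, forward_message and count_messages are hypothetical names made up for this illustration, and a frame is modeled as a string plus a flag.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for one message part: payload plus the "more" flag.
struct Frame {
    std::string data;
    bool more;  // true when another part of the same message follows
};

// Forward exactly one logical message from `in` to `out`, part by part.
// With keep_more_flag == false the flag is dropped on the way out,
// which is what a careless proxy would do.
void forward_message(std::deque<Frame>& in, std::vector<Frame>& out,
                     bool keep_more_flag = true) {
    bool more = true;
    while (more && !in.empty()) {
        Frame frame = in.front();
        in.pop_front();
        more = frame.more;                        // remember what the sender said
        if (!keep_more_flag) frame.more = false;  // careless proxy loses it
        out.push_back(frame);
    }
}

// Number of logical messages a receiver would see:
// one for each part that has no following part.
std::size_t count_messages(const std::vector<Frame>& frames) {
    std::size_t count = 0;
    for (const Frame& f : frames)
        if (!f.more) ++count;
    return count;
}
```

Forwarding a two-part message with the flag preserved leaves the receiver with one message made of two parts. Dropping the flag turns the same traffic into two unrelated single-part messages, and that is the kind of corruption the proxy has to avoid.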

Here is the C++ code for a simple proxy:
#include <iostream>
#include <zmq.hpp>

int main()
{
    try
    {
        zmq::context_t context(1);

        zmq::socket_t frontend(context, ZMQ_SUB); // 1
        frontend.connect("tcp://localhost:5556");

        zmq::socket_t backend(context, ZMQ_PUB); // 2
        backend.bind("tcp://*:8100");
        frontend.setsockopt(ZMQ_SUBSCRIBE, "", 0); // 3

        while(true)
        {
            __int64 more; // 4
            do {
                size_t size = sizeof(__int64);

                // receive one message part from the original publisher
                zmq::message_t message;
                frontend.recv(&message);
                frontend.getsockopt(ZMQ_RCVMORE, &more, &size); // 5

                // trace: one dot for each part forwarded
                std::cout << '.' << std::flush;

                backend.send(message, more ? ZMQ_SNDMORE : 0); // 6
            } while(more); // 7
        }
    }
    catch(zmq::error_t& ze)
    {
        std::cout << "Exception: " << ze.what() << std::endl;
    }
}

1. The proxy connects to the original publisher as a subscriber.
2. It also makes a publisher socket available, so that its own clients can connect to it.
3. Remember that a subscriber must always specify a filter, even when no filtering is wanted, as in this case. An empty filter subscribes to every message.
4. This code is for VC++, so __int64 is used for the 64-bit integer that holds the ZMQ_RCVMORE value. (In ØMQ 3.x and later this option is a plain int.)
5. Get the current value of the ZMQ_RCVMORE socket option, which tells whether more parts of the current message follow.
6. Forward the message part to the proxy clients, setting ZMQ_SNDMORE when more parts follow.
7. When a part marked as having no following part is detected, the inner loop terminates and the outer loop starts on the next message.
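The empty filter in point 3 works because ØMQ subscriptions are prefix matches: a message is delivered when at least one subscription is a prefix of it, and the empty string is a prefix of everything. The sketch below models only that rule, without using ØMQ; is_delivered is a hypothetical helper written for this illustration, not a library call.

```cpp
#include <string>
#include <vector>

// Hypothetical model of subscription filtering: a message is delivered
// when at least one subscription is a prefix of it.
bool is_delivered(const std::vector<std::string>& subscriptions,
                  const std::string& message) {
    for (const std::string& prefix : subscriptions) {
        // compare() looks at the first prefix.size() characters of message
        // (or fewer, if the message is shorter than the prefix)
        if (message.compare(0, prefix.size(), prefix) == 0)
            return true;
    }
    return false;  // no subscriptions, or none of them matched
}
```

With this model, a subscriber that never sets a filter has an empty subscription list and receives nothing, which is why the proxy has to subscribe explicitly, even if only to the empty string.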

In the official Z-Guide you can find the original C example on which I based this post, along with a more extensive discussion of the matter.
