
Publish-subscribe proxy server

In ØMQ jargon, a device is a component that acts as a mediator between groups of nodes. It helps keep the system architecture simple while still giving it a good degree of flexibility.

Here we are about to see in action a device designed to expand a pub-sub system, giving a group of clients a different way to access the original publisher.

The implementation is pretty simple. We have a new component, the proxy, that from the point of view of the original server is just a subscriber like all the others. The interesting fact is that it is also a publisher, which forwards to its subscribers all the messages it gets from its publisher. So a subscriber can connect to the proxy and get exactly the same stream of messages it would receive by connecting to the original publisher.

Notice that the proxy must handle multipart messages correctly, otherwise it would irremediably corrupt the message traffic to its clients.

Here is the C++ code for a simple proxy:
try
{
   zmq::context_t context(1);

   zmq::socket_t frontend(context, ZMQ_SUB); // 1
   frontend.connect("tcp://localhost:5556");

   zmq::socket_t backend(context, ZMQ_PUB); // 2
   backend.bind("tcp://*:8100");
   frontend.setsockopt(ZMQ_SUBSCRIBE, "", 0); // 3

   while(true)
   {
      __int64 more; // 4
      do {
         size_t size = sizeof(__int64);

         zmq::message_t message;
         frontend.recv(&message);
         frontend.getsockopt(ZMQ_RCVMORE, &more, &size); // 5

         std::cout << '.';

         backend.send(message, more ? ZMQ_SNDMORE : 0); // 6
      } while(more); // 7
   }
}
catch(zmq::error_t& ze)
{
   std::cout << "Exception: " << ze.what() << std::endl;
}

1. The proxy connects as a subscriber to the original publisher.
2. And it exposes a publisher socket so that clients can open a connection to it.
3. Remember that we must set a filter, even when we don't actually want any filtering, as in this case.
4. This code is for VC++, so __int64 is used to refer to a 64-bit int.
5. Get the current value of the ZMQ_RCVMORE socket option.
6. Forward the message to the proxy clients.
7. When a message marked as having no following part is detected, the internal loop terminates and a new one starts.

In the official Z-Guide you can find the original C example on which I based this post, along with a more extensive discussion of the matter.


Multipart messages

ZeroMQ allows us to work with simple messages, as we have seen in the previous posts, and with multipart ones. It is worth stressing that a multipart message is managed atomically by ØMQ, so a client gets all of it or nothing at all.

Let's modify the publisher we have already seen so that it sends a multipart message spanning three frames. The changes are localized in the for loop. Originally it was designed to send simple messages; now we change it this way:
for(int i = 0; i < 100; ++i)
{
   readyToSend();

   for(int j = 0; j < 3; ++j) // 1
   {
      ss.str("");
      ss << i%2 << ':' << i*42 << ':' << i << char('A' + j);
      s = ss.str();

      zmq::message_t message(s.length());
      memcpy(message.data(), s.c_str(), s.length());

      std::cout << "Sending " << s << std::endl;
      publisher.send(message, (j != 2 ? ZMQ_SNDMORE : 0)); // 2
   }
}

1. Internal for loop to send the three frames of the multipart message.
2. The key part of sending a multipart message is here. The ZMQ_SNDMORE flag specifies that at least another frame is expected.

The client requires a bit more work. Let's modify the subscriber we have already written so that it reads just one (possibly multipart) message and terminates. After creating a SUB socket, connecting it to the PUB, and setting the filter on the message stream, we call recv(). But now we check the ZMQ_RCVMORE socket option to see whether the current message is part of a multipart series and is not the last frame:
while(true) // 1
{
   zmq::message_t message;
   subscriber.recv(&message);

   __int64 more; // 2
   size_t size = sizeof(__int64);
   subscriber.getsockopt(ZMQ_RCVMORE, &more, &size); // 3
   if(more) // 4
      std::cout << "Reading ..." << std::endl;
   else
   {
      std::cout << "Done" << std::endl;
      break;
   }
}

1. We loop until we get the last part of the current sequence of messages.
2. The option is stored in a 64-bit integer. This code is compiled with VC++, so it uses the non-standard __int64 type. On Linux you would probably use int64_t. If you want to compile the same code on different platforms, you should rely on conditional compilation. Quite a nuisance, indeed.
3. Here is how we get a socket option.
4. If the "more" flag is set, it means we are reading a multipart message and we haven't reached the final frame yet.



Improved divide and conquer

The simple ØMQ parallel processing application that we implemented in the previous posts has a few minor flaws, the most noticeable of which is that the workers hang when the stream of messages terminates.

Now that we have seen how to poll on sockets, we can use this feature to solve this issue.

We want the sink to send a terminator when it detects that the job has been completed. To do that we can't use the existing socket connection between workers and sink, because it is a push/pull one, which is one-directional, and here it goes from the workers to the sink. Since we want to send a message in the opposite direction, we create another socket connection, implementing a publisher/subscriber pattern. The sink publishes a kill message to all its subscribers, the workers.

Publisher

We change the code for the sink, creating a publisher socket that is going to send the terminating messages:
zmq::context_t context(1);

// ...

zmq::socket_t terminator(context, ZMQ_PUB);
terminator.bind("tcp://*:5559");

Then, after all the expected messages from the workers have been received and processed, an empty "killing" message is sent to the subscribers:
for(int task_nbr = 0; task_nbr < 100; task_nbr++)
{
   // ...
}

// ...

zmq::message_t kill(0);
terminator.send(kill);
boost::this_thread::sleep(boost::posix_time::seconds(1));

Subscribers

A bit more work is required in the workers.

Firstly, we create a subscription socket that connects to the publisher socket in the sink:
zmq::socket_t controller(context, ZMQ_SUB);
controller.connect("tcp://localhost:5559");
controller.setsockopt(ZMQ_SUBSCRIBE, "", 0);
Notice that we have to specify a filter even when, as in this case, we want it to be empty.

Secondly, since we already have in the worker code another socket that has to be checked for incoming messages (the one we called receiver, a pull socket connected to the push socket defined in the ventilator), we need to create an array of pollitem_t, so that we can actually poll on them:
zmq::pollitem_t items [] =
{
   { receiver, 0, ZMQ_POLLIN, 0 },
   { controller, 0, ZMQ_POLLIN, 0 }
};
Thirdly, we change the while loop to poll on both sockets:
while(true)
{
   zmq::message_t msgIn;
   zmq::poll(items, 2); // 1

   if(items[0].revents & ZMQ_POLLIN) // 2
   {
      if(receiver.recv(&msgIn))
      {
         // ...
      }
      // ...
   }
   if(items[1].revents & ZMQ_POLLIN) // 3
   {
      std::cout << " Kill!" << std::endl;
      break;
   }
}
1. First change in the loop: we poll on the sockets, waiting forever for a new message (the underlying API function zmq_poll() is called with its third parameter, the timeout, set to -1).
2. Once poll() returns, signalling that a message is available, we check which socket we should work with. The first one in the items array is the receiver, so in this branch we recv() its message, executing the same code we used previously.
3. Then we check the controller. We know that a message on the controller can mean only one thing: we have to shut down. So we don't even check the message, we just break the loop.

In the official Z-Guide you will find more details on the solution design and the original C code on which I based this C++ rewrite.


Handling multiple sockets

When required, a client can connect to more than one ØMQ socket. That is quite easy to implement: just create two different instances of zmq::socket_t (I am writing code in C++ using the official ZeroMQ version 2.x wrapper) and poll on them for any delivered message. The tricky part is managing the polling, but an example should make it clear.

The servers our new client is about to connect to are the already seen ventilator, implementing a push socket, and the publisher. We need a couple of new sockets, a pull and a subscriber one:
zmq::context_t context(1);

zmq::socket_t receiver(context, ZMQ_PULL); // 1
receiver.connect("tcp://localhost:5557");

zmq::socket_t subscriber(context, ZMQ_SUB); // 2
subscriber.connect("tcp://localhost:5556");
const char* filter = "1";
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));

zmq::pollitem_t items[] =
{ // 3
   { receiver, 0, ZMQ_POLLIN, 0 },
   { subscriber, 0, ZMQ_POLLIN, 0 }
};
while(true) // 4
{
   zmq::message_t message;
   zmq::poll(items, 2); // 5

   if(items[0].revents & ZMQ_POLLIN) // 6
   {
      receiver.recv(&message);
      std::cout << "Processing message from receiver" << std::endl;
      items[0].revents = 0;
   }
   if(items[1].revents & ZMQ_POLLIN) // 7
   {
      subscriber.recv(&message);
      std::cout << "Processing message from subscriber" << std::endl;
      items[1].revents = 0;
   }
}
1. This pull socket connects to the push one in the ventilator.
2. This subscriber socket connects to the publisher one. Remember that in this case we also have to set a filter.
3. We put the sockets in an array of pollitem_t; see the documentation for more details, but basically what we care about is which socket to poll (receiver/subscriber) and which event we are interested in - ZMQ_POLLIN means a message received.
4. In this crude example we loop indefinitely.
5. We poll on the 2 elements of the passed items array. Notice that zmq::poll() calls zmq_poll(), in this case passing it the default value of -1 as third parameter. That means that if no message is available on any of the specified sockets, the call hangs indefinitely.
6. If poll signals a message in input for the first socket, we receive on it and do the expected processing.
7. Same as (6) for the other socket.

This code should be in a try/catch block. Have a look at my previous posts on ØMQ for some other examples.

In the official Z-Guide you will find the original C code on which I based this C++ rewrite.


Sink - ØMQ pull

Third and last part of the divide and conquer ØMQ application. Now we take care of the sink, the process acting as server, in a pull role: the workers send it a message, using a push socket, when they complete the task assigned by the ventilator.

This is the resulting C++ code:
try
{
   zmq::context_t context(1);
   zmq::socket_t receiver(context, ZMQ_PULL); // 1
   receiver.bind("tcp://*:5558"); // 2

   { // 3
      zmq::message_t flag;
      if(receiver.recv(&flag))
         std::cout << '.';
      else
         std::cout << '!';
   } // 4

   boost::posix_time::ptime start = boost::posix_time::microsec_clock::local_time(); // 5

   for(int task_nbr = 0; task_nbr < 100; task_nbr++) // 6
   {
      zmq::message_t tick;
      if(receiver.recv(&tick))
         std::cout << (task_nbr % 10 ? '.' : ':');
      else
         std::cout << '!';
   }
   std::cout << std::endl;

   boost::posix_time::ptime end = boost::posix_time::microsec_clock::local_time(); // 7
   std::cout << "Total time: " << end - start << std::endl;
}
catch(const zmq::error_t& ze)
{
   std::cout << "Exception: " << ze.what() << std::endl;
}

1. We are about to use a pull socket.
2. It is set up as a server using the TCP protocol.
3. We wait for the first message, signalling the start of the batch.
4. Exiting its scope, the message is destroyed; its cleanup is implicitly performed by zmq_msg_close().
5. Keep track of the current time, to do some statistics.
6. One hundred "real" messages are expected; let's loop on all of them.
7. Calculate and report the duration of the batch.

If you read the official Z-Guide you will find there the original C code that I rewrote in C++, with some Boost thrown in to have some fun.


Worker - ØMQ pull/push

Second step of our simple divide and conquer ØMQ application. Here we see the code for the worker, the process connected as client to the ventilator, being the pull side of a push/pull pattern, and to the sink, here acting as the push side of a pull/push relation.

Here is the code:
try
{
   zmq::context_t context(1);

   zmq::socket_t receiver(context, ZMQ_PULL); // 1
   receiver.connect("tcp://localhost:5557");

   zmq::socket_t sender(context, ZMQ_PUSH); // 2
   sender.connect("tcp://localhost:5558");

   while(true)
   {
      zmq::message_t msgIn;
      if(receiver.recv(&msgIn))
      {
         if(msgIn.size() == sizeof(int)) // 3
         {
            int delay = *(static_cast<int*>(msgIn.data())); // 4

            std::cout << '[' << delay << ']';
            boost::this_thread::sleep(boost::posix_time::milliseconds(delay)); // 5
         }
         else // 6
         {
            std::cout << "[-]";
         }
      }
      else // 7
         std::cout << '!';

      zmq::message_t msgOut(0); // 8
      sender.send(msgOut);
   } // 9
}
catch(const zmq::error_t& ze)
{
   std::cout << "Exception: " << ze.what() << std::endl;
}

1. A pull socket connects this process to the ventilator.
2. A push socket connects this process to the sink.
3. We expect the message to contain an int, and to have its size.
4. If the message is as expected, we can safely see it as an int.
5. We fake the job, sleeping for the number of milliseconds we got from the ventilator.
6. Messages of unexpected size are just discarded.
7. In case of a problem receiving a message, we log a warning.
8. We send an empty message to the sink, to show it that the worker's elaboration has been completed.
9. The while loop never terminates: when the stream of messages ends, the worker simply hangs on recv(). We'll see later how to fix this flaw by polling on a second socket.

This example is basically a rewrite in C++ with Boost of the code you find in the official Z-Guide.


Ventilator - ØMQ push

We are creating a toy divide and conquer ØMQ application to show the push/pull pattern in action.

Here we are showing the code for the ventilator, the process delegated to split the original problem into a number of tiny sub-problems that can be passed to instances of the worker module, to be solved using a parallel processing schema.

Besides its usage of ZMQ_PUSH, this code is kind of interesting for its need for a random number - the fake length of each small sub-problem - and for sending the client data that is not a stream of characters - here we send an integer.

To generate a pseudo-random number we use the Boost library, creating this small class:
#include <boost/random.hpp>

class VentiRand
{
private:
   boost::random::mt19937 generator_; // 1
   boost::random::uniform_int_distribution<> random_; // 2
public:
   VentiRand(int low, int hi) : random_(low, hi) {}

   int getValue() { return random_(generator_); }
};

1. As generator we use the Mersenne twister, in its more commonly used 32-bit version.
2. We use a uniform distribution. Notice there is no type name inside the angle brackets, since it defaults to int.

And this is the code for the ventilator:
try
{
   zmq::context_t context(1); // 1
   zmq::socket_t sender(context, ZMQ_PUSH); // 2
   sender.bind("tcp://*:5557"); // 3

   std::cout << "Press Enter when the workers are ready ";
   std::string input;
   std::getline(std::cin, input);
   std::cout << "Sending tasks to workers" << std::endl;

   {
      int workload = 0;
      zmq::message_t flag(sizeof(int));
      memcpy(flag.data(), &workload, sizeof(int)); // 4

      sender.send(flag); // 5
   } // 6

   VentiRand vr(1, 100); // 7
   int total = 0; // 8
   for(int task_nbr = 0; task_nbr < 100; ++task_nbr)
   {
      int workload = vr.getValue();
      total += workload;

      zmq::message_t message(sizeof(int));
      memcpy(message.data(), &workload, sizeof(int)); // 9
      sender.send(message); // 10
   } // 11

   std::cout << "Total expected cost: " << total << " msec" << std::endl;

   boost::this_thread::sleep(boost::posix_time::seconds(1)); // 12
} // 13
catch(const zmq::error_t& ze)
{
   std::cout << "Exception: " << ze.what() << std::endl;
}

1. A ØMQ context is created, calling the API function zmq_init().
2. The socket used here, created by zmq_socket(), is of type ZMQ_PUSH. It is similar to the publisher in the sense that it allows many clients to connect, but it behaves differently, sending each message to just one client instead of a copy to all the clients as the publisher does.
3. Calling zmq_bind() we bind the socket to a specific protocol and port.
4. We create a message, setting as data an int value (in this case zero).
5. This zero-valued message is sent by zmq_send(), signalling the start of the batch.
6. Going out of scope, the message is closed - zmq_msg_close().
7. The random generator is created. We want a number in the range [1..100], representing the time cost in milliseconds of the current task.
8. We keep track of the total expected cost of the tasks, so that we can measure the efficiency of our job.
9. A message is created, setting as data the random int value just calculated.
10. The message is sent by zmq_send().
11. Exiting its scope, the message is deleted, calling zmq_msg_close() for it.
12. Let this process sleep a while, to give ØMQ time to do its job.
13. Context and socket go out of scope; zmq_close() and zmq_term() are called.

This example is basically a rewrite in C++, using Boost to make it more portable, of the code you find in the official Z-Guide.


Divide and conquer by ØMQ push/pull

The example to show how to use the push/pull pattern in ØMQ is a tad more complex than the ones we have seen for the request/reply and publisher/subscriber ones.

We are about to create a simple implementation of nothing less than the parallel processing model based on a ventilator, many workers, and a sink.

The ventilator splits the (supposedly huge) problem into a number of tiny tasks that can be executed in parallel. Each worker processes any task it receives from the ventilator. The sink collects the results from the workers and produces the final output.

Ventilator and workers are connected by push/pull sockets, where the ventilator pushes and the workers pull. Same for the connection between workers and sink, but in this case the sink plays the pull role, while the workers are pushing.

We have exactly one ventilator and one sink, but we can have a variable number of workers.

Leaving aside the details specific to the current problem, we'll see that there is not a big difference between the ventilator and the publisher, besides the underlying socket type, here ZMQ_PUSH. More interesting is the structure of the worker component, which is going to use two sockets: one to connect to the ventilator, playing the ZMQ_PULL role, and one to the sink, as ZMQ_PUSH. Last but not least, the sink is connected to each available worker through a ZMQ_PULL socket.

I have jotted down these lines while reading the official Z-Guide.


ØMQ subscriber

We are talking about the pub-sub pattern, as implemented in ØMQ. We have already seen a simple publisher, now we are about to write a client for it.

We need a bit of information on the publisher before we start writing the code for the subscriber: where it is, and what it is sending around. Checking its code, we see that we can connect to it using the TCP protocol. I assume you are going to run both applications on the same machine, so as IP address we use localhost; the port number is 5556. The message is sent as a C-string of characters (without the terminator) and is expected to be in an X:Y:Z format, three colon-separated integers.

Then we should decide what our client actually wants to do with the stream of data coming from the server. Let's say that we want to get ten messages, extract the second integer from each of them (when at least two integers are actually in it), sum them, and display the result to the user.

Knowing that, we can write the subscriber code, which should end up as something like this:
try
{
   zmq::context_t context(1); // 1

   std::cout << "Collecting updates from publisher" << std::endl;
   zmq::socket_t subscriber(context, ZMQ_SUB); // 2
   subscriber.connect("tcp://localhost:5556"); // 3

   const char* filter = "1";
   subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter)); // 4

   long total = 0;
   for(int update_nbr = 0; update_nbr < 10; update_nbr++) // 5
   {
      zmq::message_t message; // 6
      if(subscriber.recv(&message)) // 7
      {
         std::string data((char*)message.data(), (char*)message.data() + message.size()); // 8
         total += getRelevantValue(data); // 9
      }
   }
   std::cout << "Total collected for code " << filter << " is " << total << std::endl;
} // 10
catch(const zmq::error_t& ze)
{
   std::cout << "Exception: " << ze.what() << std::endl;
}

1. As usual, the first thing is initializing the context. This results in a call to zmq_init().
2. Then we create a socket - under the curtain a call to zmq_socket() is done. The socket type is ZMQ_SUB, stating this is a subscriber, created to work with a publisher.
3. From the socket we try to connect to the server - the API function is zmq_connect() - specifying protocol and address.
4. This is an important point. We must specify a filter, setting it as an option for the socket. In this case it looks natural, since we want to get only the messages starting with "1", but even when we don't actually want any filter we must set it (to an empty string). Here the API function in the background is zmq_setsockopt().
5. Let's get ten messages coming from the publisher.
6. A new message is created; the API function zmq_msg_init() would be called here.
7. The message is filled, by zmq_recv(), with data coming from the server.
8. The raw character array is converted into a std::string, so that it can actually be used by the code.
9. A utility function is called to implement the actual client logic - its example code is shown below.
10. The socket goes out of scope and zmq_close() is called by its dtor; the same happens for the context, resulting in a call to zmq_term().

The code below is not related to ØMQ, but it is kind of fun (in a twisted way), since it makes use of a couple of Boost functionalities. The task of this function is splitting its input std::string, converting the second field to integer, and returning it. By default (for instance, if the second element in the string is not an integer) it returns zero:
int getRelevantValue(const std::string& data)
{
   std::vector<std::string> vs;
   vs.reserve(3); // 1
   boost::split(vs, data, boost::is_any_of(":")); // 2

   std::cout << "Message received: "; // 3
   std::for_each(vs.begin(), vs.end(), [](std::string s)
   {
      std::cout << s << ' ';
   });
   std::cout << std::endl;

   if(vs.size() > 1)
      try { return boost::lexical_cast<int>(vs[1]); } catch(...) {} // 4
   return 0; // 5
}

1. Kind of overkill. Its main purpose is documenting that we expect the string to be split into three fields.
2. This friendly Boost Algorithm function puts into the first parameter the result of splitting the second parameter, using the list of possible delimiters specified in the third parameter.
3. For debug purposes we dump the resulting vector elements to standard output - I usually enjoy coupling the standard for_each() algorithm with a lambda function in a case like this.
4. If we actually have at least two elements, we get the second one, cast it to int, and return it. Notice that I used the Boost lexical_cast<> construct to do that.
5. If there are fewer than two elements, or we didn't succeed in casting the second element to int, we return zero, as designed.

The official Z-Guide has much more to tell you if you are interested in such kind of things.


ØMQ publisher

After some general talk on the ØMQ publisher-subscriber pattern, let's see the gory details, writing a simple server that acts as a publisher, sending messages to anyone who is interested.

Here is the C++ code that I have written, see below for a few comments:
try
{
   zmq::context_t context(1); // 1
   zmq::socket_t publisher(context, ZMQ_PUB); // 2
   publisher.bind("tcp://*:5556"); // 3

   std::stringstream ss;
   std::string s;
   for(int i = 0; i < 100; ++i)
   {
      readyToSend(); // 4

      ss.str("");
      ss << i%2 << ':' << i*42 << ':' << i; // 5
      s = ss.str();

      zmq::message_t message((void *)s.c_str(), s.length(), NULL); // 6
      std::cout << "Sending " << s << std::endl;
      publisher.send(message); // 7
   } // 8
} // 9
catch(const zmq::error_t& ze)
{
   std::cout << "Exception: " << ze.what() << std::endl;
}

1. Through a context object we control the acquisition of the ØMQ resources, as the RAII idiom suggests. The class ctor calls the API function zmq_init().
2. Same RAII behaviour for socket_t, whose ctor calls zmq_socket(). Here the socket type is ZMQ_PUB, since we want our application to act as a publisher in a pub-sub context.
3. The socket bind function calls the ØMQ API function zmq_bind(), specifying here that we want to use the TCP protocol and the port we want to grab on the host.
4. A function that determines when the application is ready to fire a new message.
5. Just put some silly data in the message. We assume an X:Y:Z format, three colon-separated integers.
6. Let's create a message that uses as data the memory made available by the std::string s. The ctor used here calls the API function zmq_msg_init_data().
7. The message is sent through the socket - actually using zmq_send().
8. The message object goes out of scope; its dtor calls zmq_msg_close().
9. The socket goes out of scope, calling zmq_close(); ditto for the context - zmq_term().

The function called at (4) has the only aim of making testing easier. It just stops the application's execution, asking the user for a confirmation to continue:
void readyToSend()
{
   std::cout << "Enter when ready" << std::endl;
   std::string input;
   std::getline(std::cin, input);
}

If you have found this post interesting, you are going to love the official Z-Guide.


Pub-Sub with ØMQ

With ØMQ we can easily implement different messaging patterns. We have already seen how to build a client/server system for the request/reply pattern. Now we tackle the publish/subscribe pattern.

When we use the request/reply pattern, the server hangs on, waiting for a client request. When a message from the client arrives, the server provides a reply, and then it patiently waits for the next request.

In the publish/subscribe pattern there is no such strong coupling. The server freely publishes its messages, and any client interested in that source should subscribe to receive them. Usually the client is not interested in all the messages the server publishes, and to get just what it really wants, it applies a filter on the traffic.

In the next couple of posts we are going to see a simple implementation of this pattern, but let me give you a spoiler: do not expect big changes.

The server creates a ZMQ_PUB socket, telling ØMQ that it wants to play the publisher role, and then it will send messages accordingly to the required logic.

The client creates a ZMQ_SUB socket, since it wants to be a subscriber, and - here is the substantial change - specifies which filters should be applied to the traffic coming from the publisher. Then it is just a matter of receiving messages.

You can find more information in the official Z-Guide. I am currently reading it, and I find it fun and useful.


Simple ØMQ TCP client

A server doesn't have much sense without a client. So here I complete the previous post, where we wrote a simple ØMQ TCP echo server, with its companion client.

Since they work as a team, they should respect the same conventions. Messages are expected to be arrays of characters (sort of C-strings, but without the terminator), and an empty message can be sent to shut down the server.

Here is the C++ code I have written, below a few notes on it:
try
{
   zmq::context_t context(1); // 1

   std::cout << "Connecting to echo server" << std::endl;
   zmq::socket_t socket(context, ZMQ_REQ); // 2
   socket.connect("tcp://localhost:50013"); // 3

   std::stringstream ss; // 4
   ss << "Hello ";
   for(int request_nbr = 0; request_nbr != 10; ++request_nbr)
   {
      ss << request_nbr;
      std::string message(ss.str());

      {
         zmq::message_t request((void *)message.c_str(), message.length(), NULL); // 5

         std::cout << "Sending " << message << std::endl;
         socket.send(request); // 6
      } // 7

      {
         zmq::message_t reply; // 8
         socket.recv(&reply); // 9

         std::cout << "Received ";
         std::for_each((char*)reply.data(), (char*)reply.data() + reply.size(),
            [](char c){ std::cout << c;}); // 10
         std::cout << std::endl;
      } // 11
   }

   zmq::message_t request(0); // 12
   socket.send(request);
   std::cout << "Sending empty message and terminating" << std::endl;
} // 13
catch(const zmq::error_t& ze)
{
   std::cout << "Exception: " << ze.what() << std::endl;
}
1. As we have done for the server, as first thing we create a zmq::context_t object, so that its constructor calls zmq_init().
2. We create a socket - internally zmq_socket() is called - specifying ZMQ_REQ as type. This is the request side of a request/reply pattern.
3. A call to zmq::socket_t::connect, resolved to zmq_connect(), to establish a connection to the server - notice that we specify the TCP/IP address (here localhost) and the port.
4. For giving a bit of variety, the message is built from a stream.
5. A zmq::message_t is built, using the data in the stringstream. The constructor, in this case, calls zmq_msg_init_data(). Notice that the data is not copied to message_t, just the passed pointer is used - so you should be very careful using this method. By the way, the third parameter, here a NULL, is a pointer to the function that is called by the dtor, to cleanup the data, if required. Here we want to leave the job to stringstream.
6. The message is sent through the socket to the server. The C-API function called is zmq_send().
7. Here the message goes out of scope, its dtor is called, and through it zmq_msg_close().
8. The default ctor is called for zmq::message_t, that resolves in a call to zmq_msg_init().
9. The call to zmq::socket_t::recv() shields a call to zmq_recv().
10. The message data is not a proper C-string, since it is not '\0' terminated. So I can't dump it to the standard output console without doing some work on it. Usually it would be a good idea to create a std::string out of it, like this:
std::string feedback((char*)reply.data(), (char*)reply.data() + reply.size());
But here we won't have any other use for that string after printing it, so why not have a bit of fun using a std::for_each() algorithm coupled with a lambda function instead?
11. As in (7), zmq_msg_close() is called by the message_t dtor.
12. We create and send a zero-sized message, to terminate the server.
13. Here a couple of destructors are called. The socket one, that calls zmq_close(), and the context one, that calls zmq_term().

I wrote this post while reading the official Z-Guide. A fun and useful reading indeed.
