Pages

Broker by ZeroMQ built-in device

We have seen in the previous post how is conceptually easy to implement a ZMQ broker that connects request/reply sockets using a pair of router/dealer sockets.

One could notice that the core of the broker consists in setting up a polling on its sockets and then repeatedly call that chunk of code we appropriately moved in a specific function we called receiveAndSend(). And he could say that, being this structure so common, it could be worth move it in an utility function and make it generally available.

The ZeroMQ guys had the same feeling, so we have an API function, zmq_device() that has exactely this purpose.

The C++ code for our broker could be simplified like this:
try
{
zmq::context_t context(1);

zmq::socket_t frontend(context, ZMQ_ROUTER);
frontend.bind("tcp://*:5559");

zmq::socket_t backend(context, ZMQ_DEALER);
backend.bind("tcp://*:5560");

zmq::device(ZMQ_QUEUE, frontend, backend);
}
catch(const zmq::error_t& ze)
{
std::cout << "Exception: " << ze.what() << std::endl;
}

We simply call zmq::device(), specifying that we want the messages managed in a queue, and that is it.

The Z-Guide gives you more information on the ØMQ devices.

Go to the full post

Broker for request/reply

We have used the ZeroMQ request/reply pattern to create a very simple echo application that connects synchronously a client, that asks for an echo of its message, and a server, that provides the service.

Now we want to extend that application, so that we could have many server and many clients, and even have a way to change their number dynamically.

How could we do that? Adding something in between that take cares of the extra complexity we want. This something is called broker and it does the dirty job of getting a request from a client, sending it to a server, and then doing the same, but the other way round, with the reply.

The changes in the client code are minimal. The just have to ensure the request socket is now connecting to the broker. So, if it is on localhost at port 5559 we'll have this:
socket.connect("tcp://localhost:5559");
More relevant are the changes in the service that provides a reply. Actually it is not anymore a proper server, it is now seen as a broker client that connects to it in the REP role. So its socket would be created an setup like this:
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_REP);
socket.connect("tcp://localhost:5560");
Besides, for testing purpose makes sense to add in the code a delay between receiving and sending back the message, maybe using the Boost sleep() function:
// some time expensive job
boost::this_thread::sleep(boost::posix_time::seconds(1));
The fun stuff, as one could expect, is in the broker. It can't use REQ and REP sockets, since we want it to be asynchronous, so it uses ROUTER and DEALER sockets, that interfaces with request and reply, but leaving us the flexibility of a non-blocking interface.

Here is a C++ possible implementation:
try
{
   zmq::context_t context(1);

   zmq::socket_t frontend(context, ZMQ_ROUTER); // 1
   frontend.bind("tcp://*:5559");

   zmq::socket_t backend(context, ZMQ_DEALER); // 2
   backend.bind("tcp://*:5560");

   const int NR_ITEMS = 2;
   zmq_pollitem_t items[NR_ITEMS] =
   {
      { frontend, 0, ZMQ_POLLIN, 0 },
      { backend, 0, ZMQ_POLLIN, 0 }
   };

   while(true)
   {
      zmq::poll(items, NR_ITEMS); // 3

      if(items[0].revents & ZMQ_POLLIN) // frontend sent a message
         receiveAndSend(frontend, backend);
      if(items[1].revents & ZMQ_POLLIN) // backend sent a message
         receiveAndSend(backend, frontend);
   }
}
catch(const zmq::error_t& ze)
{
   std::cout << "Exception: " << ze.what() << std::endl;
}
1. This ROUTER socket connects to the REQ socket, accepting its request.
2. The DEALER socket is used to connect to the REP socket, to forward to it the client request.
3. The broker hangs waiting for messages from both sides of its connections.

The heart of the broker is the receiveAndSend() function that acts as a mediator between the REQ and REP sockets. Let see how I have implemented it:
namespace
{
   size_t size = sizeof(__int64);

   void receiveAndSend(zmq::socket_t& from, zmq::socket_t& to)
   {
      __int64 more;
      do {
         zmq::message_t message;
         from.recv(&message);
         from.getsockopt(ZMQ_RCVMORE, &more, &size);

         to.send(message, more ? ZMQ_SNDMORE : 0);
      } while(more);
   }
}
Even though our REQ/REP message exchange is meant to involve only non-multipart messages, our broker should actually manage them. The fact is that the ROUTER/DEALER protocol has to keep track in some way of the origin/destination of the traffic, and this is done adding a frame in the message - we don't care what it is actually in it, at least for the moment, but we should manage it correctly, if we want our application to work.

More details on broker router/dealer sockets in the official Z-Guide, where you could also find the original C code on which I have based the C++ code have seen above.

Go to the full post

Publish-subscribe proxy server

In the ØMQ jargon, a device is a component that acts as a mediator between groups. It is used to keep the system architecture simple but still giving it a good degree of flexibility.

Here we are about to see in action a device designed for expanding a pub-sub system, giving a way to a group of clients to access the original publisher in a different way.

The implementation is pretty simple. We have a new component, the proxy, that from the point of view of the original server is just a subscriber as all the other ones. The interesting fact is that it is also a publisher, that is going to provide to its subscribers all the messages it gets from its publisher. So a subscriber could connect to the proxy, and getting exactly the same stream of messages that would receive if connecting to the original publisher.

Notice that the proxy should manage correctly multipart messages, otherwise it would corrupt irremediably the message traffic to its clients.

Here is the C++ code for a simple proxy:
try
{
zmq::context_t context(1);

zmq::socket_t frontend(context, ZMQ_SUB); // 1
frontend.connect("tcp://localhost:5556");

zmq::socket_t backend(context, ZMQ_PUB); // 2
backend.bind("tcp://*:8100");
zmq_setsockopt (frontend, ZMQ_SUBSCRIBE, "", 0); // 3

while(true)
{
__int64 more; // 4
do {
size_t size = sizeof(__int64);

zmq::message_t message;
frontend.recv(&message);
frontend.getsockopt(ZMQ_RCVMORE, &more, &size); // 5

std::cout << '.';

backend.send(message, more ? ZMQ_SNDMORE : 0); // 6
} while(more); // 7
}
}
catch(zmq::error_t& ze)
{
std::cout << "Exception: " << ze.what() << std::endl;
}

1. The proxy connects as subscriber to the original publisher.
2. And it makes available a publisher socket so that clients could open a connection to it.
3. Remember that we must specify a filter, even if we don't actually want any filtering, as in this case.
4. This code is for VC++, so __int64 is used to refer to a 64 bit int.
5. Get the current value for the ZMQ_RCVMORE socket option.
6. Forward the message to the proxy clients.
7. When a message marked as having no following part is detected, the internal loop is terminated and a new one is going to start.

In the official Z-Guide you could find the original C example on which I have based this post, with more interesting talk on the matter.

Go to the full post

Multipart messages

ZeroMQ allows us working with simple messages, as we have seen in the previous posts, and with multipart ones. It is worthy to stress the fact that a multipart message is managed atomically by ØMQ, so a client would get all of it or nothing at all.

Let's modify the already seen publisher to let it send a multipart message that spans over three frames. The changes are localized in the for loop. Originally it was designed to send simple messages, now we change it in this way:
for(int i = 0; i < 100; ++i)
{
readyToSend();

for(int j = 0; j < 3; ++j) // 1
{
ss.str("");
ss << i%2 << ':' << i*42 << ':' << i << char('A' + j);
s = ss.str();

zmq::message_t message(s.length());
memcpy(message.data(), s.c_str(), s.length());

std::cout << "Sending " << s << std::endl;
publisher.send(message, (j != 2 ? ZMQ_SNDMORE : 0)); // 2
}
}

1. Internal for loop to send the three-framed multipart messages.
2. The key part of sending a multipart message is here. The flag ZMQ_SNDMORE is sent to specify that at least another frame is expected.

The client requires some more job. Let's modify the subscriber we have already written to read just one (possibly multipart) message and terminate. After creating a SUB socket, connecting it to the PUB, and setting the filter on the message stream, we call recv(). But now, we check the ZMQ_RCVMORE socket option to see if that the current message is in a multipart series, and it is not the last one:
while(true) // 1
{
zmq::message_t message;
subscriber.recv(&message);

__int64 more; // 2
size_t size = sizeof(__int64);
subscriber.getsockopt(ZMQ_RCVMORE, &more, &size); // 3
if(more) // 4
std::cout << "Reading ..." << std::endl;
else
{
std::cout << "Done" << std::endl;
break;
}
}

1. We loop until we get the last part of the current sequence of messages.
2. The option is stored in a 64 bit integer. This code is compiled with VC++, so it uses this __int64 non standard type. On linux you would probably use int64_t. If you want to compile the same code on different platforms, you should rely here on conditional compiling. Quite a nuisance, indeed.
3. Here is how we get a socket option.
4. If the "more" flag is set, it means we are reading a multipart message, and we haven't reach the final frame yet.


Publish-subscribe proxy server

In the ØMQ jargon, a device is a component that acts as a mediator between groups. It is used to keep the system architecture simple but still giving it a good degree of flexibility.

Here we are about to see in action a device designed for expanding a pub-sub system, giving a way to a group of clients to access the original publisher in a different way.

The implementation is pretty simple. We have a new component, the proxy, that from the point of view of the original server is just a subscriber as all the other ones. The interesting fact is that it is also a publisher, that is going to provide to its subscribers the messages it gets from its publisher.

Have a look to the official Z-Guide for more information.

Go to the full post

Improved divide and conquer

The simple ØMQ parallel processing application that we have implemented in the previous posts has a few minor flaws, the most noticeable of them is that the workers hang when the stream of messages terminates.

Now that we have seen how to poll on sockets, we can use this feature to solve this issue.

We want the sink sending a terminator when it detects the job has been completed. To do that we can't use the already existing socket connection between workers and sink, because it is a pull/push, that is one-directional, and here it goes from the workers to the sink. Since we want to send a message in the opposite direction, we create another socket connection, implementing a publisher/subscriber pattern. The sink would publish a kill message to all its subscribers, the workers.

Publisher

We change the code for the sink, creating a publisher socket that is going to send the terminating messages:
zmq::context_t context(1);

// ...

zmq::socket_t terminator(context, ZMQ_PUB);
terminator.bind("tcp://*:5559");

Then, after all the expected messages have been received and processed from the workers, an empty "killing" message is sent to the subscribers:
for(int task_nbr = 0; task_nbr < 100; task_nbr++)
{
   // ...
}

// ...

zmq::message_t kill(0);
terminator.send(kill);
boost::this_thread::sleep(boost::posix_time::seconds(1));

Subscribers

A bit more job is required in the workers.

Firstly, we create a subscription socket that connects to the publisher socket in the sink:
zmq::socket_t controller(context, ZMQ_SUB);
controller.connect("tcp://localhost:5559");
controller.setsockopt(ZMQ_SUBSCRIBE, "", 0);
Notice that we have to specify a filter even though, as in this case, we want it to be empty.

Secondly, since we already have in the worker code another socket that has to be checked for incoming messages (the one we called receiver, a pull socket connected to the push socket defined in the ventilator), we need to create an array of pollitem_t, so that we can actuall poll on them:
zmq::pollitem_t items [] =
{
   { receiver, 0, ZMQ_POLLIN, 0 },
   { controller, 0, ZMQ_POLLIN, 0 }
};
Thirdly, we change the while loop to poll on both sockets:
while(true)
{
   zmq::message_t msgIn;
   zmq::poll(items, 2); // 1

   if(items[0].revents & ZMQ_POLLIN) // 2
   {
      if(receiver.recv(&msgIn))
      {
         // ...
      }
      // ...
   }
   if(items[1].revents & ZMQ_POLLIN) // 3
   {
      std::cout << " Kill!" << std::endl;
      break;
   }
}
1. First change in the loop: we poll on the sockets, hanging forever (the underlying function API zmq_poll() is passed with its third parameter, timeout, set to -1) waiting for a new message.
2. Once poll() returns, signalling in this way that a message is available, we check which socket we should work with. The first one in the items array was the receiver, so in this branch we recv() its message, executing the same code we have precedently used.
3. Then we check for the controller. We know that when we get a message on the controller, that could mean only one thing: we have to shut down. So we don't even check the message, just break the loop.

In the official Z-Guide you will find more details on the solution design and the original C code on which I have based this C++ rewriting.

Go to the full post

Handling multiple sockets

When required, a client could connect to more than one ØMQ socket. That is quite easy so implement, just create two different instances of the zmq::socket_t (I writing code in C++ using the official ZeroMQ version 2.x wrapper) and poll on them for any message delivered. The tricky part is in managing the polling, but an example should make it clear.

The servers that our new client is about to connect to, are the already seen ventilator, implementing a push socket, and the publisher. We need a couple of new sockets, a pull and a subscriber one:
zmq::context_t context(1);

zmq::socket_t receiver(context, ZMQ_PULL); // 1
receiver.connect("tcp://localhost:5557");

zmq::socket_t subscriber(context, ZMQ_SUB); // 2
subscriber.connect("tcp://localhost:5556");
const char* filter = "1";
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));

zmq::pollitem_t items[] =
{ // 3
   { receiver, 0, ZMQ_POLLIN, 0 },
   { subscriber, 0, ZMQ_POLLIN, 0 }
};
while(true) // 4
{
   zmq::message_t message;
   zmq::poll(items, 2); // 5

   if(items[0].revents & ZMQ_POLLIN) // 6
   {
      receiver.recv(&message);
      std::cout << "Processing message from receiver" << std::endl;
      items[0].revents = 0;
   }
   if(items[1].revents & ZMQ_POLLIN) // 7
   {
      subscriber.recv(&message);
      std::cout << "Processing message from subscriber" << std::endl;
      items[1].revents = 0;
   }
}
1. This pull socket connects to the push one from the ventilator.
2. This subscriber socket connects to the publisher one. Remember that in this case we should also set a filter.
3. We put the sockets in an array of pollitem_t, see the documentation for more details, but basically what we care is about the socket we should poll (receiver/subscriber) and what is the event we interested in - ZMQ_POLLIN means a message received.
4. In this crude example we loop indefinitely
5. We poll on 2 elements of the passed item array. Notice that zmq::poll() calls zmq_poll() in this case passing to it as third parameter the default value of -1. That means that in case no message is found for any of the specified sockets, the call hangs indefinitely.
6. If poll signals a message in input for the first socket, we receive on it, and do the expected processing.
7. Same as (6) for the other socket.

This code should be in a try/catch block. Have a look at my previous posts on ØMQ for some other examples.

In the official Z-Guide you will find there the original C code on which I have based this C++ rewriting.

Go to the full post

Sink - ØMQ pull

Third and last part of the divide and conquer ØMQ application. Now we take care of the sink, the process that is the server, in a pull role, to which the workers send a message, using a push socket, when they complete their task as assigned by the ventilator.

This is the resulting C++ code:
try
{
zmq::context_t context(1);
zmq::socket_t receiver(context, ZMQ_PULL); // 1
receiver.bind("tcp://*:5558"); // 2

{ // 3
zmq::message_t flag;
if(receiver.recv(&flag))
std::cout << '.';
else
std::cout << '!';
} // 4

boost::posix_time::ptime start = boost::posix_time::microsec_clock::local_time(); // 5

for(int task_nbr = 0; task_nbr < 100; task_nbr++) // 6
{
zmq::message_t tick;
if(receiver.recv(&tick))
std::cout << (task_nbr % 10 ? '.' : ':');
else
std::cout << '!';
}
std::cout << std::endl;

boost::posix_time::ptime end = boost::posix_time::microsec_clock::local_time(); // 7
std::cout << "Total time: " << end - start << std::endl;
}
catch(const zmq::error_t& ze)
{
std::cout << "Exception: " << ze.what() << std::endl;
}

1. We are about to use a pull socket.
2. It is set as a server using the TCP protocol.
3. We wait for the first message, signalling the start of batch.
4. Exiting its scope, the message is destroyed, and its cleanup is implicitely performed by zmq_msg_close().
5. Keep track of the current time, to do some statistics check.
6. One hundred "real" messages are expected, let's loop on all of them.
7. Calculate and report duration of batch

If you read the official Z-Guide you will find there the original C code that I have rewritten in C++ with some Boost to have some fun (!)

Go to the full post

Worker - ØMQ pull/push

Second step of our simple divide and conquer ØMQ application. Here we see the code for the worker, the process that is connected as client to the ventilator, being the pull side of a push/pull pattern, and to the sink, here acting as push in a pull/push relation.

Here is the code:
try
{
zmq::context_t context(1);

zmq::socket_t receiver(context, ZMQ_PULL); // 1
receiver.connect("tcp://localhost:5557");

zmq::socket_t sender(context, ZMQ_PUSH); // 2
sender.connect("tcp://localhost:5558");

while(true)
{
zmq::message_t msgIn;
if(receiver.recv(&msgIn))
{
if(msgIn.size() == sizeof(int)) // 3
{
int delay = *(static_cast<int*>(msgIn.data())); // 4

std::cout << '[' << delay << ']';
boost::this_thread::sleep(boost::posix_time::milliseconds(delay)); // 5
}
else // 6
{
std::cout << "[-]";
}
}
else // 7
std::cout << '!';

zmq::message_t msgOut(0); // 8
sender.send(msgOut);
} // 9
}
catch(const zmq::error_t& ze)
{
std::cout << "Exception: " << ze.what() << std::endl;
}

1. A pull socket connects this process to the ventilator.
2. A push socket connects this process to the sink.
3. We expect the message being an int, and having its size.
4. If the message is as expected, we can safely see it as an int.
5. We fake the job sleeping for the number of milliseconds we get from the ventilator.
6. Messages of unexpected size are just discarded.
7. In case of problem receiving a message, we log a warning.
8. We send an empty message to the sink to show it that the worker elaboration has been completed.

This example is basically a rewrite in C++ with Boost of the code you find in the official Z-Guide.

Go to the full post

Ventilator - ØMQ push

We are creating a toy divide and conquer ØMQ application to show the push/pull pattern in action.

Here we are showing the code for the ventilator, the process that is delegated to split the original problem in a number of tiny sub-problems that could be passed to instances of the worker module to be solved using a parallel processing schema.

Besides it usage of the ZMQ_PUSH, this code is kind of interesting for its need for a random number - the fake length of each small sub-problem; and for sending data to the client that is not a stream of characters - here we send an integer.

To generate a pseudo-random number here we use the Boost library, creating this small class:
#include <boost/random.hpp>

class VentiRand
{
private:
boost::random::mt19937 generator_; // 1
boost::random::uniform_int_distribution<> random_; // 2
public:
VentiRand(int low, int hi) : random_(low, hi) {}

int getValue() { return random_(generator_); }

};

1. As generator we use the Mersenne twister, in its more commonly used 32 bit version.
2. We use a uniform distribution. Notice there is no type name inside the angular brackets since it is defaulted to int.

And this is the code for the ventilator:
try
{
zmq::context_t context(1); // 1
zmq::socket_t sender(context, ZMQ_PUSH); // 2
sender.bind("tcp://*:5557"); // 3

std::cout << "Press Enter when the workers are ready ";
std::string input;
std::getline(std::cin, input);
std::cout << "Sending tasks to workers" << std::endl;

{
int workload = 0;
zmq::message_t flag(sizeof(int));
memcpy(flag.data(), &workload, sizeof(int)); // 4

sender.send(flag); // 5
} // 6

VentiRand vr(1, 100); // 7
int total = 0; // 8
for(int task_nbr = 0; task_nbr < 100; ++task_nbr)
{
int workload = vr.getValue();
total += workload;

zmq::message_t message(sizeof(int));
memcpy(message.data(), &workload, sizeof(int)); // 9
sender.send(message); // 10
} // 11

std::cout << "Total expected cost: " << total << " msec" << std::endl;

boost::this_thread::sleep(boost::posix_time::seconds(1)); // 12
} // 13
catch(const zmq::error_t& ze)
{
std::cout << "Exception: " << ze.what() << std::endl;
}

1. A ØMQ context is created, calling the API function zmq_init()
2. The socket used here, created calling zmq_socket(), is of type ZMQ_PUSH. It is similar to the publisher in the sense that allows many clients to connect, but it behaves differently, sending a single message to just one client, and not a copy to all the clients as the publisher does.
3. Calling zmq_bind() we bind the socket to a specific protocol and port.
4. We create a message, setting as data an int value (in this case zero).
5. This zero valued message is sent by zmq_send(), signaling the start of batch.
6. Going out of scope, the message is closed - zmq_msg_close().
7. The random generator is created. We want a number in the range [1..100], representing the time cost in milliseconds for the current task.
8. We keep track of the total expected cost for the tasks, so to measure the efficiency of our job.
9. A message is created, setting as data the random int value just calculated.
10. The message is sent by zmq_send().
11. Exiting its scope, the message is deleted, calling zmq_msg_close() for it.
12. Let this process sleep a while, to give ØMQ time to do its job
13. Context and socket go out of scope, zmq_close() and zmq_term() are called.

This example is basically a rewrite in C++, using Boost to make it more portable, of the code you find in the official Z-Guide.

Go to the full post

Divide and conquer by ØMQ push/pull

The example to show how to use the push/pull pattern in ØMQ is a tad more complex than the ones we have seen for the request/reply and publisher/subscriber ones.

We are about to create a simple implementation of nothing less than the parallel processing model based on a ventilator, many workers, and a sink.

The ventilator splits the (supposedly huge) problem in a number of tiny tasks that can be executed in parallel. Each worker processes any task that receives from the ventilator. The sink collects the results from the workers and produces the final output.

Ventilator and workers are connected by push/pull sockets, where the ventilator pushes and the workers pull. Same for the connection between workers and sink, but in this case the sink plays the pull role, while the workers are pushing.

We have exactly one ventilator and one sink, but we can have a variable number of workers.

Not considering the details specific to the current problem, we'll see that there is not a big difference between a ventilator and the publisher, besides the underlying socket type, here ZMQ_PUSH. More interesting the structure of worker component, that is going to use to sockets, one to connect to the ventilator, here it is playing the ZMQ_PULL role, and one to the sink, as ZMQ_PUSH. Last but not least, the sink, that is connected to each available workers through a ZMQ_PULL socket.

I have jotted down these lines while reading the official Z-Guide.

Go to the full post

ØMQ subscriber

We are talking about the pub-sub pattern, as implemented in ØMQ. We have already seen a simple publisher, now we are about to write a client for it.

We need a few information on the publisher before starting writing the code for the subscriber: where it is and what it is sending around. Checking its code we see that we can connect it using the TCP protocol. I assume you are going to run both application on the same machine, so as IP address we use localhost; and the port number is 5556. The message is sent as a C-string of characters (without the terminator) and it is expected to be in a X:Y:Z format, three integers colon separated.

Then we should deciding what our client want actually doing with the stream of data coming from the server. Let's say that we want to get ten messages, extract the second integer from each of them (when at least two integers actually are in it), sum them and display the result to the user.

Knowing that, we can write the subscriber code, that should end up in something like this:
try
{
zmq::context_t context(1); // 1

std::cout << "Collecting updates from publisher" << std::endl;
zmq::socket_t subscriber(context, ZMQ_SUB); // 2
subscriber.connect("tcp://localhost:5556"); // 3

char* filter = "1";
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter)); // 4

long total = 0;
for(int update_nbr = 0; update_nbr < 10; update_nbr++) // 5
{
zmq::message_t message; // 6
if(subscriber.recv(&message)) // 7
{
std::string data((char*)message.data(), (char*)message.data() + message.size()); // 8
total += getRelevantValue(data); // 9
}
}
std::cout << "Total collected for code " << filter << " is " << total << std::endl;
} // 10
catch(const zmq::error_t& ze)
{
std::cout << "Exception: " << ze.what() << std::endl;
}

1. As usual, first thing is initializing the context. This results in a call to zmq_init().
2. Then we create a socket - under the curtain a call to zmq_socket() is done. The socket type is ZMQ_SUB, stating this is a subscriber, created to work with a publisher.
3. From the socket we try to connect to the server, API function is zmq_connect(), specifying protocol and address.
4. This is an important point. We must specify a filter, setting it as an option for the socket. In this case looks natural, since we want to get only the messages starting with "1", but even in the case we don't actually want any filter we must set it (to an empty string). Here the API function in the background is zmq_setsockopt().
5. Let's get ten messages coming from the publisher.
6. A new message is created, the API function zmq_msg_init() would be called here.
7. The message is filled, by zmq_recv(), with data coming from the server.
8. The raw character array is converted in a std::string, so that it could be actually used by the code.
9. A utility function is called to implement the actual client logic - its example code is shown below.
10. The socket goes out of scope, zmq_close() is called by its dtor, and the same for the context, that it result in a call to zmq_term().

The code below is not related to ØMQ but it is kind of fun (in a twisted way) since it makes use of a couple of boost functionality. The task of this function is splitting its input std:string, converting the second field to integer, and returning it. By default (for instance, if the second element in the string is not an integer) it returns zero:
int getRelevantValue(const std::string& data)
{
std::vector<std::string> vs;
vs.reserve(3); // 1
boost::split(vs, data, boost::is_any_of(":")); // 2

std::cout << "Message received: "; // 3
std::for_each(vs.begin(), vs.end(), [](std::string s)
{
std::cout << s << ' ';
});
std::cout << std::endl;

if(vs.size() > 1)
try { return boost::lexical_cast<int>(vs[1]); } catch(...) {} // 4
return 0; // 5
}

1. Kind of overkilling. Its main sense is documenting that we are expecting the string to be split in three fields.
2. This friendly Boost Algorithm function puts in the first parameter the result of splitting the second parameter using the list of possible delimiters specified in the third parameter.
3. For debug purposes we dump to standard output the resulting vector elements - I usually enjoy coupling the standard algorithm for_each() with a lambda function in a case like that.
4. If we actually have at least two elements, we get the second, cast it to int and return it. Notice that I used the Boost lexical_cast<> construct to do that.
5. If there are less than two elements, or we didn't succeed in casting the second element to int, we return zero, as designed.

The official Z-Guide has much more to tell you if you are interested in such kind of things.

Go to the full post

ØMQ publisher

After some general talking on the ØMQ publisher-subscriber pattern, let's see the gory details, writing a simple server that acts as a publisher, sending messages to anyone who is concerned.

Here is the C++ code that I have written, see below for a few comments:
try
{
zmq::context_t context(1); // 1
zmq::socket_t publisher(context, ZMQ_PUB); // 2
publisher.bind("tcp://*:5556"); // 3

std::stringstream ss;
std::string s;
for(int i = 0; i < 100; ++i)
{
readyToSend(); // 4

ss.str("");
ss << i%2 << ':' << i*42 << ':' << i; // 5.
s = ss.str();

zmq::message_t message((void *)s.c_str(), s.length(), NULL); // 6
std::cout << "Sending " << s << std::endl;
publisher.send(message); // 7
} // 8
} // 9
catch(const zmq::error_t& ze)
{
std::cout << "Exception: " << ze.what() << std::endl;
}

1. Through a context object we control the aquisition of the ØMQ resource, as the RAII idiom suggests. The class ctor calls the API function zmq_init().
2. Same RAII behaviour for socket_t, where its ctor call zmq_socket(). Here the socket type is ZMQ_PUB, since we want our application acting as a publisher in pub-sub context.
3. The socket bind function calls the ØMQ API function zmq_bind(), specifying here that we want to use the TCP protocol and the port on the host that we want to be grab.
4. A function that is going to determine when the application is ready to fire a new message.
5. Just put some silly data in the message. We assume a format X:Y:Z, three integers colon separated.
6. Let's create a message that uses as data the memory made available in the std::string s. The ctor used here calls the API function zmq_msg_init_data().
7. The message is sent through the socket - actually using zmq_send().
8. The message object goes out of scope, its dtor calls zmq_msg_close()
9. The socket goes out of scope, calling zmq_close(), ditto for the context - zmq_term().

The function called at [4] has the only aim of making testing easier. It just stops the application execution, asking the user a confirmation to continue:
void readyToSend()
{
std::cout << "Enter when ready" << std::endl;
std::string input;
std::getline(std::cin, input);
}

If you have found this post interesting, you are going to love the official Z-Guide.

Go to the full post

Pub-Sub with ØMQ

With ØMQ we can easily implements different messaging patterns. We have already seen how to build a client/server system for the request/reply pattern. Now we tackle the publish/subscribe pattern.

When we use the request/reply pattern, the server hangs on, waiting for a client request. When a message from the client arrives, the server provides a reply, and then it patiently waits for the next request.

In the publish/subscribe pattern there is no such a strong coupling. The server freely publishes its messages, and any client that it is interested in such source should subscribe to receive them. Usually the client is not interested in all the messages the server publish, and to get just what it really wants to get, it applies a filter on the traffic.

In the next couple of posts we are going to see a simple implementation of this pattern, but let me give you a spoiler: do not expect big changes.

The server creates a ZMQ_PUB socket, telling ØMQ that it wants to play the publisher role, and then it will send messages accordingly to the required logic.

The client creates a ZMQ_SUB socket, since it wants to be a subscriber, and - here is a substantial change - specifies which filters should be applied to the traffic coming from the publisher. Then is just a matter of receiving messages.

You can find more information in the official Z-Guide. I am currently reading it, and I find it fun and useful.

Go to the full post

Simple ØMQ TCP client

A server doesn't have much sense without a client. So here I complete the previous post, where we wrote a simple ØMQ TCP echo server, with its companion client.

Since they work in a team, they should respect the same conventions. Messages are expected to be array of characters (sort of C-string but without the terminator), and a empty message could be sent to shut down the server.

Here is the C++ code I have written, below a few notes on it:
try
{
   zmq::context_t context(1); // 1

   std::cout << "Connecting to echo server" << std::endl;
   zmq::socket_t socket(context, ZMQ_REQ); // 2
   socket.connect("tcp://localhost:50013"); // 3

   std::stringstream ss; // 4
   ss << "Hello ";
   for(int request_nbr = 0; request_nbr != 10; ++request_nbr)
   {
      ss << request_nbr;
      std::string message(ss.str());

      {
         zmq::message_t request((void *)message.c_str(), message.length(), NULL); // 5

         std::cout << "Sending " << message << std::endl;
         socket.send(request); // 6
      } // 7

      {
         zmq::message_t reply; // 8
         socket.recv(&reply); // 9

         std::cout << "Received ";
         std::for_each((char*)reply.data(), (char*)reply.data() + reply.size(),
            [](char c){ std::cout << c;}); // 10
         std::cout << std::endl;
      } // 11
   }

   zmq::message_t request(0); // 12
   socket.send(request);
   std::cout << "Sending empty message and terminating" << std::endl;
} // 13
catch(const zmq::error_t& ze)
{
   std::cout << "Exception: " << ze.what() << std::endl;
}
1. As we have done for the server, as first thing we create a zmq::context_t object, so that its constructor calls zmq_init().
2. We create a socket - internally zmq_socket() is called - specifying ZMQ_REQ as type. This is the request side of a request/reply pattern.
3. A call to zmq::socket_t::connect, resolved to zmq_connect(), to establish a connection to the server - notice that we specify the TCP/IP address (here localhost) and the port.
4. For giving a bit of variety, the message is built from a stream.
5. A zmq::message_t is built, using the data in the stringstream. The constructor, in this case, calls zmq_msg_init_data(). Notice that the data is not copied to message_t, just the passed pointer is used - so you should be very careful using this method. By the way, the third parameter, here a NULL, is a pointer to the function that is called by the dtor, to cleanup the data, if required. Here we want to leave the job to stringstream.
6. The message is sent through the socket to the server. The C-API function called is zmq_send().
7. Here the message goes out of scope, its dtor is called, and through it zmq_msg_close().
8. The default ctor is called for zmq::message_t, that resolves in a call to zmq_msg_init().
9. The call to zmq::socket_t::recv() shields a call to zmq_recv().
10. The data message is not a proper C-string, since it is not '/0' terminated. So I can't dump it to the standard output console without doing some job on it. Usually it should be a good idea to create a std::string out of it, like this:
std::string feedback((char*)reply.data(), (char*)reply.data() + reply.size());
But here we won't have any other use in that string after printing it, so why not having a bit of fun using a std::for_each() algorithm coupled with a lambda function instead?
11. As in (7), zmq_msg_close() is called by the message_t dtor.
12. We create and send a zero-sized message, to terminate the server.
13. Here a couple of destructors are called. The socket one, that calls zmq_close(), and the context one, that calls zmq_term().

I wrote this post while reading the official Z-Guide. A fun and useful reading indeed.

Go to the full post

Simple ØMQ TCP server

Here I am going to rewrite the already seen simple TCP echo server using ØMQ instead of ASIO. I am doing it in C++, using its standard wrapper, but I keep an eye on the underlying C code, to understand better what is going on.

Exception Handling

We are using C++ as implementation language, so exception is the preferred way to managed unexpected errors. The C++ ØMQ functions throws a zmq::error_t exception, derived from std::exception in this case. So, our code would be in such a try/catch block:

try
{
   // ...
}
catch(const zmq::error_t& ze)
{
   std::cout << "Exception: " << ze.what() << std::endl;
}

Context

First thing we have to do is creating a context. If we have a look at the source code for zmq::context_t we see that this class is basically a way to ensure that zmq_init() is called at startup and zmq_term() at shutdown. So we could say that its task is implementing RAII, where the acquired resource is the ØMQ context. If we develop a ØMQ application in C language, it should the programmer responsibility to ensure zmq_init() / zmq_term() are called as expected. Using C++ we enforce it by putting these mandatory function in the ctor and dtor of the context class:
zmq::context_t context(1);
The parameter we pass to the context is passed directly to zmq_init() and represents the number of threads in the thread pool used by ØMQ to handle I/O operations.

Socket

The zmq::socket_t class wraps the socket functionality offered by ØMQ. Its ctor calls zmq_socket() with the parameter passed by the user and throws a zmq::error_t in case of error. The dtor call its member function close() that is a wrapper for a call zmq_close() on the current socket. We call it in this way:
zmq::socket_t socket(context, ZMQ_REP);
Meaning that we want to create a new socket of ZMQ_REP type for the current context.

Specifying ZMQ_REP as type we are saying to ØMQ that we want our socket to be used in a request-reply pattern (in the reply role, as one should expect). The reason why we should specify this, is that ØMQ provides an higher lever of abstraction than ASIO, and we are required to provide to ØMQ some information on what we want actually doing, so that ØMQ could know how it should to behave.

Binding a socket

Once a socket is created, it had to be bound to be able to accept connections. The C function for doing that, zmq_bind(), is wrapped in the zmq::socket_t::bind() member function:
socket.bind("tcp://*:50013");
The passed string is the endpoint for the socket. Its first part, here "tcp", specifies the transport protocol that we want to use. The second part, after the separator "://" contains information as required by the actual protocol used. In this case we say that we want our socket to work for any available interface (that is the meaning of the star "*") and then, after the colon, we specify the port that we want to use (50013).

Waiting for request on a socket

If we have a look at the source code for class zmq::message_t, we see that it derives privately from zmq_msg_t and it is a way to enforce by code the requirement of calling zmq_msg_init() on a zmq_msg_t before using it, and then calling zmq_msg_close() on it at the end of its usage.

Once the zmq::message_t is initialized, we can pass it to zmq::socket_t::recv(), like this:
zmq::message_t request;
socket.recv(&request);
As usual, recv() is a wrapper to zmq_recv(), adding error check and exception generation, when something goes wrong.

The call to recv() blocks the current execution flow, waiting a client for filling the message with data. If everything goes fine (that means, no exception) the next step is about checking what we actually have in the received message.

Extracting the received data

In this specific piece of code, we decided that an empty message has to be interpreted by the server as a request of terminating its execution. To check if this is the case, we check the length of the message, calling its member method size():
request.size()
It shouldn't be a big surprise to find out that it just wraps a call to zmq_msg_size().

The message moved around by ØMQ is actually just a bunch of bytes, accessed by the member function data(), that wraps a call to zmq_msg_data(). In this application we can safely assume that the data is actually a string of characters, so we can convert it to a std::string using the ctor that requires as parameters the start and end iterator:
std::string data((char*)request.data(), (char*)request.data() + request.size());

Sending a reply

Building a zmq::message_t should be quite straightforward, it just a matter of setting the size of the data, and copying the actual message, byte by byte, in its reserved area. To actually send it we call:
socket.send(reply);
That actually means calling zmq_send() and ensuring that there is no error.

Here is the resulting code:
try
{
   zmq::context_t context(1);
   zmq::socket_t socket(context, ZMQ_REP);
   socket.bind("tcp://*:50013");
   while(true)
   {
      zmq::message_t request;
      socket.recv(&request);

      if(request.size() == 0)
      {
         std::cout << "Empty message received - shutting down the server" << std::endl;
         break;
      }

      std::string data((char*)request.data(), (char*)request.data() + request.size());
      std::cout << "Received: " << data << std::endl;

      zmq::message_t reply(data.length());
      memcpy(reply.data(), data.c_str(), data.length());
      socket.send(reply);
   }
}
catch(const zmq::error_t& ze)
{
   std::cout << "Exception: " << ze.what() << std::endl;
}

If you are looking for more information on this stuff, you would probably enjoy reading the official Z-Guide.

Go to the full post

Hello ØMQ

Sockets are not standardized in C++ world. So, when we want to use them, better to rely on some library that shield the low level platform dependent details. ASIO is a great choice, check yourself how to write a simple ASIO TCP client to have an idea of how it works.

Still, there are higher level alternatives, that makes our job even easier. One of them is provided by ØMQ (Zero Message Queue) a C library that is freely available for a number of operating system and programming languages.

They are working to release soon version 3.0, that is currently marked as unstable, I have downloaded it and played a bit around with it, but I assume is safer to use the stable release 2.1 instead.

I am writing this post while reading the Z-Guide. I suggest you to do the same. Actually, I would tell you to read it even if you are not much interested in the matter, because it is written in a very good and fun way.

Anyway. Here I just report about downloading, the stuff, setting up a VC++ project, and running a first 'hello' zmq application.

As I said, I am doing it on Windows, but on the page where you can get the software you can check how to build the library on *X systems, it looks very linear.

Not much to say either about creating a VC++ project, you should just remember to set the include and lib directories, specify a dependency to libzmq.lib, and make the libzmq.dll available to the executable.

What I am about to do with it, it is creating a powerful application that tells to the user which version of zmq is currently in use. OK, that is not much, but it is enough to test if everything is working as expected.

First thing, I should remember to include the zmq header file. Here I could have used zmq.h, that refers to the C code, but I use instead the C++ version, that wraps up the original include file to provide some friendly object-oriented support (here totally useless) to the code writer:
#include <iostream>
#include <zmq.hpp>

int main()
{
   int major, minor, patch;
   // zmq_version(&major, &minor, &patch); // pure C version
   zmq::version(&major, &minor, &patch); // C++ wrapper version
   std::cout << "Current 0MQ version is " << major << minor << patch << std::endl;
}
If this work, you should assume you have properly installed zmq in your environment.

Go to the full post

RAII for OCCI

As we have seen in the previous post, Oracle OCCI is just cool, if you think what you have to write to get the same result using directly Oracle OCI. Still there is room to improvement, especially if you think of what C++0x makes available.

To execute a query we have to create an Environment object, that we use to create a Connection, that we use to create a Statement, that we (finally) could run to get a ResultSet back.

After that we should clean it up. So we should close the ResultSet from the Statement, then terminate the Statement from the Connection, terminate the Connection from the Environment, and (happy ending) terminate the Environment.

In our simple example there is no reason to complain. The code is reasonably straightforward, and looks easy to understand. Still, trouble is lurking from these lines. What happens if an exception breaks the natural execution flow? And what if future changes make the code less immediate? Maybe some part of the code could try to do silly things, not respecting the implicit contract that we should maintain to write OCCI code.

To make our code safer, we should make more explicit our requirements, using language constructs.

Let's think to the Environment object. It shouldn't be copied - maybe it could be moved, but we should do it carefully - it should be the first object in the OCCI chain to be created, and the last one to be deleted. We could verbosely comment our code to make it clear to the reader, but I have a sort of suspect that this is not a strategy that pays off.

Better using a smart pointer, and since I am writing this code explicitly for VC++ 2010, we can take advantage of its implementation for std::unique_ptr, that fits exactly to the requirements of our problem.

So, the initial situation is that we have a piece of code that looks like this:
try
{
   // setup
   oc::Environment* env = oc::Environment::createEnvironment(); // 1
   // ...

   // execution
   // ...
   while(res->next())
      std::cout << res->getString(1) << ' ' << res->getString(2) << ' ' << res->getInt(3) << std::endl;

   // cleanup
   // ...
   oc::Environment::terminateEnvironment(env); // 2
}
catch(const oc::SQLException& e)
{
   // ...
And we want apply a couple of changes on it:
1. Instead of a raw pointer we want to deal with an object on the stack, so that we could leave to the compiler (through the stack unwinding mechanism) the nuisance of taking care of its correct destruction, even in case of exceptions.
2. As direct consequence of (1), no explicit cleanup should be performed for our Environment object.

This is the code we want to get instead:
try
{
   OcciEnvironment sEnv = OcciWrapper::createEnvironment();
   // ...

   // ...
   while(sRes->next())
      std::cout << sRes->getString(1) << ' ' << sRes->getString(2) << ' ' << sRes->getInt(3) << std::endl;

   // ...
}
catch(const oc::SQLException& e)
{
   // ...
At the end of the day, when all the OCCI raw pointers are wrapped in smart pointers, there would be no more need for a cleanup section, and the code would be more robust.

But, as usual, there is no free lunch, we just moved the complexity of the issue elsewhere. And if large part of it would stay hidden in the standard library implementation, some of it will sneak in this piece of code that we are about to put in an include file available to our source:
#pragma once

#include <functional>
#include <occi.h>
namespace oc = oracle::occi;

typedef std::unique_ptr<oc::Environment, std::function<void(oc::Environment*)> > OcciEnvironment; // 1

namespace OcciWrapper
{
   OcciEnvironment createEnvironment() // 2
   {
      oc::Environment* env = oc::Environment::createEnvironment(); // 3
      return OcciEnvironment(env, std::bind(oc::Environment::terminateEnvironment, env)); // 4
   }
}
1. This is not so terrible, after all. Instead of using a raw pointer to an oracle::occi::Environment object, we plan to use a smart pointer, but since its definition is a bit complex, I think it is better to typedef it to a more friendly name.
The std::unique_ptr is a scoped limited smart pointer that can't be copied (but could be moved) and that lets its user a way to define what should be called by its dtor. But when you want to use this feature we have to specify in the class template the type of the deleter. So we say to the compiler that we want to be able to pass a function that returns void and requires in input a raw pointer to Environment.
2. Actually, we should write a createEnvironment() overload for each oracle::occi::createEnvironment() we plan to use in our code. Let's assume that currently we could do with just this one.
3. Firstly we create a raw pointer, and then we use it to create its smart big brother.
4. Time to be smart. The raw pointer is wrapped in an unique_ptr, and the Environment cleanup function is passed as the deleter associated to this object. Notice that we had to use std::bind to let it know what it should use as parameter.

Go to the full post

Oracle OCCI for Visual Studio 2010

I wrote a sort of hello world application using OCCI for Oracle 11 and VC++ 2010. A simple example that runs a select statement on the standard Oracle HR schema and print on the console the resulting rows. If you have a previous experience developing with Oracle OCI you would certainly appreciate the difference.

I'd say the development team has made a good job, but still both the setup and the usage require a bit of attention.

Firstly, I warmly suggest you to use Oracle Database Instant Client, since it makes much easier the job. To be used with VC++ 9 and 10, you need also to download a patch.

Secondly, you should ensure your application has a way to access the tnsnames.ora file where is defined the SID for the database you want to access; you can do that ensuring that ORACLE_HOME or TNS_ADMIN is defined as environment variable, and it is associated with the right path.

If ORACLE_HOME is used, it should be something like c:\ ... \oracle\product\11.2.0\server, where under server you should have network\admin, and in admin you should find tnsnames.ora - TNS_ADMIN should be the path containing tnsnames.ora.

Third step is about setting the Property Pages for the VC++ project that is going to actually use OCCI.

In the Configuration Properties:

1) C/C++
a) General - Additional Include Directories
Add the include directory for OCI. You should find it in you Oracle 11 server installation, something like ... \server\oci\include.
b) Code Generation - Runtime Library
Ensure ist value is set to /MDd or /MD, meaning Multithreaded (Debug) DLL.

2) Linker
a) Additional Library Directories
It should point to the directory where you put the VC++ 10 patch for the instant client.
b) Input - Additional Dependencies
Here you add the library, oraocci11.lib or oraocci11d.lib, specific for VC++ 10 and Oracle 11.

Fourth step is about making available instant client DLLs (generic and specific ones), adding their paths to the PATH environment variable, or copying them in the directory where the executable resides.

Once we have done all this stuff, it is relatively easy writing the code we talked about at the beginning of the post:
#include <iostream>
#include <occi.h>

namespace oc = oracle::occi;

int main()
{
   try
   {
      // setup
      oc::Environment* env = oc::Environment::createEnvironment(); // 1
      oc::Connection* conn = env->createConnection("hr", "password", "MYDB"); // 2
      oc::Statement* stmt = conn->createStatement("select * from countries where country_name like 'A%'"); // 3

      // execution
      oc::ResultSet* res = stmt->executeQuery(); // 4
      while(res->next()) // 5
         std::cout << res->getString(1) << ' ' << res->getString(2) << ' ' << res->getInt(3) << std::endl;

      // cleanup
      stmt->closeResultSet(res); // 6
      conn->terminateStatement(stmt);
      env->terminateConnection(conn);
      oc::Environment::terminateEnvironment(env);
   }
   catch(const oc::SQLException& e) // 7
   {
      std::cout << "Exception: " << e.what() << std::endl;
   }

   system("pause");
}
1. All the connections are created in an environment, that could have different setups. Here, we create a plain environment, that is enough for the current task.
2. Given an environment, we can create a connection. The third parameter is the SID identifying the database in the tnsnames.ora file, the first two parameters represent user and password.
3. Once we have a connection, we can create a statement in it. Here it is a select query.
4. When the statement is a query, we can execute it, getting back as a result a resultset.
5. Now we can loop on the resultset to access all the found rows.
6. Inverting the construction order, we cleanup the object we created: resultset, statement, connection and, finally, the environment.
7. Catching the exceptions that our OCCI calls could throw.

Not bad, if you compare this code with what you should write to get the same result using OCI. But it is a bit painful to see all these pointers, and thinking what happens in case of an exception, say, just after [3.] - the answer is: no cleanup function is called. A bit of RAII would help to make it more robust.

Go to the full post

A double for loop

I think it could be useful, at least for myself, to keep track of the piece of code I have written today in Perl for future reference. Nothing too advanced, but with a few interesting point touched in its few lines.

It is a double for loop scanning all the argument passed to the script and storing them in an array, ready to be used in the rest of the script itself.

Let's have a look at the code:
ARG_LOOP: # 1
foreach(@ARGV) # 2
{
for(my $i = 0; $i < 3; $i++) # 3
{
if(/(.*_$TAGS[$i])$IN_EXT$/) # 4
{
if(defined $files[$i]) # 5
{
print "You can't pass more than one $TAGS[$i] file to the script\n";
print $help;
exit 1;
}

$files[$i] = $1; # 6
next ARG_LOOP; # 7
}
}

print "Unexpected argument detected\n"; # 8
print $help;
exit 2;
}

1. A label is declared so that a "next" instruction could be used in the inner loop to skip to the next iteration for the outer loop.
2. Looping on all the element in the ARGV array, containing the arguments passed to the script. The current item is stored in the default $_ variable.
3. Internal loop: we are expecting three input parameter.
4. Each input parameter should respect a specific pattern: first part is seen as a block containing free text, then an underscore followed by a previously defined tag. The final part of the parameter is a previously defined INput EXTension. Notice the $ before the final slash: nothing is expected after the extension.
5. There should be exactly one input parameter for each defined tag. So we check if the current file has been already defined in a previous iteration. If this is the case, we print a message to the user (in the $help variable is showed how the script should be called) and return to the system.
6. A match has been found, and the file has not already be defined for the current tag. The (first and unique) block defined in the pattern is assigned to the current file element.
7. We stop the internal loop and give back the control to the next iteration of the external loop.
8. We shouldn't normally get here, since all expected input parameter should match all the stored tags. If we find a parameter not matching any tag, we print some helpful comment and return to the system.

Go to the full post

Checking user input

We are writing a Perl script, and we have to choose a execution path accordingly to the user decision. The user should answer y or n, both lower and uppercase, to our question. Any other answer has to be rejected.

Here is a possible solution to this common requirement.

The code that requires the user decision is something like that:
print "First step completed\n";
if(stop_execution()) # 1
{
print "Correct data and retry.\n";
exit 1; # 2
}
else
{
print "Ready for the next step.\n";
}

print "This is the second step.\n";

# ...

exit 0;

1. We are about to describe our decision-making function stop_execution() just below. Here it suffices noticing that it would return a "true" value in case we actually want to interrupt the execution stream.
2. As usually expected, a script returns a non-zero value in case of unhappy ending.

Our user interaction consists in asking if he is ready for the next step, iterating until his answer is a valid one, and returning to the user a true/false value:
sub stop_execution
{
my $prompt = "Do you want to continue? [y/n]: "; # 1

print $prompt;
while(<STDIN>) # 2
{
if(/^n$/i) # 3
{
return 1;
}
elsif(/^y$/i) # 4
{
return 0;
}
print $prompt; # 5
}
}

1. This is the string that will be used as prompt to the user to get an input.
2. We read from the standard input stream, and put the result in the default $_ variable.
3. If $_ is matching to the specified pattern, we return a non-zero value (that means, true). The pattern is just a letter, n, and we specify that the string should match to it from the beginning to its end. Besides, the 'i' character after the closing slash tells that the match is case-insensitive. So both "n" and "N" are accepted.
4. Same as above, but the pattern is matching against "y" (and "Y") and we return 0 (meaning false).
5. If the user input does not match with our expectations, we repeat the question and iterate.

Go to the full post

Program options

We all know that a C/C++ application could get input values from the environment through the argc/argv variables passed to its main() function. And we all know how boring is checking and making available for use these values.

The good news is that we could use a Boost library, Program Options, and let it do large part of the job for us.

Let's say that we have an application that should get from command line an integer representing a size:
myApp -s 42
We want also to provide a way for the user to know how to call it:
myApp -h
And we want to be allowed to enter the parameters in a more verbose way, --size and --help.
If the user calls the application without specifying any parameter we would complain, and show him the help.

Naturally, we should check that nothing weird happens, like duplicated option tags or unexpected one are passed. And we should check the option consistency, we want to ensure that the user passes an integer (and not a floating point, or a string) as size.

Using Boost Program Options all of this comes at a reasonable price. We just have to include the required header, and maybe provide a shortcut for the namespace:
#include <boost/program_options.hpp>
namespace po = boost::program_options;
Then we provide a description for the options we accept:
po::options_description od("Available options"); // 1.
od.add_options()
("help,h", "help message") // 2.
("size,s", po::value<int>(), "max size"); // 3.
1. The options_description object is created with a description - try to think of something that could be result useful for the application user.
2. Options could be added in a number of way. Here we see how to specify two tags, --help and -h, and an associated description. No argument value is expected in this case.
3. Size option tag should be followed by an integer representing the size value. The second parameter is passing this information to the options_description object.

If the option "size" would default to 42, we can enforce modifying in this way its definition:
("size,s", po::value<int>()->default_value(42), "max size");
Our code will access the values through a map where key is the option name (here "help" or "size") and the value a boost::any object. And here it comes the tricky part. We have to call three Boost Program Options functions to load into the map the values: parse_command_line() that matches the argv values with the expected options as stored in the option_description; and then store() and notify() to complete the job.

If the user has entered something conflicting with the rules set in options_description, parse_command_line() throws an error, that we should be ready to catch.

The resulting code should look more or less like this:
po::variables_map vm;
try
{
   po::store(po::parse_command_line(argc, argv, od), vm);
}
catch(const std::exception& ex)
{
   std::cout << "Error checking program options: " << ex.what() << std::endl;
   std::cout << od << std::endl;
   return 1;
}
po::notify(vm);
The user input now is in the variables_map, and we just have to implement our logic.

So, we could ensure at least an option has been passed to the application checkin the map size:
if(!vm.size())
{
   std::cout << "Please specify an option" << std::endl;
   std::cout << od << std::endl;
   return 2;
}
We can check if an option has been selected by the caller verifying that the count for an option is not zero. In case of the "help" option, we usually just dump the options description and terminate:
if(vm.count("help"))
{
   std::cout << od << std::endl;
   return 0;
}
To extract the associated value from an option, we should specify the actual type stored in the boost::any object (beware of wrong types - they would lead to an exception):
if(vm.count("size"))
{
   std::cout << "The passed size is " << vm["size"].as<int>() << std::endl;
}
The code in this post is based on the getting started example that you can find in the official Boost Program Options tutorial.

Go to the full post

Using recursive_directory_iterator

Let's modify the code we have just written to list a directory tree getting rid of the recursion. To do the trick we use a relatively new Boost Filesystem feature, the recursive_directory_iterator, an iterator that runs on all the files in the current directory traversing recursively all its subdirectories.

Warning! Boost 1.56 introduced a behavior change that makes the code showed here subtly wrong. Please, check the code provided by LukeM for a working patch. Thank you Luke!

In this way we plan to simplify our code, letting to the iterator the task of translating the (potentially very complex) tree structure in a flat one, and we keep for ourselves the easier task of iterating on each element.

To be honest, in such a simple example as this one, you could have the impression that we are increasing the code complexity instead. But in a real case the balance should be different.

Creating a recursive_directory_iterator

We are using a different iterator, so we change accordingly the class that creates it:
boost::filesystem::recursive_directory_iterator createRIterator(boost::filesystem::path path)
{
   try
   {
      return boost::filesystem::recursive_directory_iterator(path);
   }
   catch(boost::filesystem::filesystem_error& fex)
   {
      std::cout << fex.what() << std::endl;
      return boost::filesystem::recursive_directory_iterator();
   }
}
Dumping a file name

The dump functionality we implemented in the previous post was asking for refactoring. It was almost screaming for doing less and in a more controlled way. Let's take this chance to make it happy:
void dump(boost::filesystem::path path, int level)
{
   try
   {
      std::cout << (boost::filesystem::is_directory(path) ? 'D' : ' ') << ' ';
      std::cout << (boost::filesystem::is_symlink(path) ? 'L' : ' ') << ' ';
      for(int i = 0; i < level; ++i)
         std::cout << ' ';
      std::cout << path.filename() << std::endl;
   }
   catch(boost::filesystem::filesystem_error& fex)
   {
      std::cout << fex.what() << std::endl;
   }
}
Listing a tree using recursive_directory_iterator

Redesigning the listing function requires a bit of work, let's see the resulting code and then let's talk about the changes:
void plainListTree(boost::filesystem::path path) // 1
   dump(path, 0);
   boost::filesystem::recursive_directory_iterator it = createRIterator(path);
   boost::filesystem::recursive_directory_iterator end;
   while(it != end) // 2
   {
      dump(*it, it.level()); // 3
      if(boost::filesystem::is_directory(*it) && boost::filesystem::is_symlink(*it)) // 4
         it.no_push();
      try
      {
         ++it; // 5
      }
      catch(std::exception& ex)
      {
         std::cout << ex.what() << std::endl;
         it.no_push(); // 6
         try { ++it; } catch(...) { std::cout << "!!" << std::endl; return; } // 7
      }
   }
}
1. We don't need the user to start a recursion, and we don't need to keep track of the current level of recursion, since it is kept internally by the iterator.
2. Using a while loop does not look very natural in this context, but incrementing a recursive_directory_iterator could result in an exception, and we want it to be managed inside the loop, since we don't want the looping to be interrupted while an error happens before we terminate scanning all the elements.
3. Interestingly and usefully, recursive_directory_iterator stores in its status the current recursion level.
4. Actually, this functionality was in checkAndDump(), now we divorced it from dump() and let it live on its own. Another change is regarding what we actually do when we detect that we have at hand a directory that is also a symbolic link: we call no_push() on the iterator, that basically says not to go into the directory, but skip to the next element. That is exactely the behaviour we want in this context.
5. As said above, moving a recursive_directory_iterator could result in an exception (for instance, if the next element is a directory and we have no read access on it).
6. We couldn't access the next item in the collection, so we assume it refers to a directory that we can't access, and we ask the iterator class not to navigate in that directory but skip to the next element.
7. OK, this line should be written in a more expanded way, and we should try to recover in a more graceful way in case of troubles. But let just assume our previous assumption ("bad" directory) was right so that the catch clause is almost a paranoid check.

Warning! Boost 1.56 introduced a behavior change that makes the code above subtly wrong. Please, check the code provided by LukeM for a working patch. Thank you Luke!

Go to the full post

Listing a directory tree

We have seen how to use the Boost Filesystem library to do a simple operation like getting the size of a file, a good way to get acquainted to the library basic functionality, and dumping the content of a directory, getting introduced to the powerful directory_iterator.

But why listing just a directory content, when we could also show the content of all its subdirectories?

That is what we are going here, and we are about to do it using explicit recursion. We'll recursively create a new directory_iterator any time we will bump into a directory while scanning the current lot of files.

Then will see how to use recursive_directory_iterator to implement the same behavior, but leaving the burden of managing recursion to the Boost Filesystem library.

There are a couple of points that we have to be aware of. Firstly we could have an error trying to access a subdirectory - typically we could try to access a directory without having reading rights on it - and secondly we could find out that a subdirectory is actually a symbolic link to another "real" directory.

It is easy to see why the first fact could be an issue, but we can solve it shielding the code that creates the directory iterator in a try catch block. The second point could lead to a problem if the link drives us up to a directory already visited, creating in this way a vicious circle. The easiest way to avoid this risk is simply refuse to recurse in directory that are also symbolic link.

Dumping a file name

Let's write a function that dumps the name of a file accordingly to its level in the current hierarchy, and returns a flag specifying if that is a "good" directory:
bool checkAndDump(boost::filesystem::path path, int level) // 1.
{
bool isDir = boost::filesystem::is_directory(path);
bool isLnk = boost::filesystem::is_symlink(path); // 2.

std::cout << (isDir ? 'D' : ' ') << ' ';
std::cout << (isLnk ? 'L' : ' ') << ' ';
for(int i = 0; i < level; ++i)
std::cout << ' ';
std::cout << path.filename() << std::endl;

return isDir && !isLnk; // 3.
}

1. The caller should pass the name and the current level of the file in the hierarchy.
2. We check if the current file is a "real" one, or it is just a symbolic link.
3. We return true only for a "real" directory.

Actually, in this checkAndDump() function there is a lot of room for improvement. First of all, it should be refactored in two functions. As even its names shows, it is trying to do two things at the same time, and this is not usually a good idea. But, more importantly, that there is no error checking. The code should be enclosed in a try/catch block, to avoid unpleasant surprises. But let it go, for the time being.

Creating a directory_iterator

To keep the code more readable, we move the creation of the directory iterator in a dedicated function:
boost::filesystem::directory_iterator createIterator(boost::filesystem::path path)
{
try
{
return boost::filesystem::directory_iterator(path); // 1.
}
catch(boost::filesystem::filesystem_error& fex)
{
std::cout << fex.what() << std::endl;
return boost::filesystem::directory_iterator(); // 2.
}
}

1. We try to create the iterator.
2. I case of any trouble we return an invalid iterator.

Listing a tree

As said, we could use directory_iterator to scan our directory tree, explicitly recursing in each (good) directory we find:
void recursiveListTree(boost::filesystem::path path, int level)
{
if(!checkAndDump(path, level)) // 1.
return;

boost::filesystem::directory_iterator it = createIterator(path); // 2.
boost::filesystem::directory_iterator end;
std::for_each(it, end, [level](boost::filesystem::path p) // 3.
{
recursiveListTree(p, level + 1); // 4.
});
}

1. If the current path does not refer to a "good" directory, our job is done.
2. We create a directory iterator. Remember that in case of error we get an invalid iterator.
3. We loop on the current path content. If we couldn't access the directory, the first iterator was set to invalid, so the loop is not executed at all. Otherwise we run the lambda function passed as third parameter (giving to it access by value to the level) that would get as input the item pointed by the iterator, that means, a path.
4. Here we have the recursive step. We increase the depth level an call again the function.

Starting the recursion

The user would call a function that takes care of checking for possible errors before calling our recursive function:
void listTree(boost::filesystem::path path)
{
try
{
if(!boost::filesystem::exists(path))
{
std::cout << path << " does not exist" << std::endl;
return;
}

std::cout << "Listing directory tree" << std::endl;
recursiveListTree(path, 0); // 1.
}
catch(const boost::filesystem::filesystem_error& ex)
{
std::cout << ex.what() << std::endl;
}
}

1. Here we start the recursion, passing 0 as initial level.

Go to the full post

Listing files in a directory

We have used the Boost Filesystem library to create a function that dump on standard console the size of the file whose name is passed to the function itself.

We have seen why it is a good idea using Boost Filesystem for such a task: the resulting code could be compiled and run successfully on Windows and Unix, being the differences in file system convention managed internally by the boost::filesystem::path class and by the provided functions; unexpected failures are seen as exceptions of type boost::filesystem::filesystem_error so that they could be gracefully shielded using a try-catch construct.

Now we are using this library for a function slightly more complicated. We expect as input a directory name, and we print to standard console the files contained in that directory, if any.

I have tested the code I'm showing here on Windows / Visual C++ 2010 and on Linux RedHat / gcc. No difference at all in the source files, just the project files have to be built differently accordingly to the requirements of the specific platform.

Actually, since I had forgot to add the library names in the makefile, I got a list of "undefined reference to" to a number of function, like boost::system::generic_category().

The solution was, naturally, adding the missing library files to the makefile:
MY_LIBRARIES := boost_system boost_filesystem
If you have some doubt on this point, there is a previous post where I have shown an example of a makefile for Boost.

A generic file name dump function

It is not strictly relevant here, but it is a bit of extra fun. Since I plan to put the filenames in a couple of different C++ standard containers, I wrote also a generic function for dumping all the elements of a container based on type boost::filesystem::path using the standard algorithm for_each and a lambda function:
template <typename Container>
void dumpFileNames(const Container& c)
{
std::for_each(c.begin(), c.end(), [](boost::filesystem::path p) // 1.
{
std::cout << (boost::filesystem::is_directory(p) ? 'D' : ' '); // 2.
std::cout << " " << p.filename() << std::endl;
});
}

1. We go through all the elements in the container. The current element, that should be of type boost::filesystem::path, is passed to the lambda function defined as third parameter of the for_each() call.
2. We could print more information for each file, but here we just put a 'D' if the current filename refers to a directory. The function is_directory() gives us the answer using the appropriate method for the current operating system.
3. We use the boost::filesystem::path filename() method to extract the filename relative to the local directory.

What if I try to misuse dumpFileNames() like this?
std::vector<int> vi;
dumpFileNames(vi);

Well, I'll get a number of compile errors that should help me understanding what is my mistake. Visual Studio immediately tells me that it "cannot convert parameter 1 from 'const int' to 'boost::filesystem2::path'". A message that looks reasonable.

Listing files in a directory /1

If a boost::filesystem::path refers to a directory, we can extract from it a boost::filesystem::directory_iterator, and use it to navigate among all the files that it contains. This consideration leads to our first version of a listing function:
void listDirAsIs(boost::filesystem::path filename)
{
typedef std::vector<boost::filesystem::path> Files; // 1.
Files files;

boost::filesystem::directory_iterator beg(filename); // 2.
boost::filesystem::directory_iterator end; // 3.
std::copy(beg, end, std::back_inserter(files)); // 4.

dumpFileNames(files); // 5.
}

1. When there is no special requirement for the container that should be used, the standard vector could be a good choice.
2. Constructing a directory_iterator from the filename gives us an iterator pointing to the first contained file, if any.
3. The default ctor for directory_iterator generates an invalid iterator, that could be used as "end".
4. We push back all the elements in the interval from beg to end in our vector, using the well-known copy-inserter idiom.
5. And finally we dump the vector.

Listing files in a directory /2

On Windows this function is just what we need, but on UNIX we get a result that is a bit unsatisfactory. The fact is the Windows keeps the filenames in a directory alphabetically ordered, while UNIX does not impose any special ordering rule. If we want that our code behave the same when running on different Operating Systems we should change our code. An idea would be to explicitly check if we are on Windows or UNIX, probably using macros, and calling the specific code. Alternatively we could call std::sort() on our resulting vector of filenames before dumping it. But I choose a third approach, using as container std::set, that keeps in order the items as they are inserted:
void listDirOrd(boost::filesystem::path filename)
{
typedef std::set<boost::filesystem::path> Files;
Files files;

boost::filesystem::directory_iterator beg(filename);
boost::filesystem::directory_iterator end;
std::copy(beg, end, std::inserter(files, files.begin())); // 1.

dumpFileNames(files);
}

1. Since here we are using std::set that does not implement a back (or front) inserter, we call use std::inserter.

Adding some checks

Basically our job is done. We add some checking to avoid unexpected behaviour in case of errors and we get this wrapper:
void listDir(boost::filesystem::path filename)
{
try
{
if(!boost::filesystem::exists(filename)) // 1.
{
std::cout << filename << " does not exist" << std::endl;
return;
}

if(!boost::filesystem::is_directory(filename)) // 2.
{
std::cout << filename << " is not a directory" << std::endl;
return;
}

std::cout << filename << " is a directory containing:" << std::endl;
listDirAsIs(filename); // 3.
std::cout << "---" << std::endl;
listDirOrd(filename);
}
catch(const boost::filesystem::filesystem_error& ex) // 4.
{
std::cout << ex.what() << std::endl;
}
}

1. Non-existing objects are detected here.
2. Non-directory files do not requires more than a warning.
3. Just to see the result, we leave also the first implementation of our listing functionality.
4. Any file system error is trapped here.

The code in this post is based on an example that you can find in the official boost filesystem tutorial. For more background information, you could have a look at the Boost filesystem version 3 home page.

Go to the full post