Two boost::conditions on a std::stack

There is a glitch in the design of the code in the previous post: when the reader is faster than the writer, it can reach the terminator before the writer has pushed all its values, so some data risks getting lost.

That's the reason for the redesign of the solution we'll see here.

To solve this issue we'll introduce a second boost::condition and redesign the reading and writing functions accordingly.

Here are our new conditions, as private data members of class ReadWrite:

boost::condition cdEmpty_;
boost::condition cdNotEmpty_;

The first one will be used to notify that the stack is empty, the second that at least one item is in the stack.

The substantial change in the reader is that it now notifies the writer when it takes the last item from the stack, making it empty. Here is the function after the rewriting:

void reading()
{
   while(true) // 1.
   {
      boost::mutex::scoped_lock ls(ms_);
      while(s_.empty()) // 2. loop: wait() may also return on a spurious wakeup
      {
         message("wait for an item in the stack");
         cdNotEmpty_.wait(ls);
      }

      int value = s_.top();
      s_.pop();
      if(s_.empty()) // 3.
      {
         message("stack is now empty");
         cdEmpty_.notify_one();
      }
      ls.unlock();

      if(value == TERMINATOR) // 4.
      {
         message("terminator found in the stack");
         return;
      }
      else
      {
         message("read", value);
         boost::this_thread::sleep(boost::posix_time::seconds(delayR_));
      }
   }
}

1. The loop goes on until the function returns in (4).
2. The reader expects something in the stack; if nothing is there, it waits on the condition until the stack status changes.
3. The reader has emptied the stack, so we notify this through the condition cdEmpty_.
4. When the terminator is read, we print a special message and the function returns without sleeping.

The writer changes accordingly. It writes all its data to the stack and then, when it finds that the stack has been emptied, sends the terminator:

void writing()
{
   for(int i = 1; i < 6; ++i)
   {
      {
         boost::lock_guard<boost::mutex> l(ms_);
         s_.push(i);
      }
      message("write", i);
      cdNotEmpty_.notify_one(); // 1.

      boost::this_thread::sleep(boost::posix_time::seconds(delayW_));
   }

   // wait until stack is empty before sending the terminator
   boost::mutex::scoped_lock ls(ms_);
   while(!s_.empty()) // loop: wait() may also return on a spurious wakeup
   {
      message("stack is not empty");
      cdEmpty_.wait(ls); // 2.
   }

   message("sending the terminator");
   s_.push(TERMINATOR);
   cdNotEmpty_.notify_one();
   ls.unlock(); // 3.
}

1. Any time an item is pushed on the stack, we notify the other thread.
2. We found that the stack was not empty, so we wait until we are notified that it has been emptied.
3. This unlock is not actually necessary, since the ls destructor would perform it, if required, when the local variable goes out of scope. But it won't hurt.
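
To try the two-condition handshake without the delays and the console output, here is a minimal sketch. It assumes the C++11 standard library (std::thread, std::mutex, std::condition_variable) as a stand-in for the Boost classes used above; the class name TwoConditions, the run() method and the seen_ vector are hypothetical additions of mine, not part of ReadWrite. The waits use the predicate overload of wait(), so the condition is rechecked after every wakeup.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <stack>
#include <thread>
#include <vector>

// Hypothetical stand-in for ReadWrite, built on the C++11 standard library.
class TwoConditions
{
    enum { TERMINATOR = 0 };

    std::mutex ms_;
    std::condition_variable cdEmpty_;     // signalled when the stack becomes empty
    std::condition_variable cdNotEmpty_;  // signalled when an item is pushed
    std::stack<int> s_;
    std::vector<int> seen_;               // values consumed by the reader

    void reading()
    {
        while(true)
        {
            std::unique_lock<std::mutex> ls(ms_);
            // the predicate is rechecked after every wakeup, spurious or not
            cdNotEmpty_.wait(ls, [this]{ return !s_.empty(); });
            assert(!s_.empty());          // guaranteed by the predicate

            int value = s_.top();
            s_.pop();
            if(s_.empty())
                cdEmpty_.notify_one();    // tell the writer the stack is drained
            if(value == TERMINATOR)
                return;
            seen_.push_back(value);
        }
    }

    void writing()
    {
        for(int i = 1; i < 6; ++i)
        {
            {
                std::lock_guard<std::mutex> l(ms_);
                s_.push(i);
            }
            cdNotEmpty_.notify_one();
        }

        // the terminator is sent only once every value has been consumed
        std::unique_lock<std::mutex> ls(ms_);
        cdEmpty_.wait(ls, [this]{ return s_.empty(); });
        s_.push(TERMINATOR);
        cdNotEmpty_.notify_one();
    }

public:
    // Runs one reader and one writer; returns what the reader consumed.
    std::vector<int> run()
    {
        std::thread tr(&TwoConditions::reading, this);
        std::thread tw(&TwoConditions::writing, this);
        tr.join();
        tw.join();
        return seen_;
    }
};
```

Calling run() on a TwoConditions object should return the five values written, in an order that depends on how the two threads interleave, and never the terminator.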

boost::condition on a std::stack

Let's say we want to create a little tutorial-like application that implements a simple concurrent read-write behaviour on an STL stack.

There will be two threads, one putting data (actually, int values) in the stack, the other one reading from it. The zero value has to be interpreted as the terminator for the job.

We are about to implement a solution that uses a class (named ReadWrite) having as private members two threads (one for reading, one for writing), two mutexes (one for I/O operations, the other for the stack access), and a condition (for keeping track of the empty stack case when reading).

To muddy the waters a bit, the reading and writing will be delayed in a configurable way.

We want the public interface of our ReadWrite class to provide a constructor, for setting the read/write delays, a couple of methods to start reading and writing, and the destructor, where the main thread sits waiting for the read/write threads to complete their job.

So, the user code for our class would be something like this:

void t07()
{
   ReadWrite rw(2, 1);

   rw.startReading();
   rw.startWriting();
}

Here is the private data member section for the class:

private:
   boost::mutex mio_; // mutex on i/o
   boost::mutex ms_; // mutex on the stack
   boost::condition cd_;
   boost::thread tr_;
   boost::thread tw_;

   enum {TERMINATOR = 0};

   std::stack<int> s_;
   int delayR_;
   int delayW_;

The TERMINATOR constant is the value we are going to use to signal the reader that it can stop waiting for input from the writer.

We'll have a couple of utility private functions for writing output to the console:

void message(const char* msg)
{
   boost::lock_guard<boost::mutex> lm(mio_);
   cout << boost::this_thread::get_id() << ": " << msg << endl;
}

void message(const char* msg, int value)
{
   boost::lock_guard<boost::mutex> lm(mio_);
   cout << boost::this_thread::get_id() << ": " << msg << ' ' << value << endl;
}

We use lock_guard since we require just the bare functionality from our lock.

Here is the code for the private reading function, which will run in its own thread:

void reading()
{
   int value = -1;
   while(value != TERMINATOR) // 1.
   {
      boost::mutex::scoped_lock ls(ms_); // 2.
      while(s_.empty()) // 3. loop: wait() may also return on a spurious wakeup
      {
         message("can't read, stack is empty");
         cd_.wait(ls); // 4.
      }

      value = s_.top();
      s_.pop();
      ls.unlock(); // 5.

      if(value == TERMINATOR)
         message("terminator found in the stack");
      else
      {
         message("read", value);
         boost::this_thread::sleep(boost::posix_time::seconds(delayR_));
      }
   }
}

1. We loop until the terminator is read from the stack.
2. We use a scoped_lock on the mutex that rules the access to the stack, since it has to be passed to the wait() method of the condition (see 4), which means that it must support the unlock() functionality. And given that we have to use a scoped_lock, it is worth using unlock() in our code too (in 5).
3. If the stack is empty, we can't read, so we tell the user and ...
4. We wait on the condition until something happens on the stack.
5. After we have actually read (and popped) a value from the stack, we unlock the mutex, making it available to the other thread.

The other interesting private function in this class is the one that writes to the stack:

void writing()
{
   for(int i = 1; i < 6; ++i)
   {
      {
         boost::lock_guard<boost::mutex> l(ms_); // 1.
         if(i == 1) // 2.
            s_.push(TERMINATOR);
         s_.push(i);
      }
      message("write", i);
      cd_.notify_one(); // 3.

      boost::this_thread::sleep(boost::posix_time::seconds(delayW_));
   }
}

1. Just a basic lock is required here, so let's use lock_guard.
2. Before putting the first "real" item in the stack, we put the terminator, which should be the last value fetched by the reader (here is a caveat: if the reader is faster than the writer, it gets the terminator before the writer is able to put all the values in the stack).
3. After a "real" value has been put in the stack, the writer notifies the reader that something has changed in the stack.
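
The caveat in (2) is easier to see when one unlucky interleaving is replayed step by step. The sketch below is single-threaded on purpose, so it is deterministic: it plays the writer's first iteration, then lets the reader drain the stack, then plays the remaining writer iterations. The function name fastReaderReplay is hypothetical, and this is only an illustration of the scheduling problem, not code from the ReadWrite class.

```cpp
#include <cassert>
#include <stack>
#include <vector>

enum { TERMINATOR = 0 };

// Single-threaded replay of one possible interleaving: the reader runs
// right after the writer's first step, before any other value is pushed.
std::vector<int> fastReaderReplay()
{
    std::stack<int> s;
    std::vector<int> seen;

    // writer, first iteration: terminator first, then the first value
    s.push(TERMINATOR);
    s.push(1);

    // reader drains whatever is on the stack
    while(!s.empty())
    {
        int value = s.top();
        s.pop();
        if(value == TERMINATOR)
            break;          // the reader stops here for good
        seen.push_back(value);
    }

    // writer, remaining iterations: nobody is left to read these
    for(int i = 2; i < 6; ++i)
        s.push(i);

    assert(s.size() == 4);  // 2, 3, 4, 5 stay on the stack, unread
    return seen;
}
```

In this replay the reader sees only the value 1 before hitting the terminator, which is exactly the data loss described above.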

And here is the public section of ReadWrite:

public:
   ReadWrite(int delayR, int delayW) : delayR_(delayR), delayW_(delayW)
   {
      message("ReadWrite ctor");
   }

   ~ReadWrite()
   {
      message("main thread wait ...");
      tr_.join();
      tw_.join();
      message("... job completed");
   }

   void startReading()
   {
      tr_ = boost::thread(&ReadWrite::reading, this);
   }

   void startWriting()
   {
      tw_ = boost::thread(&ReadWrite::writing, this);
   }

Delayed start of a boost::thread

A nuisance of boost::thread is that the new thread starts as soon as we create the instance.

On antonym.org there is an interesting post on boost::thread creation that ends by suggesting a wrapper class to solve this issue.

We rely on the fact that a boost::thread can be created without passing anything to the constructor, in which case the object is in a not-a-thread state. We explicitly run the thread by calling a start() method in our wrapper class, which creates a new boost::thread and assigns it to the member one.

Here is an example:

#include <iostream>
#include "boost/thread/thread.hpp"

using std::cout;
using std::endl;

namespace
{
   class ThreadSomething
   {
   private:
      boost::thread t_;
      int sec_;

      void doSomething(int sec)
      {
         cout << boost::this_thread::get_id() << ": doing something" << endl;
         boost::this_thread::sleep(boost::posix_time::seconds(sec));
      }

   public:
      ThreadSomething(int sec) : sec_(sec) {}

      void start()
      {
         t_ = boost::thread(&ThreadSomething::doSomething, this, sec_);
      }

      void join()
      {
         t_.join();
      }
   };
}

void t06()
{
   cout << boost::this_thread::get_id() << ": creating a new thread" << endl;
   ThreadSomething ts(2);

   cout << boost::this_thread::get_id() << ": starting a new thread" << endl;
   ts.start();

   cout << boost::this_thread::get_id() << ": joining" << endl;
   ts.join();
   cout << boost::this_thread::get_id() << ": done" << endl;
}
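
The same idea can be sketched with std::thread, assuming a C++11 compiler instead of Boost. The class name DelayedWorker and its joinable() and done() accessors are hypothetical, added here only to make the state of the wrapper observable. One difference worth guarding against: assigning to a std::thread that is still joinable calls std::terminate, so start() checks for that first.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical wrapper: the std::thread member starts out empty and is
// only bound to a running thread when start() is called.
class DelayedWorker
{
    std::thread t_;
    std::atomic<bool> done_;

    void doSomething() { done_ = true; }

public:
    DelayedWorker() : done_(false) {}

    // joining here means a started worker is never destroyed while joinable
    ~DelayedWorker() { join(); }

    bool joinable() const { return t_.joinable(); }
    bool done() const { return done_; }

    void start()
    {
        // assigning to a still-joinable std::thread would call std::terminate
        assert(!t_.joinable());
        t_ = std::thread(&DelayedWorker::doSomething, this);
    }

    void join()
    {
        if(t_.joinable())
            t_.join();
    }
};
```

A freshly constructed DelayedWorker is not joinable and has done nothing; after start() and join(), done() reports that the job ran.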

boost::lock_guard vs. boost::mutex::scoped_lock

Among the many different locks available in boost, boost::lock_guard is the simplest one.

All we can do with a lock_guard is acquire ownership of a mutex (actually, a Lockable) when constructing it, and wait for it to go out of scope so that the destructor releases the mutex.

The scoped_lock, actually a typedef for unique_lock, is a more complex concept. A couple of useful functions available for this class are lock() and unlock(), which give finer control over locking.

The basic behavior provided by lock_guard suffices in most cases, so it is better to stick to it as the default, and use scoped_lock, or an even more powerful lock, when we have complex requirements.

As an example of lock_guard usage, I rewrite a short test from a previous post, where a scoped_lock was originally used (even though it was not necessary):

#include <iostream>
#include "boost/thread/thread.hpp"
#include "boost/thread/locks.hpp"

using std::cout;
using std::endl;

namespace
{
   boost::mutex mio;

   class Count
   {
   private:
      int multi_;
   public:
      Count(int multi) : multi_(multi) { }

      void operator()()
      {
         for(int i = 0; i < 5; ++i)
         {
            boost::lock_guard<boost::mutex> lock(mio);
            std::cout << "Thread " << boost::this_thread::get_id() << " is looping: " << i*multi_ << std::endl;
            boost::this_thread::sleep(boost::posix_time::milliseconds(10));
         }
      }
   };
}

void t05()
{
   std::cout << "Main thread " << boost::this_thread::get_id() << std::endl; 
   boost::thread t1(Count(1));
   {
      boost::lock_guard<boost::mutex> lock(mio);
      std::cout << "Thread 1 started" << std::endl;
   }

   boost::thread t2(Count(-1));
   {
      boost::lock_guard<boost::mutex> lock(mio);
      std::cout << "Thread 2 started" << std::endl;
   }

   boost::thread t3(Count(10));
   {
      boost::lock_guard<boost::mutex> lock(mio);
      std::cout << "Thread 3 started" << std::endl;
   }

   t1.join();
   t2.join();
   t3.join();
   std::cout << "Back to the main thread " << std::endl;
}
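
For contrast, here is a small sketch of the extra control that lock() and unlock() give. It assumes the standard-library counterparts std::lock_guard and std::unique_lock rather than the Boost types; the names g_mutex, g_values, addGuarded and addAndCount are hypothetical.

```cpp
#include <cassert>
#include <mutex>
#include <vector>

std::mutex g_mutex;
std::vector<int> g_values;   // shared data protected by g_mutex

// lock_guard: locked for the whole scope, no way to release early
void addGuarded(int value)
{
    std::lock_guard<std::mutex> lock(g_mutex);
    g_values.push_back(value);
}   // released here by the destructor

// unique_lock: can be released and reacquired explicitly
int addAndCount(int value)
{
    std::unique_lock<std::mutex> lock(g_mutex);
    g_values.push_back(value);
    lock.unlock();               // let other threads in during the slow part
    assert(!lock.owns_lock());

    int doubled = value * 2;     // stands for work that does not touch g_values

    lock.lock();                 // reacquire before reading shared data again
    return doubled + static_cast<int>(g_values.size());
}
```

When the mutex has to be held for the whole scope anyway, as in addGuarded(), the simpler lock_guard remains the better default.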

boost::thread::id

Any boost thread can be identified by checking its associated boost::thread::id.

To retrieve the id of the current thread, a function in the boost::this_thread namespace, aptly named get_id(), is made available.

If a boost thread object is not yet, or no longer, associated with a low level thread, its id assumes a dummy value, "not-any-thread".

In this short example we dump the id of the current thread, then the one of another active thread, and finally the id of the same boost thread object, after the underlying system thread has been terminated:

#include <iostream>
#include <boost/thread.hpp>

namespace
{
   class Callable
   {
      // as defined in a previous post
   };
}

void t04()
{
   std::cout << "Checking thread IDs" << std::endl;
   std::cout << "Current thread id: " << boost::this_thread::get_id() << std::endl;

   boost::thread t(Callable(5));
   std::cout << "Other thread id: " << t.get_id() << std::endl;

   t.join();
   std::cout << "Other thread id: " << t.get_id() << std::endl;

   std::cout << "Done thread id testing" << std::endl;
}
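
The reset of the id after join() can also be checked programmatically instead of by reading the console. The sketch below assumes std::thread from the C++11 standard library, where a default-constructed std::thread::id plays the role of the dummy value; the function name idResetAfterJoin is hypothetical.

```cpp
#include <cassert>
#include <thread>

// Returns true if a joined std::thread reports the default "no thread" id.
bool idResetAfterJoin()
{
    std::thread t([]{ /* nothing to do */ });
    std::thread::id running = t.get_id();
    assert(running != std::thread::id());            // bound to a real thread
    assert(running != std::this_thread::get_id());   // and not to the caller

    t.join();
    return t.get_id() == std::thread::id();          // back to the dummy value
}
```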

boost::thread::interrupt()

We can interrupt a secondary thread from the thread that launched it using the method thread::interrupt().

But the secondary thread has a few ways to avoid the interruption request.

It could declare an object of type boost::this_thread::disable_interruption, which marks a region where interruption requests do not take effect as the caller expects.

It could catch the exception that would cause its execution to be interrupted; its type is boost::thread_interrupted.

Another fact to keep in mind is that an interruption can take effect only at specific interruption points, that is, where join(), wait(), sleep(), or interruption_point() is called.
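
To make the idea of an interruption point concrete, here is a sketch of the mechanism rebuilt by hand on the C++11 standard library. It does not use the Boost API at all: Interrupted, interruptRequested, checkpoint() and runAndInterrupt() are hypothetical names that only mirror boost::thread_interrupted, the pending request, and interruption_point(). The request is just a flag, and nothing happens until the worker reaches a checkpoint, where an exception is thrown that the worker is free to catch.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

struct Interrupted {};                        // hypothetical, mirrors boost::thread_interrupted

std::atomic<bool> interruptRequested(false);  // hypothetical request flag

// Hypothetical counterpart of interruption_point(): the only place
// where a pending request takes effect.
void checkpoint()
{
    if(interruptRequested)
        throw Interrupted();
}

// Returns true if the worker stopped because of the request.
bool runAndInterrupt()
{
    std::atomic<bool> caught(false);

    std::thread worker([&caught]
    {
        try
        {
            while(true)
            {
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
                checkpoint();    // the request is noticed here and nowhere else
            }
        }
        catch(const Interrupted&)
        {
            caught = true;       // the worker decides how to react
        }
    });

    interruptRequested = true;   // setting the flag does not stop anything by itself
    worker.join();
    assert(!worker.joinable());
    return caught;
}
```

A worker that never calls checkpoint() would never notice the request, which is the point the last paragraph above makes about interruption points.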

Joining and detaching a boost::thread

Our thread has launched another thread. There are a couple of things it can now do with it: wait for its termination or let it go.

If we don't have anything more interesting to do in the current thread, we can let it wait for a limited time for the other thread, using timed_join().

Or we could wait indefinitely for it, by calling join().

A third alternative is detaching the thread from the boost::thread object. In this way we are saying that we don't want to have anything more to do with that thread. It will do its job till the end, and we are not concerned with its life and death.

Here is a short example showing the above:

#include <iostream>
#include <boost/thread.hpp>

namespace
{
   class Callable
   {
   private:
      int value_;
   public:
      Callable(int value) : value_(value) {}

      void operator()()
      {
         std::cout << "count down ";
         while(value_ > 0)
         {
            std::cout << value_ << ' ';
            boost::this_thread::sleep(boost::posix_time::seconds(1));
            --value_;
         }
         std::cout << "done" << std::endl;
      }
   };
}

void t02()
{
   std::cout << "Launching a thread" << std::endl;

   boost::thread t1(Callable(6));
   t1.timed_join(boost::posix_time::seconds(2));
   std::cout << std::endl << "Expired waiting for timed_join()" << std::endl;

   t1.join();
   std::cout << "Secondary thread joined" << std::endl;

   Callable c2(3);
   boost::thread t2(c2);
   t2.detach();

   std::cout << "Secondary thread detached" << std::endl;
   boost::this_thread::sleep(boost::posix_time::seconds(5));
   std::cout << "Done thread testing" << std::endl;
}

Launching a boost::thread

To create a new boost::thread object, we pass to its constructor an object of a callable type that can be invoked with no parameters.

The object passed to the boost::thread constructor is copied into internal storage and invoked in the new thread of execution.

If it is not possible to copy the callable object, we can use std::ref (or boost::ref) to pass a reference to the new thread. In this case, the caller must ensure that the callable object stays alive until the newly created thread terminates.

As an example we use a class, Callable, to show the different behaviour of creating a thread passing an object in the default way (copy) and by std::ref.

The copy constructor of Callable sets its internal control variable to zero, whatever the original value is. So we can easily get direct feedback on what is happening internally.


#include <functional> // std::ref
#include <iostream>
#include <boost/thread.hpp>

namespace
{
   class Callable
   {
   private:
      int value_;
   public:
      Callable(int value) : value_(value) {}
      Callable(const Callable&) : value_(0) {} // a copy always starts from zero

      void operator()()
      {
         std::cout << "count down ";
         while(value_ > 0)
         {
            std::cout << value_ << ' ';
            boost::this_thread::sleep(boost::posix_time::seconds(1));
            --value_;
         }
         std::cout << "done" << std::endl;
      }
   };
}

void t01()
{
   std::cout << "Launching threads" << std::endl;

   Callable c1(12);
   boost::thread t1(c1); // works on a copy: nothing to count down
   t1.join();

   Callable c2(12);
   boost::thread t2(std::ref(c2)); // works on c2 itself
   t2.join();

   std::cout << "Done thread testing" << std::endl;
}
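
The difference between the two calls can also be observed from the caller's side, by checking the original object after the thread has finished. This is a minimal sketch assuming std::thread from the C++11 standard library instead of boost::thread; the Counter functor and the two helper functions are hypothetical names.

```cpp
#include <cassert>
#include <functional>
#include <thread>

// Hypothetical functor: counts how many times it has been invoked.
struct Counter
{
    int calls;
    Counter() : calls(0) {}
    void operator()() { ++calls; }
};

// Runs the functor in a thread that works on its own copy.
int callsAfterCopy()
{
    Counter c;
    std::thread t(c);            // c is copied for the new thread
    t.join();
    assert(!t.joinable());
    return c.calls;              // the original was never invoked
}

// Runs the functor in a thread that works on the caller's object.
int callsAfterRef()
{
    Counter c;
    std::thread t(std::ref(c));  // c must outlive the thread
    t.join();
    assert(!t.joinable());
    return c.calls;              // the thread invoked c itself
}
```

With the copy the caller's counter stays at zero, while with std::ref it reflects the call made in the other thread.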

HFDP - Model View Controller (MVC)

MVC is probably the best-known compound pattern, since it naturally describes almost any modern application.

Let's think, for instance, of an mp3 player. The user sees on the View the current status of the player. Using the interface, the user asks the application to do something new, such as playing another song, and the request is passed to the Controller. The Controller acts accordingly, asking the Model to change its state. The View observes the Model, and changes accordingly when notified by the Model.

Model: keeps all the data, the state and the application logic.
View: gives the user a presentation of the model.
Controller: receives the user inputs and asks for changes in the Model and View accordingly.

The user has access only to the View, which passes the user's requests to the Controller.

The Controller translates the requests it gets from the View into calls to the Model.

If required, the Controller could also ask the View to change.

The Model notifies the View when its state changes. And the View can ask the Model for more data to complete the information to be shown to the user.

The Observer Pattern connects the Model to the View. The Model is observed by the View.
The Strategy Pattern connects the View to the Controller. The Controller implements the behaviour of the View.
The Composite Pattern is used to structure the View, to manage all its components.
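
The roles above can be sketched in a few lines of C++ for the mp3 player example. This is only an illustration under my own assumptions: the class names PlayerModel, PlayerController and PlayerView, and all their methods, are hypothetical and do not come from the book. The Observer connection is a list of callbacks held by the Model, the Strategy connection is the Controller the View delegates to, and the Composite structure of the View is left out for brevity.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Model: data, state and logic; notifies its observers on every change.
class PlayerModel
{
    std::vector<std::string> songs_;
    std::size_t current_;
    std::vector<std::function<void()> > observers_;

public:
    explicit PlayerModel(const std::vector<std::string>& songs)
        : songs_(songs), current_(0)
    {
        assert(!songs_.empty());
    }

    void attach(const std::function<void()>& observer) { observers_.push_back(observer); }
    const std::string& currentSong() const { return songs_[current_]; }

    void next()
    {
        current_ = (current_ + 1) % songs_.size();
        for(std::size_t i = 0; i < observers_.size(); ++i)
            observers_[i]();     // Observer: tell every registered View
    }
};

// Controller: the strategy the View delegates user input to.
class PlayerController
{
    PlayerModel& model_;
public:
    explicit PlayerController(PlayerModel& model) : model_(model) {}
    void nextPressed() { model_.next(); }
};

// View: presents the Model and forwards user input to the Controller.
class PlayerView
{
    PlayerModel& model_;
    PlayerController& controller_;
    std::string display_;

    void refresh() { display_ = "Now playing: " + model_.currentSong(); }

public:
    PlayerView(PlayerModel& model, PlayerController& controller)
        : model_(model), controller_(controller)
    {
        model_.attach([this]{ refresh(); });  // the View observes the Model
        refresh();
    }

    void clickNext() { controller_.nextPressed(); }
    const std::string& display() const { return display_; }
};
```

Note that the View never changes the Model directly: clickNext() goes through the Controller, and the display is updated only because the Model notifies its observers.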

More on MVC, and on compound patterns, in chapter twelve of Head First Design Patterns, by Freeman, Freeman, Sierra, Bates.
