boost::condition on a std::stack

Let's say we want to create a little tutorial-like application that implements a simple read-write concurrent behaviour on a STL stack.

There will be two threads, one putting data (actually, int values) in the stack, the other one reading from it. The zero value has to be interpreted as terminator for the job.

We are about to implement a solution that would use a class (named ReadWrite) having as private members two threads (each of them for reading and writing), two mutex (one for I/O operations, the other for the stack access), a condition (for keeping track of the empty stack case when reading).

To muddle a bit the water, the reading and writing will be delayed in a configurable way.

We want the public interface of our ReadWrite class to provide a constructor, for setting the read/write delay, a couple of methods to start reading and writing, and the destructor, where the main thread would sit waiting the read/write threads to complete their job.

So, the user code for our class would be something like that:

void t07()
{
ReadWrite rw(2, 1);

rw.startReading();
rw.startWriting();
}

Here is the private data member section for the class:

private:
boost::mutex mio_; // mutex on i/o
boost::mutex ms_; // mutex on the stack
boost::condition cd_;
boost::thread tr_;
boost::thread tw_;

enum {TERMINATOR = 0};

std::stack<int> s_;
int delayR_;
int delayW_;

The TERMINATOR constant is the value we are going to use to signal the reader it could stop waiting input from the writer.

We'll have a couple of utility private functions for writing output to the console:

void message(const char* msg)
{
boost::lock_guard<boost::mutex> lm(mio_);
cout << boost::this_thread::get_id() << ": " << msg << endl;
}

void message(const char* msg, int value)
{
boost::lock_guard<boost::mutex> lm(mio_);
cout << boost::this_thread::get_id() << ": " << msg << ' ' << value << endl;
}

We use lock_guard since we require just the bare functionality from our lock.

Here is the code for the private reading function, that will be run in its specific thread:

void reading()
{
int value = -1;
while(value != TERMINATOR) // 1.
{
boost::mutex::scoped_lock ls(ms_); // 2.
if(s_.empty()) // 3.
{
message("can't read, stack is empty");
cd_.wait(ls); // 4.
}

value = s_.top();
s_.pop();
ls.unlock(); // 5.

if(value == TERMINATOR)
message("terminator found in the stack");
else
{
message("read", value);
boost::this_thread::sleep(boost::posix_time::seconds(delayR_));
}
}
}

1. We loop until the terminator is read in the stack.
2. We usw a scoped_lock on the mutex that rules the access to the stack, since it has to be passed to the wait() method of the condition (see 4), and this means that we need it to support the unlock() functionality. And given that we have to use a scoped_lock, it is worthy to use unlock() also in our code (in 5).
3. If the stack is empty, we can't read, we say that to the user and ...
4. We wait on the condition that something happens on the stack.
5. After we actually read (and pop) a value from the stack, we unlock the mutex, making it available to the other thread.

The other interesting private function in this class is the one that writes to the stack:

void writing()
{
for(int i = 1; i < 6; ++i)
{
{
boost::lock_guard<boost::mutex> l(ms_); // 1.
if(i == 1) // 2.
s_.push(TERMINATOR);
s_.push(i);
}
message("write", i);
cd_.notify_one(); // 3.

boost::this_thread::sleep(boost::posix_time::seconds(delayW_));
}
}

1. just a basic lock is required here, so let's use lock_guard.
2. before putting the first "real" item in the stack, we put the terminator, that will be the last value fetched by the reader (here is a caveat: if the reader is faster than the writer, it would get the terminator before the writer would be able to put all the values in the stack).
3. after a "real" value has be put in the stack, the writer notify the reader that something has changed in the stack.

And here is the public section of ReadWrite:

public:
ReadWrite(int delayR, int delayW) : delayR_(delayR), delayW_(delayW)
{
message("ReadWrite ctor");
}

~ReadWrite()
{
message("main thread wait ...");
tr_.join();
tw_.join();
message("... job completed");
}

void startReading()
{
tr_ = boost::thread(&ReadWrite::reading, this);
}

void startWriting()
{
tw_ = boost::thread(&ReadWrite::writing, this);
}

No comments:

Post a Comment