That is precisely the what a boost::condition could be used for.
Basically, we should do something like that: the writer should check the buffer, if it is full it should wait for the reader thread to free some space for the new data; besides, the writer should notify the reader when it puts a data in the buffer. On the other side, the reader checks the buffer, if it is empty, waits for the writer to make available new data; when it reads an element of the buffer it pops it, and then notify the other thread that more room is available.
Let's see this example:
#include <iostream>
#include "boost/thread/thread.hpp"
#include "boost/thread/mutex.hpp"
#include "boost/thread/condition.hpp"
#include "boost/circular_buffer.hpp"
using namespace std;
namespace
{
class Buffer : private boost::noncopyable
{
private:
enum { BUF_SIZE = 3 };
boost::condition cond_;
boost::mutex mcb_; // mutex on the buffer
boost::mutex mio_; // mutex for console access
boost::circular_buffer<int> cb_;
public:
Buffer(int size = BUF_SIZE) : cb_(size) {}
void put(int i)
{
{
boost::mutex::scoped_lock lock(mio_);
cout << "sending: " << i << endl;
}
// acquire exclusive access on the buffer
boost::mutex::scoped_lock lock(mcb_);
if(cb_.full())
{
{
boost::mutex::scoped_lock lock(mio_);
cout << "Buffer is full. Waiting..." << endl;
}
// the buffer is full: wait on the lock for a notification
while (cb_.full())
cond_.wait(lock);
}
cb_.push_back(i);
// notify to the other thread that a new item is available
cond_.notify_one();
}
int get()
{
// acquire exclusive access on the buffer
boost::mutex::scoped_lock lock(mcb_);
if(cb_.empty())
{
{
boost::mutex::scoped_lock lock(mio_);
cout << "Buffer is empty. Waiting..." << endl;
}
// the buffer is empty: wait on the lock for a notification
while (cb_.empty())
cond_.wait(lock);
}
int i = cb_.front();
cb_.pop_front();
// notify to the other thread that the buffer is not full anymore
cond_.notify_one();
{
boost::mutex::scoped_lock lock(mio_);
cout << i << " received" << endl;
}
return i;
}
};
const int ITERS = 20;
void writer(Buffer& buf)
{
for (int n = 0; n < ITERS; ++n)
buf.put(n);
}
void reader(Buffer& buf)
{
for (int x = 0; x < ITERS; ++x)
buf.get();
}
}
void dd04()
{
Buffer buf;
// notice the use of boost::ref
boost::thread t1(&reader, boost::ref(buf));
boost::this_thread::sleep(boost::posix_time::milliseconds(5));
boost::thread t2(&writer, boost::ref(buf));
t1.join();
t2.join();
}
The main thread generates two working threads, one on the reader free function and one on the writing free function. Both of them work on the buffer, that is allocated in the main thread and it is passed by reference to the read/write threads. It is necessary to use boost::ref(), otherwise instead of a reference to the Buffer object a copy of it is passed to the free functions.
But the point of this example is in the mechanism of condition wait on a lock, to leave time to the other threads to do something that it is required to the actual thread to go on with its job, and the notify on a condition, to let know to the other threads that something has changed in that context.
No comments:
Post a Comment