Synchronizing with a condition_variable

A common requirement in a multithreaded application is that one thread must wait for another thread to complete some task. The usual way to accomplish this in C++11 (or with Boost, if your compiler does not implement it yet) is to use a condition_variable. For instance, we have previously seen a threadsafe implementation of a queue that uses a condition_variable to communicate a status change between threads.

Here we'll see an even simpler example.

A class is designed to be used in a multithreaded environment. A member function sets an int member variable a number of times, counting down to zero from a number specified by the caller. Before overwriting the value, it waits until another thread has used the previously set value. To keep track of the status we use an auxiliary boolean variable.

Let's see a first implementation that makes no use of condition_variable:
#include <iostream>
#include <boost/noncopyable.hpp>
#include <boost/thread.hpp>

class Conditional : boost::noncopyable
{
private:
  int value_; // 1
  bool produced_; // 2
  boost::mutex m_;

public:
  Conditional() : value_(-1), produced_(false) {}

  void produce(unsigned int count)
  {
    for(int i = count; i >= 0; --i)
    {
      boost::unique_lock<boost::mutex> l(m_); // 3
      while(produced_) // 4
      {
        std::cout << "Producer waits" << std::endl;

        l.unlock(); // 5
        boost::this_thread::sleep(boost::posix_time::millisec(100));
        l.lock();
      }
      std::cout << "Producer sets value to " << i << std::endl; // 6
      value_ = i;
      produced_ = true;
    }
  }

  void consume()
  {
    int current; // local copy of value_, taken while the lock is held
    do {
      boost::unique_lock<boost::mutex> l(m_);
      while(!produced_) // wait for producer
      {
        std::cout << "Consumer waits" << std::endl;

        l.unlock();
        boost::this_thread::sleep(boost::posix_time::millisec(100));
        l.lock();
      }
      current = value_;
      std::cout << "Consumer now is in control: " << current << std::endl;
      produced_ = false;
    } while(current); // 7: testing the copy avoids reading value_ unlocked
  }
};
1. Variable shared among threads.
2. Flag that tracks whether the current value is ready to be consumed.
3. We enter the critical section.
4. Wait for the consumer to use the value previously set.
5. Release the lock while sleeping, to give the other thread a chance to do its job.
6. When we reach this line, we are ready to set the value and the flag that marks it as ready to be consumed.
7. The consumer is similar to the producer; the main difference is that it loops until it reads the terminating value, zero.

Here is the code for testing this:
Conditional c;

boost::thread t1(&Conditional::consume, &c);
boost::thread t2(&Conditional::produce, &c, 10);

t1.join();
t2.join();
As we can see, this solution works. But it is not elegant: the main issue is that we have to specify "by hand" how long our threads sleep while waiting for the other thread to do its job. Using a condition_variable makes our code cleaner and simpler. First, we add a private member variable:
boost::condition_variable c_;
Then we rewrite consume and produce in this way:
void produce(unsigned int count)
{
  for(int i = count; i >= 0; --i)
  {
    boost::unique_lock<boost::mutex> l(m_);
    c_.wait(l, [this](){ return !produced_; } ); // 1
    std::cout << "Producer sets value to " << i << std::endl;
    value_ = i;
    produced_ = true;
    c_.notify_one(); // 2
  }
}

void consume() // 3
{
  int current; // local copy of value_, taken while the lock is held
  do {
    boost::unique_lock<boost::mutex> l(m_);
    c_.wait(l, [this](){ return produced_; } );

    current = value_;
    std::cout << "Consumer now is in control: " << current << std::endl;
    produced_ = false;
    c_.notify_one();
  } while(current);
}
1. wait() always takes the unique_lock. We can call it with the lock alone and loop on the condition ourselves (see the other example for details) or, as here, also pass a predicate that returns a boolean, the condition we are waiting for. Here the predicate is a lambda function that captures this, so that it can access the produced_ flag.
2. A thread waiting on the condition is notified that the status has changed.
3. As before, consume() is very close to produce().
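
For readers without Boost, the same hand-off can be sketched with the C++11 standard library. This is only an illustration under a few assumptions: the class name HandOff and the helper runHandOff() are hypothetical, the output statements are dropped, and consume() returns the values it saw so that the result can be checked. The consumer also copies the value while it still holds the lock, so the loop test never reads the shared variable unlocked.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical name: a std:: port of the Conditional class shown above.
class HandOff
{
public:
  HandOff() = default;
  HandOff(const HandOff&) = delete;            // stands in for boost::noncopyable
  HandOff& operator=(const HandOff&) = delete;

  void produce(unsigned int count)
  {
    for(int i = static_cast<int>(count); i >= 0; --i)
    {
      std::unique_lock<std::mutex> l(m_);
      // Same effect as: while(produced_) c_.wait(l);
      c_.wait(l, [this](){ return !produced_; });
      assert(!produced_); // the predicate holds when wait() returns
      value_ = i;
      produced_ = true;
      c_.notify_one();
    }
  }

  // Returns the values in the order they were consumed
  // (an addition made for this sketch, not part of the original class).
  std::vector<int> consume()
  {
    std::vector<int> seen;
    int current;
    do {
      std::unique_lock<std::mutex> l(m_);
      c_.wait(l, [this](){ return produced_; });
      current = value_; // copy while the lock is held
      seen.push_back(current);
      produced_ = false;
      c_.notify_one();
    } while(current != 0);
    return seen;
  }

private:
  int value_ = -1;
  bool produced_ = false;
  std::mutex m_;
  std::condition_variable c_;
};

// Hypothetical helper: runs one producer and one consumer to completion.
std::vector<int> runHandOff(unsigned int count)
{
  HandOff h;
  std::vector<int> seen;
  std::thread consumer([&]{ seen = h.consume(); });
  std::thread producer([&]{ h.produce(count); });
  consumer.join();
  producer.join();
  return seen;
}
```

Calling runHandOff(3) should hand over 3, 2, 1 and 0 in that order, since the producer cannot overwrite a value until the consumer has cleared the flag.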

We can reuse the test code from the first version for our new version, and we should notice how much faster it runs now that we have removed the polling sleeps.
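
To get a rough feel for the difference, here is a small sketch that measures how long a waiter takes to notice a flag set by another thread, once by polling with a 100 ms sleep and once by waiting on a condition variable. It uses the C++11 standard library rather than Boost, and the function name waitLatencyMs() and the 20 ms delay are assumptions chosen just for this illustration.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: returns how many milliseconds a waiter needs to notice
// a flag that another thread sets after about 20 ms.
long long waitLatencyMs(bool usePolling)
{
  using namespace std::chrono;
  std::mutex m;
  std::condition_variable c;
  bool ready = false;

  const steady_clock::time_point start = steady_clock::now();

  // The setter flips the flag under the lock, then notifies the waiter.
  std::thread setter([&]{
    std::this_thread::sleep_for(milliseconds(20));
    {
      std::lock_guard<std::mutex> g(m);
      ready = true;
    }
    c.notify_one();
  });

  {
    std::unique_lock<std::mutex> l(m);
    if(usePolling)
    {
      // Hand-written polling, as in the first version of the class.
      while(!ready)
      {
        l.unlock();
        std::this_thread::sleep_for(milliseconds(100));
        l.lock();
      }
    }
    else
    {
      // Sleeps until notified and the predicate is true.
      c.wait(l, [&]{ return ready; });
    }
  }

  const long long elapsed =
    duration_cast<milliseconds>(steady_clock::now() - start).count();
  setter.join();
  return elapsed;
}
```

On a typical run waitLatencyMs(true) should come out close to the 100 ms polling interval, while waitLatencyMs(false) should stay near the 20 ms delay, though the exact figures depend on the scheduler.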
