Threadsafe std::queue

The standard containers are not threadsafe, and this is a feature, in the sense that we don't have to pay for synchronization when we don't need it. On the other hand, this means that when we need to use a container in a multithreaded environment, we have to take care of all the synchronization details explicitly, and that is not much fun.

I am about to write a simple multithreaded application that needs a queue. One thread will put data in it, the other thread will read it. To implement this well-known producer-consumer model we can't use std::queue as it is; we should wrap it in a structure that makes it threadsafe.

Our application is very simple, and it requires just two threadsafe operations: push() and pop(). Only one case requires a bit of thinking: what happens if we call pop() when the queue is empty? Usually we should give the user the chance to decide whether it is more appropriate to return an error, or to hang (maybe just for a while) waiting for the producer to provide a value. In this case I decided to keep it very simple and just wait.

Here is a possible implementation of such a queue:
#include <iostream>
#include <queue>
#include <boost/thread.hpp> // boost::mutex, lock_guard, condition_variable

template <typename T>
class MyQueue
{
private:
  std::queue<T> q_;
  boost::mutex m_;
  boost::condition_variable c_;

public:
  void push(const T& data)
  {
    boost::lock_guard<boost::mutex> l(m_); // 1
    q_.push(data);
    c_.notify_one(); // 2
  }

  T pop()
  {
    boost::mutex::scoped_lock l(m_); // 3
    while(q_.size() == 0)
    {
      std::cout << "Waiting for data" << std::endl;
      c_.wait(l); // 4
    }

    T res = q_.front();
    q_.pop();
    return res; // 5
  }
};
1. We need just the bare RAII functionality from this lock, so lock_guard is enough. Once the mutex is acquired, we can change the object state, which here means adding the passed data to the queue.
2. We use the condition variable member to notify the other thread that a new item has been inserted in the queue.
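3. This lock is passed to the condition variable when we need to wait for incoming data. wait() has to release the lock while the thread sleeps and reacquire it on wake-up, so we need a lock that can be unlocked and relocked (scoped_lock, that is, unique_lock), and a lock_guard is not enough.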
4. While the queue is empty, we put the current thread in a waiting state using the condition variable. When the other thread notifies that something has changed, this thread resumes and checks the queue again.
5. The value at the beginning of the queue is popped and returned to the caller.

[edit]
Initially I put the wait() call marked (4) above in an if block:
if(q_.size() == 0) // THIS IS NOT A GOOD IDEA
{
  std::cout << "Waiting for data" << std::endl;
  c_.wait(l);
}
As Sergey noticed (see the comments below), this is not enough. Sometimes, in an erratic way that my tests never caught, wait() returns even though the condition has not changed. Patching the code is easy: we loop on the check until we actually get confirmation of the change, as Adrián suggested (again, see the comments below).

The section on Condition Variables in the Synchronization page of the official Boost documentation shows how to wait on a condition in exactly this way. Still, I couldn't find an explanation of the reasons behind it there. For that, I suggest reading the post named Spurious wakeups on Vladimir Prus's blog.

In a few words, Vladimir remarks that wait() on a condition can return without a real notification, and that this is expected behavior, motivated mainly by performance: it is much cheaper to add a check in user code than to force wait() to return only on a real wake-up.
[/edit]

As usual, I wrote a few tests (using Google Test, a C++ implementation of xUnit) to drive the development and to ensure that the class works as I expected. This is one of them:
TEST(TestMyQueue, OneOutOneIn)
{
  MyQueue<int> mq;
  int in = 1;

  boost::thread t([&](){ EXPECT_EQ(in, mq.pop()); });

  boost::this_thread::sleep(boost::posix_time::seconds(1));
  mq.push(in);
  t.join(); // wait for the consumer before mq and in go out of scope
}
The main thread spawns a new thread that runs a lambda function (cool, isn't it?), then sleeps for a second, pushes a value on the queue and finally joins the consumer thread, so that mq and in are still alive while the consumer uses them.

The other thread has access by reference to both variables defined in the caller scope (that is the meaning of the ampersand in the square brackets that introduce the lambda) and uses them in the test macro, comparing the result of pop() on the queue with the value stored in the integer variable.

Since the main thread sleeps one big fat second before pushing the value on the queue, we expect pop() to find the queue initially empty, and to get the value only after a while.
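A variation on the same scenario, sketched with the C++11 standard library and without Google Test: instead of a raw thread, std::async runs pop() on a new thread and returns a std::future. Calling get() on it both waits for the consumer (so the queue outlives it) and brings the popped value back to the main thread, where the check can be made. BlockingQueue and one_out_one_in() are hypothetical names; the queue is a compact copy of MyQueue so that the sketch compiles on its own.

```cpp
#include <chrono>
#include <condition_variable>
#include <future>
#include <mutex>
#include <queue>
#include <thread>

// Compact copy of the blocking queue, using the predicate form of wait().
template <typename T>
class BlockingQueue
{
  std::queue<T> q_;
  std::mutex m_;
  std::condition_variable c_;

public:
  void push(const T& data)
  {
    std::lock_guard<std::mutex> l(m_);
    q_.push(data);
    c_.notify_one();
  }

  T pop()
  {
    std::unique_lock<std::mutex> l(m_);
    c_.wait(l, [this]{ return !q_.empty(); });
    T res = q_.front();
    q_.pop();
    return res;
  }
};

// Mirrors OneOutOneIn: the consumer asks first, the producer pushes later.
// Returns the value the consumer received.
int one_out_one_in(int in)
{
  BlockingQueue<int> mq;
  // launch::async forces pop() to run on a separate thread.
  std::future<int> out = std::async(std::launch::async, [&mq]{ return mq.pop(); });

  std::this_thread::sleep_for(std::chrono::milliseconds(100));
  mq.push(in);

  // Blocks until the consumer has finished, while mq is still alive.
  return out.get();
}
```

Inside a Google Test case the last line would become something like EXPECT_EQ(in, out.get()), keeping the assertion on the test's own thread.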

10 comments:

  1. When tested in VC++2010, it passes Gtest but sometimes fails with "Debug Assertion Failed" pointing to line 338 of . Which is -
    "Expression: deque iterator not dereferencable" on "if (_Mycont == 0..."

  2. Hi Sergey. Thank you for stopping by. I have done some more testing on this piece of code, but I couldn't replicate the behavior you describe.

    The failed assertion you report means that front() has been called on an empty queue (a few more details here).

    It looks like, for some weird reason, c_.wait() returns before push() has done its job on the queue.

    Please let me know if you can write a test case that clarifies the issue.

    1. I am in the process of reading "C++ Concurrency in Action" by Anthony Williams (the maintainer of the Boost Thread library). I have also recently migrated from VC++ to gcc 4.7.2 on Ubuntu (thus leaving Gtest on Windows for the time being). He implements the threadsafe queue with "two variants on pop(): try_pop(), which tries to pop the value from the queue but always returns immediately (with an indication of failure) even if there wasn’t a value to retrieve, and wait_and_pop(), which will wait until there’s a value to retrieve." (p.72). When I get through it, I will probably add some more information on the subject. Thanks for replying to my comment, by the way.

    2. See the comment below by Adrián. Thank you for raising the issue.

    3. By the way, you should mention that the queue must be set up before the test. Like -

      mq.push(in);
      EXPECT_EQ(in, mq.pop());

      Otherwise we call pop() in the lambda on an empty queue. This is what caused the assertion failure on the empty container. And I guess "OneInOneOut" would sound more appropriate.

    4. Well, no. Actually, the point here is that the queue could be empty, and the consumer won't fail, it would just wait, maybe forever. As Adrián pointed out (and read the cited Vladimir Prus post for more details), an occasional failure in the original code could be explained by a spurious wake-up. I have never seen it in my test cases, but it could happen, so the code has to loop on the wait() call on the condition (see the cited Boost documentation for details) to ensure that the wake-up is real.

      Being now fully protected, MyQueue::pop() won't pop the queue until we are sure there is something in it.

    5. Testing "OneOutOneIn" (i.e. the other way around), you break the queue's main property FIFO (First In First Out).

    6. I think I get what you mean. Your OneInOneOut test case is designed to ensure that MyQueue behaves like an ordinary queue. That would be useful if it had been created from scratch, but it is written by composition around the well-tested STL queue, so it looks like a bit of overkill to me.

      If we need a queue that has to satisfy just the basic requirements, it won't make any sense to create a new class; we would go for the STL one. Here I need a queue that is a bit smarter. If it receives a pop request and nothing is in it, instead of falling into undefined behavior, it should wait until a push provides something to return.

      My OneOutOneIn test case checks this scenario, and it has that weird name because it first makes a (single) request for output, and then provides a (single) input to the queue. It is MyQueue's responsibility to reverse the actual order of execution on its internal STL queue, delaying the pop until it makes sense to run it.

      If OneOutOneIn fails, this means that MyQueue doesn't work as expected.

  3. You need to call c.wait(l) within a loop, because wait() is subject to spurious wake-ups (standard behaviour).

    1. I corrected the code following your suggestion, thank you Adrián!
