Condition variable on a queue

This is a typical situation in multithreaded programming. We have a couple of tasks that must synchronize on a shared data structure. To keep things simple, one produces data and the other consumes it. Here I write a solution to this problem using C++11 synchronization primitives, with an STL queue as the data structure.

The important point in this post is how to use a standard condition variable to make an STL container safe for multithreading. I have already written a couple of posts on the same theme. In the first one I wrote a threadsafe queue by extending the STL one; in the second one I used the same approach I am using here, creating a class that rules the access to a plain STL queue, ensuring synchronized behavior.

The main difference is that two years have passed in the meantime, and now I am using GCC 4.8, which supports C++11 to the point that I can avoid any reference to the Boost libraries. In any case, it would be pretty easy to adapt the code to a C++03 compiler and let Boost do the job.

Still, remember that multithreading support is not enabled by default on GCC. If you don't want to get a runtime exception (with a message like this: "Enable multithreading to use std::thread: Operation not permitted"), you'd better tell the linker that you want the pthread library to be linked (with the -pthread option, or -lpthread).
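A quick way to check that the toolchain is set up correctly is a tiny sketch like the one below. The file name thread_check.cpp and the function name answer_from_thread are made up for illustration, and the build line assumes GCC on a POSIX system.

```cpp
// Assumed build line, once a main() calls answer_from_thread():
//   g++ -std=c++11 -pthread thread_check.cpp
#include <thread>

// Runs a trivial job on a second thread and returns its result.
// On a GCC build without pthread support, constructing the std::thread
// is typically where the "Enable multithreading" exception shows up.
int answer_from_thread()
{
    int result = 0;
    std::thread t([&result] { result = 42; });
    t.join(); // after join() the write to result is visible to this thread
    return result;
}
```

If calling answer_from_thread() from main returns normally, threading is enabled and the rest of the example should link in the same way.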

How the thing works

I wrote this little class that acts as a wrapper around a queue:
class Conditional
{
private:
    std::queue<int> queue_; // 1
    std::condition_variable cond_; // 2
    std::mutex mutex_;

public:
    void producer(); // 3
    void consumer(); // 4
};
1. The container that is shared between the two working tasks I am about to use.
2. As we'll see soon, the synchronization on the queue is performed using a condition variable and a mutex.
3. The producer task runs this method, which generates data and puts it in the queue.
4. This is for the consumer task, which uses the data in the queue.

Client code for this class would work this way:
Conditional c;

std::thread tc(&Conditional::consumer, &c); // 1
std::this_thread::sleep_for(std::chrono::milliseconds(1)); // 2
std::thread tp(&Conditional::producer, &c);

tp.join(); // wait for both tasks before c goes out of scope
tc.join();

1. Create and start the consumer task.
2. Before creating the producer, let's sleep for a tad, so that the consumer very likely starts first (and won't find anything to consume).

The sleep between the two thread creations makes sense (obviously?) only for an informal tester like this one. You wouldn't write such a thing in production code (again, obviously?).


The job that the producer task performs is pretty boring: it just pushes ten elements in the queue, followed by a terminator. I follow the convention that a zero should be interpreted as an "end of transmission" message:
void producer()
{
    for(int i = 10; i >= 0; --i)
    {
        std::unique_lock<std::mutex> lock(mutex_); // 1
        std::cout << "producing " << i << std::endl;
        queue_.push(i);
        cond_.notify_one(); // 2
    }
}
1. Before accessing the shared resources, the task acquires a lock on the designated mutex. The cool thing about unique_lock is that we don't have to explicitly release the mutex if we don't have any reason to do it. We can rely on its destructor to do this job (it follows the RAII paradigm). Notice that in this case the mutex protects both the queue and the standard output console. Usually it is cleaner to use a mutex for each resource.
2. Once the queue has changed, we notify the condition variable about it, waking up a single task among those waiting on it.
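To illustrate the remark about using one mutex per resource, here is a minimal sketch of a producer that locks the console and the queue separately. The class name SplitLocks, the members queue_mutex_ and out_mutex_, and the size() helper are hypothetical names introduced only for this illustration; they are not part of the original example.

```cpp
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <queue>

// Hypothetical variation: each shared resource gets its own mutex.
class SplitLocks
{
    std::queue<int> queue_;
    std::condition_variable cond_;
    std::mutex queue_mutex_; // protects queue_ only
    std::mutex out_mutex_;   // protects std::cout only

public:
    void producer()
    {
        for(int i = 10; i >= 0; --i)
        {
            {
                std::lock_guard<std::mutex> out(out_mutex_);
                std::cout << "producing " << i << std::endl;
            } // console lock released here
            {
                std::lock_guard<std::mutex> lock(queue_mutex_);
                queue_.push(i);
            } // queue lock released before notifying
            cond_.notify_one();
        }
    }

    // Helper added for checking the sketch: current number of queued items.
    std::size_t size()
    {
        std::lock_guard<std::mutex> lock(queue_mutex_);
        return queue_.size();
    }
};
```

The price of the finer locking is that, with more than one producer, the order of the log lines is no longer guaranteed to match the order of the pushes. A consumer for this variation would wait on cond_ using queue_mutex_.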


The consumer job is a bit more interesting:
void consumer()
{
    while(true) // 1
    {
        std::unique_lock<std::mutex> lock(mutex_); // 2
        while(queue_.empty()) // 3
        {
            std::cout << "waiting queue" << std::endl;
            cond_.wait(lock); // 4
        }
        int message = queue_.front(); // 5
        queue_.pop();

        if(!message) // 6
        {
            std::cout << "terminating" << std::endl;
            return;
        }

        std::cout << "consuming " << message << std::endl;
    }
}
1. Loop "forever", waiting for elements to be pushed in the queue.
2. Try to acquire access to the queue. Notice (again) that the mutex is actually used to protect both the queue and the output console. This is not usually a good idea.
3. Before reading from the queue, we ensure there is actually something in it.
4. The queue is empty. Let's wait on the condition. The call to wait() releases the mutex and blocks until a notification arrives, then reacquires the mutex before returning. Notice that I put the wait in a while-loop, because we should beware of spurious wakeups. In a few words, it is possible (albeit not common) to get a false positive on a condition variable, so guarding the wait with a plain if could lead to rare and unexpected failures.
5. We now know the queue is not empty, so we get an element and remove it.
6. Following the convention that a zero element means "that's all folks", the consumer terminates when it reads one.
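Putting the pieces above together, a compact end-to-end sketch could look like this. It is an assumption-laden variation, not the original source: the class is renamed ConditionalSketch, the console logging is dropped, and a consumed_ vector plus a run_sketch() function are added (both hypothetical) so that the result can be inspected after the threads have finished.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class ConditionalSketch
{
    std::queue<int> queue_;
    std::condition_variable cond_;
    std::mutex mutex_;
    std::vector<int> consumed_; // written by the consumer only, read after join()

public:
    void producer()
    {
        for(int i = 10; i >= 0; --i)
        {
            std::unique_lock<std::mutex> lock(mutex_);
            queue_.push(i);
            cond_.notify_one();
        }
    }

    void consumer()
    {
        while(true)
        {
            std::unique_lock<std::mutex> lock(mutex_);
            while(queue_.empty()) // loop guards against spurious wakeups
                cond_.wait(lock);
            int message = queue_.front();
            queue_.pop();

            if(!message) // zero means "end of transmission"
                return;
            consumed_.push_back(message);
        }
    }

    const std::vector<int>& consumed() const { return consumed_; }
};

// Starts both tasks, waits for them, and returns what the consumer saw.
std::vector<int> run_sketch()
{
    ConditionalSketch c;
    std::thread tc(&ConditionalSketch::consumer, &c);
    std::thread tp(&ConditionalSketch::producer, &c);
    tp.join();
    tc.join();
    return c.consumed();
}
```

Since there is one producer and one consumer on a FIFO queue, the returned vector holds the values from 10 down to 1 in that order, whichever thread happens to start first. Only the interleaving between the two threads changes from run to run.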

Full C++ source code for this example is on GitHub. I have added a couple of bonus commented variations there, in the consumer code:
  • After the wait on the condition variable, I check whether the wakeup is real or spurious. I have never seen a spurious one. But this is not a guarantee of anything, and you should still beware of spurious wakeups. If you don't care about logging and checking, you can rewrite this block in a single line:
    cond_.wait(lock, [this] { return !queue_.empty(); });
    The while-loop is now hidden inside the condition variable wait() call.
  • After consuming a message, just once in a while, I added a short sleep, to see what changes in the interaction between threads on the queue. You could have some fun playing with it too.
Notice that there is no guarantee on how the scheduler selects a thread to run, so you could get a different output each time you run this program.
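As a small illustration of the single-line wait with a predicate, here is a sketch of a queue wrapper with a blocking pop. The names WaitingQueue, push, pop and pop_from_other_thread are hypothetical and chosen just for this example; it is a sketch of the idea, not the code in the repository.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical helper, not part of the original example.
class WaitingQueue
{
    std::queue<int> queue_;
    std::condition_variable cond_;
    std::mutex mutex_;

public:
    void push(int value)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(value);
        }
        cond_.notify_one();
    }

    // Blocks until an element is available, then removes and returns it.
    int pop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        // Same effect as: while(queue_.empty()) cond_.wait(lock);
        cond_.wait(lock, [this] { return !queue_.empty(); });
        int value = queue_.front();
        queue_.pop();
        return value;
    }
};

// Pops on a second thread while the calling thread pushes a value.
int pop_from_other_thread()
{
    WaitingQueue q;
    int received = -1;
    std::thread consumer([&] { received = q.pop(); });
    q.push(7);
    consumer.join(); // makes the write to received visible here
    return received;
}
```

The predicate is checked before waiting and again after every wakeup, so the call works whether push() happens before or after the consumer thread reaches pop().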
