Producer-consumer on a shared standard container

We want to write a simple multithreaded producer-consumer application that uses the thread-safe wrapper around std::queue we wrote in the previous post.

The main point is to see how to use an STL container in a multithreaded context; we'll also get a glimpse of how to interrupt a thread (at an interruption point).

We have to use a variation on std::queue because the standard containers are not designed for use in a multithreaded environment. The logic behind this strategy is that we don't pay the high cost of a multithread-aware data structure when we are not interested in that feature.
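
The MyQueue class itself lives in the previous post and is not repeated here. As a rough idea of what such a wrapper involves, here is a minimal sketch that uses only the standard library. The name SketchQueue is hypothetical, and the blocking behavior of pop() is an assumption about how a wrapper of this kind could work, not a copy of the original class.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical stand-in for the MyQueue wrapper from the previous post.
template <typename T>
class SketchQueue
{
private:
  std::queue<T> q_;            // the plain, non-thread-safe container
  std::mutex m_;               // guards every access to q_
  std::condition_variable cv_; // signals "an item is available"

public:
  void push(T value)
  {
    {
      std::lock_guard<std::mutex> lock(m_);
      q_.push(std::move(value));
    }
    cv_.notify_one(); // wake one waiting consumer, if any
  }

  // Blocks until an item is available, then removes and returns it.
  T pop()
  {
    std::unique_lock<std::mutex> lock(m_);
    cv_.wait(lock, [this] { return !q_.empty(); });
    T value = std::move(q_.front());
    q_.pop();
    return value;
  }
};
```

All the locking is paid only by code that goes through the wrapper; a plain std::queue used from a single thread stays as cheap as before.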

Producer

The idea is that the producer generates a number of items, starting from a value passed to its ctor, and puts them in a queue that is accessible to the consumer too:
class Producer
{
private:
  const int root_; // 1
  std::shared_ptr< MyQueue<int> > q_; // 2

public:
  Producer(int root, std::shared_ptr< MyQueue<int> > q) : root_(root), q_(q) {}

  void run(size_t size) // 3
  {
    for(size_t i = 0; i < size; ++i)
    {
      q_->push(root_ + static_cast<int>(i)); // explicit cast: no signed/unsigned mix
      boost::this_thread::sleep(boost::posix_time::millisec(50)); 
    }
  } 
};
1. Initial value for the sequence generated by the producer.
2. Shared pointer to the queue where we want to put the data.
3. This method runs in another thread. It takes the number of elements the user wants to generate and loops, each time putting a new item in the queue and then sleeping for a while.

Consumer

The consumer does not differ much from the producer; here is a simple implementation:
class Consumer
{
private:
  const int ID_; // 1
  std::shared_ptr< MyQueue<int> > q_;

public:
  Consumer(int id, std::shared_ptr< MyQueue<int> > q) : ID_(id), q_(q) {}

  void run(size_t size, bool sleep) // 2
  {
    for(size_t i = 0; i < size; ++i)
    {
      std::cout << "Consumer " << ID_ << ": " << q_->pop() << std::endl; // 3
      if(sleep)
        boost::this_thread::sleep(boost::posix_time::millisec(50));
      else
        boost::this_thread::interruption_point();
    }
  }
};
1. Used to see which consumer actually consumed the item.
2. This function runs in a different thread. The bool parameter decides whether the thread sleeps after popping a value from the queue or immediately tries to read another item. Notice that in the latter case we call interruption_point(), so that the main thread can interrupt this worker thread. In the former case no explicit call is needed, because boost::this_thread::sleep() is itself an interruption point.
3. This line is flawed. There could be multiple consumers, all of them writing to the standard output console, so the printout could get mixed up. The fix is quite easy: add a mutex shared by all the consumers (for instance, a static member of this class) and protect this line with a lock on that mutex in each thread executing it. If you need some help to understand how to do it, have a look at other posts, like the one where I talk about how to use a functor in multithreading. Incidentally, that same post could give you the idea of rewriting this example using functors. It should be interesting.

Calling producer-consumer

A simple way of running this code is by having a single producer and a single consumer, like this:
std::shared_ptr< MyQueue<int> > spq(new MyQueue<int>()); // 1
size_t size = 10; // 2

Producer p(1000, spq); // 3
boost::thread tp(&Producer::run, &p, size); // 4
boost::this_thread::sleep(boost::posix_time::seconds(1)); // 5

Consumer c(1, spq);
boost::thread tc(&Consumer::run, &c, size, true); // 6

tp.join(); // 7
tc.join();
1. We create a queue and put it in a shared pointer, so that we can pass it to both the producer and the consumer.
2. Number of elements that we want to create.
3. Here is the producer ...
4. ... and here we start its run() method in another thread.
5. We give the producer plenty of time to fill the queue with its data ...
6. ... then we create a new thread on the run() method of a consumer.
7. Finally we join the created threads before completing the main thread execution.
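
The interruption_point() call in the consumer only matters if some other thread actually asks for an interruption, which in Boost is done by calling interrupt() on the boost::thread object. To show the same cooperative idea without Boost, here is a sketch based on an atomic flag. The StoppableWorker class and the runBriefly() function are hypothetical, and the flag is only an analogy for the Boost mechanism, not a reimplementation of it.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical worker: counts iterations until it is asked to stop.
class StoppableWorker
{
private:
  std::atomic<bool> stop_{false};
  std::atomic<long> iterations_{0};

public:
  void run()
  {
    while(!stop_.load()) // plays the role of interruption_point()
    {
      ++iterations_;
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
  }

  void requestStop() { stop_.store(true); } // plays the role of interrupt()
  long iterations() const { return iterations_.load(); }
};

// Starts the worker in its own thread, lets it run for a while,
// then asks it to stop and joins it.
long runBriefly(StoppableWorker& w, int millis)
{
  std::thread t(&StoppableWorker::run, &w);
  std::this_thread::sleep_for(std::chrono::milliseconds(millis));
  w.requestStop();
  t.join();
  return w.iterations();
}
```

There is one important difference: Boost signals the interruption by throwing boost::thread_interrupted at the interruption point, and it can also wake a thread blocked in an interruptible wait. A plain flag is only noticed the next time the worker checks it.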

It is fun (if you are that kind of guy) to play around with variations on this model: removing the sleep at [5], adding producers and consumers, changing the number of elements involved, and so on.
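
One such variation, with several producers and several consumers, might be sketched as follows. To keep the example self-contained it uses std::thread and a tiny blocking queue named IntQueue, a hypothetical stand-in for MyQueue<int>. The runVariation() function and the rule that consumer 0 takes any leftover items are also assumptions made just for this sketch.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Minimal blocking queue, a hypothetical stand-in for MyQueue<int>.
class IntQueue
{
  std::queue<int> q_;
  std::mutex m_;
  std::condition_variable cv_;

public:
  void push(int v)
  {
    { std::lock_guard<std::mutex> lock(m_); q_.push(v); }
    cv_.notify_one();
  }

  int pop() // blocks until an item is available
  {
    std::unique_lock<std::mutex> lock(m_);
    cv_.wait(lock, [this] { return !q_.empty(); });
    int v = q_.front();
    q_.pop();
    return v;
  }
};

// Runs `producers` producers of `perProducer` items each against
// `consumers` consumers; returns the sum of everything consumed.
long runVariation(int producers, int consumers, std::size_t perProducer)
{
  IntQueue q;
  std::mutex sumMutex;
  long sum = 0;
  const std::size_t total = static_cast<std::size_t>(producers) * perProducer;
  const std::size_t share = total / static_cast<std::size_t>(consumers);
  const std::size_t extra = total % static_cast<std::size_t>(consumers);

  std::vector<std::thread> threads;
  for(int p = 0; p < producers; ++p)
    threads.emplace_back([&q, p, perProducer] {
      for(std::size_t i = 0; i < perProducer; ++i)
        q.push(1000 * (p + 1) + static_cast<int>(i)); // roots 1000, 2000, ...
    });
  for(int c = 0; c < consumers; ++c)
    threads.emplace_back([&q, &sumMutex, &sum, c, share, extra] {
      const std::size_t mine = share + (c == 0 ? extra : 0); // leftovers go to consumer 0
      long local = 0;
      for(std::size_t i = 0; i < mine; ++i)
        local += q.pop();
      std::lock_guard<std::mutex> lock(sumMutex);
      sum += local;
    });
  for(auto& t : threads)
    t.join();
  return sum;
}
```

The consumers together pop exactly as many items as the producers push, which matters here: with a blocking pop(), a consumer asking for one item too many would wait forever and the final join would never return.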

2 comments:

  1. Very good and helpful article. Can be coupled with "Implementing a Thread-Safe Queue using Condition Variables (Updated)" (http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html)

    1. Happy you found it useful. And thank you for sharing that link.
