Multithreading with ASIO

ASIO can make multithreaded code easier to implement and maintain. Think, for instance, of an application that should run a number of tasks either sequentially or concurrently, according to a parameter passed on the command line that gives the number of threads to use.
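
As a rough sketch of the command-line side of that requirement, the thread count could be read along these lines. The helper name parseThreadCount and its fallback parameter are hypothetical, introduced only for illustration; the sketch uses nothing but the standard library.

```cpp
#include <cerrno>
#include <cstdlib>
#include <limits>

// Hypothetical helper: converts a command-line argument into a thread count.
// Returns fallback when arg is missing, is not a whole number, or is below 1.
int parseThreadCount(const char* arg, int fallback)
{
  if (arg == nullptr)
    return fallback;

  char* end = nullptr;
  errno = 0;
  const long value = std::strtol(arg, &end, 10);

  // Reject conversion errors, empty input, and trailing characters such as "3x".
  if (errno != 0 || end == arg || *end != '\0')
    return fallback;

  // Reject counts that are not positive or that do not fit in an int.
  if (value < 1 || value > std::numeric_limits<int>::max())
    return fallback;

  return static_cast<int>(value);
}
```

In main(), something like parseThreadCount(argc > 1 ? argv[1] : nullptr, 1) would then give the value to pass on to the function that starts the threads.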

Implementing that with ASIO is pretty easy.

First of all, I wrote a couple of silly functions that represent the tasks my application should perform:
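#include <ctime>
#include <functional>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

namespace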
{
  boost::mutex mio; // 1

  void logMsg(const char* reason, int i)
  {
    boost::lock_guard<boost::mutex> lock(mio); // 2.
    std::cout << reason << " for " << i << " at " << std::time(0) << std::endl;
  }

  void jobOne(int secs) // 3.
  {
    logMsg("Start jobOne", secs);
    boost::this_thread::sleep(boost::posix_time::millisec(1000 * secs));
    logMsg("End jobOne", secs);
  }

  void jobTwo(int millisecs) // 4.
  {
    logMsg("Start jobTwo", millisecs);
    boost::this_thread::sleep(boost::posix_time::millisec(millisecs));
    logMsg("End jobTwo", millisecs);
  }
}
1. A mutex is required because several threads could compete for a shared resource, in this case the output console where we'll print some log messages.
2. Here we use lock_guard to synchronize the access, so that the lock on the mutex is acquired on entering the function and released on exit.
3. jobOne is actually just about sleeping for a few seconds.
4. jobTwo is not much different from jobOne, it just sleeps for the given number of milliseconds.

And this could be an ASIO implementation for the given requirement:
void run(int tNumber) // 1
{
  boost::asio::io_service svc; // 2
  boost::asio::io_service::work work(svc); // 3
  boost::thread_group threads;

  for(int i = 0; i < tNumber; ++i) // 4
    threads.create_thread([&svc] { svc.run(); }); // a lambda avoids the ambiguity between the run() overloads

  svc.post(std::bind(jobOne, 2)); // 5
  svc.post(std::bind(jobOne, 1));
  svc.post(std::bind(jobTwo, 500));

  svc.stop(); // 6
  threads.join_all(); // 7
}
1. This function has to be called passing the number of threads we want to use.
2. As usual, io_service is the key element in ASIO code.
3. Try commenting out this line and see what happens. In my case the behavior becomes unpredictable, and that should be what you get too: without a work object, run() returns as soon as there is nothing to do, and given the way the code is written a thread could exit before any job has been posted. [See the comments for more on this point. As Stefan points out, stopping the io_service object as is done in (6) is not a smart thing to do, since stop() abandons the jobs that have not run yet.]
4. We create as many threads as specified in the passed parameter. Each of these threads executes the run() function on the IO service object.
5. We use post() to put in the execution queue of the IO service object any task we want to run.
6. Then we tell the IO service object to stop. Note that stop() makes run() return as soon as possible, so jobs that have not started yet may never be executed.
7. And we wait for the threads to complete.

In this simple case, a work object is not actually mandatory. We could get rid of it by reordering the code: if (4), the loop that creates the threads running the IO service, is moved after (5), the block that posts the jobs, no thread risks finding the queue empty and terminating earlier than expected. But relying on this behavior is usually not a good idea in a more general situation. [See a more elegant and robust solution and read the comment by Stefan here below for details]

2 comments:

  1. Hi. I know this post is quite old, but anyway...
    I think your code example is not quite correct.
    Calling stop() on the io_service object while it still
    has jobs to finish (the post() calls you made) will unconditionally end any running run() calls.
    The Boost.Asio docs say so
    here: http://www.boost.org/doc/libs/1_46_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.stopping_the_io_service_from_running_out_of_work

    "To effect a shutdown, the application will then need to call the io_service object's stop() member function. This will cause the io_service run() call to return as soon as possible, abandoning unfinished operations and without permitting ready handlers to be dispatched"

    I fixed that by putting the work object in an auto_ptr<> like here (as suggested in the docs):
    https://gist.github.com/1393807

  2. Hello Stefan, thank you for your comment. You are right, but only if you don't use an io_service::work object. Actually, using it here for such simple code is a bit of overkill - but, you know, this is just an example.
    I had already suggested in the last lines of the post a way to get rid of it, organizing the code differently - but surely your solution is more stable and maintainable.
