ASIO asynchronous echo server

The ASIO TCP/IP echo server we have recently seen is so simple that it does not really call for an asynchronous implementation (if it does not look simple enough to you, please have a look at an even simpler implementation). On the other hand, it is simple enough to be converted to an asynchronous approach with some hope of not getting lost in the details.

Our echo server was basically built on two functions, server() and session(). The idea was that any time a client asked for a connection, a new session was started in a new thread. So, the key line in that application was:
boost::thread t(session, sock);
You can find it at the bottom of the while loop in the server() function. It could be read as: run in a new thread the session function for the current socket (passed as a shareable smart pointer).

In such a simple context it looks good. The problem is that things usually tend to get more complicated, and managing multithreading explicitly can be a source of additional headaches. One way to avoid that is to rely on ASIO and its asynchronous calls. Alas, this does not come for free: we pay for it with a more complex code structure. In this case it may look like overkill, but it surely makes sense in real cases.

First step: our main function no longer calls the original server() function but a shiny new asyncServer(), which looks something like this:
void asyncServer()
{
  try
  {
    boost::asio::io_service ios;
    AServer as(ios); // 1
    ios.run(); // 2
  }
  catch(std::exception& e)
  {
    std::cout << "Exception: " << e.what() << std::endl;
  }
}
1. A single server function is not enough anymore. We are about to design a class modeling the asynchronous server, and here we instantiate an object of that class.
2. We let ASIO do the dirty job of dispatching our code. Its run() function blocks and invokes each completion handler from the thread that called it, so our handlers never run on a thread we did not provide ourselves. It returns when nothing more is left to do.
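As a mental model of what run() does for us, here is a minimal sketch of a handler queue written with the standard library only. MiniLoop, post() and runOrderDemo() are hypothetical names made up for this illustration, and the real io_service is of course far more sophisticated. The point is only that handlers are executed inside run(), that a handler can queue more work, and that run() returns when the queue is empty.

```cpp
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical stand-in for an event loop; this is NOT ASIO code.
class MiniLoop
{
public:
  // Queue a handler; nothing is executed until run() is called.
  void post(std::function<void()> handler) { handlers_.push(std::move(handler)); }

  // Execute queued handlers on the calling thread, return when none is left.
  void run()
  {
    while(!handlers_.empty())
    {
      std::function<void()> handler = std::move(handlers_.front());
      handlers_.pop();
      handler(); // a handler may post() more work, keeping run() busy
    }
  }
private:
  std::queue<std::function<void()>> handlers_;
};

// Usage: returns the order in which the handlers were executed.
std::vector<int> runOrderDemo()
{
  MiniLoop loop;
  std::vector<int> order;
  loop.post([&] {
    order.push_back(1);
    loop.post([&] { order.push_back(3); }); // queued behind the handler below
  });
  loop.post([&] { order.push_back(2); });
  loop.run(); // returns only after all three handlers have run
  return order;
}
```

Calling runOrderDemo() from your main() gives back the handlers' execution order. Our server never leaves run() because each accepted connection queues a new accept, just like the first handler above queues another one.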

Notice that we are not going to fix an annoying feature of our server: it still runs forever, and to kill it we have to issue an interrupt.

Let's see the AServer declaration:
#pragma once // 1
#include <boost/asio.hpp>
class ASession; // 2

using boost::asio::ip::tcp; // 3

class AServer
{
public:
  AServer(boost::asio::io_service& ios);
  void handleAccept(ASession* session, const boost::system::error_code& error);
private:
  void createSession();

  boost::asio::io_service& ios_;
  tcp::acceptor acp_;
};
1. I'm developing for VC++, so I'm using the infamous, but so handy, pragma once. If your compiler does not support this feature, you should use the clumsier, but more standard, include guards.
2. Forward declaration for the class that we'll use to model the behavior of our asynchronous session.
3. I'm not exactly a fan of using declarations, but spelling out this nested namespace every time is way too boring.

If you played around a bit with the echo server example, you shouldn't be surprised by what you see in this class: an ASIO io_service, an acceptor to accept connections from clients, a way to create a new session ... the only perplexing piece of code should be that handleAccept() function. Actually, it looks pretty innocent:
void AServer::handleAccept(ASession* session, const boost::system::error_code& error)
{
  if(!error) // 1
  {
    session->start();
    createSession();
  }
  else // 2
    delete session;
}
1. If it is called in a "good" way, the passed session object is started (be patient for details on that), and a new session is created.
2. Otherwise, the passed session is "bad", so we just delete it.

So, handleAccept() is not a big issue. A bit more thinking is required by its caller:
void AServer::createSession()
{
  ASession* session = new ASession(ios_); // 1

  // using boost::asio::placeholders::error instead of _1 would be clearer, possibly
  acp_.async_accept( // 2
  session->socket(), // 3
  boost::bind(&AServer::handleAccept, this, session, _1)); // 4
  std::cout << "A new session is available to clients" << std::endl;
}
1. A new (asynchronous) session object is created.
2. We call the asynchronous accept method on the acceptor. Asynchronous means that it returns immediately, without blocking the execution stream until its job is completed.
3. Its first parameter is the socket for the current session.
4. When async_accept() completes its job, meaning it has accepted a connection from a client socket (or failed trying), it calls handleAccept() on this object, passing the current session object and one more argument: the error code, which async_accept() itself supplies in place of the _1 placeholder. To make the code a bit more readable, we could use boost::asio::placeholders::error instead of _1, stressing the fact that we expect the caller to put an error code there. Remember that boost::bind is declared in <boost/bind.hpp>, which the source file has to include.
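If the bind expression looks obscure, this small sketch may help. It uses std::bind and std::error_code from the standard library as an analogy for boost::bind and boost::system::error_code. Server, Session, fakeAsyncAccept() and acceptStartsSession() are hypothetical names invented for the illustration. Nothing here touches a real socket.

```cpp
#include <functional>
#include <system_error>

// Hypothetical miniatures of ASession and AServer, reduced to the bare minimum.
struct Session { bool started = false; };

class Server
{
public:
  void handleAccept(Session* session, const std::error_code& error)
  {
    if(!error)
      session->started = true;
  }
};

// Stand-in for the library side: it owns the error code and hands it to the handler.
void fakeAsyncAccept(const std::function<void(const std::error_code&)>& handler, bool fail)
{
  std::error_code ec = fail ? std::make_error_code(std::errc::connection_refused)
                            : std::error_code();
  handler(ec); // this argument lands where the _1 placeholder was
}

// Usage: returns true if the session was started by the bound handler.
bool acceptStartsSession(bool fail)
{
  using namespace std::placeholders;
  Server server;
  Session session;
  // The object and the session are fixed now; _1 is filled in later by the caller.
  auto handler = std::bind(&Server::handleAccept, &server, &session, _1);
  fakeAsyncAccept(handler, fail);
  return session.started;
}
```

So the bound object carries everything we already know (which server, which session), and leaves a single hole for the piece of information that only the accept operation can provide.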

This createSession() is called by two functions in AServer, its constructor:
AServer::AServer(boost::asio::io_service& ios) :
  ios_(ios), acp_(ios, tcp::endpoint(tcp::v4(), ECHO_PORT))
{
  createSession();
}
And the handleAccept() itself, that we have already seen above.

So, here is what happens when the free function asyncServer() creates an AServer object. The AServer ctor calls createSession(), which creates a new session object on the heap and asks the member acceptor to asynchronously accept a client connection on the session socket. When the accept completes, handleAccept() is called: it starts the current session and creates a new one, which becomes available for the next client connection.

Basically the same behavior as the original echo server, if you don't let the asynchronicity confuse you.
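To see the accept chain without any networking in the way, here is a toy simulation under stated assumptions. SimServer, SimSession and clientConnects() are hypothetical names. Unlike the real code, where each session owns itself, this sketch keeps the started sessions in a vector just to be able to count them.

```cpp
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical session: only remembers whether it was started.
struct SimSession { bool started = false; };

class SimServer
{
public:
  SimServer() { createSession(); } // like the AServer ctor

  // Stand-in for the moment an asynchronous accept completes successfully.
  void clientConnects()
  {
    pending_->started = true;               // session->start()
    active_.push_back(std::move(pending_)); // assumption: tracked only for counting
    createSession();                        // get ready for the next client
  }

  std::size_t activeCount() const { return active_.size(); }
  bool hasPending() const { return pending_ != nullptr; }
private:
  void createSession() { pending_.reset(new SimSession); }

  std::unique_ptr<SimSession> pending_;
  std::vector<std::unique_ptr<SimSession>> active_;
};
```

However many clients connect, there is always exactly one session waiting for the next one.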

Now the asynchronous session. Here is its class declaration:
#pragma once
#include <iostream>
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include "echo.h"

using boost::asio::ip::tcp;

class ASession
{
public:
  ASession(boost::asio::io_service& ios) : socket_(ios), connected_(false) {}
  ~ASession() { std::cout << "Session dtor" << std::endl; }

  tcp::socket& socket() { return socket_; }

  void start();
  void handleRead(const boost::system::error_code& error, size_t length);
  void handleWrite(const boost::system::error_code& error);
private:
  tcp::socket socket_;
  boost::array<char, MSG_LEN> data_;
  bool connected_;
};
Having seen the server class, I guess the reader won't be much surprised by this design. In the private section of the class we have a socket, a data buffer for communication, and a flag to keep track of the session status. Among the public methods we have a couple of handleXXX() functions that, as you probably suspect, are tied to the asynchronous design of this class.

Recall that ASession::start() is called by AServer::handleAccept() when the asynchronous accept on the socket owned by the ASession object itself has completed:
void ASession::start()
{
  std::cout << "Starting session" << std::endl;
  connected_ = true; // 1
  socket_.async_read_some(boost::asio::buffer(data_), // 2
  boost::bind(&ASession::handleRead, this, _1, _2)); // 3
}
1. Keep track of the fact that the connection is started.
2. Start reading asynchronously on the socket (connected to a client), using as buffer the private data member.
3. When async_read_some() is done with its job, it calls ASession::handleRead() on the current object, passing it a couple of arguments. To make the code clearer, we could use the ASIO placeholder boost::asio::placeholders::error instead of _1, and boost::asio::placeholders::bytes_transferred instead of _2.

In any case the sense of this function is that the read of data coming from the client is done asynchronously, and at the end of it the control is passed to handleRead(). So, let's see this other function:
void ASession::handleRead(const boost::system::error_code& error, size_t bytes)
{
  // bytes is unsigned: check it is not zero before looking at data_[bytes-1]
  if(!error && bytes && data_[bytes-1] == '\0') // 1
  {
    std::cout << "Client sent a terminator" << std::endl;
    --bytes;
    connected_ = false; // 2
  }

  if(!error && bytes) // 3
  {
    std::cout << "Writing back " << bytes << " characters to the client" << std::endl;
    boost::asio::async_write(socket_, boost::asio::buffer(data_, bytes),
      boost::bind(&ASession::handleWrite, this, _1)); // 4
  }
  else
    delete this; // 5
}
1. This should be a sort of deja vu, being so close to the code of our old echo server. We check if the last character in the buffer is a NUL, this being by our convention the terminator for the connection between client and server. We do that only if the read succeeded and actually delivered something, since on a failed or empty read bytes is zero and bytes-1 would index far outside the buffer.
2. Here we are using a member variable, connected_, to keep track of the termination, since we set it here but we are going to check it in handleWrite().
3. If the asynchronous read succeeded and the buffer is not empty, we send back the data to the client, using async_write().
4. When done, async_write() will call handleWrite() on the current object. If you feel _1 is not descriptive enough, you could use boost::asio::placeholders::error instead.
5. This is interesting. When the session sees it is not useful anymore, it commits suicide, calling delete on itself.
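The terminator check is easier to reason about (and to test) when it is isolated from the socket code. Here is a sketch of how it could be factored out. Chunk and inspectChunk() are hypothetical helpers, not part of the original server. Notice the guard on an empty read: the byte count is unsigned, so looking at data[bytes - 1] when bytes is zero would read far outside the buffer.

```cpp
#include <cstddef>

// Hypothetical helper type: what we learned by inspecting one received chunk.
struct Chunk
{
  std::size_t payload; // number of bytes to echo back
  bool terminated;     // true if the chunk ended with the NUL terminator
};

// Hypothetical helper: strips a trailing NUL, if any, and reports it.
Chunk inspectChunk(const char* data, std::size_t bytes)
{
  Chunk result{bytes, false};
  // Check bytes first, so that data[bytes - 1] is never evaluated on an empty read.
  if(bytes > 0 && data[bytes - 1] == '\0')
  {
    result.payload = bytes - 1;
    result.terminated = true;
  }
  return result;
}
```

With such a helper, handleRead() would only have to decide between writing back payload bytes and closing the session.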

Finally, here is handleWrite(). We have seen it is called by the asynchronous write call in handleRead():
void ASession::handleWrite(const boost::system::error_code& error)
{
  if(!error && connected_) // 1
  {
    std::cout << "Ready for a new read" << std::endl;
    socket_.async_read_some(boost::asio::buffer(data_),
    boost::bind(&ASession::handleRead, this, _1, _2)); // 2
  }
  else
    delete this; // 3
}
1. Only if there was no error in the asynchronous write, and no terminator was detected in the data sent by the client, do we make another asynchronous read call on the socket.
2. As you probably suspected, the placeholders could be replaced by boost::asio::placeholders::error and boost::asio::placeholders::bytes_transferred.
3. Here again, if the session object detects that it is no longer needed, it deletes itself.
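If calling delete this makes you nervous, a common alternative is to let each pending handler hold a shared pointer to the session, so that the session dies by itself when its last handler is gone. What follows is just a sketch of the idea with the standard library, not a drop-in replacement for ASession. SharedSession, the pending vector (a stand-in for the handlers that ASIO would keep queued) and sessionLifetimeDemo() are hypothetical names.

```cpp
#include <functional>
#include <memory>
#include <vector>

// Hypothetical session whose lifetime is tied to its pending handlers.
class SharedSession : public std::enable_shared_from_this<SharedSession>
{
public:
  explicit SharedSession(bool& destroyed) : destroyed_(destroyed) {}
  ~SharedSession() { destroyed_ = true; }

  // Register a "completion handler" that keeps this session alive until it is dropped.
  void start(std::vector<std::function<void()>>& pending)
  {
    std::shared_ptr<SharedSession> self = shared_from_this();
    pending.push_back([self] { self->handled_ = true; });
  }
private:
  bool& destroyed_;
  bool handled_ = false;
};

// Usage: true if the session survives its creator but not its last handler.
bool sessionLifetimeDemo()
{
  bool destroyed = false;
  std::vector<std::function<void()>> pending;
  {
    auto session = std::make_shared<SharedSession>(destroyed);
    session->start(pending);
  } // the creator's pointer is gone, the handler still holds one
  const bool aliveWhilePending = !destroyed;
  for(auto& handler : pending)
    handler();
  pending.clear(); // last reference dropped: the destructor runs here
  return aliveWhilePending && destroyed;
}
```

The price is that the session must be created through a shared pointer from the start, but no code path can forget a delete or delete twice.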

This post is based on the echo examples in the official Boost ASIO documentation.

7 comments:

  1. I find this example is very useful and easy to understand.

    If I would like to spawn a thread to handle this ASIO asynchronous echo server, how do I do it? I have a while loop in my main to handle other threads.

  2. Actually, the spirit of this example was to show how to avoid explicit multithreading, leaving it to ASIO.

    In any case, I'm not sure where you are having an issue. Technically, you should just spawn another thread in your main that would take care of managing the echo server functionality.

    I don't know anything about your application design, so I'm just guessing, but wouldn't it be clearer if you ran the echo server in a dedicated process?

  3. The complete code would be nice at the end.

    Replies
    1. Good suggestion. And I should probably check that after two years everything in this post is still in line with the current ASIO version. Thanks.

    2. Didn't see the date of the post.
      Now I think I know why it doesn't work ^.^

  4. error:
    namespace boost doesn't include any member "bind"
    why?

    Replies
    1. Hello, Anonymous.

      This post is outdated, see the previous comment by Peter Pan. I'm sorry, I currently have no time to dig into it. I promise that as soon as I can, I'll rewrite it all from scratch.
