Showing posts with label ASIO. Show all posts

Boost ASIO echo UDP asynchronous server

A change request for the echo UDP client-server app discussed before: we want to keep the client as is, but we need the server to be asynchronous.

Instead of using the synchronous calls receive_from() and send_to() on a UDP socket, we are going to use their asynchronous versions async_receive_from() and async_send_to().

Asynchronicity naturally leads us to implement a class with a socket as a private data member, so that we can make our asynchronous calls on it.
const int MAX_LEN = 1'024;
const uint16_t ECHO_PORT = 50'015;

class Server
{
private:
    udp::socket socket_;  // 1
    udp::endpoint endpoint_;  // 2
    char data_[MAX_LEN];  // 3

// ...
1. Our ASIO UDP socket.
2. The endpoint we use to keep track of the client currently connected to the server.
3. Data buffer, used to keep track of the message received from the client.

The constructor gets the app ASIO I/O context by reference from the caller and uses it to instantiate its member socket. Then it calls its private method receive() to start its endless job.
Server(ba::io_context& io) : socket_(io, udp::endpoint(udp::v4(), ECHO_PORT))  // 1
{
    receive();
}

void receive()
{
    socket_.async_receive_from(ba::buffer(data_), endpoint_, [this](bs::error_code ec, std::size_t len) {  // 2
        if (!ec && len > 0)  // 3
        {
            send(len);
        }
        else
        {
            receive();  // 4
        }
    });
}
1. The socket also requires an endpoint describing the associated IP protocol version and port. We create it on the fly.
2. An asynchronous receive_from on the socket. ASIO puts what the client sends in the data buffer and stores the client's network information in the passed endpoint. When the receive completes, ASIO calls the handler passed as third parameter, here a lambda that captures "this" and honors the expected parameter list.
3. If the receive worked fine - no error code reported - and the message is not empty, we call our Server send() method to echo the message.
4. Otherwise - error or empty message - we have nothing to send back, so we call receive() again, to serve a new client.

When a "good" message is received from a client, our server sends it back to it as is:
void send(std::size_t len)
{
    socket_.async_send_to(ba::buffer(data_, len), endpoint_, std::bind(&Server::receive, this));
}
The socket gets the job of asynchronously sending the data stored in the Server member variable, with the length passed in as a parameter, to the endpoint saved as a Server member variable. When the data transfer completes, ASIO calls the handler passed as third argument. Here we don't want to do anything in case of error, not even logging, so we can simply bind as handler a call to "this" receive(), ignoring the error code and the length of the transferred data.

I pushed the complete C++ source file to GitHub. The code is based on the UDP asynchronous echo example in the official Boost ASIO documentation.


Boost ASIO echo TCP asynchronous server

Let's refactor the echo TCP server to achieve asynchrony. It's going to be a lot of fun. If you feel that it is too much fun, you could first have a look at the similar but a bit simpler asynchronous server discussed in a previous post.

Main

This server works with the same clients as seen for the synchronous server; here we deal just with the server. All the work is delegated to the Server class, whose constructor gets a reference to the application ASIO I/O context.
namespace ba = boost::asio;
// ...

Server server(io);
io.run();
Server

The server ctor initializes its own acceptor on the ASIO I/O context with an endpoint specifying the IP protocol version and the chosen port, then it calls its private member method accept():
using ba::ip::tcp;
// ...

const uint16_t ECHO_PORT = 50'014;
// ...

class Server
{
private:
 tcp::acceptor acceptor_;

 void accept()
 {
  acceptor_.async_accept([this](bs::error_code ec, tcp::socket socket)  // 1
  {
   if (!ec)
   {
    std::make_shared<Session>(std::move(socket))->read();  // 2
   }
   accept();  // 3
  });
 }
public:
 Server(ba::io_context& io) : acceptor_(io, tcp::endpoint(tcp::v4(), ECHO_PORT))
 {
  accept();
 }
};
1. The handler passed to async_accept() is a lambda that gets as parameters an error code, letting us know whether the connection from the client has been accepted correctly, and the socket possibly created to support the connection itself.
2. A beautiful and perplexing line. We create a shared_ptr smart pointer to a Session built from an rvalue reference to the socket received as parameter, and call its read() method on it. However, this anonymous variable dies at the end of the statement, so its life is in danger - better see what is going on in read(). Actually, we are in danger too: if something weird happens in this session object, we don't have any way to do anything about it.
3. As soon as a Session object is created, a new call to accept() is issued, and so the server waits for a new client connection.

Session

As we have seen just above, we should expect some clever trick from Session, especially in its read() method. On second thought, it is not a big surprise to see that its superclass is enable_shared_from_this:
class Session : public std::enable_shared_from_this<Session>
{
private:
 tcp::socket socket_;
 char data_[MAX_LEN];

// ...
public:
 Session(tcp::socket socket) : socket_(std::move(socket)) {}  // 1

 void read()  // 2
 {
  std::shared_ptr<Session> self{ shared_from_this() };  // 3
  socket_.async_read_some(ba::buffer(data_), [self](bs::error_code ec, std::size_t len) {  // 4
   if (!ec)
   {
    self->write(len);
   }
  });
 }
};
1. The ctor gets the socket that, as we have seen, was created by the acceptor and moved in; in its turn, the ctor moves it into its data member.
2. The apparently short-lived Session object created by the handler of async_accept() calls this method.
3. A new shared_ptr is created from this! Actually, being such, it is the same shared_ptr that we have seen in the calling handler, just with its use counter increased. However, our object is still not safe: we need to keep it alive until the complete read-write cycle between client and server is completed.
4. We read asynchronously some bytes from the client. To better see the effect, I have set the size of the data buffer to a silly low value. But the more interesting part here is the handler passed to async_read_some(). Notice that in the capture clause of the lambda we pass self, the shared pointer from this. So our object is safe till the end of the read.

So far so good. Just remember to ensure the object doesn't get invalidated during the writing process:
void write(std::size_t len)
{
 std::shared_ptr<Session> self{ shared_from_this() };
 ba::async_write(socket_, ba::buffer(data_, len), [self](bs::error_code ec, std::size_t) {
  if (!ec)
  {
   self->read();
  }
 });
}
Same as in read(), we ensure "this" stays alive by creating a shared pointer from it and passing it to the async_write() handler.

As required, once the read-write cycle terminates, "this" has no more live references. Bye bye, session.

I have pushed my C++ source file to GitHub. And here is the link to the original example from Boost ASIO.


Boost ASIO echo UDP synchronous client-server

Close to the previous post. The main difference is that there we saw a TCP-based data exchange, while here we see a UDP echo.

Server

This server is simpler than the previous one. Just one request is served at a time.
udp::socket socket(io, udp::endpoint(udp::v4(), ECHO_PORT));  // 1

for (;;)  // 2
{
 char data[MAX_LEN];
 udp::endpoint client;
 size_t len = socket.receive_from(ba::buffer(data), client);  // 3

 // ...
 socket.send_to(ba::buffer(data, len), client);  // 4
}
1. Create an ASIO UDP socket on the app io_context, with a UDP endpoint created on the fly specifying the IP protocol version and the port to be used.
2. Forever loop to serve, in strict sequential order, all the requests coming from clients.
3. ASIO blocks here, waiting for the socket to receive a datagram from a client. Make sure that the data buffer is big enough.
4. Since this is an echo server, nothing exciting happens between receiving and sending. Here we send the data, as received, to the endpoint as set by receive_from().

Client
char request[MAX_LEN];
// ...

udp::socket socket{ io, udp::endpoint(udp::v4(), 0) };  // 1
udp::resolver resolver{ io };
udp::endpoint destination = *resolver.resolve(udp::v4(), host, ECHO_PORT_STR).begin();  // 2
socket.send_to(ba::buffer(request, std::strlen(request)), destination);  // 3

char reply[MAX_LEN];
udp::endpoint sender;
size_t len = socket.receive_from(ba::buffer(reply), sender);  // 4
// ...
1. Create a UDP socket on the ASIO I/O context. Notice that the UDP endpoint passed specifies the IP protocol version and port 0, so the system assigns an ephemeral port.
2. The destination endpoint, which refers to the server, is generated by the resolver created on the line above, resolving the specified host and port for the given IP protocol version. Then the first result is taken (via the begin iterator, then dereferenced). If anything goes wrong, resolve() is guaranteed to throw an exception.
3. Send the data through the buffer to the socket, which mediates the communication with the server.
4. Once send_to() has done its job (notice that it is a blocking function), we get the reply from the server by calling receive_from(). The socket knows where to get the data from, and fills the passed endpoint (sender) with the sender's information.

I pushed the full C++ code - both client and server in the same source file - to GitHub. I based them on blocking_udp_echo_server.cpp and blocking_udp_echo_client.cpp from the official Boost ASIO Tutorial.


Boost ASIO echo TCP synchronous client-server

I think this echo client-server application is a good introduction to ASIO. The server creates a new TCP socket each time it receives a request from a client, and runs it in a new thread, where the read-write activity is accomplished in a synchronous way. The client sends some data to the server, gets it back, and then terminates.
The structure is simple, but a few interesting points are touched.

Client

Given io, the app ASIO io_context, and the server hostname as a string, the client tries this block and, in case of an exception, just outputs its message to the console.
namespace ba = boost::asio;
using ba::ip::tcp;
// ...

tcp::socket socket{ io };  // 1
tcp::resolver resolver{ io };
ba::connect(socket, resolver.resolve(host, ECHO_PORT_STR));  // 2

// ...
ba::write(socket, ba::buffer(request, reqLen));  // 3

char reply[CLIENT_MAX_LEN];  // 4
size_t repLen = ba::read(socket, ba::buffer(reply, reqLen));  // 5
// ...
1. Create an ASIO TCP socket and a resolver on the current io_context.
2. Then resolve() the resolver on the host and port of the echo server (in my case, localhost:50014), and use the resulting endpoints to establish a connection on the socket.
3. If the connection holds, write to the socket the data we previously put in the char buffer named request, for a size of reqLen.
4. We reserve a comfortably large buffer in which to store the server reply. Since we are writing an echo application, we know that the size of the data we are about to get back should be the same as the size we have just sent. This simplifies our code to the point that we can do a single read for the complete data block.
5. Use the socket for reading from the server. We use the buffer, and the size of the data we sent, as noted in (4).

At this point we could do whatever we want with the data we read in reply with size repLen.

Server loop

Once we create an acceptor on the ASIO io_context, specifying as endpoint the IP protocol we want (here I used version 4) and the port number, we loop forever, creating a new socket through a call to accept() on the acceptor each time a request comes from a client, and passing it to the session() function, which runs in a new thread.
tcp::acceptor acceptor{ io, tcp::endpoint(tcp::v4(), ECHO_PORT) };

for (;;)
{
 std::thread(session, acceptor.accept()).detach();
}
Notice that each thread created in the loop survives the exiting of the block only because it is detached. This is both handy and frightening. In production code, I would probably push them into a collection instead, so that I could explicitly manage any one that stopped behaving properly.

Server session

Since we don't know the size of the data sent by the client, we should be ready to split it and read it in chunks.
for (;;)
{
 char data[SERVER_MAX_LEN];  // 1

 bs::error_code error;
 size_t len = socket.read_some(ba::buffer(data), error);  // 2
 if (error == ba::error::eof)
 {
  return;
 }
 else if (error)
 {
  throw bs::system_error(error);
 }

 ba::write(socket, ba::buffer(data, len)); // 3
}
1. To better see the effect, I have chosen a ridiculously small size for the server data buffer.
2. The data coming from the client is read in chunks by read_some() on the socket created by the acceptor. When the client closes the connection, read_some() sets the passed Boost system error code to eof. When we detect it, we know that we can terminate the session. Any other error code means that something went wrong.
3. If read_some() sets no error, we use the current chunk of data to do what the server should do. In this case, we just echo it back to the client.

Full C++ code on GitHub. The original source is the official Boost ASIO tutorial, divided in two parts, client and server.


Boost ASIO UDP asynchronous server

Having already seen how to establish an ASIO UDP synchronous connection and how to create an ASIO TCP asynchronous server, we now sort of put them together to write an ASIO UDP asynchronous server.

Main

As client we can happily recycle the one written for the UDP synchronous connection - only, be sure to use the same IP protocol for both. So in the main function we just instantiate an ASIO io_context (also known as io_service), pass it by reference to the ctor of a Server object, and then call run() on it.

Later on, while running a number of clients to play with the app, you may want to run io also on other threads - be sure to do that between the server creation and the io run on the current thread.

Server class

The server sits on port 50013 and always sends the clients the same message, concatenated with a counter. To work it needs an ASIO UDP socket and a UDP endpoint that identifies the current client.
// ...
const int HELLO_PORT = 50'013;
const std::string MSG("Async UDP hello from ASIO ");

class Server
{
private:
 udp::socket socket_;
 udp::endpoint endpoint_;
 uint16_t counter_ = 0;
// ...
public:
 Server(ba::io_context& io) : socket_(io, udp::endpoint(udp::v6(), HELLO_PORT))
 {
  start();
 }
};
The server ctor sets up the socket data member using the reference to the ASIO io context received from the instantiator and a UDP endpoint created on the fly, specifying the required IP protocol version (here 6) and the server port.

Then the server start() private method is called:
void start()
{
 std::array<char, 0> buffer;  // 1
 socket_.async_receive_from(ba::buffer(buffer), endpoint_,
  std::bind(&Server::recvHandler, this, std::placeholders::_1));  // 2
 std::cout << "Server ready" << std::endl;
}
1. The client is expected to send an empty message, so the receive buffer could be zero sized.
2. We call async_receive_from() to asynchronously receive a message from the client in the buffer. We get the client endpoint information in the data member and, on receive completion, ASIO calls another private Server method, recvHandler(), passing to it the first handler parameter, namely a reference to the Boost system error_code describing how async_receive_from() completed.

If no error was detected in async_receive_from(), the recvHandler() creates a message and sends it to the client:
void recvHandler(const bs::error_code& error)
{
 if (!error)
 {
  std::shared_ptr<std::string> data(new std::string(MSG + std::to_string(++counter_)));  // 1

  socket_.async_send_to(ba::buffer(*data), endpoint_,
   std::bind(&Server::sendHandler, this, data, std::placeholders::_1, std::placeholders::_2));  // 2
  start();
 }
}
1. This piece of code is a bit convoluted. We create on the heap a string containing the data to be sent to the client, and we wrap it in a shared pointer. In this way we can keep it alive as long as we need it, that is, until the end of the sendHandler() method invoked by async_send_to() at the end of its operation.
2. async_send_to() uses the endpoint set by async_receive_from() to know where to send the data. On completion, sendHandler() is called.

From the ASIO point of view, sendHandler() could be an empty method. The only important thing is that the data created in recvHandler() gets here wrapped in the shared smart pointer, ensuring it is not destroyed while still required.
void sendHandler(std::shared_ptr<std::string> data, const bs::error_code& error, std::size_t size)
{
 if (!error)
 {
  std::cout << size << " byte sent from [" << *data << "]" << std::endl;
 }
}
I pushed the full C++ source code on GitHub. It is based on the Daytime.6 example from the official Boost ASIO tutorial.


Boost ASIO synchronous UDP client/server

If you know how to write an app that uses an ASIO TCP connection, you are close to knowing how to do it on UDP too.

A large part of the differences is taken care of for us by ASIO; we just have to use the socket as defined in boost::asio::ip::udp instead of its tcp counterpart.

Server

First thing, we create a UDP socket, which requires the ASIO I/O context and a UDP endpoint, which in turn needs as parameters the IP protocol version to be used - 4 or 6 - and the port - here I picked 50013.
namespace ba = boost::asio;
namespace bs = boost::system;
using ba::ip::udp;
// ...

const unsigned short HELLO_PORT = 50'013;
// ...

void server(ba::io_context& io)
{
    udp::socket socket{ io, udp::endpoint{udp::v6(), HELLO_PORT} };
 // ...
Then we repeat this block as many times as we like - in my tester I did it just once:
std::array<char, 0> recvData;  // 1
udp::endpoint endpoint;  // 2
bs::error_code error;  // 3
socket.receive_from(ba::buffer(recvData), endpoint, 0, error);  // 4
if (error)
 throw bs::system_error(error);  // 5

std::string message{ "UDP hello from ASIO" };

bs::error_code ec;
socket.send_to(boost::asio::buffer(message), endpoint, 0, ec);  // 6
1. In this buffer we store the message sent by the client. It has no use here, so it can even be zero sized.
2. The endpoint, which will be used to send the message back to the client, is set by the socket's receive_from() method, two lines below.
3. This error code is set by receive_from() in case of problems.
4. The server waits here synchronously for a client. Buffer, endpoint, and error code are output parameters: the data coming from the client is put in the first one (here, an empty message is expected), the endpoint is filled with the client's address, and the error code stores the possible failure of the operation. The third parameter is for flags; we pass none.
5. If receive_from() fails, throw the Boost system_error exception relative to the error code; it is a std::runtime_error subclass.
6. Send the message to the client, using the endpoint as set by receive_from() and not specifying any flag. Any possible error code returned is disregarded.

Client

The client function tries this code:
udp::resolver resolver{ io };
udp::endpoint destination = *resolver.resolve(udp::v6(), host, HELLO_PORT_STR).begin();  // 1

udp::socket socket{ io };
socket.open(udp::v6());  // 2

std::array<char, 0> sendData;
socket.send_to(ba::buffer(sendData), destination);  // 3

std::array<char, 128> recvData;  // 4
udp::endpoint sender;
size_t len = socket.receive_from(ba::buffer(recvData), sender);

std::cout.write(recvData.data(), len);  // 5
1. Having instantiated a UDP resolver on the previous line, we call resolve() on it for the same IP protocol version as the server - here I used version 6 - specifying its host and port. Since resolve() either returns at least one endpoint or throws, we can safely access the first one by dereferencing its begin() iterator.
2. We need an ASIO UDP socket. Having created it on the previous line, passing the current ASIO I/O context, we open it for the required IP version.
3. We start the communication with the server, sending an empty message - as nothing more is expected from it.
4. We'd better have enough room for the message coming from the server; its actual size is returned by the call to receive_from().
5. Let's see what we got, outputting it to the console.

Client and server are two free functions in the same C++ file that I pushed to GitHub. Passing no parameter to its main runs it as a server, otherwise as a client.

This example is based on Daytime.4 and Daytime.5 from the official Boost ASIO tutorial.


Boost ASIO TCP/IP asynchronous server

Having seen how simple it is to create a synchronous ASIO TCP/IP server, let's now see how to create an asynchronous one.

Main

The code for this example is divided in two classes, Server and Connection, described below. The example main function instantiates an ASIO io_context, uses it to instantiate a Server object, and then run() the I/O context.
namespace ba = boost::asio;
// ...

ba::io_context io;
Server server(io);
io.run();
Connection

The lower-level details of our code are here. Connection has as private data member an ASIO TCP/IP socket object on which we are going to write data to the client. Since we want to perform the write asynchronously, we use the ASIO async_write() function. This requires us to ensure that the current connection object is still alive when the write is actually performed. To do that, we'll pass to async_write() an instance of the connection object itself. To avoid nightmarish memory management, we'll wrap it in a shared_ptr. However, to do that, we need to create a shared smart pointer from this, and for that we have to enable the feature explicitly, deriving our class from the standard enable_shared_from_this:
class Connection : public std::enable_shared_from_this<Connection>
{
private:
 tcp::socket socket_;
 std::string message_{ "Async hello from ASIO " };
 static int counter_;

// ...
The Connection ctor creates its member socket using the passed ASIO I/O context, and sets the message that we'll send to the client. Notice that the message has to be a Connection data member because we have to guarantee it stays alive until the asynchronous write is performed.
Connection(ba::io_context& io) : socket_(io)
{
 message_ += std::to_string(counter_++);
}
However, the ctor is private. The only way we want to let a Connection user create an instance of this class is wrapped in a smart pointer, for the reason described above, so we have this static public method:
static std::shared_ptr<Connection> create(ba::io_context& io)
{
 return std::shared_ptr<Connection>(new Connection(io));
}
The writing is performed by this public method:
void start()
{
 ba::async_write(socket_, ba::buffer(message_),
  std::bind(&Connection::write, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
}
ASIO async_write() requires an AsyncWriteStream, our socket; a ConstBufferSequence, which we create on the fly from our message; and a WriteHandler. This last parameter represents a function in which we can perform any further action after the write to the socket has been done and before the connection to the client is closed. A free function with two parameters, a constant reference to a Boost error_code and a size_t, is expected, but bind() is a helpful friend here. I use both parameters, but we could easily get rid of them. More importantly, notice the use of shared_from_this(). Even if we didn't want to do anything in the WriteHandler, it would be vital that the connection is kept alive till the end of the write. Keeping the "this" reference active here does the trick.

Server

In the core of our server there is an ASIO TCP/IP acceptor, that is initialized by the ctor, and used by the Server start() function to accept - asynchronously - a connection from a client on a Connection object.
using ba::ip::tcp;
const int HELLO_PORT = 50013;
// ...

class Server
{
private:
 tcp::acceptor acceptor_;
// ...
public:
 Server(ba::io_context& io) : acceptor_(io, tcp::endpoint(tcp::v4(), HELLO_PORT))
 {
  start();
 }
// ...
The ctor calls the Server private method start(), which creates a new connection on the ASIO I/O context received from the main and stored in the acceptor. The socket owned by the connection is used in the async_accept() call on the acceptor, so that the server waits for a client connection on it.
void start()
{
 ba::io_context& io = acceptor_.get_executor().context();
 std::shared_ptr<Connection> connection = Connection::create(io);
 tcp::socket& socket = connection->socket();
 acceptor_.async_accept(socket, std::bind(&Server::handle, this, connection, std::placeholders::_1));
}
As second parameter, async_accept() expects an ASIO AcceptHandler, a void free function that gets in input a constant reference to a Boost system error code; we bind it to call the following Server private method:
void handle(std::shared_ptr<Connection> connection, const bs::error_code& ec)
{
 if (!ec)
 {
  connection->start();
 }
 start();
}
If the handshake with the client worked fine, we use the connection to write - asynchronously - through the socket. Then we call again Server start(), to prepare the server to accept another connection.

This is more or less all. You could see the full C++ code on GitHub.

I have tested this server using the client described in the previous post. I found it interesting to add here and there sleeps and console printing to better observe how the process works. For more fun I'd suggest running more clients and letting the ASIO I/O context run on a few threads, as shown in the strand example. The code is based on the official ASIO tutorial, Daytime.3 example.


Boost ASIO synchronous exchange on TCP/IP

Let's build a simple synchronous client-server application based on the TCP/IP protocol using the Boost ASIO ip tcp socket. The server waits for a connection on a port; when one comes, it writes a message and then terminates. The client connects to the server, reads its message from the socket, outputs it, and then terminates too.

Main

I have put both client and server in a single app: if no parameter is passed to the main, the process acts as a server, otherwise as a client.
namespace ba = boost::asio;
// ...
const std::string HOSTNAME{ "localhost" };  // 1
// ...

int main(int argc, char* argv[])
{
 ba::io_context io;  // 2

 if (argc > 1)
  client(io, HOSTNAME);
 else
  server(io);
}
1. Used by the client, when connecting to the server. In this simple example both server and client live on the same machine.
2. An io_context (also known as io_service, but that name is now deprecated) is the first requirement for almost anything in ASIO, so I create it first; then it is passed by reference to the client or server function, according to the number of parameters passed to the program.

Server

The following code in the server function throws exceptions deriving from std::exception to signal problems. Since this is just an example, we simply wrap it in a try-catch and output the associated message.
using ba::ip::tcp;
// ...
const int HELLO_PORT = 50013;
// ...

tcp::acceptor acceptor{ io, tcp::endpoint(tcp::v6(), HELLO_PORT) };  // 1

{   // 2
 tcp::socket socket{ io };  // 3
 std::cout << "Server ready" << std::endl;
 acceptor.accept(socket);  // 4

 std::string message{ "Hello from ASIO" };  // 5
 bs::error_code ec; // 6
 ba::write(socket, ba::buffer(message), ec);  // 7
}
1. Instantiate an object of the ASIO TCP/IP acceptor, so that we can listen for connections. We pass to it the ASIO io_context and a TCP endpoint, created specifying the version of the TCP/IP protocol to use (4 or 6) and the port to be used.
2. Here this block is executed just once. Convert it to a for loop for a more common behavior.
3. Each client connection requires a dedicated ASIO TCP/IP socket to be managed. Here it is created and, at the end of the block, exiting the scope, the socket dtor cleans it up.
4. The server sits down, waiting for a client to be served.
5. When the acceptor has accepted a client on the socket, the server wakes up and builds a message.
6. The ASIO write call on the next line requires an error code, to be set in case something goes wrong. We don't even check it here; usually this is not a good idea.
7. The message is converted to an ASIO buffer, so that it can be consumed by ASIO write() and written to the socket.

Client

It mirrors the server, with the role of the acceptor taken by a resolver.
tcp::resolver resolver{ io };
tcp::resolver::results_type endpoints = resolver.resolve(host, HELLO_PORT_STR);  // 1

tcp::socket socket{ io };
ba::connect(socket, endpoints);  // 2

for (;;)  // 3
{
 std::array<char, 4> buf;  // 4
 bs::error_code error;
 size_t len = socket.read_some(ba::buffer(buf), error);  // 5

 if (error == ba::error::eof)  // 6
  break; // Connection closed cleanly by peer
 else if (error)
  throw bs::system_error(error);  // 7

 std::cout.write(buf.data(), len);  // 8
 std::cout << '|';  // 9
}
std::cout << std::endl;
1. The resolver is resolved on the required host and port, returning a list of valid endpoints on them.
2. We call the ASIO connect() on the socket created in the line above, specifying the endpoints resolved in (1).
3. Let's loop until the full message is received from the server.
4. I have set the buffer to a ridiculously small size, just to see it better at work.
5. read_some() data from the socket in the buffer.
6. If we reach end of file, the connection has been correctly completed.
7. Otherwise we interrupt the operation throwing the Boost system error we got.
8. Use the partial data received from the server.
9. This pipe character is put at the end of each chunk of data read, just to show the chunking effect on the message.

Full C++ code is on GitHub. It is based on the Daytime.1 and Daytime.2 example from the official Boost ASIO tutorial.


Boost ASIO Strand example

In the previous posts, we used ASIO keeping away from any possible multithreading issue, with the notable exception of Asynchronous wait on timer, part two, where a job was executed concurrently with the ASIO handler in another thread, using a mutex, a lock, and an atomic int to make it work as expected.

With ASIO we can follow a different approach, based on its strand concept, avoiding explicit synchronization.

The point is that we won't run the competing functions directly, but we will post the calls to a strand object, which ensures they are executed sequentially. Just be sure to use the same strand object.

We have a class, Printer, with two private methods, print1() and print2(), that use the same member variable, count_, and both print something to cout.

We post the two functions for the first time in the class constructor, asking our strand object to run them.
namespace ba = boost::asio;
// ...

class Printer
{
// ...

ba::io_context::strand strand_;
int count_;


Printer(ba::io_context& io, int count) : strand_(io), count_(count)
{
 strand_.post(std::bind(&Printer::print1, this));
 strand_.post(std::bind(&Printer::print2, this));
}
The functions post themselves again on the same strand, until some condition is satisfied.
void print1()
{
 if (count_ > 0)
 {
  print("one");
  --count_;

  strand_.post(std::bind(&Printer::print1, this));
 }
}
And this is more or less the full story for the Printer class. No need for synchronization: we rely on the strand to execute them sequentially.

We still have to let ASIO run on two threads, and this is done by calling the run() method of io_context from two different threads. This is kind of interesting on its own, because we bump into a subtle problem due to how std::bind() is implemented.

The official Boost ASIO tutorial suggests using the Boost implementation:
std::thread thread(boost::bind(&ba::io_context::run, &io));
It works fine; end of story, one would say. But let's see what happens when using the standard bind implementation:
std::thread thread(std::bind(&ba::io_context::run, &io));
// error C2672: 'std::bind': no matching overloaded function found
// error C2783: 'std::_Binder<std::_Unforced,_Fx,_Types...> std::bind(_Fx &&,_Types &&...)': could not deduce template argument for '_Fx'
Damn it. It tries to be smarter than Boost, and in this peculiar case it doesn't work. The problem is that io_context has two run() overloads, and bind() doesn't know which one to pick.

A simple solution would be compiling our code against a "clean" ASIO version, getting rid of the deprecated parts, the extra run() overload among them.

If we can't do that, we should give bind() some extra help, so that it can deduce the function type correctly. An explicit cast would do:
auto run = static_cast<ba::io_context::count_type (ba::io_context::*)()>(&ba::io_context::run);
std::thread thread(std::bind(run, &io));
I have taken the address of the member function run from boost::asio::io_context (also known as io_service, a name that is now deprecated too) and explicitly cast it to its actual type.

Can we get the same result in a more readable way? Well, using a lambda could be an idea.
std::thread thread([&io] { io.run(); });
You could get my full C++ code from GitHub. I based it on the Timer.5 example from the official Boost ASIO tutorial.

Go to the full post

Boost ASIO using a member function as handler



In the fourth step of the official Boost ASIO tutorial, a class method is used as handler instead of a free function. See the previous post for my version that uses a free function - or a lambda. Here I present my refactored code, which uses less Boost and more C++ standard stuff.

All the relevant code is now in the class Printer, so the function in the main thread gets much simpler:
Printer printer(io);
io.run();
Where io is a boost::asio::io_context object - previously known as io_service.

Here is the Printer class:
class Printer
{
private:
 ba::system_timer timer_;
 int count_;
public:
 Printer(ba::io_context& io) : timer_(io, sc::milliseconds(500)), count_(0)  // 1
 {
  timer_.async_wait(std::bind(&Printer::print, this));
 }

 ~Printer()  // 2
 {
  std::cout << "final count is " << count_ << std::endl;
 }

 void print()  // 3
 {
  if (count_ < 5)
  {
   std::cout << count_++ << ' ';
   timer_.expires_at(timer_.expiry() + sc::milliseconds(500));
   timer_.async_wait(std::bind(&Printer::print, this));
  }
 }
};
1. The constructor initializes the member variables and then asks ASIO to set an asynchronous wait on the timer, passing as handler the member function print() of this object.
2. The dtor will print the final counter value.
3. Same old print() function, but now it is a method of Printer, so it can freely access its data members.

This approach looks cleaner than the free-function one. Still, we need to keep in mind that timer_, count_ (and cout) are seen by two different threads, and this could lead to concurrency problems. These could be solved using mutexes and locks, as I have shown in a previous spoilery post, or using the ASIO concept of strand, as we'll see in the next post.

Go to the full post

Boost ASIO passing parameters to handler

I have sort of spoiled the argument of this post in the previous one, where the focus was on telling ASIO to asynchronously run a function when a timer expires, but I couldn't help also presenting a scenario where multiple threads synchronize on a flag and concurrently access a shared resource. Let's take a step back and analyze a simpler case, where the function we want ASIO to run at timer expiration just has to access a variable defined in the main thread.


Plain function

The point raised in the official Boost ASIO tutorial is passing a simple function to ASIO so that, after it is called the first time on timer expiration, it resets the timer on itself, keeping track of the number of times it has been invoked in a counter defined in the main thread. Here is my version of it, with some minor variations.
namespace ba = boost::asio;
namespace sc = std::chrono;

// ...

void print(ba::system_timer* pTimer, int* pCount) {  // 1
 if (*pCount < 5)  // 2
 {
  std::cout << (*pCount)++ << ' ';
  pTimer->expires_at(pTimer->expiry() + sc::milliseconds(500));  // 3
  pTimer->async_wait(std::bind(print, pTimer, pCount));  // 4
 }
}
1. I use system_timer, based on the standard chrono::system_clock, instead of the boost::posix_time based deadline_timer.
2. The check on the counter is used to avoid an infinite series of calls.
3. Reset the timer expiration to half a second in the future. To get the current expiration time I use expiry() instead of the deprecated overload of expires_at() with no parameters.
4. Reset an asynchronous wait on the timer, passing as parameter the function itself.

Notice how the standard bind() function is used to bind the print() function to the handler expected by async_wait() on the timer. This makes it possible to drop the reference to boost::system::error_code, which we decided not to use here, and add instead the two parameters we actually need.

In the main thread we start the asynchronous wait on the ASIO managed timer in this way:
// ...

int count = 0;
ba::system_timer timer(io, sc::milliseconds(500));  // 1

timer.async_wait(std::bind(print, &timer, &count));  // 2
io.run();  // 3
1. io is a boost::asio::io_context object - previously known as io_service.
2. Notice that both count and timer are shared between the main thread and the one owned by ASIO in which print() is going to be executed.
3. However, nothing happens in the main thread until ASIO ends its run().

The code is so simple that we can guarantee it works fine. However, if between (2) and (3) another thread were spawned, doing something involving io and timer (and cout, by the way), we should be ready to redesign the code to ensure thread safety.

Same, with lambda

Usually, I would feel more at ease putting the code above in a class, as I did in the previous post. Still, one could argue that in a simple case like this it could be overkill. Well, in this case I would probably go for a lambda implementation, which at least keeps the code close together, making it less likely to forget something in a future refactoring.

Since I can capture count and timer instead of passing them to the lambda, there is no need for custom binding here. However, the original function needs to refer to itself by name in its body, and this is something C++ lambdas are not allowed to do. There are a couple of workarounds available; I have chosen to store the lambda in a local std::function variable, and then pass it to async_wait() like this:
// ...
std::function<void(const bs::error_code&)> print = [&](const bs::error_code&) {
 if (count < 5)
 {
  std::cout << count++ << ' ';
  timer.expires_at(timer.expiry() + sc::milliseconds(500));
  timer.async_wait(print);
 }
};

timer.async_wait(print);

I have pushed the two new files, free function and lambda example, on GitHub.

Go to the full post

Boost ASIO asynchronous wait on timer

Having seen how ASIO takes care of resources, now we are ready for something a bit more spicy, setting up an asynchronous wait.

Plain example

Firstly, I have faithfully followed the Timer.2 Boost ASIO tutorial, only using standard C++ constructs instead of the Boost counterparts.

We want ASIO to run the following simple function, which just does some output, in another thread, after a certain delay.
namespace bs = boost::system;

// ...

void hello(const bs::error_code& ec)  // 1
{
 std::cout << "delayed hello [" << ec.value() << "] " << std::flush;
}
1. If ASIO completes the wait on the timer correctly, it passes an error code with value zero.

namespace ba = boost::asio;
namespace sc = std::chrono;

// ...

void timer2(ba::io_context& io)  // 1
{
 std::cout << "2) Starting ... " << std::flush;

 ba::system_timer timer{ io, sc::seconds(1) };  // 2
 timer.async_wait(hello);
 std::cout << "hello " << std::flush;

 io.run();  // 3
 std::cout << "done" << std::endl;
}
1. The old io_service is now deprecated, get used to see io_context in its place.
2. Create a system timer on ASIO, setting its expiration time to one second. Then start a non-blocking wait on it, passing the address of the above-defined hello() function as parameter.
3. After doing some job on the current thread, here just a print on the output console, we want to wait for the termination of the work controlled by ASIO. So we call its run() method, blocking until we get the control back.

More action

Let's add some meat to the above example. Say that we want to run a loop in the main thread until a timeout expires.
class MyJob
{
private:
 MyJob(const MyJob&) = delete;  // 1
 const MyJob& operator=(const MyJob&) = delete;

 std::mutex mx_;  // 2
 std::atomic<bool> expired_;  // 3
public:
 MyJob() : expired_(false) {}

 void log(const char* message)
 {
  std::unique_lock<std::mutex> lock(mx_);  // 4
  std::cout << message << std::flush;
 }

 void timeout()  // 5
 {
  expired_ = true;
  log("Timeout!\n");
 }

 void operator()()  // 6
 {
  for (int i = 0; !expired_; ++i)
  {
   std::ostringstream os;
   os << '[' << i << ']';

   log(os.str().c_str());
   std::this_thread::sleep_for(sc::milliseconds(300));
  }
 }
};
1. The objects of this class are inherently non-copyable. So I remove copy ctor and assignment operator from its interface.
2. The output console is shared between two threads. This mutex is going to rule its access.
3. The threads synchronize on a boolean. At startup it is set to false; when the timer expires it is set to true. The main loop runs until it sees the expiration. Since just one thread sets it, while the other only reads it, an atomic boolean is enough to ensure communication between the threads.
4. Lock the mutex to acquire exclusive access to the shared resource.
5. This method is going to be called by the timer on expiration.
6. The loop I want to run until timer expiration.

Let's see how I used this class.
void timer2a(ba::io_context& io)
{
 std::cout << "2a) Starting ... " << std::flush;

 ba::system_timer timer(io, sc::seconds(1));

 MyJob job;
 timer.async_wait([&job](const bs::error_code&) { job.timeout(); });  // 1
 std::thread thread(std::ref(job));  // 2

 io.run();
 thread.join();
}
1. The async_wait() method of the timer is fed with a lambda that captures by reference the instance of the class MyJob created in the previous line. In the lambda body we call the job's timeout() method. The result is that the expiration flag is set when the timeout expires. Here I ignore the ASIO error code, but it would be easy to take notice of it, when required.
2. Create a new thread to run the job loop.

One more thing. I want to run the two examples one after the other, using the same ASIO io_context object. But the first one runs it, putting it in the stopped status. No problem, I just have to remember to reset it. That is what I do in the main:
timer2(io);
assert(io.stopped());

io.reset();
timer2a(io);

I have pushed both C++ source files on GitHub, the simpler, and the more interesting one.

Go to the full post

Boost ASIO Basic Skills

In the last few years there have been a few changes in the ASIO library, and I finally decided to review the posts I have written on it. I have downloaded the (currently) latest Boost libraries, version 1.66 (please have a look at the Boost Revision History for details), and I am going to use it on a Windows box with Visual Studio 2017 as IDE, with the current (March 2018) Visual C++ implementation.

In this and the next few posts, I plan to follow the Boost ASIO tutorial, basic skills section.

Notice that a few well established ASIO elements are now marked as deprecated. Define BOOST_ASIO_NO_DEPRECATED among the compiler options to get rid of them. I kept a more conservative approach, however, simply avoiding deprecated features whenever I saw them.

First victim is a big one, io_service. Luckily it looks like the solution is just using io_context instead.

This led to the main change in the code for my version of the Timer.1 tutorial.

Its point is using ASIO to set a synchronous timer to block the current thread execution for a while. This is not very interesting, but shows the common pattern we are about to use to let ASIO know about a service we want it to manage on our behalf.
namespace ba = boost::asio;
namespace sc = std::chrono;

// ...

void timer1(ba::io_context& io)
{
 std::cout << "1) Starting ... " << std::flush;

 ba::system_timer timer{ io, sc::seconds(1) };  // 1
 timer.wait();  // 2

 std::cout << "done!" << std::endl;
}
1. I am creating an ASIO service, system timer, setting its delay to one second.
2. I consume the service synchronously, blocking the current thread execution.

I have used here system_timer, equivalent to the deadline_timer used in the official example, differing from it in that it is based on the standard C++ chrono library instead of the Boost equivalent. If you need a steady clock, use steady_timer.

I have pushed the reviewed code for this example on GitHub. I have added a main in another file that runs all the examples in this section.

Go to the full post

Simple ASIO TCP client/server example

A server sits on a specified port, and when a client connects, it sends a message and terminates. A client connects to the server, reads from the socket the message, and terminates. Nothing fancy, but it could be a good introduction on how to use ASIO synchronously to create TCP/IP connections.

After five years, the code is getting obsolete. I have reviewed it moving to the (currently - March 2018) latest version of ASIO, please follow the link to the new post. Sorry for the trouble.


You could get the full C++ code for this example on Github. If you run the executable with no parameter, you start the server, otherwise the client.

Server

In this trivial implementation, my server accepts just one connection before terminating, but it is pretty easy to make it run forever. It is just a matter of running this block in an indefinite loop, and not just once as I do here:
{
  boost::asio::ip::tcp::socket socket(aios); // 1
  std::cout << "Server ready" << std::endl;
  acceptor.accept(socket); // 2

  std::string message("Hello from ASIO");
  boost::asio::write(socket, boost::asio::buffer(message)); // 3
}
1. Create a new TCP/IP socket on an already existing ASIO I/O service.
2. Wait for a client connection.
3. Write a message on the socket to the client.

At the end of the block the socket is automatically closed by its dtor.

Before that, I have done some preparatory stuff:
boost::asio::io_service aios; // 1
boost::asio::ip::tcp::acceptor acceptor(aios, // 2
  boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), HELLO_PORT)); // 3
1. The I/O service is the first thing required.
2. The acceptor is used to accept calls from the clients.
3. We need to pass to the acceptor the endpoint, that specifies the protocol used (here is TCP/IP version 4), and the port used (here defined in a constant that I set to 50013).

Client

The client code is symmetrical. First step is about preparing the job:
boost::asio::io_service aios;

boost::asio::ip::tcp::resolver resolver(aios); // 1
boost::asio::ip::tcp::resolver::iterator endpoint = resolver.resolve(
  boost::asio::ip::tcp::resolver::query(host, HELLO_PORT_STR)); // 2
1. The resolver is the counterpart of the acceptor. Calling resolve() on it, we get an iterator pointing to the first endpoint associated with a specific address. We can use that iterator to open a connection to the server on a socket, as we'll see below.
2. Query for a specific host and port (here I specified localhost and 50013, notice that both are c-strings).

Now I am ready to open the connection on a socket. If you are using a recent version of Boost Asio (I am working with 1.54), this is done in a one-liner:
boost::asio::connect(socket, endpoint);
If no connection could be opened on any endpoint, a boost::system::system_error is thrown.

On older Asio versions there was no such connect() overload, and you had to implement its behavior by hand, with a piece of code like this:
boost::system::error_code error = boost::asio::error::host_not_found;
boost::asio::ip::tcp::resolver::iterator end; // 1
while(error && endpoint != end)
{
  socket.close();
  socket.connect(*endpoint++, error); // 2
}
if(error)
  throw boost::system::system_error(error); // 3
1. The default ctor for a resolver iterator returns its "end"; we use it to loop over all the endpoints returned by resolver::resolve().
2. Try to connect to the current endpoint; in case of failure, loop while there is another endpoint to check.
3. If no endpoint could be connected to, throw an exception.

Once we have a socket connected to the server, it's just a matter of getting the message it sends to us:
for(;;)
{
  std::array<char, 4> buf; // 1
  boost::system::error_code error;
  size_t len = socket.read_some(boost::asio::buffer(buf), error); // 2

  if(error == boost::asio::error::eof)
    break; // 3
  else if(error)
    throw boost::system::system_error(error); // 4

  std::cout.write(buf.data(), len);
  std::cout << '|'; // 5
}
std::cout << std::endl;
1. Usually I would use a much bigger buffer.
2. Partial read of the message, limited by the buffer dimension.
3. Detected end of file, stop looping.
4. In case of error throws an exception.
5. I show the junctions in the message caused by the small local buffer; usually the message would be rebuilt seamlessly.

I wrote this post as a review of a piece of code I conceived in early 2011, which is still documented in a couple of posts, one dedicated to the server part, the other to the client part. You may want to have a look at them.

The original source is the Boost Asio tutorial. Here are the slightly different versions of their client and their server.

Go to the full post

Another asynchronous wait on a steady timer

This is a new version of an oldish Boost ASIO example of mine about asynchronously waiting on a timer, taking advantage of C++11 features. If you are looking for something simpler, there's another post on the same matter but more focused on the bare ASIO functionality. Or you could go straight to the original source, the official tutorial on Boost.

Five years later, I found out that this post requires some adjustments. You could follow the link to its March 2018 version.

The main function of this example spawns a new thread that runs a function doing something indefinitely. But before creating the new thread, it sets an asynchronous timer, which on expiration calls a function that causes the runner to terminate.

It makes sense to encapsulate both functions in a single class, like this:
class MyJob
{
private:
    MyJob(const MyJob&) = delete; // 1
    const MyJob& operator=(const MyJob& ) = delete;

    std::mutex mx_; // 2
    bool expired_;
public:
    MyJob() : expired_(false) {}

    void log(const char* message) // 3
    {
        std::unique_lock<std::mutex> lock(mx_);
        std::cout << message << std::endl;
    }

    void timeout() // 4
    {
        expired_ = true;
        log("Timeout!");
    }

    void operator()() // 5
    {
        for(int i = 0; !expired_; ++i)
        {
            std::ostringstream os;
            os << '[' << i << ']';

            log(os.str().c_str());
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    }
};
1. I don't want objects of this class to be copyable - we'll see why later. So I remove the copy constructor and assignment operator from the class interface, using the C++11 equals-delete marker.
2. There are two threads insisting on a shared resource (the standard output console), a mutex is needed to rule its access.
3. The shared resource is used in this function only. A lock on the member mutex takes care of protecting it.
4. When the timer expires, it is going to call this method.
5. This function contains the job that is going to run in another thread. Nothing fancy, actually. Just a forever loop with some logging and sleeping. The timeout is going to change the loop control variable, so that we can have a way out.

This is the user code for the class described above:
boost::asio::io_service io; // 1
boost::asio::steady_timer timer(io, std::chrono::seconds(3)); // 2

MyJob job; // 3
timer.async_wait([&job](const boost::system::error_code&) {
  job.timeout();
}); // 4

std::thread thread(std::ref(job)); // 5
io.run();
thread.join();
1. An ASIO I/O service is created.
2. I create a steady timer (that is just like an old deadline timer, but uses the C++11 chrono functionality) on the I/O service object.
3. An object that describes the job I want to run in another thread is instantiated.
4. When the timer expires, the passed lambda is executed. It is an asynchronous call, so it returns control immediately, and execution passes to the next instruction (5). The lambda calls the timeout() method on the job object, which has been captured by reference. Having defined the MyJob class as non-copyable, forgetting the ampersand - passing the job by value - results in a compiler error. Here I don't care about the error code parameter, which is set by ASIO to say whether the timer has expired correctly or with an error; I just stop the running job. In real-life usage a check would be expected.
5. Before running the I/O service, I create a thread on our job - again passed by reference, as the std::ref() shows. Again, trying to pass it by value would result in compiler errors.

Full C++ code for this example is on Github.

Go to the full post

Waiting asynchronously

As a first approach to ASIO it makes sense to write a minimal program that performs a synchronous wait on an ASIO timer, as I did in a previous post. More interesting is setting up an asynchronous wait, as we are going to see here.

This post was written in 2013, and it is getting obsolete. Please refer to its March 2018 version instead.

What I want to do here is executing a couple of actions. One has to be delivered after a certain amount of time, so I set a timer on ASIO, and ask its I/O service to run a function when it expires. The other can be executed right away, so I'll do it in the main code, after setting the timer on ASIO, but before asking the ASIO I/O service to run.

The function that I want ASIO to execute when the timer expires is:
void hello(const boost::system::error_code& ec) // 1
{
    std::cout << "delayed hello [" << ec.value() << ']'  << std::endl;
}
1. ASIO is going to pass to the function the timer exit status. Usually we should check it, and let our code behave differently depending on whether the timer expired correctly or with an error. A zero error code means (as per the old C tradition) success. Here I simply output its value to the user.

And that's the code snippet where ASIO is set up and run:
boost::asio::io_service aios; // 1

boost::asio::steady_timer timer(aios, std::chrono::seconds(1)); // 2
timer.async_wait(hello); // 3
std::cout << "hello" << std::endl; // 4
aios.run(); // 5
1. Create the ASIO I/O Service
2. Create a one second timer on the service
3. Run the timer asynchronously, specifying that we want to run the hello() function when it expires.
4. Do something else.
5. Pass control to ASIO. It keeps it as long as it has something to do. Here we have instructed it only to take care of the timer; as soon as it is done with it, it returns control back.

If we don't get first the hello message from (4) and then the delayed hello message from (3), we should start to worry.

Full C++ source code, with minor variations, is on github.

Go to the full post

Hello again, Boost Asio

Some time has passed since my last post on ASIO. I guess it is worth restarting everything from scratch, with a cute tiny hello world example that shows how ASIO can work with C++11 features.

What I am going to do is just setting a one second synchronous timer on ASIO, and wait for its expiration. But using the new chrono C++11 library.

[This example is still working alright, however, five years later, I have reviewed the code, using Boost ASIO 1.66. Please, refer to updated post instead.]

Prerequisite

I am using GCC 4.8.1 on a Linux box. But I am confident that any C++11 compliant compiler will do, on any supported platform.

You need to have Boost too; you could get it from some repository (of your Linux distribution), or directly from Boost.

If you are on Linux, and you have installed Boost through apt-get, you should have its header files in /usr/include/boost, and the libraries in /usr/lib. This would save you some time in setting up the makefile for your application.

ASIO has a strong low-level dependency on the boost_system library; remember to add it to your linker (-l) options, otherwise you will get some nasty errors at link time, like this one:
/usr/include/boost/system/error_code.hpp:214:
  undefined reference to `boost::system::generic_category()'
Wait a minute ...

This example is based on the Timer.1 page of the official Boost Asio tutorial. What I changed is that I don't use the deadline_timer, which specifies periods as non-standard posix_time, but the basic_waitable_timer, or better its typedef steady_timer, which relies on the C++11 chrono library. By the way, this class could also use boost::chrono, if std::chrono is not available in your environment.
std::cout << "Starting ... " << std::flush; // 1

boost::asio::io_service aios; // 2
boost::asio::steady_timer timer(aios, std::chrono::seconds(1)); // 3
timer.wait(); // 4

std::cout << "done!" << std::endl; // 5
1. I do something, including some flushed output to the standard console, so that I can see that the program is working.
2. I create an Asio I/O Service
3. I start a steady timer, meaning a timer that uses the standard C++11 chrono steady clock, on the just created I/O service, passing as requested period one single second.
4. Synchronously wait until the timer expires. If the timer had already expired when the execution stream entered the wait() method, it would return immediately.
5. Back at work, I send some more feedback to the user.

I put the full code for this example, slightly edited, on github. Here, for the sake of clarity, I used the full namespace names wherever required. In the github version, I make use of the handy C++ namespace alias capability to shorten them down like this:
namespace ba = boost::asio;
namespace sc = std::chrono;
And another thing. For this example, including the chrono standard header is not a necessity, since it is already implicitly included by the boost asio steady timer include. But I guess it makes the code clearer.

Go to the full post

Post on ASIO strand

IMHO, the ASIO strand example on the official Boost tutorial is a bit too complex. Instead of focusing on the matter, it also involves some ASIO deadline_timer knowledge, which makes sense in the tutorial's logic, but, I'd say, blurs the picture.

So I have written a minimal example that I hope is more intuitive as a first introduction to this concept.

This post is from April 2012, and it is now obsolete, please follow the link to the updated version I have written on March 2018.

We have a class designed to be used in a multithreaded context: it has as data member a resource that is meant to be shared, and a couple of functions that modify that shared value and could be called from different threads.

Usually we rely on mutexes and locks to synchronize access to the shared resource. ASIO provides the strand class as a way to serialize the execution of the work posted to it, making explicit synchronization unnecessary. But be aware that this holds only for the functions going through the same strand.

We want to write a piece of code like this:
namespace ba = boost::asio;

// ...

ba::io_service aios;
Printer p(aios, 10); // 1
boost::thread t(std::bind(&ba::io_service::run, &aios)); // 2
aios.run(); // 3
t.join(); // 4
1. See below for the class Printer definition. In a few words, it is going to post the execution of a couple of its functions on ASIO, both of them acting on the same shared resource.
2. We run a working thread on the ASIO I/O service.
3. Also the main thread is running on ASIO.
4. Wait for the worker completion, then end the execution.

So we have two threads running on ASIO. Let's see now the Printer class private section:
class Printer
{
private:
    ba::strand strand_; // 1
    int count_; // 2

    void print(const char* msg) // 3
    {
        std::cout << boost::this_thread::get_id() << ' ' << msg << ' ' << count_ << std::endl;
    }
    
    void print1() // 4
    {
        if(count_ > 0)
        {
            print("print one");
            --count_;
            strand_.post(std::bind(&Printer::print1, this));
        }
    }

// ...
};
1. We are going to operate on a Boost Asio strand object.
2. This is our shared resource, a simple integer.
3. A utility function that dumps to standard output console the current thread ID, a user message, and the shared resource.
4. Client function for (3): if the shared resource count_ is greater than zero, it calls (3), then decreases count_ and posts through the strand a new execution of this function. There is another private function, print2(), exactly like print1(); it just logs a different message.

Since we are in a multithreaded context, these functions should look suspicious. No mutex/lock? No protection for the access to count_? And, cout being an implicitly shared resource, we risk getting garbled output too.

Well, these are non-issues, since we are using a strand.

But let's see the Printer ctor:
Printer(ba::io_service& aios, int count) : strand_(aios), count_(count) // 1
{
    strand_.post(std::bind(&Printer::print1, this)); // 2
    strand_.post(std::bind(&Printer::print2, this));
}
1. Pay attention to how the private ASIO strand object is constructed from the I/O service.
2. We prepare the first runs, posting on the strand the execution of the private functions.

What happens is that all the work posted on the strand is executed sequentially, meaning that a new task starts only after the previous one has completed. There is no overlapping, no concurrency, so no need for locking. Since we have two threads available, ASIO will choose which one to use for each execution. We have no guarantee about which thread executes what.

We don't have the troubles associated with multithreading, but we don't have some of its advantages either. Namely, when running on a multicore/multiprocessor machine, a strand won't use all the available processing power for its job.

The full C++ source code for this example is on github.

Go to the full post

Fibonacci with ASIO

The Fibonacci function is commonly used for writing testing code, because it is conceptually easy without being fully uninteresting. In this blog I have already used it a couple of times, once when showing how to implement a C++11 lambda function, then as a means to show where standard C++11 multithreading could come in handy.

Now I use Fibonacci to show a more sensible example of an ASIO multithread application.

In the previous post we saw how to explicitly run and stop the ASIO I/O service; here we'll rely on the fact that ASIO automatically ends the I/O service when it has no more work to do. The trick is to post all the tasks to the service before starting the worker threads. When they run out of jobs, ASIO ends.

The worker function is still the same simple call to run() on the I/O service, with some logging added for testing purpose:
void worker(ba::io_service& aios)
{
    dump("start worker");
    aios.run();
    dump("end worker");
}
Where I have defined ba to be a synonym for boost::asio.

Our main thread posts to ASIO, as jobs, a few calls to this function:
void calculate(unsigned int input)
{
    dump("input", input);
    unsigned int result = fibonacci(input);
    dump("output", result);
}
Finally, calculate() calls this trivial recursive implementation of Fibonacci:
unsigned int fibonacci(unsigned int n)
{
    if(n < 2)
        return n;
    return fibonacci(n - 1) + fibonacci(n - 2);
}
And here is the interesting part, where the main thread prepares the ASIO I/O service, spawns a few worker threads, and waits till the job is done:
ba::io_service aios; // 1

dump("starting up");
aios.post(std::bind(calculate, 35)); // 2
aios.post(std::bind(calculate, 30));
aios.post(std::bind(calculate, 20));
aios.post(std::bind(calculate, 10));

boost::thread_group threads;
for(int i = 0; i < 2; ++i) // 3
    threads.create_thread(std::bind(worker, std::ref(aios)));

dump("ready to join");
threads.join_all(); // 4
dump("job done");
1. No fake work object is associated with the ASIO I/O service created here, so run() will return as soon as the posted jobs are done.
2. A few jobs are posted on the service.
3. A couple of threads are created on the worker function seen above. Each thread calls run() on the service, signaling that it is available to process pending jobs. ASIO takes care of assigning a task to each thread; when a thread finishes its current job, ASIO assigns it the next available one, and so on until there is nothing left to do.
4. The main thread waits here until all the worker threads are done, meaning that ASIO has assigned all the pending tasks to them and they have completed each run.

The full source C++ code for this example is on github.

Go to the full post

Run and stop an ASIO I/O service

This example doesn't do anything useful, but should clarify how you could use Boost ASIO to control the execution of a multithread application.

Our main thread will create a bunch of worker threads, do some job, and finally terminate gracefully.

ASIO is used to synchronize the job. The I/O service is created by the main thread and passed to the workers by reference. Remember that it is a non-copyable object: it would make no sense to pass an ASIO I/O service by value, and if you try, you get a compiler error.

The worker threads are running a dummy function that just dumps a couple of messages, one before, and one after calling run() on the I/O service.

The main thread has provided a dummy I/O service work object, so that the run() called by each worker blocks, keeping the thread hanging on it. At this point the main thread is the only active one; it does whatever its job is, then calls stop() on the ASIO I/O service. That lets the run() on each worker terminate.

The last duty of the main thread is to join all its spawned threads; then it can happily terminate.

The code run by the workers won't be anything more than this:
void worker(boost::asio::io_service& aios, int id) // 1
{
    dump(id, "first step"); // 2
    aios.run(); // 3
    dump(id, "last step");
}
1. The ASIO I/O service object is passed by reference: we are actually working on the same object as the caller's.
2. We are in a multithread environment, and we are accessing a shared resource, the standard output console. We have to guard its access with a mutex if we want to avoid unpleasant mixups, and that is what the dump() function does.
3. Let's run the service.

Here is the code executed by the main thread:
boost::asio::io_service aios; // 1
boost::asio::io_service::work work(aios); // 2

boost::thread_group threads; // 3
for(int i = 0; i < 6; ++i )
    threads.create_thread(std::bind(worker, std::ref(aios), i)); // 4

dump("Main thread has spawned a bunch of threads");
boost::this_thread::sleep(boost::posix_time::seconds(1)); // 5
aios.stop(); // 6
dump("Main thread asked ASIO io service to stop");
threads.join_all(); // 7
dump("Worker threads joined");
1. The ASIO I/O service object is created.
2. A dummy ASIO work object is put on the service, so that run() won't return while it is alive.
3. We want to manage a group of threads; the Boost class thread_group has been designed exactly for that.
4. The worker function has to be adapted to become suitable for the create_thread() request, that's why I used std::bind(). Besides, I need to enforce that the ASIO I/O service object is passed by reference, and that is the reason for using std::ref().
5. Using this framework just to let the main thread take a one second nap is overkill, but I guess you see the point.
6. The main thread is ready to terminate, so it asks ASIO to stop its work.
7. After ensuring all the threads have joined, we can terminate the application.

The full C++ source code for this example is freely available on github.

Go to the full post