
Boost ASIO echo UDP asynchronous server

A change request for the echo UDP client-server app discussed before. We want to keep the client as is, but we need the server to be asynchronous.

Instead of using the synchronous calls receive_from() and send_to() on a UDP socket, we are going to use their asynchronous versions async_receive_from() and async_send_to().

The asynchronicity naturally leads to implementing a class with a socket as its private data member, so that we can make our asynchronous calls on it.
const int MAX_LEN = 1'024;
const uint16_t ECHO_PORT = 50'015;

class Server
{
private:
    udp::socket socket_;  // 1
    udp::endpoint endpoint_;  // 2
    char data_[MAX_LEN];  // 3

// ...
1. Our ASIO UDP socket.
2. The endpoint we use to keep track of the client currently connected to the server.
3. Data buffer, used to keep track of the message received from the client.

The constructor gets the app ASIO I/O context by reference from the caller and uses it to instantiate its member socket. Then it calls its private method receive() to start its endless job.
Server(ba::io_context& io) : socket_(io, udp::endpoint(udp::v4(), ECHO_PORT))  // 1
{
    receive();
}

void receive()
{
    socket_.async_receive_from(ba::buffer(data_), endpoint_, [this](bs::error_code ec, std::size_t len) {  // 2
        if (!ec && len > 0)  // 3
        {
            send(len);
        }
        else
        {
            receive();  // 4
        }
    });
}
1. The socket also requires an endpoint describing the associated protocol and port. We create it on the fly.
2. Asynchronously call receive_from() on the socket. ASIO will put what the client sends in the data buffer and store the client's network information in the passed endpoint. When the receive is completed, ASIO will call the handler passed as third parameter, here a lambda that captures "this" and honors the expected parameters.
3. If the receive worked fine - no error code reported - and the message is not empty, we call our Server send() method to echo the message.
4. Otherwise - error or empty message - we have nothing to send back, so we call the enclosing receive() method again, to serve a new client.

When a "good" message is received from a client, our server sends it back to it as is:
void send(std::size_t len)
{
    socket_.async_send_to(ba::buffer(data_, len), endpoint_, std::bind(&Server::receive, this));
}
The socket gets the job of asynchronously sending the data stored in the Server member variable, with the length passed in as parameter, to the endpoint saved as a Server member variable. When the data transfer is completed, ASIO will call the handler passed as third argument. Here we don't want to do anything in case of error, not even logging, so we can simply bind the handler to a call to "this" receive(), ignoring the error code and the length of the transferred data - a functor generated by std::bind silently discards extra arguments, so the two-parameter handler signature is still satisfied.
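For completeness, here is how the main function could look - a minimal sketch, assuming the ba namespace alias for boost::asio and the using declaration for udp seen in the snippets above:
int main()
{
    ba::io_context io;
    Server server(io);  // the ctor issues the first async_receive_from()
    io.run();  // dispatch completion handlers until the context is stopped
}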

I pushed the complete C++ source file to GitHub. The code is based on the UDP asynchronous echo example in the official Boost ASIO documentation.


Boost ASIO echo TCP asynchronous server

Let's refactor the echo TCP server to achieve asynchrony. It's going to be a lot of fun. If you feel that it is too much fun, you could first have a look at the similar but a bit simpler asynchronous server discussed in a previous post.

Main

This server works with the same clients seen for the synchronous server; here we deal just with the server side. All the job is delegated to the Server class, whose constructor gets a reference to the application ASIO I/O context.
namespace ba = boost::asio;
// ...

Server server(io);
io.run();
Server

The server ctor initializes its acceptor on the ASIO I/O context, with an endpoint specifying the IP protocol and the chosen port, then it calls its private member method accept():
using ba::ip::tcp;
// ...

const uint16_t ECHO_PORT = 50'014;
// ...

class Server
{
private:
 tcp::acceptor acceptor_;

 void accept()
 {
  acceptor_.async_accept([this](bs::error_code ec, tcp::socket socket)  // 1
  {
   if (!ec)
   {
    std::make_shared<Session>(std::move(socket))->read();  // 2
   }
   accept();  // 3
  });
 }
public:
 Server(ba::io_context& io) : acceptor_(io, tcp::endpoint(tcp::v4(), ECHO_PORT))
 {
  accept();
 }
};
1. The handler passed to async_accept() is a lambda that gets as parameters an error code, letting us know whether the connection from the client has been accepted correctly, and the socket created to support the connection itself.
2. A beautiful and perplexing line. We create a shared_ptr smart pointer to a Session built from an rvalue reference to the socket received as parameter, and call its read() method. However, this anonymous variable exits its definition block on the next line, so its life is in danger - better see what is going on in read(). Actually, we are in danger too: if something weird happens in this session object, we have no way to do anything about it.
3. As soon as a Session object is created, a new call to accept() is issued, and so the server puts itself in wait for a new client connection.

Session

As we have seen just above, we should expect some clever trick from Session, especially in its read() method. On reflection, it is not a big surprise to see that its superclass is enable_shared_from_this:
class Session : public std::enable_shared_from_this<Session>
{
private:
 tcp::socket socket_;
 char data_[MAX_LEN];

// ...
public:
 Session(tcp::socket socket) : socket_(std::move(socket)) {}  // 1

 void read()  // 2
 {
  std::shared_ptr<Session> self{ shared_from_this() };  // 3
  socket_.async_read_some(ba::buffer(data_), [self](bs::error_code ec, std::size_t len) {  // 4
   if (!ec)
   {
    self->write(len);
   }
  });
 }
};
1. The ctor receives the socket that, as we have seen, was created by the acceptor and moved in; in its turn, the ctor moves it into its data member.
2. The apparently short-lived Session object created by the handler of async_accept() calls this method.
3. A new shared_ptr is created from this! Being such, it is the same shared_ptr we have seen in the calling handler, just with its use count increased. However, our object is still not safe: we need to keep it alive until the complete read-write cycle between client and server is over.
4. We read asynchronously some bytes from the client. To better see the effect, I have set the size of the data buffer to a ridiculously low value. But the more interesting part here is the handler passed to async_read_some(). Notice that in the capture clause of the lambda we pass self, the shared pointer from this. So our object is safe till the end of the read.

So far so good. Just remember to ensure the object doesn't get invalidated during the writing process:
void write(std::size_t len)
{
 std::shared_ptr<Session> self{ shared_from_this() };
 ba::async_write(socket_, ba::buffer(data_, len), [self](bs::error_code ec, std::size_t) {
  if (!ec)
  {
   self->read();
  }
 });
}
Same as in read(), we ensure "this" stays alive by creating a shared pointer from it and passing it to the async_write() handler.

As required, once the read-write cycle terminates, "this" has no more live references. Bye bye, session.
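The mechanism is easier to grasp stripped of all the networking. Here is a toy sketch of the same lifetime trick, where a plain std::function plays the role of ASIO's pending handler queue - names and structure are mine, just for illustration:
#include <functional>
#include <iostream>
#include <memory>

std::function<void()> pending;  // stands in for ASIO's handler queue

struct Toy : std::enable_shared_from_this<Toy>
{
    ~Toy() { std::cout << "toy destroyed\n"; }

    void read()
    {
        auto self = shared_from_this();  // use count grows
        pending = [self] { std::cout << "handler runs, toy is alive\n"; };
    }
};

int main()
{
    std::make_shared<Toy>()->read();  // anonymous temporary, as in accept()
    // the temporary is gone, but the copy of self in the handler keeps the Toy alive
    pending();  // the "completion handler" still finds a live object
    pending = nullptr;  // queue drained: last owner released, dtor runs
}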

I have pushed my C++ source file to GitHub. And here is the link to the original example from Boost ASIO.


Boost ASIO echo UDP synchronous client-server

Close to the previous post. The main difference is that there we saw a TCP-based data exchange, while here we see a UDP echo.

Server

This server is simpler than the previous one. Just one connection is served at a time.
udp::socket socket(io, udp::endpoint(udp::v4(), ECHO_PORT));  // 1

for (;;)  // 2
{
 char data[MAX_LEN];
 udp::endpoint client;
 size_t len = socket.receive_from(ba::buffer(data), client);  // 3

 // ...
 socket.send_to(ba::buffer(data, len), client);  // 4
}
1. Create an ASIO UDP socket on the app io_context, with a UDP endpoint created on the fly, where the IP protocol version and the port to be used are specified.
2. Forever loop to serve, in strict sequential order, all the requests coming from clients.
3. ASIO blocks here, waiting for the socket to receive data from a client. Make sure that the data buffer is big enough.
4. Since this is an echo server, nothing exciting happens between receiving and sending. Here we send the data, as received, to the endpoint set by receive_from().

Client
char request[MAX_LEN];
// ...

udp::socket socket{ io, udp::endpoint(udp::v4(), 0) };  // 1
udp::resolver resolver{ io };
udp::endpoint destination = *resolver.resolve(udp::v4(), host, ECHO_PORT_STR).begin();  // 2
socket.send_to(ba::buffer(request, std::strlen(request)), destination);  // 3

char reply[MAX_LEN];
udp::endpoint sender;
size_t len = socket.receive_from(ba::buffer(reply), sender);  // 4
// ...
1. Create a UDP socket on the ASIO I/O context. Notice that the UDP endpoint passed specifies the IP protocol but no specific port - passing zero lets the system pick one.
2. The destination endpoint, which refers to the server, is generated by the resolver created on the line above, resolving the specified host and port for the given UDP IP protocol. Then the first result is taken (getting the begin iterator and dereferencing it). In case of any trouble, resolve() is guaranteed to throw an exception.
3. Send the data through the buffer to the socket, which mediates the connection to the server.
4. Once send_to() has ended its job (notice that it is a blocking function), we get the reply from the server calling receive_from(). The socket knows where to go to get the data, and will fill the passed endpoint (sender) with this information.
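A possible main for the combined file, in the same spirit as the other examples in this series - just a sketch; the argument convention (no parameter means server) and the "localhost" host are my assumptions:
int main(int argc, char* argv[])
{
    try
    {
        ba::io_context io;
        if (argc > 1)
            client(io, "localhost");  // any argument: act as client
        else
            server(io);  // no argument: act as server
    }
    catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
}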

I pushed the full C++ code - both client and server in the same source file - to GitHub. I based them on blocking_udp_echo_server.cpp and blocking_udp_echo_client.cpp from the official Boost ASIO Tutorial.


Boost ASIO echo TCP synchronous client-server

I think this echo client-server application is a good introduction to ASIO. The server creates a new TCP socket each time it receives a request from a client, and runs it in a new thread, where the read-write activity is accomplished in a synchronous way. The client sends some data to the server, gets it back, and then terminates.
The structure is simple, still a few interesting points are touched.

Client

Given io, the app ASIO io_context, and the server hostname as a string, the client tries this block, and eventually just output to console an exception.
namespace ba = boost::asio;
using ba::ip::tcp;
// ...

tcp::socket socket{ io };  // 1
tcp::resolver resolver{ io };
ba::connect(socket, resolver.resolve(host, ECHO_PORT_STR));  // 2

// ...
ba::write(socket, ba::buffer(request, reqLen));  // 3

char reply[CLIENT_MAX_LEN];  // 4
size_t repLen = ba::read(socket, ba::buffer(reply, reqLen));  // 5
// ...
1. Create an ASIO TCP socket and a resolver on the current io_context.
2. Then resolve() the resolver on the host and port of the echo server (in my case, localhost:50014), and use the resulting endpoints to establish a connection on the socket.
3. If the connection holds, write to the socket the data we previously put in the char buffer named request, for a size of reqLen.
4. We reserve a confidently large buffer where to store the server reply. Since we are writing an echo application, we know that the size of the data we are about to get back should be the same as the size we have just sent. This simplifies our code to the point that we can do a single read for the complete data block.
5. Use the socket for reading from the server. We use the buffer, and the size of the data we sent, as said in (4).

At this point we could do whatever we want with the data we read in reply with size repLen.

Server loop

Once we create an acceptor on the ASIO io_context, specifying as endpoint the IP protocol we want (here I used version 4) and the port number, we loop forever. At each iteration, as a request comes from a client, a call to accept() on the acceptor creates a new socket, which is passed to the session() function, run in a new thread.
tcp::acceptor acceptor{ io, tcp::endpoint(tcp::v4(), ECHO_PORT) };

for (;;)
{
 std::thread(session, acceptor.accept()).detach();
}
Notice that each thread created in the loop survives the exit from the block only because it is detached. This is both handy and frightening. In production code, I would probably push the threads into a collection instead, so that I could explicitly deal with any one that stopped behaving properly, as sketched below.
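A sketch of that alternative: keep the session threads in a vector and join them on the way out. The bounded loop and the MAX_SESSIONS constant are hypothetical, just to give the joins a chance to run:
std::vector<std::thread> sessions;
for (int i = 0; i < MAX_SESSIONS; ++i)  // bounded, unlike the forever loop above
{
    sessions.emplace_back(session, acceptor.accept());
}
for (auto& t : sessions)
{
    t.join();  // wait for each session to complete cleanly
}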

Server session

Since we don't know the size of the data sent by the client, we should be ready to split it and read it in chunks.
for (;;)
{
 char data[SERVER_MAX_LEN];  // 1

 bs::error_code error;
 size_t len = socket.read_some(ba::buffer(data), error);  // 2
 if (error == ba::error::eof)
 {
  return;
 }
 else if (error)
 {
  throw bs::system_error(error);
 }

 ba::write(socket, ba::buffer(data, len)); // 3
}
1. To better see the effect, I have chosen a ridiculously small size for the server data buffer.
2. The data coming from the client is split in chunks by read_some() on the socket created by the acceptor. When the client has sent all its data and closed the connection, read_some() sets the passed Boost system error code to eof; when we detect it, we know that we can terminate the session. Any other error code means that something went wrong.
3. If read_some() set no error, we use the current chunk of data to do what the server should do. In this case, we just echo it back to the client.

Full C++ code on GitHub. The original source is the official Boost ASIO tutorial, divided in two parts, client and server.


Boost ASIO UDP asynchronous server

Having already seen how to establish an ASIO UDP synchronous connection and how to create an ASIO TCP asynchronous server, we now sort of put them together to write an ASIO UDP asynchronous server.

Main

As client we can happily recycle the one written for the UDP synchronous connection - only, be sure to use the same IP protocol version on both sides. So in the main function we just instantiate an ASIO io_context (also known as io_service), pass it by reference to the ctor of a Server object, and then call run() on it.

Later, while running a number of clients to play with the app, you may want to run io on other threads too - be sure to do that between the server creation and the io run on the current thread, as sketched below.
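A minimal sketch of that multithreaded setup - the thread count is arbitrary; what matters is creating the Server, and with it the first pending asynchronous operation, before running io anywhere:
ba::io_context io;
Server server(io);  // registers the first async_receive_from()

std::vector<std::thread> runners;
for (int i = 0; i < 3; ++i)  // arbitrary number of extra threads
    runners.emplace_back([&io] { io.run(); });
io.run();  // the current thread takes part in the dispatching too
for (auto& t : runners)
    t.join();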

Server class

The server sits on port 50013 and always sends the clients the same message, concatenated with a counter. To work it needs an ASIO UDP socket and a UDP endpoint to identify the current client.
// ...
const int HELLO_PORT = 50'013;
const std::string MSG("Async UDP hello from ASIO ");

class Server
{
private:
 udp::socket socket_;
 udp::endpoint endpoint_;
 uint16_t counter_ = 0;
// ...
public:
 Server(ba::io_context& io) : socket_(io, udp::endpoint(udp::v6(), HELLO_PORT))
 {
  start();
 }
};
The server ctor sets up the socket data member using the reference to the ASIO io context received from the instantiator and a UDP endpoint created on the fly, specifying the required IP protocol (here version 6) and the server port.

Then the server start() private method is called:
void start()
{
 std::array<char, 0> buffer;  // 1
 socket_.async_receive_from(ba::buffer(buffer), endpoint_,
  std::bind(&Server::recvHandler, this, std::placeholders::_1));  // 2
 std::cout << "Server ready" << std::endl;
}
1. The client is expected to send an empty message, so the receive buffer can be zero sized.
2. We call async_receive_from() to receive asynchronously a message from the client into buffer. We'll get the client endpoint information in the data member and, on receive completion, ASIO will call another Server private method, recvHandler(), passing to it the first parameter it is expected to provide, namely a reference to the boost system error_code describing how async_receive_from() completed.

If no error was detected in async_receive_from(), the recvHandler() creates a message and sends it to the client:
void recvHandler(const bs::error_code& error)
{
 if (!error)
 {
  std::shared_ptr<std::string> data(new std::string(MSG + std::to_string(++counter_)));  // 1

  socket_.async_send_to(ba::buffer(*data), endpoint_,
   std::bind(&Server::sendHandler, this, data, std::placeholders::_1, std::placeholders::_2));  // 2
  start();
 }
}
1. This piece of code is a bit convoluted. We create on the heap a string containing the data to be sent to the client, and we wrap it in a shared pointer. In this way we can keep it alive until we need it, that is, until the end of the sendHandler() method invoked by async_send_to() at the end of its operation.
2. async_send_to() uses the endpoint set by async_receive_from() to know where to send the data. At the end, sendHandler() is called.

From the ASIO point of view, sendHandler() could be an empty method. The only important thing is that the data created in recvHandler() gets here in the shared smart pointer, ensuring it is not destroyed while still required.
void sendHandler(std::shared_ptr<std::string> data, const bs::error_code& error, std::size_t size)
{
 if (!error)
 {
  std::cout << size << " byte sent from [" << *data << "]" << std::endl;
 }
}
I pushed the full C++ source code on GitHub. It is based on the Daytime.6 example from the official Boost ASIO tutorial.


Boost ASIO synchronous UDP client/server

If you know how to write an app that uses an ASIO TCP connection, you are close to know also how to do it on UDP.

A large part of the differences is taken care of for us by ASIO, and we just have to use the socket as defined in boost::asio::ip::udp instead of its tcp counterpart.

Server

First thing, we create a udp socket, which requires the ASIO I/O context and a udp endpoint, which in turn needs as parameters the IP protocol to be used - version 4 or 6 - and the port - here I picked 50013.
namespace ba = boost::asio;
namespace bs = boost::system;
using ba::ip::udp;
// ...

const unsigned short HELLO_PORT = 50'013;
// ...

void server(ba::io_context& io)
{
    udp::socket socket{ io, udp::endpoint{udp::v6(), HELLO_PORT} };
 // ...
Then we repeat this block as many times as we like - in my tester I did it just once:
std::array<char, 0> recvData;  // 1
udp::endpoint endpoint;  // 2
bs::error_code error;  // 3
socket.receive_from(ba::buffer(recvData), endpoint, 0, error);  // 4
if (error)
 throw bs::system_error(error);  // 5

std::string message{ "UDP hello from ASIO" };

bs::error_code ec;
socket.send_to(boost::asio::buffer(message), endpoint, 0, ec);  // 6
1. In this buffer we store the message sent by the client. It has no use here, so it can even be zero sized.
2. The endpoint, which will be used to send the message back to the client, is set by the socket receive_from() method, two lines below.
3. This error code is set by receive_from() in case of problems.
4. The server waits synchronously here for the client. The buffer, the endpoint, and the error code are output parameters. When the communication starts, the data coming from the client is put in the buffer (here, an empty message is expected), the endpoint is filled with the client's network information, and the error code stores the possible failure of the operation.
5. If receive_from() fails, throw the related Boost system_error exception, which is a std::runtime_error subclass.
6. Send the message to the client, using the endpoint as set by receive_from() and not specifying any flag. Any possible error code returned is disregarded.
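If we did want to react to a send_to() failure instead of disregarding it, a minimal check could look like this - my sketch, not in the original code:
if (ec)
{
    std::cerr << "send_to failed: " << ec.message() << std::endl;
}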

Client

The client function tries this code:
udp::resolver resolver{ io };
udp::endpoint destination = *resolver.resolve(udp::v6(), host, HELLO_PORT_STR).begin();  // 1

udp::socket socket{ io };
socket.open(udp::v6());  // 2

std::array<char, 0> sendData;
socket.send_to(ba::buffer(sendData), destination);  // 3

std::array<char, 128> recvData;  // 4
udp::endpoint sender;
size_t len = socket.receive_from(ba::buffer(recvData), sender);

std::cout.write(recvData.data(), len);  // 5
1. Having instantiated a udp resolver on the previous line, we call resolve() on it for the same IP protocol as the server - here version 6 - specifying its host and port. Since resolve() either returns at least one endpoint or fails, we can safely access the first one by dereferencing its begin() iterator.
2. We need an ASIO udp socket. Having created it on the previous line, passing the current ASIO I/O context, we open it for the required UDP version.
3. We start the communication with the server, sending an empty message - as nothing more is expected from us.
4. We'd better have enough room for the message coming from the server; its actual size is returned by the call to receive_from().
5. Let's see what we get, outputting it to the console.

Client and server are two free functions in the same C++ file that I pushed to GitHub. Passing no parameter to its main you run it as a server, otherwise as a client.

This example is based on Daytime.4 and Daytime.5 from the official Boost ASIO tutorial.


Boost ASIO TCP/IP asynchronous server

Having seen how simple creating a synchronous ASIO TCP/IP server is, let's see now how to create an asynchronous one.

Main

The code for this example is divided in two classes, Server and Connection, described below. The example main function instantiates an ASIO io_context, uses it to instantiate a Server object, and then run() the I/O context.
namespace ba = boost::asio;
// ...

ba::io_context io;
Server server(io);
io.run();
Connection

The lower details of our code are here. Connection has as private data member an ASIO TCP/IP socket object on which we are going to write data to the client. Since we want to perform the write asynchronously, we use the ASIO async_write() function. This forces us to ensure that the current connection object is still alive when the write is actually performed. To do that, we'll pass to async_write() a handle to the connection object itself. To avoid nightmarish memory management, we'll wrap it in a shared_ptr. However, to do that, we need to create a shared smart pointer from this, and for that we have to enable the feature explicitly, deriving our class from the standard enable_shared_from_this:
class Connection : public std::enable_shared_from_this<Connection>
{
private:
 tcp::socket socket_;
 std::string message_{ "Async hello from ASIO " };
 static int counter_;

// ...
The Connection ctor creates its member socket using the passed ASIO I/O context, and sets the message that we'll send to the client. Notice that the message has to be a Connection data member because we have to guarantee it stays alive until the asynchronous write is performed.
Connection(ba::io_context& io) : socket_(io)
{
 message_ += std::to_string(counter_++);
}
However, the ctor is private. The only way we let a Connection user create an instance of this class is wrapped in a smart pointer, for the reason described above; so we have this static public method:
static std::shared_ptr<Connection> create(ba::io_context& io)
{
 return std::shared_ptr<Connection>(new Connection(io));
}
The writing is performed by this public method:
void start()
{
 ba::async_write(socket_, ba::buffer(message_),
  std::bind(&Connection::write, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
}
ASIO async_write() requires an AsyncWriteStream, our socket; a ConstBufferSequence, which we create on the fly from our message; and a WriteHandler. This last parameter represents a function in which we can perform any further action after the normal write to the socket has been done and before the connection to the client is closed. A free function with two parameters, a constant reference to a Boost error_code and a size_t, is expected, but bind() is a helpful friend here. I use both parameters, but we could easily get rid of them. More importantly, notice the use of shared_from_this(). Even if we didn't want to do anything in the WriteHandler, it is vital that the connection is kept alive till the end of the writing. Keeping the "this" reference alive here does the trick.
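Neither the write() method bound above nor the socket() accessor used later by the Server appears in the post; minimal versions consistent with those calls could look like this (my sketch):
tcp::socket& socket() { return socket_; }  // used by Server::start() below

void write(const bs::error_code& ec, std::size_t size)
{
    if (!ec)
    {
        std::cout << size << " bytes sent" << std::endl;
    }
}  // as the last shared_ptr owner lets go, the Connection is destroyed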

Server

In the core of our server there is an ASIO TCP/IP acceptor, initialized by the ctor and used by the Server start() method to accept - asynchronously - a connection from a client on a Connection object.
using ba::ip::tcp;
const int HELLO_PORT = 50013;
// ...

class Server
{
private:
 tcp::acceptor acceptor_;
// ...
public:
 Server(ba::io_context& io) : acceptor_(io, tcp::endpoint(tcp::v4(), HELLO_PORT))
 {
  start();
 }
// ...
The ctor calls the Server private method start(), which creates a new Connection on the ASIO I/O context received from the main and stored in the acceptor. The socket owned by the connection is used in the async_accept() call on the acceptor, so that the server waits for a client connection on it.
void start()
{
 ba::io_context& io = acceptor_.get_executor().context();
 std::shared_ptr<Connection> connection = Connection::create(io);
 tcp::socket& socket = connection->socket();
 acceptor_.async_accept(socket, std::bind(&Server::handle, this, connection, std::placeholders::_1));
}
As second parameter, async_accept() expects an ASIO AcceptHandler, a void free function that gets in input a constant reference to a boost system error code; we bind it to call the following Server private method:
void handle(std::shared_ptr<Connection> connection, const bs::error_code& ec)
{
 if (!ec)
 {
  connection->start();
 }
 start();
}
If the handshake with the client worked fine, we use the connection to write - asynchronously - through the socket. Then we call Server start() again, to prepare the server to accept another connection.

This is more or less all. You could see the full C++ code on GitHub.

I have tested this server using the client described in the previous post. I found it interesting to add here and there sleeps and console prints, to better observe how the process works. For more fun I'd suggest you run more clients and let the ASIO I/O context run on a few threads, as shown in the strand example. The code is based on the official ASIO tutorial, Daytime.3 example.


Boost ASIO synchronous exchange on TCP/IP

Let's build a simple synchronous client-server application based on the TCP/IP protocol using the Boost ASIO ip tcp socket. The server waits for a connection on a port; as one comes, it writes a message and then terminates. The client connects to the server, reads its message from the socket, outputs it, and then terminates too.

Main

I have put both client and server in a single app: if no parameter is passed to the main, the process acts as a server, otherwise as a client.
namespace ba = boost::asio;
// ...
const std::string HOSTNAME{ "localhost" };  // 1
// ...

int main(int argc, char* argv[])
{
 ba::io_context io;  // 2

 if (argc > 1)
  client(io, HOSTNAME);
 else
  server(io);
}
1. Used by the client, when connecting to the server. In this simple example both server and client live on the same machine.
2. An io_context (also known as io_service, but that name is now deprecated) is the first requirement for almost anything in ASIO, so I create it first; then it is passed by reference to the client or server function, according to the number of parameters passed to the program.

Server

The following code in the server function throws exceptions derived from std::exception to signal problems. This being just an example, we simply wrap it in a try-catch and output the related message.
using ba::ip::tcp;
// ...
const int HELLO_PORT = 50013;
// ...

tcp::acceptor acceptor{ io, tcp::endpoint(tcp::v6(), HELLO_PORT) };  // 1

{   // 2
 tcp::socket socket{ io };  // 3
 std::cout << "Server ready" << std::endl;
 acceptor.accept(socket);  // 4

 std::string message{ "Hello from ASIO" };  // 5
 bs::error_code ec; // 6
 ba::write(socket, ba::buffer(message), ec);  // 7
}
1. Instantiate an ASIO TCP/IP acceptor, so that we can listen for connections. We pass to it the ASIO io_context and a TCP endpoint, created specifying the version of the IP protocol to use (4 or 6) and the port.
2. Here this block is executed just once. Convert it to a for loop for a more common behavior, as sketched after these notes.
3. Each client connection requires a dedicated ASIO TCP/IP socket. Here it is created and, at the end of the block, exiting the scope, the socket dtor cleans it up.
4. The server sits down, waiting for a client to serve.
5. When the acceptor has accepted a client on the socket, the server wakes up and builds a message.
6. The ASIO write call on the next line requires an error code, to be set in case something goes wrong. We don't even check it here; usually this is not a good idea.
7. The message is converted to an ASIO buffer, so that it can be consumed by the ASIO write() that writes it to the socket.
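The variation mentioned in (2) - the very same block wrapped in a forever loop, so that the server keeps serving one client after the other:
for (;;)
{
    tcp::socket socket{ io };
    std::cout << "Server ready" << std::endl;
    acceptor.accept(socket);

    std::string message{ "Hello from ASIO" };
    bs::error_code ec;
    ba::write(socket, ba::buffer(message), ec);
    // leaving the scope, the socket dtor closes the connection
}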

Client

It mirrors the server, with the acceptor's part taken by a resolver.
tcp::resolver resolver{ io };
tcp::resolver::results_type endpoints = resolver.resolve(host, HELLO_PORT_STR);  // 1

tcp::socket socket{ io };
ba::connect(socket, endpoints);  // 2

for (;;)  // 3
{
 std::array<char, 4> buf;  // 4
 bs::error_code error;
 size_t len = socket.read_some(ba::buffer(buf), error);  // 5

 if (error == ba::error::eof)  // 6
  break; // Connection closed cleanly by peer
 else if (error)
  throw bs::system_error(error);  // 7

 std::cout.write(buf.data(), len);  // 8
 std::cout << '|';  // 9
}
std::cout << std::endl;
1. The resolver is resolved on the required host and port, returning a list of valid endpoints on them.
2. We call the ASIO connect() on the socket created in the line above, specifying the endpoints resolved in (1).
3. Let's loop until the full message is received from the server.
4. I have set the buffer size to a ridiculously low value, just to see it better at work.
5. read_some() data from the socket in the buffer.
6. If we reach end of file, the connection has been correctly completed.
7. Otherwise we interrupt the operation throwing the Boost system error we got.
8. Use the partial data received from the server.
9. This pipe character is put at the end of each chunk of data read only for seeing the effect on the read message.

Full C++ code is on GitHub. It is based on the Daytime.1 and Daytime.2 example from the official Boost ASIO tutorial.


Simple ASIO TCP client/server example

A server sits on a specified port, and when a client connects, it sends a message and terminates. A client connects to the server, reads the message from the socket, and terminates. Nothing fancy, but it could be a good introduction to using ASIO synchronously to create TCP/IP connections.

After five years, the code is getting obsolete. I have reviewed it, moving to the (currently - March 2018) latest version of ASIO; please follow the link to the new post. Sorry for the trouble.


You could get the full C++ code for this example on Github. If you run the executable with no parameter, you start the server, otherwise the client.

Server

In this trivial implementation, my server accepts just one connection before terminating, but it is pretty easy to make it run forever. It is just a matter of running this block in an indefinite loop, and not just once as I do here:
{
  boost::asio::ip::tcp::socket socket(aios); // 1
  std::cout << "Server ready" << std::endl;
  acceptor.accept(socket); // 2

  std::string message("Hello from ASIO");
  boost::asio::write(socket, boost::asio::buffer(message)); // 3
}
1. Create a new TCP/IP socket on an already existing ASIO I/O service.
2. Wait for a client connection.
3. Write a message on the socket to the client.

At the end of the block the socket is automatically closed by its dtor.

Before that, I have done some preparatory stuff:
boost::asio::io_service aios; // 1
boost::asio::ip::tcp::acceptor acceptor(aios, // 2
  boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), HELLO_PORT)); // 3
1. The I/O service is the first thing required.
2. The acceptor is used to accept calls from the clients.
3. We need to pass to the acceptor the endpoint, which specifies the protocol used (here TCP/IP version 4) and the port (defined in a constant that I set to 50013).

Client

The client code is symmetrical. First step is about preparing the job:
boost::asio::io_service aios;

boost::asio::ip::tcp::resolver resolver(aios); // 1
boost::asio::ip::tcp::resolver::iterator endpoint = resolver.resolve(
  boost::asio::ip::tcp::resolver::query(host, HELLO_PORT_STR)); // 2
1. Resolver is the counterpart of acceptor. Calling resolve() on it, we get an iterator pointing to the first endpoint associated to a specific address. We can use that iterator to open a connection to the server on a socket, as we'll see below.
2. Query for a specific host and port (here I specified localhost and 50013, notice that both are c-strings).

Now I am ready to open the connection on a socket. If you are using a recent version of Boost Asio (I am working with 1.54), this is done in a one-liner:
boost::asio::connect(socket, endpoint);
If no connection could be opened on any endpoint, a boost system system_error is thrown.

On older Asio versions, there was no such connect() overload, and you had to implement its behavior by hand, with a piece of code like this:
boost::system::error_code error = boost::asio::error::host_not_found;
boost::asio::ip::tcp::resolver::iterator end; // 1
while(error && endpoint != end)
{
  socket.close();
  socket.connect(*endpoint++, error); // 2
}
if(error)
  throw boost::system::system_error(error); // 3
1. The default ctor for a resolver iterator returns its "end"; we use it to loop on all the endpoints returned by resolver::resolve().
2. Try to connect to the current endpoint; in case of failure, loop until we have another endpoint to check.
3. If we can't connect to any endpoint, throw an exception.

Once we have a socket connected to the server, it's just a matter of getting the message it sends to us:
for(;;)
{
  std::array<char, 4> buf; // 1
  boost::system::error_code error;
  size_t len = socket.read_some(boost::asio::buffer(buf), error); // 2

  if(error == boost::asio::error::eof)
    break; // 3
  else if(error)
    throw boost::system::system_error(error); // 4

  std::cout.write(buf.data(), len);
  std::cout << '|'; // 5
}
std::cout << std::endl;
1. Usually I would use a much bigger buffer.
2. Partial read of the message, limited by the buffer dimension.
3. Detected end of file, stop looping.
4. In case of error throws an exception.
5. I show the junctions in the message due to the small local buffer; usually the message is rebuilt seamlessly.

I wrote this post as a review of a piece of code I conceived at the beginning of 2011, which is still documented in a couple of posts, one dedicated to the server part, the other to the client part. You may want to have a look at them.

The original source is the Boost Asio tutorial. Here is the slightly different version of their client and of their server.


A second ROUTER-DEALER example

In the previous post we have seen a ZeroMQ ROUTER-DEALER application where the ROUTERs act as servers. We can easily modify it to have the DEALERs acting as servers instead.

Now the routers are clients, so each of them connects to all the DEALER sockets available and then pends on its socket to get its share of messages. The messages are stored in a temporary vector, and then they are sent back to the dealers. Before the dealers send their messages, the routers should have connected to them. A simple way to achieve this is to let the dealer threads sleep for a while before starting to send.

The routers and dealers are started in this way:
boost::thread_group threads;
for(int i = 0; i < nRouters; ++i) // 1
    threads.create_thread(std::bind(router2dealer, nDealers));
for(int i = 0; i < nDealers; ++i) // 2
    threads.create_thread(std::bind(dealer4router, nRouters, i));
threads.join_all();
1. There should be at least one router; the upper limit is determined by your system. Each client thread runs an instance of router2dealer().
2. I use the same definition for socket addresses as in the previous example (repeated below for reference), so we can have just one or two dealers. Each server thread runs on dealer4router().
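These are the addresses as defined in the previous example:
const char* SKA_CLI[] = {"tcp://localhost:5380", "tcp://localhost:5381" };
const char* SKA_SRV[] = {"tcp://*:5380", "tcp://*:5381" };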

This is the function for each client thread:
void router2dealer(int n) // 1
{
    dumpId("ROUTER startup");
    zmq::context_t context(1);
    
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 2
    zmq::Socket skRouter(context, ZMQ_ROUTER, id);
    for(int i =0; i < n; ++i)
    {
        skRouter.connect(SKA_CLI[i]); // 3
        dumpId("ROUTER CLI on", SKA_CLI[i]);
    }

    std::vector<zmq::Frames> messages;
    messages.reserve(n); // 4

    for(int i =0; i < n; ++i)
    {
        zmq::Frames message = skRouter.blockingRecv(2);
        dumpId(message[0].c_str(), message[1].c_str());
        messages.push_back(message);
    }
    dumpId("all received");

    std::for_each(messages.begin(), messages.end(), [&skRouter](zmq::Frames& frames) // 5
    {
        skRouter.send(frames);
    });
}
1. Each client connects to all the n servers, and receives n messages, one from each server.
2. It is handy to specify the socket id for testing purposes; the thread id looks like a good choice.
3. Each client connects to all the servers.
4. We already know how many messages we are going to receive, one from each server; we are going to store them in the messages STL vector.
5. A router can act as an asynchronous client, and we see it here. First we have received all the expected messages, now we are sending them back, each to its own original sender.

Each server thread runs on this function:
void dealer4router(int n, int index) // 1
{
    dumpId("DEALER startup");
    zmq::context_t context(1);

    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id());
    zmq::Socket skDealer(context, ZMQ_DEALER, id);
    skDealer.bind(SKA_SRV[index]);

    boost::this_thread::sleep(boost::posix_time::seconds(1)); // 2
    for(int i =0; i < n; ++i)
    {
        std::string out(i+1, 'k' + i + index);
        skDealer.send(out);
    }
    dumpId("all sent");

    for(int i =0; i < n; ++i)
    {
        std::string in = skDealer.recvAsString(); // 3
        dumpId(in.c_str());
    }
}
1. Each dealer knows the number of clients, so as to send a message to each of them, and its own index, to bind to the right socket address.
2. For such a simple example, waiting a while before starting to send messages is enough to ensure that all the expected clients are up.
3. The dealer servers act asynchronously too: first sending all the messages to the clients, then receiving the feedback. Notice that the dealer sends a mono-frame message; 0MQ prepends the dealer id to it, transforming it into a two-frame message.


A first ROUTER-DEALER example

Both ROUTER and DEALER ZeroMQ sockets are designed to work asynchronously. According to the current design requirements, we can use one or the other as server. Here we see the case where the ROUTER sockets are all servers; in the next post we'll see how to write a simple 0MQ application where the DEALER sockets play the server role.

Same background as the previous posts: the reference platform is Windows, Visual C++ 2010, C++, ØMQ 2.2 through the standard ZeroMQ C++ wrapper, plus my zmq::Socket class that you can see on github. There is nothing complicated in the code, so I guess it could be easily refactored to work on any environment supported by 0MQ with limited effort.

The application is designed to do a pointless job: a few DEALER clients create one message for each ROUTER available, and then send them all. Then each of them gets a bunch of replies on the socket, and prints them to let the user see the result. (Not) surprisingly, each client gets back all and only the messages that it sent to the servers.

You can play around changing the number of dealers and routers. In my example I can have just one or two routers, but you can have as many dealers as you want and your system allows.

The application is developed to run on a single process, and many threads, to keep testing easy. This is the code that spawns all the client and server threads:
boost::thread_group threads;
for(int i = 0; i < nDealers; ++i) // 1
    threads.create_thread(std::bind(dealer2router, nRouters, i));
for(int i = 0; i < nRouters; ++i) // 2
    threads.create_thread(std::bind(router4dealer, nDealers, i));
threads.join_all();
1. The number of dealers could range from 1 to ... a big number. The actual limit depends on your working environment. Each thread runs dealer2router(), which gets the number of routers, and an id used to create a different message for each client.
2. The number of routers is more definite, since we have to specify the actual socket address for each of them. Being lazy, here we can have just one or two routers; but it is easy to increase that number. Each server runs on router4dealer(), a function that needs the number of expected messages (one for each dealer) and the router id (so that we can pick up its correct socket address).

As I said, only two servers maximum are available:
const char* SKA_CLI[] = {"tcp://localhost:5380", "tcp://localhost:5381" };
const char* SKA_SRV[] = {"tcp://*:5380", "tcp://*:5381" };
Extend these arrays if you want more routers. In the following code you are going to see a call to a utility function named dumpId(), for which I don't show the source code. It just prints to std::cout, locking to protect this shared resource from concurrent accesses.

Each client thread runs this function:
void dealer2router(int n, int index)
{
    dumpId("DEALER startup");
    zmq::context_t context(1);
    
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id());
    zmq::Socket skDealer(context, ZMQ_DEALER, id); // 1
    for(int i =0; i < n; ++i)
    {
        skDealer.connect(SKA_CLI[i]); // 2
        dumpId("DEALER CLI on", SKA_CLI[i]);
    }

    for(int i =0; i < n; ++i)
    {
        std::string out(i+1, 'k' + i + index); // 3
        skDealer.send(out);
    }
    dumpId("all sent");

    for(int i =0; i < n; ++i) // 4
    {
        std::string in = skDealer.recvAsString();
        dumpId(in.c_str());
    }
}
1. For testing purposes, I set each dealer id to its thread id, so that the output is easier to check.
2. The parameter "n" passed to the function represents the number of routers to which each dealer can connect. Here we are actually connecting to each of them.
3. The parameter "n" also represents the number of messages that each dealer sends, one for each available router. The message is built in a slightly cryptic way: its length is increased at each iteration, starting from one, and it contains a repetition of the same character, chosen from the loop iteration and the "index" parameter passed to the function to identify the current dealer. The sense of this messy setup is making the output recognizable during testing.
4. Here we see the dealer's asynchronicity. First we send all the messages, then we receive them all.

The dealer's send() pushes out a single frame, the payload; 0MQ prepends to it the dealer address, which I explicitly set to the thread id in (1), so the router sees a two-frame message. The address is used by the router to identify which dealer has to get the reply, and is consumed by 0MQ on the way back. That's why in (4) we receive just the payload.
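Summing up, this is what each hop sees for a single message (a sketch of the flow just described):
// DEALER send:  ["payload"]                  single frame from the application
// ROUTER recv:  ["<dealer-id>", "payload"]   0MQ prepended the dealer address
// ROUTER send:  ["<dealer-id>", "payload"]   the address selects the route back
// DEALER recv:  ["payload"]                  0MQ consumed the address frame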

Each server runs this code on its own thread:
void router4dealer(int nMsg, int id)
{
    dumpId("ROUTER startup");
    zmq::context_t context(1);

    zmq::Socket skRouter(context, ZMQ_ROUTER);
    skRouter.bind(SKA_SRV[id]); // 1
    dumpId("ROUTER SRV on", SKA_SRV[id]);

    std::vector<zmq::Frames> messages;
    messages.reserve(nMsg); // 2

    for(int i =0; i < nMsg; ++i)
    {
        zmq::Frames message = skRouter.blockingRecv(2); // 3
        dumpId(message[0].c_str(), message[1].c_str());
        messages.push_back(message);
    }
    dumpId("all received");

    std::for_each(messages.begin(), messages.end(), [&skRouter](zmq::Frames& frames) // 4
    {
        skRouter.send(frames);
    });

    dumpId("ROUTER done");
}
1. The id passed to the function is used to select the right socket address for the current router. You should carefully check it to avoid out-of-range values, which would drive this puny application to a miserable crash.
2. We already know how many messages this router is going to receive, so we reserve enough room for them in the vector where we are going to temporarily store them.
3. Only one message is expected from each dealer in the application, and each of them is composed of two frames, the dealer address and the message payload.
4. The routers are asynchronous too, and we see that at work here. First we have received all the messages from the dealers, now we are sending them back. I am using an STL for_each algorithm that iterates on the elements of the vector where the messages have been stored. For each message, a C++11 lambda is called that makes use of the socket, captured by reference, and accepts in input the current element of the vector, again by reference. In the lambda body we just send the message back through the socket.


Basic DEALER-REP example

I stripped out every possible detail and wrote a minimal ZeroMQ application that uses a DEALER-REPLY socket combination. It should be useful to play around with it to see how this pattern works on its own, before combining it into a more complex structure.

The code is based on ØMQ 2.2 for C++, using my zmq::socket_t extension (you can find it in an include file on github). The target platform is Windows + Visual C++ 2010. Some STL, Boost, and C++11 features have been used.

The dealer thread creates a few multipart messages, sends them through the socket, and waits for their echo. Each reply thread gets its share of those messages, and sends them back to the socket.

There are a few interesting points to notice.

The dealer sends multipart messages composed of three frames, representing the sender address, a separator, and the actual payload. The reply socket gets just the payload; it has no access to the address (nor to the separator).

The dealer performs fair load balancing among the REP sockets connected to it, so we expect each client to get the same share of messages. To let the example run as expected, we should ensure that all the clients are connected to the server before the messages begin to be sent on the socket, otherwise only the already connected ones would get their share.

The clients and server are created by this code:
boost::thread_group threads;
for(int i = 0; i < nRep; ++i) // 1
    threads.create_thread(std::bind(rep2dealer, nMsg)); // 2
threads.create_thread(std::bind(dealer, nRep, nMsg)); // 3
threads.join_all();
1. nRep is the number of REP clients that we want the application to have. You should ensure that it is at least one, and "not too big", according to what "big" is in your environment.
2. Each REP client runs its own copy of the rep2dealer() function, see below. It expects in input the number of messages that each client should receive. The same consideration as in (1) applies.
3. The server runs on the dealer() function. It needs to know how many clients to expect, and how many messages to send to each of them.

The socket addresses are:
const char* SK_ADDR_CLI = "tcp://localhost:5380";
const char* SK_ADDR_SRV = "tcp://*:5380";
I am not showing here the code for the utility function dumpId(); it just prints to std::cout, locking to protect this shared resource from concurrent accesses.

Here is the function executed by each client:
void rep2dealer(int nMsg)
{
    dumpId("REP startup");
    zmq::context_t context(1);
    
    zmq::Socket skRep(context, ZMQ_REP); // 1
    skRep.connect(SK_ADDR_CLI);
    dumpId("REP CLI on", SK_ADDR_CLI);

    for(int i =0; i < nMsg; ++i) // 2
    {
        std::string msg = skRep.recvAsString();
        dumpId("REP for", msg.c_str());
        skRep.send(msg);
    }
}
1. Reply socket, connected to the DEALER one that we'll see in the server.
2. On a reply socket we synchronously receive and send messages. Here we loop, receiving and echoing back a (single-part) message, for the number of messages expected by each client. If the balancing done by the dealer weren't fair, or if for any reason a client received fewer messages than expected, it would hang trying to get more stuff.

And this is the server:
void dealer(int nRep, int nMsg)
{
    dumpId("DEALER startup");
    zmq::context_t context(1);

    zmq::Socket skDealer(context, ZMQ_DEALER); // 1
    skDealer.bind(SK_ADDR_SRV);
    dumpId("DEALER SRV on", SK_ADDR_SRV);

    boost::this_thread::sleep(boost::posix_time::seconds(1)); // 2

    zmq::Frames frames; // 3
    frames.reserve(3);
    frames.push_back(boost::lexical_cast<std::string>(boost::this_thread::get_id()));
    frames.push_back("");
    frames.push_back("");

    for(int i =0; i < nRep * nMsg; ++i)
    {
        frames[2] += 'k'; // 4
        skDealer.send(frames);
    }
    dumpId("all sent");

    for(int i =0; i < nRep * nMsg; ++i)
    {
        zmq::Frames input = skDealer.blockingRecv(3); // 5
        dumpId(input[2].c_str()); // 6
    }

    dumpId("DEALER done");
}
1. This is the DEALER socket accepting the connection from the REP client sockets.
2. Before sending messages, we wait till all the clients are connected, so that each of them gets its own share of messages.
3. Let's prepare the base for the message to be sent. It is a three-part message: the first frame is the address of the sender - not interesting here, but still I set it to the thread id; the second frame is empty, acting as a separator; the third frame is the payload, initialized empty and modified in the sending loop, to give some sensible testing feedback.
4. The first generated message has a single 'k' as payload; each following time another 'k' is added.
5. Getting the messages back from the clients.
6. Dump the payload to standard output.
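Putting notes (3) to (6) together, the frames seen at each hop are (a sketch):
// DEALER send:  ["<thread-id>", "", "k..."]  address, empty separator, payload
// REP recv:     ["k..."]                     0MQ holds back the envelope
// REP send:     ["k..."]                     0MQ re-attaches the stored envelope
// DEALER recv:  ["<thread-id>", "", "k..."]  hence the blockingRecv(3) above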


A very simple REQ-ROUTER example

This is the simplest ZeroMQ REQ-ROUTER example I could conceive. I have written it for ØMQ 2.2 on Windows for MSVC, using the built-in C++ wrapper, with an extension to manage multipart messages more easily (you can find it as an include file on github). I have made use of STL, Boost, and some C++11 support, but nothing impressive.

The application runs on just one process, on which a few client threads are created, each of them using its own REQ socket, all of them connected to the same ROUTER socket running on another thread. The protocol used is tcp, so we can easily port the example to a multiprocess setting.

Each client sends a single message to the server. The server gets all the messages - it knows in advance how many clients are going to connect to it - and then it sends them all back through the sockets.

The code will make it all clearer. Here is an excerpt of the main function:
boost::thread_group threads;
for(int i = 0; i < nReq; ++i) // 1
    threads.create_thread(req4router); // 2
threads.create_thread(std::bind(router, nReq)); // 3
threads.join_all();
1. nReq is the number of REQ clients that we want the application to have. You should get it from the user and ensure it is at least one, and "not too big", according to what "big" is in your environment.
2. Each REQ client runs its own copy of the req4router() function, see below.
3. The router runs on the router() function. It needs to know how many clients to expect.

As said, I used the tcp protocol, and this is how I defined the socket addresses:
const char* SK_ADDR_CLI = "tcp://localhost:5380";
const char* SK_ADDR_SRV = "tcp://*:5380";
Both client and server make use of a utility function named dumpId() that I won't show in this post; please have a look at the previous posts to get an idea of how it works, but I guess you can figure it out by yourself. It just prints to std::cout the strings passed to it, taking care of locking on a mutex, since the console is a shared resource and we need to protect it from concurrent accesses. Besides, it also prints the thread id, for debugging purposes.

Here is the function executed by each client:
void req4router()
{
    dumpId("REQ startup");
    zmq::context_t context(1);
    
    zmq::Socket skReq(context, ZMQ_REQ); // 1
    skReq.connect(SK_ADDR_CLI); // 2
    dumpId("REQ CLI on", SK_ADDR_CLI);

    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 3
    skReq.send(id); // 4
    std::string msg = skReq.recvAsString();
    dumpId(msg.c_str());
}
1. zmq::Socket is my zmq::socket_t subclass.
2. Let's connect this REQ socket as a client to the ROUTER socket, which plays as a server.
3. To check that everything works as expected, we should send a unique message. The thread id looks like a good candidate for this job. To convert a Boost thread id to a string we need to explicitly cast it through lexical_cast.
4. Sending and receiving on a socket go through specialized methods in zmq::Socket that take care of the dirty job of converting between standard strings and ZeroMQ buffers.

And this is the server:
void router(int nReq)
{
    dumpId("ROUTER startup");
    zmq::context_t context(1);

    zmq::Socket skRouter(context, ZMQ_ROUTER); // 1
    skRouter.bind(SK_ADDR_SRV);
    dumpId("ROUTER SRV on", SK_ADDR_SRV);

    std::vector<zmq::Frames> msgs; // 2
    for(int i = 0; i < nReq; ++i)
    {
        msgs.push_back(skRouter.blockingRecv(3)); // 3
    }
    dumpId("all received");

    std::for_each(msgs.begin(), msgs.end(), [&skRouter](zmq::Frames& frames)
    {
        skRouter.send(frames); // 4
    });

    dumpId("ROUTER done");
}
1. This is the ROUTER socket accepting the connection from the REQ client sockets.
2. Frames is actually just a typedef for a vector of strings. In msgs we store all the messages coming from the clients.
3. zmq::Socket::blockingRecv() gets the specified number of frames (three, in this case) from the socket, and returns them as a zmq::Frames object, which is pushed into the buffer vector. The first frame contains the address of the sender, the second one is just a separator, and the third is the actual payload.
4. Just echo back to each client the message it sent. The lambda function passed to for_each() needs to know what skRouter is; that is why it is specified in the capture clause.


Monitoring broker state with PUB-SUB sockets

This is the first step in describing a fairly complex ZeroMQ sample application based on the example named Interbroker Routing in the ZGuide. We want to create a few interconnected brokers, so that we can exchange jobs among them. In order to do that, each broker should be able to signal to its peers when it has workers available for external jobs. We are going to do that using PUB-SUB socket connections.

Each broker has a PUB socket that publishes its status to all the other brokers, and a SUB socket that gets such information from the others.

My prototype implementation differs from the original ZGuide code in a few ways.

- It is written for Windows+MSVC, so I couldn't use the inproc protocol, that is not available on that platform. I have used tcp instead.
- I ported it from the czmq binding to the standard 0MQ 2.x light-weight C++ one, improved by a zmq::Socket class that provides support for multipart messages, feature that is missing in the original zmq::socket_t.
- The implementation language is C++, so in the code you will see some STL and Boost stuff, and even some C++11 goodies, such as lambda functions.
- The brokers run all in the same process. It wouldn't make much sense in a real life project, but it makes testing faster. In any case, it should be quite easy to refactor the code to let each broker running in its own process. A minor glitch caused by this approach is that I had to take care of concurrency when printing to standard output. That is the reason why all the uses of std::cout in the code are protected by mutex/lock.

As said above, in this first step each broker doesn't do much, just using the PUB/SUB sockets to share its state with its peers.

These are the addresses used for the broker state sockets, both for client and server mode:
const char* SKA_BROKER_STATE_CLI[] =
    {"tcp://localhost:5180", "tcp://localhost:5181", "tcp://localhost:5182"};
const char* SKA_BROKER_STATE_SRV[] =
    {"tcp://*:5180", "tcp://*:5181", "tcp://*:5182"};
Each broker prototype runs the function described below on its own thread, passing as broker an element of the second array above, and as peers the other two elements from the first array. So, for instance, the first broker is going to have "tcp://*:5180" as server address, and "tcp://localhost:5181", "tcp://localhost:5182" as peers:
void stateFlow(const char* broker, const std::vector<std::string>& peers)
{
    zmq::context_t context(1); // 1

    zmq::Socket stateBackend(context, ZMQ_PUB); // 2
    stateBackend.bind(broker);

    zmq::Socket stateFrontend(context, ZMQ_SUB); // 3
    stateFrontend.setsockopt(ZMQ_SUBSCRIBE, "", 0);
    std::for_each(peers.begin(), peers.end(), [broker, &stateFrontend](const std::string& peer)
    {
        stateFrontend.connect(peer.c_str());
    });

    std::string tick("."); // 4
    zmq_pollitem_t items [] = { { stateFrontend, 0, ZMQ_POLLIN, 0 } }; // 5
    for(int i = 0; i < 10; ++i) // 6
    {
        if(zmq_poll(items, 1, 250 * 1000) < 0) // 7
            break;

        if(items[0].revents & ZMQ_POLLIN) // 8
        {
            zmq::Frames frames = stateFrontend.blockingRecv(2);
            dumpId(frames[0].c_str(), frames[1].c_str()); // 9
            items[0].revents = 0; // cleanup
        }
        else // 10
        {
            dumpId("sending on", broker);
            zmq::Frames frames;
            frames.reserve(2);
            frames.push_back(broker);
            frames.push_back(tick);
            stateBackend.send(frames);

            tick += '.';
            boost::this_thread::sleep(boost::posix_time::millisec(333)); // 11
        }
    }
}
1. Each broker has its own ZeroMQ context.
2. The stateBackend socket is a PUB that makes available the state of this broker to all the subscribing ones.
3. The stateFrontend socket is a SUB that gets all the messages from the publishers it subscribes to (as you see in the next line, no filter is set).
4. Currently we are not really interested in the information the broker is sending, so for the time being a simple increasing sequence of dots will do.
5. We poll on the frontend, waiting for messages coming from the peers.
6. I don't want to loop forever, just a few iterations are enough to check the system.
7. Poll for a quarter of a second on the SUB socket (in 0MQ 2.x the poll timeout is expressed in microseconds); in case of error the loop is interrupted. This is the only, and very rough, error handling in this piece of code, but hey, it is just a prototype.
8. There is actually something to be read. I am expecting a two-part message, so I use a checked zmq::Socket::blockingRecv() that throws an exception (not caught here, meaning risk of brutal termination) if something unexpected happens.
9. Give some feedback to the user. dumpId() is just a wrapper that prints safely to standard output, adding as a plus the thread ID; a minimal sketch of it follows this list.
10. Only if no message has been received from peers, this broker sends its status.
11. As for (10), this line makes sense only because there is no real logic in here. The idea is that I don't want the same broker to always send messages while the others just receive, so I put the sender to sleep, ensuring in this way it won't be the one sending again the next time.
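
dumpId() itself is not shown in this post; a minimal sketch of how it could be implemented, assuming the Boost threading facilities already used in the rest of the code, is:
boost::mutex mio; // protects the shared std::cout

void dumpId(const char* tag, const char* value = nullptr)
{
    boost::lock_guard<boost::mutex> lock(mio); // one thread at a time from here on
    std::cout << boost::this_thread::get_id() << ' ' << tag;
    if(value)
        std::cout << ' ' << value;
    std::cout << std::endl;
}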

The full C++ source code for this example is on github. The Socket class is there too.

Go to the full post

Async 0MQ app - worker

If you have already read the sort of informal design for this sample asynchronous application, then the post on the client code, and even the one on the server, you are ready to see how I have implemented the worker code.

In brief: this is a C++ ZeroMQ application, based on version 2.2, developed on Windows for MSVC2010, using the 0MQ C++ binding improved by defining a zmq::Socket class that extends the standard zmq::socket_t to simplify multipart message management.

We have seen in the previous post that the server creates a (variable) number of worker threads and then expects these workers to be ready to work on the messages coming from the clients. Here is that function:
void worker(zmq::context_t& context) // 1
{
    zmq::Socket skWorker(context, ZMQ_DEALER); // 2
    skWorker.connect(SK_BCK_ADDR);

    while(true)
    {
        zmq::Frames frames = skWorker.blockingRecv(2, false);
        if(frames.size() == 1) // 3
            break;

        int replies = rand_.getValue(); // 4
        for(int i = 0; i < replies; ++i)
        {
            boost::this_thread::sleep(boost::posix_time::millisec(100 * rand_.getValue()));
            dumpId("worker reply");
            skWorker.send(frames);
        }
    }
}
1. Server and workers share the same context.
2. Each worker has its own DEALER socket that is connected (next line) to the DEALER socket on the server. SK_BCK_ADDR is a C-string specifying the inproc address used by the socket (see the sketch of the address constants after these notes).
3. The "real" messages managed by the workers should be composed of two frames, the address of the client and the actual message payload. A mono-framed message is interpreted by the worker as a request from the server to shut down.
4. To check the asynchronicity of the application, the worker has this strange behavior: it sends back to the client a random number of copies of the original message, each of them sent with a random delay.
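
The socket address constants are defined elsewhere in the project; the values below are just hypothetical placeholders, shown to make the snippets in these posts easier to follow:
const char* SK_CLI_ADDR = "tcp://localhost:5380"; // hypothetical: clients connect here
const char* SK_SRV_ADDR = "tcp://*:5380"; // hypothetical: the server ROUTER binds here
const char* SK_BCK_ADDR = "inproc://backend"; // hypothetical: server DEALER and workers, same process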

Full source C++ code for this sample application is on github, in the same repository you can find also the zmq::Socket source code.

Go to the full post

Async 0mq application - server

This sample ZeroMQ async application is described in a previous post; here I am commenting on the server part of it, which is connected to the already seen clients by DEALER/ROUTER sockets, and to a few workers, that we'll see next, by DEALER/DEALER sockets.
The AsynchronousCS constructor creates a thread running on this function, part of the private section of the same class:
void server(int nWorkers) // 1
{
    zmq::context_t context(1);
    zmq::Socket skFrontend(context, ZMQ_ROUTER); // 2
    skFrontend.bind(SK_SRV_ADDR);

    zmq::Socket skBackend(context, ZMQ_DEALER); // 3
    skBackend.bind(SK_BCK_ADDR);

    boost::thread_group threads;
    for(int i = 0; i < nWorkers; ++i)
        threads.create_thread(std::bind(&AsynchronousCS::worker, this, std::ref(context))); // 4

    zmq_pollitem_t items [] =
    {
        { skFrontend, 0, ZMQ_POLLIN, 0 },
        { skBackend,  0, ZMQ_POLLIN, 0 }
    };

    while(true)
    {
        if(zmq_poll(items, 2, 3000 * 1000) < 1) // 5
            break;

        if(items[0].revents & ZMQ_POLLIN) // 6
        {
            zmq::Frames frames = skFrontend.blockingRecv(2);
            skBackend.send(frames);
            items[0].revents = 0; // cleanup
        }
        if(items[1].revents & ZMQ_POLLIN) // 7
        {
            zmq::Frames frames = skBackend.blockingRecv(2);
            skFrontend.send(frames);
            items[1].revents = 0; // cleanup
        }
    }

    for(int i = 0; i < nWorkers; ++i) // 8
        skBackend.send("");

    threads.join_all(); // 9
}
1. The number of workers used by the server is set by the user.
2. zmq::Socket is an extension of the zmq::socket_t as found in the default 0MQ C++ binding. This server socket is used to keep a connection with the client (DEALER) sockets.
3. This second socket is used to keep a DEALER/DEALER connection with each worker.
4. A new thread is created for each worker (its code is discussed in the next post), notice that a reference to the ZeroMQ context is passed to each worker, so that all sockets on the server are sharing the same 0MQ context.
5. As a simple way of terminating the server execution, I set a limit of three seconds on the polling (3000 * 1000, since in 0MQ 2.x the timeout is expressed in microseconds). If nothing is received on either socket in such a time frame, the indefinite loop is interrupted.
6. Input received on the frontend, the multipart message is read and passed to the backend.
7. Same as (6), but the other way round.
8. When the clients are not sending any more messages, it is time to send a terminator to each worker. As a convention, a single empty message is interpreted by the worker as a terminator.
9. Give the workers time to terminate, and then the server is done.

Full source C++ code for this sample application is on github, in the same repository you can find also the zmq::Socket source code.

Go to the full post

Async 0mq application - DEALER/ROUTER client

Follow the link for a view on the asynchronous ZeroMQ application I am describing in these posts. Here I won't care about the big picture; I am focusing on the function that is executed by each thread running as a client.

Each client thread is created passing as argument the address of this function, defined as private in the class AsynchronousCS:
void client()
{
    zmq::context_t context(1); // 1
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 2
    zmq::Socket skClient(context, ZMQ_DEALER, id); // 3
    skClient.connect(SK_CLI_ADDR);

    zmq_pollitem_t items[] = { { skClient, 0, ZMQ_POLLIN, 0 } };

    std::string message("message ");
    for(int i = 0; i < 6; ++i) // 4
    {
        for(int heartbeat = 0; heartbeat < 100; ++heartbeat) // 5
        {
            zmq_poll(items, 1, 10 * 1000); // polls every 10 millis
            if(items[0].revents & ZMQ_POLLIN)
            {
                std::string msg = skClient.recvAsString(); // 6
                dumpId("client receives", msg); // 7

                items[0].revents = 0;
            }
        }
        dumpId("client sends", message);
        skClient.send(message);
        message += 'x';
    }

    while(zmq_poll(items, 1, 1000 * 1000) > 0) // 8
    {
        if(items[0].revents & ZMQ_POLLIN)
        {
            std::string msg = skClient.recvAsString();
            dumpId("coda", msg);
            items[0].revents = 0; // cleanup
        }
    }
}
1. Even if the client threads run in the same process as the server, each of them has its own ZeroMQ context; this makes it easy to refactor the application into the more common solution where each client runs in its own process.
2. I am going to use the thread id as socket id.
3. zmq::Socket is my extension of the zmq::socket_t as found in the default 0MQ C++ binding. This constructor calls zmq_setsockopt() to set the socket ZMQ_IDENTITY to the passed id. This DEALER client socket is going to connect to a ROUTER server socket.
4. The client is going to send a few messages to the server.
5. We loop 100 times, polling on the socket with a limit of 10 millis. Expecting only a few messages in input, we can assume that this loop will take about a second to execute.
6. This zmq::Socket method takes care of receiving a zmq::message_t and converting it to a std::string (a sketch of it follows this list).
7. Accessing the shared resource std::cout to dump a message requires a mutex/lock protection; all of that is taken care of in this function.
8. As described in the previous post, as a way to keep the code simple, I have implemented no way for the server to tell the client that the stream of messages is over; on the contrary, it is the client itself that waits a "reasonable" amount of time (1 second) after the last input message before deciding that no more messages are going to come. This is a highly unreliable solution, but for such a simple example it should suffice.
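
For reference, zmq::Socket::recvAsString() could be implemented more or less like this sketch, built on the plain recv() inherited from zmq::socket_t:
std::string Socket::recvAsString()
{
    zmq::message_t message;
    recv(&message); // plain blocking receive of a single frame
    return std::string(static_cast<char*>(message.data()), message.size());
}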

Full source C++ code for this sample application is on github, where you can find the source for zmq::Socket too.

Go to the full post

An asynchronous client/server 0mq application - introduction

This example is based on the Asynchronous Client-Server ZGuide chapter. I have made a few slight changes, primarily to adapt it to Windows - MSVC2010, and to port the code, originally written for czmq, to use the C++ standard binding available for ZeroMQ version 2.x; actually, I developed and tested the code on the most recent currently available ØMQ version, 2.2.0.

It is a single process application, but this is just to simplify its development and testing. It could easily be rewritten splitting it into a server component and a client one.

Client

We can decide how many clients to run. Each client sends half a dozen messages, one (about) every second. We want it to act asynchronously, so we continuously poll for incoming messages between one send and the next. Once we complete sending our messages, we keep waiting for a while for answers before terminating the execution.

This behavior is uncommon, but it has the advantage of letting the client terminate cleanly without requiring a complex server.

To allow an asynchronous behavior, the socket connection between client and server will be a DEALER-ROUTER one.

Server

The server provides the ROUTER socket to which the client DEALER sockets connect. Its job is passing each message it gets from the clients to an available worker of its own, and then sending back to the right client the feedback returned by the worker.

Since we want the worker to generate a variable number of replies to a single message coming from the client, we use a DEALER-DEALER socket connection.

As a simple way of terminating the server execution, I decided just to check the polling waiting period. If nothing comes in on the server sockets for more than a few seconds, we can assume that it is time to shut down the server.

We have seen that the client already has its own rule to shut down, so from the server point of view we have to care only about the workers. The convention I use here is that the server sends an empty message to each worker, which the worker is going to interpret as a terminator.

Worker

Each worker has its own DEALER socket connected to the server DEALER socket. Its job is getting a message and echoing it, not once, but a random number of times in [0..2], also introducing a random sleep period, just to make things a bit fancier.

The messages managed by the worker should be composed of two frames: the first one is the address of the sending client, the second one is the actual payload. If we receive just one frame, it should be empty, and it should be considered a terminator. And what if that single-framed message is not empty? For simplicity's sake, I just terminate the worker as if the frame were empty.

Class AsynchronousCS

Not much coding in this post; I save the fun stuff for the next ones. Here I just say that I have implemented the thing in a class, named AsynchronousCS, designed in this way:
class AsynchronousCS
{
private:
    boost::thread_group threads_; // 1
    MyRandom rand_; // 2

    void client(); // 3
    void server(int nWorkers); // 4
    void worker(zmq::context_t& context); // 5
public:
    AsynchronousCS(int nClients, int nWorkers) : rand_(0,2) // 6
    {
        for(int i = 0; i < nClients; ++i)
            threads_.create_thread(std::bind(&AsynchronousCS::client, this));
        threads_.create_thread(std::bind(&AsynchronousCS::server, this, nWorkers));
    }

    ~AsynchronousCS() { threads_.join_all(); } // 7
};
1. The main thread has to spawn a few threads, one for the server, a variable number of them for the clients. This thread group keeps track of all of them to make joining them easy.
2. An object of a utility class that generates the random numbers used by the worker (a sketch of it follows this list).
3. Each client thread runs on this function.
4. The server thread runs on this function, the parameter is the number of workers that the server will spawn.
5. Each worker thread runs on this function. Notice that the workers share the same context as the server, while the client and server could easily be rewritten to work in different processes.
6. Constructor: it requires the number of clients and workers for the application, instantiates the random generator object, specifying that we want it to generate values in the interval [0..2], and then creates the client and server threads.
7. Destructor, joins all the threads created in the ctor.
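
MyRandom is a thin utility class of mine; a possible sketch, assuming Boost.Random is available, could be:
// requires <boost/random.hpp>
class MyRandom
{
private:
    boost::random::mt19937 generator_;
    boost::random::uniform_int_distribution<> distribution_;
public:
    MyRandom(int min, int max) : distribution_(min, max) {}

    int getValue() { return distribution_(generator_); } // a pseudo-random value in [min..max]
};
Notice that nothing in this sketch is synchronized; since rand_ is shared among the worker threads, that is acceptable only in a quick prototype like this one.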

Using this class is just a matter of instantiating an object:
void dealer2router(int nClients, int nWorkers)
{
    AsynchronousCS(nClients, nWorkers);
}
The statement creates a temporary AsynchronousCS object, so dealer2router() hangs on the class dtor, at the end of that statement, till the join completes.

In the next posts I'll show how I have implemented the client, server, and worker functions. Full source C++ code for this sample application is on github. To easily manage multipart messages, I use an extension of the zmq::socket_t class, as defined in the ZeroMQ standard C++ binding. I named it zmq::Socket, you can find it on github too, and a sketch of its shape is just below.
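
To give an idea of that shape, here is a sketch of the interface as it can be inferred from these posts; the method bodies are omitted, the actual implementation on github is the reference:
namespace zmq
{
    typedef std::vector<std::string> Frames; // a multipart message, one string per frame

    class Socket : public socket_t
    {
    public:
        Socket(context_t& context, int type) : socket_t(context, type) {}
        Socket(context_t& context, int type, const std::string& id) : socket_t(context, type)
        {
            setsockopt(ZMQ_IDENTITY, id.c_str(), id.length()); // set the socket identity
        }

        Frames blockingRecv(int frames, bool checked = true); // receive a multipart message
        std::string recvAsString(); // receive a single frame as a std::string
        bool send(const std::string& frame); // send a single-frame message
        bool send(Frames& frames); // send a multipart message
    };
}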

Go to the full post

Queue Broker v.2 rewritten

A few days ago, I wrote a version of the Least Recently Used Queue broker based on a subclass of the standard C++ ZeroMQ socket_t class. I have just redesigned it to change the way multipart messages are managed; you can read some more about the new zmq::Socket in its specific post. Here I say something about the changes in the broker itself.

The full broker source code is on github; all the changes are in the way multipart messages are managed.

For instance, we can have a look at MyDevice::receivingOnBackend():
void receivingOnBackend()
{
//    zmq::Frames input = backend_.blockingRecv(3, false);
    zmq::Frames input = backend_.blockingRecv(5, false); // workerID (, "", clientID, "", payload) // 1

    // ...
    
//    if(input.size() == 3)
    if(input.size() == 5) // 2
    {
//        zmq::Frames output(input.begin() + 1, input.end());
        zmq::Frames output(input.begin() + 2, input.end()); // 3
        frontend_.send(output);
    }
}
1. On the backend we receive a workerID and, normally, the clientID and the message payload. That means one or three "real" frames. In my original implementation we didn't get the separators; now we do. So we get a total of one or five frames.
2. As said above, a full message now has size 5, not 3 anymore.
3. We send to the frontend the clientID and the payload, to do that we discard the first two frames, the workerID and the first separator.

Go to the full post

Hello Windows Sockets 2 - client

The Winsock API is a collection of quite low-level functions. As we have seen in the previous post, writing an echo server, substantially a simple procedure (wait for data, send it back, shut down when the connection closes), is not complicated but longish: we have access to the tiniest detail in the process. The most tedious part of the job is that we have to specify every single step by hand, even the ones that could easily be automated. A slender C++ wrapper would be enough to make the job much simpler, but so far I haven't found anything like that around. Any suggestion is welcome.

Let's now complete our echo application, thinking about the client side.

Initializing Winsock

This is the same stuff that we have already seen for the server side. We need to enclose our code in a pair of function calls to WSAStartup() and WSACleanup(), Ws2_32.lib should be linked to our project, and we have to include a couple of header files, winsock2.h and ws2tcpip.h, in our C/C++ source file.
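
Something like this minimal sketch, where the host and port passed to client() are purely hypothetical values:
#include <winsock2.h>
#include <ws2tcpip.h>
#pragma comment(lib, "Ws2_32.lib") // or link the library in the project settings

int main()
{
    WSADATA wsaData;
    if(WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) // ask for Winsock 2.2
        return 1; // initialization failed, nothing else we can do

    client("localhost", "8282"); // hypothetical host and port of our echo server

    WSACleanup(); // release the Winsock resources
    return 0;
}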

Open/close socket

Almost the same as for the server:
void client(const char* host, const char* port) // 1
{
    ADDRINFO hints;
    wsa::set(hints, 0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP); // 2

    ADDRINFO* ai = wsa::get(host, port, &hints);
    if(ai)
    {
        wsa::list(ai); // 3

        SOCKET sk = wsa::createClientSocket(ai);
        freeaddrinfo(ai);

        if(sk != INVALID_SOCKET)
        {
            coreClient(sk); // 4
            closesocket(sk);
        }
    }
}
1. The user should provide us both the machine address and the port, so that we can connect to the server socket.
2. Tiny variation: we specify AF_UNSPEC as address family, so that both TCP/IP v4 and v6 are accepted, when available. This does not make much sense here, since we already know that the server is using TCP/IP v4, but in a real case scenario it could be a useful trick.
3. We ask Winsock for all the address info matching our requirements. If its current version supports both TCP/IP v4 and v6, we should get both of them. This tiny utility function, see it below, checks all the retrieved address info and dumps some of their settings to the console.
4. The rest of the code is just the same as what we have already seen for the server: before calling the specific client code we create the socket, and then we close it (a sketch of wsa::createClientSocket() follows these notes). Notice also that we have to free by hand the memory associated with the address info.
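
wsa::createClientSocket() is not shown in this post; a minimal sketch of how it could work, assuming it loops on the address info list until a connection succeeds, is:
namespace wsa
{
    SOCKET createClientSocket(ADDRINFO* ai)
    {
        for(; ai != nullptr; ai = ai->ai_next) // try each returned address in turn
        {
            SOCKET sk = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
            if(sk == INVALID_SOCKET)
                continue;

            if(connect(sk, ai->ai_addr, static_cast<int>(ai->ai_addrlen)) != SOCKET_ERROR)
                return sk; // connected, this is the socket we give back to the caller

            closesocket(sk); // this address didn't work, try the next one
        }
        return INVALID_SOCKET; // no address worked
    }
}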

As promised, here is the address info listing utility function:
namespace wsa
{
    void list(ADDRINFO* ai)
    {
        while(ai)
        {
            std::cout << ai->ai_family << ' ' << ai->ai_socktype << ' ' << ai->ai_protocol << std::endl;
            ai = ai->ai_next; // 1
        }
    }
}
1. ADDRINFO is actually a linked list of records. Its ai_next field points to the next element, if any. So here we are looping on all the items available.

Send and receive

Here is the client specific code:
void coreClient(SOCKET sk)
{
    const char* message = "Hello Winsock 2!";

    int size = send(sk, message, static_cast<int>(strlen(message)), 0); // 1
    if(size == SOCKET_ERROR)
    {
        std::cout << "send failed: " << WSAGetLastError() << std::endl;
        return;
    }

    std::cout << size << " bytes sent" << std::endl;

    if(shutdown(sk, SD_SEND) == SOCKET_ERROR) // 2
    {
        std::cout << "shutdown failed: " << WSAGetLastError() << std::endl;
        return;
    }

    do { // 3
        char buffer[BUFLEN];
        size = recv(sk, buffer, BUFLEN, 0); // 4
        if(size > 0)
        {
            std::cout << "Buffer received: '";
            std::for_each(buffer, buffer + size, [](char c){ std::cout << c; });
            std::cout << '\'' << std::endl;
        }
        else if(!size)
            std::cout << "Connection closed." << std::endl;
        else
            std::cout << "recv failed: " << WSAGetLastError() << std::endl;
    } while(size > 0);
}
1. The message is sent through the socket; Winsock returns the number of bytes actually sent, or an error code.
2. This simple application sends just one message, so here we can already close the send component of the socket, releasing some resources in the process, while keeping its receive component alive.
3. Loop until the server closes the connection (or an error occurs).
4. The received data is put in the buffer, and its size is returned by the call. A zero size message is interpreted as a shutdown signal coming from the server. A negative value has to be interpreted as an error code.

If we run the client alone, it won't find any server socket to connect to, and it will report an error to the user.

The full client/server source code is on github.

Go to the full post