Showing posts with label boost.

Boost ASIO Strand example

In the previous posts, we used ASIO keeping away from any possible multithreading issue, with the notable exception of
Asynchronous wait on timer, part two, where a job was executed in another thread, concurrently with the ASIO handler, using a mutex, a lock, and an atomic int to make it work as expected.

With ASIO we can follow a different approach, based on its strand concept, avoiding explicit synchronization.

The point is that we won't run the competing functions directly; instead, we will post the calls to a strand object, which ensures they are executed sequentially. Just be sure to use the same strand object.

We have a class, Printer, with two private methods, print1() and print2(), that use the same member variable, count_, and both print something to cout.

We post the two functions for the first time in the class constructor, asking our strand object to run them.
namespace ba = boost::asio;
// ...

class Printer
{
// ...

ba::io_context::strand strand_;
int count_;


Printer(ba::io_context& io, int count) : strand_(io), count_(count)
{
 strand_.post(std::bind(&Printer::print1, this));
 strand_.post(std::bind(&Printer::print2, this));
}
The functions would post themselves again on the same strand, until some condition is satisfied.
void print1()
{
 if (count_ > 0)
 {
  print("one");
  --count_;

  strand_.post(std::bind(&Printer::print1, this));
 }
}
And this is more or less the full story for the Printer class. No need for synchronization; we rely on the strand to have the functions executed sequentially.

We still have to let ASIO run on two threads, and this is done by calling the run() method of io_context from two different threads. This is kind of interesting on its own, because we bump into a subtle problem due to how std::bind() is implemented.

The official Boost ASIO tutorial suggests using the Boost implementation:
std::thread thread(boost::bind(&ba::io_context::run, &io));
It works fine, end of the story, one would say. But let's see what happens when using the standard bind implementation:
std::thread thread(std::bind(&ba::io_context::run, &io));
// error C2672: 'std::bind': no matching overloaded function found
// error C2783: 'std::_Binder<std::_Unforced,_Fx,_Types...> std::bind(_Fx &&,_Types &&...)': could not deduce template argument for '_Fx'
Damn it. It tries to be smarter than Boost, and in this peculiar case it doesn't work. The problem is that there are two run() overloads in io_context, and bind() doesn't know which one to pick.

A simple solution would be to compile our code against a "clean" ASIO version, getting rid of the deprecated parts, as is the case for the extra run() overload.
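If that route is viable, it could be as simple as defining a macro before any ASIO include - a minimal sketch, assuming the BOOST_ASIO_NO_DEPRECATED switch available in recent Boost versions:
#define BOOST_ASIO_NO_DEPRECATED // strips the deprecated API, the extra run() overload included
#include <boost/asio.hpp>
#include <functional>
#include <thread>
namespace ba = boost::asio;

int main()
{
    ba::io_context io;
    // with a single run() left, std::bind has nothing to be confused about
    std::thread thread(std::bind(&ba::io_context::run, &io));
    thread.join();
}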

If we can't do that, we should give bind some extra help, so that it can deduce the function type correctly. An explicit cast would do:
auto run = static_cast<ba::io_context::count_type(ba::io_context::*)()>(&ba::io_context::run);
std::thread thread(std::bind(run, &io));
I have taken the address of the member function run from boost::asio::io_context (also known as io_service, a name that is now deprecated too) and explicitly cast it to its actual type.

Can we get the same result in a more readable way? Well, using a lambda could be an idea.
std::thread thread([&io] { io.run(); });
You could get my full C++ code from GitHub. I based it on the Timer.5 example from the official Boost ASIO tutorial.


Simple ASIO TCP client/server example

A server sits on a specified port, and when a client connects, it sends a message and terminates. A client connects to the server, reads the message from the socket, and terminates. Nothing fancy, but it could be a good introduction to using ASIO synchronously to create TCP/IP connections.

After five years, the code is getting obsolete. I have reviewed it moving to the (currently - March 2018) latest version of ASIO, please follow the link to the new post. Sorry for the trouble.


You could get the full C++ code for this example on github. If you run the executable with no parameters, you start the server; otherwise, the client.

Server

In this trivial implementation, my server accepts just one connection before terminating, but it is pretty easy to make it run forever. It is just a matter of running this block in an indefinite loop - see the sketch after the notes - and not just once as I do here:
{
  boost::asio::ip::tcp::socket socket(aios); // 1
  std::cout << "Server ready" << std::endl;
  acceptor.accept(socket); // 2

  std::string message("Hello from ASIO");
  boost::asio::write(socket, boost::asio::buffer(message)); // 3
}
1. Create a new TCP/IP socket on an already existing ASIO I/O service.
2. Wait for a client connection.
3. Write a message on the socket to the client.

At the end of the block the socket is automatically closed by its dtor.
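For reference, here is the forever-serving variant - the same block inside an endless loop. Note that the socket is declared inside the loop, so each iteration gets a fresh one:
for(;;)
{
  boost::asio::ip::tcp::socket socket(aios);
  std::cout << "Server ready" << std::endl;
  acceptor.accept(socket);

  std::string message("Hello from ASIO");
  boost::asio::write(socket, boost::asio::buffer(message));
} // the socket dtor closes each connection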

Before that, I have done some preparatory stuff:
boost::asio::io_service aios; // 1
boost::asio::ip::tcp::acceptor acceptor(aios, // 2
  boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), HELLO_PORT)); // 3
1. The I/O service is the first thing required.
2. The acceptor is used to accept calls from the clients.
3. We need to pass to the acceptor the endpoint, which specifies the protocol used (here TCP/IP version 4) and the port (here defined in a constant that I set to 50013).

Client

The client code is symmetrical. First step is about preparing the job:
boost::asio::io_service aios;

boost::asio::ip::tcp::resolver resolver(aios); // 1
boost::asio::ip::tcp::resolver::iterator endpoint = resolver.resolve(
  boost::asio::ip::tcp::resolver::query(host, HELLO_PORT_STR)); // 2
1. Resolver is the counterpart of the acceptor. Calling resolve() on it, we get an iterator pointing to the first endpoint associated with a specific address. We can use that iterator to open a connection to the server on a socket, as we'll see below.
2. Query for a specific host and port (here I specified localhost and 50013; notice that both are C-strings).

Now I am ready to open the connection on a socket. If you are using a recent version of Boost Asio (I am working with 1.54), this is done in a one-liner:
boost::asio::connect(socket, endpoint);
If no connection could be opened on any endpoint, a boost::system::system_error is thrown.
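Since the failure surfaces as an exception, the call site could be guarded like this - a minimal sketch:
try
{
  boost::asio::connect(socket, endpoint);
}
catch(const boost::system::system_error& e)
{
  std::cerr << "Can't connect: " << e.what() << std::endl;
  return;
}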

On older ASIO versions there was no such connect() overload, and you had to implement its behavior by hand, with a piece of code like this:
boost::system::error_code error = boost::asio::error::host_not_found;
boost::asio::ip::tcp::resolver::iterator end; // 1
while(error && endpoint != end)
{
  socket.close();
  socket.connect(*endpoint++, error); // 2
}
if(error)
  throw boost::system::system_error(error); // 3
1. The default ctor for a resolver iterator gives its "end"; we use it to loop on all the endpoints returned by resolver::resolve().
2. Try to connect to the current endpoint; in case of failure, loop while there is another endpoint to check.
3. We couldn't connect to any endpoint, so we throw an exception.

Once we have a socket connected to the server, it's just a matter of getting the message it sends to us:
for(;;)
{
  std::array<char, 4> buf; // 1
  boost::system::error_code error;
  size_t len = socket.read_some(boost::asio::buffer(buf), error); // 2

  if(error == boost::asio::error::eof)
    break; // 3
  else if(error)
    throw boost::system::system_error(error); // 4

  std::cout.write(buf.data(), len);
  std::cout << '|'; // 5
}
std::cout << std::endl;
1. Usually I would use a much bigger buffer.
2. Partial read of the message, limited by the buffer dimension.
3. Detected end of file, stop looping.
4. In case of error throws an exception.
5. I show the junctions in the message caused by the small local buffer; usually the message is rebuilt seamlessly.

I wrote this post as a review of a piece of code I conceived at the beginning of 2011, which is still documented in a couple of posts, one dedicated to the server part, the other to the client part. You may want to have a look at them.

The original source is the Boost ASIO tutorial. Here is the slightly different version of their client and of their server.


Another asynchronous wait on a steady timer

This is a new version of an oldish Boost ASIO example of mine about asynchronously waiting on a timer, taking advantage of C++11 features. If you are looking for something simpler, there's another post on the same matter, more focused on the bare ASIO functionality. Or you could go straight to the original source, the official tutorial on Boost.

Five years later, I found out that this post requires some adjustments. You could follow the link to its March 2018 version.

The main function of this example spawns a new thread, which runs a function that does something indefinitely. But before creating the new thread, it sets an asynchronous timer, whose expiration handler causes the runner to terminate.

It makes sense to encapsulate both functions in a single class, like this:
class MyJob
{
private:
    MyJob(const MyJob&) = delete; // 1
    const MyJob& operator=(const MyJob& ) = delete;

    std::mutex mx_; // 2
    std::atomic<bool> expired_; // written by the timeout handler, read by the worker thread
public:
    MyJob() : expired_(false) {}

    void log(const char* message) // 3
    {
        std::unique_lock<std::mutex> lock(mx_);
        std::cout << message << std::endl;
    }

    void timeout() // 4
    {
        expired_ = true;
        log("Timeout!");
    }

    void operator()() // 5
    {
        for(int i = 0; !expired_; ++i)
        {
            std::ostringstream os;
            os << '[' << i << ']';

            log(os.str().c_str());
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    }
};
1. I don't want objects of this class to be copyable; we'll see later why. So I remove the copy constructor and assignment operator from the class interface, using the C++11 equals-delete marker.
2. There are two threads insisting on a shared resource (the standard output console), a mutex is needed to rule its access.
3. The shared resource is used in this function only. A lock on the member mutex takes care of protecting it.
4. When the timer expires, it is going to call this method.
5. This function contains the job that is going to run in another thread. Nothing fancy, actually. Just a forever loop with some logging and sleeping. The timeout is going to change the loop control variable - an atomic bool, since it is written and read by different threads - so that we have a way out.

This is the user code for the class described above:
boost::asio::io_service io; // 1
boost::asio::steady_timer timer(io, std::chrono::seconds(3)); // 2

MyJob job; // 3
timer.async_wait([&job](const boost::system::error_code&) {
  job.timeout();
}); // 4

std::thread thread(std::ref(job)); // 5
io.run();
thread.join();
1. An ASIO I/O service is created.
2. I create a steady timer (that is just like an old deadline timer, but uses the C++11 chrono functionality) on the I/O service object.
3. An object that describes the job I want to run in another thread is instantiated.
4. When the timer expires, the passed lambda is executed. It is an asynchronous call, so it returns control immediately, and execution moves on to the next instruction (5). The lambda calls the timeout() method on the job object, which has been captured by reference. Having defined the MyJob class as non-copyable, forgetting the ampersand and capturing the job by value results in a compiler error. Here I don't care about the error code parameter, which is set by ASIO to say if the timer has expired correctly or with an error; I just stop the job running. In real-life usage a check would be expected - see the sketch after these notes.
5. Before running the I/O service, I create a thread on our job - again passed by reference, as the std::ref() shows. Again, trying to pass it by value would result in compiler errors.
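For completeness, here is a sketch of the error code check mentioned in (4):
timer.async_wait([&job](const boost::system::error_code& ec) {
  if(!ec) // the timer expired normally
    job.timeout();
  // else, e.g. boost::asio::error::operation_aborted if the timer was cancelled
});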

Full C++ code for this example is on Github.


Hello again, Boost Asio

Some time has passed since my last post on ASIO. I guess it is worth restarting everything from scratch, with a cute tiny hello world example that shows how ASIO can work with C++11 features.

What I am going to do is just set a one second synchronous timer on ASIO, and wait for its expiration. But using the new C++11 chrono library.

[This example is still working alright, however, five years later, I have reviewed the code, using Boost ASIO 1.66. Please, refer to updated post instead.]

Prerequisite

I am using GCC 4.8.1 on a Linux box. But I am confident that any C++11 compliant compiler will do, on any supported platform.

You need Boost too; you could get it from a repository (of your Linux distribution), or directly from Boost.

If you are on Linux, and you have installed Boost through apt-get, you should have its header files in /usr/include/boost, and the libraries in /usr/lib. This would save you some time in setting up the makefile for your application.

ASIO has a strong low-level dependency on the boost_system library; remember to add it to your linker (-l, if you know what I mean) options, otherwise you will get some nasty errors at link time, like this one:
/usr/include/boost/system/error_code.hpp:214:
  undefined reference to `boost::system::generic_category()'
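For instance, a typical GCC build command on Linux could look like this (the file name is just a placeholder; on some setups you also need -lpthread, since ASIO relies on the system threading support):
g++ -std=c++11 timer.cpp -o timer -lboost_system -lpthread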
Wait a minute ...

This example is based on the Timer.1 page of the official Boost ASIO tutorial. What I changed is that I don't use the deadline_timer, which specifies periods with the non-standard posix_time, but the basic_waitable_timer, or better its typedef steady_timer, which relies on the C++11 chrono library. By the way, this class could also use boost::chrono, if std::chrono is not available in your environment.
std::cout << "Starting ... " << std::flush; // 1

boost::asio::io_service aios; // 2
boost::asio::steady_timer timer(aios, std::chrono::seconds(1)); // 3
timer.wait(); // 4

std::cout << "done!" << std::endl; // 5
1. I do something, including some flushed output to the standard console, so that I can see that the program is working.
2. I create an ASIO I/O service.
3. I start a steady timer, meaning a timer that uses the standard C++11 chrono steady clock, on the just created I/O service, passing one single second as the requested period.
4. Synchronously wait until the timer expires. If the timer had already expired when the execution stream entered the wait() method, it would return immediately.
5. Back at work, I send some more feedback to the user.

I put the full code for this example, slightly edited, on github. Here, for the sake of clarity, I used the full namespace names wherever required. In the github version, I make use of the handy C++ namespace alias capability to shorten them down like this:
namespace ba = boost::asio;
namespace sc = std::chrono;
And another thing. For this example, including the chrono standard header is not a necessity, since it is already implicitly included by the boost asio steady timer include. But I guess it makes the code clearer.


Alien language

Stripping out its colorful description, the Code Jam Qualification Round 2009 Problem A, better known as Alien language, is merely a matter of applying a pattern matching function. Given a vocabulary and a pattern list, we should say how many words match each pattern.

The major issue is that I chose to implement the solution in C++, where regular expressions have had a complicated history, and only now, with C++11, do we have a standardized regex library.

But this leads to another complication. The regex implementation in GCC is still incomplete, even in the most recent version (4.8.1, as I am writing this). So I fell back on the always reliable Boost.

My idea is to create a class that stores the vocabulary and makes available to the user a method to check a pattern against it:
#include <vector>
#include <string>

class LangCheck
{
private:
    std::vector<std::string> vocabulary_;

public:
    LangCheck(const std::vector<std::string>& vocabulary) : vocabulary_(vocabulary) {}

    int matches(std::string pattern);
};
Yeah, right. But why pass the pattern to the matches() function by value and not by reference?

The reason is that the pattern is produced in a slightly "wrong" way, so we have to adjust it before we can use it.

Here is the test case, as derived from the input provided by the original problem:
#include <gtest/gtest.h>

TEST(AlienLang, Test)
{
    LangCheck lc({ "abc", "bca", "dac", "dbc", "cba" }); // 1

    EXPECT_EQ(2, lc.matches("(ab)(bc)(ca)")); // 2
    EXPECT_EQ(1, lc.matches("abc"));
    EXPECT_EQ(3, lc.matches("(abc)(abc)(abc)"));
    EXPECT_EQ(0, lc.matches("(zyx)bc"));
}
1. I create a temporary vector of strings on the fly, using the new C++11 initializer list; it would be automagically moved to the LangCheck data member.
2. I call the matches() method for all the passed cases. As you see, instead of square brackets, round ones are used. We need to modify the input to adjust it to the standard regex format.

If you plan to use boost::regex too, remember that this library is not a header-only one; you need to link the appropriate archive, or shared object, to your executable. Initially I forgot to do it, and I got a number of linker errors like:
undefined reference to boost::re_detail::perl_matcher
undefined reference to boost::basic_regex
If you have installed Boost on your Linux box via apt-get, you should find the libraries in the /usr/lib folder. Once I linked boost_regex, I could happily compile my tester.

Here is how I implemented the matches() function:
#include <algorithm>
#include <boost/regex.hpp>
// ...

int LangCheck::matches(std::string pattern) // 1
{
    std::replace(pattern.begin(), pattern.end(), '(', '['); // 2
    std::replace(pattern.begin(), pattern.end(), ')', ']');

    boost::regex re(pattern); // 3

    int matching = 0;
    for(std::string word : vocabulary_)
        if(boost::regex_match(word, re)) // 4
            ++matching;

    return matching;
}
1. For what I said above, the input string is passed by value.
2. Using the standard algorithm replace(), which works on any STL container supporting forward iterators, I adjust the brackets in the pattern.
3. Create a boost regex object from the modified pattern.
4. Let's count how many words in the current vocabulary match the pattern. Any success increments the counter, which in the end is returned.


Number of disc intersections - O(N square) solution

The beta 2010 test from the Codility training requires knowing, at least at an intuitive level, about binomial coefficients. Otherwise, considering the short time available for pondering such problems, and the pressure you could feel if this is part of an interview process, you are doomed. Or you should fall back on a sub-optimal solution.

The problem looks quite innocent. We have an array of integers, each element representing a one-dimensional "disc" having as center the index of the element itself, and as radius the element's value. So, if A[3] is 1, this means that we have a "disc" centered on three with radius of one. You can visualize it as a segment [2, 4]. Our job is writing a function that counts the intersections among these "discs".

Setup

As usual, I started working on the problem thinking of a C++ implementation, and writing a few test cases (I use gtest) to help me design the code. Here are three of the tests I have written, with a few constants and a type definition that help keep the code readable:
const int MAX_INTERSECTING_PAIRS = 10000000; // 1
const unsigned int MAX_ELEMENTS = 100000; // 2
const int MAX_VALUE = 2147483647; // 3

typedef std::vector<int>::size_type SIZE_T;

int beta(const std::vector<int>& input); // 4

TEST(TestBeta, CaseSimple) // 5
{
  std::vector<int> input = {1, 5, 2, 1, 4, 0};
  ASSERT_EQ(11, beta(input));
}

TEST(TestBeta, CaseOverflow) // 6
{
  std::vector<int> input = { 0, MAX_VALUE, 0 };
  ASSERT_EQ(2, beta(input));
}

TEST(TestBeta, CaseTooMany) // 7
{
  std::vector<int> input;
  input.resize(MAX_ELEMENTS + 1);

  ASSERT_EQ(-1, beta(input));
}
1. If the solution is getting huge, we are allowed to (actually, we have to) abort the job and return an error code (minus one).
2. Ditto if there are too many elements in the array.
3. We can assume that the values in input are 32 bit signed integer, that means, this is the max value we could get.
4. Declaration for the function we have to implement.
5. Test case provided by Codility. We have six segments, [-1, 1], [-4, 6], [0, 4], [2, 4], [0, 8], [5, 5], that should give eleven intersections. Note that the last element has radius zero, so it collapses to a single point, at x = 5.
6. Codility problems are often written to lead to surprises on the extreme cases. Better to think of a few test cases in that neighborhood. Here I ensure that a huge "disc" is checked correctly. I expect just two intersections here.
7. Let's check what happens when the input is too big.

An inefficient solution

The first solution that jumped to my mind is naive but effective, at least for a small number of elements. Just loop on all the discs, comparing the right limit of the current one against the left limit of all the other ones. The good thing about this algorithm is that it is pretty easy to write. You should have it written and tested in a whiff. On the flip side, it has a Big Oh N Square time complexity. Meaning, it would take forever to run in case of a large vector in input.
int beta(const std::vector<int>& input)
{
  if(input.size() < 2) // 1
    return 0;
  if(input.size() > MAX_ELEMENTS) // 2
    return -1;

  int result = 0;
  for(int64_t i = 0; i < static_cast<int64_t>(input.size() - 1); ++i) // 3
    for(int64_t j = i+1; j < static_cast<int64_t>(input.size()); ++j) // 4
      if(i + input[i] >= j - input[j]) // 5
      {
        if(result == MAX_INTERSECTING_PAIRS) // 6
          return -1;
        ++result;
      }

  return result;
}
1. If there are less than two "discs", we can't have any intersection.
2. No more than MAX_ELEMENTS are allowed in input.
3. Let's loop on all the discs (but the last one, since at that point all the job would already be done).
4. As said above, I compare each disc against all the others. More precisely, against all the next ones, since the intersections with the previous ones have already been checked by the previous iterations.
5. Check if the "i" disc overlaps the "j" one. If you wondered why I have used int64_t integers to represent the vector's indices, here is the answer: in case of huge values stored in the vector, we could get an overflow here otherwise. I could have used a more natural type for the indices, and written this line as something like "if((int64_t)input[i] + input[j] + i - j >= 0)". Use whichever version looks more readable to you.
6. We have an intersection; before keeping track of it, we should check that we haven't already reached the limit.

Testing time duration

To see how this implementation is bound to fail, we could write a test case that checks how long it takes to run. Boost has a time library that comes in handy for this task:
#include <boost/date_time/posix_time/posix_time.hpp>

// ...

TEST(TestBeta, Case20K)
{
  std::vector<int> input;
  input.resize(20000); // 1

  boost::posix_time::ptime start = boost::posix_time::microsec_clock::local_time();

  ASSERT_EQ(0, beta(input));

  boost::posix_time::time_duration diff = boost::posix_time::microsec_clock::local_time() - start;
  ASSERT_LT(diff.total_milliseconds(), 5); // 2
}
1. In this test I don't care much about the vector content; twenty thousand zeroes are good enough to test the time complexity of my function.
2. I want checking this vector to take less than five milliseconds. Tune this value to your system and expectations.

Improve the result

We have got a solution, and this is sweet, but it is a weak one (it scores 65/100). Now we have a couple of alternatives: working on this code to make it more performant, or looking for a better algorithm that solves the structural issues of the current one. More about it in the following posts.


TopCoder Username

Another common source of interview questions is TopCoder, a site that hosts a huge variety of developer competitions. They have a standard format for tests, presented in their How to dissect a TopCoder problem statement page.

As examples, they provide a few questions they have used in the past. The first of them is the UserName problem. In a handful of words, we have to write a function that ensures the chosen username is available or, if not, proposes a viable alternative.

I simplified the required interface a bit, tailoring it for C++. Now, my requirement is that we should write a function respecting this prototype:
std::string check(const std::string& input, const std::vector<std::string>& existing);
Where input is the username submitted by the user and existing is the list of already taken usernames. The function has to return a valid username, preferably the one chosen by the user. If the requested username is already taken, we return it with a concatenated number - the smaller, the better.

As usual, before starting to code, I wrote a few test cases. Here is just one of them:
TEST(Username, Case0)
{
  std::string input("MyUsername");
  std::vector<std::string> existing = { "Taken", "Someone", "Hello", "Nickname", "User" }; // 1 

  ASSERT_EQ(input, check(input, existing)); // 2
}
1. I have written this code on a recent GCC compiler, so I took advantage of C++11 initialization for STL vectors.
2. The passed username is not among the list of already existing ones, so I expect to get it back as result of function call.

And here is my solution:
#include <set>
#include <string>
#include <vector>
#include <boost/lexical_cast.hpp>

std::string check(const std::string& input, const std::vector<std::string>& existing)
{
  std::set<std::string> orderedNames(existing.begin(), existing.end()); // 1
  if(orderedNames.find(input) == orderedNames.end()) // 2
    return input;

  for(int i = 1; ; ++i) // 3
  {
    std::string candidate = input + boost::lexical_cast<std::string>(i); // 4
    if(orderedNames.find(candidate) == orderedNames.end())
      return candidate;
  }

  // we should never reach here, the loop above exits only by returning
  throw std::exception();
}
1. Seeing the requirements, the expected code complexity is not an issue; still, I reckoned it was wise to be prepared for a huge vector of existing usernames. In that case, it is worth creating a set, so that we have the names ordered and looking up an element has a cheap O(log n) cost.
2. If the input is not among the existing items, return it.
3. We check until we find a good alternative candidate - so, I used an open ended for loop.
4. Username candidates are built by concatenating the user input with an integer, starting from 1. Among the many possible ways of converting an int to a string, I've chosen the Boost lexical cast.
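As a side note, since we are on C++11 anyway, the same conversion could be done without Boost - a one-line sketch:
std::string candidate = input + std::to_string(i); // C++11 alternative to lexical_cast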


Post on ASIO strand

IMHO, the ASIO strand example on the official Boost tutorial is a bit too complex. Instead of focusing on the matter, it involves also some ASIO deadline_timer knowledge, which makes sense in the tutorial logic, but I'd say it blurs the picture.

So I have written a minimal example that I hope will prove more intuitive as a first introduction to this concept.

This post is from April 2012, and it is now obsolete, please follow the link to the updated version I have written on March 2018.

We have a class designed to be used in a multithreaded context. It has as data member a resource that is meant to be shared, and a couple of functions that modify that shared value and could be called from different threads.

Usually what we do is rely on mutexes and locks to synchronize access to the shared resource. ASIO provides us the strand class as a way to serialize the execution of the works posted to it, making explicit synchronization unnecessary. But be aware that this is true only for the functions going through the same strand.

We want to write a piece of code like this:
namespace ba = boost::asio;

// ...

ba::io_service aios;
Printer p(aios, 10); // 1
boost::thread t(std::bind(&ba::io_service::run, &aios)); // 2
aios.run(); // 3
t.join(); // 4
1. See below for the class Printer definition. In a few words, it is going to post the execution of a couple of its functions on ASIO, both of them acting on the same shared resource.
2. We run a working thread on the ASIO I/O service.
3. Also the main thread is running on ASIO.
4. Wait for the worker completion, then end the execution.

So we have two threads running on ASIO. Let's see now the Printer class private section:
class Printer
{
private:
    ba::strand strand_; // 1
    int count_; // 2

    void print(const char* msg) // 3
    {
        std::cout << boost::this_thread::get_id() << ' ' << msg << ' ' << count_ << std::endl;
    }
    
    void print1() // 4
    {
        if(count_ > 0)
        {
            print("print one");
            --count_;
            strand_.post(std::bind(&Printer::print1, this));
        }
    }

// ...
};
1. We are going to operate on a Boost Asio strand object.
2. This is our shared resource, a simple integer.
3. A utility function that dumps to standard output console the current thread ID, a user message, and the shared resource.
4. Client function for (3): if the shared resource count_ is not zero, it calls (3), then decreases count_ and posts through the strand a new execution of this function. There is another private function, print2(), that is exactly like print1(); it just logs a different message.

Since we are in a multithreaded context, these functions should look suspicious. No mutex/lock? No protection on the access to count_? And, cout being an implicitly shared resource, we risk getting garbled output too.

Well, these are no issues, since we are using a strand.

But let's see the Printer ctor:
Printer(ba::io_service& aios, int count) : strand_(aios), count_(count) // 1
{
    strand_.post(std::bind(&Printer::print1, this)); // 2
    strand_.post(std::bind(&Printer::print2, this));
}
1. Pay attention to how the private ASIO strand object is constructed from the I/O service.
2. We prepare the first runs, posting on the strand the execution of the private functions.

What happens is that all the works posted on the strand are executed sequentially, meaning that a new work starts only after the previous one has completed. There is no overlapping, no concurrency, so no need for locking. Since we have two threads available, ASIO will choose which one to use for each work execution. We have no guarantee about which thread executes what.

We don't have the troubles associated with multithreading, but we don't have some of its advantages either. Namely, when running on a multicore/multiprocessor machine, a strand won't use all the available processing power for its job.
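If we needed that processing power back, we could partition independent jobs over more strands - a minimal sketch, assuming the two job chains share no data:
ba::strand s1(aios);
ba::strand s2(aios); // works on s1 are serialized among themselves, but could run concurrently with works on s2
s1.post([]{ /* jobs on a first, independent counter */ });
s2.post([]{ /* jobs on a second, independent counter */ });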

The full C++ source code for this example is on github.


Fibonacci with ASIO

The Fibonacci function is commonly used for writing testing code, because it is conceptually easy without being fully uninteresting. In this blog I have already used it a couple of times, once when showing how to implement a C++11 lambda function, then as a means to show where standard C++11 multithreading could come in handy.

Now I use Fibonacci to show a more sensible example of an ASIO multithread application.

In the previous post, we have seen how to explicitly run and stop the ASIO I/O service; here we'll rely on the fact that ASIO automatically ends the I/O service when it has no more work to do. The trick is to post all the tasks to the service before starting the worker threads. When they run out of jobs, ASIO ends.

The worker function is still the same simple call to run() on the I/O service, with some logging added for testing purpose:
void worker(ba::io_service& aios)
{
    dump("start worker");
    aios.run();
    dump("end worker");
}
Where I have defined ba to be a synonym for boost::asio.
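The dump() helpers are not shown in this post; a possible minimal implementation - my sketch, not necessarily what is in the repository - guards the console with a mutex, as seen in the previous examples:
boost::mutex mio;

void dump(const char* msg, int value = -1) // hypothetical signature covering both usages below
{
    boost::lock_guard<boost::mutex> lock(mio);
    std::cout << boost::this_thread::get_id() << ' ' << msg;
    if(value != -1)
        std::cout << ' ' << value;
    std::cout << std::endl;
}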

Our main thread is about to post to ASIO as jobs a few calls to this function:
void calculate(unsigned int input)
{
    dump("input", input);
    int result = fibonacci(input);
    dump("output", result);
}
Finally, it calls this recursive function, a trivial implementation of Fibonacci:
unsigned int fibonacci(unsigned int n)
{
    if(n < 2)
        return n;
    return fibonacci(n - 1) + fibonacci(n - 2);
}
And here is the interesting part, where the main thread prepares the ASIO I/O service, spawns a few worker threads, and waits till the job is done:
ba::io_service aios; // 1

dump("starting up");
aios.post(std::bind(calculate, 35)); // 2
aios.post(std::bind(calculate, 30));
aios.post(std::bind(calculate, 20));
aios.post(std::bind(calculate, 10));

boost::thread_group threads;
for(int i = 0; i < 2; ++i) // 3
    threads.create_thread(std::bind(worker, std::ref(aios)));

dump("ready to join");
threads.join_all(); // 4
dump("job done");
1. No fake work is associated to the ASIO I/O service created here.
2. A few jobs are posted on the service.
3. A couple of threads are created on the worker function seen above. So, each thread calls run() on the service, signaling that it is available to process pending jobs. ASIO takes care of assigning a new task to each thread, and when a thread finishes its work on a job, it assigns the next available one to it. And so on till there is nothing more to do.
4. The main thread waits here till all the worker threads are done, meaning, ASIO has assigned all the pending tasks to them, and they have completed each run.

The full source C++ code for this example is on github.


Run and stop an ASIO I/O service

This example doesn't do anything useful, but should clarify how you could use Boost ASIO to control the execution of a multithread application.

Our main thread will create a bunch of worker threads, do some job, and finally terminate gracefully.

ASIO is used to synchronize the job. The I/O service is created by the main thread and passed to the workers by reference - remember that it is a non-copyable object; it wouldn't make sense to pass an ASIO I/O service by value, and if you try to do that, you get a compiler error.

The worker threads run a dummy function that just dumps a couple of messages, one before and one after calling run() on the I/O service.

The main thread has provided a dummy I/O service work object, so that the run() called by the workers doesn't return immediately, but leaves them hanging on it. At this point the main thread is the only active one, and it does whatever its job is before calling stop() on the ASIO I/O service. That lets the execution of run() terminate in each worker.

The last duty of the main thread is to join all its spawned threads; then it can happily terminate.

The code run by the workers won't be anything more than this:
void worker(boost::asio::io_service& aios, int id) // 1
{
    dump(id, "first step"); // 2
    aios.run(); // 3
    dump(id, "last step");
}
1. The ASIO I/O service object is passed by reference, we are actually working on the same object of the caller.
2. We are in a multithread environment, and we are accessing a shared resource, the standard output console. We have to rule its access by a mutex, if we want to avoid unpleasant mixups, and this is what the dump() function does.
3. Let's run the service.

Here is the code executed by the main thread:
boost::asio::io_service aios; // 1
boost::asio::io_service::work work(aios); // 2

boost::thread_group threads; // 3
for(int i = 0; i < 6; ++i )
    threads.create_thread(std::bind(worker, std::ref(aios), i)); // 4

dump("Main thread has spawned a bunch of threads");
boost::this_thread::sleep(boost::posix_time::seconds(1)); // 5
aios.stop(); // 6
dump("Main thread asked ASIO io service to stop");
threads.join_all(); // 7
dump("Worker threads joined");
1. The ASIO I/O service object is created.
2. A dummy ASIO work object is put on the service.
3. We want to manage a group of threads; the Boost class thread_group has been designed exactly for that.
4. The worker function has to be adapted to become suitable for the create_thread() request, that's why I used std::bind(). Besides, I need to enforce that the ASIO I/O service object is passed by reference, and that is the reason for using std::ref().
5. Using this framework just to let the main thread take a one second nap is kind of overkill, but I guess you see the point.
6. The main thread is ready to terminate, so it issues a request to ASIO to stop its work.
7. After ensuring all the threads have joined, we can terminate the application.

The full C++ source code for this example is freely available on github.


LRU Queue Device - worker

Before taking care of the LRU queue broker itself, and after having seen what its client does, it is time to look at the other side of the device, the worker threads.

Our device is designed to run a user-defined number of workers. Each worker is associated with a thread, and each of them runs this function:
const char* SK_ADDR_BACKEND = "inproc://backend"; // 1

// ...

void worker(zmq::context_t& context) // 2
{
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 3
    zmq::socket_t skWorker(context, ZMQ_REQ); // 4
    zmq_setsockopt(skWorker, ZMQ_IDENTITY, id.c_str(), id.length());
    skWorker.connect(SK_ADDR_BACKEND);

    std::string receiver;
    int payload = 0;
    while(true)
    {
        zmq::message_t zmReceiver(receiver.length()); // 5
        memcpy(zmReceiver.data(), receiver.c_str(), receiver.length());
        skWorker.send(zmReceiver, ZMQ_SNDMORE); // 5a

        zmq::message_t zmDummy;
        skWorker.send(zmDummy, ZMQ_SNDMORE); // 5b

        zmq::message_t zmOutput(sizeof(int));
        memcpy(zmOutput.data(), &payload, sizeof(int));
        skWorker.send(zmOutput); // 5c

        zmq::message_t zmClientId;
        skWorker.recv(&zmClientId); // 6
        dump(id, zmClientId);

        if(!zmClientId.size()) // 7
        {
            dump(id, "terminating");
            return;
        }
        const char* base = static_cast<const char*>(zmClientId.data());
        receiver = std::string(base, base + zmClientId.size()); // 8

        skWorker.recv(&zmDummy); // 9

        zmq::message_t zmPayload;
        skWorker.recv(&zmPayload); // 10
        if(zmPayload.size() != sizeof(int)) // 11
        {
            dump(id, "bad payload detected");
            return;
        }

        payload = *(int*)zmPayload.data(); // 12
        dump(id, payload);

        boost::this_thread::sleep(boost::posix_time::millisec(payload));
    }
}
1. We are developing a multithreaded ZeroMQ application, where the sockets are connected over the inproc protocol. The name I chose for the backend connection between the device and the workers is "backend".
2. As we should remember, the 0MQ context is the only object that could be shared among different threads, and actually it should be shared among them when, as in this case, we want to have a data exchange.
3. Usually we don't care to specify the socket identity, and we let ØMQ do it. Here it is set explicitly to make the application easier to test. That's why I used the Boost thread id.
4. Up to this point, the worker acts just like the client. Both of them are REQUEST sockets, connected inproc to a ROUTER socket in the device. The difference is in what happens next. The worker sends a dummy message to let the device know it is up and running, then it waits for a job, works on it, sends the answer back and waits for a new job, till it gets a terminating message.
5. It takes eight lines to send a request to the device. And the first time, as said in (4), it is just a dummy. The trouble is that we are sending a multipart message, and there is no easy way to do it, short of taking care of all the gory details.
As the first part (5a) we send a character string representing the address of the client that asked for this job. The first time around there is no associated client, we are just signalling to the device that we are ready, so we actually send an empty frame.
The second part (5b) is a zero-sized frame, seen by ZeroMQ as a separator.
The third and last part (5c) is the payload, representing the result of the job performed by the worker. In the first loop this value is meaningless.
6. Now the worker hangs, waiting to receive a reply on the socket. As the name of the variable suggests, what we expect to get is the client id, which we'll use to send back an answer in (5a).
7. If no client id is specified, we interpret the message as a termination request, so we stop looping.
8. Otherwise, we convert the raw byte array in a proper C++ string, and we store it in the local variable that will be used in (5a).
9. Next frame is a dummy, we just ignore it.
10. Finally we receive the payload, the real stuff we should work with.
11. This sample application is designed so that only ints are expected here; anything else is considered a catastrophic failure (production code shouldn't behave like this, as I guess you well know).
12. The integer in the last frame of the message is converted to an int, and used by the worker to, well, determine how long it should sleep. The value itself, unchanged, is then sent back to the client in (5c).

The full C++ source code for this example is on github.


LRU Queue Device - client

If you have read the previous post, it should be clear by now how the LRU queue broker we are going to create should work. Here we focus on the clients used by the device.

When we feel ready, we call the start() public function that creates a thread for each client, associating it to this function:
const char* SK_ADDR_FRONTEND = "inproc://frontend"; // 1

// ...

void client(zmq::context_t& context, int value) // 2
{
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 3
    zmq::socket_t skClient(context, ZMQ_REQ); // 4
    zmq_setsockopt(skClient, ZMQ_IDENTITY, id.c_str(), id.length());
    skClient.connect(SK_ADDR_FRONTEND);

    dump(id, "client is up"); // 5

    zmq::message_t zmPayload(sizeof(int)); // 6
    memcpy(zmPayload.data(), &value, sizeof(int));
    skClient.send(zmPayload);

    dump(id, "client request sent");

    zmq::message_t zmReply;
    skClient.recv(&zmReply); // 7

    if(zmReply.size() == sizeof(int)) // 8
    {
        int reply = *((int*)zmReply.data());
        dump(id, reply);
    }
    else // unexpected
        dump(id, zmReply);
}
1. This is a multithreaded ZeroMQ application; the sockets are connected over the inproc protocol. The name I chose for the frontend connection between device and clients is "frontend".
2. The synchronization among threads in a 0MQ application happens by sharing a 0MQ context object. The second parameter passed to the function is just a value, different for each client, so that we can keep track of what is going on in our testing.
3. Here we could have let ØMQ choose the socket identity, but having a predetermined socket id for each client is useful from a testing point of view. I guessed the most natural choice was picking up the Boost thread id. Since there is no direct way to see that id as a string, we have to explicitly convert it with a Boost lexical_cast.
4. Each client has its own REQ socket, for which we set the identity as described in (3), and which we connect to the frontend router socket defined in our device.
5. We are in a multithreaded environment, so it is not safe to use a shared resource without protecting it with a mutex. That's why I wrote a few dump() functions to give some basic feedback to the user.
6. What this client does is simply sending the value it gets as input parameter to the device that created it, through the socket that connects them. Pretty meaningless, I agree, but it will do as an example.
7. This is a REQ socket: we have sent our request, now we wait indefinitely for a reply.
8. I have removed almost all the error checking from the application, to keep it as readable as possible. But I couldn't help adding a few minimal checks like this one. We are expecting an integer as a reply (actually, it should be just an echo of the integer that we sent), so I ensure that the received message size is as expected. If so, I extract the int from the message and print it to the console as such. Otherwise I print the reply, for debugging purposes, as if it were a character string.

The C++ source code for this example is on github.


LRU Queue Device - general view

Implementing the Least Recently Used pattern for ZeroMQ was a longish but not complicated job. Here things get tougher, since we want to create a device to rule an exchange of messages from a bunch of clients to a bunch of workers, again using the LRU pattern.

This example is based on the Request-Reply Message Broker described in the ZGuide. I have reworked it a bit, both to adapt it to the Windows environment (no ipc protocol supported there) and to make it clearer, at least from my point of view. I used 0MQ 2.2.0, its standard light-weight C++ wrapper, and Visual C++ 2010 as the development environment.

As they put it in the ZGuide, "Reading and writing multipart messages using the native ØMQ API is like eating a bowl of hot noodle soup, with fried chicken and extra vegetables, using a toothpick", and since that is what we are actually going to do, don't be surprised if sometimes it looks overwhelmingly complex and messy. Be patient, try to work your way through the example one step after the other, and in the end you will find out that it is not so bad, after all.

The idea is that we want to have a class, QueueDevice, that is going to be used in this way:
void lruQueue(int nWorkers, int nClients)
{
    QueueDevice device;

    device.start(nClients, nWorkers); // 1
    device.poll(); // 2
}
1. We specify the number of clients and workers insisting on our device when we start it.
2. Then we poll on the device till some condition is reached.

The QueueDevice class is going to have a bunch of private data members:
class QueueDevice
{
    // ...
private:
    zmq::context_t context_; // 1
    zmq::socket_t backend_; // 2
    zmq::socket_t frontend_;
    std::queue<std::string> qWorkers_; // 3
    boost::thread_group thWorkers_; // 4
    boost::thread_group thClients_;

    // ...
};
1. First of all, a ZeroMQ context.
2. A couple of sockets, both ROUTER as we'll see in the ctor, one for managing the backend, AKA the connection to the workers, the other for the frontend, meaning the clients.
3. I am going to use a queue to temporarily store in the device which worker is available for the clients. We want to implement an LRU pattern, so a queue is just the right data structure.
4. We are going to spawn some threads for the workers and others for the clients. Actually, we could have used just one thread_group, but I guess it is more readable to have two of them.

There are going to be a few public methods in the QueueDevice class:
  • A constructor to initialize the sockets.
  • A destructor to join all the threads.
  • A start() method to actually start the worker and client threads.
  • A poll() method to, well, poll on the backend and frontend sockets.
The QueueDevice start() method is going to need access to a worker() and a client() function, representing the jobs executed on the backend and frontend sides of the device - see the sketch below.
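Put in code, the class skeleton could be sketched like this - the signatures are my guess from the usage above, the details come in the next posts:
class QueueDevice
{
public:
    QueueDevice();  // initializes the frontend and backend ROUTER sockets
    ~QueueDevice(); // joins all the spawned threads
    void start(int nClients, int nWorkers); // spawns the client and worker threads
    void poll();    // polls on the backend and frontend sockets

    // ... private members as listed above, plus the worker() and client() jobs
};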

Finally, we'll have a bunch of dumping functions, to be used by just about all the many threads that constitute our application to print some feedback for the user on the standard console. As usual, std::cout being a shared resource, we'll need a mutex to avoid unpleasant mixups.

That should be all. In the next posts I'll go through the details. The full C++ source code for this example is available on github.


LRU Pattern - putting it all together

Following the design explained in the ZGuide, I have written a C++ port of a simple application that implements the Least Recently Used messaging pattern, using the light-weight ZeroMQ C++ default wrapper, for Windows-MSVC 2010, linking to ØMQ 2.2.0, currently the latest ZeroMQ stable version.

You can find the complete C++ code for this example on github, and in the previous two posts some comments on the client side and on the worker.

There is still a tiny bit of code I should talk about, the dumpMessage() functions:
boost::mutex mio; // 1

void dumpMessage(const std::string& id, int value) // 2
{
    boost::lock_guard<boost::mutex> lock(mio); // 3
    std::cout << id << ' ' << value << std::endl;
}

void dumpMessage(const std::string& id, const char* msg)
{
    boost::lock_guard<boost::mutex> lock(mio);
    std::cout << id << ' ' << msg << std::endl;
}
1. In the rest of the code, all the synchronization among threads is done by 0MQ message exchanges, so we don't need mutexes and locks. But here we have many threads competing for the same shared resource, namely the standard output console. Ruling its access with a mutex is the most natural solution, I reckon.
2. I provide two overloads for the printing function, one for printing the socket/thread id plus its int payload, and one for id plus a message from the code to the user.
3. If there is already a thread running on the next line, we patiently wait here for our turn. The lock would be released by the dtor.

All the printing in the application is done through this couple of functions, with a notable exception in the client main function:
void lru(int nWorkers)
{
    // ...
    boost::thread_group threads;
    // ...
    threads.join_all();
    std::cout << "Number of processed messages: " << processed << std::endl;
}
But at that point we have already executed a "join all" on all the worker threads spawned by the client. We are back in the condition where only one thread is running for this process. So we don't have to worry about competing for that shared resource.

The design of this example should be clear. We enter the lru() function, create as many worker threads as required by the caller (actually, in real code it would be worth performing a check on that number), have a data exchange between the client and the workers, retrieve a result from all this job, and terminate the run.

From the ZeroMQ point of view, the interesting part is in the REQ-ROUTER pattern. Each worker has a request socket, through which it tells the client socket (a router) when it is ready for another run. When the router has no more jobs to be done, it simply sends a terminator to each worker asking for a new feed, till none of them is around anymore.


LRU Pattern - worker

Second step of the LRU pattern example implementation. After the client, now it is time to write the worker. The ZeroMQ version I used is the brand new 2.2.0; the chosen implementation language is C++, in the MSVC 2010 flavor, for the Windows operating system. The complete C++ source code is on github.

This is a multithread application, the client creates a thread for each worker, and each of them executes this function:
const char* SOCK_ADDR = "inproc://workers";

// ...

void worker(zmq::context_t& context)
{
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 1
    zmq::socket_t worker(context, ZMQ_REQ); // 2
    zmq_setsockopt(worker, ZMQ_IDENTITY, id.c_str(), id.length());
    worker.connect(SOCK_ADDR);

    int processed = 0; // 3
    while(true)
    {
        zmq::message_t msg(&processed, sizeof(int), NULL);
        worker.send(msg); // 4
        zmq::message_t payload;
        if(worker.recv(&payload) == false) // 5
        {
            dumpMessage(id, "error receiving message");
            return;
        }
        if(payload.size() != sizeof(int)) // 6
        {
            dumpMessage(id, "terminating");
            return;
        }

        int value = *(int*)payload.data(); // 7
        dumpMessage(id, value);

        boost::this_thread::sleep(boost::posix_time::millisec(value)); // 8
        ++processed;
    }
}
1. As id for the REQ socket I used the thread id. To convert a Boost thread id to a string we need to go through a lexical cast.
2. The REQ socket is created on the context passed by the main thread: all the threads in a 0MQ multithread application that want to be connected should share the same context.
3. Keeping track of the messages processed in this thread, so that we can pass back this information to the client.
4. Sending a message through the socket. Notice that this socket has an identity set, so what we are sending here is actually a multipart message consisting of three frames: the identity, empty data, and the number of currently processed messages.
5. Receiving the reply to our request. The identity is automatically stripped, we have to take care only of the effective payload.
6. The error handling is very poor in this example. We expect an int as message; anything with a different size is considered equivalent to a null-sized message, which I conventionally treat as a terminator.
7. Extract the int contained in the message ...
8. ... and use it to do some job, in this case just sleeping for a while.

All the synchronization among threads is done through messages sent on ZeroMQ sockets. The exception is the dumpMessage() functions, which access a resource (std::cout) shared among all threads. We'll see how to deal with this in the next post.


LRU Pattern - client

Let's implement the Least Recently Used Routing pattern for ZeroMQ 2.2.0 (just released) in C++ on Windows with MSVC. On the ZGuide you can find the rationale behind this code, and the C implementation that I used as a guideline for this post. Besides porting it to C++, I have adapted it to Windows (there is no IPC support for this platform) and I have made some minor changes that, I guess, make this example even more interesting.

It is a multithreaded application, where the client has a router socket that connects to a bunch of request sockets, one for each worker thread. The REQ sends a message containing its own id to the ROUTER when it is ready to process a new message; the ROUTER uses the REQ id to reply to that specific REQ socket, which is known to be available. Here I talk about the client part of the application, but you can already have a look at the entire source code on github.

The main function gets as parameter the number of workers that we want to run, creates a router socket and the worker threads, each of them with its own REQ socket, on the same context, lets the router send a few messages to the workers, and then terminates them, getting as a side effect the number of messages processed by each thread:
void lru(int nWorkers)
{
    zmq::context_t context(1);
    MyRouter router(context); // 1

    boost::thread_group threads; // 2
    for(int i = 0; i < nWorkers; ++i)
        threads.create_thread(std::bind(worker, std::ref(context))); // 3

    for(int i = 0; i < nWorkers * 10; ++i)
        router.sendMessage(); // 4

    int processed = 0;
    for(int i = 0; i < nWorkers; ++i)
        processed += router.terminateWorker(); // 5

    threads.join_all(); // 6
    std::cout << "Number of processed messages: " << processed << std::endl;
}
1. See below for the MyRouter class, for the moment think of it as a wrapper to a ØMQ ROUTER socket.
2. The Boost thread_group makes it easy to manage a group of threads, like the one we want to have here.
3. Each thread is created on the function worker(), we'll talk about it in a next post, passing to it a reference to the 0MQ context.
4. Send an average of ten messages to each 0MQ REQ socket.
5. Send a terminator to each worker, terminateWorker() returns the number of processed message for the current worker.
6. Join all the threads, give a feedback to the user and terminate.

The MyRouter class uses a constant and a helper class:
const char* SOCK_ADDR = "inproc://workers"; // 1

class RandomTimeGenerator // 2
{
private:
    boost::random::mt19937 generator_;
    boost::random::uniform_int_distribution<> random_;
public:
    RandomTimeGenerator(int low, int high) : random_(low, high) {}
    int getValue() { return random_(generator_); }
};

class MyRouter
{
private:
    zmq::socket_t client_;
    RandomTimeGenerator rtg_;

public:
    MyRouter(zmq::context_t& context) : client_(context, ZMQ_ROUTER), rtg_(1, 1000) // 3
    {
        client_.bind(SOCK_ADDR);
    }

    void sendMessage() // 4
    {
        zmq::message_t zmAddress;
        client_.recv(&zmAddress);

        zmq::message_t zmDummy1;
        client_.recv(&zmDummy1);
        zmq::message_t zmDummy2;
        client_.recv(&zmDummy2);

        client_.send(zmAddress, ZMQ_SNDMORE);

        zmq::message_t zmEmpty; // 5
        client_.send(zmEmpty, ZMQ_SNDMORE);

        int value = rtg_.getValue();
        zmq::message_t zmPayload(sizeof(int));
        memcpy(zmPayload.data(), &value, sizeof(int));
        client_.send(zmPayload);
    }

    int terminateWorker() // 6
    {
        zmq::message_t zmAddress;
        client_.recv(&zmAddress);
        zmq::message_t zmDummy;
        client_.recv(&zmDummy);

        zmq::message_t zmPayload;
        client_.recv(&zmPayload);
        std::string id((char*)zmAddress.data(), (char*)zmAddress.data() + zmAddress.size());
        int acknowledged = *(int*)zmPayload.data();
        dumpMessage(id, acknowledged); // 7

        client_.send(zmAddress, ZMQ_SNDMORE); // 8

        zmq::message_t zmEmpty;
        client_.send(zmEmpty, ZMQ_SNDMORE);
        client_.send(zmEmpty);

        return acknowledged;
    }
};
1. The address used by the sockets for the inproc connections.
2. Class generating a random int, used to simulate a randomly long task to be executed by the worker. The generator is a Boost Mersenne twister, the distribution a uniform integer one.
3. The ctor expects in input the ZeroMQ context that should be used to create the 0MQ ROUTER socket. Besides, the random generator is initialized so that it produces numbers in the interval [1, 1000]. The client socket is bound to the inproc address specified above.
4. The core of this class. First, we wait for a worker 0MQ REQ socket to announce that it is available; we care only about the first frame it sends us, which contains the address of the REQ socket. The other two frames are discarded, but the first one is sent back as the first frame of our reply, so that ZeroMQ can route the reply to the original sender.
5. The second frame of the reply is empty, and in the third one we place an int as returned by the random generator. Notice the ZMQ_SNDMORE flag on the first two frames, to let ZeroMQ understand that the three sends are parts of a single message.
6. The last message received from each 0MQ REQ is managed differently from all the previous ones. The first frame is sent back as in (4), but we also use the third frame, the actual payload of this multipart message, from which we extract an integer representing the number of messages processed by that worker.
7. This function just prints the passed parameters to the standard console. But we are in a multithreaded context, and std::cout is a shared resource, so we should be careful (see the sketch after this list).
8. We send one last message to the current worker: an empty message that has to be interpreted as a terminator.
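
The actual approach is shown in the next post; still, as a minimal sketch, dumpMessage() could serialize access to std::cout with a mutex. The signature is deduced from the call in (7), the body is an assumption:
void dumpMessage(const std::string& id, int value)
{
    static boost::mutex mio; // one mutex shared by all the threads using std::cout
    boost::mutex::scoped_lock lock(mio);
    std::cout << id << ' ' << value << std::endl;
}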

Go to the full post

uBLAS matrix_column proxy

When I wrote a function that calculates the matrix L1 norm, the resulting code would have been more readable if I had based it on the vector L1 norm functionality previously defined. But to do that, we need a class that acts as a proxy for the underlying matrix, exposing one of its columns. uBLAS implements matrix_column just for cases like this.

Let's refactor my::norm_1() to use matrix_column:
// ...
#include <boost/numeric/ublas/matrix.hpp>
#include <boost/numeric/ublas/matrix_proxy.hpp> // 1
namespace ublas = boost::numeric::ublas;
namespace my
{
    double norm_1(ublas::matrix<double>& m)
    {
        double norm = 0.0;
        for(unsigned int j = 0; j < m.size2(); ++j)
        {
            ublas::matrix_column<ublas::matrix<double> > mc(m, j); // 2
            double current = my::norm_1(mc); // 3
            if(current > norm)
                norm = current;
        }
        return norm;
    }
}
1. Here are the matrix proxy definitions; remember to include this header file.
2. Declaring a matrix_column based on a matrix of doubles, passing to it the actual matrix and the index of the column we are interested in.
3. The matrix_column can be treated as a normal uBLAS vector; here we pass it to our norm_1() function. A short sketch of the proxy on its own follows.
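
A quick illustrative sketch, not from the original post: matrix_column is a read-write proxy, so writing through it modifies the underlying matrix.
ublas::matrix<double> m(2, 3);
for(unsigned int i = 0; i < m.size1(); ++i)
    for(unsigned int j = 0; j < m.size2(); ++j)
        m(i, j) = 1.0;

ublas::matrix_column<ublas::matrix<double> > col(m, 1); // proxy on column 1
col(0) = 42.0; // writing through the proxy ...
std::cout << m(0, 1) << std::endl; // ... changes the matrix: this prints 42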

Go to the full post

Matrix and Vector L1 Norm

The L1 norm is defined for both vectors and matrices, we can easily write a C++ function to calculate it, but when possible it is better to use a more stable and generic implementation, as the one provided by the Boost Numeric uBLAS library.

L1 Norm for vectors

The L1 vector norm is defined as the sum of the absolute values of the vector elements. Here is a simple implementation for vectors of doubles:
namespace my
{
    double norm_1(const ublas::vector<double>& v)
    {
        auto adder = [](double partial, double value) { return partial + std::abs(value); }; // 1
        return std::accumulate(v.begin(), v.end(), 0.0, adder); // 2
    }
}
1. The natural implementation of this function is a one-liner; I extracted the lambda to its own line only for readability purposes. It is a simple adder: the current partial result is passed in, increased by the absolute value of the current element, and returned. Note the std::abs() call: the plain C abs() works on ints and would silently truncate our doubles.
2. The STL accumulate() is used to sum all the elements using our custom adder (1). Note that the initial value must be explicitly a double (0.0, not 0), since accumulate() deduces its return type from it.
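
A quick sanity check, not in the original post:
ublas::vector<double> v(3);
v[0] = 1.0; v[1] = -2.0; v[2] = 3.0;
std::cout << my::norm_1(v) << std::endl; // |1| + |-2| + |3| = 6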

L1 Norm for matrices

The L1 matrix norm is defined as the maximum of the L1 norms of its columns. Here is a simple implementation:
namespace my
{
    double norm_1(const ublas::matrix<double>& m)
    {
        double norm = 0.0;
        for(unsigned int j = 0; j < m.size2(); ++j)
        {
            double current = 0.0; // 1
            for(unsigned int i = 0; i < m.size1(); ++i)
                current += std::abs(m(i,j));
            if(current > norm) // 2
                norm = current;
        }
        return norm;
    }
}
1. Calculate the L1 norm for a column in "current".
2. If the current candidate is bigger than the previously calculated one, it is stored as the new L1 norm.
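
A tiny worked instance, not in the original post. The column sums are |1| + |3| = 4 and |-2| + |4| = 6, so the expected norm is 6:
ublas::matrix<double> m(2, 2);
m(0, 0) = 1.0; m(0, 1) = -2.0;
m(1, 0) = 3.0; m(1, 1) = 4.0;
std::cout << my::norm_1(m) << std::endl; // 6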

Examples

This piece of sample code shows how to calculate the L1 norm on a few vectors and matrices, using both my simple functions and the uBLAS versions:
// ...
namespace ublas = boost::numeric::ublas;

ublas::vector<double> v1(6);
ublas::vector<double> v2(6);
for(int i = 0; i < 6; ++i)
{
    v1[i] = i * (i%2 ? 1 : -1);
    v2[5 - i] = i;
}
ublas::vector<double> v3 = v1 * -1;

ublas::matrix<double> m1(3,3), m2(3,3);
for(int i = 0; i < 3; ++i)
{
    for(int j = 0; j < 3; ++j)
    {
        m1(i, j) = (i + j) * ((j+i)%2 ? 1 : -1);
        m2(2 - i, 2 - j) = i + j;
    }
}
ublas::matrix<double> m3 = m1 * -1;

std::cout << v1 << v2 << v3 << std::endl;
std::cout << m1 << std::endl << m2 << std::endl << m3 << std::endl;
std::cout << "Norm 1: " << ublas::norm_1(v1) << ' ' << my::norm_1(v1) << std::endl;
std::cout << "        " << ublas::norm_1(v2) << ' ' << my::norm_1(v2) << std::endl;
std::cout << "        " << ublas::norm_1(v3) << ' ' << my::norm_1(v3) << std::endl;
std::cout << "Norm 1: " << ublas::norm_1(m1) << ' ' << my::norm_1(m1) << std::endl;
std::cout << "        " << ublas::norm_1(m2) << ' ' << my::norm_1(m2) << std::endl;
std::cout << "        " << ublas::norm_1(m3) << ' ' << my::norm_1(m3) << std::endl;

Go to the full post

Complex numbers matrix transformations

The Boost Numeric uBLAS library provides a handful of functions to perform transformations on matrices (or vectors) of complex numbers.

Element transformation

Three of them are mere convenience functions that apply to each element of the container a transformation defined for single complex numbers:
ublas::vector<std::complex<double> > v1(6);
for(int i = 0; i < 6; ++i) { v1[i] = std::complex<double> (i, i * -1.0); }
std::cout << v1 << std::endl;

ublas::matrix<std::complex<double> > m1(6,3);
for(unsigned int i = 0; i < m1.size1(); ++i)
    for(unsigned int j = 0; j < m1.size2(); ++j)
        m1(i, j) = std::complex<double>(i, j * -1.0);
std::cout << m1 << std::endl;

std::cout << " conjugate vector: " << ublas::conj(v1) << std::endl; // 1
std::cout << " conjugate matrix: " << ublas::conj(m1) << std::endl;
std::cout << " real vector: " << ublas::real(v1) << std::endl; // 2
std::cout << " real matrix: " << ublas::real(m1) << std::endl;
std::cout << " imaginary vector: " << ublas::imag(v1) << std::endl; // 3
std::cout << " imaginary matrix: " << ublas::imag(m1) << std::endl;
1. The conjugate of a complex number is another complex with the same real part and the sign of the imaginary part flipped. The uBLAS conj() function returns a new container where each element is the conjugate of the corresponding element in the original container.
2. A complex number has two components, real and imaginary. The uBLAS real() function returns a container where each element is the real part of the corresponding complex element in the original container.
3. Counterpart of (2): the returned container holds the imaginary parts of the original complex numbers.
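
On a single element the three functions behave like their std::complex counterparts. A minimal sketch, not from the original post; the commented output assumes the usual uBLAS io format:
ublas::vector<std::complex<double> > v(1);
v[0] = std::complex<double>(2.0, -3.0);
std::cout << ublas::conj(v) << std::endl; // [1]((2,3))
std::cout << ublas::real(v) << std::endl; // [1](2)
std::cout << ublas::imag(v) << std::endl; // [1](-3)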

Matrix transposition

The other two functions can still be used on both vectors and matrices, even though they make actual sense only for the latter.
std::cout << " transpose vector: " << ublas::trans(v1) << std::endl; // 1
std::cout << " transpose matrix: " << ublas::trans(m1) << std::endl;
std::cout << " hermitian vector: " << ublas::herm(v1) << std::endl; // 2
std::cout << " hermitian matrix: " << ublas::herm(m1) << std::endl;
1. The transpose of a matrix is another matrix where the rows of the first are the columns of the second. A vector is seen as a matrix with just one row, so there is no change.
2. The Hermitian of a matrix is another matrix, the transpose of the conjugate of the original one. When applied to a vector, herm() generates the conjugated vector (see the sketch below).
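
To make (2) concrete, a tiny sketch with illustrative values:
ublas::matrix<std::complex<double> > m(1, 2);
m(0, 0) = std::complex<double>(1.0, 2.0);
m(0, 1) = std::complex<double>(3.0, -4.0);

// herm(m) is the 2x1 conjugate transpose of m,
// holding the elements (1,-2) and (3,4)
std::cout << ublas::herm(m) << std::endl;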

Go to the full post

Products with uBLAS

Using the vectors and matrices defined in the Boost uBLAS library, we can perform a number of basic linear algebraic operations. Here is a brief description of the product functionality available.

I am performing these tests using Boost 1.49 for MSVC 2010. Here is what I did to use uBLAS vector and matrix:
#include <boost/numeric/ublas/vector.hpp> // 1
#include <boost/numeric/ublas/matrix.hpp> // 2
#include <boost/numeric/ublas/io.hpp> // 3

namespace ublas = boost::numeric::ublas; // 4
1. For the uBLAS vector definition.
2. uBLAS matrix.
3. uBLAS I/O support; it makes console output easier.
4. Namespace alias for boost::numeric::ublas.

Error handling

When running in debug mode, uBLAS performs strict data checks. Usually we remove all these checks from production code for efficiency reasons, making the uBLAS code faster, but shifting onto our code the responsibility of checking all the weirdness that could happen.

If the symbol NDEBUG is defined as a preprocessor constant, the uBLAS code generated by the compiler is lean and mean. Otherwise it is safer, but bigger and slower.

So, if we plan to keep the safer uBLAS version in production code, we should wrap our uBLAS calls in a try/catch for std::exception, to react adequately in case of anomalies. With NDEBUG-generated code, instead, no check is performed and no exception is thrown by uBLAS: garbage in, garbage out. It is our responsibility to ensure that this won't happen.

A couple of uBLAS vectors

To start playing around a bit with uBLAS products, we need a few vectors:
ublas::vector<double> v1(6); // 1
ublas::vector<double> v2(6);

for(int i = 0; i < 6; ++i)
{
    v1[i] = i;
    v2[i] = i + 1;
}

ublas::vector<double> vs(3);
for(int i = 0; i < 3; ++i) { vs[i] = i + 1; }

std::cout << v1 << v2 << vs << std::endl; // 2
1. The uBLAS vector behaves slightly differently from the STL vector: we specify its size in the constructor, and that number of elements is allocated without being initialized.
2. Dump the generated uBLAS vectors to the standard output console:
[6](0,1,2,3,4,5)[6](1,2,3,4,5,6)[3](1,2,3)
Inner product

We can calculate the inner product between two vectors through the uBLAS function inner_prod(). It takes as input two vectors with the same size, and returns a scalar value:
std::cout << ublas::inner_prod(v1, v2) << std::endl; // 1
std::cout << std::inner_product(v1.begin(), v1.end(), v2.begin(), 0.0) << std::endl; // 2

try // 3
{
    double badValue = ublas::inner_prod(v1, vs); // 4
    std::cout << "Bad inner product: " << badValue << std::endl; // 5
}
catch(const std::exception& ex)
{
    std::cout << "Can't get inner product: " << ex.what() << std::endl; // 6
}
1. The uBLAS inner product; in this case the expected result is 70.
2. We could use the STL instead.
3. Remember that it makes sense to try/catch uBLAS functions only if NDEBUG has not been defined.
4. The two vectors involved in an inner product should have the same size. Here this is not true, so trouble should be expected.
5. This line is executed only if NDEBUG has been defined, and in that case the output is a meaningless value.
6. If we are not running in NDEBUG mode, the uBLAS check on the vector sizes fails, an exception is thrown, and it is caught here.

Outer product

The outer product of two vectors is a matrix where the (x, y) element is the product of the x element of the first vector by the y element of the second one, as shown by this very basic implementation:
namespace my
{
    ublas::matrix<double> outer_prod(const ublas::vector<double>& v1, const ublas::vector<double>& v2)
    {
        ublas::matrix<double> m(v1.size(), v2.size());
        for(unsigned int i = 0; i < v1.size(); ++i)
            for(unsigned int j = 0; j < v2.size(); ++j)
                m(i, j) = v1[i] * v2[j];
        return m;
    }
}
There is no constraint on the vector sizes. If they are the same, the result is a square matrix; otherwise we get a rectangular one:
std::cout << "uBLAS outer product /1: " << ublas::outer_prod(v1, v2) << std::endl;
std::cout << "uBLAS outer product /2: " << ublas::outer_prod(v2, vs) << std::endl;

ublas::vector<double> v0(0);
std::cout << "uBLAS outer product /3: " << ublas::outer_prod(v1, v0) << std::endl; // 1
1. It is not illegal to multiply by a zero-sized vector. In this specific case we get a 6x0 matrix.

Left-right matrix-vector product

The product between a matrix and a vector is not commutative. The vector can go on the left only if its size matches the matrix's size1(), and on the right only if it matches size2():
ublas::matrix<double> m1(6,3); // 1

for(unsigned int i = 0; i < m1.size1(); ++i)
    for(unsigned int j = 0; j < m1.size2(); ++j)
        m1(i, j) = (i + j);

std::cout << ublas::prod(v1, m1) << std::endl; // 2
std::cout << ublas::prod(m1, vs) << std::endl; // 3

try
{
    std::cout << "Right " << ublas::prod(vs, m1) << std::endl; // 4
}
catch(const std::exception& ex)
{
    std::cout << "Can't get outer product: " << ex.what() << std::endl;
}
1. A 6x3 rectangular matrix.
2. Vector v1 has size 6, matching the matrix's size1().
3. All right: vector vs has size 3, matching size2().
4. Size mismatch! If NDEBUG is not defined, uBLAS throws an exception here. Otherwise, we get a meaningless result.

Matrix-matrix product

We can think of the vector-matrix product as a special case of the matrix-matrix product. So, here too we should ensure that the first matrix's size2 matches the second matrix's size1. Otherwise we are bound to get an exception (NDEBUG not defined) or nonsensical output (NDEBUG defined).
ublas::matrix<double> m2(3,6); // 1

for(unsigned int i = 0; i < m2.size1(); ++i)
    for(unsigned int j = 0; j < m2.size2(); ++j)
        m2(i, j) = (10 - i - j);

std::cout << "Output matrix: " << ublas::prod(m1, m2) << std::endl; // 2

ublas::matrix<double> mq(6,6); // 3

for(unsigned int i = 0; i < mq.size1(); ++i)
    for(unsigned int j = 0; j < mq.size2(); ++j)
        mq(i, j) = i + j;

try
{
    std::cout << "Bad: " << ublas::prod(m1, mq) << std::endl; // 4
}
catch(const std::exception& ex)
{
    std::cout << "Can't get matrix-matrix product: " << ex.what() << std::endl;
}
1. A 3x6 rectangular matrix.
2. The first matrix is a 6x3 one, so the two can be multiplied.
3. A 6x6 square matrix.
4. Here the no-NDEBUG version throws an exception; if NDEBUG was defined, we should expect silly values in the resulting matrix.

Element product

The element product takes as operands two vectors, or matrices, having the same sizes, and produces as result a third vector (or matrix) with the same dimensions, where each element is the product of the original elements in the same position.
std::cout << ublas::element_prod(v1, v2) << std::endl; // 1

try
{
    std::cout << ublas::element_prod(v1, vs) << std::endl; // 2
}
catch(const std::exception& ex)
{
    std::cout << "Bad element product: " << ex.what() << std::endl;
}
1. The vectors v1 and v2 have the same size, so we can perform an element_prod() on them.
2. Different sizes in the passed vectors result in an exception (if NDEBUG is not defined) or in an inconsistent result (NDEBUG defined).
Same for matrices:
ublas::matrix<double> m3(6,3);

for(unsigned int i = 0; i < m3.size1(); ++i)
    for(unsigned int j = 0; j < m3.size2(); ++j)
        m3(i, j) = (7 - i - j);

std::cout << ublas::element_prod(m1, m3) << std::endl; // 1

try
{
    std::cout << ublas::element_prod(m1, m2) << std::endl; // 2
}
catch(const std::exception& ex)
{
    std::cout << "Bad element product: " << ex.what() << std::endl;
}
1. This works fine.
2. A mismatch in the matrices' sizes leads to an exception (NDEBUG not defined) or garbage in the output matrix (NDEBUG defined).

Element division

Element division works much the same as element product:
std::cout << ublas::element_div(v1, v2) << std::endl; // 1
std::cout << ublas::element_div(v2, v1) << std::endl; // 2

try
{
    std::cout << ublas::element_div(v1, vs) << std::endl; // 3
}
catch(const std::exception& ex)
{
    std::cout << "Bad element product: " << ex.what() << std::endl;
}

std::cout << ublas::element_div(m1, m3) << std::endl; // 4

try
{
    std::cout << ublas::element_div(m1, m2) << std::endl; // 5
}
catch(const std::exception& ex)
{
    std::cout << "Bad element product: " << ex.what() << std::endl;
}
1. This works fine.
2. This works fine too. But since the first element in the v1 vector is zero, the first element of the resulting vector is not a valid double; it is displayed as 1.#INF on Windows.
3. Vectors with different sizes: their element division leads to an exception (NDEBUG not defined) or to a nonsensical result (NDEBUG defined).
4. Matrices with the same sizes: it works alright, with the same caveat as in (2), since we have a zero divisor.
5. Like (3), mismatching sizes for matrices lead to trouble.

Go to the full post