Pages

SoftReference

Please go to Biting Code. There you will find the post that was here, and much more stuff about Java and JVM-based languages.

Comparing SoftReference and WeakReference. Using a Java soft reference to implement a cache.

Go to the full post

WeakReference and WeakHashMap

Please go to Biting Code. There you will find the post that was here, and much more stuff about Java and JVM-based languages.

When strong references in Java are not enough, WeakReference and WeakHashMap can help save the day.
Some words on why they are useful and how the garbage collector deals with them.

Go to the full post

Simple Logback filtering

Please go to Biting Code. There you will find the post that was here, and much more stuff about Java and JVM-based languages.

Logback filtering by XML configuration and using custom Java classes.

Go to the full post

Loggers and appenders

Please go to Biting Code. There you will find the post that was here, and much more stuff about Java and JVM-based languages.

How to configure loggers and appenders for Logback by XML and Groovy.

Go to the full post

Reloading Logback configuration (with debug)

This post has been moved to Biting Code, my blog dedicated to JVM languages.

The Logback autoscan configuration feature, and how to get debug information from configuration.

Go to the full post

Logback configuration

Please go to Biting Code. There you will find the post that was here, and much more stuff about Java and JVM-based languages.

Logback could be configured using an XML or a Groovy file.

Go to the full post

Changing log level programmatically

Please go to Biting Code. There you will find the post that was here, and much more stuff about Java and JVM-based languages.

Sometimes there are good reasons to go beyond the SLF4J façade and work on the concrete logging framework. An example based on Logback is shown.

Go to the full post

SLF4J and Logback

Please go to Biting Code. There you will find the post that was here, and much more stuff about Java and JVM-based languages.

How to set up a Java application to use Logback and SLF4J as logging framework.

Go to the full post

The fastest way of not logging

Please go to Biting Code. There you will find the post that was here, and much more stuff about Java and JVM-based languages.

How much are you ready to pay for not logging? Some costs are unavoidable, but using SLF4J we can minimize them.

Go to the full post

Getting system properties

This post has been moved to Biting Code, my blog dedicated to Java and other JVM languages.

A few words on the Properties class and how to get the system properties.

Go to the full post

Configuring JDK Logging

Please go to Biting Code, there you will find the post that was here, and much more stuff about Java and JVM-based languages.

How to configure JUL (java.util.logging), the standard JDK logger facility.

Go to the full post

SimpleLogger for SLF4J

Please go to Biting Code, there you will find the post originally here, and much more stuff about Java and JVM-based languages.

Some talk about NOPLogger and SimpleLogger for SLF4J.

Go to the full post

Setting up SLF4J

Please go to Biting Code, there you will find the post that was here, and much more stuff about Java and JVM-based languages.

SLF4J is not Slartibartfast's nickname, but the acronym for Simple Logging Facade for Java. As its full name elegantly explains, its raison d'être is close to that of Apache Commons Logging, of which it is a sort of successor. The post is about how to make it work in your environment, and how to run a first, very simple SLF4J-aware application.

Go to the full post

Cloning a Collection

I moved the content to Biting Code, my blog dedicated to Java and JVM-based languages.

If you want to clone a collection, and what you have is just a reference to its Collection interface, you have to (safely) downcast it to its actual type.

Go to the full post

Solving linear equations

This post has been moved to Biting Code, my blog dedicated to Java and other JVM languages.

An example of how to solve a system of linear equations with the linear package of the Apache Commons Math Java library.

Go to the full post

Random generators

I moved the content to Biting Code, my blog dedicated to Java and JVM-based languages.

The java.util.Random class makes a few simple pseudo-random generators available. When more refined generators are required, you could check out the ones provided by Apache Commons Math.

Go to the full post

OLSMultipleLinearRegression parabola

I moved the content that was here to Biting Code, my blog dedicated to Java and JVM-based languages.

Estimating a second degree curve through the Apache Commons Math class OLSMultipleLinearRegression.

Go to the full post

The simplest OLSMultipleLinearRegression example ever

I moved this post to Biting Code, my blog dedicated to JVM languages.

A very simple example of multivariate linear regression by ordinary least squares, using the Apache Commons Math OLSMultipleLinearRegression class.

Go to the full post

From array of primitives to Collection

I moved this post to Biting Code, my blog dedicated to JVM languages.

Converting an array of objects to a standard Java Collection is quite easy, thanks to Arrays.asList(), but in the case of primitive values we have to be more explicit.

Go to the full post

SummaryStatistics vs. DescriptiveStatistics

I moved this post to Biting Code, my blog dedicated to JVM languages.

An introduction to a couple of Java classes, SummaryStatistics and DescriptiveStatistics, that are part of the Apache Commons Math library.

Go to the full post

Random expectation

I moved this post to Biting Code, my blog dedicated to JVM languages.

A single random extraction from a pseudo-random distribution should be, well, unexpected. Not much more to say about that.

More interesting is pondering on a number of extractions from such a distribution. I have done some testing on the expected mean of a few of the distributions available through the Java Random class.

Go to the full post

Using random to generate pi

I moved this post to Biting Code, my blog dedicated to JVM languages.

You can see it just as a clever trick, but it is useful as an example of a different way of thinking. It shows how we can use the properties of a pseudo-random sequence to infer results that sometimes would be difficult to get using classic methods.

Go to the full post

From Collection to array

I moved this post to Biting Code, my blog dedicated to JVM languages.

Concrete classes in the Java Collection hierarchy support cloning, but this is not available to the root Collection interface. So, if we don't know the actual type of a collection, we have to fall back to the Collection.toArray() methods.

Go to the full post

String, StringBuffer, StringBuilder

This post has been moved to Biting Code, my blog dedicated to JVM languages.

What the difference is among the Java classes String, StringBuffer, and StringBuilder, and when to use one class or the other.

Go to the full post

Shallow copy

This post has been moved to Biting Code, my blog dedicated to JVM languages.
Difference between shallow copy and deep copy, the Object.clone() method, a couple of useful array management functions, System.arraycopy() and Arrays.copyOf(), deep copy of arrays, and how the fact that the standard wrapper types are immutable impacts all this.

Go to the full post

When null does not mean false

This post has been moved to Biting Code, my blog dedicated to JVM languages.
If you have a background in languages other than Java, you could expect null to be considered equivalent to false. This could lead to some unpleasant surprises.

Go to the full post

Synchronizing with a condition_variable

It is a common requirement in a multithreaded application that a thread should wait for another thread to complete some task, and the usual way to accomplish this in C++11 (or Boost, if your compiler does not implement it yet) is by using a condition_variable. For instance, we have previously seen an example of a threadsafe queue implementation that uses a condition_variable to communicate status changes between threads.

Here we'll see an even simpler example.

A class is designed to be used in a multithreaded environment. A member function sets an int member variable a number of times, as specified by the caller. Before resetting that value it waits for another thread to consume the previously set value. To keep track of the status we use an accessory boolean variable.

Let's see a first implementation that makes no use of condition_variable:
class Conditional : boost::noncopyable
{
private:
  int value_; // 1
  bool produced_; // 2
  boost::mutex m_;

public:
  Conditional() : value_(-1), produced_(false) {}

  void produce(unsigned int count)
  {
    for(int i = count; i >= 0; --i)
    {
      boost::unique_lock<boost::mutex> l(m_); // 3
      while(produced_) // 4
      {
        std::cout << "Producer waits" << std::endl;

        l.unlock(); // 5
        boost::this_thread::sleep(boost::posix_time::millisec(100));
        l.lock();
      }
      std::cout << "Producer sets value to " << i << std::endl; // 6
      value_ = i;
      produced_ = true;
    }
  }

  void consume()
  {
    do {
      boost::unique_lock<boost::mutex> l(m_);
      while(!produced_) // wait for producer
      {
        std::cout << "Consumer waits" << std::endl;

        l.unlock();
        boost::this_thread::sleep(boost::posix_time::millisec(100));
        l.lock();
      }
      std::cout << "Consumer now is in control: " << value_ << std::endl;
      produced_ = false;
    } while(value_); // 7
  }
};
1. Variable shared among threads.
2. Flag to keep track if the current value is ready to be consumed.
3. We enter in the critical section.
4. Waiting for the consumer to use a previously set value.
5. Give the other thread a chance to do its job.
6. When we reach this line, we are ready to set the value and the flag that marks it as ready to be consumed.
7. The consumer is similar to the producer; the main difference is that it cycles till it finds an invalid value in the member variable (that is, zero).

Here is the code for testing this:
Conditional c;

boost::thread t1(&Conditional::consume, &c);
boost::thread t2(&Conditional::produce, &c, 10);

t1.join();
t2.join();
As we can see, this solution works all right. But it is not elegant: the main issue is that we have to specify "by hand" the interval we want our threads to sleep when waiting for the other thread to do its job. Using a condition_variable makes our code cleaner and simpler. First we add a private member variable:
boost::condition_variable c_;
Then we rewrite consume and produce in this way:
void produce(unsigned int count)
{
  for(int i = count; i >= 0; --i)
  {
    boost::unique_lock<boost::mutex> l(m_);
    c_.wait(l, [this](){ return !produced_; } ); // 1
    std::cout << "Producer sets value to " << i << std::endl;
    value_ = i;
    produced_ = true;
    c_.notify_one(); // 2
  }
}

void consume() // 3
{
  do {
    boost::unique_lock<boost::mutex> l(m_);
    c_.wait(l, [this](){ return produced_; } );

    std::cout << "Consumer now is in control: " << value_ << std::endl;
    produced_ = false;
    c_.notify_one();
  } while(value_);
}
1. We can wait directly on the mutex (see the other example for details) or, as here, on the unique_lock. In this case we pass to wait() a predicate that returns a boolean - the condition we are waiting for. Here the predicate is a lambda function with a closure on this, so that we can access the produced_ flag.
2. Another thread that is pending on the condition is notified that the status has changed.
3. As before, consume() is very close to produce().

We could use the same code seen above to test our new version, and we should appreciate how much faster it is now that we have removed the tentative sleeps.

Go to the full post

Reader-Writer mutex

Say that in our C++ application we have to protect some data that is read at high frequency from multiple threads and is seldom written. We could use a normal mutex-lock pattern, but that would slow down all the reading threads for no reason. What we really want is to allow any number of concurrent readers, and create an actual critical section only when the data changes.

We can use the Boost Thread library shared_mutex and shared_lock for this purpose.

Instead of a plain mutex we define a shared_mutex to control access to the resource; the readers acquire a shared_lock on it before reading the data. When a writer wants to modify it, it acquires a lock_guard on the shared_mutex, which gives it exclusive control of the resource.

Let's see the code:
class ReadWrite
{
private:
  int value_; // 1
  boost::shared_mutex mx_; // 2

public:
  ReadWrite() : value_(42) {}

  void read()
  {
    boost::shared_lock<boost::shared_mutex> lx(mx_); // 3
    std::cout << boost::this_thread::get_id() << " read: " << value_ << std::endl; // 4
    boost::this_thread::sleep(boost::posix_time::millisec(100)); // 5
  }

  void increase()
  {
    boost::lock_guard<boost::shared_mutex> lx(mx_); // 6
    std::cout << boost::this_thread::get_id() << " increasing " << value_++ << std::endl; // 7
    boost::this_thread::sleep(boost::posix_time::millisec(100));
    std::cout << boost::this_thread::get_id() << " new value: " << value_ << std::endl;
  }

  void reading() // 8
  {
    for(int i = 0; i < 15; ++i)
      read();
  }
};

1. This is just a silly example, so an integer suffices as the underlying resource.
2. And this is the shared_mutex we'll use to access the resource.
3. Before actually reading the data we must succeed in acquiring a shared_lock on the shared_mutex. As many threads as we like can do so, as long as they are all reading.
4. Some feedback to see what is actually going on. Notice that this code is buggy: no mutex is used for the standard output console, which means we are almost sure to get mixed-up output. I expressly avoided such a mutex to keep the code to the point, but it should be easy for you to fix the problem.
5. Some sleeping to make the effect of the lock more readable.
6. Here we need to access the data to change it, so we create a lock_guard on the shared_mutex. This lock patiently waits its turn if other threads own the mutex, but once it gets it, it is the only one that can access the data - as we expect from a lock_guard - till it goes out of scope.
7. Notice that here the data changes.
8. Just a utility function, to make the testing easier.

And talking about testing:
TEST(ReadWrite, One)
{
  ReadWrite rw;

  boost::thread t1(&ReadWrite::reading, &rw); // 1
  boost::thread t2(&ReadWrite::reading, &rw);

  for(int i = 0; i < 4; ++i)
  {
    boost::this_thread::sleep(boost::posix_time::millisec(300));
    rw.increase(); // 2
  }

  t1.join();
  t2.join();
}

1. Two threads run on the reading() function.
2. Sometimes the main thread butts in and changes the underlying data.

Go to the full post

once_flag and call_once()

We have seen that, if we are developing in C++ for a multithreaded environment, it is not a good idea to use the double-checked locking strategy to ensure that a piece of functionality is called just once in the whole process life.

C++11 and the Boost Thread library offer a robust alternative approach, based on the pair once_flag and call_once().

The same code that we have seen in the previous post, could be rewritten like this:
std::shared_ptr<SomethingExpensive> sp; // 1
boost::once_flag ofsp; // 2

void setSomething()
{
  boost::call_once(ofsp, [](){ // 3
    sp.reset(new SomethingExpensive()); // 4
  });
}

1. Using the volatile keyword on smart pointers leads to a number of complications. For this reason the original example was written using a raw pointer. We have seen that actually there is no real reason to use the volatile qualifier, so here we are free to go smart.
2. This is the once_flag associated to the resource we want to protect. Here we are using the boost implementation, but if this C++11 feature is supported by your current compiler you can use it just changing the namespace to std.
3. Calling boost::call_once() - again, use std::call_once if available for your compiler - we ensure that the passed function is called only once in all the process life. Here we pass as a function a lambda, so to make the code very compact.
4. That's the code called just once. An object is created on the heap and its address is stored in the smart pointer.

I have added a ctor to the supposedly expensive class to get some feedback:
SomethingExpensive() { std::cout << "ctor" << std::endl; }
And then I have written a test:
TEST(Expensive, Good)
{
  boost::thread t1(&setSomething);
  boost::thread t2(&setSomething);

  t1.join(); // wait for both threads to complete before leaving the test
  t2.join();
}

Go to the full post

Data race on double checked locking

Double-checked locking is a well known pattern used to initialize singletons. It works fine in some contexts; for instance, it is absolutely fine to use it to implement the Singleton pattern in Java.

But when the implementation language is C++, the double-checked locking strategy is often referred to as an anti-pattern - see for instance this wiki page on Google Code about data races.

The issue arises from the different meaning of the keyword volatile in Java and C++. In C++ volatile is just a hint to the compiler, meaning that we don't want any optimization performed on that variable, while in Java it implies constraints that ensure any thread will read the most recent value of that variable.

So the code that works fine in Java is not equivalent to this (bad) C++ rewrite:
class SomethingExpensive
{
    // ...
};

volatile SomethingExpensive* se = nullptr; // !!! BAD IDEA !!!
boost::mutex mxse;

void setSomething()
{
    if(se == nullptr) // !!! BAD IDEA !!!
    {
        boost::lock_guard<boost::mutex> l(mxse);
        if(se == nullptr)
            se = new SomethingExpensive();
    }
}
The worst part of this code is that it often - even almost always - works. Just sometimes it shows an unexpected behavior that would be very difficult to track down to its real root cause.

Go to the full post

unique_lock when lock_guard is not enough

In the previous post we saw that we can use lock_guard in conjunction with the lock() function (both are available in the Boost Thread library and as part of the new C++11 standard) to avoid deadlocks.

In this case the idiom is: call lock() for all the mutexes you need to lock for that critical region, then create a lock_guard adopting each of them, so that they are automatically released at the end of the scope - even in case of exceptions.

This should be enough for normal usage, but sometimes we need more flexibility. That could be the time to use unique_lock (or, equivalently, if you are using its Boost implementation, its boost::mutex::scoped_lock typedef).

It doesn't make much sense in this case, since unique_lock adds a tiny overhead that is not required here, but we can rewrite the original code in this way:
boost::lock<boost::mutex, boost::mutex>(m1_, m2_);
boost::unique_lock<boost::mutex> l1(m1_, boost::adopt_lock);
boost::unique_lock<boost::mutex> l2(m2_, boost::adopt_lock);

More interestingly, instead of adopting a mutex already locked, we could associate a mutex to a unique_lock without locking it on construction, and then call another overload of the lock() function, this one expecting unique_locks in input:
boost::unique_lock<boost::mutex> l1(m1_, boost::defer_lock);
boost::unique_lock<boost::mutex> l2(m2_, boost::defer_lock);
boost::lock(l1, l2);

Keep in mind that if you use the latter setting you must call the lock() overload that works on unique_locks: otherwise you would lock the underlying mutexes, but the unique_locks wouldn't get this information, so they wouldn't unlock them on destruction. That would make unique_lock completely useless.
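A minimal sketch of the mistake described above, reusing the m1_ and m2_ members from the earlier example:

boost::unique_lock<boost::mutex> l1(m1_, boost::defer_lock);
boost::unique_lock<boost::mutex> l2(m2_, boost::defer_lock);

// WRONG: this would lock the raw mutexes, but l1 and l2 would still believe
// they don't own them, so their dtors wouldn't unlock m1_ and m2_:
// boost::lock<boost::mutex, boost::mutex>(m1_, m2_);

// RIGHT: the unique_lock overload updates the owns-lock state of l1 and l2,
// so both mutexes are released on destruction:
boost::lock(l1, l2);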

Go to the full post

How to avoid deadlocks

We have seen how easy it is to write code that could lead to a deadlock, and we have already seen a protocol that, when carefully observed, removes that risk.

We have seen how a deadlock could occur when we need to lock more than one mutex at the same time: a thread could have already locked a mutex but be unable to get its hands on the other (or one of the others) it needs, since another thread has already locked it for its own use. And the other thread is in the same situation. Both are waiting to lock a mutex held by the other. No easy way out, one would say.

But actually there is a way out, and it is part of the C++11 standard - and, if your compiler does not implement this new feature, you could get it through the Boost Thread library. The trick is done by the free function std::lock() / boost::lock(), which locks all the passed mutexes, or waits till this is possible. No partial locking is performed, so the risk of deadlock is completely removed.

We can rewrite both f1() and f2() (as seen in the previous post) like this:
void f(char feedback)
{
  for(int i = 0; i < SIZE_; ++i)
  {
    boost::lock<boost::mutex, boost::mutex>(m1_, m2_); // 1

    std::cout << feedback;
    ++v1_;
    ++v2_;

    m1_.unlock(); // 2
    m2_.unlock();
  }
}

1. As you see, lock() is a template function. It returns when it has actually performed the lock on both mutexes. There is no yield() call after this line because it doesn't have any use anymore, but you can try putting it back (or maybe even a sleep, to force a context switch) and see what happens.
2. Major nuisance: the lock() function performs a simple lock() on the passed mutexes - no RAII as when lock_guard is used - which means we have to unlock both of them "by hand" at the end of the critical area.

Do you feel uneasy about those explicit unlocks? Right, we could still use lock_guard, specifying that it does not have to acquire the lock, just adopt it:
boost::lock<boost::mutex, boost::mutex>(m1_, m2_);
boost::lock_guard<boost::mutex> l1(m1_, boost::adopt_lock);
boost::lock_guard<boost::mutex> l2(m2_, boost::adopt_lock);

No more need for an explicit unlock() at the end of the scope; the lock_guard dtor takes care of it.

Go to the full post

How to deadlock

It is easy to write code that could lead to a deadlock, less easy to see it in action, given that everything related to threads is in some way out of the developer's control: it is the operating system - or maybe the hardware - that has the last word on when and where a thread is executed.

But by manipulating the code a bit we can magnify the effect of bad design.

Let's see a class that is (badly) designed to be used in a multithreading environment:
class Deadlock : boost::noncopyable
{
private:
  int v1_; // 1
  int v2_;
  const int SIZE_;
  boost::mutex m1_;
  boost::mutex m2_;

public:
  Deadlock(int size) : v1_(0), v2_(0), SIZE_(size) {}

  int getV1() { return v1_; }
  int getV2() { return v2_; }

  void f1() // 2
  {
    for(int i = 0; i < SIZE_; ++i)
    {
      boost::lock_guard<boost::mutex> l1(m1_);
      boost::this_thread::yield(); // 3
      boost::lock_guard<boost::mutex> l2(m2_);

      std::cout << '_'; // 4
      ++v1_; // 5
      ++v2_;
    }
  }

  void f2() // 6
  {
    for(int i = 0; i < SIZE_; ++i)
    {
      boost::lock_guard<boost::mutex> l2(m2_);
      boost::this_thread::yield();
      boost::lock_guard<boost::mutex> l1(m1_);

      std::cout << '.';
      ++v1_;
      ++v2_;
    }
  }
};

1. The class is built around a couple of integers, each protected in its usage by its own mutex.
2. We have a couple of functions, f1 and f2, that acquire both mutexes and then modify the two private member variables a number of times.
3. This yield() call gives a hint to the thread manager that we would be happy to let other threads butt in.
4. Some visual feedback is always welcome.
5. And here is the real job done by the function: increasing both data members.
6. The other function looks almost the same, but the locks are created in reverse order: first we acquire a lock on m2_ and then on m1_. This can easily lead to a deadlock, since it tends to put the two threads in a situation where the first one owns the lock on m1_ and the second one on m2_. In that case both of them will wait indefinitely for the other one, and neither will release its own lock to help the other thread carry on its job.

Here is a test that usually would show the problem:
TEST(Deadlock, Bad)
{
  const int SIZE = 20;
  Deadlock d(SIZE);

  boost::thread t1(&Deadlock::f1, &d);
  boost::thread t2(&Deadlock::f2, &d);

  boost::this_thread::sleep(boost::posix_time::seconds(1)); // 1
  ASSERT_EQ(SIZE * 2, d.getV1());
  ASSERT_EQ(SIZE * 2, d.getV2());
}

1. It wouldn't make much sense to join() the working threads, since we expect them to be deadlocked. So we just wait for a long time (an entire second!) and then check what they did in the meantime.

If a deadlock happened, the assertions fail. Maybe on your machine this test could unexpectedly succeed, even more than once. If that is your case you could increase SIZE, or run the test a few more times (or both); in the end you should get the expected behavior.

This code is so simple that it is very easy to make it right: it is enough to follow a common policy in acquiring locks. So I rewrite the second function:
void f2a()
{
  for(int i = 0; i < SIZE_; ++i)
  {
    boost::lock_guard<boost::mutex> l1(m1_);
    boost::this_thread::yield();
    boost::lock_guard<boost::mutex> l2(m2_);

    std::cout << '.';
    ++v1_;
    ++v2_;
  }
}

And I write another test:
TEST(Deadlock, Good)
{
  const int SIZE = 20;
  Deadlock d(SIZE);

  boost::thread t1(&Deadlock::f1, &d);
  boost::thread t2(&Deadlock::f2a, &d);

  boost::this_thread::sleep(boost::posix_time::seconds(1));
  ASSERT_EQ(SIZE * 2, d.getV1());
  ASSERT_EQ(SIZE * 2, d.getV2());
}

This one should run fine in any circumstance.

This patch is very easy to apply, but it works well only for simple code. We'll see in a next post a more robust and general solution.

Go to the full post

Why std::stack is not threadsafe

The C++ STL standard containers are not threadsafe, and this is a feature. The point is that threadsafe code is much slower than code designed to run on a single thread, so it doesn't make much sense to force a programmer to use a threadsafe container when there is no reason for it.

Let's see the case of std::stack. This class has been expressly designed to be used in single-threaded code, and it can't easily be adapted to multithreading. The main issue is in how its top() and pop() functions are designed. Calling these methods on an empty stack gives undefined behaviour, so we have to call the other member function, empty(), to ensure we can actually perform those operations.

Here are a couple of tests, written for Google Test, that show how top() and pop() are expected to be called:
TEST(StdStack, Top)
{
  const int value = 42;
  std::stack<int> si;
  si.push(value);

  if(si.empty() == false)
  {
    // *** [A] ***

    ASSERT_FALSE(si.empty());
    EXPECT_EQ(value, si.top());
  }
}

TEST(StdStack, Pop)
{
  const int value = 42;
  std::stack<int> si;
  si.push(value);

  if(si.empty() == false)
  {
    // *** [A] ***

    ASSERT_FALSE(si.empty());
    si.pop();
    EXPECT_TRUE(si.empty());
  }
}

We expect both tests to succeed. But what if two threads share the same stack container? What if at [A] the other thread butts in and pop()s an element from the stack? It is easy to redesign the tests to emulate this behaviour and see the result: in both cases the subsequent assertions are bound to fail.

As we see, the problem is in how the std::stack interface is designed. Splitting the functionality into pairs of methods, empty()/top() and empty()/pop(), makes it impossible to adapt it to a multithreading environment. We should provide a different interface, where top() and pop() do not require a previous call to empty(). A sketch of such an interface follows.
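Here is a minimal sketch of what I mean, merging the empty-check and the data access into a single synchronized call (the class name SafeStack and the tryPop() signature are my own choices, just for illustration):

#include <stack>
#include <boost/thread/mutex.hpp>
#include <boost/thread/locks.hpp>

template <typename T>
class SafeStack
{
private:
  std::stack<T> s_;
  boost::mutex m_;

public:
  void push(const T& value)
  {
    boost::lock_guard<boost::mutex> l(m_); // exclusive access to the stack
    s_.push(value);
  }

  bool tryPop(T& value) // combined empty + top + pop: no window for another thread
  {
    boost::lock_guard<boost::mutex> l(m_);
    if(s_.empty())
      return false; // an error code instead of undefined behaviour

    value = s_.top();
    s_.pop();
    return true;
  }
};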

Go to the full post

Automatic join for a thread

When using a thread, we often follow a simple pattern: create in a function one or more boost::thread objects, do some more work, join all the created threads, and finally terminate the function execution.

It is such a common structure that one could wonder why not use a wrapper class, similar to lock_guard, that joins the thread in its destructor. Besides simplifying our code, we get the free benefit of making it more robust, since we need not worry about what happens if an exception occurs after a working thread is created and before it is joined.

Being so similar to lock_guard in its use and behaviour, I called such a utility class ThreadGuard (using the CamelCase naming convention to stress the fact that it is not part of a standard library).

Here it is - just a few lines of code, but with some interesting points of discussion in it:

class ThreadGuard : boost::noncopyable // 1
{
private:
  boost::thread t_;

public:
  explicit ThreadGuard(boost::thread& t): t_(std::move(t)) {} // 2

  ~ThreadGuard()
  {
    if(t_.joinable())
      t_.join(); // 3
  }
};

1. No copy of this class is allowed, so its copy ctor and assignment operator must be inaccessible - to save a bit of typing we derive the class from the Boost utility class expressly designed for this task.
2. We move the boost::thread passed (by reference) to the class into the private member. So, after creating a ThreadGuard, the original boost::thread is left in a not-a-thread status.
3. The dtor joins the thread, after ensuring that it is actually joinable.

We could use this class as showed in this test (written for Google Test):
TEST(ThreadGuard, Simple)
{
  boost::thread t(sayDelayedHallo, 300); // 1
  EXPECT_TRUE(t.joinable());

  ThreadGuard tg(t);
  EXPECT_FALSE(t.joinable()); // 2

  std::cout << "Main thread is ready to leave" << std::endl;
} // 3

1. A boost::thread is created, and then we check that its construction was successful.
2. After creating a ThreadGuard for a boost::thread, the latter becomes a not-a-thread.
3. The join on the thread is called implicitly here.

The thread is built to run this minimal function:
void sayDelayedHallo(unsigned int delay)
{
  boost::this_thread::sleep(boost::posix_time::millisec(delay));
  std::cout << "Hello" << std::endl;
}

Since there is no further use here for the "raw" boost::thread, we could even create a ThreadGuard from an anonymous temporary object (note that, for this to compile, ThreadGuard would need an additional ctor overload taking an rvalue reference, boost::thread&&, since a temporary cannot bind to the non-const reference used above):
ThreadGuard tg(boost::thread(sayDelayedHallo, 300));

Go to the full post

Running a daemon thread

You probably know what a daemon process is: a process running in the background of your (UNIX) environment. A daemon thread is something very similar: a thread running in the background of your process.

We have ways to interact with a "normal" thread from the main thread that spawned it, but we have no way to relate to a daemon thread - it just runs freely until its natural end, or till the process is terminated.

One could wonder what the point is of having such a beast as a daemon thread, but here the analogy with the daemon process comes in handy. Just as we sometimes want to start a process in our environment and then forget about it, leaving to the operating system the burden of taking care of it, we could be interested in having a job performed in the background of our application, letting the process take care of it, if anyone has to.

We can get this effect by creating a boost::thread (or a std::thread, if your compiler supports this C++11 feature) and then detaching the generated thread. It is a simple matter, but better to see it in an example, just to clarify.

This is the code that we want to run in a background thread:
void countdown(unsigned int count)
{
   do {
      std::cout << '[' << count << ']';
      boost::this_thread::sleep(boost::posix_time::seconds(1));
   } while(count--);
}
As you see, it just dumps data to the standard output console, sleeps for a while, and then iterates, for a number of times specified by the caller. Notice that we could have some problems in the output, since the main thread is expected to write to cout too, so a mutex should be used for this shared resource - but we won't bother here.

Here is the code that is going to use the countdown function:
boost::thread t(countdown, 10);

if(t.joinable()) // 1
{
   std::cout << "Detaching worker.";
   t.detach(); // 2
}

if(t.joinable() == false) // 3
{
   std::cout << "The worker thread now is a deamon, running by itself in background.";
}

std::cout << "Sleeping for a while ... ";
boost::this_thread::sleep(boost::posix_time::seconds(3));
std::cout << " ... terminating main." << std::endl; // 4
1. We should detach only a boost::thread that is alive and kicking or, better said, joinable.
2. That's it. We simply call detach() on the boost::thread object. Actually, not much harm comes from detaching a thread that is not joinable - it would just be a silly, nonsensical call.
3. The message printed here is a bit too optimistic. If a boost::thread is not joinable, we can't positively assume that it is still alive and working in the background. It could be anything; we don't have any information about it anymore.
4. Usually we should join all the spawned threads before terminating the main thread, but here we can't. As we said, we have no more control over that working thread. We should just assume that it has been written properly and that no issue will come from its possible brutal killing by the process manager at the end of the process life.

Go to the full post

Race condition

When different threads or processes make use of the same data, we should pay attention not to incur race conditions.

We have a race condition when two or more concurrent branches of execution depend on the same mutable shared state.

Let's see here a simple example of race condition.

Say that we have to manage directly a linked list of elements, which could be defined in this naive way:
struct Element
{
  Element(int v, Element* n) : value(v), next(n) {}

  int value;
  Element* next;
};

To print all the items in a list of such Elements we could use this raw function:
void dumpList(Element* curr)
{
  while(curr != nullptr)
  {
    std::cout << curr->value << ' ';
    curr = curr->next;
  }
  std::cout << std::endl;
}

Adding an element at the front of our linked list is a bit more interesting:
void addFront(Element*& head, int value) // 1
{
  Element* t = head ? head : nullptr; // 2
  Element* e = new Element(value, t); // 3

  head = e; // 4
}

1. We pass to our function the pointer to the current head of the list, by reference, since we want to be able to change it to make the new element the head of the list.
2. This line could be merged into the next one - we don't actually need a temporary Element pointer here - but it should make the code a bit clearer: only if there is a "real" current head do we need to set the next of the new element we are about to create.
3. A new Element is created.
4. The Element becomes the new head of the list.

The normal usage of our list is this:
Element* head = nullptr;

addFront(head, 42);
addFront(head, 24);
dumpList(head);

Everything works fine - if we stick to a single-threaded environment. Using such code in a multithreading environment is asking for trouble.

But let's start looking at a case where our code still works:

Element* head = nullptr;

boost::thread t1(addFront, std::ref(head), 42); // 1
boost::this_thread::sleep(boost::posix_time::millisec(250)); // 2
boost::thread t2(addFront, std::ref(head), 24); // 3

t1.join();
t2.join();

dumpList(head);

1. This new thread accesses local data by reference - the pointer to the list head. That means both threads - main and worker - are accessing the same data. We are risking a race condition.
2. By introducing a relatively long sleep we serialize the two calls, actually removing the concurrency from this application.
3. This new thread also uses the same head - the two worker threads are competing for the same state. If we don't acknowledge this situation we can expect big problems.

And it is quite easy to get into trouble - it is enough to remove [2]. If there is no sleep, we should expect a mix-up in addFront(), usually leading to data loss - only one Element wins the insertion into the list.

To better see what is going on, and in the meantime make the issue more visible, let's rewrite the addFront() function. We'll just add a sort of emulation of the classic stepping debug function, designed to work effectively in a multithreading environment:
boost::mutex mio; // 1

void step(int i)
{
  {
    boost::lock_guard<boost::mutex> l(mio); // 2
    std::cout << boost::this_thread::get_id() << "/" << i << std::endl;
  }

  boost::this_thread::sleep(boost::posix_time::millisec(50)); // 3
}

1. A mutex to protect the output console, since it is a shared resource. Actually, we should use it also in the dumpList() function - this is not an issue here only because the dumping is done at the end of our minimal application, when the working threads have already been joined.
2. Lock the mutex before using the shared resource.
3. Let's make the competing effect more visible adding a sleep.

Let's use the stepping function in this way:
void addFront(Element*& head, int value)
{
step(1);
Element* t = head ? head : nullptr;
step(2);
Element* e = new Element(value, t);
step(3);

head = e;
step(4);
}


Now, running the two working threads should lead to a (catastrophic) result like this one:
006E9D00/1
006E9C88/1
006E9D00/2
006E9C88/2
006E9D00/3
006E9C88/3
006E9D00/4
006E9C88/4
42

It is easy to see where the problem arises: there is a race to assign the element created by each thread to the head of the list, which is a shared resource. The last thread that writes head wins, since it overrides the write previously done by the other thread, which simply gets lost.

The solution to the issue is easy: define another mutex and use it to protect access to the shared resource, as sketched below.
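A minimal sketch of that fix, under the assumption that every writer goes through this function (the mutex name mlist is my own choice):

boost::mutex mlist; // protects the shared list head

void addFront(Element*& head, int value)
{
  boost::lock_guard<boost::mutex> l(mlist); // the read-modify-write of head
  head = new Element(value, head);          // is now atomic w.r.t. the other threads
}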

Go to the full post

Producer-consumer on a shared standard container

We want to write a simple multithreaded producer-consumer application that uses the threadsafe wrapper to std::queue we wrote in the previous post.

The main point of this is seeing how to use an STL container in a multithreading context, and we'll also get a glimpse of how to interrupt a thread (at an interruption point).

We have to use a variation on std::queue because the standard containers are not designed for use in a multithreading environment - the logic behind this strategy is that this way we don't pay the high cost of a multithread-aware data structure when we are not interested in this feature.

Producer

The idea is that the producer generates a number of items, starting from a value passed to its ctor, and puts them in a queue that is accessible to the consumer too:
class Producer
{
private:
  const int root_; // 1
  std::shared_ptr< MyQueue<int> > q_; // 2

public:
  Producer(int root, std::shared_ptr< MyQueue<int> > q) : root_(root), q_(q) {}

  void run(size_t size) // 3
  {
    for(size_t i = 0; i < size; ++i)
    {
      q_->push(root_ + i);
      boost::this_thread::sleep(boost::posix_time::millisec(50)); 
    }
  } 
};
1. Initial value for the sequence generated by the producer.
2. Shared pointer to the queue where we want to put the data.
3. This method would be run in another thread. It accepts in input the number of elements the user wants to generate, and loops, each time putting a new item in the queue and then sleeping for a while.

Consumer

The consumer does not differ much from the producer, here is a simple implementation:
class Consumer
{
private:
  const int ID_; // 1
  std::shared_ptr< MyQueue<int> > q_;

public:
  Consumer(int id, std::shared_ptr< MyQueue<int> > q) : ID_(id), q_(q) {}

  void run(size_t size, bool sleep) // 2
  {
    for(size_t i = 0; i < size; ++i)
    {
      std::cout << "Consumer " << ID_ << ": " << q_->pop() << std::endl; // 3
      if(sleep)
        boost::this_thread::sleep(boost::posix_time::millisec(50));
      else
        boost::this_thread::interruption_point();
    }
  }
};
1. Used to see which consumer actually consumed each item.
2. This function runs in a different thread. The bool parameter decides whether this thread should sleep after popping a value from the queue, or just try to read another item. Notice that in the latter case a call to interruption_point() is made, so that the main thread can interrupt this worker thread.
3. This line is flawed. There could be multiple consumers, and all of them would try to access the standard output console, which means we could get mixed-up printouts. The solution to this bug is quite easy: add a mutex for the console and protect this line with a lock on it in each thread executing it (a sketch follows this list). If you need more help understanding how to do it, have a look at another post, like the one where I talk about how to use a functor in multithreading. Incidentally, that same post could give you the idea of rewriting this example using functors. It should be interesting.

Calling producer-consumer

A simple way of running this code is by having a single producer and a single consumer, like this:
std::shared_ptr< MyQueue<int> > spq(new MyQueue<int>()); // 1
size_t size = 10; // 2

Producer p(1000, spq); // 3
boost::thread tp(&Producer::run, &p, size); // 4
boost::this_thread::sleep(boost::posix_time::seconds(1)); // 5

Consumer c(1, spq);
boost::thread tc(&Consumer::run, &c, size, true); // 6

tp.join(); // 7
tc.join();
1. We create a queue and put it in a shared pointer, so that we can pass it around in the producer and consumer.
2. Number of elements that we want to create.
3. Here is the producer ...
4. ... and here we start its run() method in another thread.
5. We give plenty of time to the producer to fill with its data the queue ...
6. ... then we create a new thread on the run() method of a consumer.
7. Finally we join the created threads before completing the main thread execution.

It is fun (if you are that kind of person) to play around with variations on this model: removing the sleep at [5], adding producers and consumers, changing the number of elements involved, and so on.

Go to the full post

Threadsafe std::queue

The standard containers are not threadsafe, and this is a feature, in the sense that we don't have to pay for synchronization if we don't need it. On the other hand, this means that when we need to use a container in a multithreading environment, we have to take care explicitly of all the synchronization details, and that is not much fun.

I am about to write a simple multithreading application that needs a queue. One thread will put data in it, the other thread will read it. To implement this well-known producer-consumer model we can't use std::queue directly; we should wrap it in a structure that makes it thread safe.

Our application will be very simple, and it requires just two threadsafe operations: push() and pop(). Just one case requires a bit of thinking: what is going to happen if we call pop() when the queue is empty? Usually we should give the user a chance to decide whether it is more appropriate to return an error state, or to hang (maybe just for a while) waiting for the producer to provide a value. In this case I decided to keep it very simple, and just wait.

Here is a possible implementation for such a queue:
template <typename T>
class MyQueue
{
private:
  std::queue<T> q_;
  boost::mutex m_;
  boost::condition_variable c_;

public:
  void push(const T& data)
  {
    boost::lock_guard<boost::mutex> l(m_); // 1
    q_.push(data);
    c_.notify_one(); // 2
  }

  T pop()
  {
    boost::mutex::scoped_lock l(m_); // 3
    while(q_.size() == 0)
    {
      std::cout << "Waiting for data" << std::endl;
      c_.wait(l); // 4
    }

    T res = q_.front();
    q_.pop();
    return res; // 5
  }
};
1. We need just the bare RAII functionality from this lock, so lock_guard is enough. Once the mutex is acquired, we can change the object status, and that here means adding the passed data to the queue.
2. We use the condition member variable to notify the other thread that a new item has been inserted in the queue.
3. This lock should be passed to the member condition, in case we need to wait for coming data, so a lock_guard is not enough.
4. If the queue is currently empty, we put the current thread in a waiting status using the condition member variable. When the other thread notify that something has changed, this thread would resume its running status.
5. The value at the beginning of the queue is popped and returned to the caller.

[edit]
Initially I put the wait() statement above, at the line marked as (4), in an if block:
if(q_.size() == 0) // THIS IS NOT A GOOD IDEA
{
  std::cout << "Waiting for data" << std::endl;
  c_.wait(l);
}
As Sergey noticed (see comments below) this is not enough. Sometimes, in a very erratic way that I couldn't catch in my testing, wait() returns even if there is no change in the condition. Patching the code is pretty easy: we need to loop on the check until we actually get a confirmation of the change, as suggested by Adrián (again, below in the comments).

The section of the official Boost documentation on condition variables, in the Synchronization page, shows waiting on a condition in exactly this way. Still, I couldn't find there a clarification of the reasons beneath it. For that, I'd suggest you read the post named Spurious wakeups on Vladimir Prus's blog.

In a few words, Vladimir remarks that wait() on a condition could return before completing its actual job, and that this is expected behavior, mainly due to performance reasons. It is much cheaper to introduce a check in the user code than to enforce that the condition's wait() returns only on a real wakeup.
[/edit]
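As a side note, the predicate overload of wait() - the one used in the condition_variable post above - wraps that while loop for us; a sketch of pop() rewritten that way:

T pop()
{
  boost::mutex::scoped_lock l(m_);
  c_.wait(l, [this](){ return !q_.empty(); }); // loops internally on spurious wakeups

  T res = q_.front();
  q_.pop();
  return res;
}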

As usual, I wrote a few tests (using Google Test, the C++ implementation of xUnit) to drive the development and to ensure that the class works as I expected. This is one of them:
TEST(TestMyQueue, OneOutOneIn)
{
  MyQueue<int> mq;
  int in = 1;

  boost::thread t([&](){ EXPECT_EQ(in, mq.pop()); });

  boost::this_thread::sleep(boost::posix_time::seconds(1));
  mq.push(in);
}
The main thread spawns a new thread that runs a lambda function (cool, isn't it?), then sleeps for a second, and finally pushes a value onto the queue.

The other thread has access by reference to both variables defined in the caller scope (that is the meaning of the ampersand in the lambda introducer square brackets) and uses them in the test macro, comparing the result of popping from the queue with the value stored in the integer variable.

Since the main thread sleeps one big fat second before pushing the value onto the queue, we expect that pop() finds the queue initially empty, and gets the value only after a while.

Go to the full post

Boost::thread as data member

We want to write a C++ stopwatch: a way of counting seconds, giving the user a way to start and stop it. We'll do that with a class having a boost::thread as a private data member, letting the user interact with it through a couple of public methods that implement the required functionality.
This is the sort of class declaration that I am thinking about:
class StopWatch : boost::noncopyable
{
private:
  int count_; // 1
  bool terminate_; // 2
  boost::mutex mx_; // 3
  boost::thread th_; // 4

  void run(); // 5

public:
  StopWatch() : count_(0), terminate_(false) {}

  void start();
  int stop(); // 6
};

1. The current stopwatch value.
2. A flag to manage the user request to the stopwatch to stop. It is going to be shared by two threads, so it requires a mutex.
3. Used to let each thread to work safely on the object status.
4. This boost::thread would care about the counter management.
5. The local thread would run on this function.
6. This function stops the clock, and returns its current value.

Some TDD

Before writing the actual code, I designed a few test cases to guide me through the development. I used Google Test for that. If you are interested in the matter, you could have a look at a few posts I wrote, starting from the one about how to set it up.

Here is just the first test I wrote, it should be easy for the reader to figure out the other ones:
TEST(TestStopWatch, NormalBehaviour)
{
  StopWatch sw; // 1
  sw.start(); // 2
  int delay = 3;
  boost::this_thread::sleep(boost::posix_time::seconds(delay)); // 3
  EXPECT_EQ(delay, sw.stop()); // 4
}

1. We create an instance of the class.
2. The stopwatch starts.
3. We let this thread sleep for a while ...
4. ... and then check that stopping the watch gives us the expected value.

We should care about the robustness of our code, so I have written a few more tests to check what happens if I stop a watch that is already stopped, or try to start it when already started (nothing).

One useful aspect of writing test cases in advance is that the developer is forced to think of details that could have slipped from the design. For instance, in this case I found out I had originally forgotten to determine how the stopwatch should react if started, stopped, and then restarted again. I pondered the matter for a few seconds, and in the end arbitrarily chose to let the counting go on without resetting.

Public member functions

I have written only a simple default ctor that, besides setting the termination flag to false, sets the counter to zero. It could be useful to let the user initialize the counter with a specific value and, if you want, you can easily improve the class in this sense, adding the relative test cases. Notice that the class extends boost::noncopyable, so that the copy ctor and assignment operator are undefined and inaccessible.

The start function is short and cool. Well, we could argue about its degree of coolness, but it is surely short:
void StopWatch::start()
{
  boost::lock_guard<boost::mutex> lock(mx_); // 1
  if(th_.joinable()) // 2
    return;

  terminate_ = false; // 3
  th_ = boost::thread(&StopWatch::run, this); // 4
}

1. We are about to change the object status, and we want to do it in a safe way, so we go through the mutex.
2. If the thread is joinable, it has already been started. We don't want to do anything more.
3. The status flag is already set by the ctor, but we need to reset it in case of a restart.
4. We are calling here the boost::thread move assignment, which relies on the new C++11 (AKA C++0x) rvalue reference concept. Maybe this line would have been more readable if written like:
boost::thread temp(&StopWatch::run, this);
th_ = temp.move();

In this way we explicitly show that we first create a temporary boost::thread object that runs on the run() method of the current object, then assign it to the member boost::thread object.

If we step through the code, we can appreciate how the member boost::thread object is initially created in a not-a-thread status, with its thread_info set to nullptr, and how after [4] its value is swapped with the one of the temporary boost::thread object.

Given the start() function, its stop() companion comes quite naturally:
int StopWatch::stop()
{
  { // 1
    boost::lock_guard<boost::mutex> lock(mx_);
    if(!th_.joinable()) // 2
      return -1;
    else // 3
      terminate_ = true;
  }

  th_.join(); // 4
  return count_;
}

1. We want to limit the locking of the mutex to this part of the function code, so we open a specific scope.
2. If the member boost::thread is not joinable, it doesn't make any sense to stop it; we return a negative value to signal that something unexpected happened.
3. Otherwise we record that the user asked the clock to stop.
4. We let the running thread complete, then return the current value of the counter.

Private member function

Here is the code actually executed by the worker thread, on user request:
void StopWatch::run()
{
  bool terminate; // 1
  do {
    boost::this_thread::sleep(boost::posix_time::seconds(1)); // 2
    std::cout << ++count_ << ' ';

    { // 3
      boost::lock_guard<boost::mutex> lock(mx_);
      terminate = terminate_;
    }
  } while(!terminate);
}

1. To minimize the span of the critical region, we use a local variable to check the object status.
2. This toy stopwatch counts seconds: the thread sleeps for one second and then increases the counter.
3. This is the critical section where we read the current value of the object status and copy it into the local variable we use to decide when to stop counting.

There was already a similar post, where a class was designed to allow a delayed startup of a thread, but I think this example is a bit more interesting.

Go to the full post

Running a few boost::thread's

Conceptually speaking, running a boost::thread is not a big deal. We create a thread object, passing to it the piece of code we want it to run, and that's it. The boost::thread object takes care of the entire life span of the actual thread.

But let's give some more details on how we could define the code we want to run, having a look at a simple example that uses a plain old free function, member functions (static and not), and a functor.

Getting feedback

The code I am about to write is quite simple, but it still makes sense to have a utility function that dumps a timestamp and a message to standard output. And since we are in a multithreaded world, we should pay attention to carefully shielding access to shared resources with a mutex:
boost::mutex mio; // 1
void print(int flag, const char* msg)
{
  boost::lock_guard<boost::mutex> lock(mio); // 2
  std::cout << boost::posix_time::microsec_clock::universal_time() << ' ' // 3
    << flag << ": " << msg << " " << boost::this_thread::get_id() << std::endl;
}
1. Mutex used to avoid clashes when writing to the standard output console.
2. We achieve a simple RAII for acquiring and releasing a mutex through lock_guard.
3. This Boost Date_Time function gives us the current time in a portable way.

Free function

Here is a plain free function that we want to run in a separate thread. Notice that we call yield() - a function available to each thread, even one not associated with a boost::thread object - at the end of each loop cycle, to notify the system that the current thread could temporarily stop running, if required:
void runner(int times)
{
  for(int i=0; i < times; ++i)
  {
    print(i, "runner");
    boost::this_thread::yield();
  }
}
We can create a new thread running that free function in this way:
const int TIMES = 10;
boost::thread t1(runner, TIMES);
The first argument we pass to the ctor is the address of the function we want to run; then we pass all the arguments required by the function itself.

Static member function

There is no "this" pointer for a static class member, that is just included in the class namespace. So we don't expect a big difference between using free functions and static member functions in a Boost Thread context.

Considering this class:
class ARunner
{
public:
  static void runner(int times);
};

void ARunner::runner(int times)
{
  for(int i=0; i < times; ++i)
  {
    print(i, "static runner");
    boost::this_thread::yield();
  }
}
We can run a thread on its static member function like this:
boost::thread t2(ARunner::runner, TIMES);
We simply have to specify the fully qualified function name, so that the compiler can find it.

Functor

A functor is nothing more than a plain class for which operator() is defined, so that an instance of that class can be used where a functional parameter is expected. Here is an example of such a beast:
class FunRunner : public boost::noncopyable // 1
{
private:
  unsigned const int TIMES_;
public:
  FunRunner(unsigned int times) : TIMES_(times) {}
  void operator()()
  {
    for(unsigned int i=0; i < TIMES_; ++i)
    {
      print(i, "functor runner");
      boost::this_thread::yield();
    }
  }
};
The ctor sets an internal constant, and operator() uses it in its loop.

Here is the code that uses our functor to create and run a new thread:
FunRunner fr(TIMES);
boost::thread t3(std::ref(fr));
Notice that we pass the functor by reference to the boost::thread constructor, using the std::ref() wrapper. If we didn't do that, we would need a useless copy of the functor just for the boost::thread object's internal usage. Moreover, since we designed the functor to be non-copyable, the code wouldn't compile at all.

Member function

Let's add a normal member function to the class we just created:
class FunRunner : public boost::noncopyable
{
// ...
public:
  void runner(const char* msg)
  {
    for(unsigned int i=0; i < TIMES_; ++i)
    {
      print(i, msg);
      boost::this_thread::yield();
    }
  }
// ...
};
Creating a boost::thread that runs this runner() function is a tad more complicated than the other ways:
boost::thread t4(&FunRunner::runner, &fr, "generic member function runner");
The constructor needs the function name, then the "this" pointer to the object, and finally all the parameters required by the function we want to run in the new thread.
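
The same effect could be obtained by packing function, object, and arguments in a single callable with boost::bind (from <boost/bind.hpp>) - a sketch equivalent to the line above:
boost::thread t4bis(boost::bind(&FunRunner::runner, &fr, "bound member function runner"));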

Yielding and joining in main()

As we have seen, yield() is a free function that can be called from any thread, so we can call it from the master thread, in our main() function:
for(int i=0; i < TIMES; ++i)
{
  print(i, "main");
  boost::this_thread::yield();
}
In this sense, there is no difference among the threads: we ask the system to run them, or give it a hint that we could stop running for a while, but it is the operating system that decides what actually runs, and where - in case our application is running on multiprocessor / multicore hardware.

The difference is that a thread that creates other threads should wait for its spawned threads to complete their job before returning, and this is done by calling join() on the boost::thread objects:
t1.join();
t2.join();
t3.join();
t4.join();
This list of joins looks a bit clumsy; we could have made it more professional using a boost::thread_group, but we'll see that in a future post.

Go to the full post

Joining a POSIX thread

Once you create a new thread on a specified function, your original main thread is free to do whatever it likes. It could even terminate the program execution - and that would be no fun for the other thread, which would like to have its time to do its job, too.

So, it is usually a good idea to wait until the spawned threads complete before terminating. To do that, in terms of the POSIX API, we use the pthread_join() function. Referring to the previous post, where we created the new thread like this:
pthread_t thread;

pthread_create(&thread, /* ... */);
We can now wait for the other thread to complete its run by calling:
pthread_join(thread, NULL);
The second parameter, when not NULL, refers to the memory location where the value returned by the function the thread runs is stored.

This function, too, returns zero if the specified thread joins happily, and a non-zero error code otherwise.
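
As a sketch of that second parameter at work - names are mine, not from the original example - consider a thread function returning a pointer to a result that outlives it:
void* compute(void* arg)
{
    static int result = 42; // static, so it survives the thread termination
    return &result;
}

// in the caller:
pthread_t worker;
pthread_create(&worker, NULL, compute, NULL);

void* retval = NULL;
pthread_join(worker, &retval); // retval now points to result
printf("the thread returned %d\n", *(int*)retval);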

Go to the full post

Creating a POSIX thread

If you are working in plain C, or even if you are working in C++ and your compiler does not yet support the friendly std::thread class (and for some obscure reason you cannot use boost::thread), you have to rely on your platform-dependent thread API.

If you are developing for some UNIX/Linux system, there are good chances you have at hand a POSIX compliant thread API.

Here we are going to see how to create a new thread using this API.

We pass to the new thread a pointer to the function we want to run. That function must accept a pointer to void as input and return another pointer to void. Say that we expect the caller to pass a C-string as parameter, and we just want to output it to standard output. We could write something like this:
void* task(void* arg)
{
    printf("%s\n", (const char*)arg);
    return NULL;
}
This code is painfully fragile: we have to trust that the caller is actually passing what we expect, since there is no way to let the compiler ensure that the parameter is really a C-string.

Before calling the pthread API function to create a new thread, we need at least a variable to store the id of the newly created thread, and possibly another variable to store the integer returned by the function call - an error code that is zero in case of success.

The resulting code should look something like this:
pthread_t thread; // 1
int res = pthread_create(&thread, NULL, task, (void*)"hello"); // 2
if(res != 0) // 3
    printf("Can't create a thread: %d\n", res);
1. The posix thread creator function expects an object of type pthread_t for storing the id of the created thread.
2. Here is how we call pthread_create(). The first parameter is a pointer to a pthread_t object - an output parameter; the second is a pointer to the attributes we want to set for the new thread - for the moment let's go with the defaults, passing NULL to let the POSIX thread library know we don't want anything fancy; the third is a pointer to the function the thread should run; and last comes the parameter we want to pass to that function.
3. A failure is signaled by a non-zero return value. We can't actually do much here, just give a warning to the user.
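
To mitigate the type-unsafety discussed above, a common idiom is passing a pointer to a dedicated struct, so that at least all the expected fields are gathered in one agreed-upon type - a sketch, with names of my own invention:
typedef struct
{
    const char* message;
    int times;
} TaskArg;

void* task2(void* arg)
{
    TaskArg* ta = (TaskArg*)arg; // still a blind cast, but on a single well-known type
    for(int i = 0; i < ta->times; ++i)
        printf("%s\n", ta->message);
    return NULL;
}

// ... in the caller; ta must stay alive until the thread is done with it
TaskArg ta = { "hello", 3 };
int res = pthread_create(&thread, NULL, task2, &ta);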

Go to the full post

High watermark for durable pub/sub

In the ZeroMQ pub/sub pattern, if we set an identity for a subscriber, the publisher keeps the data queued until it is actually delivered to that subscriber. This means we can safely disconnect and then reconnect the subscriber without losing data, but it also means that the publisher could easily eat a huge share of memory while keeping the required data waiting in its internal queue.

The high watermark is a technique to keep some of the advantages of a durable connection without being forced to pay too high a price in terms of memory usage.

It is quite easy to modify a subscriber for durability. Basically we could take the synchronized subscriber we have already written, and specify its identity before connecting it to the publisher:
zmq::context_t context(1);

zmq::socket_t subscriber(context, ZMQ_SUB);
subscriber.setsockopt(ZMQ_IDENTITY, "Hello", 5);
subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0);
subscriber.connect("tcp://localhost:5565");
// ...

If we don't change the synchronized publisher code, we are exposed to the risk of unlimited memory growth. To avoid that, we can cap the number of messages we want to keep queued. Say we set it to 2:

zmq::socket_t publisher(context, ZMQ_PUB);
__int64 hwm = 2; // 1
publisher.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm)); // 2
publisher.bind("tcp://*:5565");

1. I am developing for VC++ 2010, as you can see from the 64 bit int type used here.
2. The high watermark has to be set before binding the socket to its endpoint.
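
On other compilers we would use a standard fixed-width type instead - a sketch of the same setting, assuming <cstdint> (or the C <stdint.h>) is available:
uint64_t hwm = 2; // ZMQ_HWM expects a 64 bit unsigned integer in 0MQ 2.x
publisher.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));
publisher.bind("tcp://*:5565");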

Post based on the Z-Guide C durable publisher-subscriber example.

Go to the full post

Pub/Sub with envelope

A typical use for ZeroMQ multipart messages is to implement the message envelope concept.

A message envelope is a place where we store additional information that relates to the message but is not explicitly part of it, such as an address.

In the publish/subscribe pattern, the envelope, seen as the first frame of a multipart message, has the interesting property of being the only part checked against the subscriber's filter.

Let's see a publisher that indefinitely sends a couple of multipart messages per second:
zmq::context_t context(1);
zmq::socket_t publisher(context, ZMQ_PUB);
publisher.bind("tcp://*:5563");

std::string aEnv("A"); // 1
std::string aCon("This is A message");
std::string bEnv("B");
std::string bCon("This is B message");

while(true)
{
    std::cout << '.'; // 2

    zmq::message_t ma1((void*)aEnv.c_str(), aEnv.length(), NULL); // 3
    zmq::message_t ma2((void*)aCon.c_str(), aCon.length(), NULL);
    publisher.send(ma1, ZMQ_SNDMORE); // 4
    publisher.send(ma2); // 5

    zmq::message_t mb1((void*)bEnv.c_str(), bEnv.length(), NULL);
    zmq::message_t mb2((void*)bCon.c_str(), bCon.length(), NULL);
    publisher.send(mb1, ZMQ_SNDMORE);
    publisher.send(mb2);

    boost::this_thread::sleep(boost::posix_time::seconds(1)); // 6
}

1. We are going to send these two messages, envelope and content as defined here, over and over.
2. Some feedback to show that the server is actually alive.
3. We use the ZeroMQ zero-copy idiom; the data sent actually lives in the C++ std::string.
4. The envelope is marked as a frame of a multipart message, with more to follow.
5. The content is marked as last frame in a message.
6. Take a sleep, using boost to be more portable.

And here is the C++ code for the subscribers:
zmq::context_t context(1);
zmq::socket_t subscriber(context, ZMQ_SUB);
subscriber.connect("tcp://localhost:5563");
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter)); // 1

while(true)
{
    zmq::message_t envelope;
    zmq::message_t content;
    subscriber.recv(&envelope); // 2
    subscriber.recv(&content); // 3

    std::cout << "Envelope: ";
    dumpMessage(envelope); // 4
    std::cout << " content: ";
    dumpMessage(content);
}

1. The filter is expected to be a C-string, which could be passed to the client as a command line argument.
2. The client hangs on its SUB socket, waiting for a message that matches its filter. Remember that multipart messages are managed atomically, and filtering works accordingly: if no match is found in the envelope, the complete multipart message is discarded, as one would expect.
3. We know that the message is made of two frames, so we simply get the second one. Usually we should implement a more robust check on the RCVMORE socket option, to ensure there actually is a next frame before receiving it - see the sketch after dumpMessage() below, and the post on multipart messages for details.
4. Since the message data is not a real C-string - it misses the terminating NUL character - we need a utility function to print it, something like this:
void dumpMessage(zmq::message_t& msg)
{
    std::for_each((char*)msg.data(), (char*)msg.data() + msg.size(),
        [](char c){ std::cout << c; });
    std::cout << std::endl;
}
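
And here, as promised in note 3, is a sketch of the more robust reception, checking the RCVMORE socket option (an int64_t in 0MQ 2.x) before asking for the next frame:
zmq::message_t envelope;
subscriber.recv(&envelope);

int64_t more = 0;
size_t moreSize = sizeof(more);
subscriber.getsockopt(ZMQ_RCVMORE, &more, &moreSize);
if(more) // there actually is a next frame to receive
{
    zmq::message_t content;
    subscriber.recv(&content);
}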

In the Z-Guide you'll find more on this issue and the original C source code on which I have based this example.

Go to the full post

Synchronized subscriber

We are about to write a C++ client for a synchronized ZeroMQ PUB/SUB application. More details are in the cited post, which is worth reading before this one. You should probably also have a look at the post on the server before going on here.

First thing, the client sets up its ZeroMQ context and creates a SUB socket connected to the server PUB, with an empty filter:
zmq::context_t context(1);

zmq::socket_t subscriber(context, ZMQ_SUB);
subscriber.connect("tcp://localhost:5561");
subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0);

Then it makes sure the server is available, checking whether it is emitting a hello message:
{
    std::cout << "Waiting for server" << std::endl;
    zmq::message_t hello;
    subscriber.recv(&hello);
    std::cout << "Received hello message from server" << std::endl;
}

The server is up, so we connect to its reply socket to let it know another client is ready:
{
    zmq::socket_t sync(context, ZMQ_REQ); // 1
    sync.connect("tcp://localhost:5562");

    zmq::message_t message(MSG_MESSAGE); // 2
    sync.send(message);
    sync.recv(&message); // 3
}

1. The request socket is used only here, so we create it in a local scope, letting it disappear when the scope ends.
2. Send a synchronization request.
3. Wait for the synchronization reply.

The client main loop:
std::cout << "ready to receive messages" << std::endl;
int nMsg = 0; // 1
for(bool done = false; !done;) // 2
{
    zmq::message_t message;
    subscriber.recv(&message); // 3

    switch(message.size()) // 4
    {
    case MSG_MESSAGE: // 5
        ++nMsg;
        break;
    case MSG_HELLO: // 6
        std::cout << "Server is still waiting for other SUB" << std::endl;
        break;
    case MSG_END: // 7
    default:
        done = true;
        break;
    }
}

std::cout << nMsg << " messages received" << std::endl;

1. Counter for the messages actually received; we expect no loss in the communication.
2. We loop until we detect the terminator message. As explained when talking about the server, we are using the hackish convention of encoding the information in the message size rather than in its body - not a very clean approach, I admit, but it makes the code more concise.
3. Hang waiting for the next message from the server.
4. Check what the server has sent; as said in (2), we just have to look at the message size.
5. We have got a new proper message. The counter is increased to keep track of it.
6. Until all the expected clients are connected to the server, the already connected clients keep getting hello messages, which have no use here.
7. The message is a terminator (or something unexpected caught by the default label).

If you wonder why I wrote this post, the answer is buried in the Z-Guide.

Go to the full post

Synchronized publisher

We are about to write a C++ server for a synchronized ØMQ PUB/SUB application. More details are in the cited previous post; here we focus on the code itself.

We have three different kinds of messages in our application: normal, control, terminator. We don't care much about real data exchange, and setting the text in a ZeroMQ message is a bore - so we bypass it in a hackish way, setting just the length of the message and not the real data. To make it a bit more readable to the casual reader, here are a few constants we are about to use:
const int MSG_MESSAGE = 0;
const int MSG_HELLO = 1;
const int MSG_END = 2;
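
For comparison, carrying a real payload would mean allocating the message at the right size and copying the text in - a couple of lines like these, which the hack above spares us:
std::string payload("some real content");
zmq::message_t msg(payload.size());
memcpy(msg.data(), payload.data(), payload.size());
publisher.send(msg);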

First part of the server code is the usual ZeroMQ setup, a context and the required sockets are created:
zmq::context_t context(1);

zmq::socket_t publisher(context, ZMQ_PUB);
publisher.bind("tcp://*:5561");

zmq::socket_t sync(context, ZMQ_REP);
sync.bind("tcp://*:5562");

Then we have the first task of the server, detecting when all the expected subscribers are connected:

int currentSubscribers = 0;
while(currentSubscribers < SUB) // 1
{
    sayHello(publisher); // 2
    if(checkSubscribers(sync)) // 3
        ++currentSubscribers;
}

1. SUB is the expected number of subscribers.
2. sayHello() sends a hello message on the PUB socket, see below.
3. checkSubscribers() returns true if a client has shown up, sending a message to the REP socket; see below.
void sayHello(zmq::socket_t& socket)
{
    boost::this_thread::sleep(boost::posix_time::seconds(1));
    zmq::message_t hello(MSG_HELLO);
    socket.send(hello);
}

bool checkSubscribers(zmq::socket_t& socket)
{
    zmq::message_t ack;
    if(socket.recv(&ack, ZMQ_NOBLOCK)) // 1
    {
        socket.send(ack);
        return true;
    }
    return false;
}

1. It is worth remarking that we make a non-blocking call to recv() here: we don't want to hang on the socket until a message arrives, we just check whether a message is pending and, if not, simply return false.

Now we can send the messages, followed by a terminator, to the subscribers:
for(int i = 0; i < 100000; ++i)
{
    zmq::message_t message(MSG_MESSAGE);
    publisher.send(message);
}
zmq::message_t message(MSG_END);
publisher.send(message);


Post written while reading the Z-Guide.

Go to the full post

Node Coordination

Pair sockets have been designed with thread coordination in mind, and don't work so well in a multi-process architecture, since they lack the automatic reconnection capability that is so useful in that case.

Coordinating nodes is better achieved using a pair of request/reply sockets. In the Z-Guide you can see a way to do that which introduces a one-second sleep to ensure no message gets lost. A better solution, requiring a bit more coding, is left as an exercise - and that is what we are about to do here.

We want to design an application based on a pub/sub server that sends a number of messages to a bunch of clients and then terminates. The interesting part is that it starts sending them only when it knows that a predetermined number of clients is listening.

The server sends hello messages on its publisher socket to let everyone out there know that it is alive and kicking. Each client checks for this hello message; when it gets one, it sends a message to the server on the REQ/REP connection. The server increases its count of listeners and keeps looping until it reaches the expected audience size.

Then it sends all the real messages, using the PUB socket, and finally a terminator, to let the clients know the job is completed.

The client uses a SUB socket, set with an empty filter, to check for the hello message from the server. Then it sends a message to the server on its REQ socket, and waits to receive an answer. At that point it is ready to receive the "real" messages, so it starts looping on the SUB socket. Notice that it could get three different types of messages: a real message, a hello message (the server keeps sending them until all the clients show up), or a terminator.

In the next posts we'll write the code, but now we are taking a short timeout.

Go to the full post

Coordinating threads with pair sockets

The coordination among different threads is achieved in ZeroMQ using messages. Typically two threads get connected through pair sockets, sending and receiving messages to exchange information. As an example, we are about to write a multithreaded application designed in this way:
- The main thread creates a secondary thread, does some preparation job, then waits for the other thread to signal back, and finally completes its work.
- The secondary thread creates another thread, which represents the real first step of the job; when step 1 is done, it signals step 2.
Let's see a way to implement the first step:
void step1(zmq::context_t& context) // 1
{
    print("step 1");
    boost::this_thread::sleep(boost::posix_time::milliseconds(50)); // 2

    zmq::socket_t xmitter(context, ZMQ_PAIR); // 3
    xmitter.connect("inproc://step2");

    boost::this_thread::sleep(boost::posix_time::milliseconds(50)); // 4
    print("tell to step2 we're ready");

    zmq::message_t message(0);
    xmitter.send(message); // 5

    print("step1 done");
}
1. Remember that the ZeroMQ context is the only object that could be shared among different threads in the same process. Actually, we must use the same context if we want different threads to connect.
2. Simulation of a real job.
3. Pair socket used to signal when this thread is done. Notice it is connected to step2 using the inproc protocol.
4. Some other things to do.
5. Send a message to signal we are done.

The second step is a bit more complicated, since we have to manage two connections, one to step 1, the other to the main thread:
void step2(zmq::context_t& context)
{
    print("step2");

    {
        print("step2A");

        zmq::socket_t receiver(context, ZMQ_PAIR); // 1
        receiver.bind("inproc://step2");

        print("creating thread for step1");
        boost::thread t1(std::bind(step1, std::ref(context))); // 2

        print("doing something");
        boost::this_thread::sleep(boost::posix_time::milliseconds(150)); // 3

        zmq::message_t message;
        receiver.recv(&message); // 4

        print("signal received form step1");
        t1.join();
    }

    {
        print("step2B");

        zmq::socket_t xmitter(context, ZMQ_PAIR); // 5
        xmitter.connect("inproc://main");

        print("doing something else");
        boost::this_thread::sleep(boost::posix_time::milliseconds(150));

        print("signal back to main");
        zmq::message_t message(0);
        xmitter.send(message); // 6
    }

    print("step2 done");
}
1. This pair socket establishes a connection to step 1.
2. Step 1 is executed in another thread.
3. Some useful job.
4. Pending on the socket, waiting for the OK-to-continue message from step 1.
5. A second pair socket, this one used to let a message go from this thread to main.
6. The job assigned to step 2 has been completed.

And this is the main code:
print("entering main function");
zmq::context_t context(1); // 1

print("bind inproc socket before starting step2");
zmq::socket_t receiver(context, ZMQ_PAIR); // 2
receiver.bind("inproc://main");

print("creating thread for step 2");
boost::thread t2(std::bind(step2, std::ref(context))); // 3

print("doing some preparation job");
boost::this_thread::sleep(boost::posix_time::milliseconds(200));

print("wait for step 2 to be completed");
zmq::message_t message;
receiver.recv(&message); // 4

t2.join();
print("job done");
1. This context object is the only one created for this process, and it is shared among all the threads.
2. We use a pair socket so that we can get a signal from step 2 when it completes.
3. A thread running the function step2() is created.
4. This thread hangs here, waiting for a message on the socket.

I have left out until now the code for the logging function print(). It is defined in the anonymous namespace - meaning it is local to the current file scope - and it uses a mutex defined in the same scope:
boost::mutex mio; // 1

void print(const char* message)
{
    boost::lock_guard<boost::mutex> lock(mio); // 2

    std::cout << boost::this_thread::get_id() << ": " << message << std::endl;
}
1. Since all the threads are going to compete for a shared resource, in this case the standard output console, we need a mutex.
2. Lock on the mutex, to avoid garbling the output when writing the message.

You can find the C code on which this C++ example is based in the Z-Guide.

Go to the full post

Multithreading with ZeroMQ

If you are used to classical multithreading, you are going to be surprised by the approach taken by ZeroMQ. Mutexes and locks are not normally used; communication among different threads is usually performed through ØMQ sockets.

A ZeroMQ multithreading application is designed keeping in mind a few rules:

- Each process has its own ZeroMQ context, which is the only object that should be shared among its threads. Nothing else, ZeroMQ sockets included, should be shared.
- The threads in a process are connected by inproc sockets.
- A thread could have its own ZeroMQ context, but in this case it can't be connected to other threads in the same process using an inproc socket.

In this post I use the C++ interface to ØMQ 2.1; if you are using version 3.1, you could be interested in another post, where I also implemented a graceful way to shut down the worker threads.

A well designed ZeroMQ application can easily be switched from multiprocess to multithreaded support. For instance, let's have a look at the ZeroMQ broker example we have just seen.

The client doesn't change at all. It is going to have a REQ socket like this:
zmq::socket_t socket(context, ZMQ_REQ);
socket.connect("tcp://localhost:5559");
Through it, the client sends its message and receives the reply generated by the server.

The changes are all in the server. Originally it was built as a bunch of processes connected to the broker; now we rewrite it as a single multithreaded process. The broker no longer runs in its own process, but is part of the server itself.

This is a possible C++ implementation for the server main routine:
boost::thread_group threads; // 1
try
{
   zmq::context_t context(1);
   zmq::socket_t clients(context, ZMQ_ROUTER);
   clients.bind("tcp://*:5559");

   zmq::socket_t workers(context, ZMQ_DEALER);
   workers.bind("inproc://workers"); // 2

   for(int i = 0; i < threadNbr; ++i) // 3
      threads.create_thread(std::bind(&doWork, std::ref(context))); // 4

   zmq::device(ZMQ_QUEUE, clients, workers); // 5
}
catch(const zmq::error_t& ze)
{
   std::cout << "Exception: " << ze.what() << std::endl;
}
threads.join_all(); // 6
1. To make the application more portable, we use the Boost Thread library. Here we create a thread group that will contain all the service threads.
2. This is the major change in the code. The DEALER socket no longer expects connections from other processes, but from inproc sockets within the same process.
3. The variable threadNbr holds the number of concurrent service threads we want to run. This value could be passed as an input argument, or read from a configuration file - see the sketch after these notes.
4. We create a new thread, specifying the code it has to run, and a parameter to be passed by reference to the actual function, doWork(). The parameter is the ZeroMQ context which, as we said, is the only object we expect to be shared among threads.
5. As before, we use the ZeroMQ queue device to do the dirty job.
6. Currently this code is never executed, since the device is expected to run forever. But a more refined implementation should in any case take care of cleaning up.
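
As a sketch of the first option in note 3, threadNbr could come straight from the command line (std::atoi is from <cstdlib>; the default value is an arbitrary choice of mine):
int main(int argc, char* argv[])
{
    int threadNbr = (argc > 1) ? std::atoi(argv[1]) : 4; // fall back to 4 workers
    // ... the server code above goes here ...
}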

The doWork() function is very close to the code that was executed by any single process in the previous version. The main differences are that here we don't create a new context, but use the one passed as parameter, enjoying the fact that it is thread-safe; and that the reply socket connects inproc to the broker:
void doWork(zmq::context_t& context)
{
   try
   {
      zmq::socket_t socket(context, ZMQ_REP); // 1
      socket.connect("inproc://workers"); // 2
      while(true)
      {
         zmq::message_t request;
         socket.recv(&request);

         std::string data((char*)request.data(), (char*)request.data() + request.size());
         std::cout << "Received: " << data << std::endl;

         boost::this_thread::sleep(boost::posix_time::seconds(1)); // 3

         zmq::message_t reply(data.length());
         memcpy(reply.data(), data.c_str(), data.length());
         socket.send(reply);
      }
   }
   catch(const zmq::error_t& ze)
   {
      std::cout << "Exception: " << ze.what() << std::endl;
   }
}
1. The socket is created using the process context.
2. Given (1), we can connect the socket inproc.
3. Using the Boost sleep function, we write code that is easier to port across platforms.

You can find the C code on which this C++ example is based in the Z-Guide.

Go to the full post

Broker by ZeroMQ built-in device

We have seen in the previous post how conceptually easy it is to implement a ZMQ broker that connects request/reply sockets through a pair of router/dealer sockets.

One could notice that the core of the broker consists in setting up polling on its sockets and then repeatedly calling the chunk of code we appropriately moved into a dedicated function, receiveAndSend(). And one could argue that, this structure being so common, it would be worth moving it into a utility function and making it generally available.

The ZeroMQ guys had the same feeling, so we have an API function, zmq_device(), with exactly this purpose.

The C++ code for our broker could be simplified like this:
try
{
    zmq::context_t context(1);

    zmq::socket_t frontend(context, ZMQ_ROUTER);
    frontend.bind("tcp://*:5559");

    zmq::socket_t backend(context, ZMQ_DEALER);
    backend.bind("tcp://*:5560");

    zmq::device(ZMQ_QUEUE, frontend, backend);
}
catch(const zmq::error_t& ze)
{
    std::cout << "Exception: " << ze.what() << std::endl;
}

We simply call zmq::device(), specifying that we want the messages managed in a queue, and that is it.

The Z-Guide gives you more information on the ØMQ devices.

Go to the full post

Broker for request/reply

We have used the ZeroMQ request/reply pattern to create a very simple echo application that synchronously connects a client, which asks for an echo of its message, and a server, which provides the service.

Now we want to extend that application so that we could have many servers and many clients, and even a way to change their number dynamically.

How could we do that? By adding something in between that takes care of the extra complexity. This something is called a broker, and it does the dirty job of getting a request from a client, sending it to a server, and then doing the same, the other way round, with the reply.

The changes in the client code are minimal. We just have to ensure the request socket now connects to the broker. So, if it is on localhost at port 5559, we'll have this:
socket.connect("tcp://localhost:5559");
More relevant are the changes in the service that provides the reply. It is no longer a proper server; it is now seen as a broker client connecting in the REP role. So its socket would be created and set up like this:
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_REP);
socket.connect("tcp://localhost:5560");
Besides, for testing purposes it makes sense to add a delay between receiving the message and sending it back, maybe using the Boost sleep() function:
// some time expensive job
boost::this_thread::sleep(boost::posix_time::seconds(1));
The fun stuff, as one could expect, is in the broker. It can't use REQ and REP sockets, since we want it to be asynchronous, so it uses ROUTER and DEALER sockets, which interoperate with REQ and REP while leaving us the flexibility of a non-blocking interface.

Here is a possible C++ implementation:
try
{
   zmq::context_t context(1);

   zmq::socket_t frontend(context, ZMQ_ROUTER); // 1
   frontend.bind("tcp://*:5559");

   zmq::socket_t backend(context, ZMQ_DEALER); // 2
   backend.bind("tcp://*:5560");

   const int NR_ITEMS = 2;
   zmq_pollitem_t items[NR_ITEMS] =
   {
      { frontend, 0, ZMQ_POLLIN, 0 },
      { backend, 0, ZMQ_POLLIN, 0 }
   };

   while(true)
   {
      zmq::poll(items, NR_ITEMS); // 3

      if(items[0].revents & ZMQ_POLLIN) // frontend sent a message
         receiveAndSend(frontend, backend);
      if(items[1].revents & ZMQ_POLLIN) // backend sent a message
         receiveAndSend(backend, frontend);
   }
}
catch(const zmq::error_t& ze)
{
   std::cout << "Exception: " << ze.what() << std::endl;
}
1. The clients' REQ sockets connect to this ROUTER socket, which accepts their requests.
2. The services' REP sockets connect to this DEALER socket, through which the client requests are forwarded.
3. The broker hangs waiting for messages from both sides of its connections.

The heart of the broker is the receiveAndSend() function, which acts as a mediator between the REQ and REP sockets. Let's see how I implemented it:
namespace
{
   size_t size = sizeof(__int64);

   void receiveAndSend(zmq::socket_t& from, zmq::socket_t& to)
   {
      __int64 more;
      do {
         zmq::message_t message;
         from.recv(&message);
         from.getsockopt(ZMQ_RCVMORE, &more, &size);

         to.send(message, more ? ZMQ_SNDMORE : 0);
      } while(more);
   }
}
Even though our REQ/REP message exchange is meant to involve only single-part messages, our broker actually has to manage multipart ones. The fact is that the ROUTER/DEALER protocol has to keep track somehow of the origin/destination of the traffic, and this is done by adding a frame to the message - we don't care what is actually in it, at least for the moment, but we should manage it correctly if we want our application to work.
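
As with the high watermark example, the __int64 above is VC++ specific; a sketch of the same function using the standard fixed-width type from <cstdint> could look like this:
void receiveAndSend(zmq::socket_t& from, zmq::socket_t& to)
{
    int64_t more = 0;
    do {
        zmq::message_t message;
        from.recv(&message);

        size_t size = sizeof(more);
        from.getsockopt(ZMQ_RCVMORE, &more, &size);

        to.send(message, more ? ZMQ_SNDMORE : 0); // keep the multipart structure intact
    } while(more);
}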

More details on broker router/dealer sockets are in the official Z-Guide, where you can also find the original C code on which I based the C++ code seen above.

Go to the full post