Boost::thread as data member

We want to write a C++ stopwatch: something that counts seconds and gives the user a way to start and stop it. We'll do that with a class that has a boost::thread as a private data member, letting the user interact with it through a couple of public methods that implement the required functionality.
This is the sort of class declaration that I am thinking about:
class StopWatch : boost::noncopyable
{
private:
  int count_; // 1
  bool terminate_; // 2
  boost::mutex mx_; // 3
  boost::thread th_; // 4

  void run(); // 5
public:
  StopWatch() : count_(0), terminate_(false) {}

  void start();
  int stop(); // 6
};

1. The current stopwatch value.
2. A flag to manage the user's request to stop the stopwatch. It is going to be shared by two threads, so it requires a mutex.
3. Used to let each thread work safely on the object state.
4. This boost::thread takes care of the counter management.
5. The worker thread runs this function.
6. This function stops the clock and returns its current value.

Some TDD

Before writing the actual code, I designed a few test cases to guide me through the development. I used Google Test to do that. If you are interested in the matter, you could have a look at a few posts I wrote, starting from the one about how to set it up.

Here is just the first test I wrote; it should be easy for the reader to figure out the other ones:
TEST(TestStopWatch, NormalBehaviour)
{
  StopWatch sw; // 1
  sw.start(); // 2
  int delay = 3;
  boost::this_thread::sleep(boost::posix_time::seconds(delay)); // 3
  EXPECT_EQ(delay, sw.stop()); // 4
}

1. We create an instance of the class.
2. The stopwatch starts.
3. We let this thread sleep for a while ...
4. ... and then check that stopping the watch gives the expected value.

We should care about the robustness of our code, so I have written another few tests to check what happens if I stop a watch that is already stopped, or try to start one that is already started (nothing).

One useful aspect of writing test cases in advance is that they force the developer to think about details that could have slipped through the design. For instance, in this case I found out that I had originally forgotten to determine how the stopwatch should react if started, stopped, and then restarted. I pondered a few seconds on the matter, and in the end arbitrarily chose to let the counting go on without resetting.

Public member functions

I have written only a simple default ctor that, besides setting the termination flag to false, sets the counter to zero. It could be useful to let the user initialize the counter with a specific value and, if you want, you can easily improve the class in this sense, adding the relevant test cases. Notice that the class extends boost::noncopyable, so that the copy ctor and assignment operator are undefined and inaccessible.

The start function is short and cool. Well, we could argue about its degree of coolness, but for sure it is short:
void StopWatch::start()
{
  boost::lock_guard<boost::mutex> lock(mx_); // 1
  if(th_.joinable()) // 2
    return;
  terminate_ = false; // 3
  th_ = boost::thread(&StopWatch::run, this); // 4
}

1. We are about to change the object status and we want to do it in a safe way, so we go through the mutex.
2. If the thread is joinable, it has already been started. We don't want to do anything more.
3. The status flag is already set by the ctor, but we need to reset it in case of a restart.
4. Here we call the boost::thread move assignment, which relies on the new C++11 (AKA C++0x) rvalue reference concept. Maybe this line would have been more readable if written like:
boost::thread temp(&StopWatch::run, this);
th_ = temp.move();

In this way we explicitly show that we first create a temporary boost::thread object that runs on the run() method of the current object, and then assign it to the member boost::thread object.

If we step through the code, we can appreciate how the member boost::thread object is initially created in a not-a-thread status, with its thread_info set to nullptr, and how after [4] its value is swapped with the one in the temporary boost::thread object.

Given the start() function, its stop() companion comes quite naturally:
int StopWatch::stop()
{
  { // 1
    boost::lock_guard<boost::mutex> lock(mx_);
    if(!th_.joinable()) // 2
      return -1;
    else // 3
      terminate_ = true;
  }
  th_.join(); // 4
  return count_;
}

1. We want to limit the locking on the mutex to this part of the function, so we open a specific scope.
2. If the member boost::thread is not joinable, it makes no sense to stop it; we return a negative value to signal that something unexpected happened.
3. Otherwise we record that the user asked the clock to stop.
4. We let the running thread complete, then return the current value of the counter.

Private member function

Here is the code actually executed by the worker thread, on user request:
void StopWatch::run()
{
  bool terminate; // 1
  do {
    boost::this_thread::sleep(boost::posix_time::seconds(1)); // 2
    std::cout << ++count_ << ' ';
    { // 3
      boost::lock_guard<boost::mutex> lock(mx_);
      terminate = terminate_;
    }
  } while(!terminate);
}

1. To minimize the span of the critical region, we use a local variable to check the object status.
2. This toy stopwatch counts seconds. The thread sleeps for one second and then increases the counter.
3. This is the critical section where we read the current value of the object status, copying it to the local variable we are about to use to check when we have to stop counting.

There was already a similar post, where a class was designed to allow a delayed startup of a thread, but I think this example is a bit more interesting.

Running a few boost::thread's

Conceptually speaking, running a boost::thread is not a big deal. We create a thread object passing to it the piece of code we want to run for it, and that's it. The boost::thread object takes care of the entire life span of the actual thread.

But let's give some more details on how we could define the code that we want to run, having a look at a simple example that uses a plain old free function, member functions (both static and not), and a functor.

Getting feedback

The code I am about to write is quite simple, but it still makes sense to have a utility function that dumps a timestamp and a message to standard output. And since we are in a multithreaded world, we should pay attention to carefully shield access to shared resources with a mutex:
boost::mutex mio; // 1
void print(int flag, const char* msg)
{
  boost::lock_guard<boost::mutex> lock(mio); // 2
  std::cout << boost::posix_time::microsec_clock::universal_time() << ' ' // 3
    << flag << ": " << msg << " " << boost::this_thread::get_id() << std::endl;
}
1. Mutex used to avoid clashes when writing to the standard output console.
2. We get simple RAII acquisition and release of the mutex through lock_guard.
3. This Boost Date_Time function gives us the current time in a portable way.

Free function

Here is a plain free function that we want to run in a separate thread. Notice that at the end of each loop cycle we call yield(), a free function available to any thread, even one not associated with a boost::thread object, to notify the system that the current thread can temporarily stop running, if required:
void runner(int times)
{
  for(int i=0; i < times; ++i)
  {
    print(i, "runner");
    boost::this_thread::yield();
  }
}
We can create a new thread running that free function in this way:
const int TIMES = 10;
boost::thread t1(runner, TIMES);
The first argument we pass to the ctor is the address of the function we want to run, then we pass all the arguments required by the function itself.

Static member function

There is no "this" pointer for a static class member, which is just included in the class namespace. So we don't expect a big difference between using free functions and static member functions in a Boost Thread context.

Considering this class:
class ARunner
{
public:
  static void runner(int times);
};

void ARunner::runner(int times)
{
  for(int i=0; i < times; ++i)
  {
    print(i, "static runner");
    boost::this_thread::yield();
  }
}
We can run a thread on its static member function like this:
boost::thread t2(ARunner::runner, TIMES);
We simply have to specify the fully qualified function name, so that the compiler could find it.

Functor

A functor is nothing more than a plain class that defines operator(), so that an instance of that class can be used where a functional parameter is expected. Here is an example of such a beast:
class FunRunner : public boost::noncopyable // 1
{
private:
  unsigned const int TIMES_;
public:
  FunRunner(unsigned int times) : TIMES_(times) {}
  void operator()()
  {
    for(unsigned int i=0; i < TIMES_; ++i)
    {
      print(i, "functor runner");
      boost::this_thread::yield();
    }
  }
};
The ctor sets an internal constant, and the operator() uses it as its loop bound.

Here is the code that uses our functor to create and run a new thread:
FunRunner fr(TIMES);
boost::thread t3(std::ref(fr));
Notice that we passed the functor by reference to the boost::thread constructor, using the std::ref() wrapper. If we didn't do that, a useless copy of the functor would be made just for the boost::thread object's internal usage. But since we designed the functor to be non-copyable, the code wouldn't compile at all.

Member function

Let's add a normal member function to the class we just created:
class FunRunner : public boost::noncopyable
{
// ...
public:
  void runner(const char* msg)
  {
    for(unsigned int i=0; i < TIMES_; ++i)
    {
      print(i, msg);
      boost::this_thread::yield();
    }
  }
// ...
};
Creating a boost::thread that runs this runner() function is a tad more complicated than the other ways:
boost::thread t4(&FunRunner::runner, &fr, "generic member function runner");
The constructor needs to know the function name, then it needs the "this" pointer to the object, and finally all the parameters required by the function we want to run in the new thread.

Yielding and joining in main()

As we have seen, yield() is a free function that can be called from any thread, so we can call it from the master thread too, in our main() function:
for(int i=0; i < TIMES; ++i)
{
  print(i, "main");
  boost::this_thread::yield();
}
In this sense, there is no difference among the threads: we ask the system to run them, or give it a hint that we could stop running for a while, but it is the operating system that decides what actually runs, and where - if our application is running on multiprocessor / multicore hardware.

The difference is that the thread that creates other threads should wait for its spawn to finish their job before returning, and this is done by calling join() on the boost::thread objects:
t1.join();
t2.join();
t3.join();
t4.join();
This list of joins looks a bit funny; we could make it more professional using a boost::thread_group, but we'll see that in a future post.

Joining a POSIX thread

Once you create a new thread on a specified function, your original main thread is free to do whatever it likes. It could even terminate the program execution - and that would be no fun for the other thread, since it would like to have its time to do its job, too.

So it is usually a good idea to wait for the spawned threads to complete before terminating. To do that, in terms of the POSIX API, we use the pthread_join() function. Referring to the previous post, where we created the new thread like this:
pthread_t thread;
pthread_create(&thread, /* ... */);
We can now wait for the other thread to complete its run by calling:
pthread_join(thread, NULL);
The second parameter, when not NULL, refers to the memory location where the value returned by the function that thread runs should be stored.

This function too returns zero if the specified thread joins happily, otherwise a non-zero value showing that something bad happened.

Creating a POSIX thread

If you are working in plain C, or even if you are working in C++ and your compiler does not yet support the friendly std::thread class (and for some obscure reason you cannot use boost::thread), you have to rely on your platform-dependent thread API.

If you are developing for some UNIX/Linux system, there are good chances you have at hand a POSIX compliant thread API.

Here we are going to see how to create a new thread using this API.

We should pass to the new thread a pointer to the function we want to run. That function must accept a pointer to void as input and return another pointer to void. Say that we expect the caller to pass a C-string as parameter, and we just want to output it to standard output. We could write something like this:
void* task(void* arg)
{
    printf("%s\n", (const char*)arg);
    return NULL;
}
This code is painfully fragile; we have to trust the caller to actually pass what we are expecting, since we have no way to let the compiler ensure that the parameter is really a C-string.

Before calling the pthread API function to create a new thread, we need at least a variable to store the id of the newly created thread, and possibly another variable to store the integer returned by the function call, an error code that is zero in case of success.

The resulting code should look something like this:
pthread_t thread; // 1
int res = pthread_create(&thread, NULL, task, (void*)"hello"); // 2
if(res != 0) // 3
    printf("Can't create a thread: %d\n", res);
1. The POSIX thread creator function expects an object of type pthread_t for storing the id of the created thread.
2. Here is how we call pthread_create(). The first parameter is a pointer to a pthread_t object - this is an output parameter; the second one is a pointer to the attributes we want to set for the new thread - for the moment let's go with the defaults and just pass NULL, to let the POSIX thread library know that we don't want anything fancy; the third one is a pointer to the function the thread should run; and lastly we have the parameter we want to pass to that function.
3. A failure is signaled by a non-zero return value. We can't do much here, just warn the user.

High watermark for durable pub/sub

In the ZeroMQ pub/sub pattern, if we set an identity for a subscriber, the publisher keeps the data in its queue until it is actually delivered to that subscriber. That means we can safely disconnect and then reconnect the subscriber without losing data, but it also means the publisher could easily eat a huge share of memory in the attempt to keep the required data waiting in its internal queue.

The high watermark is a technique to get some of the advantages of a durable connection without being forced to pay too high a price in terms of memory usage.

It is quite easy to modify a subscriber for durability. Basically we could take the synchronized subscriber we have already written, and specify its identity before connecting it to the publisher:
zmq::context_t context(1);

zmq::socket_t subscriber(context, ZMQ_SUB);
subscriber.setsockopt(ZMQ_IDENTITY, "Hello", 5);
subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0);
subscriber.connect("tcp://localhost:5565");
// ...

If we don't change the synchronized publisher code, we are exposed to the risk of unlimited growth in memory usage. To avoid that, we can cap how many messages we want to store. Say we want to set it to 2:

zmq::socket_t publisher(context, ZMQ_PUB);
__int64 hwm = 2; // 1
publisher.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm)); // 2
publisher.bind("tcp://*:5565");

1. I am developing for VC++ 2010, as you can see from the 64 bit int type used here.
2. The high watermark has to be set before binding the socket to its endpoint.

Post based on the Z-Guide C durable publisher-subscriber example.

Pub/Sub with envelope

A typical use for ZeroMQ multipart messages is to implement the message envelope concept.

A message envelope is a place where we store additional information related to the message but not explicitly part of it, such as an address.

In the publisher/subscriber pattern context, the envelope, seen as the first frame of the multipart message, has the interesting property of being the only part filtered by the subscriber.

Let's see a publisher that sends a couple of multipart messages per second, indefinitely:
zmq::context_t context(1);
zmq::socket_t publisher(context, ZMQ_PUB);
publisher.bind("tcp://*:5563");

std::string aEnv("A"); // 1
std::string aCon("This is A message");
std::string bEnv("B");
std::string bCon("This is B message");

while(true)
{
  std::cout << '.'; // 2

  zmq::message_t ma1((void*)aEnv.c_str(), aEnv.length(), NULL); // 3
  zmq::message_t ma2((void*)aCon.c_str(), aCon.length(), NULL);
  publisher.send(ma1, ZMQ_SNDMORE); // 4
  publisher.send(ma2); // 5

  zmq::message_t mb1((void*)bEnv.c_str(), bEnv.length(), NULL);
  zmq::message_t mb2((void*)bCon.c_str(), bCon.length(), NULL);
  publisher.send(mb1, ZMQ_SNDMORE);
  publisher.send(mb2);

  boost::this_thread::sleep(boost::posix_time::seconds(1)); // 6
}

1. We are going to send this couple of messages, envelope and content as defined here, over and over.
2. Some feedback to show that the server is actually alive.
3. We use the ZeroMQ zero-copy idiom; the data sent is actually stored in the C++ std::string.
4. The envelope is marked as a frame of a multipart message.
5. The content is marked as the last frame in the message.
6. Take a nap, using Boost to be more portable.

And here is the C++ code for the subscribers:
zmq::context_t context(1);
zmq::socket_t subscriber(context, ZMQ_SUB);
subscriber.connect("tcp://localhost:5563");
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter)); // 1

while(true)
{
  zmq::message_t envelope;
  zmq::message_t content;
  subscriber.recv(&envelope); // 2
  subscriber.recv(&content); // 3

  std::cout << "Envelope: ";
  dumpMessage(envelope); // 4
  std::cout << " content: ";
  dumpMessage(content);
}

1. The filter is expected to be a C-string, which could be passed to the client as a command line argument.
2. The client hangs on its SUB socket, waiting for a message that matches its filter. Remember that multipart messages are managed atomically, and the filtering works accordingly. So if no match is found in the envelope, the complete multipart message is discarded, as one would expect.
3. We know that the message is made of two frames, so we simply get the second one. Usually we should implement a more robust check on the RCVMORE socket option to ensure that there actually is a next frame before receiving it. See the post on multipart messages for details.
4. Since the message data is not a real C-string, lacking the terminating NUL character, we need a utility function to print it, something like this:
void dumpMessage(zmq::message_t& msg)
{
  std::for_each((char*)msg.data(), (char*)msg.data() + msg.size(),
    [](char c){ std::cout << c; });
  std::cout << std::endl;
}

In the Z-Guide you'll find more on this issue and the original C source code on which I have based this example.

Synchronized subscriber

We are about to write a C++ client for a synchronized ZeroMQ PUB/SUB application. More details are in the cited post, which is worth reading before this one. Moreover, you should probably have a look at the post on the server before going on here.

First thing, the client sets up its ZeroMQ context and creates a SUB socket connected to the server PUB, with an empty filter:
zmq::context_t context(1);

zmq::socket_t subscriber(context, ZMQ_SUB);
subscriber.connect("tcp://localhost:5561");
subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0);

Then it makes sure the server is available, checking if it is emitting a hello message:
{
  std::cout << "Waiting for server" << std::endl;
  zmq::message_t hello;
  subscriber.recv(&hello);
  std::cout << "Received hello message from server" << std::endl;
}

The server is up, so we connect to its reply socket to let it know that another client is ready:
{
  zmq::socket_t sync(context, ZMQ_REQ); // 1
  sync.connect("tcp://localhost:5562");

  zmq::message_t message(MSG_MESSAGE); // 2
  sync.send(message);
  sync.recv(&message); // 3
}

1. The request socket is used just here, so we create it in a local scope, letting it disappear at its end.
2. Send a synchronization request.
3. Wait for the synchronization reply.

The client main loop:
std::cout << "ready to receive messages" << std::endl;
int nMsg = 0; // 1
for(bool done = false; !done;) // 2
{
  zmq::message_t message;
  subscriber.recv(&message); // 3

  switch(message.size()) // 4
  {
  case MSG_MESSAGE: // 5
    ++nMsg;
    break;
  case MSG_HELLO: // 6
    std::cout << "Server is still waiting for other SUB" << std::endl;
    break;
  case MSG_END: // 7
  default:
    done = true;
    break;
  }
}

std::cout << nMsg << " messages received" << std::endl;

1. Counter for the messages actually received; we expect no loss in the communication.
2. We loop until we detect the terminator message. As explained when talking about the server, we are using the hackish convention of carrying the information in the message size, not in its actual body - not a very clean approach, I admit, but it makes the code more concise.
3. Hang waiting for the next message from the server.
4. Check what the server has sent; as said in (2), we just have to look at the message size.
5. We have got a new proper message. The counter is increased to keep track of it.
6. While not all the expected clients are connected to the server, the already connected ones get hello messages, which have no use here.
7. The message is a terminator (or something unexpected, caught by the default label).

If you wonder why I wrote this post, the answer is buried in the Z-Guide.

Synchronized publisher

We are about to write a C++ server for a synchronized ØMQ PUB/SUB application. More details are in the cited previous post; here we are more focused on the code itself.

We have three different kinds of messages in our application: normal, control, and terminator. We don't care much about real data exchange, and setting the text in a ZeroMQ message is a bore - so we bypass it in a hackish way, setting just the length of the message and not its real data. To make it a bit more readable for the casual reader, here are a few constants that we are about to use:
const int MSG_MESSAGE = 0;
const int MSG_HELLO = 1;
const int MSG_END = 2;

The first part of the server code is the usual ZeroMQ setup; a context and the required sockets are created:
zmq::context_t context(1);

zmq::socket_t publisher(context, ZMQ_PUB);
publisher.bind("tcp://*:5561");

zmq::socket_t sync(context, ZMQ_REP);
sync.bind("tcp://*:5562");

Then comes the first task of the server, detecting when all the expected subscribers are connected:

int currentSubscribers = 0;
while(currentSubscribers < SUB) // 1
{
  sayHello(publisher); // 2
  if(checkSubscribers(sync)) // 3
    ++currentSubscribers;
}

1. SUB is the expected number of subscribers.
2. sayHello() sends a hello message on the PUB socket, see below.
3. checkSubscribers() returns true if a client shows up, sending a message to the REP socket, see below.
void sayHello(zmq::socket_t& socket)
{
  boost::this_thread::sleep(boost::posix_time::seconds(1));
  zmq::message_t hello(MSG_HELLO);
  socket.send(hello);
}

bool checkSubscribers(zmq::socket_t& socket)
{
  zmq::message_t ack;
  if(socket.recv(&ack, ZMQ_NOBLOCK)) // 1
  {
    socket.send(ack);
    return true;
  }
  return false;
}

1. It is worth remarking that we make a non-blocking call to recv() here, because we don't want to hang on the socket until a message arrives; we just check if there is any message pending, and if there isn't we simply return false.

Now we can send the messages, followed by a terminator, to the subscribers:
for(int i = 0; i < 100000; ++i)
{
  zmq::message_t message(MSG_MESSAGE);
  publisher.send(message);
}
zmq::message_t message(MSG_END);
publisher.send(message);


Post written while reading the Z-Guide.

Node Coordination

Pair sockets have been designed with thread coordination in mind, and don't work so well in a multi-process architecture, since they lack the automatic reconnection capability that is so useful in that case.

Coordinating nodes is better achieved using a pair of request/reply sockets. In the Z-Guide you can see a way to do that by introducing a one second sleep to ensure no message gets lost. A better solution, requiring a bit more coding, is left as an exercise. And that is what we are about to do here.

We want to design an application based on a pub/sub server that is going to send a number of messages to a bunch of clients and then terminate. The interesting part is that it starts sending them only when it knows there is a predetermined number of clients listening.

The server sends hello messages on its publisher socket to let everyone out there know that it is alive and kicking. Each client checks for this hello message; when it gets it, it sends a message to the server on the REQ/REP connection. The server increases the number of listeners and keeps looping until it reaches the expected audience size.

Then it sends all the real messages on the PUB socket, and finally a terminator, to let the clients know the job is completed.

The client uses a SUB socket, set with an empty filter, to check for the hello message from the server. Then it sends a message to the server on its REQ socket and waits for an answer. At that point it is ready to receive the "real" messages, so it starts looping on the SUB socket. Notice that it could get three different types of messages: a real message, a hello message (the server keeps sending them until all the clients show up), and a terminator.

In the next posts we'll write the code, but now we are taking a short timeout.

Coordinating threads with pair sockets

The coordination among different threads is achieved in ZeroMQ using messages. Typically two threads get connected through pair sockets and send/receive messages to exchange information. As an example, we are about to write a multithreaded application designed this way:

- The main thread creates a secondary thread, does some preparation job, then waits for the other thread to signal back, finally completing its functionality.
- The secondary thread creates another thread, which represents the real first step of the job; when that is done, it lets step 2 know.

Let's see a way to implement the first step:
void step1(zmq::context_t& context) // 1
{
    print("step 1");
    boost::this_thread::sleep(boost::posix_time::milliseconds(50)); // 2

    zmq::socket_t xmitter(context, ZMQ_PAIR); // 3
    xmitter.connect("inproc://step2");

    boost::this_thread::sleep(boost::posix_time::milliseconds(50)); // 4
    print("tell to step2 we're ready");

    zmq::message_t message(0);
    xmitter.send(message); // 5

    print("step1 done");
}
1. Remember that the ZeroMQ context is the only object that can be shared among different threads in the same process. Actually, we must use the same context if we want different threads to connect.
2. Simulation of a real job.
3. Pair socket used to signal when this thread is done. Notice it is connected to step2 using the inproc protocol.
4. Some other things to do.
5. Send a message to signal we are done.

The second step is a bit more complicated, since we have to manage two connections, one to step 1, the other to the main thread:
void step2(zmq::context_t& context)
{
    print("step2");

    {
        print("step2A");

        zmq::socket_t receiver(context, ZMQ_PAIR); // 1
        receiver.bind("inproc://step2");

        print("creating thread for step1");
        boost::thread t1(std::bind(step1, std::ref(context))); // 2

        print("doing something");
        boost::this_thread::sleep(boost::posix_time::milliseconds(150)); // 3

        zmq::message_t message;
        receiver.recv(&message); // 4

        print("signal received from step1");
        t1.join();
    }

    {
        print("step2B");

        zmq::socket_t xmitter(context, ZMQ_PAIR); // 5
        xmitter.connect("inproc://main");

        print("doing something else");
        boost::this_thread::sleep(boost::posix_time::milliseconds(150));

        print("signal back to main");
        zmq::message_t message(0);
        xmitter.send(message); // 6
    }

    print("step2 done");
}
1. This pair socket establishes a connection to step 1.
2. Step 1 is executed in another thread.
3. Some useful job.
4. Pending on the socket, waiting for the OK-to-continue message from step 1.
5. A second pair socket, this one used to send a message from this thread to main.
6. The job assigned to step 2 has been completed.

And this is the main code:
print("entering main function");
zmq::context_t context(1); // 1

print("bind inproc socket before starting step2");
zmq::socket_t receiver(context, ZMQ_PAIR); // 2
receiver.bind("inproc://main");

print("creating thread for step 2");
boost::thread t2(std::bind(step2, std::ref(context))); // 3

print("doing some preparation job");
boost::this_thread::sleep(boost::posix_time::milliseconds(200));

print("wait for step 2 to be completed");
zmq::message_t message;
receiver.recv(&message); // 4

t2.join();
print("job done");
1. This context object is the only one created for this process, and it is shared among all the threads.
2. We use a pair socket so that we can get a signal from step 2 when it completes.
3. A thread running the function step2() is created.
4. The main thread hangs here, waiting for a message on the socket.

I have left out until now the code for the logging function print(). It is defined in the anonymous namespace - meaning it is local to the current file scope - and it uses a mutex, defined in the same scope:
boost::mutex mio; // 1

void print(const char* message)
{
    boost::lock_guard<boost::mutex> lock(mio); // 2

    std::cout << boost::this_thread::get_id() << ": " << message << std::endl;
}
1. Since all the threads are going to compete for a shared resource, in this case the standard output console, we need a mutex.
2. Lock on the mutex, to avoid garbled output.

You can find the C code on which this C++ example is based in the Z-Guide.

Multithreading with ZeroMQ

If you are used to classical multithreading, you are going to be surprised by the approach taken by ZeroMQ. Mutexes and locks are not normally used, and the communication among different threads is usually performed through ØMQ sockets.

A ZeroMQ multithreading application is designed keeping in mind a few rules:

- Each process has its own ZeroMQ context, which is the only object that should be shared among threads. Nothing else, ZeroMQ sockets included, should be shared.
- The threads in a process are connected by inproc sockets.
- A thread could have its own ZeroMQ context, but in this case it can't be connected to other threads in the same process using an inproc socket.

In this post, I use the C++ interface to ØMQ 2.1, if you are using version 3.1, you could be interested in another post, where I have also implemented a graceful way to shutdown the worker threads.

A well designed ZeroMQ application could be easily modified to switch from a multiprocess to a multithread support. For instance, let's have a look at the ZeroMQ broker example we have just seen.

The client doesn't change at all. It is going to have a REQ socket like this:
zmq::socket_t socket(context, ZMQ_REQ);
socket.connect("tcp://localhost:5559");
And send/receive through it a message and its reply as generated by the server.

The changes are all in the server. Originally it was built as a bunch of processes connected to the broker, now we rewrite it as a single multithreaded process. The broker is not running anymore in its own process, but it is part of the server itself.

This is a possible C++ implementation for the server main routine:
boost::thread_group threads; // 1
try
{
   zmq::context_t context(1);
   zmq::socket_t clients(context, ZMQ_ROUTER);
   clients.bind("tcp://*:5559");

   zmq::socket_t workers(context, ZMQ_DEALER);
   workers.bind("inproc://workers"); // 2

   for(int i = 0; i < threadNbr; ++i) // 3
      threads.create_thread(std::bind(&doWork, std::ref(context))); // 4

   zmq::device(ZMQ_QUEUE, clients, workers); // 5
}
catch(const zmq::error_t& ze)
{
   std::cout << "Exception: " << ze.what() << std::endl;
}
threads.join_all(); // 6
1. To make the application more portable, we use the Boost Thread library. Here we create a group that will contain all the service threads.
2. This is the major change in the code. The DEALER socket no longer expects connections from other processes, but from inproc sockets in the same process.
3. In the variable threadNbr we have previously put the number of concurrent services we want to run. This value could be passed as an input argument, or read from a configuration file.
4. We create a new thread, specifying the code it has to run and a parameter that should be passed by reference to the actual function, doWork(). The parameter is the ZeroMQ context that, as we said, is the only object that we expect to be shared among different threads.
5. As before, we use the ZeroMQ queue device to do the dirty job.
6. Currently this code is never reached, since the device is expected to run forever. But a more refined implementation should in any case take care of cleaning up.

The doWork() function is very close to the code that was executed by any single process in the previous version. The main differences are that here we don't create a new context, but use the one passed as parameter, enjoying the fact that it is thread-safe; and that the reply socket connects inproc to the broker:
void doWork(zmq::context_t& context)
{
   try
   {
      zmq::socket_t socket(context, ZMQ_REP); // 1
      socket.connect("inproc://workers"); // 2
      while(true)
      {
         zmq::message_t request;
         socket.recv(&request);

         std::string data((char*)request.data(), (char*)request.data() + request.size());
         std::cout << "Received: " << data << std::endl;

         boost::this_thread::sleep(boost::posix_time::seconds(1)); // 3

         zmq::message_t reply(data.length());
         memcpy(reply.data(), data.c_str(), data.length());
         socket.send(reply);
      }
   }
   catch(const zmq::error_t& ze)
   {
      std::cout << "Exception: " << ze.what() << std::endl;
   }
}
1. The socket is created using the process context.
2. Given (1), we can connect the socket inproc.
3. Using the Boost sleep we write code that is easier to port to different platforms.

You can find the C code on which this C++ example is based in the Z-Guide.
