
A thread-safe DateFormatter via ThreadLocal

The exercise 2-b at the end of chapter two of the book Java 8 Lambdas by Richard Warburton asks to wrap a DateFormatter in a ThreadLocal to make it thread-safe.

We have spotted in legacy code, designed to work in a single thread context, something like:
DateFormatter formatter = // ...

// ...

Calendar cal = Calendar.getInstance();
cal.set(Calendar.YEAR, 1970);
cal.set(Calendar.MONTH, Calendar.JANUARY);
cal.set(Calendar.DAY_OF_MONTH, 1);
String formatted = formatter.getFormat().format(cal.getTime());
We know that DateFormatter, a Swing class used to format java.util.Date objects, is not thread-safe. Now we plan to use this code in a multithreaded environment, so we need to refactor it.

If we need to stick to the DateFormatter, here is a possible solution:
ThreadLocal<DateFormatter> formatter =  // 1
    ThreadLocal.withInitial(  // 2
        () -> new DateFormatter(  // 3
            new SimpleDateFormat("dd-MMM-yyyy", Locale.ENGLISH)));  // 4
1. Wrapping an instance of it in a ThreadLocal saves the day, since each thread gets its own copy of the variable.
2. Since Java 8, the withInitial() static method lets us create a ThreadLocal by passing a Supplier that is used to initialize the object.
3. That's the Supplier. It takes nothing in input and returns a new DateFormatter.
4. The DateFormatter is constructed from this SimpleDateFormat, built on the fly specifying the format as a string. Notice the second parameter: I want the dates to be localized in English, whatever the default locale is.

Job done. Still, it would be nice to ...

Switch to java.time

The classes in java.time have been designed to work correctly in multithreading. So, trading Calendar and DateFormatter for LocalDate and DateTimeFormatter would lead to simpler and more robust code, like this:
DateTimeFormatter formatter8 = DateTimeFormatter.ofPattern("dd-MMM-yyyy", Locale.ENGLISH);
// ...

LocalDate aDate = LocalDate.of(1970, 1, 1);
String formatted = formatter8.format(aDate);
I have pushed the above Java code to my GitHub repository, forked from the one kindly provided by the author.
Follow the links to see the Question2 solution and its test case.


Another asynchronous wait on a steady timer

This is a new version of an oldish Boost ASIO example of mine about asynchronously waiting on a timer, taking advantage of C++11 features. If you are looking for something simpler, there's another post on the same matter, more focused on the bare ASIO functionality. Or you could go straight to the original source, the official Boost tutorial.

Five years later, I found out that this post requires some adjustments. You could follow the link to its March 2018 version.

The main function of this example spawns a new thread that runs a function doing something indefinitely. But before creating the new thread, it sets an asynchronous timer, calling on its expiration a function that causes the runner to terminate.

It makes sense to encapsulate both functions in a single class, like this:
class MyJob
{
private:
    MyJob(const MyJob&) = delete; // 1
    const MyJob& operator=(const MyJob& ) = delete;

    std::mutex mx_; // 2
    std::atomic<bool> expired_; // written by the timer thread, read by the runner
public:
    MyJob() : expired_(false) {}

    void log(const char* message) // 3
    {
        std::unique_lock<std::mutex> lock(mx_);
        std::cout << message << std::endl;
    }

    void timeout() // 4
    {
        expired_ = true;
        log("Timeout!");
    }

    void operator()() // 5
    {
        for(int i = 0; !expired_; ++i)
        {
            std::ostringstream os;
            os << '[' << i << ']';

            log(os.str().c_str());
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    }
};
1. I don't want objects of this class to be copyable, we'll see later why. So I remove the copy constructor and the assignment operator from the class interface, using the C++11 "= delete" syntax.
2. There are two threads insisting on a shared resource (the standard output console), so a mutex is needed to regulate access to it.
3. The shared resource is used in this function only. A lock on the member mutex takes care of protecting it.
4. When the timer expires, it is going to call this method.
5. This function contains the job that is going to run in another thread. Nothing fancy, actually: just a forever loop with some logging and sleeping. The timeout is going to change the loop control variable (declared std::atomic, so that the write from the other thread is safely visible here), giving us a way out.

This is the user code for the class described above:
boost::asio::io_service io; // 1
boost::asio::steady_timer timer(io, std::chrono::seconds(3)); // 2

MyJob job; // 3
timer.async_wait([&job](const boost::system::error_code&) {
  job.timeout();
}); // 4

std::thread thread(std::ref(job)); // 5
io.run();
thread.join();
1. An ASIO I/O service is created.
2. I create a steady timer (that is just like an old deadline timer, but uses the C++11 chrono functionality) on the I/O service object.
3. An object that describes the job I want to run in another thread is instantiated.
4. When the timer expires, the lambda passed here is executed. The call is asynchronous, so it returns control immediately, and execution passes to the next instruction (5). The lambda calls the timeout() method on the job object, which has been captured by reference. Having defined the MyJob class as non-copyable, forgetting the ampersand and capturing the job by value results in a compiler error. Here I don't check the error code parameter, which ASIO sets to say whether the timer expired correctly or with an error; I just stop the running job. In real-life usage a check would be expected, see the sketch after this list.
5. Before running the I/O service, I create a thread on our job - again passed by reference, as the std::ref() shows. Again, trying to pass it by value would result in compiler errors.
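As a minimal sketch of that check (my addition, not part of the original example), the handler could discriminate between a normal expiration, a cancellation, and an actual error:
timer.async_wait([&job](const boost::system::error_code& ec) {
    if(!ec) // the timer expired normally
        job.timeout();
    else if(ec != boost::asio::error::operation_aborted) // a cancelled timer reports operation_aborted
        std::cerr << "timer error: " << ec.message() << std::endl;
});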

Full C++ code for this example is on Github.


Condition variable on a queue

It's a typical situation in a multithreaded scenario: we have a couple of tasks that should synchronize on a shared data structure. To keep things simple, one produces data, the other consumes them. Here I write a solution to this problem using C++11 synchronization primitives and an STL queue as data structure.

The important point in this post is how to use a standard condition variable to make an STL container safe for multithreading. I have already written a couple of posts on the same theme. In the first one I wrote a thread-safe queue by extending the STL one; in the second one I used the same approach I am using here, creating a class that rules the access to a plain STL queue, ensuring synchronized behavior.

The main difference is that two years have passed in the meantime, and now I am using GCC 4.8, which supports C++11 to the point that I could avoid any reference to the Boost libraries. In any case it would be pretty easy to adapt the code to a C++03 compiler and let Boost do the job.

Still, remember that multithreading support is not enabled by default on GCC. If you don't want to get a runtime exception (with a message like "Enable multithreading to use std::thread: Operation not permitted") you'd better tell the linker that you want the pthread library to be linked in (with the -l option).
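For example, assuming the source file is called conditional.cpp (the file name here is just a placeholder), the build command could look like this:
g++ -std=c++11 conditional.cpp -o conditional -lpthread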

How the thing works

I wrote this little class, that acts as a wrapper around a queue:
class Conditional
{
private:
    std::queue<int> queue_; // 1
    std::condition_variable cond_; // 2
    std::mutex mutex_;

public:
    void producer(); // 3
    void consumer(); // 4
};
1. The container shared between the two worker tasks I am about to use.
2. As we'll see soon, the synchronization on the queue is performed using a condition variable and a mutex.
3. The producer task runs this method, which generates data and puts them in the queue.
4. This is for the consumer task, it uses data in the queue.

A client code for this class would work in this way:
Conditional c;

std::thread tc(&Conditional::consumer, &c); // 1
std::this_thread::sleep_for(std::chrono::milliseconds(1)); // 2
std::thread tp(&Conditional::producer, &c);

tc.join();
tp.join();
1. Create and start the consumer task.
2. Before creating the producer, we sleep for a bit, so that we are sure the consumer starts first (and won't find anything to consume).

The sleep between the two thread creations makes sense (obviously?) only for an informal tester like this one. You wouldn't write such a thing in production code (again, obviously?).

Producer

The job the producer task performs is pretty boring: it pushes ten elements in the queue, followed by a zero that, by the convention I follow here, should be interpreted as an "end of transmission" message:
void producer()
{
    for(int i = 10; i >= 0; --i)
    {
        std::unique_lock<std::mutex> lock(mutex_); // 1
        std::cout << "producing " << i << std::endl;
        queue_.push(i);
        cond_.notify_one(); // 2
    }
}
1. Before accessing the shared resources, the task acquires a lock on the designated mutex. The cool thing about unique_lock is that we don't have to explicitly release the mutex if we have no reason to: we can rely on its destructor to do the job (it follows the RAII paradigm). Notice that in this case the mutex protects both the queue and the standard output console; usually it is cleaner to use one mutex per resource.
2. Once the queue has changed, we notify (a single task waiting on) the condition variable about it.
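A possible variation on the producer body, not in the original code: since the notified consumer immediately tries to lock the same mutex, we could release the lock before notifying, for instance by restricting its scope:
// sketch of an alternative loop body: notify outside the critical section
{
    std::unique_lock<std::mutex> lock(mutex_);
    std::cout << "producing " << i << std::endl;
    queue_.push(i);
} // the lock is released here
cond_.notify_one();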

Consumer

The consumer job is a bit more interesting:
void consumer()
{
    while(true) // 1
    {
        std::unique_lock<std::mutex> lock(mutex_); // 2
        while(queue_.empty()) // 3
        {
            std::cout << "waiting queue" << std::endl;
            cond_.wait(lock); // 4
        }
        int message = queue_.front(); // 5
        queue_.pop();

        if(!message) // 6
        {
            std::cout << "terminating" << std::endl;
            return;
        }

        std::cout << "consuming " << message << std::endl;
    }
}
1. Loop "forever", waiting for elements to be pushed in the queue.
2. Try to acquire access to the queue. Notice (again) that the mutex is actually used to protect both the queue and the output console. This is not usually a good idea.
3. Before reading, we ensure there is actually anything in it.
4. The queue is empty. Let's wait on the condition. The lock sets the mutex free until it gets a notification that something has changed. Notice that I put the wait in a while-loop, because we should beware of spurious wakeups. In a few words, it is possible (albeit not common) to get a false positive on a condition variable, and putting the wait in a plain if could lead to rare and unexpected failures.
5. Now we know the queue is not empty: get the front element and remove it.
6. Following the convention that a zero element means "that's all folks", we terminate.

Full C++ source code for this example is on github. I have added a couple of bonus commented variations there, in the consumer code:
  • After the wait on the condition variable, I check whether the wakeup is real or spurious. I have never got a spurious one, but this is not a guarantee of anything: you should still beware of spurious wakeups. If you don't care about logging and checking, you can rewrite this block in a single line:
    cond_.wait(lock, [this] { return !queue_.empty(); });
    The while-loop now is hidden inside the condition variable wait() call.
  • After consuming a message, once in a while, I added a short sleep, to see how the interaction between the threads on the queue changes. You could have some fun playing with it too.
Notice that there is no guarantee on how the scheduler selects a thread to run; you could get a different output from this program every time you run it.


A very simple REQ-ROUTER example

This is the simplest ZeroMQ REQ-ROUTER example I could conceive. I have written it for ØMQ 2.2 on Windows for MSVC, using the built-in C++ wrapper, with an extension that makes managing multipart messages easier (you can find it as an include file on github). I have made use of STL, Boost, and some C++11 support, but nothing impressive.

The application runs in just one process, in which a few client threads are created, each of them using its own REQ socket, all of them connected to the same ROUTER socket running on another thread. The protocol used is tcp, so we can easily port the example to a multiprocess setting.

Each client sends a single message to the server. The server, which knows in advance how many clients are going to connect, gets all the messages and then sends them all back through the socket.

The code will make everything clearer. Here is an excerpt of the main function:
boost::thread_group threads;
for(int i = 0; i < nReq; ++i) // 1
    threads.create_thread(req4router); // 2
threads.create_thread(std::bind(router, nReq)); // 3
threads.join_all();
1. nReq is the number of REQ clients that we want the application to have. You should get it from the user and ensure it is at least one, and "not too big", according to what "big" means in your environment.
2. Each REQ client runs its own copy of the req4router() function, see below.
3. The router runs on the router() function. It needs to know how many clients to expect.

As said, I used the tcp protocol, and this is how I defined the socket addresses:
const char* SK_ADDR_CLI = "tcp://localhost:5380";
const char* SK_ADDR_SRV = "tcp://*:5380";
Both client and server make use of a utility function named dumpId() that I won't show in full in this post; please have a look at the previous posts to get an idea of how it should work, but I guess you can figure it out by yourself. It just prints to std::cout the strings passed to it. It takes care of locking on a mutex, since the console is a shared resource and we need to protect it from concurrent access. Besides, it also prints the thread id, for debugging purposes.
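Just to give the idea, a minimal sketch of how such a function could look (my guess, not the actual code in the repository):
boost::mutex mio; // protects the shared console

void dumpId(const char* msg, const char* detail = 0)
{
    boost::mutex::scoped_lock lock(mio); // one writer at a time on std::cout
    std::cout << boost::this_thread::get_id() << ' ' << msg;
    if(detail)
        std::cout << ' ' << detail;
    std::cout << std::endl;
}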

Here is the function executed by each client:
void req4router()
{
    dumpId("REQ startup");
    zmq::context_t context(1);
    
    zmq::Socket skReq(context, ZMQ_REQ); // 1
    skReq.connect(SK_ADDR_CLI); // 2
    dumpId("REQ CLI on", SK_ADDR_CLI);

    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 3
    skReq.send(id); // 4
    std::string msg = skReq.recvAsString();
    dumpId(msg.c_str());
}
1. zmq::Socket is my zmq::socket_t subclass.
2. Let's connect this REQ socket as a client to the ROUTER socket, that plays as a server.
3. To check that everything works as expected, each client should send a unique message. The thread id looks like a good candidate for this job. To convert a Boost thread id to a string we need to explicitly cast it through lexical_cast.
4. Sending and receiving on a socket go through specialized methods in zmq::Socket that take care of the dirty job of converting between standard strings and ZeroMQ buffers.

And this is the server:
void router(int nReq)
{
    dumpId("ROUTER startup");
    zmq::context_t context(1);

    zmq::Socket skRouter(context, ZMQ_ROUTER); // 1
    skRouter.bind(SK_ADDR_SRV);
    dumpId("ROUTER SRV on", SK_ADDR_SRV);

    std::vector<zmq::Frames> msgs; // 2
    for(int i = 0; i < nReq; ++i)
    {
        msgs.push_back(skRouter.blockingRecv(3)); // 3
    }
    dumpId("all received");

    std::for_each(msgs.begin(), msgs.end(), [&skRouter](zmq::Frames& frames)
    {
        skRouter.send(frames); // 4
    });

    dumpId("ROUTER done");
}
1. This is the ROUTER socket accepting the connection from the REQ client sockets.
2. Frames is actually just a typedef for a vector of strings. In msgs we store all the messages coming from the clients.
3. zmq::Socket::blockingRecv() gets the specified number of frames (three, in this case) from the socket, and returns them as a zmq::Frames object. That object is pushed in the buffer vector. The first frame contains the address of the sender, the second one is just a separator, and the third is the actual payload.
4. Just echo back to each client the message it has sent. The lambda function passed to for_each() needs to know what skRouter is; that is why it is specified in the capture clause.


Monitoring broker state with PUB-SUB sockets

This is the first step in describing a fairly complex ZeroMQ sample application based on the example named Interbroker Routing in the ZGuide. We want to create a few interconnected brokers, so that we could exchange jobs among them. In order to do that, each broker should be able to signal to its peers when it has workers available for external jobs. We are going to do that using PUB-SUB socket connections.

Each broker has a PUB socket that publishes its status to all the other brokers, and a SUB socket that gets such information from the others.

My prototype implementation differs from the original ZGuide code in a few ways.

- It is written for Windows+MSVC, so I couldn't use the inproc protocol, which is not available on that platform. I have used tcp instead.
- I ported it from the czmq binding to the standard 0MQ 2.x light-weight C++ one, improved by a zmq::Socket class that provides support for multipart messages, a feature missing in the original zmq::socket_t.
- The implementation language is C++, so in the code you will see some STL and Boost stuff, and even some C++11 goodies, such as lambda functions.
- The brokers all run in the same process. It wouldn't make much sense in a real life project, but it makes testing faster. In any case, it should be quite easy to refactor the code to let each broker run in its own process. A minor glitch caused by this approach is that I had to take care of concurrency when printing to standard output. That is the reason why all the uses of std::cout in the code are protected by mutex/lock.

As said above, in this first step each broker doesn't do much, just using the PUB/SUB sockets to share its state with its peers.

These are the addresses used for the broker state sockets, both for client and server mode:
const char* SKA_BROKER_STATE_CLI[] =
    {"tcp://localhost:5180", "tcp://localhost:5181", "tcp://localhost:5182"};
const char* SKA_BROKER_STATE_SRV[] =
    {"tcp://*:5180", "tcp://*:5181", "tcp://*:5182"};
Each broker prototype runs the function described below on its own thread, getting as broker an element of the second array above, and as peers the other two elements from the first array. So, for instance, the first broker gets "tcp://*:5180" as its server address, and "tcp://localhost:5181", "tcp://localhost:5182" as peers (a sketch of how the threads could be spawned follows the notes below):
void stateFlow(const char* broker, const std::vector<std::string>& peers)
{
    zmq::context_t context(1); // 1

    zmq::Socket stateBackend(context, ZMQ_PUB); // 2
    stateBackend.bind(broker);

    zmq::Socket stateFrontend(context, ZMQ_SUB); // 3
    stateFrontend.setsockopt(ZMQ_SUBSCRIBE, "", 0);
    std::for_each(peers.begin(), peers.end(), [broker, &stateFrontend](const std::string& peer)
    {
        stateFrontend.connect(peer.c_str());
    });

    std::string tick("."); // 4
    zmq_pollitem_t items [] = { { stateFrontend, 0, ZMQ_POLLIN, 0 } }; // 5
    for(int i = 0; i < 10; ++i) // 6
    {
        if(zmq_poll(items, 1, 250 * 1000) < 0) // 7
            break;

        if(items[0].revents & ZMQ_POLLIN) // 8
        {
            zmq::Frames frames = stateFrontend.blockingRecv(2);
            dumpId(frames[0].c_str(), frames[1].c_str()); // 9
            items[0].revents = 0; // cleanup
        }
        else // 10
        {
            dumpId("sending on", broker);
            zmq::Frames frames;
            frames.reserve(2);
            frames.push_back(broker);
            frames.push_back(tick);
            stateBackend.send(frames);

            tick += '.';
            boost::this_thread::sleep(boost::posix_time::millisec(333)); // 11
        }
    }
}
1. Each broker has its own ZeroMQ context.
2. The stateBackend socket is a PUB that makes available the state of this broker to all the subscribing ones.
3. The stateFrontend socket is a SUB that gets all the messages from its subscribed publisher (as you see in the next line, no filter is set)
4. Currently we are not really interested in the information the broker is sending, so for the time being a simple increasing sequence of dots will do.
5. We poll on the frontend, waiting for messages coming from the peers.
6. I don't want to loop forever, just a few iterations are enough to check the system.
7. Poll for a quarter of a second on the SUB socket; in case of error the loop is interrupted. This is the only, and very rough, error handling in this piece of code, but hey, it is just a prototype.
8. There is actually something to be read. I am expecting a two-part message, so I use a checked zmq::Socket::blockingRecv() that throws an exception (not caught here, meaning risk of brutal termination) if something unexpected happens.
9. Give some feedback to the user, dumpId() is just a wrapper for safely printing to standard output, adding as a plus the thread ID.
10. Only if no message has been received from peers, this broker sends its status.
11. As with (10), this line makes sense only because there is no real logic in here. The idea is that I don't want the same broker to always send messages while the others just receive, so I put the sender to sleep, ensuring in this way it won't send again the next time.
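As mentioned above, each broker runs stateFlow() on its own thread. A minimal sketch of how the three threads could be spawned (the structure here is my assumption, not the original code):
boost::thread_group brokers;
for(int i = 0; i < 3; ++i) // one thread per broker
{
    std::vector<std::string> peers; // the client addresses of the other two brokers
    for(int j = 0; j < 3; ++j)
        if(j != i)
            peers.push_back(SKA_BROKER_STATE_CLI[j]);
    brokers.create_thread(std::bind(stateFlow, SKA_BROKER_STATE_SRV[i], peers));
}
brokers.join_all();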

The full C++ source code for this example is on github. The Socket class is there too.


Async 0MQ app - worker

If you have already read the sort of informal design for this sample asynchronous application, then the post on the client code, and even the one on the server, you are ready to see how I have implemented the worker code.

In brief: this is a C++ ZeroMQ application, based on version 2.2, developed on Windows for MSVC2010, using the 0MQ C++ binding improved by defining a zmq::Socket class that extends the standard zmq::socket_t to simplify multipart messages management.

We have seen in the previous post that the server creates a (variable) number of worker threads and then expects these workers to be ready to work on the messages coming from the client. Here is that function:
void worker(zmq::context_t& context) // 1
{
    zmq::Socket skWorker(context, ZMQ_DEALER); // 2
    skWorker.connect(SK_BCK_ADDR);

    while(true)
    {
        zmq::Frames frames = skWorker.blockingRecv(2, false);
        if(frames.size() == 1) // 3
            break;

        int replies = rand_.getValue(); // 4
        for(int i =0; i < replies; ++i)
        {
            boost::this_thread::sleep(boost::posix_time::millisec(100 * rand_.getValue()));
            dumpId("worker reply");
            skWorker.send(frames);
        }
    }
}
1. Server and workers share the same context.
2. Each worker has its own DEALER socket that is connected (next line) to the DEALER socket on the server. SK_BCK_ADDR is a C-string specifying the inproc address used by the socket.
3. The "real" messages managed by the workers should be composed by two frames, the address of the client and the actual message payload. A mono-framed message is interpreted by the worker as a request from the server to shutdown.
4. To check the asynchronicity of the application, the worker has this strange behavior: it sends back to the client a random number of copies of the original message, each of them is sent with a random delay.

Full source C++ code for this sample application is on github, in the same repository you can find also the zmq::Socket source code.


Async 0mq application - server

This sample ZeroMQ async application is described in a previous post; here I am commenting on its server part, which is connected to the already seen clients by DEALER/ROUTER sockets, and to a few workers, that we'll see next, by DEALER/DEALER sockets.
The AsynchronousCS constructor creates a thread running on this function, part of the private section of the same class:
void server(int nWorkers) // 1
{
    zmq::context_t context(1);
    zmq::Socket skFrontend(context, ZMQ_ROUTER); // 2
    skFrontend.bind(SK_SRV_ADDR);

    zmq::Socket skBackend(context, ZMQ_DEALER); // 3
    skBackend.bind(SK_BCK_ADDR);

    boost::thread_group threads;
    for(int i = 0; i < nWorkers; ++i)
        threads.create_thread(std::bind(&AsynchronousCS::worker, this, std::ref(context))); // 4

    zmq_pollitem_t items [] =
    {
        { skFrontend, 0, ZMQ_POLLIN, 0 },
        { skBackend,  0, ZMQ_POLLIN, 0 }
    };

    while(true)
    {
        if(zmq_poll(items, 2, 3000 * 1000) < 1) // 5
            break;

        if(items[0].revents & ZMQ_POLLIN) // 6
        {
            zmq::Frames frames = skFrontend.blockingRecv(2);
            skBackend.send(frames);
            items[0].revents = 0; // cleanup
        }
        if(items[1].revents & ZMQ_POLLIN) // 7
        {
            zmq::Frames frames = skBackend.blockingRecv(2);
            skFrontend.send(frames);
            items[1].revents = 0; // cleanup
        }
    }

    for(int i = 0; i < nWorkers; ++i) // 8
        skBackend.send("");

    threads.join_all(); // 9
}
1. The number of workers used by the server is set by the user.
2. zmq::Socket is an extension of the zmq::socket_t found in the default 0MQ C++ binding. This server socket is used to keep a connection to the client (DEALER) sockets.
3. This second socket is used to keep a DEALER/DEALER connection with each worker.
4. A new thread is created for each worker (its code is discussed in the next post), notice that a reference to the ZeroMQ context is passed to each worker, so that all sockets on the server are sharing the same 0MQ context.
5. As a simple way of terminating the server execution, I set the limit of three seconds on the polling. If nothing is received on either socket in such a time frame, the indefinite loop is interrupted.
6. Input received on the frontend, the multipart message is read and passed to the backend.
7. Same as (6), but the other way round.
8. When the clients are not sending any more messages, it is time to send a terminator to each worker. By convention, a single empty message is interpreted by the worker as a terminator.
9. Give time to the workers to terminate, and then the server is down.

Full source C++ code for this sample application is on github, in the same repository you can find also the zmq::Socket source code.


Async 0mq application - DEALER/ROUTER client

Follow the link for a view of the asynchronous ZeroMQ application I am describing in these posts. Here I won't care about the big picture; I am focusing on the function executed by each thread running as a client.

Each client thread is created passing as argument the address of this function, defined as private in the class AsynchronousCS:
void client()
{
    zmq::context_t context(1); // 1
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 2
    zmq::Socket skClient(context, ZMQ_DEALER, id); // 3
    skClient.connect(SK_CLI_ADDR);

    zmq_pollitem_t items[] = { { skClient, 0, ZMQ_POLLIN, 0 } };

    std::string message("message ");
    for(int i =0; i < 6; ++i) // 4
    {
        for(int heartbeat = 0; heartbeat < 100; ++heartbeat) // 5
        {
            zmq_poll(items, 1, 10 * 1000); // polls each 10 millis
            if(items[0].revents & ZMQ_POLLIN)
            {
                std::string msg = skClient.recvAsString(); // 6
                dumpId("client receives", msg); // 7

                items[0].revents = 0;
            }
        }
        dumpId("client sends", message);
        skClient.send(message);
        message += 'x';
    }

    while(zmq_poll(items, 1, 1000 * 1000) > 0) // 8
    {
        if(items[0].revents & ZMQ_POLLIN)
        {
            std::string msg = skClient.recvAsString();
            dumpId("coda", msg);
            items[0].revents = 0; // cleanup
        }
    }
}
1. Even if the client threads run in the same process as the server, each of them has its own ZeroMQ context; this makes it easy to refactor the application to the more common solution where each client runs in its own process.
2. I am going to use the thread id as socket id.
3. zmq::Socket is my extension of the zmq::socket_t found in the default 0MQ C++ binding. This constructor calls zmq_setsockopt() to set the socket ZMQ_IDENTITY to the passed id. This DEALER client socket is going to connect to a ROUTER server socket.
4. The client is going to send a few messages to the server.
5. We loop 100 times, polling on the socket with a limit of 10 milliseconds. Expecting only a few messages in input, we can assume that this loop will take about a second to execute.
6. This zmq::Socket function takes care of receiving, on a zmq::message_t and converting it to a std::string.
7. Accessing the shared resource std::cout to dump a message requires mutex/lock protection; all of that is taken care of in this function.
8. As described in the previous post, to keep the code simple I have implemented no way for the server to tell the client that the stream of messages is terminated; on the contrary, it is the client itself that waits a "reasonable" amount of time (1 second) after the last input message before deciding that no more messages are going to come. This is a highly unreliable solution, but for such a simple example it should suffice.

Full source C++ code for this sample application is on github, where you can find the source for zmq::Socket too.


An asynchronous client/server 0mq application - introduction

This example is based on the Asynchronous Client-Server ZGuide chapter. I have made a few slight changes, primarily to adapt it to Windows - MSVC2010, and to port the code, originally written for czmq, to the C++ standard binding available for ZeroMQ version 2.x; actually, I developed and tested the code on the most recent currently available ØMQ version, 2.2.0.

It is a single process application, but this is just to simplify its development and testing. It could easily be rewritten splitting it into a server component and a client one.

Client

We can decide how many clients to run. Each client sends half a dozen messages, about one every second. We want it to act asynchronously, so we continuously poll for incoming messages between one send and the next. Once we complete sending our messages, we keep waiting for a while for answers before terminating the execution.

This behavior is uncommon, but it has the advantage of letting the client terminate cleanly without requiring a complex server.

To allow an asynchronous behavior, the socket connection between client and server will be a DEALER-ROUTER one.

Server

The server provides the ROUTER socket to which the client DEALER sockets connect. Its job is passing each message it gets from the clients to an available worker of its own, and then sending back to the right client the feedback returned by the worker.

Since we want the worker to generate a variable number of replies to a single message coming from the client, we use a DEALER-DEALER socket connection.

As a simple way of terminating the server execution, I decided just to check the polling waiting period. If nothing comes in on the server socket for more than a few seconds, we can assume that it is time to shut down the server.

We have seen that the client already has its own rule to shut down, so from the server point of view we only have to take care of the workers. The convention I use here is that the server sends an empty message to each worker, which the worker interprets as a terminator.

Worker

Each worker has its own DEALER socket connected to the server DEALER socket. Its job is getting a message and echoing it, not once, but a random number of times in [0..2], introducing also a random sleep period, just to make things a bit more fancy.

The messages managed by the worker should be composed of two frames, the first one being the address of the sending client, the second one the actual payload. If we receive just one frame, it should be empty, and it should be considered a terminator. And what if that single-framed message is not empty? For simplicity's sake, I just terminate the worker as if it were empty.

Class AsynchronousCS

Not much coding in this post, I save the fun stuff for the next ones. Here I just say that I have implemented the thing in a class, named AsynchronousCS, designed in this way:
class AsynchronousCS
{
private:
    boost::thread_group threads_; // 1
    MyRandom rand_; // 2

    void client(); // 3
    void server(int nWorkers); // 4
    void worker(zmq::context_t& context); // 5
public:
    AsynchronousCS(int nClients, int nWorkers) : rand_(0,2) // 6
    {
        for(int i = 0; i < nClients; ++i)
            threads_.create_thread(std::bind(&AsynchronousCS::client, this));
        threads_.create_thread(std::bind(&AsynchronousCS::server, this, nWorkers));
    }

    ~AsynchronousCS() { threads_.join_all(); } // 7
};
1. The main thread has to spawn a few threads, one for the server and a variable number of them for the clients. This thread group keeps track of all of them to make joining them easy.
2. An object of a utility class generating the random numbers used by the worker.
3. Each client thread runs on this function.
4. The server thread runs on this function, the parameter is the number of workers that the server will spawn.
5. Each worker thread runs on this function. Notice that the workers share the same context of the server, while the client and server could be easily rewritten to work in different processes.
6. The constructor requires the number of clients and workers for the application, instantiates the random generator object, specifying that we want it to generate values in the interval [0..2], and then creates the client and server threads.
7. Destructor, joins all the threads created in the ctor.

Using this class is just a matter of instantiating an object:
void dealer2router(int nClients, int nWorkers)
{
    AsynchronousCS(nClients, nWorkers);
}
The dealer2router() function hangs on the class dtor till the join completes.

In the next posts I'll show how I have implemented the client, server, and worker functions. Full source C++ code for this sample application is on github. To easily manage multipart messages, I use an extension of the zmq::socket_t class, as defined in the ZeroMQ standard C++ binding. I named it zmq::Socket and you can find it too on github.


Post on ASIO strand

IMHO, the ASIO strand example in the official Boost tutorial is a bit too complex. Instead of focusing on the matter, it also involves some ASIO deadline_timer knowledge, which makes sense in the tutorial logic, but I'd say makes things blurred.

So I have written a minimal example that I hope will prove more intuitive as a first introduction to this concept.

This post is from April 2012, and it is now obsolete, please follow the link to the updated version I have written on March 2018.

We have a class designed to be used in a multithreaded context. It has as a data member a resource that is meant to be shared, and a couple of functions that modify that shared value and could be called from different threads.

Usually we rely on mutexes and locks to synchronize access to the shared resource. ASIO provides us with the strand class as a way to serialize the execution of the works posted to it, making explicit synchronization unnecessary. But be aware that this is true only for the functions going through the same strand.

We want to write a piece of code like this:
namespace ba = boost::asio;

// ...

ba::io_service aios;
Printer p(aios, 10); // 1
boost::thread t(std::bind(&ba::io_service::run, &aios)); // 2
aios.run(); // 3
t.join(); // 4
1. See below for the class Printer definition. In a few words, it is going to post the execution of a couple of its functions on ASIO, both of them acting on the same shared resource.
2. We run a working thread on the ASIO I/O service.
3. Also the main thread is running on ASIO.
4. Wait for the worker completion, then end the execution.

So we have two threads running on ASIO. Let's see now the Printer class private section:
class Printer
{
private:
    ba::strand strand_; // 1
    int count_; // 2

    void print(const char* msg) // 3
    {
        std::cout << boost::this_thread::get_id() << ' ' << msg << ' ' << count_ << std::endl;
    }
    
    void print1() // 4
    {
        if(count_ > 0)
        {
            print("print one");
            --count_;
            strand_.post(std::bind(&Printer::print1, this));
        }
    }

// ...
};
1. We are going to operate on a Boost Asio strand object.
2. This is our shared resource, a simple integer.
3. A utility function that dumps to standard output console the current thread ID, a user message, and the shared resource.
4. Client function for (3): if the shared resource count_ is not zero, it calls (3), then decreases count_ and posts through the strand a new execution of this function. There is another private function, print2(), that is exactly like print1(), it just logs a different message (spelled out below for completeness).
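For completeness, print2() would look like this (only the logged message differs, and its exact text is my guess):
void print2()
{
    if(count_ > 0)
    {
        print("print two");
        --count_;
        strand_.post(std::bind(&Printer::print2, this));
    }
}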

Since we are in a multithreaded context, these functions should look suspicious. No mutex/lock? No protection for the access to count_? And, cout being an implicitly shared resource, we risk getting garbled output too.

Well, these are no issues, since we are using a strand.

But let's see the Printer ctor:
Printer(ba::io_service& aios, int count) : strand_(aios), count_(count) // 1
{
    strand_.post(std::bind(&Printer::print1, this)); // 2
    strand_.post(std::bind(&Printer::print2, this));
}
1. Pay attention to how the private ASIO strand object is constructed from the I/O service.
2. We prepare the first runs, posting on the strand the execution of the private functions.

What happens is that all the works posted on the strand are executed sequentially, meaning that a new work starts only after the previous one has completed. There is no overlapping, no concurrency, and so no need for locking. Since we have two threads available, ASIO chooses which one to use for each work execution. We have no guarantee on which thread executes what.

We don't have the troubles associated with multithreading, but we don't have some of its advantages either. Namely, when running on a multicore/multiprocessor machine, a strand won't use all the available processing power for its job.

The full C++ source code for this example is on github.


LRU Queue Broker version two

When working with the extended zmq socket I introduced in the previous post, writing a ZeroMQ application like the LRU queue device previously described gets easier.

OK, the comparison is not 100% fair, because when rewriting the example I have not just used my brand new zmq::Socket, I have also changed the application logic a bit. Now the client is not sending an integer used by the worker to do some job (actually, sleeping), but just a string (its ID) that is echoed back. It wouldn't be difficult to reintroduce the original behavior in this version, but I found that this way the example is more readable. Besides, this version lacks almost any error handling. This does not differ much from the previous version; still, you don't want to keep such a wishful attitude in your production code.

[After a while, I changed my Socket class to manage multipart messages differently; this impacted this code, see my post where I say something more about the broker changes. Still, the general structure of the code shown here is the same]

That said, here is how the client function looks now; compare it with the original LRU device client:
void client(zmq::context_t& context)
{
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id());
    zmq::Socket skClient(context, ZMQ_REQ, id); // 1
    skClient.connect(SK_ADDR_FRONTEND);
    dump(id, "client is up");

    skClient.send(id); // 2
    dump(id, "client request sent");

    std::string reply = skClient.recvAsString(); // 3
    dump(reply, "received by client");
}
1. Create a ØMQ request socket with the specified ID.
2. Send a std::string on the socket.
3. Receive a std::string on the socket.

The code is slimmer, terser, easier to write, read, and modify.

If you liked the impact of zmq::Socket on the client, you are going to love what it does to the LRU device worker:
void worker(zmq::context_t& context)
{
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id());
    zmq::Socket skWorker(context, ZMQ_REQ, id);
    skWorker.connect(SK_ADDR_BACKEND);

    zmq::Frames frames(2); // 1
    while(true)
    {
        skWorker.send(frames); // 2

        frames = skWorker.blockingRecv(2, false); // 3
        if(frames.size() != 2)
        {
            dump(id, "terminating");
            return;
        }
        dump(frames[0], "client id");
        dump(frames[1], "payload");
    }
}
1. Remember that Frames is a std::vector of std::string's. Here we are creating it with two empty elements.
2. The first time we send a multipart message containing just empty frames (as expected); afterwards, the received multipart message is sent back to the caller.
3. The call returns a dummy Frames (for termination) or a Frames with the client ID and the payload. If your C++ compiler is recent, and implements the C++11 move semantics, this assignment is going to be as cheap as a few pointers moved around.

The device job of receiving on back- and frontend gets cleaner too:
void receivingOnBackend()
{
    zmq::Frames input = backend_.blockingRecv(3, false); // 1

    dump(input[0], "registering worker");
    qWorkers_.push(input[0]);

    if(input.size() == 3)
    {
        zmq::Frames output(input.begin() + 1, input.end()); // 2
        frontend_.send(output);
    }
}

void receivingOnFrontend()
{
    zmq::Frames input = frontend_.blockingRecv(2); // 3

    dump(input[0], "client id received on frontend");
    dump(input[1], "payload received on frontend");

    std::string id = qWorkers_.front();
    qWorkers_.pop();
    dump(id, "selected worker on frontend");

    zmq::Frames output;
    output.reserve(3); // 4
    output.push_back(id);
    output.insert(output.end(), input.begin(), input.end());

    backend_.send(output);
    dump(id, "message sent to worker");
}
1. The backend socket can receive two different kinds of messages: just the worker ID, signalling that it is ready, or the worker ID followed by the client ID and the actual message payload.
2. If the backend sent a "real" message, the device should simply discard the worker ID (the first element in the input Frames) and send the resulting multipart message to the frontend.
3. A message on the frontend is always made of two frames, a client ID and the payload.
4. The device adds a worker ID at the beginning and then forwards the multipart message, as received from the client, to the worker.

The full C++ code for this example is on github. I developed it for ZeroMQ 2.2.0 on MSVC 2010. Notice that the head version on github differs from the code reported here: it is no longer a Socket responsibility to manage separators in multipart messages, they are managed in the user code, that is, here.


LRU Queue Device - using the device

After seeing what LRU queue broker clients and workers do, now it is finally time to have a look at the device itself.

Recalling what we said in the first post dedicated to the LRU queue broker, I have designed the device as a class that is used in this way:
QueueDevice device;

device.start(nClients, nWorkers);
device.poll();
There are a couple of constants we should be aware of:
const char* SK_ADDR_BACKEND = "inproc://backend";
const char* SK_ADDR_FRONTEND = "inproc://frontend";
They represent the addresses used by the backend and the frontend. Notice that the inproc protocol is specified, since we are developing a ZeroMQ multithreaded application.

The private class data members are:
zmq::context_t context_;
zmq::socket_t backend_;
zmq::socket_t frontend_;
std::queue<std::string> qWorkers_;
boost::thread_group thWorkers_;
boost::thread_group thClients_;
Constructor, destructor, and start() method are small fish:
QueueDevice() : context_(1), // 1
    backend_(context_, ZMQ_ROUTER), frontend_(context_, ZMQ_ROUTER) // 2
{
    backend_.bind(SK_ADDR_BACKEND); // 3
    frontend_.bind(SK_ADDR_FRONTEND);
}

~QueueDevice() // 4
{
    thWorkers_.join_all();
    thClients_.join_all();
}

void start(int nClients, int nWorkers) // 5
{
    for(int i = 0; i < nWorkers; ++i)
        thWorkers_.create_thread(std::bind(worker, std::ref(context_))); // 6

    for(int i = 0; i < nClients; ++i)
    {
        int root = 42 + i * 10;
        thClients_.create_thread(std::bind(client, std::ref(context_), root)); // 7
    }
}
1. As required, we initialize the 0MQ context.
2. Our device is based on two ROUTER ZeroMQ sockets, that are connecting it to the backend (the workers), and the frontend (the clients).
3. We bind our sockets to the expected address, as specified above.
4. The class dtor takes care of joining on all the threads we created for workers and clients.
5. The user specifies how many clients and workers should be created.
6. For each worker a new thread is created, and each of them is attached to the function worker(), which takes as a parameter a reference to the ZeroMQ context stored as a private data member.
7. The client threads creation is similar to the workers' one, with the minor difference that one more parameter is passed: an integer used to distinguish the different clients and make testing more interesting.

More interesting is the poll() function:
void poll()
{
    zmq_pollitem_t items [] = // 1
    {
        { backend_,  0, ZMQ_POLLIN, 0 },
        { frontend_, 0, ZMQ_POLLIN, 0 }
    };

    while(zmq_poll(items, qWorkers_.empty() ? 1 : 2, 1000000) > 0) // 2
    {
        if(items[0].revents & ZMQ_POLLIN) // 3
        {
            receivingOnBackend();
            items[0].revents = 0; // 4
        }
        if(items[1].revents & ZMQ_POLLIN)
        {
            receivingOnFrontend();
            items[1].revents = 0;
        }
    }

    while(!qWorkers_.empty()) // 5
    {
        std::string id = qWorkers_.front(); // 6
        qWorkers_.pop();

        dump(id, "Terminating worker");

        zmq::message_t zmAddress(id.length());
        memcpy(zmAddress.data(), id.c_str(), id.length());

        backend_.send(zmAddress, ZMQ_SNDMORE); // 7
        zmq::message_t zmEmpty;
        backend_.send(zmEmpty, ZMQ_SNDMORE); // 8
        backend_.send(zmEmpty); // 9
    }
}
1. To poll on sockets we need to define an array of zmq_pollitem_t items, where we specify which socket to poll, which kind of polling to perform (here POLL IN, polling for incoming messages), and we set the flag "revents" to zero. This last flag is the one specifying whether something happened on that socket.
2. An interesting line. We are polling on the array of poll items, but if there is no element in the qWorkers queue, meaning no worker is available, we don't even poll on the frontend. The idea is that if we have no worker available to do the job required by a client, we wouldn't know what to do with its request. The last parameter, one million, is the number of microseconds that poll hangs waiting for something to be received, that is, one second. If at least one socket has something to be received, we enter the loop, otherwise the job is done. That will do for this toy application, but obviously it is not a reliable assumption for stable code.
3. We check each poll item to see which one has something pending to be received. A private function is called to manage each case, receivingOnBackend() and receivingOnFrontend(); we'll see them in the next post, since this one is getting too long.
4. Finally the flag is reset, making it ready for the next iteration.
5. We get here when the looping is considered completed. The clients should shut down by themselves, but we have to take care of the workers personally. We send a terminator to each worker in the queue.
6. Get the least recent worker in queue, and pop it.
7. Specify as address for the message we are sending through the backend socket, the worker id we popped by the queue.
8. Send an empty frame as separator, as required by the ZeroMQ transmission protocol.
9. And finally send a terminator to the worker socket.

The full C++ source code for this example is available on github.


LRU Queue Device - worker

Before taking care of the LRU queue broker itself, and after having seen what its client does, it is time to look at the other side of the device, the worker threads.

Our device is designed to run a user defined number of workers. Each worker is associated to a thread, and each of them runs this function:
const char* SK_ADDR_BACKEND = "inproc://backend"; // 1

// ...

void worker(zmq::context_t& context) // 2
{
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 3
    zmq::socket_t skWorker(context, ZMQ_REQ); // 4
    zmq_setsockopt(skWorker, ZMQ_IDENTITY, id.c_str(), id.length());
    skWorker.connect(SK_ADDR_BACKEND);

    std::string receiver;
    int payload = 0;
    while(true)
    {
        zmq::message_t zmReceiver(receiver.length()); // 5
        memcpy(zmReceiver.data(), receiver.c_str(), receiver.length());
        skWorker.send(zmReceiver, ZMQ_SNDMORE); // 5a

        zmq::message_t zmDummy;
        skWorker.send(zmDummy, ZMQ_SNDMORE); // 5b

        zmq::message_t zmOutput(sizeof(int));
        memcpy(zmOutput.data(), &payload, sizeof(int));
        skWorker.send(zmOutput); // 5c

        zmq::message_t zmClientId;
        skWorker.recv(&zmClientId); // 6
        dump(id, zmClientId);

        if(!zmClientId.size()) // 7
        {
            dump(id, "terminating");
            return;
        }
        const char* base = static_cast<const char*>(zmClientId.data());
        receiver = std::string(base, base + zmClientId.size()); // 8

        skWorker.recv(&zmDummy); // 9

        zmq::message_t zmPayload;
        skWorker.recv(&zmPayload); // 10
        if(zmPayload.size() != sizeof(int)) // 11
        {
            dump(id, "bad payload detected");
            return;
        }

        payload = *(int*)zmPayload.data(); // 12
        dump(id, payload);

        boost::this_thread::sleep(boost::posix_time::millisec(payload));
    }
}
1. We are developing a multithread ZeroMQ application, where the sockets are connected on the inproc protocol. The name I choose for the backend connection between the device and the workers is "backend".
2. As we should remember, the 0MQ context is the only object that could be shared among different threads, and actually it should be shared among them when, as in this case, we want to have a data exchange.
3. Usually we don't care to specify the socket identity, and we let ØMQ do it. Here it is done as a way to make it easier to test the application. That's why I used the Boost thread id.
4. Up to this point, the worker acts just like the client. Both of them are REQ sockets, connected inproc to a ROUTER socket in the device. The difference is in what happens next. The worker sends a dummy message to let the device know it is up and running, then it waits for a reply, does some job on it, sends an answer back and waits for a new job, till it gets a terminating message.
5. It takes eight lines to send a request to the device. And the first time, as said in (4), it is just a dummy. The trouble is that we are sending a multipart message, and there is no easy way to do it other than taking care of the gory details.
As the first part (5a) we send a character string representing the address of the client that asked for this job. The first time we have no associated client, we are just signalling to the device that we are ready, so we actually send an empty frame.
The second part (5b) is a zero sized frame, seen by ZeroMQ as a separator.
The third and last part (5c) is the payload, representing the result of the job performed by the worker. For the first loop this value is meaningless.
6. Now the worker hangs, waiting to receive a reply on the socket. As the name of the variable suggests, what we expect to get in it is the client id, that we'll use to send back an answer in (5a).
7. If no client id is specified, we interpret the message as a termination request, so we stop looping.
8. Otherwise, we convert the raw byte array into a proper C++ string, and store it in the local variable that will be used in (5a).
9. The next frame is a dummy, we just ignore it.
10. Finally we receive the payload, the real stuff we should work with.
11. This sample application is designed so that only ints are expected here; anything else is considered a catastrophic failure (real production code shouldn't behave like this, as I guess you well know).
12. The integer in the last frame of the message is converted to an int, and used by the worker to, well, determine how long it should sleep. The value itself, unchanged, is going to be sent back to the client in (5c).

The full C++ source code for this example is on github.


LRU Queue Device - client

If you have read the previous post, it should be fairly clear how the LRU queue broker we are going to create should work. Here we are focusing on the clients used by the device.

When we feel ready, we call the start() public function that creates a thread for each client, associating it to this function:
const char* SK_ADDR_FRONTEND = "inproc://frontend"; // 1

// ...

void client(zmq::context_t& context, int value) // 2
{
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 3
    zmq::socket_t skClient(context, ZMQ_REQ); // 4
    zmq_setsockopt(skClient, ZMQ_IDENTITY, id.c_str(), id.length());
    skClient.connect(SK_ADDR_FRONTEND);

    dump(id, "client is up"); // 5

    zmq::message_t zmPayload(sizeof(int)); // 6
    memcpy(zmPayload.data(), &value, sizeof(int));
    skClient.send(zmPayload);

    dump(id, "client request sent");

    zmq::message_t zmReply;
    skClient.recv(&zmReply); // 7

    if(zmReply.size() == sizeof(int)) // 8
    {
        int reply = *((int*)zmReply.data());
        dump(id, reply);
    }
    else // unexpected
        dump(id, zmReply);
}
1. This is a multithread ZeroMQ application; the sockets are connected on the inproc protocol. The name I chose for the frontend connection between device and clients is "frontend".
2. The synchronization among threads in a 0MQ application happens by sharing a 0MQ context object. The second parameter passed to the function is just a value, different for each client, so that we can keep track of what is going on in our testing.
3. Here we could have let ØMQ choose the socket identity, but having a predetermined socket id for each client is useful from a testing point of view. I guessed the most natural choice was picking the Boost thread id. Since there is no direct way to see that id as a string, we have to explicitly convert it using the Boost lexical_cast operator.
4. Each client has its own REQ socket, for which we set the identity as described in (2), and that we connect to the frontend router socket defined in our device.
5. We are in a multithreaded environment, so it is not safe to use a shared resource without protecting it with a mutex. That's why I wrote a few dump() functions to give some basic feedback to the user.
6. What this client does is simply sending the value it gets as input parameter to the device that created it, through the socket that connects them. Pretty meaningless, I agree, but it will do as an example.
7. This is a REQ socket; we have sent our request, now we wait indefinitely for a reply.
8. I have removed almost all the error checking from the application, to keep it as readable as possible. But I couldn't help adding just a few minimal checks like this one. We expect an integer as a reply (actually, it should just be an echo of the integer we sent), so I ensure that the received message size is as expected. If so, I extract the int from the message and print it to the console as such. Otherwise I print the reply, for debugging purposes, as if it were a character string.

The C++ source code for this example is on github.


LRU Queue Device - general view

Implementing the Least Recently Used pattern for ZeroMQ was a longish but not complicated job. Here things are getting tougher, since we want to create a device to rule an exchange of messages from a bunch of clients to a bunch of workers, again using the LRU pattern.

This example is based on the Request-Reply Message Broker described in the ZGuide. I have reworked it a bit, both to adapt it to the Windows environment (no ipc protocol is supported there) and to make it clearer, at least from my point of view. I used 0MQ 2.2.0, its standard lightweight C++ wrapper, and Visual C++ 2010 as development environment.

As they put it in the ZGuide, "Reading and writing multipart messages using the native ØMQ API is like eating a bowl of hot noodle soup, with fried chicken and extra vegetables, using a toothpick", and since that is what we are actually going to do, don't be surprised if at times it looks overwhelmingly complex and messy. Be patient, try to work your way through the example one step at a time, and in the end you will find out that it is not so bad after all.

The idea is that we want to have a class, QueueDevice, that is going to be used in this way:
void lruQueue(int nWorkers, int nClients)
{
    QueueDevice device;

    device.start(nClients, nWorkers); // 1
    device.poll(); // 2
}
1. We specify the number of clients and workers that will be attached to our device when we start it.
2. Then we poll on the device till some condition is reached.

The QueueDevice class is going to have a bunch of private data members:
class QueueDevice
{
    // ...
private:
    zmq::context_t context_; // 1
    zmq::socket_t backend_; // 2
    zmq::socket_t frontend_;
    std::queue<std::string> qWorkers_; // 3
    boost::thread_group thWorkers_; // 4
    boost::thread_group thClients_;

    // ...
};
1. First of all, a ZeroMQ context.
2. A couple of sockets, both ROUTER as we'll see in the ctor, one for managing the backend, AKA the connection to the workers, the other for the frontend, meaning the clients.
3. I am going to use a queue to temporarily store, in the device, which workers are available to the clients. We want to implement an LRU pattern, so a queue is just the right data structure.
4. We are going to spawn some threads for the workers and others for the clients. Actually, we could have used just one thread_group, but I find it more readable to have two of them.

There are going to be a few public methods in the QueueDevice class:
  • A constructor to initialize the sockets.
  • A destructor to join all the threads.
  • A start() method to actually start the worker and client threads.
  • A poll() method to, well, poll on the backend and frontend sockets.
The start() method of QueueDevice is going to need access to a worker() and a client() function, representing the jobs executed on the backend and frontend sides of the device; a sketch of the resulting class declaration follows.
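Putting the interface together, the declaration could look more or less like this. Take it as a sketch of mine, with the details filled in from the description above; the actual code is on github:
class QueueDevice
{
public:
    QueueDevice(); // creates the two ROUTER sockets and binds them

    ~QueueDevice() // joins all the threads
    {
        thWorkers_.join_all();
        thClients_.join_all();
    }

    void start(int nClients, int nWorkers); // spawns the client and worker threads
    void poll(); // polls on frontend_ and backend_ until we are done

private:
    // ... the data members listed above
};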

Finally, we'll have a bunch of dumping functions, used by nearly all the threads that constitute our application to print some feedback for the user on the standard console. As usual, std::cout being a shared resource, we'll need a mutex to avoid unpleasant mix-ups.

That should be all. In the next posts I'll go through the details. The full C++ source code for this example is available on github.

Go to the full post

LRU Pattern - putting all together

Following the design explained in the ZGuide, I have written a C++ port of a simple application that implements the Least Recently Used messaging pattern, using the default lightweight ZeroMQ C++ wrapper, for Windows-MSVC 2010, linking to ØMQ 2.2.0, currently the latest ZeroMQ stable version.

You can find the complete C++ code for this example on github, and in the previous two posts some comments on the client side and on the worker.

There is still a tiny bit of code I should talk about, the dumpMessage() functions:
boost::mutex mio; // 1

void dumpMessage(const std::string& id, int value) // 2
{
    boost::lock_guard<boost::mutex> lock(mio); // 3
    std::cout << id << ' ' << value << std::endl;
}

void dumpMessage(const std::string& id, const char* msg)
{
    boost::lock_guard<boost::mutex> lock(mio);
    std::cout << id << ' ' << msg << std::endl;
}
1. In the rest of the code, all the synchronization among threads is done through 0MQ message exchanges, so we don't need mutexes and locks. But here we have many threads competing for the same shared resource, namely the standard output console. Ruling its access with a mutex is the most natural solution, I reckon.
2. I provide two overloads for the printing function: one for the socket/thread id plus its int payload, and one for the id plus a message from the code to the user. A single function template could cover both, as sketched after these notes.
3. If another thread is already running the next line, we patiently wait here for our turn. The lock is released by the lock_guard destructor.
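As a side note, a single function template could replace the two overloads. This is just an alternative sketch of mine, not what the example code actually does:
template <typename T>
void dumpMessage(const std::string& id, const T& payload) // T is anything streamable: int, const char*, ...
{
    boost::lock_guard<boost::mutex> lock(mio);
    std::cout << id << ' ' << payload << std::endl;
}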

All the printing in the application is done through these two functions, with one notable exception in the client main function:
void lru(int nWorkers)
{
    // ...
    boost::thread_group threads;
    // ...
    threads.join_all();
    std::cout << "Number of processed messages: " << processed << std::endl;
}
But at that point we have already executed a "join all" on all the worker threads spawned by the client. We are back in the condition where only one thread is running for this process. So we don't have to worry about competing for that shared resource.

The design of this example should be clear. We enter the lru() function, create as many worker threads as required by the caller (actually, in real code it would be worthwhile to validate that number), have a data exchange between the client and the workers, retrieve a result from all this job, and terminate the run.

From the ZeroMQ point of view, the interesting part is the REQ-ROUTER pattern. Each worker has a request socket, so it can tell the client socket (a router) when it is ready for another run. When the router has no more work to hand out, it simply sends a terminator to each worker asking for a new task, until none of them is around anymore.

Go to the full post

LRU Pattern - worker

Second step of the LRU pattern example implementation. After the client, now it is time to write the worker. The ZeroMQ version I used is the brand new 2.2.0, the chosen implementation language is C++, in the MSVC 2010 flavor, for the Windows operating system. The complete C++ source code is on github.

This is a multithread application: the client creates a thread for each worker, and each of them executes this function:
const char* SOCK_ADDR = "inproc://workers";

// ...

void worker(zmq::context_t& context)
{
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 1
    zmq::socket_t worker(context, ZMQ_REQ); // 2
    zmq_setsockopt(worker, ZMQ_IDENTITY, id.c_str(), id.length());
    worker.connect(SOCK_ADDR);

    int processed = 0; // 3
    while(true)
    {
        zmq::message_t msg(&processed, sizeof(int), NULL);
        worker.send(msg); // 4
        zmq::message_t payload;
        if(worker.recv(&payload) == false) // 5
        {
            dumpMessage(id, "error receiving message");
            return;
        }
        if(payload.size() != sizeof(int)) // 6
        {
            dumpMessage(id, "terminating");
            return;
        }

        int value = *(int*)payload.data(); // 7
        dumpMessage(id, value);

        boost::this_thread::sleep(boost::posix_time::millisec(value)); // 8
        ++processed;
    }
}
1. As id for the REQ socket I used the thread id. To convert a Boost thread id to a string we need to go through a lexical cast (an alternative is sketched after these notes).
2. The REQ socket is created on the context passed by the main thread: all the threads in a 0MQ multithread application that want to be connected should share the same context.
3. Keeping track of the messages processed in this thread, so that we can pass back this information to the client.
4. Sending a message through the socket. Notice that this socket has an identity set, so what we are sending here is actually a multipart message consisting of three frames: the identity, empty data, and the number of currently processed messages.
5. Receiving the reply to our request. The identity is automatically stripped; we only have to take care of the actual payload.
6. The error handling is very poor in this example. We expect an int as message; anything with a different size is treated like a zero-sized message, which I conventionally consider a terminator.
7. Extract the int contained in the message ...
8. ... and use it to do some job, in this case just sleeping for a while.
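As a side note on (1), lexical_cast is not the only way: streaming the id into a std::ostringstream (header <sstream>) gives the same result, since boost::thread::id supports operator<<. Just a sketch:
std::ostringstream oss;
oss << boost::this_thread::get_id();
std::string id = oss.str(); // same string as the lexical_cast above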

All the synchronization among threads is done through messages sent on ZeroMQ sockets. The exception is the dumpMessage() functions, which access a resource (std::cout) shared among all threads. We'll see how to deal with this in the next post.

Go to the full post

LRU Pattern - client

Let's implement the Least Recently Used Routing pattern for ZeroMQ 2.2.0 (just released) in C++ on Windows with MSVC. In the ZGuide you can find the rationale behind this code, and the C implementation that I used as a guideline for this post. Besides porting it to C++, I have adapted it to Windows (there is no IPC support on this platform) and made some minor changes that, I guess, make this example even more interesting.

It is a multithreaded application, where the client has a router socket that connects to a bunch of request sockets, one for each worker thread. The REQ sends a message containing its own id to the ROUTER when it is ready to process a new message; the ROUTER uses the REQ id to reply to that specific REQ socket, which is known to be available. Here I talk about the client part of the application, but you can already have a look at the entire source code on github.

The main function gets as parameter the number of workers that we want to run, creates a router socket and the worker threads, each of them with its own REQ socket, on the same context, lets the router send a few messages to the workers, and then terminates them, getting as a side effect the number of messages processed by each thread:
void lru(int nWorkers)
{
    zmq::context_t context(1);
    MyRouter router(context); // 1

    boost::thread_group threads; // 2
    for(int i = 0; i < nWorkers; ++i)
        threads.create_thread(std::bind(worker, std::ref(context))); // 3

    for(int i = 0; i < nWorkers * 10; ++i)
        router.sendMessage(); // 4

    int processed = 0;
    for(int i = 0; i < nWorkers; ++i)
        processed += router.terminateWorker(); // 5

    threads.join_all(); // 6
    std::cout << "Number of processed messages: " << processed << std::endl;
}
1. See below for the MyRouter class; for the moment, think of it as a wrapper around a ØMQ ROUTER socket.
2. The Boost thread_group makes it easy to manage a group of threads, like the one we want here.
3. Each thread is created on the function worker(), which we'll discuss in a coming post, passing it a reference to the 0MQ context.
4. Send ten messages per worker; since the router replies to whichever REQ socket declares itself ready, each worker ends up handling about ten of them on average.
5. Send a terminator to each worker, terminateWorker() returns the number of processed message for the current worker.
6. Join all the threads, give a feedback to the user and terminate.
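Just to show how this could be driven, here is a hypothetical entry point, not part of the original sources, that reads the number of workers from the command line, falling back to an arbitrary default (std::atoi comes from <cstdlib>):
int main(int argc, char* argv[])
{
    int nWorkers = (argc > 1) ? std::atoi(argv[1]) : 5; // 5 is just an arbitrary default
    lru(nWorkers);
    return 0;
}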

The MyRouter class relies on a constant and a helper class:
const char* SOCK_ADDR = "inproc://workers"; // 1

class RandomTimeGenerator // 2
{
private:
    boost::random::mt19937 generator_;
    boost::random::uniform_int_distribution<> random_;
public:
    RandomTimeGenerator(int low, int high) : random_(low, high) {}
    int getValue() { return random_(generator_); }
};

class MyRouter
{
private:
    zmq::socket_t client_;
    RandomTimeGenerator rtg_;

public:
    MyRouter(zmq::context_t& context) : client_(context, ZMQ_ROUTER), rtg_(1, 1000) // 3
    {
        client_.bind(SOCK_ADDR);
    }

    void sendMessage() // 4
    {
        zmq::message_t zmAddress;
        client_.recv(&zmAddress);

        zmq::message_t zmDummy1;
        client_.recv(&zmDummy1);
        zmq::message_t zmDummy2;
        client_.recv(&zmDummy2);

        client_.send(zmAddress, ZMQ_SNDMORE);

        zmq::message_t zmEmpty; // 5
        client_.send(zmEmpty, ZMQ_SNDMORE);

        int value = rtg_.getValue();
        zmq::message_t zmPayload(sizeof(int));
        memcpy(zmPayload.data(), &value, sizeof(int));
        client_.send(zmPayload);
    }

    int terminateWorker() // 6
    {
        zmq::message_t zmAddress;
        client_.recv(&zmAddress);
        zmq::message_t zmDummy;
        client_.recv(&zmDummy);

        zmq::message_t zmPayload;
        client_.recv(&zmPayload);
        std::string id((char*)zmAddress.data(), (char*)zmAddress.data() + zmAddress.size());
        int acknowledged = *(int*)zmPayload.data();
        dumpMessage(id, acknowledged); // 7

        client_.send(zmAddress, ZMQ_SNDMORE); // 8

        zmq::message_t zmEmpty;
        client_.send(zmEmpty, ZMQ_SNDMORE);
        client_.send(zmEmpty);

        return acknowledged;
    }
};
1. The address used by the sockets for the inproc connections.
2. A class to generate a random int that will be used to simulate a task of random length executed by the worker. The generator is a Boost Mersenne twister, the distribution a uniform integer one.
3. The ctor expects as input the ZeroMQ context to be used to create the 0MQ ROUTER socket. Besides, the random generator is initialized so that it generates numbers in the interval [1, 1000]. The client socket is bound to the inproc address specified above.
4. The core of this class. First, we wait for a worker ZeroMQ REQ socket to state that it is available; we care only about the first frame it sent us, which contains the address of the REQ socket. The other two frames are discarded (a more general way of skipping them is sketched after these notes), but the first is sent back as the first frame of our reply, so that ZeroMQ can route it to the original sender.
5. The second frame of the reply is empty, and in the third one we place an int returned by the random generator. Notice the ZMQ_SNDMORE flag on the first two frames, to let ZeroMQ understand that the three sends are three parts of a single message.
6. The last message received from each 0MQ REQ is managed differently from all the previous ones. The first frame is sent back as in (4), but we also use the third frame, the actual payload of this multipart message, from which we extract an integer representing the number of messages processed by that worker.
7. This function just prints the passed parameters on the standard console. But we are in a multithread context, and std::cout is a shared resource, so we have to be careful.
8. We send one last message to the current worker: actually an empty message, to be interpreted as a terminator.
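In sendMessage() we know the envelope is made of exactly three frames, so two dummy receives are enough. As a more general alternative, this sketch of mine discards whatever frames are still pending after the first one; it assumes ØMQ 2.x, where the ZMQ_RCVMORE option is reported as an int64_t:
// to be called after at least one frame of the current message has been received
void drainMessage(zmq::socket_t& socket)
{
    int64_t more = 0;
    size_t moreSize = sizeof(more);
    zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &moreSize);
    while(more) // more frames pending on this message
    {
        zmq::message_t frame;
        socket.recv(&frame); // received and discarded
        zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &moreSize);
    }
}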

Go to the full post

ZMQ_PAIR sockets for multithreading coordination

In a classic multithreading model, thread coordination is done through access to shared variables protected by mutexes and locks. In a messaging multithread model, synchronization is achieved through messages. Normally we use the same messaging patterns for both multiprocess and multithreaded applications, as we have seen for the multithread broker in a previous post, but when we need to squeeze the last drop of speed out of our ØMQ multithread application, ZMQ_PAIR sockets are there for us. The other side of the story is that, if we use ZMQ_PAIR sockets in our multithread application, we make it harder to rewrite it as a multiprocess application. ZMQ_PAIR sockets are designed explicitly for multithreading use and don't scale well in a multiprocess environment. They are meant to be used in a stable context, so they do not automatically reconnect.

Let's think of a three-step multithreaded application. The main thread creates a worker thread and sits waiting for it to signal that the job is done. The worker creates a sub-worker, does some job, then puts its result together with the one from its sub-worker, and signals the main thread that it can go on.
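Before diving into the code, this is roughly how the three threads are paired, using the inproc endpoint names you will see below; the binding side of each connection is the one that has to be up first:
// main (PAIR, binds "inproc://main")  <---  worker (PAIR, connects to "inproc://main")
// worker (PAIR, binds "inproc://sub") <---  sub-worker (PAIR, connects to "inproc://sub")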

No locks or mutexes are used in the application, with the notable exception of printing to standard output, a resource shared among all the threads:
boost::mutex mio;

void print(const char* message)
{
    boost::lock_guard<boost::mutex> lock(mio);

    std::cout << boost::this_thread::get_id() << ": " << message << std::endl;
}
Here is the code to be executed by the main thread:
print("main: start");
void* context = zmq_init(1);

print("main: create socket for answer");
void* skMain = zmq_socket(context, ZMQ_PAIR); // 1
zmq_bind(skMain, "inproc://main"); // 2

print("main: create thread for worker");
boost::thread t2(doWork, context); // 3

print("main: do some preparation job");
boost::this_thread::sleep(boost::posix_time::milliseconds(200)); // 4

print("main: wait for worker result");
zmq_recv(skMain, NULL, 0, 0); // 5
print("main: message received");

t2.join(); // 6
zmq_close(skMain);
zmq_term(context);
print("main: done");
1. Even though this socket is used only a few lines further down, it has to be created here, before the worker thread. The reason lies in the nature of pair sockets: they have no automatic reconnection capability, so the server socket must be up before its client tries to connect.
2. This is a server socket in a pair connection, it uses an in process protocol, and it is identified by the name "main".
3. The worker is started; see below for the code of doWork(). Note that we are passing the 0MQ context to it.
4. Simulating some work.
5. We don't expect any data from the worker, so we receive an empty message. Here a bit of error handling would be a good idea for production code; a possible check is sketched after these notes.
6. Wait for the worker to terminate, then shut down the socket and the context.
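The check hinted at in (5) could be as simple as this sketch: with ZeroMQ 3.1, zmq_recv() returns the number of bytes received, or -1 on failure, so for the empty message we expect here anything other than zero is suspicious:
if(zmq_recv(skMain, NULL, 0, 0) != 0)
    print("main: unexpected result while waiting for the worker");
else
    print("main: message received");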

Here is the worker in all its splendor:
void doWork(void* context) // 1
{
    print("worker: part 1 cooperating with another thread");

    void* skSub = zmq_socket(context, ZMQ_PAIR); // 2
    zmq_bind(skSub, "inproc://sub");

    print("worker: create thread for subworker");
    boost::thread t(doSub, context); // 3

    print("worker: do something");
    boost::this_thread::sleep(boost::posix_time::milliseconds(150));

    print("worker: waiting for subworker");
    zmq_recv(skSub, NULL, 0, 0); // 4

    print("worker: subworker is done");
    t.join();
    zmq_close(skSub);

    // ---

    print("worker: part 2, answering to main");

    void* skMain = zmq_socket(context, ZMQ_PAIR); // 5
    zmq_connect(skMain, "inproc://main");

    print("worker: doing something else");
    boost::this_thread::sleep(boost::posix_time::milliseconds(150));

    print("worker: signal back to main");
    zmq_send(skMain, NULL, 0, 0);

    print("worker: done");
    zmq_close(skMain);
}
1. The ZMQ context is the only thread-safe object in this structure, so it could, and should, be passed around among the threads that need to be coordinated.
2. Another pair socket, used to connect the worker to its sub-worker.
3. See below the code for the sub-worker function.
4. Again, this thread waits for a message from its sub-worker, and then closes the socket.
5. This socket connects this thread to the main one; it expects the server socket for this connection to be already up.

The code for the sub-worker should be no surprise:
void doSub(void* context)
{
    print("sub: start");
    boost::this_thread::sleep(boost::posix_time::milliseconds(50));

    void* skSub = zmq_socket(context, ZMQ_PAIR); // 1
    zmq_connect(skSub, "inproc://sub");

    boost::this_thread::sleep(boost::posix_time::milliseconds(50));
    print("sub: tell to worker we're ready");
    zmq_send(skSub, NULL, 0, 0); // 2

    print("sub: done");
    zmq_close(skSub);
}
1. Pair socket back to the worker.
2. An empty message to say that it is done.

I have written the code in this post expressly for ZeroMQ 3.1 and the standard C interface. It is basically a port of my previous post, which I wrote for the official C++ interface while reading the ZGuide (which currently still refers to version 2.1.x).

Go to the full post

ZeroMQ 3.1 multithreading reviewed

I have already written a post on how to approach multithreading with ØMQ. In that case I used 0MQ version 2.1 and the C++ wrapper API; here I am using the 3.1 release, still in beta, and the standard C interface.

But I guess this is not the most interesting point of the post: I found it more challenging to think of a way to gracefully shut down the worker threads managed by the server. The idea is to take advantage of how polling reacts when the connected socket closes. But let's start from the beginning.

We want to write something like the REQ-REP DEALER-ROUTER broker we have just seen, but here the broker does not connect the client to services each running in a different process; it connects it to workers running in other threads of the same process as the broker.

The client code changes only because I changed the requirements, to add a bit of fun. Now the client sends integers, and the server feeds the workers with these values, which are used for a mysterious job - actually, just sleeping. From the client's point of view, the change is limited to the message buffer:
void* context = zmq_init(1);
void* socket = zmq_socket(context, ZMQ_REQ);
zmq_connect(socket, "tcp://localhost:5559");

for(int i = 0; i != 10; ++i)
{
    std::cout << "Sending " << i << std::endl;
    zmq_send(socket, &i, sizeof(int), 0); // 1

    int buffer;
    int len = zmq_recv(socket, &buffer, sizeof(int), 0);
    if(len < sizeof(int) || len > sizeof(int)) // 2
        std::cout << "Unexpected answer (" << len << ") discarded";
    else
        std::cout << "Received " << buffer << std::endl;
}
zmq_send(socket, NULL, 0, 0); // 3

zmq_close(socket);
zmq_term(context);
1. The message now is an int, and its size is, well, the size of an int.
2. Very rough error handling: if the size of the message is not the expected one (or is the -1 error flag), we just print an error message. A slightly more defensive version is sketched after these notes.
3. I'm keeping the convention of sending an empty message as a command to shut down the server.
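The check in (2) could be made a bit more explicit, as in this sketch of mine: a negative length means that zmq_recv() itself failed, and zmq_strerror() tells us why:
if(len < 0) // zmq_recv() failed
    std::cout << "Receive failed: " << zmq_strerror(zmq_errno()) << std::endl;
else if(len != sizeof(int)) // unexpected payload size
    std::cout << "Unexpected answer (" << len << ") discarded" << std::endl;
else
    std::cout << "Received " << buffer << std::endl;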

The server needs to change a bit more: it should create a thread for each service we want to use in this run of the application. The number of threads is passed to the function as a parameter.
void mtServer(int nt)
{
    boost::thread_group threads; // 1

    void* context = zmq_init(1);
    void* frontend = zmq_socket(context, ZMQ_ROUTER); // 2
    zmq_bind(frontend, "tcp://*:5559");

    void* backend = zmq_socket(context, ZMQ_DEALER); // 3
    zmq_bind(backend, "inproc://workers"); // 4

    for(int i = 0; i < nt; ++i)
        threads.create_thread(std::bind(&doWork, context)); // 5

    const int NR_ITEMS = 2; // 6
    zmq_pollitem_t items[NR_ITEMS] = 
    {
        { frontend, 0, ZMQ_POLLIN, 0 },
        { backend, 0, ZMQ_POLLIN, 0 }
    };

    dump("Server is ready");
    while(true)
    {
        zmq_poll(items, NR_ITEMS, -1); // 7

        if(items[0].revents & ZMQ_POLLIN) // 8
            if(receiveAndSend(frontend, backend))
                break;
        if(items[1].revents & ZMQ_POLLIN) // 9
            receiveAndSend(backend, frontend);
    }

    dump("Shutting down");
    zmq_close(frontend);
    zmq_close(backend);
    zmq_term(context);

    threads.join_all(); // 10
}
1. We need to manage a group of threads; this Boost utility class is just right.
2. This router socket connects the server to the client request socket.
3. This dealer socket connects the server to all the worker sockets.
4. The protocol used for thread to thread communicating is "inproc".
5. A number of threads are created; each of them runs the function doWork(), shown below, and gets as input the ZeroMQ context, which is thread-safe.
6. The broker pattern requires an array of poll items, each of them specifying the socket to be polled and the way the polling should act.
7. Polling on the items, waiting indefinitely for a message coming from either the frontend or the backend.
8. The frontend sent a message: receive it and forward it to the backend. If receiveAndSend() returns true, we received a terminator from the client, and it is time to exit the while loop.
9. The other way round: a message from the backend is forwarded to the frontend.
10. Wait for all the worker threads to terminate before closing the application.

Writing a multithreaded application with ZeroMQ requires designing the code differently from the standard techniques. Synchronization among threads is ruled by message exchange, so we usually don't use mutexes and locks. Here I break this advice, and I'd say it makes sense: we have a shared resource, the output console, and we need to rule access to it:
boost::mutex mio;

void dump(const char* header, int value)
{
    boost::lock_guard<boost::mutex> lock(mio);
    std::cout << boost::this_thread::get_id() << ' ' << header << ": " << value << std::endl;
}

void dump(const char* mss)
{
    boost::lock_guard<boost::mutex> lock(mio);
    std::cout << boost::this_thread::get_id() << ": " << mss << std::endl;
}
The receiveAndSend() function has not changed much. Even if the messages are now ints, we won't make any assumption about the messages passing through here. In any case, we should remember that we have to manage the multipart messages required by the router pattern:
const int MSG_SIZE = 64;
size_t sockOptSize = sizeof(int); // 1

bool receiveAndSend(void* skFrom, void* skTo)
{
    int more;
    do {
        int message[MSG_SIZE];
        int len = zmq_recv(skFrom, message, MSG_SIZE, 0);
        zmq_getsockopt(skFrom, ZMQ_RCVMORE, &more, &sockOptSize);

        if(more == 0 && len == 0)
        {
            dump("Terminator!");
            return true;
        }
        zmq_send(skTo, message, len, more ? ZMQ_SNDMORE : 0);
    } while(more);

    return false;
}
1. The variable storing the size of an int can't be const, because zmq_getsockopt() could change it.

Finally, the most interesting piece of code, the worker:
void doWork(void* context) // 1
{
    void* socket = zmq_socket(context, ZMQ_REP); // 2
    zmq_connect(socket, "inproc://workers");

    zmq_pollitem_t items[1] = { { socket, 0, ZMQ_POLLIN, 0 } }; // 3

    while(true)
    {
        if(zmq_poll(items, 1, -1) < 1) // 4
        {
            dump("Terminating worker");
            break;
        }

        int buffer;
        int size = zmq_recv(socket, &buffer, sizeof(int), 0); // 5
        if(size < 1 || size > sizeof(int))
        {
            dump("Unexpected termination!");
            break;
        }

        dump("Received", buffer);
        zmq_send(socket, &buffer, size, 0);

        boost::this_thread::sleep(boost::posix_time::seconds(buffer));
    }
    zmq_close(socket);
}
1. The 0MQ context is thread-safe, so we can safely pass it around the threads.
2. Each working thread has its own ZeroMQ reply socket connected to the backend socket in the main thread of the server by inproc protocol.
3. It looks a bit strange to poll on just one item, but it is exactly what we need here.
4. Polling indefinitely on the socket. If it returns an error (a return value of zero is not expected here), we can safely assume that the connected socket has been closed, and we can stop waiting for a message. A more explicit check is sketched after these notes.
5. We know a message is waiting to be received; we check that it has the size of an int, then send back the same message before sleeping for the number of seconds passed by the client.
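About point (4): when the main thread calls zmq_term(), the blocking zmq_poll() in each worker fails with errno set to ETERM. The code above treats any return value below one as a signal to leave; a more explicit variant could look like this sketch:
if(zmq_poll(items, 1, -1) < 0) // -1 means failure
{
    if(zmq_errno() == ETERM) // the context has been terminated by the main thread
        dump("Terminating worker");
    else
        dump("Unexpected polling error", zmq_errno());
    break;
}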

Go to the full post