Pages

Httpd virtual hosts

I wanted to manage a couple of web sites, let's call them one.dd and two.dd, with my Apache Web Server, and I wanted them to live on the same machine, sharing the same IP address. We know that to do that in the real life, I could not choose randomly a fancy name, like I have just said, but I have to register a proper name under well known limitations. But if I play just on my local machine(s), I can forget about that, and being free and foolish. Still I have to follow a few basic rules.

I am working on a Debian box, on a Apache httpd 2.2 built from scratch, downloading the package from the official Apache site. I reckon you can adapt very easily what I have done to your current setup.

Setting the hosts

The operating system should be aware of the names I want to use on the current machine. This is done in a text file, typically (for *x environments) named /etc/hosts. There we see, among the other things, the standard mapping between 127.0.0.1 and localhost, and we are about to extend it to add our two host names:
127.0.0.1 localhost one.dd two.dd
Setting the httpd configuration

Apache has to know how to manage our virtual hosts, too. The standard http configuration file, conf/httpd.conf, has a commented line that, when activated, includes the specific configuration file for virtual hosts.
# Virtual hosts
#Include conf/extra/httpd-vhosts.conf
It is usually considered a better idea to let the provided example alone, and work on a different file.

This is my virtual host configuration file:
# Virtual Hosts
NameVirtualHost *

<VirtualHost *>
    DocumentRoot /site/www/one.dd
    ServerName www.one.dd
</VirtualHost>

<VirtualHost *>
    DocumentRoot /site/www/two.dd
    ServerName www.two.dd
</VirtualHost>
I guess this is the simplest configuration file one could conceive.

The directive NameVirtualHost says to Apache that we want to attach one or more virtual hosts to the specified address/port. Here I passed a star to it, meaning "anything you get to this point". Usually you want to be more choosy. Besides, I didn't specify any port number. In this case, Apache assumes I expect it to use the one specified in the Listen directive.

Then I have a VirtualHost block for each host I want to define. If anything not matching with the ServerName's specified is getting here, the first one is considered as the default one.

The DocumentRoot says to Apache which directory to use as root for the site. I have created the specified document root directories, and put in both of them an index.html file.

Looks easy, doesn't it? Still, even at this basic level, there are a few thing that could go wrong. And the resulting error messages could look cryptical.

Wrong!

If NameVirtualHost is not matching with any VirtualHost (a different port number is enough) Apache doesn't know what to do of that directive, and a "NameVirtualHost has no VirtualHosts" warning is issued at startup.

I have already noted that if the NameVirtualHost port is not explicitly given, the one specified in the Listen directive is used. But you should ensure to keep the same convention for the associated VirtualHost, too. Otherwise you could get a "VirtualHost mixing * ports and non-* ports with a NameVirtualHost address is not supported, proceeding with undefined results".

Go to the full post

Iterating over an Apache apr_table_t

A common data structure that is very useful to have at hand when working with a web server, is an associative array where both key and value are strings. If Apache httpd was developed in C++, they would have probably used an STL unordered_map, but here we are dealing with pure C, so an internal data structure named apr_table_t has been designed expressly for this scope, with a bunch of associated functions for manage it.

Here I am going to write an example that uses apr_table_do() to loop over all the elements in an Apache table.

What I want to do is writing an Apache2 module that generates as output an HTML page listing all the properties in the request header.

If we have a look to the apr_tables.h, we'll find this couple of interesting lines:
typedef int (apr_table_do_callback_fn_t)(
    void* rec, const char* key, const char* value);

int apr_table_do(apr_table_do_callback_fn_t* comp,
    void* rec, const apr_table_t* t, ...);
The apr_table_do() gets as first parameter a callback function, then the module request record, and the Apache table we want to loop on. Finally we specify which tables elements we are interested in, or a NULL if we want to go through all of them.

Here is the function I want to use as callback, a simple output of the current key-value pair:
int print(void* rec, const char* key, const char* value)
{
    request_rec* r = static_cast<request_rec*>(rec); // 1
    ap_rprintf(r, "%s: %s<br />\n", key, value); // 2

    return 1; // 3
}
1. Tiny nuisance, the request_rec is seen by the callback prototype as a void pointer - to allow more flexibility, I reckon - so we need to cast it back to its original type. I was about to check the cast result, but in the end I decided that was a bit too paranoid for such a basic example.
2. Dump the pair to the HTML response that the module is generating.
3. And finally return a non-zero value, to mean success.

In the handler, I'll have something like:
int handler(request_rec* r)
{
    // ...
    apr_table_do(print, r, r->headers_in, NULL);

    // ...
    return OK;
}
The full C++ source code is on github. You should compile it, possibly using a make file like the one showed in the previous post, and make the resulting shared object available to Apache.

In the httpd configuration file, we should explain to Apache how to map a request to the server to a call for our module, and how to load the module:
<Location /info>
    SetHandler info
</Location>

# ...

LoadModule info_module modules/mod_info.so
And what it is left to do, to have the new module available, it is just stop and start your Apache server.

Go to the full post

Makefile for C++ Apache module

The Apache web server (AKA httpd, or just Apache) is written in C language, but this is not a compelling reason for us to write our modules in the same language. And, as you could expect, it is pretty easy to use the C++ language instead.

Converting the minimal Hello World and the simple example from C to C++ (actually g++ 4.4.5 on Linux Debian for Apache 2.2) took a minimal effort.

What I had to do was adding an explicit include directive for http_protocol.h, to let the less forgiving C++ compiler to properly check against a few functions. Not doing it was leading to these errors:
error: ‘ap_set_content_type’ was not declared in this scope
error: ‘ap_rputs’ was not declared in this scope
error: ‘ap_rprintf’ was not declared in this scope
Besides, I also removed the static specification for all the local function, and put them instead in an unnamed namespace.

Finally I wrote this Makefile:
all: mod_hello.so

mod_hello.o : mod_hello.cpp
    g++ -c -I/path/to/apache22/include -fPIC mod_hello.cpp

mod_hello.so : mod_hello.o
    g++ -shared -o mod_hello.so mod_hello.o

clean:
    rm -rf mod_hello.o mod_hello.so
To build the object I called g++ with a few options:
-c because I don't want it to run the linker, its output should be the object file.
-I to specify the apache include directory (put there your actual one).
-fPIC is due to the fact that we are about to create a shared object, so we need g++ to generate position-independent code.

The actual generation of the shared object is accomplished by second call to g++, this time specifying as options:
-shared to let it know that a shared object is what we want.
-o to specify the output file name.

Remember that in a Makefile you should put TAB and only TAB (no white spaces at all!), if you don't want to get a puzzling error like this:
Makefile:6: *** missing separator.  Stop.

Go to the full post

Simple Apache module

My first Apache module needs to be improved in many ways. Here I am addressing to a few basic requirements, answering only to one specific request; logging using the Apache built-in facility; and generating an HTML document as answer.

We want to set our Apache web server so that it would answer with an HTML page containing a few information on the actual received request, and we want it to give this feedback only when we ask for "hello" on it.

Request-module connection

Apache should know about how to associate a user request to the handler that our module is designed to manage. To do that we add a Location directive in the Apache httpd configuration file.

You'll find this file in the Apache conf directory, named httpd.conf, we open it and we add this section:
<Location /hello>
    SetHandler hello
</Location>
We ask Apache to set the handler "hello" so that it is invoked when is issued a request to the hello page directly under the root of my server.

Secondly, we change the C source code, so that we check the passed request, and we ensure the passed handler matches our expectation:
// ...

static const char* MOD_HANDLER = "hello"; // 1

static int handler(request_rec* r)
{
    if(r->handler == NULL || strcmp(r->handler, MOD_HANDLER)) // 2
    {
        // ...
        return DECLINED; // 3
    }

    // ...

    return OK; // 4
}
1. The handler for this module, it's value is used in httpd.conf, as shown above.
2. Check the handler as passed by Apache.
3. If our module has nothing to say here we return DECLINED, to let know to Apache that it has to look elsewhere.
4. Otherwise, after we did our job, we return OK, saying in this way that Apache could considered the request accomplished.

Apache logging

It is often a good idea to log what is going on in our code, both for debugging and administrative purpose. In this module, it would be nice to have a feedback also when a request is discarded. We get this effect adding this line before "return DECLINED":
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "handling declined: %s", r->handler);
The Apache ap_log_rerror() lets us writing to the Apache error log file, you'll find it in the logs folder, in your Apache httpd installation directory, named "error_log".
APLOG_MARK is just a way to combine the standard __FILE__ and __LINE__ defines in a single symbol, to save a few keystrokes.
After it, we specify the log level, that ranges from APLOG_EMERG down to APLOG_DEBUG (not to mention the TRACE defines, not commonly used, as far as I know). In the httpd.conf file we configure which level of logging we actually want to print to the log file, setting the log level:
LogLevel debug
As you can imagine, in production it is usually a smart move to set the configured log level higher than debug.
Next parameter, here set to 0, is not very interesting here, and it is followed by the pointer to the request, as we get it from Apache.
Finally we have a string, representing what we want to log, and that could include printf-style parameters.

Building a HTML response

Admittedly, an Apache module is not the most programmer-friendly tool to create a HTML page. But it still make sense in such a simple case:
ap_set_content_type(r, "text/html"); // 1
ap_rputs("<html><head><title>Hello Module</title></head><body>", r); // 2
ap_rprintf(r, "handler: %s<br />\n", r->handler); // 3
ap_rprintf(r, "filename: %s<br />\n", r->filename);
ap_rprintf(r, "the_request: %s<br />\n", r->the_request);
ap_rprintf(r, "header_only: %d<br />\n", r->header_only);
ap_rprintf(r, "hostname: %s<br />\n", r->hostname);
ap_rputs("</body></html>", r);
1. Firstly, set the reply content type.
2. To put a static string, as this one, ap_rputs() does an excellent job.
3. When we need to send parametrized stuff, we'd better using ap_rprintf().

As you can see, I generated the answer from the request, extracting a few (more or less) interesting values as received from Apache.

The complete C code for this module is on github. To compile it and add the generated .so to the server, you can use the apxs utility (more details in the previous post). Remember to set the Apache configuration file with the Location/SetHandler directives.

Go to the full post

A minimal Hello World Apache module

After you have set up the Apache development environment, it is time to create a first module.

I tried to create a minimal module, following the K&R's "Hello World" spirit, that I reckon is so rewarding when you are approaching new stuff.

This module is going to be very impolite, trying to answer to all the user requests to the Apache server in the same way. It would even override the standard index.html page on root.

It is made of a module declaration, logically the first thing we are interested in, but traditionally placed at the bottom of the file, and a couple of static functions (I forgot to mention it, but I am developing in plain old C language).

One of the two functions, hooks(), is passed to the module declaration, and it sets a connection between Apache and the other function, that I named handler(), that it is going to be called to handle the user's jobs.

Here is the three guys above mentioned in detail:
static int handler(request_rec* r) // 1
{
    ap_set_content_type(r, "text/plain"); // 2
    ap_rputs("Hello Apache httpd module", r); // 3
    return OK; // 4
}

static void hooks(apr_pool_t* p) // 5
{
    ap_hook_handler(handler, NULL, NULL, APR_HOOK_REALLY_FIRST); // 6
}

module AP_MODULE_DECLARE_DATA hello_module = // 7
{
    STANDARD20_MODULE_STUFF, NULL, NULL, NULL, NULL, NULL, hooks // 8
};
1. This function is called by Apache so that we can provide a reply to a client request. As we can see, as parameter we get a pointer to a structure that actually represents the user request.
2. I am not doing any check here, I always prepare a plain text reply, by calling the Apache function that sets the content type, ...
3. ... and I fill it with a puny string, with the Apache version of the well known puts() function, but reinterpreted for working on a request_rec.
4. Finally I return OK, meaning that my module has been able to fulfill the user request, and Apache could happily consider this job as done.
5. Here I set the hooks that let Apache know which are the functions in my module it can call.
6. The first parameter to ap_hook_handler() is a function that Apache could call to reply to a user request, and the last one is the priority this hook should have in the collection of hooks owned by Apache. Here I am saying that I want full priority.
7. Here is my module declaration. It is known to Apache by the suggestive name of hello_module.
8. And this are a bunch of information we are passing to Apache about our module. The STANDARD20_MODULE_STUFF define is an aggregate of constants that are saying to Apache this is a standard module version 2, and there is not much more to say about it. We'll say something more on the subsequent five NULLs, but more interesting is the last parameter, the function name Apache needs to know to perform the module initialization.

This is almost everything about it. There are a couple of header inclusions you have to perform to let the compiler knowing what the heck are all those ap_... things, namely httpd.h and http_config.h, but you can see the full code on github.

And, well, you have to compile and register this code before you can actually use it on Apache. To do that, there is a nifty Apache utility, apxs, that basically does all the job for us.

In this case, I would call it something like this:
/path/to/apache/bin/apxs -a -i -c mod_hello.c
Then I stop and start Apache, run it, submit any request whatsoever to it through a we browser, and I should always get back the same reply.

Go to the full post

Installing Apache Httpd

In my current project I am also developing some stuff on the Apache HTTP Server, also known as Apache httpd, or even just Apache, in a Debian box. The environment setup is not complicated, but it does have a couple of twists. So I reckoned it was worthy to put down a few notes about the process.

Currently, on the official Apache httpd download page, we have access to three versions (2.0, 2.2, and 2.4) and a number of different formats, ready to be grabbed.

Since I want a developer install, I should avoid the lean standard cuts provided by apt-get (for Debian) or packaged in a msi (for Windows), and I should go for the "Source" releases. So in my case, I go for a 2.2 (this is the version used at work) Unix Source download.

If you check your chosen link, you would see a different provider accordingly to your geographical position, but the archive name should end like ".../httpd/httpd-2.2.22.tar.gz" (being 2.2.22 the current 2.2 version). I downloaded it through wget, and then I have extracted it by tar xvfz (that is why I have picked up the tar.gz flavor), getting all the raw stuff in a subfolder. I changed directory to there and, before starting the real installation process, I decided where to put the thing, let's call it $APACHE2, that would usually be something like $HOME/apache2.

Firstly I have to prepare the configuration, this is usually done by calling the command
./configure --prefix=$APACHE2
In my case, I want to enable the Dynamic Shared Object (DSO) Support, so that I could install shared modules in a next step. To do that, configure has to be called passing the enable-so option too:
./configure --prefix=$APACHE2 --enable-module=so
Once configure has run, it is time to make Apache. This needs two steps, a first call to "make", and a second one, to "make install".

Almost done. Still I had to set the Apache endpoint in its configuration file: I went to $APACHE2/conf, I edited httpd.conf, setting the property Listen to localhost:8080 - a common setup.

Now I can start and stop my Apache HTTP server, going to $APACHE2/bin, and running:
./apachectl start
./apachectl stop
After starting, and before stopping, I should be able to connect from my web browser to the Apache local home page, by accessing the page on localhost:8080 - if this is not the case, that means I have some trouble.

Building Apache 1.3 on a modern environment

If you need to install a legacy Apache httpd server, you would follow more or less the same procedure, but you could bump in a couple of issues.

Firstly, running configure could lead to errors and a garbled output. This is easy to solve, just run it through a new bash shell, like this:
bash ./configure --prefix=$APACHE13 --enable-module=so
Secondly, you could get a failure in making Apache, due to the use of the symbol getline, that now is part of the C standard library.
The solution here is editing the offending files, htdigest.c htpasswd.c logresolve.c, to rename the local getline function.

Go to the full post

CDS internal client error

A glitch in VMWare, running on Windows for a Linux guest. On my machine it was not possible to install the useful tools, and so I couldn't (among the other things) use the complete screen for Linux. A real pain.

This was caused by CDS internal client error (3033), issued when I tried to download VMWare Tools for Linux - version 8.8.4.

As one could expect, the solution is download the package by hand. Problem is that was not so immediate to find the right file on the net.

If you have the same issue, here is an hint:
http://softwareupdate.vmware.com/cds/vmw-desktop/player/4.0.4/744019/windows/packages/
This is the file I was looking for:
tool-linux-8.8.4.exe.tar
I un-tarred, and then executed it. And then I was able to continue the standard procedure from my Linux box.

Go to the full post

Paranoid Pirate Heartbeating - worker

The worker described here, is the companion to the ZeroMQ router server described in the previous post. In another post there is an overview for this simple heartbeat example. Full C++ source code is available on github.

The app controller part spawns a new thread for the worker on this function:
void worker(char id, int lifespan) // 1
{
    const int PATIENCE = 3;
    HeartbeatWorker worker(id); // 2

    int patience = 0;
    int cycle = 0;
    int sent = 0;
    int iteration = 1;

    while(true)
    {
        uint8_t control = worker.recv(); // 3
        if(control == PHB_NOTHING)
        {
            if(cycle++ == PATIENCE)
            {
                if(patience++ == PATIENCE)
                    break; // 4

                int factor = static_cast<int>(std::pow(2.0, patience));
                boost::this_thread::sleep(bp::millisec(factor * BASE_INTERVAL)); // 5

                worker.reset();
                cycle = 0;
            }

            if(iteration++ == lifespan) // 6
                break;
        }
        else
            cycle = patience = 0; // 7
        worker.heartbeat(); // 8
    }
    worker.shutdown(); // 9
}
1. The worker is identified by a single character that is used to generate a unique id for each socket that is created here. The thing is that each time the worker looses contact with the server, the existing socket is closed and replaced by a new one. Any new socket should have a different id, so that the server won't be confused. As a second parameter we pass to the worker function its lifespan.
2. A HeartbeatWorker class is used to make the code more readable, it is showed below.
3. HeartbeatWorker::recv() combines a poll() and a recv() call to the worker socket. If no message was pending the PHB_NOTHING value is returned.
4. If no heartbeat is received from the server for a while, the worker shuts itself down.
5. But before committing suicide, the worker tries to reconnect to the server, closing its socket, waiting for a (growing) while, and then opening a new socket, with a new identity but pointing to the same endpoint.
6. When we reach the limit imposed by the caller, even if the server is still alive, we terminate the worker.
7. If there was actually something on the socket, we reset the cycle counting .
8. In any case, send an heartbeat to the server.
9. Gracefully terminate the worker.

This is my HeartbeatWorker implementation:
class HeartbeatWorker
{
private:
    zmq::context_t context_;
    std::unique_ptr<zmq::Socket> sk_; // 1
    std::string id_; // 2
    char idRoot_;
    bp::ptime heartbeat_; // 3
    zmq::pollitem_t items_[1];

public:
    HeartbeatWorker(char id) : context_(1), idRoot_(id),
        heartbeat_(bp::microsec_clock::local_time() +  bp::millisec(BASE_INTERVAL))
    {
        reset();

        items_[0].fd = 0;
        items_[0].events = ZMQ_POLLIN;
        items_[0].revents = 0;
    }

    void reset()
    {
        id_ += idRoot_; // 4
        sk_.reset(new zmq::Socket(context_, ZMQ_DEALER, id_)); // 5
        sk_->setLinger(0); // 6
        sk_->connect(SKA_BACKEND_CLI);
        items_[0].socket = *sk_.get(); // 7

        sk_->send(PHB_READY); // 8
    }

    void heartbeat() // 9
    {
        if(bp::microsec_clock::local_time() > heartbeat_)
        {
            heartbeat_ = bp::microsec_clock::local_time() + bp::millisec(BASE_INTERVAL);
            sk_->send(PHB_HEARTBEAT);
        }
    }

    void shutdown()
    {
        sk_->send(PHB_DOWN);
    }

    uint8_t recv() // 10
    {
        if(zmq::poll(items_, 1, BASE_INTERVAL * 1000) < 1)
            return PHB_NOTHING;

        uint8_t res = PHB_NOTHING;
        if(items_[0].revents & ZMQ_POLLIN)
            res = sk_->recvAsByte();
        items_[0].revents = 0;
        
        return res;
    }
};
1. Any time we lose the connection to the server, we kill the socket and create a new one. For this reason, I use a (smart) pointer instead of an object on the stack.
2. Socket id. It is based on the idRoot_ character (defined in the next line).
3. Expected time for sending an heartbeat to the server. The bp namespace is defined as synonym of boost::posix_time.
4. The socket id is changed any time a reset occurs.
5. The unique_ptr::reset() take cares of closing the previous socket (if set) before deleting it.
6. We don't want to wait for messages sent on a socket to be delivered when we close it. By default we have an indefinite lingering, but this is no good here, because it could happen that we send messages to a server that is not there anymore, and we want to close the socket (to create a new one) even if those messages are not delivered.
7. A tricky line. We store a pointer to socket, but the zmq::pollitem_t structure requires a socket itself about its members. So dereferencing is required.
8. Sends a "ready" message to the server.
9. The heartbeat message is sent to the server only if its time has arrived.
10. HeartbeatWorker::recv() combines zmq::poll() and an actual receiving on the socket. We expect in input a single-part message containing a single byte.

Go to the full post

Paranoid Pirate Heartbeating - server

As described in the previous post, I have developed a cut to the bone ZeroMQ C++ router-dealer heartbeat exchange-only application that should help understanding the basics of the process.

The reason to write such an example is that, as they say in the ZGuide, "as for the Paranoid Pirate queue, the heartbeating is quite tricky to get right". I hope that such a stripped down example could help to clarify the concept.

In its simplest configuration, this heartbeat example is supposed to run as a monoprocess, multithread application. The main thread spawns two new threads, one for the server and one for a worker, and they are left alone to interact till their natural termination.

We could even let run just the server (or, as we'll see in the next post, the worker) alone. It won't get any ping from a worker, and after a while it would shutdown.

Here is the server function, it makes use of a custom class, HeartbeatServer, that I comment below:
void server(int lifespan) // 1
{
    HeartbeatServer server(lifespan); // 2

    while(server.isAlive()) // 3
    {
        zmq::Frames input = server.recv(); // 4
        if(!input.empty())
        {
            uint8_t control = (input.size() != 2 || input[1].empty()) ? PHB_UNEXPECTED : input[1].at(0);

            switch(control)
            {
            case PHB_READY:
                server.pushWorker(input[0]); // 5
                break;
            case PHB_HEARTBEAT:
                server.refreshWorker(input[0]); // 6
                break;
            case PHB_DOWN:
                server.dropWorker(input[0]); // 7
                break;
            default: // 8
                break;
            }
        }
        server.heartbeat(); // 9
    }
}
1. The server function expects in input a parameter stating how long it should theoretically run, passing INT_MAX (as defined in limits.h) states that we'd like to run the server (almost) forever.
2. Create an Heartbeat Server, see below for details.
3. HeartbeatServer::isAlive() returns false when the server is ready to shutdown.
4. HeartbeatServer::recv() combines a poll and a receive on the server socket. If a (multipart) message is pending on the socket, it is received and returned as deque of strings (if you wonder why, have a look at this post). We expect a two-part message, with the worker id as first element and the payload (a single byte) in second position. Whatever else could be happily discarded.
5. If the payload is a "ready" message, the worker id is stored in the server.
6. We received an "heartbeat" from the worker, we refresh its status. Remember that a worker should be seen as active by the server only if it sends at least an heartbeat once in a while. After a longish silence from its side, we should consider it as lost in space.
7. If the worker ends its life gracefully, it sends a message to notify the server, so that it would remove it on the spot.
8. We received something weird. Let it know to the user.
9. If there is a worker pending on the server, send an heartbeat to it.

And this is the server class:
class HeartbeatServer
{
private:
    zmq::context_t context_;
    zmq::Socket backend_;
    zmq::pollitem_t items_[1];

    std::string wid_; // 1
    bp::ptime expiry_; // 2

    int lifespan_; // 3
    int busy_; // 4
    bp::ptime heartbeat_; // 5
    enum { INTERVAL = 1000, BUSY = 5 };
public:
    HeartbeatServer(int lifespan = INT_MAX) : context_(1), backend_(context_, ZMQ_ROUTER), lifespan_(lifespan), 
        heartbeat_(bp::microsec_clock::local_time() + bp::millisec(INTERVAL)), busy_(BUSY)
    {
        backend_.bind(SKA_BACKEND_SRV);
        backend_.setLinger(0); // 6

        items_[0].socket = backend_; // 7
        items_[0].fd = 0;
        items_[0].events = ZMQ_POLLIN;
        items_[0].revents = 0;
    }

    bool isAlive() // 8
    {
        return busy_ > 0 && lifespan_ > 0;
    }

    zmq::Frames recv() // 9
    {
        --lifespan_;

        zmq::Frames res;
        if(zmq::poll(items_, 1, INTERVAL * 1000) > 0)
        {
            busy_ = BUSY;

            if(items_[0].revents & ZMQ_POLLIN)
                res = backend_.blockingRecv();
            items_[0].revents = 0;

            return res;
        }
        else
            --busy_;

        return res; // no message
    }

    void heartbeat() // 10
    {
        if(wid_.empty())
            return;

        // if it's time, send heartbeat
        if(bp::microsec_clock::local_time() > heartbeat_)
        {
            std::string control(&PHB_HEARTBEAT, &PHB_HEARTBEAT + 1);
            backend_.send(wid_, control);
            heartbeat_ = bp::microsec_clock::local_time() + bp::millisec(BASE_INTERVAL);
        }

        // if the worker is expired, remove its id 
        if(expiry_ < bp::microsec_clock::local_time())
            wid_ = "";
    }

    void pushWorker(const std::string& id) // 11
    {
        wid_ = id;
    }

    void refreshWorker(const std::string& id)
    {
        if(wid_ == id)
            expiry_ = bp::microsec_clock::local_time() + bp::millisec(CHECK_FACTOR * BASE_INTERVAL);
    }

    void dropWorker(const std::string& id)
    {
        if(wid_ == id)
            wid_ = "";
    }
};
1. The worker id pending on the server. In this minimal implementation we can have only one worker. If no worker is available, the string is empty.
2. Keep track of when the worker identified by (1) is going to expire. The namespace bp is a synonym for boost::posix_time.
3. Expected server lifespan.
4. If the server is idle for a while, this counter goes to zero, and then we signal that the server should shutdown.
5. When the server should send the next heartbeat.
6. The sockets in this app have a troubled life, it often happens that one sends a message to the other but that one is already gone. Besides, the worker has a custom identity, so it is marked as persistent. To avoid the server socket hanging on termination, it is better to specify that we don't want it linger. This function calls zmq_setsockopt() for the ZMQ_LINGER option on the underlying socket.
7. Initialize the array of zmq::pollitem_t to poll on the socket.
8. To be considered alive, the server should both be in the time frame defined by the user and busy.
9. Each time recv() is called, the server life expectancy decreases. If there is actually anything on the socket, the busy counter is restored to its maximum value, otherwise it decreases too.
10. An heartbeat is sent to the pending worker only if a worker is actually there, and it is actually time to send it. After sending it (if required) the worker expiration time is checked, to see if it is time to remove it from the server.
11. In this trivial implementation, pushing, refreshing, dropping a worker on the server is very easy. We have just one (or none) of them, so we just have to ensure nothing unexpected (bad worker id) happens.

The full C++ code for the application is on github.

Go to the full post

Paranoid Pirate Heartbeating

Measuring a paranoid pirate heartbeating is not for the faint of heart. I guess that that is true in any case, but here I talking about the ZeroMQ Paranoid Pirate Protocol.

As they state in the ZGuide, talking about their implementation for that protocol based on czmq, the high level C wrapper to 0MQ, "Heartbeating is simple once it works, but quite difficult to invent." For this reason I though it was interesting seeing first how heartbeating could work, and then integrating it in a paranoid pirate implementation.

I wrote the code for ZeroMQ version 2.2, developed on Windows + MSVC2010, using the standard 0MQ C++ wrapper with my zmq::Socket class that adds some features to the original zmq::socket_t. See previous posts, like this one, for details.

Since I am interested only in heartbeating, I have designed the application to be as simple as I could think, to the point of looking quite meaningless.

We have a server (without clients) with one or no associated worker. The server has a ROUTER socket, and the worker a DEALER one. They exchange only status information, no real payload.

When a worker signals to the server that it is alive, the server stores its id, and then it sends to the worker an heartbeat to signal it that it is still alive. The same from the worker, it sends to the server a heartbeat till it shutdowns.

If the server stops getting heartbeats from the worker, it simply remove its id, and doesn't care about it anymore.

The worker is more caring. It retries a few time the check on the server heartbeat, and only when it has lost any hope, it considers the server lost, wait for a while, an then create a new socket, trying to establish a new connection.

Even with this minimal requirement, the resulting code is not immediate. For this reason I split the discussion in a few posts. After this introduction you could read about the router server and the dealer worker in the next two posts.

The main thread in this heartbeat testing application runs a couple of test cases:
boost::thread_group threads;
threads.create_thread(std::bind(server, INT_MAX)); // 1
threads.create_thread(std::bind(worker, 'A', 6)); // 2
threads.join_all();
1. The server() function, expects in input an int parameter, representing how many iteration we want to run. Here I specify the largest int available, as defined in limits.h, meaning that I want to run it (almost) forever.
2. The worker() needs to know which character should be used as seed for the worker id, the second one is the number of heartbeat that the worker is going to send to the server before shutting down.

The result of this test case should be that the server should stay up till the workers sends all its heartbeats, then its is going to wait a bit more idle, before shutting down. The worker id should be "A".

Second test case:
boost::thread_group threads;
threads.create_thread(std::bind(server, 3)); // 1
threads.create_thread(std::bind(worker, 'B', 7)); // 2

boost::this_thread::sleep(boost::posix_time::seconds(10)); // 3
threads.create_thread(std::bind(server, INT_MAX)); // 4
threads.join_all();
1. This server is going to be short lived.
2. Same worker as before.
3. Ensure enough time passes before restarting the server.
4. An almost-forever server is started.

Here we expect that worker seeing the server going offline, restarting and completing its job. The worker id should swap from "A" to "AA".

The full C++ code for the complete example is on github.

Go to the full post

Sending and receiving a single byte

After the major change of using deque as container for multipart messages, I have done a minor change in my zmq::Socket class, that extends the standard ZeroMQ C++ wrapper provided with version 2 of the package.

I was reading the Paranoid Pirate Protocol when I saw that it expects the ready and heartbeat commands to be sent as single bytes. So I decided to add a couple of methods in my class to explicitly manage this case. It is not a strict necessity, but they surely make the code more readable.

Here are the two new methods:
bool send(uint8_t value, int flags =0) // 1
{
    zmq::message_t msg(sizeof(uint8_t));
    memcpy(msg.data(), &value, sizeof(uint8_t));
    return socket_t::send(msg, flags);
}

uint8_t recvAsByte(int flags =0)
{
    zmq::message_t message;
    if(!socket_t::recv(&message, flags))
        throw error_t();

    return *(static_cast<uint8_t*>(message.data()));
}
uint8_t is defined in stdint.h as unsigned char, assuming in your environment byte has 8 bits, as almost everywhere happens. Using this typedef should make clearer that we are interested in a tiny number and not in a character.

I have pushed on github the new file version.

Go to the full post

ZeroMQ Multipart message as deque

I originally designed my zmq::Socket class extending the zmq::socket_t class, as defined in the standard C++ wrapper included in the ØMQ 2.x distribution, to provide a more intuitive multipart message management, basing it on a std::vector container.

This is fine for building up multipart messages on receive, but it gets clumsy when we want to modify an existing multipart message, typically to put an envelope around it, or to strip that envelope out.

For this kind of manipulation I think is much better using std::deque.

The previous changes to the this class, till the latest introduction of a couple of methods to send and receive ints, were not dramatic ones. But this redesign breaks the original interface, a deque does not provide a reserve() method as the vector does, and I often used it in my code when creating a multipart message. Besides, I have also changed the blockingRecv() signature, since I found out that delegating to that method the check of the actual number of frames in a multipart message didn't add much value to the resulting code. On the contrary, it made it a bit less intuitive.

For this reason, I left on github the old include file, and I created a new one, zmq2a.h, where the deque version of multipart messages is defined and used.

Again, here is the change applied:

- zmq::Frames, type used for multipart messages, now is a typedef for a deque of strings.
- blockingRecv() now does not have any input parameter, it still throws an zmq::error_t exception, but only in case of failure receiving a frame in the message.

Go to the full post

Improved LRU with basic reliable queuing

We can apply the just seen lazy pirate pattern (LPP) for the LRU Queue Broker. In this way we make it more reliable inpacting only on the client side of the application.

This approach is called in the ZGuide simple pirate pattern.

What we had, a queue of least recently used workers, managed by a ROUTER to ROUTER 0MQ broker that gets in input requests coming from clients, send them to workers (accordingly to their availability), that do the job, send the result back, through their REQ socket, to the broker, that sends it back to the original client, is matched to the previously seen LPP client.

We have alredy seen about all the code, the only issue is matching it. The full example C++ code is on github, here are just the startup lines:
boost::thread_group threads; // 1
threads.create_thread(std::bind(client, 2500,  3)); //2
threads.create_thread(lruQueue); // 3
threads.create_thread(worker);

boost::this_thread::sleep(boost::posix_time::seconds(20)); // 4
dumpId("---");
threads.create_thread(worker);
threads.create_thread(std::bind(client, 2500,  3));

threads.join_all();
1. To simplify the testing, I have put all the components in the same process, each one running in its own thread. This is not realistic, but it is not an issue to refactor the example to run as a multiprocess application.
2. The function client() is the one we have seen in the LPP, see previous post for details.
3. The lruQueue() and worker() components come from the LRU example, the worker has been modified to perform restlessly.
4. Actually, the sample worker is designed to simulate crash after a few seconds, and the client would shutdown if it won't get any feedback. On the other side, the broker is designed to run forever (but it is not protected against interrupts). Here we wait a while, to be sure that the worker crashes, and the client shuts itself down. Then we create another worker and client, and we can check how the system springs back to work.

The code shown on the ZGuide is written for the high level czmq c-binding. This porting could be interesting to you, if you want to see an example of how to do the same in C++. I have used the official C++ wrapper provided for ZeroMQ 2.x, adapted with my extension to zmq::socket_t, called zmq::Socket, that provides some better suppport to multipart and int messages.

Go to the full post

Client side reliable REQ-REP

We can't assume any software being 100% reliable - actually, we can't assume that for anything in the world - but the ZeroMQ connection are designed to be fast more than reliable, so we usually want to add some fallback mechanism to avoid the more common issues.

In the ZGuide are shown a few patterns we could implement to go in the direction of an improved reliability. The lazy pirate pattern, as they call it, is the first of them. It is very simple, but it could be enough. Here I rewrite that example for C++ and using zmq::Socket, my subclass for the zmq::socket_t, as defined in the original 0MQ version 2.x C++ binding.

The point is that the REQ client sends (along with the payload, not shown in the example) a sequence number. The server replies sending it back to the client (usually along with some feedback, again, not in this example), so that the client could check if anything worked fine. If the client gets no confirmation, we assume the message we sent was lost, so we resend it. That is a bit more complicated than saying it, since the REQ-REP protocol is strictly synchronous. Sending more than one REQ before getting a REP results in an EFSM error. An easy, brutal but effective way to overcome this issue, is closing and reopening the socket before resending the message.

To keep testing easy, I put both client and server in the same process, this is the procedure in the main thread that spawns the two threads:
boost::thread_group threads;
threads.create_thread(std::bind(client, timeout, retries)); // 1
threads.create_thread(server); // 2
threads.join_all();
1. This starts the client, passing to it a couple of ints, timeout, in millisecs, and the number of retries that the client should do before assuming that the server is down. The balance between these number has to be chosen carefully, being very sensitive for giving a good behaving system. In this example I used 2500 and 3, as suggested by the ZGuide, then I played around with them to see what happened. I'd suggest you to do the same.
2. It starts the server.

A few constants used by the code:
const char* SK_ADDR_CLI = "tcp://localhost:5555"; // 1
const char* SK_ADDR_SRV = "tcp://*:5555"; // 2

const int BASE_TIMEOUT = 1000; // 3
1. The client socket connects to this address.
2. The server socket bound address.
3. I set the standard timeout for the application to 1 second (here is in millis, as you would have correctly guessed).

Server

It is very simple. Even simpler than the original one:
void server()
{
    dumpId("Starting server"); // 1

    zmq::context_t context(1);
    zmq::Socket skServer(context, ZMQ_REP);
    skServer.bind(SK_ADDR_SRV);

    for(int i = 1; i < 10; ++i) // 2
    {
        int message = skServer.recvAsInt(); // 3

        if(i % 4 == 0) // 4
        {
            dumpId("CPU overload");
            boost::this_thread::sleep(boost::posix_time::millisec(2 * BASE_TIMEOUT));
        }
        else
            dumpId("Normal request", message);

        boost::this_thread::sleep(boost::posix_time::millisec(BASE_TIMEOUT));
        skServer.send(message);
    }

    dumpId("Terminating, as for a server crash");
}
1. I paid the simplification of running both client and server in the same process, being forced to care about concurrent issues on std::cout. The dumpId() function uses a lock on a mutex to avoid that printing gets garbled by competing threads.
2. This toy server just recv/send a bunch of times before terminate simulating a crash.
3. Socket::recvAsInt() receives on the socket and converts the result in an int, that is returned to the caller.
4. After a while, a CPU overload is simulated, sleeping for a while.

The Lazy Pirate Client

To keep the client code a bit more readable, I have decided to use a utility class tailored on the example need. Actually I should say I am not completely happy about it, but I should have modified the original zmq::socket_t to get a more satisfactory result, but I am not sure that it would be a smart move in this moment. Maybe in the future.

Anyway, here is my current result:
class LazyPirateClient
{
private:
    zmq::context_t& context_;
    std::unique_ptr<zmq::Socket> sk_; // 1

    int sent_; // 2

    void reset() // 3
    {
        sk_.reset(new zmq::Socket(context_, ZMQ_REQ));
        sk_->connect(SK_ADDR_CLI);

        int linger = 0;
        sk_->setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
    }
public:
    LazyPirateClient(zmq::context_t& context) : context_(context), sent_(-1) // 4
    {
        reset();
    }

    bool checkedRecv() // 5
    {
        int value = sk_->recvAsInt();
        if(value == sent_)
            return true;

        dumpId("Unexpected reply from server", value);
        return false;
    }

    zmq::Socket* operator()() { return sk_.get(); } // 6

    void send(int value) // 7
    {
        sk_->send(value);
        sent_ = value;
    }

    void resend() // 8
    {
        reset();
        send(sent_);
    }
};
1. Given the limitation of the original zmq::socket_t class, where the underlying raw socket pointer is stored in its private section, I had to think some smart alternative solution to close and reopen the lazy pirate underlying socket.
2. Internal flag, keeping memory of the last sequence number sent to the server, so that we can compare the received result.
3. Create a new REQ socket, and establish a connection to the REP server socket. The linger flag is set to zero, meaning that we want pending messages to be immediately discarded.
4. Ctor, rely on the above defined reset().
5. We don't care much of what we get back from the server, besides checking if the sequence number matches. This method does just that. It uses the Socket::recvAsInt() utility function that extract an int from the received message.
6. Utility function to get the underlying pointer to zmq::Socket
7. It sends the value (as an int, thanks to an explicit overload of send() defined in zmq::Socket), and store it so that it could be checked by (5).
8. As explained before, we can't simply resend a message, we would get an EFSM error. So we reset the socket, deleting the old one and creating a new one, before sending again the last value.

Client

This is the function on which runs the client thread:
void client(int timeout, int retries)
{
    dumpId("Starting client");

    zmq::context_t context(1);
    LazyPirateClient skClient(context);

    for(int sequence = 1; sequence < 100; ++sequence) // 1
    {
        skClient.send(sequence); // 2
        boost::this_thread::sleep(boost::posix_time::millisec(BASE_TIMEOUT));

        bool confirmed = false;
        zmq::pollitem_t items[] = { { *skClient(), 0, ZMQ_POLLIN, 0 } }; // 3
        for(int cycle = 0; cycle < retries; ++cycle)
        {
            zmq::poll(&items[0], 1, timeout * 1000); // 4

            if(items[0].revents & ZMQ_POLLIN) // 5
            {
                if(skClient.checkedRecv()) // 6
                {
                    dumpId("Server is synchronized", sequence);
                    confirmed = true;
                    break;
                }
            }
            else // 7
            {
                dumpId("Retrying", cycle);
                skClient.resend();
                items[0].socket = *skClient(); // 8
            }
        }
        if(!confirmed) // 9
        {
            dumpId("No answer from server, abandoning.");
            break;
        }
    }
}
1. Loop for some time. Actually, when the server is down, the client would automatically shutdown, and this is what is going to happen almost immediately.
2. Send the sequence number to the server. Usually we want to send more interesting stuff, too.
3. Prepare to poll on the client socket for input messages.
4. Wait for the period specified by the user.
5. We actually have something in input.
6. The sequence check succeeded, give the user a feedback and go on.
7. Nothing received in the interval, whatever happened, we simply resend the last message.
8. Remember that we want to poll on the new socket.
9. No confirmation from the server received, the client assumes that something very bad happened out there, and shutdown.

Full C++ source code is on github.

Go to the full post

Sending/receiving ints over ZeroMQ

I often send an integer as a socket message. It is not a problem to convert them to and fro a std::string, still it makes the resulting code a bit confusing, so I decided to add a send() overload and a new recvAsInt() function to my zmq::Socket class.

This is the new send() function:
bool send(int value, int flags =0)
{
    zmq::message_t msg(sizeof(int));
    memcpy(msg.data(), &value, sizeof(int));
    return socket_t::send(msg, flags);
}
A new specialized function to receive a message and format it as an int:
int recvAsInt(int flags =0)
{
    zmq::message_t message;
    if(!socket_t::recv(&message, flags))
        throw error_t();

    return *(static_cast<int*>(message.data()));
}
Even the mild EAGAIN error leads to an exception. Admittedly, this is not good.

Here is how to use this functions:
// ...
skRequest.send(42);

// ...
int value = skReply.recvAsInt();
We can also have a multipart message containing ints:
skRequest.send(42, ZMQ_SNDMORE);
skRequest.send(12);

//    ...

int val1 = skReply.recvAsInt(ZMQ_RCVMORE);
int val2 = skReply.recvAsInt();
There is no type check on the data, so we can write weird code like this:
// ...
skRequest.send("hello");

// ...
int value = skReply.recvAsInt();
That runs fine, but probably gives a surprising result to the user.

The zmq::Socket new version is on github.

Go to the full post

Router to router among peers

We have seen a couple of ways to let the ZeroMQ router to router pattern work in a client/server setting, with clients id predefined, or a predetermined server id.

But we can't use these approaches in a peer to peer context. All elements are both client and server at the same time, so we need to have all the identities defined in advance.

This example is going to be very simple, we want to create a peer to peer network where each element sends an hallo message to all its peers, then receives all the messages posted to it, and finally terminates.

Firstly, let's define a few constants:
const char* SKA_NODE_SRV[] = {"tcp://*:5380", "tcp://*:5381", "tcp://*:5382" }; // 1
const char* SKA_NODE_CLI[] = {"tcp://localhost:5380", "tcp://localhost:5381", "tcp://localhost:5382" };

const char* NODE_IDS[] = { "N0", "N1", "N2" }; // 2
enum NodeId { N0 = 0, N1, N2, N_NODES = N2 + 1 };
1. Each node has a couple of ROUTERS socket, the server has one of the addresses specified in the SKA_NODE_SRV array, the client connects to the peers' servers by the addresses specified in SKA_NODE_CLI.
2. We can use the same id for both routers in the same node, since there is not risk of overlapping.

In this example, all nodes run in the same process, this is not realistic, but it makes testing easier. It shouldn't be an issue for you to redesign this code to run as a 0MQ multiprocess application. In any case, the main thread creates a few threads, one for each node:
boost::thread_group threads;
for(NodeId id = N0; id < N_NODES; id = static_cast<NodeId>(id+1))
    threads.create_thread(std::bind(node, id));
threads.join_all();

This is the code run by each peer:
void node(NodeId nid)
{
    zmq::context_t context(1);

    dumpId(NODE_IDS[nid], SKA_NODE_SRV[nid]);
    zmq::Socket skServer(context, ZMQ_ROUTER, NODE_IDS[nid]); // 1
    skServer.bind(SKA_NODE_SRV[nid]);

    dumpId("client router", NODE_IDS[nid]);
    zmq::Socket skClient(context, ZMQ_ROUTER, NODE_IDS[nid]); // 2
    for(NodeId id = N0; id < N_NODES; id = static_cast<NodeId>(id+1))
    {
        if(id == nid)
            continue;

        skClient.connect(SKA_NODE_CLI[id]); // 3
        dumpId("client connected to", SKA_NODE_CLI[id]);
    }

    boost::this_thread::sleep(boost::posix_time::seconds(1)); // 4

    for(NodeId id = N0; id < N_NODES; id = static_cast<NodeId>(id+1))
    {
        if(id == nid)
            continue;

        dumpId("sending a message to", NODE_IDS[id]);
        skClient.send(NODE_IDS[id], "hey"); // 5
    }

    zmq_pollitem_t servers[] = { { skServer, 0, ZMQ_POLLIN, 0 } };
    while(zmq_poll(servers, 1, 1000 * 1000) > 0) // 6
    {
        zmq::Frames frames;
        if(servers[0].revents & ZMQ_POLLIN)
        {
            dumpId("Receiving from peers");

            frames = skServer.blockingRecv(2); // 7
            dumpId(frames[0].c_str(), frames[1].c_str());

            servers[0].revents = 0; // 8
        }
    }
    dumpId(NODE_IDS[nid], "done");
}
1. A ZeroMQ server ROUTER socket with the specified id.
2. A ZeroMQ client ROUTER socket with the same id.
3. Set a connection to all the other nodes.
4. Give time to all the nodes to be up before start sending messages around.
5. Send a message to each peer through the client router socket.
6. Poll on the server socket. If no message gets available in its input queue for a second, it is assumed that there is nothing more to do.
7. Receive a two-frames message. First frame is the sender id, second frame is the payload.
8. Reset the flag for the next iteration.

Full C++ code for this example is on github.

Go to the full post

Another router to router example

This is just a variation on the sample shown in the previous post. I am writing here a simple ZeroMQ router to router application where it is the server router having a predetermined identity.

The main thread spawns the client and server threads:
const char* server = "Server"; // 1
const int nClients = 3;

boost::thread_group threads;
for(int i =0; i < nClients; ++i)
    threads.create_thread(std::bind(rrClientS, server)); // 2

threads.create_thread(std::bind(rrServerS, server, nClients)); // 3
threads.join_all();
1. This is going to be the server socket identity.
2. Each client has to know which is the server id.
3. Pass the socket id and the number of clients to the server function.

Each client runs on this function:
void rrClientS(const char* server)
{
    dumpId("client startup");
    zmq::context_t context(1);

    std::string tid = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 1
    zmq::Socket skRouter(context, ZMQ_ROUTER, tid);
    skRouter.connect(SK_ADDR_CLI); // 2
    dumpId(SK_ADDR_CLI, server);

    boost::this_thread::sleep(boost::posix_time::seconds(1)); // 3
    skRouter.send(server, ""); // 4

    zmq::Frames frames = skRouter.blockingRecv(2); // 5
    dumpId("client shutdown");
}
1. There is no requirement for the client socket id, I use the thread id as a way to make the output more interesting.
2. Each client socket connects to the router server socket.
3. As in the previous example, but on the other way round. Now are the clients that have to wait till the server socket is ready before starting send messages. Waiting for a while is a simple but unreliable way to accomplish this requirement. For production code we should think about something more sensible.
4. Sending an "hello" message to the server. Notice that it is a two-framed one, first frame is the recipient id, second one is the payload. Actually here we need to let the server know the client id, so we can even send an empty payload.
5. Getting an answer from the server. We don't care much about it here, but it shows that the server can really send messages to the clients, once it gets their id.

And this is the server:
void rrServerS(const char* sid, int nClients)
{
    dumpId("server startup");
    zmq::context_t context(1);

    zmq::Socket skRouter(context, ZMQ_ROUTER, sid); // 1
    skRouter.bind(SK_ADDR_SRV);

    dumpId("server ready on", SK_ADDR_SRV);

    std::vector<std::string> clients; // 2
    clients.reserve(nClients);
    for(int i =0; i < nClients; ++i)
    {
        zmq::Frames in = skRouter.blockingRecv(2);
        dumpId("receiving from", in[0].c_str());
        clients.push_back(in[0]); // 3
    }

    std::for_each(clients.begin(), clients.end(), [&skRouter](std::string& client)
    {
        skRouter.send(client, ""); // 4
    });
    dumpId("server shutdown");
}
1. Create a ROUTER socket and set its id as specified.
2. To send messages to the clients, we need their id, we store them in this container.
3. We were interested just in the first frame, the client id, so that we can use it below to send it a message.
4. We send a message with an empty payload to each client, just to prove that we can.

Full C++ source code for this example and the one in the previous post is on github.

Go to the full post

Router to router

Using the ZeroMQ router to router socket combination is kind of tricky. I guess that ZMQ_ROUTER socket has been designed thinking to the request-router messaging pattern, where the router is playing as server, and where each client sends an initial message to the server through its REQ socket, identifying itself, so that the router can use that identity when sending its reply.

Implementing the REQ-ROUTER pattern is straightforward. REQ sends its identity to ROUTER, ROUTER uses that identity as part of its reply to REQ.

I had to think a bit more to find out how to write a router to router example.

I have router server and a configurable number of router clients. I want each client sending a single empty message to the server, and then ending. That looks pretty easy, but we should consider that the client needs to know the server identity to send it a message. It is not enough knowing its tcp address.

We risk to fall in an infinite recursion. To send messages we need the recipient identity, but to get the that identity, the should send it to us, but it needs our identity to do that!

A way to get out of this impasse is remembering that a ZeroMQ identity could be explicitly set. So we can avoid a bootstrap paradox predefining the identities to be used by the clients, or the one used by the server.

Now, there are two choices. Here I show a first solution, where the client identities are predefined. In the next post I'll show the case where the server identity is passed around.

The code is written for ZeroMQ 2.2, Windows+MSVC2010, C++11 with STL and Boost, but could easily ported to a different platform.

It is single process 0MQ application. A real application would almost certainly multiprocess, but since I use the tcp protocol for the sockets, it should be easy to refactor this example to work in that way.

By the way, these are the addresses I use for the sockets:
const char* SK_ADDR_CLI = "tcp://localhost:5380";
const char* SK_ADDR_SRV = "tcp://*:5380";
The main procedure knows about the client identities, and starts the clients and the server:
const char* clients[] = { "ClientA", "ClientB", "ClientC" }; // 1
int nClients = sizeof(clients) / sizeof(const char*);

boost::thread_group threads;
for(int i =0; i < nClients; ++i)
    threads.create_thread(std::bind(rrClientC, clients[i])); // 2

threads.create_thread(std::bind(rrServerC, clients, nClients)); // 3
threads.join_all();
1. We have three clients, and we want them having these identities.
2. Start the clients, each in a new thread, on the rrClientC() function, passing the i-th client identity, that has to be assigned to its socket.
3. Start the server on rrServerC(), passing the client addresses, and the number of clients.

Here is the client code:
void rrClientC(const char* id)
{
    dumpId("client startup"); // 1
    zmq::context_t context(1);

    zmq::Socket skRouter(context, ZMQ_ROUTER, id); // 2
    skRouter.connect(SK_ADDR_CLI);
    dumpId(SK_ADDR_CLI, id);

    zmq::Frames input = skRouter.blockingRecv(2); // 3
    dumpId("client received", input[1].c_str());

    skRouter.send(input[0], id); // 4
    dumpId("client shutdown");
}
1. Being in a multithread context, we can't use a shared resource, like std::cout is, without protecting its access by mutex/lock, this is what dumpId() does.
2. Create a ROUTER socket assigning to it the specified identity, then connect it to the server socket.
3. Wait for a multipart message in two frames. We are interested just in the first frame, that contains the server id.
4. Send a two-frames message to the server, the first one is the mandatory recipient id, the second is the payload, here the client id, for testing purpose.

And this is the server:
void rrServerC(const char* ids[], int nClients) // 1
{
    dumpId("server startup");
    zmq::context_t context(1);

    zmq::Socket skRouter(context, ZMQ_ROUTER); // 2
    skRouter.bind(SK_ADDR_SRV);

    dumpId("server ready on", SK_ADDR_SRV);
    boost::this_thread::sleep(boost::posix_time::seconds(1)); // 3

    for(int i =0; i < nClients; ++i)
    {
        dumpId("server send to", ids[i]);
        skRouter.send(ids[i], "hello"); // 4
    }

    for(int i =0; i < nClients; ++i)
    {
        zmq::Frames in = skRouter.blockingRecv(2); // 5
        dumpId("receiving from", in[1].c_str());
    }
    dumpId("server shutdown");
}
1. The server needs to know id and numbers of the clients.
2. The server socket id is not important, we can keep the one generated by 0MQ.
3. This is a key point. Before start sending messages through the socket, we should ensure all the clients are already connected. Here I just wait for a while, giving time to the clients to connect. For production code we should think to a most robust solution.
4. Send an "hello" message to each client. First frame is the id of the recipient client.
5. Receive a message from each client. Second frame is the payload.

The complete C++ source code for this and for the next post example is on github.

Go to the full post

Improved sending for zmq::Socket

Minor change in zmq::Socket, my C++ zmq::socket_t subclass for ZeroMQ version 2. Now it is possible to send a multipart message, up to three frames, calling a send() overload that expects in input the frames as raw C-strings or STL string.

Before the change, if we want to send more frames in a single call, we had to go through a vector, like this:
zmq::Frames frames;
frames.reserve(3);
frames.push_back(id);
frames.push_back("");
frames.push_back(payload);
socket_.send(frames);
It was boring to write code like this, and the result was not very readable.

Now we can get the same result in a single line:
socket_.send(id, "", payload);
It is not a big change in the code. I changed slightly the original send() function, and moved it in the private class section:
bool send(const char* frame, size_t len, int flags =0)
{
    zmq::message_t msg(len);
    memcpy(msg.data(), frame, len);
    return socket_t::send(msg, flags);
}
Some more information on zmq::Socket is available in a previous post.

The main change at this level is in the fact that the message length is now passed as an input parameter.

In the public interface, the one-frame-at-the-time functions become:
bool send(const std::string& frame, int flags =0)
{
    return send(frame.c_str(), frame.length(), flags);
}

bool send(const char* frame, int flags =0)
{
    return send(frame, strlen(frame), flags);
}
If we pass a STL string, its size is already known, cached in the object, so we use it. Otherwise it is calculated by a call to strlen(). When we use these overload we should explicitly say if we consider the current frame as part (and not the last one) of a multipart message.

The two- and three-part messages could be now be sent with a single call. Let's see the three-part by c-string parameter overload:
bool send(const char* frame1, const char* frame2, const char* frame3)
{
    if(!send(frame1, ZMQ_SNDMORE))
        return false;
    if(!send(frame2, ZMQ_SNDMORE))
        return false;
    // last frame
    return send(frame3);
}
The first two frames are sent with the SNDMORE flag, going through above descripted single-frame overload, the last one is terminating the sequence.

The improved include file is on github.

Go to the full post

A second ROUTER-DEALER example

In the previous post we have seen a ZeroMQ ROUTER-DEALER application where the ROUTERs act as servers. We can easily modify it to have the DEALER acting as servers instead.

Now the routers are clients, so they connect to all the DEALER sockets available and then pend on the socket to get its share of messages. They are a stored in a temporary vector, and then they are are send back to the dealer. Before sending messages, the dealers should have been connected by the routers. A simple way to achieve this, is let the dealer threads to sleep for a while before start to send.

The routers and dealers are started in this way:
boost::thread_group threads;
for(int i = 0; i < nRouters; ++i) // 1
    threads.create_thread(std::bind(router2dealer, nDealers));
for(int i = 0; i < nDealers; ++i) // 2
    threads.create_thread(std::bind(dealer4router, nRouters, i));
threads.join_all();
1. There should be at least on router, the upper limit is determined by you system. Each client thread runs an instance of router2dealer().
2. I use the same definition for socket address as in the previous example, so we can have just one or two dealers. Each server thread runs on dealer4router().

This is the function for each client thread:
void router2dealer(int n) // 1
{
    dumpId("ROUTER startup");
    zmq::context_t context(1);
    
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 2
    zmq::Socket skRouter(context, ZMQ_ROUTER, id);
    for(int i =0; i < n; ++i)
    {
        skRouter.connect(SKA_CLI[i]); // 3
        dumpId("ROUTER CLI on", SKA_CLI[i]);
    }

    std::vector<zmq::Frames> messages;
    messages.reserve(n); // 4

    for(int i =0; i < n; ++i)
    {
        zmq::Frames message = skRouter.blockingRecv(2);
        dumpId(message[0].c_str(), message[1].c_str());
        messages.push_back(message);
    }
    dumpId("all received");

    std::for_each(messages.begin(), messages.end(), [&skRouter](zmq::Frames& frames) // 5
    {
        skRouter.send(frames);
    });
}
1. Each client connects to all the n servers, it receives n messages, one from each server.
2. It is handy to specify the socket id for testing purpose, the thread id looks to me a good choice.
3. Each client binds to all the servers.
4. We already know how may messages we are going to receive, one for each server, we are going to store them in the messages STL vector.
5. A router could act as an asynchronous client, and we see it here. Firstly we have received all the expected messages, now we are sending them back, each to its own original sender.

Each server thread runs on this function:
void dealer4router(int n, int index) // 1
{
    dumpId("DEALER startup");
    zmq::context_t context(1);

    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id());
    zmq::Socket skDealer(context, ZMQ_DEALER, id);
    skDealer.bind(SKA_SRV[index]);

    boost::this_thread::sleep(boost::posix_time::seconds(1)); // 2
    for(int i =0; i < n; ++i)
    {
        std::string out(i+1, 'k' + i + index);
        skDealer.send(out);
    }
    dumpId("all sent");

    for(int i =0; i < n; ++i)
    {
        std::string in = skDealer.recvAsString(); // 3
        dumpId(in.c_str());
    }
}
1. Each dealer knows the number of clients, so to send a message to each of them, and its own id, to bind to the right socket address.
2. For such a simple example, waiting a while before start sending messages, it is enough as a way to ensure that all the expected client are up.
3. The dealer servers too act asynchronously. Firstly sending all the messages to the clients, and now receiving the feedback. Notice that the dealer is sending a mono-frame message, 0MQ prepend the dealer id to the message, transforming it in a two-frame message.

Go to the full post

A first ROUTER-DEALER example

Both ROUTER and DEALER ZeroMQ sockets are designed to work asynchronously. Accordingly to the current design requirements, we can use one or the other as server. Here we see the case where the ROUTER sockets are all servers, in the next post we'll see how to write a simple 0MQ application where the DEALER sockets play in the server role.

Same background of the previous posts, the reference platform is Windows, Visual C++2010, C++, ØMQ 2.2 through the standard ZeroMQ C++ wrapper, plus my zmq::Socket class that you can see on github. There is nothing complicated in the code, so I guess it could be easily refactored to work on any environment supported by 0MQ with a limited effort.

The application is designed to to such a pointless job: a few DEALER clients create one message for each ROUTER available, and then send them all. Then each of them gets a bunch of reply on the socket, and print them to let the user see the result. (Not) surprisingly, each client gets back all and only the messages that it sent to the servers.

You can play around changing the number of dealer and routers. In my example I can have just one or two routers, but you can have how many dealer as you want, and your system allows.

The application in developed to run on a single process, and many threads, to keep testing easy, and this is the code that spawns all the client and server threads:
boost::thread_group threads;
for(int i = 0; i < nDealers; ++i) // 1
    threads.create_thread(std::bind(dealer2router, nRouters, i));
for(int i = 0; i < nRouters; ++i) // 2
    threads.create_thread(std::bind(router4dealer, nDealers, i));
threads.join_all();
1. The number of dealers could range from 1 to ... a big number. The actual limit depends on your working environment. Each thread runs the dealer2router(), that asks for the number of routers, and an id for creating a different message for each client.
2. The number of routers is more definite, since we should specify the actual socket address for each of them. Being lazy, here we can have just one or two routers. But it is easy to increase that number. Each server runs on router4dealer(), function that needs the number of expected messages (one for each dealer) and the router id (so that we can peek up its correct socket address).

As I said, only two server maximum are available:
const char* SKA_CLI[] = {"tcp://localhost:5380", "tcp://localhost:5381" };
const char* SKA_SRV[] = {"tcp://*:5380", "tcp://*:5381" };
Extends these arrays if you want more routers. In the following code, you are going to see a call to a utility function named dumpId(), for which I don't show the source code. It just print to std::cout, locking to protect this shared resource by concurrent accesses.

Each client thread runs this function:
void dealer2router(int n, int index)
{
    dumpId("DEALER startup");
    zmq::context_t context(1);
    
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id());
    zmq::Socket skDealer(context, ZMQ_DEALER, id); // 1
    for(int i =0; i < n; ++i)
    {
        skDealer.connect(SKA_CLI[i]); // 2
        dumpId("DEALER CLI on", SKA_CLI[i]);
    }

    for(int i =0; i < n; ++i)
    {
        std::string out(i+1, 'k' + i + index); // 3
        skDealer.send(out);
    }
    dumpId("all sent");

    for(int i =0; i < n; ++i) // 4
    {
        std::string in = skDealer.recvAsString();
        dumpId(in.c_str());
    }
}
1. For testing purpose, I set each dealer id to its thread id, so that the output looks easier to be checked.
2. The parameter "n" passed to the function represents the number of routers to which each dealer could connect. Here we are actually connecting to each of them.
3. The parameter "n" represents also the number of messages that each dealer sends, being that one for each available router. The message sent is build in a bit cryptic way, its length is increased at each iteration, starting with one, and it contains a repetition of the same character, based on the loop iteration and the "index" parameter passed to the function to identify the current dealer. The sense of this messy setup is making the output recognizable during the testing.
4. Here we see the dealer's asynchronicity. Firstly we send all the messages, than we receive them all.

The dealer is sending out a multipart message composed by two frames: its address, that I explicitly set to the thread id in (1), and the payload. The address will be used by the router to identify which dealer has to get a reply, and will be consumed by 0MQ. That's way in (4) we should receive just the payload.

Each server runs this code on its own thread:
void router4dealer(int nMsg, int id)
{
    dumpId("ROUTER startup");
    zmq::context_t context(1);

    zmq::Socket skRouter(context, ZMQ_ROUTER);
    skRouter.bind(SKA_SRV[id]); // 1
    dumpId("ROUTER SRV on", SKA_SRV[id]);

    std::vector<zmq::Frames> messages;
    messages.reserve(nMsg); // 2

    for(int i =0; i < nMsg; ++i)
    {
        zmq::Frames message = skRouter.blockingRecv(2); // 3
        dumpId(message[0].c_str(), message[1].c_str());
        messages.push_back(message);
    }
    dumpId("all received");

    std::for_each(messages.begin(), messages.end(), [&skRouter](zmq::Frames& frames) // 4
    {
        skRouter.send(frames);
    });

    dumpId("ROUTER done");
}
1. The id passed to the function is used to select the right socket address for the current router. You should carefully check it to avoid out of range values, that should drive this puny application to a miserable crash.
2. We already know how many messages this router is going to receive, so we reserve enough room for them in the vector where we are going to temporary store them.
3. Only one message is expected from each dealer in the application, and each of them is composed by two frames, the dealer address, and the message payload.
4. The routers are asynchronous too, and we see that at work here. Firstly we have received all the messages from the dealers, now we are sending them back. I am using an STL for_each algorithm that iterates on each element of the vector where the messages have been stored. For each message, a C++11 lambda function is called that makes use of the socket, accessed by reference, and accepts in input the current element of the vector, again by reference. In the lambda body we just send the message back through the socket.

Go to the full post

Basic DEALER-REP example

I stripped out every possible detail and wrote a minimal ZeroMQ that uses a DEALER-REPLY socket combination. It should be useful to play around with it to see how this pattern works on its own, before combining it in a more complex structure.

The code is based on ØMQ 2.2 for C++, using my zmq::socket_t extension that you can find in an include file on github). The target platform is Windows + Visual C++ 2010. Some STL, Boost, and C++11 features have been used.

The dealer thread creates a few multipart messages, sends them through the socket, and wait for their echo. Each reply thread would get its part of that messages, and send them back to the socket.

There are a few interesting point to notice.

The dealer sends multipart messages, composed by three frames, representing the sender address, a separator, and the actual payload. The reply socket gets just the payload, it has no access to the address (an nor to the separator).

The dealer performs a fair load balancing among the REP sockets connected to it, so we expect each client to get the same share of messages. To let the example run as expected, we should ensure that all the clients are connected to the server before the messages begin to be sent on the socket, otherwise only the already connected ones would get their share.

The clients and server are created by this code:
boost::thread_group threads;
for(int i = 0; i < nRep; ++i) // 1
    threads.create_thread(std::bind(rep2dealer, nMsg)); // 2
threads.create_thread(std::bind(dealer, nRep, nMsg)); // 3
threads.join_all();
1. nRep is the number of REP clients that we want the application to have. You should ensure that it is at least one, and "not too big", accordingly to what "big" is in your environment.
2. Each REP client runs its own copy of the rep2dealer() function, see below. It expects in input the number of messages that each client should receive. Same consideration as seen in (1) applies.
3. The server runs on the dealer() function. It needs to know how many clients to expect, and how many messages to send to each of them.

The socket addresses are:
const char* SK_ADDR_CLI = "tcp://localhost:5380";
const char* SK_ADDR_SRV = "tcp://*:5380";
I am not showing here the code for the utility function dumpId(), it just print to std::cout, locking to protect this shared resource by concurrent accesses.

Here is the function executed by each client:
void rep2dealer(int nMsg)
{
    dumpId("REP startup");
    zmq::context_t context(1);
    
    zmq::Socket skRep(context, ZMQ_REP); // 1
    skRep.connect(SK_ADDR_CLI);
    dumpId("REP CLI on", SK_ADDR_CLI);

    for(int i =0; i < nMsg; ++i) // 2
    {
        std::string msg = skRep.recvAsString();
        dumpId("REP for", msg.c_str());
        skRep.send(msg);
    }
}
1. Reply socket, connected to the DEALER one that will see in the server.
2. On a reply socket we synchronously receive and send messages. Here we loop to do that for a (single-part) message for the number of expected messages for each client. If the balance done by the dealer wouldn't be fair, or for any reason a client would receive less messages than expected, it would hang trying to get more stuff.

And this is the server:
void dealer(int nRep, int nMsg)
{
    dumpId("DEALER startup");
    zmq::context_t context(1);

    zmq::Socket skDealer(context, ZMQ_DEALER); // 1
    skDealer.bind(SK_ADDR_SRV);
    dumpId("DEALER SRV on", SK_ADDR_SRV);

    boost::this_thread::sleep(boost::posix_time::seconds(1)); // 2

    zmq::Frames frames; // 3
    frames.reserve(3);
    frames.push_back(boost::lexical_cast<std::string>(boost::this_thread::get_id()));
    frames.push_back("");
    frames.push_back("");

    for(int i =0; i < nRep * nMsg; ++i)
    {
        frames[2] += 'k'; // 4
        skDealer.send(frames);
    }
    dumpId("all sent");

    for(int i =0; i < nRep * nMsg; ++i)
    {
        zmq::Frames input = skDealer.blockingRecv(3); // 5
        dumpId(input[2].c_str()); // 6
    }

    dumpId("DEALER done");
}
1. This is the DEALER socket accepting the connection from the REP client sockets.
2. Before sending messages, we wait till all the clients are connected, so that each of them would get its own share of messages.
3. Let's prepare the base for the message to be sent. It is a three part message, first frame is the address of the sender, here it is not interesting, but still I set it to the thread id. Second element is empty, acting as a separator. Third element is the payload. It is initialized empty and modified in the sending loop, to give some sensible testing feedback.
4. The first generated message has a single 'k' as a payload, any other time another 'k' is added.
5. Getting the message back from the clients.
6. Dump to standard output the payload.

Go to the full post

A very simple REQ-ROUTER example

This is the simplest ZeroMQ REQ-ROUTER example I could conceive. I have written it for ØMQ 2.2 on Windows for MSVC, using the C++ built-in wrapper, with an extension to manage easier multipart messages (you can find it as an include file on github). I have made use of STL, Boost, and some C++11 support, but nothing impressive.

The application runs on just one process, on which a few client threads are created, each of them using its own REQ socket, all of them connected to the same ROUTER socket running on another thread. The protocol used is tcp, so we can easily port the example to a multiprocess setting.

Each client sends a single message to the server. The server gets all the messages, it knows in advance how many clients are going to connect to it, and then it sends back them all through the sockets.

The code will make all clearer. Here is an excerpt of the main function:
boost::thread_group threads;
for(int i = 0; i < nReq; ++i) // 1
    threads.create_thread(req4router); // 2
threads.create_thread(std::bind(router, nReq)); // 3
threads.join_all();
1. nReq is the number of REQ clients that we want the application to have. You should get it from the user and ensure it is at least one, and "not too big", accordingly to what "big" is in your environment.
2. Each REQ client runs its own copy of the req4router() function, see below.
3. The router runs on the router() function. It needs to know how many clients to expect.

As said, I used the tcp protocol, and this is how I defined the socket addresses:
const char* SK_ADDR_CLI = "tcp://localhost:5380";
const char* SK_ADDR_SRV = "tcp://*:5380";
Both client and server make use of an utility function named dumpId() that I won't show in this post, please have a look on the previous posts to have an idea how it should work, but I guess you can figure it out by yourself. It just print to std::cout the strings are passed to it. It takes care of locking on a mutex, since the console is a shared resource we need to protect it from concurrent accesses. Besides it also print the thread id, for debugging purpose.

Here is the function executed by each client:
void req4router()
{
    dumpId("REQ startup");
    zmq::context_t context(1);
    
    zmq::Socket skReq(context, ZMQ_REQ); // 1
    skReq.connect(SK_ADDR_CLI); // 2
    dumpId("REQ CLI on", SK_ADDR_CLI);

    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 3
    skReq.send(id); // 4
    std::string msg = skReq.recvAsString();
    dumpId(msg.c_str());
}
1. zmq::Socket is my zmq::socket_t subclass.
2. Let's connect this REQ socket as a client to the ROUTER socket, that plays as a server.
3. To check if everything works as expected, we should send a unique message. The thread id looks a good candidate for this job. To convert a Boost thread id to a string we need to explicitly cast it through lexical_cast.
4. Send and receiving on a socket go through specialized methods in zmq::Socket that take care of the dirty job of converting from standard string to ZeroMQ buffers.

And this is the server:
void router(int nReq)
{
    dumpId("ROUTER startup");
    zmq::context_t context(1);

    zmq::Socket skRouter(context, ZMQ_ROUTER); // 1
    skRouter.bind(SK_ADDR_SRV);
    dumpId("ROUTER SRV on", SK_ADDR_SRV);

    std::vector<zmq::Frames> msgs; // 2
    for(int i = 0; i < nReq; ++i)
    {
        msgs.push_back(skRouter.blockingRecv(3)); // 3
    }
    dumpId("all received");

    std::for_each(msgs.begin(), msgs.end(), [&skRouter](zmq::Frames& frames)
    {
        skRouter.send(frames); // 4
    });

    dumpId("ROUTER done");
}
1. This is the ROUTER socket accepting the connection from the REQ client sockets.
2. Frames is actually just a typedef for a vector of strings. In msgs we store all the messages coming from the clients.
3. zmq::Socket::blockingRecv() gets the specified number of frames (three, in this case) from the socket, and returns them as a zmq::Frames object. That object is pushed in the buffer vector. The first frame contains the address of the sender, the second one is just a separator, and the third is the actual payload.
4. Just echo back to each client the message it has sent. The lambda function passed to for_each() needs to know what skRouter is, that is way it is specified in the capture clause, as parameter.

Go to the full post

Monitoring broker state with PUB-SUB sockets

This is the first step in describing a fairly complex ZeroMQ sample application based on the example named Interbroker Routing in the ZGuide. We want to create a few interconnected brokers, so that we could exchange jobs among them. In order to do that, each broker should be able to signal to its peers when it has workers available for external jobs. We are going to do that using PUB-SUB socket connections.

Each broker as a PUB socket that would publish to all the other brokers its status, and a SUB socket that would get such information from the others.

My prototype implementation differs from the original ZGuide code for a few ways.

- It is written for Windows+MSVC, so I couldn't use the inproc protocol, that is not available on that platform. I have used tcp instead.
- I ported it from the czmq binding to the standard 0MQ 2.x light-weight C++ one, improved by a zmq::Socket class that provides support for multipart messages, feature that is missing in the original zmq::socket_t.
- The implementation language is C++, so in the code you will see some STL and Boost stuff, and even some C++11 goodies, as lambda function.
- The brokers run all in the same process. It wouldn't make much sense in a real life project, but it makes testing faster. In any case, it should be quite easy to refactor the code to let each broker running in its own process. A minor glitch caused by this approach is that I had to take care of concurrency when printing to standard output. That is the reason why all the uses of std::cout in the code are protected by mutex/lock.

As said above, in this first step each broker doesn't do much, just using the PUB/SUB sockets to share its state with its peers.

These are the addresses used for the broker state sockets, both for client and server mode:
const char* SKA_BROKER_STATE_CLI[] =
    {"tcp://localhost:5180", "tcp://localhost:5181", "tcp://localhost:5182"};
const char* SKA_BROKER_STATE_SRV[] =
    {"tcp://*:5180", "tcp://*:5181", "tcp://*:5182"};
Each broker prototype runs on a different thread the below described function, passing as broker an element of the second array above, and as peers the other two elements from the first array. So, for instance, the first broker is going to have "tcp://*:5180" as server address, and "tcp://localhost:5181", "tcp://localhost:5182" as peer:
void stateFlow(const char* broker, const std::vector<std::string>& peers)
{
    zmq::context_t context(1); // 1

    zmq::Socket stateBackend(context, ZMQ_PUB); // 2
    stateBackend.bind(broker);

    zmq::Socket stateFrontend(context, ZMQ_SUB); // 3
    stateFrontend.setsockopt(ZMQ_SUBSCRIBE, "", 0);
    std::for_each(peers.begin(), peers.end(), [broker, &stateFrontend](const std::string& peer)
    {
        stateFrontend.connect(peer.c_str());
    });

    std::string tick("."); // 4
    zmq_pollitem_t items [] = { { stateFrontend, 0, ZMQ_POLLIN, 0 } }; // 5
    for(int i = 0; i < 10; ++i) // 6
    {
        if(zmq_poll(items, 1, 250 * 1000) < 0) // 7
            break;

        if(items[0].revents & ZMQ_POLLIN) // 8
        {
            zmq::Frames frames = stateFrontend.blockingRecv(2);
            dumpId(frames[0].c_str(), frames[1].c_str()); // 9
            items[0].revents = 0; // cleanup
        }
        else // 10
        {
            dumpId("sending on", broker);
            zmq::Frames frames;
            frames.reserve(2);
            frames.push_back(broker);
            frames.push_back(tick);
            stateBackend.send(frames);

            tick += '.';
            boost::this_thread::sleep(boost::posix_time::millisec(333)); // 11
        }
    }
}
1. Each broker has its own ZeroMQ context.
2. The stateBackend socket is a PUB that makes available the state of this broker to all the subscribing ones.
3. The stateFrontend socket is a SUB that gets all the messages from its subscribed publisher (as you see in the next line, no filter is set)
4. Currently we are not really interested on the information the broker is sending, so for the time being a simple increasing sequence of dots will do.
5. We poll on the frontend, waiting for messages coming from the peers.
6. I don't want to loop forever, just a few iterations are enough to check the system.
7. Poll for a quarter of second on the SUB socket, in case of error the loop is interrupted. This is the only, and very rough, error handling in this piece of code, but hey, it is just a prototype.
8. There is actually something to be read. I am expecting a two-part message, so I use a checked zmq::Socket::blockingRecv() that throws an exception (not caught here, meaning risk of brutal termination) if something unexpected happens.
9. Give some feedback to the user, dumpId() is just a wrapper for safely printing to standard output, adding as a plus the thread ID.
10. Only if no message has been received from peers, this broker sends its status.
11. As for (10), this line makes sense only because there is no real logic in here, the idea is that I don't want always the same broker to send messages an the other just receiving, so I put to sleep who sends, ensuring in this way it won't send again the next time.

The full C++ source code for this example is on github. The Socket class is there too.

Go to the full post