Client side reliable REQ-REP

We can't assume any software is 100% reliable - actually, we can't assume that of anything in the world - and ZeroMQ connections are designed to be fast rather than reliable, so we usually want to add some fallback mechanism to work around the most common issues.

The ZGuide shows a few patterns we could implement to improve reliability. The Lazy Pirate pattern, as they call it, is the first of them. It is very simple, but it could be enough. Here I rewrite that example in C++, using zmq::Socket, my subclass of the zmq::socket_t class defined in the original 0MQ version 2.x C++ binding.

The point is that the REQ client sends a sequence number (along with the payload, not shown in the example). The server sends it back to the client (usually along with some feedback, again not in this example), so that the client can check that everything worked fine. If the client gets no confirmation, we assume the message we sent was lost, so we resend it. That is easier said than done, since the REQ-REP protocol is strictly synchronous: sending a second REQ before getting a REP results in an EFSM error. An easy, brutal, but effective way to overcome this issue is closing and reopening the socket before resending the message.

To keep testing easy, I put both client and server in the same process; this is the code in the main thread that spawns the two threads:
boost::thread_group threads;
threads.create_thread(std::bind(client, timeout, retries)); // 1
threads.create_thread(server); // 2
threads.join_all();
1. This starts the client, passing it a couple of ints: the timeout, in milliseconds, and the number of retries the client should attempt before assuming that the server is down. The balance between these numbers has to be chosen carefully, since the behavior of the system is quite sensitive to them. In this example I used 2500 and 3, as suggested by the ZGuide, then played around with them to see what happened. I'd suggest you do the same.
2. It starts the server.
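
As a side note on the tuning, the worst-case time before the client gives up on a dead server is easy to compute. This is plain arithmetic, no 0MQ involved, and the helper function name is mine:

```cpp
#include <cassert>

// Hypothetical helper, not part of the example code: the client polls for
// a full timeout on each retry cycle, so a dead server is abandoned only
// after every retry has timed out.
int worstCaseGiveUpMillis(int timeoutMillis, int retries)
{
    return timeoutMillis * retries;
}
```

With the suggested values, 2500 and 3, the client hangs on for 7.5 seconds before declaring the server dead; the simulated CPU overload on the server side (about 3 seconds, see below) exceeds a single timeout, so it typically just triggers one resend.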

A few constants used by the code:
const char* SK_ADDR_CLI = "tcp://localhost:5555"; // 1
const char* SK_ADDR_SRV = "tcp://*:5555"; // 2

const int BASE_TIMEOUT = 1000; // 3
1. The client socket connects to this address.
2. The address the server socket binds to.
3. I set the standard timeout for the application to 1 second (expressed here in milliseconds, as you would have guessed).

Server

It is very simple. Even simpler than the original one:
void server()
{
    dumpId("Starting server"); // 1

    zmq::context_t context(1);
    zmq::Socket skServer(context, ZMQ_REP);
    skServer.bind(SK_ADDR_SRV);

    for(int i = 1; i < 10; ++i) // 2
    {
        int message = skServer.recvAsInt(); // 3

        if(i % 4 == 0) // 4
        {
            dumpId("CPU overload");
            boost::this_thread::sleep(boost::posix_time::millisec(2 * BASE_TIMEOUT));
        }
        else
            dumpId("Normal request", message);

        boost::this_thread::sleep(boost::posix_time::millisec(BASE_TIMEOUT));
        skServer.send(message);
    }

    dumpId("Terminating, as for a server crash");
}
1. The price of the simplification of running both client and server in the same process is having to care about concurrent access to std::cout. The dumpId() function locks a mutex so that the output doesn't get garbled by competing threads.
2. This toy server just does a few recv/send cycles before terminating, simulating a crash.
3. Socket::recvAsInt() receives on the socket and converts the result to an int, which is returned to the caller.
4. Every fourth request, a CPU overload is simulated by sleeping for a while.
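
The dumpId() source is not shown in these posts; a minimal sketch of what it presumably does (my own reconstruction, using std::mutex and std::thread instead of the Boost facilities used elsewhere) could be:

```cpp
#include <iostream>
#include <mutex>
#include <sstream>
#include <thread>

// Hypothetical reconstruction of dumpId(), not the author's actual code:
// serialize access to the output stream so that lines printed by competing
// threads don't get interleaved. The stream defaults to std::cout but can
// be injected, which also makes testing easier.
inline std::mutex& consoleMutex()
{
    static std::mutex m;
    return m;
}

void dumpId(const char* msg, int value = -1, std::ostream& os = std::cout)
{
    std::lock_guard<std::mutex> lock(consoleMutex());
    os << std::this_thread::get_id() << ' ' << msg;
    if(value != -1) // -1 doubles as "no value to print", enough here
        os << ' ' << value;
    os << '\n';
}
```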

The Lazy Pirate Client

To keep the client code a bit more readable, I decided to use a utility class tailored to the needs of the example. Actually, I am not completely happy with it: to get a more satisfactory result I would have had to modify the original zmq::socket_t, and I am not sure that would be a smart move at the moment. Maybe in the future.

Anyway, here is my current result:
class LazyPirateClient
{
private:
    zmq::context_t& context_;
    std::unique_ptr<zmq::Socket> sk_; // 1

    int sent_; // 2

    void reset() // 3
    {
        sk_.reset(new zmq::Socket(context_, ZMQ_REQ));
        sk_->connect(SK_ADDR_CLI);

        int linger = 0;
        sk_->setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
    }
public:
    LazyPirateClient(zmq::context_t& context) : context_(context), sent_(-1) // 4
    {
        reset();
    }

    bool checkedRecv() // 5
    {
        int value = sk_->recvAsInt();
        if(value == sent_)
            return true;

        dumpId("Unexpected reply from server", value);
        return false;
    }

    zmq::Socket* operator()() { return sk_.get(); } // 6

    void send(int value) // 7
    {
        sk_->send(value);
        sent_ = value;
    }

    void resend() // 8
    {
        reset();
        send(sent_);
    }
};
1. Given the limitation of the original zmq::socket_t class, which keeps the underlying raw socket pointer in its private section, I had to come up with an alternative way to close and reopen the lazy pirate's underlying socket: holding the zmq::Socket in a unique_ptr and recreating it on demand.
2. Internal field that remembers the last sequence number sent to the server, so that we can check it against the received reply.
3. Create a new REQ socket and connect it to the REP server socket. The linger flag is set to zero, meaning that pending messages are discarded immediately on close.
4. The constructor relies on the reset() defined above.
5. We don't care much about what we get back from the server, beyond checking that the sequence number matches. This method does just that, using the Socket::recvAsInt() utility function, which extracts an int from the received message.
6. Utility function to get the underlying zmq::Socket pointer.
7. It sends the value (as an int, thanks to an explicit overload of send() defined in zmq::Socket) and stores it, so that it can be checked by (5).
8. As explained before, we can't simply resend a message: we would get an EFSM error. So we reset the socket, deleting the old one and creating a new one, before sending the last value again.
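
The EFSM constraint can be mimicked with a toy model to show why the reset step is unavoidable. This is plain C++ of mine, not 0MQ internals; real 0MQ reports the violation through an EFSM errno, while here we throw instead:

```cpp
#include <stdexcept>
#include <string>

// Toy model of the strict REQ send/recv alternation. Not 0MQ code.
class ToyReqSocket
{
private:
    enum State { READY, AWAITING_REPLY };
    State state_;
public:
    ToyReqSocket() : state_(READY) {}

    void send(const std::string&)
    {
        if(state_ != READY) // a second send before a recv is illegal
            throw std::logic_error("EFSM: send while awaiting a reply");
        state_ = AWAITING_REPLY;
    }

    void recv()
    {
        if(state_ != AWAITING_REPLY)
            throw std::logic_error("EFSM: recv with no pending request");
        state_ = READY;
    }
};
```

Destroying the socket and building a fresh one, as reset() does, is just a way to get back to the READY state when the reply is never going to arrive.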

Client

This is the function the client thread runs:
void client(int timeout, int retries)
{
    dumpId("Starting client");

    zmq::context_t context(1);
    LazyPirateClient skClient(context);

    for(int sequence = 1; sequence < 100; ++sequence) // 1
    {
        skClient.send(sequence); // 2
        boost::this_thread::sleep(boost::posix_time::millisec(BASE_TIMEOUT));

        bool confirmed = false;
        zmq::pollitem_t items[] = { { *skClient(), 0, ZMQ_POLLIN, 0 } }; // 3
        for(int cycle = 0; cycle < retries; ++cycle)
        {
            zmq::poll(&items[0], 1, timeout * 1000); // 4

            if(items[0].revents & ZMQ_POLLIN) // 5
            {
                if(skClient.checkedRecv()) // 6
                {
                    dumpId("Server is synchronized", sequence);
                    confirmed = true;
                    break;
                }
            }
            else // 7
            {
                dumpId("Retrying", cycle);
                skClient.resend();
                items[0].socket = *skClient(); // 8
            }
        }
        if(!confirmed) // 9
        {
            dumpId("No answer from server, abandoning.");
            break;
        }
    }
}
1. Loop for a while. Actually, once the server goes down, the client shuts itself down too, and that is what is going to happen here almost immediately.
2. Send the sequence number to the server. Usually we would send more interesting stuff, too.
3. Prepare to poll on the client socket for input messages.
4. Wait for the period specified by the user.
5. We actually have something in input.
6. The sequence check succeeded; give the user feedback and go on.
7. Nothing was received in the interval; whatever happened, we simply resend the last message.
8. Remember that we want to poll on the new socket.
9. No confirmation arrived from the server; the client assumes that something very bad happened out there, and shuts down.

Full C++ source code is on github.

Sending/receiving ints over ZeroMQ

I often send an integer as a socket message. It is not a problem to convert them to and from a std::string, but it makes the resulting code a bit confusing, so I decided to add a send() overload and a new recvAsInt() function to my zmq::Socket class.

This is the new send() function:
bool send(int value, int flags =0)
{
    zmq::message_t msg(sizeof(int));
    memcpy(msg.data(), &value, sizeof(int));
    return socket_t::send(msg, flags);
}
A new specialized function to receive a message and format it as an int:
int recvAsInt(int flags =0)
{
    zmq::message_t message;
    if(!socket_t::recv(&message, flags))
        throw error_t();

    return *(static_cast<int*>(message.data()));
}
Even the mild EAGAIN error leads to an exception. Admittedly, this is not good.

Here is how to use these functions:
// ...
skRequest.send(42);

// ...
int value = skReply.recvAsInt();
We can also have a multipart message containing ints:
skRequest.send(42, ZMQ_SNDMORE);
skRequest.send(12);

//    ...

int val1 = skReply.recvAsInt();
int val2 = skReply.recvAsInt();
There is no type check on the data, so we can write weird code like this:
// ...
skRequest.send("hello");

// ...
int value = skReply.recvAsInt();
That runs fine, but probably gives a surprising result to the user.
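
The surprise can be reproduced without any socket at all: a 0MQ message body is just an untyped byte buffer, and recvAsInt() blindly reinterprets its first sizeof(int) bytes. This little simulation (my own sketch, with std::vector standing in for zmq::message_t) shows both the legitimate round trip and the weird one:

```cpp
#include <cstring>
#include <string>
#include <vector>

// Stand-ins for the send(int)/recvAsInt() pair: the "message" is a raw
// byte buffer, exactly as in 0MQ, with no type information attached.
std::vector<char> packInt(int value)
{
    std::vector<char> buffer(sizeof(int));
    std::memcpy(buffer.data(), &value, sizeof(int));
    return buffer;
}

int unpackInt(const std::vector<char>& buffer)
{
    int value = 0; // reads whatever bytes happen to be there
    std::memcpy(&value, buffer.data(), sizeof(int));
    return value;
}

std::vector<char> packString(const std::string& text)
{
    return std::vector<char>(text.begin(), text.end());
}
```

Unpacking 42 gives 42 back; unpacking "hello" gives whatever number the first four characters spell in your platform's endianness.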

The zmq::Socket new version is on github.

Router to router among peers

We have seen a couple of ways to make the ZeroMQ router-to-router pattern work in a client/server setting: with predefined client ids, or with a predetermined server id.

But we can't use these approaches in a peer-to-peer context, where all elements are both client and server at the same time; all the identities need to be defined in advance.

This example is going to be very simple: we want to create a peer-to-peer network where each element sends a hello message to all its peers, then receives all the messages posted to it, and finally terminates.

Firstly, let's define a few constants:
const char* SKA_NODE_SRV[] = {"tcp://*:5380", "tcp://*:5381", "tcp://*:5382" }; // 1
const char* SKA_NODE_CLI[] = {"tcp://localhost:5380", "tcp://localhost:5381", "tcp://localhost:5382" };

const char* NODE_IDS[] = { "N0", "N1", "N2" }; // 2
enum NodeId { N0 = 0, N1, N2, N_NODES = N2 + 1 };
1. Each node has a couple of ROUTER sockets: the server one binds to one of the addresses specified in the SKA_NODE_SRV array, while the client one connects to the peers' servers through the addresses in SKA_NODE_CLI.
2. We can use the same id for both routers in the same node, since there is no risk of overlap.

In this example all nodes run in the same process; this is not realistic, but it makes testing easier. It shouldn't be an issue for you to redesign this code to run as a 0MQ multiprocess application. In any case, the main thread creates a few threads, one for each node:
boost::thread_group threads;
for(NodeId id = N0; id < N_NODES; id = static_cast<NodeId>(id+1))
    threads.create_thread(std::bind(node, id));
threads.join_all();

This is the code run by each peer:
void node(NodeId nid)
{
    zmq::context_t context(1);

    dumpId(NODE_IDS[nid], SKA_NODE_SRV[nid]);
    zmq::Socket skServer(context, ZMQ_ROUTER, NODE_IDS[nid]); // 1
    skServer.bind(SKA_NODE_SRV[nid]);

    dumpId("client router", NODE_IDS[nid]);
    zmq::Socket skClient(context, ZMQ_ROUTER, NODE_IDS[nid]); // 2
    for(NodeId id = N0; id < N_NODES; id = static_cast<NodeId>(id+1))
    {
        if(id == nid)
            continue;

        skClient.connect(SKA_NODE_CLI[id]); // 3
        dumpId("client connected to", SKA_NODE_CLI[id]);
    }

    boost::this_thread::sleep(boost::posix_time::seconds(1)); // 4

    for(NodeId id = N0; id < N_NODES; id = static_cast<NodeId>(id+1))
    {
        if(id == nid)
            continue;

        dumpId("sending a message to", NODE_IDS[id]);
        skClient.send(NODE_IDS[id], "hey"); // 5
    }

    zmq_pollitem_t servers[] = { { skServer, 0, ZMQ_POLLIN, 0 } };
    while(zmq_poll(servers, 1, 1000 * 1000) > 0) // 6
    {
        zmq::Frames frames;
        if(servers[0].revents & ZMQ_POLLIN)
        {
            dumpId("Receiving from peers");

            frames = skServer.blockingRecv(2); // 7
            dumpId(frames[0].c_str(), frames[1].c_str());

            servers[0].revents = 0; // 8
        }
    }
    dumpId(NODE_IDS[nid], "done");
}
1. A ZeroMQ server ROUTER socket with the specified id.
2. A ZeroMQ client ROUTER socket with the same id.
3. Connect to all the other nodes.
4. Give all the nodes time to come up before starting to send messages around.
5. Send a message to each peer through the client router socket.
6. Poll on the server socket. If no message becomes available on its input queue for a second, we assume there is nothing more to do.
7. Receive a two-frame message. The first frame is the sender id, the second frame is the payload.
8. Reset the flag for the next iteration.

Full C++ code for this example is on github.

Another router to router example

This is just a variation on the sample shown in the previous post. Here I am writing a simple ZeroMQ router-to-router application where it is the server router that has the predetermined identity.

The main thread spawns the client and server threads:
const char* server = "Server"; // 1
const int nClients = 3;

boost::thread_group threads;
for(int i =0; i < nClients; ++i)
    threads.create_thread(std::bind(rrClientS, server)); // 2

threads.create_thread(std::bind(rrServerS, server, nClients)); // 3
threads.join_all();
1. This is going to be the server socket identity.
2. Each client has to know the server id.
3. Pass the socket id and the number of clients to the server function.

Each client runs on this function:
void rrClientS(const char* server)
{
    dumpId("client startup");
    zmq::context_t context(1);

    std::string tid = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 1
    zmq::Socket skRouter(context, ZMQ_ROUTER, tid);
    skRouter.connect(SK_ADDR_CLI); // 2
    dumpId(SK_ADDR_CLI, server);

    boost::this_thread::sleep(boost::posix_time::seconds(1)); // 3
    skRouter.send(server, ""); // 4

    zmq::Frames frames = skRouter.blockingRecv(2); // 5
    dumpId("client shutdown");
}
1. There is no requirement on the client socket id; I use the thread id as a way to make the output more interesting.
2. Each client socket connects to the router server socket.
3. As in the previous example, but the other way round: now it is the clients that have to wait until the server socket is ready before starting to send messages. Sleeping for a while is a simple but unreliable way to meet this requirement; for production code we should think of something more robust.
4. Send a hello message to the server. Notice that it is a two-frame message: the first frame is the recipient id, the second one is the payload. Here we just need to let the server know the client id, so we can even send an empty payload.
5. Get an answer from the server. We don't care much about it here, but it shows that the server can really send messages to the clients, once it knows their ids.

And this is the server:
void rrServerS(const char* sid, int nClients)
{
    dumpId("server startup");
    zmq::context_t context(1);

    zmq::Socket skRouter(context, ZMQ_ROUTER, sid); // 1
    skRouter.bind(SK_ADDR_SRV);

    dumpId("server ready on", SK_ADDR_SRV);

    std::vector<std::string> clients; // 2
    clients.reserve(nClients);
    for(int i =0; i < nClients; ++i)
    {
        zmq::Frames in = skRouter.blockingRecv(2);
        dumpId("receiving from", in[0].c_str());
        clients.push_back(in[0]); // 3
    }

    std::for_each(clients.begin(), clients.end(), [&skRouter](std::string& client)
    {
        skRouter.send(client, ""); // 4
    });
    dumpId("server shutdown");
}
1. Create a ROUTER socket and set its id as specified.
2. To send messages to the clients we need their ids; we store them in this container.
3. We are interested just in the first frame, the client id, which we use below to send the client a message.
4. We send a message with an empty payload to each client, just to prove that we can.

Full C++ source code for this example and the one in the previous post is on github.

Router to router

Using the ZeroMQ router-to-router socket combination is kind of tricky. I guess the ZMQ_ROUTER socket was designed with the request-router messaging pattern in mind, where the router plays the server role and each client sends an initial message to the server through its REQ socket, identifying itself, so that the router can use that identity when sending its reply.

Implementing the REQ-ROUTER pattern is straightforward. REQ sends its identity to ROUTER, ROUTER uses that identity as part of its reply to REQ.

I had to think a bit more to find out how to write a router to router example.

I have a router server and a configurable number of router clients. I want each client to send a single message to the server and then terminate. That looks pretty easy, but consider that a client needs to know the server identity to send it a message; knowing its tcp address is not enough.

We risk falling into an infinite recursion: to send a message we need the recipient's identity, but to get that identity the recipient should send it to us, and it needs our identity to do that!

A way out of this impasse is remembering that a ZeroMQ identity can be set explicitly. So we can avoid the bootstrap paradox by predefining the identities used by the clients, or the one used by the server.

Now, there are two choices. Here I show a first solution, where the client identities are predefined. In the next post I'll show the case where the server identity is passed around.

The code is written for ZeroMQ 2.2, Windows+MSVC2010, C++11 with STL and Boost, but it could easily be ported to a different platform.

It is a single-process 0MQ application. A real application would almost certainly be multiprocess, but since I use the tcp protocol for the sockets, it should be easy to refactor this example to work that way.

By the way, these are the addresses I use for the sockets:
const char* SK_ADDR_CLI = "tcp://localhost:5380";
const char* SK_ADDR_SRV = "tcp://*:5380";
The main procedure knows about the client identities, and starts the clients and the server:
const char* clients[] = { "ClientA", "ClientB", "ClientC" }; // 1
int nClients = sizeof(clients) / sizeof(const char*);

boost::thread_group threads;
for(int i =0; i < nClients; ++i)
    threads.create_thread(std::bind(rrClientC, clients[i])); // 2

threads.create_thread(std::bind(rrServerC, clients, nClients)); // 3
threads.join_all();
1. We have three clients, and we want them to have these identities.
2. Start the clients, each in a new thread, on the rrClientC() function, passing the i-th client identity, which has to be assigned to its socket.
3. Start the server on rrServerC(), passing the client addresses, and the number of clients.

Here is the client code:
void rrClientC(const char* id)
{
    dumpId("client startup"); // 1
    zmq::context_t context(1);

    zmq::Socket skRouter(context, ZMQ_ROUTER, id); // 2
    skRouter.connect(SK_ADDR_CLI);
    dumpId(SK_ADDR_CLI, id);

    zmq::Frames input = skRouter.blockingRecv(2); // 3
    dumpId("client received", input[1].c_str());

    skRouter.send(input[0], id); // 4
    dumpId("client shutdown");
}
1. Being in a multithreaded context, we can't use a shared resource like std::cout without protecting access to it with a mutex/lock; this is what dumpId() does.
2. Create a ROUTER socket, assigning to it the specified identity, then connect it to the server socket.
3. Wait for a multipart message in two frames. We are interested just in the first frame, which contains the server id.
4. Send a two-frame message to the server: the first frame is the mandatory recipient id, the second is the payload, here the client id, for testing purposes.

And this is the server:
void rrServerC(const char* ids[], int nClients) // 1
{
    dumpId("server startup");
    zmq::context_t context(1);

    zmq::Socket skRouter(context, ZMQ_ROUTER); // 2
    skRouter.bind(SK_ADDR_SRV);

    dumpId("server ready on", SK_ADDR_SRV);
    boost::this_thread::sleep(boost::posix_time::seconds(1)); // 3

    for(int i =0; i < nClients; ++i)
    {
        dumpId("server send to", ids[i]);
        skRouter.send(ids[i], "hello"); // 4
    }

    for(int i =0; i < nClients; ++i)
    {
        zmq::Frames in = skRouter.blockingRecv(2); // 5
        dumpId("receiving from", in[1].c_str());
    }
    dumpId("server shutdown");
}
1. The server needs to know the ids and the number of the clients.
2. The server socket id is not important; we can keep the one generated by 0MQ.
3. This is a key point. Before starting to send messages through the socket, we should ensure all the clients are already connected. Here I just wait for a while, giving the clients time to connect. For production code we should think of a more robust solution.
4. Send a hello message to each client. The first frame is the id of the recipient client.
5. Receive a message from each client. The second frame is the payload.

The complete C++ source code for this and for the next post example is on github.

Improved sending for zmq::Socket

Minor change in zmq::Socket, my C++ zmq::socket_t subclass for ZeroMQ version 2. Now it is possible to send a multipart message, up to three frames, by calling a send() overload that expects the frames as raw C-strings or STL strings.

Before the change, if we wanted to send several frames in a single call, we had to go through a vector, like this:
zmq::Frames frames;
frames.reserve(3);
frames.push_back(id);
frames.push_back("");
frames.push_back(payload);
socket_.send(frames);
It was boring to write code like this, and the result was not very readable.

Now we can get the same result in a single line:
socket_.send(id, "", payload);
It is not a big change in the code. I slightly modified the original send() function and moved it to the private section of the class:
bool send(const char* frame, size_t len, int flags =0)
{
    zmq::message_t msg(len);
    memcpy(msg.data(), frame, len);
    return socket_t::send(msg, flags);
}
Some more information on zmq::Socket is available in a previous post.

The main change at this level is that the message length is now passed as an input parameter.

In the public interface, the one-frame-at-a-time functions become:
bool send(const std::string& frame, int flags =0)
{
    return send(frame.c_str(), frame.length(), flags);
}

bool send(const char* frame, int flags =0)
{
    return send(frame, strlen(frame), flags);
}
If we pass an STL string, its size is already known, cached in the object, so we use it; otherwise it is calculated by a call to strlen(). When we use these overloads we should explicitly say, through the flags, whether the current frame is a non-terminal part of a multipart message.

Two- and three-part messages can now be sent with a single call. Let's see the three-part C-string overload:
bool send(const char* frame1, const char* frame2, const char* frame3)
{
    if(!send(frame1, ZMQ_SNDMORE))
        return false;
    if(!send(frame2, ZMQ_SNDMORE))
        return false;
    // last frame
    return send(frame3);
}
The first two frames are sent with the ZMQ_SNDMORE flag, going through the single-frame overload described above; the last one terminates the sequence.
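
The chaining is easy to check with a fake transport; in this sketch of mine the zmq::socket_t layer is replaced by a vector that records each frame together with its flags, and SNDMORE is a stand-in constant:

```cpp
#include <string>
#include <utility>
#include <vector>

const int SNDMORE = 1; // stand-in for ZMQ_SNDMORE, sketch only

// The fake wire: each sent frame paired with the flags it was sent with.
typedef std::vector<std::pair<std::string, int>> Wire;

// Mirrors the private single-frame send(): the primitive everything uses.
bool sendFrame(Wire& wire, const std::string& frame, int flags = 0)
{
    wire.push_back(std::make_pair(frame, flags));
    return true;
}

// Mirrors the three-frame overload: SNDMORE on every frame but the last.
bool send3(Wire& wire, const std::string& f1,
           const std::string& f2, const std::string& f3)
{
    if(!sendFrame(wire, f1, SNDMORE))
        return false;
    if(!sendFrame(wire, f2, SNDMORE))
        return false;
    return sendFrame(wire, f3); // last frame closes the message
}
```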

The improved include file is on github.

A second ROUTER-DEALER example

In the previous post we have seen a ZeroMQ ROUTER-DEALER application where the ROUTERs act as servers. We can easily modify it to have the DEALERs act as servers instead.

Now the routers are clients: each connects to all the available DEALER sockets and then waits on its socket for its share of messages. These are stored in a temporary vector, and then sent back to their dealers. Before the dealers send any message, the routers should have connected to them; a simple way to achieve this is letting the dealer threads sleep for a while before starting to send.

The routers and dealers are started in this way:
boost::thread_group threads;
for(int i = 0; i < nRouters; ++i) // 1
    threads.create_thread(std::bind(router2dealer, nDealers));
for(int i = 0; i < nDealers; ++i) // 2
    threads.create_thread(std::bind(dealer4router, nRouters, i));
threads.join_all();
1. There should be at least one router; the upper limit is determined by your system. Each client thread runs an instance of router2dealer().
2. I use the same socket address definitions as in the previous example, so we can have just one or two dealers. Each server thread runs dealer4router().

This is the function for each client thread:
void router2dealer(int n) // 1
{
    dumpId("ROUTER startup");
    zmq::context_t context(1);
    
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 2
    zmq::Socket skRouter(context, ZMQ_ROUTER, id);
    for(int i =0; i < n; ++i)
    {
        skRouter.connect(SKA_CLI[i]); // 3
        dumpId("ROUTER CLI on", SKA_CLI[i]);
    }

    std::vector<zmq::Frames> messages;
    messages.reserve(n); // 4

    for(int i =0; i < n; ++i)
    {
        zmq::Frames message = skRouter.blockingRecv(2);
        dumpId(message[0].c_str(), message[1].c_str());
        messages.push_back(message);
    }
    dumpId("all received");

    std::for_each(messages.begin(), messages.end(), [&skRouter](zmq::Frames& frames) // 5
    {
        skRouter.send(frames);
    });
}
1. Each client connects to all n servers, and it receives n messages, one from each server.
2. It is handy to specify the socket id for testing purposes; the thread id looks like a good choice to me.
3. Each client socket connects to each of the servers' addresses.
4. We already know how many messages we are going to receive, one from each server; we store them in the messages STL vector.
5. A router can act as an asynchronous client, and we see it here: first we receive all the expected messages, then we send them back, each to its original sender.

Each server thread runs on this function:
void dealer4router(int n, int index) // 1
{
    dumpId("DEALER startup");
    zmq::context_t context(1);

    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id());
    zmq::Socket skDealer(context, ZMQ_DEALER, id);
    skDealer.bind(SKA_SRV[index]);

    boost::this_thread::sleep(boost::posix_time::seconds(1)); // 2
    for(int i =0; i < n; ++i)
    {
        std::string out(i+1, 'k' + i + index);
        skDealer.send(out);
    }
    dumpId("all sent");

    for(int i =0; i < n; ++i)
    {
        std::string in = skDealer.recvAsString(); // 3
        dumpId(in.c_str());
    }
}
1. Each dealer knows the number of clients, so it can send a message to each of them, and its own index, to bind to the right socket address.
2. For such a simple example, waiting a while before starting to send messages is enough to ensure that all the expected clients are up.
3. The dealer servers act asynchronously too: first they send all the messages to the clients, then they receive the feedback. Notice that the dealer sends a mono-frame message; 0MQ prepends the dealer id to it, transforming it into a two-frame message.

A first ROUTER-DEALER example

Both ROUTER and DEALER ZeroMQ sockets are designed to work asynchronously. Depending on the design requirements, we can use one or the other as server. Here we see the case where the ROUTER sockets are all servers; in the next post we'll see how to write a simple 0MQ application where the DEALER sockets play the server role.

Same background as the previous posts: the reference platform is Windows, Visual C++ 2010, C++, ØMQ 2.2 through the standard ZeroMQ C++ wrapper, plus my zmq::Socket class that you can see on github. There is nothing complicated in the code, so I guess it could be refactored with limited effort to work on any environment supported by 0MQ.

The application is designed to do a rather pointless job: a few DEALER clients create one message for each available ROUTER, and then send them all. Then each of them gets a bunch of replies on its socket and prints them, to let the user see the result. (Not) surprisingly, each client gets back all and only the messages it sent to the servers.

You can play around with the number of dealers and routers. In my example there can be just one or two routers, but you can have as many dealers as you want, and as your system allows.

The application is developed to run in a single process with many threads, to keep testing easy; this is the code that spawns all the client and server threads:
boost::thread_group threads;
for(int i = 0; i < nDealers; ++i) // 1
    threads.create_thread(std::bind(dealer2router, nRouters, i));
for(int i = 0; i < nRouters; ++i) // 2
    threads.create_thread(std::bind(router4dealer, nDealers, i));
threads.join_all();
1. The number of dealers could range from 1 up to... a big number; the actual limit depends on your working environment. Each thread runs dealer2router(), which takes the number of routers, and an index used to create a different message for each client.
2. The number of routers is more constrained, since we have to specify the actual socket address for each of them. Being lazy, here we can have just one or two routers, but it is easy to increase that number. Each server runs router4dealer(), a function that needs the number of expected messages (one for each dealer) and the router id (so that we can pick its correct socket address).

As I said, at most two servers are available:
const char* SKA_CLI[] = {"tcp://localhost:5380", "tcp://localhost:5381" };
const char* SKA_SRV[] = {"tcp://*:5380", "tcp://*:5381" };
Extend these arrays if you want more routers. In the following code you are going to see calls to a utility function named dumpId(), whose source code I don't show here. It just prints to std::cout, locking a mutex to protect this shared resource from concurrent access.

Each client thread runs this function:
void dealer2router(int n, int index)
{
    dumpId("DEALER startup");
    zmq::context_t context(1);
    
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id());
    zmq::Socket skDealer(context, ZMQ_DEALER, id); // 1
    for(int i =0; i < n; ++i)
    {
        skDealer.connect(SKA_CLI[i]); // 2
        dumpId("DEALER CLI on", SKA_CLI[i]);
    }

    for(int i =0; i < n; ++i)
    {
        std::string out(i+1, 'k' + i + index); // 3
        skDealer.send(out);
    }
    dumpId("all sent");

    for(int i =0; i < n; ++i) // 4
    {
        std::string in = skDealer.recvAsString();
        dumpId(in.c_str());
    }
}
1. For testing purposes, I set each dealer id to its thread id, so that the output is easier to check.
2. The parameter "n" passed to the function represents the number of routers to which each dealer could connect. Here we are actually connecting to each of them.
3. The parameter "n" also represents the number of messages that each dealer sends, one for each available router. The message is built in a slightly cryptic way: its length increases at each iteration, starting from one, and it consists of a repetition of a single character determined by the loop iteration and by the "index" parameter that identifies the current dealer. The point of this messy setup is making the output recognizable during testing.
4. Here we see the dealer's asynchronicity: first we send all the messages, then we receive them all.

The dealer is sending out a multipart message composed of two frames: its address, which I explicitly set to the thread id in (1), and the payload. The address will be used by the router to identify which dealer has to get a reply, and will be consumed by 0MQ. That's why in (4) we should receive just the payload.

Each server runs this code on its own thread:
void router4dealer(int nMsg, int id)
{
    dumpId("ROUTER startup");
    zmq::context_t context(1);

    zmq::Socket skRouter(context, ZMQ_ROUTER);
    skRouter.bind(SKA_SRV[id]); // 1
    dumpId("ROUTER SRV on", SKA_SRV[id]);

    std::vector<zmq::Frames> messages;
    messages.reserve(nMsg); // 2

    for(int i =0; i < nMsg; ++i)
    {
        zmq::Frames message = skRouter.blockingRecv(2); // 3
        dumpId(message[0].c_str(), message[1].c_str());
        messages.push_back(message);
    }
    dumpId("all received");

    std::for_each(messages.begin(), messages.end(), [&skRouter](zmq::Frames& frames) // 4
    {
        skRouter.send(frames);
    });

    dumpId("ROUTER done");
}
1. The id passed to the function is used to select the right socket address for the current router. You should carefully check it to avoid out-of-range values, which would drive this puny application to a miserable crash.
2. We already know how many messages this router is going to receive, so we reserve enough room for them in the vector where we are going to temporarily store them.
3. Only one message is expected from each dealer in the application, and each of them is composed of two frames: the dealer address and the message payload.
4. The routers are asynchronous too, and we see that at work here. First we received all the messages from the dealers, now we send them back. I am using the STL for_each algorithm to iterate on each element of the vector where the messages have been stored. For each message, a C++11 lambda function is called; it captures the socket by reference and accepts the current element of the vector as input, again by reference. In the lambda body we just send the message back through the socket.
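The for_each/lambda idiom in (4) can be played with outside ZeroMQ too; here the socket is replaced by a plain container, and echoAll() with its stand-in "send" is mine, for illustration only:

```cpp
#include <algorithm>
#include <string>
#include <vector>

typedef std::vector<std::string> Frames; // same shape as zmq::Frames

// Stand-in for the router echo loop: instead of sending each message
// on a socket, record the address frame that would route the reply.
std::vector<std::string> echoAll(const std::vector<Frames>& messages)
{
    std::vector<std::string> sent;
    std::for_each(messages.begin(), messages.end(), [&sent](const Frames& frames)
    {
        sent.push_back(frames[0]); // the address frame routes the reply back
    });
    return sent;
}
```

The capture clause `[&sent]` plays the same role as `[&skRouter]` in the broker code: the lambda reaches the outer object by reference instead of copying it.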


Basic DEALER-REP example

I stripped out every possible detail and wrote a minimal ZeroMQ example that uses a DEALER-REP socket combination. It should be useful to play around with it to see how this pattern works on its own, before combining it in a more complex structure.

The code is based on ØMQ 2.2 for C++, using my zmq::socket_t extension that you can find in an include file on github. The target platform is Windows + Visual C++ 2010. Some STL, Boost, and C++11 features have been used.

The dealer thread creates a few multipart messages, sends them through the socket, and waits for their echo. Each reply thread gets its share of those messages, and sends them back through the socket.

There are a few interesting points to notice.

The dealer sends multipart messages composed of three frames, representing the sender address, a separator, and the actual payload. The reply socket gets just the payload; it has no access to the address (nor to the separator).

The dealer performs fair load balancing among the REP sockets connected to it, so we expect each client to get the same share of messages. To let the example run as expected, we should ensure that all the clients are connected to the server before the messages begin to be sent on the socket; otherwise only the already connected ones would get their share.

The clients and server are created by this code:
boost::thread_group threads;
for(int i = 0; i < nRep; ++i) // 1
    threads.create_thread(std::bind(rep2dealer, nMsg)); // 2
threads.create_thread(std::bind(dealer, nRep, nMsg)); // 3
threads.join_all();
1. nRep is the number of REP clients that we want the application to have. You should ensure that it is at least one, and "not too big", according to what "big" means in your environment.
2. Each REP client runs its own copy of the rep2dealer() function, see below. It expects as input the number of messages that each client should receive. The same consideration as in (1) applies.
3. The server runs on the dealer() function. It needs to know how many clients to expect, and how many messages to send to each of them.
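If you prefer to avoid Boost, the same spawn-and-join structure can be sketched with std::thread; here the client and server functions are replaced by a trivial counter bump, just to show the shape (spawnAll() and its body are my stand-ins, not the original code):

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical stand-ins for rep2dealer() and dealer(): each thread only
// bumps a shared counter, so the spawning/joining structure can be verified.
int spawnAll(int nRep)
{
    std::atomic<int> started(0);
    std::vector<std::thread> threads;
    for (int i = 0; i < nRep; ++i)                   // one thread per REP client
        threads.emplace_back([&started] { ++started; });
    threads.emplace_back([&started] { ++started; }); // plus the dealer thread
    for (std::thread& t : threads)                   // mirrors threads.join_all()
        t.join();
    return started.load();
}
```

The atomic counter stands in for the sockets' work; the join loop plays the role of boost::thread_group::join_all().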

The socket addresses are:
const char* SK_ADDR_CLI = "tcp://localhost:5380";
const char* SK_ADDR_SRV = "tcp://*:5380";
I am not showing here the code for the utility function dumpId(); it just prints to std::cout, locking a mutex to protect this shared resource from concurrent accesses.

Here is the function executed by each client:
void rep2dealer(int nMsg)
{
    dumpId("REP startup");
    zmq::context_t context(1);
    
    zmq::Socket skRep(context, ZMQ_REP); // 1
    skRep.connect(SK_ADDR_CLI);
    dumpId("REP CLI on", SK_ADDR_CLI);

    for(int i =0; i < nMsg; ++i) // 2
    {
        std::string msg = skRep.recvAsString();
        dumpId("REP for", msg.c_str());
        skRep.send(msg);
    }
}
1. Reply socket, connected to the DEALER one that we will see in the server.
2. On a reply socket we synchronously receive and send messages. Here we loop, receiving and echoing back a (single-part) message, once for each message the client expects. If the balancing done by the dealer were not fair, or if for any reason a client received fewer messages than expected, it would hang trying to get more stuff.

And this is the server:
void dealer(int nRep, int nMsg)
{
    dumpId("DEALER startup");
    zmq::context_t context(1);

    zmq::Socket skDealer(context, ZMQ_DEALER); // 1
    skDealer.bind(SK_ADDR_SRV);
    dumpId("DEALER SRV on", SK_ADDR_SRV);

    boost::this_thread::sleep(boost::posix_time::seconds(1)); // 2

    zmq::Frames frames; // 3
    frames.reserve(3);
    frames.push_back(boost::lexical_cast<std::string>(boost::this_thread::get_id()));
    frames.push_back("");
    frames.push_back("");

    for(int i =0; i < nRep * nMsg; ++i)
    {
        frames[2] += 'k'; // 4
        skDealer.send(frames);
    }
    dumpId("all sent");

    for(int i =0; i < nRep * nMsg; ++i)
    {
        zmq::Frames input = skDealer.blockingRecv(3); // 5
        dumpId(input[2].c_str()); // 6
    }

    dumpId("DEALER done");
}
1. This is the DEALER socket accepting the connection from the REP client sockets.
2. Before sending messages, we wait until all the clients are connected, so that each of them gets its own share of messages.
3. Let's prepare the base for the message to be sent. It is a three-part message: the first frame is the address of the sender (not interesting here, but I still set it to the thread id); the second element is empty, acting as a separator; the third element is the payload, initialized empty and modified in the sending loop, to give some sensible testing feedback.
4. The first generated message has a single 'k' as payload; each following one has one more 'k' appended.
5. Getting the message back from the clients.
6. Dump to standard output the payload.


A very simple REQ-ROUTER example

This is the simplest ZeroMQ REQ-ROUTER example I could conceive. I have written it for ØMQ 2.2 on Windows for MSVC, using the C++ built-in wrapper, with an extension to manage easier multipart messages (you can find it as an include file on github). I have made use of STL, Boost, and some C++11 support, but nothing impressive.

The application runs on just one process, on which a few client threads are created, each of them using its own REQ socket, all of them connected to the same ROUTER socket running on another thread. The protocol used is tcp, so we can easily port the example to a multiprocess setting.

Each client sends a single message to the server. The server gets all the messages (it knows in advance how many clients are going to connect to it), and then it sends them all back through the sockets.

The code will make everything clearer. Here is an excerpt of the main function:
boost::thread_group threads;
for(int i = 0; i < nReq; ++i) // 1
    threads.create_thread(req4router); // 2
threads.create_thread(std::bind(router, nReq)); // 3
threads.join_all();
1. nReq is the number of REQ clients that we want the application to have. You should get it from the user and ensure it is at least one, and "not too big", according to what "big" means in your environment.
2. Each REQ client runs its own copy of the req4router() function, see below.
3. The router runs on the router() function. It needs to know how many clients to expect.

As said, I used the tcp protocol, and this is how I defined the socket addresses:
const char* SK_ADDR_CLI = "tcp://localhost:5380";
const char* SK_ADDR_SRV = "tcp://*:5380";
Both client and server make use of a utility function named dumpId() that I won't show in this post; please have a look at the previous posts to get an idea of how it works, but I guess you can figure it out by yourself. It just prints to std::cout the strings passed to it. It takes care of locking a mutex, since the console is a shared resource and we need to protect it from concurrent accesses. Besides, it also prints the thread id, for debugging purposes.

Here is the function executed by each client:
void req4router()
{
    dumpId("REQ startup");
    zmq::context_t context(1);
    
    zmq::Socket skReq(context, ZMQ_REQ); // 1
    skReq.connect(SK_ADDR_CLI); // 2
    dumpId("REQ CLI on", SK_ADDR_CLI);

    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 3
    skReq.send(id); // 4
    std::string msg = skReq.recvAsString();
    dumpId(msg.c_str());
}
1. zmq::Socket is my zmq::socket_t subclass.
2. Let's connect this REQ socket as a client to the ROUTER socket, that plays as a server.
3. To check if everything works as expected, we should send a unique message. The thread id looks like a good candidate for this job. To convert a Boost thread id to a string we need to explicitly cast it through lexical_cast.
4. Sending and receiving on a socket go through specialized methods in zmq::Socket that take care of the dirty job of converting between standard strings and ZeroMQ buffers.
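As an aside, the same id-to-string conversion can be done with the standard library alone, a stringstream taking the place of boost::lexical_cast (threadIdAsString() is a helper name of mine, using std::this_thread instead of the Boost thread id):

```cpp
#include <sstream>
#include <string>
#include <thread>

// Convert the current thread id to a string the standard-library way;
// boost::lexical_cast does the same job in the original code.
std::string threadIdAsString()
{
    std::ostringstream oss;
    oss << std::this_thread::get_id();
    return oss.str();
}
```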

And this is the server:
void router(int nReq)
{
    dumpId("ROUTER startup");
    zmq::context_t context(1);

    zmq::Socket skRouter(context, ZMQ_ROUTER); // 1
    skRouter.bind(SK_ADDR_SRV);
    dumpId("ROUTER SRV on", SK_ADDR_SRV);

    std::vector<zmq::Frames> msgs; // 2
    for(int i = 0; i < nReq; ++i)
    {
        msgs.push_back(skRouter.blockingRecv(3)); // 3
    }
    dumpId("all received");

    std::for_each(msgs.begin(), msgs.end(), [&skRouter](zmq::Frames& frames)
    {
        skRouter.send(frames); // 4
    });

    dumpId("ROUTER done");
}
1. This is the ROUTER socket accepting the connection from the REQ client sockets.
2. Frames is actually just a typedef for a vector of strings. In msgs we store all the messages coming from the clients.
3. zmq::Socket::blockingRecv() gets the specified number of frames (three, in this case) from the socket, and returns them as a zmq::Frames object. That object is pushed in the buffer vector. The first frame contains the address of the sender, the second one is just a separator, and the third is the actual payload.
4. Just echo back to each client the message it has sent. The lambda function passed to for_each() needs to know what skRouter is; that is why it is specified in the capture clause, by reference.


Monitoring broker state with PUB-SUB sockets

This is the first step in describing a fairly complex ZeroMQ sample application based on the example named Interbroker Routing in the ZGuide. We want to create a few interconnected brokers, so that we could exchange jobs among them. In order to do that, each broker should be able to signal to its peers when it has workers available for external jobs. We are going to do that using PUB-SUB socket connections.

Each broker has a PUB socket that publishes its status to all the other brokers, and a SUB socket that gets such information from the others.

My prototype implementation differs from the original ZGuide code in a few ways.

- It is written for Windows+MSVC, so I couldn't use the inproc protocol, that is not available on that platform. I have used tcp instead.
- I ported it from the czmq binding to the standard 0MQ 2.x light-weight C++ one, improved by a zmq::Socket class that provides support for multipart messages, a feature missing in the original zmq::socket_t.
- The implementation language is C++, so in the code you will see some STL and Boost stuff, and even some C++11 goodies, such as lambda functions.
- The brokers all run in the same process. It wouldn't make much sense in a real life project, but it makes testing faster. In any case, it should be quite easy to refactor the code to let each broker run in its own process. A minor glitch caused by this approach is that I had to take care of concurrency when printing to standard output. That is the reason why all the uses of std::cout in the code are protected by mutex/lock.

As said above, in this first step each broker doesn't do much, just using the PUB/SUB sockets to share its state with its peers.

These are the addresses used for the broker state sockets, both for client and server mode:
const char* SKA_BROKER_STATE_CLI[] =
    {"tcp://localhost:5180", "tcp://localhost:5181", "tcp://localhost:5182"};
const char* SKA_BROKER_STATE_SRV[] =
    {"tcp://*:5180", "tcp://*:5181", "tcp://*:5182"};
Each broker prototype runs the function described below on its own thread, passing as broker an element of the second array above, and as peers the other two elements from the first array. So, for instance, the first broker is going to have "tcp://*:5180" as its server address, and "tcp://localhost:5181", "tcp://localhost:5182" as peers:
void stateFlow(const char* broker, const std::vector<std::string>& peers)
{
    zmq::context_t context(1); // 1

    zmq::Socket stateBackend(context, ZMQ_PUB); // 2
    stateBackend.bind(broker);

    zmq::Socket stateFrontend(context, ZMQ_SUB); // 3
    stateFrontend.setsockopt(ZMQ_SUBSCRIBE, "", 0);
    std::for_each(peers.begin(), peers.end(), [broker, &stateFrontend](const std::string& peer)
    {
        stateFrontend.connect(peer.c_str());
    });

    std::string tick("."); // 4
    zmq_pollitem_t items [] = { { stateFrontend, 0, ZMQ_POLLIN, 0 } }; // 5
    for(int i = 0; i < 10; ++i) // 6
    {
        if(zmq_poll(items, 1, 250 * 1000) < 0) // 7
            break;

        if(items[0].revents & ZMQ_POLLIN) // 8
        {
            zmq::Frames frames = stateFrontend.blockingRecv(2);
            dumpId(frames[0].c_str(), frames[1].c_str()); // 9
            items[0].revents = 0; // cleanup
        }
        else // 10
        {
            dumpId("sending on", broker);
            zmq::Frames frames;
            frames.reserve(2);
            frames.push_back(broker);
            frames.push_back(tick);
            stateBackend.send(frames);

            tick += '.';
            boost::this_thread::sleep(boost::posix_time::millisec(333)); // 11
        }
    }
}
1. Each broker has its own ZeroMQ context.
2. The stateBackend socket is a PUB that makes available the state of this broker to all the subscribing ones.
3. The stateFrontend socket is a SUB that gets all the messages from the publishers it subscribes to (as you see in the next line, no filter is set).
4. Currently we are not really interested in the information the broker is sending, so for the time being a simple increasing sequence of dots will do.
5. We poll on the frontend, waiting for messages coming from the peers.
6. I don't want to loop forever, just a few iterations are enough to check the system.
7. Poll for a quarter of second on the SUB socket, in case of error the loop is interrupted. This is the only, and very rough, error handling in this piece of code, but hey, it is just a prototype.
8. There is actually something to be read. I am expecting a two-part message, so I use a checked zmq::Socket::blockingRecv() that throws an exception (not caught here, meaning risk of brutal termination) if something unexpected happens.
9. Give some feedback to the user, dumpId() is just a wrapper for safely printing to standard output, adding as a plus the thread ID.
10. Only if no message has been received from peers, this broker sends its status.
11. As for (10), this line makes sense only because there is no real logic in here. The idea is that I don't want always the same broker sending messages and the others just receiving, so I put the sender to sleep, ensuring in this way that it won't send again the next time.
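The peer wiring described above can be sketched as a small function: broker i binds its own server address and connects to every client address but its own (peersFor() is a name of mine, for illustration):

```cpp
#include <string>
#include <vector>

const char* SKA_BROKER_STATE_CLI[] =
    {"tcp://localhost:5180", "tcp://localhost:5181", "tcp://localhost:5182"};
const char* SKA_BROKER_STATE_SRV[] =
    {"tcp://*:5180", "tcp://*:5181", "tcp://*:5182"};

// For broker i, the peers are all the client addresses except its own;
// the SUB socket connects to each of them.
std::vector<std::string> peersFor(int i)
{
    std::vector<std::string> peers;
    for (int j = 0; j < 3; ++j)
        if (j != i)
            peers.push_back(SKA_BROKER_STATE_CLI[j]);
    return peers;
}
```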

The full C++ source code for this example is on github. The Socket class is there too.


Async 0MQ app - worker

If you have already read the sort of informal design for this sample asynchronous application, then the post on the client code, and even the one on the server, you are ready to see how I have implemented the worker code.

In brief: this is a C++ ZeroMQ application, based on version 2.2, developed on Windows for MSVC2010, using the 0MQ C++ binding improved by defining a zmq::Socket class that extends the standard zmq::socket_t to simplify multipart messages management.

We have seen in the previous post that the server creates a (variable) number of worker threads and then expects these workers to be ready to work on the messages coming from the client. Here is that function:
void worker(zmq::context_t& context) // 1
{
    zmq::Socket skWorker(context, ZMQ_DEALER); // 2
    skWorker.connect(SK_BCK_ADDR);

    while(true)
    {
        zmq::Frames frames = skWorker.blockingRecv(2, false);
        if(frames.size() == 1) // 3
            break;

        int replies = rand_.getValue(); // 4
        for(int i =0; i < replies; ++i)
        {
            boost::this_thread::sleep(boost::posix_time::millisec(100 * rand_.getValue()));
            dumpId("worker reply");
            skWorker.send(frames);
        }
    }
}
1. Server and workers share the same context.
2. Each worker has its own DEALER socket that is connected (next line) to the DEALER socket on the server. SK_BCK_ADDR is a C-string specifying the inproc address used by the socket.
3. The "real" messages managed by the workers should be composed of two frames: the address of the client and the actual message payload. A single-framed message is interpreted by the worker as a request from the server to shut down.
4. To check the asynchronicity of the application, the worker has this strange behavior: it sends back to the client a random number of copies of the original message, each of them is sent with a random delay.
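The rand_ member belongs to a MyRandom utility class whose code is not shown; a plausible sketch with the standard &lt;random&gt; facilities (my guess, not the original implementation) could be:

```cpp
#include <random>

// Hypothetical stand-in for the MyRandom utility class used by the worker:
// it yields uniformly distributed ints in the closed interval [lo..hi].
class MyRandom
{
public:
    MyRandom(int lo, int hi) : dist_(lo, hi) {}
    int getValue() { return dist_(gen_); }
private:
    std::mt19937 gen_;
    std::uniform_int_distribution<int> dist_;
};
```

With MyRandom(0, 2), getValue() returns 0, 1, or 2, so a worker may even send no reply at all for a given message.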

Full source C++ code for this sample application is on github, in the same repository you can find also the zmq::Socket source code.


Async 0mq application - server

This sample ZeroMQ async application is described in a previous post; here I am commenting on the server part, which is connected to the already seen clients by DEALER/ROUTER sockets, and to a few workers, which we'll see next, by DEALER/DEALER sockets.
The AsynchronousCS constructor creates a thread running on this function, part of the private section of the same class:
void server(int nWorkers) // 1
{
    zmq::context_t context(1);
    zmq::Socket skFrontend(context, ZMQ_ROUTER); // 2
    skFrontend.bind(SK_SRV_ADDR);

    zmq::Socket skBackend(context, ZMQ_DEALER); // 3
    skBackend.bind(SK_BCK_ADDR);

    boost::thread_group threads;
    for(int i = 0; i < nWorkers; ++i)
        threads.create_thread(std::bind(&AsynchronousCS::worker, this, std::ref(context))); // 4

    zmq_pollitem_t items [] =
    {
        { skFrontend, 0, ZMQ_POLLIN, 0 },
        { skBackend,  0, ZMQ_POLLIN, 0 }
    };

    while(true)
    {
        if(zmq_poll(items, 2, 3000 * 1000) < 1) // 5
            break;

        if(items[0].revents & ZMQ_POLLIN) // 6
        {
            zmq::Frames frames = skFrontend.blockingRecv(2);
            skBackend.send(frames);
            items[0].revents = 0; // cleanup
        }
        if(items[1].revents & ZMQ_POLLIN) // 7
        {
            zmq::Frames frames = skBackend.blockingRecv(2);
            skFrontend.send(frames);
            items[1].revents = 0; // cleanup
        }
    }

    for(int i = 0; i < nWorkers; ++i) // 8
        skBackend.send("");

    threads.join_all(); // 9
}
1. The number of workers used by the server is set by the user.
2. zmq::Socket is an extension of the zmq::socket_t as found in the default 0MQ C++ binding. This server socket is used to keep a connection to the client (DEALER) sockets.
3. This second socket is used to keep a DEALER/DEALER connection with each worker.
4. A new thread is created for each worker (its code is discussed in the next post), notice that a reference to the ZeroMQ context is passed to each worker, so that all sockets on the server are sharing the same 0MQ context.
5. As a simple way of terminating the server execution, I set the limit of three seconds on the polling. If nothing is received on either socket in such a time frame, the indefinite loop is interrupted.
6. Input received on the frontend, the multipart message is read and passed to the backend.
7. Same as (6), but the other way round.
8. When the clients are not sending any more messages, it is time to send a terminator to each worker. As a convention, a single empty message is interpreted by the worker as a terminator.
9. Give time to the workers to terminate, and then the server is down.

Full source C++ code for this sample application is on github, in the same repository you can find also the zmq::Socket source code.


Async 0mq application - DEALER/ROUTER client

Follow the link for a view of the asynchronous ZeroMQ application I am describing in these posts. Here I won't care about the big picture; I am focusing on the function executed by each thread running as a client.

Each client thread is created passing as argument the address of this function, defined as private in the class AsynchronousCS:
void client()
{
    zmq::context_t context(1); // 1
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 2
    zmq::Socket skClient(context, ZMQ_DEALER, id); // 3
    skClient.connect(SK_CLI_ADDR);

    zmq_pollitem_t items[] = { { skClient, 0, ZMQ_POLLIN, 0 } };

    std::string message("message ");
    for(int i =0; i < 6; ++i) // 4
    {
        for(int heartbeat = 0; heartbeat < 100; ++heartbeat) // 5
        {
            zmq_poll(items, 1, 10 * 1000); // polls each 10 millis
            if(items[0].revents & ZMQ_POLLIN)
            {
                std::string msg = skClient.recvAsString(); // 6
                dumpId("client receives", msg); // 7

                items[0].revents = 0;
            }
        }
        dumpId("client sends", message);
        skClient.send(message);
        message += 'x';
    }

    while(zmq_poll(items, 1, 1000 * 1000) > 0) // 8
    {
        if(items[0].revents & ZMQ_POLLIN)
        {
            std::string msg = skClient.recvAsString();
            dumpId("coda", msg);
            items[0].revents = 0; // cleanup
        }
    }
}
1. Even if the client threads run in the same process as the server, each of them has its own ZeroMQ context, this makes it easy to refactor the application to more common solution where each client runs in its own process.
2. I am going to use the thread id as the socket id.
3. zmq::Socket is my extension of the zmq::socket_t as found in the default 0MQ C++ binding. This constructor calls zmq_setsockopt() to set its socket ZMQ_IDENTITY to the passed id. This DEALER client socket is going to connect to a ROUTER server socket.
4. The client is going to send a few messages to the server.
5. We loop 100 times, polling on the socket with a limit of 10 millis each time. Expecting only a few messages in input, we can assume that this loop will take not much less than a second (100 × 10 ms) to execute.
6. This zmq::Socket function takes care of receiving, on a zmq::message_t and converting it to a std::string.
7. Accessing the shared resource std::cout to dump a message requires a mutex/lock protection, all of that is taken care in this function.
8. As described in the previous post, as a way to keep the code simple, I have implemented no way for the server to tell the client that the stream of messages is terminated; on the contrary, it is the client itself that waits a "reasonable" amount of time (1 sec) after the last input message before deciding that no more messages are going to come. This is a highly unreliable solution, but for such a simple example it should suffice.

Full source C++ code for this sample application is on github, where you can find the source for zmq::Socket too.


An asynchronous client/server 0mq application - introduction

This example is based on the Asynchronous Client-Server ZGuide chapter. I have made a few slight changes, primarily to adapt it to Windows - MSVC2010, and to port the code, originally written for czmq, to the C++ standard binding available for ZeroMQ version 2.x; actually, I developed and tested the code on the most recent currently available ØMQ version, 2.2.0.

It is a single process application, but this is just to simplify its development and testing. It could easily be rewritten splitting it into a server component and a client one.

Client

We could decide how many clients to run. Each client sends half a dozen messages, one (about) every second. We want it to act asynchronously, so we continuously poll for incoming messages between a send and the next one. Once we complete sending our messages, we keep waiting for a while for answers before terminating the execution.

This behavior is uncommon, but it has the advantage of letting the client terminate cleanly without requiring a complex server.

To allow an asynchronous behavior, the socket connection between client and server will be a DEALER-ROUTER one.

Server

The server provides the ROUTER socket to which the client DEALER sockets connect. Its job is passing each message it gets from the clients to an available worker of its own, and then sending back to the right client the feedback returned by the worker.

Since we want the worker to generate a variable number of replies to a single message coming from the client, we use a DEALER-DEALER socket connection.

As a simple way of terminating the server execution, I decided just to check the polling waiting period. If nothing is coming on the server socket for more than a few seconds, we can assume that it is time to shut down the server.

We have seen that the client already has its own rule to shut down, so from the server's point of view we have to care only about the workers. The convention I use here is that the server sends an empty message to each worker, which interprets it as a terminator.

Worker

Each worker has its own DEALER socket connected to the server DEALER socket. Its job is getting a message and echoing it, not once, but a random number of times in [0..2], introducing also a random sleep period, just to make things a bit more fancy.

The messages managed by the worker should be composed of two frames: the first one is the address of the sending client, the second one the actual payload. If we receive just one frame, it should be empty, and it should be considered a terminator. And what if that single-framed message is not empty? For simplicity's sake, I just terminate the worker as if it were empty.

Class AsynchronousCS

Not much coding in this post, I save the fun stuff for the next ones. Here I just say that I have implemented the thing in a class, named AsynchronousCS, designed in this way:
class AsynchronousCS
{
private:
    boost::thread_group threads_; // 1
    MyRandom rand_; // 2

    void client(); // 3
    void server(int nWorkers); // 4
    void worker(zmq::context_t& context); // 5
public:
    AsynchronousCS(int nClients, int nWorkers) : rand_(0,2) // 6
    {
        for(int i = 0; i < nClients; ++i)
            threads_.create_thread(std::bind(&AsynchronousCS::client, this));
        threads_.create_thread(std::bind(&AsynchronousCS::server, this, nWorkers));
    }

    ~AsynchronousCS() { threads_.join_all(); } // 7
};
1. The main thread has to spawn a few threads: one for the server, and a variable number of them for the clients. This thread group keeps track of all of them to make joining them easy.
2. An object of a utility class that generates the random numbers used by the worker.
3. Each client thread runs on this function.
4. The server thread runs on this function, the parameter is the number of workers that the server will spawn.
5. Each worker thread runs on this function. Notice that the workers share the same context of the server, while the client and server could be easily rewritten to work in different processes.
6. The constructor requires the number of clients and workers for the application, instantiates the random generator object, specifying that we want it to generate values in the interval [0..2], and then creates the client and server threads.
7. Destructor, joins all the threads created in the ctor.

Using this class is just a matter of instantiating an object:
void dealer2router(int nClients, int nWorkers)
{
    AsynchronousCS(nClients, nWorkers);
}
The dealer2router() function hangs on the class dtor till the join completes.

In the next posts I'll show how I have implemented the client, server, and worker functions. Full source C++ code for this sample application is on github. To easily manage multipart messages, I use an extension of the zmq::socket_t class, as defined in the ZeroMQ standard C++ binding. I named it zmq::Socket and you can find it too on github.


Queue Broker v.2 rewritten

A few days ago, I wrote a version of the Least Recently Used Queue broker based on a subclass of the standard C++ ZeroMQ socket_t class. I have just redesigned it to change the way multipart messages are managed, you can read some more about the new zmq::Socket in its specific post. Here I say something about the changes in the broker itself.

The full broker source code is on github, in any case, the changes are in the way multipart messages are managed.

For instance, we can have a look at MyDevice::receivingOnBackend():
void receivingOnBackend()
{
//    zmq::Frames input = backend_.blockingRecv(3, false);
    zmq::Frames input = backend_.blockingRecv(5, false); // workerID (, "", clientID, "", payload) // 1

    // ...
    
//    if(input.size() == 3)
    if(input.size() == 5) // 2
    {
//        zmq::Frames output(input.begin() + 1, input.end());
        zmq::Frames output(input.begin() + 2, input.end()); // 3
        frontend_.send(output);
    }
}
1. On the backend we receive a workerID and, normally, the clientID and the message payload. That means one or three "real" frames. In my original implementation, we didn't get the separators, now we do. So we get a total of one or five frames.
2. As said above, a full message now has size 5, and not anymore 3.
3. We send to the frontend the clientID and the payload, to do that we discard the first two frames, the workerID and the first separator.
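The frame arithmetic described in (1) and (3) can be checked on a plain vector of strings (toFrontend() is a helper name of mine, mimicking the slicing done in the broker):

```cpp
#include <string>
#include <vector>

typedef std::vector<std::string> Frames;

// A full backend message now carries the separators too:
// workerID, "", clientID, "", payload - five frames in total.
Frames toFrontend(const Frames& input)
{
    // drop workerID and the first separator; keep clientID, "", payload
    return Frames(input.begin() + 2, input.end());
}
```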


New version for my 0MQ socket wrapper

Instead of using zmq::socket_t class provided by the official C++ wrapper, I have written a subclass with some extended capabilities that I find useful in making the code easier to write and maintain. I have just released a new version on github.

The change is in the way zmq::Socket manages the multipart messages to be sent and received. My original idea was stripping the separators from the vector of strings representing the complete message. You can see the code as it was originally thought in the github history.

Now I realize that this supposed simplification put too much pressure on the user code, which could be confused by the behavior of this class.

The zmq::Socket code is cleaner. For instance here is the new main loop in Socket::blockingRecv():
do {
    zmq::message_t message;
    if(!socket_t::recv(&message, 0))
        throw error_t();

    const char* base = static_cast<const char*>(message.data());
    frames.push_back(std::string(base, base + message.size()));
} while(sockopt_rcvmore());
We keep receiving on the socket while the flag on the socket itself signals that there is more to be read. Each frame, whatever its content, is pushed into the string vector that is going to be returned to the caller.

As comparison, this was the previous code:
int currentFrame = 1;
do {
    zmq::message_t message;
    if(!socket_t::recv(&message, 0))
        throw error_t();

    if(!(currentFrame++ % 2))
    { // skipping separators
        if(message.size())
            throw error_t();
    }
    else
    {
        const char* base = static_cast<const char*>(message.data());
        frames.push_back(std::string(base, base + message.size()));
    }
} while(sockopt_rcvmore());
Separators were assumed to be in even positions, and were simply discarded.

Alternatively, I would have had to provide a way to let the user code decide whether to send/receive multipart messages with or without separators. And that looked to me like asking for trouble.

Go to the full post

Building czmq on MSVC

CZMQ, the high level C binding for ØMQ, is available for download from the zeromq.org official page, currently in version 1.1.0. The problem is that, if you want to use it with Visual Studio, there are a few issues.

Firstly, the zip file currently available is missing some pieces, most notably the solution files for MSVC. You can overcome this by getting the latest czmq version from github:
git clone git://github.com/zeromq/czmq

The next step is compiling it and, as one should expect, the czmq source code includes references to zeromq. You can get it from here:
git clone git://github.com/zeromq/zeromq2-x

Now you are ready to open the czmq solution in the win32 subfolder. Check in the project properties that the references to zeromq are set correctly.

Compile it. If nothing has changed since I wrote this post, compiling for MSVC2010 you should get a few errors:
1>..\src\zframe.c(461): error C2440: 'initializing' : cannot convert from 'void *' to 'char *'
1>..\src\zmsg.c(194): error C2440: 'initializing' : cannot convert from 'void *' to 'zframe_t *'
1>..\src\zmsg.c(503): error C2440: '=' : cannot convert from 'void *' to 'byte *'
1>..\src\zmsg.c(772): error C2440: 'initializing' : cannot convert from 'void *' to 'byte *'
1>..\src\zsockopt.c(138): error C2440: 'initializing' : cannot convert from 'void *' to 'char *'
Explicit casts are required; patch the code like this:
// zframe.c:461
char *buffer = static_cast<char*>(malloc(1024));

// zmsg.c:194
zframe_t *frame = static_cast<zframe_t *>(zlist_pop (self->frames));

// zmsg.c:503
*buffer = static_cast<byte *>(malloc (buffer_size));

// zmsg.c:772
byte *blank = static_cast<byte *>(zmalloc (100000));

// zsockopt.c:138
char *identity = static_cast<char*>(zmalloc (option_len));
Once these errors are corrected, the czmq project should build cleanly, producing czmq.lib in the win32 folder.

Create a project that uses zmq and czmq, setting the include and library directories properly, and adding a reference to the actual lib files among the linker additional dependencies.

Finally, you can write a first tiny hello-czmq application that just instantiates and destroys a zmq context:
#include <czmq.h>

// ...
{
    zctx_t* ctx = zctx_new();
    assert(ctx);
    zctx_destroy(&ctx);
    assert(ctx == NULL);
}

Go to the full post

Hello Windows Sockets 2 - client

The Winsock API is a collection of quite low level functions. As we have seen in the previous post, writing an echo server, substantially a simple procedure (wait for data, send it back, shut down when the connection closes), is not complicated but longish, since we have access to the tiniest details of the process. The most tedious part of the job is that we have to specify every single step by hand, even the ones that could easily be automated. A slender C++ wrapper would be enough to make the job much simpler, but so far I haven't found anything like that around. Any suggestion is welcome.
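Just to sketch what such a wrapper could look like: a small RAII guard pairing the startup and cleanup calls, so that the cleanup cannot be forgotten on any exit path. ScopedInit and the fake init/cleanup functions below are my own invention, not part of any existing library; in real code Init would call WSAStartup() and Cleanup would call WSACleanup():

```cpp
#include <stdexcept>

// A minimal RAII sketch of the "slender wrapper" idea: the cleanup
// call runs automatically when the guard goes out of scope.
template <typename Init, typename Cleanup>
class ScopedInit
{
public:
    ScopedInit(Init init, Cleanup cleanup) : cleanup_(cleanup)
    {
        if(init() != 0) // WSAStartup() style: zero means success
            throw std::runtime_error("initialization failed");
    }
    ~ScopedInit() { cleanup_(); }
private:
    Cleanup cleanup_;
};

// Stand-ins for WSAStartup()/WSACleanup(), just to exercise the guard.
int cleanupCalls = 0;
int fakeStartup() { return 0; }
void fakeCleanup() { ++cleanupCalls; }
```

With such a guard in place, the client and server procedures would not need the explicit WSACleanup() call at all.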

Let's now complete our echo application, thinking about the client side.

Initializing Winsock

This is the same stuff we have already seen for the server side. We need to enclose our code between a pair of calls to WSAStartup() and WSACleanup(), link Ws2_32.lib to our project, and include a couple of header files, winsock2.h and ws2tcpip.h, in our C/C++ source file.

Open/close socket

Almost the same as for the server:
void client(const char* host, const char* port) // 1
{
    ADDRINFO hints;
    wsa::set(hints, 0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP); // 2

    ADDRINFO* ai = wsa::get(host, port, &hints);
    if(ai)
    {
        wsa::list(ai); // 3

        SOCKET sk = wsa::createClientSocket(ai);
        freeaddrinfo(ai);

        if(sk != INVALID_SOCKET)
        {
            coreClient(sk); // 4
            closesocket(sk);
        }
    }
}
1. The user should provide both the machine address and the port, so that we can connect to the server socket.
2. Tiny variation: we specify AF_UNSPEC as address family, so that both TCP/IP v4 and v6 are accepted, when available. This does not make much sense here, since we already know that the server is using TCP/IP v4, but in a real-world scenario it could be a useful trick.
3. We asked Winsock for all the address info matching our requirements. If the current version supports both TCP/IP v4 and v6, we should get both of them. This tiny utility function, shown below, walks through the retrieved address info and dumps some of their settings to the console.
4. The rest of the code is just the same as what we have already seen for the server: before calling the specific client code we create the socket, and afterwards we close it. Notice also that we had to free by hand the memory associated to the address info.

As promised, here is the address info listing utility function:
namespace wsa
{
    void list(ADDRINFO* ai)
    {
        while(ai)
        {
            std::cout << ai->ai_family << ' ' << ai->ai_socktype << ' ' << ai->ai_protocol << std::endl;
            ai = ai->ai_next; // 1
        }
    }
}
1. ADDRINFO is actually a node in a linked list of records. Its ai_next field points to the next element, if any. So here we loop over all the available items.

Send and receive

Here is the client specific code:
void coreClient(SOCKET sk)
{
    const char* message = "Hello Winsock 2!";

    int size = send(sk, message, static_cast<int>(strlen(message)), 0); // 1
    if(size == SOCKET_ERROR)
    {
        std::cout << "send failed: " << WSAGetLastError() << std::endl;
        return;
    }

    std::cout << size << " bytes sent" << std::endl;

    if(shutdown(sk, SD_SEND) == SOCKET_ERROR) // 2
    {
        std::cout << "shutdown failed: " << WSAGetLastError() << std::endl;
        return;
    }

    do { // 3
        char buffer[BUFLEN];
        size = recv(sk, buffer, BUFLEN, 0); // 4
        if(size > 0)
        {
            std::cout << "Buffer received: '";
            std::for_each(buffer, buffer + size, [](char c){ std::cout << c; });
            std::cout << '\'' << std::endl;
        }
        else if(!size)
            std::cout << "Connection closed." << std::endl;
        else
            std::cout << "recv failed: " << WSAGetLastError() << std::endl;
    } while(size > 0);
}
1. The message is sent through the socket, winsock returns the number of bytes actually sent, or an error code.
2. This simple application sends just one message, so here we can already close the send component of the socket, releasing some resources doing that, still keeping alive its receive component.
3. Loop while we keep receiving data from the server.
4. The received data is put in the buffer, and its size is returned by the call. A zero size is interpreted as a shutdown signal coming from the server. A negative value has to be interpreted as an error code.

If we run the client alone, it won't find any server socket to connect to, and will return an error to the user.

The full client/server source code is on github.

Go to the full post

Hello Windows Sockets 2 - server

The Winsock API, also known as WSA, implements the socket paradigm not much differently from the BSD (Berkeley Software Distribution) concept. Here I am going to write a simple echo application that should help to familiarize with its basic functionality.

Initializing Winsock

To use Winsock 2 we need to link Ws2_32.lib (if the 32 bit version is OK for you), and we can do that specifying it in the project Properties - Linker - Input - Additional Dependencies field. Alternatively, we could use a pragma directive:
#pragma comment(lib, "Ws2_32.lib")
The required include files are winsock2.h and, almost always, ws2tcpip.h, the WinSock2 extension header for TCP/IP protocols.

The winsock code should be enclosed between two calls that take care of initializing and shutting down the socket support. Typically your code should look like this:
WSADATA wsaData; // 1
int failCode = WSAStartup(MAKEWORD(2,2), &wsaData); // 2
if(failCode)
{
    std::cout << "WSAStartup failed: " << failCode << std::endl;
    return;
}

// 3
// ...

WSACleanup(); // 4
1. WSADATA is a structure containing a few data related to the current active Winsock API.
2. WSAStartup() tries to initialize winsock for the requested version support (2.2 here) and sets the wsaData structure. If it works fine it returns 0, otherwise it returns an error code.
3. Your winsock code goes here.
4. Winsock shutdown.
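A side note on MAKEWORD(2,2): the macro just packs the major version into the low byte and the minor version into the high byte of a 16-bit word. Here is a portable sketch of the same packing, as a function of my own rather than the actual Windows macro:

```cpp
// Same packing as the Windows MAKEWORD macro: low byte as-is,
// high byte shifted up; MAKEWORD(2,2) yields 0x0202.
unsigned short makeWord(unsigned char low, unsigned char high)
{
    return static_cast<unsigned short>(low | (high << 8));
}
```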

Open/close socket

To create a socket we need to specify an ADDRINFO structure. Here is how to do it for the server:
void server(const char* port) // 1
{
    ADDRINFO hints;
    wsa::set(hints, AI_PASSIVE, AF_INET, SOCK_STREAM, IPPROTO_TCP); // 2

    ADDRINFO* ai = wsa::get(NULL, port, &hints); // 3
    if(ai)
    {
        SOCKET sk = wsa::createServerSocket(ai); // 4
        freeaddrinfo(ai); // 5

        if(sk != INVALID_SOCKET)
        {
            coreServer(sk); // 6
            closesocket(sk); // 7
        }
    }
}
1. The user tells us on which port the socket should sit.
2. See below for this tiny utility function; I have written it thinking it makes the code clearer. The addrinfo is initialized to tell winsock that we want to create a server TCP v4 socket.
3. A second tiny utility function of mine, to wrap a call to getaddrinfo() that returns an available addrinfo matching our request, or NULL.
4. Another utility function, this one wrapping calls to socket() and bind(): it creates a winsock socket and binds it to the passed address.
5. Once we get the socket, we can get rid of the addrinfo.
6. If we have got a good socket, we can do the real job.
7. We are done with the socket, so we close it.

Let's see the utility functions used above:
namespace wsa
{
    void set(ADDRINFO& that, int flags =0, int family =0, int type =0, int protocol =0, // 1
        size_t addrlen =0, char* name =0, SOCKADDR* addr =0, ADDRINFO* next =0)
    {
        that.ai_flags = flags;
        that.ai_family = family;
        that.ai_socktype = type;
        that.ai_protocol = protocol;
        that.ai_addrlen = addrlen;
        that.ai_canonname = name;
        that.ai_addr = addr;
        that.ai_next = next;
    }

    ADDRINFO* get(const char* host, const char* port, ADDRINFO* hints)
    {
        ADDRINFO* ai = NULL;
        int failCode = getaddrinfo(host, port, hints, &ai); // 2
        if(failCode)
        {
            std::cout << "getaddrinfo failed: " << failCode << std::endl;
            return NULL;
        }
        return ai;
    }

    SOCKET createSocket(ADDRINFO* ai) // 3
    {
        SOCKET sk = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
        if(sk == INVALID_SOCKET)
        {
            std::cout << "error creating socket: " << WSAGetLastError() << std::endl;
            return INVALID_SOCKET;
        }
        return sk;
    }

    SOCKET createServerSocket(ADDRINFO* ai)
    {
        SOCKET sk = createSocket(ai); // 4

        if(sk != INVALID_SOCKET && bind(sk, ai->ai_addr, static_cast<int>(ai->ai_addrlen)) == SOCKET_ERROR) // 5
        {
            closesocket(sk); // 6

            std::cout << "Unable to bind: " << WSAGetLastError() << std::endl;
            return INVALID_SOCKET;
        }
        return sk;
    }
}
1. It ensures all the fields are set to zero/NULL or to a valid value.
2. Wraps a call to getaddrinfo().
3. First step in the creation of a server socket, it wraps a call to socket().
4. After creating a socket in (3) ...
5. ... we bind it to the passed address.
6. Some error handling.

Accepting a client

After all this setting up, we can finally do something with our socket:
void coreServer(SOCKET skServer)
{
    if(listen(skServer, SOMAXCONN) == SOCKET_ERROR) // 1
    {
        std::cout << "Listen failed with error: " << WSAGetLastError() << std::endl;
        return;
    }

    SOCKET skClient = accept(skServer, NULL, NULL); // 2
    if(skClient == INVALID_SOCKET)
    {
        std::cout << "accept failed: " << WSAGetLastError() << std::endl;
        return;
    }

    echoLoop(skClient); // 3
    closesocket(skClient); // 4
}
1. We have a passive (server) socket, bound to a specific address. Before working on it, we have to prepare it to listen for incoming traffic.
2. Here we hang until we receive a connection from a client socket. In this simple example, there can be just one client accessing the server: once the server accepts a client connection, it has no way to accept another one.
3. The real job is done here.
4. And finally we close the client socket.

Receive and send

Now we have the client socket counterpart. We can receive on it, and send some data back to it. Here we implement an echo procedure:
void echoLoop(SOCKET skClient)
{
    char buffer[BUFLEN];

    int rSize;
    do {
        rSize = recv(skClient, buffer, BUFLEN, 0); // 1
        if(rSize > 0)
        {
            std::cout << "Buffer received: '";
            std::for_each(buffer, buffer + rSize, [](char c){ std::cout << c; });
            std::cout << '\'' << std::endl;

            int sSize = send(skClient, buffer, rSize, 0); // 2
            if(sSize == SOCKET_ERROR)
            {
                std::cout << "send failed: " << WSAGetLastError() << std::endl;
                return;
            }
            std::cout << sSize << " bytes sent back" << std::endl;
        }
        else if(rSize == 0)
            std::cout << "closing connection ..." << std::endl;
        else
        {
            std::cout << "Error receiving: " << WSAGetLastError() << std::endl;
            return;
        }
    } while(rSize > 0);
}
1. Receive until the client shuts down the connection
2. Echo the buffer back to the sender
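Stripped of the Winsock calls, the echo protocol is simple enough to model on plain strings: each chunk goes back unchanged until a zero-sized read signals the client shutdown. A socket-free sketch of that logic (echoSession is my own name, just for illustration):

```cpp
#include <string>
#include <vector>

// Socket-free model of the echo loop above: feed in the chunks a
// client would send, get back what the server would echo; an empty
// chunk plays the role of the zero-sized recv() on a closed connection.
std::vector<std::string> echoSession(const std::vector<std::string>& received)
{
    std::vector<std::string> sent;
    for(std::vector<std::string>::const_iterator it = received.begin();
        it != received.end(); ++it)
    {
        if(it->empty())
            break; // zero-sized read: the client closed the connection
        sent.push_back(*it); // echo the buffer back to the sender
    }
    return sent;
}
```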

If we run the server alone, it will hang forever waiting for a client. We'll see the client in the next post, but you can already go to github and get the full client/server source code. This example is based on the official Getting started with Winsock topic on MSDN.

Go to the full post