A first ROUTER-DEALER example

Both ROUTER and DEALER ZeroMQ sockets are designed to work asynchronously. Depending on the current design requirements, we can use one or the other as server. Here we see the case where the ROUTER sockets act as servers; in the next post we'll see how to write a simple 0MQ application where the DEALER sockets play the server role.

Same background as the previous posts: the reference platform is Windows, Visual C++ 2010, C++, ØMQ 2.2 through the standard ZeroMQ C++ wrapper, plus my zmq::Socket class that you can see on github. There is nothing complicated in the code, so I guess it could be easily refactored to work on any environment supported by 0MQ with limited effort.

The application is designed to do a rather pointless job: a few DEALER clients create one message for each available ROUTER, and then send them all. Then each of them gets a bunch of replies on the socket, and prints them to let the user see the result. (Not) surprisingly, each client gets back all and only the messages that it sent to the servers.

You can play around changing the number of dealers and routers. In my example I can have just one or two routers, but you can have as many dealers as you want, and your system allows.

The application is developed to run on a single process, with many threads, to keep testing easy. This is the code that spawns all the client and server threads:
boost::thread_group threads;
for(int i = 0; i < nDealers; ++i) // 1
    threads.create_thread(std::bind(dealer2router, nRouters, i));
for(int i = 0; i < nRouters; ++i) // 2
    threads.create_thread(std::bind(router4dealer, nDealers, i));
threads.join_all();
1. The number of dealers could range from 1 to ... a big number. The actual limit depends on your working environment. Each thread runs dealer2router(), which takes the number of routers, and an id used to create a different message for each client.
2. The number of routers is more constrained, since we have to specify the actual socket address for each of them. Being lazy, here we can have just one or two routers, but it is easy to increase that number. Each server runs router4dealer(), a function that needs the number of expected messages (one for each dealer) and the router id (so that we can pick its correct socket address).

As I said, at most two servers are available:
const char* SKA_CLI[] = {"tcp://localhost:5380", "tcp://localhost:5381" };
const char* SKA_SRV[] = {"tcp://*:5380", "tcp://*:5381" };
Extend these arrays if you want more routers. In the following code, you are going to see a call to a utility function named dumpId(), for which I don't show the source code. It just prints to std::cout, locking to protect this shared resource from concurrent access.

Each client thread runs this function:
void dealer2router(int n, int index)
{
    dumpId("DEALER startup");
    zmq::context_t context(1);
    
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id());
    zmq::Socket skDealer(context, ZMQ_DEALER, id); // 1
    for(int i = 0; i < n; ++i)
    {
        skDealer.connect(SKA_CLI[i]); // 2
        dumpId("DEALER CLI on", SKA_CLI[i]);
    }

    for(int i = 0; i < n; ++i)
    {
        std::string out(i+1, 'k' + i + index); // 3
        skDealer.send(out);
    }
    dumpId("all sent");

    for(int i = 0; i < n; ++i) // 4
    {
        std::string in = skDealer.recvAsString();
        dumpId(in.c_str());
    }
}
1. For testing purposes, I set each dealer id to its thread id, so that the output is easier to check.
2. The parameter "n" passed to the function represents the number of routers to which each dealer could connect. Here we are actually connecting to each of them.
3. The parameter "n" also represents the number of messages that each dealer sends, one for each available router. The message is built in a slightly cryptic way: its length increases at each iteration, starting from one, and it contains a repetition of a single character, determined by the loop iteration and the "index" parameter passed to the function to identify the current dealer. The point of this messy setup is making the output recognizable during testing.
4. Here we see the dealer's asynchronicity. First we send all the messages, then we receive them all.
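To make note 3 concrete, the message construction in (3) can be factored into a tiny helper; buildMessage() is a hypothetical name of mine, but its body reproduces the std::string fill constructor call from the listing above:

```cpp
#include <string>

// Reproduces the message construction in (3): the message length
// is i + 1, and it is filled with the character 'k' + i + index.
std::string buildMessage(int i, int index)
{
    return std::string(i + 1, static_cast<char>('k' + i + index));
}
```

For the first dealer (index 0) talking to two routers this yields "k" and "ll"; a second dealer (index 1) produces "l" and "mm", so each message identifies both its dealer and its iteration at a glance.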

The dealer sends a multipart message composed of two frames: its address, which I explicitly set to the thread id in (1), and the payload. The address will be used by the router to identify which dealer has to get the reply, and will be consumed by 0MQ. That's why in (4) we receive just the payload.
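The envelope mechanics can be simulated without any socket at all. Think of a message as a list of frames: the ROUTER sees an identity frame prepended by 0MQ, and must send that identity back as the first frame of the reply so that 0MQ can route it, stripping it before the DEALER's user code sees the message. This sketch models frames as a plain vector of strings; it is an illustration of the protocol, not of the zmq::Socket implementation:

```cpp
#include <string>
#include <vector>

using Frames = std::vector<std::string>;

// What the ROUTER receives: 0MQ has prepended the dealer identity.
Frames routerReceives(const std::string& dealerId, const std::string& payload)
{
    return Frames{dealerId, payload};
}

// What the DEALER gets back: 0MQ consumes the identity frame,
// so only the payload is delivered to user code.
Frames dealerReceives(const Frames& routerReply)
{
    return Frames(routerReply.begin() + 1, routerReply.end());
}
```

The router in this post simply echoes the received two-frame message back unchanged, so the identity frame it received is exactly the one that drives the reply routing.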

Each server runs this code on its own thread:
void router4dealer(int nMsg, int id)
{
    dumpId("ROUTER startup");
    zmq::context_t context(1);

    zmq::Socket skRouter(context, ZMQ_ROUTER);
    skRouter.bind(SKA_SRV[id]); // 1
    dumpId("ROUTER SRV on", SKA_SRV[id]);

    std::vector<zmq::Frames> messages;
    messages.reserve(nMsg); // 2

    for(int i = 0; i < nMsg; ++i)
    {
        zmq::Frames message = skRouter.blockingRecv(2); // 3
        dumpId(message[0].c_str(), message[1].c_str());
        messages.push_back(message);
    }
    dumpId("all received");

    std::for_each(messages.begin(), messages.end(), [&skRouter](zmq::Frames& frames) // 4
    {
        skRouter.send(frames);
    });

    dumpId("ROUTER done");
}
1. The id passed to the function is used to select the right socket address for the current router. You should carefully check it to avoid out-of-range values, which would drive this puny application to a miserable crash.
2. We already know how many messages this router is going to receive, so we reserve enough room for them in the vector where we are going to temporarily store them.
3. Only one message is expected from each dealer in the application, and each of them is composed of two frames: the dealer address and the message payload.
4. The routers are asynchronous too, and we see that at work here. First we receive all the messages from the dealers, then we send them back. I am using the STL for_each algorithm to iterate over the vector where the messages have been stored. For each message, a C++11 lambda is called; it captures the socket by reference and accepts the current element of the vector, again by reference. In the lambda body we just send the message back through the socket.
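The same for_each-plus-lambda idiom from (4) can be shown in isolation; here a plain vector stands in for the ROUTER socket (sendAll() and its recording vector are my stand-ins, not part of the original code), keeping only the shape of the capture-by-reference lambda:

```cpp
#include <algorithm>
#include <string>
#include <vector>

// A stand-in for the ROUTER socket: "sending" just records the message.
// The lambda has the same shape as [&skRouter](zmq::Frames& frames) above,
// capturing its context by reference and taking each element by reference.
std::vector<std::string> sendAll(std::vector<std::string>& messages)
{
    std::vector<std::string> sent;
    std::for_each(messages.begin(), messages.end(),
        [&sent](std::string& msg) { sent.push_back(msg); });
    return sent;
}
```

A C++11 range-based for loop would do the same job; for_each is used here mostly as a matter of taste.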
