It is a single-process application, but only to simplify its development and testing. It could easily be rewritten by splitting it into a server component and a client one.
Client
We can decide how many clients to run. Each client sends half a dozen messages, about one per second. We want it to act asynchronously, so it continuously polls for incoming messages between one send and the next. Once it has sent all its messages, it keeps waiting a while for answers before terminating.
This behavior is uncommon, but it has the advantage of letting the client terminate cleanly without requiring a complex server.
To allow asynchronous behavior, the socket connection between client and server is a DEALER-ROUTER one.
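The client loop described above can be sketched without any ZeroMQ code at all. This is only a minimal sketch under stated assumptions: ClientHooks, runClient, and the idea of counting polls instead of measuring time are all hypothetical names and simplifications of mine, and in the real application the two hooks would wrap a DEALER socket.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical transport hooks (not part of the original code):
// in the real client they would wrap a DEALER socket.
struct ClientHooks
{
    std::function<void(const std::string&)> send; // send one request
    std::function<bool()> pollOnce;               // poll briefly; true if a reply was read
};

// Sends nMessages requests, polling pollsPerSend times after each send,
// then keeps polling lingerPolls more times before giving up.
// Returns the number of replies seen.
int runClient(const ClientHooks& hooks, int nMessages, int pollsPerSend, int lingerPolls)
{
    assert(nMessages >= 0 && pollsPerSend > 0 && lingerPolls >= 0);

    int replies = 0;
    for(int i = 0; i < nMessages; ++i)
    {
        hooks.send("message " + std::to_string(i));

        // stay responsive between one send and the next
        for(int p = 0; p < pollsPerSend; ++p)
            if(hooks.pollOnce())
                ++replies;
    }

    // all messages sent: wait a while for late answers, then terminate
    for(int p = 0; p < lingerPolls; ++p)
        if(hooks.pollOnce())
            ++replies;

    return replies;
}
```

With a poll timeout of, say, 10 milliseconds, a pollsPerSend of 100 would give roughly the one-message-per-second pace; those figures are just an example, not values taken from the application.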
Server
The server provides the ROUTER socket to which the client DEALER sockets connect. Its job is to pass each message it gets from the clients to an available worker of its own, and then to send the feedback returned by the worker back to the right client.
Since we want the worker to generate a variable number of replies to a single message coming from the client, we use a DEALER-DEALER socket connection between server and workers.
As a simple way of terminating the server execution, I decided just to check the polling waiting period. If nothing comes in on the server socket for more than a few seconds, we can assume that it is time to shut down the server.
We have seen that the client already has its own shutdown rule, so from the server's point of view we only have to take care of the workers. The convention I use here is that the server sends an empty message to each worker, which interprets it as a terminator.
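The shutdown rule for the server could look something like the following. Again this is a transport-free sketch, not the actual implementation: ServerHooks, runServer, and the maxIdlePolls counter are hypothetical, and a single poll with a timeout of a few seconds corresponds to maxIdlePolls set to 1.

```cpp
#include <cassert>
#include <functional>

// Hypothetical hooks (not part of the original code): in the real server
// they would wrap the ROUTER socket and the DEALER socket to the workers.
struct ServerHooks
{
    std::function<bool()> pollAndRoute;      // wait one polling period; true if traffic was handled
    std::function<void(int)> sendTerminator; // send an empty message to worker i
};

// Routes traffic until maxIdlePolls consecutive polls find nothing,
// then tells each worker to stop. Returns the number of busy polls.
int runServer(const ServerHooks& hooks, int nWorkers, int maxIdlePolls)
{
    assert(nWorkers >= 0 && maxIdlePolls > 0);

    int handled = 0;
    int idle = 0;
    while(idle < maxIdlePolls)
    {
        if(hooks.pollAndRoute())
        {
            ++handled;
            idle = 0; // any traffic resets the idle count
        }
        else
            ++idle;
    }

    // the clients stop by themselves, so only the workers need a terminator
    for(int i = 0; i < nWorkers; ++i)
        hooks.sendTerminator(i);

    return handled;
}
```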
Worker
Each worker has its own DEALER socket connected to the server DEALER socket. Its job is to get a message and echo it, not once, but a random number of times in [0..2], also introducing a random sleep period, just to make things a bit fancier.
The messages managed by the worker should be composed of two frames: the first is the address of the sending client, the second is the actual payload. If we receive just one frame, it should be empty, and it is considered a terminator. And what if that single-frame message is not empty? For simplicity's sake, I just terminate the worker as if it were empty.
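The frame rules above can be written down as two small functions. This is a sketch that models a multipart message as a plain vector of strings; Frames, isTerminator, and makeReplies are hypothetical names. One more assumption of mine: any message that does not have exactly two frames stops the worker, while the post only discusses the single-frame case.

```cpp
#include <cassert>
#include <string>
#include <vector>

// A multipart message modeled as a list of frames (assumption:
// the real code reads the frames from a ZeroMQ socket instead).
using Frames = std::vector<std::string>;

// Only a two-frame message (client address + payload) is a request;
// anything else, empty or not, is treated as a terminator.
bool isTerminator(const Frames& frames)
{
    return frames.size() != 2;
}

// Builds the replies for one request: the same address and payload,
// repeated `times` times (zero replies is a legal outcome).
std::vector<Frames> makeReplies(const Frames& request, int times)
{
    assert(request.size() == 2 && times >= 0);

    std::vector<Frames> replies;
    for(int i = 0; i < times; ++i)
        replies.push_back(request); // address frame first, so the reply finds its client
    return replies;
}
```

In the application, the number of replies would come from the random generator, and a random sleep would sit between one reply and the next.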
Class AsynchronousCS
Not much coding in this post; I am saving the fun stuff for the next ones. Here I will just say that I have implemented the thing in a class named AsynchronousCS, designed this way:
class AsynchronousCS
{
private:
    boost::thread_group threads_; // 1
    MyRandom rand_; // 2

    void client(); // 3
    void server(int nWorkers); // 4
    void worker(zmq::context_t& context); // 5
public:
    AsynchronousCS(int nClients, int nWorkers) : rand_(0,2) // 6
    {
        for(int i = 0; i < nClients; ++i)
            threads_.create_thread(std::bind(&AsynchronousCS::client, this));
        threads_.create_thread(std::bind(&AsynchronousCS::server, this, nWorkers));
    }

    ~AsynchronousCS() { threads_.join_all(); } // 7
};

1. The main thread has to spawn a few threads: one for the server, and a variable number of them for the clients. This thread group keeps track of all of them, which makes joining them easy.
2. An object of a utility class that generates the random numbers used by the worker.
3. Each client thread runs this function.
4. The server thread runs this function; the parameter is the number of workers that the server will spawn.
5. Each worker thread runs this function. Notice that the workers share the same context as the server, while the client and server could easily be rewritten to work in different processes.
6. The constructor requires the number of clients and workers for the application, instantiates the random generator object, specifying that we want it to generate values in the interval [0..2], and then creates the client and server threads.
7. The destructor joins all the threads created in the ctor.
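The MyRandom class is not shown in this post, so here is just a guess at what such a utility might look like, based on the standard header for random numbers. The member names and the getValue() method are my assumptions, not the real interface; only the two-argument constructor matches the rand_(0,2) call seen above.

```cpp
#include <cassert>
#include <random>

// Hypothetical stand-in for the MyRandom utility class:
// generates integers uniformly distributed in [low..high].
class MyRandom
{
private:
    std::mt19937 engine_;
    std::uniform_int_distribution<int> dist_;
public:
    MyRandom(int low, int high) : engine_(std::random_device{}())
    {
        assert(low <= high); // the distribution requires an ordered interval
        dist_ = std::uniform_int_distribution<int>(low, high);
    }

    // Assumed method name: returns the next random value in the interval.
    int getValue() { return dist_(engine_); }
};
```

This sketch does no locking. Since in the application one generator is shared by several worker threads, a real implementation would probably need to protect the call with a mutex.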
Using this class is just a matter of instantiating an object:
void dealer2router(int nClients, int nWorkers)
{
    AsynchronousCS(nClients, nWorkers);
}

The dealer2router() function hangs on the class dtor until the join completes.
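To see in isolation why the caller hangs on the destructor, here is the same join-in-the-dtor idea reduced to standard threads. It is a sketch under assumptions: JoiningGroup and runAndCount are hypothetical names, and std::thread takes the place of the boost::thread_group used by the class.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal replacement for a thread group:
// it owns the threads and joins all of them when destroyed.
class JoiningGroup
{
private:
    std::vector<std::thread> threads_;
public:
    template <typename F>
    void spawn(F f) { threads_.emplace_back(std::move(f)); }

    ~JoiningGroup()
    {
        for(auto& t : threads_)
            if(t.joinable())
                t.join();
    }
};

// Starts nThreads threads that each bump a counter; the counter is read
// only after the group has gone out of scope, that is, after the joins.
int runAndCount(int nThreads)
{
    assert(nThreads >= 0);

    std::atomic<int> done{0};
    {
        JoiningGroup group;
        for(int i = 0; i < nThreads; ++i)
            group.spawn([&done] { ++done; });
    } // the dtor blocks here until every thread has completed

    return done.load();
}
```

Calling runAndCount(4) should return 4, because no thread can still be running when the count is read.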
In the next posts I'll show how I have implemented the client, server, and worker functions. The full C++ source code for this sample application is on GitHub. To manage multipart messages easily, I use an extension of the zmq::socket_t class, as defined in the ZeroMQ standard C++ binding. I named it zmq::Socket, and you can find it on GitHub too.