It is quite natural to put the key in a specific frame, and store the information in another one. Still we have to do it by hand, managing explicitly a multi-part message, taking care of the gory details.
The almost meaningless ØMQ 3.1 PUB-SUB application shown here is designed in this way: the publisher sends ten couples of messages, each of them composed by two frames, key and value; the client subscribes for a key, and gets a couple of messages before terminating. The sense of sending all those messages from the producer is that I haven't implemented any synchronization between PUB and SUB, and we should in some way giving enough time to the subscribers to enter in the game before the publisher ends its stream of messages.
This is the main loop in the producer:
for(int i = 0; i < 10; ++i) { zmq_send(skPub, "A", 1, ZMQ_SNDMORE); // 1 char* msgA = "An uninteresting message"; zmq_send(skPub, msgA, strlen(msgA), 0); // 2 std::cout << "Sent: " << msgA << std::endl; zmq_send(skPub, "B", 1, ZMQ_SNDMORE); // 3 char* msgB = "An interesting message"; zmq_send(skPub, msgB, strlen(msgB), 0); // 4 std::cout << "Sent: " << msgB << std::endl; boost::this_thread::sleep(boost::posix_time::seconds(1)); // 5 }1. Send the first frame in a message through the PUB socket. The flag is set to ZMQ_SNDMORE, at least one more frame should be sent to complete the message. This is the key of the message, and it is set to a one character string, "A". Right, one character, there is no '\0' in a 0MQ string.
2. Second frame for the first message, the flag is set to zero, so this is the tail of it.
3. A new messages starts here, having in its first frame a different one-character key, "B".
4. Payload for the second message.
5. The main reason of looping in the publisher is giving time to the subscriber. So we slow down the process with a sleep.
Each subscriber subscribes to the key that it likes, and then gets the relevant messages:
// ... zmq_setsockopt(skSub, ZMQ_SUBSCRIBE, "B", 1); // 1 for(int i = 0; i < 2; ++i) { char key[BUF_SIZE]; int kLen = zmq_recv(skSub, key, BUF_SIZE, 0); // 2 std::cout << "Key: "; std::for_each(key, key + kLen, [](char c){ std::cout << c; }); std::cout << std::endl; char value[BUF_SIZE]; int vLen = zmq_recv(skSub, value, BUF_SIZE, 0); // 3 std::cout << "Value: "; std::for_each(value, value + vLen, [](char c){ std::cout << c; }); std::cout << std::endl; }1. Here we want to get only the "B" messages.
2. Receive a frame. We should already know that this is just a key, and that the next frame is still part of the same message.
3. Second frame received. If the design changes, we are going to be in troubles, since there is no way to know if this is really the last part of the same message.
The post is based on the Pub-sub Message Envelopes section of the Z-Guide. I have already written a similar post, but in that case it was for version 2.1 and in C++; I ported it here to 3.1 and using the raw C interface.
No comments:
Post a Comment