Showing posts with label IPC. Show all posts

Message Queue

The Boost.Interprocess message queue lets threads, even ones that belong to different processes, put messages on a shared queue and get them back.

Each message has a priority, a length, and the associated data.

It is possible to get and put messages on the queue in three different ways: blocking, try, timed.
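In Boost the three flavours are called send(), try_send() and timed_send() on the writing side, with receive(), try_receive() and timed_receive() as counterparts. To see how they differ when the queue is full, here is a minimal sketch that does not use Boost at all: BoundedQueue is a made-up, in-process queue built on the standard library, and its timed call takes a relative timeout just for brevity.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical in-process queue; only the method names mirror the Boost ones.
class BoundedQueue
{
public:
  explicit BoundedQueue(std::size_t capacity) : capacity_(capacity)
  {
    assert(capacity > 0);
  }

  // Blocking: waits as long as it takes for a free slot.
  void send(int value)
  {
    std::unique_lock<std::mutex> lock(mutex_);
    notFull_.wait(lock, [this] { return items_.size() < capacity_; });
    items_.push_back(value);
  }

  // Try: returns false at once if the queue is full.
  bool try_send(int value)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if(items_.size() >= capacity_)
      return false;
    items_.push_back(value);
    return true;
  }

  // Timed: waits for a free slot, but gives up when the timeout expires.
  bool timed_send(int value, std::chrono::milliseconds timeout)
  {
    std::unique_lock<std::mutex> lock(mutex_);
    if(!notFull_.wait_for(lock, timeout, [this] { return items_.size() < capacity_; }))
      return false;
    items_.push_back(value);
    return true;
  }

  // Non-blocking read, enough to drain the queue in this sketch.
  bool try_receive(int& value)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if(items_.empty())
      return false;
    value = items_.front();
    items_.pop_front();
    notFull_.notify_one();
    return true;
  }

private:
  std::size_t capacity_;
  std::deque<int> items_;
  std::mutex mutex_;
  std::condition_variable notFull_;
};
```

With a queue of capacity one, a first send() succeeds, a following try_send() fails immediately, and a timed_send() fails only after its timeout has passed.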

This message queue works with raw bytes, so it cannot directly carry objects of non-trivial classes: it can be used to exchange int items, but not std::string objects. To overcome this limitation we could use the Boost serialization library.
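For plain text a full serialization library may be more than we need. As a rough sketch, we could flatten the string into a fixed-size structure that is safe to copy byte by byte. TextMessage, pack() and unpack() are names invented here for illustration, they are not part of Boost.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <type_traits>

const std::size_t TEXT_CAPACITY = 80;

// Hypothetical fixed-size message: plain bytes only, so it can be copied as-is.
struct TextMessage
{
  std::size_t length;        // number of meaningful bytes in text
  char text[TEXT_CAPACITY];
};

static_assert(std::is_trivially_copyable<TextMessage>::value,
              "only trivially copyable types can travel as raw bytes");

// Flatten a std::string into the message; returns false if it does not fit.
bool pack(const std::string& in, TextMessage& out)
{
  if(in.size() > TEXT_CAPACITY)
    return false;
  out.length = in.size();
  std::memset(out.text, 0, sizeof(out.text));
  std::memcpy(out.text, in.data(), in.size());
  return true;
}

// Rebuild the std::string on the receiving side.
std::string unpack(const TextMessage& in)
{
  assert(in.length <= TEXT_CAPACITY);
  return std::string(in.text, in.length);
}
```

The sender would pass the address and sizeof(TextMessage) to the queue, and the receiver would call unpack() on what it gets back.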

On message queue construction we must specify how the object should be generated (create and/or open), its name, and the maximum number and size of the messages. When we are done with it, we should explicitly remove the message queue by calling message_queue::remove().

In the simple example below a producer process creates a message queue of C strings (in the sense of arrays of char) and sends a few messages. The consumer process reads the messages and outputs them to the console.

In the main we choose whether to run the producer or the consumer according to the number of arguments passed to the executable:
if(argc == 2)
  ip11a(argv[1]);
else
  ip11b();
If we try to run the consumer before the producer we get an exception.

Here is the code:
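#include <algorithm> // std::copy
#include <cstdio>    // sprintf
#include <cstring>
#include <iostream>
#include <iterator>  // std::ostream_iterator
#include <string>
#include <vector>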

#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/scoped_ptr.hpp>

using namespace boost::interprocess;

namespace
{
  const char* MQ_NAME = "MessageQueue";
  const int MQ_MSG_NR = 10;
  const int MSG_NR = MQ_MSG_NR * 2; // 1

  class QueueManager
  {
  private:
    bool drop_; // 2
    boost::scoped_ptr<message_queue> mq_; // 3

    void remove() { message_queue::remove(MQ_NAME); }
  public:
    enum { MSG_SIZE = 80 };

    // ctor for producer
    QueueManager(int maxNr) : drop_(false)
    {
      remove();
      mq_.reset(new message_queue(create_only, MQ_NAME, maxNr, MSG_SIZE));
    }

    // ctor for consumer
    QueueManager() : drop_(true), mq_(new message_queue(open_only, MQ_NAME)) {}

    ~QueueManager() { if(drop_) remove(); }

    void send(const char* id, int i)
    {
      char buffer[MSG_SIZE];
      sprintf(buffer, "%s_%d", id, i);

      mq_->send(buffer, MSG_SIZE, 0);
    }

    std::string receive()
    {
      char buffer[MSG_SIZE];

      unsigned int priority;
      std::size_t recvd_size;
      mq_->receive(&buffer, MSG_SIZE, recvd_size, priority);

      return std::string(buffer);
    }

    static bool checkIdLen(const char* id)
    {
      if(strlen(id) > QueueManager::MSG_SIZE - 5)
      {
        std::cout << "The specified id [" << id << "] is too long" << std::endl;
        return false;
      }
      return true;
    }
  };
}

void ip11a(const char* id)
{
  std::cout << "Starting producer ..." << std::endl;
  if(QueueManager::checkIdLen(id) == false)
    return;

  try
  {
    QueueManager qm(MQ_MSG_NR);

    std::cout << "Sending messages: ";
    for(int i = 0; i < MSG_NR; ++i)
    {
      qm.send(id, i);
      std::cout << i << ' ';
    }
    std::cout << std::endl;
  }
  catch(interprocess_exception &ex)
  {
    std::cout << ex.what() << std::endl;
    return;
  }

  std::cout << "done" << std::endl;
}

void ip11b()
{
  std::cout << "Starting consumer ..." << std::endl;

  std::vector<std::string> vec;
  vec.reserve(MSG_NR);

  try
  {
    QueueManager qm;

    for(int i = 0; i < MSG_NR; ++i)
    {
      vec.push_back(qm.receive());
      std::cout << '.';
    }
    std::cout << std::endl;
  }
  catch(interprocess_exception &ex)
  {
    std::cout << ex.what() << std::endl;
    return;
  }

  std::copy(vec.begin(), vec.end(), std::ostream_iterator<std::string>(std::cout, " "));
  std::cout << std::endl;
}
  1. Just to have a thrill, we send to the queue twice as many messages as it can hold. Since send() is a blocking call, the producer stops when the queue is full and resumes as soon as the consumer starts draining it.
  2. The QueueManager member drop_ is set to true when we want to remove the message queue in the destructor. That is the case for the consumer, which is the last one to use the queue.
  3. In the constructor for the producer we can initialize the message queue object only after removing any leftover message queue with the same name. But a message_queue can't be created empty and then filled with the required information, so we use a pointer, or better, a smart pointer to stay on the safe side.
The code is based on an example provided by the Boost Library Documentation.
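The trick described in the third note, holding through a smart pointer an object that cannot be default-constructed, can be shown in isolation. What follows is only a sketch: Resource and Manager are made-up classes, cleanup() is an empty placeholder for a call such as message_queue::remove(), and std::unique_ptr plays the role that boost::scoped_ptr has in the post.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Hypothetical stand-in for a class that, like message_queue,
// has no default constructor.
class Resource
{
public:
  explicit Resource(const std::string& name) : name_(name) {}
  const std::string& name() const { return name_; }
private:
  std::string name_;
};

class Manager
{
public:
  explicit Manager(const std::string& name)
  {
    // The pointer starts out empty, so the cleanup can run first ...
    cleanup(name);
    // ... and only then the real object is built.
    resource_.reset(new Resource(name));
  }

  bool ready() const { return resource_ != nullptr; }

  const std::string& name() const
  {
    assert(ready());
    return resource_->name();
  }

private:
  // Placeholder for a step such as message_queue::remove(name).
  static void cleanup(const std::string&) {}

  std::unique_ptr<Resource> resource_;
};
```

Had Resource been a direct member, it would have to be built in the initializer list, before any statement in the constructor body could run.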


Anonymous semaphore

Instead of using a condition, we can synchronize processes using the semaphore concept. A semaphore is initialized with an integer value. Calling wait() on it checks whether the value is currently greater than zero: if so the value is decreased and we go on, otherwise we are kept waiting for our turn. To signal to the other users that a resource is available, we call post() on the semaphore, which increases the internal counter and wakes up one waiting process, if any.

Initializing a semaphore to one we get a so-called binary semaphore, which can be used like a mutex, where wait() is like locking it and post() is like unlocking it.

The special feature of a semaphore is that post() and wait() can be called from different threads or processes.

In the example we see a buffer containing an array of integers that is accessed by two processes: a producer and a consumer. There are three semaphores on it: one is used as a mutex, the other two prevent writing to a full array or reading from an empty one, which would cause loss of data.

In the main of the application this code
if(argc == 2)
ip10a(argv[1]);
else
ip10b();

is used to determine whether the current process is the producer or the consumer. The application should be called first with an argument, to start the producer. Otherwise we get an exception from the consumer, since it expects the shared memory to be already created.

Here is the code:
#include <iostream>
#include <new>    // placement new
#include <string>

#include "boost/interprocess/sync/interprocess_semaphore.hpp"
#include "boost/interprocess/shared_memory_object.hpp"
#include "boost/interprocess/mapped_region.hpp"
#include "boost/thread/thread.hpp"

using namespace boost::interprocess;

namespace
{
  const char* MY_SHARED = "MySharedMemory";

  class SharedBuffer
  {
  private:
    enum { NumItems = 10 };

    interprocess_semaphore sMutex_; // for exclusive access
    interprocess_semaphore sFull_; // to avoid overflow
    interprocess_semaphore sEmpty_; // to avoid underflow

    int items[NumItems];
  public:
    SharedBuffer() : sMutex_(1), sFull_(NumItems), sEmpty_(0) {} // 1.

    void put(int i) // 2.
    {
      sFull_.wait(); // wait if the buffer is full
      sMutex_.wait(); // wait for exclusive access
      items[i % SharedBuffer::NumItems] = i;
      sMutex_.post(); // done exclusive access
      sEmpty_.post(); // notify one new item available
    }

    int get(int i) // 3.
    {
      int result;

      sEmpty_.wait(); // wait if buffer is empty
      sMutex_.wait(); // wait for exclusive access
      result = items[i % SharedBuffer::NumItems];
      sMutex_.post(); // done exclusive access
      sFull_.post(); // notify one item consumed

      return result;
    }
  };

  class SMManager
  {
  private:
    std::string name_;
    shared_memory_object shm_;
    mapped_region region_;
    SharedBuffer* sb_;

    void remove() { shared_memory_object::remove(name_.c_str()); }
  public:
    SMManager(const char* name, bool create) : name_(name)
    {
      if(create)
      {
        remove();

        shared_memory_object shm(create_only, name, read_write);
        shm.truncate(sizeof(SharedBuffer));
        shm_.swap(shm);
      }
      else
      {
        shared_memory_object shm(open_only, name, read_write);
        shm_.swap(shm);
      }

      mapped_region region(shm_, read_write);
      region_.swap(region);

      void* addr = region_.get_address();
      sb_ = create ? new (addr) SharedBuffer : static_cast<SharedBuffer*>(addr);
    }

    ~SMManager() { remove(); }

    SharedBuffer* getSharedBuffer() { return sb_; }
  };
}

void ip10a(const char* id)
{
  std::cout << "Producer " << id << " started" << std::endl;

  try
  {
    SMManager smm(MY_SHARED, true);

    SharedBuffer* data = smm.getSharedBuffer();
    const int NumMsg = 50;

    for(int i = 0; i < NumMsg; ++i)
    {
      data->put(i);

      std::cout << i << ' ';
      boost::this_thread::sleep(boost::posix_time::milliseconds(250));
    }
  }
  catch(interprocess_exception &ex)
  {
    std::cout << ex.what() << std::endl;
    return;
  }

  std::cout << std::endl << "Done!" << std::endl;
}

void ip10b()
{
  std::cout << "Consumer started" << std::endl;

  try
  {
    SMManager smm(MY_SHARED, false);

    SharedBuffer* data = smm.getSharedBuffer();

    const int NumMsg = 50;

    int extractedData;

    for(int i = 0; i < NumMsg; ++i)
    {
      extractedData = data->get(i);

      std::cout << extractedData << ' ';
      boost::this_thread::sleep(boost::posix_time::milliseconds(100));
    }
  }
  catch(interprocess_exception &ex)
  {
    std::cout << ex.what() << std::endl;
    return;
  }

  std::cout << std::endl << "Done!" << std::endl;
}

1. The constructor for SharedBuffer initializes its three semaphores. The first one, sMutex_, is used as a mutex, hence its name. Then we have sFull_, initialized to the array size, which is used to avoid writing elements to the array when this would overwrite data not yet read. Finally we have sEmpty_, initialized to zero, which is used to avoid reading from the array when there is no item available.
2. SharedBuffer::put() is used by the producer to put a new item in the array. Before actually writing the value we should ensure that the buffer is not full and that we have exclusive access to the resource. After writing we release the mutex and notify that a new element has been inserted.
3. SharedBuffer::get() is used by the consumer and mirrors the put() method.

To make the example a bit more interesting a couple of sleep() calls are put in the code for the producer and the consumer.

The code is based on an example provided by the Boost Library Documentation.
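To play with the three-semaphore scheme without setting up shared memory, here is a sketch that stays inside one process and uses only the standard library. Semaphore and RingBuffer are names invented for this example: free_ has the role of sFull_, used_ the role of sEmpty_, and, differently from the post, the buffer keeps its own read and write positions.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Minimal counting semaphore (hypothetical, in-process only).
class Semaphore
{
public:
  explicit Semaphore(unsigned initial) : count_(initial) {}

  void wait()
  {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return count_ > 0; });
    --count_;
  }

  void post()
  {
    std::lock_guard<std::mutex> lock(mutex_);
    ++count_;
    cv_.notify_one();
  }

private:
  std::mutex mutex_;
  std::condition_variable cv_;
  unsigned count_;
};

const std::size_t NUM_ITEMS = 4;

class RingBuffer
{
public:
  RingBuffer() : mutex_(1), free_(NUM_ITEMS), used_(0), head_(0), tail_(0) {}

  void put(int value)
  {
    free_.wait();   // blocks while the buffer is full
    mutex_.wait();  // binary semaphore used as a lock
    items_[tail_] = value;
    tail_ = (tail_ + 1) % NUM_ITEMS;
    mutex_.post();
    used_.post();   // one more item available for reading
  }

  int get()
  {
    used_.wait();   // blocks while the buffer is empty
    mutex_.wait();
    const int value = items_[head_];
    head_ = (head_ + 1) % NUM_ITEMS;
    mutex_.post();
    free_.post();   // one more free slot
    assert(head_ < NUM_ITEMS);
    return value;
  }

private:
  Semaphore mutex_;
  Semaphore free_;
  Semaphore used_;
  int items_[NUM_ITEMS];
  std::size_t head_;
  std::size_t tail_;
};
```

A producer thread calling put() and a consumer thread calling get() would take turns blocking, just like the two processes in the post.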


Anonymous condition

Let's write an example to show how to use anonymous conditions in a multiprocess application using the Boost interprocess (IPC) library. We write a producer and a consumer that exchange messages through a buffer built in shared memory.

Two conditions are used to let the two processes tell each other about changes in the buffer state.

The main of the application selects between the two functions that make the process act as the producer or as the consumer. The producer, identified by an argument, should be launched first:
if(argc == 2)
ip09a(argv[1]);
else
ip09b();

And here is the actual code:

#include <iostream>
#include <cstdio>
#include <cstring>
#include <new>    // placement new
#include <string>

#include "boost/interprocess/sync/interprocess_mutex.hpp"
#include "boost/interprocess/sync/interprocess_condition.hpp"
#include "boost/interprocess/sync/scoped_lock.hpp"
#include "boost/interprocess/shared_memory_object.hpp"
#include "boost/interprocess/mapped_region.hpp"

using namespace boost::interprocess;

namespace
{
  const char* MY_SHARED = "MySharedMemory";

  class SharedMessage // 1.
  {
  private:
    enum { SIZE = 100 };
    char message_[SIZE];

    interprocess_mutex mutex_;
    bool pending_;
    bool terminated_;

    interprocess_condition cEmpty_;
    interprocess_condition cFull_;

  public:
    SharedMessage() : pending_(false), terminated_(false) {}

    void sendMessage(const char* id, int i) // 2.
    {
      scoped_lock<interprocess_mutex> lock(mutex_);
      while(pending_) // a loop, since a wait can return spuriously
        cFull_.wait(lock);

      std::sprintf(message_, "%s_%d", id, i);
      pending_ = true;
      cEmpty_.notify_one();
    }

    void terminate() // 3.
    {
      scoped_lock<interprocess_mutex> lock(mutex_);
      while(pending_)
        cFull_.wait(lock);

      terminated_ = true;
      pending_ = true;

      cEmpty_.notify_one();
    }

    bool readMessage(std::string& buffer) // 4.
    {
      scoped_lock<interprocess_mutex> lock(mutex_);
      while(!pending_)
        cEmpty_.wait(lock);

      if(terminated_)
        return false; // no message read

      buffer = message_;
      pending_ = false;
      cFull_.notify_one();

      return true;
    }
  };

  class SMManager // 5.
  {
  private:
    std::string name_;
    shared_memory_object shm_;
    mapped_region region_;
    SharedMessage* sm_;

    void remove() { shared_memory_object::remove(name_.c_str()); }
  public:
    SMManager(const char* name, bool create) : name_(name)
    {
      if(create)
      {
        remove();

        shared_memory_object shm(create_only, name, read_write);
        shm.truncate(sizeof(SharedMessage));
        shm_.swap(shm);
      }
      else
      {
        shared_memory_object shm(open_only, name, read_write);
        shm_.swap(shm);
      }

      mapped_region region(shm_, read_write);
      region_.swap(region);

      void* addr = region_.get_address();
      sm_ = create ? new (addr) SharedMessage : static_cast<SharedMessage*>(addr);
    }

    ~SMManager() { remove(); }

    SharedMessage* getSharedMessage() { return sm_; }
  };
}

void ip09a(const char* id) // 6.
{
  std::cout << "Starting producer process" << std::endl;

  try
  {
    SMManager smm(MY_SHARED, true);
    SharedMessage* pSM = smm.getSharedMessage();

    for(int i = 0; i < 7; ++i)
      pSM->sendMessage(id, i);
    pSM->terminate();
  }
  catch(interprocess_exception &ex)
  {
    std::cout << ex.what() << std::endl;
    return;
  }

  std::cout << "Execution completed" << std::endl;
}

void ip09b() // 7.
{
  std::cout << "Starting consumer " << std::endl;

  try
  {
    SMManager smm(MY_SHARED, false);
    SharedMessage* pSM = smm.getSharedMessage();

    std::string message;
    while(pSM->readMessage(message))
      std::cout << "Message received: " << message << std::endl;
  }
  catch(interprocess_exception &ex)
  {
    std::cout << ex.what() << std::endl;
    return;
  }

  std::cout << "Execution completed" << std::endl;
}

1. The SharedMessage class manages the buffer in the shared memory. Basically we have a mutex to rule the access to the shared resources and a couple of conditions to let the processes tell each other about changes of status. A boolean, pending_, is used to keep track of the current status of the buffer and another one, terminated_, to signal to the consumer that the producer has completed its operations.
2. SharedMessage::sendMessage() is used by the producer to put a message in the shared memory for the consumer. A scoped lock is created, then we check the status variable: if there is already a message in the buffer, the process waits on the condition variable cFull_, which releases the lock while waiting. Once we have rightful access, we put the message in the shared memory, set the status boolean, and then notify through the condition cEmpty_ that, well, the buffer is not empty anymore.
3. SharedMessage::terminate() puts a different message in the shared memory, the one saying that the producer is done with producing messages. Instead of putting a string in the buffer we just set the boolean terminated_ to true.
4. SharedMessage::readMessage() is used by the consumer to read the message stored by the producer in the shared memory. Before reading the buffer we check whether the producer has ended its message production, looking at the terminated_ flag. Notice that the condition variable usage mirrors the one of the sendMessage() function.
5. The SMManager class takes care of the shared memory management. Its constructor has a parameter, create, that lets us use it both for the producer, which actually allocates the shared memory, and for the consumer, which just accesses it. The shared memory is associated with a SharedMessage object. The producer uses placement new to call the SharedMessage constructor without allocating memory, since we just use the shared memory that we already have available. The consumer simply casts the memory to a pointer to SharedMessage.
6. This is the function used to implement the producer functionality. It creates an SMManager object, to allocate shared memory and create a SharedMessage object, and then sends a few messages.
7. Here we see the logic of the consumer. It creates an SMManager object, to gain access to the shared memory allocated by the producer, seeing it as a SharedMessage object, and then reads messages until it finds that the producer has completed its job.

The code is based on an example provided by the Boost Library Documentation.
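The same two-condition handshake can be tried out inside a single process with the standard library. The sketch below is not the Boost code: Mailbox and its members are invented names, and the waits use the predicate form of std::condition_variable::wait(), which rechecks the condition after every wakeup.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <string>

// Hypothetical single-slot mailbox, an in-process analogue of SharedMessage.
class Mailbox
{
public:
  Mailbox() : pending_(false), closed_(false) {}

  void send(const std::string& text)
  {
    std::unique_lock<std::mutex> lock(mutex_);
    assert(!closed_); // sending after close() is a usage error
    slotFree_.wait(lock, [this] { return !pending_; });
    message_ = text;
    pending_ = true;
    slotFilled_.notify_one();
  }

  // Tells the reader that no more messages will come.
  void close()
  {
    std::unique_lock<std::mutex> lock(mutex_);
    assert(!closed_);
    slotFree_.wait(lock, [this] { return !pending_; });
    closed_ = true;
    pending_ = true;
    slotFilled_.notify_one();
  }

  // Returns false once the sender has closed the mailbox.
  bool read(std::string& out)
  {
    std::unique_lock<std::mutex> lock(mutex_);
    slotFilled_.wait(lock, [this] { return pending_; });
    if(closed_)
      return false;
    out = message_;
    pending_ = false;
    slotFree_.notify_one();
    return true;
  }

private:
  std::mutex mutex_;
  std::condition_variable slotFree_;   // role of cFull_ in the post
  std::condition_variable slotFilled_; // role of cEmpty_ in the post
  std::string message_;
  bool pending_;
  bool closed_;
};
```

One thread would loop on send() and finish with close(), while another loops on read() until it returns false.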


Named mutex

A named mutex can be used in an IPC context to synchronize processes on a file. The code shown in the example writes to a file a string that includes the thread id of the current process and a counter. If we run it in several processes at once, the strings could get mixed up, so we use a lock on the named mutex to give each process exclusive access to the file when writing.

Here is the code:
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>

#include "boost/interprocess/sync/scoped_lock.hpp"
#include "boost/interprocess/sync/named_mutex.hpp"
#include "boost/thread/thread.hpp"

using namespace boost::interprocess;

namespace
{
  class FileManager // 1.
  {
  private:
    named_mutex mutex_;
    std::ofstream file_;

  public:
    FileManager() : mutex_(open_or_create, "fstream_named_mutex"),
      file_("boost.log", std::ios_base::app) {}
    ~FileManager() { named_mutex::remove("fstream_named_mutex"); }

    void log(const std::string& line)
    {
      scoped_lock<named_mutex> lock(mutex_);
      file_ << line << std::endl;
    }
  };
}

void ip08()
{
  try
  {
    FileManager fm;

    for(int i = 0; i < 10; ++i)
    {
      std::cout << '.'; // a sign of life for the user ...
      // the prefix is streamed in: text passed to the ostringstream
      // constructor would be overwritten by the first insertion
      std::ostringstream os;
      os << "Process id " << boost::this_thread::get_id() << " iteration # " << i;

      fm.log(os.str());
      boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); // 2.
    }
    std::cout << std::endl;
  }
  catch(interprocess_exception &ex)
  {
    std::cout << ex.what() << std::endl;
    return;
  }
  return;
}

1. Through the class FileManager we manage the file stream and the associated mutex. The constructor opens the file, in append mode, and creates or opens the named mutex we use to rule the access to the file. The destructor removes the mutex. The log() method uses a scoped lock on the mutex to safely perform the write to the file.
2. A sleep is used to give the user the time to start other processes while this one is running.

The code is based on an example provided by the Boost Library Documentation.
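The locking pattern itself does not depend on the mutex being a named one. As a sketch, here is the same idea for threads of a single process, with std::mutex in place of named_mutex and any std::ostream in place of the file. LockedLog is a made-up class, not something taken from Boost.

```cpp
#include <cassert>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>

// Hypothetical in-process counterpart of FileManager: one lock, one stream.
class LockedLog
{
public:
  explicit LockedLog(std::ostream& out) : out_(out) {}

  void log(const std::string& who, int iteration)
  {
    assert(iteration >= 0);

    // Build the whole line first, so the lock is held only for the write.
    std::ostringstream os;
    os << who << " iteration # " << iteration;

    std::lock_guard<std::mutex> lock(mutex_);
    out_ << os.str() << '\n';
  }

private:
  std::mutex mutex_;
  std::ostream& out_;
};
```

Since each line is written in one piece while the lock is held, lines coming from different threads cannot be interleaved.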


Anonymous mutex

The application we are going to use to show how to use an anonymous mutex is built around a simple cyclic buffer that resides in shared memory and is used by two different processes. An interprocess_mutex, defined in the Boost IPC library, is used to rule the access to the shared resources through a scoped_lock.

To create different processes we call the executable with no parameter, for the master process, and then with a parameter, the name of the secondary process.

So, the main of our application distinguishes between the two cases, calling the appropriate function, using a piece of code like this:
if(argc == 1)
  ip07a();
else
  ip07b(argv[1]);
Let's now have a look at the complete code, then we'll say something about the most interesting passages:
#include <cstdio>
#include <iostream>
#include <new>    // placement new
#include <string>

#include "boost/interprocess/sync/interprocess_mutex.hpp"
#include "boost/interprocess/sync/scoped_lock.hpp"
#include "boost/interprocess/shared_memory_object.hpp"
#include "boost/interprocess/mapped_region.hpp"
#include "boost/thread/thread.hpp"

using namespace boost::interprocess;

namespace
{
const char* MY_SHARED = "MySharedMemory";

class SharedMemoryLog // 1
{
private:
  enum { NUM_ITEMS = 10, LINE_SIZE = 100 };
  boost::interprocess::interprocess_mutex mutex_;

  char items[NUM_ITEMS][LINE_SIZE];
  int curLine_;
  bool done_;
public:
  SharedMemoryLog() : curLine_(0), done_(false) {}

  void push_line(const char* id, int index)
  {
    scoped_lock<interprocess_mutex> lock(mutex_);
    std::sprintf(items[(curLine_++) % SharedMemoryLog::NUM_ITEMS], "%s_%d", id, index);
    std::cout << "Inserting item " << id << ' ' << index << std::endl;
  }

  void dump()
  {
    scoped_lock<interprocess_mutex> lock(mutex_);
    for(int i = 0; i < NUM_ITEMS; ++i)
      std::cout << items[i] << std::endl;
  }

  void done()
  {
    scoped_lock<interprocess_mutex> lock(mutex_);
    done_ = true;
  }

  bool isDone()
  {
    scoped_lock<interprocess_mutex> lock(mutex_);
    return done_;
  }
};

class ShMemManager // 2
{
private:
  std::string name_;
  bool create_;
  shared_memory_object shm_;
  mapped_region region_;
  SharedMemoryLog* sml_;

  void remove() { shared_memory_object::remove(name_.c_str()); }
public:
  ShMemManager(const char* name, bool create = true) : name_(name), create_(create)
  {
    if(create_)
    {
      remove();

      shared_memory_object shm(create_only, name_.c_str(), read_write);
      shm.truncate(sizeof(SharedMemoryLog));
      shm_.swap(shm);
    }
    else
    {
      shared_memory_object shm(open_only, name_.c_str(), read_write);
      shm_.swap(shm);
    }

    mapped_region region(shm_, read_write);
    region_.swap(region);
    void* addr = region_.get_address();

    sml_ = create_ ? new (addr) SharedMemoryLog : static_cast<SharedMemoryLog*>(addr); // 3
  }

  ~ShMemManager() { remove(); }

  SharedMemoryLog* getMemory() { return sml_; }
};
}

void ip07a() // 4
{
  std::cout << "Starting master process ..." << std::endl;

  try
  {
    ShMemManager smm(MY_SHARED);
    SharedMemoryLog* data = smm.getMemory();

    for(int i = 0; i < 7; ++i)
    {
      data->push_line("master", i);
      boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
    }

    std::cout << "Master dumps data:" << std::endl;
    data->dump();

    while(true)
    {
      if(data->isDone())
      {
        std::cout << "Master sees that the other process is done" << std::endl;
        break;
      }

      std::cout << "Master waits for the other process" << std::endl;
      boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
    }

    std::cout << "Master dumps again the data:" << std::endl;
    data->dump();
  }
  catch(interprocess_exception& ex)
  {
    std::cout << ex.what() << std::endl;
    return;
  }

  std::cout << "Master execution completed" << std::endl;
}

void ip07b(const char* id) // 5
{
  std::cout << "Process " << id << " started" << std::endl;

  try
  {
    ShMemManager smm(MY_SHARED, false);
    SharedMemoryLog* data = smm.getMemory();

    for(int i = 0; i < 7; ++i)
    {
      data->push_line(id, i);
      boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
    }
    data->done();

    std::cout << id << " dumps data:" << std::endl;
    data->dump();
  }
  catch(interprocess_exception& ex)
  {
    std::cout << ex.what() << std::endl;
    return;
  }

  std::cout << "Process " << id << " done" << std::endl;
}
1. The class SharedMemoryLog manages the concurrent access by the different processes, and its instance is placed in shared memory. Notice that every method is shielded by a scoped_lock on the mutex owned by the class.
2. The class ShMemManager is used to manage the shared memory. A parameter in the constructor lets us determine whether we want to actually create the shared memory, as the master process does, or just to access memory that already exists, as the secondary process does.
3. The last line of the ShMemManager constructor associates the sml_ pointer to SharedMemoryLog with the shared memory we have just created or accessed. If we are in creation mode, we should actually call the constructor for SharedMemoryLog asking it to use the shared memory. To do that we use the so-called placement new construct "new (addr) SharedMemoryLog", specifying the memory address we should use. Otherwise we simply perform a static cast to the required type.
4. The function called from the master process. It just puts a few lines in the log (slowing down the process with a sleep call), dumps the log, polls once per second until the other process is done, then performs another dump before returning. This polling is not very good programming, we should use a condition instead. We'll see how to do that in a next post.
5. The function called by the secondary process. The main differences from the master are that we call the constructor for ShMemManager specifying that we want to access shared memory already available, and that we let the master know we are done by calling the function done(), which sets an internal flag. As already said, this is not a very clean way of working, we'll see how to do better using a condition.

The code is based on an example provided by the Boost Library Documentation.
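The difference between the two branches described in the third note can be seen on ordinary memory too. In this sketch Counter, createIn() and attachTo() are names made up for illustration, and a local buffer stands in for the mapped region.

```cpp
#include <cassert>
#include <cstddef>
#include <new>

// Hypothetical payload; in the post this role is played by SharedMemoryLog.
struct Counter
{
  Counter() : value(42) {}
  int value;
};

// Creating side: run the constructor on memory that already exists.
Counter* createIn(void* addr, std::size_t size)
{
  assert(size >= sizeof(Counter));
  return new (addr) Counter;
}

// Attaching side: the object is already there, so we only convert the address.
Counter* attachTo(void* addr)
{
  return static_cast<Counter*>(addr);
}
```

A possible usage is to declare a buffer such as "alignas(Counter) unsigned char raw[sizeof(Counter)]", call createIn(raw, sizeof(raw)) once, and then attachTo(raw) wherever the object is needed. No memory is allocated by placement new, so the object is disposed of by calling its destructor explicitly, not by delete.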


File_mapping

Using the Boost IPC library, it is possible to associate the content of a file with shared memory. This gives a number of advantages, like delegating to the OS all the trouble connected with data synchronization and caching.

By the way, sometimes it could be useful to use file_mapping even if we are not interested in sharing memory, but just to simplify the data management on a file.

Here is an example:

#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <cstring>
#include <cstddef>
#include <cstdlib>
#include "boost/interprocess/file_mapping.hpp"
#include "boost/interprocess/mapped_region.hpp"

using namespace boost::interprocess;

namespace
{
  const std::size_t FILE_SIZE = 10000;
  const char* FILE_NAME = "file.bin";

  class FileManager // 1.
  {
  public:
    enum Mode { create_remove, create, remove, access };
    FileManager(const char* fname, Mode mode) : fname_(fname), mode_(mode)
    {
      if(mode == create_remove || mode == create)
      {
        std::filebuf fbuf;
        // open the file we have been asked to manage, not the global default
        fbuf.open(fname_.c_str(), std::ios_base::in | std::ios_base::out
          | std::ios_base::trunc | std::ios_base::binary);
        // Set the size: seek to the last byte and write it
        fbuf.pubseekoff(FILE_SIZE-1, std::ios_base::beg);
        fbuf.sputc(0);
      }
    }

    // the type is fully qualified to avoid a clash with the POSIX ::mode_t
    mapped_region getMappedRegion(boost::interprocess::mode_t mode)
    {
      // Create a file mapping
      file_mapping m_file(fname_.c_str(), mode);

      // Map the whole file in this process, with the requested access mode
      return mapped_region(m_file, mode);
    }

    ~FileManager()
    {
      if(mode_ == remove || mode_ == create_remove)
      {
        file_mapping::remove(fname_.c_str());
        std::cout << "File " << fname_ << " removed" << std::endl;
      }
    }
  private:
    std::string fname_;
    Mode mode_;
  };

  // ensure the executable file name is quoted in case it has internal blanks
  bool launchChildProcess(const char* progName)
  {
    bool quote = strchr(progName, ' ') == 0 ? false : true;
    std::string s((quote ? "\"" : ""));
    s += progName;
    if(quote)
      s += "\"";
    s += " child";

    if(std::system(s.c_str()) != 0)
    {
      std::cout << "error in the child process" << std::endl;
      return false;
    }
    return true;
  }

  void checkMemoryOne(void* address, std::size_t size)
  {
    const char* mem = static_cast<const char*>(address);
    for(std::size_t i = 0; i < size; ++i)
    {
      if(*mem++ != 1)
      {
        std::cout << "Memory check failed" << std::endl;
        return;
      }
    }
    std::cout << "Memory check succeeded" << std::endl;
  }

  void sharedMemoryAccess() // 2.
  {
    FileManager fm(FILE_NAME, FileManager::access);
    mapped_region region = fm.getMappedRegion(read_only);

    //Get the address of the mapped region
    void* addr = region.get_address();
    std::size_t size = region.get_size();

    checkMemoryOne(addr, size);
  }

  void fileAccess() // 3.
  {
    std::filebuf fbuf;
    fbuf.open(FILE_NAME, std::ios_base::in | std::ios_base::binary);

    //Read it to memory
    std::vector<char> vect(FILE_SIZE, 0);
    fbuf.sgetn(&vect[0], std::streamsize(vect.size()));

    checkMemoryOne(&vect[0], FILE_SIZE);
  }
}

// parent process
void ip06a(const char* progName)
{
  FileManager fm(FILE_NAME, FileManager::create_remove);
  mapped_region region = fm.getMappedRegion(read_write);

  //Get the address of the mapped region
  void* addr = region.get_address();
  std::size_t size = region.get_size();

  //Write all the memory to 1
  std::memset(addr, 1, size);

  launchChildProcess(progName);
}

// child process
void ip06b()
{
  std::cout << "Accessing the shared memory" << std::endl;
  sharedMemoryAccess();

  std::cout << "Accessing the file on disk" << std::endl;
  fileAccess();
}

1. The class FileManager is just a little wrapper around the basic functionality for file access. Its constructor keeps track of the associated file name and of the way we want to access it. Only if we want to actually create the file is the filebuf functionality called. Otherwise, if we just want to access the data (or remove the file), we rely on the fact that someone else should have already created the file. The getMappedRegion() method performs the mapping between shared memory and file, and returns the mapped region, in the mode we require. The destructor removes the file, only when required.
2. The function sharedMemoryAccess() shows how to work with the mapped_region.
3. As a comparison, the function fileAccess() performs the same check as sharedMemoryAccess(), but accessing the file directly.

The main for this application basically just calls the parent or the child function according to the passed parameter:
if(argc == 1)
ip06a(argv[0]);
else
ip06b();

The code is based on an example provided by the Boost Library Documentation.
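The way the FileManager constructor gives the file its size, seeking to the last byte and writing it, may deserve a closer look. Here is a sketch of that step alone, using only the standard library. The three function names are invented for this example, and removeFile() takes the place of file_mapping::remove().

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <fstream>
#include <ios>

// Create (or truncate) a file and extend it to exactly size bytes.
bool createSizedFile(const char* name, std::size_t size)
{
  assert(size > 0);
  std::filebuf fbuf;
  if(!fbuf.open(name, std::ios_base::in | std::ios_base::out
                      | std::ios_base::trunc | std::ios_base::binary))
    return false;
  // Seek to the last byte and write it: this sets the length of the file.
  fbuf.pubseekoff(static_cast<std::streamoff>(size - 1), std::ios_base::beg);
  return fbuf.sputc(0) != std::filebuf::traits_type::eof();
}

// Size of the file in bytes, or -1 if it cannot be opened.
long fileSize(const char* name)
{
  std::ifstream in(name, std::ios_base::binary | std::ios_base::ate);
  if(!in)
    return -1;
  const std::streamoff pos = in.tellg();
  return static_cast<long>(pos);
}

// Delete the file from disk; returns true on success.
bool removeFile(const char* name)
{
  return std::remove(name) == 0;
}
```

A file that has been given its size this way can then be mapped as a whole, which is what the post does with FILE_SIZE and getMappedRegion().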


Shared_memory_object

As far as I know, the new C++ standard (C++0x, still a draft when I'm writing) does not bring much in the interprocess communication (IPC) field. I guess the problem is that almost any environment provides its own native way to approach the issue, and at this point it is not easy to find a solution that would make everyone happy.

Luckily Boost provides an interprocess library that is very useful in keeping the code as portable as possible.

Here is a first example that shows how to manage shared memory among different processes.

We create two processes. A first one takes care of allocating the shared memory, then it spawns a new process that accesses the shared memory, and does something with it before terminating. Then the parent process gets back in control, removes the shared memory and terminates.

This is the main of our application:
int main(int argc, char* argv[])
{
  if(argc == 1)
    ip05a(argv[0]);
  else
    ip05b();

  system("pause");
}
If we call the executable without parameters, the only argument passed to main() is the executable name, and we use this information to consider this process as the parent one for our application. Otherwise we assume to be in the child process.

This is the implementation for the two functions:
#include <cstring>
#include <cstdlib>
#include <string>
#include <iostream>
#include "boost/interprocess/shared_memory_object.hpp"
#include "boost/interprocess/mapped_region.hpp"

using namespace boost::interprocess;

namespace
{
  const char* MY_SHARED = "MySharedMemory";
  const size_t MY_SHARED_SIZE = 1000;

  // Remove shared memory on construction and destruction
  class Remover
  {
  private:
    std::string name_;
    void remove() { shared_memory_object::remove(name_.c_str()); }
  public:
    Remover(const char* name) : name_(name) { remove(); }
    ~Remover() { remove(); }
  };

  // ensure the executable file name is quoted in case it has internal blanks
  bool launchChildProcess(const char* progName)
  {
    bool quote = strchr(progName, ' ') == 0 ? false : true;
    std::string s((quote ? "\"" : ""));
    s += progName;
    if(quote)
      s += "\"";
    s += " child";

    if(std::system(s.c_str()) != 0)
    {
      std::cout << "error in the child process" << std::endl;
      return false;
    }
    return true;
  }
}

void ip05a(const char* progName)
{
  try
  {
    Remover remover(MY_SHARED); // 1
    shared_memory_object shm(create_only, MY_SHARED, read_write); // 2
    shm.truncate(MY_SHARED_SIZE); // 3
    mapped_region region(shm, read_write); // 4
    std::memset(region.get_address(), 1, region.get_size()); // 5

    launchChildProcess(progName);
  }
  catch(interprocess_exception& ie)
  {
    std::cout << "can't create the shared memory: " << ie.what() << std::endl;
  }
}

void ip05b()
{
  try
  {
    shared_memory_object shm(open_only, MY_SHARED, read_only); // 6
    mapped_region region(shm, read_only);
    std::cout << "Working on a mapped region with size " << region.get_size() << std::endl;

    // do something
    char* mem = static_cast<char*>(region.get_address());
    for(std::size_t i = 0; i < region.get_size(); ++i)
    {
      if(*mem++ != 1)
      {
        std::cout << "unexpected value in the shared memory" << std::endl;
        return;
      }
    }

    std::cout << "shared memory read correctly" << std::endl;
  }
  catch(interprocess_exception& ie)
  {
    std::cout << "can't work on the shared memory: " << ie.what() << std::endl;
  }
}
  1. We create a Remover object in order to ensure that the shared memory is correctly destroyed when we leave the parent process. The object calls shared_memory_object::remove() for the specified name both on construction and on destruction. The call on construction could be seen as overkill, but it is cheap, and it saves us some trouble in case for any reason the shared memory allocated by a previous execution is unexpectedly still there.
  2. We create an instance of shared_memory_object passing the flag create_only so, if an object with the passed name already exists in shared memory, an exception is raised. Besides, we specify the read_write access mode since we actually want to modify the associated memory.
  3. The shared_memory_object::truncate() method is used to set the size for the object.
  4. A mapped_region maps a shared_memory_object in a region, making the memory available to the current process. We specify here too that we want to access the memory in read_write mode.
  5. The mapped_region memory location is accessed through get_address() and get_size().
  6. We create an instance of shared_memory_object passing the flag open_only so, if no object with the passed name exists yet in shared memory, an exception is raised. Besides, we specify the read_only access mode since we just want to read the associated memory.
The code is based on an example provided by the Boost Library Documentation.
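The quoting logic in launchChildProcess() is easier to check if we separate it from the call to std::system(). The following sketch does just that. buildChildCommand() is a name made up here, and the function only covers the case handled in the post, a path containing blanks. A path with embedded double quotes would need more care.

```cpp
#include <cassert>
#include <cstring>
#include <string>

// Build the command line used to relaunch the program as a child.
// The executable path is quoted only when it contains a blank.
std::string buildChildCommand(const char* progName)
{
  assert(progName != 0);
  const bool quote = std::strchr(progName, ' ') != 0;

  std::string command;
  if(quote)
    command += '"';
  command += progName;
  if(quote)
    command += '"';
  command += " child";
  return command;
}
```

The resulting string is what would be passed to std::system(), and the extra "child" argument is what makes argc different from 1 in the spawned process.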
