Anonymous semaphore

Instead of using a condition, we can synchronize processes using the semaphore concept. A semaphore is initializated with an integer value, and we use it calling wait() on it to check if it is currently greater that zero, and decreasing it, otherwise we are kept waiting for our turn. To signal to the other users that a resource is made available, we call post() on the semaphore, that increases the internal counter and notify one other process.

Initializing a semaphore to one we get a so called binary semaphore, actually equivalent to a mutex where wait() is like locking it, and post() is like unlocking it.

The special feature of semaphore is that post() and wait() could be called in different threads or processes.

In the example we see a buffer containing an array of integer that is accessed by two processes: a producer and a consumer. There are three semaphores on it. One used as a mutex, the other two to avoid reading or writing to the array were that should cause loss of data.

In the main of the application this code
if(argc == 2)
ip10a(argv[1]);
else
ip10b();

is used to determine if the current process is a producer or a consumer, accordingly to that we should firstly call the application with an argument, to create the producer. If not we'll get an exception from the consumer, since it expects the shared memory to be already created.

Here is the code:
#include <iostream>

#include "boost/interprocess/sync/interprocess_semaphore.hpp"
#include "boost/interprocess/shared_memory_object.hpp"
#include "boost/interprocess/mapped_region.hpp"
#include "boost/thread/thread.hpp"

using namespace boost::interprocess;

namespace
{
const char* MY_SHARED = "MySharedMemory";

class SharedBuffer
{
private:
enum { NumItems = 10 };

interprocess_semaphore sMutex_; // for exclusive access
interprocess_semaphore sFull_; // to avoid overflow
interprocess_semaphore sEmpty_; // to avoid underflow

int items[NumItems];
public:
SharedBuffer() : sMutex_(1), sFull_(NumItems), sEmpty_(0) {} // 1.

void put(int i) // 2.
{
sFull_.wait(); // wait if the buffer is full
sMutex_.wait(); // wait for exclusive access
items[i % SharedBuffer::NumItems] = i;
sMutex_.post(); // done exclusive access
sEmpty_.post(); // notify one new item available
}

int get(int i) // 3.
{
int result;

sEmpty_.wait(); // wait if buffer is empty
sMutex_.wait(); // wait for exclusive access
result = items[i % SharedBuffer::NumItems];
sMutex_.post(); // done exclusive access
sFull_.post(); // notify one item consumed

return result;
}
};

class SMManager
{
private:
std::string name_;
shared_memory_object shm_;
mapped_region region_;
SharedBuffer* sb_;

void remove() { shared_memory_object::remove(name_.c_str()); }
public:
SMManager(const char* name, bool create) : name_(name)
{
if(create)
{
remove();

shared_memory_object shm(create_only, name, read_write);
shm.truncate(sizeof(SharedBuffer));
shm_.swap(shm);
}
else
{
shared_memory_object shm(open_only, name, read_write);
shm_.swap(shm);
}

mapped_region region(shm_, read_write);
region_.swap(region);

void* addr = region_.get_address();
sb_ = create ? new (addr) SharedBuffer : static_cast<SharedBuffer*>(addr);
}

~SMManager() { remove(); }

SharedBuffer* getSharedBuffer() { return sb_; }
};
}

void ip10a(const char* id)
{
std::cout << "Producer " << id << " started" << std::endl;

try
{
SMManager smm(MY_SHARED, true);

SharedBuffer* data = smm.getSharedBuffer();
const int NumMsg = 50;

for(int i = 0; i < NumMsg; ++i)
{
data->put(i);

std::cout << i << ' ';
boost::this_thread::sleep(boost::posix_time::milliseconds(250));
}
}
catch(interprocess_exception &ex)
{
std::cout << ex.what() << std::endl;
return;
}

std::cout << std::endl << "Done!" << std::endl;
}

void ip10b()
{
std::cout << "Consumer started" << std::endl;

try
{
SMManager smm(MY_SHARED, false);

SharedBuffer* data = smm.getSharedBuffer();

const int NumMsg = 50;

int extractedData;

for(int i = 0; i < NumMsg; ++i)
{
extractedData = data->get(i);

std::cout << extractedData << ' ';
boost::this_thread::sleep(boost::posix_time::milliseconds(100));
}
}
catch(interprocess_exception &ex)
{
std::cout << ex.what() << std::endl;
return;
}

std::cout << std::endl << "Done!" << std::endl;
}

1. the constructor for SharedBuffer initialize its three semaphores. A first one, sMutex_, is used as mutex - and that's why it gets its name, then we have sFull_, initialized to the array size, that it used to avoid to write elements in the array when this would lead to a data loss, and finally we have sEmpty_, that is used to avoid reading from the array when there is no item available.
2. SharedBuffer::put() is used by the producer to put a new item in the array. Before actually writing the value we should ensure the buffer is not full and that we have the exclusive access to the resource. After writing we release the mutex and notify that a new element has been inserted.
3. SharedBuffer::get() is used by the consumer and is symmetrical to the put() method.

To make the example a bit more interesting a couple of sleep() calls are put in the code for the producer and the consumer.

The code is based on an example provided by the Boost Library Documentation.

No comments:

Post a Comment