Producer-consumer problem in [C] using pthreads/bounded-buffer/semaphores Source code


SO, today we are talking about the producer-consumer problem, and we are going to solve it utilizing a bounded-buffer and pthreads.
Let’s talk about a producer-consumer relationship for a second, shall we? Basically, the producer produces goods while the consumer consumes the goods and typically does something with them.
In our case our producer will produce an item and place it in a bound-buffer for the consumer. Then the consumer will remove the item from the buffer and print it to the screen.
At this point you may be asking yourself what exactly is a “bound-buffer”? Well a buffer is a container of sorts; a bound-buffer is a container with a limit. We have to be very careful in our case that we don’t over fill the buffer or remove something that isn’t there; in c this will produce a segmentation fault.
The last thing we need to talk about before I explain the problem is semaphores. What, where, when, why and how???
What: A synchronization tool used in concurrent programming
Where: We will use semaphores in any place where we may have concurrency issues. In other words any place where we feel more than one thread will access the data or structure at any given time.
When: To help solve concurrency issues when programming with threads.
Why: Think about how registers work in the operating system for a second. Here is an example of how registers work when you increment a counter-

register1 = counter;
register1 = register1 + 1;
counter = register1;
Now image two threads manipulating this same example but one thread is decrementing -
(T1) register1 = counter;              [register1 = 5]
(T1) register1 = register1 + 1;    [register1 = 6]
(T2) register2 = counter;             [register2 = 5]
(T2) register2 = register2 – 1;     [register2 = 4]
(T1) counter = register1;             [counter = 6]
(T2) counter = register2;            [counter = 4]
WOW, that does look good, does it?! Because both threads were allowed to run without synchronization our counter now has a definitely wrong value. With synchronization the answer should come out to be 5 like it started.
How: We implement a semaphore as an integer value that is only accessible through two atomic operations wait() and signal(). Defined as follows:
/* The wait operation */
wait(S) {
   while(S <= 0); //no-operation
   S--;
}
 
/* The signal operation */
signal(S) {
   S++;
}
S: Semaphore
The operation wait() tells the system that we are about to enter a critical section and signal() notifies that we have left the critical section and it is now accessible to other threads.
Therefore:
wait(mutex);
//critical section where the work is done
signal(mutex)
//continue on with life
Mutex stands for mutual exclusion. Meaning only one process may execute the section at a time.
Are we starting to get it? Well if not we have an example that demonstrates how semaphores are used in reference to pthreads coming up right after this problem walk-through.
Basically, we are going to have a program that creates an N number of producer and consumer threads. The job of the producer will be to generate a random number and place it in a bound-buffer. The role of the consumer will be to remove items from the bound-buffer and print them to the screen. Sounds simple right? Well, yes and no. Remember the big issue here is concurrency so we will be using semaphores to help prevent any issues that might occur. To double our efforts we will also be using a pthread mutex lock to further guarantee synchronization.
The user will pass in three arguments to start to application: <INT, time for the main method to sleep before termination> <INT, Number of producer threads> <INT, number of consumer threads>
We will then use a function in initialize the data, semaphores, mutex lock, and pthread attributes.
Create the producer threads.
Create the consumer threads.
Put main() to sleep().
Exit the program.

Time for the code…
CODE:
/* buffer.h */
typedef int buffer_item;
#define BUFFER_SIZE 5
 
/* main.c */
 
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include "buffer.h"
 
#define RAND_DIVISOR 100000000
#define TRUE 1
 
/* The mutex lock */
pthread_mutex_t mutex;
 
/* the semaphores */
sem_t full, empty;
 
/* the buffer */
buffer_item buffer[BUFFER_SIZE];
 
/* buffer counter */
int counter;
 
pthread_t tid;       //Thread ID
pthread_attr_t attr; //Set of thread attributes
 
void *producer(void *param); /* the producer thread */
void *consumer(void *param); /* the consumer thread */
 
void initializeData() {
 
   /* Create the mutex lock */
   pthread_mutex_init(&mutex, NULL);
 
   /* Create the full semaphore and initialize to 0 */
   sem_init(&full, 0, 0);
 
   /* Create the empty semaphore and initialize to BUFFER_SIZE */
   sem_init(&empty, 0, BUFFER_SIZE);
 
   /* Get the default attributes */
   pthread_attr_init(&attr);
 
   /* init buffer */
   counter = 0;
}
 
/* Producer Thread */
void *producer(void *param) {
   buffer_item item;
 
   while(TRUE) {
      /* sleep for a random period of time */
      int rNum = rand() / RAND_DIVISOR;
      sleep(rNum);
 
      /* generate a random number */
      item = rand();
 
      /* acquire the empty lock */
      sem_wait(&empty);
      /* acquire the mutex lock */
      pthread_mutex_lock(&mutex);
 
      if(insert_item(item)) {
         fprintf(stderr, " Producer report error condition\n");
      }
      else {
         printf("producer produced %d\n", item);
      }
      /* release the mutex lock */
      pthread_mutex_unlock(&mutex);
      /* signal full */
      sem_post(&full);
   }
}

/* Consumer Thread */
void *consumer(void *param) {
   buffer_item item;
 
   while(TRUE) {
      /* sleep for a random period of time */
      int rNum = rand() / RAND_DIVISOR;
      sleep(rNum);
 
      /* aquire the full lock */
      sem_wait(&full);
      /* aquire the mutex lock */
      pthread_mutex_lock(&mutex);
      if(remove_item(&item)) {
         fprintf(stderr, "Consumer report error condition\n");
      }
      else {
         printf("consumer consumed %d\n", item);
      }
      /* release the mutex lock */
      pthread_mutex_unlock(&mutex);
      /* signal empty */
      sem_post(&empty);
   }
}
 
/* Add an item to the buffer */
int insert_item(buffer_item item) {
   /* When the buffer is not full add the item
      and increment the counter*/
   if(counter < BUFFER_SIZE) {
      buffer[counter] = item;
      counter++;
      return 0;
   }
   else { /* Error the buffer is full */
      return -1;
   }
}
 
/* Remove an item from the buffer */
int remove_item(buffer_item *item) {
   /* When the buffer is not empty remove the item
      and decrement the counter */
   if(counter > 0) {
      *item = buffer[(counter-1)];
      counter--;
      return 0;
   }
   else { /* Error buffer empty */
      return -1;
   }
}
 
int main(int argc, char *argv[]) {
   /* Loop counter */
   int i;
 
   /* Verify the correct number of arguments were passed in */
   if(argc != 4) {
      fprintf(stderr, "USAGE:./main.out <INT> <INT> <INT>\n");
   }
 
   int mainSleepTime = atoi(argv[1]); /* Time in seconds for main to sleep */
   int numProd = atoi(argv[2]); /* Number of producer threads */
   int numCons = atoi(argv[3]); /* Number of consumer threads */
 
   /* Initialize the app */
   initializeData();
 
   /* Create the producer threads */
   for(i = 0; i < numProd; i++) {
      /* Create the thread */
      pthread_create(&tid,&attr,producer,NULL);
    }
 
   /* Create the consumer threads */
   for(i = 0; i < numCons; i++) {
      /* Create the thread */
      pthread_create(&tid,&attr,consumer,NULL);
   }
 
   /* Sleep for the specified amount of time in milliseconds */
   sleep(mainSleepTime);
 
   /* Exit the program */
   printf("Exit the program\n");
   exit(0);
}
OUTPUT:
lee@isis:~/programming/c/producer-consumer$ ./pc.out 10 10 10
producer produced 35005211
consumer consumed 35005211
producer produced 1726956429
consumer consumed 1726956429
producer produced 278722862
consumer consumed 278722862
producer produced 468703135
producer produced 1801979802
producer produced 635723058
producer produced 1125898167
consumer consumed 1125898167
Exit the program
Here, we have the output for the program. As you can see we told main to quit after 10 seconds and we produced 10 producer and 10 consumer threads.