
Producer-consumer problem in C using pthreads, a bounded buffer, and semaphores (source code)


So today we are talking about the producer-consumer problem, and we are going to solve it using a bounded buffer and pthreads.
Let’s talk about a producer-consumer relationship for a second, shall we? Basically, the producer produces goods while the consumer consumes the goods and typically does something with them.
In our case, the producer will produce an item and place it in a bounded buffer for the consumer. Then the consumer will remove the item from the buffer and print it to the screen.
At this point you may be asking yourself what exactly a “bounded buffer” is. Well, a buffer is a container of sorts; a bounded buffer is a container with a fixed capacity. We have to be very careful in our case that we don’t overfill the buffer or remove something that isn’t there; in C, doing either can corrupt memory and crash the program with a segmentation fault.
The last thing we need to talk about before I explain the problem is semaphores. What, where, when, why and how???
What: A synchronization tool used in concurrent programming
Where: We will use semaphores in any place where we may have concurrency issues; in other words, any place where more than one thread might access the data or structure at a given time.
When: To help solve concurrency issues when programming with threads.
Why: Think about how registers work for a second. Here is what happens at the register level when you increment a counter:

register1 = counter;
register1 = register1 + 1;
counter = register1;
Now imagine two threads manipulating this same counter, but one thread is decrementing:
(T1) register1 = counter;           [register1 = 5]
(T1) register1 = register1 + 1;     [register1 = 6]
(T2) register2 = counter;           [register2 = 5]
(T2) register2 = register2 - 1;     [register2 = 4]
(T1) counter = register1;           [counter = 6]
(T2) counter = register2;           [counter = 4]
WOW, that does not look good, does it?! Because both threads were allowed to run without synchronization, our counter now holds an incorrect value. With synchronization, the counter would come out to 5, exactly where it started.
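To make the lost update concrete, here is a minimal standalone sketch (my own illustration, separate from the program we build below). Two threads hammer a shared counter without any synchronization; the final value should be 0, but it almost never is:

/* race.c - demonstrates the lost-update race (no synchronization)
   Compile with: gcc race.c -o race -pthread */
#include <stdio.h>
#include <pthread.h>

#define ITERS 1000000

int counter = 0; /* shared, unprotected */

void *inc(void *p) { for (int i = 0; i < ITERS; i++) counter++; return NULL; }
void *dec(void *p) { for (int i = 0; i < ITERS; i++) counter--; return NULL; }

int main(void) {
   pthread_t t1, t2;
   pthread_create(&t1, NULL, inc, NULL);
   pthread_create(&t2, NULL, dec, NULL);
   pthread_join(t1, NULL);
   pthread_join(t2, NULL);
   printf("counter = %d\n", counter); /* should be 0, rarely is */
   return 0;
}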
How: We implement a semaphore as an integer value that is only accessible through two atomic operations, wait() and signal(), defined as follows:
/* The wait operation */
wait(S) {
   while(S <= 0); //busy-wait until S becomes positive
   S--;
}
 
/* The signal operation */
signal(S) {
   S++;
}
S: Semaphore
The operation wait() tells the system that we are about to enter a critical section, and signal() notifies the system that we have left the critical section so it is accessible to other threads again.
Therefore:
wait(mutex);
//critical section where the work is done
signal(mutex);
//continue on with life
Mutex stands for mutual exclusion, meaning only one thread may execute the critical section at a time.
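Here is the same counter experiment with a POSIX semaphore used as a binary mutex; sem_wait() and sem_post() play the roles of the wait() and signal() operations above (again my own sketch, separate from the main program coming up):

/* race_fixed.c - the counter race fixed with a binary semaphore
   Compile with: gcc race_fixed.c -o race_fixed -pthread */
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>

#define ITERS 1000000

int counter = 0;   /* shared, now protected */
sem_t mutex;       /* binary semaphore guarding the counter */

void *inc(void *p) {
   for (int i = 0; i < ITERS; i++) {
      sem_wait(&mutex);   /* wait(): enter the critical section */
      counter++;
      sem_post(&mutex);   /* signal(): leave the critical section */
   }
   return NULL;
}

void *dec(void *p) {
   for (int i = 0; i < ITERS; i++) {
      sem_wait(&mutex);
      counter--;
      sem_post(&mutex);
   }
   return NULL;
}

int main(void) {
   pthread_t t1, t2;
   sem_init(&mutex, 0, 1); /* initial value 1 means "unlocked" */
   pthread_create(&t1, NULL, inc, NULL);
   pthread_create(&t2, NULL, dec, NULL);
   pthread_join(t1, NULL);
   pthread_join(t2, NULL);
   printf("counter = %d\n", counter); /* now always 0 */
   return 0;
}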
Are we starting to get it? If not, there is a complete example coming up right after this problem walk-through that demonstrates how semaphores are used together with pthreads.
Basically, we are going to have a program that creates some number of producer and consumer threads. The job of each producer will be to generate a random number and place it in the bounded buffer. The role of each consumer will be to remove items from the bounded buffer and print them to the screen. Sounds simple, right? Well, yes and no. Remember, the big issue here is concurrency, so we will be using semaphores to help prevent any issues that might occur. To double our efforts we will also be using a pthread mutex lock to further guarantee synchronization.
The user will pass in three arguments to start the application: <INT, time in seconds for the main method to sleep before termination> <INT, number of producer threads> <INT, number of consumer threads>
We will then use a function to initialize the data, semaphores, mutex lock, and pthread attributes.
Create the producer threads.
Create the consumer threads.
Put main() to sleep.
Exit the program.

Time for the code…
CODE:
/* buffer.h */
typedef int buffer_item;
#define BUFFER_SIZE 5
 
/* main.c */
 
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>    /* for sleep() */
#include <pthread.h>
#include <semaphore.h>
#include "buffer.h"
 
#define RAND_DIVISOR 100000000
#define TRUE 1
 
/* The mutex lock */
pthread_mutex_t mutex;
 
/* the semaphores */
sem_t full, empty;
 
/* the buffer */
buffer_item buffer[BUFFER_SIZE];
 
/* buffer counter */
int counter;
 
pthread_t tid;       //Thread ID
pthread_attr_t attr; //Set of thread attributes
 
void *producer(void *param); /* the producer thread */
void *consumer(void *param); /* the consumer thread */
int insert_item(buffer_item item);  /* add an item to the buffer */
int remove_item(buffer_item *item); /* remove an item from the buffer */
 
void initializeData() {
 
   /* Create the mutex lock */
   pthread_mutex_init(&mutex, NULL);
 
   /* Create the full semaphore and initialize to 0 */
   sem_init(&full, 0, 0);
 
   /* Create the empty semaphore and initialize to BUFFER_SIZE */
   sem_init(&empty, 0, BUFFER_SIZE);
 
   /* Get the default attributes */
   pthread_attr_init(&attr);
 
   /* init buffer */
   counter = 0;
}
 
/* Producer Thread */
void *producer(void *param) {
   buffer_item item;
 
   while(TRUE) {
      /* sleep for a random period of time */
      int rNum = rand() / RAND_DIVISOR;
      sleep(rNum);
 
      /* generate a random number */
      item = rand();
 
      /* acquire the empty lock */
      sem_wait(&empty);
      /* acquire the mutex lock */
      pthread_mutex_lock(&mutex);
 
      if(insert_item(item)) {
         fprintf(stderr, " Producer report error condition\n");
      }
      else {
         printf("producer produced %d\n", item);
      }
      /* release the mutex lock */
      pthread_mutex_unlock(&mutex);
      /* signal full */
      sem_post(&full);
   }
}

/* Consumer Thread */
void *consumer(void *param) {
   buffer_item item;
 
   while(TRUE) {
      /* sleep for a random period of time */
      int rNum = rand() / RAND_DIVISOR;
      sleep(rNum);
 
      /* acquire the full lock */
      sem_wait(&full);
      /* acquire the mutex lock */
      pthread_mutex_lock(&mutex);
      if(remove_item(&item)) {
         fprintf(stderr, "Consumer report error condition\n");
      }
      else {
         printf("consumer consumed %d\n", item);
      }
      /* release the mutex lock */
      pthread_mutex_unlock(&mutex);
      /* signal empty */
      sem_post(&empty);
   }
}
 
/* Add an item to the buffer */
int insert_item(buffer_item item) {
   /* When the buffer is not full add the item
      and increment the counter*/
   if(counter < BUFFER_SIZE) {
      buffer[counter] = item;
      counter++;
      return 0;
   }
   else { /* Error the buffer is full */
      return -1;
   }
}
 
/* Remove an item from the buffer */
int remove_item(buffer_item *item) {
   /* When the buffer is not empty remove the item
      and decrement the counter */
   if(counter > 0) {
      *item = buffer[(counter-1)];
      counter--;
      return 0;
   }
   else { /* Error buffer empty */
      return -1;
   }
}
 
int main(int argc, char *argv[]) {
   /* Loop counter */
   int i;
 
   /* Verify the correct number of arguments were passed in */
   if(argc != 4) {
      fprintf(stderr, "USAGE: ./main.out <sleep time> <producers> <consumers>\n");
      return -1;
   }
 
   int mainSleepTime = atoi(argv[1]); /* Time in seconds for main to sleep */
   int numProd = atoi(argv[2]); /* Number of producer threads */
   int numCons = atoi(argv[3]); /* Number of consumer threads */
 
   /* Initialize the app */
   initializeData();
 
   /* Create the producer threads */
   for(i = 0; i < numProd; i++) {
      /* Create the thread */
      pthread_create(&tid,&attr,producer,NULL);
    }
 
   /* Create the consumer threads */
   for(i = 0; i < numCons; i++) {
      /* Create the thread */
      pthread_create(&tid,&attr,consumer,NULL);
   }
 
   /* Sleep for the specified amount of time in seconds */
   sleep(mainSleepTime);
 
   /* Exit the program */
   printf("Exit the program\n");
   exit(0);
}
OUTPUT:
lee@isis:~/programming/c/producer-consumer$ ./pc.out 10 10 10
producer produced 35005211
consumer consumed 35005211
producer produced 1726956429
consumer consumed 1726956429
producer produced 278722862
consumer consumed 278722862
producer produced 468703135
producer produced 1801979802
producer produced 635723058
producer produced 1125898167
consumer consumed 1125898167
Exit the program
Here we have the output of the program. As you can see, we told main() to quit after 10 seconds and created 10 producer and 10 consumer threads.



The NFS Daemons


·       If you want to provide NFS service to other hosts, you have to run the nfsd and mountd daemons on your machine. As RPC-based programs, they are not managed by inetd but are started at boot time, and they register themselves with the portmapper. Therefore, you have to make sure to start them only after rpc.portmap is running. Usually, you include the following two lines in your rc.inet2 script:
            if [ -x /usr/sbin/rpc.mountd ]; then
                    /usr/sbin/rpc.mountd; echo -n " mountd"
            fi
            if [ -x /usr/sbin/rpc.nfsd ]; then
                    /usr/sbin/rpc.nfsd; echo -n " nfsd"
            fi
·       The ownership information of files that an NFS daemon provides to its clients usually contains only numerical user and group IDs. If both client and server associate the same user and group names with these numerical IDs, they are said to share the same uid/gid space. For example, this is the case when you use NIS to distribute the passwd information to all hosts on your LAN.
·       On some occasions, however, they do not match. Rather than updating the uids and gids of the client to match those of the server, you can use the ugidd mapping daemon to work around this. Using the map_daemon option explained below, you can tell nfsd to map the server's uid/gid space to the client's uid/gid space with the aid of ugidd on the client.
·       ugidd is an RPC-based server, and is started from rc.inet2 just like nfsd and mountd:
            if [ -x /usr/sbin/rpc.ugidd ]; then
                  /usr/sbin/rpc.ugidd; echo -n " ugidd"
            fi
·       While the above options apply to the client's NFS configuration, there is a different set of options on the server side that configure its per-client behavior. These options must be set in the /etc/exports file.
·       By default, mountd will not allow anyone to mount directories from the local host, which is a rather sensible attitude. To permit one or more hosts to NFS-mount a directory, it must be exported; that is, it must be specified in the exports file. A sample file may look like this:
             # exports file for vlager
             /home             vale(rw) vstout(rw) vlight(rw)
             /usr/X386         vale(ro) vstout(ro) vlight(ro)
             /usr/TeX          vale(ro) vstout(ro) vlight(ro)
             /                 vale(rw,no_root_squash)
             /home/ftp         (ro)
·       The host name is followed by an optional, comma-separated list of flags, enclosed in parentheses. These flags may take the following values:
insecure
Permit non-authenticated access from this machine.

unix-rpc
Require UNIX-domain RPC authentication from this machine. This simply requires that requests originate from a reserved internet port (i.e. the port number has to be less than 1024). This option is on by default.

secure-rpc
Require secure RPC authentication from this machine. This has not been implemented yet. See Sun's documentation on Secure RPC.

kerberos
Require Kerberos authentication on accesses from this machine. This has not been implemented yet. See the MIT documentation on the Kerberos authentication system.

root_squash

This is a security feature that denies the superuser on the specified hosts any special access rights by mapping requests from uid 0 on the client to uid 65534 (-2) on the server. This uid should be associated with the user nobody.

no_root_squash
Don't map requests from uid 0. This option is on by default.
ro
Mount file hierarchy read-only. This option is on by default.
rw
Mount file hierarchy read-write.

link_relative
Convert absolute symbolic links (where the link contents start with a slash) into relative links by prepending the necessary number of ../'s to get from the directory containing the link to the root on the server. This option only makes sense when a host's entire file system is mounted; otherwise, some of the links might point nowhere, or even worse, to files they were never meant to point to.
This option is on by default.
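Tying a few of these flags together, a hypothetical entry (my own example, reusing the host names from the sample file above) might read:

             # read-only for vale, absolute symlinks rewritten as relative
             /usr/X386         vale(ro,link_relative)
             # read-write for vale, with root mapped to the user nobody
             /home             vale(rw,root_squash)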
·       An automounter is a daemon that automatically and transparently mounts any NFS volume as needed, and unmounts volumes after they have not been used for some time.
·       One of the clever things about an automounter is that it is able to mount a certain volume from alternative places. For instance, you may keep copies of your X programs and support files on two or three hosts, and have all other hosts mount them via NFS. Using an automounter, you may specify all three of them to be mounted on /usr/X386; the automounter will then try each of them until one of the mount attempts succeeds.
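With the amd automounter, for instance, a map along these lines could offer /usr/X386 from whichever of the three hosts responds first. This is a hypothetical sketch; the exact map syntax differs between automounter implementations and versions:

             # amd map: try vale, then vstout, then vlight
             /defaults    type:=nfs;opts:=ro,soft,intr
             x386         rhost:=vale;rfs:=/usr/X386  rhost:=vstout;rfs:=/usr/X386  rhost:=vlight;rfs:=/usr/X386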




NFS Introduction

·       NFS, the network filesystem, is probably the most prominent network service using RPC. It allows you to access files on remote hosts in exactly the same way you would access local files. This is made possible by a mixture of kernel functionality on the client side (which uses the remote file system) and an NFS server on the server side (which provides the file data). This file access is completely transparent to the client, and works across a variety of server and host architectures.
NFS offers a number of advantages:
·       Data accessed by all users can be kept on a central host, with clients mounting this directory at boot time. For example, you can keep all user accounts on one host, and have all hosts on your network mount /home from that host. If installed alongside NIS, users can then log into any system and still work on one set of files.
·       Data consuming large amounts of disk space may be kept on a single host. For example, all files and programs relating to LaTeX and METAFONT could be kept and maintained in one place.
·       Administrative data may be kept on a single host. No need to use rcp anymore to install the same stupid file on 20 different machines.

Preparing NFS
·       Before you can use NFS, be it as server or client, you must make sure your kernel has NFS support compiled in. Newer kernels have a simple interface on the proc filesystem for this, the /proc/filesystems file, which you can display using cat:
          $ cat /proc/filesystems
          minix
          ext2
          msdos
          nodev   proc
          nodev   nfs
·       If nfs is missing from this list, you have to compile your own kernel with NFS enabled. Configuring the kernel network options is explained in the section ``Kernel Configuration''.
·       For older kernels prior to 1.1, the easiest way to find out whether your kernel has NFS support enabled is to actually try to mount an NFS file system. For this, you could create a directory below /tmp and try to mount a local directory on it:
     # mkdir /tmp/test
     # mount localhost:/etc /tmp/test
·       If this mount attempt fails with an error message saying ``fs type nfs not supported by kernel'', you must make a new kernel with NFS enabled. Any other error messages are completely harmless, as you haven't configured the NFS daemons on your host yet.

Mounting an NFS Volume

·       NFS volumes are mounted very much the way usual file systems are mounted. You invoke mount using the following syntax:
              # mount -t nfs nfs_volume local_dir -o options
Here nfs_volume is given as remote_host:remote_dir. Since this notation is unique to NFS file systems, you can leave out the -t nfs option.
·       There are a number of additional options that you may specify to mount upon mounting an NFS volume. These may either be given following the -o switch on the command line, or in the options field of the /etc/fstab entry for the volume. In both cases, multiple options are separated from each other by commas. Options specified on the command line always override those given in the fstab file.
·       A sample entry in /etc/fstab might be:
     # volume              mount point       type  options
     news:/usr/spool/news  /usr/spool/news   nfs   timeo=14,intr

·       In the absence of an fstab entry, NFS mount invocations look a lot uglier. For instance, suppose you mount your users' home directories from a machine named moonshot, which uses a default block size of 4K for read/write operations. You might decrease the block size to 2K to suit moonshot's datagram size limit by issuing:
     # mount moonshot:/home /home -o rsize=2048,wsize=2048

·       The list of all valid options is described in its entirety in the nfs(5) manual page that comes with Rick Sladkey's NFS-aware mount tool (which can be found in Rik Faith's util-linux package). The following is an incomplete list of those you would probably want to use:

   rsize=n and wsize=n
These specify the datagram size used by the NFS clients on read and write requests, respectively. They currently default to 1024 bytes, due to the limit on UDP datagram size described above.

  timeo=n
This sets the time (in tenths of a second) the NFS client will wait for a request to complete. The default value is 0.7 seconds.
  hard
Explicitly mark this volume as hard-mounted. This is on by default.
  soft
Soft-mount the volume (as opposed to hard-mounting it).
  intr
Allow signals to interrupt an NFS call. Useful for aborting when the server doesn't respond.
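Putting several of these options together, a soft, interruptible mount with smaller datagrams (my own illustrative command, reusing the moonshot example above) would be:

     # mount moonshot:/home /home -o rsize=2048,wsize=2048,timeo=14,soft,intr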

Clock Synchronization: NTP & Lamport's Algorithm


Network Time Protocol (NTP) is a networking protocol for clock synchronization between computer systems over packet-switched, variable-latency data networks.

OVERVIEW
·       NTP provides Coordinated Universal Time (UTC) including scheduled leap second adjustments. No information about time zones or daylight saving time is transmitted; this information is outside its scope and must be obtained separately.
·       NTP uses Marzullo's algorithm and is designed to resist the effects of variable latency. NTP can usually maintain time to within tens of milliseconds over the public Internet, and can achieve 1 millisecond accuracy in local area networks under ideal conditions.
·       The protocol uses the User Datagram Protocol (UDP) on port number 123.

NTP SOFTWARE IMPLEMENTATIONS

·       For modern Unix systems, the NTP client is implemented as a daemon process that runs continuously in user space (ntpd).
·       Because of sensitivity to timing, it is important to have the standard NTP clock phase-locked loop implemented in kernel space. All recent versions of Linux, BSD, Mac OS X, Solaris and AIX implement it in this manner.
·       The NTP packet is a UDP datagram, carried on port 123.
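To show what that datagram looks like in practice, here is a bare-bones SNTP-style query sketch in C (my own illustration, not how ntpd works internally; pool.ntp.org is just an example server, and a real client needs the filtering and clock discipline described above):

/* sntp_query.c - send one NTP v3 client packet and print the server time
   Compile with: gcc sntp_query.c -o sntp_query */
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <time.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/socket.h>

#define NTP_EPOCH_OFFSET 2208988800UL /* seconds between 1900 and 1970 */

int main(void) {
   unsigned char pkt[48] = {0};
   pkt[0] = 0x1B; /* LI=0, VN=3, Mode=3 (client) */

   struct addrinfo hints = {0}, *srv;
   hints.ai_family   = AF_INET;
   hints.ai_socktype = SOCK_DGRAM;
   if (getaddrinfo("pool.ntp.org", "123", &hints, &srv) != 0) return 1;

   int s = socket(srv->ai_family, srv->ai_socktype, 0);
   if (s < 0) return 1;
   sendto(s, pkt, sizeof(pkt), 0, srv->ai_addr, srv->ai_addrlen);
   if (recv(s, pkt, sizeof(pkt), 0) < 48) return 1; /* blocks; no timeout */

   /* Transmit Timestamp: seconds since 1900, big-endian, at byte offset 40 */
   uint32_t secs;
   memcpy(&secs, pkt + 40, sizeof(secs));
   time_t t = (time_t)(ntohl(secs) - NTP_EPOCH_OFFSET);
   printf("server time: %s", ctime(&t));

   freeaddrinfo(srv);
   close(s);
   return 0;
}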

 

MICROSOFT WINDOWS

·       Microsoft Windows NT 4.0 did not come with an NTP implementation. The reference implementation of NTP can be used on NT4 systems.
·       All Microsoft Windows versions since Windows 2000 and Windows XP include the Windows Time Service ("w32time"), which has the ability to sync the computer clock to an NTP server.
·       The version in Windows 2000 and Windows XP only implements Simple NTP, and violates several aspects of the NTP version 3 standard. Beginning with Windows Server 2003 and Windows Vista, a compliant implementation of full NTP is included.
·       However, Microsoft does not guarantee that their implementation will be more accurate than 2 seconds. If higher accuracy is desired, Microsoft recommends using a different NTP implementation.

LAMPORT'S BAKERY ALGORITHM
·       It is an algorithm devised by computer scientist Leslie Lamport that is intended to improve safety in the use of shared resources among multiple threads by means of mutual exclusion.
·       In computer science, it is common for multiple threads to simultaneously access the same resources. Data corruption can occur if two or more threads try to write into the same memory location, or if one thread reads a memory location before another has finished writing into it.
·       Lamport's bakery algorithm is one of many mutual exclusion algorithms designed to prevent concurrent threads from entering critical sections of code at the same time, eliminating the risk of data corruption.

Algorithm:

·       Lamport envisioned a bakery with a numbering machine at its entrance so each customer is given a unique number. Numbers increase by one as customers enter the store.
·       A global counter displays the number of the customer that is currently being served. All other customers must wait in a queue until the baker finishes serving the current customer and the next number is displayed.
·       When the customer is done shopping and has disposed of his or her number, the clerk increments the number, allowing the next customer to be served. That customer must draw another number from the numbering machine in order to shop again.
·       Due to the limitations of computer architecture, some parts of Lamport's analogy need slight modification. It is possible that more than one thread will get the same number when they request it; this cannot be avoided.
·       Therefore, it is assumed that the thread identifier i is also a priority: a lower value of i means a higher priority, and threads with higher priority enter the critical section first. A sketch of the algorithm follows below.
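Here is a compact C sketch of the bakery algorithm (my own rendition of the textbook form; a production version would need atomics or memory barriers, which plain volatile does not provide):

/* bakery.c - Lamport's bakery algorithm, textbook form */
#define N 8 /* number of threads */

volatile int choosing[N]; /* thread j is currently picking a number */
volatile int number[N];   /* 0 means "not interested" */

void bakery_lock(int i) {
   /* take a number one higher than any currently held */
   choosing[i] = 1;
   int max = 0;
   for (int j = 0; j < N; j++)
      if (number[j] > max) max = number[j];
   number[i] = max + 1;
   choosing[i] = 0;

   for (int j = 0; j < N; j++) {
      /* wait until thread j has finished picking its number */
      while (choosing[j])
         ;
      /* wait while j holds a smaller ticket, or an equal ticket
         with a smaller id (lower id = higher priority) */
      while (number[j] != 0 &&
             (number[j] < number[i] ||
              (number[j] == number[i] && j < i)))
         ;
   }
}

void bakery_unlock(int i) {
   number[i] = 0; /* dispose of the number */
}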
·       Lamport was the first to give a distributed mutual exclusion algorithm, as an illustration of his clock synchronization scheme. Let Ri be the request set of site Si, i.e. the set of sites from which Si needs permission when it wants to enter the CS. In Lamport's algorithm, for every i, 1 ≤ i ≤ N: Ri = {S1, S2, ..., SN}. Every site Si keeps a queue, request_queuei, which contains mutual exclusion requests ordered by their timestamps. This algorithm requires messages to be delivered in FIFO order between every pair of sites.

Requesting the critical section.

          1. When a site Si wants to enter the CS, it sends a REQUEST(tsi, i) message to all the sites in its request set Ri and places the request on request_queuei (tsi is the timestamp of the request).
          2. When a site Sj receives the REQUEST(tsi, i) message from site Si, it returns a timestamped REPLY message to Si and places site Si's request on request_queuej.


Executing the critical section.

          1. Site Si enters the CS when the following two conditions hold:
            a) [L1:] Si has received a message with timestamp larger than (tsi, i) from all other sites.
            b) [L2:] Si's request is at the top of request_queuei.

Releasing the critical section.

          1. Site Si, upon exiting the CS, removes its request from the top of its request queue and sends a timestamped RELEASE message to all the sites in its request set.
          2. When a site Sj receives a RELEASE message from site Si, it removes Si's request from its request queue.
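The three steps above map onto per-site bookkeeping fairly directly. Below is a single-site C sketch (my own illustration; net_send() is a stub standing in for a FIFO network layer, which would invoke the on_*() handlers on message arrival):

/* lamport_mutex.c - skeleton of Lamport's distributed mutual exclusion */
#include <stdio.h>

#define N 3 /* sites S0 .. S(N-1) */
enum { REQUEST, REPLY, RELEASE };

typedef struct { int ts, site; } Req;

static int me;                 /* this site's id (i) */
static int clk;                /* Lamport logical clock */
static Req queue[64];          /* request_queue_i, kept in (ts, site) order */
static int qlen;
static int last_ts[N];         /* latest timestamp seen from each site */
static int my_ts = -1;         /* timestamp of our own pending request */

static void net_send(int to, int type, int ts) { /* stub network layer */
   static const char *name[] = { "REQUEST", "REPLY", "RELEASE" };
   printf("S%d -> S%d : %s(%d)\n", me, to, name[type], ts);
}

static void enqueue(int ts, int site) { /* insert keeping (ts, site) order */
   int i = qlen++;
   while (i > 0 && (queue[i-1].ts > ts ||
         (queue[i-1].ts == ts && queue[i-1].site > site))) {
      queue[i] = queue[i-1];
      i--;
   }
   queue[i].ts = ts;
   queue[i].site = site;
}

static void dequeue(int site) { /* remove that site's request */
   int i = 0, j;
   while (i < qlen && queue[i].site != site) i++;
   for (j = i; j + 1 < qlen; j++) queue[j] = queue[j+1];
   if (i < qlen) qlen--;
}

/* Requesting: queue own request and send REQUEST to all other sites */
void request_cs(void) {
   my_ts = ++clk;
   enqueue(my_ts, me);
   for (int j = 0; j < N; j++)
      if (j != me) net_send(j, REQUEST, my_ts);
}

/* On REQUEST(ts, s): queue it and return a timestamped REPLY */
void on_request(int ts, int s) {
   if (ts > clk) clk = ts;
   last_ts[s] = ts;
   enqueue(ts, s);
   net_send(s, REPLY, ++clk);
}

void on_reply(int ts, int s) { last_ts[s] = ts; }

/* Executing: conditions L1 and L2 from the text */
int can_enter_cs(void) {
   for (int j = 0; j < N; j++)
      if (j != me && last_ts[j] <= my_ts) return 0; /* L1 */
   return qlen > 0 && queue[0].site == me;          /* L2 */
}

/* Releasing: dequeue own request and send RELEASE to all other sites */
void release_cs(void) {
   dequeue(me);
   for (int j = 0; j < N; j++)
      if (j != me) net_send(j, RELEASE, ++clk);
}

void on_release(int ts, int s) { last_ts[s] = ts; dequeue(s); }

int main(void) { /* tiny local walk-through for site S0 */
   me = 0;
   request_cs();          /* sends REQUEST(1) to S1 and S2 */
   on_reply(2, 1);        /* pretend both sites replied with ts=2 */
   on_reply(2, 2);
   printf("can enter CS: %d\n", can_enter_cs()); /* prints 1 */
   release_cs();
   return 0;
}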