Producer Consumer Problem in Java: Detailed Explanation

The Producer Consumer Problem is a well-known example of a multi-process synchronization problem. It describes a fixed-size data buffer, implemented as a queue, that is modified by two or more different processes referred to as producers and consumers. A single data buffer can have any number of producers and consumers. A producer's task is to keep generating data and placing it into the buffer, while a consumer's task is to keep consuming data from the buffer. The problem is to ensure that producers do not add data to the buffer when it is full and that consumers do not consume data when the buffer is empty.


To resolve the above problem, the Producer and Consumer should behave as follows.

Producer

1. Check whether the Buffer is full. If it is full, then wait() for buffer items to get consumed.

2. Generate data and put it into Buffer.

3. Notify Consumer that Data has been placed into Buffer.

4. Repeat steps 1-3.

Consumer

1. Check whether the Buffer has items. If it is empty, then wait() for the buffer to get filled.

2. Consume data from Buffer.

3. Notify Producer that Data has been consumed from Buffer.

4. Repeat steps 1-3.
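The Producer and Consumer steps above can be sketched with a buffer that holds a single item. This is only a minimal illustration, not the implementation used later in this article: the names `SingleSlotHandoffDemo`, `Slot` and `handOff` are hypothetical, and the sketch assumes one producer and one consumer that exchange a fixed number of items so that the program terminates.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: a one-item buffer that follows the wait/notify steps listed above.
class SingleSlotHandoffDemo {

    // Hypothetical one-item buffer used only for this illustration.
    static class Slot {
        private int item;
        private boolean full = false;

        synchronized void put(int value) throws InterruptedException {
            while (full) {      // Producer step 1: wait while the buffer is full
                wait();
            }
            item = value;       // Producer step 2: place data into the buffer
            full = true;
            notifyAll();        // Producer step 3: notify waiting consumers
        }

        synchronized int take() throws InterruptedException {
            while (!full) {     // Consumer step 1: wait while the buffer is empty
                wait();
            }
            full = false;       // Consumer step 2: consume the data
            notifyAll();        // Consumer step 3: notify waiting producers
            return item;
        }
    }

    // Hands itemCount values (0, 1, 2, ...) from a producer thread to the calling thread.
    static List<Integer> handOff(int itemCount) {
        Slot slot = new Slot();
        List<Integer> received = new ArrayList<>();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    slot.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            for (int i = 0; i < itemCount; i++) {
                received.add(slot.take());
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during hand-off", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(handOff(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

The condition is checked in a `while` loop rather than an `if`, because a thread that wakes up from wait() has to confirm that the buffer state it was waiting for still holds.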

We will implement the Producer Consumer Problem in Java using two approaches.

1. Synchronized Instance Methods

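A synchronized instance method in Java is synchronized on the instance (object) that owns the method. Each instance therefore has its synchronized methods synchronized on a different object: the owning instance. Only one thread at a time can execute inside the synchronized instance methods of a given instance; any other thread that calls a synchronized instance method on that same instance blocks until the lock is released.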
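To see the per-instance lock in action, here is a small sketch in which several threads call a synchronized method on one shared object. The names `SynchronizedCounterDemo`, `Counter` and `countWithThreads` are hypothetical and exist only for this illustration.

```java
// Sketch only: synchronized instance methods share the lock of the owning instance.
class SynchronizedCounterDemo {

    // Hypothetical helper class, not part of the article's code.
    static class Counter {
        private int value = 0;

        // Locks on "this": only one thread at a time can run increment() or get()
        // on the same Counter instance.
        synchronized void increment() {
            value++;
        }

        synchronized int get() {
            return value;
        }
    }

    // Starts threadCount threads that each call increment() perThread times
    // on one shared Counter, then returns the final value.
    static int countWithThreads(int threadCount, int perThread) {
        Counter counter = new Counter();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join(); // wait for every worker to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(4, 10_000)); // prints 40000
    }
}
```

Without the `synchronized` keyword on increment(), the `value++` updates from different threads could overwrite each other and the total could come out lower than expected.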

Producer Consumer Problem Java Source Code Using Synchronized Instance Methods

import java.util.Random;

public class ProducerConsumerProblem {

    public static void main(String[] args) {
        ProducerConsumerProblem pcp = new ProducerConsumerProblem();
        DataBuffer buffer = pcp.new DataBuffer(2);

        Producer p1 = pcp.new Producer(buffer, "Producer 1");
        Producer p2 = pcp.new Producer(buffer, "Producer 2");
        Producer p3 = pcp.new Producer(buffer, "Producer 3");
        Producer p4 = pcp.new Producer(buffer, "Producer 4");
        Producer p5 = pcp.new Producer(buffer, "Producer 5");

        Consumer c1 = pcp.new Consumer(buffer, "Consumer 1");
        Consumer c2 = pcp.new Consumer(buffer, "Consumer 2");
        Consumer c3 = pcp.new Consumer(buffer, "Consumer 3");
        Consumer c4 = pcp.new Consumer(buffer, "Consumer 4");
        Consumer c5 = pcp.new Consumer(buffer, "Consumer 5");

        p1.start();
        p2.start();
        p3.start();
        p4.start();
        p5.start();
        c1.start();
        c2.start();
        c3.start();
        c4.start();
        c5.start();
    }

    class DataBuffer {
        // Circular queue implementation is used.
        // Reference: www.cs.sfu.ca/CourseCentral/225/jmanuch/lec/5-1.ppt
        private int[] dataBuffer;
        private int capacity;

        private int front = 0;
        private int currentSize = 0;

        public DataBuffer(int capacity) {
            this.capacity = capacity;
            dataBuffer = new int[capacity];
        }

        private boolean isEmpty() {
            return (currentSize == 0);
        }

        private boolean isFull() {
            return (currentSize == capacity);
        }

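        public synchronized void produce(int data, String producerName) {
            // Re-check the condition in a loop: a woken thread may find the buffer full again.
            while (isFull()) {
                try {
                    wait(); // releases the lock on this DataBuffer while waiting
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // Write at the tail of the circular queue.
            dataBuffer[(front + currentSize) % capacity] = data;
            System.out.println("Data " + data + " produced by " + producerName);
            currentSize++;
            // Wake up all waiting threads; each one re-checks its own condition.
            notifyAll();
        }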

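        public synchronized int consume(String consumerName) {
            // Re-check the condition in a loop: a woken thread may find the buffer empty again.
            while (isEmpty()) {
                try {
                    wait(); // releases the lock on this DataBuffer while waiting
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // Read from the head of the circular queue and advance it.
            int data = dataBuffer[front];
            front = (front + 1) % capacity;
            currentSize--;
            // Wake up all waiting threads; each one re-checks its own condition.
            notifyAll();
            System.out.println("Data " + data + " consumed by " + consumerName);
            return data;
        }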
    }

    class Producer extends Thread {

        private DataBuffer buffer;
        private Random random;

        public Producer(DataBuffer buffer, String threadName) {
            this.buffer = buffer;
            setName(threadName);
            random = new Random();
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                int data = random.nextInt(100);
                buffer.produce(data, getName());
            }
        }
    }

    class Consumer extends Thread {

        private DataBuffer buffer;

        public Consumer(DataBuffer buffer, String threadName) {
            this.buffer = buffer;
            setName(threadName);
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                buffer.consume(getName());
            }
        }
    }
}

2. BlockingQueue (java.util.concurrent.BlockingQueue)

BlockingQueue implementations are thread-safe. Their blocking operations wait for the queue to become non-empty when retrieving and removing an element, and wait for space to become available in the queue when adding an element. For this purpose we will use put() to add data to the buffer and take() to remove data from the buffer.
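Before the full program, here is a small sketch of put() and take() on a bounded queue. It assumes a single producer thread and a fixed number of items so that the run terminates; the names `BlockingQueueDemo` and `transfer` are hypothetical and used only for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch only: one producer thread and the calling thread exchange items via put()/take().
class BlockingQueueDemo {

    // Sends the values 1..itemCount through a queue of the given capacity
    // and returns the sum of everything the consumer side received.
    static int transfer(int itemCount, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= itemCount; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        try {
            for (int i = 0; i < itemCount; i++) {
                sum += queue.take(); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during transfer", e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transfer(10, 2)); // prints 55
    }
}
```

No explicit wait() or notifyAll() calls are needed here, because the queue handles the blocking and waking of threads internally.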

Producer Consumer Problem Java Source Code Using BlockingQueue

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;


public class ProducerConsumerProblem {

    public static void main(String[] args) {
        ProducerConsumerProblem pcp = new ProducerConsumerProblem();
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(2);

        Producer p1 = pcp.new Producer(buffer, "Producer 1");
        Producer p2 = pcp.new Producer(buffer, "Producer 2");
        Producer p3 = pcp.new Producer(buffer, "Producer 3");
        Producer p4 = pcp.new Producer(buffer, "Producer 4");
        Producer p5 = pcp.new Producer(buffer, "Producer 5");

        Consumer c1 = pcp.new Consumer(buffer, "Consumer 1");
        Consumer c2 = pcp.new Consumer(buffer, "Consumer 2");
        Consumer c3 = pcp.new Consumer(buffer, "Consumer 3");
        Consumer c4 = pcp.new Consumer(buffer, "Consumer 4");
        Consumer c5 = pcp.new Consumer(buffer, "Consumer 5");

        p1.start();
        p2.start();
        p3.start();
        p4.start();
        p5.start();
        c1.start();
        c2.start();
        c3.start();
        c4.start();
        c5.start();
    }

    class Producer extends Thread {

        private BlockingQueue<Integer> buffer;
        private Random random;

        public Producer(BlockingQueue<Integer> buffer, String threadName) {
            this.buffer = buffer;
            setName(threadName);
            random = new Random();
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
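                int data = random.nextInt(100);
                try {
                    // put() inserts the specified element into this queue,
                    // waiting if necessary for space to become available.
                    buffer.put(data);
                    // Printed after put() returns, so output order may differ slightly from queue order.
                    System.out.println("Data " + data + " produced by " + getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }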
            }
        }
    }

    class Consumer extends Thread {

        private BlockingQueue<Integer> buffer;

        public Consumer(BlockingQueue<Integer> buffer, String threadName) {
            this.buffer = buffer;
            setName(threadName);
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    // take() retrieves and removes the head of this queue,
                    // waiting if necessary until an element becomes available.
                    int data = buffer.take();
                    System.out.println("Data " + data + " consumed by " + getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
