Producer Consumer Problem in Java: Detailed Explanation
The Producer Consumer Problem is a well-known example of a multi-process synchronization problem. It describes synchronization over a fixed-size data buffer, implemented as a queue, that is modified by two or more processes referred to as producers and consumers. A single buffer can have any number of producers and consumers. A producer's task is to keep generating data and placing it into the buffer, while a consumer's task is to keep consuming data from the buffer. The problem is to ensure that producers do not add data when the buffer is full and that consumers do not consume data when the buffer is empty.
To resolve this problem, the Producer and Consumer should behave as follows.
Producer
1. Check if Buffer is full or not. If full, then wait() for buffer items to get consumed.
2. Generate data and put it into Buffer.
3. Notify Consumer that Data has been placed into Buffer.
4. Repeat steps 1-3.
Consumer
1. Check if Buffer has items. If empty, then wait() for buffer to get filled.
2. Consume data from Buffer.
3. Notify Producer that Data has been consumed from Buffer
4. Repeat steps 1-3.
We will implement the Producer Consumer Problem in Java using two approaches.
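The producer and consumer steps above can be sketched with a buffer that holds a single item. This is only a minimal illustration, not the implementation used later in this article: the class name SingleSlotDemo and the helper transfer() are hypothetical, and the slot is treated as empty when it holds null. The wait() call sits inside a while loop so that a woken thread re-checks the condition before it proceeds.

```java
// Hypothetical one-slot buffer, used only to illustrate the steps listed above.
class SingleSlotDemo {
    private Integer slot = null; // null means the slot is empty

    // Producer steps 1-3: wait while full, store the value, notify waiting threads.
    synchronized void put(int value) throws InterruptedException {
        while (slot != null) {
            wait();
        }
        slot = value;
        notifyAll();
    }

    // Consumer steps 1-3: wait while empty, remove the value, notify waiting threads.
    synchronized int take() throws InterruptedException {
        while (slot == null) {
            wait();
        }
        int value = slot;
        slot = null;
        notifyAll();
        return value;
    }

    // Runs one producer and one consumer and returns the sum of the consumed values.
    static int transfer(int count) {
        SingleSlotDemo buffer = new SingleSlotDemo();
        int[] sum = new int[1];
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    sum[0] += buffer.take();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // join() makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(transfer(100)); // prints 5050, the sum of 1..100
    }
}
```

Every produced value is consumed exactly once, so the sum equals 1 + 2 + ... + 100 regardless of how the two threads are scheduled.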
1. Synchronized Instance Methods
A synchronized instance method in Java is synchronized on the instance (object) that owns the method. Each instance therefore has its synchronized methods synchronized on a different object: the owning instance. Only one thread at a time can execute inside the synchronized instance methods of a given instance; threads working on different instances do not block each other.
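As a small illustration of this locking behavior, the sketch below has several threads increment one shared counter through a synchronized instance method. The class SynchronizedCounterDemo and the helper runThreads() are hypothetical names chosen for this example. Because every call locks the same counter instance, no increments should be lost.

```java
// Hypothetical counter showing that synchronized instance methods lock on "this".
class SynchronizedCounterDemo {
    private int count = 0;

    // Only one thread at a time can be inside increment() or get() for a given instance.
    synchronized void increment() {
        count++;
    }

    synchronized int get() {
        return count;
    }

    // Starts the given number of threads, each incrementing the same counter perThread times.
    static int runThreads(SynchronizedCounterDemo counter, int threads, int perThread) {
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.increment();
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        int total = runThreads(new SynchronizedCounterDemo(), 4, 10000);
        System.out.println(total); // prints 40000
    }
}
```

Without the synchronized keyword on increment(), the count++ updates from different threads could interleave and the final total could be lower than 40000.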
Producer Consumer Problem Java Source Code Using Synchronized Instance Methods
import java.util.Random;

public class ProducerConsumerProblem {

    public static void main(String[] args) {
        ProducerConsumerProblem pcp = new ProducerConsumerProblem();
        // Buffer with capacity 2, shared by all producers and consumers
        DataBuffer buffer = pcp.new DataBuffer(2);
        Producer p1 = pcp.new Producer(buffer, "Producer 1");
        Producer p2 = pcp.new Producer(buffer, "Producer 2");
        Producer p3 = pcp.new Producer(buffer, "Producer 3");
        Producer p4 = pcp.new Producer(buffer, "Producer 4");
        Producer p5 = pcp.new Producer(buffer, "Producer 5");
        Consumer c1 = pcp.new Consumer(buffer, "Consumer 1");
        Consumer c2 = pcp.new Consumer(buffer, "Consumer 2");
        Consumer c3 = pcp.new Consumer(buffer, "Consumer 3");
        Consumer c4 = pcp.new Consumer(buffer, "Consumer 4");
        Consumer c5 = pcp.new Consumer(buffer, "Consumer 5");
        p1.start();
        p2.start();
        p3.start();
        p4.start();
        p5.start();
        c1.start();
        c2.start();
        c3.start();
        c4.start();
        c5.start();
    }

    class DataBuffer {
        // Circular queue implementation is used.
        // Reference: www.cs.sfu.ca/CourseCentral/225/jmanuch/lec/5-1.ppt
        private int[] dataBuffer;
        private int capacity;
        private int front = 0;       // index of the oldest item
        private int currentSize = 0; // number of items currently stored

        public DataBuffer(int capacity) {
            this.capacity = capacity;
            dataBuffer = new int[capacity];
        }

        private boolean isEmpty() {
            return (currentSize == 0);
        }

        private boolean isFull() {
            return (currentSize == capacity);
        }

        public synchronized void produce(int data, String producerName) {
            // Re-check the condition after every wake-up
            while (isFull()) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // Insert at the tail of the circular queue
            dataBuffer[(front + currentSize) % capacity] = data;
            System.out.println("Data " + data + " produced by " + producerName);
            currentSize++;
            notifyAll();
        }

        public synchronized int consume(String consumerName) {
            // Re-check the condition after every wake-up
            while (isEmpty()) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // Remove from the head of the circular queue
            int data = dataBuffer[front];
            front = (front + 1) % capacity;
            currentSize--;
            notifyAll();
            System.out.println("Data " + data + " consumed by " + consumerName);
            return data;
        }
    }

    class Producer extends Thread {
        private DataBuffer buffer;
        private Random random;

        public Producer(DataBuffer buffer, String threadName) {
            this.buffer = buffer;
            setName(threadName);
            random = new Random();
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                int data = random.nextInt(100);
                buffer.produce(data, getName());
            }
        }
    }

    class Consumer extends Thread {
        private DataBuffer buffer;

        public Consumer(DataBuffer buffer, String threadName) {
            this.buffer = buffer;
            setName(threadName);
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                buffer.consume(getName());
            }
        }
    }
}
2. BlockingQueue (java.util.concurrent.BlockingQueue)
BlockingQueue implementations are thread-safe. Their blocking operations wait for the queue to become non-empty when retrieving and removing an element, and wait for space to become available in the queue when adding an element. For this purpose we will use put() to add data to the buffer and take() to remove data from the buffer.
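Before the full program, here is a single-threaded sketch of how these calls behave on a queue of capacity 2. The class name BlockingQueueBasicsDemo and the method demo() are hypothetical. Besides put() and take(), it uses two other standard BlockingQueue methods for contrast: offer(), which returns false instead of blocking when the queue is full, and the timed poll(), which returns null if no element arrives within the timeout.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of basic BlockingQueue operations on a bounded queue.
class BlockingQueueBasicsDemo {

    static String demo() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        try {
            queue.put(10); // space available, returns immediately
            queue.put(20); // queue is now full
            boolean accepted = queue.offer(30); // full: returns false without blocking
            int head = queue.take(); // removes the oldest element (FIFO order)
            Integer second = queue.poll(100, TimeUnit.MILLISECONDS); // next element
            Integer none = queue.poll(100, TimeUnit.MILLISECONDS);   // empty: null after timeout
            return accepted + " " + head + " " + second + " " + none;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints: false 10 20 null
    }
}
```

A third put(30) in place of offer(30) would block this single thread forever, since no other thread is removing elements. That is the waiting behavior the producers rely on in the program below.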
Producer Consumer Problem Java Source Code Using BlockingQueue
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerProblem {

    public static void main(String[] args) {
        ProducerConsumerProblem pcp = new ProducerConsumerProblem();
        // Bounded queue with capacity 2, shared by all producers and consumers
        ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(2);
        Producer p1 = pcp.new Producer(buffer, "Producer 1");
        Producer p2 = pcp.new Producer(buffer, "Producer 2");
        Producer p3 = pcp.new Producer(buffer, "Producer 3");
        Producer p4 = pcp.new Producer(buffer, "Producer 4");
        Producer p5 = pcp.new Producer(buffer, "Producer 5");
        Consumer c1 = pcp.new Consumer(buffer, "Consumer 1");
        Consumer c2 = pcp.new Consumer(buffer, "Consumer 2");
        Consumer c3 = pcp.new Consumer(buffer, "Consumer 3");
        Consumer c4 = pcp.new Consumer(buffer, "Consumer 4");
        Consumer c5 = pcp.new Consumer(buffer, "Consumer 5");
        p1.start();
        p2.start();
        p3.start();
        p4.start();
        p5.start();
        c1.start();
        c2.start();
        c3.start();
        c4.start();
        c5.start();
    }

    class Producer extends Thread {
        private BlockingQueue<Integer> buffer;
        private Random random;

        public Producer(BlockingQueue<Integer> buffer, String threadName) {
            this.buffer = buffer;
            setName(threadName);
            random = new Random();
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                int data = random.nextInt(100);
                try {
                    // put() inserts the specified element into this queue,
                    // waiting if necessary for space to become available.
                    buffer.put(data);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Consumer extends Thread {
        private BlockingQueue<Integer> buffer;

        public Consumer(BlockingQueue<Integer> buffer, String threadName) {
            this.buffer = buffer;
            setName(threadName);
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    // take() retrieves and removes the head of this queue,
                    // waiting if necessary until an element becomes available.
                    buffer.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}