Last updated
Last updated
The producer/consumer design pattern decouples the components that produce work (producers) from the components that process it (consumers) by placing a queue between them, so that producers and consumers can execute in different threads and at different speeds.
The producer/consumer design pattern is a pre-designed solution to separate the two main components by placing a queue in the middle. - by Andres Navarro
Source:
Using java.util.concurrent.BlockingQueue;
Output:
-
Read more:
Output
CONSUMER : Buffer is empty, waiting
PRODUCER added 279133501 into queue
PRODUCER : Signalling that buffer is no more empty now
CONSUMER consumed 279133501 from queue
CONSUMER : Signalling that buffer may be empty now
package com.test.multithread;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Producer thread that puts an increasing sequence of {@code Long} values (1, 2, 3, ...) into the
 * shared queue.
 *
 * <p>Production stops when the exit condition is set via {@link #setExitCondition(boolean)} or
 * when the thread is interrupted. After that the producer polls until the queue is empty (or it
 * is interrupted) and then sets the exit condition on every consumer it was given.
 */
public class TaskProducer extends Thread {
    // Written by other threads through setExitCondition, read by run(): must be volatile.
    private volatile boolean blnExit = false;
    private final List<TaskConsumer> consumers;
    private final BlockingQueue<Long> sharedQueue;

    /**
     * @param sharedQueue queue the produced values are put into
     * @param consumers   consumers that are told to exit once production has ended
     */
    public TaskProducer(final BlockingQueue<Long> sharedQueue,
                        final List<TaskConsumer> consumers) {
        this.sharedQueue = sharedQueue;
        this.consumers = consumers;
    }

    /**
     * Requests the producer to stop producing. The flag is checked between two {@code put} calls,
     * so a producer blocked on a full queue notices it only after that {@code put} completes;
     * interrupt the thread to stop it immediately.
     *
     * @param blnDoExit {@code true} to stop producing
     */
    public void setExitCondition(final boolean blnDoExit) {
        blnExit = blnDoExit;
    }

    @Override
    public void run() {
        boolean interrupted = false;
        long next = 1;

        // Produce the objects to be consumed.
        while (!blnExit) {
            try {
                sharedQueue.put(Long.valueOf(next));
                // Advance only after a successful put so that no value is skipped.
                next++;
            } catch (final InterruptedException ex) {
                // An interrupt is a request to stop; remember it and leave the loop.
                interrupted = true;
                break;
            }
        }

        // Wait until the queue is empty.
        while (!sharedQueue.isEmpty()) {
            try {
                Thread.sleep(200);
                System.out.println("Producer waiting to end.");
            } catch (final InterruptedException e) {
                interrupted = true;
                break;
            }
        }

        // Send the exit condition to all consumers.
        for (final TaskConsumer consumer : consumers) {
            consumer.setExitCondition(true);
        }

        // Restore the interrupt status only now: doing it earlier would make the sleep in the
        // drain loop fail immediately.
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
package com.test.multithread;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
/**
 * Consumer thread that repeatedly removes {@code Long} values from the shared queue and reports
 * each of them as a sent e-mail.
 *
 * <p>The thread runs until its exit condition is set via {@link #setExitCondition(boolean)} or
 * until it is interrupted.
 */
public class TaskConsumer extends Thread {
    // Set from another thread (the producer) and read in run(): volatile guarantees visibility.
    volatile boolean blnExit = false;
    private final int id;
    private final BlockingQueue<Long> sharedQueue;

    /**
     * @param id          identifier printed in this consumer's messages
     * @param sharedQueue queue the values are taken from
     */
    public TaskConsumer(final int id, final BlockingQueue<Long> sharedQueue) {
        this.id = id;
        this.sharedQueue = sharedQueue;
    }

    /**
     * Sets or clears the exit condition checked at the start of every loop iteration.
     *
     * @param blnDoExit {@code true} to make the consumer leave its loop
     */
    public void setExitCondition(final boolean blnDoExit) {
        blnExit = blnDoExit;
    }

    @Override
    public void run() {
        final Random generator = new Random();
        while (!blnExit) {
            try {
                // poll() never blocks. A size() check followed by take() would let another
                // consumer grab the element in between and leave this thread blocked forever,
                // unable to observe the exit condition.
                final Long task = sharedQueue.poll();
                if (task != null) {
                    System.out.println("Consumer id:" + id + " sent email " + task);
                    // TO BE REMOVED (ONLY SIMULATES RANDOM WORKING TIME)
                    Thread.sleep(generator.nextInt(1000) + 1000);
                } else {
                    Thread.sleep(500);
                }
            } catch (final InterruptedException ex) {
                // Treat an interrupt as a request to stop and keep the status for callers.
                Thread.currentThread().interrupt();
                break;
            }
        }
        System.out.println("Consumer " + id + " exiting");
    }
}
package com.test.multithread;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Wires one {@code TaskProducer} to a configurable number of {@code TaskConsumer} threads through
 * a bounded {@link LinkedBlockingQueue}. All threads are started by the constructor.
 */
public class ProducerConsumerPattern {
    private final int queueCapacity = 200;
    private int numberOfThreads = 10;

    public static void main(final String args[]) {
        new ProducerConsumerPattern(20);
    }

    /**
     * Creates the shared queue, starts the consumer threads and then starts the producer thread.
     *
     * @param numberOfThreads number of consumer threads to start, between 1 and 100 inclusive
     * @throws IllegalArgumentException if {@code numberOfThreads} is outside that range
     */
    public ProducerConsumerPattern(final int numberOfThreads) {
        if (numberOfThreads <= 0 || numberOfThreads > 100) {
            throw new IllegalArgumentException("The number of threads should be a number between 1 and 100");
        }
        this.numberOfThreads = numberOfThreads;

        // Creating shared object
        final BlockingQueue<Long> sharedQueue = new LinkedBlockingQueue<Long>(queueCapacity);

        // Creating and starting the Consumer Threads.
        // Strictly less-than: "<=" would start numberOfThreads + 1 consumers.
        final List<TaskConsumer> consumers = new ArrayList<TaskConsumer>(this.numberOfThreads);
        for (int i = 0; i < this.numberOfThreads; i++) {
            final TaskConsumer consThread = new TaskConsumer(i, sharedQueue);
            consThread.start();
            consumers.add(consThread);
        }

        // Creating and starting the Producer Thread
        final TaskProducer prodThread = new TaskProducer(sharedQueue, consumers);
        prodThread.start();
    }
}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Minimal producer/consumer demo: one producer thread and one consumer thread share an unbounded
 * {@link LinkedBlockingQueue}.
 */
public class ProducerConsumerPattern {
    public static void main(String args[]) {
        // Creating shared object (parameterized instead of a raw type; the producer puts ints)
        BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<>();

        // Creating Producer and Consumer Thread
        Thread prodThread = new Thread(new Producer(sharedQueue));
        Thread consThread = new Thread(new Consumer(sharedQueue));

        // Starting producer and Consumer thread
        prodThread.start();
        consThread.start();
    }
}
/**
 * Producer task: puts the integers 0 to 9 into the shared queue, printing each value before it
 * is put. If the thread is interrupted, production stops and the interrupt status is restored.
 */
class Producer implements Runnable {
    // "? super Integer": any queue able to accept Integer elements works, including raw queues.
    private final BlockingQueue<? super Integer> sharedQueue;

    /**
     * @param sharedQueue queue the produced integers are put into
     */
    public Producer(BlockingQueue<? super Integer> sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("Produced: " + i);
                sharedQueue.put(i);
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
                // Continuing would silently drop the interrupted element and ignore the stop
                // request; restore the status and end the task instead.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
/**
 * Consumer task: takes elements from the shared queue and prints each one, blocking while the
 * queue is empty. The task runs until its thread is interrupted; the interrupt status is restored
 * before it returns.
 */
class Consumer implements Runnable {
    // Elements are only printed, so a queue of any element type is accepted.
    private final BlockingQueue<?> sharedQueue;

    /**
     * @param sharedQueue queue the elements are taken from
     */
    public Consumer(BlockingQueue<?> sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        // The try block encloses the loop so that an interrupt actually ends the task; with the
        // catch inside the loop the thread could never be stopped.
        try {
            while (true) {
                System.out.println("Consumed: " + sharedQueue.take());
            }
        } catch (InterruptedException ex) {
            Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            Thread.currentThread().interrupt();
        }
    }
}
Output:
Produced: 0
Produced: 1
Consumed: 0
Produced: 2
Consumed: 1
Produced: 3
Consumed: 2
Produced: 4
Consumed: 3
Produced: 5
Consumed: 4
Produced: 6
Consumed: 5
Produced: 7
Consumed: 6
Produced: 8
Consumed: 7
Produced: 9
Consumed: 8
Consumed: 9
Read more: https://javarevisited.blogspot.com/2012/02/producer-consumer-design-pattern-with.html#ixzz5g9aJMAMz
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Entry point of the Lock/Condition based producer-consumer demonstration: one
 * producer thread and one consumer thread operate on a single shared
 * {@code ProducerConsumerImpl} instance.
 *
 * @author Javin Paul
 */
public class ProducerConsumerSolutionUsingLock {

    public static void main(String[] args) {
        // The buffer both threads work on.
        ProducerConsumerImpl buffer = new ProducerConsumerImpl();

        // Build both workers first ...
        Producer producerThread = new Producer(buffer);
        Consumer consumerThread = new Consumer(buffer);

        // ... then launch them, producer first.
        producerThread.start();
        consumerThread.start();
    }
}
/**
 * Bounded buffer of random integers guarded by a single {@link ReentrantLock} with two
 * conditions: producers wait on {@code bufferNotFull}, consumers wait on {@code bufferNotEmpty}.
 */
class ProducerConsumerImpl {
    // producer consumer problem data
    private static final int CAPACITY = 10;
    // Parameterized: with a raw Queue, "Integer value = queue.poll()" does not compile.
    private final Queue<Integer> queue = new LinkedList<>();
    private final Random theRandom = new Random();

    // lock and condition variables
    private final Lock aLock = new ReentrantLock();
    private final Condition bufferNotFull = aLock.newCondition();
    private final Condition bufferNotEmpty = aLock.newCondition();

    /**
     * Adds one random integer to the buffer, waiting while the buffer holds {@code CAPACITY}
     * elements, and then signals the threads waiting for an element.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void put() throws InterruptedException {
        aLock.lock();
        try {
            // Loop rather than "if": the condition must be re-checked after every wake-up.
            while (queue.size() == CAPACITY) {
                System.out.println(Thread.currentThread().getName() + " : Buffer is full, waiting");
                bufferNotFull.await();
            }
            int number = theRandom.nextInt();
            boolean isAdded = queue.offer(number);
            if (isAdded) {
                System.out.printf("%s added %d into queue %n", Thread.currentThread().getName(), number);
                // signal consumer thread that, buffer has element now
                System.out.println(Thread.currentThread().getName() + " : Signalling that buffer is no more empty now");
                bufferNotEmpty.signalAll();
            }
        } finally {
            aLock.unlock();
        }
    }

    /**
     * Removes one integer from the buffer, waiting while the buffer is empty, and then signals
     * the threads waiting for free space.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void get() throws InterruptedException {
        aLock.lock();
        try {
            while (queue.isEmpty()) {
                System.out.println(Thread.currentThread().getName() + " : Buffer is empty, waiting");
                bufferNotEmpty.await();
            }
            Integer value = queue.poll();
            if (value != null) {
                System.out.printf("%s consumed %d from queue %n", Thread.currentThread().getName(), value);
                // signal producer thread that, buffer may be empty now
                System.out.println(Thread.currentThread().getName() + " : Signalling that buffer may be empty now");
                bufferNotFull.signalAll();
            }
        } finally {
            aLock.unlock();
        }
    }
}
/**
 * Thread named "PRODUCER" whose {@code run()} makes a single {@code put()} call on the shared
 * {@code ProducerConsumerImpl}.
 */
class Producer extends Thread {

    ProducerConsumerImpl pc;

    /**
     * @param sharedObject buffer this thread calls {@code put()} on
     */
    public Producer(ProducerConsumerImpl sharedObject) {
        super("PRODUCER");
        pc = sharedObject;
    }

    /** Calls {@code put()} once; an interruption is reported by printing its stack trace. */
    @Override
    public void run() {
        try {
            this.pc.put();
        } catch (InterruptedException interruption) {
            interruption.printStackTrace();
        }
    }
}
/**
 * Thread named "CONSUMER" whose {@code run()} makes a single {@code get()} call on the shared
 * {@code ProducerConsumerImpl}.
 */
class Consumer extends Thread {

    ProducerConsumerImpl pc;

    /**
     * @param sharedObject buffer this thread calls {@code get()} on
     */
    public Consumer(ProducerConsumerImpl sharedObject) {
        super("CONSUMER");
        pc = sharedObject;
    }

    /** Calls {@code get()} once; an interruption is reported by printing its stack trace. */
    @Override
    public void run() {
        try {
            this.pc.get();
        } catch (InterruptedException interruption) {
            interruption.printStackTrace();
        }
    }
}