Producer and Consumer

The producer / consumer design pattern is a pre-designed solution to separate the two main components by placing a queue in the middle, letting the producers and the consumers execute in different threads.

The Producer and Consumer Design Patterns

The producer/consumer design pattern is a pre-designed solution to separate the two main components by placing a queue in the middle. - by Andres Navarro

Source: https://dzone.com/articles/producer-consumer-design

Using java.util.concurrent.BlockingQueue;

Producer

package com.test.multithread;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
public class TaskProducer extends Thread {
    private boolean blnExit = false;
    private final List < TaskConsumer > consumers;
    private final BlockingQueue < Long > sharedQueue;
    public TaskProducer(final BlockingQueue < Long > sharedQueue,
        final List < TaskConsumer > consumers) {
        this.sharedQueue = sharedQueue;
        this.consumers = consumers;
    }
    @Override
    public void run() {
        long i = 0;
        ////////////////////////////////////////////
        // PRODUCING THE OBJECTS TO BE CONSUMED
        ////////////////////////////////////////////
        while (!blnExit) {
            try {
                i++;
                sharedQueue.put(Long.valueOf(i));
            } catch (final InterruptedException ex) {
                ex.printStackTrace();
            }
        }
        /////////////////////////////////
        // WAIT UNTIL THE QUEUE IS EMPTY
        /////////////////////////////////
        while (sharedQueue.size() > 0) {
            try {
                Thread.sleep(200);
                System.out.println("Producer waiting to end.");
            } catch (final InterruptedException e) {
                break;
            }
        }
        ////////////////////////////////////////////
        // SEND TO ALL CONSUMERS THE EXIT CONDITION
        ////////////////////////////////////////////
        for (final TaskConsumer consumer: consumers) {
            consumer.setExitCondition(true);
        }
    }
}

Consumer

package com.test.multithread;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
public class TaskConsumer extends Thread {
    boolean blnExit = false;
    private final int id;
    private final BlockingQueue < Long > sharedQueue;
    public TaskConsumer(final int id, final BlockingQueue < Long > sharedQueue) {
        this.id = id;
        this.sharedQueue = sharedQueue;
    }
    public void setExitCondition(final boolean blnDoExit) {
        blnExit = blnDoExit;
    }
    @Override
    public void run() {
        final Random generator = new Random();
        while (!blnExit) {
            try {
                if (sharedQueue.size() > 0) {
                    System.out.println("Consumer id:" + id +
                        " sent email " + sharedQueue.take());
                    // TO BE REMOVED (ONLY SIMULATES RANDOM WORKING TIME)
                    final long start = System.currentTimeMillis();
                    Thread.sleep(generator.nextInt(1000) + 1000);
                    final long end = System.currentTimeMillis();
                } else
                    Thread.sleep(500);
            } catch (final InterruptedException ex) {
                ex.printStackTrace();
            }
        }
        System.out.println("Consumer " + id + " exiting");
    }
}

Together

package com.test.multithread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ProducerConsumerPattern {
    private final int queueCapacity = 200;
    private int numberOfThreads = 10;
    public static void main(final String args[]) {
        new ProducerConsumerPattern(20);
    }
    public ProducerConsumerPattern(final int numberOfThreads) {
        if (numberOfThreads <= 0 || numberOfThreads > 100)
            throw new IllegalArgumentException("The number of threads should be a number between 1 and 100");
        this.numberOfThreads = numberOfThreads;
        //Creating shared object
        final BlockingQueue < Long > sharedQueue = new LinkedBlockingQueue < Long > (queueCapacity);
        // Creating and starting the Consumer Threads
        final List < TaskConsumer > consumers = new ArrayList < TaskConsumer > ();
        for (int i = 0; i <= this.numberOfThreads; i++) {
            final TaskConsumer consThread = new TaskConsumer(i, sharedQueue);
            consThread.start();
            consumers.add(consThread);
        }
        // Creating and starting the Producer Thread
        final TaskProducer prodThread = new TaskProducer(sharedQueue, consumers);
        prodThread.start();
    }
}

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ProducerConsumerPattern {

    public static void main(String args[]){

     //Creating shared object
     BlockingQueue sharedQueue = new LinkedBlockingQueue();

     //Creating Producer and Consumer Thread
     Thread prodThread = new Thread(new Producer(sharedQueue));
     Thread consThread = new Thread(new Consumer(sharedQueue));

     //Starting producer and Consumer thread
     prodThread.start();
     consThread.start();
    }

}

//Producer Class in java
class Producer implements Runnable {

    private final BlockingQueue sharedQueue;

    public Producer(BlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        for(int i=0; i<10; i++){
            try {
                System.out.println("Produced: " + i);
                sharedQueue.put(i);
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

}

//Consumer Class in Java
class Consumer implements Runnable{

    private final BlockingQueue sharedQueue;

    public Consumer (BlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        while(true){
            try {
                System.out.println("Consumed: "+ sharedQueue.take());
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }


}

Output:

Output:
Produced: 0
Produced: 1
Consumed: 0
Produced: 2
Consumed: 1
Produced: 3
Consumed: 2
Produced: 4
Consumed: 3
Produced: 5
Consumed: 4
Produced: 6
Consumed: 5
Produced: 7
Consumed: 6
Produced: 8
Consumed: 7
Produced: 9
Consumed: 8
Consumed: 9


Read more: https://javarevisited.blogspot.com/2012/02/producer-consumer-design-pattern-with.html#ixzz5g9aJMAMz

Java Lock and Condition Example using Producer Consumer Solution

import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; 

/** 
* Java Program to demonstrate how to use Lock and Condition variable in Java by 
* solving Producer consumer problem. Locks are more flexible way to provide 
* mutual exclusion and synchronization in Java, a powerful alternative of 
* synchronized keyword. 
* 
* @author Javin Paul 
*/
public class ProducerConsumerSolutionUsingLock {
    public static void main(String[] args) {
        // Object on which producer and consumer thread will operate 
        ProducerConsumerImpl sharedObject = new ProducerConsumerImpl();
        // creating producer and consumer threads 
        Producer p = new Producer(sharedObject);
        Consumer c = new Consumer(sharedObject);
        // starting producer and consumer threads 
        p.start();
        c.start();
    }
}
class ProducerConsumerImpl {
    // producer consumer problem data 
    private static final int CAPACITY = 10;
    private final Queue queue = new LinkedList < > ();
    private final Random theRandom = new Random();
    // lock and condition variables 
    private final Lock aLock = new ReentrantLock();
    private final Condition bufferNotFull = aLock.newCondition();
    private final Condition bufferNotEmpty = aLock.newCondition();
    public void put() throws InterruptedException {
        aLock.lock();
        try {
            while (queue.size() == CAPACITY) {
                System.out.println(Thread.currentThread().getName() + " : Buffer is full, waiting");
                bufferNotEmpty.await();
            }
            int number = theRandom.nextInt();
            boolean isAdded = queue.offer(number);
            if (isAdded) {
                System.out.printf("%s added %d into queue %n", Thread.currentThread().getName(), number);
                // signal consumer thread that, buffer has element now 
                System.out.println(Thread.currentThread().getName() + " : Signalling that buffer is no more empty now");
                bufferNotFull.signalAll();
            }
        } finally {
            aLock.unlock();
        }
    }
    public void get() throws InterruptedException {
        aLock.lock();
        try {
            while (queue.size() == 0) {
                System.out.println(Thread.currentThread().getName() + " : Buffer is empty, waiting");
                bufferNotFull.await();
            }
            Integer value = queue.poll();
            if (value != null) {
                System.out.printf("%s consumed %d from queue %n", Thread.currentThread().getName(), value);
                // signal producer thread that, buffer may be empty now 
                System.out.println(Thread.currentThread().getName() + " : Signalling that buffer may be empty now");
                bufferNotEmpty.signalAll();
            }
        } finally {
            aLock.unlock();
        }
    }
}
class Producer extends Thread {
    ProducerConsumerImpl pc;
    public Producer(ProducerConsumerImpl sharedObject) {
        super("PRODUCER");
        this.pc = sharedObject;
    }
    @Override public void run() {
        try {
            pc.put();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
class Consumer extends Thread {
    ProducerConsumerImpl pc;
    public Consumer(ProducerConsumerImpl sharedObject) {
        super("CONSUMER");
        this.pc = sharedObject;
    }
    @Override public void run() {
        try {
            pc.get();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block 
            e.printStackTrace();
        }
    }
}

-

Output 
CONSUMER : Buffer is empty, waiting 
PRODUCER added 279133501 into queue 
PRODUCER : Signalling that buffer is no more empty now 
CONSUMER consumed 279133501 from queue 
CONSUMER : Signalling that buffer may be empty now

Read more: https://javarevisited.blogspot.com/2015/06/java-lock-and-condition-example-producer-consumer.html#ixzz5gcGd1oXQ

Reference and Reading List

The Producer and Consumer Design Patterns

Producer–consumer problem - Wikipedia

Lecture 17: Concurrency—Producer/Consumer Pattern and Thread

Producer-Consumer solution using threads in Java - GeeksforGeeks

Producer Consumer Design Pattern | Code Pumpkin

Last updated