Problem

Implement a thread-safe bounded blocking queue that has the following methods:

  • BoundedBlockingQueue(int capacity) The constructor initializes the queue with a maximum capacity.
  • void enqueue(int element) Adds an element to the front of the queue. If the queue is full, the calling thread is blocked until the queue is no longer full.
  • int dequeue() Returns the element at the rear of the queue and removes it. If the queue is empty, the calling thread is blocked until the queue is no longer empty.
  • int size() Returns the number of elements currently in the queue.

Your implementation will be tested using multiple threads at the same time. Each thread will either be a producer thread that only makes calls to the enqueue method or a consumer thread that only makes calls to the dequeue method. The size method will be called after every test case.

Please do not use built-in implementations of bounded blocking queue as this will not be accepted in an interview.

Examples

Example 1:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
Input:
1
1
["BoundedBlockingQueue","enqueue","dequeue","dequeue","enqueue","enqueue","enqueue","enqueue","dequeue"]
[[2],[1],[],[],[0],[2],[3],[4],[]]
Output:
[1,0,2,2]
Explanation:
Number of producer threads = 1
Number of consumer threads = 1
BoundedBlockingQueue queue = new BoundedBlockingQueue(2);   // initialize the queue with capacity = 2.
queue.enqueue(1);   // The producer thread enqueues 1 to the queue.
queue.dequeue();    // The consumer thread calls dequeue and returns 1 from the queue.
queue.dequeue();    // Since the queue is empty, the consumer thread is blocked.
queue.enqueue(0);   // The producer thread enqueues 0 to the queue. The consumer thread is unblocked and returns 0 from the queue.
queue.enqueue(2);   // The producer thread enqueues 2 to the queue.
queue.enqueue(3);   // The producer thread enqueues 3 to the queue.
queue.enqueue(4);   // The producer thread is blocked because the queue's capacity (2) is reached.
queue.dequeue();    // The consumer thread returns 2 from the queue. The producer thread is unblocked and enqueues 4 to the queue.
queue.size();       // 2 elements remaining in the queue. size() is always called at the end of each test case.

Example 2:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
Input:
3
4
["BoundedBlockingQueue","enqueue","enqueue","enqueue","dequeue","dequeue","dequeue","enqueue"]
[[3],[1],[0],[2],[],[],[],[3]]
Output:
[1,0,2,1]
Explanation:
Number of producer threads = 3
Number of consumer threads = 4
BoundedBlockingQueue queue = new BoundedBlockingQueue(3);   // initialize the queue with capacity = 3.
queue.enqueue(1);   // Producer thread P1 enqueues 1 to the queue.
queue.enqueue(0);   // Producer thread P2 enqueues 0 to the queue.
queue.enqueue(2);   // Producer thread P3 enqueues 2 to the queue.
queue.dequeue();    // Consumer thread C1 calls dequeue.
queue.dequeue();    // Consumer thread C2 calls dequeue.
queue.dequeue();    // Consumer thread C3 calls dequeue.
queue.enqueue(3);   // One of the producer threads enqueues 3 to the queue.
queue.size();       // 1 element remaining in the queue.
Since the number of threads for producer/consumer is greater than 1, we do not know how the threads will be scheduled in the operating system, even though the input seems to imply the ordering. Therefore, any of the output [1,0,2] or [1,2,0] or [0,1,2] or [0,2,1] or [2,0,1] or [2,1,0] will be accepted.

Constraints:

  • 1 <= Number of Prdoucers <= 8
  • 1 <= Number of Consumers <= 8
  • 1 <= size <= 30
  • 0 <= element <= 20
  • The number of calls to enqueue is greater than or equal to the number of calls to dequeue.
  • At most 40 calls will be made to enque, deque, and size.

Solution

Method 1 – Condition Variables and Locks (Producer-Consumer Pattern)

Intuition

To implement a thread-safe bounded blocking queue, we use a queue to store elements, a lock to synchronize access, and two condition variables: one to wait for space (not full) and one to wait for items (not empty). Producers block when the queue is full, and consumers block when the queue is empty.

Approach

  1. Use a queue (e.g., deque) to store elements.
  2. Use a lock to synchronize access to the queue.
  3. Use two condition variables: not_full and not_empty.
  4. In enqueue, acquire the lock, wait on not_full if the queue is full, add the element, notify not_empty, and release the lock.
  5. In dequeue, acquire the lock, wait on not_empty if the queue is empty, remove and return the element, notify not_full, and release the lock.
  6. In size, acquire the lock, return the current size, and release the lock.

Code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import threading
from collections import deque
class BoundedBlockingQueue:
    def __init__(self, capacity: int):
        self.q = deque()
        self.cap = capacity
        self.lock = threading.Lock()
        self.not_full = threading.Condition(self.lock)
        self.not_empty = threading.Condition(self.lock)
    def enqueue(self, element: int) -> None:
        with self.not_full:
            while len(self.q) == self.cap:
                self.not_full.wait()
            self.q.append(element)
            self.not_empty.notify()
    def dequeue(self) -> int:
        with self.not_empty:
            while not self.q:
                self.not_empty.wait()
            val = self.q.popleft()
            self.not_full.notify()
            return val
    def size(self) -> int:
        with self.lock:
            return len(self.q)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.*;
public class BoundedBlockingQueue {
    private Queue<Integer> q;
    private int cap;
    public BoundedBlockingQueue(int capacity) {
        q = new LinkedList<>();
        cap = capacity;
    }
    public synchronized void enqueue(int element) throws InterruptedException {
        while (q.size() == cap) {
            wait();
        }
        q.offer(element);
        notifyAll();
    }
    public synchronized int dequeue() throws InterruptedException {
        while (q.isEmpty()) {
            wait();
        }
        int val = q.poll();
        notifyAll();
        return val;
    }
    public synchronized int size() {
        return q.size();
    }
}

Complexity

  • ⏰ Time complexity: O(1) for enqueue, dequeue, and size, as all operations are on a queue.
  • 🧺 Space complexity: O(capacity), for storing up to capacity elements in the queue.