实现阻塞队列

实现阻塞队列

Posted by JiangKun on June 22, 2020

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。 阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。 BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:

  1. 当队列满了的时候进行入队列操作
  2. 当队列空了的时候进行出队列操作

因此,当一个线程试图对一个已经满了的队列进行入队列操作时,它将会被阻塞,除非有另一个线程做了出队列操作;同样,当一个线程试图对一个空队列进行出队列操作时,它将会被阻塞,除非有另一个线程进行了入队列操作。 在Java中,BlockingQueue的接口位于java.util.concurrent 包中(在Java5版本开始提供),由上面介绍的阻塞队列的特性可知,阻塞队列是线程安全的。 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景。 认识BlockingQueue 阻塞队列,顾名思义,首先它是一个队列,通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出; 常用的队列主要有以下两种:(当然通过不同的实现方式,还可以延伸出很多不同类型的队列,DelayQueue就是其中的一种)   先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性。   后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件。 多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。然而,在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。好在此时,强大的concurrent包横空出世了,而他也给我们带来了强大的BlockingQueue。(在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒)

在这里插入图片描述 实现一个基于数组的循环队列

public class MyBlockingQueue {
    private int[] array = new int[1];      // 存放数据的数组
    private volatile int size = 0;          // 当前已有元素个数
    private int frontIndex = 0;             // 指向队首元素的下标
    private int rearIndex = 0;              // 指向下一个可用位置的下标

    public synchronized void push(int element) throws InterruptedException {
        // 判断队列是否满
        while (size >= array.length) {
            //throw new RuntimeException("队列已满");
            wait();     // 等着调用 pop 的线程唤醒,所以在 pop 中实现 notify
        }

        array[rearIndex] = element;
        size++;     // 破坏了原子性
        
        rearIndex = (rearIndex + 1) % array.length;
        notifyAll();   // 唤醒的是调用 pop 时阻塞的线程
    }

    public synchronized int pop() throws InterruptedException {
        while (size == 0) {
            //throw new RuntimeException("队列已空");
            wait();     // 等待着调用 push 的线程唤醒
        }

        int element = array[frontIndex];
        frontIndex = (frontIndex + 1) % array.length;
        size--;     // 破坏了原子性

        // 队列中一定出现空间了
        notifyAll();   // 唤醒调用 push 时阻塞的线程

        return element;
    }

    public int size() {
        return size;    // 可能有内存可见性问题
    }
}

被唤醒的时候重新检查条件,加上while循环 在这里插入图片描述 唤醒的时候需要全部唤醒,确保消费者被唤醒,这时候不用担心生产者,因为还有while循环在限制 在这里插入图片描述 我们不能控制wait()唤醒的是哪个,所以需要唤醒之后重新检查条件 在这里插入图片描述