阻塞隊(duì)列通過添加兩組方法來擴(kuò)展隊(duì)列:
BlockingQueue
接口的實(shí)例表示阻塞隊(duì)列。 BlockingQueue
接口繼承自 Queue
接口。
put()
和 offer()
方法在阻塞隊(duì)列的尾部添加一個(gè)元素。如果阻塞隊(duì)列已滿,則put()方法將無限期阻塞,直到隊(duì)列中的空間可用。offer()方法允許您指定等待空間可用的時(shí)間段。 如果指定的元素添加成功,則返回true; 否則為假。
take()和poll()方法檢索和刪除阻塞隊(duì)列的頭。如果阻塞隊(duì)列為空,take()方法將無限期阻塞。poll()方法允許您指定在阻塞隊(duì)列為空時(shí)要等待的時(shí)間段; 如果在元素可用之前過去了指定的時(shí)間,則返回null。
來自 BlockingQueue
中 Queue
接口的方法就像使用 Queue
。
BlockingQueue
被設(shè)計(jì)為線程安全的并且可以使用在生產(chǎn)者/消費(fèi)者的情況下。
阻塞隊(duì)列不允許空元素和可以是有界的或無界的。
BlockingQueue
中的 remainingCapacity()
返回可以添加到阻止隊(duì)列中而不阻塞的元素?cái)?shù)。
BlockingQueue
可以控制多個(gè)線程被阻塞時(shí)的公平性。 如果阻塞隊(duì)列是公平的,它可以選擇最長的等待線程來執(zhí)行操作。如果阻塞隊(duì)列不公平,則不指定選擇的順序。
BlockingQueue
接口及其所有實(shí)現(xiàn)類都在 java.util.concurrent
包中。 以下是 BlockingQueue
接口的實(shí)現(xiàn)類:
由數(shù)組支持的 ArrayBlockingQueue
是 BlockingQueue
的有界實(shí)現(xiàn)類。 我們可以在其構(gòu)造函數(shù)中指定阻塞隊(duì)列的公平性。 默認(rèn)情況下,它不公平。
LinkedBlockingQueue
可以用作有界或無界阻塞隊(duì)列。 它不允許為阻塞隊(duì)列指定公平規(guī)則。
PriorityBlockingQueue
是 BlockingQueue
的無界實(shí)現(xiàn)類。 它的工作方式與 PriortyQueue
相同,用于排序阻塞隊(duì)列中的元素,并將阻塞特性添加到 PriorityQueue
中。
SynchronousQueue
實(shí)現(xiàn) BlockingQueue
,沒有任何容量。 put操作等待take操作以獲取元素。 它可以在兩個(gè)線程之間進(jìn)行握手,并在兩個(gè)線程之間交換對象。 它的isEmpty()方法總是返回true。
DelayQueue是BlockingQueue的無界實(shí)現(xiàn)類。它保持一個(gè)元素,直到該元素經(jīng)過指定的延遲。 如果有超過一個(gè)元素的延遲已經(jīng)過去,那么其延遲最早傳遞的元素將被放置在隊(duì)列的頭部。
以下代碼顯示了如何在生產(chǎn)者/消費(fèi)者應(yīng)用程序中使用阻塞隊(duì)列。
import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; class BQProducer extends Thread { private final BlockingQueue<String> queue; private final String name; public BQProducer(BlockingQueue<String> queue, String name) { this.queue = queue; this.name = name; } @Override public void run() { while (true) { try { this.queue.put(UUID.randomUUID().toString()); Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); break; } } } } class BQConsumer extends Thread { private final BlockingQueue<String> queue; private final String name; public BQConsumer(BlockingQueue<String> queue, String name) { this.queue = queue; this.name = name; } @Override public void run() { while (true) { try { String str = this.queue.take(); System.out.println(name + " took: " + str); Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); break; } } } } public class Main { public static void main(String[] args) { int capacity = 5; boolean fair = true; BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity, fair); new BQProducer(queue, "Producer1").start(); new BQProducer(queue, "Producer2").start(); new BQProducer(queue, "Producer3").start(); new BQConsumer(queue, "Consumer1").start(); new BQConsumer(queue, "Consumer2").start(); } }
上面的代碼生成以下結(jié)果。
DelayQueue
實(shí)現(xiàn) BlockingQueue
接口。 DelayQueue
中的元素必須保留一定的時(shí)間。
DelayQueue
使用一個(gè)名為 Delayed
的接口來獲取延遲時(shí)間。
該接口在java.util.concurrent包中。 其聲明如下:
public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit timeUnit); }
它擴(kuò)展了 Comparable
接口,它的 compareTo()
方法接受一個(gè)Delayed對象。
DelayQueue
調(diào)用每個(gè)元素的 getDelay()
方法來獲取元素必須保留多長時(shí)間。 DelayQueue
將傳遞 TimeUnit
到此方法。
當(dāng) getDelay()
方法返回一個(gè)零或一個(gè)負(fù)數(shù)時(shí),是元素離開隊(duì)列的時(shí)間。
隊(duì)列通過調(diào)用元素的 compareTo()
方法確定要彈出的那個(gè)。 此方法確定要從隊(duì)列中刪除的過期元素的優(yōu)先級(jí)。
以下代碼顯示了如何使用DelayQueue。
import static java.time.temporal.ChronoUnit.MILLIS; import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.time.Instant; import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; class DelayedJob implements Delayed { private Instant scheduledTime; String jobName; public DelayedJob(String jobName, Instant scheduledTime) { this.scheduledTime = scheduledTime; this.jobName = jobName; } @Override public long getDelay(TimeUnit unit) { long delay = MILLIS.between(Instant.now(), scheduledTime); long returnValue = unit.convert(delay, MILLISECONDS); return returnValue; } @Override public int compareTo(Delayed job) { long currentJobDelay = this.getDelay(MILLISECONDS); long jobDelay = job.getDelay(MILLISECONDS); int diff = 0; if (currentJobDelay > jobDelay) { diff = 1; } else if (currentJobDelay < jobDelay) { diff = -1; } return diff; } @Override public String toString() { String str = this.jobName + ", " + "Scheduled Time: " + this.scheduledTime; return str; } } public class Main { public static void main(String[] args) throws InterruptedException { BlockingQueue<DelayedJob> queue = new DelayQueue<>(); Instant now = Instant.now(); queue.put(new DelayedJob("A", now.plusSeconds(9))); queue.put(new DelayedJob("B", now.plusSeconds(3))); queue.put(new DelayedJob("C", now.plusSeconds(6))); queue.put(new DelayedJob("D", now.plusSeconds(1))); while (queue.size() > 0) { System.out.println("started..."); DelayedJob job = queue.take(); System.out.println("Job: " + job); } System.out.println("Finished."); } }
上面的代碼生成以下結(jié)果。
傳輸隊(duì)列擴(kuò)展阻塞隊(duì)列。
生產(chǎn)者使用 TransferQueue
的 transfer(E element)
方法將元素傳遞給消費(fèi)者。
當(dāng)生產(chǎn)者調(diào)用傳遞(E元素)方法時(shí),它等待直到消費(fèi)者獲取其元素。 tryTransfer()方法提供了該方法的非阻塞和超時(shí)版本。
getWaitingConsumerCount()
方法返回等待消費(fèi)者的數(shù)量。
如果有一個(gè)等待消費(fèi)者, hasWaitingConsumer()
方法返回true; 否則,返回false。 LinkedTransferQueue
是 TransferQueue
接口的實(shí)現(xiàn)類。 它提供了一個(gè)無界的 TransferQueue
。
以下代碼顯示如何使用 TransferQueue
。
import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.TransferQueue; import java.util.concurrent.atomic.AtomicInteger; class TQProducer extends Thread { private String name; private TransferQueue<Integer> tQueue; private AtomicInteger sequence; public TQProducer(String name, TransferQueue<Integer> tQueue, AtomicInteger sequence) { this.name = name; this.tQueue = tQueue; this.sequence = sequence; } @Override public void run() { while (true) { try { Thread.sleep(4000); int nextNum = this.sequence.incrementAndGet(); if (nextNum % 2 == 0) { System.out.format("%s: Enqueuing: %d%n", name, nextNum); tQueue.put(nextNum); // Enqueue } else { System.out.format("%s: Handing off: %d%n", name, nextNum); System.out.format("%s: has a waiting consumer: %b%n", name, tQueue.hasWaitingConsumer()); tQueue.transfer(nextNum); // A hand off } } catch (InterruptedException e) { e.printStackTrace(); } } } } class TQConsumer extends Thread { private final String name; private final TransferQueue<Integer> tQueue; public TQConsumer(String name, TransferQueue<Integer> tQueue) { this.name = name; this.tQueue = tQueue; } @Override public void run() { while (true) { try { Thread.sleep(3000); int item = tQueue.take(); System.out.format("%s removed: %d%n", name, item); } catch (InterruptedException e) { e.printStackTrace(); } } } } public class Main { public static void main(String[] args) { final TransferQueue<Integer> tQueue = new LinkedTransferQueue<>(); final AtomicInteger sequence = new AtomicInteger(); for (int i = 0; i < 5; i++) { try { tQueue.put(sequence.incrementAndGet()); System.out.println("Initial queue: " + tQueue); new TQProducer("Producer-1", tQueue, sequence).start(); new TQConsumer("Consumer-1", tQueue).start(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
上面的代碼生成以下結(jié)果。
更多建議: