Thinking in java 之并發(fā)其五:強大的 JUC 包
一、前言
java 的 java.util.concurrent 是 java 用于提供一些并發(fā)程序所需功能的類包。它的功能全面且強大,在前面,我們已經使用過原子基本變量,BlockingQueue 等類?,F在,我們需要更加深入的去了解 JUC 的強大功能。
二、CountDownLatch
該類用來同步一個或多個任務,強制它們等待由其他任務執(zhí)行的一組操作完成。
在 CountDownLatch 對象中設置一個初始的計數值,任何在這個對象上調用 wait() 的方法都講阻塞,直至這個計數值到達0。其他任務在結束工作時,可以在該對象上調用 countDown() 來減小這個數值。同事,CountDownLatch 只能出發(fā)一次,計數值不能被重置。如果有重置的需要,可以使用 CyclicBarrier。
先來看一個使用 CountDownLatch 的簡單示例:
package JUCTest;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class TaskPortion implements Runnable{
private static int counter = 0;
private final int id = counter++;
private static Random rand = new Random(47);
private final CountDownLatch countDownLatch;
public TaskPortion(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
doWork();
countDownLatch.countDown();
}catch(InterruptedException e) {
System.out.println("Exit");
}
}
public void doWork() throws InterruptedException{
TimeUnit.MILLISECONDS.sleep(rand.nextInt(20000));
System.out.println(this + " complete");
}
public String toString() {
return "TaskPorition : " + id;
}
}
class WaitingTask implements Runnable{
private static int counter = 0;
private final int id = counter++;
private final CountDownLatch countDownLatch;
public WaitingTask(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
countDownLatch.await();
System.out.println("latch barrier pass for " + this);
}catch(InterruptedException e) {
System.out.println(this + "interrupted");
}
}
public String toString() {
return "TaskPorition : " + id;
}
}
public class CountDownLatchDemo {
static final int SIZE = 10;
public static void main(String args[]) {
ExecutorService exec = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(SIZE);
for(int i=0;i<10;i++) {
exec.execute(new WaitingTask(latch));
}
for(int i=0;i<SIZE;i++) {
exec.execute(new TaskPortion(latch));
}
System.out.println("Launched all tasls");
exec.shutdown();
}
}
//output
/*Launched all tasls
TaskPorition : 5 complete
TaskPorition : 1 complete
TaskPorition : 4 complete
TaskPorition : 3 complete
TaskPorition : 9 complete
TaskPorition : 0 complete
TaskPorition : 7 complete
TaskPorition : 8 complete
TaskPorition : 6 complete
TaskPorition : 2 complete
latch barrier pass for TaskPorition : 1
latch barrier pass for TaskPorition : 2
latch barrier pass for TaskPorition : 0
latch barrier pass for TaskPorition : 6
latch barrier pass for TaskPorition : 7
latch barrier pass for TaskPorition : 3
latch barrier pass for TaskPorition : 4
latch barrier pass for TaskPorition : 5
latch barrier pass for TaskPorition : 8
latch barrier pass for TaskPorition : 9
*/
通過前面章節(jié)的內容,我們可以很容一個實現 “A 任務 等到 B 任務完成之后再去執(zhí)行” 的功能,而在上述例子中,B 任務是由 10 個子任務構成的。通過 CountDownLatch 我們沒完成一個子任務,就會是 countDownLatch 減1。等待所有子任務完成,countDownLatch 變?yōu)?后,啟動 A 任務。
二、CyclicBarrier
countDownLatch 可以使某個任務完成之后進入阻塞狀態(tài),阻塞狀態(tài)持續(xù)到其他相關任務全部完成之后(countDownLatch 變?yōu)?)。CyclicBarrier 類似于countDownLatch ,和 countDownLatch 的區(qū)別在于。在所有任務完成之后,CyclicBarrier 的計數器會重置。
先看一個簡單的示例:
package JUCTest;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class Horse implements Runnable{
private static int counter = 0;
private final int id = counter++;
private int strides = 0;
private static Random rand = new Random(47);
private static CyclicBarrier barrier;
public Horse(CyclicBarrier b) {
barrier = b;
}
public synchronized int getStrides() {
return strides;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
synchronized(this) {
strides += rand.nextInt(3);
}
barrier.await();
}
}catch(InterruptedException e) {
e.printStackTrace();
}catch(BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
public String toString() {
return "Horse " + id +" ";
}
public String tracks(){
StringBuilder s = new StringBuilder();
for(int i=0;i<getStrides();i++) {
s.append("*");
}
s.append(id);
return s.toString();
}
}
public class HorseRace {
static final int FINISH_LINE = 75;
private List<Horse> horses = new ArrayList<Horse>();
private ExecutorService exec = Executors.newCachedThreadPool();
private CyclicBarrier barrier;
public HorseRace(int nHorses,final int pause) {
barrier = new CyclicBarrier(nHorses,new Runnable() {
public void run() {
StringBuilder s = new StringBuilder();
for(int i=0;i<FINISH_LINE;i++) {
s.append("=");
}
System.out.println(s);
for(Horse horse : horses)
System.out.println(horse.tracks());
for(Horse horse : horses) {
if(horse.getStrides() >= FINISH_LINE) {
System.out.println(horse + "Won!");
exec.shutdownNow();
return;
}
}
try {
TimeUnit.MILLISECONDS.sleep(pause);
}catch(InterruptedException e) {
System.out.println("barrier-action sleep interrupted");
}
}
});
for(int i=0;i<nHorses;i++) {
Horse horse = new Horse(barrier);
horses.add(horse);
exec.execute(horse);
}
}
public static void main(String[] args) {
int nHorses = 7;
int pause = 200;
new HorseRace(nHorses,pause);
}
}
上述程序是一個模擬賽馬的操作,一共有75個柵欄,每個馬的速度都不一樣的,所以每次打印每只馬跨越了多少柵欄時,會出現你追我趕的情況。但是程序內在邏輯是怎么樣呢?
我們可以把馬對應成一個任務,馬跨域柵欄是一次 run() 方法內部走完了一次循環(huán)。
CyclicBarrier 就相當于一堵墻,它橫在所有馬的前方,當馬完成一次操作(隨機跨越1~3個柵欄),它來到了墻面前,被墻擋住(代碼是通過 await() 實現的)。等所有的馬(具體幾只是在 CyclicBarrier 的構造函數里確定的)都來到墻面前的時候,墻打開,所有馬進行下一次操作。
可以推測出來,CyclicBarrier 內部一定有一個計數器(通過查看源碼可以知道 在構造函數里是把值賦給 final int parties 和 int count 的,前者是 final 無法改變用于重置計數器使用,后者用于計數),我們沒調用一次 await() 方法,這個計數器就會減1。直到我們調用了 parties 次 await() 計數器變?yōu)?0 。然后所有任務可以進行一下步,同時,計數器變?yōu)?parties ,繼續(xù)阻塞任務進入再下一步的操作,直到它再次為0;
ps: 通過源碼可以肯定我們的推測,事實上 每次我們調用 await(), count 就會遞減,而當 count 為 0 時,就會調用 nextGeneration 方法。nextGeneration 會把計數器重置,同時會喚醒阻塞的任務。順便一提的事,CyclicBarrier 實現阻塞和喚醒的方式是使用 Condition (前面有具體內容)。
在 CyclicBarrier 的構造函數里還有一個 Runnable,它會在計數器為 0 的時候啟動。
三、DelayQueue
在 JUC 中,除了之前提到的,LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue 之外,還有其他幾種 Queue, DelayQueue 就是其中之一。
DelayQueue 是一個無界的 BlockingQueue,用于放置實現了 Delayed 接口的對象,其中對象只能在其到期才能從隊列中取走。并且該隊列是有序的,我們需要實現 compareTo 方法用來作為排序的標準。當從 DelayQueue 獲取對象時,只會獲取延遲到期的對象。
package JUCTest;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class DelayedTask implements Runnable,Delayed{
private static int counter = 0;
private final int id = counter++;
private final int delta;
private final long trigger;
protected static List<DelayedTask> squence = new ArrayList<DelayedTask>();
public DelayedTask(int delayInMilliseconds) {
delta = delayInMilliseconds;
trigger = System.nanoTime() + TimeUnit.NANOSECONDS.convert(delta, TimeUnit.MILLISECONDS);
squence.add(this);
}
@Override
public void run() {
System.out.println(this);
}
@Override
public int compareTo(Delayed o) {
DelayedTask that = (DelayedTask) o;
if(trigger < that.trigger) return 1;
if(trigger > that.trigger) return 1;
return 0;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(trigger - System.nanoTime(),TimeUnit.NANOSECONDS);
}
public String toString() {
return String.format("[%1$-4d]", delta) + " Task " + id;
}
public String summary() {
return "("+id+":"+delta+")";
}
public static class EndSentinel extends DelayedTask{
private ExecutorService exec;
public EndSentinel(int delay,ExecutorService e) {
super(delay);
exec=e;
}
public void run() {
for(DelayedTask pt : squence) {
System.out.print(pt.summary() + " ");
}
System.out.println(" ");
System.out.println(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
class DelayedTaskConsumer implements Runnable{
private DelayQueue<DelayedTask> q;
public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
this.q = q;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
q.take().run();
}
}catch(InterruptedException e) {
e.printStackTrace();
}
System.out.println("Finished DelayedTaskConsumer");
}
}
public class DelayQueueDemo {
public static void main(String[] args) {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
for(int i=0;i<20;i++) {
queue.put(new DelayedTask(rand.nextInt(5000)));
}
queue.add(new DelayedTask.EndSentinel(5000, exec));
exec.execute(new DelayedTaskConsumer(queue));
}
}
在上面這個例子中,我們讓 DelayedTask 實現了 Runnable 和 Delayed 接口。除了 run() 方法之外,我們同時實現了 compareTo() 和 getDelay() 方法。然后輸出的結果表明,任務從隊列總出來的順序是按照 getDelay() 所獲得的值來確定的。
我們使用變量 delta 來作為延遲時間的,System.nanoTime() 會獲得一個納秒為單位的數字,這個數字單獨使用沒有任何意義,但是,在程序的兩個位置都使用 System.nanoTime() 并且把這兩個值相減,就能得到一個精準的時間差。在構造函數里 trigger 被賦值為 System.nanoTime() + delta,而在 getDelay() 中返回的值是 trigger - System.nanoTime()(第二次使用,后面用 System.nanoTime()2 做區(qū)別),那么返回的值其實是,System.nanoTime() + delta - System.nanoTime()2,System.nanoTime()2 - System.nanoTime() 可以認為使我們給 trigger 賦值和程序調用 getDelay() 之間的時間差,當時間差,也就是經過的時間 > delta (設定的延遲時間) 時,對象才能出列。換句話說 getDelay() 返回的值 < 0 才能出列。
但是對象出列除了延遲時間到達之外這個條件之外,還得滿足它在對列的首位,所以我們必須使用 compareTo() 來規(guī)定一個排列的順序,使得延遲時間到達最短的放在隊首位置。所以我們用 trigger 來盡行比較。注意,這里的排隊應該是最塊走完延遲時間的排前面,而不是延遲時間最短的排前面。比如,A的延遲時間為 1s 他是在第 10s 中的時候放進去的,B的延遲時間為 2 s 它是在第 4s 的時候放進去的,那么B應該排在A前面。
那么,如果我們使用錯誤的方式來排隊,比如把延遲時間到達最晚的放在前面。就會導致效率低下,程序會等到最長的延遲時間到達才會有出列操作。
四、PriorityBlockingQueue
顧名思義,他是以優(yōu)先級作為排序順序來給隊列中的對象排序的。而排序的方法,依舊是通過 compareTo 方法實現,其實,DelayQueue 可以看做是一種特殊的優(yōu)先級排序,除了排序之外,他還有延遲的附加條件。所以對于 PriorityBlockingQueue 我們不做過多的說明。
五、SheduledExecutor
SheduledExecutor 可以使任務按照設定的計劃去執(zhí)行,通常,我們需要在指定的時間執(zhí)行某項任務,或者在一定的周期內循環(huán)的執(zhí)行某項目,就會使用到 SheduledExecutor。
package JUCTest;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class GreenhouseScheduler {
private volatile boolean light = false;
private volatile boolean water = false;
private String thermostat = "Day";
public synchronized String getThermostat() {
return thermostat;
}
public synchronized void setThermostat(String thermostat) {
this.thermostat = thermostat;
}
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(10);
public void schedule(Runnable event,long delay) {
scheduler.schedule(event, delay, TimeUnit.MILLISECONDS);
}
public void repeat(Runnable event,long initialDelay,long period) {
scheduler.scheduleAtFixedRate(event, initialDelay, period, TimeUnit.MILLISECONDS);
}
class LightOn implements Runnable{
public void run() {
System.out.println("Turn on lights");
light = true;
}
}
class LightOff implements Runnable{
public void run() {
System.out.println("Turn off lights");
light = false;
}
}
class WaterOn implements Runnable{
public void run() {
System.out.println("Turning greenhoulse water on");
water = true;
}
}
class WaterOff implements Runnable{
public void run() {
System.out.println("Turning greenhoulse water off");
water = false;
}
}
class ThermostatNight implements Runnable{
public void run() {
System.out.println("Thermostat to night setting");
setThermostat("Night");
}
}
class ThermostatDay implements Runnable{
public void run() {
System.out.println("Thermostat to day setting");
}
}
class Bell implements Runnable{
public void run() {
System.out.println("Bing!");
}
}
class Terminate implements Runnable{
public void run() {
System.out.println("Terminating!");
scheduler.shutdown();
new Thread() {
public void run() {
for(DataPoint p : data) {
System.out.println(p);
}
}
};
}
}
static class DataPoint{
final Calendar time;
final float temperature;
final float humidity;
public DataPoint(Calendar d,float temp,float hum) {
time = d;
temperature = temp;
humidity = hum;
}
public String toString() {
return time.getTime() + String.format(" temperature:, %1s$.1f humidity: %2$.2f",temperature);
}
}
private Calendar lastTime = Calendar.getInstance();
{
lastTime.set(Calendar.MINUTE, 30);
lastTime.set(Calendar.SECOND, 00);
}
private float lastTemp = 65.0f;
private int tempDirection = 1;
private float lastHumidity = 50.0f;
private int humidityDirection = 1;
private Random rand = new Random(47);
List<DataPoint> data = Collections.synchronizedList(new ArrayList<DataPoint>());
class CollectData implements Runnable{
public void run() {
System.out.println("Collecting date");
synchronized(GreenhouseScheduler.this) {
lastTime.set(Calendar.MINUTE,lastTime.get(Calendar.MINUTE) + 30);
}
if(rand.nextInt(5) == 4) {
tempDirection = -tempDirection;
}
lastTemp = lastTemp + tempDirection*(1.0f + rand.nextFloat());
if(rand.nextInt(5) == 4) {
humidityDirection = -humidityDirection;
}
lastHumidity = lastHumidity + humidityDirection * rand.nextFloat();
data.add(new DataPoint((Calendar)lastTime.clone(),lastTemp,lastHumidity));
}
}
public static void main(String[] args) {
GreenhouseScheduler gh = new GreenhouseScheduler();
gh.schedule(gh.new ThermostatNight(),5000);
gh.repeat(gh.new Bell(), 0, 1000);
gh.repeat(gh.newThermostatNight(), 0, 2000);
gh.repeat(gh.new LightOn(), 0, 200);
gh.repeat(gh.new LightOff(), 0, 400);
gh.repeat(gh.new WaterOn(), 0, 600);
gh.repeat(gh.new WaterOff(), 0, 800);
gh.repeat(gh.new ThermostatDay(), 0, 1400);
gh.repeat(gh.new CollectData(), 500, 500);
}
}
這里我們引入了一個新的線程池—— ScheduledThreadPoolExecutor,他添加和執(zhí)行任務的方法不在是 Executor,而是 schedule 和 schedule。schedule 除了需要提供一個 Runnable 作為參數以外,還要提供一個延遲時間,和時間單位。延遲時間和時間單位共同決定了任務在什么時候被啟動。scheduleAtFixedRate 還需額外提供一個周期時間,在到達延遲時間之后,每過一個周期,任務就會執(zhí)行一次。
在上面的示例中,我們創(chuàng)建了一個溫室,溫室需要進行開關燈、防水、收集數據等操作。
我們一共設置了1個單一任務和8個循環(huán)任務。在程序進行到 5s 中時,所有任務被中斷。
六、Semaphore
無論是使用 synchronized 亦或是 lock 的方式,都能保證某項資源只能被一個任務獲取和使用。但有時候,我們或許會希望能夠允許指定數量的任務來獲取同一個資源。JUC 為我們提供了 Semaphore 來實現這方面的需求。
在 Thinking in Java 的關于 Semaphore 的示例中,首先創(chuàng)建了一個使用 Semaphore 來進行控制的對象池,然后通過這個對象池來實現“允許指定數量的任務來獲取同一個資源”這一功能,我們先看代碼。
在看示例錢,我們需要簡單理解下 Semaphore 的運作方式,Seamaphore 的構造方法里,包含兩個參數:permits(int),fair(bool)。permits 就是所謂的計數器的值,即我們希望資源能同時被多少任務訪問。而 fair 是一個布爾值,它決定我們使用的是公平鎖還是非公平鎖,關于公平鎖,在之后的拓展章節(jié)再詳細敘述。
創(chuàng)建完 Semaphore 之后,我們通過它的 aquire() 來獲取進入資源的權限,此時計數器 -1,通過它的 release() 方法,來釋放一個權限,此時計數器 +1。
以下是 Thinking in Java 示例中所用的對象池:
package JUCTest;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
public class Pool<T> {
private int size;
private List<T> items = new ArrayList<T>();
private volatile boolean[] checkedOut;
private Semaphore available;
public Pool(Class<T> classObject,int size) {
this.size = size;
checkedOut = new boolean[size];
available = new Semaphore(10,true);
for(int i=0;i<size;i++) {
try {
items.add(classObject.newInstance());
}catch(Exception e) {
throw new RuntimeException(e);
}
}
}
public T checkOut() throws InterruptedException{
available.acquire();
return getItem();
}
public void checkIn(T x) {
if(releaseItem(x)) {
available.release();
}
}
private synchronized T getItem() {
for(int i=0;i<size;i++) {
if(!checkedOut[i]) {
checkedOut[i] = true;
return items.get(i);
}
}
return null;
}
private synchronized boolean releaseItem(T item) {
int index = items.indexOf(item);
if(index == -1) {
checkedOut[index] = false;
return true;
}
return false;
}
}
在 pool 的構造函數中,我們創(chuàng)建一個可以放置對象(泛型 )的 List,初始化 Semaphore。同時使用了 newInstance() 的方式創(chuàng)建了 size 個對象。
在更詳細的說明之前,先來看看這個對象池的應用。首先,我們需要新建一個類:
package JUCTest;
public class Fat {
private volatile double d;
private static int counter = 0;
private int id = counter++;
public Fat() {
for(int i=1;i<10000;i++) {
d += (Math.PI + Math.E);
}
}
public void operation() {
System.out.println("this");
}
public String toString() {
return "Fat id: " + id;
}
}
然后,我么通過 Pool 來對該對象進行管理:
package JUCTest;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
class CheckoutTask<T> implements Runnable {
private static int counter = 0;
private final int id = counter++;
private Pool<T> pool;
public CheckoutTask(Pool<T> pool) {
this.pool = pool;
}
public void run() {
try {
T item = pool.checkOut();
System.out.println(this + " checked out " + item);
TimeUnit.SECONDS.sleep(1);
System.out.println(this + "cheked in " + item);
pool.checkIn(item);
}catch(InterruptedException e) {
System.out.println("InterruptedException");
}
}
public String toString() {
return "CheckoutTesk " + id + " ";
}
}
public class SemaphoreDemo{
final static int SIZE = 25;
public static void main(String[] args) throws InterruptedException {
final Pool<Fat> pool = new Pool<Fat>(Fat.class,SIZE);
ExecutorService exec = Executors.newCachedThreadPool();
List<Fat> lists = new ArrayList<Fat>();
for(int i=0;i<SIZE;i++) {
Fat f = pool.checkOut();
System.out.println(i + ": main() thread check out");
f.operation();
lists.add(f);
}
Future<?> blocked = exec.submit(new Runnable() {
public void run() {
try {
pool.checkOut();
}catch(InterruptedException e) {
System.out.println("Check out Interrupted");
}
}
});
TimeUnit.SECONDS.sleep(2);
blocked.cancel(true);
System.out.println("Check in object in " + lists);
for(Fat f : lists) {
pool.checkIn(f);
}
for(Fat f : lists) {
pool.checkIn(f);
}
exec.shutdown();
}
}
在 SemaphoreDemo 中,創(chuàng)建了一個容量為 SIZE 的 pool,在 pool 的構造方法中,我們根據傳入的模板參數,創(chuàng)建了 SIZE 個 Fat 對象,然后所有的 Fat 的對象全部通過 checkout 從 pool 里取出。
在最后往 pool 中 checkin Fat 時,我們發(fā)現不論我們往里添加了多個對象,在 pool 中始終最多只有 SIZE 個對象。那么后來的添加的對象哪里去了?checkin 的操作并沒有消失,也沒有出錯,只是被阻塞了,如果我們此時通過 checkout 釋放出一些位置,那些消失的 Fat 就會順利的插入到 pool 里。
那么這是如何實現的?
在 Checkout() 中,我們再獲取到 Fat 對象前,需要進行一次 acquire() 每次的 acquire 操作,都會使得 Semaphore 中的計數器 -1,當技術器為 0 時,我們繼續(xù)進行 checkout()(或者繼續(xù)進行 checkout() 里的 acquire() 操作),就會被阻塞。直到我們使用 checkin() (或者說是 checkin() 里的 release()),使得計數器 +1。被阻塞的 checkout 操作才會繼續(xù)執(zhí)行。
在上面的代碼中,我們先進行了 SIZE 次 checkout() 操作,然后,再新建一個任務繼續(xù)使用 checkout 操作,其被阻塞,直到我們將其中斷。后臺輸出 Check out Interrupted,如果我們在 blocked.cancel(true) ——中斷操作之前,執(zhí)行 checkin 操作,就會使阻塞的任務能夠繼續(xù)進行下去。
七、Exchanger
Exchanger 是在兩個任務之間交換對象的柵欄。當 A 和 B 任務進入柵欄時,它們各自擁有一個對象 C 和 D,當他們離開時,擁有的對象互換,即 A 擁有 D,B 有用 C。在創(chuàng)建 A 和 B 時,需要把他們和同一個 Exchanger 綁定。
package JUCTest;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class ExchangerProducer implements Runnable{
private Exchanger<Integer> exchanger;
public ExchangerProducer(Exchanger<Integer> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
for(int i=1;i<10;i++) {
Integer data = i;
System.out.println(i + " : producer before exchange : " + data);
try {
data = exchanger.exchange(data);
} catch (InterruptedException e) {
System.out.println("Interrupted...");
}
System.out.println(i + ": producer after exchange : " + data);
}
}
}
class ExchagerConsumer implements Runnable{
private Exchanger<Integer> exchanger;
public ExchagerConsumer(Exchanger<Integer> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
for(int i=1;i<10;i++) {
Integer data = i * 2;
System.out.println(i + " : consumer before exchange : " + data);
try {
data = exchanger.exchange(data);
} catch (InterruptedException e) {
System.out.println("Interrupted...");
}
System.out.println(i + " : consumer after exchange : " + data);
}
}
}
public class ExchagerDemo {
public static void main(String[] args) throws InterruptedException {
Exchanger<Integer> exchanger = new Exchanger<Integer>();
ExchangerProducer exchangerProducer = new ExchangerProducer(exchanger);
ExchagerConsumer exchagerConsumer = new ExchagerConsumer(exchanger);
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(exchangerProducer);
exec.execute(exchagerConsumer);
TimeUnit.SECONDS.sleep(3);
exec.shutdownNow();
}
}
在上面的示例中,producer 負責生產奇數,consumer 負責生產偶數,在 producer 或者 consumer 生產完一個數之后,會將其放入 Exchanger 中等待交換,雙方進入到阻塞狀態(tài),等待交換完成之后,任務繼續(xù)進行。