java.util.concurrent簡(jiǎn)介
java.util.concurrent包提供了很多有用的類,方便我們進(jìn)行并發(fā)程序的開(kāi)發(fā)。本文將會(huì)做一個(gè)總體的簡(jiǎn)單介紹。
主要的組件
java.util.concurrent包含了很多內(nèi)容, 本文將會(huì)挑選其中常用的一些類來(lái)進(jìn)行大概的說(shuō)明:
- Executor
- ExecutorService
- ScheduledExecutorService
- Future
- CountDownLatch
- CyclicBarrier
- Semaphore
- ThreadFactory
Executor
Executor是一個(gè)接口,它定義了一個(gè)execute方法,這個(gè)方法接收一個(gè)Runnable,并在其中調(diào)用Runnable的run方法。
我們看一個(gè)Executor的實(shí)現(xiàn):
public class Invoker implements Executor {
@Override
public void execute(Runnable r) {
r.run();
}
}
現(xiàn)在我們可以直接調(diào)用該類中的方法:
public void execute() {
Executor executor = new Invoker();
executor.execute( () -> {
log.info("{}", Thread.currentThread().toString());
});
}
注意,Executor并不一定要求執(zhí)行的任務(wù)是異步的。
ExecutorService
如果我們真正的需要使用多線程的話,那么就需要用到ExecutorService了。
ExecutorService管理了一個(gè)內(nèi)存的隊(duì)列,并定時(shí)提交可用的線程。
我們首先定義一個(gè)Runnable類:
public class Task implements Runnable {
@Override
public void run() {
// task details
}
}
我們可以通過(guò)Executors來(lái)方便的創(chuàng)建ExecutorService:
ExecutorService executor = Executors.newFixedThreadPool(10);
上面創(chuàng)建了一個(gè)ThreadPool, 我們也可以創(chuàng)建單線程的ExecutorService:
ExecutorService executor =Executors.newSingleThreadExecutor();
我們這樣提交task:
public void execute() {
executor.submit(new Task());
}
因?yàn)镋xecutorService維持了一個(gè)隊(duì)列,所以它不會(huì)自動(dòng)關(guān)閉, 我們需要調(diào)用executor.shutdown() 或者executor.shutdownNow()來(lái)關(guān)閉它。
如果想要判斷ExecutorService中的線程在收到shutdown請(qǐng)求后是否全部執(zhí)行完畢,可以調(diào)用如下的方法:
try {
executor.awaitTermination( 5l, TimeUnit.SECONDS );
} catch (InterruptedException e) {
e.printStackTrace();
}
ScheduledExecutorService
ScheduledExecutorService和ExecutorService很類似,但是它可以周期性的執(zhí)行任務(wù)。
我們這樣創(chuàng)建ScheduledExecutorService:
ScheduledExecutorService executorService
= Executors.newSingleThreadScheduledExecutor();
executorService的schedule方法,可以傳入Runnable也可以傳入Callable:
Future<String> future = executorService.schedule(() -> {
// ...
return "Hello world";
}, 1, TimeUnit.SECONDS);
ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
// ...
}, 1, TimeUnit.SECONDS);
還有兩個(gè)比較相近的方法:
scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit )
scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit )
兩者的區(qū)別是前者的period是以任務(wù)開(kāi)始時(shí)間來(lái)計(jì)算的,后者是以任務(wù)結(jié)束時(shí)間來(lái)計(jì)算。
Future
Future用來(lái)獲取異步執(zhí)行的結(jié)果??梢哉{(diào)用cancel(boolean mayInterruptIfRunning) 方法來(lái)取消線程的執(zhí)行。
我們看下怎么得到一個(gè)Future對(duì)象:
public void invoke() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(() -> {
// ...
Thread.sleep(10000l);
return "Hello world";
});
}
我們看下怎么獲取Future的結(jié)果:
if (future.isDone() && !future.isCancelled()) {
try {
str = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
future還可以接受一個(gè)時(shí)間參數(shù),超過(guò)指定的時(shí)間,將會(huì)報(bào)TimeoutException。
try {
future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
CountDownLatch
CountDownLatch是一個(gè)并發(fā)中很有用的類,CountDownLatch會(huì)初始化一個(gè)counter,通過(guò)這個(gè)counter變量,來(lái)控制資源的訪問(wèn)。我們會(huì)在后面的文章詳細(xì)介紹。
CyclicBarrier
CyclicBarrier和CountDownLatch很類似。CyclicBarrier主要用于多個(gè)線程互相等待的情況,可以通過(guò)調(diào)用await() 方法等待,知道達(dá)到要等的數(shù)量。
public class Task implements Runnable {
private CyclicBarrier barrier;
public Task(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
LOG.info(Thread.currentThread().getName() +
" is waiting");
barrier.await();
LOG.info(Thread.currentThread().getName() +
" is released");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public void start() {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
// ...
LOG.info("All previous tasks are completed");
});
Thread t1 = new Thread(new Task(cyclicBarrier), "T1");
Thread t2 = new Thread(new Task(cyclicBarrier), "T2");
Thread t3 = new Thread(new Task(cyclicBarrier), "T3");
if (!cyclicBarrier.isBroken()) {
t1.start();
t2.start();
t3.start();
}
}
Semaphore
Semaphore包含了一定數(shù)量的許可證,通過(guò)獲取許可證,從而獲得對(duì)資源的訪問(wèn)權(quán)限。通過(guò) tryAcquire()來(lái)獲取許可,如果獲取成功,許可證的數(shù)量將會(huì)減少。
一旦線程release()許可,許可的數(shù)量將會(huì)增加。
我們看下怎么使用:
static Semaphore semaphore = new Semaphore(10);
public void execute() throws InterruptedException {
LOG.info("Available permit : " + semaphore.availablePermits());
LOG.info("Number of threads waiting to acquire: " +
semaphore.getQueueLength());
if (semaphore.tryAcquire()) {
try {
// ...
}
finally {
semaphore.release();
}
}
}
ThreadFactory
ThreadFactory可以很方便的用來(lái)創(chuàng)建線程:
public class ThreadFactoryUsage implements ThreadFactory {
private int threadId;
private String name;
public ThreadFactoryUsage(String name) {
threadId = 1;
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, name + "-Thread_" + threadId);
log.info("created new thread with id : " + threadId +
" and name : " + t.getName());
threadId++;
return t;
}
}
愿與諸君共進(jìn)步,大量的面試題及答案還有資深架構(gòu)師錄制的視頻錄像:有Spring,MyBatis,Netty源碼分析,高并發(fā)、高性能、分布式、微服務(wù)架構(gòu)的原理,JVM性能優(yōu)化、分布式架構(gòu)等這些成為架構(gòu)師必備的知識(shí)體系,可以微信搜索539413949獲取,最后祝大家都能拿到自己心儀的offer