阿里P7告訴你什么是java并發(fā)包、線程池、鎖

并發(fā)包

java.util.concurrent從jdk1.5開始新加入的一個(gè)包,致力于解決并發(fā)編程的線程安全問題,使用戶能夠更為快捷方便的編寫多線程情況下的并發(fā)程序。

同步容器

同步容器只有包括Vector和HashTable,相比其他容器類只是多用了Synchronize的技術(shù)

Vector與ArrayList區(qū)別

1.ArrayList是最常用的List實(shí)現(xiàn)類,內(nèi)部是通過數(shù)組實(shí)現(xiàn)的,它允許對(duì)元素進(jìn)行快速隨機(jī)訪問。數(shù)組的缺點(diǎn)是每個(gè)元素之間不能有間隔,當(dāng)數(shù)組大小不滿足時(shí)需要增加存儲(chǔ)能力,就要講已經(jīng)有數(shù)組的數(shù)據(jù)復(fù)制到新的存儲(chǔ)空間中。當(dāng)從ArrayList的中間位置插入或者刪除元素時(shí),需要對(duì)數(shù)組進(jìn)行復(fù)制、移動(dòng)、代價(jià)比較高。因此,它適合隨機(jī)查找和遍歷,不適合插入和刪除。

2.Vector與ArrayList一樣,也是通過數(shù)組實(shí)現(xiàn)的,不同的是它支持線程的同步,即某一時(shí)刻只有一個(gè)線程能夠?qū)慥ector,避免多線程同時(shí)寫而引起的不一致性,但實(shí)現(xiàn)同步需要很高的花費(fèi),因此,訪問它比訪問ArrayList慢

注意: Vector線程安全、ArrayList線程不安全

HasTable與HasMap區(qū)別

1.HashMap不是線程安全的?

HastMap是一個(gè)接口 是map接口的子接口,是將鍵映射到值的對(duì)象,其中鍵和值都是對(duì)象,并且不能包含重復(fù)鍵,但可以包含重復(fù)值。HashMap允許null key和null value,而hashtable不允許。

2.HashTable是線程安全的一個(gè)Collection。

3.HashMap是Hashtable的輕量級(jí)實(shí)現(xiàn)(非線程安全的實(shí)現(xiàn)),他們都完成了Map接口,主要區(qū)別在于HashMap允許空(null)鍵值(key),由于非線程安全,效率上可能高于Hashtable。

HashMap允許將null作為一個(gè)entry的key或者value,而Hashtable不允許。

HashMap把Hashtable的contains方法去掉了,改成containsvalue和containsKey。

注意: HashTable線程安全,HashMap線程不安全。

synchronizedMap synchronizedList

Collections.synchronized*(m) 將線程不安全集合變?yōu)榫€程安全集合

Map m = Collections.synchronizedMap(new HashMap());??

List l = Collections.synchronizedList(new ArrayList());

ConcurrentHashMap

ConcurrentHashMap內(nèi)部使用段(Segment)來表示這些不同的部分,每個(gè)段其實(shí)就是一個(gè)小的HashTable,它們有自己的鎖。只要多個(gè)修改操作發(fā)生在不同的段上,它們就可以并發(fā)進(jìn)行。把一個(gè)整體分成了16個(gè)段(Segment)也就是最高支持16個(gè)線程的并發(fā)修改操作。

這也是在重線程場景時(shí)減小鎖的粒度從而降低鎖競爭的一種方案。并且代碼中大多共享變量使用volatile關(guān)鍵字聲明,目的是第一時(shí)間獲取修改的內(nèi)容,性能非常好。

大家可以點(diǎn)擊加入群:【Java高級(jí)架構(gòu)進(jìn)階群】:854180697 里面有Java高級(jí)大牛直播講解知識(shí)點(diǎn) 走的就是高端路線,(如果你想跳槽換工作 但是技術(shù)又不夠 或者工作上遇到了瓶頸 ,我這里有一個(gè)JAVA的免費(fèi)直播課程 ,講的是高端的知識(shí)點(diǎn)基礎(chǔ)不好的誤入喲,只要你有1-5年的開發(fā)經(jīng)驗(yàn)可以加群找我要課堂鏈接 注意:是免費(fèi)的 沒有開發(fā)經(jīng)驗(yàn)誤入哦)

并發(fā)容器

CountDownLatch

CountDownLatch是JAVA提供在java.util.concurrent包下的一個(gè)輔助類,可以把它看成是一個(gè)計(jì)數(shù)器,其內(nèi)部維護(hù)著一個(gè)count計(jì)數(shù),只不過對(duì)這個(gè)計(jì)數(shù)器的操作都是原子操作,同時(shí)只能有一個(gè)線程去操作這個(gè)計(jì)數(shù)器,CountDownLatch通過構(gòu)造函數(shù)傳入一個(gè)初始計(jì)數(shù)值,調(diào)用者可以通過調(diào)用CounDownLatch對(duì)象的cutDown()方法,來使計(jì)數(shù)減1;如果調(diào)用對(duì)象上的await()方法,那么調(diào)用者就會(huì)一直阻塞在這里,直到別人通過cutDown方法,將計(jì)數(shù)減到0,才可以繼續(xù)執(zhí)行。

public class Test002 {??public static void main(String[] args) throws InterruptedException {????System.out.println("等待子線程執(zhí)行完畢...");????CountDownLatch countDownLatch = new CountDownLatch(2);????new Thread(new Runnable() {??????@Override??????public void run() {????????System.out.println("子線程," +Thread.currentThread().getName() + "開始執(zhí)行...");????????countDownLatch.countDown();// 每次減去1????????System.out.println("子線程," + Thread.currentThread().getName() + "結(jié)束執(zhí)行...");??????}????}).start();????new Thread(new Runnable() {??????@Override??????public void run() {????????System.out.println("子線程," + Thread.currentThread().getName() + "開始執(zhí)行...");????????countDownLatch.countDown();????????System.out.println("子線程," + Thread.currentThread().getName() + "結(jié)束執(zhí)行...");????????}??????}).start();??????countDownLatch.await();// 調(diào)用當(dāng)前方法主線程阻塞?countDown結(jié)果為0, 阻塞變?yōu)檫\(yùn)行狀態(tài)??????System.out.println("兩個(gè)子線程執(zhí)行完畢....");??????System.out.println("繼續(xù)主線程執(zhí)行..");????}}

CyclicBarrier

一個(gè)同步輔助類,它允許一組線程互相等待,直到到達(dá)某個(gè)公共屏障點(diǎn) (common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時(shí)地互相等待,此時(shí) CyclicBarrier 很有用。因?yàn)樵?barrier 在釋放等待線程后可以重用,所以稱它為循環(huán) 的 barrier。

使用場景

需要所有的子任務(wù)都完成時(shí),才執(zhí)行主任務(wù),這個(gè)時(shí)候就可以選擇使用CyclicBarrier。

public class CyclicBarrierTest {??public static void main(String[] args) throws IOException, InterruptedException {????//如果將參數(shù)改為4,但是下面只加入了3個(gè)選手,這永遠(yuǎn)等待下去????//Waits until all parties have invoked await on this barrier.?????CyclicBarrier barrier = new CyclicBarrier(3);????ExecutorService executor = Executors.newFixedThreadPool(3);????executor.submit(new Thread(new Runner(barrier, "1號(hào)選手")));????executor.submit(new Thread(new Runner(barrier, "2號(hào)選手")));????executor.submit(new Thread(new Runner(barrier, "3號(hào)選手")));????executor.shutdown();??}}class Runner implements Runnable {??// 一個(gè)同步輔助類,它允許一組線程互相等待,直到到達(dá)某個(gè)公共屏障點(diǎn) (common barrier point)??private CyclicBarrier barrier;??private String name;??public Runner(CyclicBarrier barrier, String name) {????super();????this.barrier = barrier;????this.name = name;??}??@Override??public void run() {????try {??????Thread.sleep(1000 * (new Random()).nextInt(8));??????System.out.println(name + " 準(zhǔn)備好了...");??????// barrier的await方法,在所有參與者都已經(jīng)在此 barrier 上調(diào)用 await 方法之前,將一直等待。??????barrier.await();????} catch (InterruptedException e) {??????e.printStackTrace();????} catch (BrokenBarrierException e) {??????e.printStackTrace();????}????System.out.println(name + " 起跑!");??}}

Semaphore

Semaphore是計(jì)數(shù)信號(hào)量。Semaphore管理一系列許可證。每個(gè)acquire方法阻塞,直到有一個(gè)許可證可以獲得然后拿走一個(gè)許可證;每個(gè)release方法增加一個(gè)許可證,這可能會(huì)釋放一個(gè)阻塞的acquire方法。然而,其實(shí)并沒有實(shí)際的許可證這個(gè)對(duì)象,Semaphore只是維持了一個(gè)可獲得許可證的數(shù)量。?

Semaphore經(jīng)常用于限制獲取某種資源的線程數(shù)量

需求: 一個(gè)廁所只有3個(gè)坑位,但是有10個(gè)人來上廁所,那怎么辦?假設(shè)10個(gè)人的編號(hào)分別為1-10,并且1號(hào)先到廁所,10號(hào)最后到廁所。那么1-3號(hào)來的時(shí)候必然有可用坑位,順利如廁,4號(hào)來的時(shí)候需要看看前面3人是否有人出來了,如果有人出來,進(jìn)去,否則等待。同樣的道理,4-10號(hào)也需要等待正在上廁所的人出來后才能進(jìn)去,并且誰先進(jìn)去這得看等待的人是否有素質(zhì),是否能遵守先來先上的規(guī)則。

class Parent implements Runnable {??private String name;??private Semaphore wc;??public Parent(String name,Semaphore wc){????this.name=name;????this.wc=wc;??}??@Override??public void run() {????try {??????// 剩下的資源(剩下的茅坑)??????int availablePermits = wc.availablePermits();??????if (availablePermits > 0) {????????System.out.println(name+"天助我也,終于有茅坑了...");??????} else {????????System.out.println(name+"怎么沒有茅坑了...");??????}??????//申請(qǐng)茅坑 如果資源達(dá)到3次,就等待??????wc.acquire();??????System.out.println(name+"終于輪我上廁所了..爽啊");????????Thread.sleep(new Random().nextInt(1000)); // 模擬上廁所時(shí)間。??????System.out.println(name+"廁所上完了...");??????wc.release();????} catch (Exception e) {????}??}}public class TestSemaphore02 {??public static void main(String[] args) {????Semaphore semaphore = new Semaphore(3);????for (int i = 1; i <=10; i++) {???????Parent parent = new Parent("第"+i+"個(gè)人,",semaphore);???????new Thread(parent).start();????}??}}

并發(fā)隊(duì)列

ConcurrentLinkedQueue

ConcurrentLinkedQueue : 是一個(gè)適用于高并發(fā)場景下的隊(duì)列,通過無鎖的方式,實(shí)現(xiàn)了高并發(fā)狀態(tài)下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue.它是一個(gè)基于鏈接節(jié)點(diǎn)的無界線程安全隊(duì)列。該隊(duì)列的元素遵循先進(jìn)先出的原則。頭是最先加入的,尾是最近加入的,該隊(duì)列不允許null元素。

add 和offer() 都是加入元素的方法(在ConcurrentLinkedQueue中這倆個(gè)方法沒有任何區(qū)別) poll() 和peek() 都是取頭元素節(jié)點(diǎn),區(qū)別在于前者會(huì)刪除元素,后者不會(huì)。

public class ConcurrentLinkedQueueTest {

??private static ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();

??private static int count = 2; // 線程個(gè)數(shù)

??//CountDownLatch,一個(gè)同步輔助類,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個(gè)或多個(gè)線程一直等待。

??private static CountDownLatch latch = new CountDownLatch(count);??public static void main(String[] args) throws InterruptedException {????long timeStart = System.currentTimeMillis();????ExecutorService es = Executors.newFixedThreadPool(4);????ConcurrentLinkedQueueTest.offer();????for (int i = 0; i < count; i++) {??????es.submit(new Poll());????}????latch.await(); //使得主線程(main)阻塞直到latch.countDown()為零才繼續(xù)執(zhí)行????System.out.println("cost time " + (System.currentTimeMillis() - timeStart) + "ms");????es.shutdown();??}??/**???* 生產(chǎn)???*/??public static void offer() {????for (int i = 0; i < 100000; i++) {??????queue.offer(i);????}??}??static class Poll implements Runnable {????public void run() {??????// while (queue.size()>0) {??????while (!queue.isEmpty()) {????????System.out.println(queue.poll());??????}??????latch.countDown();????}??}}

BlockingQueue

阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列。這兩個(gè)附加的操作是:

在隊(duì)列為空時(shí),獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强铡?/p>

當(dāng)隊(duì)列滿時(shí),存儲(chǔ)元素的線程會(huì)等待隊(duì)列可用。?

阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場景,生產(chǎn)者是往隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里拿元素的線程。阻塞隊(duì)列就是生產(chǎn)者存放元素的容器,而消費(fèi)者也只從容器里拿元素

ArrayBlockingQueue

ArrayBlockingQueue是一個(gè)有邊界的阻塞隊(duì)列,它的內(nèi)部實(shí)現(xiàn)是一個(gè)數(shù)組。有邊界的意思是它的容量是有限的,我們必須在其初始化的時(shí)候指定它的容量大小,容量大小一旦指定就不可改變。

ArrayBlockingQueue是以先進(jìn)先出的方式存儲(chǔ)數(shù)據(jù),最新插入的對(duì)象是尾部,最新移出的對(duì)象是頭部。

LinkedBlockingQueue

LinkedBlockingQueue阻塞隊(duì)列大小的配置是可選的,如果我們初始化時(shí)指定一個(gè)大小,它就是有邊界的,如果不指定,它就是無邊界的。說是無邊界,其實(shí)是采用了默認(rèn)大小為Integer.MAX_VALUE的容量 。它的內(nèi)部實(shí)現(xiàn)是一個(gè)鏈表

SynchronousQueue

SynchronousQueue隊(duì)列內(nèi)部僅允許容納一個(gè)元素。當(dāng)一個(gè)線程插入一個(gè)元素后會(huì)被阻塞,除非這個(gè)元素被另一個(gè)線程消費(fèi)

public class BlockingQueueTest2 {??/**???*????* 定義裝蘋果的籃子???*????*/??public class Basket {????// 籃子,能夠容納3個(gè)蘋果????BlockingQueue basket = new LinkedBlockingQueue(3);????// 生產(chǎn)蘋果,放入籃子????public void produce() throws InterruptedException {??????// put方法放入一個(gè)蘋果,若basket滿了,等到basket有位置??????basket.put("An apple");????}????// 消費(fèi)蘋果,從籃子中取走????public String consume() throws InterruptedException {??????// take方法取出一個(gè)蘋果,若basket為空,等到basket有蘋果為止(獲取并移除此隊(duì)列的頭部)??????return basket.take();????}??}??// 定義蘋果生產(chǎn)者??class Producer implements Runnable {????private String instance;????private Basket basket;????public Producer(String instance, Basket basket) {??????this.instance = instance;??????this.basket = basket;????}????public void run() {??????try {????????while (true) {??????????// 生產(chǎn)蘋果??????????System.out.println("生產(chǎn)者準(zhǔn)備生產(chǎn)蘋果:" + instance);??????????basket.produce();??????????System.out.println("!生產(chǎn)者生產(chǎn)蘋果完畢:" + instance);??????????// 休眠300ms??????????Thread.sleep(300);????????}??????} catch (InterruptedException ex) {????????System.out.println("Producer Interrupted");??????}????}??}??// 定義蘋果消費(fèi)者??class Consumer implements Runnable {????private String instance;????private Basket basket;????public Consumer(String instance, Basket basket) {??????this.instance = instance;??????this.basket = basket;????}????public void run() {??????try {????????while (true) {??????????// 消費(fèi)蘋果??????????System.out.println("消費(fèi)者準(zhǔn)備消費(fèi)蘋果:" + instance);??????????System.out.println(basket.consume());??????????System.out.println("!消費(fèi)者消費(fèi)蘋果完畢:" + instance);??????????// 休眠1000ms??????????Thread.sleep(1000);????????}??????} catch (InterruptedException ex) {????????System.out.println("Consumer Interrupted");??????}????}??}??public static void main(String[] args) {????BlockingQueueTest2 test = new BlockingQueueTest2();????// 建立一個(gè)裝蘋果的籃子????Basket basket = test.new Basket();????ExecutorService service = Executors.newCachedThreadPool();????Producer producer = test.new Producer("生產(chǎn)者001", basket);????Producer producer2 = test.new Producer("生產(chǎn)者002", basket);????Consumer consumer = test.new Consumer("消費(fèi)者001", basket);????service.submit(producer);????service.submit(producer2);????service.submit(consumer);????// 程序運(yùn)行5s后,所有任務(wù)停止//????try {//??????Thread.sleep(1000 * 5);//????} catch (InterruptedException e) {//??????e.printStackTrace();//????}//????service.shutdownNow();??}}

PriorityBlockingQueue

優(yōu)先級(jí)阻塞隊(duì)列,該實(shí)現(xiàn)類需要自己實(shí)現(xiàn)一個(gè)繼承了 Comparator 接口的類, 在插入資源時(shí)會(huì)按照自定義的排序規(guī)則來對(duì)資源數(shù)組進(jìn)行排序。 其中值大的排在數(shù)組后面 ,取值時(shí)從數(shù)組頭開始取

public class TestQueue{??static Logger logger = LogManager.getLogger();??static Random random = new Random(47);??public static void main(String args[]) throws InterruptedException??{????PriorityBlockingQueue queue = new PriorityBlockingQueue();????ExecutorService executor = Executors.newCachedThreadPool();????executor.execute(new Runnable()????{??????public void run()??????{????????int i = 0;????????while (true)????????{??????????queue.put(new PriorityEntity(random.nextInt(10), i++));??????????try??????????{????????????TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));??????????}??????????catch (InterruptedException e)??????????{????????????logger.error(e);??????????}????????}??????}????});????executor.execute(new Runnable()????{??????public void run()??????{????????while (true)????????{??????????try??????????{????????????System.out.println("take-- " + queue.take() + " left:-- [" + queue.toString() + "]");????????????try????????????{??????????????TimeUnit.MILLISECONDS.sleep(random.nextInt(3000));????????????}????????????catch (InterruptedException e)????????????{??????????????logger.error(e);????????????}??????????}??????????catch (InterruptedException e)??????????{????????????logger.error(e);??????????}????????}??????}????});????try????{??????TimeUnit.SECONDS.sleep(5);????}????catch (InterruptedException e)????{??????logger.error(e);????}??}??static class PriorityEntity implements Comparable??{????private static int count = 0;????private int id = count++;????private int priority;????private int index = 0;????public PriorityEntity(int _priority, int _index)????{??????System.out.println("_priority : " + _priority);??????this.priority = _priority;??????this.index = _index;????}????public String toString()????{??????return id + "# [index=" + index + " priority=" + priority + "]";????}????//數(shù)字小,優(yōu)先級(jí)高????public int compareTo(PriorityEntity o)????{??????return this.priority > o.priority ? 1 : this.priority < o.priority ? -1 : 0;????}??}}

線程池

開發(fā)過程中,合理地使用線程池可以帶來3個(gè)好處:

降低資源消耗:通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。

提高響應(yīng)速度:當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。

提高線程的可管理性:線程是稀缺資源,如果無限制地創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控。

線程池作用

線程池作用就是限制系統(tǒng)中執(zhí)行線程的數(shù)量。

根據(jù)系統(tǒng)的環(huán)境情況,可以自動(dòng)或手動(dòng)設(shè)置線程數(shù)量,達(dá)到運(yùn)行的最佳效果;少了浪費(fèi)了系統(tǒng)資源,多了造成系統(tǒng)擁擠效率不高。用線程池控制線程數(shù)量,其他線程排隊(duì)等候。一個(gè)任務(wù)執(zhí)行完畢,再從隊(duì)列的中取最前面的任務(wù)開始執(zhí)行。若隊(duì)列中沒有等待進(jìn)程,線程池的這一資源處于等待。當(dāng)一個(gè)新任務(wù)需要運(yùn)行時(shí),如果線程池中有等待的工作線程,就可以開始運(yùn)行了;否則進(jìn)入等待隊(duì)列。

線程池的分類

newCachedThreadPool

創(chuàng)建一個(gè)可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程

public static ExecutorService newCachedThreadPool() {????return new ThreadPoolExecutor(0, Integer.MAX_VALUE,???????????????????60L, TimeUnit.SECONDS,???????????????????new SynchronousQueue());??}public class ThreadPoolExecutorTest {????public static void main(String[] args) {??????ExecutorService cachedThreadPool = Executors.newCachedThreadPool();???????for (int i = 0; i < 10; i++) {??????????final int index = i;??????????try {??????????Thread.sleep(index * 1000);??????????} catch (InterruptedException e) {??????????e.printStackTrace();??????????}??????????cachedThreadPool.execute(new Runnable() {??????????public void run() {?????????????System.out.println(index);??????????}????????});???????}?????}??}??

線程池為無限大,當(dāng)執(zhí)行第二個(gè)任務(wù)時(shí)第一個(gè)任務(wù)已經(jīng)完成,會(huì)復(fù)用執(zhí)行第一個(gè)任務(wù)的線程,而不用每次新建線程

newFixedThreadPool

創(chuàng)建一個(gè)定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會(huì)在隊(duì)列中等待。

public static ExecutorService newFixedThreadPool(int nThreads) {????return new ThreadPoolExecutor(nThreads, nThreads,???????????????????0L, TimeUnit.MILLISECONDS,???????????????????new LinkedBlockingQueue());}public class ThreadPoolExecutorTest {????public static void main(String[] args) {??????ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);??????for (int i = 0; i < 10; i++) {????????final int index = i;????????fixedThreadPool.execute(new Runnable() {??????????public void run() {????????????try {??????????????System.out.println(index);??????????????Thread.sleep(2000);????????????} catch (InterruptedException e) {??????????????e.printStackTrace();????????????}??????????}????????});??????}????}??}?

因?yàn)榫€程池大小為3,每個(gè)任務(wù)輸出index后sleep 2秒,所以每兩秒打印3個(gè)數(shù)字。定長線程池的大小最好根據(jù)系統(tǒng)資源進(jìn)行設(shè)置。如Runtime.getRuntime().availableProcessors()

newScheduledThreadPool

創(chuàng)建一個(gè)定長線程池,支持定時(shí)及周期性任務(wù)執(zhí)行。

public ScheduledThreadPoolExecutor(int corePoolSize) {????super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,???????new DelayedWorkQueue());??}public class ThreadPoolExecutorTest {????public static void main(String[] args) {??????ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);??????scheduledThreadPool.schedule(new Runnable() {????????public void run() {??????????System.out.println("delay 3 seconds");????????}??????}, 3, TimeUnit.SECONDS);????}??}??

表示延遲3秒執(zhí)行

public class ThreadPoolExecutorTest {????public static void main(String[] args) {??????ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);??????scheduledThreadPool.scheduleAtFixedRate(new Runnable() {????????public void run() {??????????System.out.println("delay 1 seconds, and excute every 3 seconds");????????}??????}, 1, 3, TimeUnit.SECONDS);????}??}??

表示延遲1秒后每3秒執(zhí)行一次

newSingleThreadExecutor

創(chuàng)建一個(gè)單線程化的線程池,它只會(huì)用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行。

public static ExecutorService newSingleThreadExecutor() {????return new FinalizableDelegatedExecutorService??????(new ThreadPoolExecutor(1, 1,??????????????????0L, TimeUnit.MILLISECONDS,??????????????????new LinkedBlockingQueue()));??}

阿里發(fā)布的 Java開發(fā)手冊(cè)中強(qiáng)制線程池不允許使用 Executors 去創(chuàng)建,而是通過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學(xué)更加明確線程池的運(yùn)行規(guī)則,規(guī)避資源耗盡的風(fēng)險(xiǎn)

ThreadPoolExecutor??public ThreadPoolExecutor(int corePoolSize,???????????????

int maximumPoolSize,??????????????

?long keepAliveTime,??????????????

?TimeUnit unit,???????????????

BlockingQueue workQueue,??????????????

ThreadFactory threadFactory,???????????????

RejectedExecutionHandler handler)?

corePoolSize - 線程池核心池的大小。

maximumPoolSize - 線程池的最大線程數(shù)。

keepAliveTime - 當(dāng)線程數(shù)大于核心時(shí),此為終止前多余的空閑線程等待新任務(wù)的最長時(shí)間。

unit - keepAliveTime 的時(shí)間單位。

workQueue - 用來儲(chǔ)存等待執(zhí)行任務(wù)的隊(duì)列。

threadFactory - 線程工廠。

handler - 拒絕策略。

線程優(yōu)先級(jí):

corePoolSize > workQueue > maximumPoolSize>handler(拒絕)

拒絕策略:

CallerRunsPolicy :這個(gè)策略重試添加當(dāng)前的任務(wù),他會(huì)自動(dòng)重復(fù)調(diào)用 execute() 方法,直到成功。

AbortPolicy :對(duì)拒絕任務(wù)拋棄處理,并且拋出異常。(默認(rèn)使用的)

DiscardPolicy :對(duì)拒絕任務(wù)直接無聲拋棄,沒有異常信息。

DiscardOldestPolicy :對(duì)拒絕任務(wù)不拋棄,而是拋棄隊(duì)列里面等待最久的一個(gè)線程,然后把拒絕任務(wù)加到隊(duì)列。

合理配置線程池

要想合理的配置線程池,就必須首先分析任務(wù)特性,可以從以下幾個(gè)角度來進(jìn)行分析:

任務(wù)的性質(zhì):CPU密集型任務(wù),IO密集型任務(wù)和混合型任務(wù)。

任務(wù)的優(yōu)先級(jí):高,中和低。

任務(wù)的執(zhí)行時(shí)間:長,中和短。

任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接。

任務(wù)性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理。CPU密集型任務(wù)配置盡可能少的線程數(shù)量,如配置Ncpu+1個(gè)線程的線程池。IO密集型任務(wù)則由于需要等待IO操作,線程并不是一直在執(zhí)行任務(wù),則配置盡可能多的線程,如2*Ncpu。混合型的任務(wù),如果可以拆分,則將其拆分成一個(gè)CPU密集型任務(wù)和一個(gè)IO密集型任務(wù),只要這兩個(gè)任務(wù)執(zhí)行的時(shí)間相差不是太大,那么分解后執(zhí)行的吞吐率要高于串行執(zhí)行的吞吐率,如果這兩個(gè)任務(wù)執(zhí)行時(shí)間相差太大,則沒必要進(jìn)行分解。我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個(gè)數(shù)。

優(yōu)先級(jí)不同的任務(wù)可以使用優(yōu)先級(jí)隊(duì)列PriorityBlockingQueue來處理。它可以讓優(yōu)先級(jí)高的任務(wù)先得到執(zhí)行,需要注意的是如果一直有優(yōu)先級(jí)高的任務(wù)提交到隊(duì)列里,那么優(yōu)先級(jí)低的任務(wù)可能永遠(yuǎn)不能執(zhí)行。

執(zhí)行時(shí)間不同的任務(wù)可以交給不同規(guī)模的線程池來處理,或者也可以使用優(yōu)先級(jí)隊(duì)列,讓執(zhí)行時(shí)間短的任務(wù)先執(zhí)行。

依賴數(shù)據(jù)庫連接池的任務(wù),因?yàn)榫€程提交SQL后需要等待數(shù)據(jù)庫返回結(jié)果,如果等待的時(shí)間越長CPU空閑時(shí)間就越長,那么線程數(shù)應(yīng)該設(shè)置越大,這樣才能更好的利用CPU。

CPU密集型時(shí),任務(wù)可以少配置線程數(shù),大概和機(jī)器的cpu核數(shù)相當(dāng),這樣可以使得每個(gè)線程都在執(zhí)行任務(wù) IO密集型時(shí),大部分線程都阻塞,故需要多配置線程數(shù),2*cpu核數(shù) 操作系統(tǒng)之名稱解釋: 某些進(jìn)程花費(fèi)了絕大多數(shù)時(shí)間在計(jì)算上,而其他則在等待I/O上花費(fèi)了大多是時(shí)間,前者稱為計(jì)算密集型(CPU密集型)computer-bound,后者稱為I/O密集型,I/O-bound。

寫在最后:歡迎留言討論,加關(guān)注,持續(xù)更新!??!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容