概念:
- 原子性
- 原子是世界上的最小單位,具有不可分割性。比如 a=0;(a非long和double類(lèi)型) 這個(gè)操作是不可分割的,那么我們說(shuō)這個(gè)操作時(shí)原子操作。再比如:a++; 這個(gè)操作實(shí)際是a = a + 1;是可分割的,所以他不是一個(gè)原子操作。非原子操作都會(huì)存在線程安全問(wèn)題,需要我們使用同步技術(shù)(sychronized)來(lái)讓它變成一個(gè)原子操作。一個(gè)操作是原子操作,那么我們稱(chēng)它具有原子性。Java的concurrent包下提供了一些原子類(lèi),我們可以通過(guò)閱讀API來(lái)了解這些原子類(lèi)的用法。比如:AtomicInteger、AtomicLong、AtomicReference等。
- 可見(jiàn)性(對(duì)與可見(jiàn)性的詳細(xì)解釋?zhuān)?a target="_blank" rel="nofollow">http://www.cnblogs.com/aigongsi/archive/2012/04/01/2429166.html)
- 可見(jiàn)性,是指線程之間的可見(jiàn)性,一個(gè)線程修改的狀態(tài)對(duì)另一個(gè)線程是可見(jiàn)的。也就是一個(gè)線程修改的結(jié)果。另一個(gè)線程馬上就能看到。比如:用volatile修飾的變量,就會(huì)具有可見(jiàn)性。volatile修飾的變量不允許線程內(nèi)部緩存和重排序,即直接修改內(nèi)存。所以對(duì)其他線程是可見(jiàn)的。但是這里需要注意一個(gè)問(wèn)題,volatile只能讓被他修飾內(nèi)容具有可見(jiàn)性,但不能保證它具有原子性。比如 volatile int a = 0;之后有一個(gè)操作 a++;這個(gè)變量a具有可見(jiàn)性,但是a++ 依然是一個(gè)非原子操作,也就這這個(gè)操作同樣存在線程安全問(wèn)題。
- 什么是線程安全:
- 線程安全是指,同一段代碼多條線程訪問(wèn),不會(huì)產(chǎn)生不同的結(jié)果
- 并行和并發(fā)區(qū)別
- 并行是多條線程同時(shí)再賽跑。
- 并發(fā)是指同個(gè)資源,兩者交替輪流使用資源。
- Fifo(First In first Out):
- Queue隊(duì)列的特性就是先入先出
- Filo(First ln Last Out):
- stact棧的特性就是先入后出
- CAS(Compare And Swap):
- CAS 指的是現(xiàn)代 CPU 廣泛支持的一種對(duì)內(nèi)存中的共享數(shù)據(jù)進(jìn)行操作的一種特殊指令。這個(gè)指令會(huì)對(duì)內(nèi)存中的共享數(shù)據(jù)做原子的讀寫(xiě)操作。簡(jiǎn)單介紹一下這個(gè)指令的操作過(guò)程:首先,CPU 會(huì)將內(nèi)存中將要被更改的數(shù)據(jù)與期望的值做比較。然后,當(dāng)這兩個(gè)值相等時(shí),CPU 才會(huì)將內(nèi)存中的數(shù)值替換為新的值。否則便不做操作。最后,CPU 會(huì)將舊的數(shù)值返回。這一系列的操作是原子的。它們雖然看似復(fù)雜,但卻是 Java 5 并發(fā)機(jī)制優(yōu)于原有鎖機(jī)制的根本。簡(jiǎn)單來(lái)說(shuō),CAS 的含義是“我認(rèn)為原有的值應(yīng)該是什么,如果是,則將原有的值更新為新值,否則不做修改,并告訴我原來(lái)的值是多少”。(這段描述引自《Java并發(fā)編程實(shí)踐》)
簡(jiǎn)單的來(lái)說(shuō),CAS有3個(gè)操作數(shù),內(nèi)存值V,舊的預(yù)期值A(chǔ),要修改的新值B。當(dāng)且僅當(dāng)預(yù)期值A(chǔ)和內(nèi)存值V相同時(shí),將內(nèi)存值V修改為B,否則返回V。這是一種樂(lè)觀鎖的思路,它相信在它修改之前,沒(méi)有其它線程去修改它;而Synchronized是一種悲觀鎖,它認(rèn)為在它修改之前,一定會(huì)有其它線程去修改它,悲觀鎖效率很低
- CAS 指的是現(xiàn)代 CPU 廣泛支持的一種對(duì)內(nèi)存中的共享數(shù)據(jù)進(jìn)行操作的一種特殊指令。這個(gè)指令會(huì)對(duì)內(nèi)存中的共享數(shù)據(jù)做原子的讀寫(xiě)操作。簡(jiǎn)單介紹一下這個(gè)指令的操作過(guò)程:首先,CPU 會(huì)將內(nèi)存中將要被更改的數(shù)據(jù)與期望的值做比較。然后,當(dāng)這兩個(gè)值相等時(shí),CPU 才會(huì)將內(nèi)存中的數(shù)值替換為新的值。否則便不做操作。最后,CPU 會(huì)將舊的數(shù)值返回。這一系列的操作是原子的。它們雖然看似復(fù)雜,但卻是 Java 5 并發(fā)機(jī)制優(yōu)于原有鎖機(jī)制的根本。簡(jiǎn)單來(lái)說(shuō),CAS 的含義是“我認(rèn)為原有的值應(yīng)該是什么,如果是,則將原有的值更新為新值,否則不做修改,并告訴我原來(lái)的值是多少”。(這段描述引自《Java并發(fā)編程實(shí)踐》)
CAS的ABA問(wèn)題
所謂 ,問(wèn)題基本是這個(gè)樣子:
進(jìn)程P1在共享變量中讀到值為A
P1被搶占了,進(jìn)程P2執(zhí)行
P2把共享變量里的值從A改成了B,再改回到A,此時(shí)被P1搶占。
P1回來(lái)看到共享變量里的值沒(méi)有被改變,于是繼續(xù)執(zhí)行。
雖然P1以為變量值沒(méi)有改變,繼續(xù)執(zhí)行了,但是這個(gè)會(huì)引發(fā)一些潛在的問(wèn)題。ABA問(wèn)題最容易發(fā)生在lock free 的算法中的,CAS首當(dāng)其沖,因?yàn)镃AS判斷的是指針的地址。如果這個(gè)地址被重用了呢,問(wèn)題就很大了。(地址被重用是很經(jīng)常發(fā)生的,一個(gè)內(nèi)存分配后釋放了,再分配,很有可能還是原來(lái)的地址)
比如上述的DeQueue()函數(shù),因?yàn)槲覀円宧ead和tail分開(kāi),所以我們引入了一個(gè)dummy指針給head,當(dāng)我們做CAS的之前,如果head的那塊內(nèi)存被回收并被重用了,而重用的內(nèi)存又被EnQueue()進(jìn)來(lái)了,這會(huì)有很大的問(wèn)題。(內(nèi)存管理中重用內(nèi)存基本上是一種很常見(jiàn)的行為)
這個(gè)例子你可能沒(méi)有看懂,維基百科上給了一個(gè)活生生的例子——
你拿著一個(gè)裝滿錢(qián)的手提箱在飛機(jī)場(chǎng),此時(shí)過(guò)來(lái)了一個(gè)火辣性感的美女,然后她很暖昧地挑逗著你,并趁你不注意的時(shí)候,把用一個(gè)一模一樣的手提箱和你那裝滿錢(qián)的箱子調(diào)了個(gè)包,然后就離開(kāi)了,你看到你的手提箱還在那,于是就提著手提箱去趕飛機(jī)去了。
這就是ABA的問(wèn)題。
一、Java線程池ThreadPoolExecutor -- 參數(shù)
- corePoolSize: 核心線程數(shù)
- 核心線程會(huì)一直存活,及時(shí)沒(méi)有任務(wù)需要執(zhí)行
- 當(dāng)線程數(shù)小于核心線程數(shù)時(shí),即使有線程空閑,線程池也會(huì)優(yōu)先創(chuàng)建新線程處理
- 設(shè)置allowCoreThreadTimeout=true(默認(rèn)false)時(shí),核心線程會(huì)超時(shí)關(guān)閉
- 在創(chuàng)建了線程池后,默認(rèn)情況下,線程池中并沒(méi)有任何線程,而是等待有任務(wù)到來(lái)才創(chuàng)建線程去執(zhí)行任務(wù),除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個(gè)方法的名字就可以看出,是預(yù)創(chuàng)建線程的意思,即在沒(méi)有任務(wù)到來(lái)之前就創(chuàng)建corePoolSize個(gè)線程或者一個(gè)線程。默認(rèn)情況下,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0,當(dāng)有任務(wù)來(lái)之后,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中
- maxmumPoolSize : 最大線程數(shù)
- 當(dāng)線程數(shù)>=corePoolSize,且任務(wù)隊(duì)列已滿時(shí)。線程池會(huì)創(chuàng)建新線程來(lái)處理任務(wù)
- 當(dāng)線程數(shù)=maxPoolSize,且任務(wù)隊(duì)列已滿時(shí),線程池會(huì)拒絕處理任務(wù)而拋出異常
- keepAliveTime : 線程空閑時(shí)間
- 當(dāng)線程空閑時(shí)間達(dá)到keepAliveTime時(shí),線程會(huì)退出,直到線程數(shù)量=corePoolSize
- 如果allowCoreThreadTimeout=true,則會(huì)直到線程數(shù)量=0
- 默認(rèn)情況下,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),keepAliveTime才會(huì)起作用,直到線程池中的線程數(shù)不大于corePoolSize,即當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),如果一個(gè)線程空閑的時(shí)間達(dá)到keepAliveTime,則會(huì)終止,直到線程池中的線程數(shù)不超過(guò)corePoolSize。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數(shù)不大于corePoolSize時(shí),keepAliveTime參數(shù)也會(huì)起作用,直到線程池中的線程數(shù)為0
- unit : keepAliveTime 時(shí)間單位
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小時(shí)
TimeUnit.MINUTES; //分鐘
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //納秒
-
threadFactory
- 線程工廠,主要用來(lái)創(chuàng)建線程
-
handler
- 表示當(dāng)拒絕處理任務(wù)時(shí)的策略,有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程)
ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù) -
workQueue : 任務(wù)隊(duì)列
- Queue:Queue中元素按Fifo原則進(jìn)行排序
- ConcurrentLinkedQueue : 非阻塞隊(duì)列
- Queue是一個(gè)安全實(shí)現(xiàn)
- 使用CAS無(wú)鎖編程,來(lái)保證數(shù)據(jù)的一致性
- ConcurrentLinkedQueue : 非阻塞隊(duì)列
- Queue:Queue中元素按Fifo原則進(jìn)行排序
public class ConcurrentLinkedQueueTest {
private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
private static int count = 2; // 線程個(gè)數(shù)
//CountDownLatch,一個(gè)同步輔助類(lèi),在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個(gè)或多個(gè)線程一直等待。
private static CountDownLatch latch = new CountDownLatch(count);
public static void main(String[] args) throws InterruptedException {
long timeStart = System.currentTimeMillis();
ExecutorService es = Executors.newFixedThreadPool(4);
ConcurrentLinkedQueueTest.offer();
for (int i = 0; i < count; i++) {
es.submit(new Poll());
}
latch.await(); //使得主線程(main)阻塞直到latch.countDown()為零才繼續(xù)執(zhí)行
System.out.println("cost time " + (System.currentTimeMillis() - timeStart) + "ms");
es.shutdown();
}
/**
* 生產(chǎn)
*/
public static void offer() {
for (int i = 0; i < 100000; i++) {
queue.offer(i);
}
}
/**
* 消費(fèi)
*
* @author 林計(jì)欽
* @version 1.0 2013-7-25 下午05:32:56
*/
static class Poll implements Runnable {
public void run() {
// while (queue.size()>0) {
while (!queue.isEmpty()) {
System.out.println(queue.poll());
}
latch.countDown();
}
}
}
+ BlockingQueue:阻塞隊(duì)列
+ LinkedBlockingQueue:實(shí)現(xiàn)是線程安全的,實(shí)現(xiàn)了先進(jìn)先出的特性,是作為生產(chǎn)者消費(fèi)者的首選
+ put:添加方法,在隊(duì)列滿的時(shí)候會(huì)阻塞
+ take : 讀取方法,在隊(duì)列為空的時(shí)候會(huì)阻塞,直到有成員被加
package com.example;
import java.util.concurrent.*;
// LinkedBlockingQueue Demo
public class BlockingQueueTest {
/**
* 定義裝蘋(píng)果的籃子
*/
public class Basket {
// 籃子,能夠容納3個(gè)蘋(píng)果
BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3);
// 生產(chǎn)蘋(píng)果,放入籃子
public void produce() throws InterruptedException {
// put方法放入一個(gè)蘋(píng)果,若basket滿了,等到basket有位置
basket.put("An apple");
}
// 消費(fèi)蘋(píng)果,從籃子中取走
public String consume() throws InterruptedException {
// take方法取出一個(gè)蘋(píng)果,若basket為空,等到basket有蘋(píng)果為止(獲取并移除此隊(duì)列的頭部)
return basket.take();
}
}
// 定義蘋(píng)果生產(chǎn)者
class Producer implements Runnable {
private String instance;
private Basket basket;
public Producer(String instance, Basket basket) {
this.instance = instance;
this.basket = basket;
}
public void run() {
try {
while (true) {
// 生產(chǎn)蘋(píng)果
System.out.println("生產(chǎn)者準(zhǔn)備生產(chǎn)蘋(píng)果:" + instance);
basket.produce();
System.out.println("!生產(chǎn)者生產(chǎn)蘋(píng)果完畢:" + instance);
// 休眠300ms
Thread.sleep(300);
}
} catch (InterruptedException ex) {
System.out.println("Producer Interrupted");
}
}
}
// 定義蘋(píng)果消費(fèi)者
class Consumer implements Runnable {
private String instance;
private Basket basket;
public Consumer(String instance, Basket basket) {
this.instance = instance;
this.basket = basket;
}
public void run() {
try {
while (true) {
// 消費(fèi)蘋(píng)果
System.out.println("消費(fèi)者準(zhǔn)備消費(fèi)蘋(píng)果:" + instance);
System.out.println(basket.consume());
System.out.println("!消費(fèi)者消費(fèi)蘋(píng)果完畢:" + instance);
// 休眠1000ms
Thread.sleep(1000);
}
} catch (InterruptedException ex) {
System.out.println("Consumer Interrupted");
}
}
}
public static void main(String[] args) {
BlockingQueueTest test = new BlockingQueueTest();
// 建立一個(gè)裝蘋(píng)果的籃子
Basket basket = test.new Basket();
ExecutorService service = Executors.newCachedThreadPool();
Producer producer = test.new Producer("生產(chǎn)者001", basket);
Producer producer2 = test.new Producer("生產(chǎn)者002", basket);
Consumer consumer = test.new Consumer("消費(fèi)者001", basket);
service.submit(producer);
service.submit(producer2);
service.submit(consumer);
// 程序運(yùn)行5s后,所有任務(wù)停止
// try {
// Thread.sleep(1000 * 5);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// service.shutdownNow();
}
}
二、Runnable And Callable
1、Runnable實(shí)現(xiàn)是Run方法,Callable實(shí)現(xiàn)是call方法
2、Callable的call方法可以有返回值使用Future接受返回值、Runnable的run方法沒(méi)有返回值
3、Callable的call方法可以拋出異常、run方法無(wú)法捕獲線程異常
三、Future AND CompletionService
1、用于接受Callable線程執(zhí)行完成返回的結(jié)果
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* 多線程執(zhí)行,異步獲取結(jié)果
*
* @author i-clarechen
*
*/
public class AsyncThread {
public static void main(String[] args) {
AsyncThread t = new AsyncThread();
List<Future<String>> futureList = new ArrayList<Future<String>>();
t.generate(3, futureList);
t.doOtherThings();
t.getResult(futureList);
}
/**
* 生成指定數(shù)量的線程,都放入future數(shù)組
*
* @param threadNum
* @param fList
*/
public void generate(int threadNum, List<Future<String>> fList) {
ExecutorService service = Executors.newFixedThreadPool(threadNum);
for (int i = 0; i < threadNum; i++) {
Future<String> f = service.submit(getJob(i));
fList.add(f);
}
service.shutdown();
}
/**
* other things
*/
public void doOtherThings() {
try {
for (int i = 0; i < 3; i++) {
System.out.println("do thing no:" + i);
Thread.sleep(1000 * (new Random().nextInt(10)));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 從future中獲取線程結(jié)果,打印結(jié)果
*
* @param fList
*/
public void getResult(List<Future<String>> fList) {
ExecutorService service = Executors.newSingleThreadExecutor();
service.execute(getCollectJob(fList));
service.shutdown();
}
/**
* 生成指定序號(hào)的線程對(duì)象
*
* @param i
* @return
*/
public Callable<String> getJob(final int i) {
final int time = new Random().nextInt(10);
return new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(1000 * time);
return "thread-" + i;
}
};
}
/**
* 生成結(jié)果收集線程對(duì)象
*
* @param fList
* @return
*/
public Runnable getCollectJob(final List<Future<String>> fList) {
return new Runnable() {
public void run() {
for (Future<String> future : fList) {
try {
while (true) {
if (future.isDone() && !future.isCancelled()) {
System.out.println("Future:" + future
+ ",Result:" + future.get());
break;
} else {
Thread.sleep(1000);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
};
}
}
結(jié)果
do thing no:0
do thing no:1
do thing no:2
Future:java.util.concurrent.FutureTask@68e1ca74,Result:thread-0
Future:java.util.concurrent.FutureTask@3fb2bb77,Result:thread-1
Future:java.util.concurrent.FutureTask@6f31a24c,Result:thread-2
2、使用誤CompletionService實(shí)現(xiàn)非阻塞式Future
- 當(dāng)向Executor提交批處理任務(wù)時(shí),并且希望在它們完成后獲得結(jié)果,如果用FutureTask,你可以循環(huán)獲取task,并用future.get()去獲取結(jié)果,但是如果這個(gè)task沒(méi)有完成,你就得阻塞在這里,這個(gè)實(shí)效性不高,其實(shí)在很多場(chǎng)合,其實(shí)你拿第一個(gè)任務(wù)結(jié)果時(shí),此時(shí)結(jié)果并沒(méi)有生成并阻塞,其實(shí)在阻塞在第一個(gè)任務(wù)時(shí),第二個(gè)task的任務(wù)已經(jīng)早就完成了,顯然這種情況用future task不合適的,效率也不高。
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
public class testCallable {
public static void main(String[] args) {
try {
completionServiceCount();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
/**
* 使用completionService收集callable結(jié)果
* @throws ExecutionException
* @throws InterruptedException
*/
public static void completionServiceCount() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newCachedThreadPool();
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
executorService);
int threadNum = 5;
for (int i = 0; i < threadNum; i++) {
completionService.submit(getTask(i));
}
int sum = 0;
int temp = 0;
for(int i=0;i<threadNum;i++){
temp = completionService.take().get();
sum += temp;
System.out.print(temp + "\t");
}
System.out.println("CompletionService all is : " + sum);
executorService.shutdown();
}
public static Callable<Integer> getTask(final int no) {
final Random rand = new Random();
Callable<Integer> task = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int time = rand.nextInt(100)*100;
System.out.println("thead:"+no+" time is:"+time);
Thread.sleep(time);
return no;
}
};
return task;
}
}
結(jié)果:最先執(zhí)行完成的線程先輸出結(jié)果
thead:0 time is:4200
thead:1 time is:6900
thead:2 time is:2900
thead:3 time is:9000
thead:4 time is:7100
2 0 1 4 3 CompletionService all is : 10