創(chuàng)建線程的方法
創(chuàng)建線程的方式一共有四種,1.繼承Thread,2.實現(xiàn)Runnable接口,3.實現(xiàn)Callable接口,4.線程池
首先介紹前兩種方式:
public class TestThread {
public static void main(String[] args) {
ThreadDemo1 td1 = new ThreadDemo1();
ThreadDemo2 td2 = new ThreadDemo2();
td1.start();
new Thread(td2).start();
}
}
class ThreadDemo1 extends Thread {
@Override
public void run() {
System.out.println("thread");
}
}
class ThreadDemo2 implements Runnable {
@Override
public void run() {
System.out.println("runnable");
}
}
Thread本身就繼承了Runnable接口,所以自然也就重寫了run()方法,所以在使用繼承Thread和實現(xiàn)Runnable都需要重寫run方法。
我們平時不會去采用繼承Thread的方式(單繼承,多實現(xiàn))。
都會采用start方法來開啟線程,與run相比,start方法會創(chuàng)建一個線程,并把線程至于可運行狀態(tài)(CPU可以進(jìn)行調(diào)度)。然后主線程會跳過這段代碼繼續(xù)向下執(zhí)行。當(dāng)執(zhí)行到該線程的時候,會調(diào)用run()方法,執(zhí)行線程內(nèi)部的邏輯。如果直接使用run方法是不會開啟一條線程的,相當(dāng)于仍然是單線程在執(zhí)行。
線程安全問題
線程安全問題存在于多個線程對共享變量的訪問。線程安全主要有兩個問題,一個是原子性,一個是可見性。這兩個問題都可以加鎖的方式解決,但在某些場合,加鎖的代價很重,因此不太適合。下面會介紹不通過加鎖解決線程安全的方法。
- 可見性:使用volatile關(guān)鍵字。每一個線程都會有自己的一塊內(nèi)存,當(dāng)它們對共享數(shù)據(jù)進(jìn)行修改的時候,只會更改自己線程內(nèi)存中的數(shù)據(jù),刷新到主內(nèi)存中需要一定的時間,其它的線程是無法快速感知的,這樣的話就會造成數(shù)據(jù)錯誤。volatile關(guān)鍵字的作用是,使用該關(guān)鍵子修飾的共享變量在修改的時候,會將數(shù)據(jù)同步到主內(nèi)存,變量在讀取的時候不從自己線程的緩存中讀取,而是去主內(nèi)存中讀取,我們可以理解為使用了volatile后,修改和讀取都是在主內(nèi)存中進(jìn)行的,雖然會有一定的開銷,但對于加鎖操作來說,開銷小太多了。除此之外volatile還有禁止指令重排的功能(屏障)。
//一般用于開關(guān)使用
public class TestVolatile {
public static void main(String[] args) {
ThreadDemo td = new ThreadDemo();
new Thread(td).start();
while (true) {
//可以加同步鎖保證可見性,但是太重了(通過Happen-before原則來實現(xiàn)可見性)
if(td.isFlag()) {
System.out.println("--------------------");
break;
}
}
}
}
class ThreadDemo implements Runnable {
//去掉volatile以后,主方法sout不會執(zhí)行
private volatile boolean flag = false;
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = true;
System.out.println("flag = " + isFlag());
}
public boolean isFlag() {
return flag;
}
}
- 原子性:一個經(jīng)典的例子i++,它是需要有兩步操作的,所以在多線程的環(huán)境下會有安全問題,volatile只能保證可見性,但是并不能保證原子性。
public class TestAtomic {
public static void main(String[] args) {
ThreadAtomicDemo td = new ThreadAtomicDemo();
for(int i = 0; i < 10; i++) {
new Thread(td).start();
}
}
}
class ThreadAtomicDemo implements Runnable {
//無法保證原子性,有可能會導(dǎo)致有相同的值出現(xiàn)
//private int num = 0;
//使用juc中Atomic包下的類,自帶volatile,原子性通過CAS保證
AtomicInteger num = new AtomicInteger(0);
@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getNum());
}
public int getNum() {
//return num++;
return num.getAndIncrement();
}
}
每一個數(shù)據(jù)類型都對應(yīng)著其線程安全的數(shù)據(jù)類型,例如Integer對應(yīng)著AtomicInteger,其可見性通過volatile關(guān)鍵字來保證,原子性則是通過CAS來保證的。
CAS:硬件對于原子性的支持,舉個例子,假設(shè)一個線程要對共享數(shù)據(jù)i進(jìn)行加1的操作,首先它要去主內(nèi)存中去取i的值,發(fā)現(xiàn)i為2,接下來它對i進(jìn)行加1操作,同時再去主內(nèi)存中取i的值,如果此時i的值仍然為2,那么就把3寫回主內(nèi)存,如果它發(fā)現(xiàn)此時i的值不為2,那么從頭開始,重新嘗試對i進(jìn)行加1操作。
線程安全的集合類
我們會經(jīng)常使用集合,在多線程的情況又如何來保證安全問題呢?最簡單粗暴的方式就是對修改集合中數(shù)據(jù)的方法加同步鎖。第二種方式是使用Collections.synchronizedList(new ArrayList<>()),同樣也有set和map的方法,缺點是,在對集合中的數(shù)據(jù)進(jìn)行修改的時候會將整張表鎖住,所以在集合遍歷的時候,進(jìn)行修改的話,會有并發(fā)修改異常。接下來介紹一些常用的線程安全的集合類。
- ConcurrentHashMap:與HashTable不同的是,HashTable鎖住了整張表,而ConcurrentHashMap則是使用分段鎖(Segment)來表示不同的部分,每個段其實就是一個小的hashTable,它們都有自己的鎖(Lock),所以當(dāng)多個修改發(fā)生在不同段上的時候,就可以并發(fā)的進(jìn)行。當(dāng)然它也會有上述所說的異常。
- CopyOnWriteArrayList/CopyOnWriteArraySet:CopyOnWrite容器是寫的時候復(fù)制的容器,就是我們在往集合里面寫東西的時候,不是直接寫而是先copy這個容器,我們在copy的容器中添加元素,之后將指針指向新容器,因此可以并發(fā)的讀,不需要加鎖,十分適合讀多寫少的并發(fā)場景。當(dāng)然它的缺點一個是內(nèi)存占用的問題,一個是它只能保證最終一致性。
具體選擇用哪種線程安全的集合類看業(yè)務(wù)場景而定。
public class TestCopyOnWriteArrayList {
public static void main(String[] args) {
ThreadCopyOnWriteDemo td = new ThreadCopyOnWriteDemo();
for(int i = 0; i < 10; i++) {
new Thread(td).start();
}
}
}
class ThreadCopyOnWriteDemo implements Runnable {
/**
* 會有并發(fā)修改異常出現(xiàn)
*/
private static List<String> list = Collections.synchronizedList(new ArrayList<>());
/**
* 底層通過每次修改集合都會復(fù)制出一個新的集合來保證線程安全
* 適用于讀多寫少的多線程環(huán)境
*/
//private static CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
static {
list.add("AA");
list.add("BB");
list.add("CC");
}
@Override
public void run() {
Iterator<String> it = list.iterator();
while (it.hasNext()) {
System.out.println(it.next());
list.add("AA");
}
}
}
線程之間的通訊
舉一個例子說,如果我希望一個線程要等其他的線程運行完之后再去運行。
- CountDownLatch:
允許一個或者多個線程等待其他線程完成之后再執(zhí)行,比如人齊了一起吃飯。
CountDownLatch(int count): 構(gòu)造方法,初始化計數(shù)器。
await(): 是當(dāng)前線程在計數(shù)器為0之前一直等待,除非線程被中斷。
countDown(): 計數(shù)器減1。
getCount(): 返回當(dāng)前計數(shù)。
CountDownLatch內(nèi)部有一個線程數(shù)量計數(shù)器,當(dāng)一個(或多個)線程執(zhí)行await方法后等待,其他的線程完成任務(wù)后,計數(shù)器減1。如果此時計數(shù)器仍然大于0,那么等待的線程繼續(xù)等待。如果為0,表示其他線程任務(wù)執(zhí)行完成之后,等待的線程會被喚醒。
public class TestCountDown {
public static void main(String[] args) throws InterruptedException {
//指定計數(shù)器5,每當(dāng)有線程執(zhí)行完后就減1,當(dāng)減到0的時候,主線程才會執(zhí)行
final CountDownLatch latch = new CountDownLatch(5);
TestCountDownDemo td = new TestCountDownDemo(latch);
long start = System.currentTimeMillis();
for(int i = 0; i < 5; i++) {
new Thread(td).start();
}
//主線程等待,只有l(wèi)atch的計數(shù)器到0的時候才會放行
latch.await();
/**
* 當(dāng)其它線程全部執(zhí)行完畢后才會執(zhí)行
* 用于總計算,那些需要其它線程執(zhí)行完結(jié)果再去執(zhí)行的場景
*/
long end = System.currentTimeMillis();
System.out.println("耗費時間為:" + (end - start));
}
}
class TestCountDownDemo implements Runnable {
private CountDownLatch latch;
public TestCountDownDemo(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
synchronized (this) {
try {
for(int i = 0; i < 10; i++) {
if(i % 2 ==0) {
System.out.println(i);
}
}
} finally {
//當(dāng)線程執(zhí)行完畢后減1
latch.countDown();
}
}
}
}
- 通過實現(xiàn)Callable接口來創(chuàng)建線程,實現(xiàn)Callable接口,可以將線程的結(jié)果進(jìn)行返回,需要FutureTask實現(xiàn)類的支持,用于接收結(jié)果。簡單來說就是線程能拋異常,能又返回值,返回值存在Future中。Callable實例當(dāng)作參數(shù),生成一個FutureTask的對象,然后把這個對象當(dāng)作一個Runnable,作為參數(shù)令起線程。
public class TestCallable {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadCallableDemo td = new ThreadCallableDemo();
FutureTask<Integer> future = new FutureTask<>(td);
new Thread(future).start();
//接收線程運算后的結(jié)果
Integer result = future.get();
System.out.println(result);
System.out.println("-----------------------");
}
}
class ThreadCallableDemo implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
for(int i = 0; i <= 100; i++) {
sum += i;
}
return sum;
}
}
Future的核心思想:一個方法f,計算過程可能非常耗時,等待f返回,顯然不明智??梢栽谡{(diào)用f的時候,立刻返回一個Future,可以通過Future這個數(shù)據(jù)結(jié)構(gòu)去控制方法f的計算過程。
- get:獲取計算結(jié)果(如果還沒計算完,也是必須等待的)
- cancel:還沒計算完,可以取消計算過程
- isDone:判斷是否計算完
- isCancelled:判斷計算是否別取消
實際上,F(xiàn)utureTask也是一個Runnable,其中的run方法的邏輯就是運行Callable的call方法,然后保存結(jié)果或者異常。
讀寫鎖
讀寫鎖希望在共享數(shù)據(jù)的訪問上,讀讀不互斥,讀寫互斥,寫寫互斥,而且希望讀必須是最新的數(shù)據(jù),因此需要加讀寫鎖。
public class TestReadAndWrite {
public static void main(String[] args) {
ReadAndWrite rw = new ReadAndWrite();
for(int i = 0; i < 1000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
rw.get();
}
}).start();
}
new Thread(new Runnable() {
@Override
public void run() {
rw.save();
}
}, "write").start();
}
}
class ReadAndWrite {
private int number = 0;
private ReadWriteLock lock = new ReentrantReadWriteLock();
//讀
public void get() {
lock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " : " + number);
} finally {
lock.readLock().unlock();
}
}
//寫
public void save() {
lock.writeLock().lock();
try {
//Thread.sleep(20);
System.out.println(Thread.currentThread().getName());
this.number = new Random().nextInt(100);
} catch (Exception e) {
} finally {
lock.writeLock().unlock();
}
}
}
線程池
創(chuàng)建線程的第四種方式
為什么要使用線程池
- 減少過于頻繁的創(chuàng)建、銷毀線程,增加處理效率。
- 線程并發(fā)數(shù)量過多,搶占系統(tǒng)資源從而導(dǎo)致堵塞。
- 對線程進(jìn)行簡單的處理。
線程池中的參數(shù)
- corePoolSize:線程池中核心線程的最大值。(線程池里面分為核心線程和非核心線程) PS:核心線程默認(rèn)會一直存活,即使這個核心線程啥事都不干。
- maximumPoolSize:線程總數(shù)最大值。(線程總數(shù) = 核心線程 + 非核心線程)
- keepAliveTime:非核心線程,閑置最長時長。
- TimeUnit:keepAliveTime的單位。
- BlockingQueue:線程池中的任務(wù)隊列,核心線程沒滿,新添加的線程直接進(jìn)核心線程,核心線程滿了,加入任務(wù)隊列,若隊列滿了,新建非核心線程,若超出之前設(shè)置的線程總數(shù)最大值,就會報錯。
Executors提供的線程池創(chuàng)建配置
- newCachedThreadPool(): 用于處理大量短時間工作任務(wù)的線程池。
- 試圖緩存線程并重用,當(dāng)無緩存線程可用時,會創(chuàng)建新的線程。
- 如果線程閑置時間超過60s,則被終止并移出緩存。
- 內(nèi)部使用SynchronousQueue作為工作隊列。
(corePoolSize: 0, maximumPoolSize: Integer.MAX_VALUE, keepAliveTime: 60L, TimeUnit: SECONDS, BlockingQueue: SynchronousQueue)
- newFixThreadPool(int nThreads):
- 重用指定數(shù)目的線程。(nThreads)
- 使用無界的工作隊列。
- 任務(wù)數(shù)量超過活動隊列的數(shù)目,將在工作隊列中等待空閑線程出現(xiàn)。如果有工作線程退出,將會有新的工作線程被創(chuàng)建,以補足指定數(shù)目的nThreads。
(corePoolSize: nThreads, maximumPoolSize: nThreads, keepAliveTime: 0L, TimeUnit: SECONDS, BlockingQueue: LinkedBlockingQueue)
- newSingleThreadExecutor(): 創(chuàng)建的是ScheduledExecutorService,也就是可以進(jìn)行定時或周期性的工作調(diào)度。
- 工作線程數(shù)目限制為1,保證所有任務(wù)都是被順序執(zhí)行。
- 最多會有一個任務(wù)處于活動狀態(tài)。
- 不允許使用者更改線程池實例,可以避免其改變線程數(shù)目。
(corePoolSize: 1, maximumPoolSize: 1, keepAliveTime: 0L, TimeUnit: SECONDS, BlockingQueue: LinkedBlockingQueue)
- newScheduledThreadPool(int corePoolSize): 同樣是ScheduledExecutorService
- 會保持corePoolSize個工作線程。
- 可以設(shè)置延遲多少秒執(zhí)行。
(corePoolSize: corePoolSize, maximumPoolSize: Integer.MAX_VALUE, BlockingQueue: DelayedQueue)
public class TestThreadPool {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
SumRunnable sr = new SumRunnable();
SumCallable sc = new SumCallable();
/*for(int i = 0; i < 100; i++) {
executorService.submit(sr);
}*/
List<Future<Integer>> list = new ArrayList<>();
//List<Integer> list1 = new ArrayList<>();
for(int i = 0; i < 100; i++) {
Future<Integer> future = executorService.submit(sc);
list.add(future);
//list1.add(future.get());
}
executorService.shutdown();
for(Future<Integer> future: list) {
System.out.println(future.get());
}
/*for(Integer i: list1) {
System.out.println(i);
}*/
}
}
class SumRunnable implements Runnable {
private int number = 0;
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
for(int i = 0; i <= 100; i++) {
number += i;
}
}
}
class SumCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int number = 0;
Thread.sleep(100);
for(int i = 0; i <= 100; i++) {
number += i;
}
return number;
}
}