Java 多線程
基礎(chǔ)
進(jìn)程和線程
- 進(jìn)程和線程的區(qū)別于聯(lián)系
- 進(jìn)程是操作系統(tǒng)分配資源的基本單位,進(jìn)程擁有獨(dú)立的內(nèi)存等資源,一個(gè)進(jìn)程至少有一個(gè)線程。
- 線程則是具體的執(zhí)行單位,CPU(或是CPU的一個(gè)核心)同時(shí)只能執(zhí)行一個(gè)線程
- 一個(gè)進(jìn)程的所有線程共享該進(jìn)程的系統(tǒng)資源。
- 進(jìn)程是程序的一次執(zhí)行,而使用線程可以使程序的這次執(zhí)行變?yōu)椴l(fā)
線程的狀態(tài)
Java中線程工分為以下幾個(gè)狀態(tài):
- new 表示線程創(chuàng)建,但是還未執(zhí)行
- runnable 表示線程可以被cpu調(diào)度到
- blocked 表示線程進(jìn)入同步代碼塊或IO操作
- running 表示線程正在執(zhí)行
- waiting 表示線程正在等待一個(gè)monitor
- timed_waiting表示線程有顯時(shí)間內(nèi)等待
各個(gè)狀態(tài)之間切換關(guān)系如下:

Java內(nèi)存模型
Java內(nèi)存模型是Java多線程的基礎(chǔ),可以參考文章:全面理解Java內(nèi)存模型
Therad類介紹
Therad類的使用
繼承Thread類并重寫Run方法
public static void main(String[] args) {
UselessThread thread = new UselessThread();
thread.start();
}
static class UselessThread extends Thread {
@Override
public void run() {
System.out.print("Useless");
}
}
輸出結(jié)果:
Useless
使用Runable接口
Thread thread = new Thread(new Runnable() {
public void run() {
System.out.print("Useless");
}
});
thread.start();
輸出結(jié)果:
Useless
幾個(gè)常用接口
- sleep
sleep方法是Thread中的一個(gè)靜態(tài)方法,用于用于強(qiáng)制停止當(dāng)前的線程指向若干毫秒。
- yield
yield 方法調(diào)用后表示線程可以讓出CPU,系統(tǒng)可以將CPU調(diào)度其他線程執(zhí)行,也可以繼續(xù)執(zhí)行當(dāng)前線程。
- join
join方法表示阻塞調(diào)用該方法的線程,指導(dǎo)被引用的線程結(jié)束為止,如下面的例子:
Thread aThread = new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print("aThread finished \r\n");
}
});
aThread.start();
System.out.print("main Threand is abount to join \r\n");
try {
aThread.join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.print("main Threand finish join \r\n");
輸出結(jié)果:
main Threand is abount to join
aThread finished
main Threand finish join
- interupt
interupt方法使被引用的線程收到一個(gè)InteruptException,以防止被引用的線程無(wú)限等待么,如下面的例子:
Thread aThread = new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(5000000);
} catch (InterruptedException e) {
System.out.print("aThread is interrupt \r\n");
}
System.out.print("aThread finished \r\n");
}
});
aThread.start();
try {
Thread.sleep(2000);
System.out.print("main thread sleep ended \r\n");
} catch (InterruptedException e) {
e.printStackTrace();
}
aThread.interrupt();
輸出結(jié)果:
main thread sleep ended
aThread is interrupt
aThread finished
- setPriority
setPriority方法用于設(shè)置線程的優(yōu)先級(jí),Thread類中優(yōu)先級(jí)是一個(gè)從1到10的整形,數(shù)字越大,優(yōu)先級(jí)越高。
基礎(chǔ)同步工具
synchronized
synchronized關(guān)鍵字可以用在方法代碼塊中。在Java中每個(gè)對(duì)象都有一個(gè)唯一的鎖,要進(jìn)synchronized代碼塊&方法中后必須獲得相關(guān)的鎖,如果無(wú)法獲得則線程阻塞直到持有鎖代碼塊&方法退出為止。如下面的例子:
final Object lock = new Object();
Thread aThread = new Thread(new Runnable() {
public void run() {
System.out.print("aThread is runing \r\n");
synchronized (lock) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print("aThread finished \r\n");
}
}
});
aThread.start();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock) {
System.out.print("main Thread finished \r\n");
}
輸出結(jié)果如下:
aThread is runing
aThread finished
main Thread finished
從輸出結(jié)果可以看出主線程被阻塞,直到子線程帶鎖的代碼塊返回。
wait & notify & notifyAll
wait接口調(diào)用后,調(diào)用后會(huì)放棄對(duì)象的鎖,并且當(dāng)前線程進(jìn)入了阻塞狀態(tài),并進(jìn)入到一個(gè)和該對(duì)象相關(guān)的等待池中。notfyfy & notify_all接口用于喚醒等待池等待池中第一個(gè)/所有的線程。wait方法和notify/notifyAll方法都必須包含在synchronized代碼塊中。下面是一個(gè)例子:
public class TestJava {
public static Object sObject = new Object();
/**
* @param args
*/
public static void main(String[] args) {
WaitThread threadA = new WaitThread("ThreadA");
WaitThread threadB = new WaitThread("ThreadB");
WaitThread threadC = new WaitThread("ThreadC");
threadA.start();
threadB.start();
threadC.start();
synchronized (sObject) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
sObject.notify();
}
}
static class WaitThread extends Thread {
WaitThread(String name) {
super(name);
}
@Override
public void run() {
synchronized (sObject) {
System.out.print(getName() + " is abount to wait \r\n");
try {
sObject.wait();
} catch (InterruptedException e) {
// TODO: handle exception
}
System.out.print("Thread" + getName() + " end wait \r\n");
}
}
}
}
輸出結(jié)果如下:
ThreadA is abount to wait
ThreadC is abount to wait
ThreadB is abount to wait
ThreadA end wait
如果將sObject.notify()換成sObject.notifyAll() 輸出結(jié)果如下:
ThreadA is abount to wait
ThreadB is abount to wait
ThreadC is abount to wait
main thread is about to notify
ThreadC end wait
ThreadB end wait
ThreadA end wait
volatile
volatile作用如下
- Java提供了volatile關(guān)鍵字來(lái)保證可見(jiàn)性。當(dāng)一個(gè)共享變量被volatile修飾時(shí),它會(huì)保證修改的值會(huì)立即被更新到主存,當(dāng)有其他線程需要讀取時(shí),它會(huì)去內(nèi)存中讀取新值。
- 在Java內(nèi)存模型中,允許編譯器和處理器對(duì)指令進(jìn)行重排序,但是重排序過(guò)程不會(huì)影響到單線程程序的執(zhí)行,卻會(huì)影響到多線程并發(fā)執(zhí)行的正確性。
- 保證了不同線程對(duì)這個(gè)變量進(jìn)行操作時(shí)的可見(jiàn)性,即一個(gè)線程修改了某個(gè)變量的值,這新值對(duì)其他線程來(lái)說(shuō)是立即可見(jiàn)的。
- 禁止進(jìn)行指令重排序
- volatile不可保證原子性
使用volatile的場(chǎng)景如下
- 多線程下,保證對(duì)當(dāng)前變量的寫操作保證可見(jiàn)性
voaltile boolean isStop = true;
//Thread1
while (isStop){
dosometing();
}
//Thread2
isStop = true;
終止一個(gè)無(wú)限循環(huán)的線程可以通過(guò)標(biāo)記變量的形式來(lái)做,如果不使用volatile關(guān)鍵字線程2的修改可能修改對(duì)線程1不可見(jiàn),會(huì)導(dǎo)致線程1繼續(xù)無(wú)限循環(huán)。
- 多線程下,禁止指令重排序
Context context;
volatile boolean isInitialized;
//Thread1
context = loadContext();
isInitialized = true;
//Thread2
if (isInitialized) {
context.getRessources();
}
如果不適用volatile 線程1中的兩行代碼可能被重排序,從而導(dǎo)致線程2概率性出現(xiàn)空指針問(wèn)題。
高級(jí)多線程工具類
ThreadLocal
ThreadLocal的使用方法
ThreadLocal類為對(duì)于為同一對(duì)象在每個(gè)線程產(chǎn)生一個(gè)副本,從而避免資源的競(jìng)爭(zhēng),簡(jiǎn)化同步代碼。ThreadLocal 共有三個(gè)共有方法。
public static void main(String[] args) {
final ThreadLocal<String> description = new ThreadLocal<String>() {
@Override
protected String initialValue() {
return "Not initialized";
}
};
Runnable r = new Runnable() {
public void run() {
String threadName = Thread.currentThread().getName();
System.out.print("Thread:" + threadName + " before set description:" + description.get() + "\r\n");
description.set(threadName);
System.out.print("Thread:" + threadName + " after set description:" + description.get() + "\r\n");
}
};
Thread t1 = new Thread(r, "A");
Thread t2 = new Thread(r, "B");
Thread t3 = new Thread(r, "C");
t1.start();
t2.start();
t3.start();
}
輸出結(jié)果如下:
Thread:A before set description:Not initialized
Thread:C before set description:Not initialized
Thread:A after set description:A
Thread:B before set description:Not initialized
Thread:C after set description:C
Thread:B after set description:B
從輸出結(jié)果看三個(gè)線程,交替執(zhí)行 description.get()的值都不受其他線程修改的影響。
ThreadLocal實(shí)現(xiàn)分析
- Thread類中有一個(gè)ThreadLocalMap成員,ThreadLocalMap是一個(gè)ThreadLocal和Object的Map
//Thread.java
ThreadLocal.ThreadLocalMap threadLocals = null;
//ThreadLocal.java
static class ThreadLocalMap {
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
}
- ThreadLocal 的Set 和Get放到都會(huì)從當(dāng)前Thread的threadLocals中查找。
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
原子類
原子類AtomicInteger 中提供了一系列的方法,可以保證Integer的相關(guān)操作時(shí)原子性的,下面觀察兩段代碼:
- volatile int變量自增
public volatile int inc = 0;
public void increase() {
inc++;
}
public static void main(String[] args) {
final TestJava test = new TestJava();
for (int i = 0; i < 1000; i++) {
new Thread() {
public void run() {
for (int j = 0; j < 1000; j++)
test.increase();
};
}.start();
}
while (Thread.activeCount() > 1)
Thread.yield();
System.out.println(test.inc);
}
1000 個(gè)線程中,每個(gè)線程對(duì)int變量做1000次自增,輸出結(jié)果如下:
995441
如前面所述,volatile變量只能保證修改可見(jiàn),自增操作可以分解為 讀取,增加,寫入三個(gè)操作,當(dāng)線程A和B的執(zhí)行順序?yàn)?,A讀取,B讀取,A增加,B增加,B寫入,A寫入時(shí),就會(huì)出現(xiàn)結(jié)果不一致的問(wèn)題。
如果修改為AtomicInteger,測(cè)試結(jié)果如下:
public AtomicInteger atomInt = new AtomicInteger(0);
public void increaseAtom() {
atomInt.getAndIncrement();
}
public static void main(String[] args) {
final TestJava test = new TestJava();
for (int i = 0; i < 1000; i++) {
new Thread() {
public void run() {
for (int j = 0; j < 1000; j++)
test.increaseAtom();
};
}.start();
}
while (Thread.activeCount() > 1)
Thread.yield();
System.out.println(test.atomInt.get());
}
1000000
從測(cè)試結(jié)果看,AtomicInteger的getAndIncrement方法保證了自增操作的原子性。
AtomicInteger的實(shí)現(xiàn)
AtomicInteger中getAndIncrement方法的實(shí)現(xiàn)如下:
//AtomicInteger.java
public final int getAndIncrement() {
return U.getAndAddInt(this, VALUE, 1);
}
//Unsafe.java
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}
會(huì)循環(huán)讀取 當(dāng)前值,然后再調(diào)用compareAndSwapInt方法設(shè)置新的值。這里要提到的一個(gè)概念是CAS, CAS是compare-and-swap的縮寫,當(dāng)要寫入一個(gè)值時(shí),首先從主存中讀取當(dāng)前值,然后和其他的當(dāng)前值做比較,如果發(fā)現(xiàn)不同則認(rèn)為其他線程修改了這個(gè)值,則寫入失敗。目前很多CPU都支持CAS指令因此為了效率 compareAndSwapInt 和 getIntVolatile都使用了natvie實(shí)現(xiàn)。
Lock類
JDK1.5 之后JAVA提供了Lock接口來(lái)實(shí)現(xiàn)和synchronized一樣的功能,使用方法如下.
public static void main(String[] args) {
final Lock lock = new ReentrantLock();
Runnable r = new Runnable() {
public void run() {
try {
System.out.print("Thread:" + Thread.currentThread().getName()
+ " requect lock \r\n");
lock.lock();
long before = System.currentTimeMillis();
long sleepTime = Math.abs(new Random().nextLong() & 2000);
System.out.print("Thread:" + Thread.currentThread().getName()
+ " getLock sleep:" + sleepTime + " \r\n");
Thread.sleep(sleepTime);
long after = System.currentTimeMillis();
System.out.print("Thread:" + Thread.currentThread().getName()
+ " End Time consumed:" + (after - before) + "\r\n");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
lock.unlock();
}
}
};
Thread threadA = new Thread(r, "A");
Thread threadB = new Thread(r, "B");
Thread threadC = new Thread(r, "C");
threadA.start();
threadB.start();
threadC.start();
}
Thread:C request lock
Thread:B request lock
Thread:A request lock
Thread:C getLock sleep:1680
Thread:C End Time consumed:1681
Thread:A getLock sleep:896
Thread:A End Time consumed:896
Thread:B getLock sleep:1232
Thread:B End Time consumed:1233
從Log可以看出,三個(gè)線程在鎖之間的代碼必須等待其他線程釋放鎖之后才能執(zhí)行。
如果將鎖的聲明修改為:
final Lock lock = new ReentrantLock(true);
可以看到如下log:
Thread:A request lock
Thread:C request lock
Thread:B request lock
Thread:A getLock sleep:336
Thread:A End Time consumed:337
Thread:C getLock sleep:1808
Thread:C End Time consumed:1808
Thread:B getLock sleep:320
Thread:B End Time consumed:321
從輸出可以看出,獲得鎖的順序和申請(qǐng)鎖的一致,這種鎖稱為公平鎖。ReentrantLock類可以構(gòu)造函數(shù)可以接受一個(gè)Boolean類型的標(biāo)量用于定義鎖是否是公平鎖。默認(rèn)工構(gòu)造函數(shù)為非公平鎖。
Condition類
Condition類實(shí)現(xiàn)了和Object類wait以及notify類似的功能,參考代碼如下"
public static void main(String[] args) {
final Lock lock = new ReentrantLock();
ConditionRunable ra = new ConditionRunable(lock);
ConditionRunable rb = new ConditionRunable(lock);
ConditionRunable rc = new ConditionRunable(lock);
Thread threadA = new Thread(ra, "A");
Thread threadB = new Thread(rb, "B");
Thread threadC = new Thread(rc, "C");
threadA.start();
threadB.start();
threadC.start();
try {
Thread.sleep(1000);
lock.lock();
rb.mCondition.signal();
rc.mCondition.signal();
ra.mCondition.signal();
} catch (InterruptedException e) {
} finally {
lock.unlock();
}
}
static class ConditionRunable implements Runnable {
public Condition mCondition;
private Lock mLock;
public ConditionRunable(Lock lock) {
mLock = lock;
mCondition = lock.newCondition();
}
public void run() {
try {
mLock.lock();
System.out.print("Thread:" + Thread.currentThread().getName()
+ " reeady to wait \r\n");
mCondition.await();
System.out.print("Thread:" + Thread.currentThread().getName() + " wait end \r\n");
} catch (InterruptedException e) {
} finally {
mLock.unlock();
}
}
}
輸出如下:
Thread:A reeady to wait
Thread:B reeady to wait
Thread:C reeady to wait
Thread:B wait end
Thread:C wait end
Thread:A wait end
Condition的await和signal 和 Object的wait和notify類似,必須包含在鎖的lock和unlock方法之間。但是一個(gè)lock可以創(chuàng)建多個(gè)condition,從而在實(shí)際用用中,可以根據(jù)需求喚醒不同的線程。
ThreadPoolExecutor
構(gòu)造函數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
構(gòu)造函數(shù)比較復(fù)雜,說(shuō)明如下:
- 當(dāng)線程池中的線程小于corePoolSize時(shí),當(dāng)提交新的任務(wù)新建一個(gè)線程執(zhí)行
- 當(dāng)線程池中的線程達(dá)到corePoolSize,新提交的任務(wù)被放入workQueue等待執(zhí)行
- 當(dāng)workQueue已滿,且maximumPoolSize>corePoolSize時(shí),新提交的任務(wù)會(huì)創(chuàng)建新線程執(zhí)行
- 當(dāng)提交的任務(wù)數(shù)超過(guò)maximumPoolSize時(shí),執(zhí)行RejectedExecutionHandler
- 當(dāng)線程數(shù)量超過(guò)corePoolSize,且空閑時(shí)間達(dá)到keepAliveTime,則終止超出corePoolSize的空閑線程
- 當(dāng)設(shè)置allowCoreThreadTimeOut(true)時(shí),線程池中corePoolSize線程空閑時(shí)間達(dá)到keepAliveTime也將關(guān)閉
Android AsynTask中也用到了ThreadPoolExecutor,代碼如下:
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
這里corePoolSize的值,如果CPU核心數(shù)大于4,則為4,如果CPU核心數(shù)小于等于2則為2。maximumPoolSize的大小為CPU核心數(shù)乘以2加1。wockQueue的大小為128。
使用方法
可以參考下面的例子:
static BlockingQueue<Runnable> sBlockingQueue = new LinkedBlockingDeque<Runnable>(4);
static ThreadPoolExecutor sExcutor = new ThreadPoolExecutor(2, 5, 30, TimeUnit.SECONDS,
sBlockingQueue, new RejectedExecutionHandler() {
public void rejectedExecution(Runnable arg0, ThreadPoolExecutor arg1) {
System.out.print("Task was rejected +\r\n");
}
});
/**
* @param args
*/
public static void main(String[] args) {
Runnable r = new Runnable() {
public void run() {
System.out.print("A Task is runing \r\n");
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
}
System.out.print("A Task is end \r\n");
}
};
for (int i = 0; i < 2; i++) {
sExcutor.execute(r);
}
}
有興趣的同學(xué)可以將循環(huán)的次數(shù)分別修改為2,6,9,10看下執(zhí)行的結(jié)果。