同步隊(duì)列
看到同步隊(duì)列,第一想到的是AQS。
隊(duì)列同步器(AQS)是用來(lái)構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架,使用一個(gè)int型變量代表同步狀態(tài),通過(guò)內(nèi)置的隊(duì)列來(lái)完成線程的排隊(duì)工作。
AQS使用步驟:
①子類通過(guò)繼承AQS并實(shí)現(xiàn)其抽象方法來(lái)管理同步狀態(tài),對(duì)于同步狀態(tài)的更改通過(guò)提供的getState()、setState(int state)、compareAndSetState(int expect, int update)來(lái)進(jìn)行操作,因?yàn)槭褂肅AS操作保證同步狀態(tài)的改變是原子的。
②子類被推薦定義為自定義同步組件的靜態(tài)內(nèi)部類,同步器本身并沒有實(shí)現(xiàn)任何的同步接口,僅僅是定義了若干狀態(tài)獲取和釋放的方法來(lái)提供自定義同步組件的使用。
③同步器既可以支持獨(dú)占式的獲取同步狀態(tài),也可以支持共享式的獲取同步狀態(tài)(ReentrantLock、ReentrantReadWriteLock、CountDownLatch等不同類型的同步組件)
ASQ定義了兩種資源共享的方式:
(1)獨(dú)占,只有一個(gè)線程能執(zhí)行,如ReentrantLock;
(2)共享,多個(gè)線程可以同時(shí)執(zhí)行,如Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier
關(guān)于同步器的幾個(gè)重要方法 :
(1)sHeldExclusively():該線程是否正在獨(dú)占資源。只有用到condition才需要去實(shí)現(xiàn)它。
(2)tryAcquire(int):獨(dú)占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
(3)tryRelease(int):獨(dú)占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
(4)tryAcquireShared(int):共享方式。嘗試獲取資源。負(fù)數(shù)表示失??;0表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。
(5)tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點(diǎn)返回true,否則返回false。
AQS的本質(zhì),其實(shí)是CLH鎖隊(duì)列。
簡(jiǎn)單的介紹可以看看這個(gè):AQS與CLH的簡(jiǎn)單介紹
線程池
簡(jiǎn)單的介紹可以看這個(gè):線程池介紹
那怎么把這2個(gè)東西組合起來(lái)?
1、既然AQS幫我們完成了那么多事情,那我們直接用它來(lái)確保隊(duì)列同步就行。實(shí)現(xiàn)一個(gè)同步鎖。
public class CusLock {
public CusLock() {
sync = new Sync();
}
public void lock(){
sync.lock();
}
public void unlock(){
sync.unlock();
}
private final Sync sync;
static class Sync extends AbstractQueuedSynchronizer{
void lock(){
if (compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
}else {
acquire(1);
}
}
void unlock(){
release(1);
}
@Override
protected boolean tryAcquire(int arg) {
assert arg == 1;
boolean success = false;
if (compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
success = true;
}
return success;
}
@Override
protected boolean tryRelease(int arg) {
assert arg == 1;
if(arg == 0){
throw new IllegalMonitorStateException();
}
setState(0);
setExclusiveOwnerThread(null);
return true;
}
@Override
protected boolean isHeldExclusively() {
return Thread.currentThread() == getExclusiveOwnerThread();
}
}
}
2、實(shí)現(xiàn)個(gè)訂單任務(wù),利用上面的同步鎖來(lái)控制任務(wù)是否完成。
public class SynOrder {
private CusLock lock = new CusLock();
public void getOrderNo() {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + "--" + SimpleDateFormat.getTimeInstance(SimpleDateFormat.FULL).format(new Date()));
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
}
3、測(cè)試一下
public class Main {
public static void main(String[] args) throws InterruptedException {
SynOrder synOrder = new SynOrder();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
synOrder.getOrderNo();
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
synOrder.getOrderNo();
}
});
System.out.println("t1:"+t1.getName());
System.out.println("t2:"+t2.getName());
t1.start();
t2.start();
Thread.currentThread().join();
}
}
4、建立線程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));
由于線程池是執(zhí)行線程的,所以我們的order需要改一改:
public class SynOrder implements Runnable {
private CusLock lock = new CusLock();
public void getOrderNo() {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + "--" + SimpleDateFormat.getTimeInstance(SimpleDateFormat.FULL).format(new Date()));
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
@Override
public void run() {
getOrderNo();
}
}
for(int i=0;i<15;i++){
SynOrder synOrder = new SynOrder();
executor.execute(myTask);
System.out.println("線程池中線程數(shù)目:"+executor.getPoolSize()+",隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:"+
executor.getQueue().size()+",已執(zhí)行玩別的任務(wù)數(shù)目:"+executor.getCompletedTaskCount());
}
executor.shutdown();