如何封裝同步隊(duì)列的線程池

同步隊(duì)列

看到同步隊(duì)列,第一想到的是AQS。
隊(duì)列同步器(AQS)是用來(lái)構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架,使用一個(gè)int型變量代表同步狀態(tài),通過(guò)內(nèi)置的隊(duì)列來(lái)完成線程的排隊(duì)工作。

AQS使用步驟:
①子類通過(guò)繼承AQS并實(shí)現(xiàn)其抽象方法來(lái)管理同步狀態(tài),對(duì)于同步狀態(tài)的更改通過(guò)提供的getState()、setState(int state)、compareAndSetState(int expect, int update)來(lái)進(jìn)行操作,因?yàn)槭褂肅AS操作保證同步狀態(tài)的改變是原子的。
②子類被推薦定義為自定義同步組件的靜態(tài)內(nèi)部類,同步器本身并沒有實(shí)現(xiàn)任何的同步接口,僅僅是定義了若干狀態(tài)獲取和釋放的方法來(lái)提供自定義同步組件的使用。
③同步器既可以支持獨(dú)占式的獲取同步狀態(tài),也可以支持共享式的獲取同步狀態(tài)(ReentrantLock、ReentrantReadWriteLock、CountDownLatch等不同類型的同步組件)
  
ASQ定義了兩種資源共享的方式:
(1)獨(dú)占,只有一個(gè)線程能執(zhí)行,如ReentrantLock;
(2)共享,多個(gè)線程可以同時(shí)執(zhí)行,如Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier
關(guān)于同步器的幾個(gè)重要方法 :
(1)sHeldExclusively():該線程是否正在獨(dú)占資源。只有用到condition才需要去實(shí)現(xiàn)它。
(2)tryAcquire(int):獨(dú)占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
(3)tryRelease(int):獨(dú)占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
(4)tryAcquireShared(int):共享方式。嘗試獲取資源。負(fù)數(shù)表示失??;0表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。
(5)tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點(diǎn)返回true,否則返回false。

AQS的本質(zhì),其實(shí)是CLH鎖隊(duì)列。

簡(jiǎn)單的介紹可以看看這個(gè):AQS與CLH的簡(jiǎn)單介紹

線程池

簡(jiǎn)單的介紹可以看這個(gè):線程池介紹

那怎么把這2個(gè)東西組合起來(lái)?

1、既然AQS幫我們完成了那么多事情,那我們直接用它來(lái)確保隊(duì)列同步就行。實(shí)現(xiàn)一個(gè)同步鎖。

public class CusLock {
    public CusLock() {
        sync = new Sync();
    }
    public void lock(){
        sync.lock();
    }
    public void unlock(){
        sync.unlock();
    }
    private final Sync sync;
    static class Sync extends AbstractQueuedSynchronizer{
        void lock(){
            if (compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());
            }else {
                acquire(1);
            }
        }
        void unlock(){
            release(1);
        }
        @Override
        protected boolean tryAcquire(int arg) {
            assert arg == 1;
            boolean success = false;
            if (compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());
                success = true;
            }
            return success;
        }

        @Override
        protected boolean tryRelease(int arg) {
            assert arg == 1;
            if(arg == 0){
               throw new IllegalMonitorStateException();
            }
            setState(0);
            setExclusiveOwnerThread(null);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return Thread.currentThread() == getExclusiveOwnerThread();
        }
    }
}

2、實(shí)現(xiàn)個(gè)訂單任務(wù),利用上面的同步鎖來(lái)控制任務(wù)是否完成。

public class SynOrder {
    private CusLock lock = new CusLock();

    public void getOrderNo() {
        try {
            lock.lock();
            System.out.println(Thread.currentThread().getName() + "--" + SimpleDateFormat.getTimeInstance(SimpleDateFormat.FULL).format(new Date()));
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } finally {
            lock.unlock();
        }
    }
}

3、測(cè)試一下

public class Main {
    public static void main(String[] args) throws InterruptedException {
        SynOrder synOrder = new SynOrder();
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                synOrder.getOrderNo();
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                synOrder.getOrderNo();
            }
        });
        System.out.println("t1:"+t1.getName());
        System.out.println("t2:"+t2.getName());
        t1.start();
        t2.start();
        Thread.currentThread().join();
    }
}

4、建立線程池

ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                 new ArrayBlockingQueue<Runnable>(5));

由于線程池是執(zhí)行線程的,所以我們的order需要改一改:


public class SynOrder implements Runnable {

    private CusLock lock = new CusLock();

    public void getOrderNo() {
        try {
            lock.lock();
            System.out.println(Thread.currentThread().getName() + "--" + SimpleDateFormat.getTimeInstance(SimpleDateFormat.FULL).format(new Date()));
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void run() {
        getOrderNo();
    }
}
for(int i=0;i<15;i++){
            SynOrder synOrder = new SynOrder();
             executor.execute(myTask);
             System.out.println("線程池中線程數(shù)目:"+executor.getPoolSize()+",隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:"+
             executor.getQueue().size()+",已執(zhí)行玩別的任務(wù)數(shù)目:"+executor.getCompletedTaskCount());
         }
         executor.shutdown();
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容