多線程學習筆記

  • 多線程出現(xiàn)目的
  • 如何使用多線程
  • 線程狀態(tài)(6種)
  • 線程開啟/停止
  • 線程安全
    • Volilate
    • Sychronized
      • 機制
        • 如何實現(xiàn)鎖
        • 為什么任何一個對象都可以成為鎖
        • 鎖的優(yōu)化
    • Lock與Synchronized區(qū)別
    • CAS
    • AQS(AbstractQueuedSychronizer)
      • ReentrantLock
        • Lock()加鎖分析
        • unlock()釋放鎖分析
      • CountDownLatch
        • 是什么
        • 如何使用
        • 分析

多線程出現(xiàn)目的

場景:

  1. 當一個進程處理過程中,遇到網(wǎng)絡(luò)與IO操作都會進入阻塞狀態(tài),不再處理任何東西,浪費系統(tǒng)資源。
  2. 一個函數(shù)的處理非常耗時,其實其中多個邏輯可以并行處理。

多線程的面世就是要解決以上問題。

如何使用多線程

  1. extends Thread
public class ThreadDemo extends Thread {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + ":" + "ThreadDemo Running");
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new ThreadDemo().start();
        }
    }
}
  1. implements Runnable
public class RunnableDemo implements Runnable {

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + ":" + "RunnableDemo Running");
    }

    public static void main(String[] args) {
        RunnableDemo runnableDemo = new RunnableDemo();
        for (int i = 0; i < 10; i++) {
            new Thread(runnableDemo).start();
        }
    }
}
  1. ExecutorService
    • Executors.newFixedThreadPool
    • Executors.newCachedThreadPool
    • Executors.newSingleThreadPool
    • Executors.newScheduledThreadPool
public class ExecutorServiceDemo {

    private static ThreadPoolExecutor threadPool;

    private static ThreadFactory factory = new ThreadFactory() {
        private final AtomicInteger integer = new AtomicInteger();

        @Override
        public Thread newThread(Runnable r) {
            int threadName = integer.getAndIncrement();
            System.out.println("Created Thread:" + threadName);
            return new Thread(r, "ThreadPool Thread:" + threadName);
        }
    };

    private static BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(1);

        threadPool = new ThreadPoolExecutor(10, 15, 1000L,
                TimeUnit.SECONDS,
                workQueue,
                factory);

        //execute()與submit()的區(qū)別在于submit有一個Future類型的返回,
        // 實際submit是把Callable入?yún)b成RunnableFuture類型后再調(diào)用execute();
        for (int i = 0; i < 15; i++) {
            System.out.println("threadPool.execute");
            threadPool.execute(new RunnableDemo());
        }

        for (int i = 0; i < 15; i++) {
            System.out.println("threadPool.submit");
            Future<?> future = threadPool.submit(new CallableDemo());
            System.out.println(future.get());
        }

    }
}
  1. implements Callable<>
public class CallableDemo implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println(Thread.currentThread().getName() + ":" +"CallableDemo Running");
        return "Callable Result";
    }

    public static void main(String[] args) throws Exception {
        CallableDemo callableDemo = new CallableDemo();
        String callableReturn = callableDemo.call();
        System.out.println("callableReturn :" + callableReturn);
    }
}

Callable與Runable區(qū)別:
Re:

  1. Callable任務(wù)線程能返回執(zhí)行結(jié)果,而Runnable任務(wù)線程不能返回結(jié)果
  2. Callable能向上拋出異常,而Runnable接口異常只能內(nèi)部消化

為什么提供extends Thread又提供implements Runnable
Re:因為JAVA不支持多繼承

線程狀態(tài)(6種)

image.png
  • NEW 調(diào)用Start方法前
  • RUNNABLE 運行
  • BLOCKED 阻塞
    • 等待阻塞 wait
    • 同步阻塞 synchronized
    • 其它阻塞 sleep/join
  • WAITING 等待
  • TIMED_WAITING 超時等待
  • TERMINATED 終止

狀態(tài)變更圖示:


image.png

線程開啟/停止

開始:start()
停止:interrupt()
通過設(shè)置標志位的方式終止線程,使其能有機會去清理資源,而非暴力的方式直接kill掉,這種方式更新安全。

public class demo4 {
    private static int num;

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                num++;
                System.out.println("Num:" + num);
            }
        });
        thread.start();
        TimeUnit.SECONDS.sleep(1);
        thread.interrupt();
    }
}

線程安全

  • 可見性
  • 原子性
  • 有序性

Volilate

public class VolatileDemo {
    private volatile static boolean stop = false;

    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            int i = 0;
            while (!stop) {
                i++;
                System.out.println("i: " + i);
            }
        });

        try {
            long startTime = System.currentTimeMillis();
            thread.start();
            System.out.println("Thread Start");
            TimeUnit.SECONDS.sleep(1);
            stop = true;
            long endTime = System.currentTimeMillis();
            System.out.println("Runtime: " + (endTime - startTime) / 1000 + " Second");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

因為每個線程有自己私有的內(nèi)存空間,修改變量需要同步回主存才能對其它線程可見,而volatile就會有哪下作用

  1. 修改volatile修飾的變量時,會強制將修改的值回寫到主存。
  2. 讀取volatile修飾的變量時,會強制到主存獲取數(shù)據(jù),不再到緩存讀取
  3. volatile會使被volatile修飾的語句禁止指令重排序

指令重排序?qū)嵗?/p>

int a = 1;
int b = 2;
int c = 3;
//以上例子,可能是int c = 3優(yōu)先于int a = 1和int b = 2執(zhí)行
int a = 1;
volatile int b = 2;
int c = 3;
//以上例子則volatile int b = 2一定是在int a = 1和int c = 3之間執(zhí)行。

問題:為什么需要編譯器指令重排?
Re:優(yōu)化執(zhí)行效率。

問題:什么是CPU亂序執(zhí)行?


image.png

問題:為什么要禁止編譯器指令重排呢?
Re:因為多線程下指令重排可能會導致處理出錯,例如:

Thread-1:
int b = 10;
int c = b;
boolean flag = true;

Thread-2:
while(flag){
    System.out.println(b);
}

如果編譯器把Thread-1的第3條指令重排到第一行,那Thread-2就有可能出錯,因為B還沒有初始化。

問題:volilate為什么不能保證原子性
Re:因為volilate對變量的操作在字節(jié)碼層面是由多條指令組成,非原子性操作,所以它只保證了可見性,不保證原子性。


Volilate因為只保證了Read and Load即從主存加載到工作內(nèi)存時加載的值是最新的,例如:
線程1和線程2在執(zhí)行Read and Load的時候,發(fā)現(xiàn)主存里的值都是5,雙方都加載了這個最新值,然后雙方都對該值加1,再把值放回主存,事實主存值結(jié)果為6,此操作有線程安全問題。

小結(jié)
聲明了volatile的變量進行寫操作,JVM就會向處理器發(fā)送一條Lock前綴的指令,把這個變量所在的緩存行的數(shù)據(jù)寫回到系統(tǒng)內(nèi)存,再根據(jù)我們前面提到過的MESI的緩存一致性協(xié)議,來保證多CPU下的各個高速緩存中的數(shù)據(jù)的一致性。

Sychronized

機制

  1. 使用方法
  • 修飾實例方法
  • 修飾靜態(tài)方法
  • 修飾代碼塊
  1. 使用Sychronized后,會通過字節(jié)碼生成以下指令:
  • 修飾方法時:ACC_SYNCHRONIZED


    image.png
  • 修飾代碼塊:monitorenter monitorexit


  1. 獲取鎖情況
  • 修飾實例方法
    進行同步代碼前,需要獲取當前實例的鎖
  • 修飾靜態(tài)方法
    進行同步代碼前,需要獲取當前類對象的鎖
  • 修飾代碼塊
    進行同步代碼前,需要獲取給定對象的鎖

如何實現(xiàn)鎖

本質(zhì):對象監(jiān)視器的獲?。í氄兼i)

為什么任何一個對象都可以成為鎖

因為對象在內(nèi)存中分為三塊區(qū)域:對象頭、實例數(shù)據(jù)、對齊填充


image.png

對象頭:


image.png

而Synchroned使用的鎖存在每一個對象的對象頭里,其中鎖標志位指向的是monitor對象(也稱為管程或監(jiān)視器鎖)的起始地址。

鎖的優(yōu)化

鎖的狀態(tài):

  • 無向鎖
  • 偏向鎖
    大多數(shù)情況下,鎖不僅不存在多線程競爭,而且總是由同一線程多次獲得。使用傳統(tǒng)的重量級鎖會有頻繁鎖操作,為了讓線程獲得鎖的代價更低,引入了偏向鎖,
  • 輕量鎖
    當多線程竟然偏向鎖時會升級為輕量級鎖
  • 重量鎖
    基于monitor的鎖實現(xiàn)。

注意:鎖只能從輕到重的方向發(fā)展,不可逆。

Lock與Synchronized區(qū)別

  1. Lock是一個接口
  2. synchronized是JVM層的一個實現(xiàn)
  3. synchronized是被動的觸發(fā)鎖機制,而Lock是可以靈活的控制,鎖的創(chuàng)建和釋放都需要人為控制,特別是異常發(fā)生的時候要注意釋放鎖。
  4. Lock相對來講控制粒度更小,例如還可以分別控制讀寫鎖
  5. Lock支持公平、非公平鎖,而synchronized只支持非公平鎖

CAS

CAS是JDK提供的Unsafe類里的一系列操作,這一系列操作由JDK來保證原子性。

    public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

    public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
image.png

Atomic一系列的對象是根據(jù)CAS的封裝來實例原子性。

AQS(AbstractQueuedSychronizer)

AQS的關(guān)鍵數(shù)據(jù)結(jié)構(gòu):


image.png

鏈表的操作通過CAS原子操作來保證多線程下的原子性:

  • compareAndSetTail
    /**
     * CAS tail field. Used only by enq.
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
  • compareAndSetHead
     /**
     * CAS head field. Used only by enq.
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

通過compareAndSwapObject這個native方法來保證鏈表操作的線程安全性

ReentrantLock

Lock()加鎖分析

image.png

非公平鎖邏輯流程圖

image.png

公平鎖與非公平鎖的差異

  • 公平鎖獲取鎖的過程
    final void lock() {
        acquire(1);
    }
    
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
  • 非公平鎖獲取鎖的過程
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
 
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    } 
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

以上可以看出,非公平鎖在Lock()方法被調(diào)用時是首先嘗試當前線程是否能直接獲得鎖,然后tryAcquire()方法的時候公平鎖是需要檢查AQS隊列里是否有等待的節(jié)點,有的話是當前線程獲取鎖不成功,而非公平鎖是直接CAS當前鎖的狀態(tài),若通過就把鎖給當前線程了。同時也可以看出雙方在獲取不到鎖的時候,進行AQS隊列方式是一樣的,都是加在隊尾。在加入隊列后,還需要根據(jù)當前節(jié)點的前驅(qū)節(jié)點的waitStatus若是Node.SIGNAL狀態(tài)判斷是否需要把當前線程掛起,以省系統(tǒng)資源,

unlock()釋放鎖分析

    public void unlock() {
        sync.release(1);
    }

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

可以看出,每個unlock()操作都是一個State - 1操作,直到State == 0的時候,把ExclusiveOwnerThread即當前獲得鎖的線程設(shè)置為null來釋放鎖。

小結(jié)
在獲取鎖的時候,會維護一個雙向鏈表,用于存放獲取鎖失敗的的線程到隊列中進行自旋來獲取鎖,

CountDownLatch

是什么

CountDownLatch是JUC中提供的一個同步工具,使用調(diào)用await()它可以使一個或者多個線程進行等待,直到其它線程執(zhí)行CountDown()方法把倒數(shù)器減至0后,等待的線程才會啟動。

如何使用

public class CountDownLatchDemo {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(3);

        new Thread(() -> {
            System.out.println(Thread.currentThread() + "執(zhí)行完畢");
            countDownLatch.countDown();
        }, "Thread-1").start();
        new Thread(() -> {
            System.out.println(Thread.currentThread() + "執(zhí)行完畢");
            countDownLatch.countDown();
        }, "Thread-2").start();
        new Thread(() -> {
            System.out.println(Thread.currentThread() + "執(zhí)行完畢");
            countDownLatch.countDown();
        }, "Thread-3").start();

        countDownLatch.await();
        System.out.println("全部線程執(zhí)行完畢");
    }
}

分析

  • await()
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
    

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted()) // 若線程中端,直接拋異常
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}


// 計數(shù)為0時,表示獲取鎖成功
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

// 阻塞,并入隊
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED); // 入隊
    boolean failed = true;
    try {
        for (;;) {
            // 獲取前驅(qū)節(jié)點
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 獲取鎖成功,設(shè)置隊列頭為node節(jié)點
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) // 線程掛起
              && parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  1. 檢查計數(shù)器是否為0,為0直接返回
  2. 計數(shù)器大于0,即當前線程需要阻塞并等待計數(shù)器變?yōu)?
  3. 當前線程需要被封裝成Node對象并添加到AQS雙向鏈表里去
  4. 最后自旋嘗試獲取鎖,即檢查計數(shù)器是否為0,獲取成功即出隊,然后放行當前線程
  • countDonw()
// 計數(shù)-1
public void countDown() {
    sync.releaseShared(1);
}


public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { // 首先嘗試釋放鎖
        doReleaseShared();
        return true;
    }
    return false;
}

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0) //如果計數(shù)已經(jīng)為0,則返回失敗
            return false;
        int nextc = c-1;
        // 原子操作實現(xiàn)計數(shù)-1
        if (compareAndSetState(c, nextc)) 
            return nextc == 0;
    }
}

// 喚醒被阻塞的線程
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) { // 隊列非空,表示有線程被阻塞
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) { 
            // 頭結(jié)點如果為SIGNAL,則喚醒頭結(jié)點下個節(jié)點上關(guān)聯(lián)的線程,并出隊
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head) // 沒有線程被阻塞,直接跳出
            break;
    }
}
  1. 嘗試釋放鎖,即將計數(shù)器-1,并判斷state是否為0,若為0即表示當前沒有鎖,可以開始喚醒鏈表中阻塞中的線程
  2. 如果鏈表里為空,即沒有阻塞的線程,直接退出
  3. 如果頭節(jié)點waitStatus為SIGNAL,就依次喚醒下個節(jié)點的線程,并出隊
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容