java 多線程、線程池

概念:

  • 原子性
  • 原子是世界上的最小單位,具有不可分割性。比如 a=0;(a非long和double類(lèi)型) 這個(gè)操作是不可分割的,那么我們說(shuō)這個(gè)操作時(shí)原子操作。再比如:a++; 這個(gè)操作實(shí)際是a = a + 1;是可分割的,所以他不是一個(gè)原子操作。非原子操作都會(huì)存在線程安全問(wèn)題,需要我們使用同步技術(shù)(sychronized)來(lái)讓它變成一個(gè)原子操作。一個(gè)操作是原子操作,那么我們稱(chēng)它具有原子性。Java的concurrent包下提供了一些原子類(lèi),我們可以通過(guò)閱讀API來(lái)了解這些原子類(lèi)的用法。比如:AtomicInteger、AtomicLong、AtomicReference等。
  • 可見(jiàn)性(對(duì)與可見(jiàn)性的詳細(xì)解釋?zhuān)?a target="_blank" rel="nofollow">http://www.cnblogs.com/aigongsi/archive/2012/04/01/2429166.html)
  • 可見(jiàn)性,是指線程之間的可見(jiàn)性,一個(gè)線程修改的狀態(tài)對(duì)另一個(gè)線程是可見(jiàn)的。也就是一個(gè)線程修改的結(jié)果。另一個(gè)線程馬上就能看到。比如:用volatile修飾的變量,就會(huì)具有可見(jiàn)性。volatile修飾的變量不允許線程內(nèi)部緩存和重排序,即直接修改內(nèi)存。所以對(duì)其他線程是可見(jiàn)的。但是這里需要注意一個(gè)問(wèn)題,volatile只能讓被他修飾內(nèi)容具有可見(jiàn)性,但不能保證它具有原子性。比如 volatile int a = 0;之后有一個(gè)操作 a++;這個(gè)變量a具有可見(jiàn)性,但是a++ 依然是一個(gè)非原子操作,也就這這個(gè)操作同樣存在線程安全問(wèn)題。
  • 什么是線程安全:
    • 線程安全是指,同一段代碼多條線程訪問(wèn),不會(huì)產(chǎn)生不同的結(jié)果
  • 并行和并發(fā)區(qū)別
    • 并行是多條線程同時(shí)再賽跑。
    • 并發(fā)是指同個(gè)資源,兩者交替輪流使用資源。
  • Fifo(First In first Out):
    • Queue隊(duì)列的特性就是先入先出
  • Filo(First ln Last Out):
    • stact棧的特性就是先入后出
  • CAS(Compare And Swap):
    • CAS 指的是現(xiàn)代 CPU 廣泛支持的一種對(duì)內(nèi)存中的共享數(shù)據(jù)進(jìn)行操作的一種特殊指令。這個(gè)指令會(huì)對(duì)內(nèi)存中的共享數(shù)據(jù)做原子的讀寫(xiě)操作。簡(jiǎn)單介紹一下這個(gè)指令的操作過(guò)程:首先,CPU 會(huì)將內(nèi)存中將要被更改的數(shù)據(jù)與期望的值做比較。然后,當(dāng)這兩個(gè)值相等時(shí),CPU 才會(huì)將內(nèi)存中的數(shù)值替換為新的值。否則便不做操作。最后,CPU 會(huì)將舊的數(shù)值返回。這一系列的操作是原子的。它們雖然看似復(fù)雜,但卻是 Java 5 并發(fā)機(jī)制優(yōu)于原有鎖機(jī)制的根本。簡(jiǎn)單來(lái)說(shuō),CAS 的含義是“我認(rèn)為原有的值應(yīng)該是什么,如果是,則將原有的值更新為新值,否則不做修改,并告訴我原來(lái)的值是多少”。(這段描述引自《Java并發(fā)編程實(shí)踐》)
      簡(jiǎn)單的來(lái)說(shuō),CAS有3個(gè)操作數(shù),內(nèi)存值V,舊的預(yù)期值A(chǔ),要修改的新值B。當(dāng)且僅當(dāng)預(yù)期值A(chǔ)和內(nèi)存值V相同時(shí),將內(nèi)存值V修改為B,否則返回V。這是一種樂(lè)觀鎖的思路,它相信在它修改之前,沒(méi)有其它線程去修改它;而Synchronized是一種悲觀鎖,它認(rèn)為在它修改之前,一定會(huì)有其它線程去修改它,悲觀鎖效率很低
CAS的ABA問(wèn)題

所謂 ,問(wèn)題基本是這個(gè)樣子:

進(jìn)程P1在共享變量中讀到值為A
P1被搶占了,進(jìn)程P2執(zhí)行
P2把共享變量里的值從A改成了B,再改回到A,此時(shí)被P1搶占。
P1回來(lái)看到共享變量里的值沒(méi)有被改變,于是繼續(xù)執(zhí)行。
雖然P1以為變量值沒(méi)有改變,繼續(xù)執(zhí)行了,但是這個(gè)會(huì)引發(fā)一些潛在的問(wèn)題。ABA問(wèn)題最容易發(fā)生在lock free 的算法中的,CAS首當(dāng)其沖,因?yàn)镃AS判斷的是指針的地址。如果這個(gè)地址被重用了呢,問(wèn)題就很大了。(地址被重用是很經(jīng)常發(fā)生的,一個(gè)內(nèi)存分配后釋放了,再分配,很有可能還是原來(lái)的地址)

比如上述的DeQueue()函數(shù),因?yàn)槲覀円宧ead和tail分開(kāi),所以我們引入了一個(gè)dummy指針給head,當(dāng)我們做CAS的之前,如果head的那塊內(nèi)存被回收并被重用了,而重用的內(nèi)存又被EnQueue()進(jìn)來(lái)了,這會(huì)有很大的問(wèn)題。(內(nèi)存管理中重用內(nèi)存基本上是一種很常見(jiàn)的行為)

這個(gè)例子你可能沒(méi)有看懂,維基百科上給了一個(gè)活生生的例子——

你拿著一個(gè)裝滿錢(qián)的手提箱在飛機(jī)場(chǎng),此時(shí)過(guò)來(lái)了一個(gè)火辣性感的美女,然后她很暖昧地挑逗著你,并趁你不注意的時(shí)候,把用一個(gè)一模一樣的手提箱和你那裝滿錢(qián)的箱子調(diào)了個(gè)包,然后就離開(kāi)了,你看到你的手提箱還在那,于是就提著手提箱去趕飛機(jī)去了。

這就是ABA的問(wèn)題。

一、Java線程池ThreadPoolExecutor -- 參數(shù)

  • corePoolSize: 核心線程數(shù)
    • 核心線程會(huì)一直存活,及時(shí)沒(méi)有任務(wù)需要執(zhí)行
    • 當(dāng)線程數(shù)小于核心線程數(shù)時(shí),即使有線程空閑,線程池也會(huì)優(yōu)先創(chuàng)建新線程處理
    • 設(shè)置allowCoreThreadTimeout=true(默認(rèn)false)時(shí),核心線程會(huì)超時(shí)關(guān)閉
    • 在創(chuàng)建了線程池后,默認(rèn)情況下,線程池中并沒(méi)有任何線程,而是等待有任務(wù)到來(lái)才創(chuàng)建線程去執(zhí)行任務(wù),除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個(gè)方法的名字就可以看出,是預(yù)創(chuàng)建線程的意思,即在沒(méi)有任務(wù)到來(lái)之前就創(chuàng)建corePoolSize個(gè)線程或者一個(gè)線程。默認(rèn)情況下,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0,當(dāng)有任務(wù)來(lái)之后,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中
  • maxmumPoolSize : 最大線程數(shù)
    • 當(dāng)線程數(shù)>=corePoolSize,且任務(wù)隊(duì)列已滿時(shí)。線程池會(huì)創(chuàng)建新線程來(lái)處理任務(wù)
    • 當(dāng)線程數(shù)=maxPoolSize,且任務(wù)隊(duì)列已滿時(shí),線程池會(huì)拒絕處理任務(wù)而拋出異常
  • keepAliveTime : 線程空閑時(shí)間
    • 當(dāng)線程空閑時(shí)間達(dá)到keepAliveTime時(shí),線程會(huì)退出,直到線程數(shù)量=corePoolSize
    • 如果allowCoreThreadTimeout=true,則會(huì)直到線程數(shù)量=0
    • 默認(rèn)情況下,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),keepAliveTime才會(huì)起作用,直到線程池中的線程數(shù)不大于corePoolSize,即當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),如果一個(gè)線程空閑的時(shí)間達(dá)到keepAliveTime,則會(huì)終止,直到線程池中的線程數(shù)不超過(guò)corePoolSize。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數(shù)不大于corePoolSize時(shí),keepAliveTime參數(shù)也會(huì)起作用,直到線程池中的線程數(shù)為0
  • unit : keepAliveTime 時(shí)間單位
TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小時(shí)
TimeUnit.MINUTES;           //分鐘
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒
  • threadFactory

    • 線程工廠,主要用來(lái)創(chuàng)建線程
  • handler

    • 表示當(dāng)拒絕處理任務(wù)時(shí)的策略,有以下四種取值:

    ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。
    ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
    ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程)
    ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)

  • workQueue : 任務(wù)隊(duì)列

    • Queue:Queue中元素按Fifo原則進(jìn)行排序
      • ConcurrentLinkedQueue : 非阻塞隊(duì)列
        • Queue是一個(gè)安全實(shí)現(xiàn)
        • 使用CAS無(wú)鎖編程,來(lái)保證數(shù)據(jù)的一致性
public class ConcurrentLinkedQueueTest {
    private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
    private static int count = 2; // 線程個(gè)數(shù)
    //CountDownLatch,一個(gè)同步輔助類(lèi),在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個(gè)或多個(gè)線程一直等待。
    private static CountDownLatch latch = new CountDownLatch(count);

    public static void main(String[] args) throws InterruptedException {
        long timeStart = System.currentTimeMillis();
        ExecutorService es = Executors.newFixedThreadPool(4);
        ConcurrentLinkedQueueTest.offer();
        for (int i = 0; i < count; i++) {
            es.submit(new Poll());
        }
        latch.await(); //使得主線程(main)阻塞直到latch.countDown()為零才繼續(xù)執(zhí)行
        System.out.println("cost time " + (System.currentTimeMillis() - timeStart) + "ms");
        es.shutdown();
    }

    /**
     * 生產(chǎn)
     */
    public static void offer() {
        for (int i = 0; i < 100000; i++) {
            queue.offer(i);
        }
    }


    /**
     * 消費(fèi)
     *
     * @author 林計(jì)欽
     * @version 1.0 2013-7-25 下午05:32:56
     */
    static class Poll implements Runnable {
        public void run() {
            // while (queue.size()>0) {
            while (!queue.isEmpty()) {
                System.out.println(queue.poll());
            }
            latch.countDown();
        }
    }
}

+ BlockingQueue:阻塞隊(duì)列
  + LinkedBlockingQueue:實(shí)現(xiàn)是線程安全的,實(shí)現(xiàn)了先進(jìn)先出的特性,是作為生產(chǎn)者消費(fèi)者的首選
    + put:添加方法,在隊(duì)列滿的時(shí)候會(huì)阻塞
    + take : 讀取方法,在隊(duì)列為空的時(shí)候會(huì)阻塞,直到有成員被加
package com.example;  

import java.util.concurrent.*;
// LinkedBlockingQueue Demo
public class BlockingQueueTest {
    /**
     * 定義裝蘋(píng)果的籃子
     */
    public class Basket {
        // 籃子,能夠容納3個(gè)蘋(píng)果
        BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3);

        // 生產(chǎn)蘋(píng)果,放入籃子
        public void produce() throws InterruptedException {
            // put方法放入一個(gè)蘋(píng)果,若basket滿了,等到basket有位置
            basket.put("An apple");
        }

        // 消費(fèi)蘋(píng)果,從籃子中取走
        public String consume() throws InterruptedException {
            // take方法取出一個(gè)蘋(píng)果,若basket為空,等到basket有蘋(píng)果為止(獲取并移除此隊(duì)列的頭部)
            return basket.take();
        }
    }

    // 定義蘋(píng)果生產(chǎn)者
    class Producer implements Runnable {
        private String instance;
        private Basket basket;

        public Producer(String instance, Basket basket) {
            this.instance = instance;
            this.basket = basket;
        }

        public void run() {
            try {
                while (true) {
                    // 生產(chǎn)蘋(píng)果
                    System.out.println("生產(chǎn)者準(zhǔn)備生產(chǎn)蘋(píng)果:" + instance);
                    basket.produce();
                    System.out.println("!生產(chǎn)者生產(chǎn)蘋(píng)果完畢:" + instance);
                    // 休眠300ms
                    Thread.sleep(300);
                }
            } catch (InterruptedException ex) {
                System.out.println("Producer Interrupted");
            }
        }
    }

    // 定義蘋(píng)果消費(fèi)者
    class Consumer implements Runnable {
        private String instance;
        private Basket basket;

        public Consumer(String instance, Basket basket) {
            this.instance = instance;
            this.basket = basket;
        }

        public void run() {
            try {
                while (true) {
                    // 消費(fèi)蘋(píng)果
                    System.out.println("消費(fèi)者準(zhǔn)備消費(fèi)蘋(píng)果:" + instance);
                    System.out.println(basket.consume());
                    System.out.println("!消費(fèi)者消費(fèi)蘋(píng)果完畢:" + instance);
                    // 休眠1000ms
                    Thread.sleep(1000);
                }
            } catch (InterruptedException ex) {
                System.out.println("Consumer Interrupted");
            }
        }
    }

    public static void main(String[] args) {
        BlockingQueueTest test = new BlockingQueueTest();

        // 建立一個(gè)裝蘋(píng)果的籃子
        Basket basket = test.new Basket();

        ExecutorService service = Executors.newCachedThreadPool();
        Producer producer = test.new Producer("生產(chǎn)者001", basket);
        Producer producer2 = test.new Producer("生產(chǎn)者002", basket);
        Consumer consumer = test.new Consumer("消費(fèi)者001", basket);
        service.submit(producer);
        service.submit(producer2);
        service.submit(consumer);
        // 程序運(yùn)行5s后,所有任務(wù)停止
//        try {
//            Thread.sleep(1000 * 5);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
//        service.shutdownNow();
    }
}

二、Runnable And Callable
1、Runnable實(shí)現(xiàn)是Run方法,Callable實(shí)現(xiàn)是call方法
2、Callable的call方法可以有返回值使用Future接受返回值、Runnable的run方法沒(méi)有返回值
3、Callable的call方法可以拋出異常、run方法無(wú)法捕獲線程異常

三、Future AND CompletionService
1、用于接受Callable線程執(zhí)行完成返回的結(jié)果

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 多線程執(zhí)行,異步獲取結(jié)果
 * 
 * @author i-clarechen
 *
 */
public class AsyncThread {

    public static void main(String[] args) {
        AsyncThread t = new AsyncThread();
        List<Future<String>> futureList = new ArrayList<Future<String>>();
        t.generate(3, futureList);
        t.doOtherThings();
        t.getResult(futureList);
    }

    /**
     * 生成指定數(shù)量的線程,都放入future數(shù)組
     * 
     * @param threadNum
     * @param fList
     */
    public void generate(int threadNum, List<Future<String>> fList) {
        ExecutorService service = Executors.newFixedThreadPool(threadNum);
        for (int i = 0; i < threadNum; i++) {
            Future<String> f = service.submit(getJob(i));
            fList.add(f);
        }
        service.shutdown();
    }

    /**
     * other things
     */
    public void doOtherThings() {
        try {
            for (int i = 0; i < 3; i++) {
                System.out.println("do thing no:" + i);
                Thread.sleep(1000 * (new Random().nextInt(10)));
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 從future中獲取線程結(jié)果,打印結(jié)果
     * 
     * @param fList
     */
    public void getResult(List<Future<String>> fList) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        service.execute(getCollectJob(fList));
        service.shutdown();
    }

    /**
     * 生成指定序號(hào)的線程對(duì)象
     * 
     * @param i
     * @return
     */
    public Callable<String> getJob(final int i) {
        final int time = new Random().nextInt(10);
        return new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(1000 * time);
                return "thread-" + i;
            }
        };
    }

    /**
     * 生成結(jié)果收集線程對(duì)象
     * 
     * @param fList
     * @return
     */
    public Runnable getCollectJob(final List<Future<String>> fList) {
        return new Runnable() {
            public void run() {
                for (Future<String> future : fList) {
                    try {
                        while (true) {
                            if (future.isDone() && !future.isCancelled()) {
                                System.out.println("Future:" + future
                                        + ",Result:" + future.get());
                                break;
                            } else {
                                Thread.sleep(1000);
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        };
    }

}

結(jié)果

do thing no:0
do thing no:1
do thing no:2
Future:java.util.concurrent.FutureTask@68e1ca74,Result:thread-0
Future:java.util.concurrent.FutureTask@3fb2bb77,Result:thread-1
Future:java.util.concurrent.FutureTask@6f31a24c,Result:thread-2

2、使用誤CompletionService實(shí)現(xiàn)非阻塞式Future

  • 當(dāng)向Executor提交批處理任務(wù)時(shí),并且希望在它們完成后獲得結(jié)果,如果用FutureTask,你可以循環(huán)獲取task,并用future.get()去獲取結(jié)果,但是如果這個(gè)task沒(méi)有完成,你就得阻塞在這里,這個(gè)實(shí)效性不高,其實(shí)在很多場(chǎng)合,其實(shí)你拿第一個(gè)任務(wù)結(jié)果時(shí),此時(shí)結(jié)果并沒(méi)有生成并阻塞,其實(shí)在阻塞在第一個(gè)任務(wù)時(shí),第二個(gè)task的任務(wù)已經(jīng)早就完成了,顯然這種情況用future task不合適的,效率也不高。
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;


public class testCallable {
    public static void main(String[] args) {
        try {
            completionServiceCount();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

  
    /**
     * 使用completionService收集callable結(jié)果
     * @throws ExecutionException 
     * @throws InterruptedException 
     */
    public static void completionServiceCount() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
                executorService);
        int threadNum = 5;
        for (int i = 0; i < threadNum; i++) {
            completionService.submit(getTask(i));
        }
        int sum = 0;
        int temp = 0;
        for(int i=0;i<threadNum;i++){
            temp = completionService.take().get();
            sum += temp;
            System.out.print(temp + "\t");
        }
        System.out.println("CompletionService all is : " + sum);
        executorService.shutdown();
    }

    public static Callable<Integer> getTask(final int no) {
        final Random rand = new Random();
        Callable<Integer> task = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int time = rand.nextInt(100)*100;
                System.out.println("thead:"+no+" time is:"+time);
                Thread.sleep(time);
                return no;
            }
        };
        return task;
    }
}

結(jié)果:最先執(zhí)行完成的線程先輸出結(jié)果

thead:0 time is:4200
thead:1 time is:6900
thead:2 time is:2900
thead:3 time is:9000
thead:4 time is:7100
2    0    1    4    3    CompletionService all is : 10
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容