并發編程學習

JAVA同步類容器線程是否安全以及介紹:

1,ArrayList,LinkedList線程不安全
Vector 對應 CopyOnWriteArrayList 線程安全
2,HashMap線程不安全
Hashtable 對應 ConcurrentHashMap 線程安全
3,Queue下的阻塞隊列和非阻塞隊列:

ConcurrentLinkedQueue高性能,高并發,線程安全的非阻塞隊列。

下面是阻塞隊列:


多線程設計模式

1,future模式
代碼demo
package com.wjb.demo.futuredemo;

/**
 * Common contract of the Future demo: both the placeholder ({@code FutureData})
 * and the real result ({@code RealData}) implement it, so callers only ever
 * deal with a {@code Data}.
 *
 * Created by wjb on 2018/1/10.
 */
public interface Data {
    /**
     * Returns the result string carried by this object.
     *
     * @return the result string
     */
    String getRequest();
}
=================================================
package com.wjb.demo.futuredemo;

/**
 * Client side of the Future demo: hands out a placeholder immediately and
 * fills it in from a background thread.
 *
 * Created by wjb on 2018/1/10.
 */
public class FutureClient {
    /**
     * Starts loading the data for {@code queryString} on a new thread and
     * returns right away.
     *
     * @param queryString the query passed to the {@code RealData} constructor
     * @return a {@code FutureData} wrapper that holds no data yet; the
     *         background thread later supplies it through {@code setRealData}
     */
    public Data request(String queryString) {
        // Wrapper returned to the caller before any real data exists.
        final FutureData placeholder = new FutureData();

        // Building RealData is the slow part, so it runs off the caller's thread.
        Runnable loader = () -> placeholder.setRealData(new RealData(queryString));
        Thread loaderThread = new Thread(loader);
        loaderThread.start();

        return placeholder;
    }
}
===========================================
package com.wjb.demo.futuredemo;

/**
 * Placeholder returned by {@code FutureClient}: it blocks readers until the
 * real data has been supplied, then delegates to it.
 *
 * Both methods synchronize on this instance, which also serves as the
 * wait/notify monitor.
 *
 * Created by wjb on 2018/1/10.
 */
public class FutureData implements Data {
    private RealData realData;
    private boolean isReady = false;

    /**
     * Supplies the real data and releases every thread blocked in
     * {@link #getRequest()}. Only the first call has any effect.
     *
     * @param realData the loaded result to delegate to
     */
    public synchronized void setRealData(RealData realData) {
        // Already loaded: ignore later calls.
        if (isReady) {
            return;
        }
        this.realData = realData;
        isReady = true;
        // notifyAll rather than notify: several threads may be waiting in
        // getRequest(), and notify() would release only one of them.
        notifyAll();
    }

    /**
     * Blocks until the real data is available, then returns its result.
     *
     * An interrupt received while waiting does not abort the wait (the method
     * has no way to report it); it is remembered and the thread's interrupt
     * status is restored before returning.
     *
     * @return the value of {@code getRequest()} on the real data
     */
    @Override
    public synchronized String getRequest() {
        boolean interrupted = false;
        try {
            // Loop guards against spurious wake-ups.
            while (!isReady) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Re-interrupting here would make the next wait() throw
                    // immediately, so just remember it for now.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        // Loaded: hand back the result of the real data object.
        return realData.getRequest();
    }
}
=================================================
package com.wjb.demo.futuredemo;

/**
 * The real data of the Future demo. Constructing it is deliberately slow
 * (a 10-second sleep stands in for the actual query work).
 *
 * Created by wjb on 2018/1/10.
 */
public class RealData implements Data {
    private final String result;

    /**
     * Performs the simulated query and stores its result.
     *
     * If the constructing thread is interrupted during the simulated work, the
     * wait is cut short, the result is still set, and the thread's interrupt
     * status is restored so the caller can observe it.
     *
     * @param queryString the query text, only used in the log line
     */
    public RealData(String queryString){
        System.out.println("根據(jù)"+queryString+"查詢操作,一系列的操作省略。。。" );
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // Do not swallow the interrupt: put the flag back for the caller.
            Thread.currentThread().interrupt();
        }
        System.out.println("操作完成");
        result="查詢結(jié)果";
    }

    /**
     * @return the result string set by the constructor
     */
    @Override
    public String getRequest() {
        return result;
    }
}
================================================
package com.wjb.demo.futuredemo;

/**
 * Multi-threaded Future pattern example.
 *
 * Created by wjb on 2018/1/10.
 */
public class Main {
    /**
     * Sends one request, prints two progress lines while the data is loading
     * in the background, then blocks on the result and prints it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // What comes back here is only the placeholder; it has no data yet.
        Data data = new FutureClient().request("查詢請求");

        System.out.println("請求發(fā)送成功");
        System.out.println("數(shù)據(jù)正在處理中,此時可以做其它操作");

        // getRequest() blocks until the background thread has supplied the data.
        System.out.println(data.getRequest());
    }
}

2,Master-Worker模式:
示例demo:
package com.wjb.demo.master_worker;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Master of the Master-Worker demo: owns the task queue, the worker threads
 * and the map the workers write their finished tasks into.
 *
 * Created by wjb on 2018/1/10.
 */
public class Master {
    /** Pending tasks; a non-blocking queue shared with the worker. */
    private final ConcurrentLinkedQueue<Task> queue = new ConcurrentLinkedQueue<>();
    /** Worker threads, keyed by their index as a string. */
    private final Map<String, Thread> map = new HashMap<>();
    /** Tasks finished by the workers, filled in by {@code Worker}. */
    private final ConcurrentHashMap<String, Task> resultMap = new ConcurrentHashMap<>();

    /**
     * Wires the worker to this master's queue and result map and creates
     * {@code count} threads that all run that same worker instance.
     *
     * @param worker the runnable every thread executes
     * @param count  number of worker threads to create
     */
    public Master(Worker worker, int count) {
        worker.setMap(resultMap);
        worker.setQueue(queue);
        for (int i = 0; i < count; i++) {
            map.put(Integer.toString(i), new Thread(worker));
        }
    }

    /**
     * Adds a task to the pending queue.
     *
     * @param task the task to enqueue
     */
    public void submit(Task task) {
        queue.add(task);
    }

    /** Starts every worker thread. */
    public void execute() {
        for (Thread workerThread : map.values()) {
            workerThread.start();
        }
    }

    /**
     * Reports whether all worker threads have finished.
     *
     * @return {@code true} only if every worker thread is in state
     *         {@code TERMINATED}
     */
    public boolean isComplete() {
        for (Thread workerThread : map.values()) {
            if (workerThread.getState() != Thread.State.TERMINATED) {
                return false;
            }
        }
        return true;
    }

}
=================================================
package com.wjb.demo.master_worker;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Worker of the Master-Worker demo: drains tasks from the shared queue and
 * records each processed task in the shared result map.
 *
 * Created by wjb on 2018/1/10.
 */
public class Worker implements Runnable{
    private ConcurrentLinkedQueue<Task> queue;
    private ConcurrentHashMap<String,Task> map;

    /**
     * @param queue the queue to take pending tasks from
     */
    public void setQueue(ConcurrentLinkedQueue<Task> queue){
        this.queue=queue;
    }

    /**
     * @param map the map finished tasks are stored in, keyed by task id
     */
    public void setMap(ConcurrentHashMap<String,Task> map){
        this.map=map;
    }

    /**
     * Processes tasks until the queue is empty. Each task costs a one-second
     * sleep (standing in for real work) and is then stored under its id.
     *
     * If the thread is interrupted while working on a task, that task is put
     * back on the queue, the interrupt status is restored and the worker stops.
     */
    @Override
    public void run() {
        while (true){
            Task task = this.queue.poll();
            if(task == null){
                // Queue drained: nothing left for this worker.
                return;
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // The task was not finished; hand it back instead of recording
                // it as done, then honour the interrupt by stopping.
                this.queue.add(task);
                Thread.currentThread().interrupt();
                return;
            }
            this.map.put(Integer.toString(task.getId()),task);
        }
    }
}
================================================
package com.wjb.demo.master_worker;

/**
 * Unit of work in the Master-Worker demo: a plain mutable holder for an id
 * and a price.
 *
 * Created by wjb on 2018/1/10.
 */
public class Task {
    /** Identifier of the task. */
    private int id;
    /** Price attached to the task. */
    private int price;

    // ---- readers ----

    /** @return the task id */
    public int getId() {
        return this.id;
    }

    /** @return the task price */
    public int getPrice() {
        return this.price;
    }

    // ---- writers ----

    /** @param id the new task id */
    public void setId(int id) {
        this.id = id;
    }

    /** @param price the new task price */
    public void setPrice(int price) {
        this.price = price;
    }
}
=================================================
package com.wjb.demo.master_worker;

import java.util.Random;

/**
 * Multi-threaded Master-Worker pattern example.
 *
 * Created by wjb on 2018/1/10.
 */
public class Main {
    /**
     * Submits 100 tasks with random prices to a master with 10 worker threads,
     * runs them, and prints the elapsed time in milliseconds once every worker
     * has terminated.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Master master = new Master(new Worker(), 10);

        Random random = new Random();
        for(int i = 0;i<100;i++){
            Task task = new Task();
            task.setId(i);
            task.setPrice(random.nextInt());
            master.submit(task);
        }

        // Start the clock before the workers start so their start-up is measured too.
        long start = System.currentTimeMillis();
        master.execute();

        // Poll with a short pause instead of spinning a core at 100%.
        while (!master.isComplete()){
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                // Asked to stop waiting: keep the flag set and give up.
                Thread.currentThread().interrupt();
                return;
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("耗時:"+(end-start));
    }
}

3,生產者消費者模式:

示例demo

package com.wjb.demo.provider_consumer;

import java.util.concurrent.*;

/**
 * Producer-consumer example: three providers and three consumers share one
 * blocking queue and run on a cached thread pool.
 *
 * Created by wjb on 2018/1/11.
 */
public class Main {
    /**
     * Starts the providers and consumers, lets them run for three seconds,
     * then stops the providers and shuts the pool down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Unbounded blocking queue: no limit on the number of elements.
        BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>();

        // Cached pool: creates threads on demand (none while idle); idle
        // threads are kept for 60 seconds by default.
        ExecutorService threadPool = Executors.newCachedThreadPool();

        Provider p1 = new Provider(queue);
        Provider p2 = new Provider(queue);
        Provider p3 = new Provider(queue);

        Consumer c1 = new Consumer(queue);
        Consumer c2 = new Consumer(queue);
        Consumer c3 = new Consumer(queue);

        threadPool.execute(p1);
        threadPool.execute(p2);
        threadPool.execute(p3);
        threadPool.execute(c1);
        threadPool.execute(c2);
        threadPool.execute(c3);
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Stop the providers even when the sleep was interrupted;
            // otherwise they would keep producing forever.
            p1.stop();
            p2.stop();
            p3.stop();
            // NOTE(review): the consumers are never told to stop here, and
            // shutdown() does not interrupt running tasks, so they remain
            // blocked on the queue once it is drained.
            threadPool.shutdown();
        }
    }
}
==============================================
package com.wjb.demo.provider_consumer;

/**
 * Item passed from providers to consumers: a mutable id/name pair.
 *
 * Created by wjb on 2018/1/11.
 */
public class Data {
    /** Numeric identifier of the item. */
    private int id;
    /** Display name of the item. */
    private String name;

    /**
     * @param id   identifier of the item
     * @param name name of the item
     */
    public Data(int id, String name) {
        this.name = name;
        this.id = id;
    }

    // ---- id ----

    /** @return the item id */
    public int getId() {
        return this.id;
    }

    /** @param id the new item id */
    public void setId(int id) {
        this.id = id;
    }

    // ---- name ----

    /** @return the item name */
    public String getName() {
        return this.name;
    }

    /** @param name the new item name */
    public void setName(String name) {
        this.name = name;
    }
}
===========================================
package com.wjb.demo.provider_consumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Producer of the producer-consumer demo: roughly once a second it creates a
 * {@code Data} with the next global id and offers it to the shared queue.
 *
 * Created by wjb on 2018/1/11.
 */
public class Provider implements Runnable{
    /** Id source shared by all providers. */
    private static final AtomicInteger count =  new AtomicInteger();

    private final BlockingQueue<Data> queue;
    /** Cleared by {@link #stop()}; volatile so the running thread sees it. */
    private volatile boolean isRunning = true;

    /**
     * @param queue the queue produced items are offered to
     */
    public Provider(BlockingQueue<Data> queue){
        this.queue=queue;
    }

    /**
     * Produces items until {@link #stop()} is called or the thread is
     * interrupted. Each offer waits at most two seconds for space; a failed
     * offer is only logged.
     */
    @Override
    public void run() {
        while (isRunning){
            try {
                Thread.sleep(1000);
                int id = count.incrementAndGet();
                Data data = new Data(id, Integer.toString(id));
                System.out.println("當(dāng)前線程:"+Thread.currentThread().getName()+"生產(chǎn)數(shù)據(jù)ID是:"+id);
                if (!this.queue.offer(data,2, TimeUnit.SECONDS)){
                    System.out.println("數(shù)據(jù)放入隊列失敗");
                }
            } catch (InterruptedException e) {
                // Interruption is a request to stop: keep the flag and leave.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Asks the provider to finish after its current iteration. */
    public void stop(){
        this.isRunning=false;
    }
}
===============================================
package com.wjb.demo.provider_consumer;

import java.util.concurrent.BlockingQueue;

/**
 * Consumer of the producer-consumer demo: takes items from the shared queue,
 * spends one second on each and prints its id.
 *
 * Created by wjb on 2018/1/11.
 */
public class Consumer implements Runnable{
    private final BlockingQueue<Data> queue;
    /** Cleared by {@link #stop()}; volatile so the running thread sees it. */
    private volatile boolean isRunning = true;

    /**
     * @param queue the queue items are taken from
     */
    public Consumer(BlockingQueue<Data> queue){
        this.queue=queue;
    }

    /**
     * Consumes items until {@link #stop()} is called or the thread is
     * interrupted.
     */
    @Override
    public void run() {
        while (isRunning){
            try {
                Data data = this.queue.take();
                Thread.sleep(1000);
                System.out.println("消費的數(shù)據(jù)ID是:"+data.getId());
            } catch (InterruptedException e) {
                // Interruption is a request to stop: keep the flag and leave.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Asks the consumer to finish after its current iteration. A consumer
     * that is blocked in {@code take()} on an empty queue only notices this
     * after the next item arrives; interrupt its thread to release it sooner.
     */
    public void stop(){
        this.isRunning=false;
    }
}

Executor框架


使用有界隊列示例demo

package com.wjb.demo.threadpoolexecutor;

/**
 * Runnable task for the thread-pool demos: an id/name pair whose execution
 * is simulated by a five-second sleep.
 *
 * Created by wjb on 2018/1/11.
 */
public class Task implements Runnable {
    private int id;
    private String name;

    /**
     * @param id   identifier of the task
     * @param name name of the task
     */
    public Task(int id, String name) {
        this.id = id;
        this.name = name;
    }

    /** @return the task id */
    public int getId() {
        return id;
    }

    /** @param id the new task id */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the task name */
    public String getName() {
        return name;
    }

    /** @param name the new task name */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Sleeps five seconds, then prints the executing thread's name and this
     * task's id. If interrupted during the sleep, nothing is printed and the
     * thread's interrupt status is restored.
     */
    @Override
    public void run() {
        try {
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName()+"執(zhí)行任務(wù)ID"+this.id);
        } catch (InterruptedException e) {
            // Leave the flag set so the pool/caller can see the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Readable form used when the task is printed, e.g. by a rejection
     * handler that logs the rejected runnable.
     *
     * @return {@code Task{id=<id>, name='<name>'}}
     */
    @Override
    public String toString() {
        return "Task{id=" + id + ", name='" + name + "'}";
    }
}
=============================================
package com.wjb.demo.threadpoolexecutor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by wjb on 2018/1/11.
 * 自定義線程池,有界隊列的使用策略
 */
public class ThreadPoolExecutorDemo {
    public static void main(String[] args) {

        /**
         * 首先任務(wù)1進來被一個線程執(zhí)行,任務(wù)2,3,4進來時會暫存到隊列里去,任務(wù)5進來時,
         * 隊列已無法暫存,如果當(dāng)前線程數(shù)小于最大線程數(shù),則創(chuàng)建新線程執(zhí)行此任務(wù)。
         * 所以任務(wù)5進來時會新建一個線程執(zhí)行。
如果還有任務(wù)6進來的話,此時會執(zhí)行拒絕策略,JDK默認是AbortPolicy
         */
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(3));

        Task task1 = new Task(1, "任務(wù)1");
        Task task2 = new Task(2, "任務(wù)2");
        Task task3 = new Task(3, "任務(wù)3");
        Task task4 = new Task(4, "任務(wù)4");
        Task task5 = new Task(5, "任務(wù)5");

        pool.execute(task1);
        pool.execute(task2);
        pool.execute(task3);
        pool.execute(task4);
        pool.execute(task5);

        pool.shutdown();
    }
}
============================================
package com.wjb.demo.threadpoolexecutor;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Custom rejection policy. This demo version only prints the rejected task;
 * in a real system the work must not be lost, so it would be stored in a
 * cache or handled some other way.
 *
 * Created by wjb on 2018/1/11.
 */
public class MyRejected implements RejectedExecutionHandler {
    /**
     * Prints the string form of the rejected task to standard output.
     *
     * @param r        the task that could not be accepted
     * @param executor the pool that rejected it (not used)
     */
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        final String rejectedTask = r.toString();
        System.out.println(rejectedTask);
    }
}
=============================================
執行結果:
Task{id=6, name='任務(wù)6'}
pool-1-thread-1執(zhí)行任務(wù)ID1
pool-1-thread-2執(zhí)行任務(wù)ID5
pool-1-thread-1執(zhí)行任務(wù)ID3
pool-1-thread-2執(zhí)行任務(wù)ID2
pool-1-thread-1執(zhí)行任務(wù)ID4

使用無界隊列示例DEMO

package com.wjb.demo.threadpoolexecutor;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Thread pool demo: behaviour with an unbounded work queue. The class is
 * itself the task that gets submitted.
 *
 * Created by wjb on 2018/1/11.
 */
public class ThreadPoolExecutor2 implements Runnable{
    /** Running number handed out to each executed task, shared by all instances. */
    private static final AtomicInteger count = new AtomicInteger();

    private int id;
    private String name;

    /** @return the id field of this task */
    public int getId() {
        return id;
    }

    /** @param id the new id */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the name field of this task */
    public String getName() {
        return name;
    }

    /** @param name the new name */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Takes the next number from the shared counter, prints it with the
     * executing thread's name, then sleeps two seconds. The printed number
     * comes from the counter, not from this instance's {@code id} field.
     * If interrupted, the thread's interrupt status is restored.
     */
    @Override
    public void run() {
        try {
            int taskNumber = count.incrementAndGet();
            System.out.println(Thread.currentThread().getName()+"執(zhí)行的任務(wù)ID:"+taskNumber);
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // Leave the flag set so the pool/caller can see the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Submits 20 tasks to a 5-thread pool backed by an unbounded queue, prints
     * the queue size after one second, waits two more seconds and shuts the
     * pool down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

        /*
         * A new thread is created per incoming task until corePoolSize is
         * reached; after that no more threads are created, which is why the
         * maximum is normally set equal to the core size here. Every further
         * task is parked in the queue.
         * An unbounded queue never rejects a task, until memory runs out.
         */
        ExecutorService executor = new ThreadPoolExecutor(5,5,120, TimeUnit.SECONDS,queue);

        try {
            for (int i = 1;i<= 20;i++){
                executor.execute(new ThreadPoolExecutor2());
            }
            Thread.sleep(1000);
            System.out.println("隊列大小"+queue.size());
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Always release the pool, whatever happened above.
            executor.shutdown();
        }
    }
}
最后編輯于
©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容