JAVA同步類容器線程是否安全以及介紹:
1,ArrayList,LinkedList線程不安全
Vector 對應 CopyOnWriteArrayList 線程安全
2,hashmap線程不安全
hashtable 對應 ConcurrentHashMap 線程安全
3,Queue下的阻塞隊列和非阻塞隊列:
ConcurrentLinkedQueue高性能,高並發,線程安全的非阻塞隊列。
下面是阻塞隊列:

多線程設計模式
1,future模式

代碼demo
package com.wjb.demo.futuredemo;
/**
 * Created by wjb on 2018/1/10.
 *
 * <p>Shared contract of the Future demo: every implementation exposes its
 * payload as a string through {@link #getRequest()}.
 */
public interface Data {

    /**
     * Returns the payload held by this object.
     *
     * @return the result string supplied by the implementation
     */
    String getRequest();
}
=================================================
package com.wjb.demo.futuredemo;
/**
 * Created by wjb on 2018/1/10.
 *
 * <p>Client side of the Future demo: hands back a placeholder right away and
 * fills it in from a background thread.
 */
public class FutureClient {

    /**
     * Starts building the result for {@code queryString} on a new thread and
     * immediately returns a {@code FutureData} wrapper.
     *
     * @param queryString the query passed to the {@code RealData} constructor
     * @return the wrapper, which the background thread completes later through
     *         {@code setRealData}
     */
    public Data request(String queryString) {
        // Wrapper handed to the caller; it carries no data at this point.
        final FutureData placeholder = new FutureData();

        // Constructing RealData is the slow part, so it runs off the caller's thread.
        final Runnable loader = () -> placeholder.setRealData(new RealData(queryString));
        final Thread background = new Thread(loader);
        background.start();

        return placeholder;
    }
}
===========================================
package com.wjb.demo.futuredemo;
/**
 * Created by wjb on 2018/1/10.
 *
 * <p>Placeholder returned by the Future demo before the real result exists.
 * {@link #getRequest()} blocks until {@link #setRealData(RealData)} has been
 * called; both methods synchronize on this instance.
 */
public class FutureData implements Data {
    private RealData realData;
    private boolean isReady = false;

    /**
     * Installs the real result and wakes every thread blocked in
     * {@link #getRequest()}. Only the first call has an effect; later calls
     * return without changing anything.
     *
     * @param realData the loaded result to delegate to
     */
    public synchronized void setRealData(RealData realData) {
        // Already loaded: nothing to do.
        if (isReady) {
            return;
        }
        this.realData = realData;
        isReady = true;
        // notifyAll rather than notify: several threads may be waiting, and
        // notify would wake only one of them with no later wake-up to follow.
        notifyAll();
    }

    /**
     * Blocks until the real result has been installed, then delegates to it.
     * An interrupt received while waiting does not abort the wait; the
     * thread's interrupt status is restored before this method returns.
     *
     * @return the value of the real data's {@code getRequest()}
     */
    @Override
    public synchronized String getRequest() {
        boolean interrupted = false;
        try {
            // Loop guards against spurious wake-ups.
            while (!isReady) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    // Keep waiting for the result, but remember the interrupt
                    // so it is not lost to the caller.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        // Loaded: return the real data by calling its own getRequest().
        return this.realData.getRequest();
    }
}
=================================================
package com.wjb.demo.futuredemo;
/**
 * Created by wjb on 2018/1/10.
 *
 * <p>The real data class of the Future demo. Its constructor simulates a slow
 * query by sleeping before the result becomes available.
 */
public class RealData implements Data {
    private String result;

    /**
     * Simulates a long-running query and stores its result.
     *
     * @param queryString the query text, echoed in the progress message
     */
    public RealData(String queryString) {
        System.out.println("根據(jù)" + queryString + "查詢操作,一系列的操作省略。。。");
        pause(10000);
        System.out.println("操作完成");
        this.result = "查詢結(jié)果";
    }

    /** Sleeps for the given number of milliseconds, printing the stack trace if interrupted. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * @return the result stored by the constructor
     */
    @Override
    public String getRequest() {
        return this.result;
    }
}
================================================
package com.wjb.demo.futuredemo;
/**
 * Created by wjb on 2018/1/10.
 *
 * <p>Entry point of the multithreaded Future pattern example.
 */
public class Main {

    /**
     * Sends one request, prints progress messages while the result is being
     * prepared, then blocks on the result and prints it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final FutureClient futureClient = new FutureClient();

        // The object returned here does not hold any data yet.
        final Data pending = futureClient.request("查詢請求");

        System.out.println("請求發(fā)送成功");
        System.out.println("數(shù)據(jù)正在處理中,此時可以做其它操作");

        // Blocks until the background load has finished.
        final String answer = pending.getRequest();
        System.out.println(answer);
    }
}
2,Master-Worker模式:

示例demo:
package com.wjb.demo.master_worker;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Created by wjb on 2018/1/10.
 *
 * <p>Master of the Master-Worker demo: owns the task queue, the worker threads
 * and the map that is handed to the worker for its results.
 */
public class Master {
    /** Container for submitted tasks; a non-blocking queue is used here. */
    private ConcurrentLinkedQueue<Task> pendingTasks = new ConcurrentLinkedQueue<Task>();
    /** Container for the worker threads, keyed by their index as a string. */
    private HashMap<String, Thread> workers = new HashMap<String, Thread>();
    /** Container handed to the worker for the data it has finished processing. */
    private ConcurrentHashMap<String, Task> results = new ConcurrentHashMap<String, Task>();

    /**
     * Wires the worker to the shared queue and result map and creates
     * {@code count} threads, all of which run the same {@code worker} instance.
     *
     * @param worker the runnable executed by every thread
     * @param count  the number of threads to create
     */
    public Master(Worker worker, int count) {
        worker.setMap(results);
        worker.setQueue(pendingTasks);
        int index = 0;
        while (index < count) {
            workers.put(String.valueOf(index), new Thread(worker));
            index++;
        }
    }

    /**
     * Adds a task to the queue.
     *
     * @param task the task to enqueue
     */
    public void submit(Task task) {
        pendingTasks.add(task);
    }

    /** Starts every worker thread. */
    public void execute() {
        for (Thread thread : workers.values()) {
            thread.start();
        }
    }

    /**
     * Reports whether all worker threads have finished.
     *
     * @return {@code true} only if every thread is in state {@code TERMINATED}
     */
    public boolean isComplete() {
        for (Thread thread : workers.values()) {
            boolean finished = thread.getState() == Thread.State.TERMINATED;
            if (!finished) {
                return false;
            }
        }
        return true;
    }
}
=================================================
package com.wjb.demo.master_worker;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Created by wjb on 2018/1/10.
 *
 * <p>Worker of the Master-Worker demo: drains the task queue it was given and
 * records each processed task in the map it was given.
 */
public class Worker implements Runnable {
    private ConcurrentLinkedQueue<Task> queue;
    private ConcurrentHashMap<String, Task> map;

    /**
     * @param queue the queue to take tasks from
     */
    public void setQueue(ConcurrentLinkedQueue queue) {
        this.queue = queue;
    }

    /**
     * @param map the map that receives processed tasks, keyed by task id
     */
    public void setMap(ConcurrentHashMap<String, Task> map) {
        this.map = map;
    }

    /**
     * Polls tasks until the queue is empty. Each task is "processed" by
     * sleeping one second and is then stored in the map under its id.
     */
    @Override
    public void run() {
        Task next;
        // poll() returns null once the queue is empty, which ends the loop.
        while ((next = queue.poll()) != null) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            map.put(String.valueOf(next.getId()), next);
        }
    }
}
================================================
package com.wjb.demo.master_worker;
/**
 * Created by wjb on 2018/1/10.
 *
 * <p>Plain mutable bean carrying a task id and a price.
 */
public class Task {
    private int id;
    private int price;

    /** @return the task id */
    public int getId() {
        return this.id;
    }

    /** @param id the task id to store */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the price */
    public int getPrice() {
        return this.price;
    }

    /** @param price the price to store */
    public void setPrice(int price) {
        this.price = price;
    }
}
=================================================
package com.wjb.demo.master_worker;
import java.util.Random;
/**
 * Created by wjb on 2018/1/10.
 *
 * <p>Entry point of the multithreaded Master-Worker pattern example.
 */
public class Main {

    /**
     * Submits 100 tasks with random prices to a master with 10 worker threads,
     * starts the workers, spins until all of them have terminated and prints
     * the elapsed time in milliseconds.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final Master master = new Master(new Worker(), 10);
        final Random prices = new Random();

        int id = 0;
        while (id < 100) {
            final Task job = new Task();
            job.setId(id);
            job.setPrice(prices.nextInt());
            master.submit(job);
            id++;
        }

        master.execute();
        final long startedAt = System.currentTimeMillis();

        // Busy-wait until every worker thread has terminated.
        while (!master.isComplete()) {
            // spin
        }
        final long finishedAt = System.currentTimeMillis();
        System.out.println("耗時:" + (finishedAt - startedAt));
    }
}
3,生產者消費者模式:
示例demo
package com.wjb.demo.provider_consumer;
import java.util.concurrent.*;
/**
 * Created by wjb on 2018/1/11.
 *
 * <p>Entry point of the producer-consumer example: three providers and three
 * consumers share one blocking queue and run on a cached thread pool.
 */
public class Main {

    /**
     * Starts the providers and consumers, lets them run for three seconds,
     * stops the providers and then requests an orderly pool shutdown.
     * The consumers are not stopped here.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Unbounded blocking queue: no limit on the number of elements.
        BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>();
        // Cached pool: creates threads on demand with no upper bound, creates
        // none while idle, and keeps idle threads alive for 60 seconds by default.
        ExecutorService threadPool = Executors.newCachedThreadPool();

        Provider[] providers = {new Provider(queue), new Provider(queue), new Provider(queue)};
        Consumer[] consumers = {new Consumer(queue), new Consumer(queue), new Consumer(queue)};

        for (Provider provider : providers) {
            threadPool.execute(provider);
        }
        for (Consumer consumer : consumers) {
            threadPool.execute(consumer);
        }

        try {
            Thread.sleep(3000);
            for (Provider provider : providers) {
                provider.stop();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        threadPool.shutdown();
    }
}
==============================================
package com.wjb.demo.provider_consumer;
/**
 * Created by wjb on 2018/1/11.
 *
 * <p>Plain mutable bean passed from providers to consumers: an id and a name.
 */
public class Data {
    private int id;
    private String name;

    /**
     * @param id   the item id
     * @param name the item name
     */
    public Data(int id, String name) {
        this.id = id;
        this.name = name;
    }

    /** @return the item id */
    public int getId() {
        return this.id;
    }

    /** @param id the item id to store */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the item name */
    public String getName() {
        return this.name;
    }

    /** @param name the item name to store */
    public void setName(String name) {
        this.name = name;
    }
}
===========================================
package com.wjb.demo.provider_consumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Created by wjb on 2018/1/11.
 *
 * <p>Producer of the producer-consumer example. While running, it creates one
 * {@code Data} item per second, numbered from a counter shared by all
 * providers, and offers it to the queue.
 */
public class Provider implements Runnable {
    private BlockingQueue queue;
    /** Cleared by {@link #stop()}; volatile so the running thread sees the change. */
    private volatile boolean isRunning = true;
    /** Id generator shared by every provider instance. */
    private static AtomicInteger count = new AtomicInteger();

    /**
     * @param queue the queue that produced items are offered to
     */
    public Provider(BlockingQueue queue) {
        this.queue = queue;
    }

    /** Produces items until {@link #stop()} is called; an interrupt is printed and the loop continues. */
    @Override
    public void run() {
        while (isRunning) {
            try {
                produceOnce();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Waits one second, creates the next item and offers it to the queue,
     * waiting up to two seconds for space.
     */
    private void produceOnce() throws InterruptedException {
        Thread.sleep(1000);
        final int nextId = count.incrementAndGet();
        final Data item = new Data(nextId, Integer.toString(nextId));
        System.out.println("當(dāng)前線程:" + Thread.currentThread().getName() + "生產(chǎn)數(shù)據(jù)ID是:" + nextId);
        final boolean accepted = this.queue.offer(item, 2, TimeUnit.SECONDS);
        if (!accepted) {
            System.out.println("數(shù)據(jù)放入隊列失敗");
        }
    }

    /** Asks the provider to finish after its current iteration. */
    public void stop() {
        this.isRunning = false;
    }
}
===============================================
package com.wjb.demo.provider_consumer;
import java.util.concurrent.BlockingQueue;
/**
 * Created by wjb on 2018/1/11.
 *
 * <p>Consumer of the producer-consumer example. While running, it takes items
 * from the queue, spends one second on each and prints the item id.
 *
 * <p>The consumer can be ended in two ways: {@link #stop()} clears the running
 * flag, which is re-checked at least once per poll timeout, and interrupting
 * the executing thread makes {@link #run()} return immediately.
 */
public class Consumer implements Runnable {
    /** How long a single poll waits before the running flag is re-checked. */
    private static final long POLL_TIMEOUT_SECONDS = 1;

    private BlockingQueue<Data> queue;
    /** Cleared by {@link #stop()}; volatile so the running thread sees the change. */
    private volatile boolean isRunning = true;

    /**
     * @param queue the queue to consume from; expected to contain {@code Data} items
     */
    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    /**
     * Consumes items until stopped or interrupted. On interrupt the thread's
     * interrupt status is restored before returning.
     */
    @Override
    public void run() {
        while (isRunning) {
            try {
                // Timed poll instead of take(): an empty queue must not keep
                // the consumer blocked forever once stop() has been called.
                Data data = this.queue.poll(POLL_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
                if (data == null) {
                    continue;
                }
                Thread.sleep(1000);
                System.out.println("消費的數(shù)據(jù)ID是:" + data.getId());
            } catch (InterruptedException e) {
                // Treat interruption as a request to finish and keep the flag
                // visible to the code that owns this thread.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Asks the consumer to finish after its current iteration. */
    public void stop() {
        this.isRunning = false;
    }
}
Executor框架



使用有界隊列示例demo
package com.wjb.demo.threadpoolexecutor;
/**
 * Created by wjb on 2018/1/11.
 *
 * <p>Runnable task for the thread-pool demo: sleeps five seconds and then
 * prints which thread executed which task id.
 */
public class Task implements Runnable {
    private int id;
    private String name;

    /**
     * @param id   the task id
     * @param name the task name
     */
    public Task(int id, String name) {
        this.id = id;
        this.name = name;
    }

    /** @return the task id */
    public int getId() {
        return id;
    }

    /** @param id the task id to store */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the task name */
    public String getName() {
        return name;
    }

    /** @param name the task name to store */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Simulates five seconds of work and then reports the executing thread.
     * If interrupted while sleeping, nothing is printed and the thread's
     * interrupt status is restored.
     */
    @Override
    public void run() {
        try {
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName()+"執(zhí)行任務(wù)ID"+this.id);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the pool that owns this thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Describes the task by id and name, so that code printing a task (for
     * example a rejection handler) shows which task it was instead of the
     * default object identity string.
     *
     * @return a string of the form {@code Task{id=1, name='x'}}
     */
    @Override
    public String toString() {
        return "Task{id=" + id + ", name='" + name + "'}";
    }
}
=============================================
package com.wjb.demo.threadpoolexecutor;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Created by wjb on 2018/1/11.
 *
 * <p>Custom thread pool demo showing how a bounded queue is used.
 */
public class ThreadPoolExecutorDemo {

    /**
     * Submits six tasks to a pool with one core thread, at most two threads
     * and a queue of capacity three, so that every stage described below is
     * exercised, including rejection.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        /*
         * Task 1 arrives and is executed by a thread. Tasks 2, 3 and 4 are
         * parked in the queue. When task 5 arrives the queue cannot hold it;
         * because the current thread count is below the maximum, a new thread
         * is created to run it. Task 6 then triggers the rejection policy.
         * The JDK default is AbortPolicy, which would throw out of execute();
         * MyRejected is installed instead so the rejected task is reported
         * and the pool is still shut down.
         */
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(3),
                new MyRejected());
        for (int id = 1; id <= 6; id++) {
            pool.execute(new Task(id, "任務(wù)" + id));
        }
        pool.shutdown();
    }
}
============================================
package com.wjb.demo.threadpoolexecutor;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Created by wjb on 2018/1/11.
 *
 * <p>Custom rejection policy. Handle the rejected task however is appropriate
 * here; in a real system the data must not be lost, so it would be put into a
 * cache or kept some other way.
 */
public class MyRejected implements RejectedExecutionHandler {

    /**
     * Prints the string form of the rejected task to standard output.
     *
     * @param r        the task that could not be accepted
     * @param executor the pool that rejected it (not used)
     */
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        final String description = r.toString();
        System.out.println(description);
    }
}
=============================================
執行結果:
Task{id=6, name='任務(wù)6'}
pool-1-thread-1執(zhí)行任務(wù)ID1
pool-1-thread-2執(zhí)行任務(wù)ID5
pool-1-thread-1執(zhí)行任務(wù)ID3
pool-1-thread-2執(zhí)行任務(wù)ID2
pool-1-thread-1執(zhí)行任務(wù)ID4
使用無界隊列示例DEMO
package com.wjb.demo.threadpoolexecutor;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Created by wjb on 2018/1/11.
 *
 * <p>Thread pool demo showing the behaviour with an unbounded queue. The class
 * is itself the runnable task that gets submitted.
 */
public class ThreadPoolExecutor2 implements Runnable {
    private int id;
    private String name;
    /** Counter shared by all instances; numbers the tasks in execution order. */
    private static AtomicInteger count = new AtomicInteger();

    /** @return the id field */
    public int getId() {
        return this.id;
    }

    /** @param id the id to store */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the name field */
    public String getName() {
        return this.name;
    }

    /** @param name the name to store */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Takes the next number from the shared counter, prints it together with
     * the executing thread's name, then sleeps two seconds.
     */
    @Override
    public void run() {
        try {
            // Local sequence number from the shared counter; the id field is not used here.
            final int sequence = count.incrementAndGet();
            System.out.println(Thread.currentThread().getName() + "執(zhí)行的任務(wù)ID:" + sequence);
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Submits 20 tasks to a five-thread pool backed by an unbounded queue,
     * prints the queue size after one second, waits two more seconds and
     * shuts the pool down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        BlockingQueue<Runnable> backlog = new LinkedBlockingQueue<>();
        /*
         * Each incoming task creates a new thread until corePoolSize is
         * reached; after that no more threads are created, which is why the
         * maximum size is normally set equal to the core size here. Every
         * further task is parked in the queue. An unbounded queue never
         * rejects a task until memory is exhausted.
         */
        ExecutorService executor = new ThreadPoolExecutor(5, 5, 120, TimeUnit.SECONDS, backlog);

        int submitted = 0;
        while (submitted < 20) {
            executor.execute(new ThreadPoolExecutor2());
            submitted++;
        }

        try {
            Thread.sleep(1000);
            System.out.println("隊列大小" + backlog.size());
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executor.shutdown();
    }
}