一、高性能生產(chǎn)者-消費者:無鎖實現(xiàn)
BlockigQueue使用鎖和阻塞等待來實現(xiàn)線程間的同步,而ConcureentLinkedQueue使用大量的無鎖CAS操作,相比BlockigQueue的性能要好的多。但是使用CAS進行編程是非常困難的,不過現(xiàn)成的Disruptor框架幫我們實現(xiàn)了這一功能。
1.1 無鎖的緩沖框架:Disruptor
Disruptor框架是由LMAX公司開發(fā)的一款高效的無鎖內(nèi)存隊列,它使用無鎖的方式實現(xiàn)了一個環(huán)形隊列,非常適合生產(chǎn)者-消費者模式。在Disruptor中,使用了環(huán)形隊列來代替普通的線性隊列,這個環(huán)形隊列內(nèi)部實現(xiàn)是一個普通的數(shù)組。對于一般的隊列,勢必要提供隊列頭部head和尾部tail兩個指針用于出隊和入隊,這樣無疑就增加了線程協(xié)作的復(fù)雜度。但如果隊列的環(huán)形的,則只需要提供一個當(dāng)前隊列的位置cursor,利用這個cursor既可以出隊也可以入隊。由于是環(huán)形隊列的緣故,隊列的總大小必須事先指定,不能動態(tài)擴展。為了能夠快速從一個序列sequence對應(yīng)數(shù)組的實際位置(每次有元素入隊,序列就加1),Disruptor要求我們必須將數(shù)組的大小設(shè)置為2的整數(shù)次方。這樣通過sequence&(queueSize-1)就能立即定位到實際的元素位置index。這個要比取余(%)操作快得多。
如圖所示,顯示了RingBuffer的結(jié)構(gòu),生產(chǎn)者向緩沖區(qū)中寫入數(shù)據(jù),而消費者從中讀取數(shù)據(jù),生產(chǎn)者寫入數(shù)據(jù)使用CAS操作,消費者讀取數(shù)據(jù)時,為了防止多個消費者處理同一個數(shù)據(jù),也使用CAS操作進行保護。這種固定大小的環(huán)形隊列的另一個好處就是可以做到完全內(nèi)存復(fù)用。在系統(tǒng)運行過程中,不會有新的空間需要分配或者老的空間需要回收。因此,可以大大減少系統(tǒng)分配空間以及回收空間的額外開銷。
1.2 生產(chǎn)者-消費者案例
這里使用的Disruptor版本是disruptor-3.3.2。這里生產(chǎn)者不斷產(chǎn)生證書,消費者讀取生產(chǎn)者的數(shù)據(jù),并計算其平方。
代表數(shù)據(jù)的PCData:
public class PCData {
private long value;
public void set(long value) {
this.value = value;
}
public long get() {
return value;
}
}
消費者實現(xiàn)為WorkHandler接口,它來自Disruptor框架:
public class Consumer implements WorkHandler<PCData> {
@Override
public void onEvent(PCData event) throws Exception {
System.out.println(Thread.currentThread().getId() + ":Event: --" +
event.get() * event.get() + "--");
}
}
消費者的作用是讀取數(shù)據(jù)進行處理。這里,數(shù)據(jù)的讀取已經(jīng)由Disruptor進行封裝,onEvent()方法為框架的回調(diào)方法。因此,這個只需要簡單地進行數(shù)據(jù)處理即可。
PCData的工廠類。它會在Disruptor系統(tǒng)初始化時,構(gòu)造所有的緩沖區(qū)中的對象實例:
public class PCDataFactory implements EventFactory<PCData>{
@Override
public PCData newInstance() {
return new PCData();
}
}
生產(chǎn)者:
public class Producer {
private final RingBuffer<PCData> ringBuffer;
public Producer(RingBuffer<PCData> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void pushData(ByteBuffer byteBuffer){
long sequence = ringBuffer.next();
try {
PCData event = ringBuffer.get(sequence);
event.set(byteBuffer.getLong(0));
} finally {
ringBuffer.publish(sequence);
}
}
}
生產(chǎn)者需要一個RingBuffer的引用,也就是環(huán)形緩沖區(qū)。它有一個重要的方法pushData()將產(chǎn)生的數(shù)據(jù)推入緩沖區(qū)。方法pushData()接收一個ByteBuffer對象。在ByteBuffer中可以用來包裝任何數(shù)據(jù)類型。pushData()的功能就是將傳入的ByteBuffer中的數(shù)據(jù)提取出來,并裝載到環(huán)形緩沖區(qū)中。
上述第12行代碼,通過next()方法得到下一個可用的序列號。通過序列號,取得下一個空閑可用的PCData,并且將PCData的數(shù)據(jù)設(shè)為期望值,這個值最終會傳遞給消費者。最后,在第21行,進行數(shù)據(jù)發(fā)布。只有發(fā)布后的數(shù)據(jù)才會真正被消費者看見。
至此,我們的生產(chǎn)者、消費者和數(shù)據(jù)都已經(jīng)準(zhǔn)備就緒。只差一個統(tǒng)籌規(guī)劃的主函數(shù)將所有內(nèi)容整合起來:
public static void main(String[] args) throws InterruptedException {
Executor executor = Executors.newCachedThreadPool();
PCDataFactory factory = new PCDataFactory();
EventFactory<PCData> factory = new EventFactory<PCData>() {
@Override
public PCData newInstance() {
return new PCData();
}
};
//設(shè)置緩沖區(qū)大小,一定要是2的整數(shù)次冪
int bufferSize = 1024;
WaitStrategy startegy = new BlockingWaitStrategy();
//創(chuàng)建disruptor,它封裝了整個Disruptor的使用,提供了一些便捷的API.
Disruptor<PCData> disruptor = new Disruptor<PCData>(factory, bufferSize, executor, ProducerType.MULTI, startegy);
//設(shè)置消費者,系統(tǒng)會將每一個消費者實例映射到一個系統(tǒng)中,也就是提供4個消費者線程.
disruptor.handleEventsWithWorkerPool(new Consumer(),
new Consumer(),
new Consumer(),
new Consumer());
//啟動并初始化disruptor系統(tǒng).
disruptor.start();
RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer();
//創(chuàng)建生產(chǎn)者
Producer productor = new Producer(ringBuffer);
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
//生產(chǎn)者不斷向緩沖區(qū)中存入數(shù)據(jù).
for (long l=0;true;l++){
byteBuffer.putLong(0,l);
productor.pushData(byteBuffer);
Thread.sleep(new Random().nextInt(500));
System.out.println("add data "+l);
}
}
Disruptor的性能要比BlocakingQueue至少高一個數(shù)量級以上。
1.3 提高消費者的響應(yīng)時間:選擇合適的策略
Disruptor為我們提供了幾個策略,這些策略由WaitStrategy接口進行封裝。
- BlockingWaitStrategy:默認(rèn)策略。和BlockingQueue是非常類似的,他們都使用了Lock(鎖)和Condition(條件)進行數(shù)據(jù)監(jiān)控和線程喚醒。因為涉及到線程的切換,BlockingWaitStrategy策略是最省CPU的,但在高并發(fā)下性能表現(xiàn)是最差的一種等待策略。
- SleepingWaitStrategy:這個策略也是對CPU非常保守的。它會在循環(huán)中不斷等待數(shù)據(jù)。它會先進行自旋等待,如果不成功,則使用Thread.yield()讓出CPU,并最終使用LockSupport.parkNanos(1)進行線程休眠,以確保不占用太多的CPU數(shù)據(jù)。因此,這個策略對于數(shù)據(jù)處理可能產(chǎn)生比較高的平均延時。適用于對延時要求不是特別高的場合,好處是他對生產(chǎn)者線程的影響最小。典型的場景是異步日志。
- YieldWaitStrategy:用于低延時場合。消費者線程會不斷循環(huán)監(jiān)控緩沖區(qū)變化,在循環(huán)內(nèi)部,它會使用Thread.yield()讓出CPU給別的線程執(zhí)行時間。如果需要高性能系統(tǒng),并且對延遲有較高要求,則可以考慮這種策略。這種策略相當(dāng)于消費者線程變成了一個內(nèi)部執(zhí)行Thread.yield()的死循環(huán),因此最好有多于消費者線程的邏輯CPU(“雙核四線程”中的四線程),否則整個應(yīng)用會受到影響。
- BusySpinWaitStrategy:瘋狂等待策略。它就是一個死循環(huán),消費者線程會盡最大努力監(jiān)控緩沖區(qū)的變化。它會吃掉CPU所有資源。所以只在非??量痰膱龊鲜褂盟?。因為這個策略等同于開一個死循環(huán)監(jiān)控。因此,物理CPU數(shù)量必須大于消費者線程數(shù)。因為如果是邏輯核,那么另外一個邏輯核必然會受到這種超密集計算的影響而不能正常工作。
1.4 CPU Cache的優(yōu)化:解決偽共享問題
我們知道,為了提高CPU的速度,CPU有一個高速緩存Cache。在高速緩存中,讀寫數(shù)據(jù)的最小單位是緩存行(Cache Line),它是主內(nèi)存(memory)復(fù)制到 緩存(Cache)的最小單位,一般為32~128byte(字節(jié))。
假如兩個變量存放在同一個緩存行中,在多線程訪問中,可能互相影響彼此的性能。如圖,運行在CPU1上的線程更新了X,那么CPU2傷的緩存行就會失效,同一行的Y即使沒有修改也會變成無效,導(dǎo)致Cache無法命中。接著,如果在CPU2上的線程更新了Y,則導(dǎo)致CPU1上的緩存行又失效(此時,同一行的X)。這無疑是一個潛在的性能殺手,如果CPU經(jīng)常不能命中緩存,那么系統(tǒng)的吞吐量會急劇下降。
為了使這種情況不發(fā)生,一種可行的做法就是在X變量前后空間都占據(jù)一定的位置(暫叫padding,用來填充Cache Line)。這樣,當(dāng)內(nèi)存被讀入緩存中時,這個緩存行中,只有X一個變量實際是有效的,因此就不會發(fā)生多個線程同時修改緩存行中不同變量而導(dǎo)致變量全體失效的情況。
具體實現(xiàn)如下:
public class FalseSharing implements Runnable {
public final static int NUM_THREADS = 4;
public final static long ITERATIONS = 500L * 1000L * 1000L;
private final int arrayIndex;
private static VolatileLong[] longs = new VolatileLong[NUM_THREADS];
static {
for(int i=0; i<longs.length; i++) {
longs[i] = new VolatileLong();
}
}
public FalseSharing(final int arrayIndex) {
this.arrayIndex = arrayIndex;
}
public static void main(String[] args) throws Exception {
final long start = System.currentTimeMillis();
runTest();
System.out.println("duration = " + (System.currentTimeMillis() - start));
}
private static void runTest() throws InterruptedException {
Thread[] threads = new Thread[NUM_THREADS];
for(int i=0; i<threads.length; i++) {
threads[i] = new Thread(new FalseSharing(i));
}
for(Thread t : threads) {
t.start();
}
for(Thread t : threads) {
t.join();
}
}
@Override
public void run() {
long i = ITERATIONS + 1;
while(0 != --i) {
longs[arrayIndex].value = i;
}
}
public final static class VolatileLong {
public volatile long value = 0L;
public long p1, p2, p3, p4, p5, p6, p7;
}
}
在VolatileLong中,準(zhǔn)備了7個long型變量用來填充緩存。實際上,只有VolatileLong.value是會被使用的。而那些p1、p2等僅僅用于將數(shù)組第一個VolatileLong.value和第二個VolatileLong.value分開,防止它們進入同一個緩存行。
Disruptor框架充分考慮了這個問題,它的核心組件Sequence會被非常頻繁的訪問(每次入隊,它都會被加1),其基本結(jié)構(gòu)如下:
class LhsPadding
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding
{
protected volatile long value;
}
class RhsPadding extends Value
{
protected long p9, p10, p11, p12, p13, p14, p15;
}
public class Sequence extends RhsPadding {
//省略具體實現(xiàn)
}
雖然在Sequence中,主要使用的只有value。但是,通過LhsPadding和RhsPadding,在這個value的前后安置了一些占位空間,使得value可以無沖突的存在于緩存中。此外,對于Disruptor的環(huán)形緩沖區(qū)RingBuffer,它內(nèi)部的數(shù)組是通過以下語句構(gòu)造的:
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
實際產(chǎn)生的數(shù)組大小是緩沖區(qū)實際大小再加上兩倍的BUFFER_PAD。這就相當(dāng)于在這個數(shù)組的頭部和尾部兩段各增加了BUFFER_PAD個填充,使得整個數(shù)組被載入Cache時不會受到其他變量的影響而失效。
二、Furture模式
Furture模式的核心思想是異步調(diào)用。當(dāng)我們需要調(diào)用一個函數(shù)方法時,可能不急著要結(jié)果,讓它在后臺慢慢處理這個請求,此時調(diào)用者可以先處理其它任務(wù),在真正需要數(shù)據(jù)的時候再去嘗試獲取需要的數(shù)據(jù)。
2.1 Furture模式的主要角色
Furture模式的主要參與者如下:
| 參與者 | 作用 |
|---|---|
| Main | 系統(tǒng)啟動,調(diào)用Client發(fā)出請求 |
| Client | 返回Data對象,立即返回FurtureData,并開啟ClientThread線程裝配RealData |
| Data | 返回數(shù)據(jù)的接口 |
| FurtureData | Future數(shù)據(jù),構(gòu)造很快但是是一個虛擬的數(shù)據(jù),需要裝配RealData |
| RealData | 真實數(shù)據(jù),其構(gòu)造比較慢 |
它的核心結(jié)構(gòu)如圖所示:
2.2 Future模式的簡單實現(xiàn)
在這個實現(xiàn)中,有一個核心接口Data,這就是客戶端希望獲取的數(shù)據(jù)。在Future模式中,這個Data接口有兩個重要的實現(xiàn),分別是RealData,也就是真是數(shù)據(jù),這就是我們最終需要獲得的,有價值的信息。另外一個就是FutureData,它就是用來提取RealData的一個“訂單”。因此FutureData是可以立即返回得到。
下面是Data接口:
public interface Data {
public String getResult();
}
FutureData實現(xiàn)了一個快速返回的RealData包裝。它只是一個包裝,或者說是一個RealData的虛擬實現(xiàn)。因此,它可以很快被構(gòu)造并返回。當(dāng)使用FutrueData的getResult()方法的時候,程序阻塞,直到RealData準(zhǔn)備好并注入到FutureData中,才最終返回數(shù)據(jù)。FutureData是Future模式的關(guān)鍵,它實際上是真實數(shù)據(jù)RealData的代理,封裝了獲取RealData的等待過程。
public class FutureData implements Data {
RealData realData = null; //FutureData是RealData的封裝
boolean isReady = false; //是否已經(jīng)準(zhǔn)備好
public synchronized void setRealData(RealData realData) {
if(isReady)
return;
this.realData = realData;
isReady = true;
notifyAll(); //RealData已經(jīng)被注入到FutureData中了,通知getResult()方法
}
@Override
public synchronized String getResult() throws InterruptedException {
if(!isReady) {
wait(); //一直等到RealData注入到FutureData中
}
return realData.getResult();
}
}
RealData是最終需要使用的數(shù)據(jù)模型。它的構(gòu)造很慢。在這里,使用sleep()函數(shù)模擬這個過程,簡單地模擬一個字符串的構(gòu)造。
public class RealData implements Data {
protected String data;
public RealData(String data) {
//利用sleep方法來表示RealData構(gòu)造過程是非常緩慢的
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.data = data;
}
@Override
public String getResult() {
return data;
}
}
Client主要實現(xiàn)了獲取FutureData,并開啟構(gòu)造RealData的線程。并在接受請求后,很快的返回FutureData。注意,它不會等待將數(shù)據(jù)真的構(gòu)造完畢再返回,而是立即返回FutureData,即使這個時候FutureData并沒有真實數(shù)據(jù)。
public class Client {
public Data request(final String string) {
final FutureData futureData = new FutureData();
new Thread(new Runnable() {
@Override
public void run() {
//RealData的構(gòu)建很慢,所以放在單獨的線程中運行
RealData realData = new RealData(string);
futureData.setRealData(realData);
}
}).start();
return futureData; //先直接返回FutureData
}
}
最后,就是主函數(shù)Main,它主要負責(zé)調(diào)用Client發(fā)起請求,并消費返回的數(shù)據(jù)。
public static void main(String[] args) {
Client client = new Client();
Data data = client.request("name");
System.out.println("請求完畢");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("數(shù)據(jù) = " + data.getResult());
}
2.3 JDK中的Future模式
下面是JDK內(nèi)置Future模式的基本結(jié)構(gòu)。
[圖片上傳失敗...(image-283e98-1536761892098)]
其中Future接口就類似于訂單或者說是契約。通過它,可以得到真實的數(shù)據(jù)。RunnableFuture繼承了Future和Runnable兩個接口,其中run()方法用于構(gòu)造真實的數(shù)據(jù)。它有一個具體的實現(xiàn)FutureTask類。FutureTask有一個內(nèi)部的Sync,一些實質(zhì)性工作,會委托Sync類實現(xiàn)。而Sync類最終會調(diào)用Callable接口,完成實際數(shù)據(jù)的組裝工作。
Callable()接口只有一個方法call(),它會返回需要構(gòu)造的實際數(shù)據(jù)。這個Callable接口也是這個Future框架和應(yīng)用程序之間的重要接口。如果我們要實現(xiàn)自己的業(yè)務(wù)系統(tǒng),通常需要實現(xiàn)自己的Callable對象。此外,F(xiàn)utureTask類也與應(yīng)用程序密切相關(guān),通常,我們會使用Callable實例構(gòu)造一個FutureTask實例,并將它提交給線程池。下面將展示內(nèi)置的Future模式的使用:
public class RealData implements Callable<String> {
private String para;
public RealData(String para) {
this.para = para;
}
@Override
public String call() throws Exception {
StringBuffer sb = new StringBuffer();
for(int i=0; i<10; i++) {
sb.append(para);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
return sb.toString();
}
}
上述代碼實現(xiàn)了Callable接口,它的call()方法會構(gòu)造我們需要的真實數(shù)據(jù)并返回。當(dāng)然這個過程可以是緩慢的,這里使用Thread.sleep()模擬它:
public class FutureMain {
public static void main(String[] args) throws InterruptedException, ExecutionException {
FutureTask<String> future = new FutureTask<>(new RealData("a"));
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(future);
System.out.println("請求完畢");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("數(shù)據(jù) = " + future.get());
}
}
除了基本的功能外,JDK還為Future接口提供了一些簡單的控制功能:
boolean cancel(boolean mayInterruptIfRunning); //取消任務(wù)
boolean isCancelled(); //是否已經(jīng)取消
boolean isDone(); //是否已經(jīng)完成
V get() throws InterruptedException,ExecutionException //取得返回對象
V get(long timeout, TimeUnit unit); //取得返回對象,可以設(shè)置超時時間
三、并行流水線
雖然并發(fā)算法可以充分發(fā)揮多核CPU的性能,但并非所有的計算都可以改造成并發(fā)形式。執(zhí)行過程中有數(shù)據(jù)相關(guān)性的運算都是無法完美并行化的。假如現(xiàn)在有兩個數(shù),B和C。如果要計算(B+C)*B/2,那么這個運算過程就是無法并行的。原因是,如果B+C沒有執(zhí)行完成,則永遠算不出(B+C)*B,這就是數(shù)據(jù)相關(guān)性。
遇到這種情況,可以將日常生產(chǎn)中的流水線思想應(yīng)用到程序開發(fā)中。雖然(B+C)*B/2無法并行,但是如果需要計算一大堆B和C,可以將它流水化。首先將計算過程拆分為三個步驟:
P1:A=B+C
P2:D=AxB
P3:D=D/2
上述步驟中P1、P2和P3均在單獨的線程中計算,并且每個線程只負責(zé)自己的工作。此時,P3的計算結(jié)果就是最終需要的答案。P1接收B和C的值,并求和,將結(jié)果輸入P2。P2求乘積后輸入給P3。P3將D除以2得到最終值。一旦這條流水線建立,只需要一個計算步驟就可以得到(B+C)*B/2的結(jié)果。為了實現(xiàn)這個功能,需要定義一個在線程間攜帶結(jié)果進行信息交換的載體:
public class Msg {
public double i;
public double j;
public String orgStr = null;
}
P1計算的是加法:
public class Plus implements Runnable {
public static BlockingQueue<Msg> bq = new LinkedBlockingQueue<Msg>();
@Override
public void run() {
while(true) {
try {
Msg msg = bq.take();
msg.j = msg.i + msg.j;
Multiply.bq.add(msg);
} catch (InterruptedException e) {
}
}
}
}
上述代碼中,P1取得封裝了兩個操作數(shù)的Msg,并進行求和,將結(jié)果傳遞給乘法線程P2。當(dāng)沒有數(shù)據(jù)需要處理時,P1進行等待。
P2計算乘法:
public class Multiply implements Runnable {
public static BlockingQueue<Msg> bq = new LinkedBlockingQueue<Msg>();
@Override
public void run() {
while(true) {
try {
Msg msg = bq.take();
msg.i = msg.i * msg.j;
Div.bq.add(msg);
} catch (InterruptedException e) {
}
}
}
}
P2計算相乘結(jié)果后,將中間結(jié)果傳遞給除法線程P3。
P3計算除法:
public class Div implements Runnable {
public static BlockingQueue<Msg> bq = new LinkedBlockingQueue<Msg>();
@Override
public void run() {
while(true) {
try {
Msg msg = bq.take();
msg.i = msg.i / 2;
System.out.println(msg.orgStr + "=" + msg.i);
} catch (InterruptedException e) {
}
}
}
}
最后是提交任務(wù)的主線程,這里,提交100萬個請求,讓線程組進行計算:
public class PStreamMain {
public static void main(String[] args) {
new Thread(new Plus()).start();
new Thread(new Multiply()).start();
new Thread(new Div()).start();
long s1 = System.currentTimeMillis();
for(int i=1; i<=1000; i++) {
for(int j=1; j<=1000; j++) {
Msg msg = new Msg();
msg.i = i;
msg.j = j;
msg.orgStr = "((" + i + "+" + j + ")*" + i + ")/2";
Plus.bq.add(msg);
}
}
}
}
上述代碼中,將數(shù)據(jù)提交給P1加法線程,開啟流水線的計算。在多核或者分布式場景中,這種設(shè)計思路可以有效地將有依賴關(guān)系的操作分配在不同的線程中進行計算,盡可能利用多核優(yōu)勢。
四、并行搜索
給定一個數(shù)組,我們要查找滿足條件的元素。對于串行程序來說,只要遍歷一下數(shù)組就可以得到結(jié)果。但如果要使用并行方式,則需要額外增加一些線程間的通信機制,使各個線程可以有效地運行。一種簡單的策略就是將原始數(shù)據(jù)集合按照期望的線程數(shù)進行分割。每個線程各自獨立搜索,當(dāng)其中一個線程找到數(shù)據(jù)后,立即返回結(jié)果即可。
現(xiàn)在假設(shè)有一個整數(shù)數(shù)組,我們需要查找數(shù)組內(nèi)的元素:
static int[] arr;
定義線程池、線程數(shù)量以及存放結(jié)果的變量result。在result中,我們會保存符合條件的元素在arr數(shù)組中的下標(biāo)。默認(rèn)為-1,表示沒有找到給定元素。
public static final int THREADNUM = 2;
static ExecutorService pool = Executors.newCachedThreadPool();
static AtomicInteger result = new AtomicInteger(-1);
并發(fā)搜索會要求每個線程查找arr中的一段,因此,搜索函數(shù)必須指定線程需要搜索的起始和結(jié)束位置
public static int search(int searchValue,int beginPos,int endPos) {
int i = 0;
for(i=beginPos; i<endPos; i++) {
if(result.get() >= 0) {
return result.get();
}
if(arr[i] == searchValue) {
if(!result.compareAndSet(-1, i)) {
return result.get();
}
return i;
}
}
return -1;
}
上述代碼中,首先通過result判斷是否已經(jīng)有其他線程找到了需要的結(jié)果。如果已經(jīng)找到,則立即返回不再進行查找。如果沒有找到,則進行下一步搜索。第7行代碼成立則表示當(dāng)前線程找到了需要的數(shù)據(jù),那么就會將結(jié)果保存到result變量中。這里使用CAS操作,如果設(shè)置失敗,則表示其他線程已經(jīng)先我一步找到了結(jié)果。因此,可以無視失敗的情況,找到結(jié)果后,進行返回。
定義一個線程進行查找,它會調(diào)用前面的search()方法:
public static class SearchTask implements Callable<Integer> {
int begin,end,searchValue;
public SearchTask(int searchValue,int begin,int end) {
this.begin = begin;
this.end = end;
this.searchValue = searchValue;
}
@Override
public Integer call() throws Exception {
int re = search(searchValue,begin,end);
return re;
}
}
最后是pSearch()并行查找函數(shù),它會根據(jù)線程數(shù)量對arr數(shù)組進行劃分,并建立對應(yīng)的任務(wù)提交給線程池處理:
public static int pSearch(int searchValue) throws InterruptedException,ExecutionException {
int subArrSize = arr.length/THREADNUM+1;
List<Future<Integer>> re = new ArrayList<Future<Integer>>();
for(int i=0; i<arr.length; i+=subArrSize) {
int end = i + subArrSize;
if(end>=arr.length) end = arr.length;
re.add(pool.submit(new SearchTask(searchValue, i, end)));
}
for(Future<Integer> fu : re) {
if(fu.get() >= 0) return fu.get();
}
return -1;
}
上述代碼中使用了JDK內(nèi)置的Future模式,其中4~8行將原始數(shù)組arr劃分為若干段,并根據(jù)劃分結(jié)果建立子任務(wù)。每一個子任務(wù)都會返回一個Future對象,通過Future對象可以獲得線程組得到的最終結(jié)果。在這里,由于線程之間通過result共享彼此的信息,因此只要當(dāng)一個線程成功返回后,其他線程都會立即返回。因此,不會出現(xiàn)由于排在前面的任務(wù)長時間無法結(jié)束而導(dǎo)致整個搜索結(jié)果無法立即獲取的情況。
五、并行排序
對于大部分排序操作都是串行執(zhí)行的,但是當(dāng)數(shù)據(jù)量很大時,就需要使用并行排序,但是并行排序的難度很大。下面介紹幾種相對簡單的并行排序算法。
5.1 分離數(shù)據(jù)相關(guān)性:奇偶交換排序
對于奇偶交換排序來說,它將排序過程分為兩個階段,奇交換和偶交換。對于奇交換來說,它總是比較奇數(shù)索引以及其相鄰的后續(xù)元素。而偶交換總是比較偶數(shù)索引和其相鄰的后續(xù)元素。并且,奇交換和偶交換會成對出現(xiàn),這樣才能保證比較和交換涉及到數(shù)組中的每一個元素。
下面是奇偶交換排序的串行實現(xiàn):
public static void oddEvenSort(int[] arr) {
int exchFlag = 1, start = 0;
while(exchFlag == 1 || start == 1) {
exchFlag = 0;
for(int i=start; i<arr.length-1; i+=2) {
if(arr[i] > arr[i+1]) {
int temp = arr[i];
arr[i] = arr[i+1];
arr[i+1] = temp;
exchFlag = 1;
}
}
if(start == 0)
start = 1;
else
start = 0;
}
}
其中,exchFlag用來記錄當(dāng)前迭代是否發(fā)生了數(shù)據(jù)交換,而start變量用來表示是奇交換還是偶交換。初始時,start為0,表示進行偶交換,每次迭代結(jié)束后,切換start的狀態(tài)。如果上一次比較交換發(fā)生了數(shù)據(jù)交換,或者當(dāng)前正在進行的是奇交換,循環(huán)就不會停止,直到程序不再發(fā)生交換,并且當(dāng)前進行的是偶交換為止(表示奇偶交換已經(jīng)成對出現(xiàn))。
并行模式代碼如下:
static int arr[];
static int exchFlag = 1;
static final int NUM_ARR = 10000;
static {
arr = new int[NUM_ARR];
for(int i=0; i<NUM_ARR; i++) {
arr[i] = new Random().nextInt(10000);
}
}
static synchronized void setExchFlag(int v) {
exchFlag = v;
}
static synchronized int getExchFlag() {
return exchFlag;
}
public static class OddEvenSortTask implements Runnable {
int i;
CountDownLatch latch;
public OddEvenSortTask(int i, CountDownLatch latch) {
this.i = i;
this.latch = latch;
}
@Override
public void run() {
if(arr[i] > arr[i+1]) {
int temp = arr[i];
arr[i] = arr[i+1];
arr[i+1] = temp;
setExchFlag(1);
}
latch.countDown();
}
}
public static void pOddEventSort() throws InterruptedException {
int start = 0;
ExecutorService pool = Executors.newCachedThreadPool();
while(getExchFlag() == 1 || start == 1) {
setExchFlag(0);
//偶數(shù)的數(shù)組長度,當(dāng)start為1時,只有l(wèi)en/2-1個線程
CountDownLatch latch = new CountDownLatch(arr.length/2 - (arr.length%2==0?start:0));
for(int i=start; i<arr.length-1; i+=2) {
pool.submit(new OddEvenSortTask(i, latch));
}
latch.await();
if(start == 0)
start = 1;
else
start = 0;
}
}
上述代碼定義了奇偶排序的任務(wù)類。該任務(wù)的主要工作是進行數(shù)據(jù)比較和必要交換。并行排序的 主體是pOddEventSort()方法,它使用CountDownLatch記錄線程數(shù)量,對于每一次迭代,使用單獨的線程對每一次元素比較和交換進行操作。在下一次迭代前,必須等待上一次迭代所有線程的完成。
5.2 改進的插入排序:希爾排序
插入排序的基本思想是:一個未排序的數(shù)組(或鏈表)可以分為兩個部分,前半部分是已經(jīng)排序的,后半部分是未排序的。在進行排序時,只需要在未排序的部分選擇一個元素,將其插入到前面有序的數(shù)組中即可。最終,未排序的部分會越來越少,直到為0,那么排序就完成了。
插入排序的實現(xiàn)如下:
public static void insertSort(int[] arr) {
int length = arr.length;
int j, i, key;
for(int i=1; i<length; i++) {
//key為要準(zhǔn)備插入的元素
key = arr[i];
j = i - 1;
while(j>=0 && arr[j]>key) {
arr[j+1] = arr[j];
j--;
}
//找到合適的位置插入key
arr[j+1] = key;
}
}
上述代碼第6行,提取要準(zhǔn)備插入的元素(也就是未排序序列中的第一個元素)。接著,在已排序隊列中找到這個元素的插入位置(第8~10行),并進行插入(第13行)即可。
簡單的插入排序是很難并行化的。因為這一次的 數(shù)據(jù)插入依賴于上一次得到的有序序列,因此多個步驟之間無法并行。為此,可以對插入排序進行擴展,這就是希爾排序。
希爾排序?qū)⒄麄€數(shù)組根據(jù)間隔h分隔為若干個子數(shù)組。子數(shù)組互相穿插在一起,每一次排序時,分別對每一個子數(shù)組進行排序。在每一次排序完成后,可以遞減h的值,進行下輪更加精細的排序。直到h為1,此時等價于一次插入排序。
希爾排序的一個主要優(yōu)點是,即使一個較小的元素在數(shù)組的末尾,由于每次元素移動都以h為間隔進行,因此數(shù)組末尾的小元素可以在很少的交換次數(shù)下,就被換到最接近元素最終位置的地方。
希爾排序的串行實現(xiàn):
public static void shellSort(int[] arr) {
//計算出最大的h
int h = 1;
while(h<=arr.length/3) {
h = h*3+1;
}
while(h>0) {
for(int i=h; i<arr.length; i++) {
if(arr[i]<arr[i-h]) {
int tmp = arr[i];
int j = i - h;
while(j>=0 && arr[j]>tmp) {
arr[j+h] = arr[j];
j-=h;
}
arr[j+h] = tmp;
}
}
h = (h-1)+3;
}
}
上述代碼4~6行,計算一個合適的h值,接著正式進行希爾排序。第8行的for循環(huán)進行間隔為h的插入排序,每次排序結(jié)束后,遞減h的值。直到h為1,退化為插入排序。
希爾排序每次都針對不同的子數(shù)組進行排序,各個子數(shù)組之間是完全獨立的。因此,改寫成并行程序:
public class ParallelShellSort {
static int arr[];
static final int ARRNUM = 1000;
static {
arr = new int[ARRNUM];
for (int i = 0; i < ARRNUM; i++) {
arr[i] = new Random().nextInt(1000);
}
}
public static class ShellSortTask implements Runnable {
int i = 0;
int h = 0;
CountDownLatch l;
public ShellSortTask(int i,int h,CountDownLatch latch) {
this.i = i;
this.h = h;
this.l = latch;
}
@Override
public void run() {
if(arr[i] < arr[i-h]) {
int tmp = arr[i];
int j = i - h;
while(j>=0 && arr[j] > tmp) {
arr[j+h] = arr[j];
j -= h;
}
arr[j+h] = tmp;
}
l.countDown();
}
}
public static void pShellSort() throws InterruptedException {
int h = 1;
CountDownLatch latch = null;
ExecutorService pool = Executors.newCachedThreadPool();
while(h<=arr.length/3) {
h = h*3 + 1;
}
while(h>0) {
System.out.println("h=" + h);
if(h>=4)
latch = new CountDownLatch(arr.length-h);
for(int i=h; i<arr.length; i++) {
if(h>=4) {
pool.execute(new ShellSortTask(i, h, latch));
} else {
if(arr[i] < arr[i-h]) {
int tmp = arr[i];
int j = i -h;
while(j>=0 && arr[j]>tmp) {
arr[j+h] = arr[j];
j -= h;
}
arr[j+h] = tmp;
}
}
}
latch.await();
h = (h-1)/3;
}
}
public static void main(String[] args) throws InterruptedException {
pShellSort();
for(int i=0; i<ARRNUM; i++) {
System.out.println(arr[i]);
}
}
}
上述代碼中定義ShellSortTask作為并行任務(wù)。一個ShellSortTask的作用是根據(jù)給定的起始位置和h,對子數(shù)組進行排序,因此可以完全并行化。為控制線程數(shù)量,這里定義并行主函數(shù)pShellSort()在h大于或等于4時使用并行線程,否則則退化為傳統(tǒng)的插入排序。每次計算后,遞減h的值。