多線程編程之保護(hù)性暫掛模式

???????保護(hù)性暫掛模式,也稱為Guarded Suspension模式,指的是當(dāng)前線程在執(zhí)行某個(gè)任務(wù)之前,需要檢查某一條件,只有在該條件成立的情況下,當(dāng)前線程才可以繼續(xù)往下執(zhí)行當(dāng)前任務(wù)。顧名思義,保護(hù)性暫掛模式是一種廣義的概念,其主要載體有兩個(gè):預(yù)備條件和任務(wù),在任何需要使用預(yù)先檢查的情況中都可以使用保護(hù)性暫掛模式。

1. 角色描述

保護(hù)性暫掛類圖

???????如下是類圖中各個(gè)角色定位的描述:

  • GuardedObject:受保護(hù)的對(duì)象,供給客戶端調(diào)用,用于生成并執(zhí)行受保護(hù)的行為的和檢查并控制狀態(tài)改變的,如下是其各個(gè)方法的作用:
    • guardedMethod():生成并且執(zhí)行受保護(hù)的行為;
    • stateOperation():用于檢查當(dāng)前狀態(tài)是否滿足特定的狀態(tài),從而控制受保護(hù)行為的狀態(tài);
  • Blocker:該類主要提供了一些模板方法,主要是用于調(diào)用受保護(hù)行為,或者喚醒當(dāng)前由于先驗(yàn)條件不通過而導(dǎo)致等待的線程的,如下是其各個(gè)方法的作用:
    • callWithGuard(GuardedAction):該方法首先會(huì)檢查GuardedAction提供的先驗(yàn)條件是否滿足,如果不滿足,則會(huì)阻塞當(dāng)前線程,否則會(huì)執(zhí)行GuardedAction中的任務(wù);
    • signalAfter(Callable):該方法會(huì)先檢查先驗(yàn)條件是否滿足,如果滿足先驗(yàn)條件則會(huì)喚醒一個(gè)正在等待的線程;
    • broadcastAfter(Callable):該方法會(huì)先檢查先驗(yàn)條件是否滿足,如果先驗(yàn)條件滿足則會(huì)喚醒所有正在等待的線程;
    • signal:直接喚醒一個(gè)正在等待先驗(yàn)條件滿足的線程;
    • broadcast():直接喚醒所有正在等待先驗(yàn)條件滿足的線程;
  • GuardedAction:受保護(hù)方法的載體,并且提供了進(jìn)行先驗(yàn)檢查的條件,其各方法和屬性作用如下:
    • predicate:提供了進(jìn)行先驗(yàn)檢查的條件;
    • call():提供了需要執(zhí)行的任務(wù);
  • Predicate:承載了進(jìn)行先驗(yàn)條件檢查的條件。

2. 實(shí)例演示

???????比如我們會(huì)遇到這種場(chǎng)景,在進(jìn)行某些操作時(shí),比如通過elasticsearch服務(wù)器進(jìn)行查詢或更新操作,我們需要連接es服務(wù)器,而在es服務(wù)器連接上之前,所有的查詢和更新操作都是需要被阻塞的。即使在服務(wù)器連接上之后,我們也需要經(jīng)常對(duì)服務(wù)器進(jìn)行心跳測(cè)試,以檢查與服務(wù)器的連接是否還存活在,如果不存活,則還是需要繼續(xù)阻塞其余的操作,并且嘗試重新連接es服務(wù)器,這種情況我們就可以使用到保護(hù)性暫掛模式。保護(hù)性條件即是與es服務(wù)器的連接還存活在,如果不存活則需要掛起所有嘗試連接服務(wù)器執(zhí)行任務(wù)的線程,并且當(dāng)前線程會(huì)嘗試連接服務(wù)器。如下是示例代碼:

public class ElasticSearchAgent {
  private volatile boolean connectedToServer = false;

  private final Predicate agentConnected = () -> connectedToServer;

  private final Blocker blocker = new ConditionVarBlocker();

  private final Timer heartbeatTimer = new Timer(true);

  public void update(final UpdateCondition condition) throws Exception {
    GuardedAction<Void> guardedAction = new GuardedAction<Void>(agentConnected) {
      @Override
      public Void call() {
        doUpdate(condition);
        return null;
      }
    };

    blocker.callWithGuard(guardedAction);
  }

  private void doUpdate(UpdateCondition condition) {
    try {
      TimeUnit.MICROSECONDS.sleep(20); // 模擬進(jìn)行更新
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  public void init() {
    Thread connectingThread = new Thread(new ConnectingTask());
    connectingThread.start();
    heartbeatTimer.schedule(new HeartBeatTask(), 60000, 2000);
  }

  public void disconnect() {
    connectedToServer = false;
  }

  protected void onConnected() {
    try {
      blocker.signalAfter(() -> {
        connectedToServer = true;
        return Boolean.TRUE;
      });
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  protected void onDisconnected() {
    connectedToServer = false;
  }

  private class ConnectingTask implements Runnable {
    @Override
    public void run() {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {}

      onConnected();
    }
  }

  private class HeartBeatTask extends TimerTask {

    @Override
    public void run() {
      if (!testConnection()) {
        onDisconnected();
        reconnect();
      }
    }

    private boolean testConnection() {
      return true;
    }

    private void reconnect() {
      ConnectingTask connectingTask = new ConnectingTask();
      connectingTask.run();
    }
  }
}

???????可以看到,在進(jìn)行update()操作時(shí),首先會(huì)創(chuàng)建一個(gè)GuardedAction對(duì)象,真正的更新操作是在該對(duì)象中進(jìn)行的,這里的保護(hù)性條件是通過一個(gè)volatile類型的變量connectedToServer來控制的,如果當(dāng)前與es服務(wù)器的連接還存活在,則該變量置為true。HeartBeatTask是一個(gè)定時(shí)任務(wù),在60s延遲之后每隔2s會(huì)向服務(wù)器發(fā)送心跳測(cè)試,以檢查連接是否存活,如果不存活,則會(huì)將connectedToServer變量置為false,并且會(huì)嘗試連接服務(wù)器。在init()方法中首先會(huì)創(chuàng)建一個(gè)連接服務(wù)器的任務(wù),以保證服務(wù)器連接在初始時(shí)的可用狀態(tài),并且其還會(huì)啟動(dòng)心跳測(cè)試的定時(shí)任務(wù)。如下是Blocker和ConditionVarBlocker的實(shí)現(xiàn)代碼:

public interface Blocker {
  <V> V callWithGuard(GuardedAction<V> guardedAction) throws Exception;

  void signalAfter(Callable<Boolean> stateOperation) throws Exception;

  void signal() throws InterruptedException;

  void broadcastAfter(Callable<Boolean> stateOperation) throws Exception;
}
public class ConditionVarBlocker implements Blocker {
  private final Lock lock;
  private final Condition condition;

  public ConditionVarBlocker(Lock lock) {
    this.lock = lock;
    this.condition = lock.newCondition();
  }

  public ConditionVarBlocker() {
    this.lock = new ReentrantLock();
    this.condition = lock.newCondition();
  }

  @Override
  public <V> V callWithGuard(GuardedAction<V> guardedAction) throws Exception {
    lock.lockInterruptibly();
    try {
      final Predicate guard = guardedAction.guard;
      while (!guard.evaluate()) {
        condition.await();
      }

      return guardedAction.call();
    } finally {
      lock.unlock();
    }
  }

  @Override
  public void signalAfter(Callable<Boolean> stateOperation) throws Exception {
    lock.lockInterruptibly();
    try {
      if (stateOperation.call()) {
        condition.signal();
      }
    } finally {
      lock.unlock();
    }
  }

  @Override
  public void signal() throws InterruptedException {
    lock.lockInterruptibly();
    try {
      condition.signal();
    } finally {
      lock.unlock();
    }
  }

  @Override
  public void broadcastAfter(Callable<Boolean> stateOperation) throws Exception {
    lock.lockInterruptibly();
    try {
      if (stateOperation.call()) {
        condition.signalAll();
      }
    } finally {
      lock.unlock();
    }
  }
}

????????可以看到,ConditionVarBlocker中基本上都是模板代碼,其聲明了一個(gè)Lock對(duì)象和一個(gè)Condition對(duì)象,Lock對(duì)象用于對(duì)當(dāng)前的先驗(yàn)條件檢查過程進(jìn)行同步處理,Condition對(duì)象則用于在先驗(yàn)條件不滿足的情況下阻塞當(dāng)前線程的。

???????在callWithGuard()方法中,首先會(huì)在一個(gè)循環(huán)中檢查當(dāng)前的先驗(yàn)條件是否滿足,如果不滿足,則使當(dāng)前線程進(jìn)入等待狀態(tài),如果滿足,則當(dāng)前線程繼續(xù)執(zhí)行其任務(wù)。這里需要注意的是,我們使用了while()循環(huán)用于判斷先驗(yàn)條件是否滿足,因?yàn)橛锌赡墚?dāng)前線程被意外的喚醒,或者說被喚醒之后先驗(yàn)條件還是不滿足,因而這里使用循環(huán)判斷,以使當(dāng)前線程在先驗(yàn)條件不滿足的情況下繼續(xù)等待。

???????在signalAfter()方法中,其首先調(diào)用stateOperation.call()方法,判斷當(dāng)前的先驗(yàn)條件是否滿足,只有在先驗(yàn)條件滿足的情況下才會(huì)喚醒一個(gè)等待的線程。這里stateOperation是ElasticSearchAgent傳入的用于判斷當(dāng)前是否處于連接狀態(tài)的一個(gè)條件載體。

???????如下是GuardedAction的實(shí)現(xiàn)代碼:

public abstract class GuardedAction<V> implements Callable<V> {
  protected final Predicate guard;

  public GuardedAction(Predicate guard) {
    this.guard = guard;
  }
}

???????這里GuardedAction是一個(gè)抽象類,其主要封裝了一個(gè)Predicate屬性。GuardedAction的主要實(shí)現(xiàn)在ElasticSearchAgent.guardedMethod()方法中生成的,因?yàn)榫唧w需要執(zhí)行的任務(wù)需要調(diào)用方生成,這里只是提供了一個(gè)模板方法。如下是Predicate的代碼:

@FunctionalInterface
public interface Predicate {
  boolean evaluate();
}

???????這里Predicate也只是一個(gè)聲明而已,其具體的實(shí)現(xiàn)也是在ElasticSearchAgent中,本例中主要是判斷connectedToServer是否為true,即處于連接服務(wù)器的狀態(tài)。

3. Guarded Suspension實(shí)現(xiàn)考量

  • 可以看到Guarded Suspension的實(shí)現(xiàn)中,Blocker、GuardedAction和Predicate只是提供的一個(gè)模板,其內(nèi)主要是一些通用的代碼,而和具體業(yè)務(wù)相關(guān)的代碼主要在ElasticSearchAgent中,其會(huì)創(chuàng)建所需執(zhí)行的GuardedAction對(duì)象,并且控制其所需的先驗(yàn)條件。Guarded Suspension將關(guān)注點(diǎn)進(jìn)行了分離,我們?cè)谑褂迷撃J降臅r(shí)候主要需要實(shí)現(xiàn)的也即是類似于ElasticSearchAgent的一個(gè)客戶端類;
  • 在執(zhí)行使用Guarded Suspension模式的時(shí)候,需要注意的是,每次執(zhí)行GuardedObject.guardedMethod()方法時(shí)都會(huì)創(chuàng)建一個(gè)GuardedAction對(duì)象,這可能會(huì)對(duì)JVM垃圾回收造成一定的負(fù)擔(dān),因而在使用該模式時(shí)如果內(nèi)存較小需要特別注意該問題;
  • 在Guarded Suspension模式中,ConditionVarBlocker的callWithGuard()和signal*()方法的執(zhí)行都進(jìn)行了加鎖處理,這是因?yàn)镃onditionVarBlocker是所有線程所共有的一個(gè)對(duì)象,其lock和condition變量是需要所有線程都一致可見的,因而這里需要對(duì)其進(jìn)行加鎖處理;
  • 在ConditionVarBlocker.callWithGuard()方法中,對(duì)先驗(yàn)條件的檢查是使用一個(gè)while循環(huán)進(jìn)行的,這是為了防止等待的線程被意外的喚醒,而先驗(yàn)條件此時(shí)還不滿足,使用while循環(huán)就可以保證當(dāng)前線程再次進(jìn)入到等待狀態(tài);
  • 上述ConditionVarBlocker還提供了一個(gè)如下的構(gòu)造方法:
public ConditionVarBlocker(Lock lock) {
    this.lock = lock;
    this.condition = lock.newCondition();
}

該方法用于防止ElasticSearchAgent由于某種原因而需要加鎖時(shí)可能會(huì)造成嵌套監(jiān)視器鎖死的問題的。所謂的嵌套監(jiān)視器鎖死的問題指的是,如果某個(gè)線程執(zhí)行依次獲取了兩個(gè)鎖,而由于先驗(yàn)條件不滿足,從而導(dǎo)致當(dāng)前線程釋放了內(nèi)層鎖從而進(jìn)入等待狀態(tài),而另外的線程為了檢查當(dāng)前的先驗(yàn)條件需要獲取到外層鎖,這就導(dǎo)致了鎖循環(huán)等待的問題,在等待先驗(yàn)條件滿足的線程持有外層鎖,其無法釋放,而嘗試改變先驗(yàn)條件的線程正在嘗試獲取外層鎖,但其一直無法獲取到,從而造成了死鎖。這種情況下就提供了該構(gòu)造方法,如果ElasticSearchAgent需要對(duì)其方法進(jìn)行加鎖,那么其需要通過該構(gòu)造方法將鎖傳遞給ConditionVarBlocker,這樣當(dāng)前線程在釋放鎖的時(shí)候就會(huì)將外層鎖和內(nèi)層鎖同時(shí)釋放了(因?yàn)槎际峭粋€(gè)鎖)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容