Activiti架構(gòu)分析及源碼詳解

Activiti架構(gòu)分析及源碼詳解

引言

工作流引擎,應(yīng)用于解決流程審批和流程編排方面等問(wèn)題,有效的提供了擴(kuò)展性的支撐。而目前來(lái)說(shuō),工作流領(lǐng)域也有了相對(duì)通行化的標(biāo)準(zhǔn)規(guī)范,也就是BPMN2.0。支持這個(gè)規(guī)范的開(kāi)源引擎主要有:Activiti,flowable,Jbpm4等。本文著重對(duì)Activiti的架構(gòu)設(shè)計(jì)進(jìn)行分析和梳理,同時(shí)對(duì)流程啟動(dòng)和原子操作的相關(guān)代碼進(jìn)行完整走讀。

本文的閱讀對(duì)象需要對(duì)Activiti有一定的理解并且已經(jīng)能夠初步的使用Activiti進(jìn)行流程流轉(zhuǎn)方面開(kāi)發(fā)。

如果對(duì)文章內(nèi)容有疑惑,歡迎加入技術(shù)交流群186233599,作者會(huì)不定時(shí)解答相關(guān)問(wèn)題。

一、Activiti設(shè)計(jì)解析-架構(gòu)&領(lǐng)域模型

1.1 架構(gòu)

Activiti采用了一個(gè)分層架構(gòu)完成自底向上的包裝。架構(gòu)圖如下

image

大致包括:

  • 核心接口層,被PVM接口定義。PVM會(huì)在后面的章節(jié)中詳細(xì)講述。
  • 核心實(shí)現(xiàn)層,基于PVM的思想和接口,定義了一些關(guān)鍵實(shí)體包含ActivityImpl(該類抽象了節(jié)點(diǎn)實(shí)現(xiàn)),F(xiàn)lowElementBehavior實(shí)現(xiàn)(該類抽象了節(jié)點(diǎn)指令動(dòng)作),ExecutionImpl(流程執(zhí)行實(shí)體類)
  • 命令層,Activiti在編碼模式上直接限定整體風(fēng)格為命令模式。也就是將業(yè)務(wù)邏輯封裝為一個(gè)個(gè)的Command接口實(shí)現(xiàn)類。這樣新增一個(gè)業(yè)務(wù)功能時(shí)只需要新增一個(gè)Command實(shí)現(xiàn)即可。這里需要特別提到的是,命令本身需要運(yùn)行在命令上下文中,也就是CommandContext類對(duì)象。
  • 命令攔截層,采用責(zé)任鏈模式,通過(guò)責(zé)任鏈模式的攔截器層,為命令的執(zhí)行創(chuàng)造條件。諸如開(kāi)啟事務(wù),創(chuàng)建CommandContext上下文,記錄日志等
  • 業(yè)務(wù)接口層,面向業(yè)務(wù),提供了各種接口。這部分的接口就不再面向框架開(kāi)發(fā)者了,而是面向框架的使用者。
  • 部署層,嚴(yán)格來(lái)說(shuō),這個(gè)與上面說(shuō)到的并不是一個(gè)完整的分層體系。但是為了突出重要性,單獨(dú)拿出來(lái)說(shuō)。流程運(yùn)轉(zhuǎn)的前提是流程定義。而流程定義解析就是一切的開(kāi)始。從領(lǐng)域語(yǔ)言解析為Java的POJO對(duì)象依靠的就是部署層。后文還會(huì)細(xì)說(shuō)這個(gè)環(huán)節(jié)。
  • 流程引擎,所有接口的總?cè)肟凇I厦嫣岬降臉I(yè)務(wù)接口層,部署層都可以從流程引擎類中得到。因此這里的流程引擎接口其實(shí)類似門面模式,只作為提供入口。

1.1.1 命令模式

Activit整體上采用命令模式進(jìn)行代碼功能解耦。將流程引擎的大部分涉及到客戶端的需求讓外部以具體命令實(shí)現(xiàn)類的方式實(shí)現(xiàn)。

完成這個(gè)編碼模式,有幾個(gè)重點(diǎn)類需要關(guān)注

  • Command命令接口,所有的具體命令都需要實(shí)現(xiàn)該類,最終業(yè)務(wù)就是執(zhí)行該類的execute方法。
  • CommandContext命令上下文,為具體命令的執(zhí)行提供上下文支撐。該上下文的生成是依靠命令攔截器中的上下文攔截器org.activiti.engine.impl.interceptor.CommandContextInterceptor來(lái)生成的。該攔截器會(huì)判斷是復(fù)用當(dāng)前的上下文還是生成新的上下文。

引擎內(nèi)的大部分功能都是通過(guò)單獨(dú)的命令完成。

1.1.2 責(zé)任鏈模式

Activiti的命令模式還需要搭配其對(duì)應(yīng)的責(zé)任鏈來(lái)完成。具體來(lái)說(shuō),Activiti中存在一個(gè)命令攔截器鏈條,該命令攔截器鏈條由幾大塊的攔截器實(shí)現(xiàn)組成,如下

image

其中重要的默認(rèn)攔截器有2個(gè):

  • 事務(wù)攔截器,主要職責(zé)是使得后續(xù)命令運(yùn)行在事務(wù)環(huán)境下。
  • CommandContext攔截器,主要職責(zé)是在有必要的時(shí)候創(chuàng)建CommandContext對(duì)象,并在使用完成后關(guān)閉該上下文。
1.1.2.1 事務(wù)攔截器

事務(wù)攔截器是否提供取決于org.activiti.engine.impl.cfg.ProcessEngineConfigurationImpl的子類對(duì)方法createTransactionInterceptor的實(shí)現(xiàn)。獨(dú)立使用時(shí)的org.activiti.engine.impl.cfg.StandaloneProcessEngineConfiguration該方法返回為空。也就是不提供事務(wù)攔截器。此時(shí),命令的運(yùn)行就無(wú)法通過(guò)事務(wù)攔截器來(lái)提供事務(wù)環(huán)境了。

1.1.2.2 命令上下文攔截器

實(shí)現(xiàn)類:org.activiti.engine.impl.interceptor.CommandContextInterceptor。

該攔截器的功能非常重要,可以說(shuō)是Activiti操作的核心之一。其作用是在后續(xù)攔截器執(zhí)行前檢查當(dāng)前上下文環(huán)境,如果不存在CommandContext對(duì)象,則創(chuàng)建一個(gè);在后續(xù)攔截器執(zhí)行后,將CommandContext對(duì)象close。CommandContext包含了本次操作中涉及到所有的數(shù)據(jù)對(duì)象。

1.1.3 流程定義解析

Activiti遵循BPMN2.0規(guī)范,因此框架中少不了對(duì)BPMN2.0規(guī)范的定義文件(XML形式)的解析類。Activiti采用的STAX的拉模型進(jìn)行XML解析。這里先不分析其具體的解析類的內(nèi)在聯(lián)系,而是概念性的闡述下Activiti對(duì)解析的概念分層。

首先通過(guò)類org.activiti.bpmn.converter.BpmnXMLConverter進(jìn)行XML解析,解析為org.activiti.bpmn.model

包下面的與各個(gè)XML元素定義對(duì)應(yīng)的POJO類。此時(shí)這些POJO類僅僅只是XML文件的一個(gè)Java表達(dá)。

在通過(guò)類org.activiti.engine.impl.bpmn.parser.BpmnParser聚合不同的解析類,將上面步驟解析出來(lái)的POJO類進(jìn)一步解析為可以在框架中利用的org.activiti.engine.impl.pvm.process包下面的類。典型的代表就是ActivityImpl類。

三者之間的關(guān)系簡(jiǎn)單用圖表達(dá)就是

image

1.2 領(lǐng)域模型

Activiti采取了領(lǐng)域中的充血模型作為自己的實(shí)現(xiàn)方式。大部分業(yè)務(wù)邏輯都直接關(guān)聯(lián)在了org.activiti.engine.impl.persistence.entity.ExecutionEntity中。由于Activiti采用了MyBatis而非Hibernate這樣的O/R Mapping產(chǎn)品作為持久層,因此Activiti在具體的持久化操作上也有自己的獨(dú)特方式。

1.2.1 數(shù)據(jù)集中提交

Activiti的持久化機(jī)制簡(jiǎn)單說(shuō)來(lái)就是數(shù)據(jù)集中提交。集中提交還產(chǎn)生了一個(gè)額外的作用:自動(dòng)提交。換句話說(shuō),在內(nèi)存中的實(shí)體,如果更新了屬性但是沒(méi)有顯示的執(zhí)行刷新動(dòng)作,在一個(gè)調(diào)用的生命周期結(jié)束后也會(huì)被持久化其最新的狀態(tài)到數(shù)據(jù)庫(kù)。下面來(lái)看下詳細(xì)解釋下這個(gè)集中提交機(jī)制。

在Activiti所有運(yùn)行期生成的對(duì)象都需要實(shí)現(xiàn)一個(gè)接口org.activiti.engine.impl.interceptor.Session,其定義如下

public interface Session
{
  void flush();
  void close();
}

而Session對(duì)象則是由接口org.activiti.engine.impl.interceptor.SessionFactory的方法進(jìn)行生成,其定義如下

public interface SessionFactory {
  Class<?> getSessionType();
  Session openSession();
}

流程引擎內(nèi)部持有各種SessionFactory實(shí)現(xiàn),用戶也可以自定義注冊(cè)自己的SessionFactory實(shí)現(xiàn),如果用戶希望自定義的對(duì)象也可以被集中提交機(jī)制處理的話。

在CommandContext中存在一個(gè)Map<Class,Session>存儲(chǔ),存儲(chǔ)著該CommandContext生命周期內(nèi)新建的所有Session對(duì)象。當(dāng)一個(gè)命令執(zhí)行完畢后,最終命令上下文CommandContext的close方法會(huì)被調(diào)用。當(dāng)執(zhí)行CommandContext.close()方法時(shí),其內(nèi)部會(huì)按照順序執(zhí)行flushSessions,closeSessions方法。從名字可以看到,第一個(gè)方法內(nèi)部就是執(zhí)行所有Session對(duì)象的flush方法,第二個(gè)方法內(nèi)部就是執(zhí)行所有的Session對(duì)象的close方法。

流程引擎內(nèi)部有一個(gè)Session實(shí)現(xiàn)是比較特別的。也就是org.activiti.engine.impl.db.DbSqlSession實(shí)現(xiàn)。如果有需要更新,刪除,插入等操作,該操作是需要通過(guò)DbSqlSession來(lái)實(shí)現(xiàn)的,而實(shí)際上該實(shí)現(xiàn)會(huì)將這些操作緩存在內(nèi)部。只有在執(zhí)行flush方法時(shí)才會(huì)真正的提交到數(shù)據(jù)庫(kù)去執(zhí)行。正是因?yàn)槿绱耍械臄?shù)據(jù)操作,實(shí)際上最終都是要等到CommandContext執(zhí)行close方法時(shí),才會(huì)真正提到到數(shù)據(jù)庫(kù)。

理論上,充血模型是在聚合根這個(gè)層面上完成的持久化。但是由于Activiti沒(méi)有采用O/R Mapping框架,于是自己完成了一個(gè)類似功能的模塊。

1.2.2 PersistentObject

要明白工作流中的數(shù)據(jù)插入機(jī)制,首先要看下實(shí)體類的接口org.activiti.engine.impl.db.PersistentObject,如下

public interface PersistentObject {
  String getId();
  void setId(String id);
  Object getPersistentState();
}

Activiti的數(shù)據(jù)庫(kù)表都是單字符串主鍵,每一個(gè)實(shí)體類都需要實(shí)現(xiàn)該接口。在對(duì)實(shí)體類進(jìn)行保存的時(shí)候,DbSqlSession會(huì)調(diào)用getId方法判斷是否存在ID,如果不存在,則使用ID生成器(該生成器根據(jù)策略有幾種不同實(shí)現(xiàn),這里不表)生成一個(gè)ID并且設(shè)置。

而方法getPersistentState是用來(lái)返回一個(gè)持久化狀態(tài)對(duì)象。則方法的使用場(chǎng)合在下一個(gè)章節(jié)說(shuō)明

1.2.3 DbSqlSession

該Session內(nèi)部存在三個(gè)重要屬性,如下

//該屬性存儲(chǔ)著所有使用insert方法放入的對(duì)象
protected Map<Class<? extends PersistentObject>, List<PersistentObject>> insertedObjects = new HashMap<Class<? extends PersistentObject>, List<PersistentObject>>();
//該Map結(jié)構(gòu)內(nèi)存儲(chǔ)所有通過(guò)該DbSqlSession查詢出來(lái)的結(jié)果,以及update方法放入的對(duì)象
protected Map<Class<?>, Map<String, CachedObject>> cachedObjects = new HashMap<Class<?>, Map<String,CachedObject>>();
//該屬性內(nèi)存儲(chǔ)著所有將要執(zhí)行的刪除操作
protected List<DeleteOperation> deleteOperations = new ArrayList<DeleteOperation>();

刪除和新增都比較容易理解,就是要此類操作緩存起來(lái),一次性提交到數(shù)據(jù)庫(kù),上文曾提到的數(shù)據(jù)集中提交就體現(xiàn)在這個(gè)地方。而cachedObjects就有些不同了。要解析這個(gè)Map結(jié)構(gòu),首先來(lái)看下類org.activiti.engine.impl.db.DbSqlSession.CachedObject的結(jié)構(gòu)屬性,如下

public static class CachedObject {
    protected PersistentObject persistentObject;
    protected Object persistentObjectState;
}
public CachedObject(PersistentObject persistentObject, boolean storeState) {
      this.persistentObject = persistentObject;
      if (storeState) {
        this.persistentObjectState = persistentObject.getPersistentState();
      }
    }

通過(guò)構(gòu)造方法可以明白,在新建該對(duì)象的時(shí)候,通過(guò)storeState參數(shù)決定是否保存當(dāng)時(shí)的持久化狀態(tài)。

該Map的數(shù)據(jù)來(lái)源有2處

  • 所有通過(guò)該DbSqlSession對(duì)象執(zhí)行查詢的結(jié)果對(duì)象都會(huì)生成一個(gè)對(duì)應(yīng)的CachedObject對(duì)象,并且storeState參數(shù)為true
  • 執(zhí)行該DbSqlSession對(duì)象的update類方法,會(huì)將參數(shù)用CachedObject對(duì)象包裝起來(lái),storeState參數(shù)為false。

當(dāng)DbSqlSession執(zhí)行flush方法時(shí),主要來(lái)說(shuō)是做了數(shù)據(jù)提交動(dòng)作

  1. 將insertObjects列表中的元素插入到數(shù)據(jù)庫(kù)
  2. 將deleteOperations列表中的元素遍歷執(zhí)行
  3. 執(zhí)行方法getUpdatedObjects獲得要更新的實(shí)體對(duì)象

方法getUpdatedObjects的邏輯就是遍歷所有的CachedObject,同時(shí)滿足以下條件者則放入要更新的實(shí)體集合中

  1. 實(shí)體的getPersistentState方法不為空
  2. 實(shí)體的getPersistentState方法返回對(duì)象與CachedObject存儲(chǔ)的persistentObjectState執(zhí)行equal判斷,結(jié)果為false

通過(guò)上面可以得知,如果一個(gè)實(shí)體類在DbSqlSession的生命周期被查詢出來(lái),并且其數(shù)據(jù)內(nèi)容有了修改,則DbSqlSession刷新時(shí)會(huì)自動(dòng)刷新到數(shù)據(jù)庫(kù)

二、Activiti設(shè)計(jì)解析-PVM執(zhí)行樹(shù)

2.1 核心理念

任何框架都是核心理念上發(fā)展細(xì)化而來(lái)。Activiti的核心理念就是流程虛擬機(jī)(Process Virtual Machine,以下簡(jiǎn)稱PVM)。PVM試圖提供一組API,通過(guò)API本身來(lái)描述工作流方面的各種可能性。沒(méi)有了具體實(shí)現(xiàn),也使得PVM本身可以較好的適應(yīng)各種不同的工作流領(lǐng)域語(yǔ)言,而Activiti本身也是在PVM上的一種實(shí)現(xiàn)。

2.1.1 PVM對(duì)流程定義期的描述

首先來(lái)看下流程定義本身。在工作流中,流程定義可以圖形化的表達(dá)為一組節(jié)點(diǎn)和連接構(gòu)成的集合。比如下圖

image

即使沒(méi)有任何知識(shí)也能大概明白這張圖表達(dá)的是一個(gè)流程以及執(zhí)行順序的意圖。流程定義的表達(dá)方式不限,可以使用圖形的方式表達(dá),可以使用領(lǐng)域語(yǔ)言,也可以傳統(tǒng)的XML(比如Activiti用的就是BPMN2.0 Schema下的XML)。特別的,當(dāng)前已經(jīng)有了標(biāo)準(zhǔn)化的BPMN2.0規(guī)范。

PVM將流程定義描述為流程元素的集合。再將流程元素細(xì)分為2個(gè)子類:流程節(jié)點(diǎn)和連線。

  • 流程節(jié)點(diǎn)是某一種動(dòng)作表達(dá)的抽象描述。節(jié)點(diǎn)本身是可以嵌套的,也就是節(jié)點(diǎn)可以擁有子節(jié)點(diǎn)。
  • 連線表達(dá)是不同節(jié)點(diǎn)之間的轉(zhuǎn)移關(guān)系。一個(gè)連線只能有一個(gè)源頭節(jié)點(diǎn)和一個(gè)目標(biāo)節(jié)點(diǎn)。而節(jié)點(diǎn)本身可以有任意多的進(jìn)入連線和外出連線。

從類圖的角度也能很好的看出這種關(guān)系,流程節(jié)點(diǎn)PvmActivity和連線PvmTransition都是流程元素PvmProcessElement。

image

從類圖可以看到PvmActivity繼承于PvmScope。這種繼承關(guān)系表明流程節(jié)點(diǎn)本身有其歸于的作用域(PvmScope),節(jié)點(diǎn)本身也可能是另外一些節(jié)點(diǎn)的作用域,這也符合節(jié)點(diǎn)可能擁有子節(jié)點(diǎn)的原則。關(guān)于作用域本身,后文還會(huì)再次詳細(xì)講解,這里先按下不表。

2.1.2 PVM對(duì)流程運(yùn)行期的描述

通過(guò)流程節(jié)點(diǎn)和連線,PVM完成了對(duì)流程定義的表達(dá)。流程定義是一個(gè)流程的靜態(tài)表達(dá),流程執(zhí)行則是依照流程定義啟動(dòng)的一個(gè)運(yùn)行期表達(dá),每一個(gè)流程執(zhí)行都具備自己唯一的生命周期。流程執(zhí)行需要具備以下要素:

  1. 流程節(jié)點(diǎn)的具體執(zhí)行動(dòng)作。
  2. 流程執(zhí)行當(dāng)前處于哪一個(gè)流程節(jié)點(diǎn)。
  3. 流程執(zhí)行是如何從一個(gè)節(jié)點(diǎn)運(yùn)行至下一個(gè)節(jié)點(diǎn)。
  4. 流程執(zhí)行如何執(zhí)行流程節(jié)點(diǎn)定義的執(zhí)行動(dòng)作

針對(duì)要素1,Activiti提供了接口org.activiti.engine.impl.pvm.delegate.ActivityBehavior。該接口內(nèi)部?jī)H有一個(gè)execute方法。該接口的實(shí)現(xiàn)即為不同PvmActivity節(jié)點(diǎn)提供了具體動(dòng)作。ActivityBehavior有豐富的不同實(shí)現(xiàn),對(duì)應(yīng)了流程中豐富的不同功能的節(jié)點(diǎn)。每一個(gè)PvmActivity對(duì)象都會(huì)持有一個(gè)ActivityBehavior對(duì)象。

針對(duì)要素2,Activiti提供了接口org.activiti.engine.impl.pvm.PvmExecution。該接口有一個(gè)方法PvmActivity getActivity()。用以返回當(dāng)前流程執(zhí)行所處的流程節(jié)點(diǎn)。

針對(duì)要素3,Activiti提供了接口org.activiti.engine.impl.pvm.runtime.InterpretableExecution。接口方法很多,這里取和流程執(zhí)行運(yùn)轉(zhuǎn)最重要的2個(gè)方法展開(kāi),如下

public interface InterpretableExecution extends ActivityExecution, ExecutionListenerExecution, PvmProcessInstance {
  void take(PvmTransition transition);
  void take(PvmTransition transition, boolean fireActivityCompletedEvent);
 

執(zhí)行方法take,以連線對(duì)象作為入?yún)?,這會(huì)使得流程執(zhí)行該連線定義的路線。其實(shí)現(xiàn)邏輯應(yīng)該為讓流程執(zhí)行定位于連線源頭的活動(dòng)節(jié)點(diǎn),經(jīng)由連線對(duì)象,到達(dá)連線目的地的活動(dòng)節(jié)點(diǎn)。

針對(duì)要素4,實(shí)際上也是由接口org.activiti.engine.impl.pvm.runtime.AtomicOperation來(lái)完成的。通過(guò)該接口的調(diào)用類,此種情況的實(shí)現(xiàn)者需要獲取當(dāng)前流程執(zhí)行所處的活動(dòng)節(jié)點(diǎn)的ActivityBehavior對(duì)象,執(zhí)行其execute方法來(lái)執(zhí)行節(jié)點(diǎn)動(dòng)作。結(jié)合要素3和4,可以看出AtomicOperation接口用于執(zhí)行流程運(yùn)轉(zhuǎn)中的單一指令,例如根據(jù)連線移動(dòng),執(zhí)行節(jié)點(diǎn)指令等。分解成單一指令的好處是易于編碼和理解。這也契合接口命名中的原子一意。

2.1.3PVM綜述

從上面對(duì)PVM定義期和運(yùn)行期的解釋可以看出,整個(gè)概念體系并不復(fù)雜。涉及到的類也不多。正是因?yàn)镻VM只對(duì)工作流中最基礎(chǔ)的部分做了抽象和接口定義,使得PVM的實(shí)現(xiàn)上有了很多的可能性。

然而也正是由于定義的簡(jiǎn)單性,實(shí)際上這套PVM在轉(zhuǎn)化為實(shí)際實(shí)現(xiàn)的時(shí)候需要額外附加很多的特性才能真正完成框架需求。

2.2 ActivitiImpl與作用域

在解析完成后,一個(gè)流程定義中的所有節(jié)點(diǎn)都會(huì)被解析為ActivityImpl對(duì)象。ActivityImpl對(duì)象本身可以持有事件訂閱(根據(jù)BPMN2.0規(guī)范,目前有定時(shí),消息,信號(hào)三種事件訂閱類型)。因?yàn)锳ctivityImpl本身可以嵌套并且可以持有訂閱,因此引入作用域概念(Scope)。

一個(gè)ActivityImpl在以下兩種情況下會(huì)被定義為作用域ActivityImpl。

  1. 該ActivityImpl是可變范圍,則它是一個(gè)作用域??勺兎秶梢岳斫鉃樵摴?jié)點(diǎn)的內(nèi)容定義是可變的。比如流程定義、子流程,其內(nèi)部?jī)?nèi)容是可變的。根據(jù)BPMN定義,可變范圍有:流程定義,子流程,多實(shí)例,調(diào)用活動(dòng)。
  2. 該ActivityImpl定義了一個(gè)上下文用于接收事件。比如:具備邊界事件的ActivityImpl,具備事件子流程的ActivityImpl,事件驅(qū)動(dòng)網(wǎng)關(guān),中間事件捕獲ActivityImpl。

作用域是一個(gè)很重要的概念,情況1中作用域定義的是復(fù)雜節(jié)點(diǎn)的生命周期,情況2中作用域定義的是事件的捕獲范圍。

2.3 ExecutionEntity

ExecutionEntity的含義是一個(gè)流程定義被啟動(dòng)后的執(zhí)行實(shí)例,代表著流程的運(yùn)行期狀態(tài)。在Activiti的設(shè)計(jì)中,事件訂閱,流程變量等都是與一個(gè)具體的ExecutionEntity相關(guān)的。其本身有幾個(gè)重要的屬性:

  • isScope:該屬性為真時(shí),意味該執(zhí)行實(shí)例在執(zhí)行一個(gè)具備作用域的ActivityImpl節(jié)點(diǎn)或者執(zhí)行一個(gè)流程定義。更簡(jiǎn)單一些,意味著該實(shí)例正在執(zhí)行一個(gè)作用域活動(dòng)。
  • isConcurrent:該屬性為真時(shí),意味與該執(zhí)行實(shí)例正在執(zhí)行的活動(dòng)節(jié)點(diǎn)同屬相同作用域的節(jié)點(diǎn)可能正并發(fā)被其他執(zhí)行實(shí)例執(zhí)行(比如并行網(wǎng)關(guān)后面的2個(gè)并行任務(wù))。
  • isActive:該屬性為真時(shí),意味該執(zhí)行實(shí)例正在執(zhí)行一個(gè)簡(jiǎn)單ActivityImpl(不包含其他ActivityImpl的ActivityImpl)
  • isEventScope:該屬性為真時(shí),意味該執(zhí)行實(shí)例是為了后期補(bǔ)償而進(jìn)行的變量保存所創(chuàng)建的執(zhí)行實(shí)例。由于流程執(zhí)行中的變量都需要與ExecutionEntity掛鉤,而補(bǔ)償是需要原始變量的快照。為了滿足這個(gè)需求,創(chuàng)建出一個(gè)專用于此的ExecutionEntity。
  • activityId:該ExecutionEntity正在執(zhí)行的ActivityImpl的id。正在執(zhí)行意味著幾種情況:進(jìn)入該節(jié)點(diǎn),執(zhí)行該節(jié)點(diǎn)動(dòng)作,離開(kāi)該節(jié)點(diǎn)。如果是等待子流程的完成,則該屬性為null。

上面對(duì)ExecutionEntity的解釋仍然抽象。如果直觀的看,可以認(rèn)為ExecutionEntity是某一種生命周期的體現(xiàn),其內(nèi)部屬性隨著不同的情況而變化。如下圖所示:

image

隨著流程的啟動(dòng),會(huì)創(chuàng)建一個(gè)ExecutionEntity對(duì)象。該ExecutionEntity生命周期與整個(gè)流程相同,而其中的isScopeisConcurrent在創(chuàng)建之初固定,并且不會(huì)改變。而isActiveactivityId隨著流程的推進(jìn)則會(huì)不斷變化。

ExecutionEntity是用來(lái)反映流程的推進(jìn)情況的,實(shí)際上,往往一個(gè)ExecutionEntity不足以支撐全部的BPMN功能。因此實(shí)現(xiàn)上,Activiti是通過(guò)一個(gè)樹(shù)狀結(jié)構(gòu)的ExecutionEntity結(jié)構(gòu)來(lái)反映流程推進(jìn)情況。創(chuàng)建之初的ExecutionEntity對(duì)象隨著流程的推進(jìn)會(huì)不斷的分裂和合并,ExecutionEntity樹(shù)也會(huì)不斷的生長(zhǎng)和修剪。在流程的推進(jìn)過(guò)程中會(huì)遇到4種基本情況

  1. 單獨(dú)的非作用域節(jié)點(diǎn)
  2. 單獨(dú)的作用域節(jié)點(diǎn)
  3. 并發(fā)的非作用域節(jié)點(diǎn)
  4. 并發(fā)的作用域節(jié)點(diǎn)

2.3.1 單獨(dú)的非作用域節(jié)點(diǎn)

此種情況可以如下圖所示

image

在流程前進(jìn)的構(gòu)成中,遇到單獨(dú)的非作用域節(jié)點(diǎn),ExecutionEntity一直處于激活狀態(tài),只不過(guò)隨著流程前進(jìn),其activityId指向會(huì)不斷變化。如上圖所示,會(huì)經(jīng)歷:開(kāi)始節(jié)點(diǎn)、組員工作、領(lǐng)導(dǎo)審批、結(jié)束節(jié)點(diǎn)4個(gè)不同的值。

實(shí)際上,這些節(jié)點(diǎn)都是在作用域<流程定義>之下,而ExecutionEntity代表的正是該作用域,因此其isScope屬性為true。

2.3.2 單獨(dú)的作用域節(jié)點(diǎn)

如果流程推進(jìn)中遇到單獨(dú)的作用域節(jié)點(diǎn),則當(dāng)前執(zhí)行對(duì)象ExecutionEntity應(yīng)該創(chuàng)建一個(gè)作用域子執(zhí)行(isScope為true的ExecutionEntity)。整個(gè)變化過(guò)程可以如下圖所示

image

當(dāng)準(zhǔn)備進(jìn)入節(jié)點(diǎn)<組員工作>時(shí),ExecutionEntity1凍結(jié),并且創(chuàng)建出子執(zhí)行ExecutionEntity2。ExecutionEntity2的isScope屬性也為true。

ExecutionEntity1的isScope為true,是因?yàn)樵搱?zhí)行實(shí)例負(fù)責(zé)整個(gè)流程定義的事件訂閱,ExecutionEntity2的isScope為true,是因?yàn)樵搱?zhí)行實(shí)例負(fù)責(zé)節(jié)點(diǎn)<組員工作>的事件訂閱。

前面提到過(guò),事件訂閱與某一個(gè)具體的執(zhí)行實(shí)例相關(guān)。當(dāng)節(jié)點(diǎn)<組員工作>完成時(shí),也就是事件訂閱所在的作用域要被摧毀時(shí),對(duì)應(yīng)的事件訂閱也要被刪除。此時(shí)額外的ExecutionEntity就特別方便,只要?jiǎng)h除該ExecutionEntity,順便刪除相關(guān)的事件訂閱即可,在這里就是刪除ExecutionEntity2。

刪除ExecutionEntity2,并且激活父執(zhí)行ExecutionEntity1。隨著流程推進(jìn),ExecutionEntity1更換指向的activityId。

2.3.3 并發(fā)的非作用域節(jié)點(diǎn)

流程推進(jìn)中節(jié)點(diǎn)存在多個(gè)外出連線,則可以根據(jù)需要?jiǎng)?chuàng)建多個(gè)并發(fā)的子執(zhí)行,每一個(gè)子執(zhí)行對(duì)應(yīng)一個(gè)連線。如下圖所示

image

當(dāng)流程節(jié)點(diǎn)A、B被激活時(shí),ExecutionEntity1會(huì)有2個(gè)并發(fā)的子執(zhí)行ExecutionEntity2ExecutionEntity3。這兩個(gè)子執(zhí)行的isConcurrent屬性均為true,因?yàn)楣?jié)點(diǎn)A和B都是在相同的作用域(流程定義)下被并發(fā)的執(zhí)行。

當(dāng)節(jié)點(diǎn)A、B執(zhí)行完畢后,并發(fā)的子執(zhí)行被刪除,父執(zhí)行重新被激活,繼續(xù)后面的節(jié)點(diǎn)。

2.3.4 并發(fā)的作用域節(jié)點(diǎn)

流程推進(jìn)中遇到并發(fā)節(jié)點(diǎn),并且節(jié)點(diǎn)為作用域節(jié)點(diǎn),情況就會(huì)如下所示

image

當(dāng)流程運(yùn)行至P1節(jié)點(diǎn),P1節(jié)點(diǎn)有多個(gè)出線。根據(jù)出線數(shù)目創(chuàng)建2個(gè)子執(zhí)行,此時(shí)2個(gè)子執(zhí)行均為并發(fā)的,且從P1作為出線的源頭節(jié)點(diǎn),因此2個(gè)子執(zhí)行的activityId均為P1。

當(dāng)運(yùn)行到A、B節(jié)點(diǎn)時(shí),由于2個(gè)節(jié)點(diǎn)均為作用域節(jié)點(diǎn),因此還會(huì)再創(chuàng)建2個(gè)子執(zhí)行。此時(shí)ExecutionEntity3ExecutionEntity3凍結(jié)。ExecutionEntity4ExecutionEntity5所執(zhí)行的節(jié)點(diǎn)在各自的作用域下均無(wú)并發(fā)操作,因此其isScope屬性為true,isConcurrent屬性為false。這5個(gè)執(zhí)行實(shí)例構(gòu)成的執(zhí)行樹(shù)如下

image

當(dāng)A、B節(jié)點(diǎn)完成時(shí),首先是各自的作用域被刪除,因此ExecutionEntity4ExecutionEntity5首先被刪除,ExecutionEntity3ExecutionEntity4激活。而后匯聚于P2節(jié)點(diǎn),因此ExecutionEntity3ExecutionEntity4刪除,ExecutionEntity1被激活,繼續(xù)執(zhí)行剩下的節(jié)點(diǎn)。

三、代碼解析-流程啟動(dòng)

3.1 流程說(shuō)明

流程啟動(dòng)依靠的是命令類:org.activiti.engine.impl.cmd.StartProcessInstanceCmd。

該命令的整體流程如下

image

部署管理器的查詢和運(yùn)行期關(guān)系不大,先忽略。兩個(gè)流程著重展開(kāi):

  1. 流程定義實(shí)體創(chuàng)建流程實(shí)例
  2. 流程實(shí)例執(zhí)行啟動(dòng)

3.1.1 流程定義實(shí)體創(chuàng)建流程實(shí)例

流程如下

image

其中指定初始滑動(dòng)節(jié)點(diǎn)創(chuàng)建實(shí)例本身展開(kāi)后的流程如下

image

ExecutionEntity的初始化需要單獨(dú)說(shuō)下,流程如下

image

在創(chuàng)建流程的邏輯的尾部是一個(gè)循環(huán)流程。該循環(huán)的目的是為了創(chuàng)建正確的ExecutionImpl樹(shù)(以下簡(jiǎn)稱執(zhí)行樹(shù))。本質(zhì)上該方法是創(chuàng)建一個(gè)流程實(shí)例,并且將流程當(dāng)前運(yùn)行節(jié)點(diǎn)定位到指定的節(jié)點(diǎn)。而工作流的正確執(zhí)行依賴于執(zhí)行樹(shù)的正確分裂和整合。因此就需要為指定的節(jié)點(diǎn)創(chuàng)建其上游的執(zhí)行樹(shù)實(shí)例。使得在效果上看起來(lái)就和流程自動(dòng)執(zhí)行到當(dāng)前節(jié)點(diǎn)類似(執(zhí)行樹(shù)類似,節(jié)點(diǎn)運(yùn)行歷史則無(wú)相似,實(shí)際上也無(wú)歷史節(jié)點(diǎn))。

而如果指定的初始節(jié)點(diǎn)就是流程定義的初始節(jié)點(diǎn),則循環(huán)就不存在意義了。

3.1.2 流程實(shí)例啟動(dòng)

流程實(shí)例的啟動(dòng)的內(nèi)容,就是執(zhí)行原子操作:org.activiti.engine.impl.pvm.runtime.AtomicOperationProcessStart。關(guān)于原子操作單獨(dú)闡述。

3.2 額外補(bǔ)充

3.2.1 ActivityImpl的parent屬性

org.activiti.engine.impl.pvm.process.ActivityImpl類中有一個(gè)屬性parent,類型為org.activiti.engine.impl.pvm.process.ScopeImpl。在解析的時(shí)候,該屬性為當(dāng)前節(jié)點(diǎn)的作用域節(jié)點(diǎn)。根據(jù)作用域節(jié)點(diǎn)的定義,該屬性的取值有兩種可能的類型,一種是org.activiti.engine.impl.pvm.process.ActivityImpl,另外一種是org.activiti.engine.impl.pvm.process.ProcessDefinitionImpl。

第二種情況意味著該節(jié)點(diǎn)是直屬于流程定義的節(jié)點(diǎn)了。

四、代碼解析-原子操作

4.1 說(shuō)明

原子操作是一個(gè)接口org.activiti.engine.impl.pvm.runtime.AtomicOperation。從名字也可以看出,該接口的作用就是執(zhí)行流程實(shí)例中的一個(gè)單步操作。下面分階段說(shuō)明

4.2 AbstractEventAtomicOperation

該抽象類是眾多實(shí)現(xiàn)類的基類。其代碼如下

public abstract class AbstractEventAtomicOperation implements AtomicOperation {
 
  public boolean isAsync(InterpretableExecution execution) {
    return false;
  }
 
  public void execute(InterpretableExecution execution) {
      //獲取當(dāng)前執(zhí)行對(duì)象的作用域?qū)ο蟆>唧w由子類提供。
    ScopeImpl scope = getScope(execution);
      //從作用域?qū)ο笾蝎@取指定事件的監(jiān)聽(tīng)器。事件名稱由子類提供。
    List<ExecutionListener> exectionListeners = scope.getExecutionListeners(getEventName());
    int executionListenerIndex = execution.getExecutionListenerIndex();
 
    if (exectionListeners.size()>executionListenerIndex) {
      execution.setEventName(getEventName());
      execution.setEventSource(scope);
      ExecutionListener listener = exectionListeners.get(executionListenerIndex);
      try {
        listener.notify(execution);
      } catch (RuntimeException e) {
        throw e;
      } catch (Exception e) {
        throw new PvmException("couldn't execute event listener : "+e.getMessage(), e);
      }
      execution.setExecutionListenerIndex(executionListenerIndex+1);
      execution.performOperation(this);
 
    } else {
      execution.setExecutionListenerIndex(0);
      execution.setEventName(null);
      execution.setEventSource(null);
 
      eventNotificationsCompleted(execution);
    }
  }
 
  protected abstract ScopeImpl getScope(InterpretableExecution execution);
  protected abstract String getEventName();
  protected abstract void eventNotificationsCompleted(InterpretableExecution execution);
}

整個(gè)抽象類的邏輯概括而言,就是將獲取當(dāng)前執(zhí)行實(shí)例的作用域?qū)ο螅ň唧w由子類提供),執(zhí)行其中特定事件(事件名由子類提供)的監(jiān)聽(tīng)器。

在全部的監(jiān)聽(tīng)器執(zhí)行完畢后,執(zhí)行子類的特定邏輯。

4.3 AtomicOperationProcessStart

該操作用于流程啟動(dòng)。但是并不執(zhí)行真正的啟動(dòng)動(dòng)作。只是設(shè)置了當(dāng)前執(zhí)行實(shí)例的活動(dòng)節(jié)點(diǎn)為org.activiti.engine.impl.pvm.runtime.StartingExecution中存儲(chǔ)的活動(dòng)節(jié)點(diǎn)。然后執(zhí)行原子操作org.activiti.engine.impl.pvm.runtime.AtomicOperationProcessStartInitial。

本質(zhì)上來(lái)說(shuō),只是執(zhí)行了一個(gè)設(shè)置的動(dòng)作。

public class AtomicOperationProcessStart extends AbstractEventAtomicOperation {
 
  @Override
  protected ScopeImpl getScope(InterpretableExecution execution) {
    return execution.getProcessDefinition();
  }
 
  @Override
  protected String getEventName() {
    return org.activiti.engine.impl.pvm.PvmEvent.EVENTNAME_START;
  }
 
  @Override
  protected void eventNotificationsCompleted(InterpretableExecution execution) {
      if (Context.getProcessEngineConfiguration() != null && Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
        Map<String, Object> variablesMap = null;
        try {
          variablesMap = execution.getVariables();
        } catch (Throwable t) {
          // In some rare cases getting the execution variables can fail (JPA entity load failure for example)
          // We ignore the exception here, because it's only meant to include variables in the initialized event.
        }
        Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent(
                ActivitiEventBuilder.createEntityWithVariablesEvent(ActivitiEventType.ENTITY_INITIALIZED,
                    execution, variablesMap, false));
      Context.getProcessEngineConfiguration().getEventDispatcher()
              .dispatchEvent(ActivitiEventBuilder.createProcessStartedEvent(execution, variablesMap, false));
    }
 
    ProcessDefinitionImpl processDefinition = execution.getProcessDefinition();
    StartingExecution startingExecution = execution.getStartingExecution();
    List<ActivityImpl> initialActivityStack = processDefinition.getInitialActivityStack(startingExecution.getInitial());
    execution.setActivity(initialActivityStack.get(0));
    execution.performOperation(PROCESS_START_INITIAL);
  }
}

4.4 AtomicOperationProcessStartInitial

代碼如下

public class AtomicOperationProcessStartInitial extends AbstractEventAtomicOperation {
 
  @Override
  protected ScopeImpl getScope(InterpretableExecution execution) {
    return (ScopeImpl) execution.getActivity();
  }
 
  @Override
  protected String getEventName() {
    return org.activiti.engine.impl.pvm.PvmEvent.EVENTNAME_START;
  }
 
  @Override
  protected void eventNotificationsCompleted(InterpretableExecution execution) {
    ActivityImpl activity = (ActivityImpl) execution.getActivity();
    ProcessDefinitionImpl processDefinition = execution.getProcessDefinition();
    StartingExecution startingExecution = execution.getStartingExecution();
     //從開(kāi)始節(jié)點(diǎn)開(kāi)始的,該判斷均為真。
    if (activity==startingExecution.getInitial()) {
      execution.disposeStartingExecution();
      execution.performOperation(ACTIVITY_EXECUTE);
    } else {
      List<ActivityImpl> initialActivityStack = processDefinition.getInitialActivityStack(startingExecution.getInitial());
      int index = initialActivityStack.indexOf(activity);
      activity = initialActivityStack.get(index+1);
 
      InterpretableExecution executionToUse = null;
      if (activity.isScope()) {
        executionToUse = (InterpretableExecution) execution.getExecutions().get(0);
      } else {
        executionToUse = execution;
      }
      executionToUse.setActivity(activity);
      executionToUse.performOperation(PROCESS_START_INITIAL);
    }
  }
}

4.5 AtomicOperationTransitionNotifyListenerEnd

該原子操作的目的僅是為了執(zhí)行節(jié)點(diǎn)上的end事件監(jiān)聽(tīng)器。監(jiān)聽(tīng)器執(zhí)行完畢后,就執(zhí)行下一個(gè)原子操作AtomicOperationTransitionDestroyScope

4.6 AtomicOperationTransitionNotifyListenerStart

該原子操作的目的是為了執(zhí)行節(jié)點(diǎn)上start事件監(jiān)聽(tīng)器。在執(zhí)行完畢后,會(huì)判斷執(zhí)行實(shí)例的當(dāng)前節(jié)點(diǎn)是否可以執(zhí)行。判斷的依據(jù)該節(jié)點(diǎn)和連接線節(jié)點(diǎn)

4.3 AtomicOperationActivityExecute

該原子操作的作用實(shí)際上就是取出該執(zhí)行實(shí)例當(dāng)前的活動(dòng)節(jié)點(diǎn),并且執(zhí)行該活動(dòng)節(jié)點(diǎn)的行為定義。行為定義通過(guò)接口org.activiti.engine.impl.pvm.delegate.ActivityBehavior定義。不同的節(jié)點(diǎn)行為由不同的子類完成

public class AtomicOperationActivityExecute implements AtomicOperation {
 
  private static Logger log = LoggerFactory.getLogger(AtomicOperationActivityExecute.class);
 
  public boolean isAsync(InterpretableExecution execution) {
    return false;
  }
 
  public void execute(InterpretableExecution execution) {
    ActivityImpl activity = (ActivityImpl) execution.getActivity();
 
    ActivityBehavior activityBehavior = activity.getActivityBehavior();
    if (activityBehavior==null) {
      throw new PvmException("no behavior specified in "+activity);
    }
 
    log.debug("{} executes {}: {}", execution, activity, activityBehavior.getClass().getName());
 
    try {
        if(Context.getProcessEngineConfiguration() != null && Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
          Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent(
                  ActivitiEventBuilder.createActivityEvent(ActivitiEventType.ACTIVITY_STARTED,
                          execution.getActivity().getId(),
                          (String) execution.getActivity().getProperty("name"),
                          execution.getId(),
                          execution.getProcessInstanceId(),
                          execution.getProcessDefinitionId(),
                          (String) activity.getProperties().get("type"),
                          activity.getActivityBehavior().getClass().getCanonicalName()));
      }
 
      activityBehavior.execute(execution);
    } catch (RuntimeException e) {
      throw e;
    } catch (Exception e) {
      LogMDC.putMDCExecution(execution);
      throw new PvmException("couldn't execute activity <"+activity.getProperty("type")+" id=\""+activity.getId()+"\" ...>: "+e.getMessage(), e);
    }
  }
}

4.4 AtomicOperationTransitionDestroyScope

public class AtomicOperationTransitionDestroyScope implements AtomicOperation {
 
  private static Logger log = LoggerFactory.getLogger(AtomicOperationTransitionDestroyScope.class);
 
  public boolean isAsync(InterpretableExecution execution) {
    return false;
  }
 
  @SuppressWarnings("unchecked")
  public void execute(InterpretableExecution execution) {
    InterpretableExecution propagatingExecution = null;
 
    ActivityImpl activity = (ActivityImpl) execution.getActivity();
    /**
    * 如果當(dāng)前的活動(dòng)節(jié)點(diǎn)具備作用域。這就意味著最初的時(shí)候,有一個(gè)處于激活狀態(tài)的執(zhí)行實(shí)例在執(zhí)行該節(jié)點(diǎn),以下稱初始執(zhí)行實(shí)例。
    * 從這樣的節(jié)點(diǎn)退出要考慮幾種情況:
    * 一、單獨(dú)的作用域節(jié)點(diǎn)。此時(shí)的執(zhí)行樹(shù)情況是:非激活的非作用域執(zhí)行實(shí)例-激活的作用域執(zhí)行實(shí)例(初始執(zhí)行實(shí)例)。此時(shí)要離開(kāi)作用域節(jié)點(diǎn),首先是銷毀激活的作用域執(zhí)行實(shí)例(初始執(zhí)行實(shí)例),激活初始執(zhí)行實(shí)例的父實(shí)例,使用該父實(shí)例執(zhí)行后續(xù)的出線動(dòng)作。
    * 二、并行的作用域節(jié)點(diǎn)。此時(shí)的執(zhí)行樹(shù)情況是:非激活的非作用域并發(fā)執(zhí)行實(shí)例-非激活的
    */
    if (activity.isScope()) {
 
      InterpretableExecution parentScopeInstance = null;
      // if this is a concurrent execution crossing a scope boundary
      if (execution.isConcurrent() && !execution.isScope()) {
        // first remove the execution from the current root
        InterpretableExecution concurrentRoot = (InterpretableExecution) execution.getParent();
        parentScopeInstance = (InterpretableExecution) execution.getParent().getParent();
 
        log.debug("moving concurrent {} one scope up under {}", execution, parentScopeInstance);
        List<InterpretableExecution> parentScopeInstanceExecutions = (List<InterpretableExecution>) parentScopeInstance.getExecutions();
        List<InterpretableExecution> concurrentRootExecutions = (List<InterpretableExecution>) concurrentRoot.getExecutions();
        // if the parent scope had only one single scope child
        if (parentScopeInstanceExecutions.size()==1) {
          // it now becomes a concurrent execution
          parentScopeInstanceExecutions.get(0).setConcurrent(true);
        }
 
        concurrentRootExecutions.remove(execution);
        parentScopeInstanceExecutions.add(execution);
        execution.setParent(parentScopeInstance);
        execution.setActivity(activity);
        propagatingExecution = execution;
 
        // if there is only a single concurrent execution left
        // in the concurrent root, auto-prune it.  meaning, the
        // last concurrent child execution data should be cloned into
        // the concurrent root. 
        if (concurrentRootExecutions.size()==1) {
          InterpretableExecution lastConcurrent = concurrentRootExecutions.get(0);
          if (lastConcurrent.isScope()) {
            lastConcurrent.setConcurrent(false);
 
          } else {
            log.debug("merging last concurrent {} into concurrent root {}", lastConcurrent, concurrentRoot);
 
            // We can't just merge the data of the lastConcurrent into the concurrentRoot.
            // This is because the concurrent root might be in a takeAll-loop.  So the
            // concurrent execution is the one that will be receiving the take
            concurrentRoot.setActivity((ActivityImpl) lastConcurrent.getActivity());
            concurrentRoot.setActive(lastConcurrent.isActive());
            lastConcurrent.setReplacedBy(concurrentRoot);
            lastConcurrent.remove();
          }
        }
 
      } else if (execution.isConcurrent() && execution.isScope()) {
        /**
        * 根據(jù)算法,這種情況不會(huì)出現(xiàn)。源代碼中,這部分也屬于todo的內(nèi)容。
        */
      }
        else {
        /**
        * 這個(gè)條件是執(zhí)行實(shí)例的scope屬性為真。此時(shí)銷毀當(dāng)前的執(zhí)行實(shí)例,使用其父執(zhí)行實(shí)例繼續(xù)后面的流程
        */
        propagatingExecution = (InterpretableExecution) execution.getParent();
        propagatingExecution.setActivity((ActivityImpl) execution.getActivity());
        propagatingExecution.setTransition(execution.getTransition());
        propagatingExecution.setActive(true);
        log.debug("destroy scope: scoped {} continues as parent scope {}", execution, propagatingExecution);
        execution.destroy();
        //刪除與該執(zhí)行實(shí)例相關(guān)的一切,包括:定時(shí)工作,各種任務(wù),事件訂閱,用戶流程關(guān)系,最后刪除自身。
        execution.remove();
      }
    } else {
      //如果離開(kāi)的是一個(gè)非作用域節(jié)點(diǎn),則仍然使用當(dāng)前的執(zhí)行實(shí)例作為下一個(gè)節(jié)點(diǎn)的執(zhí)行實(shí)例
      propagatingExecution = execution;
    }
 
    // if there is another scope element that is ended
    ScopeImpl nextOuterScopeElement = activity.getParent();
    TransitionImpl transition = propagatingExecution.getTransition();
    ActivityImpl destination = transition.getDestination();
    /**
    * 考慮當(dāng)前的節(jié)點(diǎn)可能是子流程或者活動(dòng)調(diào)用中的節(jié)點(diǎn)。那么就需要離開(kāi)當(dāng)前的作用域范圍,回到更上層的作用域下。因此需要判斷目的地是否和源頭節(jié)點(diǎn)處于同一個(gè)作用域。如果不是同一個(gè)作用域,則不斷向上回溯
    */
    if (transitionLeavesNextOuterScope(nextOuterScopeElement, destination)) {
      propagatingExecution.setActivity((ActivityImpl) nextOuterScopeElement);
      propagatingExecution.performOperation(TRANSITION_NOTIFY_LISTENER_END);
    } else {
      propagatingExecution.performOperation(TRANSITION_NOTIFY_LISTENER_TAKE);
    }
  }
 
  public boolean transitionLeavesNextOuterScope(ScopeImpl nextScopeElement, ActivityImpl destination) {
    return !nextScopeElement.contains(destination);
  }
}

4.5 AtomicOperationTransitionNotifyListenerTake

該原子操作的目的就是為了執(zhí)行在連接線上的監(jiān)聽(tīng)器。在執(zhí)行完畢后,就準(zhǔn)備執(zhí)行目標(biāo)活動(dòng)節(jié)點(diǎn)。這里關(guān)于目標(biāo)節(jié)點(diǎn)還存在一個(gè)選擇的問(wèn)題。并不是直接執(zhí)行連接線上的目標(biāo)活動(dòng)節(jié)點(diǎn)。而是從目標(biāo)活動(dòng)節(jié)點(diǎn)出發(fā),選擇和執(zhí)行實(shí)例當(dāng)前活動(dòng)節(jié)點(diǎn)同屬同一個(gè)作用域的目標(biāo)活動(dòng)節(jié)點(diǎn)或其父節(jié)點(diǎn)。

public class AtomicOperationTransitionNotifyListenerTake implements AtomicOperation {
 
  private static Logger log = LoggerFactory.getLogger(AtomicOperationTransitionNotifyListenerTake.class);
 
  public boolean isAsync(InterpretableExecution execution) {
    return false;
  }
 
  public void execute(InterpretableExecution execution) {
    TransitionImpl transition = execution.getTransition();
 
    List<ExecutionListener> executionListeners = transition.getExecutionListeners();
    int executionListenerIndex = execution.getExecutionListenerIndex();
    /**
    * 整個(gè)if的功能就是不斷判斷監(jiān)聽(tīng)器是否被執(zhí)行完畢。都執(zhí)行完畢后走入到else的部分。
    */
    if (executionListeners.size()>executionListenerIndex) {
      execution.setEventName(org.activiti.engine.impl.pvm.PvmEvent.EVENTNAME_TAKE);
      execution.setEventSource(transition);
      ExecutionListener listener = executionListeners.get(executionListenerIndex);
      try {
        listener.notify(execution);
      } catch (RuntimeException e) {
        throw e;
      } catch (Exception e) {
        throw new PvmException("couldn't execute event listener : "+e.getMessage(), e);
      }
      execution.setExecutionListenerIndex(executionListenerIndex+1);
      execution.performOperation(this);
 
    } else {
        if (log.isDebugEnabled()) {
            log.debug("{} takes transition {}", execution, transition);
        }
      execution.setExecutionListenerIndex(0);
      execution.setEventName(null);
      execution.setEventSource(null);
 
      ActivityImpl activity = (ActivityImpl) execution.getActivity();
      ActivityImpl nextScope = findNextScope(activity.getParent(), transition.getDestination());
      execution.setActivity(nextScope);
 
      // Firing event that transition is being taken       
      if(Context.getProcessEngineConfiguration() != null && Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
          Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent(
                ActivitiEventBuilder.createSequenceFlowTakenEvent(ActivitiEventType.SEQUENCEFLOW_TAKEN, transition.getId(),
                        activity.getId(), (String) activity.getProperties().get("name") ,(String) activity.getProperties().get("type"), activity.getActivityBehavior().getClass().getCanonicalName(),
                        nextScope.getId(), (String) nextScope.getProperties().get("name"), (String) nextScope.getProperties().get("type"), nextScope.getActivityBehavior().getClass().getCanonicalName()));
      }
 
      execution.performOperation(TRANSITION_CREATE_SCOPE);
    }
  }
 
  /** finds the next scope to enter.  the most outer scope is found first */
  public static ActivityImpl findNextScope(ScopeImpl outerScopeElement, ActivityImpl destination) {
    ActivityImpl nextScope = destination;
    while( (nextScope.getParent() instanceof ActivityImpl)
           && (nextScope.getParent() != outerScopeElement)
         ) {
      nextScope = (ActivityImpl) nextScope.getParent();
    }
    return nextScope;
  }
}

4.6 AtomicOperationTransitionCreateScope

該原子操作是為了確認(rèn)進(jìn)入的節(jié)點(diǎn)是否具備作用域。如果具備作用域,則將目前的執(zhí)行實(shí)例凍結(jié)。并且創(chuàng)建出新的執(zhí)行實(shí)例,用于執(zhí)行作用域節(jié)點(diǎn);如果不具備作用域,則無(wú)效果。

在確認(rèn)完畢后,執(zhí)行原子操作AtomicOperationTransitionNotifyListenerStart

public class AtomicOperationTransitionCreateScope implements AtomicOperation {
 
  private static Logger log = LoggerFactory.getLogger(AtomicOperationTransitionCreateScope.class);
 
  public boolean isAsync(InterpretableExecution execution) {
    ActivityImpl activity = (ActivityImpl) execution.getActivity();
    return activity.isAsync();
  }
 
  public void execute(InterpretableExecution execution) {
    InterpretableExecution propagatingExecution = null;
    ActivityImpl activity = (ActivityImpl) execution.getActivity();
    if (activity.isScope()) {
      //為作用域活動(dòng)創(chuàng)建一個(gè)新的執(zhí)行實(shí)例,是該原子操作的主要目的
      propagatingExecution = (InterpretableExecution) execution.createExecution();
      propagatingExecution.setActivity(activity);
      propagatingExecution.setTransition(execution.getTransition());
      execution.setTransition(null);
      execution.setActivity(null);
      execution.setActive(false);
      log.debug("create scope: parent {} continues as execution {}", execution, propagatingExecution);
      //這里是另外一個(gè)重點(diǎn)。在一個(gè)流程實(shí)例初始化的時(shí)候,會(huì)對(duì)當(dāng)前流程所處的作用域?qū)ο螅赡苁橇鞒潭x或者是作用域活動(dòng)進(jìn)行處理,具體表現(xiàn)是為該作用域?qū)ο笊系亩〞r(shí)事件,消息事件,信號(hào)事件執(zhí)行注冊(cè)動(dòng)作。分別是放入定時(shí)調(diào)度器,在數(shù)據(jù)庫(kù)新增事件訂閱)
      propagatingExecution.initialize();
 
    } else {
      propagatingExecution = execution;
    }
 
    propagatingExecution.performOperation(AtomicOperation.TRANSITION_NOTIFY_LISTENER_START);
  }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容