Actor的創(chuàng)建&引用&聲明周期
1.創(chuàng)建actor
- 定義一個Actor類
要定義自己的Actor類,需要繼承Actor并實現(xiàn)receive方法。receive方法需要定義一系列case語句(類型為PartialFunction[Any, Unit])來描述你的Actor能夠處理哪些消息(使用標準的Scala模式匹配),以及消息如何被處理。
如下例:
import akka.actor.Actor
import akka.actor.Props
import akka.event.Logging
class MyActor extends Actor {
val log = Logging(context.system, this)
def receive = {
case "test" => log.info("received test")
case _ => log.info("received unknown message")
}
}
- Props
Props是一個用來在創(chuàng)建actor時指定選項的配置類,可以把它看作是不可變的,因此在創(chuàng)建包含相關(guān)部署信息的actor時(例如使用哪一個調(diào)度器(dispatcher),詳見下文),是可以自由共享的。以下是如何創(chuàng)建Props實例的示例.
import akka.actor.Props
val props1 = Props[MyActor]
val props2 = Props(new ActorWithArgs("arg")) // careful, see below
val props3 = Props(classOf[ActorWithArgs], "arg")
警告
在另一個actor中聲明一個actor是非常危險的,會打破actor的封裝。永遠不要將一個actor的this引用傳進Props!
推薦做法
在每一個Actor的伴生對象中提供工廠方法是一個好主意,這有助于保持創(chuàng)建合適的Props,盡可能接近actor的定義。這也避免了使用Props.apply(...)方法將采用一個“按名”(by-name)參數(shù)的缺陷,因為伴生對象的給定代碼塊中將不會保留包含作用域的引用:
object DemoActor {
/**
* Create Props for an actor of this type.
* @param magciNumber The magic number to be passed to this actor’s constructor.
* @return a Props for creating this actor, which can then be further configured
* (e.g. calling `.withDispatcher()` on it)
*/
def props(magicNumber: Int): Props = Props(new DemoActor(magicNumber))
}
class DemoActor(magicNumber: Int) extends Actor {
def receive = {
case x: Int => sender() ! (x + magicNumber)
}
}
class SomeOtherActor extends Actor {
// Props(new DemoActor(42)) would not be safe
context.actorOf(DemoActor.props(42), "demo")
// ...
}
- 使用Props創(chuàng)建Actor
Actor可以通過將Props實例傳入actorOf工廠方法來創(chuàng)建,ActorSystem和ActorContext中都有該方法。
import akka.actor.ActorSystem
// ActorSystem is a heavy object: create only one per application
val system = ActorSystem("mySystem")
val myActor = system.actorOf(Props[MyActor], "myactor2")
使用ActorSystem將創(chuàng)建頂級actor,由actor系統(tǒng)提供的守護actor監(jiān)管;如果使用的是actor的上下文,則創(chuàng)建一個該actor的子actor。
class FirstActor extends Actor {
val child = context.actorOf(Props[MyActor], name = "myChild")
// plus some behavior ...
}
推薦創(chuàng)建一個樹形結(jié)構(gòu),包含子actor、孫子等等,使之符合應(yīng)用的邏輯錯誤處理結(jié)構(gòu)
- 依賴注入
如果你的actor有帶參數(shù)的構(gòu)造函數(shù),則這些參數(shù)也需要成為Props的一部分,如上文所述。但有些情況下必須使用工廠方法,例如,當實際構(gòu)造函數(shù)的參數(shù)由依賴注入框架決定。
import akka.actor.IndirectActorProducer
class DependencyInjector(applicationContext: AnyRef, beanName: String)
extends IndirectActorProducer {
override def actorClass = classOf[Actor]
override def produce =
// obtain fresh Actor instance from DI framework ...
}
val actorRef = system.actorOf(
Props(classOf[DependencyInjector], applicationContext, "hello"),
"helloBean")
2.Actor API
Actor trait只定義了一個抽象方法,就是上面提到的receive,用來實現(xiàn)actor的行為。
如果當前actor的行為與收到的消息不匹配,則會調(diào)用 unhandled,其缺省實現(xiàn)是向actor系統(tǒng)的事件流中發(fā)布一條akka.actor.UnhandledMessage(message, sender, recipient)(將配置項akka.actor.debug.unhandled設(shè)置為on來將它們轉(zhuǎn)換為實際的調(diào)試消息)。
另外,它還包括:
- self引用代表本actor的ActorRef
- sender引用代表最近收到消息的發(fā)送actor,通常用于下面將講到的消息回應(yīng)中
- supervisorStrategy 用戶可重寫它來定義對子actor的監(jiān)管策略
該策略通常在actor內(nèi)聲明,這樣決定函數(shù)就可以訪問actor的內(nèi)部狀態(tài):因為失敗通知作為消息發(fā)送給監(jiān)管者,并像普通消息一樣被處理(盡管不是正常行為),所有的值和actor變量都是可用的,以及sender引用 (報告失敗的將是直接子actor;如果原始失敗發(fā)生在遙遠的后裔,它仍然是一次向上報告一層)。
- context暴露actor和當前消息的上下文信息,如:
- 用于創(chuàng)建子actor的工廠方法(actorOf)
- actor所屬的系統(tǒng)
- 父監(jiān)管者
- 所監(jiān)管的子actor
- 生命周期監(jiān)控
- hotswap行為棧,見Become/Unbecome
3.Actor生命周期

actor系統(tǒng)中的路徑代表一個"地方",這里可能會被活著的actor占據(jù)。最初(除了系統(tǒng)初始化actor)路徑都是空的。在調(diào)用actorOf()時它將為指定路徑分配根據(jù)傳入Props創(chuàng)建的一個actor化身。actor化身是由路徑和一個UID標識的。重新啟動只會替換有Props定義的Actor實例,但不會替換化身,因此UID保持不變。
當actor停止時,其化身的生命周期結(jié)束。在這一時間點上相關(guān)的生命周期事件被調(diào)用,監(jiān)視該actor的actor都會獲得終止通知。當化身停止后,路徑可以重復(fù)使用,通過actorOf()創(chuàng)建一個actor。在這種情況下,除了UID不同外,新化身與老化身是相同的。
ActorRef始終表示化身(路徑和UID)而不只是一個給定的路徑。因此如果actor停止,并且創(chuàng)建一個新的具有相同名稱的actor,則指向老化身的ActorRef將不會指向新的化身。
相對地,ActorSelection指向路徑(或多個路徑,如果使用了通配符),且完全不關(guān)注有沒有化身占據(jù)它。因此ActorSelection 不能被監(jiān)視。獲取某路徑下的當前化身ActorRef是可能的,只要向該ActorSelection發(fā)送Identify,如果收到ActorIdentity回應(yīng),則正確的引用就包含其中(詳見通過Actor Selection確定Actor)。也可以使用ActorSelection的resolveOne方法,它會返回一個包含匹配ActorRef的Future。
- 使用DeathWatch進行生命周期監(jiān)控
為了在其它actor終止時 (即永久停止,而不是臨時的失敗和重啟)收到通知,actor可以將自己注冊為其它actor在終止時所發(fā)布的Terminated消息的接收者(見停止 Actor)。這個服務(wù)是由actor系統(tǒng)的DeathWatch組件提供的。
注冊一個監(jiān)視器很簡單:
import akka.actor.{ Actor, Props, Terminated }
class WatchActor extends Actor {
val child = context.actorOf(Props.empty, "child")
context.watch(child) // <-- this is the only call needed for registration
var lastSender = system.deadLetters
def receive = {
case "kill" =>
context.stop(child); lastSender = sender()
case Terminated(`child`) => lastSender ! "finished"
}
}
要注意Terminated消息的產(chǎn)生與注冊和終止行為所發(fā)生的順序無關(guān)。特別地,即使在注冊時,被觀察的actor已經(jīng)終止了,監(jiān)視actor仍然會受到一個Terminated消息。
多次注冊并不表示會有多個消息產(chǎn)生,也不保證有且只有一個這樣的消息被接收到:如果被監(jiān)控的actor已經(jīng)生成了消息并且已經(jīng)進入了隊列,在這個消息被處理之前又發(fā)生了另一次注冊,則會有第二個消息進入隊列,因為對一個已經(jīng)終止的actor的監(jiān)控注冊操作會立刻導(dǎo)致Terminated消息的產(chǎn)生。
可以使用context.unwatch(target)來停止對另一個actor生存狀態(tài)的監(jiān)控。即使Terminated已經(jīng)加入郵箱,該操作仍有效;一旦調(diào)用unwatch,則被觀察的actor的Terminated消息就都不會再被處理。
- 啟動Hook
actor啟動后,它的preStart方法會被立即執(zhí)行。
override def preStart() {
// registering with other actors
someService ! Register(self)
}
在actor第一次創(chuàng)建時,將調(diào)用此方法。在重新啟動期間,它被postRestart的默認實現(xiàn)調(diào)用,這意味著通過重寫該方法,你可以選擇是僅僅在初始化該actor時調(diào)用一次,還是為每次重新啟動都調(diào)用。actor構(gòu)造函數(shù)中的初始化代碼將在每個actor實例創(chuàng)建的時候被調(diào)用,這也發(fā)生在每次重啟時。
- 重啟Hook
所有的actor都是被監(jiān)管的,即與另一個使用某種失敗處理策略的actor綁定在一起。如果在處理一個消息的時候拋出了異常,Actor將被重啟(詳見監(jiān)管與監(jiān)控)。這個重啟過程包括上面提到的Hook:
要被重啟的actor被通知是通過調(diào)用preRestart,包含著導(dǎo)致重啟的異常以及觸發(fā)異常的消息;如果重啟并不是因為消息處理而發(fā)生的,則所攜帶的消息為None,例如,當一個監(jiān)管者沒有處理某個異常繼而被其監(jiān)管者重啟時,或者因其兄弟節(jié)點的失敗導(dǎo)致的重啟。如果消息可用,則消息的發(fā)送者通常也可用(即通過調(diào)用sender)。
這個方法是用來完成清理、準備移交給新actor實例等操作的最佳位置。其缺省實現(xiàn)是終止所有子actor并調(diào)用postStop。
最初調(diào)用actorOf的工廠將被用來創(chuàng)建新的實例。
新的actor的postRestart方法被調(diào)用時,將攜帶著導(dǎo)致重啟的異常信息。默認實現(xiàn)中,preStart被調(diào)用時,就像一個正常的啟動一樣。
actor的重啟只會替換掉原來的actor對象;重啟不影響郵箱的內(nèi)容,所以對消息的處理將在postRestart hook返回后繼續(xù)。觸發(fā)異常的消息不會被重新接收。在actor重啟過程中,所有發(fā)送到該actor的消息將象平常一樣被放進郵箱隊列中。
警告
要知道失敗通知與用戶消息的相關(guān)順序不是決定性的。尤其是,在失敗以前收到的最后一條消息被處理之前,父節(jié)點可能已經(jīng)重啟其子節(jié)點了。詳細信息請參見“討論:消息順序”。
- 終止 Hook
一個Actor終止后,其postStop hook將被調(diào)用,它可以用來,例如取消該actor在其它服務(wù)中的注冊。這個hook保證在該actor的消息隊列被禁止后才運行,即之后發(fā)給該actor的消息將被重定向到ActorSystem的deadLetters中。
3. 通過Actor Selection定位Actor
如Actor引用, 路徑與地址中所述,每個actor都擁有一個唯一的邏輯路徑,此路徑是由從actor系統(tǒng)的根開始的父子鏈構(gòu)成;它還擁有一個物理路徑,如果監(jiān)管鏈包含有遠程監(jiān)管者,此路徑可能會與邏輯路徑不同。這些路徑用來在系統(tǒng)中查找actor,例如,當收到一個遠程消息時查找收件者,但是它們更直接的用處在于:actor可以通過指定絕對或相對路徑(邏輯的或物理的)來查找其它的actor,并隨結(jié)果獲取一個ActorSelection:
// will look up this absolute path
context.actorSelection("/user/serviceA/aggregator")
// will look up sibling beneath same supervisor
context.actorSelection("../joe")
其中指定的路徑被解析為一個java.net.URI,它以/分隔成路徑段。如果路徑以/開始,表示一個絕對路徑,且從根監(jiān)管者("/user"的父親)開始查找;否則是從當前actor開始。如果某一個路徑段為..,會找到當前所遍歷到的actor的上一級,否則則會向下一級尋找具有該名字的子actor。 必須注意的是actor路徑中的..總是表示邏輯結(jié)構(gòu),即其監(jiān)管者。
一個actor selection的路徑元素中可能包含通配符,從而允許向匹配模式的集合廣播該條消息:
// will look all children to serviceB with names starting with worker
context.actorSelection("/user/serviceB/worker*")
// will look up all siblings beneath same supervisor
context.actorSelection("../*")
消息可以通過ActorSelection發(fā)送,并且在投遞每條消息時 ActorSelection的路徑都會被查找。如果selection不匹配任何actor,則消息將被丟棄。
要獲得ActorSelection的ActorRef,你需要發(fā)送一條消息到selection,然后使用答復(fù)消息的sender()引用即可。有一個內(nèi)置的Identify消息,所有actor會理解它并自動返回一個包含ActorRef的ActorIdentity消息。此消息被遍歷到的actor特殊處理為,如果一個具體的名稱查找失?。匆粋€不含通配符的路徑?jīng)]有對應(yīng)的活動actor),則會生成一個否定結(jié)果。請注意這并不意味著應(yīng)答消息有到達保證,它仍然是一個普通的消息。
import akka.actor.{ Actor, Props, Identify, ActorIdentity, Terminated }
class Follower extends Actor {
val identifyId = 1
context.actorSelection("/user/another") ! Identify(identifyId)
def receive = {
case ActorIdentity(`identifyId`, Some(ref)) =>
context.watch(ref)
context.become(active(ref))
case ActorIdentity(`identifyId`, None) => context.stop(self)
}
def active(another: ActorRef): Actor.Receive = {
case Terminated(`another`) => context.stop(self)
}
}
你也可以通過ActorSelection的resolveOne方法獲取ActorSelection的一個ActorRef。如果存在這樣的actor,它將返回一個包含匹配的ActorRef的Future。如果沒有這樣的actor 存在或識別沒有在指定的時間內(nèi)完成,它將以失敗告終——akka.actor.ActorNotFound。
如果開啟了遠程調(diào)用,則遠程actor地址也可以被查找:
context.actorSelection("akka.tcp://app@otherhost:1234/user/serviceB")
4.發(fā)送消息
向actor發(fā)送消息需使用下列方法之一。
- !意思是“fire-and-forget”,即異步發(fā)送一個消息并立即返回。也稱為tell。
- ?異步發(fā)送一條消息并返回一個Future代表一個可能的回應(yīng)。也稱為ask。
對每一個消息發(fā)送者,分別有消息順序保證。
注意
使用ask有一些性能內(nèi)涵,因為需要跟蹤超時,需要有橋梁將Promise轉(zhuǎn)為ActorRef,并且需要在遠程情況下可訪問。所以為了性能應(yīng)該總選擇tell,除非只能選擇ask。
Tell: Fire-forget
這是發(fā)送消息的推薦方式。 不會阻塞地等待消息。它擁有最好的并發(fā)性和可擴展性。
actorRef ! message
如果是在一個Actor中調(diào)用 ,那么發(fā)送方的actor引用會被隱式地作為消息的sender(): ActorRef成員一起發(fā)送。目的actor可以使用它來向源actor發(fā)送回應(yīng), 使用sender() ! replyMsg。
如果不是從Actor實例發(fā)送的,sender成員缺省為 deadLetters actor引用。
Ask: Send-And-Receive-Future
ask模式既包含actor也包含future,所以它是一種使用模式,而不是ActorRef的方法:
import akka.pattern.{ ask, pipe }
import system.dispatcher // The ExecutionContext that will be used
case class Result(x: Int, s: String, d: Double)
case object Request
implicit val timeout = Timeout(5 seconds) // needed for `?` below
val f: Future[Result] =
for {
x <- ask(actorA, Request).mapTo[Int] // call pattern directly
s <- (actorB ask Request).mapTo[String] // call by implicit conversion
d <- (actorC ? Request).mapTo[Double] // call by symbolic name
} yield Result(x, s, d)
f pipeTo actorD // .. or ..
pipe(f) to actorD
5.接收消息
Actor必須實現(xiàn)receive方法來接收消息:
protected def receive: PartialFunction[Any, Unit]
這個方法應(yīng)返回一個PartialFunction,例如一個“match/case”子句,消息可以與其中的不同分支進行scala模式匹配。如下例:
import akka.actor.Actor
import akka.actor.Props
import akka.event.Logging
class MyActor extends Actor {
val log = Logging(context.system, this)
def receive = {
case "test" => log.info("received test")
case _ => log.info("received unknown message")
}
}
6.終止Actor
通過調(diào)用ActorRefFactory(即ActorContext或ActorSystem)的stop方法來終止一個actor。通常context用來終止子actor,而 system用來終止頂級actor。實際的終止操作是異步執(zhí)行的,即stop可能在actor被終止之前返回。
actor的終止分兩步: 第一步actor將掛起對郵箱的處理,并向所有子actor發(fā)送終止命令,然后處理來自子actor的終止消息直到所有的子actor都完成終止,最后終止自己(調(diào)用postStop,清空郵箱,向DeathWatch發(fā)布Terminated,通知其監(jiān)管者)。這個過程保證actor系統(tǒng)中的子樹以一種有序的方式終止,將終止命令傳播到葉子結(jié)點并收集它們回送的確認消息給被終止的監(jiān)管者。如果其中某個actor沒有響應(yīng)(即由于處理消息用了太長時間以至于沒有收到終止命令),整個過程將會被阻塞。
在ActorSystem.shutdown()被調(diào)用時,系統(tǒng)根監(jiān)管actor會被終止,以上的過程將保證整個系統(tǒng)的正確終止。
postStop() hook 是在actor被完全終止以后調(diào)用的。這是為了清理資源:
override def postStop() {
// clean up some resources ...
}
注意
由于actor的終止是異步的,你不能馬上使用你剛剛終止的子actor的名字;這會導(dǎo)致InvalidActorNameException。你應(yīng)該 監(jiān)視watch()正在終止的actor,并在Terminated最終到達后作為回應(yīng)創(chuàng)建它的替代者。
優(yōu)雅地終止
如果你需要等待終止過程的結(jié)束,或者組合若干actor的終止次序,可以使用gracefulStop:
import akka.pattern.gracefulStop
import scala.concurrent.Await
try {
val stopped: Future[Boolean] = gracefulStop(actorRef, 5 seconds, Manager.Shutdown)
Await.result(stopped, 6 seconds)
// the actor has been stopped
} catch {
// the actor wasn't stopped within 5 seconds
case e: akka.pattern.AskTimeoutException =>
}
object Manager {
case object Shutdown
}
class Manager extends Actor {
import Manager._
val worker = context.watch(context.actorOf(Props[Cruncher], "worker"))
def receive = {
case "job" => worker ! "crunch"
case Shutdown =>
worker ! PoisonPill
context become shuttingDown
}
def shuttingDown: Receive = {
case "job" => sender() ! "service unavailable, shutting down"
case Terminated(`worker`) =>
context stop self
}
}
當gracefulStop()成功返回時,actor的postStop() hook將會被執(zhí)行:在postStop()結(jié)束和gracefulStop()返回之間存在happens-before邊界。
在上面的示例中自定義的Manager.Shutdown消息是發(fā)送到目標actor來啟動actor的終止過程。你可以使用PoisonPill,但之后在停止目標actor之前,你與其他actor的互動的機會有限。在postStop中,可以處理簡單的清理任務(wù)。
警告
請記住,actor停止和其名稱被注銷是彼此異步發(fā)生的獨立事件。因此,在gracefulStop()返回后。你會發(fā)現(xiàn)其名稱仍可能在使用中。為了保證正確注銷,只在你控制的監(jiān)管者內(nèi),并且只在響應(yīng)Terminated消息時重用名稱,即不是用于頂級actor。
7.Become/Unbecome
升級
Akka支持在運行時對Actor消息循環(huán)(即其實現(xiàn))進行實時替換:在actor中調(diào)用context.become方法。become要求一個PartialFunction[Any, Unit]參數(shù)作為新的消息處理實現(xiàn)。 被替換的代碼被保存在一個棧中,可以被push和pop。
警告
請注意actor被其監(jiān)管者重啟后將恢復(fù)其最初的行為。
8.使用PartialFunction鏈來擴展actor
有時在一些actor中分享共同的行為,或通過若干小的函數(shù)構(gòu)成一個actor的行為是很有用的。這由于actor的receive方法返回一個Actor.Receive(PartialFunction[Any,Unit]的類型別名)而使之成為可能,多個偏函數(shù)可以使用PartialFunction#orElse鏈接在一起。你可以根據(jù)需要鏈接盡可能多的功能,但是你要牢記"第一個匹配"獲勝——這在組合可以處理同一類型的消息的功能時會很重要。
例如,假設(shè)你有一組actor是生產(chǎn)者Producers或消費者Consumers,然而有時候需要actor分享這兩種行為。這可以很容易實現(xiàn)而無需重復(fù)代碼,通過提取行為的特質(zhì)和并將actor的receive實現(xiàn)為這些偏函數(shù)的組合。
trait ProducerBehavior {
this: Actor =>
val producerBehavior: Receive = {
case GiveMeThings =>
sender() ! Give("thing")
}
}
trait ConsumerBehavior {
this: Actor with ActorLogging =>
val consumerBehavior: Receive = {
case ref: ActorRef =>
ref ! GiveMeThings
case Give(thing) =>
log.info("Got a thing! It's {}", thing)
}
}
class Producer extends Actor with ProducerBehavior {
def receive = producerBehavior
}
class Consumer extends Actor with ActorLogging with ConsumerBehavior {
def receive = consumerBehavior
}
class ProducerConsumer extends Actor with ActorLogging
with ProducerBehavior with ConsumerBehavior {
def receive = producerBehavior orElse consumerBehavior
}
// protocol
case object GiveMeThings
case class Give(thing: Any)
不同于繼承,相同的模式可以通過組合實現(xiàn)——可以簡單地通過委托的偏函數(shù)組合成receive方法。