【Akka】Akka入門編程實(shí)例

引言

這篇文章主要是第一次學(xué)習(xí)Akka編程,先試試水,探探坑,對Akka和SBT的使用有一個直觀的了解,以幾個簡單的akka編程實(shí)例來說明akka的使用。希望在日后的學(xué)習(xí)和編程中,能有更多自己的體會和經(jīng)驗(yàn)總結(jié)來分享。

Actor模型

Actor實(shí)例可以想象成是服務(wù)器上的Web服務(wù),你無法控制,只能通過發(fā)送消息去請求執(zhí)行任務(wù)或查詢信息,而不能直接在Web服務(wù)中修改狀態(tài)或者處理資源。通過發(fā)送不可改變的消息,雖然看上去有些限制,但是可以很簡單安全的編寫并發(fā)程序。

Actor系統(tǒng)的形象理解

一個actor是基于Actor系統(tǒng)的最小單元,就像面向?qū)ο笙到y(tǒng)中的對象實(shí)例一樣,它也封裝了狀態(tài)和行為。我們無法窺探actor內(nèi)部的信息,只能通過發(fā)送消息來請求狀態(tài)信息(就像是問一個人,他感覺如何)。actor中有一個存放不可變狀態(tài)信息的信箱。我們通過發(fā)送信息和actor進(jìn)行通信,當(dāng)actor收到信息之后,它會運(yùn)用相關(guān)算法來處理具體的信息。
在一個應(yīng)用程序中,多個actor構(gòu)成了一套層級系統(tǒng),像是一個家族或者一個商業(yè)組織。一個actor可以認(rèn)為是一個商業(yè)組織的個人。一個actor有一個父親,稱為監(jiān)督者(supervisor),還有好多孩子,可以認(rèn)為,在一個商業(yè)組織中,主席(actor)下面有多個副主席,副主席也有很多下屬隨從。
Actor系統(tǒng)的最佳實(shí)踐是“委派任務(wù)”,尤其是當(dāng)actor的行為被阻塞的時候。可以想象,在實(shí)際商業(yè)活動中,主席將要做的工作分配給下面的幾個副主席去分別執(zhí)行,而副主席也會將子任務(wù)分配給自己的隨從,直到該任務(wù)被下屬們執(zhí)行完畢。

處理故障

Actor模型的一個重要內(nèi)容是處理故障。在工作工程中,如果出現(xiàn)錯誤或者拋出異常,actor和其子actor都將暫停,然后發(fā)送一條信息給監(jiān)督者(supervisor)actor,報(bào)告出現(xiàn)故障的信號。
根據(jù)工作任務(wù)和故障的性質(zhì),監(jiān)督者actor將會作出幾種選擇:

  • 恢復(fù)下屬actor,保留內(nèi)部狀態(tài)
  • 重啟下屬actor,清空狀態(tài)
  • 終止下屬actor
  • 上報(bào)故障

Hello,Actor實(shí)例

現(xiàn)在我用一個最簡單的actor編程實(shí)例來介紹akka編程,先給出代碼:

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props

class HelloActor extends Actor{
  def receive = {
    case "hello"  => println("hello back to you.")
    case _        => println("huh?")
  }
}

object Test1_HelloActor extends App {
  // actor need an ActorSystem
  val system = ActorSystem("HelloSystem")
  // create and start the actor
  val helloActor = system.actorOf(Props[HelloActor], name="helloActor")
  // send two messages
  helloActor ! "hello"
  helloActor ! "what"
  // shutdown the actor system
  system.shutdown
}

代碼注解:

  • Actor由HelloActor定義
  • HelloActor的行為有receive方法定義實(shí)現(xiàn),其中使用了模式匹配表達(dá)式
  • HelloActor接收字符串hello作為消息,做出相應(yīng)打印動作
  • Test1_HelloActor的object用來測試actor
  • ActorSystem接收一個name參數(shù),并且通過system.actorOf創(chuàng)建actor實(shí)例
  • 創(chuàng)建Actor實(shí)例名為helloActor,其構(gòu)造函數(shù)沒有參數(shù)
  • Actor創(chuàng)建后自動運(yùn)行,不需調(diào)用start或者run方法
  • 通過!方法來發(fā)送消息

ActorSystem

一個actor system是actors的層級集團(tuán),分享公共配置信息(比如分發(fā)器dispatchers,部署deployments,遠(yuǎn)程功能remote capabilities,地址addresses)。它同時也是創(chuàng)建和查詢actors的入口。ActorSystem是為你的應(yīng)用程序分配線程資源的結(jié)構(gòu)。

ActorRef

當(dāng)你調(diào)用ActorSystemactorOf方法時,將創(chuàng)建并返回一個ActorRef的實(shí)例:
def actorOf(props: Props, name: String): ActorRef。

這個引用用來處理actor,你可以將其看做是處理實(shí)際actor的代理人(broker)或包裝外觀(facade)。ActorRef防止你破壞Actor模型,比如直接處理Actor實(shí)例,或直接修改Actor實(shí)例中的變量。所以只能通過給actor發(fā)送消息方式來執(zhí)行任務(wù),這種“袖手旁觀(不干涉,hands-off)”的方法幫助鞏固適宜的編程實(shí)踐。

ActorRef有以下特點(diǎn):

  • 它是不可變的
  • 它與actor實(shí)體是一對一的關(guān)系
  • 它是可序列化的,網(wǎng)絡(luò)可感知的。這使得你可以在網(wǎng)絡(luò)環(huán)境中傳送一個ActorRef

Actor之間的通信實(shí)例

下面給出的是兩個actor實(shí)例相互發(fā)送消息進(jìn)行通信的PingPong示例:

import akka.actor._

case object PingMessage
case object PongMessage
case object StartMessage
case object StopMessage

class Ping(pong: ActorRef) extends Actor{
  var count = 0
  def incrementAndPrint {count += 1; println(s"$count:ping")}
  def receive = {
    case StartMessage =>
      incrementAndPrint
      pong ! PongMessage
    case PingMessage =>
      incrementAndPrint
      if(count > 99) {
        sender ! StopMessage
        println("ping stopped")
        context.stop(self)
      }
      else
        sender ! PongMessage
    case _ => println("Ping got unexpected information")
  }
}

class Pong extends Actor {
  var count = 0
  def receive = {
    case StopMessage =>
      println("pong stopped")
      context.stop(self)
    case PongMessage =>
      count += 1
      println(s"$count:pong")
      sender ! PingMessage
    case _ => println("Pong got unexpected information")
  }
}

object PingPangTest extends App{
  val system = ActorSystem("PingPongTest")
  val pongActor = system.actorOf(Props[Pong], name="pong")
  val pingActor = system.actorOf(Props(new Ping(pongActor)),
                                  name = "ping")
  pingActor ! StartMessage
}

代碼注釋:

  • 創(chuàng)建ActorSystem之后;
  • 創(chuàng)建Pong的actor實(shí)例(pongActor對象其實(shí)是ActorRef的實(shí)例);
  • 之后創(chuàng)建Ping的actor實(shí)例,其構(gòu)造函數(shù)接受ActorRef參數(shù);
  • 通過給pingActor發(fā)送一個StartMessage消息來啟動pingActor和pongActor的具體動作;
  • Ping Actor和Pong Actor通過PingMessage和PongMessage相互發(fā)送消息,sender用來引用消息發(fā)送源Actor;
  • Ping通過計(jì)數(shù),知道進(jìn)行了100次消息的發(fā)送之后,發(fā)送StopMessage來終止actor。分別調(diào)用自己的context.stop方法來結(jié)束

啟動Actor

在ActorSystem層面,通過調(diào)用system.actorOf方法來創(chuàng)建actors;在actor內(nèi)部,通過調(diào)用context.actorOf方法來創(chuàng)建子actor。
下面給出一個ParentChild示例:

import akka.actor._

case class CreateChild (name: String)
case class Name (name: String)

class Child extends Actor {
  var name = "No name"
  override def postStop: Unit = {
    println(s"D'oh! They killed me ($name): ${self.path}")
  }
  def receive = {
    case Name(name) => this.name = name
    case _ => println(s"Child $name got message.")
  }
}

class Parent extends Actor {
  def receive = {
    case CreateChild(name) =>
      // Parent creates a new Child here
      println(s"Parent about to create Child ($name) ...")
      val child = context.actorOf(Props[Child], name=s"$name")
      child ! Name(name)
    case _ => println(s"Parent got some other message.")
  }
}

object ParentChildDemo extends App{
  val actorSystem = ActorSystem("ParentChildTest")
  val parent = actorSystem.actorOf(Props[Parent], name="Parent")

  // send messages to Parent to create to child actors
  parent ! CreateChild("XiaoMing")
  parent ! CreateChild("XiaoLiang")
  Thread.sleep(500)

  // lookup XiaoMing, the kill it
  println("Sending XiaoMing a PoisonPill ... ")
  val xiaoming = actorSystem.actorSelection("/user/Parent/XiaoMing")
  xiaoming ! PoisonPill
  println("XiaoMing was killed")

  Thread.sleep(5000)
  actorSystem.shutdown
}

打印結(jié)果:

Parent about to create Child (XiaoMing) ...
Parent about to create Child (XiaoLiang) ...
Sending XiaoMing a PoisonPill ...
XiaoMing was killed
D'oh! They killed me (XiaoMing): akka://ParentChildTest/user/Parent/XiaoMing
D'oh! They killed me (XiaoLiang): akka://ParentChildTest/user/Parent/XiaoLiang

終止Actor

在ActorSystem層面,通過system.stop(actorRef)來終止一個actor;在actor內(nèi)部,使用context.stop(actorRef)來結(jié)束一個actor。
如果當(dāng)前有正在處理的消息,對該消息的處理將在actor被終止之前完成,但是郵箱中的后續(xù)消息將不會被處理。缺省情況下這些消息會被送到 ActorSystem的dead letter mailbox, 但是這取決于郵箱的實(shí)現(xiàn)。

actor終止的相關(guān)處理

actor的終止分兩步: 第一步actor將停止對郵箱的處理,向所有子actor發(fā)送終止命令,然后處理來自子actor的終止消息直到所有的子actor都完成終止, 最后終止自己(調(diào)用postStop,銷毀郵箱,向DeathWatch發(fā)布Terminated,通知其監(jiān)管者)。這個過程保證actor系統(tǒng)中的子樹以一種有序的方式終止,將終止命令傳播到葉子結(jié)點(diǎn)并收集它們回送的確認(rèn)消息給被終止的監(jiān)管者。如果其中某個actor沒有響應(yīng)(i.e.由于處理消息用了太長時間以至于沒有收到終止命令), 整個過程將會被阻塞。

在 ActorSystem.shutdown被調(diào)用時, 系統(tǒng)根監(jiān)管actor會被終止,以上的過程將保證整個系統(tǒng)的正確終止。

postStop() hook是在actor被完全終止以后調(diào)用的。這是為了清理資源:

override def postStop() = {
  // 關(guān)閉文件或數(shù)據(jù)庫連接
}

PoisonPill和gracefulStop

還有其他兩種方式,發(fā)送PoisonPill消息或者使用gracefulStop終止。
你也可以向actor發(fā)送akka.actor.PoisonPill消息,這個消息處理完成后actor會被終止。PoisonPill與普通消息一樣被放進(jìn)隊(duì)列,因此會在已經(jīng)入隊(duì)列的其它消息之后被執(zhí)行。

如果你想等待終止過程的結(jié)束,或者組合若干actor的終止次序,可以使用gracefulStop。
下面給出gracefulStop的代碼示例:

import akka.actor._
import akka.pattern.gracefulStop
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps

case object TestActorStop

class TestActor extends Actor {
  def receive = {
    case TestActorStop =>
      context.stop(self)
    case _ => println("TestActor got message")
  }
  override def postStop {println("TestActor: postStop")}
}

object GracefulStopTest extends App{
  val system = ActorSystem("GracefulStopTest")
  val testActor = system.actorOf(Props[TestActor], name="TestActor")
   // try to stop the actor graceful
  try {
    val stopped: Future[Boolean] = gracefulStop(testActor, 2 seconds, TestActorStop)
    Await.result(stopped, 3 seconds)
    println("testActor was stopped")
  } catch {
    case e: akka.pattern.AskTimeoutException => e.printStackTrace
  } finally {
    system.shutdown
  }
}

gracefulStop(actorRef, timeout)將返回一個Future實(shí)例,當(dāng)目標(biāo)actor有處理相關(guān)終止動作的消息時,會執(zhí)行成功。
上面示例中,通過發(fā)送TestActorStop消息來終止actor;如果沒有處理終止的工作,當(dāng)超過2s后,F(xiàn)uture拋出akka.pattern.AskTimeoutException異常。默認(rèn)情況下,gracefulStop將發(fā)送PoisonPill消息。

Kill消息

當(dāng)深入Akka actors,我們將認(rèn)識監(jiān)督者策略(supervisor strategies)概念。當(dāng)實(shí)現(xiàn)了監(jiān)督者策略,向actor發(fā)送一個Kill消息,這可以用來重新啟動actor。如果使用默認(rèn)的監(jiān)督者策略,Kill消息將終止目標(biāo)actor。
下面是示例代碼:

import akka.actor._

class Number5 extends Actor {
  def receive = {
    case _ => println("Number 5 got a message")
  }
  override def preStart { println("Number 5 is alive")}
  override def postStop { println("Number 5::postStop called")}
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println("Number 5::preRestart called")
  }
  override def postRestart(reason: Throwable): Unit = {
    println("Number 5::postRestart called")
  }
}

object KillTest extends App{
  val system = ActorSystem("KillTestSystem")
  val number5 = system.actorOf(Props[Number5], name="Number5")
  number5 ! "hello"
  number5 ! Kill
  system.shutdown
}

打印的信息:

Number 5 is alive
Number 5 got a message
[ERROR] [01/17/2016 19:20:09.342] [KillTestSystem-akka.actor.default-dispatcher-3] [akka://KillTestSystem/user/Number5] Kill (akka.actor.ActorKilledException)
Number 5::postStop called

轉(zhuǎn)載請注明作者Jason Ding及其出處
Github博客主頁(http://jasonding1354.github.io/)
GitCafe博客主頁(http://jasonding1354.gitcafe.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
簡書主頁(http://www.itdecent.cn/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354進(jìn)入我的博客主頁

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容