在<a href="http://www.itdecent.cn/p/f786b6ff91c7">之前的文章</a>中介紹了spring boot簡(jiǎn)化集成Akka,最近通過(guò)在項(xiàng)目中的實(shí)踐,又有的新的想法。
首先定義Akka配置類
import akka.actor.ActorSystem
import akka.actor.Props
import org.springframework.context.annotation.Bean
import org.springframework.stereotype.Component
import com.sam.demo.akka.scala.facade.ProcessManagers
@Component
class AkkaConfig {
val system = ActorSystem("ReactiveEnterprise")
val processManagersRef = system.actorOf(Props[ProcessManagers].withDispatcher("my-thread-pool-dispatcher"), "processManagers")
@Bean
def processManagers = {
processManagersRef
}
@Bean
def worker = {
system.actorSelection("/user/processManagers/worker")
}
}
在配置類中,首先定義了ActorRef對(duì)象processManagersRef,作用是作為其他業(yè)務(wù)相關(guān)Actor的監(jiān)控者,避免在akka user節(jié)點(diǎn)下創(chuàng)建過(guò)多的Actor。 注意worker,是通過(guò)路徑查詢獲取。worker的創(chuàng)建放在processManagers中。當(dāng)AkkaConfig實(shí)例化時(shí),processManagersRef隨之實(shí)例化。
ProcessManagers如下:
import org.slf4j.LoggerFactory
import akka.actor.Actor
import akka.actor.Props
import com.sam.demo.akka.scala.Worker
class ProcessManagers extends Actor {
val logger = LoggerFactory.getLogger(getClass)
val worker = context.actorOf(Props[Worker].withDispatcher("my-thread-pool-dispatcher"), "worker")
def receive = {
case x: Any =>
}
}
該類無(wú)任務(wù)業(yè)務(wù)方法,其作用僅僅是創(chuàng)建了另一個(gè)Actor(worker),將來(lái)可擴(kuò)展ProcessManagers作為其他業(yè)務(wù)Actor的監(jiān)控者。
在該類實(shí)例化過(guò)程中,同時(shí)也創(chuàng)建了worker的ActorRef
Worker定義如下:
import akka.actor.Actor
import org.slf4j.LoggerFactory
import com.sam.demo.BeanFactory
import com.sam.demo.service.DomainService
class Worker extends Actor {
val logger = LoggerFactory.getLogger(getClass)
lazy val domainService = BeanFactory.buildFactory().getBean(classOf[DomainService])
def receive = {
case x: String => {
logger.info("x:{}", x)
val textMsg = new TextMessage(domainService.toUpper(x))
sender ! textMsg
}
}
}
通過(guò)BeanFactory獲得spring IOC容器中的業(yè)務(wù)Bean,從而實(shí)現(xiàn)業(yè)務(wù)方法的調(diào)用。
關(guān)鍵點(diǎn)來(lái)了,如何向Actor發(fā)消息? 給哪個(gè)Actor發(fā)消息? 之前的做法是發(fā)消息給ProcessManagers,ProcessManagers透?jìng)飨⒔oWorker,之所以這樣做,是因?yàn)闊o(wú)法將Worker的ActorRef引用注入到Spring IOC容器中的Bean中。
現(xiàn)在,在AkkaConfig中已經(jīng)將Worker的引用(注意: 通過(guò)system.actorSelection的返回類型是ActorSelection,不是ActorRef)暴露為一個(gè)Spring Bean,是需要將Worker的引用注入給調(diào)用方即可。
調(diào)用方如下:
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.sam.demo.akka.scala.TextMessage;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import akka.util.Timeout;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
@RestController
@RequestMapping(value={"/users"})
public class UserController {
@Autowired
private ActorSelection worker;
@SuppressWarnings({"rawtypes","unchecked"})
@RequestMapping(value={"/find"})
public String find(String id) throws Exception {
String uuid = UUID.randomUUID().toString();
Future future = Patterns.ask(worker, uuid, Timeout.apply(10L, TimeUnit.SECONDS));
TextMessage o = (TextMessage)Await.result(future, Duration.create(10, TimeUnit.SECONDS));
return o.msg();
}
}
現(xiàn)在實(shí)現(xiàn)了調(diào)用方直接將消息發(fā)給業(yè)務(wù)Actor,由原來(lái)2次消息發(fā)送變成了1次。
代碼其他部分保持不變,其他細(xì)節(jié)參考<a href="http://www.itdecent.cn/p/f786b6ff91c7">Spring非常規(guī)集成Akka</a>