《Akka学习笔记:ACTORS介绍》
《Akka学习笔记:Actor消息传递(1)》
《Akka学习笔记:Actor消息传递(2)》
《Akka学习笔记:日志》
《Akka学习笔记:测试Actors》
《Akka学习笔记:Actor消息处理-请求和响应(1) 》
《Akka学习笔记:Actor消息处理-请求和响应(2) 》
《Akka学习笔记:ActorSystem(配置)》
《Akka学习笔记:ActorSystem(调度)》
调度
正如你在ActorSystem中的API看到的,如下:
//Light-weight scheduler for running asynchronous tasks after some deadline in the future. def scheduler : Scheduler
在 ActorSystem 中有大量的方法调用scheduler,而scheduler返回的是Scheduler。Scheduler中有大量的schedule方法,利用他们我们可以在Actor环境下做大量的有趣的事情。
A、SCHEDULE SOMETHING TO EXECUTE ONCE
在我们的Student-Teacher例子里面,假如在我们的测试用例程序中StudentActor想在接收到InitSignal消息后的5秒中之后才发送消息给Teacher。我们的代码看起来像这样的:
class StudentDelayedActor (teacherActorRef:ActorRef) extends Actor with ActorLogging { def receive = { case InitSignal=> { import context.dispatcher context.system.scheduler.scheduleOnce(5 seconds, teacherActorRef, QuoteRequest) //teacherActorRef!QuoteRequest } ... ... } }
1、测试用例
我们来写个测试用例来测试这个:
"A delayed student" must { "fire the QuoteRequest after 5 seconds when an InitSignal is sent to it" in { import me.rerun.akkanotes.messaging.protocols.StudentProtocol._ val teacherRef = system.actorOf(Props[TeacherActor], "teacherActorDelayed") val studentRef = system.actorOf(Props(new StudentDelayedActor(teacherRef)), "studentDelayedActor") EventFilter.info (start="Printing from Student Actor", occurrences=1).intercept{ studentRef!InitSignal } } }
2、将Eventfilter interception的超时时间增大
EventFilter在等待消息在 EventStream 中出现的默认的超时时间是3秒。我们将它增加到7秒来测试我们的程序, 我们可以通过filter-leeway 配置属性实现。
class RequestResponseTest extends TestKit(ActorSystem("TestUniversityMessageSystem", ConfigFactory.parseString(""" akka{ loggers = ["akka.testkit.TestEventListener"] test{ filter-leeway = 7s } } """))) with WordSpecLike with MustMatchers with BeforeAndAfterAll with ImplicitSender { ... ...
B. SCHEDULE SOMETHING TO EXECUTE REPEATEDLY
为了能够重复地运行任务,你可以用Scheduler的schedule 方法。最常用的schedule方法将定期地给Actor发送消息,它接收四个参数:
1、在第一次运行的时候需要等待多少时间;
2、子序列循序的频率;
3、我们想发送消息的目标ActorRef ;
4、消息
case InitSignal=> { import context.dispatcher context.system.scheduler.schedule(0 seconds, 5 seconds, teacherActorRef, QuoteRequest) //teacherActorRef!QuoteRequest }
琐事
在这里引入import context.dispatcher非常重要。schedule方法需要一个很重要的隐形参数ExecutionContext,查看schedule 方法的实现就知道原因很明显
final def schedule( initialDelay: FiniteDuration, interval: FiniteDuration, receiver: ActorRef, message: Any)(implicit executor: ExecutionContext, sender: ActorRef = Actor.noSender): Cancellable = schedule(initialDelay, interval, new Runnable { def run = { receiver ! message if (receiver.isTerminated) throw new SchedulerException("timer active for terminated actor") } })
schedule 方法仅仅在Runnable中包装了tell,而它最后被我们传进来的ExecutionContext所调用。为了使得ExecutionContext 在这个范围内隐式可用,我们利用了上下文中可用的隐式dispatcher。可以从 ActorCell.scala (Context)代码里面看到
/** * Returns the dispatcher (MessageDispatcher) that is used for this Actor. * Importing this member will place an implicit ExecutionContext in scope. */ implicit def dispatcher: ExecutionContextExecutor本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Akka学习笔记:ActorSystem(调度)】(https://www.iteblog.com/archives/1166.html)