Scala 的 Actor 并发模型介绍

Actor 是什么

Actor 是一种以通信和事件机制完成并行编程的并发模型,它诞生的目的是为了简化原始的以共享数据和锁机制的多线程并行的实现方式。简而言之,Actor 能让你更容易的写出复杂的并行程序。

Scala-Actor 库的历史

在 2.10 版本之前的 Scala 上,Actor API 以及类库都是其语言内置库提供的,在包 scala.actors 中。
但是在 2.10.0 版本时 Scala-Actor 已经被移除。所以,我会从旧版的 Scala-Actor API 开始过渡到新版 Actor API。

搭建开发环境

准备以下:

  1. SBT 插件(参考这篇文章)
  2. JDK1.7(不能是1.8或者更高的版本)

注意:因为要使用旧版本的 Scala,所以不能用太新的 JDK。如果你的系统已经存在更新的 JDK 版本,请临时切换为 1.7 版本,否则 Scala 编译器将无法编译成功任何一个 Actor 程序。

另外,如果你想在高版本 Scala 上依赖这个独立的 scala-actor 库来使用旧版 Actor API 的话,对不起也不行,至少在我这边是不成功的。

SBT 配置(使用 Scala2.9 的最后一个版本):

// ...
scalaVersion := "2.9.3"

自动切换 JDK

如果在学习旧 Actor 的过程中又必须要在其他 SBT 项目中用到新的 JDK,得来回重命名目录,有点麻烦,那么写个脚本一键自动切换不就行了。

#! /usr/bin/env bash

SOFT_DIR="/data/soft"
JDK_LAST="jdk.last"
JDK_OLD="jdk1.7"
readonly SOFT_DIR JDK_LAST JDK_OLD
-v(){
    echo "JDK_VERSION:${1} Switched!"
    java -version
}
if [ -e ${SOFT_DIR}/${JDK_OLD} ];then
    mv "${SOFT_DIR}/jdk" "${SOFT_DIR}/${JDK_LAST}"
    mv "${SOFT_DIR}/${JDK_OLD}" "${SOFT_DIR}/jdk"
    -v "1.7"
else
    mv "${SOFT_DIR}/jdk" "${SOFT_DIR}/${JDK_OLD}"
    mv "${SOFT_DIR}/${JDK_LAST}" "${SOFT_DIR}/jdk"
    -v "Lasted"
fi;

Actor 的基本用法

扩展 Actor trait

import actors._

object actor01 extends Actor {
  override def act(): Unit = {
    for (i <- 0 to 5) {
      println("I'm acting!" + i)
    }
  }

  def main(args: Array[String]): Unit = {
    start()
  }
}

扩展了 Actor trait 的 object 必须重写 act 方法。然后可以将异步代码放入当中。
start 方法启动,就跟 Java 中的 Thread.start 方法类似。当然这个例子没有体现出 Actor 的任何好处,只是一个最基本的扩展 Actor 例子而已。

使用 actor 函数构造异步代码

// 导入相关包
import actors.Actor._
// 使用 actor 函数(此处的代码放到 main 方法中)
actor {
  for (i <- 0 to 5) {
    println(i)
  }
}

用 actor 函数构造的 actor 代码会立刻被执行,简化了简单的异步代码编写过程。

传递消息和处理消息

val echoActor: Actor = actor {
  var flag = true
  while (flag) {
    receive {
      case msg =>
        println("received: " + msg)
        if (msg == "exit")
          flag = false
    }
  }
}

def main(args: Array[String]): Unit = {
  echoActor ! "我是一条消息~"
  echoActor ! "我是第二条消息~"
  echoActor ! "我是第三条消息~"
  echoActor ! "exit"
}

actor 方法构造 actor 对象,receive 函数监听和处理消息(此处的 msg 被推倒为 String 类型)。receive 响应消息以后就结束,所以需要死循环持续保持消息的处理。
actor 对象的 ! 方法表示传递消息,给接收函数(此处的 receive 代码块)消费。

处理指定类型的消息

receive {
  case msg: Int => println(msg)
  case msg: String => println(msg)
  case (msg: String, status: Int) => println(msg, status)
}

上面对三种消息类型进行了匹配处理,分别是 Int、String 和 Tuple2[String, Int],不同的消息类型分别做不同的处理。
如果传递有上述三种类型以外的消息,receive 不做如何处理,继续阻塞直到匹配到相应类型的消息。

使用 react 函数替代 receive

override def act(): Unit = {
  loop {
    react {
      case (name: String, actor: Actor) =>
        actor ! name
      case msg => println("Unhandled message: " + msg)
    }
  }
}

将 receive 函数换成 react 函数来响应消息,不同点在于 receive 函数时每一个 Actor 都必须有自己的线程,才能保证每一次 act 方法都能被执行,也就是需要一个原生线程来阻塞函数执行。而 react 方法由于不需要返回,无需保留当前线程的调用栈,所以无需阻塞代码的执行,等到有消息需要被处理的时候再临时分配线程,继续被任意一个被唤醒的线程重用,即它是异步执行的且有相当高的线程复用率,甚至理论上只需要一个线程就可以做到 Actor 的功能。它的优点是资源消耗非常小,适合承载高 IO 场景下的大量并发。
由于 react 的特性,无法用 while(true) 来正常控制流程,所以 actor 库提供了 loop 函数,loop 的参数表示被重复执行的代码块,即使是调用 react 也可以。所以此处用的是 loop 函数而不是 while(true)。

待更新。