Akka ActorのExtensionを少し調べてみた


こんにちは、@s_tsukaです。

今回はAkka ActorのExtensionについて少し調べたり試したりしてみたので、その調査結果を書いていきます。

ActorのExtensionとは

公式ドキュメントは以下です。

http://doc.akka.io/docs/akka/2.4.0/scala/extending-akka.html

ActorSystemごとに1度ロードされるActorの拡張機能を作れる機能がActor Extensionsということらしいです。

正直よくわからないので、まずはサンプルコードを書いてみます。

Extensionのサンプル

上記の公式にサンプルがあるので、これを使って簡単なプログラムを書いてみます。

まずはbuild.sbtに

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.4.0"
)

を追加して、その後Main.scalaを書きます。

import java.util.concurrent.atomic.AtomicLong

import akka.actor._

object Main {
  def main(args: Array[String]): Unit = {
    val system1 = ActorSystem("my-system")
    val system2 = ActorSystem("my-system2")

    val actor1 = system1.actorOf(Props(classOf[MyActor]))
    val actor2 = system2.actorOf(Props(classOf[MyActor]))

    CountExtension(system1).increment() // a : 0 => 1
    actor1 ! "msg" // a : 1 => 2
    actor1 ! "msg" // a : 2 => 3
    actor1 ! "msg" // a : 3 => 4

    Thread.sleep(1000)
    actor2 ! "msg" // b : 0 => 1

    system1.terminate()
    system2.terminate()
  }
}

class MyActor extends Actor {
  val counter = CountExtension(context.system)

  def receive = {
    case someMessage =>
      val result = counter.increment()
      println(s"result = $result")
  }
}

class CountExtensionImpl extends Extension {
  private val counter = new AtomicLong(0)

  def increment() = counter.incrementAndGet()
}

object CountExtension
  extends ExtensionId[CountExtensionImpl]
  with ExtensionIdProvider {

  override def lookup = CountExtension

  override def createExtension(system: ExtendedActorSystem) = new CountExtensionImpl

  override def get(system: ActorSystem): CountExtensionImpl = super.get(system)
}

これを動かすと以下のように出力されるかと思います。

result = 2
result = 3
result = 4
result = 1

Extensionによってメッセージを受け取るたびにカウンタを増加してはprintlnする・・というだけの特に意味のないプログラムです。

何回か”msg”を送っているのでその度数字が増えています。

サンプルの解説

CountExtensionはActorSystemのためのお決まりのコード、みたいなものです。CountExtensionはapplyを持っていて、ActorSystemを食わせるとCountExtensionImplを取得できます。

この部分ですね。

CountExtension(context.system)

拡張本体はCountExtensionImplで、これは単にカウンタを加算しているだけです。

拡張はActorSystemごとに1度だけロードされるので、上記のコードではsystem1とsystem2でそれぞれ別の拡張(object)を持っています。なので、出力がresult 2, 3, 4と来て1となっている訳ですね。

ActorSystemごとに1度だけロードと言っていますが、CountExtensionの初期化をActorSystem作成時に行うにはドキュメントに従って以下のようにconfigを追加します。

akka {
  extensions = ["docs.extension.CountExtension"]
}

docs.extension.CountExtensionの部分は適宜、パッケージ名+クラス名に変更してください。

configなしの場合はExtensionのapply(ActorSystem)が実行されたときに初期化されるはずです。

なお、Extensionを使うとスレッドセーフでないことになると思います。詳細はこちらをご覧ください。

Actor Extensionはスレッドアンセーフな点に注意

Extensionを使うときは合わせてsynchronizedも常に念頭に置いておきたいですね。

Extensionの使いどころ

ここまででわりと簡単にExtensionを使えることがわかったかと思います。

ActorSystemごとに1インスタンスということもわかりました。

ですが、問題は使いどころです。場合によっては単にobject CountExtensionImplなどとして、普通にシングルトンのオブジェクトを作った方が良いかと思います。

どういうときにExtensionを使うべきなのか、OSSから学んでみます。

okumin/akka-persistence-sql-async

https://github.com/okumin/akka-persistence-sql-async

ScalikejdbcExtensionsというものを作っていますね。

https://github.com/okumin/akka-persistence-sql-async/blob/964962e2e7d09b164a14e7a532b6712cc344181c/core/src/main/scala/akka/persistence/common/ScalikeJDBCExtension.scala

これを別のActorから利用しています。

https://github.com/okumin/akka-persistence-sql-async/blob/fda57d61bec1a23389913a14dfbd5ae288da102c/core/src/main/scala/akka/persistence/common/StoragePlugin.scala#L13-L14

Settingの共有目的で使っているようですね。

ktoso/akka-raft

https://github.com/ktoso/akka-raft

DefaultLogCompactionというものを作っています。

https://github.com/ktoso/akka-raft/blob/6fdfeaa79eb710ef167f5bf2198dea426a927079/src/main/scala/pl/project13/scala/akka/raft/compaction/DefaultLogCompactionSupport.scala

RaftActor内でlogCompaction変数を作成して、最終的に以下の部分で使っているようです。

https://github.com/ktoso/akka-raft/blob/6fdfeaa79eb710ef167f5bf2198dea426a927079/src/main/scala/pl/project13/scala/akka/raft/SharedBehaviors.scala#L148

これはロジックを共有・共通化するために使っているようですね。

actorapp/actor-platform

https://github.com/actorapp/actor-platform

https://github.com/actorapp/actor-platform/blob/10052cce5178eb624db91e203c7c88f0a3a8c227/actor-server/actor-rpc-api/src/main/scala/im/actor/server/api/rpc/RpcApiExtension.scala

これがExtensionです。

各所からregisterを使っています。

servicesという状態を持ちつつ、自前で作成したactorにメッセージを送っています。

状態+処理共通化みたいな使われ方ですね。

結局明確な使いどころはわかりませんでしたが、状態を共有したいときや、メソッドを共有したいときに使うもののようです。個人的な意見ですが、Actorの拡張というよりはActorから利用可能な部品を定義する感じかなーと思いました。

ActorExtensionの使いどころとかメリットとか詳しい方いたらぜひ教えてください!