Akka ActorのExtensionを少し調べてみた
こんにちは、@s_tsukaです。
今回はAkka ActorのExtensionについて少し調べたり試したりしてみたので、その調査結果を書いていきます。
ActorのExtensionとは
公式ドキュメントは以下です。
http://doc.akka.io/docs/akka/2.4.0/scala/extending-akka.html
ActorSystemごとに1度ロードされるActorの拡張機能を作れる機能がActor Extensionsということらしいです。
正直よくわからないので、まずはサンプルコードを書いてみます。
Extensionのサンプル
上記の公式にサンプルがあるので、これを使って簡単なプログラムを書いてみます。
まずはbuild.sbtに
libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-actor" % "2.4.0" )
を追加して、その後Main.scalaを書きます。
import java.util.concurrent.atomic.AtomicLong import akka.actor._ object Main { def main(args: Array[String]): Unit = { val system1 = ActorSystem("my-system") val system2 = ActorSystem("my-system2") val actor1 = system1.actorOf(Props(classOf[MyActor])) val actor2 = system2.actorOf(Props(classOf[MyActor])) CountExtension(system1).increment() // a : 0 => 1 actor1 ! "msg" // a : 1 => 2 actor1 ! "msg" // a : 2 => 3 actor1 ! "msg" // a : 3 => 4 Thread.sleep(1000) actor2 ! "msg" // b : 0 => 1 system1.terminate() system2.terminate() } } class MyActor extends Actor { val counter = CountExtension(context.system) def receive = { case someMessage => val result = counter.increment() println(s"result = $result") } } class CountExtensionImpl extends Extension { private val counter = new AtomicLong(0) def increment() = counter.incrementAndGet() } object CountExtension extends ExtensionId[CountExtensionImpl] with ExtensionIdProvider { override def lookup = CountExtension override def createExtension(system: ExtendedActorSystem) = new CountExtensionImpl override def get(system: ActorSystem): CountExtensionImpl = super.get(system) }
これを動かすと以下のように出力されるかと思います。
result = 2 result = 3 result = 4 result = 1
Extensionによってメッセージを受け取るたびにカウンタを増加してはprintlnする・・というだけの特に意味のないプログラムです。
何回か”msg”を送っているのでその度数字が増えています。
サンプルの解説
CountExtensionはActorSystemのためのお決まりのコード、みたいなものです。CountExtensionはapplyを持っていて、ActorSystemを食わせるとCountExtensionImplを取得できます。
この部分ですね。
CountExtension(context.system)
拡張本体はCountExtensionImplで、これは単にカウンタを加算しているだけです。
拡張はActorSystemごとに1度だけロードされるので、上記のコードではsystem1とsystem2でそれぞれ別の拡張(object)を持っています。なので、出力がresult 2, 3, 4と来て1となっている訳ですね。
ActorSystemごとに1度だけロードと言っていますが、CountExtensionの初期化をActorSystem作成時に行うにはドキュメントに従って以下のようにconfigを追加します。
akka { extensions = ["docs.extension.CountExtension"] }
docs.extension.CountExtensionの部分は適宜、パッケージ名+クラス名に変更してください。
configなしの場合はExtensionのapply(ActorSystem)が実行されたときに初期化されるはずです。
なお、Extensionを使うとスレッドセーフでないことになると思います。詳細はこちらをご覧ください。
Actor Extensionはスレッドアンセーフな点に注意
Extensionを使うときは合わせてsynchronizedも常に念頭に置いておきたいですね。
Extensionの使いどころ
ここまででわりと簡単にExtensionを使えることがわかったかと思います。
ActorSystemごとに1インスタンスということもわかりました。
ですが、問題は使いどころです。場合によっては単にobject CountExtensionImplなどとして、普通にシングルトンのオブジェクトを作った方が良いかと思います。
どういうときにExtensionを使うべきなのか、OSSから学んでみます。
okumin/akka-persistence-sql-async
https://github.com/okumin/akka-persistence-sql-async
ScalikejdbcExtensionsというものを作っていますね。
これを別のActorから利用しています。
Settingの共有目的で使っているようですね。
ktoso/akka-raft
https://github.com/ktoso/akka-raft
DefaultLogCompactionというものを作っています。
RaftActor内でlogCompaction変数を作成して、最終的に以下の部分で使っているようです。
これはロジックを共有・共通化するために使っているようですね。
actorapp/actor-platform
https://github.com/actorapp/actor-platform
これがExtensionです。
各所からregisterを使っています。
servicesという状態を持ちつつ、自前で作成したactorにメッセージを送っています。
状態+処理共通化みたいな使われ方ですね。
結局明確な使いどころはわかりませんでしたが、状態を共有したいときや、メソッドを共有したいときに使うもののようです。個人的な意見ですが、Actorの拡張というよりはActorから利用可能な部品を定義する感じかなーと思いました。
ActorExtensionの使いどころとかメリットとか詳しい方いたらぜひ教えてください!