Google Cloud Pub/Sub のデータをSpark Streamingで読む
今回はPub/SubのデータをSpark Streamingで読んでみたいと思います。
前回:Google Cloud Pub/Sub を複数のProjectにまたがって利用する。またそのときの課金について
Spark StreamingのPub/Subライブラリ
2017/7時点で最新であるSpark2.2.0の公式ドキュメントにはPub/Subの記述はありせんが、
Apache BahirというSparkの拡張を使うことでPub/Subも接続できるようになります。
https://github.com/apache/bahir
使い方
spark-shellで利用する例を載せておきます。
spark-shellを立ち上げてから入力しても良いですが、ここは-iオプションを使います。
まずは以下のようなファイルを作成しておきます。
import org.apache.spark.streaming.pubsub._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
val credentialFilePath = "/Users/tsukaby/.gcp/service-account-credential.json"
val projectId = "myproject-1"
val subscriptionId = "mySubscription"
val credential = SparkGCPCredentials.builder.jsonServiceAccount(credentialFilePath).build
val ssc = new StreamingContext(sc, Seconds(10))
val dStream = PubsubUtils.createStream(ssc, projectId, None, subscriptionId, credential, StorageLevel.MEMORY_ONLY)
dStream.foreachRDD(rdd => rdd.map(msg => new String(msg.getData, "UTF-8")).foreach(println))
ssc.start()
ssc.awaitTermination()
次に以下のコマンドのようにライブラリを取り込みつつ-iで上記のコードを実行します。
./bin/spark-shell --packages org.apache.bahir:spark-streaming-pubsub_2.11:2.1.1 -i pubsub.scala
以上です。
これで指定したSubscriptionからメッセージが取れると思います。
注意点
PubsubInputDStreamは2.1.1の現時点で以下のようになっています。
これはこのように(Link)メッセージを取得したら即座にackを返しています。つまりそのあとに実行されるDStream(RDD)のActionが正常に終了しなかった場合、メッセージの欠損につながります。本来であれば全てのActionが完了したあとでackを返すべきです。
実際にこのコードを利用するときはメッセージの欠損について検討する必要があります。
(この問題はKinesisのSparkStreamingモジュールにも同様に存在します)
まとめ
Spark StreamingでPub/Subからメッセージを読む方法と注意点について説明しました。