Google Cloud Pub/Sub のデータをSpark Streamingで読む


今回はPub/SubのデータをSpark Streamingで読んでみたいと思います。

前回:Google Cloud Pub/Sub を複数のProjectにまたがって利用する。またそのときの課金について

Spark StreamingのPub/Subライブラリ

2017/7時点で最新であるSpark2.2.0の公式ドキュメントにはPub/Subの記述はありせんが、

Apache BahirというSparkの拡張を使うことでPub/Subも接続できるようになります。

http://bahir.apache.org/

https://github.com/apache/bahir

使い方

spark-shellで利用する例を載せておきます。

spark-shellを立ち上げてから入力しても良いですが、ここは-iオプションを使います。

まずは以下のようなファイルを作成しておきます。

import org.apache.spark.streaming.pubsub._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel

val credentialFilePath = "/Users/tsukaby/.gcp/service-account-credential.json"
val projectId = "myproject-1"
val subscriptionId = "mySubscription"

val credential = SparkGCPCredentials.builder.jsonServiceAccount(credentialFilePath).build
val ssc = new StreamingContext(sc, Seconds(10))
val dStream = PubsubUtils.createStream(ssc, projectId, None, subscriptionId, credential, StorageLevel.MEMORY_ONLY)
dStream.foreachRDD(rdd => rdd.map(msg => new String(msg.getData, "UTF-8")).foreach(println))
ssc.start()
ssc.awaitTermination()

次に以下のコマンドのようにライブラリを取り込みつつ-iで上記のコードを実行します。

./bin/spark-shell --packages org.apache.bahir:spark-streaming-pubsub_2.11:2.1.1 -i pubsub.scala

以上です。

これで指定したSubscriptionからメッセージが取れると思います。

注意点

PubsubInputDStreamは2.1.1の現時点で以下のようになっています。

https://github.com/apache/bahir/blob/v2.1.1-rc2/streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala

これはこのように(Link)メッセージを取得したら即座にackを返しています。つまりそのあとに実行されるDStream(RDD)のActionが正常に終了しなかった場合、メッセージの欠損につながります。本来であれば全てのActionが完了したあとでackを返すべきです。

実際にこのコードを利用するときはメッセージの欠損について検討する必要があります。

(この問題はKinesisのSparkStreamingモジュールにも同様に存在します)

まとめ

Spark StreamingでPub/Subからメッセージを読む方法と注意点について説明しました。