Google Cloud Pub/Sub Java Client Library 入門 (サンプルはScala)
2017/07/18
今回はGoogle Cloud Pub/SubのJava Client Libraryについてまとめていきます。これを使って実際にPub/Subを操作します。
クイックスタートのサンプルコード
クイックスタート: gcloud コマンドライン ツールの使用
クイックスタートにJavaコードが載っていますが、部分的なので完璧ではありません。SDKに慣れている人には良いのですが、自分の場合、認証からわからなかったのでいろいろ調べました。
その辺りから説明します。
ライブラリの解説とビルド
Cloud Pub/Sub Client Libraries
まずは詳しく解説しているページがあるのでこちらをご覧ください。
Client Libraryのコードは以下で公開されています。Issueも管理されているので困ったら調べると良いかもしれません。
https://github.com/GoogleCloudPlatform/google-cloud-java
PubSubはこちらです。
https://github.com/GoogleCloudPlatform/google-cloud-java/tree/master/google-cloud-pubsub
mavenはこちらです。
https://mvnrepository.com/artifact/com.google.cloud/google-cloud-pubsub
自分のユーザによる認証あるいはサービスアカウントのcredentialファイルによる認証
コードを書く前にまずは認証について説明します。
PubSub Topicにメッセージを送信するコードを動かす時、当然ですが、認証が必要です。GCE上などの場合はインスタンスのロール(?)によって認証突破しますが、自分のローカルPC上やオンプレミス環境ではそうはいきません。
そこで、上記のページでも解説されていますが、以下のコマンドを実行します。
% gcloud auth application-default login Your browser has been opened to visit: https://accounts.google.com/o/oauth2/auth?redirect_uri=... Credentials saved to file: [/Users/tsukaby/.config/gcloud/application_default_credentials.json] These credentials will be used by any library that requests Application Default Credentials.
credentialファイルが生成されます。これによってClient libraryが認証を突破できるようになります。(Client Libraryを使う時、上記のapplication_default_credentials.jsonを自動で読み込む)
この方法はローカルPC上での開発には良いですが、STGやPRD環境には向きません。このようなケースではサービスアカウントを使います。
ちなみに認証なしでプログラムを実行すると以下のような例外が出て、GCE上で実行するかGOOGLE_APPLICATION_CREDENTIALS環境変数を設定しろと言ってきます。この環境変数に設定するものがサービスアカウントのcredentialファイルのパスです。
Exception in thread "main" java.io.IOException: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information. at com.google.auth.oauth2.DefaultCredentialsProvider.getDefaultCredentials(DefaultCredentialsProvider.java:111) at com.google.auth.oauth2.GoogleCredentials.getApplicationDefault(GoogleCredentials.java:103) at com.google.auth.oauth2.GoogleCredentials.getApplicationDefault(GoogleCredentials.java:76) at com.google.api.gax.core.GoogleCredentialsProvider.getCredentials(GoogleCredentialsProvider.java:54) at com.google.cloud.pubsub.v1.Publisher.<init>(Publisher.java:156) at com.google.cloud.pubsub.v1.Publisher.<init>(Publisher.java:79) at com.google.cloud.pubsub.v1.Publisher$Builder.build(Publisher.java:625) at PublisherMain$.main(PublisherMain.scala:17) at PublisherMain.main(PublisherMain.scala)
サービスアカウントの作成はこのあたりを参照してください。
https://cloud.google.com/iam/docs/creating-managing-service-accounts?hl=ja
それほど難しくないのでGUI上でやっても良いと思います。
以下のように必要事項を入力して作成します。
サービスアカウント名はpubsubとしましたが、本来はこのアカウントを利用するアプリケーション名が良いと思います。(アプリごとに権限を分けるはずなので)
役割はPub/Sub管理者としておき、絞っておきます。(他のGCEなどを操作する権限は不要なので)
秘密鍵をJSONで生成します。credentialファイルをDLできるはずなので、DLしたらローカルPC上の適当な場所に保存し
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credential.json
というように、DLしたファイルのパスを指定して環境変数を作ります。
これで認証は完了です。
あとは予めTopicとSubscriptionを作成しておきます。
次から実際にコードを書いていきます。
Publisherコード
maven, gradle, sbtなどでclient libraryの依存を書いて、以下のようにコードを書きます。以下はScalaですが、Javaなどでもほぼ同じです。
import com.google.api.core.{ ApiFutureCallback, ApiFutures } import com.google.api.gax.core.GoogleCredentialsProvider import com.google.cloud.pubsub.v1.Publisher import com.google.protobuf.ByteString import com.google.pubsub.v1.{ PubsubMessage, TopicName } import scala.collection.JavaConversions._ object PublisherMain { def main(args: Array[String]): Unit = { val projectName = "myProject" val topicName = "myTopic" val publisher: Publisher = Publisher.defaultBuilder(TopicName.create(projectName, topicName)) .setCredentialsProvider(GoogleCredentialsProvider.newBuilder().setScopesToApply( Seq("https://www.googleapis.com/auth/pubsub")).build() ) .build() (0 until 10) foreach { num => val message = f"msg-${num}%03d" val data = ByteString.copyFromUtf8(message) val pubsubMessage = PubsubMessage.newBuilder.setData(data).build val messageIdFuture = publisher.publish(pubsubMessage) ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback[String]() { override def onSuccess(messageId: String): Unit = { System.out.println(s"published with message id: $messageId, msg: $message") } override def onFailure(t: Throwable): Unit = { System.out.println("failed to publish: " + t) } }) } publisher.shutdown() } }
実行結果。
published with message id: 135416101273060, msg: msg-001 published with message id: 135416101273061, msg: msg-002 published with message id: 135416101273062, msg: msg-003 published with message id: 135416101273063, msg: msg-004 published with message id: 135416101273064, msg: msg-005 published with message id: 135416101273065, msg: msg-006 published with message id: 135416101273066, msg: msg-007 published with message id: 135416101273067, msg: msg-008 published with message id: 135415004700240, msg: msg-000 published with message id: 135416027748863, msg: msg-009
publishできました。
Subscriberコード
import com.google.api.gax.core.GoogleCredentialsProvider import com.google.cloud.pubsub.v1.AckReplyConsumer import com.google.cloud.pubsub.v1.MessageReceiver import com.google.cloud.pubsub.v1.Subscriber import com.google.pubsub.v1.PubsubMessage import com.google.pubsub.v1.SubscriptionName import scala.collection.JavaConversions._ object SubscriberMain { def main(args: Array[String]): Unit = { val projectId = "myProject" val subscriptionId = "mySubscription" val subscriptionName = SubscriptionName.create(projectId, subscriptionId) // Instantiate an asynchronous message receiver val receiver = new MessageReceiver() { override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = { println(message.getData.toString("UTF-8")) consumer.ack() } } var subscriber: Subscriber = null try { subscriber = Subscriber.defaultBuilder(subscriptionName, receiver) .setCredentialsProvider(GoogleCredentialsProvider.newBuilder().setScopesToApply( Seq("https://www.googleapis.com/auth/pubsub")).build() ) .build val a = subscriber.startAsync a.awaitTerminated() } finally { // stop receiving messages if (subscriber != null) { subscriber.stopAsync } } } }
実行結果。
msg-006 msg-002 msg-008 msg-003 msg-005 msg-000 msg-001 msg-004 msg-009 msg-007
取れました。簡単ですね。
解説
特にありませんが一応少し補足します。
.setCredentialsProvider(GoogleCredentialsProvider.newBuilder().setScopesToApply( Seq("https://www.googleapis.com/auth/pubsub")).build() )
これはOAuth2によってaccess tokenを得る時に、権限をpubsubだけに絞っています。・・・がscopeの指定はレガシーな方式で、現在ではGCPにもIAMがありますので(実際今回Roleを使った)、必ずしも必要ではないようです。
ApiFutures.addCallback
これは内部的にはcom.google.api.core、つまりGoogle Guavaです。詳細はリンク先のGitHubからどうぞ。
Subscriberの方でconsumer.ack()を行っています。これを忘れるとデータが再送され続けてしまうのでご注意ください。
Publisherの方でデータを流し続けている状態でSubscriberを停止しようとしてもなかなかアプリが停止しないと思います。(SIGTERMを受け付けてもすぐに停止しない)
これはどうやらバグのようで以下のようなIssueがあがってました。
その他
Kinesis Client Libraryよりも簡単に書けそうな印象を受けました。Kinesis Client Libraryの方がモデルが少し複雑です。CheckpointのタイミングやShutdown時の処理、Workerの概念と何を行っているか把握する必要があるなど注意点が多かった覚えがあります。Pub/Sub Client Libraryは運用してはいませんが、調べた限りでは特に罠もなく、使いやすい印象を受けました。あえて言うならアプリの処理内容によってackを返すタイミングと応答受付期限に気をつける必要があるくらいでしょうか。まあ期限内、かつ最後にack返すだけですが・・・。
今回は使いませんでしたが、Topicを作成する操作なども行えるようです。
まとめ
Pub/Sub Client Libraryの紹介やリファレンス、認証、簡単な使い方について説明しました。
次回は複数のProject間でのPub/Subの操作・認証について解説していきます。