Google Cloud Pub/Sub Java Client Library 入門 (サンプルはScala)


今回はGoogle Cloud Pub/SubのJava Client Libraryについてまとめていきます。これを使って実際にPub/Subを操作します。

前回:Google Cloud Pub/Sub 入門

クイックスタートのサンプルコード

クイックスタート: gcloud コマンドライン ツールの使用

クイックスタートにJavaコードが載っていますが、部分的なので完璧ではありません。SDKに慣れている人には良いのですが、自分の場合、認証からわからなかったのでいろいろ調べました。

その辺りから説明します。

ライブラリの解説とビルド

Cloud Pub/Sub Client Libraries

まずは詳しく解説しているページがあるのでこちらをご覧ください。

Client Libraryのコードは以下で公開されています。Issueも管理されているので困ったら調べると良いかもしれません。

https://github.com/GoogleCloudPlatform/google-cloud-java

PubSubはこちらです。

https://github.com/GoogleCloudPlatform/google-cloud-java/tree/master/google-cloud-pubsub

mavenはこちらです。

https://mvnrepository.com/artifact/com.google.cloud/google-cloud-pubsub

自分のユーザによる認証あるいはサービスアカウントのcredentialファイルによる認証

コードを書く前にまずは認証について説明します。

PubSub Topicにメッセージを送信するコードを動かす時、当然ですが、認証が必要です。GCE上などの場合はインスタンスのロール(?)によって認証突破しますが、自分のローカルPC上やオンプレミス環境ではそうはいきません。

そこで、上記のページでも解説されていますが、以下のコマンドを実行します。

% gcloud auth application-default login
Your browser has been opened to visit:

 https://accounts.google.com/o/oauth2/auth?redirect_uri=...



Credentials saved to file: [/Users/tsukaby/.config/gcloud/application_default_credentials.json]

These credentials will be used by any library that requests
Application Default Credentials.

credentialファイルが生成されます。これによってClient libraryが認証を突破できるようになります。(Client Libraryを使う時、上記のapplication_default_credentials.jsonを自動で読み込む)

この方法はローカルPC上での開発には良いですが、STGやPRD環境には向きません。このようなケースではサービスアカウントを使います。

ちなみに認証なしでプログラムを実行すると以下のような例外が出て、GCE上で実行するかGOOGLE_APPLICATION_CREDENTIALS環境変数を設定しろと言ってきます。この環境変数に設定するものがサービスアカウントのcredentialファイルのパスです。

Exception in thread "main" java.io.IOException: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information.
 at com.google.auth.oauth2.DefaultCredentialsProvider.getDefaultCredentials(DefaultCredentialsProvider.java:111)
 at com.google.auth.oauth2.GoogleCredentials.getApplicationDefault(GoogleCredentials.java:103)
 at com.google.auth.oauth2.GoogleCredentials.getApplicationDefault(GoogleCredentials.java:76)
 at com.google.api.gax.core.GoogleCredentialsProvider.getCredentials(GoogleCredentialsProvider.java:54)
 at com.google.cloud.pubsub.v1.Publisher.<init>(Publisher.java:156)
 at com.google.cloud.pubsub.v1.Publisher.<init>(Publisher.java:79)
 at com.google.cloud.pubsub.v1.Publisher$Builder.build(Publisher.java:625)
 at PublisherMain$.main(PublisherMain.scala:17)
 at PublisherMain.main(PublisherMain.scala)

サービスアカウントの作成はこのあたりを参照してください。

https://cloud.google.com/iam/docs/creating-managing-service-accounts?hl=ja

それほど難しくないのでGUI上でやっても良いと思います。

以下のように必要事項を入力して作成します。

サービスアカウント名はpubsubとしましたが、本来はこのアカウントを利用するアプリケーション名が良いと思います。(アプリごとに権限を分けるはずなので)

役割はPub/Sub管理者としておき、絞っておきます。(他のGCEなどを操作する権限は不要なので)

秘密鍵をJSONで生成します。credentialファイルをDLできるはずなので、DLしたらローカルPC上の適当な場所に保存し

export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credential.json

というように、DLしたファイルのパスを指定して環境変数を作ります。

これで認証は完了です。

あとは予めTopicとSubscriptionを作成しておきます。

次から実際にコードを書いていきます。

Publisherコード

maven, gradle, sbtなどでclient libraryの依存を書いて、以下のようにコードを書きます。以下はScalaですが、Javaなどでもほぼ同じです。

import com.google.api.core.{ ApiFutureCallback, ApiFutures }
import com.google.api.gax.core.GoogleCredentialsProvider
import com.google.cloud.pubsub.v1.Publisher
import com.google.protobuf.ByteString
import com.google.pubsub.v1.{ PubsubMessage, TopicName }

import scala.collection.JavaConversions._

object PublisherMain {
  def main(args: Array[String]): Unit = {
    val projectName = "myProject"
    val topicName = "myTopic"

    val publisher: Publisher = Publisher.defaultBuilder(TopicName.create(projectName, topicName))
      .setCredentialsProvider(GoogleCredentialsProvider.newBuilder().setScopesToApply(
        Seq("https://www.googleapis.com/auth/pubsub")).build()
      )
      .build()

    (0 until 10) foreach { num =>
      val message = f"msg-${num}%03d"
      val data = ByteString.copyFromUtf8(message)
      val pubsubMessage = PubsubMessage.newBuilder.setData(data).build
      val messageIdFuture = publisher.publish(pubsubMessage)

      ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback[String]() {
        override def onSuccess(messageId: String): Unit = {
          System.out.println(s"published with message id: $messageId, msg: $message")
        }

        override def onFailure(t: Throwable): Unit = {
          System.out.println("failed to publish: " + t)
        }
      })
    }

    publisher.shutdown()
  }
}

実行結果。

published with message id: 135416101273060, msg: msg-001
published with message id: 135416101273061, msg: msg-002
published with message id: 135416101273062, msg: msg-003
published with message id: 135416101273063, msg: msg-004
published with message id: 135416101273064, msg: msg-005
published with message id: 135416101273065, msg: msg-006
published with message id: 135416101273066, msg: msg-007
published with message id: 135416101273067, msg: msg-008
published with message id: 135415004700240, msg: msg-000
published with message id: 135416027748863, msg: msg-009

publishできました。

Subscriberコード

import com.google.api.gax.core.GoogleCredentialsProvider
import com.google.cloud.pubsub.v1.AckReplyConsumer
import com.google.cloud.pubsub.v1.MessageReceiver
import com.google.cloud.pubsub.v1.Subscriber
import com.google.pubsub.v1.PubsubMessage
import com.google.pubsub.v1.SubscriptionName

import scala.collection.JavaConversions._

object SubscriberMain {
  def main(args: Array[String]): Unit = {

    val projectId = "myProject"
    val subscriptionId = "mySubscription"
    val subscriptionName = SubscriptionName.create(projectId, subscriptionId)

    // Instantiate an asynchronous message receiver
    val receiver = new MessageReceiver() {
      override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = {
        println(message.getData.toString("UTF-8"))
        consumer.ack()
      }
    }

    var subscriber: Subscriber = null
    try {
      subscriber = Subscriber.defaultBuilder(subscriptionName, receiver)
        .setCredentialsProvider(GoogleCredentialsProvider.newBuilder().setScopesToApply(
          Seq("https://www.googleapis.com/auth/pubsub")).build()
        )
        .build
      val a = subscriber.startAsync
      a.awaitTerminated()
    } finally {
      // stop receiving messages
      if (subscriber != null) {
        subscriber.stopAsync
      }
    }
  }
}

実行結果。

msg-006
msg-002
msg-008
msg-003
msg-005
msg-000
msg-001
msg-004
msg-009
msg-007

取れました。簡単ですね。

解説

特にありませんが一応少し補足します。

.setCredentialsProvider(GoogleCredentialsProvider.newBuilder().setScopesToApply(
  Seq("https://www.googleapis.com/auth/pubsub")).build()
)

これはOAuth2によってaccess tokenを得る時に、権限をpubsubだけに絞っています。・・・がscopeの指定はレガシーな方式で、現在ではGCPにもIAMがありますので(実際今回Roleを使った)、必ずしも必要ではないようです。

ApiFutures.addCallback

これは内部的にはcom.google.api.core、つまりGoogle Guavaです。詳細はリンク先のGitHubからどうぞ。

Subscriberの方でconsumer.ack()を行っています。これを忘れるとデータが再送され続けてしまうのでご注意ください。

Publisherの方でデータを流し続けている状態でSubscriberを停止しようとしてもなかなかアプリが停止しないと思います。(SIGTERMを受け付けてもすぐに停止しない)

これはどうやらバグのようで以下のようなIssueがあがってました。

Cannot stop subscriber

その他

Kinesis Client Libraryよりも簡単に書けそうな印象を受けました。Kinesis Client Libraryの方がモデルが少し複雑です。CheckpointのタイミングやShutdown時の処理、Workerの概念と何を行っているか把握する必要があるなど注意点が多かった覚えがあります。Pub/Sub Client Libraryは運用してはいませんが、調べた限りでは特に罠もなく、使いやすい印象を受けました。あえて言うならアプリの処理内容によってackを返すタイミングと応答受付期限に気をつける必要があるくらいでしょうか。まあ期限内、かつ最後にack返すだけですが・・・。

今回は使いませんでしたが、Topicを作成する操作なども行えるようです。

まとめ

Pub/Sub Client Libraryの紹介やリファレンス、認証、簡単な使い方について説明しました。

次回は複数のProject間でのPub/Subの操作・認証について解説していきます。

次回:Google Cloud Pub/Sub を複数のProjectにまたがって利用する。またそのときの課金について