Google Cloud Pub/Sub Java Client Library 鍏ラ杸 (銈点兂銉椼儷銇疭cala)


浠婂洖銇Google Cloud Pub/Sub銇甁ava Client Library銇仱銇勩仸銇俱仺銈併仸銇勩亶銇俱仚銆傘亾銈屻倰浣裤仯銇﹀疅闅涖伀Pub/Sub銈掓搷浣溿仐銇俱仚銆

鍓嶅洖锛Google Cloud Pub/Sub 鍏ラ杸

銈偆銉冦偗銈广偪銉笺儓銇偟銉炽儣銉偝銉笺儔

銈偆銉冦偗銈广偪銉笺儓: gcloud 銈炽優銉炽儔銉┿偆銉 銉勩兗銉伄浣跨敤

銈偆銉冦偗銈广偪銉笺儓銇獼ava銈炽兗銉夈亴杓夈仯銇︺亜銇俱仚銇屻侀儴鍒嗙殑銇伄銇у畬鐠с仹銇亗銈娿伨銇涖倱銆係DK銇叄銈屻仸銇勩倠浜恒伀銇壇銇勩伄銇с仚銇屻佽嚜鍒嗐伄鍫村悎銆佽獚瑷笺亱銈夈倧銇嬨倝銇亱銇c仧銇仹銇勩倣銇勩倣瑾裤伖銇俱仐銇熴

銇濄伄杈恒倞銇嬨倝瑾槑銇椼伨銇欍

銉┿偆銉栥儵銉伄瑙h銇ㄣ儞銉儔

Cloud Pub/Sub Client Libraries

銇俱仛銇┏銇椼亸瑙h銇椼仸銇勩倠銉氥兗銈搞亴銇傘倠銇仹銇撱仭銈夈倰銇旇Η銇忋仩銇曘亜銆

Client Library銇偝銉笺儔銇互涓嬨仹鍏枊銇曘倢銇︺亜銇俱仚銆侷ssue銈傜鐞嗐仌銈屻仸銇勩倠銇仹鍥般仯銇熴倝瑾裤伖銈嬨仺鑹亜銇嬨倐銇椼倢銇俱仜銈撱

https://github.com/GoogleCloudPlatform/google-cloud-java

PubSub銇亾銇°倝銇с仚銆

https://github.com/GoogleCloudPlatform/google-cloud-java/tree/master/google-cloud-pubsub

maven銇亾銇°倝銇с仚銆

https://mvnrepository.com/artifact/com.google.cloud/google-cloud-pubsub

鑷垎銇儲銉笺偠銇倛銈嬭獚瑷笺亗銈嬨亜銇偟銉笺儞銈广偄銈偊銉炽儓銇甤redential銉曘偂銈ゃ儷銇倛銈嬭獚瑷

銈炽兗銉夈倰鏇搞亸鍓嶃伀銇俱仛銇獚瑷笺伀銇ゃ亜銇﹁鏄庛仐銇俱仚銆

PubSub Topic銇儭銉冦偦銉笺偢銈掗佷俊銇欍倠銈炽兗銉夈倰鍕曘亱銇欐檪銆佸綋鐒躲仹銇欍亴銆佽獚瑷笺亴蹇呰銇с仚銆侴CE涓娿仾銇┿伄鍫村悎銇偆銉炽偣銈裤兂銈广伄銉兗銉(?)銇倛銇c仸瑾嶈绐佺牬銇椼伨銇欍亴銆佽嚜鍒嗐伄銉兗銈儷PC涓娿倓銈兂銉椼儸銉熴偣鐠板銇с伅銇濄亞銇亜銇嶃伨銇涖倱銆

銇濄亾銇с佷笂瑷樸伄銉氥兗銈搞仹銈傝В瑾仌銈屻仸銇勩伨銇欍亴銆佷互涓嬨伄銈炽優銉炽儔銈掑疅琛屻仐銇俱仚銆

% gcloud auth application-default login
Your browser has been opened to visit:

 https://accounts.google.com/o/oauth2/auth?redirect_uri=...



Credentials saved to file: [/Users/tsukaby/.config/gcloud/application_default_credentials.json]

These credentials will be used by any library that requests
Application Default Credentials.

credential銉曘偂銈ゃ儷銇岀敓鎴愩仌銈屻伨銇欍傘亾銈屻伀銈堛仯銇lient library銇岃獚瑷笺倰绐佺牬銇с亶銈嬨倛銇嗐伀銇倞銇俱仚銆(Client Library銈掍娇銇嗘檪銆佷笂瑷樸伄application_default_credentials.json銈掕嚜鍕曘仹瑾伩杈笺個)

銇撱伄鏂规硶銇儹銉笺偒銉玃C涓娿仹銇枊鐧恒伀銇壇銇勩仹銇欍亴銆丼TG銈凱RD鐠板銇伅鍚戙亶銇俱仜銈撱傘亾銇倛銇嗐仾銈便兗銈广仹銇銈点兗銉撱偣銈€偒銈︺兂銉銈掍娇銇勩伨銇欍

銇°仾銇裤伀瑾嶈銇仐銇с儣銉偘銉┿儬銈掑疅琛屻仚銈嬨仺浠ヤ笅銇倛銇嗐仾渚嬪銇屽嚭銇︺丟CE涓娿仹瀹熻銇欍倠銇婫OOGLE_APPLICATION_CREDENTIALS鐠板澶夋暟銈掕ō瀹氥仐銈嶃仺瑷銇c仸銇嶃伨銇欍傘亾銇挵澧冨鏁般伀瑷畾銇欍倠銈傘伄銇屻偟銉笺儞銈广偄銈偊銉炽儓銇甤redential銉曘偂銈ゃ儷銇儜銈广仹銇欍

Exception in thread "main" java.io.IOException: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information.
 at com.google.auth.oauth2.DefaultCredentialsProvider.getDefaultCredentials(DefaultCredentialsProvider.java:111)
 at com.google.auth.oauth2.GoogleCredentials.getApplicationDefault(GoogleCredentials.java:103)
 at com.google.auth.oauth2.GoogleCredentials.getApplicationDefault(GoogleCredentials.java:76)
 at com.google.api.gax.core.GoogleCredentialsProvider.getCredentials(GoogleCredentialsProvider.java:54)
 at com.google.cloud.pubsub.v1.Publisher.<init>(Publisher.java:156)
 at com.google.cloud.pubsub.v1.Publisher.<init>(Publisher.java:79)
 at com.google.cloud.pubsub.v1.Publisher$Builder.build(Publisher.java:625)
 at PublisherMain$.main(PublisherMain.scala:17)
 at PublisherMain.main(PublisherMain.scala)

銈点兗銉撱偣銈€偒銈︺兂銉堛伄浣滄垚銇亾銇亗銇熴倞銈掑弬鐓с仐銇︺亸銇犮仌銇勩

https://cloud.google.com/iam/docs/creating-managing-service-accounts?hl=ja

銇濄倢銇汇仼闆c仐銇忋仾銇勩伄銇UI涓娿仹銈勩仯銇︺倐鑹亜銇ㄦ濄亜銇俱仚銆

浠ヤ笅銇倛銇嗐伀蹇呰浜嬮爡銈掑叆鍔涖仐銇︿綔鎴愩仐銇俱仚銆

銈点兗銉撱偣銈€偒銈︺兂銉堝悕銇痯ubsub銇ㄣ仐銇俱仐銇熴亴銆佹湰鏉ャ伅銇撱伄銈€偒銈︺兂銉堛倰鍒╃敤銇欍倠銈€儣銉偙銉笺偡銉с兂鍚嶃亴鑹亜銇ㄦ濄亜銇俱仚銆傦紙銈€儣銉仈銇ㄣ伀妯╅檺銈掑垎銇戙倠銇仛銇伄銇э級

褰瑰壊銇疨ub/Sub绠$悊鑰呫仺銇椼仸銇娿亶銆佺禐銇c仸銇娿亶銇俱仚銆傦紙浠栥伄GCE銇仼銈掓搷浣溿仚銈嬫ī闄愩伅涓嶈銇伄銇э級

绉樺瘑閸点倰JSON銇х敓鎴愩仐銇俱仚銆俢redential銉曘偂銈ゃ儷銈扗L銇с亶銈嬨伅銇氥仾銇仹銆丏L銇椼仧銈夈儹銉笺偒銉玃C涓娿伄閬╁綋銇牬鎵銇繚瀛樸仐

export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credential.json

銇ㄣ亜銇嗐倛銇嗐伀銆丏L銇椼仧銉曘偂銈ゃ儷銇儜銈广倰鎸囧畾銇椼仸鐠板澶夋暟銈掍綔銈娿伨銇欍

銇撱倢銇ц獚瑷笺伅瀹屼簡銇с仚銆

銇傘仺銇簣銈乀opic銇⊿ubscription銈掍綔鎴愩仐銇︺亰銇嶃伨銇欍

娆°亱銈夊疅闅涖伀銈炽兗銉夈倰鏇搞亜銇︺亜銇嶃伨銇欍

Publisher銈炽兗銉

maven, gradle, sbt銇仼銇lient library銇緷瀛樸倰鏇搞亜銇︺佷互涓嬨伄銈堛亞銇偝銉笺儔銈掓浉銇嶃伨銇欍備互涓嬨伅Scala銇с仚銇屻丣ava銇仼銇с倐銇汇伡鍚屻仒銇с仚銆

import com.google.api.core.{ ApiFutureCallback, ApiFutures }
import com.google.api.gax.core.GoogleCredentialsProvider
import com.google.cloud.pubsub.v1.Publisher
import com.google.protobuf.ByteString
import com.google.pubsub.v1.{ PubsubMessage, TopicName }

import scala.collection.JavaConversions._

object PublisherMain {
  def main(args: Array[String]): Unit = {
    val projectName = "myProject"
    val topicName = "myTopic"

    val publisher: Publisher = Publisher.defaultBuilder(TopicName.create(projectName, topicName))
      .setCredentialsProvider(GoogleCredentialsProvider.newBuilder().setScopesToApply(
        Seq("https://www.googleapis.com/auth/pubsub")).build()
      )
      .build()

    (0 until 10) foreach { num =>
      val message = f"msg-${num}%03d"
      val data = ByteString.copyFromUtf8(message)
      val pubsubMessage = PubsubMessage.newBuilder.setData(data).build
      val messageIdFuture = publisher.publish(pubsubMessage)

      ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback[String]() {
        override def onSuccess(messageId: String): Unit = {
          System.out.println(s"published with message id: $messageId, msg: $message")
        }

        override def onFailure(t: Throwable): Unit = {
          System.out.println("failed to publish: " + t)
        }
      })
    }

    publisher.shutdown()
  }
}

瀹熻绲愭灉銆

published with message id: 135416101273060, msg: msg-001
published with message id: 135416101273061, msg: msg-002
published with message id: 135416101273062, msg: msg-003
published with message id: 135416101273063, msg: msg-004
published with message id: 135416101273064, msg: msg-005
published with message id: 135416101273065, msg: msg-006
published with message id: 135416101273066, msg: msg-007
published with message id: 135416101273067, msg: msg-008
published with message id: 135415004700240, msg: msg-000
published with message id: 135416027748863, msg: msg-009

publish銇с亶銇俱仐銇熴

Subscriber銈炽兗銉

import com.google.api.gax.core.GoogleCredentialsProvider
import com.google.cloud.pubsub.v1.AckReplyConsumer
import com.google.cloud.pubsub.v1.MessageReceiver
import com.google.cloud.pubsub.v1.Subscriber
import com.google.pubsub.v1.PubsubMessage
import com.google.pubsub.v1.SubscriptionName

import scala.collection.JavaConversions._

object SubscriberMain {
  def main(args: Array[String]): Unit = {

    val projectId = "myProject"
    val subscriptionId = "mySubscription"
    val subscriptionName = SubscriptionName.create(projectId, subscriptionId)

    // Instantiate an asynchronous message receiver
    val receiver = new MessageReceiver() {
      override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = {
        println(message.getData.toString("UTF-8"))
        consumer.ack()
      }
    }

    var subscriber: Subscriber = null
    try {
      subscriber = Subscriber.defaultBuilder(subscriptionName, receiver)
        .setCredentialsProvider(GoogleCredentialsProvider.newBuilder().setScopesToApply(
          Seq("https://www.googleapis.com/auth/pubsub")).build()
        )
        .build
      val a = subscriber.startAsync
      a.awaitTerminated()
    } finally {
      // stop receiving messages
      if (subscriber != null) {
        subscriber.stopAsync
      }
    }
  }
}

瀹熻绲愭灉銆

msg-006
msg-002
msg-008
msg-003
msg-005
msg-000
msg-001
msg-004
msg-009
msg-007

鍙栥倢銇俱仐銇熴傜啊鍗樸仹銇欍伃銆

瑙h

鐗广伀銇傘倞銇俱仜銈撱亴涓蹇滃皯銇楄瓒炽仐銇俱仚銆

.setCredentialsProvider(GoogleCredentialsProvider.newBuilder().setScopesToApply(
  Seq("https://www.googleapis.com/auth/pubsub")).build()
)

銇撱倢銇疧Auth2銇倛銇c仸access token銈掑緱銈嬫檪銇佹ī闄愩倰pubsub銇犮亼銇禐銇c仸銇勩伨銇欍傘兓銉汇兓銇宻cope銇寚瀹氥伅銉偓銈枫兗銇柟寮忋仹銆佺従鍦ㄣ仹銇疓CP銇倐IAM銇屻亗銈娿伨銇欍伄銇(瀹熼殯浠婂洖Role銈掍娇銇c仧)銆佸繀銇氥仐銈傚繀瑕併仹銇仾銇勩倛銇嗐仹銇欍

ApiFutures.addCallback

銇撱倢銇唴閮ㄧ殑銇伅com.google.api.core銆併仱銇俱倞Google Guava銇с仚銆傝┏绱般伅銉兂銈厛銇瓽itHub銇嬨倝銇┿亞銇炪

Subscriber銇柟銇onsumer.ack()銈掕銇c仸銇勩伨銇欍傘亾銈屻倰蹇樸倢銈嬨仺銉囥兗銈裤亴鍐嶉併仌銈岀稓銇戙仸銇椼伨銇嗐伄銇с仈娉ㄦ剰銇忋仩銇曘亜銆

Publisher銇柟銇с儑銉笺偪銈掓祦銇楃稓銇戙仸銇勩倠鐘舵厠銇ubscriber銈掑仠姝€仐銈堛亞銇ㄣ仐銇︺倐銇亱銇亱銈€儣銉亴鍋滄銇椼仾銇勩仺鎬濄亜銇俱仚銆(SIGTERM銈掑彈銇戜粯銇戙仸銈傘仚銇愩伀鍋滄銇椼仾銇勶級

銇撱倢銇仼銇嗐倓銈夈儛銈般伄銈堛亞銇т互涓嬨伄銈堛亞銇狪ssue銇屻亗銇屻仯銇︺伨銇椼仧銆

Cannot stop subscriber

銇濄伄浠

Kinesis Client Library銈堛倞銈傜啊鍗樸伀鏇搞亼銇濄亞銇嵃璞°倰鍙椼亼銇俱仐銇熴侹inesis Client Library銇柟銇屻儮銉囥儷銇屽皯銇楄闆戙仹銇欍侰heckpoint銇偪銈ゃ儫銉炽偘銈凷hutdown鏅傘伄鍑︾悊銆乄orker銇蹇点仺浣曘倰琛屻仯銇︺亜銈嬨亱鎶婃彙銇欍倠蹇呰銇屻亗銈嬨仾銇╂敞鎰忕偣銇屽銇嬨仯銇熻銇堛亴銇傘倞銇俱仚銆侾ub/Sub Client Library銇亱鐢ㄣ仐銇︺伅銇勩伨銇涖倱銇屻佽銇广仧闄愩倞銇с伅鐗广伀缃犮倐銇亸銆佷娇銇勩倓銇欍亜鍗拌薄銈掑彈銇戙伨銇椼仧銆傘亗銇堛仸瑷銇嗐仾銈夈偄銉椼儶銇嚘鐞嗗唴瀹广伀銈堛仯銇ck銈掕繑銇欍偪銈ゃ儫銉炽偘銇ㄥ繙绛斿彈浠樻湡闄愩伀姘椼倰銇ゃ亼銈嬪繀瑕併亴銇傘倠銇忋倝銇勩仹銇椼倗銇嗐亱銆傘伨銇傛湡闄愬唴銆併亱銇ゆ渶寰屻伀ack杩斻仚銇犮亼銇с仚銇屻兓銉汇兓銆

浠婂洖銇娇銇勩伨銇涖倱銇с仐銇熴亴銆乀opic銈掍綔鎴愩仚銈嬫搷浣溿仾銇┿倐琛屻亪銈嬨倛銇嗐仹銇欍

銇俱仺銈

Pub/Sub Client Library銇垂浠嬨倓銉儠銈°儸銉炽偣銆佽獚瑷笺佺啊鍗樸仾浣裤亜鏂广伀銇ゃ亜銇﹁鏄庛仐銇俱仐銇熴

娆″洖銇鏁般伄Project闁撱仹銇甈ub/Sub銇搷浣溿兓瑾嶈銇仱銇勩仸瑙h銇椼仸銇勩亶銇俱仚銆

娆″洖锛Google Cloud Pub/Sub 銈掕鏁般伄Project銇伨銇熴亴銇c仸鍒╃敤銇欍倠銆傘伨銇熴仢銇仺銇嶃伄瑾查噾銇仱銇勩仸