Amazon Kinesisの解説およびCLI操作


こんにちは、@s_tsukaです。

今回はAWSのサービスであるAmazon Kinesisについて簡単に紹介、またCLIで操作する方法について説明したいと思います。ちなみに私は仕事で1年ほど利用した経験があります。

KinesisはAWS web consoleで操作しても良いのですが、Web consoleでは一部の操作しか行えません。例えば、「試しに1件レコードを追加してみる」、といった操作は自分でAWS SDKを使ってプログラミングするか、CLIを使って操作するしかありません。ですのでCLI操作を知っておくと開発段階で少しやりやすいかと思います。

基本・help

今回利用するAWS CLIは以下のURLのものです。

https://aws.amazon.com/jp/cli/

pythonをインストールするとpipも付いてきますので、あとはpip install awscliするだけですね。簡単。他にもACCESS_KEYとSECRET_KEYがないと認証・認可できませんが、このあたりはググればたくさん導入記事が出ますので、そちらを参考にしてください。

インストールするとawsコマンドおよびサブコマンドが使えるようになります。

% aws help

AWS()                                                                    AWS()



NAME
       aws -

DESCRIPTION
       The  AWS  Command  Line  Interface is a unified tool to manage your AWS
       services.

SYNOPSIS
          aws [options] <command> <subcommand> [parameters]

       Use aws command help for information on a  specific  command.  Use  aws
       help topics to view a list of available help topics.

OPTIONS
       --debug (boolean)

...(長いので省略)

ここでAVAILABLE SERVICESというセクションがあり、そこにkinesisと書いてあります。これがサブコマンドです。

どうやって使うのかわからない場合は以下のようにしましょう。

% aws kinesis help

KINESIS()                                                            KINESIS()



NAME
       kinesis -

DESCRIPTION
       Amazon  Kinesis  is  a managed service that scales elastically for real
       time processing of streaming big data.

AVAILABLE COMMANDS
       o add-tags-to-stream

       o create-stream

       o delete-stream

...(長いので省略)

AVAILABLE COMMANDSがあるので、さらにこれを付けてあげます。

% aws kinesis create-stream help

...(中略)

SYNOPSIS
            create-stream
          --stream-name <value>
          --shard-count <value>
          [--cli-input-json <value>]
          [--generate-cli-skeleton]

...(後略)

やっと色々分かるようになりました。基本的にはこうやってhelpを見て、使い方を覚えればOKです。上記の例だと、kinesisのStreamを作成するには aws kinesis create-stream —stream-name foo —shard-count 1 というようにすれば良いことがわかりますね。

Streamの準備

以下のコマンドを叩くとストリームの一覧が取得できます。まだ何も作成していないので何もありません。

% aws kinesis list-streams
{
    "StreamNames": []
}

実験用のStreamを作成して、今後はそれを操作することにしましょう。

—stream-nameは適当にお好きなものを、—shard-countは今回は1で良いですが、利用する環境によって増やす必要があるので、以下のURLあたりを読んでおくと良いです。

https://aws.amazon.com/jp/kinesis/streams/pricing/

% aws kinesis create-stream --stream-name test-stream --shard-count 1

書き込むデータサイズと書き込む頻度によってshardを増やす必要があります。

Streamが作成されれば準備完了なのですが、一応最後に確認しておきましょう。

% aws kinesis describe-stream --stream-name test-stream
{
    "StreamDescription": {
        "StreamStatus": "ACTIVE",
        "StreamName": "test-stream",
        "StreamARN": "arn:aws:kinesis:ap-northeast-1:985988196667:stream/test-stream",
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "EndingHashKey": "340282366920938463463374607431768211455",
                    "StartingHashKey": "0"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49561517114387968943330123805292965859691429103514157058"
                }
            }
        ]
    }
}

describe-streamで状態を確認できます。StreamStatusがACTIVEなので、利用可能です。Stream作成直後はCREATINGになっていると思います。

Streamに値を投入する

では実際に値を投入します。こちらの利用方法も割と簡単です。

% aws kinesis put-record --stream-name test-stream --data '{"name":"tsukaby"}' --partition-key a
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49561517114387968943330123805736641635490145001592061954"
}

3つの引数を付けるだけですね。—dataはJSONにしましたが、別にplain textでもCSVでもTSVでもMessage packでも何でもOKです。個人的にはJSONをお勧めします。

自分はTSVで運用したこともありましたが、JSONと比較して投入するデータサイズが減るというメリット以外はデメリットばかりでちょっと辛かったです。値を見てもフィールドの意味がすぐに分からなかったり、新しい値を追加するときの拡張性が悪かったり、他色々。Message packの方がデータサイズは減りますが、まあそのあたりは用途とかと相談で。

—partition-keyは今は適当にaにしました。1 shardなので問題ありませんが、2 shard以上利用する方はご注意ください。データを投入するたびに適切なkeyを付けないと、ずっと同じshardだけにデータが投入されることになってしまいます。このあたりは別途公式のドキュメントなどを読み込むと良いと思います。

簡単に言うと例えば1個のStreamが2個のshardを持っている場合、内部的にはShard1はHashKey 0から2^127 - 1番までのデータを格納、Shard2は2^127から2^128 - 1番までのデータを格納、というように担当が決まっています。—partition-keyは格納するshardを特定するためのkeyです。ここを常に同じにしてしまうと同じshardにしかデータが入らないので注意がいります。実際にAWS SDKなどでPutRecordするときは、毎回MD5とかで適当なHashを計算すればOKです。

Streamから値を取得する

ここは少し面倒です。put-recordではStreamに対して操作を行いましたが、get-recordではShardに対して操作を行わなければいけません。

% aws kinesis get-records help

(中略)

SYNOPSIS
            get-records
          --shard-iterator <value>
          [--limit <value>]
          [--cli-input-json <value>]
          [--generate-cli-skeleton]

(後略)

% aws kinesis get-shard-iterator help

(中略)

SYNOPSIS
 get-shard-iterator
 --stream-name <value>
 --shard-id <value>
 --shard-iterator-type <value>
 [--starting-sequence-number <value>]
 [--cli-input-json <value>]
 [--generate-cli-skeleton]

(後略)

get-recordsにはshard-iteratorが必要で、shard-iteratorにはshard-idが必要です・・・。shard-idはdescribeで分かります。では順を追ってコマンドを実行してみます。

% aws kinesis describe-stream --stream-name test-stream
{
    "StreamDescription": {
        "StreamStatus": "ACTIVE",
        "StreamName": "test-stream",
        "StreamARN": "arn:aws:kinesis:ap-northeast-1:985988196667:stream/test-stream",
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "EndingHashKey": "340282366920938463463374607431768211455",
                    "StartingHashKey": "0"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49561517114387968943330123805292965859691429103514157058"
                }
            }
        ]
    }
}

% aws kinesis get-shard-iterator --stream-name test-stream --shard-id shardId-000000000000 --shard-iterator-type LATEST
{
    "ShardIterator": "AAAAAAAAAAHtAaqRRMBOE3f0Gt5PrQW5jE4tAcprR+I9uHqHfAqeYGZm2IkUs7VOJDJVo+yAcGFDteVtTESH4ADH5DUszYhhd1D8RWkeGpr3fmZvnU3zmrJJwWrEs3rPAezqK+RnFKOvZV0HZoANfXFfIK5FHjqK/eN2ugofUbYITj2GGrr42n977K9/PCbgOjmNCqc0Z/5S6fny2j7srFOBww4QV4oR"
}


% aws kinesis get-records --shard-iterator AAAAAAAAAAHtAaqRRMBOE3f0Gt5PrQW5jE4tAcprR+I9uHqHfAqeYGZm2IkUs7VOJDJVo+yAcGFDteVtTESH4ADH5DUszYhhd1D8RWkeGpr3fmZvnU3zmrJJwWrEs3rPAezqK+RnFKOvZV0HZoANfXFfIK5FHjqK/eN2ugofUbYITj2GGrr42n977K9/PCbgOjmNCqc0Z/5S6fny2j7srFOBww4QV4oR
{
    "Records": [],
    "NextShardIterator": "AAAAAAAAAAFj1dv5ha75IV+fx67uKZmAy+b1coMHczCPw6c5Z87Gu9wweVOeBO1oCDhIU785oBvkQ+guJ3rido8PhafXYYHU2jdRiS1TqkLbqfTKRE3vHBJM+yiJHNVIbCrYoDBOrcSSO1CxNt9FIaOgzbjQ/ngN7R+jRqll4U4jheJfnRv2b2x0i2rA+749ubSg8S6YD4oQA3KdeXNIEhNr37KuVnU2",
    "MillisBehindLatest": 0
}

できました。Recordsが返ってきています。・・・が、さきほどputしたのに中身が空です。これはshard-iterator-typeをLATESTにしていて、さっきputしたときから少し時間が経過してしまったからだと思われます。もし中身が返ってきていない場合は、再度putしてからなるべくすぐに上記のコマンドをもう一度叩いてデータを取り出してみます。

ちなみにshard-iteratorのランダム文字列の部分はみなさんの環境によってことなるため、適宜変更してください。

% aws kinesis get-records --shard-iterator AAAAAAAAAAHk40m+CR2DAYdn1LmEBjvE34TbUeIsaHwtPz5flR9iV1d6x28Fr8tt9OLHEcrXdmbyWg6aekcJ4lUTP/GZ+jBUddwBCBgprqrZ5w5VIEQ/lqyxvnldEeiTBz3KgQNEyH8P4fyPmI1ZGrrGEPIfAoq0qid3HXCuaG786/UpnpAaWWMtuYDWc9kuZ9Dl5OuNUKO1rB1mFCMccSOYHel2QftJ
{
    "Records": [
        {
            "PartitionKey": "a",
            "Data": "eyJuYW1lIjoidHN1a2FieSJ9",
            "SequenceNumber": "49561517114387968943330123805737850561309886074603962370"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAGN2OU2/GF5H0kuBHGErAa0/8vbvZrv5eiszm5TBwvQ+Xz7cJRrdtEl87KYqJVHT6QEnJuRrPQ77k5MIZkjL+cPr50JgsfEa7bItbNk8Ww2/gnKYFDkJmX0p4/ZkijrV6+ezaPDALudbSQ91YBvRCk6dlj/Zt/Ebrj3fhDJlBstrrJ1Ytwli5EbvzEfjH1oqNE5sW+V6WHZ0kKg4BjbXHF1",
    "MillisBehindLatest": 0
}

取れました。Dataの部分はBase64エンコーディングされているので、適当なツールか、以下のURLのようなオンラインでコードアプリでデコードします。

https://www.base64decode.org/

デコードした結果以下のようになりました。

{"name":"tsukaby"}

正しく投入したデータを取れました。

get-recordsが面倒なのでスクリプト化する

https://github.com/tsukaby/aws-cli-scripts/blob/master/kinesis/get-records.sh

このような感じでスクリプト化しておけば簡単に利用できますね。チェック処理なかったり、Shardへのアクセスが不自由だったり、色々と微妙なところはありますが。

Streamの変更(必要に応じて)

上記でShardを1で作りましたが、もし利用データ量が増えるなどして1shardでは足りなくなった場合はReshardingという作業を行う必要があります。例えば1shardでは2016/4現在、1MB/sec, 1000request/sec 受け付けられます。データを投入する側の負荷が増えて秒間1200requestとかになった場合は、Reshardingを行う必要があります。

Reshardingは単純にShardの数を複数に分けるだけでなく、実は上記で少し触れたHashKeyについて考慮しなくてはなりません。以下の記事が参考になります。

[JAWS-UG CLI] Kinesis:#3 Shard の分割

例えばこんな感じで分割します。

aws kinesis split-shard --stream-name test-stream --shard-to-split shardId-000000000000 --new-starting-hash-key 170141183460469231731687303715884105728

Streamの削除

最後に今回テスト用に作成したStreamを削除します。

% aws kinesis delete-stream --stream-name test-stream

まとめ

いかがでしたでしょうか。

Kinesisの説明をAWS CLIを利用する方法を紹介しました。

1回覚えてしまえば、意外とhelpを見てなんとかなりますので、みなさんもぜひKinesisを使うときはCLIも使ってみてください。