あめがえるのITブログ

頑張りすぎない。ほどほどに頑張るブログ。

Amazon Kinesis Data StreamsをAWS CLIで使ってみた

Kinesis Data FirehoseはCloudWatchLogsの連携で少し使ったことがあるがDataStreamsは使ったことがなかったのでイメージつかみのため軽く試してみた。。。

やること

下記公式の手順をもとに、CloudShellからAWS CLIを使用して、Kinesis Data Streamsにストリームの作成、データ追加・確認、ストリームの削除をする。

docs.aws.amazon.com

実践!

1.ストリーム作成
1-1.CloudShellから下記コマンド実行

# aws kinesis create-stream --stream-name test
# aws kinesis describe-stream-summary --stream-name test
{
    "StreamDescriptionSummary": {
        "StreamName": "test",
        "StreamARN": "arn:aws:kinesis:ap-northeast-1:xxxxxx:stream/test",
        "StreamStatus": "ACTIVE",
        "StreamModeDetails": {
            "StreamMode": "ON_DEMAND"
        },
        "RetentionPeriodHours": 24,
        "StreamCreationTimestamp": "2023-07-29T06:46:07+00:00",
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "OpenShardCount": 4,
        "ConsumerCount": 0
    }
}

※使用する準備が完了すると[StreamStatus]が[CREATING]から[ACTIVE]に変わる。
GUI上からも確認


2.レコード入力
2-1.CloudShellから下記コマンドを実行

# aws kinesis list-streams
{
    "StreamNames": [
        "test"
    ],
    "StreamSummaries": [
        {
            "StreamName": "test",
            "StreamARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxx:stream/test",
            "StreamStatus": "ACTIVE",
            "StreamModeDetails": {
                "StreamMode": "ON_DEMAND"
            },
            "StreamCreationTimestamp": "2023-07-29T06:46:07+00:00"
        }
    ]
}
# aws kinesis put-record --stream-name foo --partition-key 123 --data test_data

{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49643088242102774106510357996668832379901808809925935106"
}


3.レコード取得
3-1.CloudShellから下記コマンドを実行

# aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name test
# aws kinesis get-records --shard-iterator AAAAAAAAAAEoRZy0GHznHj3YBe/yo1jAlRaWCzSWpmip369FNf3jWgo2Jw2eLbAL4uFsdxm30cqsl7X1lkKmPovBN8oV/qlTY7boJr9+MCKP8cVZkqm+bezzbhu4DYmb453f5Bqhkn3yrWdsr5CRMogfqf05JqL9TPhezJqxp9pL5faP8H1RrIt/zP2E43nfEBjZg2HQFhUL9f7IwDQSZ0hLiR9qeKrLiF0KCWmOH1VQT1Iwk0X58Q==
{
    "Records": [
        {
            "SequenceNumber": "49643088242102774106510357996668832379901808809925935106",
            "ApproximateArrivalTimestamp": "2023-07-29T06:49:31.256000+00:00",
            "Data": "testdata",
            "PartitionKey": "123"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAG0+3cHJQ9CakFD8DpWEcNd1SEyTsXNtDe1U1Ob+WP3uIXdmDIpNKH8p/WM+RSDADK75f1J+qKOMnTvN2rAgbyIJew2kGMKIOa1CkrFa6kDbXInB6J90QmxfgYrFR1G0nx7dU0FN/K1VJLCa6PcllopOXj3OCtcA6ptAGoVu8lzTuQavNadSkz69cJ/7cKDIrYGSfrxc5Ri0MFuO5/m0BxDkm60DE+sGepowrr1ZMwKkQ==",
    "MillisBehindLatest": 0
}


4.クリーンアップ
4-1.CloudShellから下記コマンドを実行

# aws kinesis delete-stream --stream-name test
# aws kinesis describe-stream-summary --stream-name test
An error occurred (ResourceNotFoundException) when calling the DescribeStreamSummary operation: Stream test under account xxxxxxxxxxx not found.

GUI上からも確認



感想

今回は手始めな感じなので、次は少し凝ってコンシューマアプリを使ってもっと細かい制御を行いたいと思います。( ̄д ̄;;