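Amegaeru's IT Blog

Don't overdo it. A blog about working just hard enough.

Invoking Lambda from Amazon Kinesis Data Streams to process records

AWS has plenty of tutorials, which is a huge help when you just want to try something out, so this time I again followed a tutorial to connect Kinesis Data Streams and Lambda.

What we'll do

Following the tutorial below, use the AWS CLI from CloudShell to send data to Kinesis Data Streams, confirm it was received, then have Lambda pick up the data and write it to CloudWatch Logs.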
docs.aws.amazon.com

Note: While I'm at it, I'll use the Python sample script from the site below instead of the Node.js one in the tutorial. Samples in all sorts of languages... Amazon's kindness really warms me to the core. (´ω`)
docs.aws.amazon.com

Hands-on!

1. Create the execution role
1-1. Create the IAM role

# aws iam create-role --role-name lambda-kinesis-role --assume-role-policy-document \
'{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "lambda.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}'
{
    "Role": {
        "Path": "/",
        "RoleName": "lambda-kinesis-role",
        "RoleId": "AROA3QANHVTEHD6MKN43M",
        "Arn": "arn:aws:iam::xxxxxxxxxxxx:role/lambda-kinesis-role",
        "CreateDate": "2023-07-31T04:29:47+00:00",
        "AssumeRolePolicyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {
                        "Service": "lambda.amazonaws.com"
                    },
                    "Action": "sts:AssumeRole"
                }
            ]
        }
    }
}
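
The inline JSON above is easy to mistype in a shell. As a minimal sketch, the same trust policy could be generated with Python and handed to the CLI via `--assume-role-policy-document file://trust-policy.json`. The helper name `build_trust_policy` and the file name `trust-policy.json` are my own placeholders, not part of the tutorial.

```python
import json


def build_trust_policy(service="lambda.amazonaws.com"):
    """Return a trust policy that lets the given AWS service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": service},
                "Action": "sts:AssumeRole",
            }
        ],
    }


if __name__ == "__main__":
    # Hypothetical file name; any path works with the CLI's file:// prefix.
    with open("trust-policy.json", "w") as f:
        json.dump(build_trust_policy(), f, indent=4)
```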

1-2. Attach a policy to the IAM role

# aws iam attach-role-policy \
  --role-name lambda-kinesis-role \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole


2. Create the Lambda function
2-1. Create the sample program

# vi index.py
import base64

def lambda_handler(event, context):
    for record in event['Records']:
        # Kinesis data is base64 encoded, so decode it here
        payload = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        print("Decoded payload: " + payload)
# zip function.zip index.py
  adding: index.py (deflated 36%)
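
The record payload arrives as base64 text and decodes to `bytes`, not `str`. Below is a minimal sketch, runnable locally without AWS, of the difference between calling `str()` on the bytes and decoding them explicitly. The helper `decode_record` is my own name, and it assumes the data is UTF-8 text.

```python
import base64


def decode_record(record):
    """Return the record payload as text (assumes the data is UTF-8)."""
    raw = base64.b64decode(record["kinesis"]["data"])
    return raw.decode("utf-8")


# Only the field the handler reads; a real event carries more keys.
record = {"kinesis": {"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg=="}}

raw_bytes = base64.b64decode(record["kinesis"]["data"])
print("Decoded payload: " + str(raw_bytes))         # bytes repr, wrapped in b'...'
print("Decoded payload: " + decode_record(record))  # plain text
```

Either form contains the message, so both are easy to find in CloudWatch Logs. The explicit decode just keeps the log line free of the `b'...'` wrapper.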

2-2. Create the Lambda function

# aws lambda create-function --function-name ProcessKinesisRecords \
--zip-file fileb://function.zip --handler index.lambda_handler --runtime python3.11 \
--role arn:aws:iam::xxxxxxxxxxxx:role/lambda-kinesis-role
{
    "FunctionName": "ProcessKinesisRecords",
    "FunctionArn": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:ProcessKinesisRecords",
    "Runtime": "python3.11",
    "Role": "arn:aws:iam::xxxxxxxxxxxx:role/lambda-kinesis-role",
    "Handler": "index.lambda_handler",
    "CodeSize": 360,
    "Description": "",
    "Timeout": 3,
    "MemorySize": 128,
    "LastModified": "2023-07-31T04:31:28.756+0000",
    "CodeSha256": "BAHNKDAuyBjsEGyC3iwxCZEVmomarmsQQJwdKBY1DoM=",
    "Version": "$LATEST",
    "TracingConfig": {
        "Mode": "PassThrough"
    },
    "RevisionId": "8d472bbe-6324-4ee7-9272-16752ca2e1dd",
    "State": "Pending",
    "StateReason": "The function is being created.",
    "StateReasonCode": "Creating",
    "PackageType": "Zip",
    "Architectures": [
        "x86_64"
    ],
    "EphemeralStorage": {
        "Size": 512
    },
    "SnapStart": {
        "ApplyOn": "None",
        "OptimizationStatus": "Off"
    },
    "RuntimeVersionConfig": {
        "RuntimeVersionArn": "arn:aws:lambda:ap-northeast-1::runtime:a96d37d47210bbc80cb11c50028140d002b18ce0f3a93200d92ac5cac3132669"
    }
}


3. Test the Lambda function
3-1. Create the test file

# vi input.txt
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::xxxxxxxxxxxx:role/lambda-kinesis-role",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxxx:stream/lambda-stream"
        }
    ]
}
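
The `data` field in the test file is just the message encoded as base64. As a rough sketch, a test event with a different message could be generated like this. `make_kinesis_test_event` is a hypothetical helper of mine, and it fills in only a subset of the fields shown above.

```python
import base64
import json


def make_kinesis_test_event(message, partition_key="1"):
    """Build a minimal Kinesis-style test event carrying one record."""
    data = base64.b64encode(message.encode("utf-8")).decode("ascii")
    return {
        "Records": [
            {
                "kinesis": {
                    "kinesisSchemaVersion": "1.0",
                    "partitionKey": partition_key,
                    "data": data,
                },
                "eventSource": "aws:kinesis",
                "eventName": "aws:kinesis:record",
            }
        ]
    }


event = make_kinesis_test_event("Hello, this is a test.")
print(json.dumps(event, indent=4))
```

For the message used in this post, the generated `data` value matches the string in input.txt.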

3-2. Run the test

# aws lambda invoke --function-name ProcessKinesisRecords \
--cli-binary-format raw-in-base64-out --payload file://input.txt outputfile.txt
{
    "StatusCode": 200,
    "ExecutedVersion": "$LATEST"
}

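If you see the output below, the configuration is probably wrong, so review it.
Note: Most likely the handler name is wrong. When creating the function, specify the handler as [file name (without extension).function name (the function defined inside the Python file, not the Lambda function name)].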
{
    "StatusCode": 200,
    "FunctionError": "Unhandled",
    "ExecutedVersion": "$LATEST"
}
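
To make the handler naming rule concrete, here is a small sketch of how a string like `index.lambda_handler` breaks down into a file part and a function part. This only illustrates the naming convention. It does not reproduce how the Lambda runtime actually loads code, and `split_handler` is my own helper name.

```python
def split_handler(handler):
    """Split a 'file.function' handler string at the last dot."""
    module_name, _, function_name = handler.rpartition(".")
    if not module_name or not function_name:
        raise ValueError(
            f"handler must look like 'file.function', got {handler!r}"
        )
    return module_name, function_name


module_name, function_name = split_handler("index.lambda_handler")
print(module_name + ".py must define " + function_name + "()")
```

So with `--handler index.lambda_handler`, the zip needs an index.py that defines `lambda_handler`.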


# cat outputfile.txt
null
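
The `null` in outputfile.txt is expected: the handler has no `return` statement, so it returns `None`, and the Python return value is serialized to JSON for the response. A minimal local sketch of that behavior (the stub handler below is a stand-in, not the deployed code):

```python
import json


def lambda_handler(event, context):
    for record in event["Records"]:
        pass  # process the record; nothing is returned


result = lambda_handler({"Records": []}, None)
response_body = json.dumps(result)  # None becomes the JSON literal null
print(response_body)
```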


4. Create the Kinesis stream
4-1. Create the Kinesis stream

# aws kinesis create-stream --stream-name lambda-stream --shard-count 1
# aws kinesis describe-stream --stream-name lambda-stream
{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920938463463374607431768211455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49643147187789294785829935674091958326181224517008883714"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxxx:stream/lambda-stream",
        "StreamName": "lambda-stream",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "KeyId": null,
        "StreamCreationTimestamp": "2023-07-31T04:39:28+00:00"
    }
}
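
The `HashKeyRange` in the output covers 0 through 2^128 - 1 because Kinesis maps each partition key to a 128-bit integer with MD5 and routes the record to the shard whose range contains that value. Here is a rough sketch of that mapping. The function names, the flattened shard dictionaries, and the big-endian byte order are my assumptions for illustration.

```python
import hashlib

MAX_HASH_KEY = 2**128 - 1


def partition_key_to_hash_key(partition_key):
    """Map a partition key to a 128-bit integer via MD5 (byte order assumed big-endian)."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, byteorder="big")


def find_shard(hash_key, shards):
    """Return the ID of the shard whose hash key range contains hash_key."""
    for shard in shards:
        start = int(shard["StartingHashKey"])
        end = int(shard["EndingHashKey"])
        if start <= hash_key <= end:
            return shard["ShardId"]
    return None


# Flattened version of the single shard from the describe-stream output above.
shards = [
    {
        "ShardId": "shardId-000000000000",
        "StartingHashKey": "0",
        "EndingHashKey": "340282366920938463463374607431768211455",
    }
]

hash_key = partition_key_to_hash_key("1")
print(find_shard(hash_key, shards))
```

With `--shard-count 1` every partition key lands on the same shard, so the choice of key only starts to matter once the stream has more shards.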


5. Add Kinesis Data Streams as a Lambda event source
5-1. Run the commands below from CloudShell

# aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \
--event-source-arn arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxxx:stream/lambda-stream \
--batch-size 100 --starting-position LATEST
{
    "UUID": "184b615d-280c-4071-ac26-7a8de963651e",
    "StartingPosition": "LATEST",
    "BatchSize": 100,
    "MaximumBatchingWindowInSeconds": 0,
    "ParallelizationFactor": 1,
    "EventSourceArn": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxxx:stream/lambda-stream",
    "FunctionArn": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:ProcessKinesisRecords",
    "LastModified": "2023-07-31T04:40:46.454000+00:00",
    "LastProcessingResult": "No records processed",
    "State": "Creating",
    "StateTransitionReason": "User action",
    "DestinationConfig": {
        "OnFailure": {}
    },
    "MaximumRecordAgeInSeconds": -1,
    "BisectBatchOnFunctionError": false,
    "MaximumRetryAttempts": -1,
    "TumblingWindowInSeconds": 0,
    "FunctionResponseTypes": []
}


# aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \
--event-source-arn arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxxx:stream/lambda-stream
{
    "EventSourceMappings": [
        {
            "UUID": "184b615d-280c-4071-ac26-7a8de963651e",
            "StartingPosition": "LATEST",
            "BatchSize": 100,
            "MaximumBatchingWindowInSeconds": 0,
            "ParallelizationFactor": 1,
            "EventSourceArn": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxxx:stream/lambda-stream",
            "FunctionArn": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:ProcessKinesisRecords",
            "LastModified": "2023-07-31T04:40:00+00:00",
            "LastProcessingResult": "No records processed",
            "State": "Enabled",
            "StateTransitionReason": "User action",
            "DestinationConfig": {
                "OnFailure": {}
            },
            "MaximumRecordAgeInSeconds": -1,
            "BisectBatchOnFunctionError": false,
            "MaximumRetryAttempts": -1,
            "TumblingWindowInSeconds": 0,
            "FunctionResponseTypes": []
        }
    ]
}
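
`--batch-size 100` caps how many records a single invocation receives in `event['Records']`. The sketch below is a deliberately simplified model of that cap. Real batches can be smaller because of the batching window and payload size limits, and `split_into_batches` is a hypothetical helper, not an AWS API.

```python
def split_into_batches(records, batch_size=100):
    """Group records into lists of at most batch_size items, preserving order."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [
        records[i:i + batch_size]
        for i in range(0, len(records), batch_size)
    ]


# 250 pending records would need at least three invocations under this model.
pending = [{"kinesis": {"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg=="}}] * 250
batches = split_into_batches(pending, batch_size=100)
print([len(batch) for batch in batches])
```

This is why the handler loops over `event['Records']` rather than assuming one record per call.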


6. Test Kinesis
6-1. Add a record to Kinesis Data Streams

# aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
--cli-binary-format raw-in-base64-out --data "Hello, this is a test."
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49643147187789294785829935674587617912223231961926205442"
}


7. Check CloudWatch Logs
7-1. Open CloudWatch - [Log groups] - [/aws/lambda/ProcessKinesisRecords]
7-2. Confirm that [Hello, this is a test.] appears in the log.


8. Clean up
8-1. Delete the IAM role

# aws iam detach-role-policy --role-name lambda-kinesis-role --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
# aws iam delete-role --role-name lambda-kinesis-role

8-2. Delete the Lambda function

# aws lambda delete-function --function-name ProcessKinesisRecords

8-3. Delete the Kinesis stream

# aws kinesis delete-stream --stream-name lambda-stream
# aws kinesis describe-stream-summary --stream-name lambda-stream

8-4. Delete the files

# rm -rf index.py function.zip input.txt outputfile.txt




Thoughts

Next time I'd like to try hooking this up to DynamoDB or S3 instead of CloudWatch Logs ( ̄д ̄)