ablog

不器用で落着きのない技術者のメモ

DynamoDB Streams + AWS Lambda Triggers で連携すると Item は event オブジェクトで渡される

DynamoDB Streams + AWS Lambda Triggers で連携すると Item のデータは event オブジェクトで渡されることを確認した。

  • Lambda (node.js) のコード
'use strict';

const AWS = require('aws-sdk');
const util = require('util'); // util モジュールを読み込む

exports.handler = (event, context, callback) => {
    console.log(util.inspect(event,false,null)); // event オブジェクトの中身を CloudWatch Logs に出力する
    callback(null, `Successfully processed records.`);
};
  • マネジメントコンソールから Item を追加する。


  • CloudWatch Logs を確認する。
START RequestId: e244136e-f013-4370-8eaa-cbb5e7eeb17c Version: $LATEST
2019-10-12T18:22:49.014Z e244136e-f013-4370-8eaa-cbb5e7eeb17c { Records:
[ { eventID: '0a137cbf32e847170dc7a2678022f9ce',
eventName: 'INSERT',
eventVersion: '1.1',
eventSource: 'aws:dynamodb',
awsRegion: 'ap-northeast-1',
dynamodb:
{ ApproximateCreationDateTime: 1570904568,
Keys:
{ page_id: { S: 'az2c7az-4cbe-4cb8-ba57-aza9e48662az' },
activity_dt: { S: '2019-10-13 03:22:00' } }, 
NewImage:
{ page_id: { S: 'az2c7az-4cbe-4cb8-ba57-aza9e48662az' }, ★追加した Item が出力されている
activity_dt: { S: '2019-10-13 03:22:00' }, ★
payload:
{ S: '{"Hits": 5,"device": { "make": "Haltheon", "platform": { "name": "Android", "version": "4.0.3" }, "location": { "latitude": "69.290003", "longitude": "-130.506449", "country": "LA" }},"session": { "session_id": "459d25a9-03da-46d3-b447-2a649d52aa45", "start_timestamp": "147544967644", "stop_timestamp": "1507239966595"}}\n' } }, ★
SequenceNumber: '707844700000000030212725492',
SizeBytes: 512,
StreamViewType: 'NEW_IMAGE' },
eventSourceARN: 'arn:aws:dynamodb:ap-northeast-1:123456780123:table/web_analytics/stream/2019-05-16T23:33:44.183' } ] }
END RequestId: e244136e-f013-4370-8eaa-cbb5e7eeb17c
REPORT RequestId: e244136e-f013-4370-8eaa-cbb5e7eeb17c Duration: 78.63 ms Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 72 MB Init Duration: 196.79 ms

参考

aws.amazon.com

Lambda は、ストリームからレコードを読み取り、ストリームレコードを含むイベントを使用して関数を同期的に呼び出します。Lambda は、バッチ単位でレコードを読み取り、関数を呼び出してバッチからレコードを処理します。

例 DynamoDB ストリーム レコードイベント

{
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "NewImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": eventsourcearn,
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "2",
      "eventVersion": "1.0",
      "dynamodb": {
        "OldImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "SequenceNumber": "222",
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "SizeBytes": 59,
        "NewImage": {
          "Message": {
            "S": "This item has changed"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "MODIFY",
      "eventSourceARN": sourcearn,
      "eventSource": "aws:dynamodb"
    }

Lambda は、レコードの DynamoDB ストリーム ストリームにあるシャードを 1 秒あたり 4 回の基本レートでポーリングします。レコードが利用可能になると、Lambda は関数を呼び出し、結果を待機します。処理が成功すると、Lambda は、レコードをさらに受け取るまでポーリングを再開します。

AWS Lambda を Amazon DynamoDB に使用する - AWS Lambda

Node.js
次の例では、DynamoDB からメッセージを処理し、その内容をログ記録します。

例 ProcessDynamoDBStream.js

console.log('Loading function');

exports.lambda_handler = function(event, context, callback) {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(function(record) {
        console.log(record.eventID);
        console.log(record.eventName);
        console.log('DynamoDB Record: %j', record.dynamodb);
    });
    callback(null, "message"); 
};

サンプルコードを圧縮してデプロイパッケージを作成します。手順については、「Node.js の AWS Lambda デプロイパッケージ」を参照してください。

https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/with-ddb-create-package.html:tit;e