DynamoDB Streams + AWS Lambda Triggers で連携すると Item のデータは event オブジェクトで渡されることを確認した。
- Lambda (node.js) のコード
'use strict'; const AWS = require('aws-sdk'); const util = require('util'); // util モジュールを読み込む exports.handler = (event, context, callback) => { console.log(util.inspect(event,false,null)); // event オブジェクトの中身を CloudWatch Logs に出力する callback(null, `Successfully processed records.`); };
- マネジメントコンソールから Item を追加する。
- CloudWatch Logs を確認する。
START RequestId: e244136e-f013-4370-8eaa-cbb5e7eeb17c Version: $LATEST 2019-10-12T18:22:49.014Z e244136e-f013-4370-8eaa-cbb5e7eeb17c { Records: [ { eventID: '0a137cbf32e847170dc7a2678022f9ce', eventName: 'INSERT', eventVersion: '1.1', eventSource: 'aws:dynamodb', awsRegion: 'ap-northeast-1', dynamodb: { ApproximateCreationDateTime: 1570904568, Keys: { page_id: { S: 'az2c7az-4cbe-4cb8-ba57-aza9e48662az' }, activity_dt: { S: '2019-10-13 03:22:00' } }, NewImage: { page_id: { S: 'az2c7az-4cbe-4cb8-ba57-aza9e48662az' }, ★追加した Item が出力されている activity_dt: { S: '2019-10-13 03:22:00' }, ★ payload: { S: '{"Hits": 5,"device": { "make": "Haltheon", "platform": { "name": "Android", "version": "4.0.3" }, "location": { "latitude": "69.290003", "longitude": "-130.506449", "country": "LA" }},"session": { "session_id": "459d25a9-03da-46d3-b447-2a649d52aa45", "start_timestamp": "147544967644", "stop_timestamp": "1507239966595"}}\n' } }, ★ SequenceNumber: '707844700000000030212725492', SizeBytes: 512, StreamViewType: 'NEW_IMAGE' }, eventSourceARN: 'arn:aws:dynamodb:ap-northeast-1:123456780123:table/web_analytics/stream/2019-05-16T23:33:44.183' } ] } END RequestId: e244136e-f013-4370-8eaa-cbb5e7eeb17c REPORT RequestId: e244136e-f013-4370-8eaa-cbb5e7eeb17c Duration: 78.63 ms Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 72 MB Init Duration: 196.79 ms
参考
Lambda は、ストリームからレコードを読み取り、ストリームレコードを含むイベントを使用して関数を同期的に呼び出します。Lambda は、バッチ単位でレコードを読み取り、関数を呼び出してバッチからレコードを処理します。
例 DynamoDB ストリーム レコードイベント
{ "Records": [ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": eventsourcearn, "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": sourcearn, "eventSource": "aws:dynamodb" }Lambda は、レコードの DynamoDB ストリーム ストリームにあるシャードを 1 秒あたり 4 回の基本レートでポーリングします。レコードが利用可能になると、Lambda は関数を呼び出し、結果を待機します。処理が成功すると、Lambda は、レコードをさらに受け取るまでポーリングを再開します。
AWS Lambda を Amazon DynamoDB に使用する - AWS Lambda
Node.js
次の例では、DynamoDB からメッセージを処理し、その内容をログ記録します。例 ProcessDynamoDBStream.js
console.log('Loading function'); exports.lambda_handler = function(event, context, callback) { console.log(JSON.stringify(event, null, 2)); event.Records.forEach(function(record) { console.log(record.eventID); console.log(record.eventName); console.log('DynamoDB Record: %j', record.dynamodb); }); callback(null, "message"); };サンプルコードを圧縮してデプロイパッケージを作成します。手順については、「Node.js の AWS Lambda デプロイパッケージ」を参照してください。
https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/with-ddb-create-package.html:tit;e