ablog

不器用で落着きのない技術者のメモ

Aurora PostgreSQL 互換のアクティビティストリームを Lambda でデコードする

設定

KMS キー作製
  • カスタマー管理型の対称キーを作成。
Aurora PostgreSQL互換
  • Aurora クラスターを作成する。
    • エンジンバージョン: 11.6
  • クラスターを選択して、[アクション]-[アクティビティストリームの開始] を選択。
    • マスターキー: KMS のキーを選択する
    • Database activity stream mode: Asynchronous
    • すぐに適用: すぐに適用
Kinesis Data Streams
  • 上記の通りアクティビティストリームを開始すると、"aws-rds-das-cluster-{Aurora クラスターのリソース ID}" という Kinesis ストリームが作成される。
  • 以下の通り設定を変更。
    • データ保持期間: 168時間
    • シャードレベルメトリクス: 全て選択
Lambda 関数作成
  • 名前: AuroraPostgresActivityStreamDecorder
  • ランタイム: Python 3.7
  • 基本設定-タイムアウト: 15分
  • 環境変数を以下の通り設定
    • CLUSTER_ID: cluster-****** (Aurora クラスターのリソースID)
    • KEY_ID: 2e******-****-****-****-*********96(KMS CMK のキーID)
    • STREAM_NAME: aws-rds-das-cluster-******(Kinesis Data Streams のストリーム名)
  • 関数コード
    • コードをダウンロードする。
$ aws s3api get-object --bucket ianmckay-ap-northeast-1 --key activitystream/activity_handler.zip activity_handler.zip
{
    "AcceptRanges": "bytes", 
    "ContentType": "application/zip", 
    "LastModified": "Wed, 05 Jun 2019 07:30:37 GMT", 
    "ContentLength": 12253393, 
    "ETag": "\"ed58c34a6483723874da571419d5a1b1-2\"", 
    "Metadata": {}
}
$ ls -lh activity_handler.zip
-rw-rw-r-- 1 ec2-user ec2-user 12M  47 08:14 activity_handler.zip
  • zip を解凍する。
$ unzip activity_handler.zip 
  • activity_handler 以下に lambda_function.py を作成する。
import zlib
import boto3
import base64
import json
import os
import aws_encryption_sdk
import hashlib
from dateutil import parser
from aws_encryption_sdk import DefaultCryptoMaterialsManager
from aws_encryption_sdk.internal.crypto import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType

key_id = os.environ['KEY_ID']
stream_name = os.environ['STREAM_NAME']
region_name = os.environ['AWS_REGION']
cluster_id = os.environ['CLUSTER_ID']

class MyRawMasterKeyProvider(RawMasterKeyProvider):
    provider_id = "BC"

    def __new__(cls, *args, **kwargs):
        obj = super(RawMasterKeyProvider, cls).__new__(cls)
        return obj

    def __init__(self, wrapping_key):
        RawMasterKeyProvider.__init__(self)
        self.wrapping_key = wrapping_key

    def _get_raw_key(self, key_id):
        return self.wrapping_key


def decrypt(decoded, plaintext):
    wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
                               wrapping_key=plaintext, wrapping_key_type=EncryptionKeyType.SYMMETRIC)
    my_key_provider = MyRawMasterKeyProvider(wrapping_key)
    my_key_provider.add_master_key("DataKey")
    with aws_encryption_sdk.stream(
            mode='d',
            source=decoded,
            materials_manager=DefaultCryptoMaterialsManager(master_key_provider=my_key_provider)
    ) as decryptor:
        entries = []
        for chunk in decryptor:
            d = zlib.decompressobj(16 + zlib.MAX_WBITS)
            decompressed_database_stream = d.decompress(chunk)
            print(decompressed_database_stream)
            record_event = json.loads(decompressed_database_stream.decode("utf-8"))
            for evt in record_event['databaseActivityEventList']:
                if evt['type'] != "heartbeat":
                    entries.append(evt)
        if len(entries) > 0:
            return entries
        else:
            return None

def lambda_handler(event, context):
    output = []
    succeeded_record_cnt = 0
    failed_record_cnt = 0
    kms = boto3.client('kms')

    for record in event['records']:
        print(record['recordId'])
        record_data = json.loads(base64.b64decode(record['data']))
        decoded = base64.b64decode(record_data['databaseActivityEvents'])
        decoded_data_key = base64.b64decode(record_data['key'])
        decrypt_result = kms.decrypt(CiphertextBlob=decoded_data_key,
                                     EncryptionContext={"aws:rds:dbc-id": cluster_id})
        data = decrypt(decoded, decrypt_result[u'Plaintext'])
        print(data)
        if data is not None:
            output_record = {
                'recordId': record['recordId'],
                'result': 'Ok',
                'data': base64.b64encode(json.dumps(data).encode('utf-8')).decode('utf-8')
            }
            output.append(output_record)

    return {'records': output}
  • 圧縮する。
$ zip -r activity_handler.zip .
  • コード エントリ タイプ: .zip ファイルをアップロードを選択して保存する。
Kinesis Data Firehose
  • Kinesis Data Streams で [Kinesis コンシューマーの接続] を選択。
  • 以下の通り作成する。
    • 名前: 任意
    • IAM role: IAM ロールを作成して選択
    • Source: 作成された "aws-rds-das-cluster-{Aurora クラスターのリソース ID}" を選択
    • Source record transformation: Enabled
    • Lambda function: 作成した Lambda 関数を選択
    • Amazon S3 destination-S3 bucket: S3バケットを作成する

実行結果

  • CloudWatch - ロググループ -/aws/lambda/関数名
START RequestId: 722ac394-af94-4ca3-acb6-558065ab524f Version: $LATEST
49605848106803408097691799470104043760743503384497094706000000
b'
{
    "type": "DatabaseActivityMonitoringRecord",
    "clusterId": "cluster-******",
    "instanceId": "db-******",
    "databaseActivityEventList": [
        {
            "type": "heartbeat"
        }
    ]
}
'
[{'type': 'heartbeat'}]
49605848106803408097691799470118550870578880515141533746000000
b'
{
    "type": "DatabaseActivityMonitoringRecord",
    "clusterId": "cluster-******",
    "instanceId": "db-******",
    "databaseActivityEventList": [
        {
            "type": "heartbeat"
        }
    ]
}
'
[{'type': 'heartbeat'}]
  • S3
    • az-apg-activity-stream/2020/04/07/11/aws-rds-das-cluster-KG...-kfh-3-2020-04-07-11-03
[
    {
        "_1": [
            {
                "type": "heartbeat"
            }
        ]
    },
    {
        "_1": [
            {
                "type": "heartbeat"
            }
        ]
    },
    {
        "_1": [
            {
                "logTime": "2020-04-07 11:03:23.575463+00",
                "statementId": 7,
                "substatementId": 1,
                "objectType": "TABLE",
                "command": "SELECT",
                "objectName": "pg_catalog.pg_class",
                "databaseName": "mydb",
                "dbUserName": "awsuser",
                "remoteHost": "3.112.53.130",
                "remotePort": "33094",
                "sessionId": "5e8c5d92.66e2",
                "rowCount": 1,
                "commandText": "select count(*) from pg_class;",
                "paramList": [],
                "pid": 26338,
                "clientApplication": "psql",
                "exitCode": null,
                "class": "READ",
                "serverVersion": "3.1.1",
                "serverType": "PostgreSQL",
                "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
                "serverHost": "172.31.11.167",
                "netProtocol": "TCP",
                "dbProtocol": "Postgres 3.0",
                "type": "record",
                "startTime": "2020-04-07 11:03:23.575457+00",
                "errorMessage": null
            }
        ]
    },

(以降略)