設定
- Aurora クラスターを作成する。
- クラスターを選択して、[アクション]-[アクティビティストリームの開始] を選択。
- マスターキー: KMS のキーを選択する
- Database activity stream mode: Asynchronous
- すぐに適用: すぐに適用
- 上記の通りアクティビティストリームを開始すると、"aws-rds-das-cluster-{Aurora クラスターのリソース ID}" という Kinesis ストリームが作成される。
- 以下の通り設定を変更。
- データ保持期間: 168時間
- シャードレベルメトリクス: 全て選択
Lambda 関数作成
- 名前: AuroraPostgresActivityStreamDecorder
- ランタイム: Python 3.7
- 基本設定-タイムアウト: 15分
- 環境変数を以下の通り設定
- CLUSTER_ID: cluster-****** (Aurora クラスターのリソースID)
- KEY_ID: 2e******-****-****-****-*********96(KMS CMK のキーID)
- STREAM_NAME: aws-rds-das-cluster-******(Kinesis Data Streams のストリーム名)
- 関数コード
$ aws s3api get-object --bucket ianmckay-ap-northeast-1 --key activitystream/activity_handler.zip activity_handler.zip
{
"AcceptRanges": "bytes",
"ContentType": "application/zip",
"LastModified": "Wed, 05 Jun 2019 07:30:37 GMT",
"ContentLength": 12253393,
"ETag": "\"ed58c34a6483723874da571419d5a1b1-2\"",
"Metadata": {}
}
$ ls -lh activity_handler.zip
-rw-rw-r-- 1 ec2-user ec2-user 12M 4月 7 08:14 activity_handler.zip
$ unzip activity_handler.zip
- activity_handler 以下に lambda_function.py を作成する。
import zlib
import boto3
import base64
import json
import os
import aws_encryption_sdk
import hashlib
from dateutil import parser
from aws_encryption_sdk import DefaultCryptoMaterialsManager
from aws_encryption_sdk.internal.crypto import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType
key_id = os.environ['KEY_ID']
stream_name = os.environ['STREAM_NAME']
region_name = os.environ['AWS_REGION']
cluster_id = os.environ['CLUSTER_ID']
class MyRawMasterKeyProvider(RawMasterKeyProvider):
provider_id = "BC"
def __new__(cls, *args, **kwargs):
obj = super(RawMasterKeyProvider, cls).__new__(cls)
return obj
def __init__(self, wrapping_key):
RawMasterKeyProvider.__init__(self)
self.wrapping_key = wrapping_key
def _get_raw_key(self, key_id):
return self.wrapping_key
def decrypt(decoded, plaintext):
wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
wrapping_key=plaintext, wrapping_key_type=EncryptionKeyType.SYMMETRIC)
my_key_provider = MyRawMasterKeyProvider(wrapping_key)
my_key_provider.add_master_key("DataKey")
with aws_encryption_sdk.stream(
mode='d',
source=decoded,
materials_manager=DefaultCryptoMaterialsManager(master_key_provider=my_key_provider)
) as decryptor:
entries = []
for chunk in decryptor:
d = zlib.decompressobj(16 + zlib.MAX_WBITS)
decompressed_database_stream = d.decompress(chunk)
print(decompressed_database_stream)
record_event = json.loads(decompressed_database_stream.decode("utf-8"))
for evt in record_event['databaseActivityEventList']:
if evt['type'] != "heartbeat":
entries.append(evt)
if len(entries) > 0:
return entries
else:
return None
def lambda_handler(event, context):
output = []
succeeded_record_cnt = 0
failed_record_cnt = 0
kms = boto3.client('kms')
for record in event['records']:
print(record['recordId'])
record_data = json.loads(base64.b64decode(record['data']))
decoded = base64.b64decode(record_data['databaseActivityEvents'])
decoded_data_key = base64.b64decode(record_data['key'])
decrypt_result = kms.decrypt(CiphertextBlob=decoded_data_key,
EncryptionContext={"aws:rds:dbc-id": cluster_id})
data = decrypt(decoded, decrypt_result[u'Plaintext'])
print(data)
if data is not None:
output_record = {
'recordId': record['recordId'],
'result': 'Ok',
'data': base64.b64encode(json.dumps(data).encode('utf-8')).decode('utf-8')
}
output.append(output_record)
return {'records': output}
$ zip -r activity_handler.zip .
- コード エントリ タイプ: .zip ファイルをアップロードを選択して保存する。
- Kinesis Data Streams で [Kinesis コンシューマーの接続] を選択。
- 以下の通り作成する。
- 名前: 任意
- IAM role: IAM ロールを作成して選択
- Source: 作成された "aws-rds-das-cluster-{Aurora クラスターのリソース ID}" を選択
- Source record transformation: Enabled
- Lambda function: 作成した Lambda 関数を選択
- Amazon S3 destination-S3 bucket: S3バケットを作成する
実行結果
- CloudWatch - ロググループ -/aws/lambda/関数名
START RequestId: 722ac394-af94-4ca3-acb6-558065ab524f Version: $LATEST
49605848106803408097691799470104043760743503384497094706000000
b'
{
"type": "DatabaseActivityMonitoringRecord",
"clusterId": "cluster-******",
"instanceId": "db-******",
"databaseActivityEventList": [
{
"type": "heartbeat"
}
]
}
'
[{'type': 'heartbeat'}]
49605848106803408097691799470118550870578880515141533746000000
b'
{
"type": "DatabaseActivityMonitoringRecord",
"clusterId": "cluster-******",
"instanceId": "db-******",
"databaseActivityEventList": [
{
"type": "heartbeat"
}
]
}
'
[{'type': 'heartbeat'}]
- S3
- az-apg-activity-stream/2020/04/07/11/aws-rds-das-cluster-KG...-kfh-3-2020-04-07-11-03
[
{
"_1": [
{
"type": "heartbeat"
}
]
},
{
"_1": [
{
"type": "heartbeat"
}
]
},
{
"_1": [
{
"logTime": "2020-04-07 11:03:23.575463+00",
"statementId": 7,
"substatementId": 1,
"objectType": "TABLE",
"command": "SELECT",
"objectName": "pg_catalog.pg_class",
"databaseName": "mydb",
"dbUserName": "awsuser",
"remoteHost": "3.112.53.130",
"remotePort": "33094",
"sessionId": "5e8c5d92.66e2",
"rowCount": 1,
"commandText": "select count(*) from pg_class;",
"paramList": [],
"pid": 26338,
"clientApplication": "psql",
"exitCode": null,
"class": "READ",
"serverVersion": "3.1.1",
"serverType": "PostgreSQL",
"serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
"serverHost": "172.31.11.167",
"netProtocol": "TCP",
"dbProtocol": "Postgres 3.0",
"type": "record",
"startTime": "2020-04-07 11:03:23.575457+00",
"errorMessage": null
}
]
},
(以降略)