https://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.html#DBActivityStreams.KinesisAccess の Python のサンプルコードの以下を編集して実行。
(中略) #aws_access_key_id="YOUR_ACCESS_KEY" #aws_secret_access_key="YOUR_SECRET_KEY" key_id = "2e******-****-****-****-**********96" # ★ Kinesis Firehose の暗号化に使っている KMS CMK のキーID stream_name = "aws-rds-das-cluster-FA************************" # ★ Kinesis Data streams のストリーム名 region_name = 'ap-northeast-1' cluster_id = "cluster-FA************************" # ★ RDS のリソース ID (中略) if __name__ == '__main__': session = boto3.session.Session( # aws_access_key_id=aws_access_key_id, # aws_secret_access_key=aws_secret_access_key ) (中略) # cipher = AES.new(plaintext, AES.MODE_GCM) # ★ エラーになるのでコメントアウト (中略)
EC2 に IAM ロールをアタッチして実行するので、aws_access_key_id と aws_secret_access_key はコメントアウト。
エラーと対応
ImportError: No module named boto3
- エラー
$ python activity-stream.py Traceback (most recent call last): File "activity-stream.py", line 2, in <module> import boto3 ImportError: No module named boto3
- 解決策
$ sudo yum -y install python-pip
$ sudo pip install boto3
ImportError: No module named aws_encryption_sdk
- エラー
$ python activity-stream.py Traceback (most recent call last): File "activity-stream.py", line 5, in <module> import aws_encryption_sdk ImportError: No module named aws_encryption_sdk
- 解決策
$ sudo pip install aws-encryption-sdk
ImportError: No module named Crypto.Cipher
- エラー
$ python activity-stream.py Traceback (most recent call last): File "activity-stream.py", line 6, in <module> from Crypto.Cipher import AES ImportError: No module named Crypto.Cipher
- 解決策
$ sudo yum -y install gcc
$ sudo yum -y install python-devel
$ sudo pip install pycrypto
- 補足:"sudo pip install pycrypto" で以下のエラーが発生し、gcc をインストールすると解消した。
$ sudo pip install pycrypto (中略) Command "/usr/bin/python2 -u -c "import setuptools, tokenize;__file__='/tmp/pip-build-KjnQaQ/pycrypto/setup.py';f=getattr(tokenize, 'open', open)(__file__);code=f.read().replace('\r\n', '\n');f.close();exec(compile(code, __file__, 'exec'))" install --record /tmp/pip-Ldn0VE-record/install-record.txt --single-version-externally-managed --compile" failed with error code 1 in /tmp/pip-build-KjnQaQ/pycrypto/
KeyError: 'DatabaseActivityEvents'
- エラー
$ python activity-stream.py Traceback (most recent call last): File "activity-stream.py", line 79, in <module> decoded = base64.b64decode(record_data['DatabaseActivityEvents']) KeyError: 'DatabaseActivityEvents'
- 解決策
- DatabaseActivityEvents の最初の "D" を小文字の "d" に書き換える(キー名を databaseActivityEvents にする)。print でデバッグしたところ、実際のレコードではキー名の先頭が小文字になっていた。
print(record_data) # decoded = base64.b64decode(record_data['DatabaseActivityEvents']) decoded = base64.b64decode(record_data['databaseActivityEvents'])
AttributeError: 'module' object has no attribute 'MODE_GCM'
- エラー
$ python activity-stream-debug.py Traceback (most recent call last): File "activity-stream-debug.py", line 91, in <module> cipher = AES.new(plaintext, AES.MODE_GCM) AttributeError: 'module' object has no attribute 'MODE_GCM'
- 解決策
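- "cipher = AES.new(plaintext, AES.MODE_GCM)" の行をコメントアウト。インストールした pycrypto の AES モジュールには MODE_GCM が存在しないためエラーになる。元のサンプルコードでは cipher 変数はこの行以降で一度も使われておらず、実際の復号は aws_encryption_sdk を使う decrypt() 関数で行われるため、コメントアウトしても動作に影響しない。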
#cipher = AES.new(plaintext, AES.MODE_GCM)
参考
https://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.html#DBActivityStreams.Enabling
AWS SDK を使用したアクティビティストリームの処理
アクティビティストリームをプログラムで処理するには、AWS SDK を使用します。Kinesis データストリームを処理する方法で完全に機能する Java および Python の例を以下に示します。
# Sample consumer for an Aurora database activity stream delivered through
# Amazon Kinesis Data Streams: reads records from every shard, decrypts each
# payload with the KMS-protected data key, gunzips it and prints the result.
#
# Changes relative to the sample as originally published:
#   * the payload is read from the 'databaseActivityEvents' key, which is the
#     key actually present in the records ('DatabaseActivityEvents' raised
#     KeyError);
#   * the unused `cipher = AES.new(plaintext, AES.MODE_GCM)` statement was
#     removed (it raised AttributeError with pycrypto and its result was
#     never used);
#   * a single zlib decompressor is shared by all chunks of one record;
#   * closed shards are dropped and the poll loop pauses between rounds;
#   * `print(...)` is used so the script runs on both Python 2 and Python 3.
import base64
import json
import time
import zlib

import aws_encryption_sdk
import boto3
# AES is no longer referenced below; the import is retained unchanged from the
# published sample.
from Crypto.Cipher import AES
from aws_encryption_sdk import DefaultCryptoMaterialsManager
from aws_encryption_sdk.internal.crypto import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType

# Placeholders: replace with real values before running.
# NOTE(review): when the script runs under an IAM role, the two credential
# arguments to Session() below can be omitted instead of hard-coding keys here.
aws_access_key_id = "YOUR_ACCESS_KEY"
aws_secret_access_key = "YOUR_SECRET_KEY"
key_id = "your_key_id"
stream_name = "YOUR_KINESIS_STREAM_NAME"
region_name = 'YOUR_REGION_NAME'
cluster_id = "YOUR_CLUSTER_ID"

# Pause between polling rounds so that an idle stream is not hit with
# back-to-back GetRecords calls (Kinesis throttles reads per shard).
POLL_INTERVAL_SECONDS = 1.0


class MyRawMasterKeyProvider(RawMasterKeyProvider):
    """Raw master key provider that always hands back one fixed wrapping key."""

    # NOTE(review): presumably has to match the provider ID recorded in the
    # encrypted activity-stream messages -- confirm against the service docs.
    provider_id = "BC"

    def __new__(cls, *args, **kwargs):
        # Start the lookup *after* RawMasterKeyProvider in the MRO, so that
        # RawMasterKeyProvider.__new__ itself is skipped.
        obj = super(RawMasterKeyProvider, cls).__new__(cls)
        return obj

    def __init__(self, wrapping_key):
        """Store the wrapping key that every key lookup will return."""
        RawMasterKeyProvider.__init__(self)
        self.wrapping_key = wrapping_key

    def _get_raw_key(self, key_id):
        """Return the stored wrapping key, regardless of ``key_id``."""
        return self.wrapping_key


def decrypt(decoded, plaintext):
    """Decrypt one activity-stream payload, gunzip it and print the result.

    Args:
        decoded: Encrypted payload bytes, already base64-decoded by the caller.
        plaintext: Plaintext data key (as returned by KMS) used as the
            symmetric AES-256-GCM wrapping key.
    """
    wrapping_key = WrappingKey(
        wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
        wrapping_key=plaintext,
        wrapping_key_type=EncryptionKeyType.SYMMETRIC,
    )
    my_key_provider = MyRawMasterKeyProvider(wrapping_key)
    my_key_provider.add_master_key("DataKey")

    # One decompressor for the whole record: the decrypted output is a single
    # gzip stream, so a fresh decompressor per chunk would fail on every chunk
    # after the first. 16 + MAX_WBITS selects gzip header/trailer handling.
    decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
    with aws_encryption_sdk.stream(
        mode='d',
        source=decoded,
        materials_manager=DefaultCryptoMaterialsManager(master_key_provider=my_key_provider)
    ) as decryptor:
        for chunk in decryptor:
            decompressed_database_stream = decompressor.decompress(chunk)
            if decompressed_database_stream:
                print(decompressed_database_stream)

    # Emit whatever the decompressor is still buffering.
    remainder = decompressor.flush()
    if remainder:
        print(remainder)


if __name__ == '__main__':
    session = boto3.session.Session(
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key
    )
    kms = session.client('kms', region_name=region_name)
    client = session.client('kinesis', region_name=region_name)

    # Open a LATEST iterator on every shard, keyed by the shard's position in
    # the describe_stream response.
    response = client.describe_stream(StreamName=stream_name)
    shard_it = {}
    for idx, shard in enumerate(response['StreamDescription']['Shards']):
        shard_it[idx] = client.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard['ShardId'],
            ShardIteratorType='LATEST',
        )["ShardIterator"]

    # Poll until every shard has been closed (normally: until interrupted).
    while shard_it:
        # Iterate over a snapshot of the keys because closed shards are
        # removed from the dict inside the loop.
        for idx in list(shard_it):
            response = client.get_records(ShardIterator=shard_it[idx], Limit=10000)
            for record in response['Records']:
                record_data = json.loads(record['Data'])
                decoded = base64.b64decode(record_data['databaseActivityEvents'])
                decoded_data_key = base64.b64decode(record_data['key'])
                # The data key is itself encrypted with the KMS key; the
                # cluster resource ID is supplied as the encryption context.
                decrypt_result = kms.decrypt(
                    CiphertextBlob=decoded_data_key,
                    EncryptionContext={"aws:rds:dbc-id": cluster_id},
                )
                decrypt(decoded, decrypt_result[u'Plaintext'])

            # A closed shard returns no further iterator; stop polling it.
            next_iterator = response.get("NextShardIterator")
            if next_iterator:
                shard_it[idx] = next_iterator
            else:
                del shard_it[idx]

        time.sleep(POLL_INTERVAL_SECONDS)