ablog

Notes from a clumsy, restless engineer

Trying out Database Activity Streams on Amazon Aurora PostgreSQL-compatible edition

I edited the following parts of the sample code at https://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.html#DBActivityStreams.KinesisAccessPython and ran it.

(snip)

#aws_access_key_id="YOUR_ACCESS_KEY"
#aws_secret_access_key="YOUR_SECRET_KEY"
key_id = "2e******-****-****-****-**********96" # ★ Key ID of the KMS CMK used for Kinesis Firehose encryption
stream_name = "aws-rds-das-cluster-FA************************" # ★ Name of the Kinesis Data Streams stream
region_name = 'ap-northeast-1'
cluster_id = "cluster-FA************************" # ★ RDS resource ID of the cluster

(snip)

if __name__ == '__main__':
    session = boto3.session.Session(
#        aws_access_key_id=aws_access_key_id,
#        aws_secret_access_key=aws_secret_access_key
    )

(snip)

#                cipher = AES.new(plaintext, AES.MODE_GCM) # ★ Commented out because it raises an error

(snip)

The script runs on an EC2 instance with an IAM role attached, so aws_access_key_id and aws_secret_access_key are commented out.
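Instead of commenting the keys in and out, one option is to pass them to boto3 only when they are actually set. The following is a minimal sketch, not part of the AWS sample: the helper names `session_kwargs` and `make_session` are hypothetical. With no explicit keys, boto3 falls back to its default credential chain, which includes the IAM role attached to the EC2 instance.

```python
def session_kwargs(access_key=None, secret_key=None):
    """Build keyword arguments for boto3.session.Session (hypothetical helper).

    Returns an empty dict unless both keys are given, so that boto3 uses its
    default credential chain (for example, the EC2 instance's IAM role).
    """
    if access_key and secret_key:
        return {
            "aws_access_key_id": access_key,
            "aws_secret_access_key": secret_key,
        }
    return {}


def make_session(access_key=None, secret_key=None):
    """Create a boto3 session; boto3 is imported lazily and must be installed."""
    import boto3

    return boto3.session.Session(**session_kwargs(access_key, secret_key))
```

On the EC2 instance, `make_session()` with no arguments would then behave like the edited sample above.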

Errors and fixes

ImportError: No module named boto3
  • Error
$ python activity-stream.py 
Traceback (most recent call last):
  File "activity-stream.py", line 2, in <module>
    import boto3
ImportError: No module named boto3
  • Fix
$ sudo yum -y install python-pip
$ sudo pip install boto3
ImportError: No module named aws_encryption_sdk
  • Error
$ python activity-stream.py 
Traceback (most recent call last):
  File "activity-stream.py", line 5, in <module>
    import aws_encryption_sdk
ImportError: No module named aws_encryption_sdk
  • Fix
$ sudo pip install aws-encryption-sdk
ImportError: No module named Crypto.Cipher
  • Error
$ python activity-stream.py 
Traceback (most recent call last):
  File "activity-stream.py", line 6, in <module>
    from Crypto.Cipher import AES
ImportError: No module named Crypto.Cipher
  • Fix
$ sudo yum -y install gcc
$ sudo yum -y install python-devel
$ sudo pip install pycrypto
  • Note: "sudo pip install pycrypto" failed with the error below; installing gcc resolved it.
$ sudo pip install pycrypto
(snip)
Command "/usr/bin/python2 -u -c "import setuptools, tokenize;__file__='/tmp/pip-build-KjnQaQ/pycrypto/setup.py';f=getattr(tokenize, 'open', open)(__file__);code=f.read().replace('\r\n', '\n');f.close();exec(compile(code, __file__, 'exec'))" install --record /tmp/pip-Ldn0VE-record/install-record.txt --single-version-externally-managed --compile" failed with error code 1 in /tmp/pip-build-KjnQaQ/pycrypto/
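The three ImportErrors above show up one at a time, each after a failed run. A small preflight check can report all missing packages at once. This is a sketch assuming Python 3; the function name `missing_packages` is hypothetical, and the module-to-package mapping is simply the one from the fixes above.

```python
import importlib

# Module name -> pip package, taken from the fixes listed above.
REQUIRED = {
    "boto3": "boto3",
    "aws_encryption_sdk": "aws-encryption-sdk",
    "Crypto.Cipher": "pycrypto",
}


def missing_packages(required):
    """Return the pip packages whose modules cannot be imported."""
    missing = []
    for module_name, package in required.items():
        try:
            importlib.import_module(module_name)
        except ImportError:
            # ModuleNotFoundError is a subclass of ImportError.
            missing.append(package)
    return missing


if __name__ == "__main__":
    for package in missing_packages(REQUIRED):
        print("missing: try 'pip install %s'" % package)
```

Note that pycrypto still needs gcc and the Python development headers to build, as described in the note above.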
KeyError: 'DatabaseActivityEvents'
  • Error
$ python activity-stream.py 
Traceback (most recent call last):
  File "activity-stream.py", line 79, in <module>
    decoded = base64.b64decode(record_data['DatabaseActivityEvents'])
KeyError: 'DatabaseActivityEvents'
  • Fix
  • Change the leading "D" of DatabaseActivityEvents to a lowercase "d". Debugging with print showed that the key is lowercase in the actual data.
                print(record_data)
#                decoded = base64.b64decode(record_data['DatabaseActivityEvents'])
                decoded = base64.b64decode(record_data['databaseActivityEvents'])
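Because the documented key and the key in the actual data differ only in case, a lookup that accepts either spelling avoids the KeyError in both situations. A minimal sketch; the helper name `get_activity_events` and the sample record are made up for illustration.

```python
import base64
import json


def get_activity_events(record_data):
    """Return the base64-decoded payload, whichever key casing is present."""
    for key in ("databaseActivityEvents", "DatabaseActivityEvents"):
        if key in record_data:
            return base64.b64decode(record_data[key])
    raise KeyError(
        "no databaseActivityEvents field; keys: %s" % sorted(record_data)
    )


if __name__ == "__main__":
    # Made-up record: "aGVsbG8=" is base64 for b"hello", not real stream data.
    record = json.loads('{"databaseActivityEvents": "aGVsbG8=", "key": "dummy"}')
    print(get_activity_events(record))
```

In the sample, this would replace the direct `record_data['DatabaseActivityEvents']` lookup.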
AttributeError: 'module' object has no attribute 'MODE_GCM'
  • Error
$ python activity-stream-debug.py
Traceback (most recent call last):
  File "activity-stream-debug.py", line 91, in <module>
    cipher = AES.new(plaintext, AES.MODE_GCM)
AttributeError: 'module' object has no attribute 'MODE_GCM'
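  • Fix
  • Comment out the line that calls AES.new. The resulting cipher object is not used anywhere else in the sample.
#cipher = AES.new(plaintext, AES.MODE_GCM)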
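The AttributeError means the installed Crypto package does not expose a GCM mode. As far as I know, the last stable PyCrypto release (2.6.1) does not define AES.MODE_GCM while PyCryptodome does, but treat that as an assumption rather than something verified in this post. A quick way to check which situation applies is the following sketch; the helper names `load_aes` and `supports_gcm` are hypothetical.

```python
def load_aes():
    """Return the Crypto.Cipher.AES module, or None if it is not installed."""
    try:
        from Crypto.Cipher import AES
    except ImportError:
        return None
    return AES


def supports_gcm(aes_module):
    """True if the given AES module defines the MODE_GCM constant."""
    return aes_module is not None and hasattr(aes_module, "MODE_GCM")


if __name__ == "__main__":
    print("GCM available:", supports_gcm(load_aes()))
```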

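Reference

Processing an activity stream using the AWS SDK

To process an activity stream programmatically, use the AWS SDK. The following are fully functioning Java and Python examples of how you might process the Kinesis data stream.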

import zlib
import boto3
import base64
import json
import aws_encryption_sdk
from Crypto.Cipher import AES
from aws_encryption_sdk import DefaultCryptoMaterialsManager
from aws_encryption_sdk.internal.crypto import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType


aws_access_key_id="YOUR_ACCESS_KEY"
aws_secret_access_key="YOUR_SECRET_KEY"
key_id = "your_key_id"
stream_name = "YOUR_KINESIS_STREAM_NAME"
region_name = 'YOUR_REGION_NAME'
cluster_id = "YOUR_CLUSTER_ID"

class MyRawMasterKeyProvider(RawMasterKeyProvider):
    provider_id = "BC"

    def __new__(cls, *args, **kwargs):
        obj = super(RawMasterKeyProvider, cls).__new__(cls)
        return obj

    def __init__(self, wrapping_key):
        RawMasterKeyProvider.__init__(self)
        self.wrapping_key = wrapping_key

    def _get_raw_key(self, key_id):
        return self.wrapping_key


def decrypt(decoded, plaintext):
    wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
                               wrapping_key=plaintext, wrapping_key_type=EncryptionKeyType.SYMMETRIC)
    my_key_provider = MyRawMasterKeyProvider(wrapping_key)
    my_key_provider.add_master_key("DataKey")
    with aws_encryption_sdk.stream(
            mode='d',
            source=decoded,
            materials_manager=DefaultCryptoMaterialsManager(master_key_provider=my_key_provider)
    ) as decryptor:
        for chunk in decryptor:
            d = zlib.decompressobj(16 + zlib.MAX_WBITS)
            decompressed_database_stream = d.decompress(chunk)
            print decompressed_database_stream


if __name__ == '__main__':
    session = boto3.session.Session(
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key
    )

    kms = session.client('kms', region_name=region_name)

    client = session.client('kinesis', region_name=region_name)

    response = client.describe_stream(StreamName=stream_name)
    shard_it = {}
    for idx, shard in enumerate(response['StreamDescription']['Shards']):
        shared_iterator = client.get_shard_iterator(
            StreamName=stream_name,
            ShardId=response['StreamDescription']['Shards'][idx]['ShardId'],
            ShardIteratorType='LATEST',
        )["ShardIterator"]
        shard_it[idx] = shared_iterator

    while True:
        rows = []
        for shared_iterator in shard_it:
            response = client.get_records(ShardIterator=shard_it[shared_iterator], Limit=10000)
            for record in response['Records']:
                record_data = record['Data']
                record_data = json.loads(record_data)
                key = record_data['key']
                decoded = base64.b64decode(record_data['DatabaseActivityEvents'])
                decoded_data_key = base64.b64decode(record_data['key'])
                decrypt_result = kms.decrypt(CiphertextBlob=decoded_data_key,
                                             EncryptionContext={"aws:rds:dbc-id":
                                                                    cluster_id})
                plaintext = decrypt_result[u'Plaintext']
                cipher = AES.new(plaintext, AES.MODE_GCM)
                decrypt(decoded, decrypt_result[u'Plaintext'])
            shard_it[shared_iterator] = response["NextShardIterator"]
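In decrypt() above, each decrypted chunk is passed through zlib.decompressobj(16 + zlib.MAX_WBITS). Adding 16 to the window size tells zlib to expect a gzip header and trailer. The sketch below shows just that step under Python 3, using locally compressed placeholder data instead of a real activity stream record; the function name `gunzip_chunk` is hypothetical.

```python
import gzip
import zlib


def gunzip_chunk(chunk):
    """Decompress one gzip-formatted chunk the same way the sample does."""
    d = zlib.decompressobj(16 + zlib.MAX_WBITS)  # 16 + wbits: gzip container
    return d.decompress(chunk) + d.flush()


if __name__ == "__main__":
    # Placeholder payload, not the real activity stream record format.
    sample = gzip.compress(b'{"example": "payload"}')
    print(gunzip_chunk(sample))
```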
https://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.html#DBActivityStreams.Enabling