ablog

不器用で落着きのない技術者のメモ

AWS Resource Access Manager を利用するには Organizations で「すべての機能」の有効化が必要

Requirements
  • Only the management account can enable sharing with AWS Organizations.
  • The organization must be enabled for all features. For more information, see Enabling All Features in Your Organization in the AWS Organizations User Guide.
Sharing your AWS resources - AWS Resource Access Manager

AWS Organizations has two available feature sets:

  • All features – This feature set is the preferred way to work with AWS Organizations, and it includes Consolidating Billing features. When you create an organization, enabling all features is the default. With all features enabled, you can use the advanced account management features available in AWS Organizations such as integration with supported AWS services and organization management policies.
  • Consolidated Billing features – All organizations support this subset of features, which provides basic management tools that you can use to centrally manage the accounts in your organization.

If you create an organization with consolidated billing features only, you can later enable all features. This page describes the process of enabling all features

Enabling all features in your organization - AWS Organizations

Aurora PostgreSQL に RDS Proxy 経由で IAM 認証で接続する

Aurora PostgreSQL に RDS Proxy 経由で IAM 認証で接続する手順。
f:id:yohei-a:20210212094006p:plain

前提

  • RDS Proxy で [IAM 認証] が [必須] になっていること。
  • AWS CLI がインストールされていること*1
  • psql がインストールされていること*2
$ sudo yum -y install postgresql

手順

同一アカウントから
$ export RDSHOST=aurora-postgres-117.proxy-************.ap-northeast-1.rds.amazonaws.com
$ export PGPASSWORD="$(aws rds generate-db-auth-token --hostname $RDSHOST --port 5432 --region ap-northeast-1 --username awsuser)"
$ psql -h $RDSHOST -p 5432 -d postgres -U awsuser
クロスアカウントアクセス(VPCピアリング接続、共有VPCなど)
$ cat .aws/config
[default]
region = ap-northeast-1
[profile instancecrossaccount]
region = ap-northeast-1
role_arn = arn:aws:iam::234567890123:role/EC2SwitchRole
credential_source = Ec2InstanceMetadata
sts_regional_endpoints=regional
$ export RDSHOST=aurora-postgres-117.proxy-************.ap-northeast-1.rds.amazonaws.com
$ export PGPASSWORD="$(aws rds generate-db-auth-token --hostname $RDSHOST --port 5432 --region ap-northeast-1 --username awsuser --profile instancecrossaccount)"
$ psql -h $RDSHOST -p 5432 -d postgres -U awsuser

*1:以下の手順で認証する場合の前提

*2:以下の手順で認証する場合の前提

Private Subnet の EC2 にセッションマネージャーで接続できない

IAM ロールも VPC エンドポイントも正しく設定しているのに、Private Subnet の EC2 にセッションマネージャーで接続できない場合の原因と解決策。

事象

  • Private Subnet の EC2 にセッションマネージャーで接続できない 。
  • 以下の VPC エンドポイントは作成している。
    • com.amazonaws.region.ssm
    • com.amazonaws.region.ec2messages
    • com.amazonaws.region.ssmmessages
  • EC2 インスタンスに AmazonSSMManagedInstanceCore(IAMポリシー)をアタッチした IAM ロールをアタッチしている。

原因

  • VPC エンドポイントに関連付けているセキュリティグループのインバウンドで EC2 インスタンスからの https が許可されていないため。

解決策

  • VPC エンドポイントに関連付けているセキュリティグループで VPC やサブネットの CIDR、EC2 にアタッチしているセキュリティグループなどからのインバウンドの https を許可する。

参考

[Security group] で既存のセキュリティグループを選択するか、新しいセキュリティグループを作成します。新しいセキュリティグループを作成した場合は、VPC コンソールを開き、[Security Groups] をクリックした上で、作成したセキュリティグループを選択します。[Inbound Rules] タブで [Edit inbound rules] を選択します。以下の詳細を含むルールを追加してから、[Save rules] を選択します。
[Type] で [HTTPS] を選択します。
[Source] で [VPC/Subnet CIDR] を選択します。
セキュリティグループ ID を書き留めます。この ID は、他のエンドポイントで使用します。

Systems Manager を使用したインターネットアクセスなしでのプライベート EC2 インスタンスの管理

PySpark で Parquet から行数をカウントしたメモ

from pyspark.sql import SparkSession
spark = SparkSession.builder.           \
  appName("ExamplePySparkSubmitTask").  \
  config("spark.databricks.hive.metastore.glueCatalog.enabled", "true"). \
  enableHiveSupport(). \
  getOrCreate()

print(spark.sparkContext.getConf().get("spark.databricks.hive.metastore.glueCatalog.enabled"))
sql("SELECT COUNT(*) FROM default.table1").show()

KCL のサンプルコードの sequenceNumberOfPreviousRecord の初期値について

以下のサンプルコードの sequenceNumberOfPreviousRecord のループの1回目の初期値が何かという話だが、setSequenceNumberForOrdering はシャード内の順序を保証するメソッドで2回目からのループで1回目より後になればよいので、初期値は null でよさそう。

PutRecord の例
以下のコードでは、2 つのパーティションキーに配分される 10 件のデータレコードを作成し、myStreamName という名前のストリームに格納しています。

for (int j = 0; j < 10; j++) 
{
  PutRecordRequest putRecordRequest = new PutRecordRequest();
  putRecordRequest.setStreamName( myStreamName );
  putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() ));
  putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 ));  
  putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord );
  PutRecordResult putRecordResult = client.putRecord( putRecordRequest );
  sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber();
}

上記のコード例では、setSequenceNumberForOrdering を使用して、各パーティションキー内で順番が厳密に増えるようにしています。このパラメータを効果的に使用するには、現在のレコード (レコード SequenceNumberForOrderingn) の を前述のレコード (レコード n-1) のシーケンス番号に設定します。ストリームに追加されたレコードのシーケンス番号を取得するには、getSequenceNumber の結果に対して putRecord を呼び出します。
パラメータは、同じパーティションキーのシーケンス番号が厳密に増えるようにします。SequenceNumberForOrderingSequenceNumberForOrdering では、複数のパーティションキーにわたるレコードの順序付けは行いません。

That's a strangely incomplete example. Doesn't show or discuss how sequenceNumberOfPreviousRecordin is initialized. I found a slightly better example in aws forums, apparently the starting sequence number to use is null.

String sequenceNumberOfPreviousRecord = null;
for (int j = 0; j < 200; j++) {
  PutRecordRequest putRecordRequest = new PutRecordRequest();
  putRecordRequest.setStreamName(myStreamName);
  putRecordRequest.setData(ByteBuffer.wrap(String.format("%s-%d",testData, 200+j).getBytes()));
  putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 )); 
  putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord );
  PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);
  sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber();

  System.out.println("Successfully putrecord, partition key : " + putRecordRequest.getPartitionKey()
      + ", Data : " + String.format("%s-%d",testData, 200+j)
      + ", SequenceNumber : " + putRecordResult.getSequenceNumber()
      );
}

Your example's use of partition key is weird as well. Unless if there is a very skewed distribution of partition keys, keys 0 and 1 are very likely to end up in the same shard. In most cases you're best served using a random uuid to ensure distributing incoming records across your shards.

amazon web services - What exactly does sequenceNumberForOrdering do when putting records into a Kinesis stream with the Java SDK? - Stack Overflow

setSequenceNumberForOrdering
public void setSequenceNumberForOrdering(String sequenceNumberForOrdering)
Guarantees strictly increasing sequence numbers, for puts from the same client and to the same partition key. Usage: set the SequenceNumberForOrdering of record n to the sequence number of record n-1 (as returned in the result when putting record n-1). If this parameter is not set, records will be coarsely ordered based on arrival time.

Parameters:
sequenceNumberForOrdering - Guarantees strictly increasing sequence numbers, for puts from the same client and to the same partition key. Usage: set the SequenceNumberForOrdering of record n to the sequence number of record n-1 (as returned in the result when putting record n-1). If this parameter is not set, records will be coarsely ordered based on arrival time.

PutRecordRequest (AWS Java SDK for Amazon Kinesis 1.11.121 API)