ablog

不器用で落着きのない技術者のメモ

KCL のサンプルコードの sequenceNumberOfPreviousRecord の初期値について

以下のサンプルコードの sequenceNumberOfPreviousRecord のループの1回目の初期値が何かという話だが、setSequenceNumberForOrdering はシャード内の順序を保証するメソッドで2回目からのループで1回目より後になればよいので、初期値は null でよさそう。

PutRecord の例
以下のコードでは、2 つのパーティションキーに配分される 10 件のデータレコードを作成し、myStreamName という名前のストリームに格納しています。

for (int j = 0; j < 10; j++) 
{
  PutRecordRequest putRecordRequest = new PutRecordRequest();
  putRecordRequest.setStreamName( myStreamName );
  putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() ));
  putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 ));  
  putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord );
  PutRecordResult putRecordResult = client.putRecord( putRecordRequest );
  sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber();
}

上記のコード例では、setSequenceNumberForOrdering を使用して、各パーティションキー内で順番が厳密に増えるようにしています。このパラメータを効果的に使用するには、現在のレコード (レコード SequenceNumberForOrderingn) の を前述のレコード (レコード n-1) のシーケンス番号に設定します。ストリームに追加されたレコードのシーケンス番号を取得するには、getSequenceNumber の結果に対して putRecord を呼び出します。
パラメータは、同じパーティションキーのシーケンス番号が厳密に増えるようにします。SequenceNumberForOrderingSequenceNumberForOrdering では、複数のパーティションキーにわたるレコードの順序付けは行いません。

That's a strangely incomplete example. Doesn't show or discuss how sequenceNumberOfPreviousRecordin is initialized. I found a slightly better example in aws forums, apparently the starting sequence number to use is null.

String sequenceNumberOfPreviousRecord = null;
for (int j = 0; j < 200; j++) {
  PutRecordRequest putRecordRequest = new PutRecordRequest();
  putRecordRequest.setStreamName(myStreamName);
  putRecordRequest.setData(ByteBuffer.wrap(String.format("%s-%d",testData, 200+j).getBytes()));
  putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 )); 
  putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord );
  PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);
  sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber();

  System.out.println("Successfully putrecord, partition key : " + putRecordRequest.getPartitionKey()
      + ", Data : " + String.format("%s-%d",testData, 200+j)
      + ", SequenceNumber : " + putRecordResult.getSequenceNumber()
      );
}

Your example's use of partition key is weird as well. Unless if there is a very skewed distribution of partition keys, keys 0 and 1 are very likely to end up in the same shard. In most cases you're best served using a random uuid to ensure distributing incoming records across your shards.

amazon web services - What exactly does sequenceNumberForOrdering do when putting records into a Kinesis stream with the Java SDK? - Stack Overflow

setSequenceNumberForOrdering
public void setSequenceNumberForOrdering(String sequenceNumberForOrdering)
Guarantees strictly increasing sequence numbers, for puts from the same client and to the same partition key. Usage: set the SequenceNumberForOrdering of record n to the sequence number of record n-1 (as returned in the result when putting record n-1). If this parameter is not set, records will be coarsely ordered based on arrival time.

Parameters:
sequenceNumberForOrdering - Guarantees strictly increasing sequence numbers, for puts from the same client and to the same partition key. Usage: set the SequenceNumberForOrdering of record n to the sequence number of record n-1 (as returned in the result when putting record n-1). If this parameter is not set, records will be coarsely ordered based on arrival time.

PutRecordRequest (AWS Java SDK for Amazon Kinesis 1.11.121 API)