I just started using Kinesis, whose API is available here.
I used the following code to put 100 records into Kinesis:
    for (int j = 0; j < 100; j++) {
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setStreamName(myStreamName);
        putRecordRequest.setData(ByteBuffer.wrap(data.getBytes()));
        putRecordRequest.setPartitionKey(String.format("partitionKey-%d", j));
        PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);
        System.out.println("Successfully putrecord, partition key : " + putRecordRequest.getPartitionKey()
                + ", ShardID : " + putRecordResult.getShardId()
                + ", Sequence No : " + putRecordResult.getSequenceNumber());
    }
Now I want to get the number of records that have been put. To do this, I first get an iterator over the shards:
Iterator<Shard> shardIterator = getTotalShardsIterator();
Then, using that iterator, I compute the count as follows:
.....
    while (shardIterator.hasNext()) {
        Shard shard = shardIterator.next();
        String shardId = shard.getShardId();
        int datacount = getDataCount(shardId, myStreamName);
        totalStreamDataCount += datacount;
        System.out.println("Data Count for Shard " + shardId + " is : " + datacount);
    }
.....
Here is my getDataCount(shardId, myStreamName) function:
    public static int getDataCount(String shardId, String streamName) {
        int dataCount = 0;
        String shardIterator;

        // Ask for an iterator that starts at the oldest record still in the shard.
        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
        getShardIteratorRequest.setStreamName(streamName);
        getShardIteratorRequest.setShardId(shardId);
        getShardIteratorRequest.setShardIteratorType(ShardIteratorType.TRIM_HORIZON);
        GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
        shardIterator = getShardIteratorResult.getShardIterator();

        // Single GetRecords call with a limit of 1000 records.
        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
        getRecordsRequest.setShardIterator(shardIterator);
        getRecordsRequest.setLimit(1000);
        GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
        List<Record> records = getRecordsResult.getRecords();

        if (!records.isEmpty()) {
            dataCount = records.size();
            Iterator<Record> iterator = records.iterator();
            while (iterator.hasNext()) {
                Record record = iterator.next();
                byte[] bytes = record.getData().array();
                String recordData = new String(bytes);
                System.out.println("Shard Id. : " + shardId
                        + " Seq. No. is : " + record.getSequenceNumber()
                        + " Record data : " + recordData);
            }
        }
        return dataCount;
    }
But this code gives a different, mismatched result every time I run it: for example, sometimes it shows 81 and sometimes 91, even though I put 100 records.
Could someone please shed some light on this? :)
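One thing that may be relevant here: a single GetRecords call is not guaranteed to return every record in a shard, so the usual pattern is to keep calling it with the iterator returned by getNextShardIterator(). The sketch below does not use the AWS SDK at all; it only simulates that behaviour with hypothetical names (FakeShard, Page, countWithSingleCall and countWithLoop are made up for illustration) to show how one call can undercount while a loop does not.

```java
import java.util.ArrayList;
import java.util.List;

// Simulation only: none of these types belong to the AWS SDK.
class ShardCountSketch {

    /** One simulated GetRecords response: a batch plus the cursor for the next call. */
    static final class Page {
        final List<String> records;
        final int nextCursor;   // stands in for getNextShardIterator()
        final boolean caughtUp; // stands in for getMillisBehindLatest() == 0

        Page(List<String> records, int nextCursor, boolean caughtUp) {
            this.records = records;
            this.nextCursor = nextCursor;
            this.caughtUp = caughtUp;
        }
    }

    /** Simulated shard that returns at most maxPerCall records per call. */
    static final class FakeShard {
        private final List<String> stored = new ArrayList<>();
        private final int maxPerCall;

        FakeShard(int recordCount, int maxPerCall) {
            if (maxPerCall < 1) {
                throw new IllegalArgumentException("maxPerCall must be at least 1");
            }
            this.maxPerCall = maxPerCall;
            for (int i = 0; i < recordCount; i++) {
                stored.add("record-" + i);
            }
        }

        Page getRecords(int cursor, int limit) {
            int batch = Math.min(limit, maxPerCall);
            int end = Math.min(stored.size(), cursor + batch);
            List<String> slice = new ArrayList<>(stored.subList(cursor, end));
            return new Page(slice, end, end == stored.size());
        }
    }

    /** Mirrors the approach in the question: count whatever the first call returns. */
    static int countWithSingleCall(FakeShard shard) {
        return shard.getRecords(0, 1000).records.size();
    }

    /** Keeps reading from the returned cursor until the shard reports it is caught up. */
    static int countWithLoop(FakeShard shard) {
        int total = 0;
        int cursor = 0;
        while (true) {
            Page page = shard.getRecords(cursor, 1000);
            total += page.records.size();
            cursor = page.nextCursor;
            if (page.caughtUp) {
                break;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        FakeShard shard = new FakeShard(100, 30);
        System.out.println("single call: " + countWithSingleCall(shard)); // prints 30
        System.out.println("looped:      " + countWithLoop(shard));       // prints 100
    }
}
```

With the real client, the equivalent loop would presumably pass getRecordsResult.getNextShardIterator() into the next GetRecordsRequest and stop once getMillisBehindLatest() reports 0. Whether this accounts for the 81 versus 91 counts above is an assumption, not something confirmed.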