How to save a Kinesis stream to S3 in a specific folder structure in an S3 bucket

I have events captured by a Kinesis stream, and I want to put them all into a date-based folder structure on S3. I want to create a folder per date stamp: all events from June 15th should go into that day's folder, on June 16th a new folder should appear to collect that day's events, and so on.

As a newcomer to Kinesis, I went through the documentation and found the connector library, in which S3Emitter is used together with a configuration setting that selects the S3 location where emitted data should be stored. Can anyone suggest how to maintain a folder structure so that each event lands in the folder matching its date?

amazon-s3 amazon-kinesis
3 answers

Unfortunately, the functionality you are looking for is not available in the S3Emitter for Amazon Kinesis at the moment; it simply works as a buffer that is flushed based on the amount of input data. See the respective comment:

This IEmitter implementation is used to store files from an Amazon Kinesis stream in S3. [...] When the buffer is full, this class's emit method adds the contents of the buffer to S3 as one file. The file name is the dash-delimited concatenation of the first and last sequence numbers of the records contained in that file. [...] [emphasis mine]

In addition, Kinesis has no first-class date concept for events (resp. data records); it only deals with sequence numbers, so you would need to add the date yourself as part of application-level data processing. See the Data Records section of the Amazon Kinesis Terminology:

Data records are the units of data that are stored in an Amazon Kinesis stream. Data records are composed of a sequence number, a partition key, and a data blob, which is an un-interpreted, immutable sequence of bytes. Amazon Kinesis does not inspect, interpret, or change the data in the blob in any way. [...] [emphasis mine]
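Since the blob is opaque to Kinesis, one common approach is to have the producer embed the event time in the payload and have the consumer derive the target folder from it when writing to S3. A minimal sketch of the consumer side; the class name and `yyyy/MM/dd` key layout are illustrative choices, not part of the connector library:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class DatedKeyBuilder {

    // Derives a date-based S3 key prefix from the event's own timestamp.
    // The producer must have embedded this timestamp in the record payload;
    // Kinesis itself will not attach one for you.
    public static String keyPrefix(long eventTimeMillis, String bucket) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy/MM/dd");
        // Use UTC so all workers agree on the folder boundaries
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        return bucket + "/" + fmt.format(new Date(eventTimeMillis));
    }

    public static void main(String[] args) {
        // 2014-06-15T12:00:00Z in epoch milliseconds
        System.out.println(keyPrefix(1402833600000L, "my-bucket"));
        // -> my-bucket/2014/06/15
    }
}
```

The important design point is to use the event's timestamp, not the time the emitter happens to flush; otherwise events buffered across midnight end up in the wrong day's folder.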


I found a way to solve this problem and posted the answer here: https://github.com/awslabs/amazon-kinesis-connectors/issues/24

Here is the answer again:

It is easy to achieve with the following changes to the sample code:

In S3sample.properties:

createS3Bucket = true 

In S3Emitter.java:

    /* Add the required imports */
    import java.text.SimpleDateFormat;
    import java.util.Calendar;

    public class S3Emitter implements IEmitter {
        // create date_bucket variable
        protected final String date_bucket =
            new SimpleDateFormat("yyyy_MM_dd_HH").format(Calendar.getInstance().getTime());

        public S3Emitter(KinesisConnectorConfiguration configuration) {
            s3Bucket = configuration.S3_BUCKET + "/" + date_bucket;
        }
    }

Hope this helps!


Since 2014, AWS has offered a new solution for this, namely Amazon Kinesis Firehose, which does exactly this job. You just need to send data from the Kinesis stream into a Firehose delivery stream (for example, via a small Lambda function) and create the Firehose delivery stream itself in a few clicks.
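Firehose's S3 destination partitions objects by arrival time out of the box: by default it writes under a UTC `YYYY/MM/DD/HH/` prefix inside the bucket, which gives the date-based layout from the question without any custom emitter code. A sketch of the S3 destination configuration one might pass to the `CreateDeliveryStream` API; the stream name and ARNs are placeholders:

```json
{
  "DeliveryStreamName": "events-to-s3",
  "S3DestinationConfiguration": {
    "RoleARN": "arn:aws:iam::123456789012:role/firehose-delivery-role",
    "BucketARN": "arn:aws:s3:::my-bucket",
    "Prefix": "events/",
    "BufferingHints": {
      "SizeInMBs": 5,
      "IntervalInSeconds": 300
    },
    "CompressionFormat": "UNCOMPRESSED"
  }
}
```

With this configuration, delivered objects land under keys like `events/2014/06/15/12/...`.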

