How do I fan out an AWS Kinesis stream?

I would like to fork / duplicate / replicate an AWS Kinesis input stream into N new Kinesis streams, so that every record written to the input stream appears in each of the N streams.

Is there an AWS or open-source service that does this?

I would prefer not to write code for this if a ready-made solution exists. Amazon Kinesis Firehose is not a solution because it cannot output to Kinesis. Perhaps an AWS Lambda solution, if it doesn't turn out to be too expensive?

+8
amazon-web-services amazon-kinesis
2 answers

There are two ways you could fan out an Amazon Kinesis stream:

  • Use Amazon Kinesis Analytics to copy records to additional streams.
  • Use an AWS Lambda function to copy records to other streams.

Option 1: Using Amazon Kinesis Analytics to fan out

You can use Amazon Kinesis Analytics to generate a new stream from an existing stream.

From the Amazon Kinesis Analytics Documentation :

Amazon Kinesis Analytics applications continuously read and process streaming data in real time. You write application code using SQL to process the incoming streaming data and produce output. Amazon Kinesis Analytics then writes the output to a configured destination.

[Amazon Kinesis Analytics flow diagram]

Fan-out is mentioned in the Application Code section:

You can also write SQL queries that run independently of each other. For example, you can write two SQL statements that query the same in-application stream, but send the output to different in-application streams.

I managed to implement this as follows:

  • Created three streams: input, output1, output2
  • Created two Amazon Kinesis Analytics applications: copy1, copy2

The Amazon Kinesis Analytics SQL application is as follows:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (log VARCHAR(16));

CREATE OR REPLACE PUMP "COPY_PUMP1" AS
  INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "log" FROM "SOURCE_SQL_STREAM_001";

This code creates a pump (think of it as a continuously running SELECT) that reads from the input stream and sends its output to the output1 stream. I created a second, identical application that outputs to the output2 stream.
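The two copy applications can also be created programmatically. Below is a minimal sketch using boto3's legacy `kinesisanalytics` API; the ARNs are placeholders, and the input stream is assumed to be attached separately (e.g., in the console, where the input schema can be auto-discovered):

```python
def build_output(stream_arn, role_arn, name="DESTINATION_SQL_STREAM"):
    """Build one entry for the Outputs parameter of create_application.
    The in-application stream name must match the one used in the SQL."""
    return {
        "Name": name,
        "KinesisStreamsOutput": {"ResourceARN": stream_arn, "RoleARN": role_arn},
        "DestinationSchema": {"RecordFormatType": "JSON"},
    }

# The same SQL shown above, reused for every copy application.
SQL_CODE = """\
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (log VARCHAR(16));
CREATE OR REPLACE PUMP "COPY_PUMP1" AS
  INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "log" FROM "SOURCE_SQL_STREAM_001";
"""

def create_copy_app(app_name, output_stream_arn, role_arn):
    """Create one copy application writing to one output stream."""
    import boto3  # deferred import so build_output is testable without AWS
    client = boto3.client("kinesisanalytics")
    return client.create_application(
        ApplicationName=app_name,
        ApplicationCode=SQL_CODE,
        Outputs=[build_output(output_stream_arn, role_arn)],
    )
```

You would call `create_copy_app` once per destination stream (copy1 → output1, copy2 → output2).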

To test it, I sent data to the input stream:

 #!/usr/bin/env python
 import json
 import time
 from boto import kinesis

 kinesis = kinesis.connect_to_region("us-west-2")

 i = 0
 while True:
     data = {}
     data['log'] = 'Record ' + str(i)
     i += 1
     print(data)
     kinesis.put_record("input", json.dumps(data), "key")
     time.sleep(2)

I let it run for a while, and then read the results back with this code:

 from boto import kinesis

 kinesis = kinesis.connect_to_region("us-west-2")
 iterator = kinesis.get_shard_iterator('output1', 'shardId-000000000000',
                                       'TRIM_HORIZON')['ShardIterator']
 records = kinesis.get_records(iterator, 5)
 print([r['Data'] for r in records['Records']])

The output was:

 [u'{"LOG":"Record 0"}', u'{"LOG":"Record 1"}', u'{"LOG":"Record 2"}', u'{"LOG":"Record 3"}', u'{"LOG":"Record 4"}'] 

I ran the same check against output2 and saw identical output.

Option 2: Using AWS Lambda

If you are fanning out to many streams, an AWS Lambda function may be more efficient:

  • Triggered by the incoming Amazon Kinesis stream.
  • Writes the records to multiple other Amazon Kinesis streams.

You can even have the Lambda function discover the output streams based on a naming convention (for example, any stream named app-output-*).
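A minimal sketch of such a Lambda handler, assuming Python and boto3; the destination stream names are hypothetical, following the app-output-* naming convention suggested above:

```python
import base64

def records_from_event(event):
    """Convert the Kinesis records in a Lambda trigger event into
    entries accepted by kinesis.put_records (data arrives base64-encoded)."""
    return [
        {
            "Data": base64.b64decode(r["kinesis"]["data"]),
            "PartitionKey": r["kinesis"]["partitionKey"],
        }
        for r in event["Records"]
    ]

def handler(event, context):
    import boto3  # deferred import so records_from_event is testable offline
    client = boto3.client("kinesis")
    entries = records_from_event(event)
    # Hypothetical destination names; a real function might call
    # list_streams and filter by the app-output- prefix instead.
    for name in ("app-output-1", "app-output-2"):
        # put_records accepts up to 500 entries per call; a production
        # function would also batch and retry individually failed records.
        client.put_records(StreamName=name, Records=entries)
```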

+11

There is a GitHub repository from AWS Labs that provides fan-out using Lambda: https://github.com/awslabs/aws-lambda-fanout . Also read "Converting a synchronous Lambda call to asynchronous" at https://medium.com/retailmenot-engineering/building-a-high-throughput-data-pipeline-with-kinesis-lambda-and-dynamodb-7d78e992a02d , which is essential for building truly asynchronous processing.
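The asynchronous-invocation idea from that article boils down to calling a downstream Lambda with InvocationType="Event", so the caller does not wait for the result. A minimal sketch (the function name is a placeholder):

```python
import json

def build_invoke_kwargs(function_name, payload):
    """Build the arguments for lambda.invoke; InvocationType="Event"
    makes the invocation asynchronous (fire-and-forget)."""
    return {
        "FunctionName": function_name,
        "InvocationType": "Event",
        "Payload": json.dumps(payload),
    }

def invoke_async(function_name, payload):
    import boto3  # deferred import so build_invoke_kwargs is testable offline
    client = boto3.client("lambda")
    return client.invoke(**build_invoke_kwargs(function_name, payload))
```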

+1
