There are two ways you could fan-out an Amazon Kinesis stream:
- Use Amazon Kinesis Analytics to copy records to additional streams.
- Use an AWS Lambda function to copy records to another stream.
Option 1: Using Amazon Kinesis Analytics to fan-out
You can use Amazon Kinesis Analytics to generate a new stream from an existing stream.
From the Amazon Kinesis Analytics documentation:
Amazon Kinesis Analytics applications continuously read and process streaming data in real time. You write application code using SQL to process the incoming streaming data and produce output. Amazon Kinesis Analytics then writes the output to a configured destination.

Fan-out is mentioned in the Application Code section:
You can also write SQL queries that run independently of each other. For example, you can write two SQL statements that query the same in-application stream, but send the output into different in-application streams.
I managed to implement this as follows:
- Three streams created: input, output1, output2 (see the sketch after this list)
- Two Amazon Kinesis Analytics applications created: copy1, copy2
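For reference, here is a minimal sketch of creating the three streams with boto, assuming single-shard streams in us-west-2 to match the test code below:

#!/usr/bin/env python
from boto import kinesis

conn = kinesis.connect_to_region("us-west-2")

# Create the source stream and the two fan-out destinations,
# each with a single shard (enough for this test)
for name in ("input", "output1", "output2"):
    conn.create_stream(name, 1)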
The Amazon Kinesis Analytics SQL application is as follows:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (log VARCHAR(16));

CREATE OR REPLACE PUMP "COPY_PUMP1" AS
  INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "log" FROM "SOURCE_SQL_STREAM_001";
This code creates a pump (think of it as a continuously running query) that selects data from the input stream and outputs it to the output1 stream. I created another, identical application that outputs to the output2 stream.
To test it, I sent data to the input stream:
#!/usr/bin/env python
import json, time
from boto import kinesis

kinesis = kinesis.connect_to_region("us-west-2")

i = 0
while True:
    data = {}
    data['log'] = 'Record ' + str(i)
    i += 1
    print data
    kinesis.put_record("input", json.dumps(data), "key")
    time.sleep(2)
I let it run for a while, then dumped the output with this code:
from boto import kinesis

kinesis = kinesis.connect_to_region("us-west-2")
iterator = kinesis.get_shard_iterator('output1', 'shardId-000000000000', 'TRIM_HORIZON')['ShardIterator']
records = kinesis.get_records(iterator, 5)
print [r['Data'] for r in records['Records']]
The output was:
[u'{"LOG":"Record 0"}', u'{"LOG":"Record 1"}', u'{"LOG":"Record 2"}', u'{"LOG":"Record 3"}', u'{"LOG":"Record 4"}']
I ran it again for output2 and the identical output was shown.
Option 2: Using AWS Lambda
If you are fanning out to many streams, a more efficient method might be an AWS Lambda function that:
- Is triggered by Amazon Kinesis.
- Writes the records to multiple Amazon Kinesis streams.
You could even have the Lambda function self-discover the output streams based on a naming convention (for example, any stream named app-output-*); a sketch follows.
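Here is a minimal sketch of what such a Lambda function might look like. Treat it as an assumption rather than tested code: it uses boto3 (available in the Lambda Python runtime) instead of the boto library used above, the app-output- prefix and us-west-2 region are hypothetical, and list_streams pagination and error handling are omitted.

import base64
import boto3

kinesis = boto3.client('kinesis', region_name='us-west-2')  # region assumed

def lambda_handler(event, context):
    # Discover destination streams by the assumed naming convention
    names = kinesis.list_streams()['StreamNames']
    outputs = [n for n in names if n.startswith('app-output-')]

    for record in event['Records']:
        # Kinesis event payloads arrive base64-encoded
        data = base64.b64decode(record['kinesis']['data'])
        key = record['kinesis']['partitionKey']
        # Copy the record to every matching output stream
        for stream in outputs:
            kinesis.put_record(StreamName=stream, Data=data, PartitionKey=key)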