AWS Big Data Study Notes – AWS Kinesis
This cheat sheet covers AWS Kinesis, the real-time streaming data platform on AWS:
- Kinesis Video Streams: build custom applications that process or analyze streaming video
- Kinesis Data Streams: build custom applications that process or analyze streaming data
- Kinesis Data Firehose: deliver real-time streaming data to AWS destinations such as S3, Redshift, Elasticsearch Service, and Splunk
- Kinesis Data Analytics: process and analyze streaming data with standard SQL
AWS Kinesis Video Streams
Three components:
- Producer: send video/audio data to one or more video streams.
- Producer API: PutMedia API to write media data to a Kinesis video stream
- In a PutMedia request, the producer sends a stream of media fragments. A fragment is a self-contained sequence of frames.
- As fragments arrive, Kinesis Video Streams assigns a unique fragment number, in increasing order. It also stores producer-side and server-side timestamps for each fragment, as Kinesis Video Streams-specific metadata.
- Video Stream: create a Kinesis video stream using the AWS Management Console or programmatically using the AWS SDKs.
- Consumer: create consumer applications to run on EC2 instances to get data (fragments and frames) to view, process or analyze
- Consumer APIs: GetMedia – specify a starting fragment and receive fragments as they arrive; GetMediaForFragmentList (with ListFragments) – retrieve specific stored fragments; batch-processing applications that use it are considered offline consumers.
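A minimal consumer sketch in Python with boto3, assuming an existing stream named my-video-stream that the caller is allowed to read (the stream name and start selector are illustrative only):

```python
import boto3

# GetMedia must be called against the stream's data endpoint,
# not the regional control-plane endpoint.
kvs = boto3.client("kinesisvideo")
endpoint = kvs.get_data_endpoint(
    StreamName="my-video-stream",  # assumed, existing stream
    APIName="GET_MEDIA",
)["DataEndpoint"]

media = boto3.client("kinesis-video-media", endpoint_url=endpoint)
resp = media.get_media(
    StreamName="my-video-stream",
    StartSelector={"StartSelectorType": "EARLIEST"},  # start with the oldest stored fragment
)

# The payload is a streaming MKV container holding the fragments;
# read it in chunks and hand it to your viewing/processing/analysis code.
chunk = resp["Payload"].read(1024)
```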
AWS Kinesis Data Streams
Three components:
- Data Stream: a set of shards
- Calculate initial number_of_shards (a worked sketch follows the Data Stream bullets below)
- number_of_shards = max(incoming_write_bandwidth_in_KiB/1024, outgoing_read_bandwidth_in_KiB/2048)
- incoming_write_bandwidth_in_KiB = average_data_size_in_KiB (round up to nearest 1 KiB) x records_per_second
- outgoing_read_bandwidth_in_KiB = incoming_write_bandwidth_in_KiB x number_of_consumers
- Each shard holds a sequence of data records. Each shard supports reads of up to 5 transactions per second, up to a maximum total data read rate of 2 MB per second, and writes of up to 1,000 records per second, up to a maximum total data write rate of 1 MB per second (including partition keys)
- Data records are composed of a sequence number, a partition key, and a data blob, which is an immutable sequence of bytes.
- A partition key is used to group data by shard within a stream – a Unicode string (max 256 bytes); an MD5 hash of the key maps each record to a shard
- A sequence number is assigned by Kinesis Data Streams and is unique within its shard – it cannot be used as an index
- The retention period is set to a default of 24 hours after creation. You can increase the retention period up to 168 hours (7 days)
- Reshard: adjust the number of shards in your stream to adapt to changes in the rate of data flow through the stream. Resharding is always pairwise – a single operation cannot split into more than two shards or merge more than two shards
- Shard split: increase the capacity (and cost) of your stream
- Shard merge: reduce the cost (and capacity) of your stream
- Reshard process: parent shard transitions from OPEN to CLOSED then EXPIRED
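A worked example of the shard-sizing formula above, as a small Python helper (the record size, rate, and consumer count are made-up numbers for illustration; results are rounded up since shards are whole units):

```python
import math

def initial_shard_count(avg_record_kib, records_per_second, number_of_consumers):
    """Apply the initial shard-count formula from the Kinesis Data Streams notes above."""
    incoming_write_kib = math.ceil(avg_record_kib) * records_per_second
    outgoing_read_kib = incoming_write_kib * number_of_consumers
    return max(
        math.ceil(incoming_write_kib / 1024),  # write side: 1 MiB/sec per shard
        math.ceil(outgoing_read_kib / 2048),   # read side: 2 MiB/sec per shard
    )

# Example: 0.5 KiB records at 4,000 records/sec with 2 consumers
# -> write 4,000 KiB/s, read 8,000 KiB/s -> max(4, 4) = 4 shards
print(initial_shard_count(0.5, 4000, 2))
```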
- Producer: puts records (data ingestion) into Kinesis Data Streams
- Server-side encryption automatically encrypts sensitive data as a producer enters it into the stream, using AWS KMS master keys
- Kinesis Agent: a stand-alone Java software application that offers an easy way to collect and send data to Kinesis Data Streams
- Using the Kinesis Data Streams API: develop producers using the Kinesis Data Streams API with the AWS SDK for Java – putRecord/putRecords (see the producer sketch after these bullets)
- KPL (Kinesis Producer Library) simplifies producer application development. Key concepts in KPL:
- User records: blob data (e.g. a JSON blob representing a UI event on a website). A user record is different from a Kinesis Data Streams data record
- Retry mechanism: when a user record is added with the KPL addUserRecord operation, it is given a time stamp and added to a buffer with a deadline set by the RecordMaxBufferedTime configuration parameter. This time stamp/deadline combination sets the buffer priority. Records are flushed from the buffer based on buffer priority, aggregation configuration, and collection configuration, and are then sent to the Kinesis data stream with the PutRecords operation. PutRecords requests occasionally return full or partial failures; records that fail are automatically added back to the KPL buffer, with a new deadline set to the minimum of half the current RecordMaxBufferedTime configuration and the record’s time-to-live value
- Rate limiting: limits the per-shard throughput sent from a single producer, which helps prevent excessive retries (spamming) of the stream
- Batching: perform a single action on multiple items. The two types of KPL batching are designed to coexist and can be turned on or off independently of one another.
- Aggregation: store multiple user records within a single Kinesis Data Streams record. Improves per-shard throughput and optimizes the cost of the stream
- Collection: use the PutRecords API operation to send multiple Kinesis Data Streams records to one or more shards in your Kinesis data stream. Reduces the overhead of making many separate HTTP requests for a multi-shard stream
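A minimal producer sketch using the Kinesis Data Streams API through boto3 (Python rather than the Java SDK mentioned above); the stream name and payloads are assumptions for illustration:

```python
import json
import boto3

kinesis = boto3.client("kinesis")

# Collection-style batching: one PutRecords call carries many records.
events = [{"user": "u1", "action": "click"}, {"user": "u2", "action": "view"}]
response = kinesis.put_records(
    StreamName="my-data-stream",  # assumed, existing stream
    Records=[
        {
            "Data": json.dumps(e).encode("utf-8"),
            "PartitionKey": e["user"],  # an MD5 hash of this key picks the shard
        }
        for e in events
    ],
)

# PutRecords can partially fail; failed records should be retried
# (the KPL does this for you automatically).
if response["FailedRecordCount"] > 0:
    failed = [r for r in response["Records"] if "ErrorCode" in r]
    print(f"{len(failed)} records need to be retried")
```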
- Consumer: get records from Kinesis Data Streams and process them.
- Two types of consumers: shared fan-out consumers (a total of 2 MB/sec per shard shared across all consumers, ~200 ms propagation delay, HTTP GetRecords) and enhanced fan-out consumers (up to 2 MB/sec per shard per consumer, ~70 ms propagation delay, HTTP/2 SubscribeToShard)
- KCL (Kinesis Client Library): consumes and processes data from a Kinesis data stream – you instantiate a worker
- Kinesis Data Streams application: deploy it on EC2. Utilize EC2 autoscaling
- Use Kinesis Data Firehose to deliver the stream to S3, Redshift, Elasticsearch Service (ES), or Splunk
- Use the Kinesis Data Streams API: get data from a stream with getShardIterator and getRecords, and adapt to resharding (see the consumer sketch after these bullets)
- Kinesis connector library: a pre-built library that helps you easily integrate Kinesis Data Streams with other AWS services and third-party tools. The Kinesis Client Library (KCL) is required for using this library. The current version provides connectors to DynamoDB, Redshift, S3, and Elasticsearch. The library also includes sample connectors of each type, plus Apache Ant build files for running the samples.
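A minimal sketch of a shared fan-out consumer using the Kinesis Data Streams API with boto3; the stream name is an assumption, and a real application would track an iterator per shard and handle resharding (or let the KCL do that):

```python
import time
import boto3

kinesis = boto3.client("kinesis")
stream_name = "my-data-stream"  # assumed, existing stream

# Pick the first shard and start from its oldest record.
shard_id = kinesis.describe_stream(StreamName=stream_name)[
    "StreamDescription"]["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName=stream_name,
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]

while iterator:
    resp = kinesis.get_records(ShardIterator=iterator, Limit=100)
    for record in resp["Records"]:
        print(record["SequenceNumber"], record["Data"])
    iterator = resp.get("NextShardIterator")
    time.sleep(1)  # stay under the 5 read transactions/sec per-shard limit
```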
If you want to learn the difference between AWS Kinesis Data Streams and Apache Kafka, please review this blog.
AWS Kinesis Firehose
Two components:
- Producer: sends data to a Kinesis Data Firehose delivery stream from a Kinesis data stream, the Kinesis Agent, or the Kinesis Data Firehose API using the AWS SDK. You can also use CloudWatch Logs, CloudWatch Events, or AWS IoT as your data source (a producer sketch follows at the end of this section)
- Kinesis Data Firehose delivery stream: use Kinesis Data Firehose to create a Kinesis Data Firehose delivery stream
- Record: can be as large as 1,000 KB
- Buffer: Kinesis Data Firehose buffers incoming streaming data to a certain size or for a certain period of time before delivering it to destinations. Buffer size is in MB (S3: 1 MB to 128 MB; ES: 1 MB to 100 MB) and buffer interval is in seconds (60 s to 900 s)
- Kinesis Data Firehose retries every 5 minutes until the retry duration ends. If you set the retry duration to 0 (zero) seconds, Kinesis Data Firehose does not retry upon an index request failure.
- Record transformation: Kinesis Data Firehose can invoke a Lambda function to transform incoming source data and deliver the transformed data to destinations
- Data Transformation flow (a Lambda handler sketch follows the blueprint list below):
- When data transformation is enabled, Kinesis Data Firehose buffers incoming data up to 3 MB
- Invokes the specified Lambda function asynchronously with each buffered batch using the AWS Lambda synchronous invocation mode
- The transformed data is sent from Lambda to Kinesis Data Firehose
- Kinesis Data Firehose sends it to the destination when the specified destination buffering size or buffering interval is reached, whichever happens first. (The Lambda synchronous invocation mode has a payload size limit of 6 MB for both the request and the response. )
- All transformed records from Lambda must contain the following parameters, or Kinesis Data Firehose rejects them and treats that as a data transformation failure.
- recordId: The record ID is passed from Kinesis Data Firehose to Lambda during the invocation.
- result: The status of the data transformation of the record (Ok, Dropped, or ProcessingFailed)
- data: The transformed data payload, after base64-encoding
- Lambda blueprints:
- General Firehose Processing: data transformation and status model for any custom transformation logic
- Apache log to JSON: Parses and converts Apache log lines to JSON objects, using predefined JSON field names.
- Apache log to CSV: Parses and converts Apache log lines to CSV format.
- Syslog to CSV: Parses and converts Syslog lines to CSV format
- Kinesis Data Firehose Process Record Streams as the source: Accesses the Kinesis Data Streams records in the input and returns them with a processing status.
- Kinesis Data Firehose CloudWatch Logs Processor: Parses and extracts individual log events from records sent by CloudWatch Logs subscription filters
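A minimal sketch of a Firehose data-transformation Lambda handler in Python, returning the three required parameters for every record; the upper-casing step is just a placeholder transformation:

```python
import base64

def lambda_handler(event, context):
    """Transform each buffered Firehose record and echo back the required fields."""
    output = []
    for record in event["records"]:
        payload = base64.b64decode(record["data"]).decode("utf-8")
        transformed = payload.upper()  # placeholder transformation logic
        output.append({
            "recordId": record["recordId"],  # must be echoed back unchanged
            "result": "Ok",                  # Ok | Dropped | ProcessingFailed
            "data": base64.b64encode(transformed.encode("utf-8")).decode("utf-8"),
        })
    return {"records": output}
```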
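And a minimal producer sketch that writes directly to a delivery stream through the Kinesis Data Firehose API with boto3; the delivery-stream name and payload are assumptions:

```python
import json
import boto3

firehose = boto3.client("firehose")

# Single record; PutRecordBatch works the same way for up to 500 records per call.
firehose.put_record(
    DeliveryStreamName="my-delivery-stream",  # assumed, existing delivery stream
    Record={"Data": (json.dumps({"event": "page_view", "user": "u1"}) + "\n").encode("utf-8")},
)
```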
If you want to learn the difference between Kinesis Data Streams and Kinesis Firehose please review this blog.
AWS Kinesis Data Analytics
Three parts of each Kinesis Data Analytics application:
- Input: The streaming source for your application – a Kinesis data stream or a Kinesis Data Firehose data delivery stream
- Application code: A series of SQL statements that process input and produce output – SQL statements or JOIN queries
- Output: Query results go to in-application streams – S3, Redshift, ES, or a custom destination
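A sketch of wiring these three parts together with the boto3 kinesisanalytics API; the application name, ARNs, IAM role, and schema below are all placeholders, and the pass-through SQL is only illustrative:

```python
import boto3

analytics = boto3.client("kinesisanalytics")

analytics.create_application(
    ApplicationName="my-analytics-app",  # assumed name
    # Input: the streaming source (here a Kinesis data stream)
    Inputs=[{
        "NamePrefix": "SOURCE_SQL_STREAM",
        "KinesisStreamsInput": {
            "ResourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/my-data-stream",
            "RoleARN": "arn:aws:iam::123456789012:role/analytics-role",
        },
        "InputSchema": {
            "RecordFormat": {
                "RecordFormatType": "JSON",
                "MappingParameters": {"JSONMappingParameters": {"RecordRowPath": "$"}},
            },
            "RecordColumns": [
                {"Name": "TICKER_SYMBOL", "SqlType": "VARCHAR(4)", "Mapping": "$.ticker_symbol"},
                {"Name": "PRICE", "SqlType": "REAL", "Mapping": "$.price"},
            ],
        },
    }],
    # Application code: SQL that reads the in-application input stream
    ApplicationCode=(
        'CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ticker VARCHAR(4), price REAL);\n'
        'CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"\n'
        '  SELECT STREAM "TICKER_SYMBOL", "PRICE" FROM "SOURCE_SQL_STREAM_001";\n'
    ),
    # Output: where query results go (here a Firehose delivery stream)
    Outputs=[{
        "Name": "DESTINATION_SQL_STREAM",
        "KinesisFirehoseOutput": {
            "ResourceARN": "arn:aws:firehose:us-east-1:123456789012:deliverystream/my-delivery-stream",
            "RoleARN": "arn:aws:iam::123456789012:role/analytics-role",
        },
        "DestinationSchema": {"RecordFormatType": "JSON"},
    }],
)
```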
Kinesis Data Analytics for Java Applications:
- Use Java to process and analyze streaming data to perform time-series analytics, feed real-time dashboards, and create real-time metrics
- Use open-source libraries based on Apache Flink; the service handles provisioning compute resources, parallel computation, automatic scaling, and application backups
- Types of connectors: sources (a Kinesis data stream, file, or other data source), sinks (a Kinesis data stream, Kinesis Data Firehose delivery stream, or other data destination), Asynchronous I/O (asynchronous access to a data source to enrich stream events)
- Operators: transformation and aggregation
Streaming SQL:
- Continuous queries: A query over a stream executes continuously over streaming data
- Windowed queries (see the SQL sketch after this list):
- Stagger windows: keyed time-based windows that open as data arrives; the keys allow multiple overlapping windows
- Tumbling windows: non-overlapping
- Sliding windows: the window slides with time using a fixed time or rowcount interval
- Stream joins: JOIN queries to correlate data
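A sketch of a tumbling-window query of the kind described above, kept as a Python string so these notes stay in one code language; the stream and column names are assumptions, and STEP on ROWTIME is what makes the windows non-overlapping:

```python
# Tumbling window: count records per ticker in non-overlapping 60-second buckets.
tumbling_window_sql = """
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
    (TICKER_SYMBOL VARCHAR(4), TICKER_COUNT INTEGER);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "DESTINATION_SQL_STREAM"
  SELECT STREAM "TICKER_SYMBOL", COUNT(*) AS TICKER_COUNT
  FROM "SOURCE_SQL_STREAM_001"
  GROUP BY "TICKER_SYMBOL",
           STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
"""
```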
Error handling:
- Reports runtime errors using an in-application error stream called error_stream
- Error_stream schema: ERROR_TIME, ERROR_LEVEL, ERROR_NAME, MESSAGE, DATA_ROWTIME, DATA_ROW, PUMP_NAME
Notes List
- How to Pass AWS Certified Big Data Specialty
- AWS Big Data Study Notes – AWS Kinesis
- AWS Big Data Study Notes – EMR and Redshift
- AWS Big Data Study Notes – AWS Machine Learning and IoT
- AWS Big Data Study Notes – AWS QuickSight, Athena, Glue, and ES
- AWS Big Data Study Notes – AWS DynamoDB, S3 and SQS
- AWS Kinesis Data Streams vs. Kinesis Data Firehose
- Streaming Platforms: Apache Kafka vs. AWS Kinesis
- When Should You Use Amazon DynamoDB Accelerator (AWS DAX)?
- Data Storage for Big Data: Aurora, Redshift or Hadoop?
- Apache Hadoop Ecosystem Cheat Sheet