Spark Streaming + Kinesis Integration

Amazon Kinesis is a fully managed service for real-time processing of streaming data at massive scale. The Kinesis receiver creates an input DStream using the Kinesis Client Library (KCL) provided by Amazon under the Amazon Software License (ASL). The KCL builds on top of the Apache 2.0 licensed AWS Java SDK and provides load balancing, fault tolerance, and checkpointing through the concepts of Workers, Checkpoints, and Shard Leases. This guide explains how to configure Spark Streaming to receive data from Kinesis.

Configuring Kinesis

A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or more shards per the following guide.

Configuring Spark Streaming Application

  1. Linking: In your SBT/Maven project definition, link your streaming application against the following artifact (see Linking section in the main programming guide for further information).

     groupId = org.apache.spark
     artifactId = spark-streaming-kinesis-asl_2.10
     version = 1.5.1
    

    Note that by linking to this library, you will include ASL-licensed code in your application.

  2. Programming: In the streaming application code, import KinesisUtils and create the input DStream as follows. The Scala, Java, and Python variants are shown in that order:

     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.Duration
     import org.apache.spark.streaming.kinesis._
     import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

     // Bracketed arguments are placeholders; see the parameter descriptions below.
     val kinesisStream = KinesisUtils.createStream(
         streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
         [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
    

    See the Scala API docs and the example. Refer to the Running the Example section for instructions on how to run the example.

     import org.apache.spark.storage.StorageLevel;
     import org.apache.spark.streaming.Duration;
     import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
     import org.apache.spark.streaming.kinesis.*;
     import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

     // Bracketed arguments are placeholders; see the parameter descriptions below.
     JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
         streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
         [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2);
    

    See the Java API docs and the example. Refer to the Running the Example section for instructions on how to run the example.

     from pyspark import StorageLevel
     from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

     # Bracketed arguments are placeholders; see the parameter descriptions below.
     kinesisStream = KinesisUtils.createStream(
         streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
         [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
    

    See the Python API docs and the example. Refer to the Running the Example section for instructions on how to run the example.

    • streamingContext: StreamingContext containing an application name used by Kinesis to tie this Kinesis application to the Kinesis stream.

    • [Kinesis app name]: The application name that will be used to checkpoint the Kinesis sequence numbers in a DynamoDB table.
      • The application name must be unique for a given account and region.
      • If the table exists but has incorrect checkpoint information (for a different stream, or old, expired sequence numbers), then there may be temporary errors.
    • [Kinesis stream name]: The Kinesis stream that this streaming application will pull data from.

    • [endpoint URL]: Valid Kinesis endpoint URLs are listed in the AWS General Reference (Regions and Endpoints).

    • [region name]: Valid Kinesis region names are listed in the AWS General Reference (Regions and Endpoints).

    • [checkpoint interval]: The interval (e.g., Duration(2000) = 2 seconds) at which the Kinesis Client Library saves its position in the stream. For starters, set it to the same as the batch interval of the streaming application.

    • [initial position]: Can be either InitialPositionInStream.TRIM_HORIZON or InitialPositionInStream.LATEST (see Kinesis Checkpointing section and Amazon Kinesis API documentation for more details).

    Other variants of the createStream API also let you specify the AWS access key and secret key directly.
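
    As an illustration of how the placeholders above might be filled in, here is a minimal PySpark sketch. The application name, stream name, region, and batch interval are hypothetical values, and the `kinesis_endpoint` helper assumes the common `https://kinesis.<region>.amazonaws.com` pattern, which should be checked against the AWS endpoint list for your region. Following the advice above, the checkpoint interval is set equal to the batch interval.

```python
def kinesis_endpoint(region_name):
    """Build an endpoint URL for a region (assumes the common AWS naming pattern)."""
    return "https://kinesis.%s.amazonaws.com" % region_name


def create_kinesis_stream(ssc, app_name, stream_name, region_name, batch_seconds):
    """Create a Kinesis input DStream whose checkpoint interval equals the batch interval."""
    # Imported here so kinesis_endpoint can be used without Spark installed.
    from pyspark import StorageLevel
    from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

    return KinesisUtils.createStream(
        ssc, app_name, stream_name, kinesis_endpoint(region_name), region_name,
        InitialPositionInStream.LATEST,  # or TRIM_HORIZON to start from the oldest record
        batch_seconds,                   # checkpoint interval; PySpark takes seconds here
        StorageLevel.MEMORY_AND_DISK_2)


def main():
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    batch_seconds = 2
    sc = SparkContext(appName="KinesisSketch")  # hypothetical Spark application name
    ssc = StreamingContext(sc, batch_seconds)
    # "my-kinesis-app", "my-stream", and "us-east-1" are hypothetical values.
    stream = create_kinesis_stream(
        ssc, "my-kinesis-app", "my-stream", "us-east-1", batch_seconds)
    stream.pprint()
    ssc.start()
    ssc.awaitTermination()
```

    Calling `main()` from a script submitted with spark-submit would start the job; it requires the Kinesis ASL artifact on the classpath and AWS credentials available to the application.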

  3. Deploying: Package spark-streaming-kinesis-asl_2.10 and its dependencies (except spark-core_2.10 and spark-streaming_2.10, which are provided by spark-submit) into the application JAR. Then use spark-submit to launch your application (see Deploying section in the main programming guide).

    Points to remember at runtime:

    • Kinesis data processing is ordered per partition and occurs at least once per message.

    • Multiple applications can read from the same Kinesis stream. Kinesis will maintain the application-specific shard and checkpoint info in DynamoDB.

    • A single Kinesis stream shard is processed by one input DStream at a time.
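
    Because a message may be processed more than once, downstream logic is often written to tolerate repeats. The following is a minimal sketch of one way to do that, not something the Kinesis integration provides. It assumes each record carries its own unique identifier (the `event_id` field is hypothetical) and keeps the set of seen identifiers in memory.

```python
def drop_duplicates(records, seen_ids):
    """Return records whose 'event_id' is new, adding those IDs to seen_ids."""
    fresh = []
    for record in records:
        event_id = record["event_id"]  # hypothetical unique key embedded in the payload
        if event_id not in seen_ids:
            seen_ids.add(event_id)
            fresh.append(record)
    return fresh


seen = set()
first = drop_duplicates([{"event_id": "a", "v": 1}, {"event_id": "b", "v": 2}], seen)
# Record "b" is delivered again (for example after a failure) and is filtered out.
second = drop_duplicates([{"event_id": "b", "v": 2}, {"event_id": "c", "v": 3}], seen)
print([r["event_id"] for r in first], [r["event_id"] for r in second])
```

    In a real streaming job the set of seen identifiers would need to live in durable or bounded storage rather than in a local Python set.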

    [Figure: Spark Streaming Kinesis Architecture]

    • A single Kinesis input DStream can read from multiple shards of a Kinesis stream by creating multiple KinesisRecordProcessor threads.

    • Multiple input DStreams running in separate processes/instances can read from a Kinesis stream.

    • You never need more Kinesis input DStreams than the number of Kinesis stream shards as each input DStream will create at least one KinesisRecordProcessor thread that handles a single shard.

    • Horizontal scaling is achieved by adding/removing Kinesis input DStreams (within a single process or across multiple processes/instances) - up to the total number of Kinesis stream shards per the previous point.

    • The Kinesis input DStream will balance the load between all DStreams - even across processes/instances.

    • The Kinesis input DStream will balance the load during re-shard events (merging and splitting) due to changes in load.

    • As a best practice, it’s recommended that you avoid re-shard jitter by over-provisioning when possible.

    • Each Kinesis input DStream maintains its own checkpoint info. See the Kinesis Checkpointing section for more details.

    • There is no correlation between the number of Kinesis stream shards and the number of RDD partitions/shards created across the Spark cluster during input DStream processing. These are two independent partitioning schemes.
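
    The scaling points above can be sketched in PySpark as follows: create several Kinesis input DStreams, capped at the shard count, and union them into a single DStream. This is an illustrative sketch only. The function names are hypothetical, and the shard count is assumed to be known by the caller rather than looked up from Kinesis.

```python
def num_input_streams(requested, num_shards):
    """Cap the number of input DStreams at the shard count, with a minimum of one."""
    return max(1, min(requested, num_shards))


def create_unioned_stream(ssc, app_name, stream_name, endpoint_url, region_name,
                          checkpoint_seconds, requested, num_shards):
    """Create up to `requested` Kinesis DStreams and union them into one DStream."""
    # Imported here so num_input_streams can be used without Spark installed.
    from pyspark import StorageLevel
    from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

    streams = [
        KinesisUtils.createStream(
            ssc, app_name, stream_name, endpoint_url, region_name,
            InitialPositionInStream.LATEST, checkpoint_seconds,
            StorageLevel.MEMORY_AND_DISK_2)
        for _ in range(num_input_streams(requested, num_shards))
    ]
    # The unioned DStream can be repartitioned independently of the shard count.
    return ssc.union(*streams)
```

    All DStreams share the same Kinesis application name here, so they cooperate on the same set of shard leases instead of each reading the whole stream.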

Running the Example

To run the example,

Kinesis Checkpointing