org.apache.spark.streaming.kafka
Class KafkaUtils

Object
  extended by org.apache.spark.streaming.kafka.KafkaUtils

public class KafkaUtils
extends Object


Constructor Summary
KafkaUtils()
           
 
Method Summary
static
<K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>,R>
JavaInputDStream<R>
createDirectStream(JavaStreamingContext jssc, Class<K> keyClass, Class<V> valueClass, Class<KD> keyDecoderClass, Class<VD> valueDecoderClass, Class<R> recordClass, java.util.Map<String,String> kafkaParams, java.util.Map<kafka.common.TopicAndPartition,Long> fromOffsets, Function<kafka.message.MessageAndMetadata<K,V>,R> messageHandler)
          :: Experimental :: Create an input stream that directly pulls messages from Kafka Brokers without using any receiver.
static
<K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>>
JavaPairInputDStream<K,V>
createDirectStream(JavaStreamingContext jssc, Class<K> keyClass, Class<V> valueClass, Class<KD> keyDecoderClass, Class<VD> valueDecoderClass, java.util.Map<String,String> kafkaParams, java.util.Set<String> topics)
          :: Experimental :: Create an input stream that directly pulls messages from Kafka Brokers without using any receiver.
static
<K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>,R>
InputDStream<R>
createDirectStream(StreamingContext ssc, scala.collection.immutable.Map<String,String> kafkaParams, scala.collection.immutable.Map<kafka.common.TopicAndPartition,Object> fromOffsets, scala.Function1<kafka.message.MessageAndMetadata<K,V>,R> messageHandler, scala.reflect.ClassTag<K> evidence$14, scala.reflect.ClassTag<V> evidence$15, scala.reflect.ClassTag<KD> evidence$16, scala.reflect.ClassTag<VD> evidence$17, scala.reflect.ClassTag<R> evidence$18)
          :: Experimental :: Create an input stream that directly pulls messages from Kafka Brokers without using any receiver.
static
<K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>>
InputDStream<scala.Tuple2<K,V>>
createDirectStream(StreamingContext ssc, scala.collection.immutable.Map<String,String> kafkaParams, scala.collection.immutable.Set<String> topics, scala.reflect.ClassTag<K> evidence$19, scala.reflect.ClassTag<V> evidence$20, scala.reflect.ClassTag<KD> evidence$21, scala.reflect.ClassTag<VD> evidence$22)
          :: Experimental :: Create an input stream that directly pulls messages from Kafka Brokers without using any receiver.
static
<K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>,R>
JavaRDD<R>
createRDD(JavaSparkContext jsc, Class<K> keyClass, Class<V> valueClass, Class<KD> keyDecoderClass, Class<VD> valueDecoderClass, Class<R> recordClass, java.util.Map<String,String> kafkaParams, OffsetRange[] offsetRanges, java.util.Map<kafka.common.TopicAndPartition,Broker> leaders, Function<kafka.message.MessageAndMetadata<K,V>,R> messageHandler)
          :: Experimental :: Create an RDD from Kafka using offset ranges for each topic and partition.
static
<K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>>
JavaPairRDD<K,V>
createRDD(JavaSparkContext jsc, Class<K> keyClass, Class<V> valueClass, Class<KD> keyDecoderClass, Class<VD> valueDecoderClass, java.util.Map<String,String> kafkaParams, OffsetRange[] offsetRanges)
          Create an RDD from Kafka using offset ranges for each topic and partition.
static
<K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>>
RDD<scala.Tuple2<K,V>>
createRDD(SparkContext sc, scala.collection.immutable.Map<String,String> kafkaParams, OffsetRange[] offsetRanges, scala.reflect.ClassTag<K> evidence$5, scala.reflect.ClassTag<V> evidence$6, scala.reflect.ClassTag<KD> evidence$7, scala.reflect.ClassTag<VD> evidence$8)
          Create an RDD from Kafka using offset ranges for each topic and partition.
static
<K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>,R>
RDD<R>
createRDD(SparkContext sc, scala.collection.immutable.Map<String,String> kafkaParams, OffsetRange[] offsetRanges, scala.collection.immutable.Map<kafka.common.TopicAndPartition,Broker> leaders, scala.Function1<kafka.message.MessageAndMetadata<K,V>,R> messageHandler, scala.reflect.ClassTag<K> evidence$9, scala.reflect.ClassTag<V> evidence$10, scala.reflect.ClassTag<KD> evidence$11, scala.reflect.ClassTag<VD> evidence$12, scala.reflect.ClassTag<R> evidence$13)
          :: Experimental :: Create an RDD from Kafka using offset ranges for each topic and partition.
static
<K,V,U extends kafka.serializer.Decoder<?>,T extends kafka.serializer.Decoder<?>>
JavaPairReceiverInputDStream<K,V>
createStream(JavaStreamingContext jssc, Class<K> keyTypeClass, Class<V> valueTypeClass, Class<U> keyDecoderClass, Class<T> valueDecoderClass, java.util.Map<String,String> kafkaParams, java.util.Map<String,Integer> topics, StorageLevel storageLevel)
          Create an input stream that pulls messages from Kafka Brokers.
static JavaPairReceiverInputDStream<String,String> createStream(JavaStreamingContext jssc, String zkQuorum, String groupId, java.util.Map<String,Integer> topics)
          Create an input stream that pulls messages from Kafka Brokers.
static JavaPairReceiverInputDStream<String,String> createStream(JavaStreamingContext jssc, String zkQuorum, String groupId, java.util.Map<String,Integer> topics, StorageLevel storageLevel)
          Create an input stream that pulls messages from Kafka Brokers.
static
<K,V,U extends kafka.serializer.Decoder<?>,T extends kafka.serializer.Decoder<?>>
ReceiverInputDStream<scala.Tuple2<K,V>>
createStream(StreamingContext ssc, scala.collection.immutable.Map<String,String> kafkaParams, scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel, scala.reflect.ClassTag<K> evidence$1, scala.reflect.ClassTag<V> evidence$2, scala.reflect.ClassTag<U> evidence$3, scala.reflect.ClassTag<T> evidence$4)
          Create an input stream that pulls messages from Kafka Brokers.
static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(StreamingContext ssc, String zkQuorum, String groupId, scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)
          Create an input stream that pulls messages from Kafka Brokers.
 
Methods inherited from class Object
equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

KafkaUtils

public KafkaUtils()
Method Detail

createStream

public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(StreamingContext ssc,
                                                                             String zkQuorum,
                                                                             String groupId,
                                                                             scala.collection.immutable.Map<String,Object> topics,
                                                                             StorageLevel storageLevel)
Create an input stream that pulls messages from Kafka Brokers.

Parameters:
ssc - StreamingContext object
zkQuorum - Zookeeper quorum (hostname:port,hostname:port,..)
groupId - The group id for this consumer
topics - Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread
storageLevel - Storage level to use for storing the received objects (default: StorageLevel.MEMORY_AND_DISK_SER_2)
Returns:
(undocumented)
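
Example (a minimal usage sketch, not part of the original API documentation; the application name, Zookeeper hosts, group id and topic name are hypothetical):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("KafkaReceiverExample")
val ssc = new StreamingContext(conf, Seconds(10))

// Consume the hypothetical topic "events" with 2 consumer threads.
val messages = KafkaUtils.createStream(
  ssc,
  "zk1:2181,zk2:2181",       // Zookeeper quorum (hypothetical hosts)
  "example-consumer-group",  // consumer group id
  Map("events" -> 2),
  StorageLevel.MEMORY_AND_DISK_SER_2)

messages.map(_._2).count().print()  // keep only the message payloads
ssc.start()
ssc.awaitTermination()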

createStream

public static <K,V,U extends kafka.serializer.Decoder<?>,T extends kafka.serializer.Decoder<?>> ReceiverInputDStream<scala.Tuple2<K,V>> createStream(StreamingContext ssc,
                                                                                                                                                     scala.collection.immutable.Map<String,String> kafkaParams,
                                                                                                                                                     scala.collection.immutable.Map<String,Object> topics,
                                                                                                                                                     StorageLevel storageLevel,
                                                                                                                                                     scala.reflect.ClassTag<K> evidence$1,
                                                                                                                                                     scala.reflect.ClassTag<V> evidence$2,
                                                                                                                                                     scala.reflect.ClassTag<U> evidence$3,
                                                                                                                                                     scala.reflect.ClassTag<T> evidence$4)
Create an input stream that pulls messages from Kafka Brokers.

Parameters:
ssc - StreamingContext object
kafkaParams - Map of kafka configuration parameters, see http://kafka.apache.org/08/configuration.html
topics - Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread.
storageLevel - Storage level to use for storing the received objects
evidence$1 - (undocumented)
evidence$2 - (undocumented)
evidence$3 - (undocumented)
evidence$4 - (undocumented)
Returns:
(undocumented)
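
Example (a minimal sketch; the kafkaParams values and topic name are hypothetical, and an existing StreamingContext ssc is assumed):

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical Zookeeper hosts and consumer group id.
val kafkaParams = Map(
  "zookeeper.connect" -> "zk1:2181,zk2:2181",
  "group.id"          -> "example-consumer-group")

// Explicit key/value types and decoders; the ClassTag arguments are supplied implicitly.
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc,
  kafkaParams,
  Map("events" -> 2),                 // topic -> number of consumer threads
  StorageLevel.MEMORY_AND_DISK_SER_2)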

createStream

public static JavaPairReceiverInputDStream<String,String> createStream(JavaStreamingContext jssc,
                                                                       String zkQuorum,
                                                                       String groupId,
                                                                       java.util.Map<String,Integer> topics)
Create an input stream that pulls messages from Kafka Brokers. Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.

Parameters:
jssc - JavaStreamingContext object
zkQuorum - Zookeeper quorum (hostname:port,hostname:port,..)
groupId - The group id for this consumer
topics - Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread
Returns:
(undocumented)

createStream

public static JavaPairReceiverInputDStream<String,String> createStream(JavaStreamingContext jssc,
                                                                       String zkQuorum,
                                                                       String groupId,
                                                                       java.util.Map<String,Integer> topics,
                                                                       StorageLevel storageLevel)
Create an input stream that pulls messages from Kafka Brokers.

Parameters:
jssc - JavaStreamingContext object
zkQuorum - Zookeeper quorum (hostname:port,hostname:port,..).
groupId - The group id for this consumer.
topics - Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread.
storageLevel - RDD storage level.
Returns:
(undocumented)

createStream

public static <K,V,U extends kafka.serializer.Decoder<?>,T extends kafka.serializer.Decoder<?>> JavaPairReceiverInputDStream<K,V> createStream(JavaStreamingContext jssc,
                                                                                                                                               Class<K> keyTypeClass,
                                                                                                                                               Class<V> valueTypeClass,
                                                                                                                                               Class<U> keyDecoderClass,
                                                                                                                                               Class<T> valueDecoderClass,
                                                                                                                                               java.util.Map<String,String> kafkaParams,
                                                                                                                                               java.util.Map<String,Integer> topics,
                                                                                                                                               StorageLevel storageLevel)
Create an input stream that pulls messages from Kafka Brokers.

Parameters:
jssc - JavaStreamingContext object
keyTypeClass - Key type of DStream
valueTypeClass - Value type of DStream
keyDecoderClass - Type of kafka key decoder
valueDecoderClass - Type of kafka value decoder
kafkaParams - Map of kafka configuration parameters, see http://kafka.apache.org/08/configuration.html
topics - Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread
storageLevel - RDD storage level.
Returns:
(undocumented)

createRDD

public static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>> RDD<scala.Tuple2<K,V>> createRDD(SparkContext sc,
                                                                                                                                   scala.collection.immutable.Map<String,String> kafkaParams,
                                                                                                                                   OffsetRange[] offsetRanges,
                                                                                                                                   scala.reflect.ClassTag<K> evidence$5,
                                                                                                                                   scala.reflect.ClassTag<V> evidence$6,
                                                                                                                                   scala.reflect.ClassTag<KD> evidence$7,
                                                                                                                                   scala.reflect.ClassTag<VD> evidence$8)
Create an RDD from Kafka using offset ranges for each topic and partition.

Parameters:
sc - SparkContext object
kafkaParams - Kafka configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s) (NOT zookeeper servers) specified in host1:port1,host2:port2 form.
offsetRanges - Each OffsetRange in the batch corresponds to a range of offsets for a given Kafka topic/partition
evidence$5 - (undocumented)
evidence$6 - (undocumented)
evidence$7 - (undocumented)
evidence$8 - (undocumented)
Returns:
(undocumented)
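
Example (a minimal sketch; broker addresses, topic name and offsets are hypothetical, and an existing SparkContext sc is assumed):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")

// Offsets [0, 100) of partition 0 and [0, 50) of partition 1 of topic "events".
val offsetRanges = Array(
  OffsetRange("events", 0, 0L, 100L),
  OffsetRange("events", 1, 0L, 50L))

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)

rdd.map(_._2).take(10).foreach(println)   // print the first few message payloads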

createRDD

public static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>,R> RDD<R> createRDD(SparkContext sc,
                                                                                                                     scala.collection.immutable.Map<String,String> kafkaParams,
                                                                                                                     OffsetRange[] offsetRanges,
                                                                                                                     scala.collection.immutable.Map<kafka.common.TopicAndPartition,Broker> leaders,
                                                                                                                     scala.Function1<kafka.message.MessageAndMetadata<K,V>,R> messageHandler,
                                                                                                                     scala.reflect.ClassTag<K> evidence$9,
                                                                                                                     scala.reflect.ClassTag<V> evidence$10,
                                                                                                                     scala.reflect.ClassTag<KD> evidence$11,
                                                                                                                     scala.reflect.ClassTag<VD> evidence$12,
                                                                                                                     scala.reflect.ClassTag<R> evidence$13)
:: Experimental :: Create an RDD from Kafka using offset ranges for each topic and partition. This allows you to specify the Kafka leader to connect to (to optimize fetching) and access the message as well as the metadata.

Parameters:
sc - SparkContext object
kafkaParams - Kafka configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s) (NOT zookeeper servers) specified in host1:port1,host2:port2 form.
offsetRanges - Each OffsetRange in the batch corresponds to a range of offsets for a given Kafka topic/partition
leaders - Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map, in which case leaders will be looked up on the driver.
messageHandler - Function for translating each message and metadata into the desired type
evidence$9 - (undocumented)
evidence$10 - (undocumented)
evidence$11 - (undocumented)
evidence$12 - (undocumented)
evidence$13 - (undocumented)
Returns:
(undocumented)
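
Example (a minimal sketch; broker addresses, topic name, offsets and the record type are hypothetical, and an existing SparkContext sc is assumed):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{Broker, KafkaUtils, OffsetRange}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val offsetRanges = Array(OffsetRange("events", 0, 0L, 100L))

// Optionally pin the leader broker of each partition; an empty map makes the
// driver look the leaders up instead.
val leaders = Map(TopicAndPartition("events", 0) -> Broker("broker1", 9092))

// Keep topic, partition, offset and payload of each record.
val messageHandler = (mmd: MessageAndMetadata[String, String]) =>
  (mmd.topic, mmd.partition, mmd.offset, mmd.message())

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder,
                               (String, Int, Long, String)](
  sc, kafkaParams, offsetRanges, leaders, messageHandler)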

createRDD

public static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>> JavaPairRDD<K,V> createRDD(JavaSparkContext jsc,
                                                                                                                             Class<K> keyClass,
                                                                                                                             Class<V> valueClass,
                                                                                                                             Class<KD> keyDecoderClass,
                                                                                                                             Class<VD> valueDecoderClass,
                                                                                                                             java.util.Map<String,String> kafkaParams,
                                                                                                                             OffsetRange[] offsetRanges)
Create an RDD from Kafka using offset ranges for each topic and partition.

Parameters:
jsc - JavaSparkContext object
kafkaParams - Kafka configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s) (NOT zookeeper servers) specified in host1:port1,host2:port2 form.
offsetRanges - Each OffsetRange in the batch corresponds to a range of offsets for a given Kafka topic/partition
keyClass - (undocumented)
valueClass - (undocumented)
keyDecoderClass - (undocumented)
valueDecoderClass - (undocumented)
Returns:
(undocumented)

createRDD

public static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>,R> JavaRDD<R> createRDD(JavaSparkContext jsc,
                                                                                                                         Class<K> keyClass,
                                                                                                                         Class<V> valueClass,
                                                                                                                         Class<KD> keyDecoderClass,
                                                                                                                         Class<VD> valueDecoderClass,
                                                                                                                         Class<R> recordClass,
                                                                                                                         java.util.Map<String,String> kafkaParams,
                                                                                                                         OffsetRange[] offsetRanges,
                                                                                                                         java.util.Map<kafka.common.TopicAndPartition,Broker> leaders,
                                                                                                                         Function<kafka.message.MessageAndMetadata<K,V>,R> messageHandler)
:: Experimental :: Create an RDD from Kafka using offset ranges for each topic and partition. This allows you to specify the Kafka leader to connect to (to optimize fetching) and access the message as well as the metadata.

Parameters:
jsc - JavaSparkContext object
kafkaParams - Kafka configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s) (NOT zookeeper servers) specified in host1:port1,host2:port2 form.
offsetRanges - Each OffsetRange in the batch corresponds to a range of offsets for a given Kafka topic/partition
leaders - Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map, in which case leaders will be looked up on the driver.
messageHandler - Function for translating each message and metadata into the desired type
keyClass - (undocumented)
valueClass - (undocumented)
keyDecoderClass - (undocumented)
valueDecoderClass - (undocumented)
recordClass - (undocumented)
Returns:
(undocumented)

createDirectStream

public static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>,R> InputDStream<R> createDirectStream(StreamingContext ssc,
                                                                                                                                       scala.collection.immutable.Map<String,String> kafkaParams,
                                                                                                                                       scala.collection.immutable.Map<kafka.common.TopicAndPartition,Object> fromOffsets,
                                                                                                                                       scala.Function1<kafka.message.MessageAndMetadata<K,V>,R> messageHandler,
                                                                                                                                       scala.reflect.ClassTag<K> evidence$14,
                                                                                                                                       scala.reflect.ClassTag<V> evidence$15,
                                                                                                                                       scala.reflect.ClassTag<KD> evidence$16,
                                                                                                                                       scala.reflect.ClassTag<VD> evidence$17,
                                                                                                                                       scala.reflect.ClassTag<R> evidence$18)
:: Experimental :: Create an input stream that directly pulls messages from Kafka Brokers without using any receiver. This stream can guarantee that each message from Kafka is included in transformations exactly once (see points below).

Points to note:
- No receivers: This stream does not use any receiver. It directly queries Kafka.
- Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked by the stream itself. For interoperability with Kafka monitoring tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. You can access the offsets used in each batch from the generated RDDs (see HasOffsetRanges).
- Failure Recovery: To recover from driver failures, you have to enable checkpointing in the StreamingContext. The information on consumed offsets can be recovered from the checkpoint. See the programming guide for details (constraints, etc.).
- End-to-end semantics: This stream ensures that every record is effectively received and transformed exactly once, but gives no guarantees on whether the transformed data are output exactly once. For end-to-end exactly-once semantics, you have to either ensure that the output operation is idempotent, or use transactions to output records atomically. See the programming guide for more details.

Parameters:
ssc - StreamingContext object
kafkaParams - Kafka configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s) (NOT zookeeper servers) specified in host1:port1,host2:port2 form.
fromOffsets - Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream
messageHandler - Function for translating each message and metadata into the desired type
evidence$14 - (undocumented)
evidence$15 - (undocumented)
evidence$16 - (undocumented)
evidence$17 - (undocumented)
evidence$18 - (undocumented)
Returns:
(undocumented)
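
Example (a minimal sketch; broker addresses, topic name and the starting offset are hypothetical, and an existing StreamingContext ssc is assumed):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")

// Resume partition 0 of topic "events" at offset 500 (e.g. recovered from your own offset store).
val fromOffsets = Map(TopicAndPartition("events", 0) -> 500L)

// Keep the key and payload of each record.
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
                                           (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)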

createDirectStream

public static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>> InputDStream<scala.Tuple2<K,V>> createDirectStream(StreamingContext ssc,
                                                                                                                                                     scala.collection.immutable.Map<String,String> kafkaParams,
                                                                                                                                                     scala.collection.immutable.Set<String> topics,
                                                                                                                                                     scala.reflect.ClassTag<K> evidence$19,
                                                                                                                                                     scala.reflect.ClassTag<V> evidence$20,
                                                                                                                                                     scala.reflect.ClassTag<KD> evidence$21,
                                                                                                                                                     scala.reflect.ClassTag<VD> evidence$22)
:: Experimental :: Create an input stream that directly pulls messages from Kafka Brokers without using any receiver. This stream can guarantee that each message from Kafka is included in transformations exactly once (see points below).

Points to note:
- No receivers: This stream does not use any receiver. It directly queries Kafka.
- Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked by the stream itself. For interoperability with Kafka monitoring tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. You can access the offsets used in each batch from the generated RDDs (see HasOffsetRanges).
- Failure Recovery: To recover from driver failures, you have to enable checkpointing in the StreamingContext. The information on consumed offsets can be recovered from the checkpoint. See the programming guide for details (constraints, etc.).
- End-to-end semantics: This stream ensures that every record is effectively received and transformed exactly once, but gives no guarantees on whether the transformed data are output exactly once. For end-to-end exactly-once semantics, you have to either ensure that the output operation is idempotent, or use transactions to output records atomically. See the programming guide for more details.

Parameters:
ssc - StreamingContext object
kafkaParams - Kafka configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form. If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest" to determine where the stream starts (defaults to "largest")
topics - Names of the topics to consume
evidence$19 - (undocumented)
evidence$20 - (undocumented)
evidence$21 - (undocumented)
evidence$22 - (undocumented)
Returns:
(undocumented)
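
Example (a minimal sketch; broker addresses and the topic name are hypothetical, and an existing StreamingContext ssc is assumed):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092,broker2:9092",
  "auto.offset.reset"    -> "smallest")   // start from the earliest available offsets

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("events"))

// The offsets consumed in each batch are available through HasOffsetRanges.
stream.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach(r => println(s"${r.topic} ${r.partition}: ${r.fromOffset} -> ${r.untilOffset}"))
}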

createDirectStream

public static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>,R> JavaInputDStream<R> createDirectStream(JavaStreamingContext jssc,
                                                                                                                                           Class<K> keyClass,
                                                                                                                                           Class<V> valueClass,
                                                                                                                                           Class<KD> keyDecoderClass,
                                                                                                                                           Class<VD> valueDecoderClass,
                                                                                                                                           Class<R> recordClass,
                                                                                                                                           java.util.Map<String,String> kafkaParams,
                                                                                                                                           java.util.Map<kafka.common.TopicAndPartition,Long> fromOffsets,
                                                                                                                                           Function<kafka.message.MessageAndMetadata<K,V>,R> messageHandler)
:: Experimental :: Create an input stream that directly pulls messages from Kafka Brokers without using any receiver. This stream can guarantee that each message from Kafka is included in transformations exactly once (see points below).

Points to note:
- No receivers: This stream does not use any receiver. It directly queries Kafka.
- Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked by the stream itself. For interoperability with Kafka monitoring tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. You can access the offsets used in each batch from the generated RDDs (see HasOffsetRanges).
- Failure Recovery: To recover from driver failures, you have to enable checkpointing in the StreamingContext. The information on consumed offsets can be recovered from the checkpoint. See the programming guide for details (constraints, etc.).
- End-to-end semantics: This stream ensures that every record is effectively received and transformed exactly once, but gives no guarantees on whether the transformed data are output exactly once. For end-to-end exactly-once semantics, you have to either ensure that the output operation is idempotent, or use transactions to output records atomically. See the programming guide for more details.

Parameters:
jssc - JavaStreamingContext object
keyClass - Class of the keys in the Kafka records
valueClass - Class of the values in the Kafka records
keyDecoderClass - Class of the key decoder
valueDecoderClass - Class of the value decoder
recordClass - Class of the records in DStream
kafkaParams - Kafka configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form.
fromOffsets - Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream
messageHandler - Function for translating each message and metadata into the desired type
Returns:
(undocumented)

createDirectStream

public static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>> JavaPairInputDStream<K,V> createDirectStream(JavaStreamingContext jssc,
                                                                                                                                               Class<K> keyClass,
                                                                                                                                               Class<V> valueClass,
                                                                                                                                               Class<KD> keyDecoderClass,
                                                                                                                                               Class<VD> valueDecoderClass,
                                                                                                                                               java.util.Map<String,String> kafkaParams,
                                                                                                                                               java.util.Set<String> topics)
:: Experimental :: Create an input stream that directly pulls messages from Kafka Brokers without using any receiver. This stream can guarantee that each message from Kafka is included in transformations exactly once (see points below).

Points to note:
- No receivers: This stream does not use any receiver. It directly queries Kafka.
- Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked by the stream itself. For interoperability with Kafka monitoring tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. You can access the offsets used in each batch from the generated RDDs (see HasOffsetRanges).
- Failure Recovery: To recover from driver failures, you have to enable checkpointing in the StreamingContext. The information on consumed offsets can be recovered from the checkpoint. See the programming guide for details (constraints, etc.).
- End-to-end semantics: This stream ensures that every record is effectively received and transformed exactly once, but gives no guarantees on whether the transformed data are output exactly once. For end-to-end exactly-once semantics, you have to either ensure that the output operation is idempotent, or use transactions to output records atomically. See the programming guide for more details.

Parameters:
jssc - JavaStreamingContext object
keyClass - Class of the keys in the Kafka records
valueClass - Class of the values in the Kafka records
keyDecoderClass - Class of the key decoder
valueDecoderClass - Class of the value decoder
kafkaParams - Kafka configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form. If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest" to determine where the stream starts (defaults to "largest")
topics - Names of the topics to consume
Returns:
(undocumented)