org.apache.spark.streaming.kafka
Class KafkaTestUtils

Object
  extended by org.apache.spark.streaming.kafka.KafkaTestUtils
All Implemented Interfaces:
Logging

public class KafkaTestUtils
extends Object
implements Logging

This is a helper class for Kafka test suites. This has the functionality to set up and tear down local Kafka servers, and to push data using Kafka producers.

The reason to put Kafka test utility class in src is to test Python related Kafka APIs.


Constructor Summary
KafkaTestUtils()
           
 
Method Summary
 String brokerAddress()
           
 void createTopic(String topic)
          Create a Kafka topic and wait until it propagated to the whole cluster
<T> T
eventually(Time timeout, Time interval, scala.Function0<T> func)
           
 void sendMessages(String topic, java.util.Map<String,Integer> messageToFreq)
          Java-friendly function for sending messages to the Kafka broker
 void sendMessages(String topic, scala.collection.immutable.Map<String,Object> messageToFreq)
          Send the messages to the Kafka broker
 void sendMessages(String topic, String[] messages)
          Send the array of messages to the Kafka broker
 void setup()
          setup the whole embedded servers, including Zookeeper and Kafka brokers
 void teardown()
          Teardown the whole servers, including Kafka broker and Zookeeper
 void waitUntilLeaderOffset(String topic, int partition, long offset)
          Wait until the leader offset for the given topic/partition equals the specified offset
 String zkAddress()
           
 org.I0Itec.zkclient.ZkClient zookeeperClient()
           
 
Methods inherited from class Object
equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 
Methods inherited from interface org.apache.spark.Logging
initializeIfNecessary, initializeLogging, isTraceEnabled, log_, log, logDebug, logDebug, logError, logError, logInfo, logInfo, logName, logTrace, logTrace, logWarning, logWarning
 

Constructor Detail

KafkaTestUtils

public KafkaTestUtils()
Method Detail

zkAddress

public String zkAddress()

brokerAddress

public String brokerAddress()

zookeeperClient

public org.I0Itec.zkclient.ZkClient zookeeperClient()

setup

public void setup()
setup the whole embedded servers, including Zookeeper and Kafka brokers


teardown

public void teardown()
Teardown the whole servers, including Kafka broker and Zookeeper


createTopic

public void createTopic(String topic)
Create a Kafka topic and wait until it propagated to the whole cluster


sendMessages

public void sendMessages(String topic,
                         java.util.Map<String,Integer> messageToFreq)
Java-friendly function for sending messages to the Kafka broker


sendMessages

public void sendMessages(String topic,
                         scala.collection.immutable.Map<String,Object> messageToFreq)
Send the messages to the Kafka broker


sendMessages

public void sendMessages(String topic,
                         String[] messages)
Send the array of messages to the Kafka broker


eventually

public <T> T eventually(Time timeout,
                        Time interval,
                        scala.Function0<T> func)

waitUntilLeaderOffset

public void waitUntilLeaderOffset(String topic,
                                  int partition,
                                  long offset)
Wait until the leader offset for the given topic/partition equals the specified offset