public class JavaStreamingContext
extends Object
implements java.io.Closeable

A Java-friendly version of StreamingContext, which is the main entry point for Spark Streaming functionality. It provides methods to create JavaDStream and JavaPairDStream from input sources. The internal org.apache.spark.api.java.JavaSparkContext (see core Spark documentation) can be accessed using context.sparkContext. After creating and transforming DStreams, the streaming computation can be started and stopped using context.start() and context.stop(), respectively. context.awaitTermination() allows the current thread to wait for the termination of the context by stop() or by an exception.

| Constructor and Description |
|---|
| JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration): Create a JavaStreamingContext using an existing JavaSparkContext. |
| JavaStreamingContext(SparkConf conf, Duration batchDuration): Create a JavaStreamingContext using a SparkConf configuration. |
| JavaStreamingContext(StreamingContext ssc) |
| JavaStreamingContext(String path): Recreate a JavaStreamingContext from a checkpoint file. |
| JavaStreamingContext(String path, org.apache.hadoop.conf.Configuration hadoopConf): Recreate a JavaStreamingContext from a checkpoint file. |
| JavaStreamingContext(String master, String appName, Duration batchDuration): Create a StreamingContext. |
| JavaStreamingContext(String master, String appName, Duration batchDuration, String sparkHome, String jarFile): Create a StreamingContext. |
| JavaStreamingContext(String master, String appName, Duration batchDuration, String sparkHome, String[] jars): Create a StreamingContext. |
| JavaStreamingContext(String master, String appName, Duration batchDuration, String sparkHome, String[] jars, java.util.Map<String,String> environment): Create a StreamingContext. |
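A minimal sketch of the two most common construction paths, assuming Spark 1.x (the API version this page documents) is on the classpath; the master URL and app name are hypothetical:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class ContextCreation {
  public static void main(String[] args) {
    // Hypothetical local configuration; use at least 2 threads so a receiver can run.
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("ContextCreation");

    // Variant A: from a SparkConf; the streaming context creates its own SparkContext.
    JavaStreamingContext fromConf = new JavaStreamingContext(conf, new Duration(1000));
    fromConf.stop(); // also stops the underlying SparkContext

    // Variant B: from an existing JavaSparkContext, e.g. to share it with batch jobs.
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaStreamingContext fromSc = new JavaStreamingContext(sc, new Duration(1000));
    fromSc.stop();
  }
}
```

The Duration argument is the batch interval: streaming data is divided into batches of this length (1000 ms here).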
| Modifier and Type | Method and Description |
|---|---|
| <T> JavaReceiverInputDStream<T> | actorStream(akka.actor.Props props, String name): Create an input stream with any arbitrary user-implemented actor receiver. |
| <T> JavaReceiverInputDStream<T> | actorStream(akka.actor.Props props, String name, StorageLevel storageLevel): Create an input stream with any arbitrary user-implemented actor receiver. |
| <T> JavaReceiverInputDStream<T> | actorStream(akka.actor.Props props, String name, StorageLevel storageLevel, akka.actor.SupervisorStrategy supervisorStrategy): Create an input stream with any arbitrary user-implemented actor receiver. |
| void | addStreamingListener(StreamingListener streamingListener): Add a StreamingListener object for receiving system events related to streaming. |
| void | awaitTermination(): Wait for the execution to stop. |
| void | awaitTermination(long timeout): Wait for the execution to stop. |
| void | checkpoint(String directory): Sets the context to periodically checkpoint the DStream operations for master fault-tolerance. |
| void | close() |
| <K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>> JavaPairInputDStream<K,V> | fileStream(String directory): Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format. |
| static JavaStreamingContext | getOrCreate(String checkpointPath, org.apache.hadoop.conf.Configuration hadoopConf, JavaStreamingContextFactory factory): Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. |
| static JavaStreamingContext | getOrCreate(String checkpointPath, org.apache.hadoop.conf.Configuration hadoopConf, JavaStreamingContextFactory factory, boolean createOnError): Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. |
| static JavaStreamingContext | getOrCreate(String checkpointPath, JavaStreamingContextFactory factory): Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. |
| static String[] | jarOfClass(Class<?> cls): Find the JAR from which a given class was loaded, to make it easy for users to pass their JARs to StreamingContext. |
| <T> JavaDStream<T> | queueStream(java.util.Queue<JavaRDD<T>> queue): Create an input stream from a queue of RDDs. |
| <T> JavaInputDStream<T> | queueStream(java.util.Queue<JavaRDD<T>> queue, boolean oneAtATime): Create an input stream from a queue of RDDs. |
| <T> JavaInputDStream<T> | queueStream(java.util.Queue<JavaRDD<T>> queue, boolean oneAtATime, JavaRDD<T> defaultRDD): Create an input stream from a queue of RDDs. |
| <T> JavaReceiverInputDStream<T> | rawSocketStream(String hostname, int port): Create an input stream from network source hostname:port, where data is received as serialized blocks (serialized using Spark's serializer) that can be directly pushed into the block manager without deserializing them. |
| <T> JavaReceiverInputDStream<T> | rawSocketStream(String hostname, int port, StorageLevel storageLevel): Create an input stream from network source hostname:port, where data is received as serialized blocks (serialized using Spark's serializer) that can be directly pushed into the block manager without deserializing them. |
| <T> JavaReceiverInputDStream<T> | receiverStream(Receiver<T> receiver): Create an input stream with any arbitrary user-implemented receiver. |
| void | remember(Duration duration): Sets each DStream in this context to remember the RDDs it generated in the last given duration. |
| JavaSparkContext | sc() |
| <T> JavaReceiverInputDStream<T> | socketStream(String hostname, int port, Function<java.io.InputStream,Iterable<T>> converter, StorageLevel storageLevel): Create an input stream from network source hostname:port. |
| JavaReceiverInputDStream<String> | socketTextStream(String hostname, int port): Create an input stream from network source hostname:port. |
| JavaReceiverInputDStream<String> | socketTextStream(String hostname, int port, StorageLevel storageLevel): Create an input stream from network source hostname:port. |
| JavaSparkContext | sparkContext(): The underlying SparkContext |
| StreamingContext | ssc() |
| void | start(): Start the execution of the streams. |
| void | stop(): Stop the execution of the streams. |
| void | stop(boolean stopSparkContext): Stop the execution of the streams. |
| void | stop(boolean stopSparkContext, boolean stopGracefully): Stop the execution of the streams. |
| JavaDStream<String> | textFileStream(String directory): Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them as text files (using key as LongWritable, value as Text and input format as TextInputFormat). |
| <T> JavaDStream<T> | transform(java.util.List<JavaDStream<?>> dstreams, Function2<java.util.List<JavaRDD<?>>,Time,JavaRDD<T>> transformFunc): Create a new DStream in which each RDD is generated by applying a function on RDDs of the DStreams. |
| <K,V> JavaPairDStream<K,V> | transformToPair(java.util.List<JavaDStream<?>> dstreams, Function2<java.util.List<JavaRDD<?>>,Time,JavaPairRDD<K,V>> transformFunc): Create a new DStream in which each RDD is generated by applying a function on RDDs of the DStreams. |
| <T> JavaDStream<T> | union(JavaDStream<T> first, java.util.List<JavaDStream<T>> rest): Create a unified DStream from multiple DStreams of the same type and same slide duration. |
| <K,V> JavaPairDStream<K,V> | union(JavaPairDStream<K,V> first, java.util.List<JavaPairDStream<K,V>> rest): Create a unified DStream from multiple DStreams of the same type and same slide duration. |
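The methods above compose into the usual receive-transform-start-await pattern. A sketch of the canonical word count over a socket source, assuming the Spark 1.x Java API with Java 8 lambdas and a hypothetical server on localhost:9999:

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class NetworkWordCount {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));

    // Text lines from a hypothetical server listening on localhost:9999.
    JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

    // Split each line into words, then count words per 1-second batch.
    JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")));
    JavaPairDStream<String, Integer> counts =
        words.mapToPair(w -> new Tuple2<>(w, 1)).reduceByKey((a, b) -> a + b);
    counts.print(); // print the first few counts of each batch

    jssc.start();            // start receiving and processing
    jssc.awaitTermination(); // block until stop() or an exception
  }
}
```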
public JavaStreamingContext(StreamingContext ssc)

public JavaStreamingContext(String master, String appName, Duration batchDuration)
Create a StreamingContext.
Parameters:
master - Name of the Spark Master
appName - Name to be used when registering with the scheduler
batchDuration - The time interval at which streaming data will be divided into batches

public JavaStreamingContext(String master, String appName, Duration batchDuration, String sparkHome, String jarFile)
Create a StreamingContext.
Parameters:
master - Name of the Spark Master
appName - Name to be used when registering with the scheduler
batchDuration - The time interval at which streaming data will be divided into batches
sparkHome - The SPARK_HOME directory on the slave nodes
jarFile - JAR file containing job code, to ship to the cluster. This can be a path on the local file system or an HDFS, HTTP, HTTPS, or FTP URL.

public JavaStreamingContext(String master, String appName, Duration batchDuration, String sparkHome, String[] jars)
Create a StreamingContext.
Parameters:
master - Name of the Spark Master
appName - Name to be used when registering with the scheduler
batchDuration - The time interval at which streaming data will be divided into batches
sparkHome - The SPARK_HOME directory on the slave nodes
jars - Collection of JARs to send to the cluster. These can be paths on the local file system or HDFS, HTTP, HTTPS, or FTP URLs.

public JavaStreamingContext(String master, String appName, Duration batchDuration, String sparkHome, String[] jars, java.util.Map<String,String> environment)
Create a StreamingContext.
Parameters:
master - Name of the Spark Master
appName - Name to be used when registering with the scheduler
batchDuration - The time interval at which streaming data will be divided into batches
sparkHome - The SPARK_HOME directory on the slave nodes
jars - Collection of JARs to send to the cluster. These can be paths on the local file system or HDFS, HTTP, HTTPS, or FTP URLs.
environment - Environment variables to set on worker nodes

public JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration)
Create a JavaStreamingContext using an existing JavaSparkContext.
Parameters:
sparkContext - The underlying JavaSparkContext to use
batchDuration - The time interval at which streaming data will be divided into batches

public JavaStreamingContext(SparkConf conf, Duration batchDuration)
Create a JavaStreamingContext using a SparkConf configuration.
Parameters:
conf - A Spark application configuration
batchDuration - The time interval at which streaming data will be divided into batches

public JavaStreamingContext(String path)
Recreate a JavaStreamingContext from a checkpoint file.
Parameters:
path - Path to the directory that was specified as the checkpoint directory

public JavaStreamingContext(String path, org.apache.hadoop.conf.Configuration hadoopConf)
Recreate a JavaStreamingContext from a checkpoint file.
Parameters:
path - Path to the directory that was specified as the checkpoint directory
hadoopConf - Hadoop configuration if necessary for reading from any HDFS compatible file system
public static JavaStreamingContext getOrCreate(String checkpointPath, JavaStreamingContextFactory factory)
Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. If checkpoint data exists in the provided checkpointPath, then the StreamingContext will be recreated from the checkpoint data. If the data does not exist, then the provided factory will be used to create a JavaStreamingContext.
Parameters:
checkpointPath - Checkpoint directory used in an earlier JavaStreamingContext program
factory - JavaStreamingContextFactory object to create a new JavaStreamingContext

public static JavaStreamingContext getOrCreate(String checkpointPath, org.apache.hadoop.conf.Configuration hadoopConf, JavaStreamingContextFactory factory)
Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. If checkpoint data exists in the provided checkpointPath, then the StreamingContext will be recreated from the checkpoint data. If the data does not exist, then the provided factory will be used to create a JavaStreamingContext.
Parameters:
checkpointPath - Checkpoint directory used in an earlier StreamingContext program
factory - JavaStreamingContextFactory object to create a new JavaStreamingContext
hadoopConf - Hadoop configuration if necessary for reading from any HDFS compatible file system

public static JavaStreamingContext getOrCreate(String checkpointPath, org.apache.hadoop.conf.Configuration hadoopConf, JavaStreamingContextFactory factory, boolean createOnError)
Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. If checkpoint data exists in the provided checkpointPath, then the StreamingContext will be recreated from the checkpoint data. If the data does not exist, then the provided factory will be used to create a JavaStreamingContext.
Parameters:
checkpointPath - Checkpoint directory used in an earlier StreamingContext program
factory - JavaStreamingContextFactory object to create a new JavaStreamingContext
hadoopConf - Hadoop configuration if necessary for reading from any HDFS compatible file system
createOnError - Whether to create a new JavaStreamingContext if there is an error in reading checkpoint data

public static String[] jarOfClass(Class<?> cls)
Find the JAR from which a given class was loaded, to make it easy for users to pass their JARs to StreamingContext.
public StreamingContext ssc()

public JavaSparkContext sparkContext()
The underlying SparkContext

public JavaSparkContext sc()
public JavaReceiverInputDStream<String> socketTextStream(String hostname, int port, StorageLevel storageLevel)
Create an input stream from network source hostname:port.
Parameters:
hostname - Hostname to connect to for receiving data
port - Port to connect to for receiving data
storageLevel - Storage level to use for storing the received objects

public JavaReceiverInputDStream<String> socketTextStream(String hostname, int port)
Create an input stream from network source hostname:port.
Parameters:
hostname - Hostname to connect to for receiving data
port - Port to connect to for receiving data

public <T> JavaReceiverInputDStream<T> socketStream(String hostname, int port, Function<java.io.InputStream,Iterable<T>> converter, StorageLevel storageLevel)
Create an input stream from network source hostname:port.
Parameters:
hostname - Hostname to connect to for receiving data
port - Port to connect to for receiving data
converter - Function to convert the byte stream to objects
storageLevel - Storage level to use for storing the received objects

public JavaDStream<String> textFileStream(String directory)
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them as text files.
Parameters:
directory - HDFS directory to monitor for new files

public <T> JavaReceiverInputDStream<T> rawSocketStream(String hostname, int port, StorageLevel storageLevel)
Create an input stream from network source hostname:port, where data is received as serialized blocks (serialized using Spark's serializer) that can be directly pushed into the block manager without deserializing them.
Parameters:
hostname - Hostname to connect to for receiving data
port - Port to connect to for receiving data
storageLevel - Storage level to use for storing the received objects

public <T> JavaReceiverInputDStream<T> rawSocketStream(String hostname, int port)
Create an input stream from network source hostname:port, where data is received as serialized blocks (serialized using Spark's serializer) that can be directly pushed into the block manager without deserializing them.
Parameters:
hostname - Hostname to connect to for receiving data
port - Port to connect to for receiving data

public <K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>> JavaPairInputDStream<K,V> fileStream(String directory)
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format.
Parameters:
directory - HDFS directory to monitor for new files

public <T> JavaReceiverInputDStream<T> actorStream(akka.actor.Props props, String name, StorageLevel storageLevel, akka.actor.SupervisorStrategy supervisorStrategy)
Create an input stream with any arbitrary user-implemented actor receiver.
Parameters:
props - Props object defining creation of the actor
name - Name of the actor
storageLevel - Storage level to use for storing the received objects

public <T> JavaReceiverInputDStream<T> actorStream(akka.actor.Props props, String name, StorageLevel storageLevel)
Create an input stream with any arbitrary user-implemented actor receiver.
Parameters:
props - Props object defining creation of the actor
name - Name of the actor
storageLevel - Storage level to use for storing the received objects

public <T> JavaReceiverInputDStream<T> actorStream(akka.actor.Props props, String name)
Create an input stream with any arbitrary user-implemented actor receiver.
Parameters:
props - Props object defining creation of the actor
name - Name of the actor
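For local testing without any network source, the queueStream methods detailed below accept a queue of precomputed RDDs. A sketch, assuming the Spark 1.x API on the classpath:

```java
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class QueueStreamExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("QueueStreamExample");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));

    // Fill the queue BEFORE creating the stream: later additions are not recognized.
    Queue<JavaRDD<Integer>> queue = new LinkedList<>();
    queue.add(jssc.sparkContext().parallelize(Arrays.asList(1, 2, 3)));
    queue.add(jssc.sparkContext().parallelize(Arrays.asList(4, 5, 6)));

    // oneAtATime = true: one RDD is consumed from the queue per batch interval.
    JavaDStream<Integer> stream = jssc.queueStream(queue, true);
    stream.print();

    jssc.start();
    jssc.awaitTermination(5000); // run for a few batches, then return
    jssc.stop();
  }
}
```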
public <T> JavaDStream<T> queueStream(java.util.Queue<JavaRDD<T>> queue)
Create an input stream from a queue of RDDs. NOTE: changes to the queue after the stream is created will not be recognized.
Parameters:
queue - Queue of RDDs

public <T> JavaInputDStream<T> queueStream(java.util.Queue<JavaRDD<T>> queue, boolean oneAtATime)
Create an input stream from a queue of RDDs. NOTE: changes to the queue after the stream is created will not be recognized.
Parameters:
queue - Queue of RDDs
oneAtATime - Whether only one RDD should be consumed from the queue in every interval

public <T> JavaInputDStream<T> queueStream(java.util.Queue<JavaRDD<T>> queue, boolean oneAtATime, JavaRDD<T> defaultRDD)
Create an input stream from a queue of RDDs. NOTE: changes to the queue after the stream is created will not be recognized.
Parameters:
queue - Queue of RDDs
oneAtATime - Whether only one RDD should be consumed from the queue in every interval
defaultRDD - Default RDD returned by the DStream when the queue is empty

public <T> JavaReceiverInputDStream<T> receiverStream(Receiver<T> receiver)
Create an input stream with any arbitrary user-implemented receiver.
Parameters:
receiver - Custom implementation of Receiver

public <T> JavaDStream<T> union(JavaDStream<T> first, java.util.List<JavaDStream<T>> rest)
Create a unified DStream from multiple DStreams of the same type and same slide duration.
public <K,V> JavaPairDStream<K,V> union(JavaPairDStream<K,V> first, java.util.List<JavaPairDStream<K,V>> rest)
Create a unified DStream from multiple DStreams of the same type and same slide duration.
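A sketch of merging two socket streams with union, assuming the Spark 1.x API; the host names are hypothetical, and the master needs enough threads for both receivers plus processing:

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class UnionExample {
  public static void main(String[] args) {
    // local[3]: two threads for the two receivers, one for processing.
    SparkConf conf = new SparkConf().setMaster("local[3]").setAppName("UnionExample");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));

    // Two streams of the same element type and slide duration (hypothetical hosts).
    JavaReceiverInputDStream<String> s1 = jssc.socketTextStream("host-a", 9999);
    JavaReceiverInputDStream<String> s2 = jssc.socketTextStream("host-b", 9999);

    // Merge them into one DStream and process the combined data.
    JavaDStream<String> merged = jssc.union(s1, Arrays.<JavaDStream<String>>asList(s2));
    merged.print();

    jssc.start();
    jssc.awaitTermination();
  }
}
```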
public <T> JavaDStream<T> transform(java.util.List<JavaDStream<?>> dstreams, Function2<java.util.List<JavaRDD<?>>,Time,JavaRDD<T>> transformFunc)
Create a new DStream in which each RDD is generated by applying a function on RDDs of the DStreams. NOTE: to add a JavaPairDStream to the list of JavaDStreams, convert it to a JavaDStream using JavaPairDStream.toJavaDStream(). In the transform function, convert the JavaRDD corresponding to that JavaDStream to a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD().

public <K,V> JavaPairDStream<K,V> transformToPair(java.util.List<JavaDStream<?>> dstreams, Function2<java.util.List<JavaRDD<?>>,Time,JavaPairRDD<K,V>> transformFunc)
Create a new DStream in which each RDD is generated by applying a function on RDDs of the DStreams. NOTE: to add a JavaPairDStream to the list of JavaDStreams, convert it to a JavaDStream using JavaPairDStream.toJavaDStream(). In the transform function, convert the JavaRDD corresponding to that JavaDStream to a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD().

public void checkpoint(String directory)
Sets the context to periodically checkpoint the DStream operations for master fault-tolerance.
Parameters:
directory - HDFS-compatible directory where the checkpoint data will be reliably stored

public void remember(Duration duration)
Sets each DStream in this context to remember the RDDs it generated in the last given duration.
Parameters:
duration - Minimum duration that each DStream should remember its RDDs

public void addStreamingListener(StreamingListener streamingListener)
Add a StreamingListener object for receiving system events related to streaming.

public void start()
Start the execution of the streams.

public void awaitTermination()
Wait for the execution to stop.

public void awaitTermination(long timeout)
Wait for the execution to stop.
Parameters:
timeout - time to wait in milliseconds

public void stop()
Stop the execution of the streams.

public void stop(boolean stopSparkContext)
Stop the execution of the streams.
Parameters:
stopSparkContext - Whether to stop the associated SparkContext

public void stop(boolean stopSparkContext, boolean stopGracefully)
Stop the execution of the streams.
Parameters:
stopSparkContext - Whether to stop the associated SparkContext
stopGracefully - Stop gracefully by waiting for the processing of all received data to be completed

public void close()
Specified by:
close in interface java.io.Closeable
Specified by:
close in interface AutoCloseable
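The stop overloads above can be combined with a bounded wait to shut a driver down cleanly. A sketch, assuming the Spark 1.x API; whether to keep the SparkContext alive depends on whether other jobs still need it:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class ShutdownExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("ShutdownExample");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));
    // ... define input DStreams and transformations here ...
    jssc.start();
    jssc.awaitTermination(10000); // wait up to 10 s for termination

    // Graceful shutdown: drain data already received, and also stop the SparkContext.
    // Pass stopSparkContext = false instead to keep reusing the SparkContext.
    jssc.stop(true, true);
  }
}
```

Since JavaStreamingContext implements java.io.Closeable, it can also be managed with try-with-resources, in which case close() stops the context when the block exits.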