public class StreamingContext extends Object implements Logging

Main entry point for Spark Streaming functionality. It provides methods used to create DStreams from various input sources. It can be created either by providing a Spark master URL and an appName, from an org.apache.spark.SparkConf configuration (see the core Spark documentation), or from an existing org.apache.spark.SparkContext. The associated SparkContext can be accessed using context.sparkContext. After creating and transforming DStreams, the streaming computation can be started and stopped using context.start() and context.stop(), respectively. context.awaitTermination() allows the current thread to wait for the termination of the context by stop() or by an exception.
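For concreteness, a minimal lifecycle sketch; the master URL, app name, batch interval, and socket source on localhost:9999 are placeholder assumptions:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("LifecycleSketch").setMaster("local[2]")
val ssc  = new StreamingContext(conf, Seconds(1)) // batchDuration = 1 second

// Define DStreams before start(); at least one output operation is required.
val lines = ssc.socketTextStream("localhost", 9999)
lines.print()

ssc.start()            // start the streaming computation
ssc.awaitTermination() // block this thread until stop() or an exception
```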
| Constructor and Description |
| --- |
| `StreamingContext(SparkConf conf, Duration batchDuration)` Create a StreamingContext by providing the configuration necessary for a new SparkContext. |
| `StreamingContext(SparkContext sparkContext, Duration batchDuration)` Create a StreamingContext using an existing SparkContext. |
| `StreamingContext(String path)` Recreate a StreamingContext from a checkpoint file. |
| `StreamingContext(String path, org.apache.hadoop.conf.Configuration hadoopConf)` Recreate a StreamingContext from a checkpoint file. |
| `StreamingContext(String master, String appName, Duration batchDuration, String sparkHome, scala.collection.Seq<String> jars, scala.collection.Map<String,String> environment)` Create a StreamingContext by providing the details necessary for creating a new SparkContext. |
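Where a SparkContext already exists, the second constructor reuses it; a hedged sketch with placeholder names:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sc  = new SparkContext(new SparkConf().setAppName("Reuse").setMaster("local[2]"))
val ssc = new StreamingContext(sc, Seconds(5)) // shares sc rather than creating a new context
```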
| Modifier and Type | Method and Description |
| --- | --- |
| `<T> ReceiverInputDStream<T>` | `actorStream(akka.actor.Props props, String name, StorageLevel storageLevel, akka.actor.SupervisorStrategy supervisorStrategy, scala.reflect.ClassTag<T> evidence$3)` Create an input stream with any arbitrary user-implemented actor receiver. |
| `void` | `addStreamingListener(StreamingListener streamingListener)` Add a StreamingListener object for receiving system events related to streaming. |
| `void` | `awaitTermination()` Wait for the execution to stop. |
| `void` | `awaitTermination(long timeout)` Wait for the execution to stop. |
| `void` | `checkpoint(String directory)` Set the context to periodically checkpoint the DStream operations for driver fault-tolerance. |
| `String` | `checkpointDir()` |
| `Duration` | `checkpointDuration()` |
| `SparkConf` | `conf()` |
| `static int` | `DEFAULT_CLEANER_TTL()` |
| `SparkEnv` | `env()` |
| `<K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>> InputDStream<scala.Tuple2<K,V>>` | `fileStream(String directory, scala.reflect.ClassTag<K> evidence$6, scala.reflect.ClassTag<V> evidence$7, scala.reflect.ClassTag<F> evidence$8)` Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format. |
| `<K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>> InputDStream<scala.Tuple2<K,V>>` | `fileStream(String directory, scala.Function1<org.apache.hadoop.fs.Path,Object> filter, boolean newFilesOnly, scala.reflect.ClassTag<K> evidence$9, scala.reflect.ClassTag<V> evidence$10, scala.reflect.ClassTag<F> evidence$11)` Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format. |
| `static StreamingContext` | `getOrCreate(String checkpointPath, scala.Function0<StreamingContext> creatingFunc, org.apache.hadoop.conf.Configuration hadoopConf, boolean createOnError)` Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. |
| `org.apache.spark.streaming.DStreamGraph` | `graph()` |
| `boolean` | `isCheckpointPresent()` |
| `static scala.Option<String>` | `jarOfClass(Class<?> cls)` Find the JAR from which a given class was loaded, to make it easy for users to pass their JARs to StreamingContext. |
| `<T> ReceiverInputDStream<T>` | `networkStream(Receiver<T> receiver, scala.reflect.ClassTag<T> evidence$1)` Create an input stream with any arbitrary user-implemented receiver. |
| `org.apache.spark.streaming.ui.StreamingJobProgressListener` | `progressListener()` |
| `<T> InputDStream<T>` | `queueStream(scala.collection.mutable.Queue<RDD<T>> queue, boolean oneAtATime, scala.reflect.ClassTag<T> evidence$12)` Create an input stream from a queue of RDDs. |
| `<T> InputDStream<T>` | `queueStream(scala.collection.mutable.Queue<RDD<T>> queue, boolean oneAtATime, RDD<T> defaultRDD, scala.reflect.ClassTag<T> evidence$13)` Create an input stream from a queue of RDDs. |
| `<T> ReceiverInputDStream<T>` | `rawSocketStream(String hostname, int port, StorageLevel storageLevel, scala.reflect.ClassTag<T> evidence$5)` Create an input stream from network source hostname:port, where data is received as serialized blocks (serialized using Spark's serializer) that can be pushed directly into the block manager without being deserialized. |
| `<T> ReceiverInputDStream<T>` | `receiverStream(Receiver<T> receiver, scala.reflect.ClassTag<T> evidence$2)` Create an input stream with any arbitrary user-implemented receiver. |
| `void` | `remember(Duration duration)` Set each DStream in this context to remember the RDDs it generated in the last given duration. |
| `SparkContext` | `sc()` |
| `org.apache.spark.streaming.scheduler.JobScheduler` | `scheduler()` |
| `<T> ReceiverInputDStream<T>` | `socketStream(String hostname, int port, scala.Function1<java.io.InputStream,scala.collection.Iterator<T>> converter, StorageLevel storageLevel, scala.reflect.ClassTag<T> evidence$4)` Create an input stream from TCP source hostname:port. |
| `ReceiverInputDStream<String>` | `socketTextStream(String hostname, int port, StorageLevel storageLevel)` Create an input stream from TCP source hostname:port. |
| `SparkContext` | `sparkContext()` Return the associated Spark context. |
| `void` | `start()` Start the execution of the streams. |
| `scala.Enumeration.Value` | `state()` |
| `void` | `stop(boolean stopSparkContext)` Stop the execution of the streams immediately (does not wait for all received data to be processed). |
| `void` | `stop(boolean stopSparkContext, boolean stopGracefully)` Stop the execution of the streams, with the option of ensuring that all received data has been processed. |
| `org.apache.spark.streaming.StreamingContext.StreamingContextState$` | `StreamingContextState()` Accessor for nested Scala object. |
| `DStream<String>` | `textFileStream(String directory)` Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them as text files (using LongWritable as key, Text as value, and TextInputFormat as input format). |
| `static <K,V> PairDStreamFunctions<K,V>` | `toPairDStreamFunctions(DStream<scala.Tuple2<K,V>> stream, scala.reflect.ClassTag<K> kt, scala.reflect.ClassTag<V> vt, scala.math.Ordering<K> ord)` |
| `<T> DStream<T>` | `transform(scala.collection.Seq<DStream<?>> dstreams, scala.Function2<scala.collection.Seq<RDD<?>>,Time,RDD<T>> transformFunc, scala.reflect.ClassTag<T> evidence$15)` Create a new DStream in which each RDD is generated by applying a function on RDDs of the DStreams. |
| `scala.Option<org.apache.spark.streaming.ui.StreamingTab>` | `uiTab()` |
| `<T> DStream<T>` | `union(scala.collection.Seq<DStream<T>> streams, scala.reflect.ClassTag<T> evidence$14)` Create a unified DStream from multiple DStreams of the same type and same slide duration. |
| `org.apache.spark.streaming.ContextWaiter` | `waiter()` |
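Tying several of the methods above together, the classic word count over a text socket; a sketch assuming a server is listening on localhost:9999 (the import of StreamingContext._ picks up the toPairDStreamFunctions implicit that enables reduceByKey):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // toPairDStreamFunctions implicit

val ssc = new StreamingContext(
  new SparkConf().setAppName("WordCount").setMaster("local[2]"), Seconds(1))

val words  = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
counts.print() // output operation required before start()

ssc.start()
ssc.awaitTermination()
```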
Methods inherited from class java.lang.Object: equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

Methods inherited from interface org.apache.spark.Logging: initialized, initializeIfNecessary, initializeLogging, initLock, isTraceEnabled, log_, log, logDebug, logDebug, logError, logError, logInfo, logInfo, logName, logTrace, logTrace, logWarning, logWarning
public StreamingContext(SparkContext sparkContext, Duration batchDuration)
Parameters:
sparkContext - existing SparkContext
batchDuration - the time interval at which streaming data will be divided into batches

public StreamingContext(SparkConf conf, Duration batchDuration)
Parameters:
conf - a org.apache.spark.SparkConf object specifying Spark parameters
batchDuration - the time interval at which streaming data will be divided into batches

public StreamingContext(String master, String appName, Duration batchDuration, String sparkHome, scala.collection.Seq<String> jars, scala.collection.Map<String,String> environment)
Parameters:
master - cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4])
appName - a name for your job, to display on the cluster web UI
batchDuration - the time interval at which streaming data will be divided into batches

public StreamingContext(String path, org.apache.hadoop.conf.Configuration hadoopConf)
Parameters:
path - path to the directory that was specified as the checkpoint directory
hadoopConf - optional configuration object, if necessary for reading from an HDFS-compatible filesystem

public StreamingContext(String path)
Parameters:
path - path to the directory that was specified as the checkpoint directory

public static int DEFAULT_CLEANER_TTL()
public static <K,V> PairDStreamFunctions<K,V> toPairDStreamFunctions(DStream<scala.Tuple2<K,V>> stream, scala.reflect.ClassTag<K> kt, scala.reflect.ClassTag<V> vt, scala.math.Ordering<K> ord)
public static StreamingContext getOrCreate(String checkpointPath, scala.Function0<StreamingContext> creatingFunc, org.apache.hadoop.conf.Configuration hadoopConf, boolean createOnError)
Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. If checkpoint data exists in the provided checkpointPath, then the StreamingContext will be recreated from that data. If the data does not exist, then the StreamingContext will be created by calling the provided creatingFunc.

Parameters:
checkpointPath - checkpoint directory used in an earlier StreamingContext program
creatingFunc - function to create a new StreamingContext
hadoopConf - optional Hadoop configuration, if necessary for reading from the file system
createOnError - optional; whether to create a new StreamingContext if there is an error in reading the checkpoint data. By default, an exception will be thrown on error.

public static scala.Option<String> jarOfClass(Class<?> cls)
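For getOrCreate above, a hedged recovery sketch; the checkpoint path and function names are placeholders:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointPath = "hdfs:///checkpoints/my-app" // placeholder path

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(new SparkConf().setAppName("Recoverable"), Seconds(10))
  ssc.checkpoint(checkpointPath) // must match the path passed to getOrCreate
  // Define DStreams here; on recovery they are restored from the checkpoint.
  ssc
}

// Recreated from checkpoint data if present, otherwise built via createContext();
// the Scala overload defaults hadoopConf and createOnError.
val ssc = StreamingContext.getOrCreate(checkpointPath, createContext _)
ssc.start()
ssc.awaitTermination()
```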
public boolean isCheckpointPresent()
public SparkContext sc()
public SparkConf conf()
public SparkEnv env()
public org.apache.spark.streaming.DStreamGraph graph()
public String checkpointDir()
public Duration checkpointDuration()
public org.apache.spark.streaming.scheduler.JobScheduler scheduler()
public org.apache.spark.streaming.ContextWaiter waiter()
public org.apache.spark.streaming.ui.StreamingJobProgressListener progressListener()
public scala.Option<org.apache.spark.streaming.ui.StreamingTab> uiTab()
public org.apache.spark.streaming.StreamingContext.StreamingContextState$ StreamingContextState()
public scala.Enumeration.Value state()
public SparkContext sparkContext()
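sparkContext() returns the underlying SparkContext, which pairs naturally with queueStream (documented below) for feeding hand-built RDDs in tests; a sketch assuming an existing ssc:

```scala
import scala.collection.mutable
import org.apache.spark.rdd.RDD

val queue  = new mutable.Queue[RDD[Int]]()
val stream = ssc.queueStream(queue) // oneAtATime defaults to true in the Scala API
stream.print()
ssc.start()

// Push one RDD per batch interval, built via the associated SparkContext.
queue += ssc.sparkContext.makeRDD(1 to 100)
```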
public void remember(Duration duration)
Parameters:
duration - minimum duration that each DStream should remember its RDDs

public void checkpoint(String directory)

Parameters:
directory - HDFS-compatible directory where the checkpoint data will be reliably stored. Note that this must be a fault-tolerant file system like HDFS.

public <T> ReceiverInputDStream<T> networkStream(Receiver<T> receiver, scala.reflect.ClassTag<T> evidence$1)
Parameters:
receiver - custom implementation of Receiver

public <T> ReceiverInputDStream<T> receiverStream(Receiver<T> receiver, scala.reflect.ClassTag<T> evidence$2)

Parameters:
receiver - custom implementation of Receiver

public <T> ReceiverInputDStream<T> actorStream(akka.actor.Props props, String name, StorageLevel storageLevel, akka.actor.SupervisorStrategy supervisorStrategy, scala.reflect.ClassTag<T> evidence$3)
Parameters:
props - Props object defining creation of the actor
name - name of the actor
storageLevel - RDD storage level. Defaults to memory-only.
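For actorStream, a hedged sketch of a trivial Akka receiver actor; it assumes the Spark 1.x ActorHelper trait (org.apache.spark.streaming.receiver.ActorHelper), whose store() pushes received items into Spark:

```scala
import akka.actor.{Actor, Props}
import org.apache.spark.streaming.receiver.ActorHelper // assumption: Spark 1.x helper trait

class EchoActor extends Actor with ActorHelper {
  def receive = {
    case s: String => store(s) // hand each received string to Spark's block manager
  }
}

// The actor name is arbitrary; storageLevel and supervisorStrategy keep their defaults.
val lines = ssc.actorStream[String](Props[EchoActor], "EchoReceiver")
```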
public ReceiverInputDStream<String> socketTextStream(String hostname, int port, StorageLevel storageLevel)
Create an input stream from TCP source hostname:port. Data is received using a TCP socket, and the received bytes are interpreted as UTF8-encoded, \n-delimited lines.

Parameters:
hostname - hostname to connect to for receiving data
port - port to connect to for receiving data
storageLevel - storage level to use for storing the received objects (default: StorageLevel.MEMORY_AND_DISK_SER_2)

public <T> ReceiverInputDStream<T> socketStream(String hostname, int port, scala.Function1<java.io.InputStream,scala.collection.Iterator<T>> converter, StorageLevel storageLevel, scala.reflect.ClassTag<T> evidence$4)
Parameters:
hostname - hostname to connect to for receiving data
port - port to connect to for receiving data
converter - function to convert the byte stream to objects
storageLevel - storage level to use for storing the received objects

public <T> ReceiverInputDStream<T> rawSocketStream(String hostname, int port, StorageLevel storageLevel, scala.reflect.ClassTag<T> evidence$5)
Parameters:
hostname - hostname to connect to for receiving data
port - port to connect to for receiving data
storageLevel - storage level to use for storing the received objects (default: StorageLevel.MEMORY_AND_DISK_SER_2)

public <K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>> InputDStream<scala.Tuple2<K,V>> fileStream(String directory, scala.reflect.ClassTag<K> evidence$6, scala.reflect.ClassTag<V> evidence$7, scala.reflect.ClassTag<F> evidence$8)
Parameters:
directory - HDFS directory to monitor for new files

public <K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>> InputDStream<scala.Tuple2<K,V>> fileStream(String directory, scala.Function1<org.apache.hadoop.fs.Path,Object> filter, boolean newFilesOnly, scala.reflect.ClassTag<K> evidence$9, scala.reflect.ClassTag<V> evidence$10, scala.reflect.ClassTag<F> evidence$11)
Parameters:
directory - HDFS directory to monitor for new files
filter - function to filter paths to process
newFilesOnly - whether to process only new files and ignore existing files in the directory

public DStream<String> textFileStream(String directory)
Parameters:
directory - HDFS directory to monitor for new files

public <T> InputDStream<T> queueStream(scala.collection.mutable.Queue<RDD<T>> queue, boolean oneAtATime, scala.reflect.ClassTag<T> evidence$12)
Parameters:
queue - queue of RDDs
oneAtATime - whether only one RDD should be consumed from the queue in every interval

public <T> InputDStream<T> queueStream(scala.collection.mutable.Queue<RDD<T>> queue, boolean oneAtATime, RDD<T> defaultRDD, scala.reflect.ClassTag<T> evidence$13)
Parameters:
queue - queue of RDDs
oneAtATime - whether only one RDD should be consumed from the queue in every interval
defaultRDD - default RDD returned by the DStream when the queue is empty. Set to null if no RDD should be returned when empty.

public <T> DStream<T> union(scala.collection.Seq<DStream<T>> streams, scala.reflect.ClassTag<T> evidence$14)
public <T> DStream<T> transform(scala.collection.Seq<DStream<?>> dstreams, scala.Function2<scala.collection.Seq<RDD<?>>,Time,RDD<T>> transformFunc, scala.reflect.ClassTag<T> evidence$15)
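union and transform both operate across whole DStreams; a sketch assuming two existing DStream[String]s with the same slide duration (the casts reflect the untyped Seq of RDDs the transform function receives):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

// stream1 and stream2 are assumed to exist: DStream[String] with equal slide durations.
val merged: DStream[String] = ssc.union(Seq(stream1, stream2))

// transform exposes the per-batch RDDs of all input streams at once.
val combined: DStream[String] = ssc.transform(
  Seq(stream1, stream2),
  (rdds: Seq[RDD[_]], time: Time) => {
    val a = rdds(0).asInstanceOf[RDD[String]]
    val b = rdds(1).asInstanceOf[RDD[String]]
    a.union(b) // any cross-stream, per-batch logic (e.g. joins) fits here
  }
)
```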
public void addStreamingListener(StreamingListener streamingListener)
Add a StreamingListener object for receiving system events related to streaming.

public void start()
Throws:
SparkException - if the context has already been started or stopped

public void awaitTermination()
public void awaitTermination(long timeout)
Parameters:
timeout - time to wait in milliseconds

public void stop(boolean stopSparkContext)
Parameters:
stopSparkContext - if true, stops the associated SparkContext. The underlying SparkContext will be stopped regardless of whether this StreamingContext has been started.

public void stop(boolean stopSparkContext, boolean stopGracefully)
Parameters:
stopSparkContext - if true, stops the associated SparkContext. The underlying SparkContext will be stopped regardless of whether this StreamingContext has been started.
stopGracefully - if true, stops gracefully by waiting for the processing of all received data to be completed
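A hedged shutdown sketch combining these parameters: wait for a bounded time, then stop gracefully so buffered data is fully processed:

```scala
ssc.start()
ssc.awaitTermination(30000L) // give the job 30 seconds (returns on stop() or timeout)
ssc.stop(stopSparkContext = true, stopGracefully = true) // drain received data, then stop the SparkContext
```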