pyspark.streaming module

Module contents

class pyspark.streaming.StreamingContext(sparkContext, batchDuration=None, jssc=None)

Bases: object

Main entry point for Spark Streaming functionality. A StreamingContext represents the connection to a Spark cluster and can be used to create DStreams from various input sources. It can be created from an existing SparkContext. After creating and transforming DStreams, the streaming computation can be started and stopped using context.start() and context.stop(), respectively. context.awaitTermination() allows the current thread to wait for the termination of the context by stop() or by an exception.
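
A minimal sketch of the usual lifecycle, assuming a local two-core master, a 1-second batch interval, and a placeholder socket source on localhost:9999:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    # Any existing SparkContext works; a local two-core master is assumed here.
    sc = SparkContext("local[2]", "StreamingLifecycle")
    ssc = StreamingContext(sc, batchDuration=1)  # 1-second batches

    lines = ssc.socketTextStream("localhost", 9999)  # placeholder source
    lines.pprint()

    ssc.start()             # start the streaming computation
    ssc.awaitTermination()  # block until stop() is called or an error occurs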

awaitTermination(timeout=None)

Wait for the execution to stop.

Parameters:timeout – time to wait in seconds

awaitTerminationOrTimeout(timeout)

Wait for the execution to stop. Return true if it is stopped, throw the reported error if one occurred during execution, or return false if the waiting time elapsed before the method returned.

Parameters:timeout – time to wait in seconds

binaryRecordsStream(directory, recordLength)

Create an input stream that monitors a Hadoop-compatible file system for new files and reads them as flat binary files with records of fixed length. Files must be written to the monitored directory by “moving” them from another location within the same file system. File names starting with . are ignored.

Parameters:
  • directory – Directory to load data from
  • recordLength – Length of each record in bytes
checkpoint(directory)

Sets the context to periodically checkpoint the DStream operations for master fault-tolerance. The graph will be checkpointed every batch interval.

Parameters:directory – HDFS-compatible directory where the checkpoint data will be reliably stored
classmethod getOrCreate(checkpointPath, setupFunc)

Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. If checkpoint data exists in the provided checkpointPath, then the StreamingContext will be recreated from the checkpoint data. If the data does not exist, then the provided setupFunc will be used to create a new StreamingContext.

Parameters:
  • checkpointPath – Checkpoint directory used in an earlier StreamingContext program
  • setupFunc – Function to create a new StreamingContext and set up DStreams
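
A hedged sketch of the driver pattern around getOrCreate, assuming a hypothetical checkpoint directory and setup function:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    CHECKPOINT_DIR = "hdfs:///tmp/streaming-checkpoint"  # hypothetical path

    def setup():
        sc = SparkContext("local[2]", "RecoverableApp")
        ssc = StreamingContext(sc, 5)
        ssc.checkpoint(CHECKPOINT_DIR)
        ssc.socketTextStream("localhost", 9999).pprint()  # placeholder pipeline
        return ssc

    # Recreated from checkpoint data if present, otherwise built by setup().
    ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, setup)
    ssc.start()
    ssc.awaitTermination()
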
queueStream(rdds, oneAtATime=True, default=None)

Create an input stream from a queue of RDDs or a list. In each batch, it will process either one or all of the RDDs returned by the queue.

NOTE: changes to the queue after the stream is created will not be recognized.

Parameters:
  • rdds – Queue of RDDs
  • oneAtATime – whether to pick one RDD each batch or all of them at once.
  • default – The default RDD to use if the queue is empty
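
A minimal sketch, assuming an existing StreamingContext ssc; three small RDDs are queued up and consumed one per batch:

    # Each queued RDD becomes one batch because oneAtATime=True.
    rdd_queue = [ssc.sparkContext.parallelize(range(i * 10, (i + 1) * 10))
                 for i in range(3)]
    stream = ssc.queueStream(rdd_queue, oneAtATime=True)
    stream.pprint()
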
remember(duration)

Set each DStream in this context to remember the RDDs it generated in the last given duration. DStreams remember RDDs only for a limited duration of time and release them for garbage collection. This method allows the developer to specify how long to remember the RDDs (if the developer wishes to query old data outside the DStream computation).

Parameters:duration – Minimum duration (in seconds) that each DStream should remember its RDDs
socketTextStream(hostname, port, storageLevel=StorageLevel(True, True, False, False, 2))

Create an input stream from a TCP source hostname:port. Data is received using a TCP socket and the received bytes are interpreted as UTF-8 encoded, \n delimited lines.

Parameters:
  • hostname – Hostname to connect to for receiving data
  • port – Port to connect to for receiving data
  • storageLevel – Storage level to use for storing the received objects
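
A sketch of the classic streaming word count over a socket text stream, assuming a StreamingContext ssc and a line-oriented server on localhost:9999 (placeholders):

    lines = ssc.socketTextStream("localhost", 9999)
    counts = (lines.flatMap(lambda line: line.split(" "))
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
    counts.pprint()
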
sparkContext

Return the SparkContext associated with this StreamingContext.

start()

Start the execution of the streams.

stop(stopSparkContext=True, stopGraceFully=False)

Stop the execution of the streams, with option of ensuring all received data has been processed.

Parameters:
  • stopSparkContext – Stop the associated SparkContext or not
  • stopGracefully – Stop gracefully by waiting for the processing of all received data to be completed
textFileStream(directory)

Create an input stream that monitors a Hadoop-compatible file system for new files and reads them as text files. Files must be written to the monitored directory by “moving” them from another location within the same file system. File names starting with . are ignored.
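
A small sketch, assuming a StreamingContext ssc and a hypothetical HDFS directory into which finished files are atomically moved:

    # Only files moved into the directory after the stream starts are picked up.
    files = ssc.textFileStream("hdfs:///data/incoming")
    files.count().pprint()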

transform(dstreams, transformFunc)

Create a new DStream in which each RDD is generated by applying a function on the RDDs of the DStreams. The order of the RDDs passed to the transform function will be the same as the order of the corresponding DStreams in the list.
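
A hedged sketch, assuming two existing DStreams stream1 and stream2 on the same context ssc; the function receives the per-batch RDDs in the same order as the list:

    keyed1 = stream1.map(lambda x: (x, 1))
    keyed2 = stream2.map(lambda x: (x, 1))
    joined = ssc.transform([keyed1, keyed2],
                           lambda rdds: rdds[0].join(rdds[1]))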

union(*dstreams)

Create a unified DStream from multiple DStreams of the same type and same slide duration.

class pyspark.streaming.DStream(jdstream, ssc, jrdd_deserializer)

Bases: object

A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous sequence of RDDs (of the same type) representing a continuous stream of data (see RDD in the Spark core documentation for more details on RDDs).

DStreams can either be created from live data (such as data from TCP sockets, Kafka, Flume, etc.) using a StreamingContext, or they can be generated by transforming existing DStreams using operations such as map, window and reduceByKeyAndWindow. While a Spark Streaming program is running, each DStream periodically generates an RDD, either from live data or by transforming the RDD generated by a parent DStream.

Internally, a DStream is characterized by a few basic properties:
  • A list of other DStreams that the DStream depends on
  • A time interval at which the DStream generates an RDD
  • A function that is used to generate an RDD after each time interval
cache()

Persist the RDDs of this DStream with the default storage level (MEMORY_ONLY_SER).

checkpoint(interval)

Enable periodic checkpointing of RDDs of this DStream

Parameters:interval – time in seconds, after each period of which the generated RDDs will be checkpointed
cogroup(other, numPartitions=None)

Return a new DStream by applying ‘cogroup’ between RDDs of this DStream and other DStream.

Hash partitioning is used to generate the RDDs with numPartitions partitions.

combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None)

Return a new DStream by applying combineByKey to each RDD.

context()

Return the StreamingContext associated with this DStream

count()

Return a new DStream in which each RDD has a single element generated by counting each RDD of this DStream.

countByValue()

Return a new DStream in which each RDD contains the counts of each distinct value in each RDD of this DStream.

countByValueAndWindow(windowDuration, slideDuration, numPartitions=None)

Return a new DStream in which each RDD contains the count of distinct elements in RDDs in a sliding window over this DStream.

Parameters:
  • windowDuration – width of the window; must be a multiple of this DStream’s batching interval
  • slideDuration – sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream’s batching interval
  • numPartitions – number of partitions of each RDD in the new DStream.
countByWindow(windowDuration, slideDuration)

Return a new DStream in which each RDD has a single element generated by counting the number of elements in a window over this DStream. windowDuration and slideDuration are as defined in the window() operation.

This is equivalent to window(windowDuration, slideDuration).count(), but will be more efficient if window is large.

filter(f)

Return a new DStream containing only the elements that satisfy the predicate.

flatMap(f, preservesPartitioning=False)

Return a new DStream by applying a function to all elements of this DStream, and then flattening the results

flatMapValues(f)

Return a new DStream by applying a flatmap function to the value of each key-value pair in this DStream without changing the key.

foreachRDD(func)

Apply a function to each RDD in this DStream.
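
A minimal sketch, assuming stream is any DStream; the function may take either (rdd) or (time, rdd) and runs on the driver for every batch:

    def handle(time, rdd):
        # Actions on the RDD (count, collect, saving, ...) trigger the real work.
        print("Batch at %s contained %d records" % (time, rdd.count()))

    stream.foreachRDD(handle)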

fullOuterJoin(other, numPartitions=None)

Return a new DStream by applying ‘full outer join’ between RDDs of this DStream and other DStream.

Hash partitioning is used to generate the RDDs with numPartitions partitions.

glom()

Return a new DStream in which each RDD is generated by applying glom() to each RDD of this DStream.

groupByKey(numPartitions=None)

Return a new DStream by applying groupByKey on each RDD.

groupByKeyAndWindow(windowDuration, slideDuration, numPartitions=None)

Return a new DStream by applying groupByKey over a sliding window. Similar to DStream.groupByKey(), but applies it over a sliding window.

Parameters:
  • windowDuration – width of the window; must be a multiple of this DStream’s batching interval
  • slideDuration – sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream’s batching interval
  • numPartitions – Number of partitions of each RDD in the new DStream.
join(other, numPartitions=None)

Return a new DStream by applying ‘join’ between RDDs of this DStream and other DStream.

Hash partitioning is used to generate the RDDs with numPartitions partitions.

leftOuterJoin(other, numPartitions=None)

Return a new DStream by applying ‘left outer join’ between RDDs of this DStream and other DStream.

Hash partitioning is used to generate the RDDs with numPartitions partitions.

map(f, preservesPartitioning=False)

Return a new DStream by applying a function to each element of this DStream.

mapPartitions(f, preservesPartitioning=False)

Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDD of this DStream.

mapPartitionsWithIndex(f, preservesPartitioning=False)

Return a new DStream in which each RDD is generated by applying mapPartitionsWithIndex() to each RDD of this DStream.

mapValues(f)

Return a new DStream by applying a map function to the value of each key-value pair in this DStream without changing the key.

partitionBy(numPartitions, partitionFunc=portable_hash)

Return a copy of the DStream in which each RDD is partitioned using the specified partitioner.

persist(storageLevel)

Persist the RDDs of this DStream with the given storage level

pprint(num=10)

Print the first num elements of each RDD generated in this DStream.

Parameters:num – the number of elements to print from the beginning of each RDD.
reduce(func)

Return a new DStream in which each RDD has a single element generated by reducing each RDD of this DStream.

reduceByKey(func, numPartitions=None)

Return a new DStream by applying reduceByKey to each RDD.

reduceByKeyAndWindow(func, invFunc, windowDuration, slideDuration=None, numPartitions=None, filterFunc=None)

Return a new DStream by applying incremental reduceByKey over a sliding window.

The reduced value for a new window is calculated incrementally, using the old window’s reduced value:
  1. reduce the new values that entered the window (e.g., adding new counts)
  2. “inverse reduce” the old values that left the window (e.g., subtracting old counts)

invFunc can be None; in that case all the RDDs in the window are reduced from scratch, which can be slower than providing invFunc.

Parameters:
  • func – associative reduce function
  • invFunc – inverse function of func
  • windowDuration – width of the window; must be a multiple of this DStream’s batching interval
  • slideDuration – sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream’s batching interval
  • numPartitions – number of partitions of each RDD in the new DStream.
  • filterFunc – function to filter expired key-value pairs; only pairs that satisfy the function are retained. Set this to None if you do not want to filter.
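
A sketch of an incremental windowed count, assuming pairs is a DStream of (word, 1) tuples and that checkpointing has been enabled on the context (required when invFunc is used):

    windowed_counts = pairs.reduceByKeyAndWindow(
        lambda a, b: a + b,   # add counts entering the window
        lambda a, b: a - b,   # subtract counts leaving the window
        windowDuration=30,    # seconds; a multiple of the batch interval
        slideDuration=10)
    windowed_counts.pprint()
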
reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

Return a new DStream in which each RDD has a single element generated by reducing all elements in a sliding window over this DStream.

If invReduceFunc is not None, the reduction is done incrementally using the old window’s reduced value:

  1. reduce the new values that entered the window (e.g., adding new counts)
  2. “inverse reduce” the old values that left the window (e.g., subtracting old counts)

This is more efficient than recomputing the whole window when invReduceFunc is None.

Parameters:
  • reduceFunc – associative reduce function
  • invReduceFunc – inverse reduce function of reduceFunc
  • windowDuration – width of the window; must be a multiple of this DStream’s batching interval
  • slideDuration – sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream’s batching interval
repartition(numPartitions)

Return a new DStream with an increased or decreased level of parallelism.

rightOuterJoin(other, numPartitions=None)

Return a new DStream by applying ‘right outer join’ between RDDs of this DStream and other DStream.

Hash partitioning is used to generate the RDDs with numPartitions partitions.

saveAsTextFiles(prefix, suffix=None)

Save each RDD in this DStream as a text file, using the string representation of elements.

slice(begin, end)

Return all the RDDs between ‘begin’ and ‘end’ (both inclusive)

begin and end can be datetime.datetime objects or Unix timestamps

transform(func)

Return a new DStream in which each RDD is generated by applying a function on each RDD of this DStream.

func can take one argument (rdd) or two arguments (time, rdd)
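
A small sketch, assuming counts is a DStream of (word, count) pairs; transform allows arbitrary RDD operations per batch, such as sorting:

    top = counts.transform(
        lambda rdd: rdd.sortBy(lambda kv: kv[1], ascending=False))
    top.pprint()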

transformWith(func, other, keepSerializer=False)

Return a new DStream in which each RDD is generated by applying a function on each RDD of this DStream and ‘other’ DStream.

func can take two arguments (rdd_a, rdd_b) or three arguments (time, rdd_a, rdd_b)

union(other)

Return a new DStream by unifying data of another DStream with this DStream.

Parameters:other – Another DStream having the same interval (i.e., slideDuration) as this DStream.
updateStateByKey(updateFunc, numPartitions=None)

Return a new “state” DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values of the key.

Parameters:updateFunc – State update function. If this function returns None, then the corresponding state key-value pair will be eliminated.
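
A sketch of a running count per key, assuming pairs is a DStream of (word, 1) tuples and that ssc.checkpoint(...) has been set (stateful operations require it):

    def update(new_values, last_sum):
        # new_values is the list of values for the key in this batch;
        # last_sum is the previous state (None on the first occurrence).
        return sum(new_values) + (last_sum or 0)

    running_counts = pairs.updateStateByKey(update)
    running_counts.pprint()
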
window(windowDuration, slideDuration=None)

Return a new DStream in which each RDD contains all the elements seen in a sliding window of time over this DStream.

Parameters:
  • windowDuration – width of the window; must be a multiple of this DStream’s batching interval
  • slideDuration – sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream’s batching interval
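
A minimal sketch, assuming stream is any DStream on a context whose batch interval divides both durations; it keeps the last 30 seconds of data, recomputed every 10 seconds:

    recent = stream.window(windowDuration=30, slideDuration=10)
    recent.count().pprint()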

pyspark.streaming.kafka module

class pyspark.streaming.kafka.KafkaUtils

Bases: object

static createStream(ssc, zkQuorum, groupId, topics, kafkaParams={}, storageLevel=StorageLevel(True, True, False, False, 2), keyDecoder=utf8_decoder, valueDecoder=utf8_decoder)

Create an input stream that pulls messages from a Kafka Broker.

Parameters:
  • ssc – StreamingContext object
  • zkQuorum – Zookeeper quorum (hostname:port,hostname:port,..).
  • groupId – The group id for this consumer.
  • topics – Dict of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread.
  • kafkaParams – Additional params for Kafka
  • storageLevel – RDD storage level.
  • keyDecoder – A function used to decode key (default is utf8_decoder)
  • valueDecoder – A function used to decode value (default is utf8_decoder)
Returns:

A DStream object
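
A hedged sketch, assuming a StreamingContext ssc, a local ZooKeeper at localhost:2181, and a topic named "events" read by one consumer thread (all placeholders); the spark-streaming-kafka artifact must be on the classpath:

    from pyspark.streaming.kafka import KafkaUtils

    kafka_stream = KafkaUtils.createStream(
        ssc, "localhost:2181", "my-consumer-group", {"events": 1})
    values = kafka_stream.map(lambda kv: kv[1])  # (key, message) pairs; keep the message
    values.pprint()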

pyspark.streaming.kafka.utf8_decoder(s)

Decode the bytes as UTF-8
