Packages

package streaming


Type Members

  1. final class DataStreamReader extends Logging

    Interface used to load a streaming Dataset from external storage systems (e.g. file systems, key-value stores, etc). Use SparkSession.readStream to access this.
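
    As a minimal sketch of that entry point, assuming a local SparkSession and the built-in "rate" test source (the object name, method name, and option value are illustrative, not part of the API):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadStreamSketch {
  // Builds a streaming DataFrame from the built-in "rate" source, which
  // generates rows with `timestamp` and `value` columns for testing.
  def rateStream(spark: SparkSession): DataFrame =
    spark.readStream                 // returns a DataStreamReader
      .format("rate")
      .option("rowsPerSecond", "5")  // illustrative value
      .load()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("read-stream-sketch")
      .getOrCreate()
    val df = rateStream(spark)
    println(df.isStreaming)          // streaming Datasets report isStreaming
    df.printSchema()
    spark.stop()
  }
}
```

    Nothing is read until a query is started on the result with Dataset.writeStream.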

    Annotations
    @Evolving()
    Since

    2.0.0

  2. final class DataStreamWriter[T] extends AnyRef

    Interface used to write a streaming Dataset to external storage systems (e.g. file systems, key-value stores, etc). Use Dataset.writeStream to access this.
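
    A minimal sketch of the writer side, assuming a local SparkSession, the built-in "rate" source as input, and the in-memory sink (the query name "rate_rows" and the trigger interval are hypothetical choices for this example):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}

object WriteStreamSketch {
  // Starts a query that appends each micro-batch to an in-memory table
  // named after the query ("rate_rows" is a name made up for this sketch).
  def startToMemory(df: DataFrame): StreamingQuery =
    df.writeStream                                 // returns a DataStreamWriter
      .format("memory")
      .queryName("rate_rows")
      .outputMode(OutputMode.Append())
      .trigger(Trigger.ProcessingTime("1 second"))
      .start()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("write-stream-sketch")
      .getOrCreate()
    val input = spark.readStream.format("rate").load()
    val query = startToMemory(input)
    query.awaitTermination(5000L)      // let a few triggers run, then stop waiting
    spark.table("rate_rows").show(5)   // inspect what the sink has collected
    query.stop()
    spark.stop()
  }
}
```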

    Annotations
    @Evolving()
    Since

    2.0.0

  3. trait GroupState[S] extends LogicalGroupState[S]

    :: Experimental ::

    Wrapper class for interacting with per-group state data in mapGroupsWithState and flatMapGroupsWithState operations on KeyValueGroupedDataset.

    Detailed description of the [map/flatMap]GroupsWithState operation

    Both mapGroupsWithState and flatMapGroupsWithState in KeyValueGroupedDataset invoke the user-given function on each group (defined by the grouping function in Dataset.groupByKey()) while maintaining a user-defined per-group state between invocations. For a static batch Dataset, the function is invoked once per group. For a streaming Dataset, the function is invoked for each group repeatedly in every trigger. That is, in every batch of the StreamingQuery, the function is invoked once for each group that has data in the trigger. Furthermore, if a timeout is set, the function is also invoked on timed-out groups (more detail below).

    The function is invoked with the following parameters.

    • The key of the group.
    • An iterator containing all the values for this group.
    • A user-defined state object set by previous invocations of the given function.

    In case of a batch Dataset, there is only one invocation and the state object will be empty as there is no prior state. Essentially, for batch Datasets, [map/flatMap]GroupsWithState is equivalent to [map/flatMap]Groups and any updates to the state and/or timeouts have no effect.

    The major difference between mapGroupsWithState and flatMapGroupsWithState is that the former allows the function to return one and only one record, whereas the latter allows the function to return any number of records (including no records). Furthermore, flatMapGroupsWithState is associated with an operation output mode, which can be either Append or Update. Semantically, this defines whether the output records of one trigger effectively replace the previously output records (from previous triggers) or are appended to the list of previously output records. Essentially, this defines how the Result Table (refer to the semantics in the programming guide) is updated, and allows us to reason about the semantics of later operations.
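
    To illustrate the difference, here is a hedged sketch of a flatMapGroupsWithState function that returns zero or one record per invocation. The names FlatMapSketch, evenSums and wire are hypothetical, and the encoders are passed as local implicits rather than through spark.implicits so that the snippet does not need a session:

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

object FlatMapSketch {
  // Keeps a running sum per key and emits a record only when the sum is even,
  // so a single invocation returns either no record or exactly one.
  def evenSums(key: String, values: Iterator[Int], state: GroupState[Int]): Iterator[String] = {
    val total = state.getOption.getOrElse(0) + values.sum
    state.update(total)
    if (total % 2 == 0) Iterator(s"$key=$total") else Iterator.empty
  }

  // Wiring: the operation output mode is the first argument.
  def wire(ds: Dataset[(String, Int)]): Dataset[String] = {
    implicit val stringEncoder: Encoder[String] = Encoders.STRING
    implicit val intEncoder: Encoder[Int] = Encoders.scalaInt
    ds.groupByKey(_._1)
      .mapValues(_._2)
      .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.NoTimeout())(evenSums)
  }
}
```

    With mapGroupsWithState the same function would have to return exactly one value per call, for example an Option[String].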

    Important points to note about the function (both mapGroupsWithState and flatMapGroupsWithState).

    • In a trigger, the function will be called only for the groups present in the batch. So do not assume that the function will be called in every trigger for every group that has state.
    • There is no guaranteed ordering of values in the iterator in the function, neither with batch, nor with streaming Datasets.
    • All the data will be shuffled before applying the function.
    • If timeout is set, then the function will also be called with no values. See more details on GroupStateTimeout below.

    Important points to note about using GroupState.

    • The value of the state cannot be null. So updating state with null will throw IllegalArgumentException.
    • Operations on GroupState are not thread-safe. This is to avoid memory barriers.
    • If remove() is called, then exists() will return false, get() will throw NoSuchElementException and getOption() will return None.
    • After that, if update(newState) is called, then exists() will again return true, and get() and getOption() will return the updated value.
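
    The remove/update behaviour listed above can be checked without running a query. The sketch below assumes the TestGroupState.create factory (available since 3.2.0) and uses arbitrary placeholder values for the processing time and watermark:

```scala
import org.apache.spark.api.java.Optional
import org.apache.spark.sql.streaming.{GroupStateTimeout, TestGroupState}

object GroupStateLifecycleSketch {
  def main(args: Array[String]): Unit = {
    // A state that already holds the value 5.
    val state = TestGroupState.create[Int](
      optionalState = Optional.of(5),
      timeoutConf = GroupStateTimeout.NoTimeout(),
      batchProcessingTimeMs = 1L,
      eventTimeWatermarkMs = Optional.of(1L),
      hasTimedOut = false)
    assert(state.exists && state.get == 5)

    // After remove(): exists is false, getOption is None, get throws.
    state.remove()
    assert(!state.exists)
    assert(state.getOption.isEmpty)
    val getThrew =
      try { state.get; false } catch { case _: NoSuchElementException => true }
    assert(getThrew)

    // After update(): the state exists again with the new value.
    state.update(7)
    assert(state.exists && state.get == 7 && state.getOption.contains(7))
  }
}
```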

    Important points to note about using GroupStateTimeout.

    • The timeout type is a global param across all the groups (set as the timeout param in [map/flatMap]GroupsWithState), but the exact timeout duration/timestamp is configurable per group by calling setTimeout...() in GroupState.
    • Timeouts can be either based on processing time (i.e. GroupStateTimeout.ProcessingTimeTimeout) or event time (i.e. GroupStateTimeout.EventTimeTimeout).
    • With ProcessingTimeTimeout, the timeout duration can be set by calling GroupState.setTimeoutDuration. The timeout will occur when the clock has advanced by the set duration. Guarantees provided by this timeout with a duration of D ms are as follows:
      • Timeout will never occur before the clock time has advanced by D ms
      • Timeout will occur eventually when there is a trigger in the query (i.e. after D ms). So there is no strict upper bound on when the timeout would occur. For example, the trigger interval of the query will affect when the timeout actually occurs. If there is no data in the stream (for any group) for a while, then there will not be any trigger and timeout function call will not occur until there is data.
      • Since the processing time timeout is based on the clock time, it is affected by the variations in the system clock (i.e. time zone changes, clock skew, etc.).
    • With EventTimeTimeout, the user also has to specify the event time watermark in the query using Dataset.withWatermark(). With this setting, data that is older than the watermark is filtered out. The timeout can be set for a group by setting a timeout timestamp using GroupState.setTimeoutTimestamp(), and the timeout would occur when the watermark advances beyond the set timestamp. You can control the timeout delay with two parameters: (i) the watermark delay and (ii) an additional duration beyond the timestamp in the event (which is guaranteed to be newer than the watermark due to the filtering). Guarantees provided by this timeout are as follows:
      • Timeout will never occur before the watermark has exceeded the set timeout.
      • Similar to processing time timeouts, there is no strict upper bound on the delay when the timeout actually occurs. The watermark can advance only when there is data in the stream and the event time of the data has actually advanced.
    • When the timeout occurs for a group, the function is called for that group with no values, and GroupState.hasTimedOut() set to true.
    • The timeout is reset every time the function is called on a group, that is, when the group has new data, or the group has timed out. So the user has to set the timeout duration every time the function is called, otherwise, there will not be any timeout set.
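
    As a hedged sketch of the event-time variant described above: the Event type, the trackLastSeen function, the column name "eventTime" and both durations are assumptions made for illustration. The function re-arms the timeout on every call with data, because the timeout is reset each time the function is invoked:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{Dataset, Encoder, Encoders}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Hypothetical event type for this sketch.
case class Event(user: String, eventTime: Timestamp)

object EventTimeTimeoutSketch {
  // State: the newest event time (in ms) seen for the user.
  def trackLastSeen(user: String, events: Iterator[Event], state: GroupState[Long]): String = {
    if (state.hasTimedOut) {
      // Called with no values once the watermark has passed the timeout timestamp.
      val lastSeen = state.get
      state.remove()
      s"$user expired, last seen at $lastSeen"
    } else {
      val newest = events.map(_.eventTime.getTime)
        .foldLeft(state.getOption.getOrElse(0L))((a, b) => math.max(a, b))
      state.update(newest)
      // Time out when the watermark passes (newest event time + 10 minutes).
      state.setTimeoutTimestamp(newest, "10 minutes")
      s"$user active, last seen at $newest"
    }
  }

  // Wiring: EventTimeTimeout requires a watermark on the event-time column.
  def wire(events: Dataset[Event]): Dataset[String] = {
    implicit val stringEncoder: Encoder[String] = Encoders.STRING
    implicit val longEncoder: Encoder[Long] = Encoders.scalaLong
    events
      .withWatermark("eventTime", "5 minutes")
      .groupByKey(_.user)
      .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(trackLastSeen)
  }
}
```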

    [map/flatMap]GroupsWithState can take a user defined initial state as an additional argument. This state will be applied when the first batch of the streaming query is processed. If there are no matching rows in the data for the keys present in the initial state, the state is still applied and the function will be invoked with the values being an empty iterator.

    Scala example of using GroupState in mapGroupsWithState:

    // A mapping function that maintains an integer state for string keys and returns a string.
    // Additionally, it sets a timeout to remove the state if it has not received data for an hour.
    def mappingFunction(key: String, value: Iterator[Int], state: GroupState[Int]): String = {
    
      if (state.hasTimedOut) {                // If called when timing out, remove the state
        state.remove()
    
      } else if (state.exists) {              // If state exists, use it for processing
        val existingState = state.get         // Get the existing state
        val shouldRemove = ...                // Decide whether to remove the state
        if (shouldRemove) {
          state.remove()                      // Remove the state
    
        } else {
          val newState = ...
          state.update(newState)              // Set the new state
          state.setTimeoutDuration("1 hour")  // Set the timeout
        }
    
      } else {
        val initialState = ...
        state.update(initialState)            // Set the initial state
        state.setTimeoutDuration("1 hour")    // Set the timeout
      }
      ...
      // return something
    }
    
    dataset
      .groupByKey(...)
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(mappingFunction)

    Java example of using GroupState:

    // A mapping function that maintains an integer state for string keys and returns a string.
    // Additionally, it sets a timeout to remove the state if it has not received data for an hour.
    MapGroupsWithStateFunction<String, Integer, Integer, String> mappingFunction =
       new MapGroupsWithStateFunction<String, Integer, Integer, String>() {
    
         @Override
         public String call(String key, Iterator<Integer> value, GroupState<Integer> state) {
           if (state.hasTimedOut()) {            // If called when timing out, remove the state
             state.remove();
    
           } else if (state.exists()) {            // If state exists, use it for processing
             int existingState = state.get();      // Get the existing state
             boolean shouldRemove = ...;           // Decide whether to remove the state
             if (shouldRemove) {
               state.remove();                     // Remove the state
    
             } else {
               int newState = ...;
               state.update(newState);             // Set the new state
               state.setTimeoutDuration("1 hour"); // Set the timeout
             }
    
           } else {
             int initialState = ...;
             state.update(initialState);           // Set the initial state
             state.setTimeoutDuration("1 hour");   // Set the timeout
           }
           ...
            // return something
         }
       };
    
    dataset
        .groupByKey(...)
        .mapGroupsWithState(
            mappingFunction, Encoders.INT(), Encoders.STRING(), GroupStateTimeout.ProcessingTimeTimeout());
    S

    User-defined type of the state to be stored for each group. Must be encodable into Spark SQL types (see Encoder for more details).

    Annotations
    @Experimental() @Evolving()
    Since

    2.2.0

  4. class GroupStateTimeout extends AnyRef

    Represents the type of timeouts possible for the Dataset operations mapGroupsWithState and flatMapGroupsWithState.

    See documentation on GroupState for more details.

    Annotations
    @Experimental() @Evolving()
    Since

    2.2.0

  5. trait MapState[K, V] extends Serializable
    Annotations
    @Experimental() @Evolving()
  6. class OutputMode extends AnyRef

    OutputMode describes what data will be written to a streaming sink when there is new data available in a streaming DataFrame/Dataset.

    Annotations
    @Evolving()
    Since

    2.0.0

  7. class SinkProgress extends Serializable

    Information about progress made for a sink in the execution of a StreamingQuery during a trigger. See StreamingQueryProgress for more information.

    Annotations
    @Evolving()
    Since

    2.1.0

  8. class SourceProgress extends Serializable

    Information about progress made for a source in the execution of a StreamingQuery during a trigger. See StreamingQueryProgress for more information.

    Annotations
    @Evolving()
    Since

    2.1.0

  9. class StateOperatorProgress extends Serializable

    Information about updates made to stateful operators in a StreamingQuery during a trigger.

    Annotations
    @Evolving()
  10. trait StreamingQuery extends api.StreamingQuery[Dataset]


  11. class StreamingQueryException extends Exception with SparkThrowable

    Exception that stopped a StreamingQuery. Use cause to get the actual exception that caused the failure.
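
    A minimal sketch of how the underlying failure might be retrieved (the object and method names are hypothetical; the query is assumed to have been started elsewhere):

```scala
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}

object QueryFailureSketch {
  // Blocks until the query terminates. Returns None when it was stopped
  // normally, or the exception that actually caused the failure.
  def awaitAndGetFailure(query: StreamingQuery): Option[Throwable] =
    try {
      query.awaitTermination()
      None
    } catch {
      case e: StreamingQueryException => Option(e.cause)
    }
}
```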

    Annotations
    @Evolving()
    Since

    2.0.0

  12. abstract class StreamingQueryListener extends Serializable

    Interface for listening to events related to StreamingQueries.

    Annotations
    @Evolving()
    Since

    2.0.0

    Note

    The methods are not thread-safe as they may be called from different threads.
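
    Because callbacks may arrive on different threads, a listener that keeps counters should use thread-safe containers. The class below is a hypothetical sketch (RowCountingListener is not part of Spark); it would be registered with spark.streams.addListener(new RowCountingListener):

```scala
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{
  QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Counts query starts, input rows and terminations across all queries.
class RowCountingListener extends StreamingQueryListener {
  val started = new AtomicLong(0)
  val inputRows = new AtomicLong(0)
  val terminated = new AtomicLong(0)

  override def onQueryStarted(event: QueryStartedEvent): Unit =
    started.incrementAndGet()

  // event.progress is the StreamingQueryProgress of the finished trigger.
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    inputRows.addAndGet(event.progress.numInputRows)

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    terminated.incrementAndGet()
}
```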

  13. class StreamingQueryManager extends Logging

    A class to manage all the StreamingQuery active in a SparkSession.
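
    A brief sketch of typical use, assuming a local SparkSession and a throwaway query from the built-in "rate" source into the in-memory sink (the helper name activeNames and the query name "managed_rows" are hypothetical):

```scala
import org.apache.spark.sql.SparkSession

object QueryManagerSketch {
  // Names of all queries currently active in this session.
  def activeNames(spark: SparkSession): Seq[String] =
    spark.streams.active.map(_.name).toSeq

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("query-manager-sketch")
      .getOrCreate()
    val query = spark.readStream.format("rate").load()
      .writeStream.format("memory").queryName("managed_rows").start()

    val manager = spark.streams              // the StreamingQueryManager
    println(activeNames(spark))
    println(manager.get(query.id).name)      // look a query up by its id
    manager.awaitAnyTermination(2000L)       // wait up to 2 s for any query to end
    query.stop()
    spark.stop()
  }
}
```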

    Annotations
    @Evolving()
    Since

    2.0.0

  14. class StreamingQueryProgress extends Serializable

    Information about progress made in the execution of a StreamingQuery during a trigger. Each event relates to processing done for a single trigger of the streaming query. Events are emitted even when no new data is available to be processed.

    Annotations
    @Evolving()
    Since

    2.1.0

  15. class StreamingQueryStatus extends Serializable

    Reports information about the instantaneous status of a streaming query.

    Annotations
    @Evolving()
    Since

    2.1.0

  16. case class TTLConfig(ttlDuration: Duration) extends Product with Serializable

    TTL Configuration for state variable. State values will not be returned past ttlDuration, and will be eventually removed from the state store. Any state update resets the ttl to current processing time plus ttlDuration.

    ttlDuration

    time to live duration for state stored in the state variable.
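
    The reset rule can be written out as simple arithmetic. In the sketch below only TTLConfig and its ttlDuration field come from the API; the 30-minute value and the expiresAtMs helper are hypothetical and merely restate "processing time of the last update plus ttlDuration":

```scala
import java.time.Duration
import org.apache.spark.sql.streaming.TTLConfig

object TtlSketch {
  // A 30-minute time to live (illustrative value).
  val sessionTtl: TTLConfig = TTLConfig(Duration.ofMinutes(30))

  // Hypothetical helper, not a Spark API: the processing time (in ms) after
  // which a value last updated at lastUpdateMs is no longer returned.
  def expiresAtMs(lastUpdateMs: Long, ttl: TTLConfig): Long =
    lastUpdateMs + ttl.ttlDuration.toMillis
}
```

    Each later update moves the expiry forward again, since the same rule is applied to the new update time.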

  17. trait TestGroupState[S] extends GroupState[S]

    :: Experimental ::

    The extended version of GroupState interface with extra getters of state machine fields to improve testability of the GroupState implementations which inherit from the extended interface.

    Scala example of using TestGroupState:

    // Please refer to ScalaDoc of `GroupState` for the Scala definition of `mappingFunction()`
    
    import org.apache.spark.api.java.Optional
    import org.apache.spark.sql.streaming.GroupStateTimeout
    import org.apache.spark.sql.streaming.TestGroupState
    // other imports
    
    // test class setups
    
    test("MapGroupsWithState state transition function") {
      // Creates the prevState input for the state transition function
      // with desired configs. The `create()` API would guarantee that
      // the generated instance has the same behavior as the one built by
      // engine with the same configs.
      val prevState = TestGroupState.create[Int](
        optionalState = Optional.empty[Int],
        timeoutConf = GroupStateTimeout.NoTimeout,
        batchProcessingTimeMs = 1L,
        eventTimeWatermarkMs = Optional.of(1L),
        hasTimedOut = false)
    
      val key: String = ...
      val values: Iterator[Int] = ...
    
      // Asserts the prevState is in init state without updates.
      assert(!prevState.isUpdated)
    
      // Calls the state transition function with the test previous state
      // with desired configs.
      mappingFunction(key, values, prevState)
    
      // Asserts the test GroupState object has been updated but not removed
      // after calling the state transition function
      assert(prevState.isUpdated)
      assert(!prevState.isRemoved)
    }

    Java example of using TestGroupState:

    // Please refer to ScalaDoc of `GroupState` for the Java definition of `mappingFunction()`
    
    import org.apache.spark.api.java.Optional;
    import org.apache.spark.sql.streaming.GroupStateTimeout;
    import org.apache.spark.sql.streaming.TestGroupState;
    // other imports
    
    // test class setups
    
    // test `MapGroupsWithState` state transition function `mappingFunction()`
    public void testMappingFunctionWithTestGroupState() {
      // Creates the prevState input for the state transition function
      // with desired configs. The `create()` API would guarantee that
      // the generated instance has the same behavior as the one built by
      // engine with the same configs.
      TestGroupState<Integer> prevState = TestGroupState.create(
        Optional.empty(),
        GroupStateTimeout.NoTimeout(),
        1L,
        Optional.of(1L),
        false);
    
      String key = ...;
      Integer[] values = ...;
    
      // Asserts the prevState is in init state without updates.
      Assertions.assertFalse(prevState.isUpdated());
    
      // Calls the state transition function with the test previous state
      // with desired configs.
      mappingFunction.call(key, Arrays.asList(values).iterator(), prevState);
    
      // Asserts the test GroupState object has been updated but not removed
      // after calling the state transition function
      Assertions.assertTrue(prevState.isUpdated());
      Assertions.assertFalse(prevState.isRemoved());
    }
    S

    User-defined type of the state to be stored for each group. Must be encodable into Spark SQL types (see Encoder for more details).

    Annotations
    @Experimental() @Evolving()
    Since

    3.2.0

  18. class TimeMode extends AnyRef

    Represents the time modes (used for specifying timers and ttl) possible for the Dataset operations transformWithState.

    Annotations
    @Experimental() @Evolving()
  19. class Trigger extends AnyRef

    Policy used to indicate how often results should be produced by a StreamingQuery.

    Annotations
    @Evolving()
    Since

    2.0.0

Value Members

  1. object DataStreamWriter
  2. object StreamingQueryListener extends Serializable

    Companion object of StreamingQueryListener that defines the listener events.

    Annotations
    @Evolving()
    Since

    2.0.0

  3. object TestGroupState
