Interface GroupState<S>
- All Superinterfaces:
- org.apache.spark.sql.catalyst.plans.logical.LogicalGroupState<S>
- All Known Subinterfaces:
- TestGroupState<S>
 Wrapper class for interacting with per-group state data in mapGroupsWithState and
 flatMapGroupsWithState operations on KeyValueGroupedDataset.
 
 Detailed description of the [map/flatMap]GroupsWithState operation
 --------------------------------------------------------------
 Both mapGroupsWithState and
 flatMapGroupsWithState in KeyValueGroupedDataset will invoke the user-given function on
 each group (defined by the grouping function in Dataset.groupByKey()) while maintaining a
 user-defined per-group state between invocations. For a static batch Dataset, the function will
 be invoked once per group. For a streaming Dataset, the function will be invoked for each group
 repeatedly in every trigger. That is, in every batch of the StreamingQuery, the function will
 be invoked once for each group that has data in the trigger. Furthermore, if timeout is set,
 then the function will be invoked on timed-out groups (more detail below).
 
 The function is invoked with the following parameters.
   - The key of the group.
   - An iterator containing all the values for this group.
   - A user-defined state object set by previous invocations of the given function.

 In case of a batch Dataset, there is only one invocation and the state object will be empty as
 there is no prior state. Essentially, for batch Datasets, [map/flatMap]GroupsWithState is
 equivalent to [map/flatMap]Groups and any updates to the state and/or timeouts have no
 effect.
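
 The invocation pattern described above can be pictured with a small, Spark-free simulation. This
 is only a sketch under stated assumptions: PerGroupInvocationSketch, runTrigger and the in-memory
 map are hypothetical names standing in for the query and its state store, and the "user function"
 is a simple running count. It shows that the function runs once per group that has data in a
 trigger and that the state carries over to the next trigger.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of per-group invocation; this is not Spark code.
class PerGroupInvocationSketch {
    // Stand-in for the per-group state kept between triggers.
    private final Map<String, Integer> stateByKey = new TreeMap<>();

    // Simulates one trigger: group the batch by key, then call the "user function"
    // once for each group that has data in this batch.
    List<String> runTrigger(List<String> batch) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String value : batch) {
            groups.computeIfAbsent(value, k -> new ArrayList<>()).add(value);
        }
        List<String> output = new ArrayList<>();
        for (Map.Entry<String, List<String>> group : groups.entrySet()) {
            String key = group.getKey();
            int previous = stateByKey.getOrDefault(key, 0);   // empty state counts as 0
            int updated = previous + group.getValue().size(); // user logic: running count
            stateByKey.put(key, updated);                      // state survives to the next trigger
            output.add(key + "=" + updated);                   // exactly one record per group
        }
        return output;
    }

    public static void main(String[] args) {
        PerGroupInvocationSketch query = new PerGroupInvocationSketch();
        System.out.println(query.runTrigger(Arrays.asList("a", "b", "a"))); // [a=2, b=1]
        System.out.println(query.runTrigger(Arrays.asList("b")));           // [b=2]
    }
}
```

 In the second trigger, group "a" has state but no data, so the function is not called for it.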
 
 The major difference between mapGroupsWithState and flatMapGroupsWithState is that the
 former allows the function to return one and only one record, whereas the latter allows the
 function to return any number of records (including no records). Furthermore, the
 flatMapGroupsWithState is associated with an operation output mode, which can be either
 Append or Update. Semantically, this defines whether the output records of one trigger
 effectively replace the previously output records (from previous triggers) or are appended to
 the list of previously output records. Essentially, this defines how the Result Table (refer to
 the semantics in the programming guide) is updated, and allows us to reason about the semantics
 of later operations.
 
 Important points to note about the function (both mapGroupsWithState and
 flatMapGroupsWithState).
   - In a trigger, the function will be called only for the groups present in the batch. So do not
     assume that the function will be called in every trigger for every group that has state.
   - There is no guaranteed ordering of values in the iterator in the function, neither with
     batch, nor with streaming Datasets.
   - All the data will be shuffled before applying the function.
   - If timeout is set, then the function will also be called with no values. See more details
     on GroupStateTimeout below.
 
 Important points to note about using GroupState.
   - The value of the state cannot be null. So updating state with null will throw
     IllegalArgumentException.
   - Operations on GroupState are not thread-safe. This is to avoid memory barriers.
   - If remove() is called, then exists() will return false, get() will throw
     NoSuchElementException, and getOption() will return None.
   - After that, if update(newState) is called, then exists() will again return true, and
     get() and getOption() will return the updated value.
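
 The state rules listed above can be summarized in a tiny stand-in class. This is a minimal
 sketch, not Spark's implementation: StateCellSketch is a hypothetical name, and
 java.util.Optional is used in place of scala.Option so that the example needs nothing beyond the
 JDK.

```java
import java.util.NoSuchElementException;
import java.util.Optional;

// Hypothetical stand-in that mimics only the state rules listed above.
class StateCellSketch<S> {
    private S value; // null means "no state"

    boolean exists() {
        return value != null;
    }

    S get() {
        if (value == null) {
            throw new NoSuchElementException("State is not set");
        }
        return value;
    }

    // java.util.Optional stands in for scala.Option in this sketch.
    Optional<S> getOption() {
        return Optional.ofNullable(value);
    }

    void update(S newState) {
        if (newState == null) {
            throw new IllegalArgumentException("State cannot be null");
        }
        value = newState;
    }

    void remove() {
        value = null;
    }

    public static void main(String[] args) {
        StateCellSketch<Integer> state = new StateCellSketch<>();
        state.update(5);
        state.remove();
        System.out.println(state.exists());                // false
        System.out.println(state.getOption().isPresent()); // false
        state.update(7);
        System.out.println(state.get());                   // 7
    }
}
```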
 
 Important points to note about using GroupStateTimeout.
   - The timeout type is a global param across all the groups (set as the timeout param in
     [map/flatMap]GroupsWithState), but the exact timeout duration/timestamp is configurable
     per group by calling setTimeout...() in GroupState.
   - Timeouts can be either based on processing time (i.e.
     GroupStateTimeout.ProcessingTimeTimeout) or event time (i.e.
     GroupStateTimeout.EventTimeTimeout).
   - With ProcessingTimeTimeout, the timeout duration can be set by calling
     GroupState.setTimeoutDuration. The timeout will occur when the clock has advanced by the
     set duration. Guarantees provided by this timeout with a duration of D ms are as follows:
     - Timeout will never occur before the clock time has advanced by D ms
     - Timeout will occur eventually when there is a trigger in the query (i.e. after D ms). So
       there is no strict upper bound on when the timeout would occur. For example, the trigger
       interval of the query will affect when the timeout actually occurs. If there is no data
       in the stream (for any group) for a while, then there will not be any trigger and timeout
       function call will not occur until there is data.
     - Since the processing time timeout is based on the clock time, it is affected by the
        variations in the system clock (e.g. time zone changes, clock skew, etc.).
   - With EventTimeTimeout, the user also has to specify the event time watermark in the query
     using Dataset.withWatermark(). With this setting, data that is older than the watermark
     is filtered out. The timeout can be set for a group by setting a timeout timestamp
     using GroupState.setTimeoutTimestamp(), and the timeout would occur when the watermark
     advances beyond the set timestamp. You can control the timeout delay by two parameters:
     (i) the watermark delay and (ii) an additional duration beyond the timestamp in the event
     (which is guaranteed to be newer than the watermark due to the filtering). Guarantees
     provided by this timeout are as follows:
     - Timeout will never occur before the watermark has exceeded the set timeout.
     - Similar to processing time timeouts, there is no strict upper bound on the delay when the
       timeout actually occurs. The watermark can advance only when there is data in the stream
       and the event time of the data has actually advanced.
   - When the timeout occurs for a group, the function is called for that group with no values,
     and GroupState.hasTimedOut() set to true.
   - The timeout is reset every time the function is called on a group, that is, when the group
     has new data or the group has timed out. So the user has to set the timeout duration every
     time the function is called; otherwise, no timeout will be set.
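
 The timing guarantees above come down to simple arithmetic, sketched below without any Spark
 dependency. Everything here is an assumption made for illustration: TimeoutTimingSketch and its
 method names are hypothetical, trigger times are assumed to be sorted, the boundary comparison is
 treated as strict, and the watermark is modeled as "newest event time seen minus the watermark
 delay".

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical timing model; this is not Spark code.
class TimeoutTimingSketch {

    // Processing time: the timeout is only checked when a trigger runs, so it fires at the
    // first trigger after the deadline (setTime + duration) and never before it.
    // triggerTimesMs is assumed to be sorted in ascending order.
    static OptionalLong firstTimeoutTrigger(long setTimeMs, long durationMs, List<Long> triggerTimesMs) {
        long deadlineMs = setTimeMs + durationMs;
        for (long triggerMs : triggerTimesMs) {
            if (triggerMs > deadlineMs) {
                return OptionalLong.of(triggerMs);
            }
        }
        return OptionalLong.empty(); // no trigger after the deadline yet, so no timeout call yet
    }

    // Event time: the timeout timestamp is the event time plus the additional duration, and it
    // fires once the watermark passes it. With watermark = newest event time - delay, the
    // stream must deliver an event newer than the value returned here.
    static long eventTimeThatMustBeExceeded(long eventTimeMs, long additionalMs, long watermarkDelayMs) {
        long timeoutTimestampMs = eventTimeMs + additionalMs;
        return timeoutTimestampMs + watermarkDelayMs;
    }

    public static void main(String[] args) {
        // Timeout of 1000 ms set at t=0; triggers run at 500, 900 and 4000 ms.
        OptionalLong fired = firstTimeoutTrigger(0L, 1000L, Arrays.asList(500L, 900L, 4000L));
        System.out.println(fired.getAsLong()); // 4000

        // Event at t=0, additional duration 10 min, watermark delay 5 min.
        System.out.println(eventTimeThatMustBeExceeded(0L, 600_000L, 300_000L)); // 900000
    }
}
```

 In the first case the timeout is due at 1000 ms but is only observed at the 4000 ms trigger,
 which illustrates why there is no strict upper bound on when the timeout occurs.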
 
 [map/flatMap]GroupsWithState can take a user defined initial state as an additional argument.
 This state will be applied when the first batch of the streaming query is processed. If there
 are no matching rows in the data for the keys present in the initial state, the state is still
 applied and the function will be invoked with the values being an empty iterator.
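
 The first-batch behavior with an initial state can be pictured as follows. This is a Spark-free
 sketch: InitialStateSketch and valuesSeenInFirstBatch are hypothetical names, and the method only
 reports how many values the function would receive for each key in the first batch.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model of the first batch when an initial state is supplied; not Spark code.
class InitialStateSketch {

    // Returns, per key, the number of values the function would see in the first batch.
    // Keys that appear only in the initial state are still invoked, with zero values.
    static Map<String, Integer> valuesSeenInFirstBatch(Set<String> initialStateKeys,
                                                       List<String> firstBatchKeys) {
        Map<String, Integer> valueCounts = new TreeMap<>();
        for (String key : initialStateKeys) {
            valueCounts.put(key, 0);                   // invoked even without data
        }
        for (String key : firstBatchKeys) {
            valueCounts.merge(key, 1, Integer::sum);   // one more value for this key
        }
        return valueCounts;
    }

    public static void main(String[] args) {
        Set<String> initialKeys = new HashSet<>(Arrays.asList("a", "c"));
        List<String> batchKeys = Arrays.asList("a", "b", "a");
        System.out.println(valuesSeenInFirstBatch(initialKeys, batchKeys)); // {a=2, b=1, c=0}
    }
}
```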
 
 Scala example of using GroupState in mapGroupsWithState:
 
 // A mapping function that maintains an integer state for string keys and returns a string.
 // Additionally, it sets a timeout to remove the state if it has not received data for an hour.
 def mappingFunction(key: String, value: Iterator[Int], state: GroupState[Int]): String = {
   if (state.hasTimedOut) {                // If called when timing out, remove the state
     state.remove()
   } else if (state.exists) {              // If state exists, use it for processing
     val existingState = state.get         // Get the existing state
     val shouldRemove = ...                // Decide whether to remove the state
     if (shouldRemove) {
       state.remove()                      // Remove the state
     } else {
       val newState = ...
       state.update(newState)              // Set the new state
       state.setTimeoutDuration("1 hour")  // Set the timeout
     }
   } else {
     val initialState = ...
     state.update(initialState)            // Set the initial state
     state.setTimeoutDuration("1 hour")    // Set the timeout
   }
   ...
   // return something
 }
 dataset
   .groupByKey(...)
   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(mappingFunction)
 
 Java example of using GroupState:
 
 // A mapping function that maintains an integer state for string keys and returns a string.
 // Additionally, it sets a timeout to remove the state if it has not received data for an hour.
 MapGroupsWithStateFunction<String, Integer, Integer, String> mappingFunction =
    new MapGroupsWithStateFunction<String, Integer, Integer, String>() {
      @Override
      public String call(String key, Iterator<Integer> value, GroupState<Integer> state) {
        if (state.hasTimedOut()) {            // If called when timing out, remove the state
          state.remove();
        } else if (state.exists()) {            // If state exists, use it for processing
          int existingState = state.get();      // Get the existing state
          boolean shouldRemove = ...;           // Decide whether to remove the state
          if (shouldRemove) {
            state.remove();                     // Remove the state
          } else {
            int newState = ...;
            state.update(newState);             // Set the new state
            state.setTimeoutDuration("1 hour"); // Set the timeout
          }
        } else {
           int initialState = ...;
           state.update(initialState);           // Set the initial state
           state.setTimeoutDuration("1 hour");   // Set the timeout
         }
         ...
         // return something
      }
    };
 dataset
     .groupByKey(...)
     .mapGroupsWithState(
          mappingFunction, Encoders.INT(), Encoders.STRING(), GroupStateTimeout.ProcessingTimeTimeout());
 - Since:
- 2.2.0
- 
Method Summary
Modifier and Type | Method | Description
boolean | exists() | Whether state exists or not.
S | get() | Get the state value if it exists, or throw NoSuchElementException.
long | getCurrentProcessingTimeMs() | Get the current processing time as milliseconds in epoch time.
long | getCurrentWatermarkMs() | Get the current event time watermark as milliseconds in epoch time.
scala.Option<S> | getOption() | Get the state value as a scala Option.
boolean | hasTimedOut() | Whether the function has been called because the key has timed out.
void | remove() | Remove this state.
void | setTimeoutDuration(long durationMs) | Set the timeout duration in ms for this key.
void | setTimeoutDuration(String duration) | Set the timeout duration for this key as a string.
void | setTimeoutTimestamp(long timestampMs) | Set the timeout timestamp for this key as milliseconds in epoch time.
void | setTimeoutTimestamp(long timestampMs, String additionalDuration) | Set the timeout timestamp for this key as milliseconds in epoch time and an additional duration as a string (e.g. "1 hour", "2 days", etc.).
void | setTimeoutTimestamp(Date timestamp) | Set the timeout timestamp for this key as a java.sql.Date.
void | setTimeoutTimestamp(Date timestamp, String additionalDuration) | Set the timeout timestamp for this key as a java.sql.Date and an additional duration as a string (e.g. "1 hour", "2 days", etc.).
void | update(S newState) | Update the value of the state.
- 
Method Details
- 
exists
boolean exists()
Whether state exists or not.
- 
get
S get() throws NoSuchElementException
Get the state value if it exists, or throw NoSuchElementException.
- Throws:
- NoSuchElementException
 
- 
getCurrentProcessingTimeMs
long getCurrentProcessingTimeMs()
Get the current processing time as milliseconds in epoch time.
- Returns:
- (undocumented)
- Note:
- In a streaming query, this will return a constant value throughout the duration of a trigger, even if the trigger is re-executed.
 
- 
getCurrentWatermarkMs
long getCurrentWatermarkMs() throws UnsupportedOperationException
Get the current event time watermark as milliseconds in epoch time.
- Returns:
- (undocumented)
- Throws:
- UnsupportedOperationException
- Note:
- In a streaming query, this can be called only when watermark is set before calling [map/flatMap]GroupsWithState. In a batch query, this method always returns -1.
- The watermark gets propagated at the end of each query. As a result, this method will return 0 (1970-01-01T00:00:00) for the first micro-batch. If you use this value as a part of the timestamp set in setTimeoutTimestamp, it may lead to the state expiring immediately in the next micro-batch, once the watermark gets the real value from your data.
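
The first-micro-batch pitfall in the note above can be checked with plain arithmetic. The sketch below is not Spark code: WatermarkTimeoutSketch and its methods are hypothetical names, the expiry rule is modeled as "watermark advanced beyond the timestamp", and anchoring the timeout on the newest event time seen for the group is shown only as one possible alternative.

```java
// Hypothetical arithmetic model of the note above; this is not Spark code.
class WatermarkTimeoutSketch {

    // Risky on the first micro-batch: the watermark is still 0 there, so the resulting
    // timestamp lies close to 1970-01-01T00:00:00.
    static long timeoutFromWatermark(long currentWatermarkMs, long durationMs) {
        return currentWatermarkMs + durationMs;
    }

    // Alternative assumed here: anchor the timeout on the newest event time seen for the group.
    static long timeoutFromEventTime(long maxEventTimeMs, long durationMs) {
        return maxEventTimeMs + durationMs;
    }

    // A timeout timestamp expires once the watermark has advanced beyond it.
    static boolean expired(long timeoutTimestampMs, long watermarkMs) {
        return watermarkMs > timeoutTimestampMs;
    }

    public static void main(String[] args) {
        long hourMs = 3_600_000L;
        long eventTimeMs = 1_700_000_000_000L;          // an event time from real data
        long nextWatermarkMs = eventTimeMs - 600_000L;   // watermark once it reflects the data (10 min delay)

        // Timestamp derived from the initial watermark (0) expires as soon as the watermark is real.
        System.out.println(expired(timeoutFromWatermark(0L, hourMs), nextWatermarkMs));          // true
        // Timestamp derived from the event time is still in the future of the watermark.
        System.out.println(expired(timeoutFromEventTime(eventTimeMs, hourMs), nextWatermarkMs)); // false
    }
}
```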
 
- 
getOption
scala.Option<S> getOption()
Get the state value as a scala Option.
- 
hasTimedOut
boolean hasTimedOut()
Whether the function has been called because the key has timed out.
- Returns:
- (undocumented)
- Note:
- This can return true only when timeouts are enabled in [map/flatMap]GroupsWithState.
 
- 
remove
void remove()
Remove this state.
- 
setTimeoutDuration
void setTimeoutDuration(long durationMs) throws IllegalArgumentException, UnsupportedOperationException
Set the timeout duration in ms for this key.
- Parameters:
- durationMs - (undocumented)
- Throws:
- IllegalArgumentException
- UnsupportedOperationException
- Note:
- Processing time timeout must be enabled in [map/flatMap]GroupsWithState for calling this method.
- This method has no effect when used in a batch query.
 
- 
setTimeoutDuration
void setTimeoutDuration(String duration) throws IllegalArgumentException, UnsupportedOperationException
Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc.
- Parameters:
- duration - (undocumented)
- Throws:
- IllegalArgumentException
- UnsupportedOperationException
- Note:
- Processing time timeout must be enabled in [map/flatMap]GroupsWithState for calling this method.
- This method has no effect when used in a batch query.
 
- 
setTimeoutTimestamp
void setTimeoutTimestamp(long timestampMs) throws IllegalArgumentException, UnsupportedOperationException
Set the timeout timestamp for this key as milliseconds in epoch time. This timestamp cannot be older than the current watermark.
- Parameters:
- timestampMs - (undocumented)
- Throws:
- IllegalArgumentException
- UnsupportedOperationException
- Note:
- Event time timeout must be enabled in [map/flatMap]GroupsWithState for calling this method.
- This method has no effect when used in a batch query.
 
- 
setTimeoutTimestamp
void setTimeoutTimestamp(long timestampMs, String additionalDuration) throws IllegalArgumentException, UnsupportedOperationException
Set the timeout timestamp for this key as milliseconds in epoch time and an additional duration as a string (e.g. "1 hour", "2 days", etc.). The final timestamp (including the additional duration) cannot be older than the current watermark.
- Parameters:
- timestampMs - (undocumented)
- additionalDuration - (undocumented)
- Throws:
- IllegalArgumentException
- UnsupportedOperationException
- Note:
- Event time timeout must be enabled in [map/flatMap]GroupsWithState for calling this method.
- This method has no side effect when used in a batch query.
 
- 
setTimeoutTimestamp
void setTimeoutTimestamp(Date timestamp) throws UnsupportedOperationException
Set the timeout timestamp for this key as a java.sql.Date. This timestamp cannot be older than the current watermark.
- Parameters:
- timestamp - (undocumented)
- Throws:
- UnsupportedOperationException
- Note:
- Event time timeout must be enabled in [map/flatMap]GroupsWithState for calling this method.
- This method has no side effect when used in a batch query.
 
- 
setTimeoutTimestamp
void setTimeoutTimestamp(Date timestamp, String additionalDuration) throws IllegalArgumentException, UnsupportedOperationException
Set the timeout timestamp for this key as a java.sql.Date and an additional duration as a string (e.g. "1 hour", "2 days", etc.). The final timestamp (including the additional duration) cannot be older than the current watermark.
- Parameters:
- timestamp - (undocumented)
- additionalDuration - (undocumented)
- Throws:
- IllegalArgumentException
- UnsupportedOperationException
- Note:
- Event time timeout must be enabled in [map/flatMap]GroupsWithState for calling this method.
- This method has no side effect when used in a batch query.
 
- 
update
void update(S newState)
Update the value of the state.
 
-