Package org.apache.spark.streaming
Class State<S>
Object
org.apache.spark.streaming.State<S>
- Type Parameters:
S - Class of the state
:: Experimental ::
Abstract class for getting and updating the state in the mapping function used in the mapWithState operation of a pair DStream (Scala) or a JavaPairDStream (Java).
Scala example of using State:
// A mapping function that maintains an integer state and returns a String
def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {
  // Check if state exists
  if (state.exists) {
    val existingState = state.get  // Get the existing state
    val shouldRemove = ...         // Decide whether to remove the state
    if (shouldRemove) {
      state.remove()               // Remove the state
    } else {
      val newState = ...
      state.update(newState)       // Set the new state
    }
  } else {
    val initialState = ...
    state.update(initialState)     // Set the initial state
  }
  ... // return something
}
Java example of using State:
// A mapping function that maintains an integer state and returns a String
Function3<String, Optional<Integer>, State<Integer>, String> mappingFunction =
    new Function3<String, Optional<Integer>, State<Integer>, String>() {
      @Override
      public String call(String key, Optional<Integer> value, State<Integer> state) {
        // Check if state exists
        if (state.exists()) {
          int existingState = state.get();  // Get the existing state
          boolean shouldRemove = ...;       // Decide whether to remove the state
          if (shouldRemove) {
            state.remove();                 // Remove the state
          } else {
            int newState = ...;
            state.update(newState);         // Set the new state
          }
        } else {
          int initialState = ...;
          state.update(initialState);       // Set the initial state
        }
        // return something
      }
    };
-
Constructor Summary
-
Method Summary
Modifier and Type        Method               Description
abstract boolean         exists()             Whether the state already exists.
abstract S               get()                Get the state if it exists, otherwise it will throw java.util.NoSuchElementException.
final scala.Option<S>    getOption()          Get the state as a scala.Option.
abstract boolean         isTimingOut()        Whether the state is timing out and going to be removed by the system after the current batch.
abstract void            remove()             Remove the state if it exists.
final String             toString()
abstract void            update(S newState)   Update the state with a new value.
-
Constructor Details
-
State
public State()
-
-
Method Details
-
exists
public abstract boolean exists()
Whether the state already exists.
-
get
public abstract S get()
Get the state if it exists, otherwise it will throw java.util.NoSuchElementException. Check with exists() whether the state exists before calling get().
- Returns:
- The existing state.
- Throws:
NoSuchElementException - If the state does not exist.
-
getOption
public final scala.Option<S> getOption()
Get the state as a scala.Option. It will be Some(state) if it exists, otherwise None.
- Returns:
- Some(state) if the state exists, otherwise None.
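The two access patterns described for get() and getOption() can be compared in a small sketch. It does not use Spark: StateStandIn is a hypothetical in-memory stand-in that mimics only the documented exists()/get()/getOption() contract, and java.util.Optional stands in for scala.Option so that the example runs on a plain JDK. The default value 0 is also an assumption made for illustration.

```java
import java.util.NoSuchElementException;
import java.util.Optional;

public class StateAccessSketch {

    // Hypothetical stand-in, NOT org.apache.spark.streaming.State.
    // It models only the documented exists()/get()/getOption() behavior.
    static class StateStandIn<S> {
        private final S value; // null means "no state has been set"

        StateStandIn(S initial) {
            this.value = initial;
        }

        boolean exists() {
            return value != null;
        }

        S get() {
            if (value == null) {
                throw new NoSuchElementException("State is not set");
            }
            return value;
        }

        // java.util.Optional is used here in place of scala.Option.
        Optional<S> getOption() {
            return Optional.ofNullable(value);
        }
    }

    // Pattern 1: guard get() with exists() so that get() never throws.
    static int readWithExists(StateStandIn<Integer> state) {
        return state.exists() ? state.get() : 0;
    }

    // Pattern 2: let the option value carry the "missing" case.
    static int readWithOption(StateStandIn<Integer> state) {
        return state.getOption().orElse(0);
    }

    public static void main(String[] args) {
        StateStandIn<Integer> present = new StateStandIn<>(5);
        StateStandIn<Integer> absent = new StateStandIn<Integer>(null);
        System.out.println(readWithExists(present));
        System.out.println(readWithOption(absent));
    }
}
```

Both helpers return the same result for the same input; the option-based form avoids a separate exists() check at the cost of an extra wrapper object.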
-
isTimingOut
public abstract boolean isTimingOut()
Whether the state is timing out and going to be removed by the system after the current batch. This timeout can occur if a timeout duration has been specified in the StateSpec and the key has not received any new data for that timeout duration.
- Returns:
- true if the state is timing out, otherwise false.
-
remove
public abstract void remove()
Remove the state if it exists.
The state cannot be updated if it has already been removed (that is, remove() has already been called) or if it is going to be removed due to timeout (that is, isTimingOut() is true).
-
toString
public final String toString()
-
update
public abstract void update(S newState)
Update the state with a new value.
The state cannot be updated if it has already been removed (that is, remove() has already been called) or if it is going to be removed due to timeout (that is, isTimingOut() is true).
- Parameters:
newState - The new value of the state.
- Throws:
IllegalArgumentException - If the state has already been removed, or is going to be removed due to timeout.
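The update rule above (no update after remove(), and no update while isTimingOut() is true) can be illustrated without Spark. In the sketch below, StateStandIn is a hypothetical stand-in that models only this documented rule, and tryUpdate is an illustrative helper name, not part of any Spark API. How the real implementation behaves beyond the documented contract is not assumed here.

```java
public class StateUpdateSketch {

    // Hypothetical stand-in, NOT org.apache.spark.streaming.State.
    // It models only the documented rule for update().
    static class StateStandIn<S> {
        private S value;
        private boolean removed = false;
        private final boolean timingOut; // fixed per batch in this sketch

        StateStandIn(S initial, boolean timingOut) {
            this.value = initial;
            this.timingOut = timingOut;
        }

        boolean isTimingOut() {
            return timingOut;
        }

        S get() {
            return value;
        }

        void remove() {
            value = null;
            removed = true;
        }

        void update(S newState) {
            if (removed || timingOut) {
                throw new IllegalArgumentException(
                    "Cannot update state that was removed or is timing out");
            }
            value = newState;
        }
    }

    // Illustrative helper: skip the update when the state is timing out,
    // because update() would throw in that case. Returns whether it updated.
    static boolean tryUpdate(StateStandIn<Integer> state, int newValue) {
        if (state.isTimingOut()) {
            return false;
        }
        state.update(newValue);
        return true;
    }

    public static void main(String[] args) {
        StateStandIn<Integer> live = new StateStandIn<>(1, false);
        System.out.println(tryUpdate(live, 2));

        StateStandIn<Integer> expiring = new StateStandIn<>(1, true);
        System.out.println(tryUpdate(expiring, 2));

        live.remove();
        try {
            live.update(3); // rejected: remove() has already been called
        } catch (IllegalArgumentException e) {
            System.out.println("update after remove() rejected");
        }
    }
}
```

In a mapping function, the same guard would typically be an isTimingOut() check placed before any call to update(), since a key that is timing out is invoked without new data.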
-