class TwsTester[K, I, O] extends AnyRef
Testing utility for transformWithState stateful processors.
This class enables unit testing of StatefulProcessor business logic by simulating the behavior of transformWithState. It processes input rows and returns output rows equivalent to those that would be produced by the processor in an actual Spark streaming query.
Supported:
- Processing input rows and producing output rows via test().
- Initial state setup via the constructor parameter.
- Direct state manipulation via updateValueState, updateListState, updateMapState.
- Direct state inspection via peekValueState, peekListState, peekMapState.
- Timers in ProcessingTime mode (use setProcessingTime to fire timers).
- Timers in EventTime mode (use setWatermark to manually set the watermark and fire expired timers).
- Late event filtering in EventTime mode.
Not Supported:
- TTL. States persist indefinitely, even if TTLConfig is set.
- Automatic watermark propagation: In production Spark streaming, the watermark is computed from event times and propagated at the end of each microbatch. TwsTester does not simulate this behavior because it processes keys individually rather than in batches. To test watermark-dependent logic, use setWatermark() to manually set the watermark to the desired value before calling test().
Use Cases:
- Primary: Unit testing business logic in handleInputRows implementations.
- Not recommended: End-to-end testing or performance testing. Use actual Spark streaming queries for those scenarios.
- K
the type of grouping key.
- I
the type of input rows.
- O
the type of output rows.
- Source
- TwsTester.scala
- Since
4.2.0
Instance Constructors
- new TwsTester(processor: StatefulProcessor[K, I, O], initialState: List[(K, Any)] = List(), timeMode: TimeMode = TimeMode.None, outputMode: OutputMode = OutputMode.Append, eventTimeExtractor: Option[(I) => Long] = None)
- processor
the StatefulProcessor to test.
- initialState
initial state for each key as a list of (key, state) tuples.
- timeMode
time mode (None, ProcessingTime or EventTime).
- outputMode
output mode (Append, Update, or Complete).
- eventTimeExtractor
function to extract event time (in milliseconds) from input rows. Required when using TimeMode.EventTime. Used for late event filtering.
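For illustration, here is how the initialState and eventTimeExtractor arguments might look for a processor keyed by user ID. This is a sketch under stated assumptions:
- ClickEvent and its timestampMs field are hypothetical.
- The per-key state values are assumed to be Long counts.
- The processor itself is omitted.

```scala
// Hypothetical input row type; timestampMs carries the event time in milliseconds.
final case class ClickEvent(userId: String, page: String, timestampMs: Long)

object ConstructorArgs {
  // Matches initialState: List[(K, Any)] with K = String.
  // Assumption: each key's state is a Long click count.
  val initialState: List[(String, Any)] = List(
    "alice" -> 3L,
    "bob" -> 7L
  )

  // Matches eventTimeExtractor: Option[(I) => Long] with I = ClickEvent.
  // Only needed when timeMode is TimeMode.EventTime.
  val eventTimeExtractor: Option[ClickEvent => Long] =
    Some((e: ClickEvent) => e.timestampMs)
}
```

These values would then be passed by name alongside timeMode = TimeMode.EventTime, for example initialState = ConstructorArgs.initialState and eventTimeExtractor = ConstructorArgs.eventTimeExtractor.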
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @IntrinsicCandidate() @native()
- def deleteState(stateName: String, key: K): Unit
Deletes state for a given key.
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @IntrinsicCandidate() @native()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @IntrinsicCandidate() @native()
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @IntrinsicCandidate() @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @IntrinsicCandidate() @native()
- def peekListState[T](stateName: String, key: K): List[T]
Retrieves the list state for a given key.
- def peekMapState[MK, MV](stateName: String, key: K): Map[MK, MV]
Retrieves the map state for a given key.
- def peekValueState[T](stateName: String, key: K): Option[T]
Retrieves the value state for a given key.
- def setProcessingTime(currentTimeMs: Long): List[O]
Sets the simulated processing time and fires all expired timers.
Call this after test() to simulate time passage and trigger any timers registered with registerTimer(). Timers with expiry time <= current processing time will fire, invoking handleExpiredTimer for each. This mirrors Spark's behavior, where timers are processed after input data within a microbatch.
- currentTimeMs
the processing time to set in milliseconds
- returns
output rows emitted by handleExpiredTimer for all fired timers
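The firing rule can be pictured with a small, hypothetical model in plain Scala. TimerModel, register, and advanceTo are illustrative names, not TwsTester internals. Returning fired timers in ascending expiry order is an assumption; the documentation does not specify an order.

In the model, each advance fires every pending timer whose expiry is <= the new time exactly once and then drops it. In TwsTester, each fired timer would additionally invoke handleExpiredTimer.

```scala
import scala.collection.mutable

// Hypothetical model of the documented firing rule; not TwsTester's implementation.
final class TimerModel[K] {
  // Pending timers as (key, expiryMs) pairs.
  private val pending = mutable.ListBuffer.empty[(K, Long)]

  // Stands in for a processor calling registerTimer(expiryMs) while handling `key`.
  def register(key: K, expiryMs: Long): Unit = pending += ((key, expiryMs))

  // Advances the clock to nowMs. Every timer with expiry <= nowMs fires once
  // and is removed. Assumption: fired timers are returned in ascending expiry order.
  def advanceTo(nowMs: Long): List[(K, Long)] = {
    val (fired, remaining) = pending.toList.partition { case (_, expiry) => expiry <= nowMs }
    pending.clear()
    pending ++= remaining
    fired.sortBy(_._2)
  }
}
```

The same rule applies to event-time timers in EventTime mode, with the watermark passed to setWatermark taking the place of processing time.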
- def setWatermark(currentWatermarkMs: Long): List[O]
Sets the watermark and fires all expired event-time timers.
Use this in EventTime mode to manually set the watermark. This is the only way to set the watermark in TwsTester, as automatic watermark propagation based on event times is not supported. Timers with expiry time <= new watermark will fire.
- currentWatermarkMs
the watermark to set in milliseconds
- returns
output rows emitted by handleExpiredTimer for all fired timers
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def test(key: K, values: List[I]): List[O]
Processes input rows for a single key through the stateful processor.
In EventTime mode, late events (where event time <= current watermark) are filtered out before reaching the processor, matching the behavior of real Spark streaming.
The watermark is not automatically advanced based on event times. Use setWatermark() to manually set the watermark before calling test().
- key
the grouping key
- values
input rows to process
- returns
all output rows produced by the processor
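The EventTime-mode late-event rule can be sketched as a plain filter. LateEventFilter, dropLate, and Reading are hypothetical names used only for illustration. The watermark is passed in explicitly because test() does not advance it.

```scala
// Hypothetical input row type with an event time in milliseconds.
final case class Reading(sensor: String, tsMs: Long)

// Hypothetical model of the filtering test() applies in EventTime mode:
// rows whose event time is <= the current watermark never reach the processor.
object LateEventFilter {
  def dropLate[I](rows: List[I], watermarkMs: Long)(eventTimeMs: I => Long): List[I] =
    rows.filter(row => eventTimeMs(row) > watermarkMs)
}
```

With readings at 900, 1000, and 1100 ms and a watermark of 1000, only the 1100 ms reading survives. An event exactly at the watermark counts as late, because the rule uses <=.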
- def toString(): String
- Definition Classes
- AnyRef → Any
- def updateListState[T](stateName: String, key: K, value: List[T])(implicit ct: ClassTag[T]): Unit
Sets the list state for a given key.
- def updateMapState[MK, MV](stateName: String, key: K, value: Map[MK, MV]): Unit
Sets the map state for a given key.
- def updateValueState[T](stateName: String, key: K, value: T): Unit
Sets the value state for a given key.
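State in these methods is addressed by a (stateName, key) pair. The hypothetical model below mirrors only the shapes of updateValueState, peekValueState, and deleteState. As the Option[T] return type suggests, an absent value reads as None. This is an illustration, not how TwsTester stores state.

```scala
import scala.collection.mutable

// Hypothetical model of named per-key value state; not TwsTester's implementation.
final class ValueStateModel[K] {
  private val store = mutable.Map.empty[(String, K), Any]

  // Mirrors updateValueState(stateName, key, value).
  def update[T](stateName: String, key: K, value: T): Unit =
    store((stateName, key)) = value

  // Mirrors peekValueState: None if nothing is set (or after delete), Some(value) otherwise.
  def peek[T](stateName: String, key: K): Option[T] =
    store.get((stateName, key)).map(_.asInstanceOf[T])

  // Mirrors deleteState(stateName, key).
  def delete(stateName: String, key: K): Unit =
    store -= ((stateName, key))
}
```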
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
Deprecated Value Members
- def finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.Throwable]) @Deprecated
- Deprecated
(Since version 9)