
class TwsTester[K, I, O] extends AnyRef

Testing utility for transformWithState stateful processors.

This class enables unit testing of StatefulProcessor business logic by simulating the behavior of transformWithState. It processes input rows and returns output rows equivalent to those that would be produced by the processor in an actual Spark streaming query.

Supported:

  • Processing input rows and producing output rows via test().
  • Initial state setup via constructor parameter.
  • Direct state manipulation via updateValueState, updateListState, updateMapState.
  • Direct state inspection via peekValueState, peekListState, peekMapState.
  • Timers in ProcessingTime mode (use setProcessingTime to fire timers).
  • Timers in EventTime mode (use setWatermark to manually set the watermark and fire expired timers).
  • Late event filtering in EventTime mode.
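To make the testing model concrete, here is a plain-Scala sketch with no Spark dependency of what a test() call conceptually does for a simple running-count processor. Each call handles the rows for one key, reads and updates that key's state, and returns output rows, while state can also be inspected or overwritten directly. CountingModel, peekCount and updateCount are hypothetical illustrations of that pattern, not TwsTester's implementation or a Spark API.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a processor that keeps one value state per key
// (a running count of input rows) and emits (key, newCount) per test() call.
// This is a plain-Scala model for illustration, not TwsTester itself.
final class CountingModel[K] {
  // Per-key value state, playing the role of a ValueState[Long] named "count".
  private val countState = mutable.Map.empty[K, Long]

  // Same shape as TwsTester.test(key, values): rows for one key at a time.
  def test(key: K, values: List[String]): List[(K, Long)] = {
    val updated = countState.getOrElse(key, 0L) + values.size
    countState(key) = updated
    List((key, updated))
  }

  // Plays the role of peekValueState: None while the key has no state.
  def peekCount(key: K): Option[Long] = countState.get(key)

  // Plays the role of updateValueState: overwrite state without calling test().
  def updateCount(key: K, value: Long): Unit = countState(key) = value
}
```

Seeding state with updateCount before calling test, then checking it with peekCount, mirrors the updateValueState / peekValueState workflow listed above.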

Not Supported:

  • TTL. States persist indefinitely, even if TTLConfig is set.
  • Automatic watermark propagation: In production Spark streaming, the watermark is computed from event times and propagated at the end of each microbatch. TwsTester does not simulate this behavior because it processes keys individually rather than in batches. To test watermark-dependent logic, use setWatermark() to manually set the watermark to the desired value before calling test().
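The manual-watermark workflow described above can be illustrated with a small plain-Scala model. EventTimeTimerModel is hypothetical, not TwsTester's implementation: its test() stands in for a processor that registers one event-time timer per input row at eventTime + timeoutMs. Only an explicit setWatermark call moves the watermark, and it fires every timer whose expiry is <= the new watermark. The starting watermark of 0 and the ascending firing order are assumptions of this sketch.

```scala
import scala.collection.mutable

// Hypothetical model of EventTime-mode timers; not TwsTester's implementation.
final class EventTimeTimerModel[K] {
  // Assumed starting watermark; only setWatermark ever changes it.
  private var watermarkMs = 0L
  // Pending event-time timers as (key, expiryMs) pairs.
  private val timers = mutable.Set.empty[(K, Long)]

  def currentWatermarkMs: Long = watermarkMs

  // Stand-in for test(): a processor that registers one timer per row at
  // eventTime + timeoutMs. The watermark is deliberately left untouched.
  def test(key: K, eventTimesMs: List[Long], timeoutMs: Long): Unit =
    eventTimesMs.foreach(ts => timers += ((key, ts + timeoutMs)))

  // Stand-in for setWatermark: every timer with expiry <= the new watermark
  // fires exactly once and is removed. Fired timers are returned in
  // ascending expiry order (the ordering is an assumption).
  def setWatermark(newWatermarkMs: Long): List[(K, Long)] = {
    watermarkMs = newWatermarkMs
    val fired = timers.filter(_._2 <= newWatermarkMs).toList.sortBy(_._2)
    timers --= fired
    fired
  }
}
```

Feeding rows with large event times does not advance the watermark in this model. A test therefore has to call setWatermark explicitly before expecting any event-time timer to fire.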

Use Cases:

  • Primary: Unit testing business logic in handleInputRows implementations.
  • Not recommended: end-to-end or performance testing; use actual Spark streaming queries for those scenarios.

Type Parameters

K

the type of the grouping key.

I

the type of input rows.

O

the type of output rows.

Source
TwsTester.scala
Since

4.2.0

Linear Supertypes
AnyRef, Any

Instance Constructors

  1. new TwsTester(processor: StatefulProcessor[K, I, O], initialState: List[(K, Any)] = List(), timeMode: TimeMode = TimeMode.None, outputMode: OutputMode = OutputMode.Append, eventTimeExtractor: Option[(I) => Long] = None)

    processor

    the StatefulProcessor to test.

    initialState

    initial state for each key as a list of (key, state) tuples.

    timeMode

    time mode (None, ProcessingTime or EventTime).

    outputMode

    output mode (Append, Update, or Complete).

    eventTimeExtractor

    function to extract event time (in milliseconds) from input rows. Required when using TimeMode.EventTime. Used for late event filtering.
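The late-event rule that eventTimeExtractor enables can be sketched as a pure function. In EventTime mode, a row is dropped before reaching the processor when its extracted event time is <= the current watermark. LateEventFilter, dropLateEvents and the Click row type are hypothetical names for this illustration. Passing everything through when no extractor is given is a modeling choice, not documented TwsTester behavior.

```scala
// Hypothetical helper (not part of TwsTester) reproducing the late-event rule:
// in EventTime mode a row is dropped when eventTimeExtractor(row) <= watermark.
object LateEventFilter {
  def dropLateEvents[I](
      values: List[I],
      watermarkMs: Long,
      eventTimeExtractor: Option[I => Long]): List[I] =
    eventTimeExtractor match {
      // Keep only rows strictly newer than the current watermark.
      case Some(extract) => values.filter(v => extract(v) > watermarkMs)
      // Modeling choice: without an extractor there is nothing to compare.
      case None => values
    }
}

// Example input row type whose event time is carried in milliseconds.
final case class Click(user: String, eventTimeMs: Long)
```

A row whose event time equals the watermark counts as late. With the documented constructor signature, the matching argument would be eventTimeExtractor = Some((c: Click) => c.eventTimeMs).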

Value Members

  1. final def !=(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  2. final def ##: Int
    Definition Classes
    AnyRef → Any
  3. final def ==(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  4. final def asInstanceOf[T0]: T0
    Definition Classes
    Any
  5. def clone(): AnyRef
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.CloneNotSupportedException]) @IntrinsicCandidate() @native()
  6. def deleteState(stateName: String, key: K): Unit

    Deletes the state named stateName for the given key.

  7. final def eq(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  8. def equals(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef → Any
  9. final def getClass(): Class[_ <: AnyRef]
    Definition Classes
    AnyRef → Any
    Annotations
    @IntrinsicCandidate() @native()
  10. def hashCode(): Int
    Definition Classes
    AnyRef → Any
    Annotations
    @IntrinsicCandidate() @native()
  11. final def isInstanceOf[T0]: Boolean
    Definition Classes
    Any
  12. final def ne(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  13. final def notify(): Unit
    Definition Classes
    AnyRef
    Annotations
    @IntrinsicCandidate() @native()
  14. final def notifyAll(): Unit
    Definition Classes
    AnyRef
    Annotations
    @IntrinsicCandidate() @native()
  15. def peekListState[T](stateName: String, key: K): List[T]

    Retrieves the list state for a given key.

  16. def peekMapState[MK, MV](stateName: String, key: K): Map[MK, MV]

    Retrieves the map state for a given key.

  17. def peekValueState[T](stateName: String, key: K): Option[T]

    Retrieves the value state for a given key.

  18. def setProcessingTime(currentTimeMs: Long): List[O]

    Sets the simulated processing time and fires all expired timers.

    Call this after test() to simulate time passage and trigger any timers registered with registerTimer(). Timers with expiry time <= current processing time will fire, invoking handleExpiredTimer for each. This mirrors Spark's behavior where timers are processed after input data within a microbatch.

    currentTimeMs

    the processing time to set in milliseconds

    returns

    output rows emitted by handleExpiredTimer for all fired timers

  19. def setWatermark(currentWatermarkMs: Long): List[O]

    Sets the watermark and fires all expired event-time timers.

    Use this in EventTime mode to set the watermark manually. This is the only way to set the watermark in TwsTester, because automatic watermark propagation based on event times is not supported. Timers whose expiry time is <= the new watermark fire, invoking handleExpiredTimer for each fired timer.

    currentWatermarkMs

    the watermark to set in milliseconds

    returns

    output rows emitted by handleExpiredTimer for all fired timers

  20. final def synchronized[T0](arg0: => T0): T0
    Definition Classes
    AnyRef
  21. def test(key: K, values: List[I]): List[O]

    Processes input rows for a single key through the stateful processor.

    In EventTime mode, late events (where event time <= current watermark) are filtered out before reaching the processor, matching the behavior of real Spark streaming.

    The watermark is not automatically advanced based on event times. Use setWatermark() to manually set the watermark before calling test().

    key

    the grouping key

    values

    input rows to process

    returns

    all output rows produced by the processor

  22. def toString(): String
    Definition Classes
    AnyRef → Any
  23. def updateListState[T](stateName: String, key: K, value: List[T])(implicit ct: ClassTag[T]): Unit

    Sets the list state for a given key.

  24. def updateMapState[MK, MV](stateName: String, key: K, value: Map[MK, MV]): Unit

    Sets the map state for a given key.

  25. def updateValueState[T](stateName: String, key: K, value: T): Unit

    Sets the value state for a given key.

  26. final def wait(arg0: Long, arg1: Int): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException])
  27. final def wait(arg0: Long): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException]) @native()
  28. final def wait(): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException])
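The setProcessingTime member above follows a simple contract. Every registered timer whose expiry time is <= the new processing time fires once, and the rows its handler returns are concatenated into the result. The plain-Scala sketch that follows models that contract under stated assumptions. ProcessingTimeTimerModel, its registerTimer method, and the onExpired callback are hypothetical stand-ins for the processor's registerTimer() and handleExpiredTimer, not Spark APIs. Firing in ascending expiry order is an assumption made for deterministic output.

```scala
import scala.collection.mutable

// Hypothetical model of the setProcessingTime contract documented above;
// it is not TwsTester's implementation and has no Spark dependency.
final class ProcessingTimeTimerModel[K] {
  // Pending processing-time timers as (key, expiryMs) pairs.
  private val timers = mutable.Set.empty[(K, Long)]

  // Plays the role of registerTimer(expiryMs) called inside the processor.
  def registerTimer(key: K, expiryMs: Long): Unit = timers += ((key, expiryMs))

  def pendingTimers: Int = timers.size

  // Plays the role of setProcessingTime: every timer with expiry <= nowMs
  // fires once, is removed, and its handler output is concatenated.
  // Ascending expiry order is an assumption made for deterministic output.
  def setProcessingTime[O](nowMs: Long)(onExpired: (K, Long) => List[O]): List[O] = {
    val expired = timers.filter(_._2 <= nowMs).toList.sortBy(_._2)
    timers --= expired
    expired.flatMap { case (key, expiryMs) => onExpired(key, expiryMs) }
  }
}
```

Because a fired timer is removed, advancing the clock again only fires timers that were still pending. This matches the documented rule that setProcessingTime returns the output of handleExpiredTimer for the timers fired by that call.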

Deprecated Value Members

  1. def finalize(): Unit
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.Throwable]) @Deprecated
    Deprecated

    (Since version 9)
