DataFrame.dropDuplicatesWithinWatermark(subset: Optional[List[str]] = None) → pyspark.sql.dataframe.DataFrame
Return a new DataFrame with duplicate rows removed within the watermark, optionally considering only certain columns.

This only works with a streaming DataFrame, and a watermark for the input DataFrame must be set via withWatermark().

For a streaming DataFrame, this keeps all data across triggers as intermediate state in order to drop duplicate rows. The state is kept long enough to guarantee the following semantics: “Events are deduplicated as long as the time distance between the earliest and latest events is smaller than the delay threshold of the watermark.” Users are encouraged to set the delay threshold of the watermark longer than the maximum timestamp difference among duplicated events.
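The semantics above can be illustrated with a small pure-Python model. This is a sketch only, not Spark's implementation: the function name `dedupe_within_watermark` is hypothetical, each event is treated as its own micro-batch, and the state for a key is assumed to expire at the first event's time plus the delay threshold.

```python
from datetime import datetime, timedelta


def dedupe_within_watermark(events, delay):
    """Simplified model of deduplication within a watermark.

    events: iterable of (key, event_time) tuples in arrival order.
    delay:  watermark delay threshold as a timedelta.
    Returns the events that would be emitted under this model.
    """
    expires_at = {}   # key -> time after which the key's state is evicted (assumption)
    watermark = None  # max event time seen so far minus the delay
    emitted = []
    for key, event_time in events:
        # Data older than the current watermark is dropped as too late.
        if watermark is not None and event_time < watermark:
            continue
        # First occurrence of a key (within its retained state) is emitted.
        if key not in expires_at:
            expires_at[key] = event_time + delay
            emitted.append((key, event_time))
        # The watermark only moves forward.
        candidate = event_time - delay
        if watermark is None or candidate > watermark:
            watermark = candidate
        # Evict state whose expiry the watermark has reached.
        expires_at = {k: t for k, t in expires_at.items() if t > watermark}
    return emitted


t0 = datetime(2024, 1, 1, 12, 0)
events = [
    ("a", t0),                          # emitted
    ("a", t0 + timedelta(minutes=5)),   # duplicate within 10 minutes: dropped
    ("b", t0 + timedelta(minutes=25)),  # emitted; state for "a" is evicted
    ("a", t0 + timedelta(minutes=26)),  # further apart than the delay: emitted again
    ("c", t0 + timedelta(minutes=1)),   # older than the watermark: dropped
]
result = dedupe_within_watermark(events, timedelta(minutes=10))
print([(k, (t - t0).seconds // 60) for k, t in result])
```

In this model the second "a" at minute 26 is emitted again because it is further from the first "a" than the 10-minute delay, which is why the delay threshold should exceed the largest expected gap between duplicates.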

Note: data that arrives too late (older than the watermark) will be dropped.

New in version 3.5.0.

subset : List of column names, optional

List of columns to use for duplicate comparison (default: all columns).


DataFrame without duplicates.


Supports Spark Connect.


>>> from pyspark.sql import Row
>>> from pyspark.sql.functions import timestamp_seconds
>>> df = spark.readStream.format("rate").load().selectExpr(
...     "value % 5 AS value", "timestamp")
>>> df = df.select("value", df.timestamp.alias("time")).withWatermark("time", '10 minutes')
>>> df
DataFrame[value: bigint, time: timestamp]

Deduplicate identical rows.

>>> df.dropDuplicatesWithinWatermark() 

Deduplicate rows based on the ‘value’ column.

>>> df.dropDuplicatesWithinWatermark(['value'])