truncation#

Functions for truncating Spark DataFrames.

Functions#

truncate_large_groups()

Order rows by a hash function and keep at most threshold rows for each group.

drop_large_groups()

Drop all rows for groups that have more than threshold rows.

limit_keys_per_group()

Order keys by a hash function and keep at most threshold keys for each group.

truncate_large_groups(df, grouping_columns, threshold)#

Order rows by a hash function and keep at most threshold rows for each group.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
4  a3  b3
>>> print_sdf(truncate_large_groups(spark_dataframe, ["A"], 3))
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
4  a3  b3
>>> print_sdf(truncate_large_groups(spark_dataframe, ["A"], 2))
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b3
>>> print_sdf(truncate_large_groups(spark_dataframe, ["A"], 1))
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
Parameters:
  • df (pyspark.sql.DataFrame) – DataFrame to truncate.

  • grouping_columns (List[str]) – Columns defining the groups.

  • threshold (int) – Maximum number of rows to include for each group.

Return type:

pyspark.sql.DataFrame
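
One way to picture this behavior is the following PySpark sketch. It is illustrative only, not this module's implementation: it ranks the rows inside each group by a hash of the full row and keeps the first threshold ranks. The helper name and the temporary `_rank` column are hypothetical.

>>> from typing import List
>>> from pyspark.sql import DataFrame, Window
>>> from pyspark.sql import functions as sf
>>> def truncate_large_groups_sketch(
...     df: DataFrame, grouping_columns: List[str], threshold: int
... ) -> DataFrame:
...     # Rank rows within each group by a hash of the whole row,
...     # then keep at most `threshold` rows per group.
...     window = Window.partitionBy(*grouping_columns).orderBy(sf.hash(*df.columns))
...     return (
...         df.withColumn("_rank", sf.row_number().over(window))
...         .filter(sf.col("_rank") <= threshold)
...         .drop("_rank")
...     )
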

drop_large_groups(df, grouping_columns, threshold)#

Drop all rows for groups that have more than threshold rows.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
4  a3  b3
>>> print_sdf(drop_large_groups(spark_dataframe, ["A"], 3))
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
4  a3  b3
>>> print_sdf(drop_large_groups(spark_dataframe, ["A"], 2))
    A   B
0  a1  b1
1  a2  b1
>>> print_sdf(drop_large_groups(spark_dataframe, ["A"], 1))
    A   B
0  a1  b1
1  a2  b1
Parameters:
  • df (pyspark.sql.DataFrame) – DataFrame to truncate.

  • grouping_columns (List[str]) – Columns defining the groups.

  • threshold (int) – Threshold for dropping groups. If more than threshold rows belong to the same group, all rows in that group are dropped.

Return type:

pyspark.sql.DataFrame
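
Equivalent behavior can be sketched with a window count: compute the number of rows in each group and drop every row belonging to a group whose count exceeds the threshold. The helper name and the temporary `_count` column below are hypothetical, not part of this module's API.

>>> from typing import List
>>> from pyspark.sql import DataFrame, Window
>>> from pyspark.sql import functions as sf
>>> def drop_large_groups_sketch(
...     df: DataFrame, grouping_columns: List[str], threshold: int
... ) -> DataFrame:
...     # Count rows per group, then keep only rows from groups whose
...     # size does not exceed `threshold`.
...     window = Window.partitionBy(*grouping_columns)
...     return (
...         df.withColumn("_count", sf.count(sf.lit(1)).over(window))
...         .filter(sf.col("_count") <= threshold)
...         .drop("_count")
...     )
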

limit_keys_per_group(df, grouping_columns, key_columns, threshold)#

Order keys by a hash function and keep at most threshold keys for each group.

Note

After truncation there may still be an unbounded number of rows per key, but at most threshold keys per group.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
4  a3  b3
5  a4  b1
6  a4  b2
7  a4  b3
>>> print_sdf(
...     limit_keys_per_group(
...         df=spark_dataframe,
...         grouping_columns=["A"],
...         key_columns=["B"],
...         threshold=2,
...     )
... )
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
4  a3  b3
5  a4  b2
6  a4  b3
>>> print_sdf(
...     limit_keys_per_group(
...         df=spark_dataframe,
...         grouping_columns=["A"],
...         key_columns=["B"],
...         threshold=1,
...     )
... )
    A   B
0  a1  b1
1  a2  b1
2  a3  b3
3  a4  b3
Parameters:
  • df (pyspark.sql.DataFrame) – DataFrame to truncate.

  • grouping_columns (List[str]) – Columns defining the groups.

  • key_columns (List[str]) – Columns defining the keys.

  • threshold (int) – Maximum number of keys to include for each group.

Return type:

pyspark.sql.DataFrame
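
A hedged sketch of the behavior (not necessarily how the library implements it): collect the distinct key tuples per group, keep at most threshold of them ordered by a hash of the key columns, and inner-join back so only rows with a kept key remain. The helper name and the temporary `_rank` column are hypothetical.

>>> from typing import List
>>> from pyspark.sql import DataFrame, Window
>>> from pyspark.sql import functions as sf
>>> def limit_keys_per_group_sketch(
...     df: DataFrame,
...     grouping_columns: List[str],
...     key_columns: List[str],
...     threshold: int,
... ) -> DataFrame:
...     # Distinct (group, key) combinations present in the data.
...     keys = df.select(*grouping_columns, *key_columns).distinct()
...     # Rank keys within each group by a hash of the key columns and
...     # keep at most `threshold` keys per group.
...     window = Window.partitionBy(*grouping_columns).orderBy(sf.hash(*key_columns))
...     kept_keys = (
...         keys.withColumn("_rank", sf.row_number().over(window))
...         .filter(sf.col("_rank") <= threshold)
...         .drop("_rank")
...     )
...     # Keep only rows whose (group, key) combination survived.
...     return df.join(kept_keys, on=grouping_columns + key_columns, how="inner")
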