groupby#
Transformations for performing groupby on Spark DataFrames.
Functions#
- create_groupby_from_column_domains: Returns a GroupBy transformation with the Cartesian product of column domains as keys.
- create_groupby_from_list_of_keys: Returns a GroupBy transformation using a user-supplied list of group keys.
- compute_full_domain_df: Returns a DataFrame containing the Cartesian product of the given column domains.
- create_groupby_from_column_domains(input_domain, input_metric, use_l2, column_domains)#
Returns a GroupBy transformation with the Cartesian product of column domains as keys.
Example
>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B   C
0  a1  b1  c1
1  a2  b1  c2
2  a3  b2  c1
3  a3  b2  c1
>>> groupby_B_C = create_groupby_from_column_domains(
...     input_domain=SparkDataFrameDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...             "C": SparkStringColumnDescriptor(),
...         }
...     ),
...     input_metric=SymmetricDifference(),
...     use_l2=False,
...     column_domains={
...         "B": ["b1", "b2"],
...         "C": ["c1", "c2"],
...     }
... )
>>> # Apply transformation to data
>>> grouped_dataframe = groupby_B_C(spark_dataframe)
>>> groups_df = grouped_dataframe.agg(count("*").alias("count"), fill_value=0)
>>> print(groups_df.toPandas().sort_values(["B", "C"], ignore_index=True))
    B   C  count
0  b1  c1      1
1  b1  c2      1
2  b2  c1      2
3  b2  c2      0
>>> # Note that the group key ("b2", "c2") does not appear in the DataFrame
>>> # but appears in the aggregation output with the given fill value.
- Parameters:
input_domain (tmlt.core.domains.spark_domains.SparkDataFrameDomain) – Domain of input DataFrames.
input_metric (Union[tmlt.core.metrics.SymmetricDifference, tmlt.core.metrics.HammingDistance, tmlt.core.metrics.IfGroupedBy]) – Metric on input DataFrames.
use_l2 (bool) – If True, use RootSumOfSquared instead of SumOf in the output metric.
column_domains (Mapping[str, Union[List[str], List[Optional[str]], List[int], List[Optional[int]], List[datetime.date], List[Optional[datetime.date]]]]) – Mapping from column name to list of distinct values.
- Return type:
GroupBy
Note
column_domains must be public.
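Because the group keys are the full Cartesian product of the supplied domains, the number of groups in the output depends only on the domain sizes, never on the data. A quick plain-Python sketch of that count (not library code; the domains match the example above):

```python
from math import prod

# Column domains matching the example above.
column_domains = {"B": ["b1", "b2"], "C": ["c1", "c2"]}

# Every combination becomes a group key, even ones absent from the data.
n_groups = prod(len(values) for values in column_domains.values())
print(n_groups)  # 4
```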
- create_groupby_from_list_of_keys(input_domain, input_metric, use_l2, groupby_columns, keys)#
Returns a GroupBy transformation using a user-supplied list of group keys.
Example
>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B   C
0  a1  b1  c1
1  a2  b1  c2
2  a3  b2  c1
3  a3  b2  c1
>>> groupby_B_C = create_groupby_from_list_of_keys(
...     input_domain=SparkDataFrameDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...             "C": SparkStringColumnDescriptor(),
...         }
...     ),
...     input_metric=SymmetricDifference(),
...     use_l2=False,
...     groupby_columns=["B", "C"],
...     keys=[("b1", "c1"), ("b2", "c2")]
... )
>>> # Apply transformation to data
>>> grouped_dataframe = groupby_B_C(spark_dataframe)
>>> groups_df = grouped_dataframe.agg(count("*").alias("count"), fill_value=0)
>>> print(groups_df.toPandas().sort_values(["B", "C"], ignore_index=True))
    B   C  count
0  b1  c1      1
1  b2  c2      0
>>> # Note that there is no record corresponding to the key ("b1", "c2")
>>> # since we did not specify this key while constructing the GroupBy even
>>> # though this key appears in the input DataFrame.
- Parameters:
input_domain (tmlt.core.domains.spark_domains.SparkDataFrameDomain) – Domain of input DataFrames.
input_metric (Union[tmlt.core.metrics.SymmetricDifference, tmlt.core.metrics.HammingDistance, tmlt.core.metrics.IfGroupedBy]) – Metric on input DataFrames.
use_l2 (bool) – If True, use RootSumOfSquared instead of SumOf in the output metric.
groupby_columns (List[str]) – List of column names to group by.
keys (List[Tuple[Union[str, int], ...]]) – List of distinct tuples corresponding to group keys.
- Return type:
GroupBy
Note
keys must be a public list of tuples with no duplicates.
- compute_full_domain_df(column_domains)#
Returns a DataFrame containing the Cartesian product of given column domains.
- Parameters:
column_domains (Mapping[str, Union[List[str], List[Optional[str]], List[int], List[Optional[int]], List[datetime.date], List[Optional[datetime.date]]]])
- Return type:
pyspark.sql.DataFrame
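The result can be mimicked without Spark using itertools.product; the following is a minimal sketch, where full_domain_rows is a hypothetical stand-in for the actual function (which returns a Spark DataFrame, not a list of dicts):

```python
from itertools import product

def full_domain_rows(column_domains):
    """Cartesian product of column domains, one dict per output row."""
    columns = list(column_domains)
    return [
        dict(zip(columns, values))
        for values in product(*(column_domains[c] for c in columns))
    ]

rows = full_domain_rows({"B": ["b1", "b2"], "C": ["c1", "c2"]})
print(rows)
# [{'B': 'b1', 'C': 'c1'}, {'B': 'b1', 'C': 'c2'},
#  {'B': 'b2', 'C': 'c1'}, {'B': 'b2', 'C': 'c2'}]
```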
Classes#
- GroupBy: Groups a Spark DataFrame by given group keys.
- class GroupBy(input_domain, input_metric, use_l2, group_keys)#
Bases:
tmlt.core.transformations.base.Transformation
Groups a Spark DataFrame by given group keys.
Example
>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
>>> groupby_B = GroupBy(
...     input_domain=SparkDataFrameDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     input_metric=SymmetricDifference(),
...     use_l2=False,
...     group_keys=spark.createDataFrame(
...         pd.DataFrame(
...             {
...                 "B": ["b1", "b2"]
...             }
...         )
...     )
... )
>>> # Apply transformation to data
>>> grouped_dataframe = groupby_B(spark_dataframe)
>>> counts_df = grouped_dataframe.agg(count("*").alias("count"), fill_value=0)
>>> print(counts_df.sort("B").toPandas())
    B  count
0  b1      2
1  b2      2
- Transformation Contract:
Input domain - SparkDataFrameDomain
Output domain - SparkGroupedDataFrameDomain
Input metric - SymmetricDifference, HammingDistance, or IfGroupedBy (with inner metric SymmetricDifference)
Output metric - SumOf or RootSumOfSquared of SymmetricDifference
>>> groupby_B.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> groupby_B.output_domain
SparkGroupedDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)}, groupby_columns=['B'])
>>> groupby_B.input_metric
SymmetricDifference()
>>> groupby_B.output_metric
SumOf(inner_metric=SymmetricDifference())
- Stability Guarantee:
GroupBy’s stability_function() returns d_in if the input_metric is SymmetricDifference or IfGroupedBy; otherwise it returns d_in times 2.
>>> groupby_B.stability_function(1)
1
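The guarantee above can be sketched as ordinary Python; metric names are plain strings here, standing in for the tmlt.core metric objects (an illustration, not the library's actual method):

```python
def groupby_stability(d_in, input_metric):
    """Smallest d_out satisfied by GroupBy for a given input metric.

    Under HammingDistance, changing one row is equivalent to removing a
    row and adding another, so distances can double in the output.
    """
    if input_metric in ("SymmetricDifference", "IfGroupedBy"):
        return d_in
    return 2 * d_in  # HammingDistance

print(groupby_stability(1, "SymmetricDifference"))  # 1
print(groupby_stability(1, "HammingDistance"))      # 2
```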
- Parameters:
input_domain (tmlt.core.domains.spark_domains.SparkDataFrameDomain)
input_metric (Union[tmlt.core.metrics.HammingDistance, tmlt.core.metrics.SymmetricDifference, tmlt.core.metrics.IfGroupedBy])
use_l2 (bool)
group_keys (pyspark.sql.DataFrame)
- property use_l2: bool#
Returns whether the output metric will use RootSumOfSquared.
- Return type:
bool
- property group_keys: pyspark.sql.DataFrame#
Returns DataFrame containing group keys.
- Return type:
pyspark.sql.DataFrame
- property input_domain: tmlt.core.domains.base.Domain#
Return input domain for the transformation.
- Return type:
tmlt.core.domains.base.Domain
- property input_metric: tmlt.core.metrics.Metric#
Distance metric on input domain.
- Return type:
tmlt.core.metrics.Metric
- property output_domain: tmlt.core.domains.base.Domain#
Return output domain for the transformation.
- Return type:
tmlt.core.domains.base.Domain
- property output_metric: tmlt.core.metrics.Metric#
Distance metric on output domain.
- Return type:
tmlt.core.metrics.Metric
- __init__(input_domain, input_metric, use_l2, group_keys)#
Constructor.
- Parameters:
input_domain (SparkDataFrameDomain) – Input domain.
input_metric (Union[HammingDistance, SymmetricDifference, IfGroupedBy]) – Input metric.
use_l2 (bool) – If True, use RootSumOfSquared instead of SumOf in the output metric.
group_keys (DataFrame) – DataFrame where rows correspond to group keys.
Note
group_keys must be public.
- stability_function(d_in)#
Returns the smallest d_out satisfied by the transformation.
- Parameters:
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Return type:
- __call__(sdf)#
Performs groupby.
- Parameters:
sdf (pyspark.sql.DataFrame)
- Return type:
- stability_relation(d_in, d_out)#
Returns True only if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters:
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_metric.
- Return type:
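For tmlt.core components, the stability relation generally amounts to checking that the stability function's output is at most d_out; a simplified sketch, with plain integers standing in for ExactNumber distances:

```python
def stability_function(d_in):
    # GroupBy under SymmetricDifference passes distances through unchanged.
    return d_in

def stability_relation(d_in, d_out):
    """True only if inputs at distance d_in yield outputs within d_out."""
    return stability_function(d_in) <= d_out

print(stability_relation(1, 1))  # True
print(stability_relation(2, 1))  # False
```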
- __or__(other: Transformation) Transformation #
- __or__(other: tmlt.core.measurements.base.Measurement) tmlt.core.measurements.base.Measurement
Return this transformation chained with another component.