agg#

Transformations for grouping and aggregating Spark DataFrames.

See the architecture overview for more information on transformations.

Functions#

create_count_aggregation()

Returns a Count or CountGrouped transformation.

create_count_distinct_aggregation()

Returns a CountDistinct or CountDistinctGrouped transformation.

create_sum_aggregation()

Returns a Sum or SumGrouped transformation.

create_count_aggregation(input_domain: tmlt.core.domains.spark_domains.SparkDataFrameDomain, input_metric: Union[tmlt.core.metrics.SymmetricDifference, tmlt.core.metrics.HammingDistance], count_column: Optional[str]) Count#
create_count_aggregation(input_domain: tmlt.core.domains.spark_domains.SparkGroupedDataFrameDomain, input_metric: Union[tmlt.core.metrics.SumOf, tmlt.core.metrics.RootSumOfSquared], count_column: Optional[str]) CountGrouped

Returns a Count or CountGrouped transformation.

Parameters
  • input_domain – Domain of input DataFrames or GroupedDataFrames.

  • input_metric – Distance metric on inputs.

  • count_column – If input_domain is a SparkGroupedDataFrameDomain, this is the name of the output count column.

create_count_distinct_aggregation(input_domain: tmlt.core.domains.spark_domains.SparkDataFrameDomain, input_metric: Union[tmlt.core.metrics.SymmetricDifference, tmlt.core.metrics.HammingDistance], count_column: Optional[str]) CountDistinct#
create_count_distinct_aggregation(input_domain: tmlt.core.domains.spark_domains.SparkGroupedDataFrameDomain, input_metric: Union[tmlt.core.metrics.SumOf, tmlt.core.metrics.RootSumOfSquared], count_column: Optional[str]) CountDistinctGrouped

Returns a CountDistinct or CountDistinctGrouped transformation.

Parameters
  • input_domain – Domain of input DataFrames or GroupedDataFrames.

  • input_metric – Distance metric on inputs.

  • count_column – If input_domain is a SparkGroupedDataFrameDomain, this is the name of the output count column.

create_sum_aggregation(input_domain: tmlt.core.domains.spark_domains.SparkDataFrameDomain, input_metric: Union[tmlt.core.metrics.SymmetricDifference, tmlt.core.metrics.HammingDistance], measure_column: str, lower: tmlt.core.utils.exact_number.ExactNumberInput, upper: tmlt.core.utils.exact_number.ExactNumberInput, sum_column: Optional[str]) Sum#
create_sum_aggregation(input_domain: tmlt.core.domains.spark_domains.SparkGroupedDataFrameDomain, input_metric: Union[tmlt.core.metrics.SumOf, tmlt.core.metrics.RootSumOfSquared], measure_column: str, lower: tmlt.core.utils.exact_number.ExactNumberInput, upper: tmlt.core.utils.exact_number.ExactNumberInput, sum_column: Optional[str]) SumGrouped

Returns a Sum or SumGrouped transformation.

Parameters
  • input_domain – Domain of input DataFrames or GroupedDataFrames.

  • input_metric – Distance metric on inputs.

  • measure_column – Column to be summed.

  • lower – Lower clipping bound for measure column.

  • upper – Upper clipping bound for measure column.

  • sum_column – If input_domain is a SparkGroupedDataFrameDomain, this is the column name to be used for sums in the DataFrame output by the measurement. If None, this column will be named “sum(<measure_column>)”.
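All three factories follow the same dispatch pattern: they inspect the input domain and return either the ungrouped or the grouped variant of the transformation. A minimal plain-Python model of that dispatch, using stand-in classes rather than the real tmlt.core implementations:

```python
# Stand-ins for the real tmlt.core domain and transformation classes;
# this only models the factory's dispatch, not the library itself.
class SparkDataFrameDomain:
    pass

class SparkGroupedDataFrameDomain:
    pass

class Count:
    pass

class CountGrouped:
    def __init__(self, count_column):
        self.count_column = count_column

def create_count_aggregation(input_domain, input_metric, count_column=None):
    """Dispatch on the domain type: grouped domains get the grouped variant."""
    if isinstance(input_domain, SparkGroupedDataFrameDomain):
        # count_column is only meaningful in the grouped case; the grouped
        # examples below default the output column name to "count".
        return CountGrouped(count_column or "count")
    return Count()

agg = create_count_aggregation(SparkGroupedDataFrameDomain(), input_metric=None)
```

The same shape applies to create_count_distinct_aggregation and create_sum_aggregation, with the grouped variants taking the extra output-column argument.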

Classes#

Count

Counts the number of records in a Spark DataFrame.

CountDistinct

Counts the number of distinct records in a Spark DataFrame.

CountGrouped

Counts the number of records in each group in a GroupedDataFrame.

CountDistinctGrouped

Counts the number of distinct records in each group in a GroupedDataFrame.

Sum

Returns the sum of a single numeric column in a Spark DataFrame.

SumGrouped

Computes the sum of a column for each group in a GroupedDataFrame.

class Count(input_domain, input_metric)#

Bases: tmlt.core.transformations.base.Transformation

Counts the number of records in a Spark DataFrame.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
    A  X
0  a1  2
1  a1  3
2  a2 -1
3  a2  5
>>> # Create the transformation
>>> count_dataframe = Count(
...     input_domain=SparkDataFrameDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "X": SparkIntegerColumnDescriptor(),
...         },
...     ),
...     input_metric=SymmetricDifference(),
... )
>>> # Apply transformation to data
>>> count_dataframe(spark_dataframe)
4
Transformation Contract:
>>> count_dataframe.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'X': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> count_dataframe.output_domain
NumpyIntegerDomain(size=64)
>>> count_dataframe.input_metric
SymmetricDifference()
>>> count_dataframe.output_metric
AbsoluteDifference()
Stability Guarantee:

Count’s stability_function() returns \(d_{in}\) if the input metric is SymmetricDifference and \(2 \cdot d_{in}\) if the input metric is HammingDistance.

>>> count_dataframe.stability_function(1)
1
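A plain-Python restatement of this arithmetic (not the library’s code): under SymmetricDifference one added or removed record moves the count by at most 1, while under HammingDistance one changed record acts as a removal plus an addition, giving the factor of 2.

```python
def count_stability(d_in: int, input_metric: str) -> int:
    """Smallest d_out for Count, per the stability guarantee above."""
    if input_metric == "SymmetricDifference":
        # Each added or removed record changes the count by at most 1.
        return d_in
    if input_metric == "HammingDistance":
        # One changed record counts as one removal plus one addition.
        return 2 * d_in
    raise ValueError(f"unsupported input metric: {input_metric}")
```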
Parameters
__init__(input_domain, input_metric)#

Constructor.

Parameters
stability_function(d_in)#

Returns the smallest d_out satisfied by the transformation.

Parameters

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Return type

tmlt.core.utils.exact_number.ExactNumber

__call__(df)#

Returns the number of records in the given DataFrame.

Parameters

df (pyspark.sql.DataFrame) –

Return type

numpy.int64

property input_domain#

Return input domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property input_metric#

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_domain#

Return output domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property output_metric#

Distance metric on output domain.

Return type

tmlt.core.metrics.Metric

stability_relation(d_in, d_out)#

Returns True only if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_metric.

Return type

bool

__or__(other: Transformation) Transformation#
__or__(other: tmlt.core.measurements.base.Measurement) tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.
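Chaining applies this transformation first and the other component second, and the stability guarantees compose. A toy model of the | operator (hypothetical classes, not tmlt.core’s implementation):

```python
class ToyTransformation:
    """Toy model of a transformation: a function plus a stability function."""

    def __init__(self, func, stability):
        self.func = func
        self.stability = stability

    def __or__(self, other):
        # Chaining applies self first, then other; stabilities compose.
        return ToyTransformation(
            lambda x: other.func(self.func(x)),
            lambda d_in: other.stability(self.stability(d_in)),
        )

# A row-duplicating transform (stability 2x) chained with a count (stability 1x).
duplicate = ToyTransformation(lambda rows: rows + rows, lambda d: 2 * d)
count = ToyTransformation(len, lambda d: d)
pipeline = duplicate | count
```

In the real library, chaining a transformation with a Measurement yields a Measurement instead, as the overloads above indicate.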

class CountDistinct(input_domain, input_metric)#

Bases: tmlt.core.transformations.base.Transformation

Counts the number of distinct records in a Spark DataFrame.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
    A  X
0  a1  2
1  a1  2
2  a2 -1
3  a2  5
>>> # Create the transformation
>>> count_distinct_dataframe = CountDistinct(
...     input_domain=SparkDataFrameDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "X": SparkIntegerColumnDescriptor(),
...         },
...     ),
...     input_metric=SymmetricDifference(),
... )
>>> # Apply transformation to data
>>> count_distinct_dataframe(spark_dataframe)
3
Transformation Contract:
>>> count_distinct_dataframe.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'X': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> count_distinct_dataframe.output_domain
NumpyIntegerDomain(size=64)
>>> count_distinct_dataframe.input_metric
SymmetricDifference()
>>> count_distinct_dataframe.output_metric
AbsoluteDifference()
Stability Guarantee:

CountDistinct’s stability_function() returns \(d_{in}\) if the input metric is SymmetricDifference and \(2 \cdot d_{in}\) if the input metric is HammingDistance.

>>> count_distinct_dataframe.stability_function(1)
1
Parameters
__init__(input_domain, input_metric)#

Constructor.

Parameters
stability_function(d_in)#

Returns the smallest d_out satisfied by the transformation.

Parameters

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Return type

tmlt.core.utils.exact_number.ExactNumber

__call__(df)#

Returns the number of distinct records in the given DataFrame.

Parameters

df (pyspark.sql.DataFrame) –

Return type

numpy.int64

property input_domain#

Return input domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property input_metric#

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_domain#

Return output domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property output_metric#

Distance metric on output domain.

Return type

tmlt.core.metrics.Metric

stability_relation(d_in, d_out)#

Returns True only if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_metric.

Return type

bool

__or__(other: Transformation) Transformation#
__or__(other: tmlt.core.measurements.base.Measurement) tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.

class CountGrouped(input_domain, input_metric, count_column=None)#

Bases: tmlt.core.transformations.base.Transformation

Counts the number of records in each group in a GroupedDataFrame.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
    A  X
0  a1  2
1  a1  3
2  a2 -1
3  a2  5
>>> # Specify group keys
>>> group_keys = spark.createDataFrame(
...     [("a0",), ("a1",)],
...     schema=["A"],
... )
>>> # Note that we have omitted 'a2' from our group keys
>>> # and included 'a0' which doesn't exist in the DataFrame
>>> # Create the transformation
>>> count_by_A = CountGrouped(
...     input_domain=SparkGroupedDataFrameDomain(
...         schema={
...             "A": SparkStringColumnDescriptor(),
...             "X": SparkIntegerColumnDescriptor(),
...         },
...         groupby_columns=["A"],
...     ),
...     input_metric=SumOf(SymmetricDifference()),
... )
>>> # Create GroupedDataFrame
>>> grouped_dataframe = GroupedDataFrame(
...     dataframe=spark_dataframe,
...     group_keys=group_keys,
... )
>>> # Apply transformation to data
>>> print_sdf(count_by_A(grouped_dataframe))
    A  count
0  a0      0
1  a1      2
>>> # Note that the output does not contain an entry
>>> # for group key 'a2' but it does contain an entry
>>> # for group key 'a0'.
Transformation Contract:
>>> count_by_A.input_domain
SparkGroupedDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'X': SparkIntegerColumnDescriptor(allow_null=False, size=64)}, groupby_columns=['A'])
>>> count_by_A.output_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'count': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> count_by_A.input_metric
SumOf(inner_metric=SymmetricDifference())
>>> count_by_A.output_metric
OnColumn(column='count', metric=SumOf(inner_metric=AbsoluteDifference()))
Stability Guarantee:

CountGrouped’s stability_function() returns d_in.

>>> count_by_A.stability_function(1)
1
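The group-keys behavior in the example above — keys absent from the data get count 0, and data groups absent from the keys are dropped — can be modeled in a few lines of plain Python (a sketch, not the Spark implementation):

```python
from collections import Counter

def grouped_count(rows, key_of, group_keys):
    """Count rows per group, reindexed to exactly the supplied group keys."""
    counts = Counter(key_of(row) for row in rows)
    # Keys absent from the data get 0; data groups outside group_keys are dropped.
    return {key: counts.get(key, 0) for key in group_keys}

rows = [("a1", 2), ("a1", 3), ("a2", -1), ("a2", 5)]
result = grouped_count(rows, key_of=lambda r: r[0], group_keys=["a0", "a1"])
```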
Parameters
__init__(input_domain, input_metric, count_column=None)#

Constructor.

Parameters
property input_domain#

Returns input domain.

Return type

tmlt.core.domains.spark_domains.SparkGroupedDataFrameDomain

property count_column#

Returns the count column name.

Return type

str

stability_function(d_in)#

Returns the smallest d_out satisfied by the transformation.

Parameters

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Return type

tmlt.core.utils.exact_number.ExactNumber

__call__(grouped_data)#

Returns a DataFrame containing counts for each group.

Parameters

grouped_data (tmlt.core.utils.grouped_dataframe.GroupedDataFrame) –

Return type

pyspark.sql.DataFrame

property input_metric#

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_domain#

Return output domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property output_metric#

Distance metric on output domain.

Return type

tmlt.core.metrics.Metric

stability_relation(d_in, d_out)#

Returns True only if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_metric.

Return type

bool

__or__(other: Transformation) Transformation#
__or__(other: tmlt.core.measurements.base.Measurement) tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.

class CountDistinctGrouped(input_domain, input_metric, count_column=None)#

Bases: tmlt.core.transformations.base.Transformation

Counts the number of distinct records in each group in a GroupedDataFrame.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
    A  X
0  a1  2
1  a1  2
2  a1  3
3  a2 -1
4  a2  5
>>> # Specify group keys
>>> group_keys = spark.createDataFrame(
...     [("a0",), ("a1",)],
...     schema=["A"],
... )
>>> # Note that we have omitted 'a2' from our group keys
>>> # and included 'a0' which doesn't exist in the DataFrame
>>> # Create the transformation
>>> count_distinct_by_A = CountDistinctGrouped(
...     input_domain=SparkGroupedDataFrameDomain(
...         schema={
...             "A": SparkStringColumnDescriptor(),
...             "X": SparkIntegerColumnDescriptor(),
...         },
...         groupby_columns=["A"],
...     ),
...     input_metric=SumOf(SymmetricDifference()),
... )
>>> # Create GroupedDataFrame
>>> grouped_dataframe = GroupedDataFrame(
...     dataframe=spark_dataframe,
...     group_keys=group_keys,
... )
>>> # Apply transformation to data
>>> print_sdf(count_distinct_by_A(grouped_dataframe))
    A  count_distinct
0  a0               0
1  a1               2
>>> # Note that the output does not contain an entry
>>> # for group key 'a2' but it does contain an entry
>>> # for group key 'a0'.
Transformation Contract:
>>> count_distinct_by_A.input_domain
SparkGroupedDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'X': SparkIntegerColumnDescriptor(allow_null=False, size=64)}, groupby_columns=['A'])
>>> count_distinct_by_A.output_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'count_distinct': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> count_distinct_by_A.input_metric
SumOf(inner_metric=SymmetricDifference())
>>> count_distinct_by_A.output_metric
OnColumn(column='count_distinct', metric=SumOf(inner_metric=AbsoluteDifference()))
Stability Guarantee:

CountDistinctGrouped’s stability_function() returns d_in.

>>> count_distinct_by_A.stability_function(1)
1
Parameters
__init__(input_domain, input_metric, count_column=None)#

Constructor.

Parameters
property input_domain#

Returns input domain.

Return type

tmlt.core.domains.spark_domains.SparkGroupedDataFrameDomain

property count_column#

Returns the count column name.

Return type

str

stability_function(d_in)#

Returns the smallest d_out satisfied by the transformation.

Parameters

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Return type

tmlt.core.utils.exact_number.ExactNumber

__call__(grouped_data)#

Returns a DataFrame containing distinct-record counts for each group.

Parameters

grouped_data (tmlt.core.utils.grouped_dataframe.GroupedDataFrame) –

Return type

pyspark.sql.DataFrame

property input_metric#

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_domain#

Return output domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property output_metric#

Distance metric on output domain.

Return type

tmlt.core.metrics.Metric

stability_relation(d_in, d_out)#

Returns True only if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_metric.

Return type

bool

__or__(other: Transformation) Transformation#
__or__(other: tmlt.core.measurements.base.Measurement) tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.

class Sum(input_domain, input_metric, measure_column, lower, upper)#

Bases: tmlt.core.transformations.base.Transformation

Returns the sum of a single numeric column in a Spark DataFrame.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
    A  X
0  a1  2
1  a1  3
2  a2 -1
3  a2  5
>>> # Create the transformation
>>> sum_X = Sum(
...     input_domain=SparkDataFrameDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "X": SparkIntegerColumnDescriptor(),
...         },
...     ),
...     input_metric=SymmetricDifference(),
...     measure_column="X",
...     upper=4,
...     lower=0,
... )
>>> # Apply transformation to data
>>> sum_X(spark_dataframe)
9
Transformation Contract:
>>> sum_X.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'X': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> sum_X.output_domain
NumpyIntegerDomain(size=64)
>>> sum_X.input_metric
SymmetricDifference()
>>> sum_X.output_metric
AbsoluteDifference()
Stability Guarantee:

Sum’s stability_function() returns \(d_{in}\) times the sensitivity of the sum (see below).

>>> sum_X.stability_function(1)
4

The sensitivity of the sum is:

  • \(\max(|h|, |\ell|)\), where \(h\) and \(\ell\) are the upper and lower clipping bounds.
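The clipped-sum arithmetic behind this bound can be checked directly: every value is clipped into the range between the lower and upper bounds before summing, so adding or removing a single record moves the total by at most the larger bound magnitude. A plain-Python sketch using the doctest data above (not the Spark implementation):

```python
def clipped_sum(values, lower, upper):
    """Clip each value into [lower, upper], then sum."""
    return sum(min(max(v, lower), upper) for v in values)

# The doctest data above with lower=0, upper=4:
# 2 and 3 pass through, -1 clips to 0, 5 clips to 4, so 2 + 3 + 0 + 4 = 9.
total = clipped_sum([2, 3, -1, 5], lower=0, upper=4)

# Removing one record changes the total by at most max(|upper|, |lower|) = 4.
sensitivity = max(abs(4), abs(0))
```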

Methods#

upper()

Returns upper clipping bound.

lower()

Returns lower clipping bound.

measure_column()

Returns name of the column to be summed.

stability_function()

Returns the smallest d_out satisfied by the transformation.

__call__()

Returns the sum of the specified column in the DataFrame.

input_domain()

Return input domain for the transformation.

input_metric()

Distance metric on input domain.

output_domain()

Return output domain for the transformation.

output_metric()

Distance metric on output domain.

stability_relation()

Returns True only if close inputs produce close outputs.

__or__()

Return this transformation chained with another component.

Parameters
__init__(input_domain, input_metric, measure_column, lower, upper)#

Constructor.

Parameters
property upper#

Returns upper clipping bound.

Return type

tmlt.core.utils.exact_number.ExactNumber

property lower#

Returns lower clipping bound.

Return type

tmlt.core.utils.exact_number.ExactNumber

property measure_column#

Returns name of the column to be summed.

Return type

str

stability_function(d_in)#

Returns the smallest d_out satisfied by the transformation.

Parameters

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Return type

tmlt.core.utils.exact_number.ExactNumber

__call__(df)#

Returns the sum of the specified column in the DataFrame.

Parameters

df (pyspark.sql.DataFrame) –

Return type

Union[int, float]

property input_domain#

Return input domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property input_metric#

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_domain#

Return output domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property output_metric#

Distance metric on output domain.

Return type

tmlt.core.metrics.Metric

stability_relation(d_in, d_out)#

Returns True only if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_metric.

Return type

bool

__or__(other: Transformation) Transformation#
__or__(other: tmlt.core.measurements.base.Measurement) tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.

class SumGrouped(input_domain, input_metric, measure_column, lower, upper, sum_column=None)#

Bases: tmlt.core.transformations.base.Transformation

Computes the sum of a column for each group in a GroupedDataFrame.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
    A  X
0  a1  2
1  a1  3
2  a2 -1
3  a2  6
>>> # Specify group keys
>>> group_keys = spark.createDataFrame(
...     [("a0",), ("a2",)],
...     schema=["A"],
... )
>>> # Note that we omit the key 'a1' even though it
>>> # exists in the spark dataframe and include 'a0'.
>>> # Create the transformation
>>> sum_X_by_A = SumGrouped(
...     input_domain=SparkGroupedDataFrameDomain(
...         schema={
...             "A": SparkStringColumnDescriptor(),
...             "X": SparkIntegerColumnDescriptor(),
...         },
...         groupby_columns=["A"],
...     ),
...     input_metric=SumOf(SymmetricDifference()),
...     measure_column="X",
...     upper=4,
...     lower=0,
... )
>>> # Create GroupedDataFrame
>>> grouped_dataframe = GroupedDataFrame(
...     dataframe=spark_dataframe,
...     group_keys=group_keys,
... )
>>> # Apply transformation to data
>>> print_sdf(sum_X_by_A(grouped_dataframe))
    A  sum(X)
0  a0       0
1  a2       4
Transformation Contract:
>>> sum_X_by_A.input_domain
SparkGroupedDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'X': SparkIntegerColumnDescriptor(allow_null=False, size=64)}, groupby_columns=['A'])
>>> sum_X_by_A.output_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'sum(X)': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> sum_X_by_A.input_metric
SumOf(inner_metric=SymmetricDifference())
>>> sum_X_by_A.output_metric
OnColumn(column='sum(X)', metric=SumOf(inner_metric=AbsoluteDifference()))
Stability Guarantee:

SumGrouped’s stability_function() returns \(d_{in}\) times the sensitivity of the sum.

>>> sum_X_by_A.stability_function(1)
4

The sensitivity of the sum is:

  • \(\max(|h|, |\ell|)\), where \(h\) and \(\ell\) are the upper and lower clipping bounds.

Methods#

upper()

Returns upper clipping bound.

lower()

Returns lower clipping bound.

measure_column()

Returns name of the column to be summed.

sum_column()

Returns name of the output column containing sums.

input_domain()

Returns input domain.

stability_function()

Returns the smallest d_out satisfied by the transformation.

__call__()

Returns a DataFrame containing the sum of the specified column for each group.

input_metric()

Distance metric on input domain.

output_domain()

Return output domain for the transformation.

output_metric()

Distance metric on output domain.

stability_relation()

Returns True only if close inputs produce close outputs.

__or__()

Return this transformation chained with another component.

Parameters
__init__(input_domain, input_metric, measure_column, lower, upper, sum_column=None)#

Constructor.

Parameters
property upper#

Returns upper clipping bound.

Return type

tmlt.core.utils.exact_number.ExactNumber

property lower#

Returns lower clipping bound.

Return type

tmlt.core.utils.exact_number.ExactNumber

property measure_column#

Returns name of the column to be summed.

Return type

str

property sum_column#

Returns name of the output column containing sums.

Return type

str

property input_domain#

Returns input domain.

Return type

tmlt.core.domains.spark_domains.SparkGroupedDataFrameDomain

stability_function(d_in)#

Returns the smallest d_out satisfied by the transformation.

Parameters

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Return type

tmlt.core.utils.exact_number.ExactNumber

__call__(grouped_dataframe)#

Returns a DataFrame containing the sum of the specified column for each group.

Parameters

grouped_dataframe (tmlt.core.utils.grouped_dataframe.GroupedDataFrame) –

Return type

pyspark.sql.DataFrame

property input_metric#

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_domain#

Return output domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property output_metric#

Distance metric on output domain.

Return type

tmlt.core.metrics.Metric

stability_relation(d_in, d_out)#

Returns True only if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_metric.

Return type

bool

__or__(other: Transformation) Transformation#
__or__(other: tmlt.core.measurements.base.Measurement) tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.