spark_measurements#

Measurements on Spark DataFrames.

Classes#

SparkMeasurement

Base class that materializes output DataFrames before returning.

AddNoiseToColumn

Adds noise to a single aggregated column of a Spark DataFrame.

ApplyInPandas

Applies a pandas dataframe aggregation to each group in a GroupedDataFrame.

GeometricPartitionSelection

Discovers the distinct rows in a DataFrame, suppressing infrequent rows.

class SparkMeasurement(input_domain, input_metric, output_measure, is_interactive)#

Bases: tmlt.core.measurements.base.Measurement

Base class that materializes output DataFrames before returning.

__init__(input_domain, input_metric, output_measure, is_interactive)#

Constructor.

Parameters
  • input_domain (Domain) – Domain of input datasets.

  • input_metric (Metric) – Distance metric for input datasets.

  • output_measure (Measure) – Distance measure for the measurement’s output.

  • is_interactive (bool) – Whether the measurement is interactive.

abstract call(self, val)#

Performs measurement.

Warning

Spark recomputes the output of this method (adding different noise each time) on every call to collect.

Parameters

val (Any) –

Return type

pyspark.sql.DataFrame
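
The warning above can be illustrated without Spark. The following is a minimal sketch, not tmlt.core or PySpark code: `LazyNoisyResult` and `MaterializedResult` are hypothetical classes, and Gaussian noise from the standard library stands in for whatever noise a real measurement adds. It shows a lazily evaluated result whose noise is re-drawn on every `collect`, next to a materialized copy that is computed once and then reused.

```python
import random


class LazyNoisyResult:
    """Hypothetical stand-in for a lazily evaluated DataFrame (illustration only)."""

    def __init__(self, true_values, rng):
        self._true_values = list(true_values)
        self._rng = rng

    def collect(self):
        # Like an uncached lazy plan, the noisy values are recomputed on each call.
        return [v + self._rng.gauss(0.0, 1.0) for v in self._true_values]


class MaterializedResult:
    """Evaluates the lazy result once and serves the stored copy afterwards."""

    def __init__(self, lazy_result):
        self._rows = lazy_result.collect()

    def collect(self):
        return list(self._rows)


lazy = LazyNoisyResult([3, 2, 1, 0], random.Random(0))
first, second = lazy.collect(), lazy.collect()
print(first != second)  # each collect draws fresh noise

materialized = MaterializedResult(lazy)
print(materialized.collect() == materialized.collect())  # stored values are reused
```

Materializing once means every later read sees the same noisy values, which is the behavior the SparkMeasurement base class is described as providing.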

__call__(self, val)#

Performs measurement and returns a DataFrame with additional protections.

See Specific protections against pseudo-side channel leakage for more details on the specific mitigations we apply here.

Parameters

val (Any) –

Return type

pyspark.sql.DataFrame

property input_domain(self)#

Return input domain for the measurement.

Return type

tmlt.core.domains.base.Domain

property input_metric(self)#

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_measure(self)#

Distance measure on output.

Return type

tmlt.core.measures.Measure

property is_interactive(self)#

Returns true iff the measurement is interactive.

Return type

bool

privacy_function(self, d_in)#

Returns the smallest d_out satisfied by the measurement.

See the privacy and stability tutorial for more information.

Parameters

d_in (Any) – Distance between inputs under input_metric.

Raises

NotImplementedError – If not overridden.

Return type

Any

privacy_relation(self, d_in, d_out)#

Return True if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_measure.

Return type

bool
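
Since privacy_function() is described as returning the smallest d_out the measurement satisfies, privacy_relation() can be read as a comparison against that value. The sketch below assumes exactly that relationship for a totally ordered output measure. `ToyMeasurement` and its `loss_per_unit` parameter are hypothetical and are not part of tmlt.core.

```python
from fractions import Fraction


class ToyMeasurement:
    """Hypothetical measurement whose privacy loss grows linearly with d_in."""

    def __init__(self, loss_per_unit):
        self.loss_per_unit = Fraction(loss_per_unit)

    def privacy_function(self, d_in):
        # Smallest d_out satisfied for inputs at distance d_in.
        return self.loss_per_unit * Fraction(d_in)

    def privacy_relation(self, d_in, d_out):
        # Assumed relationship: the relation holds for any d_out that is
        # at least as large as the smallest satisfied d_out.
        return self.privacy_function(d_in) <= Fraction(d_out)


toy = ToyMeasurement(loss_per_unit=3)
print(toy.privacy_function(2))     # smallest d_out for d_in = 2
print(toy.privacy_relation(2, 6))  # d_out equal to the smallest value
print(toy.privacy_relation(2, 5))  # d_out below the smallest value
```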

class AddNoiseToColumn(input_domain, measurement, measure_column)#

Bases: SparkMeasurement

Adds noise to a single aggregated column of a Spark DataFrame.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B  count
0  a1  b1      3
1  a1  b2      2
2  a2  b1      1
3  a2  b2      0
>>> # Create a measurement that can add noise to a pd.Series
>>> add_laplace_noise = AddLaplaceNoise(
...     scale="0.5",
...     input_domain=NumpyIntegerDomain(),
... )
>>> # Create a measurement that can add noise to a Spark DataFrame
>>> add_laplace_noise_to_column = AddNoiseToColumn(
...     input_domain=SparkDataFrameDomain(
...         schema={
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...             "count": SparkIntegerColumnDescriptor(),
...         },
...     ),
...     measurement=AddNoiseToSeries(add_laplace_noise),
...     measure_column="count",
... )
>>> # Apply measurement to data
>>> noisy_spark_dataframe = add_laplace_noise_to_column(spark_dataframe)
>>> print_sdf(noisy_spark_dataframe) 
    A   B   count
0  a1  b1 ...
1  a1  b2 ...
2  a2  b1 ...
3  a2  b2 ...
Measurement Contract:
>>> add_laplace_noise_to_column.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False), 'count': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> add_laplace_noise_to_column.input_metric
OnColumn(column='count', metric=SumOf(inner_metric=AbsoluteDifference()))
>>> add_laplace_noise_to_column.output_measure
PureDP()
Privacy Guarantee:

AddNoiseToColumn’s privacy_function() returns the output of the privacy_function() of its AddNoiseToSeries measurement.

>>> add_laplace_noise_to_column.privacy_function(1)
2
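
The value 2 in the doctest is consistent with the standard Laplace mechanism bound, epsilon = d_in / scale, evaluated at scale 0.5. The following sketch reproduces that arithmetic with exact rationals. It assumes the bound applies to the wrapped measurement, and `laplace_epsilon` is a hypothetical helper, not a tmlt.core function.

```python
from fractions import Fraction

# The string "0.5" is parsed exactly as 1/2, with no floating-point rounding.
scale = Fraction("0.5")


def laplace_epsilon(d_in, scale):
    """Standard Laplace-mechanism bound: epsilon = d_in / scale."""
    return Fraction(d_in) / scale


for d_in in (1, 2, 3):
    print(d_in, laplace_epsilon(d_in, scale))
```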
__init__(input_domain, measurement, measure_column)#

Constructor.

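Parameters
  • input_domain (SparkDataFrameDomain) – Domain of input Spark DataFrames.

  • measurement (AddNoiseToSeries) – AddNoiseToSeries measurement to apply to measure_column.

  • measure_column (str) – Name of the column to add noise to.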

Note

The input metric of this measurement is derived from measure_column and from the input metric of the measurement to be applied. Specifically, it is measurement.input_metric applied to the specified measure_column, which appears as an OnColumn metric in the example above.

property measure_column(self)#

Returns the name of the column to add noise to.

Return type

str

property measurement(self)#

Returns the AddNoiseToSeries measurement to apply to measure column.

Return type

tmlt.core.measurements.pandas_measurements.series.AddNoiseToSeries

privacy_function(self, d_in)#

Returns the smallest d_out satisfied by the measurement.

See the privacy and stability tutorial for more information.

Parameters

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Raises

NotImplementedError – If the privacy_function() of the AddNoiseToSeries measurement raises NotImplementedError.

Return type

tmlt.core.utils.exact_number.ExactNumber

call(self, sdf)#

Applies measurement to measure column.

Parameters

sdf (pyspark.sql.DataFrame) –

Return type

pyspark.sql.DataFrame
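
As a rough picture of what applying a noise measurement to one column means, the sketch below works on a list of dictionaries rather than a Spark DataFrame. Everything in it is an assumption made for illustration: `sample_laplace` and `add_noise_to_column` are hypothetical helpers, and the floating-point sampler is not suitable for real privacy guarantees.

```python
import random


def sample_laplace(scale, rng):
    """Draw Laplace(0, scale) noise as the difference of two exponential draws."""
    return rng.expovariate(1.0 / scale) - rng.expovariate(1.0 / scale)


def add_noise_to_column(rows, measure_column, scale, rng):
    """Return copies of rows with noise added to measure_column only."""
    return [
        {**row, measure_column: row[measure_column] + sample_laplace(scale, rng)}
        for row in rows
    ]


rows = [
    {"A": "a1", "B": "b1", "count": 3},
    {"A": "a1", "B": "b2", "count": 2},
    {"A": "a2", "B": "b1", "count": 1},
    {"A": "a2", "B": "b2", "count": 0},
]
noisy_rows = add_noise_to_column(rows, "count", scale=0.5, rng=random.Random(0))
for row in noisy_rows:
    print(row)
```

Only the measure column changes. The other columns pass through untouched, matching the shape of the doctest output above.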

__call__(self, val)#

Performs measurement and returns a DataFrame with additional protections.

See Specific protections against pseudo-side channel leakage for more details on the specific mitigations we apply here.

Parameters

val (Any) –

Return type

pyspark.sql.DataFrame

property input_domain(self)#

Return input domain for the measurement.

Return type

tmlt.core.domains.base.Domain

property input_metric(self)#

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_measure(self)#

Distance measure on output.

Return type

tmlt.core.measures.Measure

property is_interactive(self)#

Returns true iff the measurement is interactive.

Return type

bool

privacy_relation(self, d_in, d_out)#

Return True if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_measure.

Return type

bool

class ApplyInPandas(input_domain, input_metric, aggregation_function)#

Bases: SparkMeasurement

Applies a pandas dataframe aggregation to each group in a GroupedDataFrame.

__init__(input_domain, input_metric, aggregation_function)#

Constructor.

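Parameters
  • input_domain (SparkGroupedDataFrameDomain) – Domain of input GroupedDataFrames.

  • input_metric (Metric) – Distance metric on the input domain.

  • aggregation_function (Aggregate) – Aggregation function to apply to each group.
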
property aggregation_function(self)#

Returns the aggregation function.

Return type

tmlt.core.measurements.pandas_measurements.dataframe.Aggregate

property input_domain(self)#

Returns input domain.

Return type

tmlt.core.domains.spark_domains.SparkGroupedDataFrameDomain

privacy_function(self, d_in)#

Returns the smallest d_out satisfied by the measurement.

See the privacy and stability tutorial for more information.

Parameters

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Raises

NotImplementedError – If self.aggregation_function.privacy_function(d_in) raises NotImplementedError.

Return type

tmlt.core.utils.exact_number.ExactNumber

call(self, grouped_dataframe)#

Returns DataFrame obtained by applying pandas aggregation to each group.

Parameters

grouped_dataframe (tmlt.core.utils.grouped_dataframe.GroupedDataFrame) –

Return type

pyspark.sql.DataFrame
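
The per-group application can be pictured with plain Python collections. This is a sketch under stated assumptions, not the library's implementation: `apply_per_group` and `sum_x` are hypothetical names, rows are dictionaries instead of Spark rows, and the aggregation is noiseless, whereas a real measurement's aggregation function would add noise.

```python
from collections import defaultdict


def apply_per_group(rows, groupby_columns, aggregation_function):
    """Apply aggregation_function to each group and tag results with the group key."""
    groups = defaultdict(list)
    for row in rows:
        key = tuple(row[column] for column in groupby_columns)
        groups[key].append(row)
    output = []
    for key, group_rows in groups.items():
        aggregated = aggregation_function(group_rows)  # dict of output columns
        output.append({**dict(zip(groupby_columns, key)), **aggregated})
    return output


def sum_x(group_rows):
    """Noiseless placeholder aggregation: sum of column X within one group."""
    return {"sum_X": sum(row["X"] for row in group_rows)}


rows = [
    {"A": "a1", "X": 1},
    {"A": "a1", "X": 2},
    {"A": "a2", "X": 5},
]
result = apply_per_group(rows, ["A"], sum_x)
print(result)
```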

__call__(self, val)#

Performs measurement and returns a DataFrame with additional protections.

See Specific protections against pseudo-side channel leakage for more details on the specific mitigations we apply here.

Parameters

val (Any) –

Return type

pyspark.sql.DataFrame

property input_metric(self)#

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_measure(self)#

Distance measure on output.

Return type

tmlt.core.measures.Measure

property is_interactive(self)#

Returns true iff the measurement is interactive.

Return type

bool

privacy_relation(self, d_in, d_out)#

Return True if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_measure.

Return type

bool

class GeometricPartitionSelection(input_domain, threshold, alpha, count_column=None)#

Bases: SparkMeasurement

Discovers the distinct rows in a DataFrame, suppressing infrequent rows.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
      A   B
0    a1  b1
1    a2  b2
2    a2  b2
3    a2  b2
4    a2  b2
..   ..  ..
96   a2  b2
97   a2  b2
98   a2  b2
99   a2  b2
100  a2  b2

[101 rows x 2 columns]
>>> measurement = GeometricPartitionSelection(
...     input_domain=SparkDataFrameDomain(
...         schema={
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         },
...     ),
...     threshold=50,
...     alpha=1,
... )
>>> noisy_spark_dataframe = measurement(spark_dataframe) 
>>> print_sdf(noisy_spark_dataframe)  
    A   B  count
0  a2  b2    106
Measurement Contract:
>>> measurement.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> measurement.input_metric
SymmetricDifference()
>>> measurement.output_measure
ApproxDP()
Privacy Guarantee:

For \(d_{in} = 0\), returns \((0, 0)\)

For \(d_{in} = 1\), returns \((1/\alpha, 1 - CDF_{\alpha}[\tau - 2])\)

For \(d_{in} > 1\), returns \((d_{in} \cdot \epsilon, d_{in} \cdot e^{d_{in} \cdot \epsilon} \cdot \delta)\)

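where:

  • \(\alpha\) is alpha

  • \(\tau\) is threshold

  • \(\epsilon\) is the first element returned for the \(d_{in} = 1\) case

  • \(\delta\) is the second element returned for the \(d_{in} = 1\) case

  • \(CDF_{\alpha}\) is the cumulative distribution function of the double-sided geometric noise distribution with scale \(\alpha\)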

>>> epsilon, delta = measurement.privacy_function(1)
>>> epsilon
1
>>> delta.to_float(round_up=True)
3.8328565409781243e-22
>>> epsilon, delta = measurement.privacy_function(2)
>>> epsilon
2
>>> delta.to_float(round_up=True)
5.664238400088129e-21
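
The doctest values can be checked by hand. The sketch below assumes that the CDF in the guarantee belongs to a two-sided geometric distribution with P(X = x) proportional to exp(-|x| / alpha), and that epsilon and delta in the third case are the values from the d_in = 1 case. Under those assumptions, floating-point evaluation agrees with the values shown above to several significant digits. The function names are hypothetical and the code does not call tmlt.core.

```python
import math


def two_sided_geometric_tail(k, alpha):
    """P(X >= k) for k >= 0, where P(X = x) is proportional to exp(-|x| / alpha)."""
    p = math.exp(-1.0 / alpha)
    return p**k / (1.0 + p)


def approx_privacy_function(d_in, threshold, alpha):
    """Evaluate the three documented cases in floating point."""
    if d_in == 0:
        return 0.0, 0.0
    epsilon = 1.0 / alpha
    # 1 - CDF[threshold - 2] is the same quantity as P(X >= threshold - 1).
    delta = two_sided_geometric_tail(threshold - 1, alpha)
    if d_in == 1:
        return epsilon, delta
    return d_in * epsilon, d_in * math.exp(d_in * epsilon) * delta


eps_1, delta_1 = approx_privacy_function(1, threshold=50, alpha=1)
eps_2, delta_2 = approx_privacy_function(2, threshold=50, alpha=1)
print(eps_1, delta_1)
print(eps_2, delta_2)
```

The library reports exact numbers rounded up on conversion, so the last digits of a floating-point evaluation may differ slightly.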
Methods#

alpha()

Returns the noise scale.

threshold()

Returns the minimum noisy count to include row.

count_column()

Returns the count column name.

privacy_function()

Returns the smallest d_out satisfied by the measurement.

call()

Return the noisy counts for common rows.

__call__()

Performs measurement and returns a DataFrame with additional protections.

input_domain()

Return input domain for the measurement.

input_metric()

Distance metric on input domain.

output_measure()

Distance measure on output.

is_interactive()

Returns true iff the measurement is interactive.

privacy_relation()

Return True if close inputs produce close outputs.

__init__(input_domain, threshold, alpha, count_column=None)#

Constructor.

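Parameters
  • input_domain (SparkDataFrameDomain) – Domain of input Spark DataFrames.

  • threshold (int) – Minimum noisy count required to include a row.

  • alpha – Noise scale.

  • count_column (Optional[str]) – Name of the count column. Defaults to None.
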
property alpha(self)#

Returns the noise scale.

Return type

tmlt.core.utils.exact_number.ExactNumber

property threshold(self)#

Returns the minimum noisy count to include row.

Return type

int

property count_column(self)#

Returns the count column name.

Return type

str

privacy_function(self, d_in)#

Returns the smallest d_out satisfied by the measurement.

See the privacy and stability tutorial for more information.

Parameters

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Return type

Tuple[tmlt.core.utils.exact_number.ExactNumber, tmlt.core.utils.exact_number.ExactNumber]

call(self, sdf)#

Return the noisy counts for common rows.

Parameters

sdf (pyspark.sql.DataFrame) –

Return type

pyspark.sql.DataFrame
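
A minimal sketch of the idea behind this method: count each distinct row, add two-sided geometric noise to every count, and keep only rows whose noisy count reaches the threshold. It rests on assumptions, namely that the noise has P(X = x) proportional to exp(-|x| / alpha) and that a row is kept when its noisy count is at least the threshold. The helper names are hypothetical, rows are plain tuples, and the floating-point sampler is for illustration only.

```python
import math
import random
from collections import Counter


def sample_two_sided_geometric(alpha, rng):
    """Difference of two geometric draws, giving noise symmetric around zero."""

    def geometric():
        u = 1.0 - rng.random()  # uniform on (0, 1], which avoids log(0)
        return math.floor(-alpha * math.log(u))

    return geometric() - geometric()


def geometric_partition_selection(rows, threshold, alpha, rng):
    """Return (row, noisy_count) pairs for rows whose noisy count reaches threshold."""
    released = []
    for row, count in Counter(rows).items():
        noisy_count = count + sample_two_sided_geometric(alpha, rng)
        if noisy_count >= threshold:
            released.append((row, noisy_count))
    return released


rows = [("a1", "b1")] + [("a2", "b2")] * 100
released = geometric_partition_selection(
    rows, threshold=50, alpha=1, rng=random.Random(0)
)
print(released)
```

With these inputs, the row that appears once is suppressed and the row that appears 100 times is released with a noisy count, mirroring the class example.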

__call__(self, val)#

Performs measurement and returns a DataFrame with additional protections.

See Specific protections against pseudo-side channel leakage for more details on the specific mitigations we apply here.

Parameters

val (Any) –

Return type

pyspark.sql.DataFrame

property input_domain(self)#

Return input domain for the measurement.

Return type

tmlt.core.domains.base.Domain

property input_metric(self)#

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_measure(self)#

Distance measure on output.

Return type

tmlt.core.measures.Measure

property is_interactive(self)#

Returns true iff the measurement is interactive.

Return type

bool

privacy_relation(self, d_in, d_out)#

Return True if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_measure.

Return type

bool