spark_measurements#

Measurements on Spark DataFrames.

See the architecture guide for more information.

Classes#

SparkMeasurement

Base class that materializes output DataFrames before returning.

AddNoiseToColumn

Adds noise to a single aggregated column of a Spark DataFrame.

ApplyInPandas

Applies a pandas dataframe aggregation to each group in a GroupedDataFrame.

GeometricPartitionSelection

Discovers the distinct rows in a DataFrame, suppressing infrequent rows.

BoundSelection

Discovers a noisy bound based on a DataFrame Column.

class SparkMeasurement(input_domain, input_metric, output_measure, is_interactive)#

Bases: tmlt.core.measurements.base.Measurement

Base class that materializes output DataFrames before returning.

__init__(input_domain, input_metric, output_measure, is_interactive)#

Constructor.

Parameters
  • input_domain (Domain) – Domain of input datasets.

  • input_metric (Metric) – Distance metric for input datasets.

  • output_measure (Measure) – Distance measure for measurement’s output.

  • is_interactive (bool) – Whether the measurement is interactive.

abstract call(val)#

Performs measurement.

Warning

Spark recomputes the output of this method (adding different noise each time) on every call to collect.

Parameters

val (Any) –

Return type

pyspark.sql.DataFrame
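
The warning on call() describes lazy evaluation: noise is drawn whenever the result is evaluated, so repeated evaluations see different noise unless the result is materialized first. The sketch below illustrates that idea in plain Python, with no Spark involved. LazyNoisyCount and MaterializedCount are hypothetical names invented for this illustration, and the Gaussian noise is an arbitrary placeholder rather than a mechanism used by the library.

```python
import random


class LazyNoisyCount:
    """Toy stand-in for a lazily evaluated result (hypothetical, not a Spark API)."""

    def __init__(self, true_count, rng):
        self._true_count = true_count
        self._rng = rng

    def collect(self):
        # Noise is drawn at evaluation time, so every call sees a fresh sample.
        return self._true_count + self._rng.gauss(0, 1)


class MaterializedCount:
    """Evaluates the lazy value once and replays the stored result."""

    def __init__(self, lazy):
        self._value = lazy.collect()

    def collect(self):
        return self._value


rng = random.Random(0)
lazy = LazyNoisyCount(true_count=3, rng=rng)
lazy_results = [lazy.collect() for _ in range(3)]

materialized = MaterializedCount(lazy)
materialized_results = [materialized.collect() for _ in range(3)]
```

Here lazy_results holds three different values, while materialized_results repeats a single value. This is the behavior the base class aims for by materializing output DataFrames before returning them.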

__call__(val)#

Performs measurement and returns a DataFrame with additional protections.

See “Specific protections against pseudo-side channel leakage” for more details on the specific mitigations we apply here.

Parameters

val (Any) –

Return type

pyspark.sql.DataFrame

property input_domain#

Return input domain for the measurement.

Return type

tmlt.core.domains.base.Domain

property input_metric#

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_measure#

Distance measure on output.

Return type

tmlt.core.measures.Measure

property is_interactive#

Returns true iff the measurement is interactive.

Return type

bool

privacy_function(d_in)#

Returns the smallest d_out satisfied by the measurement.

See the privacy and stability tutorial for more information.

Parameters

d_in (Any) – Distance between inputs under input_metric.

Raises

NotImplementedError – If not overridden.

Return type

Any

privacy_relation(d_in, d_out)#

Return True if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_measure.

Return type

bool
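
One way to read the two methods together: privacy_function(d_in) gives the smallest valid d_out, and privacy_relation(d_in, d_out) asks whether a proposed d_out is at least that large. The sketch below assumes a pure-DP style output measure, where d_out is a single number and a smaller value is a stronger guarantee. The helper privacy_relation_from_function and the toy privacy function are hypothetical and are not the library's implementation.

```python
from fractions import Fraction


def privacy_relation_from_function(privacy_function, d_in, d_out):
    """Return True if d_out is no smaller than the smallest valid guarantee."""
    return privacy_function(d_in) <= Fraction(d_out)


def toy_privacy_function(d_in):
    """Hypothetical measurement whose smallest d_out is twice d_in."""
    return 2 * Fraction(d_in)


holds_exact = privacy_relation_from_function(toy_privacy_function, 1, 2)
holds_loose = privacy_relation_from_function(toy_privacy_function, 1, 3)
holds_tight = privacy_relation_from_function(toy_privacy_function, 1, 1)
```

Under these assumptions the relation holds for d_out of 2 or 3 and fails for d_out of 1, because 1 is smaller than the smallest guarantee the toy measurement satisfies.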

class AddNoiseToColumn(input_domain, measurement, measure_column)#

Bases: SparkMeasurement

Adds noise to a single aggregated column of a Spark DataFrame.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B  count
0  a1  b1      3
1  a1  b2      2
2  a2  b1      1
3  a2  b2      0
>>> # Create a measurement that can add noise to a pd.Series
>>> add_laplace_noise = AddLaplaceNoise(
...     scale="0.5",
...     input_domain=NumpyIntegerDomain(),
... )
>>> # Create a measurement that can add noise to a Spark DataFrame
>>> add_laplace_noise_to_column = AddNoiseToColumn(
...     input_domain=SparkDataFrameDomain(
...         schema={
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...             "count": SparkIntegerColumnDescriptor(),
...         },
...     ),
...     measurement=AddNoiseToSeries(add_laplace_noise),
...     measure_column="count",
... )
>>> # Apply measurement to data
>>> noisy_spark_dataframe = add_laplace_noise_to_column(spark_dataframe)
>>> print_sdf(noisy_spark_dataframe) 
    A   B   count
0  a1  b1 ...
1  a1  b2 ...
2  a2  b1 ...
3  a2  b2 ...
Measurement Contract:
>>> add_laplace_noise_to_column.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False), 'count': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> add_laplace_noise_to_column.input_metric
OnColumn(column='count', metric=SumOf(inner_metric=AbsoluteDifference()))
>>> add_laplace_noise_to_column.output_measure
PureDP()
Privacy Guarantee:

AddNoiseToColumn’s privacy_function() returns the output of the privacy_function() of the wrapped AddNoiseToSeries measurement.

>>> add_laplace_noise_to_column.privacy_function(1)
2
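
The delegation described above can be sketched with two toy classes. Both class names are hypothetical stand-ins rather than the library's types, and the sketch assumes the standard Laplace guarantee that epsilon equals d_in divided by the noise scale, which is consistent with the value 2 shown for scale "0.5" and d_in = 1.

```python
from fractions import Fraction


class ToyLaplaceSeriesMeasurement:
    """Hypothetical stand-in for the wrapped series measurement."""

    def __init__(self, scale):
        self.scale = Fraction(scale)

    def privacy_function(self, d_in):
        # Assumed Laplace guarantee: epsilon = d_in / scale.
        return Fraction(d_in) / self.scale


class ToyColumnMeasurement:
    """Hypothetical wrapper that forwards privacy_function to the inner measurement."""

    def __init__(self, measurement, measure_column):
        self.measurement = measurement
        self.measure_column = measure_column

    def privacy_function(self, d_in):
        return self.measurement.privacy_function(d_in)


column_measurement = ToyColumnMeasurement(ToyLaplaceSeriesMeasurement("0.5"), "count")
epsilon = column_measurement.privacy_function(1)
```

With scale 1/2 and d_in = 1, epsilon evaluates to 2, matching the doctest output above.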
__init__(input_domain, measurement, measure_column)#

Constructor.

Note

The input metric of this measurement is derived from the measure_column and the input metric of the measurement to be applied. In particular, the input metric of this measurement is measurement.input_metric on the specified measure_column.

property measure_column#

Returns the name of the column to add noise to.

Return type

str

property measurement#

Returns the AddNoiseToSeries measurement to apply to the measure column.

Return type

tmlt.core.measurements.pandas_measurements.series.AddNoiseToSeries

privacy_function(d_in)#

Returns the smallest d_out satisfied by the measurement.

See the architecture guide for more information.

Parameters

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Raises

NotImplementedError – If the privacy_function() of the AddNoiseToSeries measurement raises NotImplementedError.

Return type

tmlt.core.utils.exact_number.ExactNumber

call(val)#

Applies the measurement to the measure column.

Parameters

val (pyspark.sql.DataFrame) –

Return type

pyspark.sql.DataFrame

__call__(val)#

Performs measurement and returns a DataFrame with additional protections.

See “Specific protections against pseudo-side channel leakage” for more details on the specific mitigations we apply here.

Parameters

val (Any) –

Return type

pyspark.sql.DataFrame

property input_domain#

Return input domain for the measurement.

Return type

tmlt.core.domains.base.Domain

property input_metric#

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_measure#

Distance measure on output.

Return type

tmlt.core.measures.Measure

property is_interactive#

Returns true iff the measurement is interactive.

Return type

bool

privacy_relation(d_in, d_out)#

Return True if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_measure.

Return type

bool

class ApplyInPandas(input_domain, input_metric, aggregation_function)#

Bases: SparkMeasurement

Applies a pandas dataframe aggregation to each group in a GroupedDataFrame.

__init__(input_domain, input_metric, aggregation_function)#

Constructor.

property aggregation_function#

Returns the aggregation function.

Return type

tmlt.core.measurements.pandas_measurements.dataframe.Aggregate

property input_domain#

Returns input domain.

Return type

tmlt.core.domains.spark_domains.SparkGroupedDataFrameDomain

privacy_function(d_in)#

Returns the smallest d_out satisfied by the measurement.

See the architecture guide for more information.

Parameters

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Raises

NotImplementedError – If self.aggregation_function.privacy_function(d_in) raises NotImplementedError.

Return type

tmlt.core.utils.exact_number.ExactNumber

call(val)#

Returns DataFrame obtained by applying pandas aggregation to each group.

Parameters

val (tmlt.core.utils.grouped_dataframe.GroupedDataFrame) –

Return type

pyspark.sql.DataFrame

__call__(val)#

Performs measurement and returns a DataFrame with additional protections.

See “Specific protections against pseudo-side channel leakage” for more details on the specific mitigations we apply here.

Parameters

val (Any) –

Return type

pyspark.sql.DataFrame

property input_metric#

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_measure#

Distance measure on output.

Return type

tmlt.core.measures.Measure

property is_interactive#

Returns true iff the measurement is interactive.

Return type

bool

privacy_relation(d_in, d_out)#

Return True if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_measure.

Return type

bool

class GeometricPartitionSelection(input_domain, threshold, alpha, count_column=None)#

Bases: SparkMeasurement

Discovers the distinct rows in a DataFrame, suppressing infrequent rows.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
      A   B
0    a1  b1
1    a2  b2
2    a2  b2
3    a2  b2
4    a2  b2
..   ..  ..
96   a2  b2
97   a2  b2
98   a2  b2
99   a2  b2
100  a2  b2

[101 rows x 2 columns]
>>> measurement = GeometricPartitionSelection(
...     input_domain=SparkDataFrameDomain(
...         schema={
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         },
...     ),
...     threshold=50,
...     alpha=1,
... )
>>> noisy_spark_dataframe = measurement(spark_dataframe) 
>>> print_sdf(noisy_spark_dataframe)  
    A   B  count
0  a2  b2    106
Measurement Contract:
>>> measurement.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> measurement.input_metric
SymmetricDifference()
>>> measurement.output_measure
ApproxDP()
Privacy Guarantee:

For \(d_{in} = 0\), returns \((0, 0)\)

For \(d_{in} = 1\), returns \((1/\alpha, 1 - CDF_{\alpha}[\tau - 2])\)

For \(d_{in} > 1\), returns \((d_{in} \cdot \epsilon, d_{in} \cdot e^{d_{in} \cdot \epsilon} \cdot \delta)\)

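where:

  • \(\alpha\) is alpha

  • \(\tau\) is threshold

  • \(\epsilon\) is the first element returned for the \(d_{in} = 1\) case

  • \(\delta\) is the second element returned for the \(d_{in} = 1\) case

  • \(CDF_{\alpha}\) is the cumulative distribution function of the double-sided geometric distribution with scale \(\alpha\)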

>>> epsilon, delta = measurement.privacy_function(1)
>>> epsilon
1
>>> delta.to_float(round_up=True)
3.8328565409781243e-22
>>> epsilon, delta = measurement.privacy_function(2)
>>> epsilon
2
>>> delta.to_float(round_up=True)
5.664238400088129e-21
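
The three cases of the privacy guarantee can be evaluated in floating point as a rough check. This sketch assumes that CDF_α is the CDF of the double-sided geometric distribution whose probability mass at k is proportional to exp(-|k| / α), for which the upper tail P(X > t) with integer t >= 0 has the closed form exp(-t / α) / (exp(1 / α) + 1). The function names are hypothetical, and the library itself works with exact numbers rather than floats.

```python
import math


def geometric_tail(t, alpha):
    """P(X > t) for integer t >= 0, X double-sided geometric with scale alpha."""
    return math.exp(-t / alpha) / (math.exp(1 / alpha) + 1)


def partition_selection_privacy(d_in, threshold, alpha):
    """Floating-point version of the three cases listed above (needs threshold >= 2)."""
    if d_in == 0:
        return 0.0, 0.0
    epsilon = 1 / alpha
    delta = geometric_tail(threshold - 2, alpha)  # 1 - CDF_alpha[tau - 2]
    if d_in == 1:
        return epsilon, delta
    return d_in * epsilon, d_in * math.exp(d_in * epsilon) * delta


eps1, delta1 = partition_selection_privacy(1, threshold=50, alpha=1)
eps2, delta2 = partition_selection_privacy(2, threshold=50, alpha=1)
```

For threshold 50 and alpha 1, delta1 is approximately 3.83e-22 and delta2 is approximately 5.66e-21, in line with the doctest values above.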
Methods#

alpha()

Returns the noise scale.

threshold()

Returns the minimum noisy count required to include a row.

count_column()

Returns the count column name.

privacy_function()

Returns the smallest d_out satisfied by the measurement.

call()

Return the noisy counts for common rows.

__call__()

Performs measurement and returns a DataFrame with additional protections.

input_domain()

Return input domain for the measurement.

input_metric()

Distance metric on input domain.

output_measure()

Distance measure on output.

is_interactive()

Returns true iff the measurement is interactive.

privacy_relation()

Return True if close inputs produce close outputs.

__init__(input_domain, threshold, alpha, count_column=None)#

Constructor.

property alpha#

Returns the noise scale.

Return type

tmlt.core.utils.exact_number.ExactNumber

property threshold#

Returns the minimum noisy count required to include a row.

Return type

int

property count_column#

Returns the count column name.

Return type

str

privacy_function(d_in)#

Returns the smallest d_out satisfied by the measurement.

See the architecture guide for more information.

Parameters

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Return type

Tuple[tmlt.core.utils.exact_number.ExactNumber, tmlt.core.utils.exact_number.ExactNumber]

call(val)#

Return the noisy counts for common rows.

Parameters

val (pyspark.sql.DataFrame) –

Return type

pyspark.sql.DataFrame
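
The general idea behind noisy partition selection can be sketched in plain Python: count each distinct row, add double-sided geometric noise to the count, and keep only rows whose noisy count reaches the threshold. This is a simplified illustration under stated assumptions, not the library's Spark implementation. The function names are hypothetical, and the use of ">=" for the threshold comparison is an assumption based on the description of threshold as the minimum noisy count.

```python
import math
import random
from collections import Counter


def sample_double_sided_geometric(alpha, rng):
    """Difference of two i.i.d. geometric variables; PMF is proportional to exp(-|k| / alpha)."""
    log_q = -1 / alpha  # log of q = exp(-1 / alpha)

    def geometric():
        u = 1.0 - rng.random()  # uniform on (0, 1]
        return math.floor(math.log(u) / log_q)

    return geometric() - geometric()


def select_partitions(rows, threshold, alpha, rng):
    """Return {row: noisy_count} for rows whose noisy count reaches the threshold."""
    selected = {}
    for row, count in Counter(rows).items():
        noisy_count = count + sample_double_sided_geometric(alpha, rng)
        if noisy_count >= threshold:
            selected[row] = noisy_count
    return selected


rows = [("a1", "b1")] + [("a2", "b2")] * 100
result = select_partitions(rows, threshold=50, alpha=1, rng=random.Random(0))
```

With the same shape of input as the class example (one rare row and one row repeated 100 times), the rare row is suppressed and the frequent row is returned with a noisy count near 100.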

__call__(val)#

Performs measurement and returns a DataFrame with additional protections.

See “Specific protections against pseudo-side channel leakage” for more details on the specific mitigations we apply here.

Parameters

val (Any) –

Return type

pyspark.sql.DataFrame

property input_domain#

Return input domain for the measurement.

Return type

tmlt.core.domains.base.Domain

property input_metric#

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_measure#

Distance measure on output.

Return type

tmlt.core.measures.Measure

property is_interactive#

Returns true iff the measurement is interactive.

Return type

bool

privacy_relation(d_in, d_out)#

Return True if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_measure.

Return type

bool

class BoundSelection(input_domain, bound_column, alpha, threshold=0.95)#

Bases: tmlt.core.measurements.base.Measurement

Discovers a noisy bound based on a DataFrame Column.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
     A   B
0   a1  10
1   a1  10
2   a1  10
3   a1  10
4   a1  10
..  ..  ..
95  a1  40
96  a1  40
97  a1  40
98  a1  40
99  a1  40

[100 rows x 2 columns]
>>> measurement = BoundSelection(
...     input_domain=SparkDataFrameDomain(
...         schema={
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkIntegerColumnDescriptor(),
...         },
...     ),
...     bound_column="B",
...     alpha=1,
...     threshold=0.95,
... )
>>> min_bound, max_bound = measurement(spark_dataframe) 
>>> print(f"Min: {min_bound}, Max: {max_bound}")  
Min: -64, Max: 64
Measurement Contract:
>>> measurement.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> measurement.input_metric
SymmetricDifference()
>>> measurement.output_measure
PureDP()
Privacy Guarantee:

For \(d_{in} = 0\), returns \(0\)

For \(d_{in} > 0\), returns \((4/\alpha) \cdot d_{in}\)

where:

  • \(\alpha\) is alpha

>>> measurement.privacy_function(1)
4
>>> measurement.privacy_function(2)
8
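
The privacy guarantee above is linear in d_in and inversely proportional to alpha. A small exact-arithmetic sketch makes the trade-off concrete. The function name is hypothetical and the code only evaluates the stated formula; it does not call the library.

```python
from fractions import Fraction


def bound_selection_epsilon(d_in, alpha):
    """Exact evaluation of (4 / alpha) * d_in; this is 0 when d_in is 0."""
    return Fraction(4) / Fraction(alpha) * Fraction(d_in)


eps_alpha_1 = [bound_selection_epsilon(d_in, 1) for d_in in (0, 1, 2)]
eps_alpha_2 = [bound_selection_epsilon(d_in, 2) for d_in in (0, 1, 2)]
```

With alpha = 1 the values are 0, 4 and 8, matching the doctest output above. Doubling alpha to 2 halves them to 0, 2 and 4, so a larger noise scale yields a smaller epsilon.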
Methods#

bound_column()

Returns the column to compute the bounds for.

bound_column_type()

Returns the type of the bound column.

splits()

Returns the splits.

alpha()

Returns the alpha.

threshold()

Returns the threshold.

privacy_function()

Returns the smallest d_out satisfied by the measurement.

__call__()

Returns the bounds for the given column.

input_domain()

Return input domain for the measurement.

input_metric()

Distance metric on input domain.

output_measure()

Distance measure on output.

is_interactive()

Returns true iff the measurement is interactive.

privacy_relation()

Return True if close inputs produce close outputs.

__init__(input_domain, bound_column, alpha, threshold=0.95)#

Constructor.

Parameters
  • input_domain (SparkDataFrameDomain) – Domain of the input Spark DataFrames. The bound column must contain either floating-point numbers or integers.

  • bound_column (str) – Name of the column to find bounds for. The column values must lie in [-2^64 + 1, 2^64 - 1] for 64-bit integers, in [-2^32 + 1, 2^32 - 1] for 32-bit integers, and in [-2^100, 2^100] for floats.

  • alpha (Union[ExactNumber, float, int, str, Fraction, Expr]) – The noise scale parameter for the geometric noise that is added to the true number of values falling between the tested bounds on each round of the algorithm. Noise with scale \(\alpha / 2\) is added when computing the threshold. See AddGeometricNoise for more information.

  • threshold (float, default: 0.95) – The fraction of the total count to use as the threshold. This value should be in (0, 1].

property bound_column#

Returns the column to compute the bounds for.

Return type

str

property bound_column_type#

Returns the type of the bound column.

Return type

tmlt.core.domains.spark_domains.SparkColumnDescriptor

property splits#

Returns the splits.

Return type

Union[List[float], List[int]]

property alpha#

Returns the alpha.

Return type

tmlt.core.utils.exact_number.ExactNumber

property threshold#

Returns the threshold.

Return type

float

privacy_function(d_in)#

Returns the smallest d_out satisfied by the measurement.

See the architecture guide for more information.

Parameters

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Return type

tmlt.core.utils.exact_number.ExactNumber

__call__(sdf)#

Returns the bounds for the given column.

Parameters

sdf (pyspark.sql.DataFrame) –

Return type

Union[Tuple[float, float], Tuple[int, int]]

property input_domain#

Return input domain for the measurement.

Return type

tmlt.core.domains.base.Domain

property input_metric#

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_measure#

Distance measure on output.

Return type

tmlt.core.measures.Measure

property is_interactive#

Returns true iff the measurement is interactive.

Return type

bool

privacy_relation(d_in, d_out)#

Return True if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_measure.

Return type

bool