spark_measurements#

Measurements on Spark DataFrames.

See the architecture guide for more information.

Classes#

SparkMeasurement

Base class that materializes output DataFrames before returning.

AddNoiseToColumn

Adds noise to a single aggregated column of a Spark DataFrame.

ApplyInPandas

Applies a pandas DataFrame aggregation to each group in a GroupedDataFrame.

GeometricPartitionSelection

Discovers the distinct rows in a DataFrame, suppressing infrequent rows.

SparseVectorPrefixSums

Finds the rank of the row that causes the prefix sum to exceed the threshold.

class SparkMeasurement(input_domain, input_metric, output_measure, is_interactive)#

Bases: tmlt.core.measurements.base.Measurement

Base class that materializes output DataFrames before returning.

property input_domain: tmlt.core.domains.base.Domain#

Return input domain for the measurement.

Return type:

tmlt.core.domains.base.Domain

property input_metric: tmlt.core.metrics.Metric#

Distance metric on input domain.

Return type:

tmlt.core.metrics.Metric

property output_measure: tmlt.core.measures.Measure#

Distance measure on output.

Return type:

tmlt.core.measures.Measure

property is_interactive: bool#

Returns true iff the measurement is interactive.

Return type:

bool

__init__(input_domain, input_metric, output_measure, is_interactive)#

Constructor.

Parameters:
  • input_domain (Domain) – Domain of input datasets.

  • input_metric (Metric) – Distance metric for input datasets.

  • output_measure (Measure) – Distance measure for measurement’s output.

  • is_interactive (bool) – Whether the measurement is interactive.

abstract call(val)#

Performs measurement.

Warning

Spark recomputes the output of this method (adding different noise each time) on every call to collect.

Parameters:

val (Any)

Return type:

pyspark.sql.DataFrame
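
To make the warning concrete, here is an illustrative sketch (not library doctest output); some_measurement and df are placeholders standing in for any concrete SparkMeasurement subclass and its input DataFrame.

>>> # The raw output of call() is a lazy Spark plan, so every action
>>> # re-executes it and draws fresh noise:
>>> lazy_result = some_measurement.call(df)
>>> lazy_result.collect()  # one draw of noise
>>> lazy_result.collect()  # potentially a different draw!
>>> # __call__ materializes the output first, so repeated actions agree:
>>> stable_result = some_measurement(df)
>>> stable_result.collect()  # the same rows every time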

__call__(val)#

Performs measurement and returns a DataFrame with additional protections.

See Specific protections against pseudo-side channel leakage for more details on the specific mitigations we apply here.

Parameters:

val (Any)

Return type:

pyspark.sql.DataFrame

privacy_function(d_in)#

Returns the smallest d_out satisfied by the measurement.

See the privacy and stability tutorial for more information.

Parameters:

d_in (Any) – Distance between inputs under input_metric.

Raises:

NotImplementedError – If not overridden.

Return type:

Any

privacy_relation(d_in, d_out)#

Return True if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters:
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_measure.

Return type:

bool
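
As a hedged sketch of how privacy_function() and privacy_relation() fit together (this mirrors the default behavior when privacy_function() is implemented; measurement, d_in, and d_out are placeholders):

>>> # privacy_relation holds exactly when the smallest satisfied d_out
>>> # is at most the given d_out:
>>> measurement.privacy_relation(d_in, d_out)
>>> # is then equivalent to:
>>> measurement.privacy_function(d_in) <= d_out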

class AddNoiseToColumn(input_domain, measurement, measure_column)#

Bases: SparkMeasurement

Adds noise to a single aggregated column of a Spark DataFrame.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B  count
0  a1  b1      3
1  a1  b2      2
2  a2  b1      1
3  a2  b2      0
>>> # Create a measurement that can add noise to a pd.Series
>>> add_laplace_noise = AddLaplaceNoise(
...     scale="0.5",
...     input_domain=NumpyIntegerDomain(),
... )
>>> # Create a measurement that can add noise to a Spark DataFrame
>>> add_laplace_noise_to_column = AddNoiseToColumn(
...     input_domain=SparkDataFrameDomain(
...         schema={
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...             "count": SparkIntegerColumnDescriptor(),
...         },
...     ),
...     measurement=AddNoiseToSeries(add_laplace_noise),
...     measure_column="count",
... )
>>> # Apply measurement to data
>>> noisy_spark_dataframe = add_laplace_noise_to_column(spark_dataframe)
>>> print_sdf(noisy_spark_dataframe)  
    A   B   count
0  a1  b1 ...
1  a1  b2 ...
2  a2  b1 ...
3  a2  b2 ...
Measurement Contract:
>>> add_laplace_noise_to_column.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False), 'count': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> add_laplace_noise_to_column.input_metric
OnColumn(column='count', metric=SumOf(inner_metric=AbsoluteDifference()))
>>> add_laplace_noise_to_column.output_measure
PureDP()
Privacy Guarantee:

AddNoiseToColumn’s privacy_function() returns the output of the privacy_function() of the AddNoiseToSeries measurement.

>>> add_laplace_noise_to_column.privacy_function(1)
2
property measure_column: str#

Returns the name of the column to add noise to.

Return type:

str

property measurement: tmlt.core.measurements.pandas_measurements.series.AddNoiseToSeries#

Returns the AddNoiseToSeries measurement to apply to the measure column.

Return type:

tmlt.core.measurements.pandas_measurements.series.AddNoiseToSeries

property input_domain: tmlt.core.domains.base.Domain#

Return input domain for the measurement.

Return type:

tmlt.core.domains.base.Domain

property input_metric: tmlt.core.metrics.Metric#

Distance metric on input domain.

Return type:

tmlt.core.metrics.Metric

property output_measure: tmlt.core.measures.Measure#

Distance measure on output.

Return type:

tmlt.core.measures.Measure

property is_interactive: bool#

Returns true iff the measurement is interactive.

Return type:

bool

__init__(input_domain, measurement, measure_column)#

Constructor.

Parameters:
  • input_domain (SparkDataFrameDomain) – Domain of the input Spark DataFrames.

  • measurement (AddNoiseToSeries) – The AddNoiseToSeries measurement to apply to the measure column.

  • measure_column (str) – Name of the column to add noise to.

Note

The input metric of this measurement is derived from the measure_column and the input metric of the measurement to be applied. In particular, the input metric of this measurement is measurement.input_metric on the specified measure_column.
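
For the example instance constructed above, this derivation produces the metric already shown in the measurement contract:

>>> add_laplace_noise_to_column.input_metric
OnColumn(column='count', metric=SumOf(inner_metric=AbsoluteDifference()))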

privacy_function(d_in)#

Returns the smallest d_out satisfied by the measurement.

See the architecture guide for more information.

Parameters:

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Raises:

NotImplementedError – If the privacy_function() of the AddNoiseToSeries measurement raises NotImplementedError.

Return type:

tmlt.core.utils.exact_number.ExactNumber

call(val)#

Applies the measurement to the measure column.

Parameters:

val (pyspark.sql.DataFrame)

Return type:

pyspark.sql.DataFrame

__call__(val)#

Performs measurement and returns a DataFrame with additional protections.

See Specific protections against pseudo-side channel leakage for more details on the specific mitigations we apply here.

Parameters:

val (Any)

Return type:

pyspark.sql.DataFrame

privacy_relation(d_in, d_out)#

Return True if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters:
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_measure.

Return type:

bool

class ApplyInPandas(input_domain, input_metric, aggregation_function)#

Bases: SparkMeasurement

Applies a pandas DataFrame aggregation to each group in a GroupedDataFrame.
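
Example

A hedged usage sketch (this is not a library doctest; grouped_dataframe_domain, groups_metric, noisy_sum, and grouped_dataframe are assumed placeholder objects, not values defined here):

>>> apply_in_pandas = ApplyInPandas(
...     input_domain=grouped_dataframe_domain,  # a SparkGroupedDataFrameDomain
...     input_metric=groups_metric,             # metric over the grouped data
...     aggregation_function=noisy_sum,         # a pandas Aggregate measurement
... )
>>> # One output row per group, produced by the pandas aggregation:
>>> noisy_per_group = apply_in_pandas(grouped_dataframe)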

property aggregation_function: tmlt.core.measurements.pandas_measurements.dataframe.Aggregate#

Returns the aggregation function.

Return type:

tmlt.core.measurements.pandas_measurements.dataframe.Aggregate

property input_domain: tmlt.core.domains.spark_domains.SparkGroupedDataFrameDomain#

Returns input domain.

Return type:

tmlt.core.domains.spark_domains.SparkGroupedDataFrameDomain

property input_metric: tmlt.core.metrics.Metric#

Distance metric on input domain.

Return type:

tmlt.core.metrics.Metric

property output_measure: tmlt.core.measures.Measure#

Distance measure on output.

Return type:

tmlt.core.measures.Measure

property is_interactive: bool#

Returns true iff the measurement is interactive.

Return type:

bool

__init__(input_domain, input_metric, aggregation_function)#

Constructor.

Parameters:
  • input_domain (SparkGroupedDataFrameDomain) – Domain of the input GroupedDataFrames.

  • input_metric (Metric) – Distance metric on the input GroupedDataFrames.

  • aggregation_function (Aggregate) – The aggregation function to apply to each group.

privacy_function(d_in)#

Returns the smallest d_out satisfied by the measurement.

See the architecture guide for more information.

Parameters:

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Raises:

NotImplementedError – If self.aggregation_function.privacy_function(d_in) raises NotImplementedError.

Return type:

tmlt.core.utils.exact_number.ExactNumber

call(val)#

Returns DataFrame obtained by applying pandas aggregation to each group.

Parameters:

val (tmlt.core.utils.grouped_dataframe.GroupedDataFrame)

Return type:

pyspark.sql.DataFrame

__call__(val)#

Performs measurement and returns a DataFrame with additional protections.

See Specific protections against pseudo-side channel leakage for more details on the specific mitigations we apply here.

Parameters:

val (Any)

Return type:

pyspark.sql.DataFrame

privacy_relation(d_in, d_out)#

Return True if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters:
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_measure.

Return type:

bool

class GeometricPartitionSelection(input_domain, threshold, alpha, count_column=None)#

Bases: SparkMeasurement

Discovers the distinct rows in a DataFrame, suppressing infrequent rows.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
      A   B
0    a1  b1
1    a2  b2
2    a2  b2
3    a2  b2
4    a2  b2
..   ..  ..
96   a2  b2
97   a2  b2
98   a2  b2
99   a2  b2
100  a2  b2

[101 rows x 2 columns]
>>> measurement = GeometricPartitionSelection(
...     input_domain=SparkDataFrameDomain(
...         schema={
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         },
...     ),
...     threshold=50,
...     alpha=1,
... )
>>> noisy_spark_dataframe = measurement(spark_dataframe)  
>>> print_sdf(noisy_spark_dataframe)  
    A   B  count
0  a2  b2    106
Measurement Contract:
>>> measurement.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> measurement.input_metric
SymmetricDifference()
>>> measurement.output_measure
ApproxDP()
Privacy Guarantee:

For \(d_{in} = 0\), returns \((0, 0)\)

For \(d_{in} = 1\), returns \((1/\alpha, 1 - CDF_{\alpha}[\tau - 2])\)

For \(d_{in} > 1\), returns \((d_{in} \cdot \epsilon, d_{in} \cdot e^{d_{in} \cdot \epsilon} \cdot \delta)\)

where \(\alpha\) is the noise scale, \(\tau\) is the threshold, and \((\epsilon, \delta)\) is the guarantee returned for \(d_{in} = 1\). For example:

>>> epsilon, delta = measurement.privacy_function(1)
>>> epsilon
1
>>> delta.to_float(round_up=True)
3.8328565409781243e-22
>>> epsilon, delta = measurement.privacy_function(2)
>>> epsilon
2
>>> delta.to_float(round_up=True)
5.664238400088129e-21
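
The \(d_{in} = 2\) values can be checked against the \(d_{in} = 1\) values with the scaling rule above; a quick plain-Python sketch (for illustration only, using the floating-point delta printed above):

>>> import math
>>> eps1, delta1 = 1, 3.8328565409781243e-22
>>> d_in = 2
>>> d_in * eps1                            # 2, the epsilon printed above
>>> d_in * math.exp(d_in * eps1) * delta1  # ~5.66e-21, the delta printed above
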
property alpha: tmlt.core.utils.exact_number.ExactNumber#

Returns the noise scale.

Return type:

tmlt.core.utils.exact_number.ExactNumber

property threshold: int#

Returns the minimum noisy count required to include a row.

Return type:

int

property count_column: str#

Returns the count column name.

Return type:

str

property input_domain: tmlt.core.domains.base.Domain#

Return input domain for the measurement.

Return type:

tmlt.core.domains.base.Domain

property input_metric: tmlt.core.metrics.Metric#

Distance metric on input domain.

Return type:

tmlt.core.metrics.Metric

property output_measure: tmlt.core.measures.Measure#

Distance measure on output.

Return type:

tmlt.core.measures.Measure

property is_interactive: bool#

Returns true iff the measurement is interactive.

Return type:

bool

__init__(input_domain, threshold, alpha, count_column=None)#

Constructor.

Parameters:
  • input_domain (SparkDataFrameDomain) – Domain of the input Spark DataFrames. Input cannot contain floating point columns.

  • threshold (int) – The minimum noisy count required for a row to be released. Can be nonpositive, but must be integral.

  • alpha (Union[ExactNumber, float, int, str, Fraction, Expr]) – The noise scale parameter for Geometric noise. See AddGeometricNoise for more information.

  • count_column (Optional[str]) – Column name for output group counts. If None, output column will be named “count”.
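
A non-private reference sketch of the mechanism (a hypothetical PySpark helper, not part of the library): count the distinct rows, then keep only rows whose count meets the threshold. The real measurement adds two-sided geometric noise with scale alpha to each count before the comparison.

>>> def partition_selection_sketch(df, threshold):
...     # Noiseless skeleton: distinct-row counts, then threshold filter.
...     counts = df.groupBy(df.columns).count()
...     return counts.filter(counts["count"] >= threshold)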

privacy_function(d_in)#

Returns the smallest d_out satisfied by the measurement.

See the architecture guide for more information.

Parameters:

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Return type:

Tuple[tmlt.core.utils.exact_number.ExactNumber, tmlt.core.utils.exact_number.ExactNumber]

call(val)#

Return the noisy counts for common rows.

Parameters:

val (pyspark.sql.DataFrame)

Return type:

pyspark.sql.DataFrame

__call__(val)#

Performs measurement and returns a DataFrame with additional protections.

See Specific protections against pseudo-side channel leakage for more details on the specific mitigations we apply here.

Parameters:

val (Any)

Return type:

pyspark.sql.DataFrame

privacy_relation(d_in, d_out)#

Return True if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters:
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_measure.

Return type:

bool

class SparseVectorPrefixSums(input_domain, count_column, rank_column, alpha, grouping_columns=None, threshold_fraction=0.95)#

Bases: SparkMeasurement

Finds the rank of the row that causes the prefix sum to exceed the threshold.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
   grouping  rank  count
0         A     0      1
1         A     1      1
2         A     2      1
3         A     3      1
4         A     4      1
5         A     5      1
6         A     6      1
7         A     7      1
8         A     8      1
9         A     9      1
10        B    -5      2
11        B    -4      2
12        B    -3      2
13        B    -2      2
14        B    -1   1000
15        B     0      2
16        B     1      2
17        B     2      2
18        B     3      2
19        B     4      2
>>> measurement = SparseVectorPrefixSums(
...     input_domain=SparkDataFrameDomain(
...         schema={
...             "grouping": SparkStringColumnDescriptor(),
...             "rank": SparkIntegerColumnDescriptor(),
...             "count": SparkIntegerColumnDescriptor(),
...         },
...     ),
...     count_column="count",
...     rank_column="rank",
...     alpha=1,
...     grouping_columns=["grouping"],
...     threshold_fraction=0.90,
... )
>>> noisy_spark_dataframe = measurement(spark_dataframe)  
>>> print_sdf(noisy_spark_dataframe)  
  grouping  rank
0        A     8
1        B    -1
Measurement Contract:
  • Input domain - SparkDataFrameDomain

  • Output type - Spark DataFrame

  • Input metric - OnColumn (with inner metric SumOf(AbsoluteDifference()) on the count_column)

  • Output measure - PureDP

>>> measurement.input_domain
SparkDataFrameDomain(schema={'grouping': SparkStringColumnDescriptor(allow_null=False), 'rank': SparkIntegerColumnDescriptor(allow_null=False, size=64), 'count': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> measurement.input_metric
OnColumn(column='count', metric=SumOf(inner_metric=AbsoluteDifference()))
>>> measurement.output_measure
PureDP()
Privacy Guarantee:

For \(d_{in} = 0\), returns \(0\)

For \(d_{in} \ge 1\), returns \((4 / \alpha) \cdot d_{in}\)

where \(\alpha\) is the noise scale. For example:

>>> measurement.privacy_function(1)
4
>>> measurement.privacy_function(2)
8
property alpha: tmlt.core.utils.exact_number.ExactNumber#

Returns the noise scale.

Return type:

tmlt.core.utils.exact_number.ExactNumber

property threshold_fraction: float#

Returns the threshold fraction.

Return type:

float

property count_column: str#

Returns the count column.

Return type:

str

property rank_column: str#

Returns the rank column.

Return type:

str

property input_domain: tmlt.core.domains.base.Domain#

Return input domain for the measurement.

Return type:

tmlt.core.domains.base.Domain

property input_metric: tmlt.core.metrics.Metric#

Distance metric on input domain.

Return type:

tmlt.core.metrics.Metric

property output_measure: tmlt.core.measures.Measure#

Distance measure on output.

Return type:

tmlt.core.measures.Measure

property is_interactive: bool#

Returns true iff the measurement is interactive.

Return type:

bool

__init__(input_domain, count_column, rank_column, alpha, grouping_columns=None, threshold_fraction=0.95)#

Constructor.

Parameters:
  • input_domain (SparkDataFrameDomain) – Domain of the input DataFrames, which contain bin counts.

  • count_column (str) – Column name for the column containing the counts.

  • rank_column (str) – Column name for the column defining the ordering of rows used to compute prefix sums.

  • alpha (Union[ExactNumber, float, int, str, Fraction, Expr]) – The noise scale parameter for Geometric noise that will be added to each prefix sum. Noise with scale of \(\alpha / 2\) will be added when computing the threshold. See AddGeometricNoise for more information.

  • grouping_columns (Optional[List[str]]) – Optional list of column names defining the groups. The output dataframe will contain one row per group. If None, the entire input dataframe is treated as a single group.

  • threshold_fraction (float) – The fraction of the total count to use as the threshold. Must be in the interval (0, 1]. Defaults to 0.95.
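
A non-private reference sketch (a hypothetical pandas helper, not part of the library) of the quantity being estimated: per group, the smallest rank whose prefix sum reaches threshold_fraction of the group's total count. The real measurement adds geometric noise with scale alpha to each prefix sum and scale alpha / 2 to the threshold.

>>> def exact_prefix_sum_rank(pdf, frac=0.95):
...     # Order rows by rank, then find where the running count total
...     # first reaches the threshold fraction of the grand total.
...     pdf = pdf.sort_values("rank")
...     threshold = frac * pdf["count"].sum()
...     return pdf.loc[pdf["count"].cumsum() >= threshold, "rank"].iloc[0]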

privacy_function(d_in)#

Returns the smallest d_out satisfied by the measurement.

See the architecture guide for more information.

Parameters:

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Return type:

tmlt.core.utils.exact_number.ExactNumber

call(val)#

Return row causing prefix sum to exceed the threshold.

Parameters:

val (pyspark.sql.DataFrame)

Return type:

pyspark.sql.DataFrame

__call__(val)#

Performs measurement and returns a DataFrame with additional protections.

See Specific protections against pseudo-side channel leakage for more details on the specific mitigations we apply here.

Parameters:

val (Any)

Return type:

pyspark.sql.DataFrame

privacy_relation(d_in, d_out)#

Return True if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters:
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_measure.

Return type:

bool