spark_measurements#
Measurements on Spark DataFrames.
See the architecture guide for more information.
Classes#
- SparkMeasurement – Base class that materializes output DataFrames before returning.
- AddNoiseToColumn – Adds noise to a single aggregated column of a Spark DataFrame.
- ApplyInPandas – Applies a pandas dataframe aggregation to each group in a GroupedDataFrame.
- GeometricPartitionSelection – Discovers the distinct rows in a DataFrame, suppressing infrequent rows.
- SparseVectorPrefixSums – Finds the rank of the row causing the prefix sum to exceed the threshold.
- class SparkMeasurement(input_domain, input_metric, output_measure, is_interactive)#
Bases:
tmlt.core.measurements.base.Measurement
Base class that materializes output DataFrames before returning.
- Parameters:
input_domain (tmlt.core.domains.base.Domain)
input_metric (tmlt.core.metrics.Metric)
output_measure (tmlt.core.measures.Measure)
is_interactive (bool)
- property input_domain: tmlt.core.domains.base.Domain#
Return input domain for the measurement.
- Return type:
- property input_metric: tmlt.core.metrics.Metric#
Distance metric on input domain.
- Return type:
- property output_measure: tmlt.core.measures.Measure#
Distance measure on output.
- Return type:
- __init__(input_domain, input_metric, output_measure, is_interactive)#
Constructor.
- abstract call(val)#
Performs measurement.
Warning
Spark recomputes the output of this method (adding different noise each time) on every call to collect.
- Parameters:
val (Any)
- Return type:
- __call__(val)#
Performs measurement and returns a DataFrame with additional protections.
See Specific protections against pseudo-side channel leakage for more details on the specific mitigations we apply here.
- Parameters:
val (Any)
- Return type:
- privacy_function(d_in)#
Returns the smallest d_out satisfied by the measurement.
See the privacy and stability tutorial for more information.
- Parameters:
d_in (Any) – Distance between inputs under input_metric.
- Raises:
NotImplementedError – If not overridden.
- Return type:
Any
- privacy_relation(d_in, d_out)#
Return True if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters:
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_measure.
- Return type:
- class AddNoiseToColumn(input_domain, measurement, measure_column)#
Bases:
SparkMeasurement
Adds noise to a single aggregated column of a Spark DataFrame.
Example
>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B  count
0  a1  b1      3
1  a1  b2      2
2  a2  b1      1
3  a2  b2      0
>>> # Create a measurement that can add noise to a pd.Series
>>> add_laplace_noise = AddLaplaceNoise(
...     scale="0.5",
...     input_domain=NumpyIntegerDomain(),
... )
>>> # Create a measurement that can add noise to a Spark DataFrame
>>> add_laplace_noise_to_column = AddNoiseToColumn(
...     input_domain=SparkDataFrameDomain(
...         schema={
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...             "count": SparkIntegerColumnDescriptor(),
...         },
...     ),
...     measurement=AddNoiseToSeries(add_laplace_noise),
...     measure_column="count",
... )
>>> # Apply measurement to data
>>> noisy_spark_dataframe = add_laplace_noise_to_column(spark_dataframe)
>>> print_sdf(noisy_spark_dataframe)
    A   B  count
0  a1  b1    ...
1  a1  b2    ...
2  a2  b1    ...
3  a2  b2    ...
- Measurement Contract:
Input domain - SparkDataFrameDomain
Output type - Spark DataFrame
Input metric - OnColumn with metric SumOf(AbsoluteDifference()) (for PureDP) or RootSumOfSquared(AbsoluteDifference()) (for RhoZCDP) on the measure column.
>>> add_laplace_noise_to_column.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False), 'count': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> add_laplace_noise_to_column.input_metric
OnColumn(column='count', metric=SumOf(inner_metric=AbsoluteDifference()))
>>> add_laplace_noise_to_column.output_measure
PureDP()
- Privacy Guarantee:
AddNoiseToColumn’s privacy_function() returns the output of the privacy function of the AddNoiseToSeries measurement.
>>> add_laplace_noise_to_column.privacy_function(1)
2
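The example uses Laplace noise with scale 0.5 under a SumOf(AbsoluteDifference()) metric, so the privacy function maps d_in to d_in / scale, which is why privacy_function(1) returns 2. A minimal sketch of that arithmetic (plain Python, not the tmlt.core API):

```python
from fractions import Fraction

def laplace_epsilon(d_in, scale):
    """Pure-DP epsilon for Laplace noise under an L1-style metric:
    epsilon = d_in / scale, computed with exact rational arithmetic."""
    return Fraction(d_in) / Fraction(scale)

# Scale 0.5, matching the AddLaplaceNoise example above.
print(laplace_epsilon(1, Fraction(1, 2)))  # -> 2
```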
- Parameters:
input_domain (tmlt.core.domains.spark_domains.SparkDataFrameDomain)
measurement (tmlt.core.measurements.pandas_measurements.series.AddNoiseToSeries)
measure_column (str)
- property measurement: tmlt.core.measurements.pandas_measurements.series.AddNoiseToSeries#
Returns the AddNoiseToSeries measurement to apply to the measure column.
- property input_domain: tmlt.core.domains.base.Domain#
Return input domain for the measurement.
- Return type:
- property input_metric: tmlt.core.metrics.Metric#
Distance metric on input domain.
- Return type:
- property output_measure: tmlt.core.measures.Measure#
Distance measure on output.
- Return type:
- __init__(input_domain, measurement, measure_column)#
Constructor.
- Parameters:
input_domain (SparkDataFrameDomain) – Domain of input Spark DataFrames.
measurement (AddNoiseToSeries) – AddNoiseToSeries measurement for adding noise to measure_column.
measure_column (str) – Name of the column to add noise to.
Note
The input metric of this measurement is derived from the measure_column and the input metric of the measurement to be applied. In particular, the input metric of this measurement is measurement.input_metric on the specified measure_column.
- privacy_function(d_in)#
Returns the smallest d_out satisfied by the measurement.
See the architecture guide for more information.
- Parameters:
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Raises:
NotImplementedError – If the
privacy_function()
of theAddNoiseToSeries
measurement raisesNotImplementedError
.- Return type:
- call(val)#
Applies measurement to measure column.
- Parameters:
val (pyspark.sql.DataFrame)
- Return type:
- __call__(val)#
Performs measurement and returns a DataFrame with additional protections.
See Specific protections against pseudo-side channel leakage for more details on the specific mitigations we apply here.
- Parameters:
val (Any)
- Return type:
- privacy_relation(d_in, d_out)#
Return True if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters:
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_measure.
- Return type:
- class ApplyInPandas(input_domain, input_metric, aggregation_function)#
Bases:
SparkMeasurement
Applies a pandas dataframe aggregation to each group in a GroupedDataFrame.
- Parameters:
input_domain (tmlt.core.domains.spark_domains.SparkGroupedDataFrameDomain)
input_metric (Union[tmlt.core.metrics.SumOf, tmlt.core.metrics.RootSumOfSquared])
aggregation_function (tmlt.core.measurements.pandas_measurements.dataframe.Aggregate)
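The pattern this measurement implements can be illustrated without Spark: split the rows by group key, run a pandas aggregation on each group's sub-frame, and stack the per-group results. A noiseless sketch of that idea in plain pandas (not the tmlt.core API; the names here are illustrative, and the real measurement adds noise inside the per-group aggregation):

```python
import pandas as pd

def apply_in_pandas(df, grouping_columns, aggregation):
    """Apply `aggregation` (pd.DataFrame -> one-row pd.DataFrame) to each
    group, re-attaching the group keys. Illustrative, noiseless version."""
    pieces = []
    for keys, group in df.groupby(grouping_columns):
        if not isinstance(keys, tuple):  # pandas may yield scalar keys
            keys = (keys,)
        out = aggregation(group.drop(columns=grouping_columns))
        for col, key in zip(grouping_columns, keys):
            out[col] = key
        pieces.append(out)
    return pd.concat(pieces, ignore_index=True)

df = pd.DataFrame({"A": ["a1", "a1", "a2"], "x": [1, 2, 5]})
result = apply_in_pandas(
    df, ["A"], lambda g: pd.DataFrame({"sum_x": [g["x"].sum()]})
)
print(result)
```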
- property aggregation_function: tmlt.core.measurements.pandas_measurements.dataframe.Aggregate#
Returns the aggregation function.
- property input_domain: tmlt.core.domains.spark_domains.SparkGroupedDataFrameDomain#
Returns input domain.
- property input_metric: tmlt.core.metrics.Metric#
Distance metric on input domain.
- Return type:
- property output_measure: tmlt.core.measures.Measure#
Distance measure on output.
- Return type:
- __init__(input_domain, input_metric, aggregation_function)#
Constructor.
- Parameters:
input_domain (SparkGroupedDataFrameDomain) – Domain of the input GroupedDataFrames.
input_metric (Union[SumOf, RootSumOfSquared]) – Distance metric on inputs. It must be one of SumOf or RootSumOfSquared with inner metric SymmetricDifference.
aggregation_function (Aggregate) – An Aggregate measurement to be applied to each group. The input domain of this measurement must be a PandasDataFrameDomain corresponding to a subset of the non-grouping columns in the input_domain.
- privacy_function(d_in)#
Returns the smallest d_out satisfied by the measurement.
See the architecture guide for more information.
- Parameters:
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Raises:
NotImplementedError – If self.aggregation_function.privacy_function(d_in) raises
NotImplementedError
.- Return type:
- call(val)#
Returns DataFrame obtained by applying pandas aggregation to each group.
- Parameters:
- Return type:
- __call__(val)#
Performs measurement and returns a DataFrame with additional protections.
See Specific protections against pseudo-side channel leakage for more details on the specific mitigations we apply here.
- Parameters:
val (Any)
- Return type:
- privacy_relation(d_in, d_out)#
Return True if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters:
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_measure.
- Return type:
- class GeometricPartitionSelection(input_domain, threshold, alpha, count_column=None)#
Bases:
SparkMeasurement
Discovers the distinct rows in a DataFrame, suppressing infrequent rows.
Example
>>> # Example input
>>> print_sdf(spark_dataframe)
      A   B
0    a1  b1
1    a2  b2
2    a2  b2
3    a2  b2
4    a2  b2
..   ..  ..
96   a2  b2
97   a2  b2
98   a2  b2
99   a2  b2
100  a2  b2
[101 rows x 2 columns]
>>> measurement = GeometricPartitionSelection(
...     input_domain=SparkDataFrameDomain(
...         schema={
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         },
...     ),
...     threshold=50,
...     alpha=1,
... )
>>> noisy_spark_dataframe = measurement(spark_dataframe)
>>> print_sdf(noisy_spark_dataframe)
    A   B  count
0  a2  b2    106
- Measurement Contract:
Input domain -
SparkDataFrameDomain
Output type - Spark DataFrame
Input metric -
SymmetricDifference
Output measure -
ApproxDP
>>> measurement.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> measurement.input_metric
SymmetricDifference()
>>> measurement.output_measure
ApproxDP()
- Privacy Guarantee:
For \(d_{in} = 0\), returns \((0, 0)\)
For \(d_{in} = 1\), returns \((1/\alpha, 1 - CDF_{\alpha}[\tau - 2])\)
For \(d_{in} > 1\), returns \((d_{in} \cdot \epsilon, d_{in} \cdot e^{d_{in} \cdot \epsilon} \cdot \delta)\)
where:
- \(\alpha\) is alpha
- \(\tau\) is threshold
- \(\epsilon\) is the first element returned for the \(d_{in} = 1\) case
- \(\delta\) is the second element returned for the \(d_{in} = 1\) case
- \(CDF_{\alpha}\) is double_sided_geometric_cmf_exact()
>>> epsilon, delta = measurement.privacy_function(1)
>>> epsilon
1
>>> delta.to_float(round_up=True)
3.8328565409781243e-22
>>> epsilon, delta = measurement.privacy_function(2)
>>> epsilon
2
>>> delta.to_float(round_up=True)
5.664238400088129e-21
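The deltas above can be reproduced numerically. For a double-sided geometric distribution with scale \(\alpha\) and \(p = e^{-1/\alpha}\), the upper tail \(1 - CDF_{\alpha}[k]\) for \(k \ge 0\) is \(p^{k+1}/(1+p)\) (stated here as an assumption; the library computes it exactly rather than in floating point). Plugging in \(\tau = 50\), \(\alpha = 1\):

```python
import math

def double_sided_geometric_sf(k, alpha):
    """P(X > k) for a two-sided geometric distribution with scale alpha,
    valid for k >= 0: p**(k + 1) / (1 + p), where p = exp(-1/alpha)."""
    p = math.exp(-1 / alpha)
    return p ** (k + 1) / (1 + p)

alpha, threshold = 1, 50
epsilon = 1 / alpha                                      # d_in = 1 case
delta = double_sided_geometric_sf(threshold - 2, alpha)  # 1 - CDF_alpha[tau - 2]
print(epsilon, delta)   # delta is approximately 3.83e-22

# For d_in = 2: (d_in * epsilon, d_in * exp(d_in * epsilon) * delta)
delta2 = 2 * math.exp(2 * epsilon) * delta
print(delta2)           # approximately 5.66e-21
```

Both values agree with the exact outputs of privacy_function shown above, up to floating-point rounding.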
- Parameters:
input_domain (tmlt.core.domains.spark_domains.SparkDataFrameDomain)
threshold (int)
alpha (tmlt.core.utils.exact_number.ExactNumberInput)
count_column (Optional[str])
- property alpha: tmlt.core.utils.exact_number.ExactNumber#
Returns the noise scale.
- Return type:
- property input_domain: tmlt.core.domains.base.Domain#
Return input domain for the measurement.
- Return type:
- property input_metric: tmlt.core.metrics.Metric#
Distance metric on input domain.
- Return type:
- property output_measure: tmlt.core.measures.Measure#
Distance measure on output.
- Return type:
- __init__(input_domain, threshold, alpha, count_column=None)#
Constructor.
- Parameters:
input_domain (SparkDataFrameDomain) – Domain of the input Spark DataFrames. Input cannot contain floating point columns.
threshold (int) – The minimum noisy count required for a row to be released. Can be nonpositive, but must be integral.
alpha (Union[ExactNumber, float, int, str, Fraction, Expr]) – The noise scale parameter for geometric noise. See AddGeometricNoise for more information.
count_column (Optional[str]) – Column name for output group counts. If None, the output column is named “count”.
- privacy_function(d_in)#
Returns the smallest d_out satisfied by the measurement.
See the architecture guide for more information.
- Parameters:
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Return type:
Tuple[tmlt.core.utils.exact_number.ExactNumber, tmlt.core.utils.exact_number.ExactNumber]
- call(val)#
Return the noisy counts for common rows.
- Parameters:
val (pyspark.sql.DataFrame)
- Return type:
- __call__(val)#
Performs measurement and returns a DataFrame with additional protections.
See Specific protections against pseudo-side channel leakage for more details on the specific mitigations we apply here.
- Parameters:
val (Any)
- Return type:
- privacy_relation(d_in, d_out)#
Return True if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters:
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_measure.
- Return type:
- class SparseVectorPrefixSums(input_domain, count_column, rank_column, alpha, grouping_columns=None, threshold_fraction=0.95)#
Bases:
SparkMeasurement
Find the rank of the row causing the prefix sum to exceed the threshold.
Example
>>> # Example input
>>> print_sdf(spark_dataframe)
   grouping  rank  count
0         A     0      1
1         A     1      1
2         A     2      1
3         A     3      1
4         A     4      1
5         A     5      1
6         A     6      1
7         A     7      1
8         A     8      1
9         A     9      1
10        B    -5      2
11        B    -4      2
12        B    -3      2
13        B    -2      2
14        B    -1   1000
15        B     0      2
16        B     1      2
17        B     2      2
18        B     3      2
19        B     4      2
>>> measurement = SparseVectorPrefixSums(
...     input_domain=SparkDataFrameDomain(
...         schema={
...             "grouping": SparkStringColumnDescriptor(),
...             "rank": SparkIntegerColumnDescriptor(),
...             "count": SparkIntegerColumnDescriptor(),
...         },
...     ),
...     count_column="count",
...     rank_column="rank",
...     alpha=1,
...     grouping_columns=["grouping"],
...     threshold_fraction=0.90,
... )
>>> noisy_spark_dataframe = measurement(spark_dataframe)
>>> print_sdf(noisy_spark_dataframe)
  grouping  rank
0        A     8
1        B    -1
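Ignoring the noise, the selection logic can be sketched directly: within each group, order rows by rank, accumulate counts, and report the first rank whose prefix sum reaches threshold_fraction of the group total. A noiseless plain-Python sketch (the real measurement adds geometric noise to the threshold and to each prefix sum, so its output is randomized):

```python
from itertools import groupby

def prefix_sum_ranks(rows, threshold_fraction):
    """rows: iterable of (group, rank, count) tuples. Returns {group: rank}
    for the first rank (in rank order) whose prefix sum meets the threshold."""
    result = {}
    rows = sorted(rows)  # sort by group, then by rank
    for group, members in groupby(rows, key=lambda r: r[0]):
        members = list(members)
        total = sum(count for _, _, count in members)
        threshold = threshold_fraction * total
        prefix = 0
        for _, rank, count in members:
            prefix += count
            if prefix >= threshold:
                result[group] = rank
                break
    return result

# The example data above: group A has a count of 1 at each rank 0..9;
# group B has a large spike (1000) at rank -1.
rows = [("A", r, 1) for r in range(10)] + [
    ("B", r, 1000 if r == -1 else 2) for r in range(-5, 5)
]
print(prefix_sum_ranks(rows, 0.90))  # {'A': 8, 'B': -1}
```

On this data the noiseless answer matches the example output: group A crosses 90% of its total (9 of 10) at rank 8, and group B's spike at rank -1 immediately exceeds its threshold.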
- Measurement Contract:
Input domain - SparkDataFrameDomain
Output type - Spark DataFrame
Input metric - OnColumn (with inner metric SumOf(AbsoluteDifference())) on the count_column
Output measure - PureDP
>>> measurement.input_domain
SparkDataFrameDomain(schema={'grouping': SparkStringColumnDescriptor(allow_null=False), 'rank': SparkIntegerColumnDescriptor(allow_null=False, size=64), 'count': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> measurement.input_metric
OnColumn(column='count', metric=SumOf(inner_metric=AbsoluteDifference()))
>>> measurement.output_measure
PureDP()
- Privacy Guarantee:
For \(d_{in} = 0\), returns \(0\)
For \(d_{in} \ge 1\), returns \((4 / \alpha) \cdot d_{in}\)
where:
- \(\alpha\) is alpha
>>> measurement.privacy_function(1)
4
>>> measurement.privacy_function(2)
8
- Parameters:
input_domain (tmlt.core.domains.spark_domains.SparkDataFrameDomain)
count_column (str)
rank_column (str)
alpha (tmlt.core.utils.exact_number.ExactNumberInput)
grouping_columns (Optional[List[str]])
threshold_fraction (float)
- property alpha: tmlt.core.utils.exact_number.ExactNumber#
Returns the alpha.
- Return type:
- property input_domain: tmlt.core.domains.base.Domain#
Return input domain for the measurement.
- Return type:
- property input_metric: tmlt.core.metrics.Metric#
Distance metric on input domain.
- Return type:
- property output_measure: tmlt.core.measures.Measure#
Distance measure on output.
- Return type:
- __init__(input_domain, count_column, rank_column, alpha, grouping_columns=None, threshold_fraction=0.95)#
Constructor.
- Parameters:
input_domain (SparkDataFrameDomain) – Domain of the input Spark DataFrames, which contain bin counts.
count_column (str) – Name of the column containing the counts.
rank_column (str) – Name of the column defining the ranking on rows used to compute prefix sums.
alpha (Union[ExactNumber, float, int, str, Fraction, Expr]) – The noise scale parameter for the geometric noise added to each prefix sum. Noise with scale \(\alpha / 2\) is added when computing the threshold. See AddGeometricNoise for more information.
grouping_columns (Optional[List[str]]) – Optional list of column names defining the groups. The output DataFrame will contain one row per group. If None, the entire input DataFrame is treated as a single group.
threshold_fraction (float) – The fraction of the total count to use as the threshold. Must be in (0, 1]. Defaults to 0.95.
- privacy_function(d_in)#
Returns the smallest d_out satisfied by the measurement.
See the architecture guide for more information.
- Parameters:
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Return type:
- call(val)#
Return row causing prefix sum to exceed the threshold.
- Parameters:
val (pyspark.sql.DataFrame)
- Return type:
- __call__(val)#
Performs measurement and returns a DataFrame with additional protections.
See Specific protections against pseudo-side channel leakage for more details on the specific mitigations we apply here.
- Parameters:
val (Any)
- Return type:
- privacy_relation(d_in, d_out)#
Return True if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters:
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_measure.
- Return type: