spark_measurements#
Measurements on Spark DataFrames.
Classes#
SparkMeasurement – Base class that materializes output DataFrames before returning.
AddNoiseToColumn – Adds noise to a single aggregated column of a Spark DataFrame.
ApplyInPandas – Applies a pandas DataFrame aggregation to each group in a GroupedDataFrame.
GeometricPartitionSelection – Discovers the distinct rows in a DataFrame, suppressing infrequent rows.
- class SparkMeasurement(input_domain, input_metric, output_measure, is_interactive)#
Bases:
tmlt.core.measurements.base.Measurement
Base class that materializes output DataFrames before returning.
- Parameters
input_domain (tmlt.core.domains.base.Domain) –
input_metric (tmlt.core.metrics.Metric) –
output_measure (tmlt.core.measures.Measure) –
is_interactive (bool) –
- __init__(input_domain, input_metric, output_measure, is_interactive)#
Constructor.
- abstract call(self, val)#
Performs measurement.
Warning
Spark recomputes the output of this method (adding different noise each time) on every call to collect.
- Parameters
val (Any) –
- Return type
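The reason the base class materializes results can be sketched without Spark: a lazily recomputed value draws fresh noise on every access, much like an uncached DataFrame re-executing on each collect. This is hypothetical stand-in code, not the tmlt.core internals:

```python
import random

rng = random.Random(7)

# A "lazy" result recomputes (and re-noises) on every access, like an
# uncached Spark DataFrame does on each collect().
lazy_result = lambda: 42 + rng.gauss(0, 1)

# Materializing once pins a single noise draw, which is the behavior
# __call__ provides on top of the abstract call().
materialized = lazy_result()

a, b = lazy_result(), lazy_result()   # two different noisy answers
c, d = materialized, materialized     # one stable noisy answer
```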
- __call__(self, val)#
Performs measurement and returns a DataFrame with additional protections.
See Specific protections against pseudo-side channel leakage for more details on the specific mitigations we apply here.
- Parameters
val (Any) –
- Return type
- property input_domain(self)#
Return input domain for the measurement.
- Return type
- property input_metric(self)#
Distance metric on input domain.
- Return type
- property output_measure(self)#
Distance measure on output.
- Return type
- privacy_function(self, d_in)#
Returns the smallest d_out satisfied by the measurement.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
- Raises
NotImplementedError – If not overridden.
- Return type
Any
- privacy_relation(self, d_in, d_out)#
Return True if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_measure.
- Return type
bool
- class AddNoiseToColumn(input_domain, measurement, measure_column)#
Bases:
SparkMeasurement
Adds noise to a single aggregated column of a Spark DataFrame.
Example
>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B  count
0  a1  b1      3
1  a1  b2      2
2  a2  b1      1
3  a2  b2      0
>>> # Create a measurement that can add noise to a pd.Series
>>> add_laplace_noise = AddLaplaceNoise(
...     scale="0.5",
...     input_domain=NumpyIntegerDomain(),
... )
>>> # Create a measurement that can add noise to a Spark DataFrame
>>> add_laplace_noise_to_column = AddNoiseToColumn(
...     input_domain=SparkDataFrameDomain(
...         schema={
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...             "count": SparkIntegerColumnDescriptor(),
...         },
...     ),
...     measurement=AddNoiseToSeries(add_laplace_noise),
...     measure_column="count",
... )
>>> # Apply measurement to data
>>> noisy_spark_dataframe = add_laplace_noise_to_column(spark_dataframe)
>>> print_sdf(noisy_spark_dataframe)
    A   B  count
0  a1  b1    ...
1  a1  b2    ...
2  a2  b1    ...
3  a2  b2    ...
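Outside of Spark, the core operation is elementwise noise on one column. A minimal sketch using only Python's standard library (a Laplace draw built from two exponential draws; none of the tmlt.core domains or mitigations are involved):

```python
import random

def add_laplace_noise_to_column(rows, column, scale, rng=random.Random()):
    """Return new rows with Laplace(scale) noise added to row[column]."""
    noisy = []
    for row in rows:
        # A Laplace(scale) draw is the difference of two Exp(1/scale) draws.
        noise = rng.expovariate(1 / scale) - rng.expovariate(1 / scale)
        noisy.append({**row, column: row[column] + noise})
    return noisy

rows = [{"A": "a1", "count": 3}, {"A": "a2", "count": 1}]
noisy = add_laplace_noise_to_column(rows, "count", scale=0.5)
```

The original rows are left untouched; only the copies carry noisy counts.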
- Measurement Contract:
Input domain - SparkDataFrameDomain
Output type - Spark DataFrame
Input metric - OnColumn with metric SumOf(AbsoluteDifference()) (for PureDP) or RootSumOfSquared(AbsoluteDifference()) (for RhoZCDP) on the measure column
Output measure - PureDP or RhoZCDP
>>> add_laplace_noise_to_column.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False), 'count': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> add_laplace_noise_to_column.input_metric
OnColumn(column='count', metric=SumOf(inner_metric=AbsoluteDifference()))
>>> add_laplace_noise_to_column.output_measure
PureDP()
- Privacy Guarantee:
AddNoiseToColumn’s privacy_function() returns the output of the privacy function of the AddNoiseToSeries measurement.
>>> add_laplace_noise_to_column.privacy_function(1)
2
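The value above is consistent with standard Laplace accounting, where sensitivity d_in and noise scale b give ε = d_in / b. A minimal sketch in plain Python (not the tmlt.core implementation, which carries this out with exact arithmetic):

```python
from fractions import Fraction

def laplace_epsilon(d_in, scale):
    """Pure-DP guarantee of Laplace noise: epsilon = sensitivity / scale."""
    return Fraction(d_in) / Fraction(scale)

# scale="0.5" and d_in=1 give epsilon = 2, matching privacy_function(1) above.
print(laplace_epsilon(1, Fraction(1, 2)))  # -> 2
```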
- Parameters
input_domain (tmlt.core.domains.spark_domains.SparkDataFrameDomain) –
measurement (tmlt.core.measurements.pandas_measurements.series.AddNoiseToSeries) –
measure_column (str) –
- __init__(input_domain, measurement, measure_column)#
Constructor.
- Parameters
input_domain (SparkDataFrameDomain) – Domain of input Spark DataFrames.
measurement (AddNoiseToSeries) – AddNoiseToSeries measurement for adding noise to measure_column.
Note
The input metric of this measurement is derived from the measure_column and the input metric of the measurement to be applied. In particular, the input metric of this measurement is measurement.input_metric on the specified measure_column.
- property measurement(self)#
Returns the AddNoiseToSeries measurement to apply to the measure column.
- privacy_function(self, d_in)#
Returns the smallest d_out satisfied by the measurement.
See the privacy and stability tutorial for more information.
- Parameters
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Raises
NotImplementedError – If the privacy_function() of the AddNoiseToSeries measurement raises NotImplementedError.
- Return type
- call(self, sdf)#
Applies measurement to measure column.
- Parameters
sdf (pyspark.sql.DataFrame) –
- Return type
- __call__(self, val)#
Performs measurement and returns a DataFrame with additional protections.
See Specific protections against pseudo-side channel leakage for more details on the specific mitigations we apply here.
- Parameters
val (Any) –
- Return type
- property input_domain(self)#
Return input domain for the measurement.
- Return type
- property input_metric(self)#
Distance metric on input domain.
- Return type
- property output_measure(self)#
Distance measure on output.
- Return type
- privacy_relation(self, d_in, d_out)#
Return True if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_measure.
- Return type
bool
- class ApplyInPandas(input_domain, input_metric, aggregation_function)#
Bases:
SparkMeasurement
Applies a pandas dataframe aggregation to each group in a GroupedDataFrame.
- Parameters
input_domain (tmlt.core.domains.spark_domains.SparkGroupedDataFrameDomain) –
input_metric (Union[tmlt.core.metrics.SumOf, tmlt.core.metrics.RootSumOfSquared]) –
aggregation_function (tmlt.core.measurements.pandas_measurements.dataframe.Aggregate) –
- __init__(input_domain, input_metric, aggregation_function)#
Constructor.
- Parameters
input_domain (SparkGroupedDataFrameDomain) – Domain of the input GroupedDataFrames.
input_metric (Union[SumOf, RootSumOfSquared]) – Distance metric on inputs. It must be one of SumOf or RootSumOfSquared with inner metric SymmetricDifference.
aggregation_function (Aggregate) – An Aggregate measurement to be applied to each group. The input domain of this measurement must be a PandasDataFrameDomain corresponding to a subset of the non-grouping columns in the input_domain.
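The per-group semantics can be sketched without Spark or pandas: partition the rows by the grouping key, then run one aggregation per group. This is an illustrative stand-in, not the tmlt.core implementation:

```python
from collections import defaultdict

def apply_per_group(rows, group_key, aggregation):
    """Apply `aggregation` to the list of rows in each group."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[group_key]].append(row)
    return {key: aggregation(members) for key, members in groups.items()}

rows = [
    {"A": "a1", "count": 3},
    {"A": "a1", "count": 2},
    {"A": "a2", "count": 1},
]
totals = apply_per_group(rows, "A", lambda g: sum(r["count"] for r in g))
# totals == {"a1": 5, "a2": 1}
```

In ApplyInPandas, the aggregation is itself a measurement, so each group's output is already privatized before the per-group results are combined.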
- property aggregation_function(self)#
Returns the aggregation function.
- property input_domain(self)#
Returns input domain.
- privacy_function(self, d_in)#
Returns the smallest d_out satisfied by the measurement.
See the privacy and stability tutorial for more information.
- Parameters
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Raises
NotImplementedError – If self.aggregation_function.privacy_function(d_in) raises NotImplementedError.
- Return type
- call(self, grouped_dataframe)#
Returns DataFrame obtained by applying pandas aggregation to each group.
- Parameters
grouped_dataframe (tmlt.core.utils.grouped_dataframe.GroupedDataFrame) –
- Return type
- __call__(self, val)#
Performs measurement and returns a DataFrame with additional protections.
See Specific protections against pseudo-side channel leakage for more details on the specific mitigations we apply here.
- Parameters
val (Any) –
- Return type
- property input_metric(self)#
Distance metric on input domain.
- Return type
- property output_measure(self)#
Distance measure on output.
- Return type
- privacy_relation(self, d_in, d_out)#
Return True if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_measure.
- Return type
bool
- class GeometricPartitionSelection(input_domain, threshold, alpha, count_column=None)#
Bases:
SparkMeasurement
Discovers the distinct rows in a DataFrame, suppressing infrequent rows.
Example
>>> # Example input
>>> print_sdf(spark_dataframe)
      A   B
0    a1  b1
1    a2  b2
2    a2  b2
3    a2  b2
4    a2  b2
..   ..  ..
96   a2  b2
97   a2  b2
98   a2  b2
99   a2  b2
100  a2  b2
[101 rows x 2 columns]
>>> measurement = GeometricPartitionSelection(
...     input_domain=SparkDataFrameDomain(
...         schema={
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         },
...     ),
...     threshold=50,
...     alpha=1,
... )
>>> noisy_spark_dataframe = measurement(spark_dataframe)
>>> print_sdf(noisy_spark_dataframe)
    A   B  count
0  a2  b2    106
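The mechanism can be sketched in plain Python: count the rows for each distinct key, add two-sided geometric noise with scale α, and release only the keys whose noisy count clears the threshold. This is an illustrative stand-in, not the tmlt.core implementation:

```python
import math
import random
from collections import Counter

def two_sided_geometric(alpha, rng):
    """Difference of two geometric draws with success prob 1 - e^(-1/alpha)."""
    p = 1 - math.exp(-1 / alpha)
    g = lambda: math.floor(math.log(rng.random()) / math.log(1 - p))
    return g() - g()

def partition_selection(rows, threshold, alpha, rng=random.Random(0)):
    """Release noisy counts for distinct (A, B) rows above the threshold."""
    counts = Counter((row["A"], row["B"]) for row in rows)
    return {
        key: noisy
        for key, count in counts.items()
        if (noisy := count + two_sided_geometric(alpha, rng)) >= threshold
    }

rows = [{"A": "a1", "B": "b1"}] + [{"A": "a2", "B": "b2"}] * 100
out = partition_selection(rows, threshold=50, alpha=1)
```

With threshold 50, the single ("a1", "b1") row is suppressed (its noisy count of roughly 1 cannot plausibly reach 50), while the frequent ("a2", "b2") row survives.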
- Measurement Contract:
Input domain -
SparkDataFrameDomain
Output type - Spark DataFrame
Input metric -
SymmetricDifference
Output measure -
ApproxDP
>>> measurement.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> measurement.input_metric
SymmetricDifference()
>>> measurement.output_measure
ApproxDP()
- Privacy Guarantee:
For \(d_{in} = 0\), returns \((0, 0)\)
For \(d_{in} = 1\), returns \((1/\alpha, 1 - CDF_{\alpha}[\tau - 2])\)
For \(d_{in} > 1\), returns \((d_{in} \cdot \epsilon, d_{in} \cdot e^{d_{in} \cdot \epsilon} \cdot \delta)\)
where:
\(\alpha\) is alpha
\(\tau\) is threshold
\(\epsilon\) is the first element returned for the \(d_{in} = 1\) case
\(\delta\) is the second element returned for the \(d_{in} = 1\) case
\(CDF_{\alpha}\) is double_sided_geometric_cmf_exact()
>>> epsilon, delta = measurement.privacy_function(1)
>>> epsilon
1
>>> delta.to_float(round_up=True)
3.8328565409781243e-22
>>> epsilon, delta = measurement.privacy_function(2)
>>> epsilon
2
>>> delta.to_float(round_up=True)
5.664238400088129e-21
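For \(d_{in} = 1\) the pair above has a simple closed form if one assumes the double-sided geometric distribution with weights proportional to \(q^{|k|}\), \(q = e^{-1/\alpha}\): \(\epsilon = 1/\alpha\) and \(\delta = 1 - CDF_{\alpha}[\tau - 2] = q^{\tau - 1}/(1 + q)\). A quick floating-point check of that form (a sketch; tmlt.core evaluates this exactly with rational arithmetic):

```python
import math

def epsilon_delta(threshold, alpha):
    """(epsilon, delta) for d_in = 1 under the closed form above."""
    q = math.exp(-1 / alpha)
    epsilon = 1 / alpha
    # Tail mass of the double-sided geometric above threshold - 2.
    delta = q ** (threshold - 1) / (1 + q)
    return epsilon, delta

eps, delta = epsilon_delta(threshold=50, alpha=1)
# eps == 1.0; delta is approximately 3.83e-22, matching the doctest above.
```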
Methods#
alpha – Returns the noise scale.
threshold – Returns the minimum noisy count to include row.
count_column – Returns the count column name.
privacy_function() – Returns the smallest d_out satisfied by the measurement.
call() – Return the noisy counts for common rows.
__call__() – Performs measurement and returns a DataFrame with additional protections.
input_domain – Return input domain for the measurement.
input_metric – Distance metric on input domain.
output_measure – Distance measure on output.
is_interactive – Returns true iff the measurement is interactive.
privacy_relation() – Return True if close inputs produce close outputs.
- Parameters
input_domain (tmlt.core.domains.spark_domains.SparkDataFrameDomain) –
threshold (int) –
alpha (tmlt.core.utils.exact_number.ExactNumberInput) –
count_column (Optional[str]) –
- __init__(input_domain, threshold, alpha, count_column=None)#
Constructor.
- Parameters
input_domain (SparkDataFrameDomain) – Domain of the input Spark DataFrames. Input cannot contain floating point columns.
threshold (int) – The minimum threshold for the noisy count to have to be released. Can be nonpositive, but must be integral.
alpha (Union[ExactNumber, float, int, str, Fraction, Expr]) – The noise scale parameter for geometric noise. See AddGeometricNoise for more information.
count_column (Optional[str], default: None) – Column name for output group counts. If None, the output column will be named “count”.
- property alpha(self)#
Returns the noise scale.
- Return type
- privacy_function(self, d_in)#
Returns the smallest d_out satisfied by the measurement.
See the privacy and stability tutorial for more information.
- Parameters
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Return type
Tuple[tmlt.core.utils.exact_number.ExactNumber, tmlt.core.utils.exact_number.ExactNumber]
- call(self, sdf)#
Return the noisy counts for common rows.
- Parameters
sdf (pyspark.sql.DataFrame) –
- Return type
- __call__(self, val)#
Performs measurement and returns a DataFrame with additional protections.
See Specific protections against pseudo-side channel leakage for more details on the specific mitigations we apply here.
- Parameters
val (Any) –
- Return type
- property input_domain(self)#
Return input domain for the measurement.
- Return type
- property input_metric(self)#
Distance metric on input domain.
- Return type
- property output_measure(self)#
Distance measure on output.
- Return type
- privacy_relation(self, d_in, d_out)#
Return True if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_measure.
- Return type
bool