spark_measurements
Measurements on Spark DataFrames.
See the architecture guide for more information.
Classes
SparkMeasurement – Base class that materializes output DataFrames before returning.
AddNoiseToColumn – Adds noise to a single aggregated column of a Spark DataFrame.
ApplyInPandas – Applies a pandas DataFrame aggregation to each group in a GroupedDataFrame.
GeometricPartitionSelection – Discovers the distinct rows in a DataFrame, suppressing infrequent rows.
BoundSelection – Discovers a noisy bound based on a DataFrame column.
- class SparkMeasurement(input_domain, input_metric, output_measure, is_interactive)
Bases:
tmlt.core.measurements.base.Measurement
Base class that materializes output DataFrames before returning.
- Parameters
input_domain (tmlt.core.domains.base.Domain) –
input_metric (tmlt.core.metrics.Metric) –
output_measure (tmlt.core.measures.Measure) –
is_interactive (bool) –
- __init__(input_domain, input_metric, output_measure, is_interactive)
Constructor.
- abstract call(val)
Performs measurement.
Warning
Spark recomputes the output of this method (adding different noise each time) on every call to collect.
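The warning matters because a Spark DataFrame is a lazy query plan: every action re-runs the plan, including the noise step. The toy sketch below involves no Spark at all; LazyNoisyCount and materialize are hypothetical names. It mimics that behavior: each collect() draws fresh Laplace noise, while a materialized result returns the same value on every read.

```python
import random


class LazyNoisyCount:
    """Hypothetical stand-in for an unmaterialized, noisy query result.

    Like a lazy Spark DataFrame, it stores how to compute the answer, so
    every collect() draws fresh noise.
    """

    def __init__(self, true_count: int, scale: float, rng: random.Random):
        self.true_count = true_count
        self.scale = scale
        self.rng = rng

    def collect(self) -> float:
        # Laplace(0, scale) noise as the difference of two exponential draws.
        rate = 1 / self.scale
        return self.true_count + self.rng.expovariate(rate) - self.rng.expovariate(rate)


def materialize(lazy: LazyNoisyCount):
    """Hypothetical helper: compute the noisy answer once and return a reader for it."""
    value = lazy.collect()
    return lambda: value


lazy = LazyNoisyCount(true_count=100, scale=2.0, rng=random.Random(0))
first, second = lazy.collect(), lazy.collect()  # two independent noisy answers
frozen = materialize(lazy)
# frozen() now returns the same value on every call.
```

Independent noisy answers to the same query could be averaged to estimate the true count more precisely than one answer allows, which is why a single, fixed output is preferable.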
- Parameters
val (Any) –
- Return type
pyspark.sql.DataFrame
- __call__(val)
Performs measurement and returns a DataFrame with additional protections.
See Specific protections against pseudo-side channel leakage for more details on the specific mitigations we apply here.
- Parameters
val (Any) –
- Return type
pyspark.sql.DataFrame
- property input_domain
Return input domain for the measurement.
- Return type
tmlt.core.domains.base.Domain
- property input_metric
Distance metric on input domain.
- Return type
tmlt.core.metrics.Metric
- property output_measure
Distance measure on output.
- Return type
tmlt.core.measures.Measure
- privacy_function(d_in)
Returns the smallest d_out satisfied by the measurement.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
- Raises
NotImplementedError – If not overridden.
- Return type
Any
- privacy_relation(d_in, d_out)
Return True if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_measure.
- Return type
bool
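Because privacy_function returns the smallest d_out the measurement satisfies, privacy_relation(d_in, d_out) can be answered by comparing that value with the requested d_out under the output measure's ordering. The sketch below assumes this derivation and models two orderings: plain numeric comparison for a PureDP-style epsilon, and component-wise comparison for an ApproxDP-style (epsilon, delta) pair. All function names are hypothetical, not tmlt.core API.

```python
from fractions import Fraction
from typing import Any, Callable, Tuple


def pure_dp_compare(smaller: Fraction, larger: Fraction) -> bool:
    """PureDP-style ordering: epsilons compare as ordinary numbers."""
    return smaller <= larger


def approx_dp_compare(
    smaller: Tuple[Fraction, Fraction], larger: Tuple[Fraction, Fraction]
) -> bool:
    """ApproxDP-style ordering: (epsilon, delta) pairs compare component-wise."""
    return smaller[0] <= larger[0] and smaller[1] <= larger[1]


def privacy_relation(
    privacy_function: Callable[[Any], Any],
    compare: Callable[[Any, Any], bool],
    d_in: Any,
    d_out: Any,
) -> bool:
    """True when the smallest guaranteed d_out for d_in is no larger than the requested d_out."""
    return compare(privacy_function(d_in), d_out)


def example_epsilon(d_in: Any) -> Fraction:
    """Hypothetical pure-DP privacy function: epsilon = 2 * d_in."""
    return 2 * Fraction(d_in)


def example_epsilon_delta(d_in: Any) -> Tuple[Fraction, Fraction]:
    """Hypothetical approximate-DP privacy function: (d_in, d_in / 1000)."""
    return Fraction(d_in), Fraction(d_in, 1000)


print(privacy_relation(example_epsilon, pure_dp_compare, 1, 2))  # True
print(privacy_relation(example_epsilon, pure_dp_compare, 1, 1))  # False
```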
- class AddNoiseToColumn(input_domain, measurement, measure_column)
Bases:
SparkMeasurement
Adds noise to a single aggregated column of a Spark DataFrame.
Example
>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B  count
0  a1  b1      3
1  a1  b2      2
2  a2  b1      1
3  a2  b2      0
>>> # Create a measurement that can add noise to a pd.Series
>>> add_laplace_noise = AddLaplaceNoise(
...     scale="0.5",
...     input_domain=NumpyIntegerDomain(),
... )
>>> # Create a measurement that can add noise to a Spark DataFrame
>>> add_laplace_noise_to_column = AddNoiseToColumn(
...     input_domain=SparkDataFrameDomain(
...         schema={
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...             "count": SparkIntegerColumnDescriptor(),
...         },
...     ),
...     measurement=AddNoiseToSeries(add_laplace_noise),
...     measure_column="count",
... )
>>> # Apply measurement to data
>>> noisy_spark_dataframe = add_laplace_noise_to_column(spark_dataframe)
>>> print_sdf(noisy_spark_dataframe)
    A   B  count
0  a1  b1    ...
1  a1  b2    ...
2  a2  b1    ...
3  a2  b2    ...
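For intuition, the following plain-Python sketch mirrors what the example does without Spark: each row is a dict, and Laplace noise with scale 0.5 is added to the count column only, leaving A and B untouched. The helper add_laplace_noise_to_rows is hypothetical, and the noise comes from Python's floating-point random module, so this is an illustration only, not a secure differential privacy implementation.

```python
import random
from typing import Any, Dict, List


def add_laplace_noise_to_rows(
    rows: List[Dict[str, Any]], measure_column: str, scale: float, rng: random.Random
) -> List[Dict[str, Any]]:
    """Return copies of `rows` with Laplace(0, scale) noise added to `measure_column`.

    Every other column is copied unchanged, and the input rows are not modified.
    """
    rate = 1 / scale
    noisy_rows = []
    for row in rows:
        noisy_row = dict(row)
        # Laplace noise as the difference of two independent exponential draws.
        noisy_row[measure_column] = (
            row[measure_column] + rng.expovariate(rate) - rng.expovariate(rate)
        )
        noisy_rows.append(noisy_row)
    return noisy_rows


rows = [
    {"A": "a1", "B": "b1", "count": 3},
    {"A": "a1", "B": "b2", "count": 2},
    {"A": "a2", "B": "b1", "count": 1},
    {"A": "a2", "B": "b2", "count": 0},
]
noisy_rows = add_laplace_noise_to_rows(rows, "count", scale=0.5, rng=random.Random(42))
```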
- Measurement Contract:
Input domain - SparkDataFrameDomain
Output type - Spark DataFrame
Input metric - OnColumn with metric SumOf(AbsoluteDifference()) (for PureDP) or RootSumOfSquared(AbsoluteDifference()) (for RhoZCDP) on each column.
Output measure - PureDP or RhoZCDP
>>> add_laplace_noise_to_column.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False), 'count': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> add_laplace_noise_to_column.input_metric
OnColumn(column='count', metric=SumOf(inner_metric=AbsoluteDifference()))
>>> add_laplace_noise_to_column.output_measure
PureDP()
- Privacy Guarantee:
AddNoiseToColumn's privacy_function() returns the output of the privacy_function() of the AddNoiseToSeries measurement.
>>> add_laplace_noise_to_column.privacy_function(1)
2
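The value 2 follows from the standard Laplace mechanism bound, epsilon = d_in / scale, with scale 0.5 and d_in = 1. A minimal sketch of that arithmetic, assuming AddLaplaceNoise's privacy function has this standard form (the helper name laplace_epsilon is hypothetical); Fraction keeps the result exact, in the spirit of ExactNumber:

```python
from fractions import Fraction


def laplace_epsilon(d_in, scale) -> Fraction:
    """Epsilon for Laplace noise: d_in / scale.

    Assumes d_in is measured under SumOf(AbsoluteDifference()) on the noised
    column, i.e. the total absolute change in that column.
    """
    return Fraction(d_in) / Fraction(scale)


print(laplace_epsilon(1, "0.5"))  # 2
```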
- Parameters
input_domain (tmlt.core.domains.spark_domains.SparkDataFrameDomain) –
measurement (tmlt.core.measurements.pandas_measurements.series.AddNoiseToSeries) –
measure_column (str) –
- __init__(input_domain, measurement, measure_column)
Constructor.
- Parameters
input_domain (SparkDataFrameDomain) – Domain of input Spark DataFrames.
measurement (AddNoiseToSeries) – AddNoiseToSeries measurement for adding noise to measure_column.
measure_column (str) – Name of the column to add noise to.
Note
The input metric of this measurement is derived from the measure_column and the input metric of the measurement to be applied. In particular, the input metric of this measurement is measurement.input_metric on the specified measure_column.
- property measurement
Returns the AddNoiseToSeries measurement to apply to the measure column.
- privacy_function(d_in)
Returns the smallest d_out satisfied by the measurement.
See the architecture guide for more information.
- Parameters
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Raises
NotImplementedError – If the privacy_function() of the AddNoiseToSeries measurement raises NotImplementedError.
- Return type
tmlt.core.utils.exact_number.ExactNumber
- call(val)
Applies the measurement to the measure column.
- Parameters
val (pyspark.sql.DataFrame) –
- Return type
pyspark.sql.DataFrame
- __call__(val)
Performs measurement and returns a DataFrame with additional protections.
See Specific protections against pseudo-side channel leakage for more details on the specific mitigations we apply here.
- Parameters
val (Any) –
- Return type
pyspark.sql.DataFrame
- property input_domain
Return input domain for the measurement.
- Return type
tmlt.core.domains.base.Domain
- property input_metric
Distance metric on input domain.
- Return type
tmlt.core.metrics.Metric
- property output_measure
Distance measure on output.
- Return type
tmlt.core.measures.Measure
- privacy_relation(d_in, d_out)
Return True if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_measure.
- Return type
bool
- class ApplyInPandas(input_domain, input_metric, aggregation_function)
Bases:
SparkMeasurement
Applies a pandas dataframe aggregation to each group in a GroupedDataFrame.
- Parameters
input_domain (tmlt.core.domains.spark_domains.SparkGroupedDataFrameDomain) –
input_metric (Union[tmlt.core.metrics.SumOf, tmlt.core.metrics.RootSumOfSquared]) –
aggregation_function (tmlt.core.measurements.pandas_measurements.dataframe.Aggregate) –
- __init__(input_domain, input_metric, aggregation_function)
Constructor.
- Parameters
input_domain (SparkGroupedDataFrameDomain) – Domain of the input GroupedDataFrames.
input_metric (Union[SumOf, RootSumOfSquared]) – Distance metric on inputs. It must be one of SumOf or RootSumOfSquared with inner metric SymmetricDifference.
aggregation_function (Aggregate) – An Aggregate measurement to be applied to each group. The input domain of this measurement must be a PandasDataFrameDomain corresponding to a subset of the non-grouping columns in the input_domain.
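The per-group application follows the familiar split-apply-combine pattern. The plain-Python analogue below uses lists of dicts instead of pandas or Spark, and all names are hypothetical. Unlike a GroupedDataFrame, it only produces output for keys that actually occur in the data. The wrapper class models an assumption suggested by the Raises entry of privacy_function for this class: that the privacy function simply defers to the aggregation's own privacy function.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List, Tuple

Row = Dict[str, Any]


def apply_per_group(
    rows: List[Row],
    group_by: List[str],
    aggregate: Callable[[List[Row]], Any],
) -> Dict[Tuple[Any, ...], Any]:
    """Split rows by the grouping columns, then apply `aggregate` to each group."""
    groups: Dict[Tuple[Any, ...], List[Row]] = defaultdict(list)
    for row in rows:
        key = tuple(row[column] for column in group_by)
        groups[key].append(row)
    return {key: aggregate(group_rows) for key, group_rows in groups.items()}


class ApplyPerGroupSketch:
    """Hypothetical wrapper whose privacy function delegates to the aggregation's."""

    def __init__(self, aggregate_privacy_function: Callable[[Any], Any]):
        self.aggregate_privacy_function = aggregate_privacy_function

    def privacy_function(self, d_in: Any) -> Any:
        # Whatever the per-group aggregation returns (or raises) for d_in
        # is passed through unchanged.
        return self.aggregate_privacy_function(d_in)


rows = [
    {"A": "a1", "X": 1},
    {"A": "a1", "X": 5},
    {"A": "a2", "X": 3},
]
sums = apply_per_group(rows, ["A"], lambda group: sum(row["X"] for row in group))
# sums == {("a1",): 6, ("a2",): 3}
```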
- property aggregation_function
Returns the aggregation function.
- property input_domain
Returns input domain.
- privacy_function(d_in)
Returns the smallest d_out satisfied by the measurement.
See the architecture guide for more information.
- Parameters
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Raises
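NotImplementedError – If self.aggregation_function.privacy_function(d_in) raises NotImplementedError.
- Return type
tmlt.core.utils.exact_number.ExactNumber
- call(val)
Returns the DataFrame obtained by applying the pandas aggregation to each group.
- Parameters
val (tmlt.core.utils.grouped_dataframe.GroupedDataFrame) –
- Return type
pyspark.sql.DataFrame
- __call__(val)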
Performs measurement and returns a DataFrame with additional protections.
See Specific protections against pseudo-side channel leakage for more details on the specific mitigations we apply here.
- Parameters
val (Any) –
- Return type
pyspark.sql.DataFrame
- property input_metric
Distance metric on input domain.
- Return type
tmlt.core.metrics.Metric
- property output_measure
Distance measure on output.
- Return type
tmlt.core.measures.Measure
- privacy_relation(d_in, d_out)
Return True if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_measure.
- Return type
bool
- class GeometricPartitionSelection(input_domain, threshold, alpha, count_column=None)
Bases:
SparkMeasurement
Discovers the distinct rows in a DataFrame, suppressing infrequent rows.
Example
>>> # Example input
>>> print_sdf(spark_dataframe)
      A   B
0    a1  b1
1    a2  b2
2    a2  b2
3    a2  b2
4    a2  b2
..   ..  ..
96   a2  b2
97   a2  b2
98   a2  b2
99   a2  b2
100  a2  b2
[101 rows x 2 columns]
>>> measurement = GeometricPartitionSelection(
...     input_domain=SparkDataFrameDomain(
...         schema={
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         },
...     ),
...     threshold=50,
...     alpha=1,
... )
>>> noisy_spark_dataframe = measurement(spark_dataframe)
>>> print_sdf(noisy_spark_dataframe)
    A   B  count
0  a2  b2    106
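Based on the parameter descriptions (count each distinct row, add two-sided geometric noise with scale alpha, release the row only if its noisy count reaches threshold), the selection step can be sketched in plain Python. This is an illustration under those assumptions, not the library's implementation: the names are hypothetical and sampling uses floating-point math and Python's random module, so it is not a secure differential privacy mechanism.

```python
import math
import random
from collections import Counter
from typing import Dict, Hashable, Iterable


def sample_two_sided_geometric(alpha: float, rng: random.Random) -> int:
    """Sample X with P(X = k) proportional to exp(-|k| / alpha); alpha == 0 means no noise."""
    if alpha == 0:
        return 0
    p = math.exp(-1 / alpha)

    def one_sided() -> int:
        # Failures before the first success: P(G >= k) = p**k.
        u = 1.0 - rng.random()  # uniform on (0, 1], so log(u) is finite
        return math.floor(math.log(u) / math.log(p))

    # The difference of two i.i.d. geometric draws is two-sided geometric.
    return one_sided() - one_sided()


def geometric_partition_selection(
    rows: Iterable[Hashable], threshold: int, alpha: float, rng: random.Random
) -> Dict[Hashable, int]:
    """Release each distinct row whose noisy count is at least `threshold`."""
    released = {}
    for row, count in Counter(rows).items():
        noisy_count = count + sample_two_sided_geometric(alpha, rng)
        if noisy_count >= threshold:
            released[row] = noisy_count
    return released


rows = [("a1", "b1")] + [("a2", "b2")] * 100
result = geometric_partition_selection(rows, threshold=50, alpha=1, rng=random.Random(7))
print(result)
```

With threshold 50 and alpha 1, the singleton row ("a1", "b1") would need noise of at least 49 to be released, which is astronomically unlikely, so only ("a2", "b2") is expected in the output, with a noisy count near 100.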
- Measurement Contract:
Input domain - SparkDataFrameDomain
Output type - Spark DataFrame
Input metric - SymmetricDifference
Output measure - ApproxDP
>>> measurement.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> measurement.input_metric
SymmetricDifference()
>>> measurement.output_measure
ApproxDP()
- Privacy Guarantee:
For \(d_{in} = 0\), returns \((0, 0)\)
For \(d_{in} = 1\), returns \((1/\alpha, 1 - CDF_{\alpha}[\tau - 2])\)
For \(d_{in} > 1\), returns \((d_{in} \cdot \epsilon, d_{in} \cdot e^{d_{in} \cdot \epsilon} \cdot \delta)\)
where:
\(\alpha\) is alpha
\(\tau\) is threshold
\(\epsilon\) is the first element returned for the \(d_{in} = 1\) case
\(\delta\) is the second element returned for the \(d_{in} = 1\) case
\(CDF_{\alpha}\) is double_sided_geometric_cmf_exact()
>>> epsilon, delta = measurement.privacy_function(1)
>>> epsilon
1
>>> delta.to_float(round_up=True)
3.8328565409781243e-22
>>> epsilon, delta = measurement.privacy_function(2)
>>> epsilon
2
>>> delta.to_float(round_up=True)
5.664238400088129e-21
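The doctest values can be reproduced from the formulas. Assuming the two-sided geometric distribution \(P(X = k) = \frac{1-p}{1+p} p^{|k|}\) with \(p = e^{-1/\alpha}\), the tail is \(P(X > m) = p^{m+1}/(1+p)\) for \(m \ge 0\), so with \(\tau = 50\) and \(\alpha = 1\), \(\delta = e^{-49}/(1 + e^{-1})\). The sketch below evaluates this in floating point rather than with the exact arithmetic of double_sided_geometric_cmf_exact(); the function names are hypothetical and it assumes alpha > 0.

```python
import math


def two_sided_geometric_tail(m: int, p: float) -> float:
    """P(X > m) for integer m, where P(X = k) = (1 - p) / (1 + p) * p**abs(k)."""
    if m >= 0:
        return p ** (m + 1) / (1 + p)
    # For m < 0, P(X > m) = 1 - P(X <= m) = 1 - P(X >= -m) by symmetry.
    return 1 - p ** (-m) / (1 + p)


def partition_selection_privacy(d_in: int, threshold: int, alpha: float):
    """Floating-point evaluation of the guarantee above (assumes alpha > 0)."""
    if d_in == 0:
        return 0.0, 0.0
    p = math.exp(-1 / alpha)
    epsilon = 1 / alpha
    delta = two_sided_geometric_tail(threshold - 2, p)  # 1 - CDF[tau - 2]
    if d_in == 1:
        return epsilon, delta
    return d_in * epsilon, d_in * math.exp(d_in * epsilon) * delta


print(partition_selection_privacy(1, threshold=50, alpha=1))  # roughly (1.0, 3.83e-22)
print(partition_selection_privacy(2, threshold=50, alpha=1))  # roughly (2.0, 5.66e-21)
```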
Methods
alpha – Returns the noise scale.
threshold – Returns the minimum noisy count required to include a row.
count_column – Returns the count column name.
privacy_function – Returns the smallest d_out satisfied by the measurement.
call – Return the noisy counts for common rows.
__call__ – Performs measurement and returns a DataFrame with additional protections.
input_domain – Return input domain for the measurement.
input_metric – Distance metric on input domain.
output_measure – Distance measure on output.
is_interactive – Returns true iff the measurement is interactive.
privacy_relation – Return True if close inputs produce close outputs.
- Parameters
input_domain (tmlt.core.domains.spark_domains.SparkDataFrameDomain) –
threshold (int) –
alpha (tmlt.core.utils.exact_number.ExactNumberInput) –
count_column (Optional[str]) –
- __init__(input_domain, threshold, alpha, count_column=None)
Constructor.
- Parameters
input_domain (SparkDataFrameDomain) – Domain of the input Spark DataFrames. Input cannot contain floating point columns.
threshold (int) – The minimum noisy count a row must have to be released. Can be nonpositive, but must be integral.
alpha (Union[ExactNumber, float, int, str, Fraction, Expr]) – The noise scale parameter for Geometric noise. See AddGeometricNoise for more information.
count_column (Optional[str], default: None) – Column name for output group counts. If None, the output column will be named “count”.
- property alpha
Returns the noise scale.
- Return type
tmlt.core.utils.exact_number.ExactNumber
- privacy_function(d_in)
Returns the smallest d_out satisfied by the measurement.
See the architecture guide for more information.
- Parameters
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Return type
Tuple[tmlt.core.utils.exact_number.ExactNumber, tmlt.core.utils.exact_number.ExactNumber]
- call(val)
Return the noisy counts for common rows.
- Parameters
val (pyspark.sql.DataFrame) –
- Return type
pyspark.sql.DataFrame
- __call__(val)
Performs measurement and returns a DataFrame with additional protections.
See Specific protections against pseudo-side channel leakage for more details on the specific mitigations we apply here.
- Parameters
val (Any) –
- Return type
pyspark.sql.DataFrame
- property input_domain
Return input domain for the measurement.
- Return type
tmlt.core.domains.base.Domain
- property input_metric
Distance metric on input domain.
- Return type
tmlt.core.metrics.Metric
- property output_measure
Distance measure on output.
- Return type
tmlt.core.measures.Measure
- privacy_relation(d_in, d_out)
Return True if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_measure.
- Return type
bool
- class BoundSelection(input_domain, bound_column, alpha, threshold=0.95)
Bases:
tmlt.core.measurements.base.Measurement
Discovers a noisy bound based on a DataFrame Column.
Example
>>> # Example input
>>> print_sdf(spark_dataframe)
     A   B
0   a1  10
1   a1  10
2   a1  10
3   a1  10
4   a1  10
..  ..  ..
95  a1  40
96  a1  40
97  a1  40
98  a1  40
99  a1  40
[100 rows x 2 columns]
>>> measurement = BoundSelection(
...     input_domain=SparkDataFrameDomain(
...         schema={
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkIntegerColumnDescriptor(),
...         },
...     ),
...     bound_column="B",
...     alpha=1,
...     threshold=0.95,
... )
>>> min_bound, max_bound = measurement(spark_dataframe)
>>> print(f"Min: {min_bound}, Max: {max_bound}")
Min: -64, Max: 64
- Measurement Contract:
Input domain - SparkDataFrameDomain
Output type - Tuple[int, int] if the bound_column is an integer column, else Tuple[float, float]
Input metric - SymmetricDifference
Output measure - PureDP
>>> measurement.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> measurement.input_metric
SymmetricDifference()
>>> measurement.output_measure
PureDP()
- Privacy Guarantee:
For \(d_{in} = 0\), returns \(0\)
For \(d_{in} > 0\), returns \((4/\alpha) \cdot d_{in}\)
where:
\(\alpha\) is alpha
>>> measurement.privacy_function(1)
4
>>> measurement.privacy_function(2)
8
Methods
bound_column – Returns the column to compute the bounds for.
bound_column_type – Returns the type of the bound column.
splits – Returns the splits.
alpha – Returns the alpha.
threshold – Returns the threshold.
privacy_function – Returns the smallest d_out satisfied by the measurement.
__call__ – Returns the bounds for the given column.
input_domain – Return input domain for the measurement.
input_metric – Distance metric on input domain.
output_measure – Distance measure on output.
is_interactive – Returns true iff the measurement is interactive.
privacy_relation – Return True if close inputs produce close outputs.
- Parameters
input_domain (tmlt.core.domains.spark_domains.SparkDataFrameDomain) –
bound_column (str) –
alpha (tmlt.core.utils.exact_number.ExactNumberInput) –
threshold (float) –
- __init__(input_domain, bound_column, alpha, threshold=0.95)
Constructor.
- Parameters
input_domain (SparkDataFrameDomain) – Domain of the input Spark DataFrames. The bound column must contain either floating point numbers or integers.
bound_column (str) – Column name for finding the bounds. The column values must lie in \([-2^{64} + 1, 2^{64} - 1]\) for 64-bit integers, in \([-2^{32} + 1, 2^{32} - 1]\) for 32-bit integers, and in \([-2^{100}, 2^{100}]\) for floats.
alpha (Union[ExactNumber, float, int, str, Fraction, Expr]) – The noise scale parameter for the Geometric noise added to the true number of values falling between the tested bounds in each round of the algorithm. Noise with scale \(\alpha / 2\) is added to compute the threshold. See AddGeometricNoise for more information.
threshold (float, default: 0.95) – The fraction of the total count to use as the threshold. This value should be in \((0, 1]\).
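The alpha and threshold descriptions suggest the general shape of the search: count how many values fall inside a candidate bound, add geometric noise, and stop once the noisy count reaches a noisy fraction of the total. The sketch below illustrates only that idea and is not BoundSelection's algorithm. The power-of-two candidate bounds (hinted at by the -64/64 output in the example, but not documented here), the way the alpha / 2 noise enters the threshold, and all names are assumptions, and the sketch makes no privacy claim.

```python
import math
import random
from typing import Sequence, Tuple


def sample_two_sided_geometric(scale: float, rng: random.Random) -> int:
    """Sample X with P(X = k) proportional to exp(-|k| / scale); scale == 0 means no noise."""
    if scale == 0:
        return 0
    p = math.exp(-1 / scale)

    def one_sided() -> int:
        u = 1.0 - rng.random()  # uniform on (0, 1]
        return math.floor(math.log(u) / math.log(p))

    return one_sided() - one_sided()


def symmetric_bound_sketch(
    values: Sequence[float],
    alpha: float,
    threshold: float,
    rng: random.Random,
    max_power: int = 64,
) -> Tuple[int, int]:
    """Return (-b, b) for the first power of two b whose noisy in-range count
    reaches `threshold` times a noisy total count.

    Hypothetical simplification: candidate bounds are powers of two, and the
    threshold noise (scale alpha / 2) is added to the total count.
    """
    noisy_target = threshold * (len(values) + sample_two_sided_geometric(alpha / 2, rng))
    for power in range(max_power + 1):
        bound = 2 ** power
        in_range = sum(1 for value in values if -bound <= value <= bound)
        if in_range + sample_two_sided_geometric(alpha, rng) >= noisy_target:
            return -bound, bound
    return -(2 ** max_power), 2 ** max_power


values = [10] * 50 + [40] * 50
print(symmetric_bound_sketch(values, alpha=0, threshold=0.95, rng=random.Random(0)))  # (-64, 64)
```

With alpha = 0 the sketch is noise-free: [-32, 32] holds only the 50 tens, short of the 95 needed, while [-64, 64] holds all 100 values.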
- property bound_column_type
Returns the type of the bound column.
- property alpha
Returns the alpha.
- Return type
tmlt.core.utils.exact_number.ExactNumber
- privacy_function(d_in)
Returns the smallest d_out satisfied by the measurement.
See the architecture guide for more information.
- Parameters
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Return type
tmlt.core.utils.exact_number.ExactNumber
- __call__(sdf)
Returns the bounds for the given column.
- Parameters
sdf (pyspark.sql.DataFrame) –
- Return type
Tuple[int, int] if the bound_column is an integer column, else Tuple[float, float]
- property input_domain
Return input domain for the measurement.
- Return type
tmlt.core.domains.base.Domain
- property input_metric
Distance metric on input domain.
- Return type
tmlt.core.metrics.Metric
- property output_measure
Distance measure on output.
- Return type
tmlt.core.measures.Measure
- privacy_relation(d_in, d_out)
Return True if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_measure.
- Return type
bool