map

Transformations for applying user-defined maps to Spark DataFrames.

Classes

RowToRowTransformation

Transforms a single row into a different row using a user-defined function.

RowToRowsTransformation

Transforms a single row into multiple rows using a user-defined function.

FlatMap

Applies a RowToRowsTransformation to each row and flattens the result.

GroupingFlatMap

Applies a RowToRowsTransformation to each row and flattens the result.

Map

Applies a RowToRowTransformation to each row in a Spark DataFrame.

class RowToRowTransformation(input_domain, output_domain, trusted_f, augment)

Bases: tmlt.core.transformations.base.Transformation

Transforms a single row into a different row using a user-defined function.

Examples

augment=False:

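>>> from pyspark.sql import Row
>>> from tmlt.core.domains.spark_domains import (
...     SparkRowDomain,
...     SparkStringColumnDescriptor,
... )
>>> from tmlt.core.transformations.spark_transformations.map import (
...     RowToRowTransformation,
... )
>>> # Example input
>>> spark_row = Row(A="a1", B="b1")
>>> spark_row
Row(A='a1', B='b1')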
>>> def rename_b_to_c(row: Row) -> Row:
...     return Row(A=row.A, C=row.B.replace("b", "c"))
>>> rename_b_to_c_transformation = RowToRowTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "C": SparkStringColumnDescriptor(),
...         }
...     ),
...     trusted_f=rename_b_to_c,
...     augment=False,
... )
>>> transformed_row = rename_b_to_c_transformation(spark_row)
>>> transformed_row
Row(A='a1', C='c1')

augment=True:

>>> # Example input
>>> spark_row
Row(A='a1', B='b1')
>>> def constant_c_column(row: Row) -> Row:
...     return Row(C="c")
>>> add_constant_c_column_transformation = RowToRowTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...             "C": SparkStringColumnDescriptor(),
...         }
...     ),
...     trusted_f=constant_c_column,
...     augment=True,
... )
>>> transformed_and_augmented_row = add_constant_c_column_transformation(spark_row)
>>> transformed_and_augmented_row
Row(A='a1', B='b1', C='c')

Transformation Contract:
>>> rename_b_to_c_transformation.input_domain
SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_transformation.output_domain
SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'C': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_transformation.input_metric
NullMetric()
>>> rename_b_to_c_transformation.output_metric
NullMetric()

Stability Guarantee:

RowToRowTransformation is not stable! Its stability_relation() always returns False, and its stability_function() always raises NotImplementedError.

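Parameters
  • input_domain (SparkRowDomain) – Domain of the input row.

  • output_domain (SparkRowDomain) – Domain of the output row.

  • trusted_f (Callable[[pyspark.sql.Row], Union[pyspark.sql.Row, Dict[str, Any]]]) – Function to be applied to each row.

  • augment (bool) – Whether the input row's attributes are augmented to the output of trusted_f.

__init__(input_domain, output_domain, trusted_f, augment)

Constructor. Takes the same parameters as the class.

property trusted_f(self)

Returns the function to be applied to each row.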

Note

Returned function object should not be mutated.

Return type

Callable[[pyspark.sql.Row], Union[pyspark.sql.Row, Dict[str, Any]]]

property augment(self)

Returns whether the input row's attributes are augmented to the output.

Return type

bool
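
The effect of the augment flag can be modeled without Spark. The following is a minimal pure-Python sketch, not the tmlt.core implementation. Rows are plain dicts rather than pyspark.sql.Row objects, and apply_row_to_row is a hypothetical helper. The sketch also assumes trusted_f never returns a column that already exists in the input row, and raises ValueError if it does.

```python
from typing import Any, Callable, Dict

# A row is modeled as a mapping from column name to value.
DictRow = Dict[str, Any]


def apply_row_to_row(
    row: DictRow, trusted_f: Callable[[DictRow], DictRow], augment: bool
) -> DictRow:
    """Hypothetical dict-based model of RowToRowTransformation.__call__."""
    output = dict(trusted_f(row))
    if not augment:
        # augment=False: the result is exactly what trusted_f returned.
        return output
    overlap = set(row) & set(output)
    if overlap:
        # Assumption of this sketch: new columns may not shadow input columns.
        raise ValueError(f"trusted_f returned existing columns: {sorted(overlap)}")
    # augment=True: keep every input column, then append the new ones.
    return {**row, **output}


def rename_b_to_c(row: DictRow) -> DictRow:
    return {"A": row["A"], "C": row["B"].replace("b", "c")}


def constant_c_column(row: DictRow) -> DictRow:
    return {"C": "c"}


spark_like_row = {"A": "a1", "B": "b1"}
print(apply_row_to_row(spark_like_row, rename_b_to_c, augment=False))
# {'A': 'a1', 'C': 'c1'}
print(apply_row_to_row(spark_like_row, constant_c_column, augment=True))
# {'A': 'a1', 'B': 'b1', 'C': 'c'}
```

The two calls mirror the augment=False and augment=True examples for this class.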

stability_relation(self, _, __)

Returns False.

No values are valid for input/output metrics of this transformation.

Parameters
  • _ (Any) –

  • __ (Any) –

Return type

bool

__call__(self, row)

Maps the row using trusted_f. If augment is True, the input row's attributes are added to the result.

Parameters

row (pyspark.sql.Row) –

Return type

pyspark.sql.Row

property input_domain(self)

Return input domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property input_metric(self)

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_domain(self)

Return output domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property output_metric(self)

Distance metric on output domain.

Return type

tmlt.core.metrics.Metric

stability_function(self, d_in)

Returns the smallest d_out satisfied by the transformation.

See the privacy and stability tutorial for more information.

Parameters

d_in (Any) – Distance between inputs under input_metric.

Raises

NotImplementedError – If not overridden.

Return type

Any

__or__(self, other: Transformation) -> Transformation
__or__(self, other: tmlt.core.measurements.base.Measurement) -> tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.

class RowToRowsTransformation(input_domain, output_domain, trusted_f, augment)

Bases: tmlt.core.transformations.base.Transformation

Transforms a single row into multiple rows using a user-defined function.

Examples

augment=False:

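>>> from typing import List
>>> from pyspark.sql import Row
>>> from tmlt.core.domains.collections import ListDomain
>>> from tmlt.core.domains.spark_domains import (
...     SparkIntegerColumnDescriptor,
...     SparkRowDomain,
...     SparkStringColumnDescriptor,
... )
>>> from tmlt.core.transformations.spark_transformations.map import (
...     RowToRowsTransformation,
... )
>>> # Example input
>>> spark_row = Row(A="a1", B="b1")
>>> spark_row
Row(A='a1', B='b1')
>>> # Create user-defined function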
>>> def duplicate(row: Row) -> List[Row]:
...     return [row, row]
>>> # Create transformation
>>> duplicate_transformation = RowToRowsTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=ListDomain(
...         SparkRowDomain(
...             {
...                 "A": SparkStringColumnDescriptor(),
...                 "B": SparkStringColumnDescriptor(),
...             }
...         )
...     ),
...     trusted_f=duplicate,
...     augment=False,
... )
>>> transformed_rows = duplicate_transformation(spark_row)
>>> transformed_rows
[Row(A='a1', B='b1'), Row(A='a1', B='b1')]

augment=True:

>>> # Example input
>>> spark_row
Row(A='a1', B='b1')
>>> def counting_i_column(row: Row) -> List[Row]:
...     return [Row(i=i) for i in range(3)]
>>> add_counting_i_column_transformation = RowToRowsTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=ListDomain(
...         SparkRowDomain(
...             {
...                 "A": SparkStringColumnDescriptor(),
...                 "B": SparkStringColumnDescriptor(),
...                 "i": SparkIntegerColumnDescriptor(),
...             }
...         )
...     ),
...     trusted_f=counting_i_column,
...     augment=True,
... )
>>> transformed_and_augmented_rows = add_counting_i_column_transformation(spark_row)
>>> transformed_and_augmented_rows
[Row(A='a1', B='b1', i=0), Row(A='a1', B='b1', i=1), Row(A='a1', B='b1', i=2)]

Transformation Contract:
>>> duplicate_transformation.input_domain
SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> duplicate_transformation.output_domain
ListDomain(element_domain=SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)}), length=None)
>>> duplicate_transformation.input_metric
NullMetric()
>>> duplicate_transformation.output_metric
NullMetric()

Stability Guarantee:

RowToRowsTransformation is not stable! Its stability_relation() always returns False, and its stability_function() always raises NotImplementedError.

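Parameters
  • input_domain (SparkRowDomain) – Domain of the input row.

  • output_domain (ListDomain) – Domain of the list of output rows. Its element domain is a SparkRowDomain.

  • trusted_f (Callable[[pyspark.sql.Row], Union[List[pyspark.sql.Row], List[Dict[str, Any]]]]) – Function to be applied to each row.

  • augment (bool) – Whether the input row's attributes are augmented to each row output by trusted_f.

__init__(input_domain, output_domain, trusted_f, augment)

Constructor. Takes the same parameters as the class.

property trusted_f(self)

Returns the function to be applied to each row.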

Note

Returned function object should not be mutated.

Return type

Callable[[pyspark.sql.Row], Union[List[pyspark.sql.Row], List[Dict[str, Any]]]]

property augment(self)

Returns whether the input row's attributes are augmented to each output row.

Return type

bool
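
The same dict-based model extends to RowToRowsTransformation. With augment=True, every row returned by trusted_f is merged with the input row's columns. This is again a sketch: apply_row_to_rows is a hypothetical helper, not a tmlt.core function. Its two calls reproduce the duplicate and counting_i_column examples for this class.

```python
from typing import Any, Callable, Dict, List

# A row is modeled as a mapping from column name to value.
DictRow = Dict[str, Any]


def apply_row_to_rows(
    row: DictRow, trusted_f: Callable[[DictRow], List[DictRow]], augment: bool
) -> List[DictRow]:
    """Hypothetical dict-based model of RowToRowsTransformation.__call__."""
    outputs = [dict(out) for out in trusted_f(row)]
    if not augment:
        # augment=False: return the rows produced by trusted_f unchanged.
        return outputs
    # augment=True: prefix every output row with all of the input columns.
    return [{**row, **out} for out in outputs]


def duplicate(row: DictRow) -> List[DictRow]:
    return [row, row]


def counting_i_column(row: DictRow) -> List[DictRow]:
    return [{"i": i} for i in range(3)]


spark_like_row = {"A": "a1", "B": "b1"}
print(apply_row_to_rows(spark_like_row, duplicate, augment=False))
for out in apply_row_to_rows(spark_like_row, counting_i_column, augment=True):
    print(out)  # {'A': 'a1', 'B': 'b1', 'i': 0}, then i=1, then i=2
```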

stability_relation(self, _, __)

Returns False.

No values are valid for input/output metrics of this transformation.

Parameters
  • _ (Any) –

  • __ (Any) –

Return type

bool

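__call__(self, row)

Maps the row to a list of rows using trusted_f. If augment is True, the input row's attributes are added to each output row.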

Parameters

row (pyspark.sql.Row) –

Return type

List[pyspark.sql.Row]

property input_domain(self)

Return input domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property input_metric(self)

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_domain(self)

Return output domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property output_metric(self)

Distance metric on output domain.

Return type

tmlt.core.metrics.Metric

stability_function(self, d_in)

Returns the smallest d_out satisfied by the transformation.

See the privacy and stability tutorial for more information.

Parameters

d_in (Any) – Distance between inputs under input_metric.

Raises

NotImplementedError – If not overridden.

Return type

Any

__or__(self, other: Transformation) -> Transformation
__or__(self, other: tmlt.core.measurements.base.Measurement) -> tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.

class FlatMap(metric, row_transformer, max_num_rows)

Bases: tmlt.core.transformations.base.Transformation

Applies a RowToRowsTransformation to each row and flattens the result.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
>>> # duplicate_transformation is a RowToRowsTransformation that outputs two
>>> # copies of the input row.
>>> duplicate_flat_map = FlatMap(
...     metric=SymmetricDifference(),
...     row_transformer=duplicate_transformation,
...     max_num_rows=2,
... )
>>> # Apply transformation to data
>>> duplicated_spark_dataframe = duplicate_flat_map(spark_dataframe)
>>> print_sdf(duplicated_spark_dataframe)
    A   B
0  a1  b1
1  a1  b1
2  a2  b1
3  a2  b1
4  a3  b2
5  a3  b2
6  a3  b2
7  a3  b2

Transformation Contract:
>>> duplicate_flat_map.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> duplicate_flat_map.output_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> duplicate_flat_map.input_metric
SymmetricDifference()
>>> duplicate_flat_map.output_metric
SymmetricDifference()

Stability Guarantee:

For

  • SymmetricDifference()

  • IfGroupedBy(column, SumOf(SymmetricDifference()))

  • IfGroupedBy(column, RootSumOfSquared(SymmetricDifference()))

FlatMap’s stability_function() returns d_in times max_num_rows.

>>> duplicate_flat_map.stability_function(1)
2
>>> duplicate_flat_map.stability_function(2)
4

For

  • IfGroupedBy(column, SymmetricDifference())

FlatMap’s stability_function() returns d_in.
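
The two cases above reduce to simple arithmetic. The following minimal sketch represents metrics with hypothetical string tags instead of tmlt.core metric objects. It uses fractions.Fraction as a stand-in for the library's exact numbers.

```python
from fractions import Fraction
from typing import Union

Number = Union[int, Fraction]

# Hypothetical string tags standing in for the metric objects listed above.
SCALED_METRICS = {
    "SymmetricDifference()",
    "IfGroupedBy(column, SumOf(SymmetricDifference()))",
    "IfGroupedBy(column, RootSumOfSquared(SymmetricDifference()))",
}
UNSCALED_METRICS = {"IfGroupedBy(column, SymmetricDifference())"}


def flat_map_stability(metric: str, d_in: Number, max_num_rows: int) -> Number:
    """Sketch of FlatMap's documented stability_function for each metric."""
    if metric in SCALED_METRICS:
        # Each input row maps to at most max_num_rows output rows, so the
        # documented bound scales d_in by max_num_rows.
        return d_in * max_num_rows
    if metric in UNSCALED_METRICS:
        # Documented case: the distance passes through unchanged.
        return d_in
    raise ValueError(f"unsupported metric: {metric}")


print(flat_map_stability("SymmetricDifference()", 1, max_num_rows=2))  # 2
print(flat_map_stability("SymmetricDifference()", 2, max_num_rows=2))  # 4
```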

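Parameters
  • metric (tmlt.core.metrics.Metric) – Metric for both the input and the output DataFrames. The supported metrics are listed under Stability Guarantee.

  • row_transformer (RowToRowsTransformation) – Transformation applied to each row of the input DataFrame.

  • max_num_rows (int) – The largest number of rows a single input row can be mapped to.

__init__(metric, row_transformer, max_num_rows)

Constructor. Takes the same parameters as the class.

property max_num_rows(self)

Returns the largest number of rows a single input row can be mapped to. This value bounds the stability of the transformation.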

Return type

int

property row_transformer(self)

Returns transformation object used for mapping rows to lists of rows.

Return type

RowToRowsTransformation

stability_function(self, d_in)

Returns the smallest d_out satisfied by the transformation.

See the privacy and stability tutorial for more information.

Parameters

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Return type

tmlt.core.utils.exact_number.ExactNumber

__call__(self, sdf)

Applies row_transformer to each row of the DataFrame and returns the flattened result.

Parameters

sdf (pyspark.sql.DataFrame) –

Return type

pyspark.sql.DataFrame
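
The following is a list-based sketch of what FlatMap.__call__ does to a DataFrame, with rows modeled as dicts. flat_map is a hypothetical helper. Truncating each row's output to max_num_rows is an assumption of this sketch: it ensures max_num_rows actually bounds the output, which is what the stability guarantee relies on.

```python
from itertools import chain
from typing import Any, Callable, Dict, List

# A row is modeled as a mapping from column name to value.
DictRow = Dict[str, Any]


def flat_map(
    rows: List[DictRow],
    row_transformer: Callable[[DictRow], List[DictRow]],
    max_num_rows: int,
) -> List[DictRow]:
    """Hypothetical list-based model of FlatMap.__call__."""
    # Assumption: rows beyond max_num_rows for a single input row are dropped.
    per_row_outputs = (row_transformer(row)[:max_num_rows] for row in rows)
    # Flatten the per-row lists into one list of rows.
    return list(chain.from_iterable(per_row_outputs))


data = [
    {"A": "a1", "B": "b1"},
    {"A": "a2", "B": "b1"},
    {"A": "a3", "B": "b2"},
    {"A": "a3", "B": "b2"},
]
duplicated = flat_map(data, lambda row: [row, row], max_num_rows=2)
print(len(duplicated))  # 8, matching the duplicate_flat_map example
```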

property input_domain(self)

Return input domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property input_metric(self)

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_domain(self)

Return output domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property output_metric(self)

Distance metric on output domain.

Return type

tmlt.core.metrics.Metric

stability_relation(self, d_in, d_out)

Returns True only if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_metric.

Return type

bool
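
For the numeric distances used on this page, one natural reading of stability_relation is that it holds exactly when d_out is at least the smallest d_out returned by stability_function. The sketch below assumes that reading and a totally ordered distance. Both helper names are hypothetical.

```python
from typing import Callable


def stability_relation(
    stability_function: Callable[[int], int], d_in: int, d_out: int
) -> bool:
    """Sketch: the relation holds iff d_out is at least the tightest bound."""
    return stability_function(d_in) <= d_out


def duplicate_flat_map_stability(d_in: int) -> int:
    # FlatMap with max_num_rows=2, as in the duplicate_flat_map example.
    return d_in * 2


print(stability_relation(duplicate_flat_map_stability, 1, 2))  # True
print(stability_relation(duplicate_flat_map_stability, 1, 3))  # True (looser bound)
print(stability_relation(duplicate_flat_map_stability, 1, 1))  # False
```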

__or__(self, other: Transformation) -> Transformation
__or__(self, other: tmlt.core.measurements.base.Measurement) -> tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.
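
Chaining with | composes two components. For stability, one way to model this is to feed the first component's d_out into the second component as its d_in. This is an assumption of the sketch below, which keeps only the stability functions and uses a hypothetical StabilitySketch class.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class StabilitySketch:
    """Hypothetical stand-in that keeps only a transformation's stability function."""

    stability_function: Callable[[int], int]

    def __or__(self, other: "StabilitySketch") -> "StabilitySketch":
        first, second = self.stability_function, other.stability_function
        # The output distance of the first component becomes the input
        # distance of the second one.
        return StabilitySketch(lambda d_in: second(first(d_in)))


rename_map = StabilitySketch(lambda d_in: d_in)  # like Map: returns d_in
duplicate_flat_map = StabilitySketch(lambda d_in: 2 * d_in)  # like FlatMap, max_num_rows=2
chained = rename_map | duplicate_flat_map
print(chained.stability_function(3))  # 6
```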

class GroupingFlatMap(output_metric, row_transformer, max_num_rows)

Bases: tmlt.core.transformations.base.Transformation

Applies a RowToRowsTransformation to each row and flattens the result.

A GroupingFlatMap is a special case of a FlatMap that allows a tighter stability analysis, because its output metric groups rows by the created column.

It requires that:

  1. The row_transformer creates a single column that is augmented to the input.

  2. For each input row, the values in the created column are distinct (this is enforced by the implementation).
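
The two requirements can be illustrated with a dict-based sketch. grouping_flat_map is hypothetical. It enforces distinctness by dropping repeated values, and it caps each input row at max_num_rows values. Both are assumptions about how such checks could work, not a description of the tmlt.core code.

```python
from typing import Any, Callable, Dict, List

# A row is modeled as a mapping from column name to value.
DictRow = Dict[str, Any]


def grouping_flat_map(
    rows: List[DictRow],
    new_values: Callable[[DictRow], List[Any]],
    column: str,
    max_num_rows: int,
) -> List[DictRow]:
    """Hypothetical dict-based model of GroupingFlatMap.

    new_values returns the values of the single created column for one row.
    """
    output: List[DictRow] = []
    for row in rows:
        # Requirement 2: drop repeated values, keeping first occurrences in order.
        values = list(dict.fromkeys(new_values(row)))[:max_num_rows]
        # Requirement 1: one new column is augmented to the input row.
        output.extend({**row, column: value} for value in values)
    return output


data = [{"A": "a1", "B": "b1"}, {"A": "a3", "B": "b2"}]
with_i = grouping_flat_map(data, lambda row: [0, 1, 2, 2], column="i", max_num_rows=3)
for out in with_i:
    print(out)
```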

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
>>> # add_i_transformation is a RowToRowsTransformation that
>>> # repeats each row 3 times, once with i=0, once with i=1, and once with i=2
>>> add_i_flat_map = GroupingFlatMap(
...     output_metric=RootSumOfSquared(SymmetricDifference()),
...     row_transformer=add_i_transformation,
...     max_num_rows=3,
... )
>>> # Apply transformation to data
>>> spark_dataframe_with_i = add_i_flat_map(spark_dataframe)
>>> print_sdf(spark_dataframe_with_i)
     A   B  i
0   a1  b1  0
1   a1  b1  1
2   a1  b1  2
3   a2  b1  0
4   a2  b1  1
5   a2  b1  2
6   a3  b2  0
7   a3  b2  0
8   a3  b2  1
9   a3  b2  1
10  a3  b2  2
11  a3  b2  2

Transformation Contract:
>>> add_i_flat_map.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> add_i_flat_map.output_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False), 'i': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> add_i_flat_map.input_metric
SymmetricDifference()
>>> add_i_flat_map.output_metric
IfGroupedBy(column='i', inner_metric=RootSumOfSquared(inner_metric=SymmetricDifference()))

Stability Guarantee:

GroupingFlatMap’s stability_function() has two cases:

If the inner metric is SumOf(SymmetricDifference()), d_out is

d_in * self.max_num_rows

If the inner metric is RootSumOfSquared(SymmetricDifference()), d_out is

d_in * sqrt(self.max_num_rows)

>>> add_i_flat_map.stability_function(1)
sqrt(3)
>>> add_i_flat_map.stability_function(2)
2*sqrt(3)
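
Both formulas follow from a counting argument. The created values are distinct for each input row, so each of the d_in differing input rows adds or removes at most one row per group. Every group therefore changes by at most d_in rows, and the total change over all groups is at most d_in * max_num_rows. The sketch below uses floats, whereas the library returns exact values such as sqrt(3). Metric names are hypothetical string tags.

```python
import math


def grouping_flat_map_stability(inner_metric: str, d_in: float, max_num_rows: int) -> float:
    """Sketch of GroupingFlatMap's two documented cases, using floats."""
    if inner_metric == "SumOf(SymmetricDifference())":
        # The sum of per-group changes is bounded by the total change.
        return d_in * max_num_rows
    if inner_metric == "RootSumOfSquared(SymmetricDifference())":
        # Worst case: max_num_rows groups each change by d_in rows,
        # so sqrt(max_num_rows * d_in**2) = d_in * sqrt(max_num_rows).
        return d_in * math.sqrt(max_num_rows)
    raise ValueError(f"unsupported inner metric: {inner_metric}")


print(grouping_flat_map_stability("RootSumOfSquared(SymmetricDifference())", 1, 3))  # about 1.732
print(grouping_flat_map_stability("SumOf(SymmetricDifference())", 2, 3))  # 6
```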
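
Parameters
  • output_metric (Union[tmlt.core.metrics.SumOf, tmlt.core.metrics.RootSumOfSquared]) – Inner metric of the output, either SumOf(SymmetricDifference()) or RootSumOfSquared(SymmetricDifference()). The transformation's output_metric is IfGroupedBy on the created column with this inner metric.

  • row_transformer (RowToRowsTransformation) – Transformation applied to each row. It must satisfy the requirements above.

  • max_num_rows (int) – The largest number of rows a single input row can be mapped to.

__init__(output_metric, row_transformer, max_num_rows)

Constructor. Takes the same parameters as the class.

property max_num_rows(self)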

Returns the largest number of rows a single row can be mapped to.

Return type

int

property row_transformer(self)

Returns transformation object used for mapping rows to lists of rows.

Return type

RowToRowsTransformation

stability_function(self, d_in)

Returns the smallest d_out satisfied by the transformation.

See the privacy and stability tutorial for more information.

Parameters

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Return type

tmlt.core.utils.exact_number.ExactNumber

__call__(self, sdf)

Applies row_transformer to each row of the DataFrame and returns the flattened result.

Parameters

sdf (pyspark.sql.DataFrame) –

Return type

pyspark.sql.DataFrame

property input_domain(self)

Return input domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property input_metric(self)

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_domain(self)

Return output domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property output_metric(self)

Distance metric on output domain.

Return type

tmlt.core.metrics.Metric

stability_relation(self, d_in, d_out)

Returns True only if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_metric.

Return type

bool

__or__(self, other: Transformation) -> Transformation
__or__(self, other: tmlt.core.measurements.base.Measurement) -> tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.

class Map(metric, row_transformer)

Bases: tmlt.core.transformations.base.Transformation

Applies a RowToRowTransformation to each row in a Spark DataFrame.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
>>> # rename_b_to_c_transformation is a RowToRowTransformation that
>>> # renames the B column to C and replaces "b" with "c" in its values.
>>> rename_b_to_c_map = Map(
...     metric=SymmetricDifference(),
...     row_transformer=rename_b_to_c_transformation,
... )
>>> # Apply transformation to data
>>> renamed_spark_dataframe = rename_b_to_c_map(spark_dataframe)
>>> print_sdf(renamed_spark_dataframe)
    A   C
0  a1  c1
1  a2  c1
2  a3  c2
3  a3  c2

Transformation Contract:
>>> rename_b_to_c_map.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_map.output_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'C': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_map.input_metric
SymmetricDifference()
>>> rename_b_to_c_map.output_metric
SymmetricDifference()

Stability Guarantee:

Map’s stability_function() returns d_in.

>>> rename_b_to_c_map.stability_function(1)
1
>>> rename_b_to_c_map.stability_function(2)
2
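
Map's guarantee holds because each input row yields exactly one output row. Applying the map therefore cannot increase the symmetric difference between two DataFrames, and it can shrink it when distinct rows map to the same output. The following is a minimal check on dict rows with a hand-rolled multiset symmetric difference. All helper names are hypothetical.

```python
from collections import Counter
from typing import Any, Dict, List

# A row is modeled as a mapping from column name to value.
DictRow = Dict[str, Any]


def as_multiset(rows: List[DictRow]) -> Counter:
    # Dicts are unhashable, so count each row by its sorted (column, value) pairs.
    return Counter(tuple(sorted(row.items())) for row in rows)


def symmetric_difference(left: List[DictRow], right: List[DictRow]) -> int:
    a, b = as_multiset(left), as_multiset(right)
    # Counter subtraction keeps only positive counts.
    return sum((a - b).values()) + sum((b - a).values())


def rename_b_to_c(row: DictRow) -> DictRow:
    return {"A": row["A"], "C": row["B"].replace("b", "c")}


original = [{"A": "a1", "B": "b1"}, {"A": "a2", "B": "b1"}, {"A": "a3", "B": "b2"}]
neighbor = original + [{"A": "a3", "B": "b2"}]  # one extra row

d_in = symmetric_difference(original, neighbor)
d_out = symmetric_difference(
    [rename_b_to_c(row) for row in original],
    [rename_b_to_c(row) for row in neighbor],
)
print(d_in, d_out)  # 1 1
```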
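
Parameters
  • metric (tmlt.core.metrics.Metric) – Metric for both the input and the output DataFrames.

  • row_transformer (RowToRowTransformation) – Transformation applied to each row of the input DataFrame.

__init__(metric, row_transformer)

Constructor. Takes the same parameters as the class.

property row_transformer(self)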

Returns the transformation object used for mapping rows.

Return type

RowToRowTransformation

stability_function(self, d_in)

Returns the smallest d_out satisfied by the transformation.

See the privacy and stability tutorial for more information.

Parameters

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Return type

tmlt.core.utils.exact_number.ExactNumber

__call__(self, sdf)

Return mapped DataFrame.

Parameters

sdf (pyspark.sql.DataFrame) –

Return type

pyspark.sql.DataFrame

property input_domain(self)

Return input domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property input_metric(self)

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_domain(self)

Return output domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property output_metric(self)

Distance metric on output domain.

Return type

tmlt.core.metrics.Metric

stability_relation(self, d_in, d_out)

Returns True only if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_metric.

Return type

bool

__or__(self, other: Transformation) -> Transformation
__or__(self, other: tmlt.core.measurements.base.Measurement) -> tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.