map#

Transformations for applying user-defined maps to Spark DataFrames.

See the architecture overview for more information on transformations.

Classes#

RowToRowTransformation

Transforms a single row into a different row using a user-defined function.

RowToRowsTransformation

Transforms a single row into multiple rows using a user-defined function.

FlatMap

Applies a RowToRowsTransformation to each row and flattens the result.

GroupingFlatMap

Applies a RowToRowsTransformation that adds a grouping column to each row and flattens the result.

Map

Applies a RowToRowTransformation to each row in a Spark DataFrame.

class RowToRowTransformation(input_domain, output_domain, trusted_f, augment)#

Bases: tmlt.core.transformations.base.Transformation

Transforms a single row into a different row using a user-defined function.

Note

The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts; if it does, an error will occur when the RowToRowTransformation is called on a row.
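
A hypothetical illustration of this constraint: some_dataframe below stands in for any Spark DataFrame captured by the closure. Defining the function succeeds, but passing it as trusted_f and calling the transformation would trigger the error.

>>> def bad_f(row: Row) -> Row:
...     # Hypothetical: some_dataframe is a Spark DataFrame captured by this
...     # closure, which violates the requirement above.
...     return Row(A=row.A, n=some_dataframe.count())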

Examples

augment=False:

>>> # Example input
>>> spark_row
Row(A='a1', B='b1')
>>> def rename_b_to_c(row: Row) -> Row:
...     return Row(A=row.A, C=row.B.replace("b", "c"))
>>> rename_b_to_c_transformation = RowToRowTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "C": SparkStringColumnDescriptor(),
...         }
...     ),
...     trusted_f=rename_b_to_c,
...     augment=False,
... )
>>> transformed_row = rename_b_to_c_transformation(spark_row)
>>> transformed_row
Row(A='a1', C='c1')

augment=True:

>>> # Example input
>>> spark_row
Row(A='a1', B='b1')
>>> def constant_c_column(row: Row) -> Row:
...     return Row(C="c")
>>> add_constant_c_column_transformation = RowToRowTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...             "C": SparkStringColumnDescriptor(),
...         }
...     ),
...     trusted_f=constant_c_column,
...     augment=True,
... )
>>> transformed_and_augmented_row = add_constant_c_column_transformation(spark_row)
>>> transformed_and_augmented_row
Row(A='a1', B='b1', C='c')

Transformation Contract:
>>> rename_b_to_c_transformation.input_domain
SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_transformation.output_domain
SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'C': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_transformation.input_metric
NullMetric()
>>> rename_b_to_c_transformation.output_metric
NullMetric()

Stability Guarantee:

RowToRowTransformation is not stable! Its stability_relation() always returns False, and its stability_function() always raises NotImplementedError.
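
A short illustration, following directly from the guarantee above:

>>> rename_b_to_c_transformation.stability_relation(1, 1)
False
>>> # Calling stability_function(1) would raise NotImplementedError.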

Parameters
  • input_domain (SparkRowDomain) – Domain of input rows.

  • output_domain (SparkRowDomain) – Domain of output rows.

  • trusted_f (Callable[[pyspark.sql.Row], Union[pyspark.sql.Row, Dict[str, Any]]]) – Function to apply to each row.

  • augment (bool) – Whether the output row should be augmented with the input row's attributes.

__init__(input_domain, output_domain, trusted_f, augment)#

Constructor.

property trusted_f#

Returns the function to be applied to each row.

Note

Returned function object should not be mutated.

Return type

Callable[[pyspark.sql.Row], Union[pyspark.sql.Row, Dict[str, Any]]]
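
Because of the Union in the return type, the function may also return a plain dict; a minimal sketch, as a hypothetical dict-returning variant of rename_b_to_c above:

>>> def rename_b_to_c_dict(row: Row) -> Dict[str, Any]:
...     # Same mapping as rename_b_to_c, expressed as a dict.
...     return {"A": row.A, "C": row.B.replace("b", "c")}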

property augment#

Returns whether input attributes need to be augmented to the output.

Return type

bool

stability_relation(_, __)#

Returns False.

No values are valid for input/output metrics of this transformation.

Parameters
  • _ (Any) –

  • __ (Any) –

Return type

bool

__call__(row)#

Map row.

Parameters

row (pyspark.sql.Row) –

Return type

pyspark.sql.Row

property input_domain#

Return input domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property input_metric#

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_domain#

Return output domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property output_metric#

Distance metric on output domain.

Return type

tmlt.core.metrics.Metric

stability_function(d_in)#

Returns the smallest d_out satisfied by the transformation.

See the privacy and stability tutorial for more information.

Parameters

d_in (Any) – Distance between inputs under input_metric.

Raises

NotImplementedError – If not overridden.

Return type

Any

__or__(other: Transformation) → Transformation#
__or__(other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.

class RowToRowsTransformation(input_domain, output_domain, trusted_f, augment)#

Bases: tmlt.core.transformations.base.Transformation

Transforms a single row into multiple rows using a user-defined function.

Note

The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts; if it does, an error will occur when the RowToRowsTransformation is called on a row.

Examples

augment=False:

>>> # Example input
>>> spark_row
Row(A='a1', B='b1')
>>> # Create user-defined function
>>> def duplicate(row: Row) -> List[Row]:
...     return [row, row]
>>> # Create transformation
>>> duplicate_transformation = RowToRowsTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=ListDomain(
...         SparkRowDomain(
...             {
...                 "A": SparkStringColumnDescriptor(),
...                 "B": SparkStringColumnDescriptor(),
...             }
...         )
...     ),
...     trusted_f=duplicate,
...     augment=False,
... )
>>> transformed_rows = duplicate_transformation(spark_row)
>>> transformed_rows
[Row(A='a1', B='b1'), Row(A='a1', B='b1')]

augment=True:

>>> # Example input
>>> spark_row
Row(A='a1', B='b1')
>>> def counting_i_column(row: Row) -> List[Row]:
...     return [Row(i=i) for i in range(3)]
>>> add_counting_i_column_transformation = RowToRowsTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=ListDomain(
...         SparkRowDomain(
...             {
...                 "A": SparkStringColumnDescriptor(),
...                 "B": SparkStringColumnDescriptor(),
...                 "i": SparkIntegerColumnDescriptor(),
...             }
...         )
...     ),
...     trusted_f=counting_i_column,
...     augment=True,
... )
>>> transformed_and_augmented_rows = add_counting_i_column_transformation(spark_row)
>>> transformed_and_augmented_rows
[Row(A='a1', B='b1', i=0), Row(A='a1', B='b1', i=1), Row(A='a1', B='b1', i=2)]

Transformation Contract:
>>> duplicate_transformation.input_domain
SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> duplicate_transformation.output_domain
ListDomain(element_domain=SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)}), length=None)
>>> duplicate_transformation.input_metric
NullMetric()
>>> duplicate_transformation.output_metric
NullMetric()

Stability Guarantee:

RowToRowsTransformation is not stable! Its stability_relation() always returns False, and its stability_function() always raises NotImplementedError.
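
A short illustration, following directly from the guarantee above:

>>> duplicate_transformation.stability_relation(1, 1)
False
>>> # Calling stability_function(1) would raise NotImplementedError.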

Parameters
  • input_domain (SparkRowDomain) – Domain of input rows.

  • output_domain (ListDomain) – Domain of output row lists (a ListDomain over a SparkRowDomain).

  • trusted_f (Callable[[pyspark.sql.Row], Union[List[pyspark.sql.Row], List[Dict[str, Any]]]]) – Function to apply to each row.

  • augment (bool) – Whether each output row should be augmented with the input row's attributes.

__init__(input_domain, output_domain, trusted_f, augment)#

Constructor.

property trusted_f#

Returns the function to be applied to each row.

Note

Returned function object should not be mutated.

Return type

Callable[[pyspark.sql.Row], Union[List[pyspark.sql.Row], List[Dict[str, Any]]]]

property augment#

Returns whether input attributes need to be augmented to the output.

Return type

bool

stability_relation(_, __)#

Returns False.

No values are valid for input/output metrics of this transformation.

Parameters
  • _ (Any) –

  • __ (Any) –

Return type

bool

__call__(row)#

Map row.

Parameters

row (pyspark.sql.Row) –

Return type

List[pyspark.sql.Row]

property input_domain#

Return input domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property input_metric#

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_domain#

Return output domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property output_metric#

Distance metric on output domain.

Return type

tmlt.core.metrics.Metric

stability_function(d_in)#

Returns the smallest d_out satisfied by the transformation.

See the privacy and stability tutorial for more information.

Parameters

d_in (Any) – Distance between inputs under input_metric.

Raises

NotImplementedError – If not overridden.

Return type

Any

__or__(other: Transformation) → Transformation#
__or__(other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.

class FlatMap(metric, row_transformer, max_num_rows)#

Bases: tmlt.core.transformations.base.Transformation

Applies a RowToRowsTransformation to each row and flattens the result.

Note

The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts; if it does, an error will occur when the underlying RowToRowsTransformation is called on a row.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
>>> # duplicate_transformation is a RowToRowsTransformation that outputs two
>>> # copies of the input row.
>>> duplicate_flat_map = FlatMap(
...     metric=SymmetricDifference(),
...     row_transformer=duplicate_transformation,
...     max_num_rows=2,
... )
>>> # Apply transformation to data
>>> duplicated_spark_dataframe = duplicate_flat_map(spark_dataframe)
>>> print_sdf(duplicated_spark_dataframe)
    A   B
0  a1  b1
1  a1  b1
2  a2  b1
3  a2  b1
4  a3  b2
5  a3  b2
6  a3  b2
7  a3  b2

Transformation Contract:
>>> duplicate_flat_map.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> duplicate_flat_map.output_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> duplicate_flat_map.input_metric
SymmetricDifference()
>>> duplicate_flat_map.output_metric
SymmetricDifference()

Stability Guarantee:

For

  • SymmetricDifference()

  • IfGroupedBy(column, SumOf(SymmetricDifference()))

  • IfGroupedBy(column, RootSumOfSquared(SymmetricDifference()))

FlatMap’s stability_function() returns d_in * max_num_rows. If max_num_rows is None, it returns infinity.

>>> duplicate_flat_map.stability_function(1)
2
>>> duplicate_flat_map.stability_function(2)
4
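
A hedged sketch of the unbounded case: a hypothetical variant of duplicate_flat_map with max_num_rows=None (infinity is assumed to print as sympy's oo, matching the exact arithmetic shown elsewhere on this page):

>>> unbounded_flat_map = FlatMap(
...     metric=SymmetricDifference(),
...     row_transformer=duplicate_transformation,
...     max_num_rows=None,
... )
>>> unbounded_flat_map.stability_function(1)
oo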

For

  • IfGroupedBy(column, SymmetricDifference())

FlatMap’s stability_function() returns d_in.
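
A hedged sketch of this case, reusing duplicate_transformation from above and grouping by the existing column B (the grouping column is this sketch's choice):

>>> grouped_duplicate_flat_map = FlatMap(
...     metric=IfGroupedBy("B", SymmetricDifference()),
...     row_transformer=duplicate_transformation,
...     max_num_rows=2,
... )
>>> grouped_duplicate_flat_map.stability_function(1)
1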

Parameters
  • metric (SymmetricDifference or IfGroupedBy) – Distance metric for the input and output DataFrames.

  • row_transformer (RowToRowsTransformation) – Transformation to apply to each row.

  • max_num_rows (Optional[int]) – Maximum number of rows each input row can map to, or None for no limit.

__init__(metric, row_transformer, max_num_rows)#

Constructor.

property max_num_rows#

Returns the maximum number of rows each input row can map to, or None if unlimited.

Return type

Optional[int]

property row_transformer#

Returns the transformation object used for mapping rows to lists of rows.

Return type

RowToRowsTransformation

stability_function(d_in)#

Returns the smallest d_out satisfied by the transformation.

See the architecture overview for more information.

Parameters

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Return type

tmlt.core.utils.exact_number.ExactNumber

__call__(sdf)#

Return flat-mapped DataFrame.

Parameters

sdf (pyspark.sql.DataFrame) –

Return type

pyspark.sql.DataFrame

property input_domain#

Return input domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property input_metric#

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_domain#

Return output domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property output_metric#

Distance metric on output domain.

Return type

tmlt.core.metrics.Metric

stability_relation(d_in, d_out)#

Returns True only if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_metric.

Return type

bool

__or__(other: Transformation) → Transformation#
__or__(other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.

class GroupingFlatMap(output_metric, row_transformer, max_num_rows)#

Bases: tmlt.core.transformations.base.Transformation

Applies a RowToRowsTransformation that adds a grouping column to each row and flattens the result.

A GroupingFlatMap is a special case of a FlatMap that has different input and output metrics (a FlatMap’s input and output metrics are always identical) and allows for a tighter stability analysis.

Compared to a regular FlatMap, a GroupingFlatMap also requires that:

  1. The row_transformer creates exactly one new column, which is augmented to the input.

  2. For each input row, the row_transformer creates no duplicate values in the created column (this is enforced by the implementation). A sketch of a compliant row transformer follows this list.
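
A sketch of such a row transformer, mirroring the counting_i_column example above; the add_i_transformation used in the example below is assumed to be built like this:

>>> def add_i(row: Row) -> List[Row]:
...     # One output row per distinct value of the new column i.
...     return [Row(i=i) for i in range(3)]
>>> add_i_transformation = RowToRowsTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=ListDomain(
...         SparkRowDomain(
...             {
...                 "A": SparkStringColumnDescriptor(),
...                 "B": SparkStringColumnDescriptor(),
...                 "i": SparkIntegerColumnDescriptor(),
...             }
...         )
...     ),
...     trusted_f=add_i,
...     augment=True,
... )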

Note

The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts; if it does, an error will occur when the underlying RowToRowsTransformation is called on a row.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
>>> # add_i_transformation is a RowToRowsTransformation that
>>> # repeats each row 3 times, once with i=0, once with i=1, and once with i=2
>>> add_i_flat_map = GroupingFlatMap(
...     output_metric=RootSumOfSquared(SymmetricDifference()),
...     row_transformer=add_i_transformation,
...     max_num_rows=3,
... )
>>> # Apply transformation to data
>>> spark_dataframe_with_i = add_i_flat_map(spark_dataframe)
>>> print_sdf(spark_dataframe_with_i)
     A   B  i
0   a1  b1  0
1   a1  b1  1
2   a1  b1  2
3   a2  b1  0
4   a2  b1  1
5   a2  b1  2
6   a3  b2  0
7   a3  b2  0
8   a3  b2  1
9   a3  b2  1
10  a3  b2  2
11  a3  b2  2

Transformation Contract:
>>> add_i_flat_map.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> add_i_flat_map.output_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False), 'i': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> add_i_flat_map.input_metric
SymmetricDifference()
>>> add_i_flat_map.output_metric
IfGroupedBy(column='i', inner_metric=RootSumOfSquared(inner_metric=SymmetricDifference()))

Stability Guarantee:

GroupingFlatMap supports two different output metrics:

  • IfGroupedBy(column='new_column', inner_metric=SumOf(SymmetricDifference()))

  • IfGroupedBy(column='new_column', inner_metric=RootSumOfSquared(SymmetricDifference()))

The stability_function() is different depending on the output metric:

If the inner metric is SumOf(SymmetricDifference()), d_out is

d_in * self.max_num_rows

If the inner metric is RootSumOfSquared(SymmetricDifference()), we can use the added structure of the row_transformer to achieve a tighter analysis. We know that for each input row, the function will produce at most one output row per value of the new column, so changing d_in input rows changes at most d_in rows in each of up to self.max_num_rows groups. Taking the root sum of squares over those groups gives sqrt(self.max_num_rows * d_in**2), so d_out is

d_in * sqrt(self.max_num_rows)

>>> add_i_flat_map.stability_function(1)
sqrt(3)
>>> add_i_flat_map.stability_function(2)
2*sqrt(3)

Parameters
  • output_metric (SumOf or RootSumOfSquared) – Inner metric for the IfGroupedBy output metric over the created column.

  • row_transformer (RowToRowsTransformation) – Transformation to apply to each row.

  • max_num_rows (int) – Maximum number of rows each input row can map to.

__init__(output_metric, row_transformer, max_num_rows)#

Constructor.

property max_num_rows#

Returns the largest number of rows a single row can be mapped to.

Return type

int

property row_transformer#

Returns the transformation object used for mapping rows to lists of rows.

Return type

RowToRowsTransformation

stability_function(d_in)#

Returns the smallest d_out satisfied by the transformation.

See the architecture overview for more information.

Parameters

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Return type

tmlt.core.utils.exact_number.ExactNumber

__call__(sdf)#

Return flat-mapped DataFrame.

Parameters

sdf (pyspark.sql.DataFrame) –

Return type

pyspark.sql.DataFrame

property input_domain#

Return input domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property input_metric#

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_domain#

Return output domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property output_metric#

Distance metric on output domain.

Return type

tmlt.core.metrics.Metric

stability_relation(d_in, d_out)#

Returns True only if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_metric.

Return type

bool

__or__(other: Transformation) → Transformation#
__or__(other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.

class Map(metric, row_transformer)#

Bases: tmlt.core.transformations.base.Transformation

Applies a RowToRowTransformation to each row in a Spark DataFrame.

Note

The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts; if it does, an error will occur when the RowToRowTransformation is called on a row.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
>>> # rename_b_to_c_transformation is a RowToRowTransformation that
>>> # renames the B column to C, and replaces b's in the values with c's
>>> rename_b_to_c_map = Map(
...     metric=SymmetricDifference(),
...     row_transformer=rename_b_to_c_transformation,
... )
>>> # Apply transformation to data
>>> renamed_spark_dataframe = rename_b_to_c_map(spark_dataframe)
>>> print_sdf(renamed_spark_dataframe)
    A   C
0  a1  c1
1  a2  c1
2  a3  c2
3  a3  c2

Transformation Contract:
>>> rename_b_to_c_map.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_map.output_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'C': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_map.input_metric
SymmetricDifference()
>>> rename_b_to_c_map.output_metric
SymmetricDifference()

Stability Guarantee:

Map’s stability_function() returns d_in.

>>> rename_b_to_c_map.stability_function(1)
1
>>> rename_b_to_c_map.stability_function(2)
2

Parameters
  • metric – Distance metric for the input and output DataFrames.

  • row_transformer (RowToRowTransformation) – Transformation to apply to each row.

__init__(metric, row_transformer)#

Constructor.

property row_transformer#

Returns the transformation object used for mapping rows.

Return type

RowToRowTransformation

stability_function(d_in)#

Returns the smallest d_out satisfied by the transformation.

See the architecture overview for more information on transformations.

Parameters

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Return type

tmlt.core.utils.exact_number.ExactNumber

__call__(sdf)#

Return mapped DataFrame.

Parameters

sdf (pyspark.sql.DataFrame) –

Return type

pyspark.sql.DataFrame

property input_domain#

Return input domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property input_metric#

Distance metric on input domain.

Return type

tmlt.core.metrics.Metric

property output_domain#

Return output domain for the transformation.

Return type

tmlt.core.domains.base.Domain

property output_metric#

Distance metric on output domain.

Return type

tmlt.core.metrics.Metric

stability_relation(d_in, d_out)#

Returns True only if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_metric.

Return type

bool

__or__(other: Transformation) → Transformation#
__or__(other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.
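
A hedged sketch of chaining, reusing duplicate_flat_map and rename_b_to_c_map from the examples above; their domains and metrics line up, so the composition is valid, and the chained stability function composes the two (1 row of change becomes 2 after the flat map, and the map preserves it):

>>> chained = duplicate_flat_map | rename_b_to_c_map
>>> chained.stability_function(1)
2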