map

Transformations for applying user-defined maps to Spark DataFrames.

See the architecture overview for more information on transformations.

Classes

RowToRowTransformation

Transforms a single row into a different row using a user-defined function.

RowToRowsTransformation

Transforms a single row into multiple rows using a user-defined function.

RowsToRowsTransformation

Transforms a set of rows into another set of rows using a user-defined function.

FlatMap

Applies a RowToRowsTransformation to each row and flattens the result.

GroupingFlatMap

Applies a RowToRowsTransformation to each row and flattens the result.

Map

Applies a RowToRowTransformation to each row in a Spark DataFrame.

FlatMapByKey

Applies a RowsToRowsTransformation to rows, grouped by key.

class RowToRowTransformation(input_domain, output_domain, trusted_f, augment)

Bases: tmlt.core.transformations.base.Transformation

Transforms a single row into a different row using a user-defined function.

Note

The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts. If the function does contain an object that directly or indirectly references a Spark DataFrame or a Spark context, an error will occur when the transformation is called on a row.

Examples

augment=False:

>>> # Example input
>>> spark_row
Row(A='a1', B='b1')
>>> def rename_b_to_c(row: Row) -> Row:
...     return Row(A=row.A, C=row.B.replace("b", "c"))
>>> rename_b_to_c_transformation = RowToRowTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "C": SparkStringColumnDescriptor(),
...         }
...     ),
...     trusted_f=rename_b_to_c,
...     augment=False,
... )
>>> transformed_row = rename_b_to_c_transformation(spark_row)
>>> transformed_row
Row(A='a1', C='c1')

augment=True:

>>> # Example input
>>> spark_row
Row(A='a1', B='b1')
>>> def constant_c_column(row: Row) -> Row:
...     return Row(C="c")
>>> add_constant_c_column_transformation = RowToRowTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...             "C": SparkStringColumnDescriptor(),
...         }
...     ),
...     trusted_f=constant_c_column,
...     augment=True,
... )
>>> transformed_and_augmented_row = add_constant_c_column_transformation(spark_row)
>>> transformed_and_augmented_row
Row(A='a1', B='b1', C='c')
Transformation Contract:
>>> rename_b_to_c_transformation.input_domain
SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_transformation.output_domain
SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'C': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_transformation.input_metric
NullMetric()
>>> rename_b_to_c_transformation.output_metric
NullMetric()
Stability Guarantee:

RowToRowTransformation is not stable! Its stability_relation() always returns False, and its stability_function() always raises NotImplementedError.

property trusted_f: Callable[[pyspark.sql.Row], pyspark.sql.Row | Dict[str, Any]]

Returns function to be applied to each row.

Note

Returned function object should not be mutated.

Return type:

Callable[[pyspark.sql.Row], Union[pyspark.sql.Row, Dict[str, Any]]]

property augment: bool

Returns whether the output of trusted_f is augmented with the columns of the input row.

Return type:

bool

property input_domain: tmlt.core.domains.base.Domain

Return input domain for the transformation.

Return type:

tmlt.core.domains.base.Domain

property input_metric: tmlt.core.metrics.Metric

Distance metric on input domain.

Return type:

tmlt.core.metrics.Metric

property output_domain: tmlt.core.domains.base.Domain

Return output domain for the transformation.

Return type:

tmlt.core.domains.base.Domain

property output_metric: tmlt.core.metrics.Metric

Distance metric on output domain.

Return type:

tmlt.core.metrics.Metric

__init__(input_domain, output_domain, trusted_f, augment)

Constructor.

Parameters:
  • input_domain (SparkRowDomain) – Domain for the input row.

  • output_domain (SparkRowDomain) – Domain for the output row.

  • trusted_f (Callable[[Row], Union[Row, Dict[str, Any]]]) – Transformation function to apply to input row.

  • augment (bool) – If True, the output of trusted_f will be augmented by the existing values from the input row. In that case, trusted_f must not output values for any of the original columns.
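The augment flag can be illustrated without Spark. The following is a minimal sketch, not the library's implementation: it uses plain dicts in place of pyspark.sql.Row, the helper apply_row_to_row is hypothetical, and raising ValueError when trusted_f outputs an input column is an assumed way of enforcing the rule above.

```python
from typing import Any, Callable, Dict

RowDict = Dict[str, Any]  # plain dict standing in for pyspark.sql.Row


def apply_row_to_row(
    row: RowDict,
    trusted_f: Callable[[RowDict], RowDict],
    augment: bool,
) -> RowDict:
    """Apply trusted_f to one row, optionally keeping the input columns."""
    output = dict(trusted_f(row))
    if not augment:
        # Without augmentation, the output is exactly what trusted_f returned.
        return output
    # With augmentation, trusted_f may only add new columns (assumed check).
    overlap = set(output) & set(row)
    if overlap:
        raise ValueError(f"trusted_f must not output input columns: {sorted(overlap)}")
    # Input columns come first, followed by the newly created columns.
    return {**row, **output}


row = {"A": "a1", "B": "b1"}
print(apply_row_to_row(row, lambda r: {"A": r["A"], "C": r["B"].replace("b", "c")}, augment=False))
# {'A': 'a1', 'C': 'c1'}
print(apply_row_to_row(row, lambda r: {"C": "c"}, augment=True))
# {'A': 'a1', 'B': 'b1', 'C': 'c'}
```

These two calls mirror the augment=False and augment=True doctests in the Examples section.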

stability_relation(_, __)

Returns False.

No values are valid for input/output metrics of this transformation.

Parameters:
  • _ (Any)

  • __ (Any)

Return type:

bool

__call__(row)

Map row.

Parameters:

row (pyspark.sql.Row)

Return type:

pyspark.sql.Row

stability_function(d_in)

Returns the smallest d_out satisfied by the transformation.

See the privacy and stability tutorial for more information.

Parameters:

d_in (Any) – Distance between inputs under input_metric.

Raises:

NotImplementedError – If not overridden.

Return type:

Any

__or__(other: Transformation) → Transformation
__or__(other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.

class RowToRowsTransformation(input_domain, output_domain, trusted_f, augment)

Bases: tmlt.core.transformations.base.Transformation

Transforms a single row into multiple rows using a user-defined function.

Note

The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts. If the function does contain an object that directly or indirectly references a Spark DataFrame or a Spark context, an error will occur when the transformation is called on a row.

Examples

augment=False:

>>> # Example input
>>> spark_row
Row(A='a1', B='b1')
>>> # Create user defined function
>>> def duplicate(row: Row) -> List[Row]:
...     return [row, row]
>>> # Create transformation
>>> duplicate_transformation = RowToRowsTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=ListDomain(
...         SparkRowDomain(
...             {
...                 "A": SparkStringColumnDescriptor(),
...                 "B": SparkStringColumnDescriptor(),
...             }
...         )
...     ),
...     trusted_f=duplicate,
...     augment=False,
... )
>>> transformed_rows = duplicate_transformation(spark_row)
>>> transformed_rows
[Row(A='a1', B='b1'), Row(A='a1', B='b1')]

augment=True:

>>> # Example input
>>> spark_row
Row(A='a1', B='b1')
>>> def counting_i_column(row: Row) -> List[Row]:
...     return [Row(i=i) for i in range(3)]
>>> add_counting_i_column_transformation = RowToRowsTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=ListDomain(
...         SparkRowDomain(
...             {
...                 "A": SparkStringColumnDescriptor(),
...                 "B": SparkStringColumnDescriptor(),
...                 "i": SparkIntegerColumnDescriptor(),
...             }
...         )
...     ),
...     trusted_f=counting_i_column,
...     augment=True,
... )
>>> transformed_and_augmented_rows = add_counting_i_column_transformation(spark_row)
>>> transformed_and_augmented_rows
[Row(A='a1', B='b1', i=0), Row(A='a1', B='b1', i=1), Row(A='a1', B='b1', i=2)]
Transformation Contract:
>>> duplicate_transformation.input_domain
SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> duplicate_transformation.output_domain
ListDomain(element_domain=SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)}), length=None)
>>> duplicate_transformation.input_metric
NullMetric()
>>> duplicate_transformation.output_metric
NullMetric()
Stability Guarantee:

RowToRowsTransformation is not stable! Its stability_relation() always returns False, and its stability_function() always raises NotImplementedError.

property trusted_f: Callable[[pyspark.sql.Row], List[pyspark.sql.Row] | List[Dict[str, Any]]]

Returns function to be applied to each row.

Note

Returned function object should not be mutated.

Return type:

Callable[[pyspark.sql.Row], Union[List[pyspark.sql.Row], List[Dict[str, Any]]]]

property augment: bool

Returns whether each output row of trusted_f is augmented with the columns of the input row.

Return type:

bool

property input_domain: tmlt.core.domains.base.Domain

Return input domain for the transformation.

Return type:

tmlt.core.domains.base.Domain

property input_metric: tmlt.core.metrics.Metric

Distance metric on input domain.

Return type:

tmlt.core.metrics.Metric

property output_domain: tmlt.core.domains.base.Domain

Return output domain for the transformation.

Return type:

tmlt.core.domains.base.Domain

property output_metric: tmlt.core.metrics.Metric

Distance metric on output domain.

Return type:

tmlt.core.metrics.Metric

__init__(input_domain, output_domain, trusted_f, augment)

Constructor.

Parameters:
  • input_domain (SparkRowDomain) – Domain for the input row.

  • output_domain (ListDomain) – Domain for the output rows.

  • trusted_f (Callable[[Row], Union[List[Row], List[Dict[str, Any]]]]) – Transformation function to apply to input row.

  • augment (bool) – If True, the output of trusted_f will be augmented by the existing values from the input row. In that case, trusted_f must not output values for any of the original columns.
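A plain-Python sketch of the one-to-many case, using dicts in place of pyspark.sql.Row. The helper apply_row_to_rows is hypothetical, and it does not check that trusted_f avoids the original columns.

```python
from typing import Any, Callable, Dict, List

RowDict = Dict[str, Any]  # plain dict standing in for pyspark.sql.Row


def apply_row_to_rows(
    row: RowDict,
    trusted_f: Callable[[RowDict], List[RowDict]],
    augment: bool,
) -> List[RowDict]:
    """Apply trusted_f to one row, producing a list of output rows."""
    outputs = [dict(out) for out in trusted_f(row)]
    if not augment:
        # Without augmentation, the output rows are exactly what trusted_f returned.
        return outputs
    # With augmentation, every output row starts with the input row's columns.
    return [{**row, **out} for out in outputs]


row = {"A": "a1", "B": "b1"}
print(apply_row_to_rows(row, lambda r: [r, r], augment=False))
print(apply_row_to_rows(row, lambda r: [{"i": i} for i in range(3)], augment=True))
```

The two calls correspond to the duplicate and counting_i_column doctests in the Examples section.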

stability_relation(_, __)

Returns False.

No values are valid for input/output metrics of this transformation.

Parameters:
  • _ (Any)

  • __ (Any)

Return type:

bool

__call__(row)

Map row.

Parameters:

row (pyspark.sql.Row)

Return type:

List[pyspark.sql.Row]

stability_function(d_in)

Returns the smallest d_out satisfied by the transformation.

See the privacy and stability tutorial for more information.

Parameters:

d_in (Any) – Distance between inputs under input_metric.

Raises:

NotImplementedError – If not overridden.

Return type:

Any

__or__(other: Transformation) → Transformation
__or__(other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.

class RowsToRowsTransformation(input_domain, output_domain, trusted_f)

Bases: tmlt.core.transformations.base.Transformation

Transforms a set of rows into another set of rows using a user-defined function.

Note

The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts. If the function does contain an object that directly or indirectly references a Spark DataFrame or a Spark context, an error will occur when the transformation is called on a group of rows.

Examples

>>> # Example input
>>> input
[Row(A='a1', B='b1'), Row(A='a2', B='b2')]
>>> # Create user defined function
>>> def merge(rows: List[Row]) -> List[Row]:
...     return [Row(A=' '.join(r.A for r in rows), B=' '.join(r.B for r in rows))]
>>> # Create transformation
>>> merge_transformation = RowsToRowsTransformation(
...     input_domain=ListDomain(SparkRowDomain({
...         "A": SparkStringColumnDescriptor(),
...         "B": SparkStringColumnDescriptor(),
...     })),
...     output_domain=ListDomain(SparkRowDomain({
...         "A": SparkStringColumnDescriptor(),
...         "B": SparkStringColumnDescriptor(),
...     })),
...     trusted_f=merge,
... )
>>> transformed_rows = merge_transformation(input)
>>> transformed_rows
[Row(A='a1 a2', B='b1 b2')]
Transformation Contract:
>>> merge_transformation.input_domain
ListDomain(element_domain=SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)}), length=None)
>>> merge_transformation.output_domain
ListDomain(element_domain=SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)}), length=None)
>>> merge_transformation.input_metric
NullMetric()
>>> merge_transformation.output_metric
NullMetric()
Stability Guarantee:

RowsToRowsTransformation is not stable! Its stability_relation() always returns False, and its stability_function() always raises NotImplementedError.

property trusted_f: Callable[[List[pyspark.sql.Row]], List[pyspark.sql.Row] | List[Dict[str, Any]]]

The function to be applied to each group of rows.

Note

Returned function object should not be mutated.

Return type:

Callable[[List[pyspark.sql.Row]], Union[List[pyspark.sql.Row], List[Dict[str, Any]]]]

property input_domain: tmlt.core.domains.base.Domain

Return input domain for the transformation.

Return type:

tmlt.core.domains.base.Domain

property input_metric: tmlt.core.metrics.Metric

Distance metric on input domain.

Return type:

tmlt.core.metrics.Metric

property output_domain: tmlt.core.domains.base.Domain

Return output domain for the transformation.

Return type:

tmlt.core.domains.base.Domain

property output_metric: tmlt.core.metrics.Metric

Distance metric on output domain.

Return type:

tmlt.core.metrics.Metric

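__init__(input_domain, output_domain, trusted_f)

Constructor.

Parameters:
  • input_domain (ListDomain) – Domain for the input rows.

  • output_domain (ListDomain) – Domain for the output rows.

  • trusted_f (Callable[[List[Row]], Union[List[Row], List[Dict[str, Any]]]]) – Transformation function to apply to the input rows.

stability_relation(_, __)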

Returns False.

No values are valid for input/output metrics of this transformation.

Parameters:
  • _ (Any)

  • __ (Any)

Return type:

bool

__call__(rows)

Map rows.

Parameters:

rows (List[pyspark.sql.Row])

Return type:

List[pyspark.sql.Row]

stability_function(d_in)

Returns the smallest d_out satisfied by the transformation.

See the privacy and stability tutorial for more information.

Parameters:

d_in (Any) – Distance between inputs under input_metric.

Raises:

NotImplementedError – If not overridden.

Return type:

Any

__or__(other: Transformation) → Transformation
__or__(other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.

class FlatMap(metric, row_transformer, max_num_rows)

Bases: tmlt.core.transformations.base.Transformation

Applies a RowToRowsTransformation to each row and flattens the result.

Note

The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts. If the function does contain an object that directly or indirectly references a Spark DataFrame or a Spark context, an error will occur when the transformation is applied.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
>>> # duplicate_transformation is a RowToRowsTransformation that outputs two copies
>>> # of the input row.
>>> duplicate_flat_map = FlatMap(
...     metric=SymmetricDifference(),
...     row_transformer=duplicate_transformation,
...     max_num_rows=2,
... )
>>> # Apply transformation to data
>>> duplicated_spark_dataframe = duplicate_flat_map(spark_dataframe)
>>> print_sdf(duplicated_spark_dataframe)
    A   B
0  a1  b1
1  a1  b1
2  a2  b1
3  a2  b1
4  a3  b2
5  a3  b2
6  a3  b2
7  a3  b2
Transformation Contract:
>>> duplicate_flat_map.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> duplicate_flat_map.output_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> duplicate_flat_map.input_metric
SymmetricDifference()
>>> duplicate_flat_map.output_metric
SymmetricDifference()
Stability Guarantee:

For

  • SymmetricDifference()

  • IfGroupedBy(column, SumOf(SymmetricDifference()))

  • IfGroupedBy(column, RootSumOfSquared(SymmetricDifference()))

FlatMap’s stability_function() returns d_in times max_num_rows. If max_num_rows is None, it returns infinity.

>>> duplicate_flat_map.stability_function(1)
2
>>> duplicate_flat_map.stability_function(2)
4

For

  • IfGroupedBy(column, SymmetricDifference())

FlatMap’s stability_function() returns d_in.
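The stability rules above reduce to simple arithmetic. This sketch uses ordinary Python numbers instead of ExactNumber, and the function names and the grouped_symmetric_difference flag are hypothetical. The relation helper assumes that a (d_in, d_out) pair is accepted when d_out is at least the value returned by the stability function.

```python
import math
from typing import Optional, Union

Number = Union[int, float]


def flat_map_stability(
    d_in: Number,
    max_num_rows: Optional[int],
    grouped_symmetric_difference: bool = False,
) -> Number:
    """Smallest d_out for FlatMap, following the Stability Guarantee text."""
    if grouped_symmetric_difference:
        # IfGroupedBy(column, SymmetricDifference()): d_out is d_in.
        return d_in
    if max_num_rows is None:
        # No limit on rows per input row, so there is no finite bound.
        return math.inf
    # SymmetricDifference(), or IfGroupedBy with SumOf/RootSumOfSquared inner metrics.
    return d_in * max_num_rows


def flat_map_stability_relation(
    d_in: Number,
    d_out: Number,
    max_num_rows: Optional[int],
    grouped_symmetric_difference: bool = False,
) -> bool:
    """Accept (d_in, d_out) when d_out is at least the smallest bound."""
    return d_out >= flat_map_stability(d_in, max_num_rows, grouped_symmetric_difference)


print(flat_map_stability(1, 2), flat_map_stability(2, 2))  # 2 4
```

The printed values match the duplicate_flat_map doctests, where max_num_rows=2.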

property max_num_rows: int | None

Returns the maximum number of rows each input row may produce (the enforced stability of this transformation), or None if no limit is enforced.

Return type:

Optional[int]

property row_transformer: RowToRowsTransformation

Returns transformation object used for mapping rows to lists of rows.

Return type:

RowToRowsTransformation

property input_domain: tmlt.core.domains.base.Domain

Return input domain for the transformation.

Return type:

tmlt.core.domains.base.Domain

property input_metric: tmlt.core.metrics.Metric

Distance metric on input domain.

Return type:

tmlt.core.metrics.Metric

property output_domain: tmlt.core.domains.base.Domain

Return output domain for the transformation.

Return type:

tmlt.core.domains.base.Domain

property output_metric: tmlt.core.metrics.Metric

Distance metric on output domain.

Return type:

tmlt.core.metrics.Metric

__init__(metric, row_transformer, max_num_rows)

Constructor.

Parameters:
  • metric (Union[SymmetricDifference, IfGroupedBy]) – Distance metric for input and output DataFrames.

  • row_transformer (RowToRowsTransformation) – Transformation to apply to each row.

  • max_num_rows (Optional[int]) – The maximum number of rows to allow from row_transformer for each input row. If more rows are output, the additional rows are suppressed. If this value is None, the transformation does not limit the number of rows. None is only allowed if the metric is IfGroupedBy(column, SymmetricDifference()).
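The flattening and suppression behavior described for max_num_rows can be sketched over a list of dicts. The helper flat_map is hypothetical, and keeping the first max_num_rows outputs of each input row is an assumption: the text above only says that the additional rows are suppressed.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional

RowDict = Dict[str, Any]  # plain dict standing in for pyspark.sql.Row


def flat_map(
    rows: Iterable[RowDict],
    row_to_rows: Callable[[RowDict], List[RowDict]],
    max_num_rows: Optional[int],
) -> List[RowDict]:
    """Apply row_to_rows to each row and concatenate the results."""
    result: List[RowDict] = []
    for row in rows:
        outputs = row_to_rows(row)
        if max_num_rows is not None:
            # Assumption: rows beyond the first max_num_rows are dropped.
            outputs = outputs[:max_num_rows]
        result.extend(outputs)
    return result


rows = [{"A": "a1"}, {"A": "a2"}]
# The row function emits three copies, but only two per input row survive.
print(flat_map(rows, lambda r: [r, r, r], max_num_rows=2))
```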

stability_function(d_in)

Returns the smallest d_out satisfied by the transformation.

See Tumult Core Architecture for more information.

Parameters:

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Return type:

tmlt.core.utils.exact_number.ExactNumber

__call__(sdf)

Flat map.

Parameters:

sdf (pyspark.sql.DataFrame)

Return type:

pyspark.sql.DataFrame

stability_relation(d_in, d_out)

Returns True only if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters:
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_metric.

Return type:

bool

__or__(other: Transformation) → Transformation
__or__(other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.

class GroupingFlatMap(output_metric, row_transformer, max_num_rows)

Bases: tmlt.core.transformations.base.Transformation

Applies a RowToRowsTransformation to each row and flattens the result.

A GroupingFlatMap is a special case of a FlatMap that has different input and output metrics (a FlatMap’s input and output metrics are always identical) and allows for a tighter stability analysis.

Compared to a regular FlatMap, a GroupingFlatMap also requires that:

  1. The row_transformer creates a single column that is augmented to the input.

  2. For each input row, the row_transformer creates no duplicate values in the created column (this is enforced by the implementation).
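The two requirements can be sketched in plain Python. grouping_flat_map is a hypothetical helper working on dicts; dropping repeated values and then keeping at most max_num_rows distinct values per input row is one assumed way of enforcing the rules, not necessarily what the implementation does.

```python
from typing import Any, Callable, Dict, List

RowDict = Dict[str, Any]  # plain dict standing in for pyspark.sql.Row


def grouping_flat_map(
    rows: List[RowDict],
    row_transformer: Callable[[RowDict], List[RowDict]],
    new_column: str,
    max_num_rows: int,
) -> List[RowDict]:
    """Sketch of the GroupingFlatMap requirements on plain dicts."""
    result: List[RowDict] = []
    for row in rows:
        seen_values: List[Any] = []
        for out in row_transformer(row):
            # Requirement 1: the transformer creates exactly one new column.
            if set(out) != {new_column}:
                raise ValueError(f"expected only column {new_column!r}, got {sorted(out)}")
            value = out[new_column]
            # Requirement 2: repeated values for the same input row are dropped here.
            if value not in seen_values:
                seen_values.append(value)
        # Keep at most max_num_rows distinct values, augmenting the input row.
        for value in seen_values[:max_num_rows]:
            result.append({**row, new_column: value})
    return result


rows = [{"A": "a1", "B": "b1"}]
print(grouping_flat_map(rows, lambda r: [{"i": 0}, {"i": 1}, {"i": 1}, {"i": 2}], "i", 3))
```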

Note

The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts. If the function does contain an object that directly or indirectly references a Spark DataFrame or a Spark context, an error will occur when the transformation is applied.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
>>> # add_i_transformation is a RowToRowsTransformation that
>>> # repeats each row 3 times, once with i=0, once with i=1, and once with i=2
>>> add_i_flat_map = GroupingFlatMap(
...     output_metric=RootSumOfSquared(SymmetricDifference()),
...     row_transformer=add_i_transformation,
...     max_num_rows=3,
... )
>>> # Apply transformation to data
>>> spark_dataframe_with_i = add_i_flat_map(spark_dataframe)
>>> print_sdf(spark_dataframe_with_i)
     A   B  i
0   a1  b1  0
1   a1  b1  1
2   a1  b1  2
3   a2  b1  0
4   a2  b1  1
5   a2  b1  2
6   a3  b2  0
7   a3  b2  0
8   a3  b2  1
9   a3  b2  1
10  a3  b2  2
11  a3  b2  2
Transformation Contract:
>>> add_i_flat_map.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> add_i_flat_map.output_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False), 'i': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> add_i_flat_map.input_metric
SymmetricDifference()
>>> add_i_flat_map.output_metric
IfGroupedBy(column='i', inner_metric=RootSumOfSquared(inner_metric=SymmetricDifference()))
Stability Guarantee:

GroupingFlatMap supports two different output metrics:

  • IfGroupedBy(column='new_column', inner_metric=SumOf(SymmetricDifference()))

  • IfGroupedBy(column='new_column', inner_metric=RootSumOfSquared(SymmetricDifference()))

The stability_function() differs depending on the output metric:

If the inner metric is SumOf(SymmetricDifference()), d_out is

d_in * self.max_num_rows

If the inner metric is RootSumOfSquared(SymmetricDifference()), we can use the added structure of the row_transformer to achieve a tighter analysis. For each input row, the function produces at most one output row per value of the new column, so each group of the output differs by at most d_in rows, and in total at most d_in * self.max_num_rows output rows differ. Because no group contributes more than d_in rows, the sum of the squared per-group distances is at most d_in * (d_in * self.max_num_rows). Taking the square root, under RootSumOfSquared d_out is

d_in * sqrt(self.max_num_rows)

>>> add_i_flat_map.stability_function(1)
sqrt(3)
>>> add_i_flat_map.stability_function(2)
2*sqrt(3)
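The two formulas can be checked numerically by evaluating the worst case directly: max_num_rows groups that each differ by d_in rows. This sketch uses floating point, whereas the doctests above show exact values such as sqrt(3); grouping_flat_map_stability is a hypothetical name.

```python
import math
from typing import Union

Number = Union[int, float]


def grouping_flat_map_stability(
    d_in: Number, max_num_rows: int, root_sum_of_squared: bool
) -> Number:
    """Worst-case d_out for GroupingFlatMap under the two supported inner metrics."""
    # Worst case: max_num_rows groups, each differing by d_in rows.
    per_group = [d_in] * max_num_rows
    if root_sum_of_squared:
        # RootSumOfSquared: square root of the sum of squared per-group distances.
        return math.sqrt(sum(d ** 2 for d in per_group))
    # SumOf: per-group distances simply add up.
    return sum(per_group)


print(grouping_flat_map_stability(1, 3, root_sum_of_squared=False))  # 3
print(grouping_flat_map_stability(1, 3, root_sum_of_squared=True))
```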
property max_num_rows: int

Returns the largest number of rows a single row can be mapped to.

Return type:

int

property row_transformer: RowToRowsTransformation

Returns transformation object used for mapping rows to lists of rows.

Return type:

RowToRowsTransformation

property input_domain: tmlt.core.domains.base.Domain

Return input domain for the transformation.

Return type:

tmlt.core.domains.base.Domain

property input_metric: tmlt.core.metrics.Metric

Distance metric on input domain.

Return type:

tmlt.core.metrics.Metric

property output_domain: tmlt.core.domains.base.Domain

Return output domain for the transformation.

Return type:

tmlt.core.domains.base.Domain

property output_metric: tmlt.core.metrics.Metric

Distance metric on output domain.

Return type:

tmlt.core.metrics.Metric

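__init__(output_metric, row_transformer, max_num_rows)

Constructor.

Parameters:
  • output_metric (Union[SumOf, RootSumOfSquared]) – Metric used as the inner_metric of the IfGroupedBy output metric, which groups on the column created by row_transformer.

  • row_transformer (RowToRowsTransformation) – Transformation to apply to each row. It must create a single new column that is augmented to the input.

  • max_num_rows (int) – The maximum number of rows to allow from row_transformer for each input row. If more rows are output, the additional rows are suppressed.

stability_function(d_in)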

Returns the smallest d_out satisfied by the transformation.

See Tumult Core Architecture for more information.

Parameters:

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Return type:

tmlt.core.utils.exact_number.ExactNumber

__call__(sdf)

Flat map.

Parameters:

sdf (pyspark.sql.DataFrame)

Return type:

pyspark.sql.DataFrame

stability_relation(d_in, d_out)

Returns True only if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters:
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_metric.

Return type:

bool

__or__(other: Transformation) → Transformation
__or__(other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.

class Map(metric, row_transformer)

Bases: tmlt.core.transformations.base.Transformation

Applies a RowToRowTransformation to each row in a Spark DataFrame.

Note

The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts. If the function does contain an object that directly or indirectly references a Spark DataFrame or a Spark context, an error will occur when the RowToRowTransformation is called on a row.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
>>> # rename_b_to_c_transformation is a RowToRowTransformation that
>>> # renames the B column to C, and replaces b with c in the values
>>> rename_b_to_c_map = Map(
...     metric=SymmetricDifference(),
...     row_transformer=rename_b_to_c_transformation,
... )
>>> # Apply transformation to data
>>> renamed_spark_dataframe = rename_b_to_c_map(spark_dataframe)
>>> print_sdf(renamed_spark_dataframe)
    A   C
0  a1  c1
1  a2  c1
2  a3  c2
3  a3  c2
Transformation Contract:
>>> rename_b_to_c_map.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_map.output_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'C': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_map.input_metric
SymmetricDifference()
>>> rename_b_to_c_map.output_metric
SymmetricDifference()
Stability Guarantee:

Map’s stability_function() returns d_in.

>>> rename_b_to_c_map.stability_function(1)
1
>>> rename_b_to_c_map.stability_function(2)
2
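Why d_out equals d_in under SymmetricDifference: Map transforms each row independently, so adding or removing one input row adds or removes exactly one output row, and the output symmetric difference is never larger than the input one. The sketch below measures symmetric difference on multisets of tuples with collections.Counter; the tuple representation and the rename_b_to_c_tuple helper are hypothetical stand-ins for Spark rows and the RowToRowTransformation in the example.

```python
from collections import Counter
from typing import Any, List, Tuple

RowTuple = Tuple[Any, ...]  # a tuple standing in for a Spark row


def symmetric_difference_size(left: List[RowTuple], right: List[RowTuple]) -> int:
    """Rows that must be added or removed to turn one multiset into the other."""
    a, b = Counter(left), Counter(right)
    return sum(((a - b) + (b - a)).values())


def rename_b_to_c_tuple(row: RowTuple) -> RowTuple:
    # Hypothetical tuple version of rename_b_to_c on (A, B) rows.
    a, b = row
    return (a, b.replace("b", "c"))


x = [("a1", "b1"), ("a2", "b1"), ("a3", "b2"), ("a3", "b2")]
y = [("a1", "b1"), ("a2", "b1"), ("a3", "b2"), ("a4", "b3")]  # one row replaced
d_in = symmetric_difference_size(x, y)
d_out = symmetric_difference_size(
    [rename_b_to_c_tuple(r) for r in x], [rename_b_to_c_tuple(r) for r in y]
)
print(d_in, d_out)  # 2 2
```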
property row_transformer: RowToRowTransformation

Returns the transformation object used for mapping rows.

Return type:

RowToRowTransformation

property input_domain: tmlt.core.domains.base.Domain

Return input domain for the transformation.

Return type:

tmlt.core.domains.base.Domain

property input_metric: tmlt.core.metrics.Metric

Distance metric on input domain.

Return type:

tmlt.core.metrics.Metric

property output_domain: tmlt.core.domains.base.Domain

Return output domain for the transformation.

Return type:

tmlt.core.domains.base.Domain

property output_metric: tmlt.core.metrics.Metric

Distance metric on output domain.

Return type:

tmlt.core.metrics.Metric

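__init__(metric, row_transformer)

Constructor.

Parameters:
  • metric – Distance metric for input and output DataFrames.

  • row_transformer (RowToRowTransformation) – Transformation to apply to each row.

stability_function(d_in)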

Returns the smallest d_out satisfied by the transformation.

See Tumult Core Architecture for more information.

Parameters:

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Return type:

tmlt.core.utils.exact_number.ExactNumber

__call__(sdf)

Return mapped DataFrame.

Parameters:

sdf (pyspark.sql.DataFrame)

Return type:

pyspark.sql.DataFrame

stability_relation(d_in, d_out)

Returns True only if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters:
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_metric.

Return type:

bool

__or__(other: Transformation) → Transformation
__or__(other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.

class FlatMapByKey(metric, row_transformer)

Bases: tmlt.core.transformations.base.Transformation

Applies a RowsToRowsTransformation to rows, grouped by key.

Note

The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts. If the function does contain an object that directly or indirectly references a Spark DataFrame or a Spark context, an error will occur when the transformation is applied.

Example

>>> # Example input
>>> print_sdf(spark_dataframe)
  id  v
0  a  1
1  b  2
2  c  3
3  c  4
>>> # sum_by_key_transformation is a RowsToRowsTransformation that sums column v
>>> # for each ID group.
>>> sum_by_key = FlatMapByKey(
...     metric=IfGroupedBy("id", SymmetricDifference()),
...     row_transformer=sum_by_key_transformation,
... )
>>> # Apply transformation to data
>>> transformed_spark_dataframe = sum_by_key(spark_dataframe)
>>> print_sdf(transformed_spark_dataframe)
  id  sum
0  a    1
1  b    2
2  c    7
Transformation Contract:
>>> sum_by_key.input_domain
SparkDataFrameDomain(schema={'id': SparkStringColumnDescriptor(allow_null=False), 'v': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> sum_by_key.output_domain
SparkDataFrameDomain(schema={'id': SparkStringColumnDescriptor(allow_null=False), 'sum': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> sum_by_key.input_metric
IfGroupedBy(column='id', inner_metric=SymmetricDifference())
>>> sum_by_key.output_metric
IfGroupedBy(column='id', inner_metric=SymmetricDifference())
Stability Guarantee:

FlatMapByKey’s stability_function() returns d_in.

property row_transformer: RowsToRowsTransformation

Returns the transformation object used for mapping each group of rows to a list of rows.

Return type:

RowsToRowsTransformation

property input_domain: tmlt.core.domains.base.Domain

Return input domain for the transformation.

Return type:

tmlt.core.domains.base.Domain

property input_metric: tmlt.core.metrics.Metric

Distance metric on input domain.

Return type:

tmlt.core.metrics.Metric

property output_domain: tmlt.core.domains.base.Domain

Return output domain for the transformation.

Return type:

tmlt.core.domains.base.Domain

property output_metric: tmlt.core.metrics.Metric

Distance metric on output domain.

Return type:

tmlt.core.metrics.Metric

__init__(metric, row_transformer)

Constructor.

Parameters:
  • metric (IfGroupedBy) – Distance metric for input and output DataFrames.

  • row_transformer (RowsToRowsTransformation) – Transformation to apply to each group of rows. This transformation should have the key column in its input domain, but it must not include the key column in its output domain.
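The grouping behavior can be sketched in plain Python. flat_map_by_key and sum_v are hypothetical helpers, rows are dicts, and re-attaching the key column to each output row is an assumption based on the id column appearing in the example output above. Each key's group is transformed independently of every other group.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

RowDict = Dict[str, Any]  # plain dict standing in for pyspark.sql.Row


def flat_map_by_key(
    rows: List[RowDict],
    key: str,
    rows_to_rows: Callable[[List[RowDict]], List[RowDict]],
) -> List[RowDict]:
    """Group rows by key, apply rows_to_rows to each group, and re-attach the key."""
    groups: Dict[Any, List[RowDict]] = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    result: List[RowDict] = []
    for key_value, group in groups.items():
        for out in rows_to_rows(group):
            # The transformer's output must not contain the key column.
            if key in out:
                raise ValueError(f"row_transformer output must not include {key!r}")
            result.append({key: key_value, **out})
    return result


def sum_v(group: List[RowDict]) -> List[RowDict]:
    # Hypothetical stand-in for sum_by_key_transformation.
    return [{"sum": sum(r["v"] for r in group)}]


data = [{"id": "a", "v": 1}, {"id": "b", "v": 2}, {"id": "c", "v": 3}, {"id": "c", "v": 4}]
print(flat_map_by_key(data, "id", sum_v))
# [{'id': 'a', 'sum': 1}, {'id': 'b', 'sum': 2}, {'id': 'c', 'sum': 7}]
```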

stability_function(d_in)

Returns the smallest d_out satisfied by the transformation.

See Tumult Core Architecture for more information.

Parameters:

d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.

Return type:

tmlt.core.utils.exact_number.ExactNumber

__call__(sdf)

Apply transformation.

Parameters:

sdf (pyspark.sql.DataFrame)

Return type:

pyspark.sql.DataFrame

stability_relation(d_in, d_out)

Returns True only if close inputs produce close outputs.

See the privacy and stability tutorial for more information.

Parameters:
  • d_in (Any) – Distance between inputs under input_metric.

  • d_out (Any) – Distance between outputs under output_metric.

Return type:

bool

__or__(other: Transformation) → Transformation
__or__(other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement

Return this transformation chained with another component.