map#
Transformations for applying user defined maps to Spark DataFrames.
See the architecture overview for more information on transformations.
Classes#
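RowToRowTransformation | Transforms a single row into a different row using a user defined function.
RowToRowsTransformation | Transforms a single row into multiple rows using a user defined function.
FlatMap | Applies a RowToRowsTransformation to each row and flattens the result.
GroupingFlatMap | Applies a RowToRowsTransformation to each row and flattens the result.
Map | Applies a RowToRowTransformation to each row in a Spark DataFrame.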
- class RowToRowTransformation(input_domain, output_domain, trusted_f, augment)#
Bases:
tmlt.core.transformations.base.Transformation
Transforms a single row into a different row using a user defined function.
Note
The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts. If the function does contain an object that directly or indirectly references a Spark DataFrame or a Spark context, an error will occur when the RowToRowTransformation is called on a row.
Examples
augment=False:
>>> # Example input
>>> spark_row
Row(A='a1', B='b1')
>>> def rename_b_to_c(row: Row) -> Row:
...     return Row(A=row.A, C=row.B.replace("b", "c"))
>>> rename_b_to_c_transformation = RowToRowTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "C": SparkStringColumnDescriptor(),
...         }
...     ),
...     trusted_f=rename_b_to_c,
...     augment=False,
... )
>>> transformed_row = rename_b_to_c_transformation(spark_row)
>>> transformed_row
Row(A='a1', C='c1')
augment=True:
>>> # Example input
>>> spark_row
Row(A='a1', B='b1')
>>> def constant_c_column(row: Row) -> Row:
...     return Row(C="c")
>>> add_constant_c_column_transformation = RowToRowTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...             "C": SparkStringColumnDescriptor(),
...         }
...     ),
...     trusted_f=constant_c_column,
...     augment=True,
... )
>>> transformed_and_augmented_row = add_constant_c_column_transformation(spark_row)
>>> transformed_and_augmented_row
Row(A='a1', B='b1', C='c')
- Transformation Contract:
Input domain -
SparkRowDomain
Output domain -
SparkRowDomain
Input metric -
NullMetric
Output metric -
NullMetric
>>> rename_b_to_c_transformation.input_domain
SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_transformation.output_domain
SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'C': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_transformation.input_metric
NullMetric()
>>> rename_b_to_c_transformation.output_metric
NullMetric()
- Stability Guarantee:
RowToRowTransformation is not stable! Its stability_relation() always returns False, and its stability_function() always raises NotImplementedError.
- Parameters
input_domain (tmlt.core.domains.spark_domains.SparkRowDomain) –
output_domain (tmlt.core.domains.spark_domains.SparkRowDomain) –
trusted_f (Callable[[pyspark.sql.Row], Union[pyspark.sql.Row, Dict[str, Any]]]) –
augment (bool) –
- __init__(input_domain, output_domain, trusted_f, augment)#
Constructor.
- Parameters
input_domain (SparkRowDomain) – Domain for the input row.
output_domain (SparkRowDomain) – Domain for the output row.
trusted_f (Callable[[Row], Union[Row, Dict[str, Any]]]) – Transformation function to apply to input row.
augment (bool) – If True, the output of trusted_f will be augmented by the existing values from the input row. Note that if the column already exists, the original value is used.
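The augment behaviour described above can be illustrated with plain dictionaries. This is only a sketch of the documented semantics, not the library's implementation: `augment_row` is a hypothetical helper, rows are modelled as dicts rather than Spark `Row` objects, and the column ordering is an assumption.

```python
from typing import Any, Dict


def augment_row(input_row: Dict[str, Any], new_values: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: combine a row function's output with the input row.

    Columns that already exist in the input row keep their original value.
    """
    augmented = dict(input_row)  # start from a copy of the existing columns
    for column, value in new_values.items():
        # setdefault only writes the column when it is not already present
        augmented.setdefault(column, value)
    return augmented


row = {"A": "a1", "B": "b1"}
added = augment_row(row, {"C": "c"})
clashing = augment_row(row, {"B": "changed", "C": "c"})
```

In both calls the result keeps `B` equal to `"b1"`, mirroring the rule that the original value is used when a column already exists.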
- property trusted_f#
Returns function to be applied to each row.
Note
Returned function object should not be mutated.
- Return type
Callable[[pyspark.sql.Row], Union[pyspark.sql.Row, Dict[str, Any]]]
- property augment#
Returns whether input attributes need to be augmented to the output.
- Return type
- stability_relation(_, __)#
Returns False.
No values are valid for input/output metrics of this transformation.
- Parameters
_ (Any) –
__ (Any) –
- Return type
- __call__(row)#
Map row.
- Parameters
row (pyspark.sql.Row) –
- Return type
- property input_domain#
Return input domain for the transformation.
- Return type
- property input_metric#
Distance metric on input domain.
- Return type
- property output_domain#
Return output domain for the transformation.
- Return type
- property output_metric#
Distance metric on output domain.
- Return type
- stability_function(d_in)#
Returns the smallest d_out satisfied by the transformation.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
- Raises
NotImplementedError – If not overridden.
- Return type
Any
- __or__(other: Transformation) Transformation #
- __or__(other: tmlt.core.measurements.base.Measurement) tmlt.core.measurements.base.Measurement
Return this transformation chained with another component.
- class RowToRowsTransformation(input_domain, output_domain, trusted_f, augment)#
Bases:
tmlt.core.transformations.base.Transformation
Transforms a single row into multiple rows using a user defined function.
Note
The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts. If the function does contain an object that directly or indirectly references a Spark DataFrame or a Spark context, an error will occur when the RowToRowsTransformation is called on a row.
Examples
augment=False:
>>> # Example input
>>> spark_row
Row(A='a1', B='b1')
>>> # Create user defined function
>>> def duplicate(row: Row) -> List[Row]:
...     return [row, row]
>>> # Create transformation
>>> duplicate_transformation = RowToRowsTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=ListDomain(
...         SparkRowDomain(
...             {
...                 "A": SparkStringColumnDescriptor(),
...                 "B": SparkStringColumnDescriptor(),
...             }
...         )
...     ),
...     trusted_f=duplicate,
...     augment=False,
... )
>>> transformed_rows = duplicate_transformation(spark_row)
>>> transformed_rows
[Row(A='a1', B='b1'), Row(A='a1', B='b1')]
augment=True:
>>> # Example input
>>> spark_row
Row(A='a1', B='b1')
>>> def counting_i_column(row: Row) -> List[Row]:
...     return [Row(i=i) for i in range(3)]
>>> add_counting_i_column_transformation = RowToRowsTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=ListDomain(
...         SparkRowDomain(
...             {
...                 "A": SparkStringColumnDescriptor(),
...                 "B": SparkStringColumnDescriptor(),
...                 "i": SparkIntegerColumnDescriptor(),
...             }
...         )
...     ),
...     trusted_f=counting_i_column,
...     augment=True,
... )
>>> transformed_and_augmented_rows = add_counting_i_column_transformation(spark_row)
>>> transformed_and_augmented_rows
[Row(A='a1', B='b1', i=0), Row(A='a1', B='b1', i=1), Row(A='a1', B='b1', i=2)]
- Transformation Contract:
Input domain -
SparkRowDomain
Output domain -
ListDomain of SparkRowDomain
Input metric -
NullMetric
Output metric -
NullMetric
>>> duplicate_transformation.input_domain
SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> duplicate_transformation.output_domain
ListDomain(element_domain=SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)}), length=None)
>>> duplicate_transformation.input_metric
NullMetric()
>>> duplicate_transformation.output_metric
NullMetric()
- Stability Guarantee:
RowToRowsTransformation is not stable! Its stability_relation() always returns False, and its stability_function() always raises NotImplementedError.
- Parameters
input_domain (tmlt.core.domains.spark_domains.SparkRowDomain) –
output_domain (tmlt.core.domains.collections.ListDomain) –
trusted_f (Callable[[pyspark.sql.Row], Union[List[pyspark.sql.Row], List[Dict[str, Any]]]]) –
augment (bool) –
- __init__(input_domain, output_domain, trusted_f, augment)#
Constructor.
- Parameters
input_domain (SparkRowDomain) – Domain for the input row.
output_domain (ListDomain) – Domain for the output rows.
trusted_f (Callable[[Row], Union[List[Row], List[Dict[str, Any]]]]) – Transformation function to apply to input row.
augment (bool) – If True, the output of trusted_f will be augmented by the existing values from the input row. Note that if the column already exists, the original value is used.
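For the multi-row case, augmentation applies to every row that trusted_f returns. The following sketch models this with plain dictionaries; `augment_rows` is a hypothetical helper written for illustration, and the column ordering is an assumption rather than documented behaviour.

```python
from typing import Any, Dict, List


def augment_rows(
    input_row: Dict[str, Any], new_rows: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Hypothetical helper: add the input row's columns to each produced row."""
    augmented_rows = []
    for new_row in new_rows:
        augmented = dict(input_row)  # the input columns come first
        for column, value in new_row.items():
            # Existing columns keep the original value from the input row.
            augmented.setdefault(column, value)
        augmented_rows.append(augmented)
    return augmented_rows


rows = augment_rows({"A": "a1", "B": "b1"}, [{"i": i} for i in range(3)])
```

The result has three rows, each carrying `A` and `B` from the input plus a distinct `i`, matching the shape of the augment=True example above.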
- property trusted_f#
Returns function to be applied to each row.
Note
Returned function object should not be mutated.
- Return type
Callable[[pyspark.sql.Row], Union[List[pyspark.sql.Row], List[Dict[str, Any]]]]
- property augment#
Returns whether input attributes need to be augmented to the output.
- Return type
- stability_relation(_, __)#
Returns False.
No values are valid for input/output metrics of this transformation.
- Parameters
_ (Any) –
__ (Any) –
- Return type
- __call__(row)#
Map row.
- Parameters
row (pyspark.sql.Row) –
- Return type
List[pyspark.sql.Row]
- property input_domain#
Return input domain for the transformation.
- Return type
- property input_metric#
Distance metric on input domain.
- Return type
- property output_domain#
Return output domain for the transformation.
- Return type
- property output_metric#
Distance metric on output domain.
- Return type
- stability_function(d_in)#
Returns the smallest d_out satisfied by the transformation.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
- Raises
NotImplementedError – If not overridden.
- Return type
Any
- __or__(other: Transformation) Transformation #
- __or__(other: tmlt.core.measurements.base.Measurement) tmlt.core.measurements.base.Measurement
Return this transformation chained with another component.
- class FlatMap(metric, row_transformer, max_num_rows)#
Bases:
tmlt.core.transformations.base.Transformation
Applies a RowToRowsTransformation to each row and flattens the result.
Note
The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts. If the function does contain an object that directly or indirectly references a Spark DataFrame or a Spark context, an error will occur when the RowToRowsTransformation is called on a row.
Example
>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
>>> # duplicate_transformation is a RowToRowsTransformation that outputs two
>>> # copies of the input row.
>>> duplicate_flat_map = FlatMap(
...     metric=SymmetricDifference(),
...     row_transformer=duplicate_transformation,
...     max_num_rows=2,
... )
>>> # Apply transformation to data
>>> duplicated_spark_dataframe = duplicate_flat_map(spark_dataframe)
>>> print_sdf(duplicated_spark_dataframe)
    A   B
0  a1  b1
1  a1  b1
2  a2  b1
3  a2  b1
4  a3  b2
5  a3  b2
6  a3  b2
7  a3  b2
- Transformation Contract:
Input domain -
SparkDataFrameDomain
Output domain -
SparkDataFrameDomain
Input metric -
SymmetricDifference
orIfGroupedBy
Output metric -
SymmetricDifference
orIfGroupedBy
(matches input metric)
>>> duplicate_flat_map.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> duplicate_flat_map.output_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> duplicate_flat_map.input_metric
SymmetricDifference()
>>> duplicate_flat_map.output_metric
SymmetricDifference()
- Stability Guarantee:
For the metrics
SymmetricDifference()
IfGroupedBy(column, SumOf(SymmetricDifference()))
IfGroupedBy(column, RootSumOfSquared(SymmetricDifference()))
FlatMap's stability_function() returns d_in times max_num_rows. If max_num_rows is None, it returns infinity.
>>> duplicate_flat_map.stability_function(1)
2
>>> duplicate_flat_map.stability_function(2)
4
For the metric
IfGroupedBy(column, SymmetricDifference())
FlatMap's stability_function() returns d_in.
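The two cases of the stability guarantee can be written as a small function. This is a sketch of the rule as stated above, not the library's code: `flat_map_stability` and its `per_group_symmetric_difference` flag are hypothetical names, and plain Python numbers stand in for the library's exact number type.

```python
import math
from typing import Optional, Union


def flat_map_stability(
    d_in: int,
    max_num_rows: Optional[int],
    per_group_symmetric_difference: bool = False,
) -> Union[int, float]:
    """Hypothetical model of the FlatMap stability rule described above."""
    if per_group_symmetric_difference:
        # Metric is IfGroupedBy(column, SymmetricDifference()): d_out is d_in.
        return d_in
    if max_num_rows is None:
        # No limit on rows per input row, so no finite bound exists.
        return math.inf
    # Each differing input row can contribute up to max_num_rows output rows.
    return d_in * max_num_rows


bounded = [flat_map_stability(d_in, max_num_rows=2) for d_in in (1, 2)]
unbounded = flat_map_stability(1, max_num_rows=None)
grouped = flat_map_stability(5, max_num_rows=None, per_group_symmetric_difference=True)
```

With `max_num_rows=2` the sketch gives 2 and 4 for `d_in` of 1 and 2, the same values as the doctest above.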
- Parameters
metric (Union[tmlt.core.metrics.SymmetricDifference, tmlt.core.metrics.IfGroupedBy]) –
row_transformer (RowToRowsTransformation) –
max_num_rows (Optional[int]) –
- __init__(metric, row_transformer, max_num_rows)#
Constructor.
- Parameters
metric (Union[SymmetricDifference, IfGroupedBy]) – Distance metric for input and output DataFrames.
row_transformer (RowToRowsTransformation) – Transformation to apply to each row.
max_num_rows (Optional[int]) – The maximum number of rows to allow from row_transformer. If more rows are output, the additional rows are suppressed. If this value is None, the transformation will not impose a limit on the number of rows. None is only allowed if the metric is IfGroupedBy(SymmetricDifference()).
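The effect of max_num_rows can be sketched on plain Python lists. This is an illustration under assumptions, not the Spark implementation: `flat_map` and `triplicate` are hypothetical names, rows are dicts, and keeping the first max_num_rows rows is an assumption, since the text above only says the additional rows are suppressed.

```python
from typing import Any, Callable, Dict, List, Optional

Row = Dict[str, Any]


def flat_map(
    rows: List[Row],
    row_function: Callable[[Row], List[Row]],
    max_num_rows: Optional[int],
) -> List[Row]:
    """Hypothetical model: map each row to a list of rows and flatten."""
    output: List[Row] = []
    for row in rows:
        produced = row_function(row)
        if max_num_rows is not None:
            # Assumption: rows beyond the limit are dropped from the end.
            produced = produced[:max_num_rows]
        output.extend(produced)
    return output


def triplicate(row: Row) -> List[Row]:
    """Return three copies of the input row."""
    return [dict(row) for _ in range(3)]


data = [{"A": "a1"}, {"A": "a2"}]
limited = flat_map(data, triplicate, max_num_rows=2)
unlimited = flat_map(data, triplicate, max_num_rows=None)
```

Here `limited` holds two rows per input row while `unlimited` holds three, which is why a finite limit is needed for a finite stability bound under the non-grouped metrics.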
- property max_num_rows#
Returns the enforced stability of this transformation, or None.
- Return type
Optional[int]
- property row_transformer#
Returns transformation object used for mapping rows to lists of rows.
- Return type
- stability_function(d_in)#
Returns the smallest d_out satisfied by the transformation.
See the architecture overview for more information.
- Parameters
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Return type
- __call__(sdf)#
Flat Map.
- Parameters
sdf (pyspark.sql.DataFrame) –
- Return type
- property input_domain#
Return input domain for the transformation.
- Return type
- property input_metric#
Distance metric on input domain.
- Return type
- property output_domain#
Return output domain for the transformation.
- Return type
- property output_metric#
Distance metric on output domain.
- Return type
- stability_relation(d_in, d_out)#
Returns True only if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_metric.
- Return type
- __or__(other: Transformation) Transformation #
- __or__(other: tmlt.core.measurements.base.Measurement) tmlt.core.measurements.base.Measurement
Return this transformation chained with another component.
- class GroupingFlatMap(output_metric, row_transformer, max_num_rows)#
Bases:
tmlt.core.transformations.base.Transformation
Applies a RowToRowsTransformation to each row and flattens the result.
A GroupingFlatMap is a special case of a FlatMap that has different input and output metrics (a FlatMap's input and output metrics are always identical) and allows for a tighter stability analysis.
Compared to a regular FlatMap, a GroupingFlatMap also requires that:
- The row_transformer creates a single column that is augmented to the input.
- For each input row, the row_transformer creates no duplicate values in the created column (this is enforced by the implementation).
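The second requirement can be pictured as a per-row deduplication of the new column's values. How the implementation actually enforces it is not stated here, so the sketch below is an assumption: `distinct_new_values` is a hypothetical helper that keeps the first occurrence of each value and then applies the row limit.

```python
from typing import Hashable, List


def distinct_new_values(values: List[Hashable], max_num_rows: int) -> List[Hashable]:
    """Hypothetical helper: unique new-column values for one input row."""
    seen = set()
    distinct = []
    for value in values:
        if value not in seen:  # keep only the first occurrence of each value
            seen.add(value)
            distinct.append(value)
    # Assumption: the row limit is applied after duplicates are removed.
    return distinct[:max_num_rows]


kept = distinct_new_values([0, 1, 1, 2, 0, 3], max_num_rows=3)
```

After this step each input row contributes at most one output row per value of the new column, which is the property the tighter stability analysis relies on.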
Note
The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts. If the function does contain an object that directly or indirectly references a Spark DataFrame or a Spark context, an error will occur when the RowToRowsTransformation is called on a row.
Example
>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
>>> # add_i_transformation is a RowToRowsTransformation that
>>> # repeats each row 3 times, once with i=0, once with i=1, and once with i=2
>>> add_i_flat_map = GroupingFlatMap(
...     output_metric=RootSumOfSquared(SymmetricDifference()),
...     row_transformer=add_i_transformation,
...     max_num_rows=3,
... )
>>> # Apply transformation to data
>>> spark_dataframe_with_i = add_i_flat_map(spark_dataframe)
>>> print_sdf(spark_dataframe_with_i)
     A   B  i
0   a1  b1  0
1   a1  b1  1
2   a1  b1  2
3   a2  b1  0
4   a2  b1  1
5   a2  b1  2
6   a3  b2  0
7   a3  b2  0
8   a3  b2  1
9   a3  b2  1
10  a3  b2  2
11  a3  b2  2
- Transformation Contract:
Input domain -
SparkDataFrameDomain
Output domain -
SparkDataFrameDomain
Input metric -
SymmetricDifference
Output metric -
IfGroupedBy
>>> add_i_flat_map.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> add_i_flat_map.output_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False), 'i': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> add_i_flat_map.input_metric
SymmetricDifference()
>>> add_i_flat_map.output_metric
IfGroupedBy(column='i', inner_metric=RootSumOfSquared(inner_metric=SymmetricDifference()))
- Stability Guarantee:
GroupingFlatMap supports two different output metrics:
IfGroupedBy(column='new_column', inner_metric=SumOf(SymmetricDifference()))
IfGroupedBy(column='new_column', inner_metric=RootSumOfSquared(SymmetricDifference()))
The stability_function() is different depending on the output metric:
If the inner metric is SumOf(SymmetricDifference()), d_out is
d_in * self.max_num_rows
If the inner metric is RootSumOfSquared(SymmetricDifference()), we can use the added structure of the row_transformer to achieve a tighter analysis. We know that for each input row, the function will produce at most one output row per value of the new column, so in total we can produce up to d_in rows for each of up to self.max_num_rows values of the new column. Therefore, under RootSumOfSquared, d_out is
d_in * sqrt(self.max_num_rows)
>>> add_i_flat_map.stability_function(1)
sqrt(3)
>>> add_i_flat_map.stability_function(2)
2*sqrt(3)
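The two formulas, and the worst case behind the square-root bound, can be checked numerically. This is a sketch only: `grouping_flat_map_stability` is a hypothetical function, the inner metric is selected by a string name for simplicity, and floating-point values stand in for the exact symbolic results shown in the doctest.

```python
import math


def grouping_flat_map_stability(d_in: int, max_num_rows: int, inner_metric: str) -> float:
    """Hypothetical model of the GroupingFlatMap stability rule."""
    if inner_metric == "SumOf":
        return float(d_in * max_num_rows)
    if inner_metric == "RootSumOfSquared":
        return d_in * math.sqrt(max_num_rows)
    raise ValueError(f"unsupported inner metric: {inner_metric}")


# Worst case under RootSumOfSquared: each of the max_num_rows groups
# differs by d_in rows, so the distance is sqrt(sum of d_in squared).
d_in, max_num_rows = 2, 3
per_group_distances = [d_in] * max_num_rows
worst_case = math.sqrt(sum(distance**2 for distance in per_group_distances))

root_bound = grouping_flat_map_stability(d_in, max_num_rows, "RootSumOfSquared")
sum_bound = grouping_flat_map_stability(d_in, max_num_rows, "SumOf")
```

The worst-case value equals `root_bound`, which is smaller than `sum_bound`; that gap is the tighter analysis the guarantee refers to.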
- Parameters
output_metric (Union[tmlt.core.metrics.SumOf, tmlt.core.metrics.RootSumOfSquared]) –
row_transformer (RowToRowsTransformation) –
max_num_rows (int) –
- __init__(output_metric, row_transformer, max_num_rows)#
Constructor.
- Parameters
output_metric (Union[SumOf, RootSumOfSquared]) – Inner metric for IfGroupedBy output DataFrames.
row_transformer (RowToRowsTransformation) – Transformation to apply to each row.
max_num_rows (int) – The maximum number of rows to allow from row_transformer.
- property max_num_rows#
Returns the largest number of rows a single row can be mapped to.
- Return type
- property row_transformer#
Returns transformation object used for mapping rows to lists of rows.
- Return type
- stability_function(d_in)#
Returns the smallest d_out satisfied by the transformation.
See the architecture overview for more information.
- Parameters
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Return type
- __call__(sdf)#
Flat Map.
- Parameters
sdf (pyspark.sql.DataFrame) –
- Return type
- property input_domain#
Return input domain for the transformation.
- Return type
- property input_metric#
Distance metric on input domain.
- Return type
- property output_domain#
Return output domain for the transformation.
- Return type
- property output_metric#
Distance metric on output domain.
- Return type
- stability_relation(d_in, d_out)#
Returns True only if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_metric.
- Return type
- __or__(other: Transformation) Transformation #
- __or__(other: tmlt.core.measurements.base.Measurement) tmlt.core.measurements.base.Measurement
Return this transformation chained with another component.
- class Map(metric, row_transformer)#
Bases:
tmlt.core.transformations.base.Transformation
Applies a RowToRowTransformation to each row in a Spark DataFrame.
Note
The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts. If the function does contain an object that directly or indirectly references a Spark DataFrame or a Spark context, an error will occur when the RowToRowTransformation is called on a row.
Example
>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
>>> # rename_b_to_c_transformation is a RowToRowTransformation that
>>> # renames the B column to C, and replaces b's in the values with c's
>>> rename_b_to_c_map = Map(
...     metric=SymmetricDifference(),
...     row_transformer=rename_b_to_c_transformation,
... )
>>> # Apply transformation to data
>>> renamed_spark_dataframe = rename_b_to_c_map(spark_dataframe)
>>> print_sdf(renamed_spark_dataframe)
    A   C
0  a1  c1
1  a2  c1
2  a3  c2
3  a3  c2
- Transformation Contract:
Input domain -
SparkDataFrameDomain
Output domain -
SparkDataFrameDomain
Input metric -
SymmetricDifference
,HammingDistance
, orIfGroupedBy
Output metric -
SymmetricDifference
,HammingDistance
, orIfGroupedBy
(matches input metric)
>>> rename_b_to_c_map.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_map.output_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'C': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_map.input_metric
SymmetricDifference()
>>> rename_b_to_c_map.output_metric
SymmetricDifference()
- Stability Guarantee:
Map's stability_function() returns d_in.
>>> rename_b_to_c_map.stability_function(1)
1
>>> rename_b_to_c_map.stability_function(2)
2
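One way to see why a row-by-row map does not increase distance is to compare two neighbouring datasets before and after mapping. The sketch below uses plain tuples and a multiset symmetric difference; `symmetric_difference` and `rename` are hypothetical helpers written for this illustration, not library functions.

```python
from collections import Counter
from typing import List, Tuple

Record = Tuple[str, str]


def symmetric_difference(left: List[Record], right: List[Record]) -> int:
    """Count rows present in one multiset of rows but not in the other."""
    left_counts, right_counts = Counter(left), Counter(right)
    # Counter subtraction keeps only positive counts on each side.
    return sum(((left_counts - right_counts) + (right_counts - left_counts)).values())


def rename(row: Record) -> Record:
    """Replace b with c in the second field, like the example above."""
    a, b = row
    return (a, b.replace("b", "c"))


x = [("a1", "b1"), ("a2", "b1"), ("a3", "b2"), ("a3", "b2")]
y = x[:-1]  # neighbouring dataset with one row removed

d_in = symmetric_difference(x, y)
d_out = symmetric_difference([rename(r) for r in x], [rename(r) for r in y])
```

Because every input row produces exactly one output row, the mapped datasets differ by no more rows than the originals did, consistent with the stability function returning d_in.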
- Parameters
metric (Union[tmlt.core.metrics.SymmetricDifference, tmlt.core.metrics.HammingDistance, tmlt.core.metrics.IfGroupedBy]) –
row_transformer (RowToRowTransformation) –
- __init__(metric, row_transformer)#
Constructor.
- Parameters
metric (Union[SymmetricDifference, HammingDistance, IfGroupedBy]) – Distance metric for input and output DataFrames.
row_transformer (RowToRowTransformation) – Transformation to apply to each row.
- property row_transformer#
Returns the transformation object used for mapping rows.
- Return type
- stability_function(d_in)#
Returns the smallest d_out satisfied by the transformation.
See the architecture overview for more information on transformations.
- Parameters
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Return type
- __call__(sdf)#
Return mapped DataFrame.
- Parameters
sdf (pyspark.sql.DataFrame) –
- Return type
- property input_domain#
Return input domain for the transformation.
- Return type
- property input_metric#
Distance metric on input domain.
- Return type
- property output_domain#
Return output domain for the transformation.
- Return type
- property output_metric#
Distance metric on output domain.
- Return type
- stability_relation(d_in, d_out)#
Returns True only if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_metric.
- Return type
- __or__(other: Transformation) Transformation #
- __or__(other: tmlt.core.measurements.base.Measurement) tmlt.core.measurements.base.Measurement
Return this transformation chained with another component.