map#
Transformations for applying user defined maps to Spark DataFrames.
Classes#
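RowToRowTransformation | Transforms a single row into a different row using a user defined function.
RowToRowsTransformation | Transforms a single row into multiple rows using a user defined function.
FlatMap | Applies a RowToRowsTransformation to each row and flattens the result.
GroupingFlatMap | Applies a RowToRowsTransformation to each row and flattens the result.
Map | Applies a RowToRowTransformation to each row in a Spark DataFrame.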
- class RowToRowTransformation(input_domain, output_domain, trusted_f, augment)#
Bases:
tmlt.core.transformations.base.Transformation
Transforms a single row into a different row using a user defined function.
Examples
augment=False:
>>> # Example input
>>> spark_row
Row(A='a1', B='b1')
>>> def rename_b_to_c(row: Row) -> Row:
...     return Row(A=row.A, C=row.B.replace("b", "c"))
>>> rename_b_to_c_transformation = RowToRowTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "C": SparkStringColumnDescriptor(),
...         }
...     ),
...     trusted_f=rename_b_to_c,
...     augment=False,
... )
>>> transformed_row = rename_b_to_c_transformation(spark_row)
>>> transformed_row
Row(A='a1', C='c1')
augment=True:
>>> # Example input
>>> spark_row
Row(A='a1', B='b1')
>>> def constant_c_column(row: Row) -> Row:
...     return Row(C="c")
>>> add_constant_c_column_transformation = RowToRowTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...             "C": SparkStringColumnDescriptor(),
...         }
...     ),
...     trusted_f=constant_c_column,
...     augment=True,
... )
>>> transformed_and_augmented_row = add_constant_c_column_transformation(spark_row)
>>> transformed_and_augmented_row
Row(A='a1', B='b1', C='c')
- Transformation Contract:
Input domain -
SparkRowDomain
Output domain -
SparkRowDomain
Input metric -
NullMetric
Output metric -
NullMetric
>>> rename_b_to_c_transformation.input_domain
SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_transformation.output_domain
SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'C': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_transformation.input_metric
NullMetric()
>>> rename_b_to_c_transformation.output_metric
NullMetric()
- Stability Guarantee:
RowToRowTransformation is not stable! Its stability_relation() always returns False, and its stability_function() always raises NotImplementedError.
- Parameters
input_domain (tmlt.core.domains.spark_domains.SparkRowDomain) –
output_domain (tmlt.core.domains.spark_domains.SparkRowDomain) –
trusted_f (Callable[[pyspark.sql.Row], Union[pyspark.sql.Row, Dict[str, Any]]]) –
augment (bool) –
- __init__(input_domain, output_domain, trusted_f, augment)#
Constructor.
- Parameters
input_domain (SparkRowDomain) – Domain for the input row.
output_domain (SparkRowDomain) – Domain for the output row.
trusted_f (Callable[[Row], Union[Row, Dict[str, Any]]]) – Transformation function to apply to input row.
augment (bool) – If True, the output of trusted_f is augmented with the existing values from the input row. If a column returned by trusted_f already exists in the input row, the original value is used.
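The augment rule described for this parameter can be modelled in plain Python. The sketch below is not the library's implementation: it uses dictionaries in place of pyspark.sql.Row, and the helper name apply_row_map is hypothetical. It assumes only what the parameter description states, namely that the input row's value wins when trusted_f returns a column that already exists.

```python
from typing import Any, Callable, Dict

RowDict = Dict[str, Any]


def apply_row_map(
    row: RowDict,
    trusted_f: Callable[[RowDict], RowDict],
    augment: bool,
) -> RowDict:
    """Hypothetical model of the augment rule; not tmlt.core code."""
    new_values = trusted_f(row)
    if not augment:
        # Without augmentation the output is exactly what trusted_f returns.
        return dict(new_values)
    # With augmentation, start from the input row and add only the columns
    # that are not already present, so original values always win.
    result = dict(row)
    for column, value in new_values.items():
        result.setdefault(column, value)
    return result


row = {"A": "a1", "B": "b1"}
added = apply_row_map(row, lambda r: {"C": "c"}, augment=True)
clash = apply_row_map(row, lambda r: {"B": "new", "C": "c"}, augment=True)
plain = apply_row_map(row, lambda r: {"C": "c"}, augment=False)
```

Here added gains the new column C, clash keeps the original value of B, and plain contains only the columns returned by the function.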
- property trusted_f(self)#
Returns function to be applied to each row.
Note
Returned function object should not be mutated.
- Return type
Callable[[pyspark.sql.Row], Union[pyspark.sql.Row, Dict[str, Any]]]
- property augment(self)#
Returns whether the input row's attributes are added to the output.
- Return type
- stability_relation(self, _, __)#
Returns False.
No values are valid for input/output metrics of this transformation.
- Parameters
_ (Any) –
__ (Any) –
- Return type
- __call__(self, row)#
Map row.
- Parameters
row (pyspark.sql.Row) –
- Return type
- property input_domain(self)#
Return input domain for the transformation.
- Return type
- property input_metric(self)#
Distance metric on input domain.
- Return type
- property output_domain(self)#
Return output domain for the transformation.
- Return type
- property output_metric(self)#
Distance metric on output domain.
- Return type
- stability_function(self, d_in)#
Returns the smallest d_out satisfied by the transformation.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
- Raises
NotImplementedError – If not overridden.
- Return type
Any
- __or__(self, other: Transformation) → Transformation#
- __or__(self, other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement
Return this transformation chained with another component.
- class RowToRowsTransformation(input_domain, output_domain, trusted_f, augment)#
Bases:
tmlt.core.transformations.base.Transformation
Transforms a single row into multiple rows using a user defined function.
Examples
augment=False:
>>> # Example input
>>> spark_row
Row(A='a1', B='b1')
>>> # Create user defined function
>>> def duplicate(row: Row) -> List[Row]:
...     return [row, row]
>>> # Create transformation
>>> duplicate_transformation = RowToRowsTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=ListDomain(
...         SparkRowDomain(
...             {
...                 "A": SparkStringColumnDescriptor(),
...                 "B": SparkStringColumnDescriptor(),
...             }
...         )
...     ),
...     trusted_f=duplicate,
...     augment=False,
... )
>>> transformed_rows = duplicate_transformation(spark_row)
>>> transformed_rows
[Row(A='a1', B='b1'), Row(A='a1', B='b1')]
augment=True:
>>> # Example input
>>> spark_row
Row(A='a1', B='b1')
>>> def counting_i_column(row: Row) -> List[Row]:
...     return [Row(i=i) for i in range(3)]
>>> add_counting_i_column_transformation = RowToRowsTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=ListDomain(
...         SparkRowDomain(
...             {
...                 "A": SparkStringColumnDescriptor(),
...                 "B": SparkStringColumnDescriptor(),
...                 "i": SparkIntegerColumnDescriptor(),
...             }
...         )
...     ),
...     trusted_f=counting_i_column,
...     augment=True,
... )
>>> transformed_and_augmented_rows = add_counting_i_column_transformation(spark_row)
>>> transformed_and_augmented_rows
[Row(A='a1', B='b1', i=0), Row(A='a1', B='b1', i=1), Row(A='a1', B='b1', i=2)]
- Transformation Contract:
Input domain -
SparkRowDomain
Output domain -
ListDomain of SparkRowDomain
Input metric -
NullMetric
Output metric -
NullMetric
>>> duplicate_transformation.input_domain
SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> duplicate_transformation.output_domain
ListDomain(element_domain=SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)}), length=None)
>>> duplicate_transformation.input_metric
NullMetric()
>>> duplicate_transformation.output_metric
NullMetric()
- Stability Guarantee:
RowToRowsTransformation is not stable! Its stability_relation() always returns False, and its stability_function() always raises NotImplementedError.
- Parameters
input_domain (tmlt.core.domains.spark_domains.SparkRowDomain) –
output_domain (tmlt.core.domains.collections.ListDomain) –
trusted_f (Callable[[pyspark.sql.Row], Union[List[pyspark.sql.Row], List[Dict[str, Any]]]]) –
augment (bool) –
- __init__(input_domain, output_domain, trusted_f, augment)#
Constructor.
- Parameters
input_domain (SparkRowDomain) – Domain for the input row.
output_domain (ListDomain) – Domain for the output rows.
trusted_f (Callable[[Row], Union[List[Row], List[Dict[str, Any]]]]) – Transformation function to apply to input row.
augment (bool) – If True, the output of trusted_f is augmented with the existing values from the input row. If a column returned by trusted_f already exists in the input row, the original value is used.
- property trusted_f(self)#
Returns function to be applied to each row.
Note
Returned function object should not be mutated.
- Return type
Callable[[pyspark.sql.Row], Union[List[pyspark.sql.Row], List[Dict[str, Any]]]]
- property augment(self)#
Returns whether the input row's attributes are added to the output.
- Return type
- stability_relation(self, _, __)#
Returns False.
No values are valid for input/output metrics of this transformation.
- Parameters
_ (Any) –
__ (Any) –
- Return type
- __call__(self, row)#
Map row.
- Parameters
row (pyspark.sql.Row) –
- Return type
List[pyspark.sql.Row]
- property input_domain(self)#
Return input domain for the transformation.
- Return type
- property input_metric(self)#
Distance metric on input domain.
- Return type
- property output_domain(self)#
Return output domain for the transformation.
- Return type
- property output_metric(self)#
Distance metric on output domain.
- Return type
- stability_function(self, d_in)#
Returns the smallest d_out satisfied by the transformation.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
- Raises
NotImplementedError – If not overridden.
- Return type
Any
- __or__(self, other: Transformation) → Transformation#
- __or__(self, other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement
Return this transformation chained with another component.
- class FlatMap(metric, row_transformer, max_num_rows)#
Bases:
tmlt.core.transformations.base.Transformation
Applies a RowToRowsTransformation to each row and flattens the result.
Example
>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
>>> # duplicate_transformation is a RowToRowsTransformation that outputs two copies
>>> # of the input row.
>>> duplicate_flat_map = FlatMap(
...     metric=SymmetricDifference(),
...     row_transformer=duplicate_transformation,
...     max_num_rows=2,
... )
>>> # Apply transformation to data
>>> duplicated_spark_dataframe = duplicate_flat_map(spark_dataframe)
>>> print_sdf(duplicated_spark_dataframe)
    A   B
0  a1  b1
1  a1  b1
2  a2  b1
3  a2  b1
4  a3  b2
5  a3  b2
6  a3  b2
7  a3  b2
- Transformation Contract:
Input domain -
SparkDataFrameDomain
Output domain -
SparkDataFrameDomain
Input metric -
SymmetricDifference or IfGroupedBy
Output metric -
SymmetricDifference or IfGroupedBy (matches input metric)
>>> duplicate_flat_map.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> duplicate_flat_map.output_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> duplicate_flat_map.input_metric
SymmetricDifference()
>>> duplicate_flat_map.output_metric
SymmetricDifference()
- Stability Guarantee:
For the metrics
- SymmetricDifference()
- IfGroupedBy(column, SumOf(SymmetricDifference()))
- IfGroupedBy(column, RootSumOfSquared(SymmetricDifference()))
FlatMap’s stability_function() returns d_in times max_num_rows.
>>> duplicate_flat_map.stability_function(1)
2
>>> duplicate_flat_map.stability_function(2)
4
For the metric
- IfGroupedBy(column, SymmetricDifference())
FlatMap’s stability_function() returns d_in.
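The d_in times max_num_rows bound for SymmetricDifference() can be checked on a small example. The sketch below is a plain-Python model rather than library code: it treats a DataFrame as a list of tuples and assumes the symmetric difference counts records in the multiset symmetric difference. The helper names symmetric_difference and duplicate_each are hypothetical.

```python
from collections import Counter
from typing import List, Tuple

Record = Tuple[str, ...]


def symmetric_difference(left: List[Record], right: List[Record]) -> int:
    """Size of the multiset symmetric difference between two record lists."""
    left_counts, right_counts = Counter(left), Counter(right)
    only_left = left_counts - right_counts
    only_right = right_counts - left_counts
    return sum((only_left + only_right).values())


def duplicate_each(rows: List[Record], copies: int) -> List[Record]:
    """Toy flat map: emit `copies` copies of every input row."""
    return [row for row in rows for _ in range(copies)]


base = [("a1", "b1"), ("a2", "b1"), ("a3", "b2")]
neighbor = base + [("a3", "b2")]  # differs from base by one record

d_in = symmetric_difference(base, neighbor)
d_out = symmetric_difference(duplicate_each(base, 2), duplicate_each(neighbor, 2))
```

Adding one input record changes the output by two records, which matches d_in times max_num_rows with max_num_rows=2.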
- Parameters
metric (Union[tmlt.core.metrics.SymmetricDifference, tmlt.core.metrics.IfGroupedBy]) –
row_transformer (RowToRowsTransformation) –
max_num_rows (int) –
- __init__(metric, row_transformer, max_num_rows)#
Constructor.
- Parameters
metric (Union[SymmetricDifference, IfGroupedBy]) – Distance metric for input and output DataFrames.
row_transformer (RowToRowsTransformation) – Transformation to apply to each row.
max_num_rows (int) – The maximum number of rows to allow from row_transformer. If more rows are output, the additional rows are suppressed.
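The effect of max_num_rows can be illustrated with a plain-Python model of a truncating flat map. This is a sketch, not the library's implementation: rows are dictionaries, the helper names flat_map_rows and triple are hypothetical, and keeping the first max_num_rows rows (rather than some other subset) is an assumption.

```python
from typing import Any, Callable, Dict, List

RowDict = Dict[str, Any]


def flat_map_rows(
    rows: List[RowDict],
    trusted_f: Callable[[RowDict], List[RowDict]],
    max_num_rows: int,
) -> List[RowDict]:
    """Hypothetical model of a truncating flat map; not tmlt.core code."""
    output: List[RowDict] = []
    for row in rows:
        # Assumption: the first max_num_rows rows are kept, the rest dropped.
        output.extend(trusted_f(row)[:max_num_rows])
    return output


def triple(row: RowDict) -> List[RowDict]:
    """Return three copies of the row, tagged with a copy index."""
    return [dict(row, copy=i) for i in range(3)]


rows = [{"A": "a1"}, {"A": "a2"}]
truncated = flat_map_rows(rows, triple, max_num_rows=2)
```

Although triple produces three rows per input, only two per input survive, so no single input row can contribute more than max_num_rows output rows.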
- property row_transformer(self)#
Returns transformation object used for mapping rows to lists of rows.
- Return type
- stability_function(self, d_in)#
Returns the smallest d_out satisfied by the transformation.
See the privacy and stability tutorial for more information.
- Parameters
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Return type
- __call__(self, sdf)#
Flat Map.
- Parameters
sdf (pyspark.sql.DataFrame) –
- Return type
- property input_domain(self)#
Return input domain for the transformation.
- Return type
- property input_metric(self)#
Distance metric on input domain.
- Return type
- property output_domain(self)#
Return output domain for the transformation.
- Return type
- property output_metric(self)#
Distance metric on output domain.
- Return type
- stability_relation(self, d_in, d_out)#
Returns True only if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_metric.
- Return type
- __or__(self, other: Transformation) → Transformation#
- __or__(self, other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement
Return this transformation chained with another component.
- class GroupingFlatMap(output_metric, row_transformer, max_num_rows)#
Bases:
tmlt.core.transformations.base.Transformation
Applies a RowToRowsTransformation to each row and flattens the result.
A GroupingFlatMap is a special case of a FlatMap that allows for a tighter stability analysis.
The requirements are that
- The row_transformer creates a single column that is augmented to the input.
- For each input row, the values in the created column are distinct (this is enforced by the implementation).
Example
>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
>>> # add_i_transformation is a RowToRowsTransformation that
>>> # repeats each row 3 times, once with i=0, once with i=1, and once with i=2
>>> add_i_flat_map = GroupingFlatMap(
...     output_metric=RootSumOfSquared(SymmetricDifference()),
...     row_transformer=add_i_transformation,
...     max_num_rows=3,
... )
>>> # Apply transformation to data
>>> spark_dataframe_with_i = add_i_flat_map(spark_dataframe)
>>> print_sdf(spark_dataframe_with_i)
     A   B  i
0   a1  b1  0
1   a1  b1  1
2   a1  b1  2
3   a2  b1  0
4   a2  b1  1
5   a2  b1  2
6   a3  b2  0
7   a3  b2  0
8   a3  b2  1
9   a3  b2  1
10  a3  b2  2
11  a3  b2  2
- Transformation Contract:
Input domain -
SparkDataFrameDomain
Output domain -
SparkDataFrameDomain
Input metric -
SymmetricDifference
Output metric -
IfGroupedBy
>>> add_i_flat_map.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> add_i_flat_map.output_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False), 'i': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> add_i_flat_map.input_metric
SymmetricDifference()
>>> add_i_flat_map.output_metric
IfGroupedBy(column='i', inner_metric=RootSumOfSquared(inner_metric=SymmetricDifference()))
- Stability Guarantee:
GroupingFlatMap’s stability_function() has two cases:
- If the inner metric is SumOf(SymmetricDifference()), d_out is d_in * self.max_num_rows
- If the inner metric is RootSumOfSquared(SymmetricDifference()), d_out is d_in * sqrt(self.max_num_rows)
>>> add_i_flat_map.stability_function(1)
sqrt(3)
>>> add_i_flat_map.stability_function(2)
2*sqrt(3)
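The two cases can be reproduced with a small plain-Python calculation. The sketch below is a model under stated assumptions, not library code: records are tuples, the grouped distance is assumed to aggregate the per-group symmetric differences (by sum or by root of summed squares), and the helper names group_distances and add_i are hypothetical. Because the created values are distinct for each input row, one extra input row touches each group at most once.

```python
import math
from collections import Counter
from typing import List, Tuple

Record = Tuple  # (A, B) before the flat map, (A, B, i) after it


def group_distances(
    left: List[Record], right: List[Record], key_index: int
) -> List[int]:
    """Symmetric difference within each group, for every key in either input."""
    keys = {row[key_index] for row in left + right}
    distances = []
    for key in sorted(keys):
        left_group = Counter(row for row in left if row[key_index] == key)
        right_group = Counter(row for row in right if row[key_index] == key)
        only_left = left_group - right_group
        only_right = right_group - left_group
        distances.append(sum((only_left + only_right).values()))
    return distances


def add_i(rows: List[Record], max_num_rows: int) -> List[Record]:
    """Toy grouping flat map: append distinct values 0..max_num_rows-1."""
    return [row + (i,) for row in rows for i in range(max_num_rows)]


base = [("a1", "b1"), ("a2", "b1")]
neighbor = base + [("a3", "b2")]  # one extra record, so d_in = 1

distances = group_distances(add_i(base, 3), add_i(neighbor, 3), key_index=2)
sum_of = sum(distances)
root_sum_of_squared = math.sqrt(sum(d * d for d in distances))
```

Each of the three groups changes by one record, so the summed distance is 3 while the root-sum-of-squares distance is sqrt(3), in line with the two cases above for d_in = 1 and max_num_rows = 3.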
- Parameters
output_metric (Union[tmlt.core.metrics.SumOf, tmlt.core.metrics.RootSumOfSquared]) –
row_transformer (RowToRowsTransformation) –
max_num_rows (int) –
- __init__(output_metric, row_transformer, max_num_rows)#
Constructor.
- Parameters
output_metric (Union[SumOf, RootSumOfSquared]) – Inner metric for the IfGroupedBy metric on output DataFrames.
row_transformer (RowToRowsTransformation) – Transformation to apply to each row.
max_num_rows (int) – The maximum number of rows to allow from row_transformer.
- property max_num_rows(self)#
Returns the largest number of rows a single row can be mapped to.
- Return type
- property row_transformer(self)#
Returns transformation object used for mapping rows to lists of rows.
- Return type
- stability_function(self, d_in)#
Returns the smallest d_out satisfied by the transformation.
See the privacy and stability tutorial for more information.
- Parameters
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Return type
- __call__(self, sdf)#
Flat Map.
- Parameters
sdf (pyspark.sql.DataFrame) –
- Return type
- property input_domain(self)#
Return input domain for the transformation.
- Return type
- property input_metric(self)#
Distance metric on input domain.
- Return type
- property output_domain(self)#
Return output domain for the transformation.
- Return type
- property output_metric(self)#
Distance metric on output domain.
- Return type
- stability_relation(self, d_in, d_out)#
Returns True only if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_metric.
- Return type
- __or__(self, other: Transformation) → Transformation#
- __or__(self, other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement
Return this transformation chained with another component.
- class Map(metric, row_transformer)#
Bases:
tmlt.core.transformations.base.Transformation
Applies a RowToRowTransformation to each row in a Spark DataFrame.
Example
>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
>>> # rename_b_to_c_transformation is a RowToRowTransformation that
>>> # renames the B column to C, and replaces b's in the values to c's
>>> rename_b_to_c_map = Map(
...     metric=SymmetricDifference(),
...     row_transformer=rename_b_to_c_transformation,
... )
>>> # Apply transformation to data
>>> renamed_spark_dataframe = rename_b_to_c_map(spark_dataframe)
>>> print_sdf(renamed_spark_dataframe)
    A   C
0  a1  c1
1  a2  c1
2  a3  c2
3  a3  c2
- Transformation Contract:
Input domain -
SparkDataFrameDomain
Output domain -
SparkDataFrameDomain
Input metric -
SymmetricDifference, HammingDistance, or IfGroupedBy
Output metric -
SymmetricDifference, HammingDistance, or IfGroupedBy (matches input metric)
>>> rename_b_to_c_map.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_map.output_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'C': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_map.input_metric
SymmetricDifference()
>>> rename_b_to_c_map.output_metric
SymmetricDifference()
- Stability Guarantee:
Map’s stability_function() returns d_in.
>>> rename_b_to_c_map.stability_function(1)
1
>>> rename_b_to_c_map.stability_function(2)
2
- Parameters
metric (Union[tmlt.core.metrics.SymmetricDifference, tmlt.core.metrics.HammingDistance, tmlt.core.metrics.IfGroupedBy]) –
row_transformer (RowToRowTransformation) –
- __init__(metric, row_transformer)#
Constructor.
- Parameters
metric (Union[SymmetricDifference, HammingDistance, IfGroupedBy]) – Distance metric for input and output DataFrames.
row_transformer (RowToRowTransformation) – Transformation to apply to each row.
- property row_transformer(self)#
Returns the transformation object used for mapping rows.
- Return type
- stability_function(self, d_in)#
Returns the smallest d_out satisfied by the transformation.
See the privacy and stability tutorial for more information.
- Parameters
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Return type
- __call__(self, sdf)#
Return mapped DataFrame.
- Parameters
sdf (pyspark.sql.DataFrame) –
- Return type
- property input_domain(self)#
Return input domain for the transformation.
- Return type
- property input_metric(self)#
Distance metric on input domain.
- Return type
- property output_domain(self)#
Return output domain for the transformation.
- Return type
- property output_metric(self)#
Distance metric on output domain.
- Return type
- stability_relation(self, d_in, d_out)#
Returns True only if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_metric.
- Return type
- __or__(self, other: Transformation) → Transformation#
- __or__(self, other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement
Return this transformation chained with another component.
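Chaining with the | operator can be pictured with a toy model. The class below is hypothetical and is not a tmlt.core class: it only illustrates the idea that the left-hand component runs first and its output feeds the right-hand one. The rule that stabilities multiply is an assumption of this toy model, chosen because both toy components scale d_in linearly.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class ToyTransformation:
    """Hypothetical stand-in for a transformation; not a tmlt.core class."""

    name: str
    func: Callable[[Any], Any]
    stability: int  # toy rule: d_out = stability * d_in

    def __call__(self, data: Any) -> Any:
        return self.func(data)

    def stability_function(self, d_in: int) -> int:
        return self.stability * d_in

    def __or__(self, other: "ToyTransformation") -> "ToyTransformation":
        # Chaining runs self first, then other; toy stabilities multiply.
        return ToyTransformation(
            name=f"{self.name} | {other.name}",
            func=lambda data: other(self(data)),
            stability=self.stability * other.stability,
        )


duplicate = ToyTransformation(
    "duplicate", lambda rows: [r for r in rows for _ in range(2)], 2
)
upper = ToyTransformation("upper", lambda rows: [r.upper() for r in rows], 1)

chained = duplicate | upper
result = chained(["a1", "a2"])
d_out = chained.stability_function(1)
```

In this model the chained object behaves like a single transformation whose output and stability reflect both steps.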