map
Transformations for applying user-defined maps to Spark DataFrames.
See the architecture overview for more information on transformations.
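Classes
RowToRowTransformation | Transforms a single row into a different row using a user-defined function.
RowToRowsTransformation | Transforms a single row into multiple rows using a user-defined function.
RowsToRowsTransformation | Transforms a set of rows into another set of rows using a user-defined function.
FlatMap | Applies a RowToRowsTransformation to each row and flattens the result.
GroupingFlatMap | Applies a RowToRowsTransformation to each row and flattens the result.
Map | Applies a RowToRowTransformation to each row in a Spark DataFrame.
FlatMapByKey | Applies a RowsToRowsTransformation to rows, grouped by key.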
- class RowToRowTransformation(input_domain, output_domain, trusted_f, augment)
Bases: tmlt.core.transformations.base.Transformation
Transforms a single row into a different row using a user-defined function.
Note
The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts. If the function does contain an object that directly or indirectly references a Spark DataFrame or a Spark context, an error will occur when the transformation is called on a row.
Examples
augment=False:
>>> # Example input
>>> spark_row
Row(A='a1', B='b1')
>>> def rename_b_to_c(row: Row) -> Row:
...     return Row(A=row.A, C=row.B.replace("b", "c"))
>>> rename_b_to_c_transformation = RowToRowTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "C": SparkStringColumnDescriptor(),
...         }
...     ),
...     trusted_f=rename_b_to_c,
...     augment=False,
... )
>>> transformed_row = rename_b_to_c_transformation(spark_row)
>>> transformed_row
Row(A='a1', C='c1')
augment=True:
>>> # Example input
>>> spark_row
Row(A='a1', B='b1')
>>> def constant_c_column(row: Row) -> Row:
...     return Row(C="c")
>>> add_constant_c_column_transformation = RowToRowTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...             "C": SparkStringColumnDescriptor(),
...         }
...     ),
...     trusted_f=constant_c_column,
...     augment=True,
... )
>>> transformed_and_augmented_row = add_constant_c_column_transformation(spark_row)
>>> transformed_and_augmented_row
Row(A='a1', B='b1', C='c')
- Transformation Contract:
Input domain - SparkRowDomain
Output domain - SparkRowDomain
Input metric - NullMetric
Output metric - NullMetric
>>> rename_b_to_c_transformation.input_domain
SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_transformation.output_domain
SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'C': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_transformation.input_metric
NullMetric()
>>> rename_b_to_c_transformation.output_metric
NullMetric()
- Stability Guarantee:
RowToRowTransformation is not stable! Its stability_relation() always returns False, and its stability_function() always raises NotImplementedError.
- Parameters:
input_domain (tmlt.core.domains.spark_domains.SparkRowDomain)
output_domain (tmlt.core.domains.spark_domains.SparkRowDomain)
trusted_f (Callable[[pyspark.sql.Row], Union[pyspark.sql.Row, Dict[str, Any]]])
augment (bool)
- property trusted_f: Callable[[pyspark.sql.Row], pyspark.sql.Row | Dict[str, Any]]
Returns function to be applied to each row.
Note
Returned function object should not be mutated.
- Return type:
Callable[[pyspark.sql.Row], Union[pyspark.sql.Row, Dict[str, Any]]]
- property augment: bool
Returns whether input attributes need to be augmented to the output.
- Return type:
bool
- property input_domain: tmlt.core.domains.base.Domain
Return input domain for the transformation.
- Return type:
tmlt.core.domains.base.Domain
- property input_metric: tmlt.core.metrics.Metric
Distance metric on input domain.
- Return type:
tmlt.core.metrics.Metric
- property output_domain: tmlt.core.domains.base.Domain
Return output domain for the transformation.
- Return type:
tmlt.core.domains.base.Domain
- property output_metric: tmlt.core.metrics.Metric
Distance metric on output domain.
- Return type:
tmlt.core.metrics.Metric
- __init__(input_domain, output_domain, trusted_f, augment)
Constructor.
- Parameters:
input_domain (SparkRowDomain) – Domain for the input row.
output_domain (SparkRowDomain) – Domain for the output row.
trusted_f (Callable[[Row], Union[Row, Dict[str, Any]]]) – Transformation function to apply to the input row.
augment (bool) – If True, the output of trusted_f will be augmented by the existing values from the input row. In that case, trusted_f must not output values for any of the original columns.
- stability_relation(_, __)
Returns False.
No values are valid for input/output metrics of this transformation.
- Parameters:
_ (Any)
__ (Any)
- Return type:
bool
- __call__(row)
Map row.
- Parameters:
row (pyspark.sql.Row)
- Return type:
pyspark.sql.Row
- stability_function(d_in)
Returns the smallest d_out satisfied by the transformation.
See the privacy and stability tutorial for more information.
- Parameters:
d_in (Any) – Distance between inputs under input_metric.
- Raises:
NotImplementedError – Always, because this transformation is not stable.
- Return type:
Any
- __or__(other: Transformation) → Transformation
- __or__(other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement
Return this transformation chained with another component.
- class RowToRowsTransformation(input_domain, output_domain, trusted_f, augment)
Bases: tmlt.core.transformations.base.Transformation
Transforms a single row into multiple rows using a user-defined function.
Note
The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts. If the function does contain an object that directly or indirectly references a Spark DataFrame or a Spark context, an error will occur when the transformation is called on a row.
Examples
augment=False:
>>> # Example input
>>> spark_row
Row(A='a1', B='b1')
>>> # Create user-defined function
>>> def duplicate(row: Row) -> List[Row]:
...     return [row, row]
>>> # Create transformation
>>> duplicate_transformation = RowToRowsTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=ListDomain(
...         SparkRowDomain(
...             {
...                 "A": SparkStringColumnDescriptor(),
...                 "B": SparkStringColumnDescriptor(),
...             }
...         )
...     ),
...     trusted_f=duplicate,
...     augment=False,
... )
>>> transformed_rows = duplicate_transformation(spark_row)
>>> transformed_rows
[Row(A='a1', B='b1'), Row(A='a1', B='b1')]
augment=True:
>>> # Example input
>>> spark_row
Row(A='a1', B='b1')
>>> def counting_i_column(row: Row) -> List[Row]:
...     return [Row(i=i) for i in range(3)]
>>> add_counting_i_column_transformation = RowToRowsTransformation(
...     input_domain=SparkRowDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...         }
...     ),
...     output_domain=ListDomain(
...         SparkRowDomain(
...             {
...                 "A": SparkStringColumnDescriptor(),
...                 "B": SparkStringColumnDescriptor(),
...                 "i": SparkIntegerColumnDescriptor(),
...             }
...         )
...     ),
...     trusted_f=counting_i_column,
...     augment=True,
... )
>>> transformed_and_augmented_rows = add_counting_i_column_transformation(spark_row)
>>> transformed_and_augmented_rows
[Row(A='a1', B='b1', i=0), Row(A='a1', B='b1', i=1), Row(A='a1', B='b1', i=2)]
- Transformation Contract:
Input domain - SparkRowDomain
Output domain - ListDomain of SparkRowDomain
Input metric - NullMetric
Output metric - NullMetric
>>> duplicate_transformation.input_domain
SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> duplicate_transformation.output_domain
ListDomain(element_domain=SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)}), length=None)
>>> duplicate_transformation.input_metric
NullMetric()
>>> duplicate_transformation.output_metric
NullMetric()
- Stability Guarantee:
RowToRowsTransformation is not stable! Its stability_relation() always returns False, and its stability_function() always raises NotImplementedError.
- Parameters:
input_domain (tmlt.core.domains.spark_domains.SparkRowDomain)
output_domain (tmlt.core.domains.collections.ListDomain)
trusted_f (Callable[[pyspark.sql.Row], Union[List[pyspark.sql.Row], List[Dict[str, Any]]]])
augment (bool)
- property trusted_f: Callable[[pyspark.sql.Row], List[pyspark.sql.Row] | List[Dict[str, Any]]]
Returns function to be applied to each row.
Note
Returned function object should not be mutated.
- Return type:
Callable[[pyspark.sql.Row], Union[List[pyspark.sql.Row], List[Dict[str, Any]]]]
- property augment: bool
Returns whether input attributes need to be augmented to the output.
- Return type:
bool
- property input_domain: tmlt.core.domains.base.Domain
Return input domain for the transformation.
- Return type:
tmlt.core.domains.base.Domain
- property input_metric: tmlt.core.metrics.Metric
Distance metric on input domain.
- Return type:
tmlt.core.metrics.Metric
- property output_domain: tmlt.core.domains.base.Domain
Return output domain for the transformation.
- Return type:
tmlt.core.domains.base.Domain
- property output_metric: tmlt.core.metrics.Metric
Distance metric on output domain.
- Return type:
tmlt.core.metrics.Metric
- __init__(input_domain, output_domain, trusted_f, augment)
Constructor.
- Parameters:
input_domain (SparkRowDomain) – Domain for the input row.
output_domain (ListDomain) – Domain for the output rows.
trusted_f (Callable[[Row], Union[List[Row], List[Dict[str, Any]]]]) – Transformation function to apply to the input row.
augment (bool) – If True, the output of trusted_f will be augmented by the existing values from the input row. In that case, trusted_f must not output values for any of the original columns.
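With augment=True, every row that trusted_f returns receives its own copy of the input row's values. The pure-Python sketch below assumes plain dicts stand in for pyspark.sql.Row; apply_row_to_rows is a hypothetical helper, not Tumult Core's implementation, and its overlap check is an assumed way of enforcing the "no original columns" rule.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]  # plain dicts stand in for pyspark.sql.Row in this sketch


def apply_row_to_rows(
    row: Row, f: Callable[[Row], List[Row]], augment: bool
) -> List[Row]:
    """Hypothetical helper mirroring the augment flag of RowToRowsTransformation."""
    outputs = f(row)
    if not augment:
        return [dict(out) for out in outputs]
    result: List[Row] = []
    for out in outputs:
        overlap = set(out) & set(row)
        if overlap:
            # With augment=True, trusted_f must not emit any original column.
            raise ValueError(f"trusted_f returned original columns: {sorted(overlap)}")
        # Each output row gets its own copy of the input row's values.
        result.append({**row, **out})
    return result


def counting_i_column(row: Row) -> List[Row]:
    return [{"i": i} for i in range(3)]


rows = apply_row_to_rows({"A": "a1", "B": "b1"}, counting_i_column, augment=True)
print(rows)
# [{'A': 'a1', 'B': 'b1', 'i': 0}, {'A': 'a1', 'B': 'b1', 'i': 1}, {'A': 'a1', 'B': 'b1', 'i': 2}]
```

In this sketch, a trusted_f that returns an empty list produces no rows for that input, which is how a flat map built on it can drop rows.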
- stability_relation(_, __)
Returns False.
No values are valid for input/output metrics of this transformation.
- Parameters:
_ (Any)
__ (Any)
- Return type:
bool
- __call__(row)
Map row.
- Parameters:
row (pyspark.sql.Row)
- Return type:
List[pyspark.sql.Row]
- stability_function(d_in)
Returns the smallest d_out satisfied by the transformation.
See the privacy and stability tutorial for more information.
- Parameters:
d_in (Any) – Distance between inputs under input_metric.
- Raises:
NotImplementedError – Always, because this transformation is not stable.
- Return type:
Any
- __or__(other: Transformation) → Transformation
- __or__(other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement
Return this transformation chained with another component.
- class RowsToRowsTransformation(input_domain, output_domain, trusted_f)
Bases: tmlt.core.transformations.base.Transformation
Transforms a set of rows into another set of rows using a user-defined function.
Note
The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts. If the function does contain an object that directly or indirectly references a Spark DataFrame or a Spark context, an error will occur when the transformation is called on a group of rows.
Examples
>>> # Example input
>>> input
[Row(A='a1', B='b1'), Row(A='a2', B='b2')]
>>> # Create user-defined function
>>> def merge(rows: List[Row]) -> List[Row]:
...     return [Row(A=' '.join(r.A for r in rows), B=' '.join(r.B for r in rows))]
>>> # Create transformation
>>> merge_transformation = RowsToRowsTransformation(
...     input_domain=ListDomain(SparkRowDomain({
...         "A": SparkStringColumnDescriptor(),
...         "B": SparkStringColumnDescriptor(),
...     })),
...     output_domain=ListDomain(SparkRowDomain({
...         "A": SparkStringColumnDescriptor(),
...         "B": SparkStringColumnDescriptor(),
...     })),
...     trusted_f=merge,
... )
>>> transformed_rows = merge_transformation(input)
>>> transformed_rows
[Row(A='a1 a2', B='b1 b2')]
- Transformation Contract:
Input domain - ListDomain of SparkRowDomain
Output domain - ListDomain of SparkRowDomain
Input metric - NullMetric
Output metric - NullMetric
>>> merge_transformation.input_domain
ListDomain(element_domain=SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)}), length=None)
>>> merge_transformation.output_domain
ListDomain(element_domain=SparkRowDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)}), length=None)
>>> merge_transformation.input_metric
NullMetric()
>>> merge_transformation.output_metric
NullMetric()
- Stability Guarantee:
RowsToRowsTransformation is not stable! Its stability_relation() always returns False, and its stability_function() always raises NotImplementedError.
- Parameters:
input_domain (tmlt.core.domains.collections.ListDomain)
output_domain (tmlt.core.domains.collections.ListDomain)
trusted_f (Callable[[List[pyspark.sql.Row]], Union[List[pyspark.sql.Row], List[Dict[str, Any]]]])
- property trusted_f: Callable[[List[pyspark.sql.Row]], List[pyspark.sql.Row] | List[Dict[str, Any]]]
The function to be applied to each group of rows.
Note
Returned function object should not be mutated.
- Return type:
Callable[[List[pyspark.sql.Row]], Union[List[pyspark.sql.Row], List[Dict[str, Any]]]]
- property input_domain: tmlt.core.domains.base.Domain
Return input domain for the transformation.
- Return type:
tmlt.core.domains.base.Domain
- property input_metric: tmlt.core.metrics.Metric
Distance metric on input domain.
- Return type:
tmlt.core.metrics.Metric
- property output_domain: tmlt.core.domains.base.Domain
Return output domain for the transformation.
- Return type:
tmlt.core.domains.base.Domain
- property output_metric: tmlt.core.metrics.Metric
Distance metric on output domain.
- Return type:
tmlt.core.metrics.Metric
- __init__(input_domain, output_domain, trusted_f)
Constructor.
- stability_relation(_, __)
Returns False.
No values are valid for input/output metrics of this transformation.
- Parameters:
_ (Any)
__ (Any)
- Return type:
bool
- __call__(rows)
Map rows.
- Parameters:
rows (List[pyspark.sql.Row])
- Return type:
List[pyspark.sql.Row]
- stability_function(d_in)
Returns the smallest d_out satisfied by the transformation.
See the privacy and stability tutorial for more information.
- Parameters:
d_in (Any) – Distance between inputs under input_metric.
- Raises:
NotImplementedError – Always, because this transformation is not stable.
- Return type:
Any
- __or__(other: Transformation) → Transformation
- __or__(other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement
Return this transformation chained with another component.
- class FlatMap(metric, row_transformer, max_num_rows)
Bases: tmlt.core.transformations.base.Transformation
Applies a RowToRowsTransformation to each row and flattens the result.
Note
The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts. If the function does contain an object that directly or indirectly references a Spark DataFrame or a Spark context, an error will occur when the transformation is applied.
Example
>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
>>> # duplicate_transformation is a RowToRowsTransformation that outputs two copies
>>> # of the input row.
>>> duplicate_flat_map = FlatMap(
...     metric=SymmetricDifference(),
...     row_transformer=duplicate_transformation,
...     max_num_rows=2,
... )
>>> # Apply transformation to data
>>> duplicated_spark_dataframe = duplicate_flat_map(spark_dataframe)
>>> print_sdf(duplicated_spark_dataframe)
    A   B
0  a1  b1
1  a1  b1
2  a2  b1
3  a2  b1
4  a3  b2
5  a3  b2
6  a3  b2
7  a3  b2
- Transformation Contract:
Input domain - SparkDataFrameDomain
Output domain - SparkDataFrameDomain
Input metric - SymmetricDifference or IfGroupedBy
Output metric - SymmetricDifference or IfGroupedBy (matches input metric)
>>> duplicate_flat_map.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> duplicate_flat_map.output_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> duplicate_flat_map.input_metric
SymmetricDifference()
>>> duplicate_flat_map.output_metric
SymmetricDifference()
- Stability Guarantee:
For SymmetricDifference(), IfGroupedBy(column, SumOf(SymmetricDifference())), and IfGroupedBy(column, RootSumOfSquared(SymmetricDifference())), FlatMap's stability_function() returns d_in times max_num_rows. If max_num_rows is None, it returns infinity.
>>> duplicate_flat_map.stability_function(1)
2
>>> duplicate_flat_map.stability_function(2)
4
For IfGroupedBy(column, SymmetricDifference()), FlatMap's stability_function() returns d_in.
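The stability rules above fit in a few lines of plain Python. This is a hypothetical sketch, not the library's code: the metric is passed as a string label rather than a Tumult Core metric object, floats replace ExactNumber, and the stability relation is assumed to hold exactly when d_out is at least the value returned by the stability function.

```python
import math
from typing import Optional

# Hypothetical string label standing in for the Tumult Core metric object.
GROUPED_SYMMETRIC_DIFFERENCE = "IfGroupedBy(column, SymmetricDifference())"


def flat_map_stability(d_in: float, max_num_rows: Optional[int], metric: str) -> float:
    """Sketch of the FlatMap stability rules described above."""
    if metric == GROUPED_SYMMETRIC_DIFFERENCE:
        # Documented case: the distance passes through unchanged.
        return d_in
    if max_num_rows is None:
        # No per-row limit, so there is no finite bound on the output distance.
        return math.inf
    # Each differing input row can contribute up to max_num_rows differing output rows.
    return d_in * max_num_rows


def flat_map_stability_relation(
    d_in: float, d_out: float, max_num_rows: Optional[int], metric: str
) -> bool:
    """Assumed relation: d_out is acceptable if it is at least the smallest d_out."""
    return d_out >= flat_map_stability(d_in, max_num_rows, metric)


print(flat_map_stability(1, 2, "SymmetricDifference()"))  # 2
print(flat_map_stability(2, 2, "SymmetricDifference()"))  # 4
print(flat_map_stability_relation(1, 1, 2, "SymmetricDifference()"))  # False
```

The first two calls reproduce the duplicate_flat_map doctest values for max_num_rows=2.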
- Parameters:
metric (Union[tmlt.core.metrics.SymmetricDifference, tmlt.core.metrics.IfGroupedBy])
row_transformer (RowToRowsTransformation)
max_num_rows (Optional[int])
- property max_num_rows: int | None
Returns the maximum number of rows a single input row can be mapped to, or None if there is no limit.
- Return type:
Optional[int]
- property row_transformer: RowToRowsTransformation
Returns transformation object used for mapping rows to lists of rows.
- Return type:
RowToRowsTransformation
- property input_domain: tmlt.core.domains.base.Domain
Return input domain for the transformation.
- Return type:
tmlt.core.domains.base.Domain
- property input_metric: tmlt.core.metrics.Metric
Distance metric on input domain.
- Return type:
tmlt.core.metrics.Metric
- property output_domain: tmlt.core.domains.base.Domain
Return output domain for the transformation.
- Return type:
tmlt.core.domains.base.Domain
- property output_metric: tmlt.core.metrics.Metric
Distance metric on output domain.
- Return type:
tmlt.core.metrics.Metric
- __init__(metric, row_transformer, max_num_rows)
Constructor.
- Parameters:
metric (Union[SymmetricDifference, IfGroupedBy]) – Distance metric for input and output DataFrames.
row_transformer (RowToRowsTransformation) – Transformation to apply to each row.
max_num_rows (Optional[int]) – The maximum number of rows to allow from row_transformer. If more rows are output, the additional rows are suppressed. If this value is None, the transformation does not impose a limit on the number of rows. None is only allowed if the metric is IfGroupedBy(column, SymmetricDifference()).
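The role of max_num_rows can be sketched locally on a list of dict rows. The function flat_map below is a hypothetical analogue, not FlatMap's implementation; in particular, keeping the first max_num_rows rows when truncating is an assumption, since the description above only says the additional rows are suppressed.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional

Row = Dict[str, Any]  # plain dicts stand in for Spark rows in this sketch


def flat_map(
    rows: Iterable[Row],
    row_transformer: Callable[[Row], List[Row]],
    max_num_rows: Optional[int],
) -> List[Row]:
    """Hypothetical local analogue of FlatMap over a list of dict rows."""
    result: List[Row] = []
    for row in rows:
        produced = row_transformer(row)
        if max_num_rows is not None:
            # Extra rows are suppressed; keeping the first ones is an assumption.
            produced = produced[:max_num_rows]
        # Flatten: the rows for every input row go into one output list.
        result.extend(produced)
    return result


def triplicate(row: Row) -> List[Row]:
    return [row, row, row]


data = [{"A": "a1", "B": "b1"}, {"A": "a2", "B": "b1"}]
print(len(flat_map(data, triplicate, max_num_rows=2)))     # 4
print(len(flat_map(data, triplicate, max_num_rows=None)))  # 6
```

Capping each input row's output at max_num_rows is what bounds the stability factor: one changed input row can change at most max_num_rows output rows.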
- stability_function(d_in)
Returns the smallest d_out satisfied by the transformation.
See Tumult Core Architecture for more information.
- Parameters:
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Return type:
tmlt.core.utils.exact_number.ExactNumber
- __call__(sdf)
Apply the flat map to sdf.
- Parameters:
sdf (pyspark.sql.DataFrame)
- Return type:
pyspark.sql.DataFrame
- stability_relation(d_in, d_out)
Returns True only if inputs within d_in of each other produce outputs within d_out of each other.
See the privacy and stability tutorial for more information.
- Parameters:
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_metric.
- Return type:
bool
- __or__(other: Transformation) → Transformation
- __or__(other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement
Return this transformation chained with another component.
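Chaining composes both the data mapping and the stability bound: if the first component turns d_in into some intermediate distance, the second bounds the final distance from that. The toy sketch below assumes a hypothetical ToyTransformation class (not part of Tumult Core) that pairs a Python function with its stability function; it performs no domain or metric compatibility checks and does not cover chaining with a measurement.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class ToyTransformation:
    """Hypothetical stand-in: a data function plus its stability function."""

    fn: Callable[[Any], Any]
    stability: Callable[[float], float]

    def __call__(self, data: Any) -> Any:
        return self.fn(data)

    def __or__(self, other: "ToyTransformation") -> "ToyTransformation":
        # Run self first, then other; stability functions compose in the same order.
        return ToyTransformation(
            fn=lambda data: other(self(data)),
            stability=lambda d_in: other.stability(self.stability(d_in)),
        )


def duplicate_rows(rows: List[str]) -> List[str]:
    return [r for r in rows for _ in range(2)]


def upper_rows(rows: List[str]) -> List[str]:
    return [r.upper() for r in rows]


duplicate = ToyTransformation(fn=duplicate_rows, stability=lambda d: 2 * d)
upper = ToyTransformation(fn=upper_rows, stability=lambda d: d)
chained = duplicate | upper
print(chained(["a1", "a2"]))  # ['A1', 'A1', 'A2', 'A2']
print(chained.stability(1))   # 2
```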
- class GroupingFlatMap(output_metric, row_transformer, max_num_rows)
Bases: tmlt.core.transformations.base.Transformation
Applies a RowToRowsTransformation to each row and flattens the result.
A GroupingFlatMap is a special case of a FlatMap that has different input and output metrics (a FlatMap's input and output metrics are always identical) and allows for a tighter stability analysis.
Compared to a regular FlatMap, a GroupingFlatMap also requires that:
- The row_transformer creates a single column that is augmented to the input.
- For each input row, the row_transformer creates no duplicate values in the created column (this is enforced by the implementation).
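The two requirements can be pictured with a local sketch on dict rows. Everything here is hypothetical: grouping_flat_map is not Tumult Core code, the row_transformer is simplified to a function that returns the new column's values, and applying de-duplication before truncation to max_num_rows is an assumed order.

```python
from typing import Any, Callable, Dict, Hashable, List

Row = Dict[str, Any]  # plain dicts stand in for Spark rows in this sketch


def grouping_flat_map(
    rows: List[Row],
    new_values: Callable[[Row], List[Hashable]],
    new_column: str,
    max_num_rows: int,
) -> List[Row]:
    """Hypothetical sketch: one output row per distinct new-column value."""
    result: List[Row] = []
    for row in rows:
        unique: List[Hashable] = []
        for value in new_values(row):
            # Drop repeated values so each input row lands in a group at most once.
            if value not in unique:
                unique.append(value)
        # Keep at most max_num_rows values per input row (truncation order is assumed).
        for value in unique[:max_num_rows]:
            # Augment: copy the input columns and add the single new column.
            result.append({**row, new_column: value})
    return result


def add_i(row: Row) -> List[Hashable]:
    # Hypothetical value generator that deliberately repeats the value 1.
    return [0, 1, 1, 2]


out = grouping_flat_map([{"A": "a1", "B": "b1"}], add_i, "i", max_num_rows=3)
print([r["i"] for r in out])  # [0, 1, 2]
```

Because each input row contributes at most one row to any group of the new column, the per-group distance is bounded by d_in, which is what the RootSumOfSquared analysis relies on.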
Note
The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts. If the function does contain an object that directly or indirectly references a Spark DataFrame or a Spark context, an error will occur when the transformation is applied.
Example
>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
>>> # add_i_transformation is a RowToRowsTransformation that
>>> # repeats each row 3 times, once with i=0, once with i=1, and once with i=2
>>> add_i_flat_map = GroupingFlatMap(
...     output_metric=RootSumOfSquared(SymmetricDifference()),
...     row_transformer=add_i_transformation,
...     max_num_rows=3,
... )
>>> # Apply transformation to data
>>> spark_dataframe_with_i = add_i_flat_map(spark_dataframe)
>>> print_sdf(spark_dataframe_with_i)
     A   B  i
0   a1  b1  0
1   a1  b1  1
2   a1  b1  2
3   a2  b1  0
4   a2  b1  1
5   a2  b1  2
6   a3  b2  0
7   a3  b2  0
8   a3  b2  1
9   a3  b2  1
10  a3  b2  2
11  a3  b2  2
- Transformation Contract:
Input domain - SparkDataFrameDomain
Output domain - SparkDataFrameDomain
Input metric - SymmetricDifference
Output metric - IfGroupedBy
>>> add_i_flat_map.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> add_i_flat_map.output_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False), 'i': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> add_i_flat_map.input_metric
SymmetricDifference()
>>> add_i_flat_map.output_metric
IfGroupedBy(column='i', inner_metric=RootSumOfSquared(inner_metric=SymmetricDifference()))
- Stability Guarantee:
GroupingFlatMap supports two different output metrics:
- IfGroupedBy(column='new_column', inner_metric=SumOf(SymmetricDifference()))
- IfGroupedBy(column='new_column', inner_metric=RootSumOfSquared(SymmetricDifference()))
The stability_function() differs depending on the output metric:
- If the inner metric is SumOf(SymmetricDifference()), d_out is d_in * self.max_num_rows.
- If the inner metric is RootSumOfSquared(SymmetricDifference()), we can use the added structure of the row_transformer to achieve a tighter analysis. For each input row, the function produces at most one output row per value of the new column, so at most d_in output rows differ within any one group of the new column, and at most d_in * self.max_num_rows output rows differ in total. The root sum of squares is largest when these differences are concentrated in self.max_num_rows groups of d_in rows each. Therefore, under RootSumOfSquared, d_out is d_in * sqrt(self.max_num_rows).
>>> add_i_flat_map.stability_function(1)
sqrt(3)
>>> add_i_flat_map.stability_function(2)
2*sqrt(3)
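The RootSumOfSquared bound follows in three steps. In the derivation below, d_v (the number of differing output rows whose new column equals v) and k (shorthand for max_num_rows) are notation introduced only for this sketch. The SumOf case is the middle line alone, which gives d_out = k d_in.

```latex
% d_v: differing output rows with new-column value v;  k = max_num_rows
\begin{aligned}
d_v &\le d_{in} && \text{(each input row yields at most one row per value } v\text{)} \\
\textstyle\sum_v d_v &\le k\, d_{in} && \text{(each input row yields at most } k \text{ rows)} \\
\textstyle\sum_v d_v^2 &\le \big(\max_v d_v\big)\textstyle\sum_v d_v \le d_{in}\cdot k\, d_{in} = k\, d_{in}^2 \\
d_{out} &= \sqrt{\textstyle\sum_v d_v^2} \le d_{in}\sqrt{k}
\end{aligned}
```

With k = 3 this gives sqrt(3) for d_in = 1 and 2*sqrt(3) for d_in = 2, matching the doctest values.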
- Parameters:
output_metric (Union[tmlt.core.metrics.SumOf, tmlt.core.metrics.RootSumOfSquared])
row_transformer (RowToRowsTransformation)
max_num_rows (int)
- property max_num_rows: int
Returns the largest number of rows a single row can be mapped to.
- Return type:
int
- property row_transformer: RowToRowsTransformation
Returns transformation object used for mapping rows to lists of rows.
- Return type:
RowToRowsTransformation
- property input_domain: tmlt.core.domains.base.Domain
Return input domain for the transformation.
- Return type:
tmlt.core.domains.base.Domain
- property input_metric: tmlt.core.metrics.Metric
Distance metric on input domain.
- Return type:
tmlt.core.metrics.Metric
- property output_domain: tmlt.core.domains.base.Domain
Return output domain for the transformation.
- Return type:
tmlt.core.domains.base.Domain
- property output_metric: tmlt.core.metrics.Metric
Distance metric on output domain.
- Return type:
tmlt.core.metrics.Metric
- __init__(output_metric, row_transformer, max_num_rows)
Constructor.
- Parameters:
output_metric (Union[SumOf, RootSumOfSquared]) – Inner metric for the IfGroupedBy metric on output DataFrames.
row_transformer (RowToRowsTransformation) – Transformation to apply to each row.
max_num_rows (int) – The maximum number of rows to allow from row_transformer.
- stability_function(d_in)
Returns the smallest d_out satisfied by the transformation.
See Tumult Core Architecture for more information.
- Parameters:
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Return type:
tmlt.core.utils.exact_number.ExactNumber
- __call__(sdf)
Apply the flat map to sdf.
- Parameters:
sdf (pyspark.sql.DataFrame)
- Return type:
pyspark.sql.DataFrame
- stability_relation(d_in, d_out)
Returns True only if inputs within d_in of each other produce outputs within d_out of each other.
See the privacy and stability tutorial for more information.
- Parameters:
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_metric.
- Return type:
bool
- __or__(other: Transformation) → Transformation
- __or__(other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement
Return this transformation chained with another component.
- class Map(metric, row_transformer)
Bases: tmlt.core.transformations.base.Transformation
Applies a RowToRowTransformation to each row in a Spark DataFrame.
Note
The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts. If the function does contain an object that directly or indirectly references a Spark DataFrame or a Spark context, an error will occur when the RowToRowTransformation is called on a row.
Example
>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B
0  a1  b1
1  a2  b1
2  a3  b2
3  a3  b2
>>> # rename_b_to_c_transformation is a RowToRowTransformation that
>>> # renames the B column to C, and replaces b with c in the values
>>> rename_b_to_c_map = Map(
...     metric=SymmetricDifference(),
...     row_transformer=rename_b_to_c_transformation,
... )
>>> # Apply transformation to data
>>> renamed_spark_dataframe = rename_b_to_c_map(spark_dataframe)
>>> print_sdf(renamed_spark_dataframe)
    A   C
0  a1  c1
1  a2  c1
2  a3  c2
3  a3  c2
- Transformation Contract:
Input domain - SparkDataFrameDomain
Output domain - SparkDataFrameDomain
Input metric - SymmetricDifference, HammingDistance, or IfGroupedBy
Output metric - SymmetricDifference, HammingDistance, or IfGroupedBy (matches input metric)
>>> rename_b_to_c_map.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_map.output_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'C': SparkStringColumnDescriptor(allow_null=False)})
>>> rename_b_to_c_map.input_metric
SymmetricDifference()
>>> rename_b_to_c_map.output_metric
SymmetricDifference()
- Stability Guarantee:
Map's stability_function() returns d_in.
>>> rename_b_to_c_map.stability_function(1)
1
>>> rename_b_to_c_map.stability_function(2)
2
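To see why d_in carries through unchanged under SymmetricDifference, the plain-Python sketch below (not Tumult Core) compares two neighboring datasets before and after a row-wise map. The helpers are hypothetical, and rows are modeled as tuples so they can be counted as a multiset. Because a map emits exactly one row per input row, adding or removing one input row adds or removes exactly one mapped row, so d_out never exceeds d_in.

```python
from collections import Counter
from typing import Callable, List, Tuple

RowTuple = Tuple[str, str]  # (A, B) values; a hashable stand-in for a row


def symmetric_difference(xs: List[RowTuple], ys: List[RowTuple]) -> int:
    """Multiset symmetric difference: rows in one list not matched in the other."""
    cx, cy = Counter(xs), Counter(ys)
    return sum((cx - cy).values()) + sum((cy - cx).values())


def map_rows(rows: List[RowTuple], f: Callable[[RowTuple], RowTuple]) -> List[RowTuple]:
    """Row-wise map: exactly one output row per input row."""
    return [f(row) for row in rows]


def rename_b_to_c(row: RowTuple) -> RowTuple:
    a, b = row
    return (a, b.replace("b", "c"))


x = [("a1", "b1"), ("a2", "b1"), ("a3", "b2")]
y = x + [("a3", "b2")]  # neighboring dataset with one extra row
d_in = symmetric_difference(x, y)
d_out = symmetric_difference(map_rows(x, rename_b_to_c), map_rows(y, rename_b_to_c))
print(d_in, d_out)  # 1 1
```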
- Parameters:
metric (Union[tmlt.core.metrics.SymmetricDifference, tmlt.core.metrics.HammingDistance, tmlt.core.metrics.IfGroupedBy])
row_transformer (RowToRowTransformation)
- property row_transformer: RowToRowTransformation
Returns the transformation object used for mapping rows.
- Return type:
RowToRowTransformation
- property input_domain: tmlt.core.domains.base.Domain
Return input domain for the transformation.
- Return type:
tmlt.core.domains.base.Domain
- property input_metric: tmlt.core.metrics.Metric
Distance metric on input domain.
- Return type:
tmlt.core.metrics.Metric
- property output_domain: tmlt.core.domains.base.Domain
Return output domain for the transformation.
- Return type:
tmlt.core.domains.base.Domain
- property output_metric: tmlt.core.metrics.Metric
Distance metric on output domain.
- Return type:
tmlt.core.metrics.Metric
- __init__(metric, row_transformer)
Constructor.
- Parameters:
metric (Union[SymmetricDifference, HammingDistance, IfGroupedBy]) – Distance metric for input and output DataFrames.
row_transformer (RowToRowTransformation) – Transformation to apply to each row.
- stability_function(d_in)
Returns the smallest d_out satisfied by the transformation.
See Tumult Core Architecture for more information.
- Parameters:
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Return type:
tmlt.core.utils.exact_number.ExactNumber
- __call__(sdf)
Return mapped DataFrame.
- Parameters:
sdf (pyspark.sql.DataFrame)
- Return type:
pyspark.sql.DataFrame
- stability_relation(d_in, d_out)
Returns True only if inputs within d_in of each other produce outputs within d_out of each other.
See the privacy and stability tutorial for more information.
- Parameters:
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_metric.
- Return type:
bool
- __or__(other: Transformation) → Transformation
- __or__(other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement
Return this transformation chained with another component.
- class FlatMapByKey(metric, row_transformer)
Bases: tmlt.core.transformations.base.Transformation
Applies a RowsToRowsTransformation to rows, grouped by key.
Note
The transformation function must not contain any objects that directly or indirectly reference Spark DataFrames or Spark contexts. If the function does contain an object that directly or indirectly references a Spark DataFrame or a Spark context, an error will occur when the transformation is applied.
Example
>>> # Example input
>>> print_sdf(spark_dataframe)
  id  v
0  a  1
1  b  2
2  c  3
3  c  4
>>> # sum_by_key_transformation is a RowsToRowsTransformation that sums column v
>>> # for each ID group.
>>> sum_by_key = FlatMapByKey(
...     metric=IfGroupedBy("id", SymmetricDifference()),
...     row_transformer=sum_by_key_transformation,
... )
>>> # Apply transformation to data
>>> transformed_spark_dataframe = sum_by_key(spark_dataframe)
>>> print_sdf(transformed_spark_dataframe)
  id  sum
0  a    1
1  b    2
2  c    7
- Transformation Contract:
Input domain - SparkDataFrameDomain
Output domain - SparkDataFrameDomain
Input metric - IfGroupedBy with inner metric SymmetricDifference
Output metric - IfGroupedBy (matches input metric)
>>> sum_by_key.input_domain
SparkDataFrameDomain(schema={'id': SparkStringColumnDescriptor(allow_null=False), 'v': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> sum_by_key.output_domain
SparkDataFrameDomain(schema={'id': SparkStringColumnDescriptor(allow_null=False), 'sum': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> sum_by_key.input_metric
IfGroupedBy(column='id', inner_metric=SymmetricDifference())
>>> sum_by_key.output_metric
IfGroupedBy(column='id', inner_metric=SymmetricDifference())
- Stability Guarantee:
FlatMapByKey's stability_function() returns d_in.
- Parameters:
metric (tmlt.core.metrics.IfGroupedBy)
row_transformer (RowsToRowsTransformation)
- property row_transformer: RowsToRowsTransformation
Returns transformation object used for mapping groups of rows to lists of rows.
- Return type:
RowsToRowsTransformation
- property input_domain: tmlt.core.domains.base.Domain
Return input domain for the transformation.
- Return type:
tmlt.core.domains.base.Domain
- property input_metric: tmlt.core.metrics.Metric
Distance metric on input domain.
- Return type:
tmlt.core.metrics.Metric
- property output_domain: tmlt.core.domains.base.Domain
Return output domain for the transformation.
- Return type:
tmlt.core.domains.base.Domain
- property output_metric: tmlt.core.metrics.Metric
Distance metric on output domain.
- Return type:
tmlt.core.metrics.Metric
- __init__(metric, row_transformer)
Constructor.
- Parameters:
metric (IfGroupedBy) – Distance metric for input and output DataFrames.
row_transformer (RowsToRowsTransformation) – Transformation to apply to each group of rows. This transformation should have the key column in its input domain, but it must not include the key column in its output domain.
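The grouping behavior described above can be sketched locally: rows are grouped by the key column, the group function sees each group's rows (key included), and the key is re-attached to every row it returns. flat_map_by_key is a hypothetical helper on dict rows, not Tumult Core's implementation; raising ValueError when the group function outputs the key column is an assumed way of enforcing the rule above.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]  # plain dicts stand in for Spark rows in this sketch


def flat_map_by_key(
    rows: List[Row], key: str, f: Callable[[List[Row]], List[Row]]
) -> List[Row]:
    """Hypothetical local analogue of FlatMapByKey on dict rows."""
    groups: Dict[Any, List[Row]] = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    result: List[Row] = []
    for key_value, group in groups.items():
        for out in f(group):
            if key in out:
                raise ValueError("the group function must not output the key column")
            # The key column is re-attached to every output row of its group.
            result.append({key: key_value, **out})
    return result


def sum_v(group: List[Row]) -> List[Row]:
    return [{"sum": sum(r["v"] for r in group)}]


data = [{"id": "a", "v": 1}, {"id": "b", "v": 2}, {"id": "c", "v": 3}, {"id": "c", "v": 4}]
print(flat_map_by_key(data, "id", sum_v))
# [{'id': 'a', 'sum': 1}, {'id': 'b', 'sum': 2}, {'id': 'c', 'sum': 7}]
```

Since all rows for a key are processed together and only that key's output changes, a change confined to d_in groups stays confined to d_in groups, consistent with the stability function returning d_in.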
- stability_function(d_in)
Returns the smallest d_out satisfied by the transformation.
See Tumult Core Architecture for more information.
- Parameters:
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Return type:
tmlt.core.utils.exact_number.ExactNumber
- __call__(sdf)
Apply transformation.
- Parameters:
sdf (pyspark.sql.DataFrame)
- Return type:
pyspark.sql.DataFrame
- stability_relation(d_in, d_out)
Returns True only if inputs within d_in of each other produce outputs within d_out of each other.
See the privacy and stability tutorial for more information.
- Parameters:
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_metric.
- Return type:
bool
- __or__(other: Transformation) → Transformation
- __or__(other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement
Return this transformation chained with another component.