partition
Transformations for partitioning Spark DataFrames.
Classes
Partition – Base class for partition transformations.
PartitionByKeys – Partition a Spark DataFrame by a list of keys and corresponding domain.
- class Partition(input_domain, input_metric, output_metric, num_partitions=None)
Bases: tmlt.core.transformations.base.Transformation
Base class for partition transformations.
- Parameters
input_domain (tmlt.core.domains.base.Domain) –
input_metric (Union[tmlt.core.metrics.IfGroupedBy, tmlt.core.metrics.SymmetricDifference]) –
output_metric (Union[tmlt.core.metrics.SumOf, tmlt.core.metrics.RootSumOfSquared]) –
num_partitions (Optional[int]) –
- __init__(input_domain, input_metric, output_metric, num_partitions=None)
Constructor.
- Parameters
input_domain (Domain) – Domain of inputs to the transformation.
input_metric (Union[IfGroupedBy, SymmetricDifference]) – Distance metric for inputs.
output_metric (Union[SumOf, RootSumOfSquared]) – Metric for the output list.
num_partitions (Optional[int], default: None) – Number of partitions produced by the transformation, or None if it is not known.
- property num_partitions
Returns the number of partitions produced by the transformation.
If this number is not known, this returns None.
- Return type
Optional[int]
- stability_function(d_in)
Returns the smallest d_out satisfied by the transformation.
See the privacy and stability tutorial for more information.
- Parameters
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Return type
tmlt.core.utils.exact_number.ExactNumber
- property input_domain
Returns the input domain of the transformation.
- Return type
tmlt.core.domains.base.Domain
- property input_metric
Distance metric on the input domain.
- Return type
tmlt.core.metrics.Metric
- property output_domain
Returns the output domain of the transformation.
- Return type
tmlt.core.domains.base.Domain
- property output_metric
Distance metric on the output domain.
- Return type
tmlt.core.metrics.Metric
- stability_relation(d_in, d_out)
Returns True only if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_metric.
- Return type
bool
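One common way to read the stability relation is through the stability function: the relation holds exactly when d_out is no smaller than the smallest d_out that stability_function(d_in) guarantees. The sketch below illustrates that reading with exact rational arithmetic. The class `IdentityStability` is hypothetical, not the tmlt.core API, and it assumes output distances are compared with ordinary numeric ordering.

```python
from fractions import Fraction


class IdentityStability:
    """Hypothetical component whose smallest d_out equals d_in."""

    def stability_function(self, d_in: Fraction) -> Fraction:
        # Smallest output distance guaranteed for inputs at distance d_in.
        return d_in

    def stability_relation(self, d_in: Fraction, d_out: Fraction) -> bool:
        # The relation holds iff d_out is at least the guaranteed bound.
        return self.stability_function(d_in) <= d_out


t = IdentityStability()
print(t.stability_relation(Fraction(1), Fraction(1)))  # True
print(t.stability_relation(Fraction(2), Fraction(1)))  # False
```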
- __or__(other: Transformation) → Transformation
- __or__(other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement
Return this transformation chained with another component.
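Chaining feeds the output of this transformation into the next component, so, under the assumption that each piece exposes a stability function, the chained bound is their composition. The sketch below uses a hypothetical `Component` dataclass to show that idea; it is not how tmlt.core implements chaining.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Component:
    """Hypothetical stand-in for a transformation: a function plus its stability bound."""

    func: Callable[[Any], Any]
    stability: Callable[[int], int]

    def __call__(self, data: Any) -> Any:
        return self.func(data)

    def __or__(self, other: "Component") -> "Component":
        # Run self first, then other; distance bounds compose in the same order.
        return Component(
            func=lambda data: other(self(data)),
            stability=lambda d_in: other.stability(self.stability(d_in)),
        )


# Duplicating every record doubles the symmetric difference; filtering keeps it unchanged.
duplicate = Component(lambda xs: xs + xs, lambda d: 2 * d)
keep_even = Component(lambda xs: [x for x in xs if x % 2 == 0], lambda d: d)
chained = duplicate | keep_even
print(chained([1, 2, 3, 4]))  # [2, 4, 2, 4]
print(chained.stability(1))   # 2
```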
- abstract __call__(data)
Perform transformation.
- Parameters
data (Any) –
- Return type
Any
- class PartitionByKeys(input_domain, input_metric, use_l2, keys, list_values)
Bases: Partition
Partition a Spark DataFrame by a list of keys and corresponding domain.
Example
>>> # Example input
>>> print_sdf(spark_dataframe)
    A   B  X
0  a1  b1  2
1  a1  b1  3
2  a1  b1  5
3  a1  b2 -1
4  a1  b2  4
5  a2  b1 -5
>>> # notice that ("a2", "b1") is skipped,
>>> # and that there were no values for ("a2", "b2") in the input data
>>> list_values = [("a1", "b1"), ("a1", "b2"), ("a2", "b2")]
>>> # Create the transformation
>>> partition = PartitionByKeys(
...     input_domain=SparkDataFrameDomain(
...         {
...             "A": SparkStringColumnDescriptor(),
...             "B": SparkStringColumnDescriptor(),
...             "X": SparkIntegerColumnDescriptor(),
...         },
...     ),
...     input_metric=SymmetricDifference(),
...     use_l2=False,
...     keys=["A", "B"],
...     list_values=list_values,
... )
>>> # Apply transformation to data
>>> partitioned_dataframes = partition(spark_dataframe)
>>> for list_value, dataframe in zip(list_values, partitioned_dataframes):
...     print(list_value)
...     print_sdf(dataframe)
('a1', 'b1')
    A   B  X
0  a1  b1  2
1  a1  b1  3
2  a1  b1  5
('a1', 'b2')
    A   B  X
0  a1  b2 -1
1  a1  b2  4
('a2', 'b2')
Empty DataFrame
Columns: [A, B, X]
Index: []
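The behaviour in this example can be mimicked without Spark. The sketch below is a plain-Python illustration only: `partition_by_keys` is a hypothetical helper, rows are assumed to be dictionaries, and the real transformation operates on Spark DataFrames. It keeps one partition per entry of list_values, in order, drops rows whose key combination is not listed, and leaves listed combinations with no matching rows empty.

```python
from typing import Dict, List, Sequence, Tuple

Row = Dict[str, object]


def partition_by_keys(
    rows: List[Row], keys: List[str], list_values: Sequence[Tuple]
) -> List[List[Row]]:
    """Hypothetical pure-Python analogue of PartitionByKeys.__call__."""
    # One initially empty partition per listed key combination, in list order.
    index = {tuple(value): i for i, value in enumerate(list_values)}
    partitions: List[List[Row]] = [[] for _ in list_values]
    for row in rows:
        position = index.get(tuple(row[key] for key in keys))
        if position is not None:  # rows with unlisted key values are dropped
            partitions[position].append(row)
    return partitions


rows = [
    {"A": "a1", "B": "b1", "X": 2},
    {"A": "a1", "B": "b1", "X": 3},
    {"A": "a1", "B": "b1", "X": 5},
    {"A": "a1", "B": "b2", "X": -1},
    {"A": "a1", "B": "b2", "X": 4},
    {"A": "a2", "B": "b1", "X": -5},
]
list_values = [("a1", "b1"), ("a1", "b2"), ("a2", "b2")]
for value, part in zip(list_values, partition_by_keys(rows, ["A", "B"], list_values)):
    print(value, [row["X"] for row in part])
```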
- Transformation Contract:
Input domain - SparkDataFrameDomain
Output domain - ListDomain of SparkDataFrameDomain
Input metric - SymmetricDifference or IfGroupedBy
Output metric - SumOf or RootSumOfSquared of SymmetricDifference or IfGroupedBy
>>> partition.input_domain
SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False), 'X': SparkIntegerColumnDescriptor(allow_null=False, size=64)})
>>> partition.output_domain
ListDomain(element_domain=SparkDataFrameDomain(schema={'A': SparkStringColumnDescriptor(allow_null=False), 'B': SparkStringColumnDescriptor(allow_null=False), 'X': SparkIntegerColumnDescriptor(allow_null=False, size=64)}), length=3)
>>> partition.input_metric
SymmetricDifference()
>>> partition.output_metric
SumOf(inner_metric=SymmetricDifference())
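The output metric turns the per-partition distances into one distance between the two output lists: SumOf adds them, while RootSumOfSquared (chosen when use_l2=True) takes the square root of the sum of their squares. A minimal sketch, assuming the inner distances have already been computed as numbers under the inner metric; both helper names are hypothetical.

```python
import math
from typing import Sequence


def sum_of(inner_distances: Sequence[float]) -> float:
    # SumOf: add up the distances between corresponding list elements.
    return sum(inner_distances)


def root_sum_of_squared(inner_distances: Sequence[float]) -> float:
    # RootSumOfSquared: Euclidean (L2) norm of the per-element distances.
    return math.sqrt(sum(d * d for d in inner_distances))


per_partition = [3, 4, 0]  # hypothetical symmetric differences of three partitions
print(sum_of(per_partition))               # 7
print(root_sum_of_squared(per_partition))  # 5.0
```

Since the L2 norm never exceeds the sum of non-negative terms, any bound that holds under SumOf also holds under RootSumOfSquared.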
- Stability Guarantee:
PartitionByKeys's stability_function() returns d_in.
>>> partition.stability_function(1)
1
>>> partition.stability_function(2)
2
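For the SymmetricDifference input metric, the guarantee follows because list_values are unique, so each row lands in at most one partition: the per-partition symmetric differences add up to at most the symmetric difference of the inputs, and their root sum of squares is no larger. The sketch below checks this numerically for one hypothetical pair of inputs, modelling DataFrames as lists of tuples and multisets with `collections.Counter`; the helpers are illustrative, and the IfGroupedBy case is not covered.

```python
import math
from collections import Counter
from typing import List, Sequence, Tuple

Row = Tuple[str, str, int]  # (A, B, X)


def symmetric_difference(left: List[Row], right: List[Row]) -> int:
    # Size of the multiset symmetric difference between two collections of rows.
    a, b = Counter(left), Counter(right)
    return sum(((a - b) + (b - a)).values())


def partition(rows: List[Row], list_values: Sequence[Tuple[str, str]]) -> List[List[Row]]:
    # One partition per listed (A, B) pair; rows with unlisted pairs are dropped.
    return [[row for row in rows if row[:2] == value] for value in list_values]


list_values = [("a1", "b1"), ("a1", "b2"), ("a2", "b2")]
x = [("a1", "b1", 2), ("a1", "b2", -1), ("a2", "b1", -5)]
y = x[:1] + [("a1", "b2", 4), ("a2", "b2", 7)]  # replace two of the rows
d_in = symmetric_difference(x, y)
per_part = [
    symmetric_difference(p, q)
    for p, q in zip(partition(x, list_values), partition(y, list_values))
]
d_sum = sum(per_part)  # distance under SumOf
d_l2 = math.sqrt(sum(d * d for d in per_part))  # distance under RootSumOfSquared
print(d_in, per_part, d_sum)  # 4 [0, 2, 1] 3
```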
- Methods
keys – Returns the list of column names to partition on.
list_values – Returns the list of values corresponding to the partition keys.
__call__() – Returns a list of partitions of the input DataFrame.
num_partitions – Returns the number of partitions produced by the transformation.
stability_function() – Returns the smallest d_out satisfied by the transformation.
input_domain – Returns the input domain of the transformation.
input_metric – Distance metric on the input domain.
output_domain – Returns the output domain of the transformation.
output_metric – Distance metric on the output domain.
stability_relation() – Returns True only if close inputs produce close outputs.
__or__() – Returns this transformation chained with another component.
- Parameters
input_domain (tmlt.core.domains.spark_domains.SparkDataFrameDomain) –
input_metric (Union[tmlt.core.metrics.IfGroupedBy, tmlt.core.metrics.SymmetricDifference]) –
use_l2 (bool) –
keys (List[str]) –
list_values (Sequence[Tuple]) –
- __init__(input_domain, input_metric, use_l2, keys, list_values)
Constructor.
- Parameters
input_domain (SparkDataFrameDomain) – Domain of input DataFrames.
input_metric (Union[IfGroupedBy, SymmetricDifference]) – Distance metric for input DataFrames.
use_l2 (bool) – If True, use RootSumOfSquared instead of SumOf in the output metric.
keys (List[str]) – List of column names to partition by.
list_values (Sequence[Tuple]) – Domain for the key columns in the DataFrame. This is a list of unique n-tuples, where n is the number of keys; each tuple holds one combination of key values, in the same order as keys, and defines one output partition.
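The description of list_values implies two conditions: every tuple supplies one value per key column, and no tuple repeats. The sketch below checks both with a hypothetical helper, `check_list_values`; the library's own validation, if any, and its error messages may differ.

```python
from typing import List, Sequence, Tuple


def check_list_values(keys: List[str], list_values: Sequence[Tuple]) -> None:
    """Hypothetical check of the documented constraints on list_values."""
    for value in list_values:
        # Each tuple must supply exactly one value per key column.
        if len(value) != len(keys):
            raise ValueError(f"{value!r} has {len(value)} entries, expected {len(keys)}")
    # Tuples must be unique, since each one defines a separate partition.
    if len(set(list_values)) != len(list_values):
        raise ValueError("list_values contains duplicate tuples")


check_list_values(["A", "B"], [("a1", "b1"), ("a1", "b2"), ("a2", "b2")])  # no error
```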
- property list_values
Returns list of values corresponding to the partition keys.
- Return type
List[Tuple]
- __call__(sdf)
Returns a list of partitions of the input DataFrame, one per entry of list_values, in the same order.
- Parameters
sdf (pyspark.sql.DataFrame) –
- Return type
List[pyspark.sql.DataFrame]
- property num_partitions
Returns the number of partitions produced by the transformation.
If this number is not known, this returns None.
- Return type
Optional[int]
- stability_function(d_in)
Returns the smallest d_out satisfied by the transformation.
See the privacy and stability tutorial for more information.
- Parameters
d_in (tmlt.core.utils.exact_number.ExactNumberInput) – Distance between inputs under input_metric.
- Return type
tmlt.core.utils.exact_number.ExactNumber
- property input_domain
Returns the input domain of the transformation.
- Return type
tmlt.core.domains.base.Domain
- property input_metric
Distance metric on the input domain.
- Return type
tmlt.core.metrics.Metric
- property output_domain
Returns the output domain of the transformation.
- Return type
tmlt.core.domains.base.Domain
- property output_metric
Distance metric on the output domain.
- Return type
tmlt.core.metrics.Metric
- stability_relation(d_in, d_out)
Returns True only if close inputs produce close outputs.
See the privacy and stability tutorial for more information.
- Parameters
d_in (Any) – Distance between inputs under input_metric.
d_out (Any) – Distance between outputs under output_metric.
- Return type
bool
- __or__(other: Transformation) → Transformation
- __or__(other: tmlt.core.measurements.base.Measurement) → tmlt.core.measurements.base.Measurement
Return this transformation chained with another component.