session#

Interactive query evaluation using a differential privacy framework.

Session provides an interface for managing data sources and performing differentially private queries on them. A simple session with a single private datasource can be created using Session.from_dataframe(), or a more complex one with multiple datasources can be constructed using Session.Builder. Queries can then be evaluated on the data using Session.evaluate().

A Session is initialized with a PrivacyBudget, and ensures that queries evaluated on the private data do not consume more than this budget. By default, a Session enforces this privacy guarantee at the row level: the queries prevent an attacker from learning whether an individual row has been added or removed in each of the private tables, provided that the private data is not used elsewhere in the computation of the queries.

More details on the exact privacy promise provided by Session can be found in the Privacy promise topic guide.
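As a toy illustration of this row-level guarantee (standard Laplace-mechanism arithmetic, not the Tumult Analytics API): a count query changes by at most 1 between two datasets that differ in one row, so adding Laplace noise with scale 1/epsilon masks any individual row's presence.

```python
import math
import random

def noisy_count(rows, epsilon):
    """Laplace-noised count (the textbook mechanism, not Analytics internals).

    A count changes by at most 1 when a single row is added or removed,
    so Laplace noise with scale 1/epsilon yields epsilon-DP.
    """
    u = random.random() - 0.5          # uniform on [-0.5, 0.5)
    sign = 1.0 if u >= 0 else -1.0
    # Inverse-CDF sample from Laplace(0, 1/epsilon).
    noise = -(1.0 / epsilon) * sign * math.log(1.0 - 2.0 * abs(u))
    return len(rows) + noise

# Two neighboring datasets: they differ by exactly one individual's row.
db = [("alice", 34), ("bob", 51)]
neighbor = db + [("carol", 29)]
```

With the noise added, an attacker observing the released count cannot reliably tell whether `db` or `neighbor` was the true input; Session applies this kind of protection automatically while tracking the total budget spent.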

Data#

SUPPORTED_SPARK_TYPES#

Set of Spark data types supported by Tumult Analytics.

Support for Spark data types in Analytics is currently as follows:

  • LongType: yes

  • IntegerType: yes, by coercion to LongType

  • DoubleType: yes

  • FloatType: yes, by coercion to DoubleType

  • StringType: yes

  • DateType: yes

  • TimestampType: yes

  • Other Spark types: no

Columns with unsupported types must be dropped or converted to supported ones before loading the data into Analytics.

TYPE_COERCION_MAP : Dict[pyspark.sql.types.DataType, pyspark.sql.types.DataType]#

Mapping describing how Spark’s data types are coerced by Tumult Analytics.
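The support and coercion rules above can be sketched in plain Python. This is a hypothetical stand-in keyed by type name rather than by pyspark.sql.types objects (an assumption made so the sketch needs no Spark installation); the real TYPE_COERCION_MAP uses the DataType objects themselves.

```python
# Hypothetical analogue of TYPE_COERCION_MAP, keyed by type-name strings.
TYPE_COERCIONS = {
    "IntegerType": "LongType",    # 32-bit integers widen to 64-bit
    "FloatType": "DoubleType",    # 32-bit floats widen to 64-bit
}

# Types usable directly, per SUPPORTED_SPARK_TYPES.
SUPPORTED = {"LongType", "DoubleType", "StringType", "DateType", "TimestampType"}

def resolve_type(spark_type_name):
    """Return the type a column would have after loading, or None if unsupported."""
    coerced = TYPE_COERCIONS.get(spark_type_name, spark_type_name)
    return coerced if coerced in SUPPORTED else None
```

For example, an IntegerType column resolves to LongType, while a BinaryType column resolves to None and would have to be dropped or converted before loading.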

Classes#

Session

Allows differentially private query evaluation on sensitive data.

class Session(accountant, public_sources, compiler=None)#

Allows differentially private query evaluation on sensitive data.

Sessions should not be directly constructed. Instead, they should be created using from_dataframe() or with a Builder.

Classes#

Builder

Builder for Session.

Methods#

from_dataframe()

Initializes a DP session from a Spark dataframe.

private_sources()

Returns the ids of the private sources.

public_sources()

Returns the ids of the public sources.

public_source_dataframes()

Returns a dictionary of public source dataframes.

remaining_privacy_budget()

Returns the privacy budget remaining in the session.

get_schema()

Returns the schema for any data source.

get_column_types()

Returns the column types for any data source.

get_grouping_column()

Returns an optional column that must be grouped by in this query.

add_public_dataframe()

Adds a public data source to the session.

evaluate()

Answers a query within the given privacy budget and returns a Spark dataframe.

create_view()

Create a new view from a transformation and possibly cache it.

delete_view()

Deletes a view and decaches it if it was cached.

partition_and_create()

Returns new sessions obtained by partitioning the data, keyed by split name/source_id.

stop()

Close out this session, allowing other sessions to become active.

class Builder#

Builder for Session.

build(self)#

Builds Session with specified configuration.

Return type

Session

with_privacy_budget(self, privacy_budget)#

Sets the privacy budget for the Session to be built.

Parameters

privacy_budget (tmlt.analytics.privacy_budget.PrivacyBudget) – Privacy budget to be allocated to the Session.

Return type

Session

with_private_dataframe(self, source_id, dataframe, stability=1, grouping_column=None)#

Adds a Spark DataFrame as a private source.

Not all Spark column types are supported in private sources; see SUPPORTED_SPARK_TYPES for information about which types are supported.

Parameters
  • source_id (str) – Source id for the private source dataframe.

  • dataframe (pyspark.sql.DataFrame) – Private source dataframe to perform queries on, corresponding to the source_id.

  • stability (Union[int, float]) – Maximum number of rows that may be added or removed if a single individual is added or removed. If using RhoZCDP and a grouping column, this should instead be the maximum number of rows that an individual can contribute to each group times the square root of the maximum number of groups each user can contribute to.

  • grouping_column (Optional[str]) – An input column that must be grouped on, like those generated when calling flat_map() with the grouping option set.

Return type

Session
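The RhoZCDP rule described for the stability parameter (rows per group times the square root of the number of groups) can be written out as a small helper. This function is purely illustrative and is not part of the Analytics API.

```python
import math

def zcdp_stability(max_rows_per_group, max_groups_per_individual):
    """Stability for a private source with a grouping column under RhoZCDP:
    the maximum rows an individual contributes to each group, times the
    square root of the maximum number of groups they can contribute to."""
    return max_rows_per_group * math.sqrt(max_groups_per_individual)
```

For instance, if each individual contributes at most 2 rows to each of at most 4 groups, the stability to pass would be 2 * sqrt(4) = 4.0.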

with_public_dataframe(self, source_id, dataframe)#

Adds a Spark DataFrame as a public source.

Not all Spark column types are supported in public sources; see SUPPORTED_SPARK_TYPES for information about which types are supported.

Parameters
  • source_id (str) – Source id for the public data source.

  • dataframe (pyspark.sql.DataFrame) – Public DataFrame corresponding to the source id.

Return type

Session

__init__(accountant, public_sources, compiler=None)#

Initializes a DP session from a queryable.

This constructor is not intended to be used directly. Use Session.Builder or the from_* constructors instead.

Return type

None

classmethod from_dataframe(cls, privacy_budget, source_id, dataframe, stability=1, grouping_column=None)#

Initializes a DP session from a Spark dataframe.

Only one private data source is supported with this initialization method; if you need multiple data sources, use Builder.

Not all Spark column types are supported in private sources; see SUPPORTED_SPARK_TYPES for information about which types are supported.

Example

>>> spark_data.toPandas()
   A  B  X
0  0  1  0
1  1  0  1
2  1  2  1
>>> # Declare budget for the session.
>>> session_budget = PureDPBudget(1)
>>> # Set up Session
>>> sess = Session.from_dataframe(
...     privacy_budget=session_budget,
...     source_id="my_private_data",
...     dataframe=spark_data,
... )
>>> sess.private_sources
['my_private_data']
>>> sess.get_schema("my_private_data").column_types 
{'A': 'VARCHAR', 'B': 'INTEGER', 'X': 'INTEGER'}

Parameters
  • privacy_budget (tmlt.analytics.privacy_budget.PrivacyBudget) – The total privacy budget allocated to this session.

  • source_id (str) – The source id for the private source dataframe.

  • dataframe (pyspark.sql.DataFrame) – The private source dataframe to perform queries on, corresponding to the source_id.

  • stability (Union[int, float]) – Maximum number of rows that may be added or removed if a single individual is added or removed. If using RhoZCDP and a grouping column, this should instead be the maximum number of rows that an individual can contribute to each group times the square root of the maximum number of groups each user can contribute to.

  • grouping_column (Optional[str]) – An input column that must be grouped on, like those generated when calling flat_map() with the grouping option set.

Return type

Session

property private_sources(self)#

Returns the ids of the private sources.

Return type

List[str]

property public_sources(self)#

Returns the ids of the public sources.

Return type

List[str]

property public_source_dataframes(self)#

Returns a dictionary of public source dataframes.

Return type

Dict[str, pyspark.sql.DataFrame]

property remaining_privacy_budget(self)#

Returns the privacy budget remaining in the session.

The type of the budget (e.g., PureDP or RhoZCDP) will be the same as the type of the budget the Session was initialized with.

Return type

Union[tmlt.analytics.privacy_budget.PureDPBudget, tmlt.analytics.privacy_budget.RhoZCDPBudget]

get_schema(self, source_id)#

Returns the schema for any data source.

This includes information on whether the columns are nullable.

Parameters

source_id (str) – The ID for the data source whose schema is being retrieved.

Return type

tmlt.analytics._schema.Schema

get_column_types(self, source_id)#

Returns the column types for any data source.

This does not include information on whether the columns are nullable.

Parameters

source_id (str) – The ID for the data source whose column types are being retrieved.

Return type

Dict[str, tmlt.analytics.query_builder.ColumnType]

get_grouping_column(self, source_id)#

Returns an optional column that must be grouped by in this query.

When a groupby aggregation is appended to any query on this table, it must include this column as a groupby column.

Parameters

source_id (str) – The ID for the data source whose grouping column is being retrieved.

Return type

Optional[str]

add_public_dataframe(self, source_id, dataframe)#

Adds a public data source to the session.

Not all Spark column types are supported in public sources; see SUPPORTED_SPARK_TYPES for information about which types are supported.

Example

>>> public_spark_data.toPandas()
   A  C
0  0  0
1  0  1
2  1  1
3  1  2
>>> # Add public data
>>> sess.add_public_dataframe(
...     source_id="my_public_data", dataframe=public_spark_data
... )
>>> sess.public_sources
['my_public_data']
>>> sess.get_schema('my_public_data').column_types 
{'A': 'VARCHAR', 'C': 'INTEGER'}

Parameters
  • source_id (str) – The name of the public data source.

  • dataframe (pyspark.sql.DataFrame) – The public data source corresponding to the source_id.

evaluate(self, query_expr, privacy_budget)#

Answers a query within the given privacy budget and returns a Spark dataframe.

The type of privacy budget that you use must match the type your Session was initialized with (i.e., you cannot evaluate a query using RhoZCDPBudget if the Session was initialized with a PureDPBudget, and vice versa).

Example

>>> sess.private_sources
['my_private_data']
>>> sess.get_schema("my_private_data").column_types 
{'A': 'VARCHAR', 'B': 'INTEGER', 'X': 'INTEGER'}
>>> sess.remaining_privacy_budget
PureDPBudget(epsilon=1)
>>> # Evaluate Queries
>>> filter_query = QueryBuilder("my_private_data").filter("A > 0")
>>> count_query = filter_query.groupby(KeySet.from_dict({"X": [0, 1]})).count()
>>> count_answer = sess.evaluate(
...     query_expr=count_query,
...     privacy_budget=PureDPBudget(0.5),
... )
>>> sum_query = filter_query.sum(column="B", low=0, high=1)
>>> sum_answer = sess.evaluate(
...     query_expr=sum_query,
...     privacy_budget=PureDPBudget(0.5),
... )
>>> count_answer # TODO(#798): Seed randomness and change to toPandas()
DataFrame[X: bigint, count: bigint]
>>> sum_answer # TODO(#798): Seed randomness and change to toPandas()
DataFrame[B_sum: bigint]

Return type

pyspark.sql.DataFrame

create_view(self, query_expr, source_id, cache)#

Create a new view from a transformation and possibly cache it.

Example

>>> sess.private_sources
['my_private_data']
>>> sess.get_schema("my_private_data").column_types 
{'A': 'VARCHAR', 'B': 'INTEGER', 'X': 'INTEGER'}
>>> public_spark_data.toPandas()
   A  C
0  0  0
1  0  1
2  1  1
3  1  2
>>> sess.add_public_dataframe("my_public_data", public_spark_data)
>>> # Create a view
>>> join_query = (
...     QueryBuilder("my_private_data")
...     .join_public("my_public_data")
...     .select(["A", "B", "C"])
... )
>>> sess.create_view(
...     join_query,
...     source_id="private_public_join",
...     cache=True
... )
>>> sess.private_sources
['my_private_data', 'private_public_join']
>>> sess.get_schema("private_public_join").column_types 
{'A': 'VARCHAR', 'B': 'INTEGER', 'C': 'INTEGER'}
>>> # Delete the view
>>> sess.delete_view("private_public_join")
>>> sess.private_sources
['my_private_data']

delete_view(self, source_id)#

Deletes a view and decaches it if it was cached.

Parameters

source_id (str) – The name of the view.

partition_and_create(self, source_id, privacy_budget, attr_name, splits)#

Returns new sessions obtained by partitioning the data, keyed by split name/source_id.

The type of privacy budget that you use must match the type your Session was initialized with (i.e., you cannot use a RhoZCDPBudget to partition your Session if the Session was created using a PureDPBudget, and vice versa).

The sessions returned must be used in the order that they were created. Using this session again or calling stop() will stop all partition sessions.

Example

This example partitions the session into two sessions, one with A = “0” and one with A = “1”. Due to parallel composition, each of these sessions is given the same budget, while that budget is deducted from the parent session only once.

>>> sess.private_sources
['my_private_data']
>>> sess.get_schema("my_private_data").column_types 
{'A': 'VARCHAR', 'B': 'INTEGER', 'X': 'INTEGER'}
>>> sess.remaining_privacy_budget
PureDPBudget(epsilon=1)
>>> # Partition the Session
>>> new_sessions = sess.partition_and_create(
...     "my_private_data",
...     privacy_budget=PureDPBudget(0.75),
...     attr_name="A",
...     splits={"part0":"0", "part1":"1"}
... )
>>> sess.remaining_privacy_budget
PureDPBudget(epsilon=0.25)
>>> new_sessions["part0"].private_sources
['part0']
>>> new_sessions["part0"].get_schema("part0").column_types 
{'A': 'VARCHAR', 'B': 'INTEGER', 'X': 'INTEGER'}
>>> new_sessions["part0"].remaining_privacy_budget
PureDPBudget(epsilon=0.75)
>>> new_sessions["part1"].private_sources
['part1']
>>> new_sessions["part1"].get_schema("part1").column_types 
{'A': 'VARCHAR', 'B': 'INTEGER', 'X': 'INTEGER'}
>>> new_sessions["part1"].remaining_privacy_budget
PureDPBudget(epsilon=0.75)

When you are done with a new session, you can use the stop() method to allow the next one to become active:

>>> new_sessions["part0"].stop()
>>> new_sessions["part1"].private_sources
['part1']
>>> count_query = QueryBuilder("part1").count()
>>> count_answer = new_sessions["part1"].evaluate(
...     count_query,
...     PureDPBudget(0.75),
... )
>>> count_answer.toPandas() 
   count
0    ...

Parameters
  • source_id (str) – The private source to partition.

  • privacy_budget (tmlt.analytics.privacy_budget.PrivacyBudget) – Amount of privacy budget to pass to each new session.

  • attr_name (str) – The name of the column to partition on.

  • splits (Union[Dict[str, str], Dict[str, int]]) – Mapping of split name to partition value. The split name becomes the source_id of the corresponding new session.

Return type

Dict[str, Session]
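The budget accounting in the example above follows parallel composition: the 0.75 passed to partition_and_create is charged once against the parent session, and every child session receives that full amount. A minimal sketch of this bookkeeping (a hypothetical helper, not the library's privacy accountant):

```python
def partition_budgets(parent_remaining, partition_cost, n_splits):
    """Parallel composition: charge partition_cost against the parent once,
    and grant that same amount to every split (the splits are disjoint,
    so their budgets do not add up)."""
    if partition_cost > parent_remaining:
        raise ValueError("insufficient privacy budget for this partition")
    children = [partition_cost] * n_splits
    return parent_remaining - partition_cost, children

# Mirrors the doctest above: parent starts at 1.0, partitions with 0.75.
remaining, children = partition_budgets(1.0, 0.75, 2)
```

This reproduces the numbers in the doctest: 0.25 remains in the parent session, while each of the two new sessions holds a full 0.75.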

stop(self)#

Close out this session, allowing other sessions to become active.

Return type

None