session#

Interactive query evaluation using a differential privacy framework.

Session provides an interface for managing data sources and performing differentially private queries on them. A simple session with a single private data source can be created using Session.from_dataframe(); a more complex one with multiple data sources can be constructed using Session.Builder. Queries can then be evaluated on the data using Session.evaluate().

A Session is initialized with a PrivacyBudget, and ensures that queries evaluated on the private data do not consume more than this budget. By default, a Session enforces this privacy guarantee at the row level: the queries prevent an attacker from learning whether an individual row has been added or removed in each of the private tables, provided that the private data is not used elsewhere in the computation of the queries.

More details on the exact privacy promise provided by Session can be found in the Privacy promise topic guide.
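The budget-enforcement behavior described above can be sketched in plain Python. This is a simplified illustration with hypothetical names (BudgetTracker, BudgetExhausted), not the actual Tumult Analytics implementation, which tracks budgets through a privacy accountant:

```python
# Minimal sketch of session-style budget accounting (hypothetical names,
# not the actual Tumult Analytics implementation).

class BudgetExhausted(Exception):
    pass

class BudgetTracker:
    def __init__(self, total_epsilon: float):
        self.remaining = total_epsilon

    def spend(self, epsilon: float) -> None:
        # Refuse any query that would push total spending past the budget.
        if epsilon > self.remaining:
            raise BudgetExhausted(
                f"requested {epsilon}, only {self.remaining} remaining"
            )
        self.remaining -= epsilon

tracker = BudgetTracker(total_epsilon=1.0)
tracker.spend(0.5)      # first query: fine
tracker.spend(0.5)      # second query: exactly exhausts the budget
try:
    tracker.spend(0.1)  # third query: rejected
except BudgetExhausted:
    rejected = True
```

Once the budget is exhausted, every further query is refused; this is the mechanism that bounds the total privacy loss across all queries in a session.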

Data#

SUPPORTED_SPARK_TYPES#

Set of Spark data types supported by Tumult Analytics.

Support for Spark data types in Analytics is currently as follows:

Type               Supported
LongType           yes
IntegerType        yes, by coercion to LongType
DoubleType         yes
FloatType          yes, by coercion to DoubleType
StringType         yes
DateType           yes
TimestampType      yes
Other Spark types  no

Columns with unsupported types must be dropped or converted to supported ones before loading the data into Analytics.

TYPE_COERCION_MAP: Dict[pyspark.sql.types.DataType, pyspark.sql.types.DataType]#

Mapping describing how Spark’s data types are coerced by Tumult Analytics.
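The coercion rule can be sketched in plain Python, using type names as strings rather than actual pyspark.sql.types objects. The dict below is a simplified stand-in for the real TYPE_COERCION_MAP, and coerce_schema is a hypothetical helper, not a library function:

```python
# Simplified stand-in for TYPE_COERCION_MAP, using type names as strings
# instead of pyspark.sql.types objects.
TYPE_COERCION_MAP = {
    "IntegerType": "LongType",   # integers are widened to 64-bit
    "FloatType": "DoubleType",   # floats are widened to 64-bit
}

SUPPORTED = {"LongType", "DoubleType", "StringType", "DateType", "TimestampType"}

def coerce_schema(schema: dict) -> dict:
    """Coerce each column's type, rejecting types Analytics does not support."""
    out = {}
    for column, dtype in schema.items():
        dtype = TYPE_COERCION_MAP.get(dtype, dtype)
        if dtype not in SUPPORTED:
            raise TypeError(f"column {column!r} has unsupported type {dtype}")
        out[column] = dtype
    return out

coerced = coerce_schema({"A": "StringType", "B": "IntegerType", "X": "FloatType"})
# coerced == {"A": "StringType", "B": "LongType", "X": "DoubleType"}
```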

Classes#

class Session(accountant, public_sources)#

Allows differentially private query evaluation on sensitive data.

Sessions should not be directly constructed. Instead, they should be created using from_dataframe() or with a Builder.

Classes#

Builder

Builder for Session.

Properties#

private_sources

Returns the IDs of the private sources.

public_sources

Returns the IDs of the public sources.

public_source_dataframes

Returns a dictionary of public source DataFrames.

remaining_privacy_budget

Returns the privacy budget remaining in the session.

Methods#

from_dataframe()

Initializes a DP session from a Spark dataframe.

describe()

Describes this session, or one of its tables, or the result of a query.

get_schema()

Returns the schema for any data source.

get_column_types()

Returns the column types for any data source.

get_grouping_column()

Returns the column, if any, that queries on this table must group by.

get_id_column()

Returns the ID column of a table, if it has one.

get_id_space()

Returns the ID space of a table, if it has one.

add_public_dataframe()

Adds a public data source to the session.

evaluate()

Answers a query within the given privacy budget and returns a Spark dataframe.

create_view()

Creates a new view from a transformation, optionally caching it.

delete_view()

Deletes a view and decaches it if it was cached.

partition_and_create()

Returns new sessions created by partitioning the data, keyed by split name/source_id.

stop()

Closes out this session, allowing other sessions to become active.

Parameters:
  • accountant (tmlt.core.measurements.interactive_measurements.PrivacyAccountant) –

  • public_sources (Dict[str, pyspark.sql.DataFrame]) –

class Builder#

Builder for Session.

build()#

Builds a Session with the specified configuration.

Return type:

Session

with_private_dataframe(source_id, dataframe, protected_change)#

Adds a Spark DataFrame as a private source.

Not all Spark column types are supported in private sources; see tmlt.analytics.session.SUPPORTED_SPARK_TYPES for information about which types are supported.

Parameters:
  • source_id (str) – The name of the private source.

  • dataframe (pyspark.sql.DataFrame) – The private data corresponding to the source_id.

  • protected_change – The change in the data protected by the Session's privacy guarantee.

with_public_dataframe(source_id, dataframe)#

Adds a public dataframe.

Parameters:
  • source_id (str) – The name of the public data source.

  • dataframe (pyspark.sql.DataFrame) – The public data source corresponding to the source_id.

with_id_space(id_space)#

Adds an identifier space.

This defines a space of identifiers that map 1-to-1 to the identifiers being protected by a table with the AddRowsWithID protected change. Any table with such a protected change must be a member of some identifier space.

Parameters:

id_space (str) – The name of the identifier space.

with_privacy_budget(privacy_budget)#

Sets the privacy budget for the Session being built.

Parameters:

privacy_budget (tmlt.analytics.privacy_budget.PrivacyBudget) – The privacy budget for the Session being built.
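The fluent builder pattern these methods follow can be sketched in plain Python. The classes below (SessionBuilder, FakeSession) are hypothetical simplifications; the real Session.Builder additionally validates schemas, protected changes, and budgets:

```python
# Simplified sketch of a fluent builder in the style of Session.Builder
# (hypothetical classes; the real builder performs validation on build()).

class FakeSession:
    def __init__(self, budget, private, public, id_spaces):
        self.remaining_privacy_budget = budget
        self.private_sources = sorted(private)
        self.public_sources = sorted(public)
        self.id_spaces = id_spaces

class SessionBuilder:
    def __init__(self):
        self._budget = None
        self._private = {}
        self._public = {}
        self._id_spaces = set()

    def with_privacy_budget(self, budget):
        self._budget = budget
        return self  # returning self allows method chaining

    def with_private_dataframe(self, source_id, dataframe, protected_change):
        self._private[source_id] = (dataframe, protected_change)
        return self

    def with_public_dataframe(self, source_id, dataframe):
        self._public[source_id] = dataframe
        return self

    def with_id_space(self, id_space):
        self._id_spaces.add(id_space)
        return self

    def build(self):
        if self._budget is None:
            raise ValueError("a privacy budget is required")
        return FakeSession(self._budget, self._private, self._public, self._id_spaces)

sess = (
    SessionBuilder()
    .with_privacy_budget(1.0)
    .with_private_dataframe("my_private_data", dataframe=None, protected_change=None)
    .with_public_dataframe("my_public_data", dataframe=None)
    .build()
)
# sess.private_sources == ["my_private_data"]
```

Each with_* method records one piece of configuration and returns the builder, so a complete Session can be assembled in a single chained expression ending in build().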

property private_sources: List[str]#

Returns the IDs of the private sources.

Return type:

List[str]

property public_sources: List[str]#

Returns the IDs of the public sources.

Return type:

List[str]

property public_source_dataframes: Dict[str, pyspark.sql.DataFrame]#

Returns a dictionary of public source DataFrames.

Return type:

Dict[str, pyspark.sql.DataFrame]

property remaining_privacy_budget: tmlt.analytics.privacy_budget.PrivacyBudget#

Returns the privacy budget remaining in the session.

The type of the budget (e.g., PureDP or rho-zCDP) will be the same as the type of the budget the Session was initialized with.

Return type:

tmlt.analytics.privacy_budget.PrivacyBudget

classmethod from_dataframe(privacy_budget, source_id, dataframe, protected_change)#

Initializes a DP session from a Spark dataframe.

Only one private data source is supported with this initialization method; if you need multiple data sources, use Builder.

Not all Spark column types are supported in private sources; see SUPPORTED_SPARK_TYPES for information about which types are supported.

Example

>>> spark_data.toPandas()
   A  B  X
0  0  1  0
1  1  0  1
2  1  2  1
>>> # Declare budget for the session.
>>> session_budget = PureDPBudget(1)
>>> # Set up Session
>>> sess = Session.from_dataframe(
...     privacy_budget=session_budget,
...     source_id="my_private_data",
...     dataframe=spark_data,
...     protected_change=AddOneRow(),
... )
>>> sess.private_sources
['my_private_data']
>>> sess.get_schema("my_private_data").column_types 
{'A': 'VARCHAR', 'B': 'INTEGER', 'X': 'INTEGER'}
Parameters:
  • privacy_budget (tmlt.analytics.privacy_budget.PrivacyBudget) – The total privacy budget for the session.

  • source_id (str) – The name of the private source.

  • dataframe (pyspark.sql.DataFrame) – The private data corresponding to the source_id.

  • protected_change – The change in the data protected by the Session's privacy guarantee.

Return type:

Session

describe(obj=None)#

Describes this session, or one of its tables, or the result of a query.

If obj is not specified, session.describe() will describe the Session and all of the tables it contains.

If obj is a QueryBuilder or QueryExpr, session.describe(obj) will describe the table that would result from that query if it were applied to the Session.

If obj is a string, session.describe(obj) will describe the table with that name. This is a shorthand for session.describe(QueryBuilder(obj)).

Examples

>>> # describe a session, "sess"
>>> sess.describe() 
The session has a remaining privacy budget of PureDPBudget(epsilon=1).
The following private tables are available:
Table 'my_private_data' (no constraints):
    Columns:
        - 'A'  VARCHAR
        - 'B'  INTEGER
        - 'X'  INTEGER
>>> # describe a query object
>>> query = QueryBuilder("my_private_data").drop_null_and_nan(["B", "X"])
>>> sess.describe(query) 
Columns:
    - 'A'  VARCHAR
    - 'B'  INTEGER, not null
    - 'X'  INTEGER, not null
>>> # describe a table by name
>>> sess.describe("my_private_data") 
Columns:
    - 'A'  VARCHAR
    - 'B'  INTEGER
    - 'X'  INTEGER
Parameters:

obj (Optional[Union[tmlt.analytics._query_expr.QueryExpr, tmlt.analytics.query_builder.QueryBuilder, tmlt.analytics.query_builder.GroupedQueryBuilder, tmlt.analytics.query_builder.AggregatedQueryBuilder, str]]) – The table or query to be described, or None to describe the whole Session.

Return type:

None

get_schema(source_id)#

Returns the schema for any data source.

This includes information on whether the columns are nullable.

Parameters:

source_id (str) – The ID for the data source whose schema is being retrieved.

Return type:

tmlt.analytics._schema.Schema

get_column_types(source_id)#

Returns the column types for any data source.

This does not include information on whether the columns are nullable.

Parameters:

source_id (str) – The ID for the data source whose column types are being retrieved.

Return type:

Dict[str, tmlt.analytics.query_builder.ColumnType]

get_grouping_column(source_id)#

Returns the column, if any, that queries on this table must group by.

When a groupby aggregation is appended to any query on this table, it must include this column as a groupby column.

Parameters:

source_id (str) – The ID for the data source whose grouping column is being retrieved.

Return type:

Optional[str]

get_id_column(source_id)#

Returns the ID column of a table, if it has one.

Parameters:

source_id (str) – The name of the table whose ID column is being retrieved.

Return type:

Optional[str]

get_id_space(source_id)#

Returns the ID space of a table, if it has one.

Parameters:

source_id (str) – The name of the table whose ID space is being retrieved.

Return type:

Optional[str]

add_public_dataframe(source_id, dataframe)#

Adds a public data source to the session.

Not all Spark column types are supported in public sources; see SUPPORTED_SPARK_TYPES for information about which types are supported.

Example

>>> public_spark_data.toPandas()
   A  C
0  0  0
1  0  1
2  1  1
3  1  2
>>> # Add public data
>>> sess.add_public_dataframe(
...     source_id="my_public_data", dataframe=public_spark_data
... )
>>> sess.public_sources
['my_public_data']
>>> sess.get_schema('my_public_data').column_types 
{'A': 'VARCHAR', 'C': 'INTEGER'}
Parameters:
  • source_id (str) – The name of the public data source.

  • dataframe (pyspark.sql.DataFrame) – The public data source corresponding to the source_id.

evaluate(query_expr, privacy_budget)#

Answers a query within the given privacy budget and returns a Spark dataframe.

The type of privacy budget that you use must match the type your Session was initialized with (i.e., you cannot evaluate a query using RhoZCDPBudget if the Session was initialized with a PureDPBudget, and vice versa).
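The budget-type check described above can be sketched in plain Python. The classes and the check_budget_type helper below are standalone hypothetical stand-ins, not the library's internal implementation:

```python
# Sketch of the budget-type check described above (hypothetical
# stand-in classes, not the library's internals).

class PureDPBudget:
    def __init__(self, epsilon):
        self.epsilon = epsilon

class RhoZCDPBudget:
    def __init__(self, rho):
        self.rho = rho

def check_budget_type(session_budget, query_budget):
    # A query's budget must be the same kind of budget the session holds.
    if type(session_budget) is not type(query_budget):
        raise ValueError(
            "query budget type must match the type the Session was "
            "initialized with"
        )

check_budget_type(PureDPBudget(1.0), PureDPBudget(0.5))  # OK
try:
    check_budget_type(PureDPBudget(1.0), RhoZCDPBudget(0.5))
except ValueError:
    mismatch = True
```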

Example

>>> sess.private_sources
['my_private_data']
>>> sess.get_schema("my_private_data").column_types 
{'A': 'VARCHAR', 'B': 'INTEGER', 'X': 'INTEGER'}
>>> sess.remaining_privacy_budget
PureDPBudget(epsilon=1)
>>> # Evaluate Queries
>>> filter_query = QueryBuilder("my_private_data").filter("A > 0")
>>> count_query = filter_query.groupby(KeySet.from_dict({"X": [0, 1]})).count()
>>> count_answer = sess.evaluate(
...     query_expr=count_query,
...     privacy_budget=PureDPBudget(0.5),
... )
>>> sum_query = filter_query.sum(column="B", low=0, high=1)
>>> sum_answer = sess.evaluate(
...     query_expr=sum_query,
...     privacy_budget=PureDPBudget(0.5),
... )
>>> count_answer # TODO(#798): Seed randomness and change to toPandas()
DataFrame[X: bigint, count: bigint]
>>> sum_answer # TODO(#798): Seed randomness and change to toPandas()
DataFrame[B_sum: bigint]
Parameters:
  • query_expr – The query to evaluate.

  • privacy_budget (tmlt.analytics.privacy_budget.PrivacyBudget) – The privacy budget to spend answering the query.

Return type:

Any

create_view(query_expr, source_id, cache)#

Creates a new view from a transformation, optionally caching it.

Example

>>> sess.private_sources
['my_private_data']
>>> sess.get_schema("my_private_data").column_types 
{'A': 'VARCHAR', 'B': 'INTEGER', 'X': 'INTEGER'}
>>> public_spark_data.toPandas()
   A  C
0  0  0
1  0  1
2  1  1
3  1  2
>>> sess.add_public_dataframe("my_public_data", public_spark_data)
>>> # Create a view
>>> join_query = (
...     QueryBuilder("my_private_data")
...     .join_public("my_public_data")
...     .select(["A", "B", "C"])
... )
>>> sess.create_view(
...     join_query,
...     source_id="private_public_join",
...     cache=True
... )
>>> sess.private_sources
['private_public_join', 'my_private_data']
>>> sess.get_schema("private_public_join").column_types 
{'A': 'VARCHAR', 'B': 'INTEGER', 'C': 'INTEGER'}
>>> # Delete the view
>>> sess.delete_view("private_public_join")
>>> sess.private_sources
['my_private_data']
Parameters:
  • query_expr – The transformation defining the view.

  • source_id (str) – The name of the view.

  • cache (bool) – Whether to cache the view.

delete_view(source_id)#

Deletes a view and decaches it if it was cached.

Parameters:

source_id (str) – The name of the view.

partition_and_create(source_id, privacy_budget, column, splits)#

Returns new sessions created by partitioning the data, keyed by split name/source_id.

The type of privacy budget that you use must match the type your Session was initialized with (i.e., you cannot use a RhoZCDPBudget to partition your Session if the Session was created using a PureDPBudget, and vice versa).

The sessions returned must be used in the order that they were created. Using this session again or calling stop() will stop all partition sessions.
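The "use the sessions in order" rule can be sketched in plain Python. The classes below (Chain, ChildSession) are hypothetical simplifications illustrating that only the earliest not-yet-stopped partition session is active at any time; they are not the library's implementation:

```python
# Sketch of the sequential-activation rule for partition sessions
# (hypothetical classes, not the library's implementation).

class ChildSession:
    def __init__(self, name, chain):
        self.name = name
        self._chain = chain

    @property
    def active(self):
        # Only the earliest not-yet-stopped child session is active.
        return self._chain.active() is self

    def stop(self):
        self._chain.stopped.add(self.name)

class Chain:
    def __init__(self, names):
        self.stopped = set()
        self.children = [ChildSession(n, self) for n in names]

    def active(self):
        for child in self.children:
            if child.name not in self.stopped:
                return child
        return None

chain = Chain(["part0", "part1"])
p0, p1 = chain.children
# p0.active is True and p1.active is False until p0.stop() is called
p0.stop()
# now p1.active is True
```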

Example

This example partitions the session into two sessions, one with A = “0” and one with A = “1”. Due to parallel composition, each of these sessions is given the full partition budget, but that budget is only deducted once from the original session.

>>> sess.private_sources
['my_private_data']
>>> sess.get_schema("my_private_data").column_types 
{'A': 'VARCHAR', 'B': 'INTEGER', 'X': 'INTEGER'}
>>> sess.remaining_privacy_budget
PureDPBudget(epsilon=1)
>>> # Partition the Session
>>> new_sessions = sess.partition_and_create(
...     "my_private_data",
...     privacy_budget=PureDPBudget(0.75),
...     column="A",
...     splits={"part0":"0", "part1":"1"}
... )
>>> sess.remaining_privacy_budget
PureDPBudget(epsilon=0.25)
>>> new_sessions["part0"].private_sources
['part0']
>>> new_sessions["part0"].get_schema("part0").column_types 
{'A': 'VARCHAR', 'B': 'INTEGER', 'X': 'INTEGER'}
>>> new_sessions["part0"].remaining_privacy_budget
PureDPBudget(epsilon=0.75)
>>> new_sessions["part1"].private_sources
['part1']
>>> new_sessions["part1"].get_schema("part1").column_types 
{'A': 'VARCHAR', 'B': 'INTEGER', 'X': 'INTEGER'}
>>> new_sessions["part1"].remaining_privacy_budget
PureDPBudget(epsilon=0.75)

When you are done with a new session, you can use the stop() method to allow the next one to become active:

>>> new_sessions["part0"].stop()
>>> new_sessions["part1"].private_sources
['part1']
>>> count_query = QueryBuilder("part1").count()
>>> count_answer = new_sessions["part1"].evaluate(
...     count_query,
...     PureDPBudget(0.75),
... )
>>> count_answer.toPandas() 
   count
0    ...
Parameters:
  • source_id (str) – The private source to partition.

  • privacy_budget (tmlt.analytics.privacy_budget.PrivacyBudget) – Privacy budget to pass to each new session.

  • column (str) – The name of the column to partition on.

  • splits (Union[Dict[str, str], Dict[str, int]]) – Mapping from split name to partition value. Each split name becomes the source_id of the corresponding new session.

Return type:

Dict[str, Session]

stop()#

Closes out this session, allowing other sessions to become active.

Return type:

None