Passing parameters to a stored procedure#
In this additional section of the topic guide, we explain how to pass parameters from BigQuery to the stored procedure containing our Tumult Analytics program. This makes it possible to customize each call, for example by specifying different input or output tables, or different privacy parameters, without modifying the underlying program.
Recall that our stored procedure from earlier had no parameters.
CREATE OR REPLACE PROCEDURE `tumult-labs.analytics_tutorial.count_members`()
WITH CONNECTION `tumult-labs.us.bigspark`
OPTIONS (
engine='SPARK',
container_image='us-docker.pkg.dev/tumult-labs/analytics/tutorial:demo',
main_file_uri='gs://tumult-shared-procedures/library_members.py'
)
LANGUAGE python
We want to add three parameters: bucket, the bucket where the Spark warehouse is located; input, the table where the input data is located; and output, the table where the output data is located.
To do this, we simply need to add the parameters to the procedure definition.
CREATE OR REPLACE PROCEDURE `tumult-labs.analytics_tutorial.count_members`(
bucket STRING,
input STRING,
output STRING
)
WITH CONNECTION `tumult-labs.us.bigspark`
OPTIONS (
engine='SPARK',
container_image='us-docker.pkg.dev/tumult-labs/analytics/tutorial:demo',
main_file_uri='gs://tumult-shared-procedures/library_members.py'
)
LANGUAGE python
Now, we can call the procedure with the parameters as follows.
CALL `tumult-labs.analytics_tutorial.count_members`(
"tumult-warehouse",
"tumult-labs.analytics_tutorial.library_members",
"tumult-labs.analytics_tutorial.member_counts"
)
Note
Replace the bucket, input, and output with the values specific to your project.
Now, recall our Tumult Analytics program defined earlier.
import json
import os
from pyspark.sql import SparkSession
from tmlt.analytics import (
AddOneRow,
PureDPBudget,
QueryBuilder,
Session,
)
BUCKET = "tumult-warehouse"
INPUT_TABLE = "tumult-labs.analytics_tutorial.library_members"
OUTPUT_TABLE = "tumult-labs.analytics_tutorial.member_counts"
spark = (
SparkSession
.builder
.config("spark.sql.warehouse.dir", os.path.join("gs://", BUCKET, "/spark-warehouse/"))
.config("temporaryGcsBucket", BUCKET)
.getOrCreate()
)
members_df = (
spark.read.format("bigquery")
.option("table", INPUT_TABLE)
.load()
)
session = Session.from_dataframe(
privacy_budget=PureDPBudget(3),
source_id="members",
dataframe=members_df,
protected_change=AddOneRow(),
)
count_query = QueryBuilder("members").count()
total_count = session.evaluate(
count_query,
privacy_budget=PureDPBudget(epsilon=1)
)
(
total_count
.write.format("bigquery")
.mode("overwrite")
.option("table", OUTPUT_TABLE)
.save()
)
We need to modify this program so that it can receive the bucket, input, and output parameters.
To read in our new parameters, we need to read environment variables. Each parameter is stored in an environment variable in JSON format, and the variable's name has the following format: BIGQUERY_PROC_PARAM.[PARAMETER NAME]. For example, if we have a parameter named epsilon, we can access it with os.environ["BIGQUERY_PROC_PARAM.epsilon"].
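For instance, here is a minimal sketch of reading such a parameter. The epsilon parameter below is purely illustrative: it is not one of the parameters we defined on our procedure. Because the value is JSON-encoded, json.loads recovers its original type.
import json
import os

# Illustrative sketch only: `epsilon` is a hypothetical numeric parameter, not
# one of the parameters defined on our procedure above. BigQuery stores each
# parameter value as JSON, so json.loads recovers the original type.
epsilon = json.loads(os.environ["BIGQUERY_PROC_PARAM.epsilon"])
Applying the same pattern to our three parameters, the program changes as follows.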
+import json
+import os
-BUCKET = "tumult-warehouse"
-INPUT_TABLE = "tumult-labs.analytics_tutorial.library_members"
-OUTPUT_TABLE = "tumult-labs.analytics_tutorial.member_counts"
+BUCKET = json.loads(os.environ["BIGQUERY_PROC_PARAM.bucket"])
+INPUT_TABLE = json.loads(os.environ["BIGQUERY_PROC_PARAM.input"])
+OUTPUT_TABLE = json.loads(os.environ["BIGQUERY_PROC_PARAM.output"])
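If you also want to run the script outside of a BigQuery stored procedure (for instance, while testing locally), the BIGQUERY_PROC_PARAM environment variables will not be set. The following is an optional sketch of falling back to defaults in that case; the get_param helper is our own and not part of BigQuery or Tumult Analytics.
import json
import os

# Optional sketch: read a procedure parameter if its environment variable is
# set, otherwise fall back to a default. Handy when running the script outside
# of a BigQuery stored procedure, where BIGQUERY_PROC_PARAM.* is not defined.
def get_param(name, default):
    raw = os.environ.get(f"BIGQUERY_PROC_PARAM.{name}")
    return json.loads(raw) if raw is not None else default

BUCKET = get_param("bucket", "tumult-warehouse")
INPUT_TABLE = get_param("input", "tumult-labs.analytics_tutorial.library_members")
OUTPUT_TABLE = get_param("output", "tumult-labs.analytics_tutorial.member_counts")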
Full example#
In the end, your program should look structurally similar to this final program.
import json
import os
from pyspark.sql import SparkSession
from tmlt.analytics import (
AddOneRow,
PureDPBudget,
QueryBuilder,
Session,
)
BUCKET = json.loads(os.environ["BIGQUERY_PROC_PARAM.bucket"])
INPUT_TABLE = json.loads(os.environ["BIGQUERY_PROC_PARAM.input"])
OUTPUT_TABLE = json.loads(os.environ["BIGQUERY_PROC_PARAM.output"])
spark = (
SparkSession
.builder
.config("spark.sql.warehouse.dir", os.path.join("gs://", BUCKET, "/spark-warehouse/"))
.config("temporaryGcsBucket", BUCKET)
.getOrCreate()
)
members_df = (
spark.read.format("bigquery")
.option("table", INPUT_TABLE)
.load()
)
session = Session.from_dataframe(
privacy_budget=PureDPBudget(3),
source_id="members",
dataframe=members_df,
protected_change=AddOneRow(),
)
count_query = QueryBuilder("members").count()
total_count = session.evaluate(
count_query,
privacy_budget=PureDPBudget(epsilon=1)
)
(
total_count
.write.format("bigquery")
.mode("overwrite")
.option("table", OUTPUT_TABLE)
.save()
)
In the final part of this topic guide, we will see how to create a customized GCP-compatible Docker image to run Tumult Analytics.