Accelerate Data Workflows: Connecting Dataiku to Snowflake with Snowpark Connect
Table of Contents
Introduction
Dataiku and Snowflake are a powerful combination for building end-to-end data science and analytics solutions. This article introduces Snowpark Connect for Spark to execute your Dataiku Spark workloads directly within the Snowflake engine.
Snowpark Connect is a lightweight wrapper built on top of Snowpark that allows existing Spark code to run as-is — that is no code migration is required compared to Snowpark.
This integration lets data teams use the familiar Spark DataFrame API within Dataiku while leveraging Snowflake's managed compute, gaining in performance from Snowflake vectorized engine, and eliminating the complexity of maintaining Spark environments, dependencies and upgrades.
The following sections provide a step-by-step guide of how to set up and run Dataiku Spark workloads on Snowflake using Snowpark Connect. We'll go through both the Snowflake and Dataiku setups, and test the integration with simple data operations to illustrate the end-to-end workflow.
For those that would like to follow along, both Dataiku and Snowflake provide free trial accounts.
Snowflake Setup
Step 1: Create a Snowflake input table
In this first step, we'll prepare the Snowflake environment and create a table that holds sample flight data. This table will be used later to demonstrate how Dataiku reads data from Snowflake through Snowpark Connect.
-- Create database and schema
CREATE DATABASE TEST_DATABASE;
CREATE SCHEMA TEST_SCHEMA;
-- Create an (empty) input table to hold flight information
CREATE TABLE TEST_DATABASE.TEST_SCHEMA.INPUT_FLIGHTS (
FLIGHT_ID VARCHAR(36) PRIMARY KEY,
DEPARTURE_TIMESTAMP TIMESTAMP_NTZ,
ARRIVAL_TIMESTAMP TIMESTAMP_NTZ,
FLIGHT_NUMBER VARCHAR(10),
LEG VARCHAR(7),
DISTANCE_MILES INTEGER
);
-- Insert sample flight records into the input table
INSERT INTO TEST_DATABASE.TEST_SCHEMA.INPUT_FLIGHTS
(FLIGHT_ID, DEPARTURE_TIMESTAMP, ARRIVAL_TIMESTAMP, FLIGHT_NUMBER, LEG, DISTANCE_MILES)
VALUES
('82228b72-e94a-49cf-a564-3e0410f933b5', '2025-07-04 17:59', '2025-07-04 20:58', 'UA535', 'jfk-lax', 2475),
('7f9eac16-8853-4bb3-843d-6da0b1f838e8', '2025-07-04 18:20', '2025-07-04 20:52', '9E3542', 'jfk-msp', 1029),
('41d3819a-2fe3-4a4b-b71f-a471827f6491', '2025-07-04 18:29', '2025-07-04 21:43', 'UA445', 'ewr-tpa', 997),
('f5e781c1-90e9-4cdc-a981-19a8d23084ff', '2025-07-04 18:19', '2025-07-04 21:18', 'B6153', 'jfk-pbi', 1028),
('47647396-07fb-48d8-9ed7-b72f49df015a', '2025-07-04 18:55', '2025-07-04 22:16', 'B6669', 'jfk-sjc', 2569),
('e2ce3361-32db-4056-b122-68fcbf3d352d', '2025-07-04 19:10', '2025-07-04 22:32', 'DL1091', 'jfk-sat', 1587),
('5ec4fad9-1deb-43d8-a99a-a083f106e3ac', '2025-07-04 19:30', '2025-07-04 20:51', 'EV5769', 'lga-iad', 229),
('09820b55-47dc-4da3-b1eb-b81fb2495109', '2025-07-04 19:35', '2025-07-04 21:20', 'MQ3535', 'jfk-cmh', 483);
-- Check flight data was inserted successfully
SELECT * FROM TEST_DATABASE.TEST_SCHEMA.INPUT_FLIGHTS;
Step 2: Create a Snowflake output table
After reading and transforming the flight data, we will need a table to store the results.
-- Create an (empty) output table to hold flight information
CREATE TABLE TEST_DATABASE.TEST_SCHEMA.OUTPUT_FLIGHTS (
FLIGHT_ID VARCHAR(36) PRIMARY KEY,
DEPARTURE_TIMESTAMP TIMESTAMP_NTZ,
ARRIVAL_TIMESTAMP TIMESTAMP_NTZ,
FLIGHT_NUMBER VARCHAR(10),
ORIGIN VARCHAR(3),
DESTINATION VARCHAR(3),
DISTANCE_MILES INTEGER,
DISTANCE_KMS INTEGER
);
-- Check output table has been created
SELECT * FROM TEST_DATABASE.TEST_SCHEMA.OUTPUT_FLIGHTS;
Step 3: Get Snowflake connection details
Snowflake supports multiple authentication methods for programmatic access, in this guide we will use programmatic access tokens (PATs) to connect Dataiku to Snowflake.
Snowflake enforces network policies for PAT authentication. In this setup, we will create a service user that can connect exclusively from Dataiku IP addresses and assign the appropriate privileges for running the data workloads.
For Dataiku to connect to Snowflake, we will need:
- Snowflake PAT token
- Snowflake Account/Server URL
The steps below provide both pieces of information. You can also get Snowflake Account/Server URL through Snowflake UI.
-- Create a role for the service user
CREATE ROLE dataiku_service_role;
-- Grant USAGE on database
GRANT USAGE ON DATABASE TEST_DATABASE TO ROLE dataiku_service_role;
-- Grant USAGE on schema
GRANT USAGE ON SCHEMA TEST_DATABASE.TEST_SCHEMA TO ROLE dataiku_service_role;
-- Grant SELECT (read) privilege on input table
GRANT SELECT ON TABLE TEST_DATABASE.TEST_SCHEMA.INPUT_FLIGHTS TO ROLE dataiku_service_role;
-- Grant OWNERSHIP (all privileges) on output table
GRANT OWNERSHIP ON TABLE TEST_DATABASE.TEST_SCHEMA.OUTPUT_FLIGHTS TO ROLE dataiku_service_role;
-- Grant CREATE for Snowpark Connect to create or replace tables in the schema
GRANT CREATE TABLE ON SCHEMA TEST_DATABASE.TEST_SCHEMA TO ROLE dataiku_service_role;
-- Grant USAGE on snowflake default warehouse to run queries
GRANT USAGE ON WAREHOUSE COMPUTE_WH TO ROLE dataiku_service_role;
-- Create the service user
CREATE USER dataiku_service_user
TYPE = 'SERVICE'
DEFAULT_ROLE = dataiku_service_role
COMMENT = 'Service user for Dataiku PAT access';
-- Assign the role to the user
GRANT ROLE dataiku_service_role TO USER dataiku_service_user;
-- Create the network policy using Dataiku IPs
CREATE NETWORK POLICY dataiku_pat_network_policy
ALLOWED_IP_LIST = ('15.237.13.84', '15.188.61.167');
-- Assign the network policy to the service user
ALTER USER dataiku_service_user SET NETWORK_POLICY = dataiku_pat_network_policy;
-- Generate Snowflake PAT token
ALTER USER dataiku_service_user
ADD PROGRAMMATIC ACCESS TOKEN dataiku_pat
ROLE_RESTRICTION = dataiku_service_role
DAYS_TO_EXPIRY = 90
COMMENT = 'PAT for Dataiku access';
-- Generate Snowflake Account URL for Dataiku to connect
SELECT CURRENT_ORGANIZATION_NAME() || '-' || CURRENT_ACCOUNT_NAME() || '.snowflakecomputing.com';
Note: Dataiku IPs may change, you can find Dataiku IPs by logging into Dataiku, navigate to the Connection tab, and the IPs will be indicated at the very bottom of the screen.
Dataiku Setup
Step 1: Create a Dataiku code environment
Next, we will create a code environment to isolate dependencies and ensure stable execution.
In Dataiku Launchpad, go to Code Envs → Add a Code Env → New Python Env, and fill in the fields as shown below:
- Deployment type: Managed by DSS
- Name: snowpark-py311-env
- Python: 3.11
- Mandatory packages: Enabled
- Jupyter: Enabled
Select the environment you created. In the left-hand pane, locate "Packages to install", enter the package names listed below, check "Rebuild environment", and then click "Save and update".
snowflake==1.8.0
snowpark-connect==0.31.0
snowflake-snowpark-python==1.40.0
Once the environment has been rebuilt, you can verify that the packages were installed successfully by checking the "Currently installed packages" section.
Step 2: Register the DkuSnowparkConnectForSpark class
This class provides the logic to establish and manage Snowpark Connect sessions with Snowflake. Registering it globally ensures it can be reused across the entire Dataiku project.
Save the code below locally as snowpark_connect_for_spark.py, then go to Global Shared Code → Add → Upload File, and upload the file.
# snowpark_connect_for_spark.py
import base64
import json
import logging
import os
import re
import sys
from datetime import datetime
from distutils.version import LooseVersion
from urllib.parse import parse_qsl, urlparse
import dataiku
import snowflake.snowpark
from snowflake import snowpark_connect
from snowflake.snowpark import Session
from dataiku.base.spark_like import SparkLike
from dataiku.base.sql_dialect import SparkLikeDialect
from dataiku.core import snowflake_utils
os.environ["TZ"] = "UTC"
os.environ["SPARK_CONNECT_MODE_ENABLED"] = "1"
class DkuSnowparkDialect(SparkLikeDialect):
def __init__(self):
SparkLikeDialect.__init__(self)
def _get_to_dss_types_map(self):
if self._to_dss_types_map is None:
self._to_dss_types_map = {
'ArrayType': 'string',
'BinaryType': 'string',
'BooleanType': 'boolean',
'ByteType': 'tinyint',
'DateType': 'dateonly',
'DayTimeIntervalType': 'string',
'DecimalType': 'double',
'DoubleType': 'double',
'FloatType': 'float',
'IntegerType': 'int',
'LongType': 'bigint',
'MapType': 'string',
'NullType': 'string',
'ShortType': 'smallint',
'StringType': 'string',
'StructType': 'string',
'TimestampNTZType': 'datetimenotz',
'TimestampType': 'date',
'UserDefinedType': 'string',
}
return self._to_dss_types_map
def allow_empty_schema_after_catalog(self):
return True
def identifier_quote_char(self):
return '`'
def _column_name_to_sql_column(self, identifier):
return col(self.quote_identifier(identifier))
def _python_literal_to_sql_literal(self, value, column_type, original_type=None):
if original_type is not None and original_type.lower() == 'binary':
return unhex(lit(value))
elif column_type == 'date':
return call_builtin("TO_TIMESTAMP_TZ", str(value))
elif column_type == 'dateonly':
return call_builtin("TO_DATE", str(value))
elif column_type == 'datetimenotz':
return call_builtin("TO_TIMESTAMP_NTZ", str(value))
else:
return lit(value)
def _get_components_from_df_schema(self, df_schema):
names = [f.name for f in df_schema]
fields = {
f.name: {"name": f.name, "dataType": f.dataType.__class__.__name__}
for f in df_schema
}
return (names, fields)
def get_dss_schema_from_df_schema(self, df_schema):
logging.debug(
"Computing DSS schema corresponding to dataframe schema: %s" % df_schema
)
column_names, column_fields = self._get_components_from_df_schema(df_schema)
if len(set(column_names)) != len(list(column_names)):
raise Exception(
"DSS doesn't support dataframes containing multiple columns with the same name."
)
dss_schema = []
for field_name in column_names:
field = column_fields[field_name]
t = self._get_dss_type_from_df_datatype(field["dataType"])
if isinstance(t, dict):
sc = t
else:
sc = {'type': t}
sc['name'] = field["name"]
dss_schema.append(sc)
logging.debug("Computed schema from dataframe: %s" % dss_schema)
return dss_schema
def _get_datatype_name_from_df_datatype(self, datatype):
return datatype.__class__.__name__
class DkuSnowparkConnectForSpark(SparkLike):
"""Handle creating Snowpark Connect sessions from DSS Datasets or Connections"""
def __init__(self):
SparkLike.__init__(self)
self._dialect = DkuSnowparkDialect()
self._connection_type = "Snowflake"
def _create_session(self, connection_name, connection_info, project_key=None):
connection_parameters = snowflake_utils.get_snowflake_connection_params(
connection_name, connection_info
)
from snowflake.snowpark_connect.utils.session import _get_current_snowpark_session
snowpark_session = _get_current_snowpark_session()
if snowpark_session is not None:
session = snowpark_connect.get_session()
else:
snowpark_connect.start_session(
connection_parameters=connection_parameters
)
session = snowpark_connect.get_session()
logging.info("Snowpark session established")
if (
"postConnectStatementsExpandedAndSplit" in connection_parameters
and len(connection_parameters["postConnectStatementsExpandedAndSplit"]) > 0
):
for statement in connection_parameters[
"postConnectStatementsExpandedAndSplit"
]:
logging.info("Executing statement: %s" % statement)
session.sql(statement).collect()
logging.info("Statement done")
session.dss_connection_name = connection_name
return session
def _split_jdbc_url(self, sf_url):
if not sf_url.startswith("jdbc:snowflake:"):
raise ValueError("Invalid JDBC URL. It must start with jdbc:snowflake://")
sf_url = sf_url[len("jdbc:snowflake:"):]
url_elements = urlparse(sf_url)
params = {}
params['host'] = url_elements.netloc.split(":")[0]
params['properties'] = []
result = dict(parse_qsl(url_elements.query))
for k in result:
params['properties'].append({'name': k, 'value': result[k]})
return params
@staticmethod
def __get_snowflake_account(sf_account):
if sf_account.endswith("/"):
sf_account = sf_account[:-1]
if sf_account.endswith(".snowflakecomputing.com"):
return sf_account[:-len(".snowflakecomputing.com")]
else:
return sf_account
def _check_dataframe_type(self, df):
if not df.__class__.__module__.startswith("pyspark."):
raise ValueError(
"Dataframe is not a Snowpark Connect For Spark dataframe. "
"Use dataset.write_dataframe() instead."
)
def _do_with_column(self, df, column_name, column_value):
return df.withColumn(column_name, column_value)
def _cast_to_target_types(self, df, dss_schema, qualified_table_id):
from pyspark.sql.functions import base64, col, date_format, lit, to_json
column_names, column_fields = self._dialect._get_components_from_df_schema(
df.schema
)
try:
tdf = df.sparkSession.table(qualified_table_id)
_, target_column_fields = self._dialect._get_components_from_df_schema(
tdf.schema
)
for column_name in column_names:
field = column_fields[column_name]
output_field = target_column_fields.get(column_name)
if output_field is None:
continue
datatype_name = field["dataType"]
target_datatype_name = output_field["dataType"]
if datatype_name == 'BinaryType' and target_datatype_name == 'StringType':
df = df.withColumn(column_name, base64(col(column_name)))
if datatype_name in ['TimestampType', 'TimestampNTZType'] and target_datatype_name == 'StringType':
df = df.withColumn(column_name, date_format(col(column_name), lit('yyyy-MM-dd HH:mm:ss.SSS')))
if datatype_name == 'DateType' and target_datatype_name == 'StringType':
df = df.withColumn(column_name, date_format(col(column_name), lit('yyyy-MM-dd')))
if datatype_name == 'DayTimeIntervalType' and target_datatype_name == 'StringType':
df = df.withColumn(column_name, col(column_name).cast("string"))
if datatype_name == 'ArrayType' and target_datatype_name == 'StringType':
df = df.withColumn(column_name, to_json(col(column_name)))
if datatype_name == 'MapType' and target_datatype_name == 'StringType':
df = df.withColumn(column_name, to_json(col(column_name)))
if datatype_name == 'StructType' and target_datatype_name == 'StringType':
df = df.withColumn(column_name, to_json(col(column_name)))
if datatype_name == 'DecimalType' and target_datatype_name == 'DoubleType':
df = df.withColumn(column_name, col(column_name).cast('double'))
if datatype_name in ['ByteType', 'ShortType', 'IntegerType'] and target_datatype_name == 'LongType':
df = df.withColumn(column_name, col(column_name).cast('bigint'))
if datatype_name in ['ByteType', 'ShortType'] and target_datatype_name == 'IntegerType':
df = df.withColumn(column_name, col(column_name).cast('int'))
if datatype_name == 'ByteType' and target_datatype_name == 'ShortType':
df = df.withColumn(column_name, col(column_name).cast('short'))
if datatype_name == 'StringType' and target_datatype_name == 'TimestampType':
df = df.withColumn(column_name, col(column_name).cast('timestamp'))
except Exception as e:
logging.warn("Unable to check output schema, inserting as is : %s" % str(e))
return df
def _get_table_schema(self, schema, connection_params):
if schema and schema.strip():
return schema
return self._get_connection_param(connection_params, "defaultSchema", "schema")
def _get_table_catalog(self, catalog, connection_params):
if catalog and catalog.strip():
return catalog
return self._get_connection_param(connection_params, "db", "db")
Step 3: Create the Dataiku-to-Snowflake connection
This step establishes the connection details between Dataiku and Snowflake for data operations.
In the top menu bar, click Flow → Datasets → New Dataset → Connect or Create → Snowflake. Then fill in the connection screen with the elements we generated during Snowflake Setup steps.
- Connection name: dataiku-to-snowflake-connection
- Host: Snowflake Account URL
- Database: TEST_DATABASE
- Warehouse: COMPUTE_WH
- Role: dataiku_service_role
- Schema: TEST_SCHEMA
- Auth type: User/Password
- User: dataiku_service_user
- Password: Snowflake PAT token
Once the connection has been created, you should be able to select the database, schema and list tables.
Step 4: Create Dataiku Datasets
Next, we create Dataiku Datasets and map them to the snowflake tables.
In the top menu bar, go back to Flow → Datasets → New Dataset → Connect or Create → Snowflake. Then fill in the Dataset details, test and create.
- Dataset name: INPUT_FLIGHTS
- Connection name: dataiku-to-snowflake-connection
- Table: INPUT_FLIGHTS
Repeat the process for the output table.
- Dataset name: OUTPUT_FLIGHTS
- Connection name: dataiku-to-snowflake-connection
- Table: OUTPUT_FLIGHTS
Step 5: Create a Python notebook and test Snowpark Connect!
In this step, we create a Python notebook to perform basic data operations — read, transform and write — and verify that the workload runs on the Snowflake engine.
Create a Dataiku Python notebook with the code below and run it.
import dataiku
from pyspark.sql import functions as F
from snowpark_connect_for_spark import DkuSnowparkConnectForSpark
dku_snowpark_connect_for_spark = DkuSnowparkConnectForSpark()
session = dku_snowpark_connect_for_spark.get_session(
connection_name="dataiku-to-snowflake-connection"
)
input_dataset = dataiku.Dataset("INPUT_FLIGHTS")
input_df = dku_snowpark_connect_for_spark.get_dataframe(input_dataset)
input_df.show()
# Transformations: split LEG into ORIGIN/DESTINATION, convert miles to km
output_df = (
input_df
.withColumn("ORIGIN", F.upper(F.split(F.col("LEG"), "-")[0]))
.withColumn("DESTINATION", F.upper(F.split(F.col("LEG"), "-")[1]))
.withColumn("DISTANCE_KMS", F.round(F.col("DISTANCE_MILES") * 1.60934).cast("int"))
.drop("LEG")
)
output_df.show()
output_dataset = dataiku.Dataset("OUTPUT_FLIGHTS")
dku_snowpark_connect_for_spark.write_with_schema(output_dataset, output_df)
You can check the inserts in the Snowflake output table:
SELECT * FROM TEST_DATABASE.TEST_SCHEMA.OUTPUT_FLIGHTS;
Finally, Snowpark Connect operations are conveniently tagged with SNOWPARK_CONNECT_QUERY for easier inspection:
SELECT query_tag, query_text, *
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE query_tag = 'SNOWPARK_CONNECT_QUERY';
Conclusion
This integration marks a strategic simplification of the modern data stack. It eliminates maintaining Spark clusters, reduces costly data movement, and governance challenges by enabling teams to run existing Spark code and Dataiku workflows directly on Snowflake powerful engine. This shift significantly lowers spark management overhead, freeing data teams to focus on solving higher value challenges.