Starburst Snowflake connector#

The Snowflake connector allows querying and creating tables in an external Snowflake database. This can be used to join data between different systems like Snowflake and Hive, or between different Snowflake instances.

Requirements#

To connect to Snowflake, you need:

In addition, the following line must be added to the jvm.config file:

--add-opens=java.base/java.nio=ALL-UNNAMED

Configuration#

To configure the Snowflake connector, create a catalog properties file in etc/catalog named example.properties, to mount the Snowflake connector as the example catalog (replace example with your database name or some other descriptive name of the catalog).

There are three connectors for Snowflake:

  • snowflake_jdbc

  • snowflake_parallel

  • snowflake_distributed (deprecated)

snowflake_jdbc uses JDBC for all reads and writes and is more efficient when the result set returned from Snowflake is small.

We recommend using the snowflake_parallel connector, which uses the JDBC client to execute queries, and requests data in Arrow format. The connector then reads each file independently from different nodes, which results in more efficient querying.

When larger result sets are extracted from Snowflake, the snowflake_distributed connector may be a better choice. Instead of requesting query results over a JDBC connection, the connector asks Snowflake to export them to object storage and SEP reads them from there. Since write and read operations are parallelized, this approach scales better for large data sets, but has a higher latency.

Warning

The snowflake_distributed connector is deprecated as of SEP 429-e. Use the snowflake_parallel connector instead, and migrate existing Snowflake catalogs to the parallel connector.

A requirement of using the distributed connector is that your Snowflake deployment runs on AWS or Azure. The connector automatically creates temporary stages in temporary storage buckets using Snowflake.

Create the catalog properties file, for example etc/catalog/example.properties, with the following contents, replacing the connection properties as appropriate for your setup (for example, replace <account_name> with the full name of your account, as provided by Snowflake).

For AWS:

connector.name=<snowflake_jdbc, snowflake_parallel, or snowflake_distributed>
connection-url=jdbc:snowflake://<account_name>.snowflakecomputing.com/
connection-user=<user_name>
connection-password=<password>
snowflake.warehouse=<warehouse_name>
snowflake.database=<database_name>

For Azure:

connector.name=<snowflake_jdbc, snowflake_parallel, or snowflake_distributed>
connection-url=jdbc:snowflake://<account_name>.<azure_region>.azure.snowflakecomputing.com/
connection-user=<user_name>
connection-password=<password>
snowflake.warehouse=<warehouse_name>
snowflake.database=<database_name>

The role used by Snowflake to execute operations can be specified as snowflake.role=<role_name>. This configuration is optional, and can not be used together with User impersonation.

The catalog configuration specifies the warehouse used to execute queries with the snowflake.warehouse property. Use the catalog session property warehouse, if you want to temporarily switch to a different warehouse in the current session for the user:

SET SESSION datacloud.warehouse = 'OTHER_WH';

If you switch to a non-existent warehouse, any following queries fail with an error message about missing an active warehouse.

You can verify the change to a warehouse with the following query:

SHOW SESSION LIKE '%warehouse';

This query either returns the session property with the name of the current warehouse in the session as value or no results, which signals that the warehouse from the catalog configuration is in use.

The Snowflake connector has the following configuration property:

Snowflake connector configuration properties#

Property name

Description

Default

snowflake.database-prefix-for-schema.enabled

Allow access to other databases in Snowflake by including the database name in double quotes with the schema name:

    SELECT *
    FROM catalog."database.schema".table

When enabled, "database.schema", including the double quotes, is required at all times as part of the fully-qualified name.

false

Additionally, there are a number of configuration properties that apply only to the distributed connector:

Distributed connector configuration properties#

Property name

Description

Default

snowflake.stage-schema

Name of the schema in which stages are created for exporting data

snowflake.max-export-retries

Number of export retries

3

snowflake.parquet.max-read-block-size

Maximum block size when reading from the export file

16MB

snowflake.max-split-size

Maximum split size for processing the export file

64MB

snowflake.max-initial-split-size

Maximum initial split size

Half of snowflake.max-split-size

snowflake.export-file-max-size

Maximum size of files to create when exporting data

16MB

Migrate to snowflake_parallel#

To configure an existing Snowflake catalog to use the snowflake_parallel connector, change the value of connector.name in the catalog configuration properties to snowflake_parallel. The following catalog configuration properties are not used by the parallel connector and must be removed:

  • snowflake.stage-schema

  • snowflake.max-export-retries

  • snowflake.parquet.max-read-block-size

  • snowflake.max-split-size

  • snowflake.max-initial-split-size

  • snowflake.export-file-max-size

The rest of the configuration is identical to the snowflake_jdbc connector.

General configuration properties#

The following table describes general catalog configuration properties for the connector:

Property name

Description

Default value

case-insensitive-name-matching

Support case insensitive schema and table names.

false

case-insensitive-name-matching.cache-ttl

This value should be a duration.

1m

case-insensitive-name-matching.config-file

Path to a name mapping configuration file in JSON format that allows Trino to disambiguate between schemas and tables with similar names in different cases.

null

case-insensitive-name-matching.config-file.refresh-period

Frequency with which Trino checks the name matching configuration file for changes. This value should be a duration.

(refresh disabled)

metadata.cache-ttl

The duration for which metadata, including table and column statistics, is cached.

0s (caching disabled)

metadata.cache-missing

Cache the fact that metadata, including table and column statistics, is not available

false

metadata.cache-maximum-size

Maximum number of objects stored in the metadata cache

10000

write.batch-size

Maximum number of statements in a batched execution. Do not change this setting from the default. Non-default values may negatively impact performance.

1000

dynamic-filtering.enabled

Push down dynamic filters into JDBC queries

true

dynamic-filtering.wait-timeout

Maximum duration for which Trino will wait for dynamic filters to be collected from the build side of joins before starting a JDBC query. Using a large timeout can potentially result in more detailed dynamic filters. However, it can also increase latency for some queries.

20s

Type mapping#

Because Trino and Snowflake each support types that the other does not, this connector modifies some types when reading or writing data. Data types may not map the same way in both directions between Trino and the data source. Refer to the following sections for type mapping in each direction.

Snowflake to Trino type mapping#

The connector maps Snowflake types to the corresponding Trino types according to the following table:

Snowflake to Trino type mapping#

Snowflake type

Trino type

Notes

BOOLEAN

BOOLEAN

NUMBER(p, s)

DECIMAL(p, s)

FLOAT

DOUBLE

DOUBLE

DOUBLE

DOUBLE PRECISION

DOUBLE

REAL

REAL

TINYINT

TINYINT

SMALLINT

SMALLINT

INTEGER

INTEGER

BIGINT

BIGINT

VARIANT

VARCHAR(max)

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

VARBINARY(n)

VARBINARY(n)

OBJECT

VARCHAR(max)

ARRAY

VARCHAR(max)

DATE

DATE

TIME

TIME

TIMESTAMP_NTZ(n)

TIMESTAMP(n)

Supports only precision 3 in distributed mode

TIMESTAMP_LTZ(n)

TIMESTAMP(n) WITH TIME ZONE

Supports only precision 3 in distributed mode

TIMESTAMP_TZ(n)

TIMESTAMP(n) WITH TIME ZONE

Supports only precision 3 in distributed mode

TIMESTAMP(n)

TIMESTAMP(n)

Supports only precision 3 in distributed mode

No other types are supported.

Trino to Snowflake type mapping#

The connector maps Trino types to the corresponding Snowflake types according to the following table:

Trino to Snowflake type mapping#

Trino type

Snowflake type

Notes

BOOLEAN

BOOLEAN

DECIMAL(p, s)

NUMBER(p, s)

DOUBLE

DOUBLE

DOUBLE

DOUBLE PRECISION

REAL

REAL

TINYINT

NUMBER(3)

SMALLINT

NUMBER(5)

INTEGER

NUMBER(10)

BIGINT

NUMBER(19)

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

VARBINARY(n)

VARBINARY(n)

DATE

DATE

TIME

TIME

TIMESTAMP(n)

TIMESTAMP_NTZ(n)

TIMESTAMP(n) WITH TIME ZONE

TIMESTAMP_TZ(n)

No other types are supported.

Type mapping configuration properties#

The following properties can be used to configure how data types from the connected data source are mapped to Trino data types and how the metadata is cached in Trino.

Property name

Description

Default value

unsupported-type-handling

Configure how unsupported column data types are handled:

  • IGNORE, column is not accessible.

  • CONVERT_TO_VARCHAR, column is converted to unbounded VARCHAR.

The respective catalog session property is unsupported_type_handling.

IGNORE

jdbc-types-mapped-to-varchar

Allow forced mapping of comma separated lists of data types to convert to unbounded VARCHAR

SQL support#

The connector provides read and write access to data and metadata in the Snowflake database. In addition to the globally available and read operation statements, the connector supports the following features:

SQL DELETE#

If a WHERE clause is specified, the DELETE operation only works if the predicate in the clause can be fully pushed down to the data source.

ALTER TABLE EXECUTE#

The connector supports the following commands for use with ALTER TABLE EXECUTE:

collect_statistics#

The collect_statistics command is used with Managed statistics to collect statistics for a table and its columns.

The following statement collects statistics for the example_table table and all of its columns:

ALTER TABLE example_table EXECUTE collect_statistics;

Collecting statistics for all columns in a table may be unnecessarily performance-intensive, especially for wide tables. To only collect statistics for a subset of columns, you can include the columns parameter with an array of column names. For example:

ALTER TABLE example_table
    EXECUTE collect_statistics(columns => ARRAY['customer','line_item']);

Table functions#

The snowflake_jdbc connector provides specific table functions to access Snowflake.

Note

The snowflake_distributed connector does not support table functions. Only snowflake_jdbc and snowflake_parallel connectors support table functions.

query(VARCHAR) -> table#

The query function allows you to query the underlying database directly. It requires syntax native to the data source, because the full query is pushed down and processed in the data source. This can be useful for accessing native features or for improving query performance in situations where running a query natively may be faster.

The query table function is available in the system schema of any catalog that uses the Snowflake connector, such as example. The following example passes myQuery to the data source. myQuery has to be a valid query for the data source, and is required to return a table as a result:

SELECT
  *
FROM
  TABLE(
    example.system.query(
      query => 'myQuery'
    )
  );

Performance#

The connector includes a number of performance improvements, detailed in the following sections.

Table statistics#

The Snowflake connector reads table statistics, collected automatically by Snowflake, for cost based optimizations to improve query processing performance based on the actual data in the data source. These table statistics are read from Snowflake’s INFORMATION_SCHEMA.TABLES table.

Note

The Snowflake connector can only collect table statistics. Column statistics are not made available and therefore return as NULL in a SHOW STATS query. You can instead use managed statistics to have SEP collect statistics from Snowflake, including column statistics.

Managed statistics#

The snowflake_jdbc and snowflake_parallel connectors support Managed statistics allowing SEP to collect and store its own table and column statistics that can then be used for performance optimizations in query planning.

Statistics must be collected manually using the built-in collect_statistics command, see collect_statistics for details and examples.

Pushdown#

The connector supports pushdown for a number of operations:

Aggregate pushdown for the following functions:

Join pushdown#

The join-pushdown.enabled catalog configuration property or join_pushdown_enabled catalog session property control whether the connector pushes down join operations. The property defaults to false, and enabling join pushdowns may negatively impact performance for some queries.

Join pushdown is set to false by default in Snowflake catalogs to provide an effective measure against higher compute costs.

JSON predicate pushdown#

Pushdown of json_extract() and json_extract_scalar() is considered experimental, and must be enabled by setting the snowflake.experimental-pushdown.enabled catalog configuration property or experimental_pushdown_enabled catalog session property to true.

Dynamic filtering#

Dynamic filtering is enabled by default. It causes the connector to wait for dynamic filtering to complete before starting a JDBC query.

You can disable dynamic filtering by setting the dynamic-filtering.enabled property in your catalog configuration file to false.

Wait timeout#

By default, table scans on the connector are delayed up to 20 seconds until dynamic filters are collected from the build side of joins. Using a large timeout can potentially result in more detailed dynamic filters. However, it can also increase latency for some queries.

You can configure the dynamic-filtering.wait-timeout property in your catalog properties file:

dynamic-filtering.wait-timeout=1m

You can use the dynamic_filtering_wait_timeout catalog session property in a specific session:

SET SESSION catalogname.dynamic_filtering_wait_timeout = 1s;

Compaction#

The maximum size of dynamic filter predicate, that is pushed down to the connector during table scan for a column, is configured using the domain-compaction-threshold property in the catalog properties file:

domain-compaction-threshold=100

You can use the domain_compaction_threshold catalog session property:

SET SESSION domain_compaction_threshold = 10;

By default, domain-compaction-threshold is set to 32. When the dynamic predicate for a column exceeds this threshold, it is compacted into a single range predicate.

For example, if the dynamic filter collected for a date column dt on the fact table selects more than 32 days, the filtering condition is simplified from dt IN ('2020-01-10', '2020-01-12',..., '2020-05-30') to dt BETWEEN '2020-01-10' AND '2020-05-30'. Using a large threshold can result in increased table scan overhead due to a large IN list getting pushed down to the data source.

Metrics#

Metrics about dynamic filtering are reported in a JMX table for each catalog:

jmx.current."com.starburstdata.presto.plugin.jdbc.dynamicfiltering:catalog=snowflake,name=snowflake,type=dynamicfilteringstats"

Metrics include information about the total number of dynamic filters, the number of completed dynamic filters, the number of available dynamic filters and the time spent waiting for dynamic filters.

JDBC connection pooling#

When JDBC connection pooling is enabled, each node creates and maintains a connection pool instead of opening and closing separate connections to the data source. Each connection is available to connect to the data source and retrieve data. After completion of an operation, the connection is returned to the pool and can be reused. This improves performance by a small amount, reduces the load on any required authentication system used for establishing the connection, and helps avoid running into connection limits on data sources.

JDBC connection pooling is disabled by default. You can enable JDBC connection pooling by setting the connection-pool.enabled property to true in your catalog configuration file:

connection-pool.enabled=true

The following catalog configuration properties can be used to tune connection pooling:

JDBC connection pooling catalog configuration properties#

Property name

Description

Default value

connection-pool.enabled

Enable connection pooling for the catalog.

false

connection-pool.max-size

The maximum number of idle and active connections in the pool.

10

connection-pool.max-connection-lifetime

The maximum lifetime of a connection. When a connection reaches this lifetime it is removed, regardless of how recently it has been active.

30m

connection-pool.pool-cache-max-size

The maximum size of the JDBC data source cache.

1000

connection-pool.pool-cache-ttl

The expiration time of a cached data source when it is no longer accessed.

30m

Starburst Cached Views#

The connectors supports table scan redirection to improve performance and reduce load on the data source.

Security#

The connector includes a number of security-related features, detailed in the following sections.

User impersonation#

The Snowflake connector supports User impersonation. It can be configured to use a number of different impersonation mechanisms, specified by the values configured for the property snowflake.impersonation-type:

snowflake.impersonation-type=ROLE
NONE (default value)

Connect as the service user with the credentials from the catalog properties file and assume the role defined by the snowflake.role property, or the service user’s default role, if the property is missing.

ROLE

Connect as the service user and use auth-to-local mapping to map the user to a role.

OKTA_LDAP_PASSTHROUGH

Assume the identity of the SEP user, authenticated with Okta using LDAP credentials, and use the default Snowflake user role.

ROLE_OKTA_LDAP_PASSTHROUGH

As above, but additionally use auth-to-local mapping to map the user to a role.

Note

Impersonation using Okta is not supported for Azure.

Authentication with Okta#

The Snowflake connector supports the usage of the Okta Single Sign-On system to authenticate users via SEP to Snowflake.

Note

Okta authentication is not supported for Azure.

The setup allows users to authenticate to SEP using their LDAP credentials and use the same credentials to authenticate to Snowflake through Okta. The same credentials are used by SEP when accessing data in Snowflake.

Behind the scenes SEP and the Snowflake connector authenticate to Okta with the LDAP credentials of the user. After the user authenticates with Okta, including MFA potentially, a SAML assertion allows Snowflake to issue an an OAuth 2.0 token pair. The tokens are cached in SEP and used for further authentications until they expire, and another authentication is requested.

If Okta multi-factor authentication (MFA) is configured, users have to confirm authentication with it. One time codes are not supported.

To enable the Okta integration, SEP and Snowflake need to be configured correctly.

Okta and SEP are configured to use LDAP authentication using the same user identifiers and LDAP directory. In addition to the usual LDAP configuration, LDAP authentication, for SEP, you need to enable Password credential pass-through in etc/config.properties:

http-server.authentication.type=DELEGATED-PASSWORD

Snowflake is configured in Okta as a SAML application as detailed in the Snowflake documentation. Note that the Snowflake login_name must match the corresponding SAML Subject NameID attribute value.

SEP is configured as an OAuth client in Snowflake, again detailed in the Snowflake OAuth documentation.

Note

You must pre-authorize user consent for a role to allow authentication to occur without user interaction.

A number of properties are required to configure the Okta integration in the Snowflake catalog properties file:

snowflake.account-name

Name of Snowflake account.

snowflake.account-url

URL of the Snowflake account. The URL usually has the form https://account_name.snowflakecomputing.com, but might include additional segments.

snowflake.client-id

Snowflake OAuth client id. This can be retrieved with the secret name and a query like select system$show_oauth_client_secrets('OAUTH_TEST_INT');.

snowflake.client-secret

Snowflake OAuth client secret.

snowflake.credential.cache-ttl

Duration the OAuth refresh token is cached. This value cannot exceed the oauth_refresh_token_validity value used when the OAuth integration was created. E.g. 24h.

okta.account-url

The Okta URL, typically https://your_okta_account_name.okta.com).

Optional properties allow you to override the default values:

snowflake.credential.cache-size

The size of the OAuth credentials cache. Use a value that accommodates the expected number of users that might connect to Snowflake through SEP during the period defined by the TTL of the token. Defaults to 10000.

snowflake.credential.http-connect-timeout

Connection timeout. Defaults to 30s.

snowflake.credential.http-read-timeout

Connection read timeout. Defaults to 30s.

snowflake.credential.http-write-timeout

Connection write timeout. Defaults to 30s.

snowflake.redirect-uri

The redirect URI for OAUTH. Value must match the redirect URI specified when creating the security integration (oauth_redirect_uri). Defaults to https://localhost.

okta.credential.http-connect-timeout

Connection timeout. Defaults to 30s.

okta.credential.http-read-timeout

Connection read timeout. Defaults to 30s.

okta.credential.http-write-timeout

Connection write timeout. Defaults to 30s.

OAuth 2.0 token pass-through#

The Snowflake connector supports OAuth 2.0 token pass-through.

Configure this option the same as Authentication with Okta, except for the settings described in this section.

Set the authentication type and OAuth 2.0 scope in the coordinator’s config properties file:

http-server.authentication.type=DELEGATED-OAUTH2
http-server.authentication.oauth2.scopes=<EXISTING_SCOPES>,session:role:TEST_ROLE

The session:role prefix determines the role assigned to the user after successful authentication.

Additionally enable OAUTH2_PASSTHROUGH in the catalog properties file using the Snowflake connector:

snowflake.impersonation-type=OAUTH2_PASSTHROUGH