Table scan redirection#

Table scan redirection enables SEP to offload data access to tables accessed in one catalog to equivalent tables accessed in another catalog. This can improve performance by shifting data access to a more performant system. It can also reduce load on a data source.

Redirection is transparent to the user, and therefore provides performance improvements without the need to modify queries.

A typical use case is the redirection from a catalog configuring a relational database to a catalog using the Hive connector to access a data lake. That catalog can also take advantage of Enable Starburst Warp Speed for your cluster.

Redirection of table scans is performed by SEP after applying authentication and permission checks from the source catalog.

Table scan redirection must be enabled, and configured in the SEP coordinator, before creating a rules set that determines the behavior of table scans.

Supported connectors and table properties#

The origin catalog must use one of the following connectors:

The target catalog can use an identical connector for maximum compatibility, or any other connector. Data types are translated based on the type mapping of the connector used for the source and target catalog. This type mapping can be customized to work around unsupported types by setting an explicit type mapping for the target catalog.

If table properties like partitioning, bucketing, sorting are used, then the target can only be Hive as other connectors don’t support these table properties.

Table scan redirection for a catalog can be disabled using the table_scan_redirection_enabled catalog session property:

SET SESSION mycatalog.table_scan_redirection_enabled = false;

Enable and configure table scan redirection#

Table scan redirection is enabled for a specific catalog mycatalog in etc/catalog/mycatalog.properties using one of the following methods:

Configure using the cache service#

The cache service is the recommended source of redirections for production usage. It is responsible for the automatic creation and refresh of cached tables based on the configured refresh rules. You can customize the type mapping using configured type mapping rules The cache service is configured as the source of redirections using the following properties:

redirection.config-source=SERVICE
cache-service.uri=http://cache-service:8180

By default, the redirections obtained from the cache service are cached within SEP for 1 minute. This can be changed using cache-service.cache-ttl property.

When authentication is enabled for the cache service, the below configuration can be used to connect to it.

redirection.config-source=SERVICE
cache-service.uri=https://cache-service:8543
cache-service.user=test
cache-service.password=test
cache-service.http-client.trust-store-path=localhost.truststore
cache-service.http-client.trust-store-password=changeit
cache-service.http-client.https.hostname-verification=false

Configure using JSON file#

Creation and refresh of target tables is not managed by SEP with this configuration. Do not use this method for production.

redirection.config-source=FILE
redirection.config-file=etc/redirection-rules.json

The configuration is located in a JSON file. The following sample redirection-rules.json file configures redirection of table scans from a catalog salesdata to a catalog datalake.

{
  "salesdata": {
    "schemas": {
      "schema_name_a": {
        "table_name_1": {
          "targetCatalog": "datalake",
          "targetSchema": "cache",
          "targetTable": "table_name_a"
        }
      },
      "schema_name_b": {
        "table_name_2": {
          "targetCatalog": "datalake",
          "targetSchema": "cache",
          "targetTable": "table_name_b",
          "columns": [
            "col1",
            "col2",
            "col3"
          ]
        },
        "table_name_3": {
          "targetCatalog": "datalake",
          "targetSchema": "cache",
          "targetTable": "table_name_c"
        }
      }
    }
  }
}

If the target table stores a subset of the columns from the source table, then the columns stored by the target table must be specified as shown above for salesdata.schema_name_b.table_name_2. When columns for a target table are not specified, it is assumed that it contains all the columns from the source table.

The above configuration results in the following behavior:

  • All scans on salesdata.schema_name_a.table_name_1 are redirected to datalake.cache.table_name_a.

  • Scans on salesdata.schema_name_b.table_name_2 using the columns col1, col2, col3 are redirected to datalake.cache.table_name_b.

  • All scans on salesdata.schema_name_b.table_name_3 are redirected to datalake.cache.table_name_c.

Redirections for multiple catalogs can be defined by adding key-value pairs along the lines of "salesdata": {...} to the above configuration file.

By default, SEP reloads rules from the configuration file every minute. This can controlled using redirection.refresh-period property in etc/config.properties.

Rule sets#

A JSON file, rules.json, is used to define rules to configure which tables are cached, the catalog where the tables are accessed from, and the schedule for refreshing them. Cached tables can be refreshed either incrementally, or as a full refresh.

There are numerous general properties available for use in creating rules discussed in this section that apply either globally, or to a specific catalog.

Catalogs can have more than one rule, but a rule can apply to only one catalog. A rule can apply to one or more schemas, and one or more tables within the named catalog.

Warning

Any changes to the sources tables and their columns require updates to the caching rules that use them. This includes rules that do not specify columns, if any columns are added, removed, or renamed in the source table. Such changes require that the cache is expired with the expire_redirection command from the cache service CLI to create a new redirection with the updated table definition.

Table scan refresh rules sets#

Cached tables can be refreshed either incrementally, or as a full refresh.

To avoid queries on stale data, if a cached table can not be successfully refreshed, it is not used for table scan redirection. Instead, the query executes directly against the source data.

Incremental refreshes#

When the source table is large and requires frequent refreshes, the refresh can be configured to load data from the source table incrementally. This incremental refresh approach is more performant, but not suitable for all source tables and data modifications. Specifically modifications or deletions of existing rows are ignored.

The initial refresh for an incrementally cached table is equivalent to a full refresh. It is a bulk load of all the rows from the source table. For incrementally cached tables, the cached table is not removed until the corresponding redirection rule is removed or modified in rules.json.

Depending on the properties of the source table, incremental refresh can be set up in one of two ways:

  • By specifying a strictly incrementing column used to detect new rows.

  • By specifying a monotonically increasing column from the source table in combination with a configured predicate expression.

When using a strictly incrementing column from the source table to detect new rows, the cache service copies only the rows from source table where the value of incrementalColumn is greater than the values already present in the cached table.

The following example demonstrates this configuration:

{
  "catalogName": "mysqlevents",
  "schemaName": "web",
  "tableName": "events",
  "refreshInterval": "2m",
  "gracePeriod": "15m",
  "incrementalColumn": "event_id"
},

Fact tables often contain columns that increase monotonically. Examples are integer values for identifiers, and date or datetime columns using the current time of record creation. To use such a column for incremental refreshes, you must specify a predicate expression as a filter condition. This filter is applied in addition to the filter applied on incrementalColumn by the cache service to avoid duplicating data, and delay data collection until all updates to it are presumed to be complete, as shown in the following example:

{
  "catalogName": "postgresqlevents",
  "schemaName": "web",
  "tableName": "events",
  "refreshInterval": "2m",
  "gracePeriod": "15m",
  "incrementalColumn": "event_hour",
  "predicate": "event_hour < (SELECT date_add('hour', -1, latest_event_hour) FROM (SELECT max(event_hour) AS latest_event_hour FROM postgresql.foo.events))"
}

In this example, the predicate is designed to create a one hour buffer after a record is created during which the data is subject to updates. After that buffer has expired, the data is loaded and all subsequent updates are ignored.

Finally, incremental refreshes can also be assigned a predicate that removes data that is no longer needed. In the following example, events older than 31 days are removed:

{
  "catalogName": "mysqlevents",
  "schemaName": "web",
  "tableName": "events",
  "refreshInterval": "2m",
  "gracePeriod": "15m",
  "incrementalColumn": "event_id"
  "deletePredicate": "event_date < date_add('day', -31, CURRENT_DATE)"
},

Full refreshes#

By default, each refresh of a cached table results in all the rows from the specified columns of the source table getting loaded into the cached table. This is called a full refresh. For cached tables that are completely refreshed, the cached table has a TTL (time to live) computed as the sum of the effective maximum import duration and the duration defined by its effective grace period. This allows any running query that started just before the cached table is expired to finish gracefully. After the TTL expires, the cache service removes older cached tables once the newer table is available.

Manual refreshes#

The refresh_table_redirection stored procedure is used to initiate a table refresh in situations where an event-based or manual refresh is preferred over scheduling a refresh at a timed interval.

Use the following syntax to CALL the procedure and initiate a table refresh:

CALL [catalog].system.refresh_table_redirection([schema], [table]);

General rule set properties#

This section defines the rules available for use in the rules.json to configure which tables are cached, where they are cached from, and other rule-based behaviors.

Global properties#

The global properties listed in this section, when defined, are applied to all rules that do not have a specific value set. For example, if a rule does not have a specific gracePeriod defined, the defaultGracePeriod value, if set, is used. Some properties have a default value that is used when the property does not exist in the rules.json file.

Global properties for table scan redirection rules#

Property name

Description

defaultGracePeriod

The duration which a previous version of a cached table is retained so that any active queries using it may complete, once the latest import is available. Can be overridden within a specific rule using gracePeriod. Defaults to 10m.

defaultMaxImportDuration

The maximum allowed execution time for a query used to populate a cached table. Can be overridden within a specific rule using maxImportDuration. Must be smaller than the refresh interval, and greater than or equal to 1m. Note that this field is required when using cronExpression to define a refresh schedule.

defaultCacheCatalog

The default catalog that configures the data source where the cached tables are stored. Can be overridden within a specific rule using cacheCatalog.

defaultCacheSchema

The default schema where cached tables are stored. You must ensure that the schema exists and that the cache service user has permissions to read and write to it. Can be overridden within a specific rule using cacheSchema. properties.

cleanup-interval

Interval at which the cache service removes stale cached tables and runs cleanup queries for rules where a deletePredicate is defined. Defaults to 5m.

The following example shows these properties used in a rules.json file:

{
  "defaultGracePeriod": "42m",
  "defaultMaxImportDuration": "1m",
  "defaultCacheCatalog": "default_cache_catalog",
  "defaultCacheSchema": "default_cache_schema"
}

Rule-specific properties#

The rule-specific properties listed in this section override a similar available global property. For example, if a rule sets a specific maxImportDuration, the global defaultMaxImportDuration value is ignored.

Rule-specific properties for table scan redirection#

Property name

Description

catalogName

The name of the source catalog. Required.

cacheCatalog

The catalog that defines the data source where the cached tables are stored.

cacheSchema

The schema where the cached tables are stored. You must ensure that the schema exists and that the cache service user has permissions to read and write to it.

schemaName

The specific schema to use within a rule. You must specify this or schemaNameLike, but not both.

schemaNameLike

SQL LIKE expression to match multiple schema names. You must specify this or schemaName, but not both.

tableName

The specific table to use within a rule. You must specify this or tableNameLike, but not both.

tableNameLike

SQL LIKE expression to match multiple table names. You must specify this or tableName, but not both.

refreshInterval

The time duration between refreshes. Cannot be used with cronExpression.

cronExpression

A cron expression defining the refresh schedule. Cannot be used with refreshInterval.

gracePeriod

The duration which a previous version of a cached table is retained so that any active queries using it may complete. Overrides the global defaultGracePeriod if set.

maxImportDuration

Overrides the global defaultMaxImportDuration if set. Must be smaller than the refresh interval, and greater than or equal to 1m. Note that this field is required when using cronExpression to define a refresh schedule.

columns

Comma-separated list of columns to be imported. If not specified, all available columns are imported. Note that redirections can only be used when all columns required by a given table scan are present.

partitionColumns

Comma-separated list of columns to use for partitioning.

bucketColumns

Comma-separated list specifying the columns to be bucketed. Requires the use of bucketCount.

bucketCount

Defines the number of buckets for data in the specified columns.

sortColumns

Comma separated list specifying the columns to sort on. Requires the use of bucketColumns.

incrementalColumn

Required for incremental refresh; if not specified, a full refresh is performed. This column is used by the service to apply a incrementalColumn > (SELECT max(incrementalColumn) FROM cachedTable) filter when loading data incrementally from the source table. This facilitates loading only newer data from the source table instead of the entire table in each refresh iteration.

predicate

SQL expression used in conjunction with incrementalColumn to delay loading data until all presumed updates are complete.

deletePredicate

Optionally specifies a predicate clause to clean up old data from the cached table. It is used by the cache service to run a DELETE FROM cachedTable WHERE <deletePredicate> query at every cleanup-interval. Available where connector used to store the cached table supports DELETE FROM statements.

The following example shows rules defined for several catalogs, using a variety of options:

{
  "catalogName": "maketing_campaigns",
  "schemaNameLike": "apac",
  "tableNameLike": "organic",
  "refreshInterval": "60m"
  "gracePeriod": "15m",
  "incrementalColumn": "event_id",
  "deletePredicate": "event_date < date_add('day', -31, CURRENT_DATE)"
},
{
  "catalogName": "sales",
  "schemaNameLike": "contacts",
  "tableNameLike": "leads",
  "cronExpression": "* * * 2 *"
  "gracePeriod": "15m",
  "incrementalColumn": "event_hour",
  "predicate": "event_hour < (SELECT date_add('hour', -1, latest_event_hour) FROM (SELECT max(event_hour) AS latest_event_hour FROM postgresql.foo.events))"
},
{
  "catalogName": "telemetry",
  "schemaName": "allproducts",
  "tableName":  "events",
  "columns": [
    "pingtime",
    "avgMem",
    "apiCalls"
  ],
  "partitionColumns": [
    "pingtime"
  ],
  "bucketColumns": [
    "avgMem"
  ],
  "bucketCount": 5,
  "sortColumns": [
    "apiCalls"
  ],
  "refreshInterval": "123h",
  "gracePeriod": "80m",
  "maxImportDuration": "67h",
  "cacheCatalog": "ctp_table_catalog",
  "cacheSchema": "ctp_table_schema",
}

Metrics#

Metrics about table scan redirection are reported in a JMX table for each specific catalog mycatalog using any connector:

jmx.current."com.starburstdata.presto.plugin.jdbc.redirection:name=mycatalog,type=redirectionstats"

Metrics include information about the number of successful redirections and the number of times redirection could not be performed due to the target table missing some columns from the source table.

Limitations#

  • Redirections are supported for Hive tables but not Hive views.