Table scan redirection
Table scan redirection enables SEP to offload data access to tables accessed in one catalog to equivalent tables accessed in another catalog. This can improve performance by shifting data access to a more performant system. It can also reduce load on a data source.
Redirection is transparent to the user, and therefore provides performance improvements without the need to modify queries.
A typical use case is the redirection from a catalog configuring a relational database to a catalog using the Hive connector to access a data lake. The target catalog can also take advantage of Hive connector storage caching.
Redirection of table scans is performed by SEP after applying authentication and permission checks from the source catalog.
Table scan redirection must be enabled and configured in the SEP coordinator before you create a rule set that determines the behavior of table scans.
Supported connectors and table properties
The origin catalog must use one of the following connectors:
The target catalog can use an identical connector for maximum compatibility, or any other connector. Data types are translated based on the type mappings of the connectors used for the source and target catalogs. To work around unsupported types, you can customize this type mapping by setting an explicit type mapping for the target catalog.
If table properties such as partitioning, bucketing, or sorting are used, the target catalog must use the Hive connector, because other connectors do not support these table properties.
Table scan redirection for a catalog can be disabled using the table_scan_redirection_enabled catalog session property:
SET SESSION mycatalog.table_scan_redirection_enabled = false;
Enable and configure table scan redirection
Table scan redirection is enabled for a specific catalog mycatalog in etc/catalog/mycatalog.properties using one of the following methods:
cache service - for production or testing
JSON file as the source of redirections - for testing purposes only
Configure using the cache service
The cache service is the recommended source of redirections for production usage. It automatically creates and refreshes cached tables based on the configured refresh rules. You can customize the type mapping using configured type mapping rules. The cache service is configured as the source of redirections using the following properties:
redirection.config-source=SERVICE
cache-service.uri=http://cache-service:8180
By default, SEP caches the redirections obtained from the cache service for 1 minute. You can change this duration with the cache-service.cache-ttl property.
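To illustrate what this TTL means in practice, the following Python sketch models a lookup cache with a 60-second TTL: a redirection that changes in the cache service becomes visible only after the locally cached entry expires. The class RedirectionCache and its fetch callback are hypothetical and do not describe SEP internals.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class RedirectionCache:
    """Caches redirection lookups for a fixed TTL (illustrative sketch only)."""

    def __init__(
        self,
        fetch: Callable[[str], Optional[str]],
        ttl_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch  # hypothetical call to the cache service
        self._ttl = ttl_seconds  # 60 s mirrors the documented 1-minute default
        self._clock = clock
        self._entries: Dict[str, Tuple[float, Optional[str]]] = {}

    def lookup(self, source_table: str) -> Optional[str]:
        now = self._clock()
        cached = self._entries.get(source_table)
        if cached is not None and now - cached[0] < self._ttl:
            return cached[1]  # served locally; may be up to ttl_seconds stale
        target = self._fetch(source_table)  # expired or missing: ask the service
        self._entries[source_table] = (now, target)
        return target
```

A shorter TTL makes changes visible sooner, at the cost of more requests to the cache service.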
When authentication is enabled for the cache service, use the following configuration to connect to it:
redirection.config-source=SERVICE
cache-service.uri=https://cache-service:8180
cache-service.user=test
cache-service.password=test
cache-service.http-client.trust-store-path=localhost.truststore
cache-service.http-client.trust-store-password=changeit
cache-service.http-client.https.hostname-verification=false
Configure using a JSON file
With this configuration, SEP does not manage the creation and refresh of target tables. Do not use this method for production.
redirection.config-source=FILE
redirection.config-file=etc/redirection-rules.json
The configuration is located in a JSON file. The following sample redirection-rules.json file configures redirection of table scans from a catalog salesdata to a catalog datalake.
{
"salesdata": {
"schemas": {
"schema_name_a": {
"table_name_1": {
"targetCatalog": "datalake",
"targetSchema": "cache",
"targetTable": "table_name_a"
}
},
"schema_name_b": {
"table_name_2": {
"targetCatalog": "datalake",
"targetSchema": "cache",
"targetTable": "table_name_b",
"columns": [
"col1",
"col2",
"col3"
]
},
"table_name_3": {
"targetCatalog": "datalake",
"targetSchema": "cache",
"targetTable": "table_name_c"
}
}
}
}
}
If the target table stores a subset of the columns from the source table, the columns stored by the target table must be specified, as shown above for salesdata.schema_name_b.table_name_2. When the columns for a target table are not specified, the target table is assumed to contain all the columns from the source table.
The above configuration results in the following behavior:
All scans on salesdata.schema_name_a.table_name_1 are redirected to datalake.cache.table_name_a.
Scans on salesdata.schema_name_b.table_name_2 that use only the columns col1, col2, and col3 are redirected to datalake.cache.table_name_b.
All scans on salesdata.schema_name_b.table_name_3 are redirected to datalake.cache.table_name_c.
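The lookup behavior described above can be modeled with the following Python sketch. It assumes the rules have already been parsed from redirection-rules.json, and uses a trimmed copy of the sample. The function resolve_redirection is hypothetical and mirrors only the documented rules: if no entry matches, or the target lacks a column the scan needs, the scan reads the source table.

```python
import json
from typing import Iterable, Optional

# Trimmed copy of the sample redirection-rules.json shown above.
RULES = json.loads("""
{
  "salesdata": {
    "schemas": {
      "schema_name_b": {
        "table_name_2": {
          "targetCatalog": "datalake",
          "targetSchema": "cache",
          "targetTable": "table_name_b",
          "columns": ["col1", "col2", "col3"]
        }
      }
    }
  }
}
""")


def resolve_redirection(rules: dict, catalog: str, schema: str, table: str,
                        scanned_columns: Iterable[str]) -> Optional[str]:
    """Return the redirect target for a scan, or None to read the source table."""
    entry = (rules.get(catalog, {}).get("schemas", {})
             .get(schema, {}).get(table))
    if entry is None:
        return None  # no rule: the scan runs against the source catalog
    stored = entry.get("columns")
    # Without "columns", the target is assumed to hold every source column.
    if stored is not None and not set(scanned_columns) <= set(stored):
        return None  # the target is missing a column the scan needs
    return f'{entry["targetCatalog"]}.{entry["targetSchema"]}.{entry["targetTable"]}'
```

For example, a scan of table_name_2 that reads col1 and col3 is redirected, while a scan that also reads a column not listed in columns falls back to the source table.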
Redirections for multiple catalogs can be defined by adding top-level key-value pairs similar to "salesdata": {...} to the above configuration file.
By default, SEP reloads rules from the configuration file every minute. This can be controlled using the redirection.refresh-period property in etc/config.properties.
Rule sets
A JSON file, rules.json, defines the rules that configure which tables are cached, the catalog where the tables are accessed from, and the schedule for refreshing them. Cached tables can be refreshed either incrementally or as a full refresh.
Numerous general properties are available for creating rules. As discussed in this section, they apply either globally or to a specific catalog.
A catalog can have more than one rule, but a rule can apply to only one catalog. A rule can apply to one or more schemas and one or more tables within the named catalog.
Warning
Any changes to the source tables and their columns require updates to the caching rules that use them. If any columns are added, removed, or renamed in the source table, this also applies to rules that do not specify columns. Such changes require that the cache is expired with the expire_redirection command from the cache service CLI, so that a new redirection is created with the updated table definition.
Table scan refresh rule sets
Cached tables can be refreshed either incrementally or as a full refresh.
To avoid queries on stale data, a cached table that cannot be refreshed successfully is not used for table scan redirection. Instead, the query executes directly against the source data.
Incremental refreshes
When the source table is large and requires frequent refreshes, the refresh can be configured to load data from the source table incrementally. This incremental approach is more performant, but it is not suitable for all source tables and data modifications. Specifically, modifications and deletions of existing rows are ignored.
The initial refresh for an incrementally cached table is equivalent to a full refresh: a bulk load of all the rows from the source table. For incrementally cached tables, the cached table is not removed until the corresponding redirection rule is removed or modified in rules.json.
Depending on the properties of the source table, incremental refresh can be set up in one of two ways:
By specifying a strictly incrementing column used to detect new rows.
By specifying a monotonically increasing column from the source table in combination with a configured predicate expression.
When using a strictly incrementing column from the source table to detect new rows, the cache service copies only the rows from the source table where the value of incrementalColumn is greater than the maximum value already present in the cached table.
The following example demonstrates this configuration:
{
"catalogName": "mysqlevents",
"schemaName": "web",
"tableName": "events",
"refreshInterval": "2m",
"gracePeriod": "15m",
"incrementalColumn": "event_id"
},
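A minimal in-memory model of this refresh strategy is shown below in Python. It assumes rows are dictionaries and that the cache service's filter is a plain greater-than comparison against the highest value already cached. incremental_append is a hypothetical name, and the real service runs SQL against the source catalog rather than filtering lists.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def incremental_append(source: List[Row], cached: List[Row],
                       incremental_column: str) -> List[Row]:
    """Append source rows whose incremental column exceeds the cached maximum."""
    if not cached:
        return list(source)  # the initial refresh is a full bulk load
    high_water_mark = max(row[incremental_column] for row in cached)
    # Rows at or below the high-water mark are never re-read, so updates
    # and deletions of existing rows are not reflected in the cache.
    new_rows = [row for row in source if row[incremental_column] > high_water_mark]
    return cached + new_rows
```

This also shows why modified rows are ignored: a changed row keeps its old event_id, which is already at or below the high-water mark.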
Fact tables often contain columns that increase monotonically. Examples are integer values for identifiers, and date or datetime columns using the current time of record creation. To use such a column for incremental refreshes, you must specify a predicate expression as a filter condition. The cache service applies this filter in addition to its own filter on incrementalColumn. Together, the two filters avoid duplicating data and delay data collection until all updates to the data are presumed to be complete, as shown in the following example:
{
"catalogName": "postgresqlevents",
"schemaName": "web",
"tableName": "events",
"refreshInterval": "2m",
"gracePeriod": "15m",
"incrementalColumn": "event_hour",
"predicate": "event_hour < (SELECT date_add('hour', -1, latest_event_hour) FROM (SELECT max(event_hour) AS latest_event_hour FROM postgresql.foo.events))"
}
In this example, the predicate creates a one-hour buffer after a record is created, during which the data is subject to updates. After that buffer has expired, the data is loaded, and all subsequent updates are ignored.
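The interaction between the two filters can be sketched in Python as follows, with each source row represented only by its event_hour value. This is an illustrative model of the example predicate, not the cache service's implementation, and hours_to_load is a hypothetical name.

```python
from datetime import datetime, timedelta
from typing import List, Optional


def hours_to_load(source_hours: List[datetime],
                  cached_max: Optional[datetime]) -> List[datetime]:
    """Model of the incremental filter combined with the example predicate."""
    # predicate: event_hour < max(event_hour) - 1 hour, evaluated on the source
    cutoff = max(source_hours) - timedelta(hours=1)
    selected = []
    for hour in source_hours:
        if hour >= cutoff:
            continue  # still inside the one-hour buffer; may receive updates
        if cached_max is not None and hour <= cached_max:
            continue  # incremental filter: already loaded by an earlier refresh
        selected.append(hour)
    return selected
```

Because event_hour is only monotonically increasing, many rows can share one value. The buffer makes sure an hour is loaded only once no further rows for it are expected, and the incremental filter prevents the hour from being loaded twice.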
Finally, incremental refreshes can also use a deletePredicate that removes data that is no longer needed. In the following example, events older than 31 days are removed:
{
"catalogName": "mysqlevents",
"schemaName": "web",
"tableName": "events",
"refreshInterval": "2m",
"gracePeriod": "15m",
"incrementalColumn": "event_id",
"deletePredicate": "event_date < date_add('day', -31, CURRENT_DATE)"
},
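The effect of this deletePredicate on the cached rows can be modeled in Python as follows. The function apply_delete_predicate and its retention_days parameter are hypothetical. The cache service itself evaluates the SQL predicate against the cached table.

```python
from datetime import date, timedelta
from typing import Any, Dict, List


def apply_delete_predicate(cached: List[Dict[str, Any]], today: date,
                           retention_days: int = 31) -> List[Dict[str, Any]]:
    """Keep only the rows that the example deletePredicate would not delete."""
    # deletePredicate: event_date < date_add('day', -31, CURRENT_DATE)
    cutoff = today - timedelta(days=retention_days)
    return [row for row in cached if not row["event_date"] < cutoff]
```

Rows dated exactly on the cutoff day are kept, because the predicate uses a strict less-than comparison.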
Full refreshes
By default, each refresh of a cached table results in all the rows from the specified columns of the source table getting loaded into the cached table. This is called a full refresh. For cached tables that are completely refreshed, the cached table has a TTL (time to live) computed as the sum of the effective maximum import duration and the duration defined by its effective grace period. This allows any running query that started just before the cached table is expired to finish gracefully. After the TTL expires, the cache service removes older cached tables once the newer table is available.
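As a worked example of the TTL formula, the following Python sketch adds an effective maximum import duration to an effective grace period. It assumes durations are written as a number followed by s, m, h, or d, as in the rules.json examples in this document. parse_duration and full_refresh_ttl are hypothetical helpers.

```python
import re
from datetime import timedelta

# Assumed unit suffixes, matching the examples in this document.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_duration(text: str) -> timedelta:
    """Parse durations such as '15m' or '67h'."""
    match = re.fullmatch(r"(\d+(?:\.\d+)?)([smhd])", text.strip())
    if match is None:
        raise ValueError(f"unsupported duration: {text!r}")
    value, unit = match.groups()
    return timedelta(**{_UNITS[unit]: float(value)})


def full_refresh_ttl(max_import_duration: str, grace_period: str) -> timedelta:
    """TTL of a fully refreshed cached table: max import duration + grace period."""
    return parse_duration(max_import_duration) + parse_duration(grace_period)
```

With a maximum import duration of 1m and a grace period of 42m, the TTL is 43 minutes. With 67h and 80m, it is 68 hours and 20 minutes.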
General rule set properties
This section describes the properties available in the rules.json file to configure which tables are cached, where they are cached from, and other rule-based behaviors.
Global properties
The global properties listed in this section, when defined, are applied to all rules that do not have a specific value set. For example, if a rule does not have a specific gracePeriod defined, the defaultGracePeriod value, if set, is used. Some properties have a default value that is used when the property does not exist in the rules.json file.
Property name | Description |
---|---|
defaultGracePeriod | The duration for which a previous version of a cached table is retained, once the latest import is available, so that any active queries using it can complete. Can be overridden within a specific rule using gracePeriod. |
defaultMaxImportDuration | The maximum allowed execution time for a query used to populate a cached table. Can be overridden within a specific rule using maxImportDuration. |
defaultCacheCatalog | The default catalog that configures the data source where the cached tables are stored. Can be overridden within a specific rule using cacheCatalog. |
defaultCacheSchema | The default schema where cached tables are stored. You must ensure that the schema exists and that the cache service user has permissions to read and write to it. Can be overridden within a specific rule using cacheSchema. |
defaultUnpartitionedImportConfig | A collection of import properties applied by default when writing unpartitioned cached tables named in the rule. Uses nested importConfig settings properties. |
defaultPartitionedImportConfig | A collection of import properties applied by default when writing partitioned cached tables named in the rule. Uses nested importConfig settings properties. |
cleanup-interval | Interval at which the cache service removes stale cached tables and runs cleanup queries for rules where a |
The following example shows these properties used in a rules.json
file:
{
"defaultGracePeriod": "42m",
"defaultMaxImportDuration": "1m",
"defaultCacheCatalog": "default_cache_catalog",
"defaultCacheSchema": "default_cache_schema",
"defaultUnpartitionedImportConfig": {
"usePreferredWritePartitioning": false,
"preferredWritePartitioningMinNumberOfPartitions": 1,
"writerCount": 128,
"scaleWriters": false,
"writerMinSize": "110MB"
},
"defaultPartitionedImportConfig": {
"usePreferredWritePartitioning": true,
"preferredWritePartitioningMinNumberOfPartitions": 40,
"writerCount": 256,
"scaleWriters": false,
"writerMinSize": "52MB"
}
}
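The fallback from a rule-specific property to its global default can be sketched in Python as follows. The gracePeriod and maxImportDuration pairs follow the descriptions above. The cacheCatalog and cacheSchema pairs, and the rule that a table counts as partitioned when partitionColumns is set, are assumptions. Built-in defaults and incrementalImportConfig are not modeled, and the function names are hypothetical.

```python
from typing import Any, Dict, Optional

# Rule-specific property -> global default it overrides.
# gracePeriod and maxImportDuration follow the text; the cache* pairs are assumed.
OVERRIDES = {
    "gracePeriod": "defaultGracePeriod",
    "maxImportDuration": "defaultMaxImportDuration",
    "cacheCatalog": "defaultCacheCatalog",
    "cacheSchema": "defaultCacheSchema",
}


def effective(rule: Dict[str, Any], defaults: Dict[str, Any], prop: str) -> Optional[Any]:
    """Return the rule's own value, else the global default, else None."""
    if prop in rule:
        return rule[prop]
    return defaults.get(OVERRIDES[prop])


def effective_import_config(rule: Dict[str, Any],
                            defaults: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Pick importConfig, or the matching global default import config."""
    if "importConfig" in rule:
        return rule["importConfig"]
    # Assumption: a rule is "partitioned" when it lists partitionColumns.
    key = ("defaultPartitionedImportConfig" if rule.get("partitionColumns")
           else "defaultUnpartitionedImportConfig")
    return defaults.get(key)
```

With the global settings above, a rule that sets only gracePeriod and partitionColumns keeps its own grace period, inherits the 1m maximum import duration, and uses defaultPartitionedImportConfig.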
Rule-specific properties
The rule-specific properties listed in this section override the corresponding global property, if one exists. For example, if a rule sets a specific maxImportDuration, the global defaultMaxImportDuration value is ignored.
Property name | Description |
---|---|
catalogName | The name of the source catalog. Required. |
cacheCatalog | The catalog that defines the data source where the cached tables are stored. |
cacheSchema | The schema where the cached tables are stored. You must ensure that the schema exists and that the cache service user has permissions to read and write to it. |
schemaName | The specific schema to use within a rule. You must specify this or |
schemaNameLike | SQL |
tableName | The specific table to use within a rule. You must specify this or |
tableNameLike | SQL |
refreshInterval | The time duration between refreshes. Cannot be used with |
cronExpression | A cron expression defining the refresh schedule. Cannot be used with |
gracePeriod | The duration for which a previous version of a cached table is retained so that any active queries using it can complete. Overrides the global defaultGracePeriod value. |
maxImportDuration | Overrides the global defaultMaxImportDuration value. |
columns | Comma-separated list of columns to be imported. If not specified, all available columns are imported. Redirections can only be used when all columns required by a given table scan are present. |
partitionColumns | Comma-separated list of columns to use for partitioning. |
bucketColumns | Comma-separated list specifying the columns to be bucketed. Requires the use of |
bucketCount | Defines the number of buckets for data in the specified columns. |
sortColumns | Comma-separated list specifying the columns to sort on. Requires the use of |
incrementalColumn | Required for incremental refresh; if not specified, a full refresh is performed. This column is used by the service to apply a |
predicate | SQL expression used in conjunction with |
deletePredicate | Optionally specifies a predicate clause to clean up old data from the cached table. It is used by the cache service to run a |
importConfig | Overrides the default import config (partitioned or unpartitioned, depending on the use of |
incrementalImportConfig | Overrides the default import config (partitioned or unpartitioned, depending on the use of |
The following example shows rules defined for several catalogs, using a variety of options:
{
"catalogName": "marketing_campaigns",
"schemaNameLike": "apac",
"tableNameLike": "organic",
"refreshInterval": "60m",
"gracePeriod": "15m",
"incrementalColumn": "event_id",
"deletePredicate": "event_date < date_add('day', -31, CURRENT_DATE)"
},
{
"catalogName": "sales",
"schemaNameLike": "contacts",
"tableNameLike": "leads",
"cronExpression": "* * * 2 *",
"gracePeriod": "15m",
"incrementalColumn": "event_hour",
"predicate": "event_hour < (SELECT date_add('hour', -1, latest_event_hour) FROM (SELECT max(event_hour) AS latest_event_hour FROM postgresql.foo.events))"
},
{
"catalogName": "telemetry",
"schemaName": "allproducts",
"tableName": "events",
"columns": [
"pingtime",
"avgMem",
"apiCalls"
],
"partitionColumns": [
"pingtime"
],
"bucketColumns": [
"avgMem"
],
"bucketCount": 5,
"sortColumns": [
"apiCalls"
],
"refreshInterval": "123h",
"gracePeriod": "80m",
"maxImportDuration": "67h",
"cacheCatalog": "ctp_table_catalog",
"cacheSchema": "ctp_table_schema",
"importConfig": {
"usePreferredWritePartitioning": true,
"preferredWritePartitioningMinNumberOfPartitions": 4,
"writerCount": 32,
"scaleWriters": false,
"writerMinSize": "100MB"
},
"incrementalImportConfig": {
"usePreferredWritePartitioning": false,
"preferredWritePartitioningMinNumberOfPartitions": 1,
"writerCount": 4,
"scaleWriters": false,
"writerMinSize": "100MB"
}
}
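Before deploying a rules.json file, it can help to check each rule for basic consistency. The following Python sketch is hypothetical. Only the catalogName requirement and the full-refresh fallback come directly from the table above. The pairings schemaName/schemaNameLike, tableName/tableNameLike, and refreshInterval/cronExpression are assumptions inferred from the examples, and the cache service may enforce different or additional checks.

```python
from typing import Any, Dict, List


def check_rule(rule: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in one rules.json rule (sketch)."""
    problems = []
    if "catalogName" not in rule:
        problems.append("catalogName is required")
    # Assumed pairings: each rule names its schema and table exactly or by pattern.
    if "schemaName" not in rule and "schemaNameLike" not in rule:
        problems.append("schemaName or schemaNameLike is required")
    if "tableName" not in rule and "tableNameLike" not in rule:
        problems.append("tableName or tableNameLike is required")
    # Assumed: a rule uses either a fixed interval or a cron schedule, not both.
    if "refreshInterval" in rule and "cronExpression" in rule:
        problems.append("refreshInterval and cronExpression are mutually exclusive")
    return problems


def refresh_mode(rule: Dict[str, Any]) -> str:
    """Without incrementalColumn, a full refresh is performed."""
    return "incremental" if "incrementalColumn" in rule else "full"
```

This check does not validate JSON syntax. A parser such as Python's json module already rejects errors like a missing comma between properties.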
Metrics
Metrics about table scan redirection are reported in a JMX table for each catalog, regardless of the connector it uses. For a catalog named mycatalog, the table is:
jmx.current."com.starburstdata.presto.plugin.jdbc.redirection:name=mycatalog,type=redirectionstats"
Metrics include information about the number of successful redirections and the number of times redirection could not be performed due to the target table missing some columns from the source table.
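Because the JMX table name contains colons, commas, and equals signs, it must be written as a quoted identifier. The following Python sketch builds the query for a given catalog. It assumes the JMX catalog is named jmx, as in the table name above, and uses SELECT * because the individual column names are not listed here. The helper names are hypothetical.

```python
def quote_identifier(name: str) -> str:
    """Quote a SQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def redirection_stats_query(catalog: str) -> str:
    """Build a query for the redirection stats JMX table of one catalog."""
    table = ("com.starburstdata.presto.plugin.jdbc.redirection"
             f":name={catalog},type=redirectionstats")
    return f"SELECT * FROM jmx.current.{quote_identifier(table)}"
```

For mycatalog, this produces a SELECT * query against the quoted table name shown above.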
Limitations
Redirections are supported for Hive tables but not Hive views.