Fault-tolerant execution
By default, if a node lacks the resources to execute a task or otherwise fails during query execution, the query fails and must be run again manually. The longer a query runs, the more susceptible it is to such failures.
Fault-tolerant execution is a mechanism in SEP that enables a cluster to mitigate query failures by retrying queries or their component tasks in the event of failure. With fault-tolerant execution enabled, intermediate exchange data is spooled and can be re-used by another worker in the event of a worker outage or other fault during query execution.
Note
Fault tolerance does not apply to broken queries or other user error. For example, SEP does not spend resources retrying a query that fails because its SQL cannot be parsed.
Configuration
Fault-tolerant execution is disabled by default. To enable the feature, set the retry-policy configuration property to either QUERY or TASK, depending on the desired retry policy.
retry-policy=QUERY
Warning
Setting retry-policy disables write operations with connectors that do not support fault-tolerant execution of write operations, resulting in a “This connector does not support query retries” error message.
Support for fault-tolerant execution of SQL statements varies on a per-connector basis:
- Fault-tolerant execution of read operations is supported by all connectors except the Starburst Teradata direct connector, which only supports fault-tolerant execution of read operations with a QUERY retry policy.
- Fault-tolerant execution of write operations is supported by the following connectors:
The following configuration properties control the behavior of fault-tolerant execution on a SEP cluster:
Property name | Description | Default value |
---|---|---|
retry-policy | Configures what is retried in the event of failure, either QUERY or TASK. | |
exchange.deduplication-buffer-size | Size of the coordinator’s in-memory buffer used by fault-tolerant execution to store output of query stages. If this buffer is filled during query execution, the query fails with a “Task descriptor storage capacity has been exceeded” error message unless an exchange manager is configured. | 32MB |
exchange.compression-enabled | Enable compression of spooling data. Setting to true is recommended to reduce the exchange manager’s I/O load. | |
Retry policy
The retry-policy configuration property designates whether SEP retries entire queries or a query’s individual tasks in the event of failure.
QUERY
A QUERY retry policy instructs SEP to automatically retry a query in the event of an error occurring on a worker node. A QUERY retry policy is recommended when the majority of the SEP cluster’s workload consists of many small queries, or if an exchange manager is not configured.
By default, SEP does not implement fault tolerance for queries whose result set exceeds 32MB in size, such as SELECT statements that return a very large data set to the user. This limit can be increased by setting the exchange.deduplication-buffer-size configuration property to a value greater than the default of 32MB, but this results in higher memory usage on the coordinator.
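The following minimal sketch raises the limit. It assumes the property is set in the coordinator’s config.properties file alongside retry-policy. The 64MB value is an illustrative choice, not a recommendation.

```properties
# config.properties on the coordinator (sketch; values are illustrative)
retry-policy=QUERY
# Raise the in-memory result buffer above its 32MB default so that
# queries with larger result sets remain eligible for retries.
# A larger buffer increases memory usage on the coordinator.
exchange.deduplication-buffer-size=64MB
```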
To enable fault-tolerant execution on queries with a larger result set, it is strongly recommended to configure an exchange manager that uses external storage for spooled data, which allows spooled data to exceed the in-memory buffer size.
TASK
A TASK retry policy instructs SEP to retry individual query tasks in the event of failure. This policy is recommended when executing large batch queries, as the cluster can more efficiently retry smaller tasks within the query rather than retry the whole query.
When a cluster is configured with a TASK retry policy, some relevant configuration properties have their default values changed to follow best practices for a fault-tolerant cluster. However, this automatic change does not affect clusters that have these properties manually configured. If you have any of the following properties configured in the config.properties file on a cluster with a TASK retry policy, it is strongly recommended to make the following changes:
- Set the task.low-memory-killer.policy query management property to total-reservation-on-blocked-nodes, or queries may need to be manually killed if the cluster runs out of memory.
- Set the query.low-memory-killer.delay query management property to 0s so the cluster immediately unblocks nodes that run out of memory.
- Modify the query.remote-task.max-error-duration query management property to adjust how long SEP allows a remote task to try reconnecting before considering it lost and rescheduling.
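As a sketch, a config.properties file that applies the changes above on a TASK cluster might look like the following. It assumes these properties were previously set manually. The query.remote-task.max-error-duration value is a placeholder, because this page does not prescribe one.

```properties
# config.properties on a cluster with a TASK retry policy (sketch)
retry-policy=TASK
# Without this policy, queries may need to be killed manually
# if the cluster runs out of memory.
task.low-memory-killer.policy=total-reservation-on-blocked-nodes
# Immediately unblock nodes that run out of memory.
query.low-memory-killer.delay=0s
# Placeholder value: how long a remote task may try to reconnect
# before it is considered lost and rescheduled.
query.remote-task.max-error-duration=1m
```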
Note
A TASK retry policy is best suited for large batch queries, but this policy can result in higher latency for short-running queries executed in high volume. As a best practice, it is recommended to run a dedicated cluster with a TASK retry policy for large batch queries, separate from another cluster that handles short queries.
Advanced configuration
You can further configure fault-tolerant execution with the following configuration properties. The default values for these properties should work for most deployments, but you can change these values for testing or troubleshooting purposes.
Retry limits
The following configuration properties control the thresholds at which queries or tasks are no longer retried in the event of repeated failures:
Property name | Description | Default value | Retry policy |
---|---|---|---|
| Maximum number of times SEP may attempt to retry a query before declaring the query as failed. | | Only QUERY |
| Maximum number of times SEP may attempt to retry a single task before declaring the query as failed. | | Only TASK |
| Minimum time that a failed query or task must wait before it is retried. May be overridden with the corresponding session property. | | |
| Maximum time that a failed query or task must wait before it is retried. Wait time is increased on each subsequent failure. May be overridden with the corresponding session property. | | |
| Factor by which retry delay is increased on each query or task failure. May be overridden with the corresponding session property. | | |
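The three delay properties above can be read as a capped geometric backoff. The following sketch assumes that model and is not a statement of SEP’s exact scheduling logic. The symbols and example values are hypothetical, not the property defaults.

```latex
% d_0: minimum (initial) retry delay, d_max: maximum retry delay,
% s: delay scale factor, n: retry attempt number (n >= 1)
d_n = \min\left(d_0 \cdot s^{\,n-1},\ d_{\max}\right)

% Worked example with hypothetical values d_0 = 10 s, s = 2, d_max = 60 s:
% d_1 = 10 s, d_2 = 20 s, d_3 = 40 s, d_4 = \min(80, 60) = 60 s
```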
Task sizing
With a TASK retry policy, it is important to manage the amount of data processed in each task. If tasks are too small, the management of task coordination can take more processing time and resources than executing the task itself. If tasks are too large, then a single task may require more resources than are available on any one node and therefore prevent the query from completing.
SEP supports limited automatic task sizing. If issues occur during fault-tolerant task execution, you can configure the following configuration properties to manually control task sizing. These configuration properties only apply to a TASK retry policy.
Property name | Description | Default value |
---|---|---|
| Target size in bytes of all task inputs for a single fault-tolerant task. Applies to tasks that read input from spooled data written by other tasks. May be overridden for the current session with the corresponding session property. | |
| Target number of standard splits processed by a single task that reads data from source tables. The value is interpreted with split weight taken into account: if the weight of the splits produced by a catalog indicates that they are lighter or heavier than a “standard” split, the number of splits processed by a single task is adjusted accordingly. May be overridden for the current session with the corresponding session property. | |
| Maximum number of splits processed by a single task. This value is not adjusted for split weight and serves as protection against situations where catalogs report an incorrect split weight. May be overridden for the current session with the corresponding session property. | |
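The interaction between the target split count, split weight, and the unweighted maximum can be approximated as follows. This first-order sketch assumes the target is simply divided by the relative split weight. The symbols and numbers are hypothetical, not SEP defaults.

```latex
% T: target split count (in standard splits), M: maximum split count,
% w: weight of a catalog's splits relative to a standard split (w = 1)
n_{\text{task}} \approx \min\left(\frac{T}{w},\ M\right)

% Hypothetical values T = 50, M = 200:
% w = 0.5 (lighter splits): n = \min(100, 200) = 100
% w = 0.2 (much lighter):   n = \min(250, 200) = 200
% w = 2   (heavier splits): n = \min(25, 200)  = 25
```

The second case shows why the maximum is not weight-adjusted. If a catalog under-reports split weight, the cap still bounds how many splits one task receives.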
Node allocation
With a TASK retry policy, nodes are allocated to tasks based on available memory and estimated memory usage. If task failure occurs due to exceeding available memory on a node, the task is restarted with a request to allocate the full node for its execution.
The initial task memory-requirements estimation is static and configured with the fault-tolerant-task-memory configuration property. This property only applies to a TASK retry policy.
Property name | Description | Default value |
---|---|---|
| Initial task memory estimation used for bin-packing when allocating nodes for tasks. May be overridden for the current session with the corresponding session property. | |
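The following is a first-order sketch of the bin-packing described above. It assumes a node’s free memory is the only constraint, although the actual scheduler may weigh other factors. Under that assumption, the number of tasks that fit on a node with the initial static estimate is roughly:

```latex
% m_est: initial per-task memory estimate, M_free: memory available on a node
k \approx \left\lfloor \frac{M_{\text{free}}}{m_{\text{est}}} \right\rfloor

% Hypothetical example: M_free = 60 GB, m_est = 5 GB gives k = 12 tasks.
% A task retried after exceeding a node's memory requests the full node (k = 1).
```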
Other tuning
The following additional configuration properties can be used to manage fault-tolerant execution:
Property name | Description | Default value | Retry policy |
---|---|---|---|
| Maximum amount of memory to be used to store task descriptors for fault-tolerant queries on the coordinator. Extra memory is needed to be able to reschedule tasks in case of a failure. | (JVM heap size * 0.15) | Only TASK |
| Number of partitions to use for distributed joins and aggregations, similar in function to the | | Only TASK |
| Allows up to the configured number of tasks to wait for node allocation per stage before pausing scheduling for other tasks from this stage. | 5 | Only TASK |
Exchange manager
Exchange spooling is responsible for storing and managing spooled data for fault-tolerant execution. You can configure a filesystem-based exchange manager that stores spooled data in a specified location, such as AWS S3 and S3-compatible systems, Azure Blob Storage, Google Cloud Storage, or HDFS.
Configuration
To configure an exchange manager, create a new etc/exchange-manager.properties configuration file on the coordinator and all worker nodes. In this file, set the exchange-manager.name configuration property to filesystem or hdfs, and set additional configuration properties as needed for your storage solution.
The following table lists the available configuration properties for exchange-manager.properties, their default values, and which filesystem(s) the property may be configured for:
Property name | Description | Default value | Supported filesystem |
---|---|---|---|
exchange.base-directories | Comma-separated list of URI locations that the exchange manager uses to store spooling data. | | Any |
| The minimum buffer pool size for an exchange sink. The larger the buffer pool size, the larger the write parallelism and memory usage. | | Any |
| The number of buffers per partition in the buffer pool. The larger the buffer pool size, the larger the write parallelism and memory usage. | | Any |
| Maximum size of files written by exchange sinks. | | Any |
| Number of concurrent readers to read from spooling storage. The larger the number of concurrent readers, the larger the read parallelism and memory usage. | | Any |
exchange.s3.aws-access-key | AWS access key to use. Required for a connection to AWS S3 and GCS; can be ignored for other S3 storage systems. | | AWS S3, GCS |
exchange.s3.aws-secret-key | AWS secret key to use. Required for a connection to AWS S3 and GCS; can be ignored for other S3 storage systems. | | AWS S3, GCS |
| IAM role to assume. | | AWS S3, GCS |
| External ID for the IAM role trust policy. | | AWS S3, GCS |
exchange.s3.region | Region of the S3 bucket. | | AWS S3, GCS |
exchange.s3.endpoint | S3 storage endpoint server if using an S3-compatible storage system that is not AWS. If using AWS S3, this can be ignored. If using GCS, set it to https://storage.googleapis.com. | | Any S3-compatible storage |
| Maximum number of times the exchange manager’s S3 client should retry a request. | | Any S3-compatible storage |
| Enables using path-style access for all requests to S3. | | Any S3-compatible storage |
| Part size for S3 multi-part upload. | | Any S3-compatible storage |
exchange.gcs.json-key-file-path | Path to the JSON file that contains your Google Cloud Platform service account key. Not to be set together with the property that supplies the service account key in JSON format. | | GCS |
| Your Google Cloud Platform service account key in JSON format. Not to be set together with exchange.gcs.json-key-file-path. | | GCS |
exchange.azure.connection-string | Connection string used to access the spooling container. | | Azure Blob Storage |
| Block size for Azure block blob parallel upload. | | Azure Blob Storage |
| Maximum number of times the exchange manager’s Azure client should retry a request. | | Azure Blob Storage |
| Block size for HDFS storage. | | HDFS |
hdfs.config.resources | Comma-separated list of paths to HDFS configuration files, for example | | HDFS |
It is recommended to set the exchange.compression-enabled property to true in the cluster’s config.properties file, to reduce the exchange manager’s overall I/O load. It is also recommended to configure a bucket lifecycle rule to automatically expire abandoned objects in the event of a node crash.
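For example, assuming the cluster already has an exchange manager configured, the recommended compression setting is a single config.properties entry:

```properties
# config.properties (sketch): compress spooled exchange data
# to reduce the exchange manager's overall I/O load
exchange.compression-enabled=true
```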
AWS S3
The following example exchange-manager.properties configuration specifies an AWS S3 bucket as the spooling storage destination. Note that the destination does not have to be in AWS, but can be any S3-compatible storage system.
exchange-manager.name=filesystem
exchange.base-directories=s3://exchange-spooling-bucket
exchange.s3.region=us-west-1
exchange.s3.aws-access-key=example-access-key
exchange.s3.aws-secret-key=example-secret-key
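For an S3-compatible system that is not AWS, the main difference is pointing exchange.s3.endpoint at that system. The following is a minimal sketch in which the bucket name and endpoint URL are hypothetical. Depending on the storage system, further properties from the table above, such as credentials or path-style access, may also be needed.

```properties
# etc/exchange-manager.properties (sketch) for a non-AWS,
# S3-compatible store; bucket name and endpoint are hypothetical
exchange-manager.name=filesystem
exchange.base-directories=s3://exchange-spooling-bucket
# Send S3 requests to the non-AWS storage endpoint
exchange.s3.endpoint=https://s3.storage.example.com
```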
You can configure multiple S3 buckets for the exchange manager to distribute spooled data across buckets, reducing the I/O load on any one bucket. If a query fails with the error message “software.amazon.awssdk.services.s3.model.S3Exception: Please reduce your request rate”, this indicates that the workload is I/O intensive, and you should specify multiple S3 buckets in exchange.base-directories to balance the load:
exchange.base-directories=s3://exchange-spooling-bucket-1,s3://exchange-spooling-bucket-2
Azure Blob Storage
The following example exchange-manager.properties configuration specifies an Azure Blob Storage container as the spooling storage destination.
exchange-manager.name=filesystem
exchange.base-directories=abfs://container_name@account_name.dfs.core.windows.net
exchange.azure.connection-string=connection-string
Google Cloud Storage
To enable exchange spooling on GCS in SEP, change the request endpoint to the https://storage.googleapis.com Google storage URI, and configure your AWS access/secret keys to use the GCS HMAC keys. If you deploy SEP on GCP, you must either create a service account with access to your spooling bucket or configure the key path to your GCS credential file.
For more information on GCS’s S3 compatibility, refer to the Google Cloud documentation on S3 migration.
The following example exchange-manager.properties configuration specifies a GCS bucket as the spooling storage destination.
exchange-manager.name=filesystem
exchange.base-directories=gs://exchange-spooling-bucket
exchange.s3.region=us-west-1
exchange.s3.aws-access-key=example-access-key
exchange.s3.aws-secret-key=example-secret-key
exchange.s3.endpoint=https://storage.googleapis.com
exchange.gcs.json-key-file-path=/path/to/gcs_keyfile.json
HDFS
The following exchange-manager.properties configuration example specifies HDFS as the spooling storage destination.
exchange-manager.name=hdfs
exchange.base-directories=hadoop-master:9000/exchange-spooling-directory
hdfs.config.resources=/usr/lib/hadoop/etc/hadoop/core-site.xml
Local filesystem storage
The following example exchange-manager.properties configuration specifies a local directory, /tmp/trino-exchange-manager, as the spooling storage destination.
Note
It is only recommended to use a local filesystem for exchange in standalone, non-production clusters. A local directory can only be used for exchange in a distributed cluster if the exchange directory is shared and accessible from all worker nodes.
exchange-manager.name=filesystem
exchange.base-directories=/tmp/trino-exchange-manager