Getting Started
The prerequisites and configuration needed to get started using Spark dynamic allocation with Data Flow.
Prerequisites
- Use Spark 3.x.
- Spark dynamic allocation is beneficial only for jobs with processing times of more than 10 minutes.
- Although Spark dynamic allocation can be used with Structured Streaming, it's optimized for batch jobs. For more information, see Spark Dynamic Allocation and Spark Structured Streaming.
- Enabling Spark dynamic allocation enables shuffle tracking.
- External shuffle service isn't supported.
Configuration of Spark Dynamic Allocation
You can configure Spark dynamic allocation with Data Flow in three ways.
Using the Console
When creating an Application, click Enable Autoscaling. The default configuration is populated into the Spark configuration properties. The minimum number of executors matches the value of the spark.dynamicAllocation.minExecutors property, and the maximum number of executors matches the value of the spark.dynamicAllocation.maxExecutors property. You can set values different from the defaults for the spark.dynamicAllocation.executorIdleTimeout and spark.dynamicAllocation.schedulerBacklogTimeout properties.
Using the API
Set both spark.dynamicAllocation.enabled and spark.dynamicAllocation.shuffleTracking.enabled to true for the Data Flow application.
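As a sketch, the two required properties (plus the executor bounds) are plain string key-value pairs; how you attach them to an application through the API is an assumption here, so check the Data Flow API reference for the exact field name:

```python
# Spark properties enabling dynamic allocation for a Data Flow application.
# Passing this dict as the application's Spark configuration (for example
# through the OCI SDK) is an assumption of this sketch, not a documented call.
dynamic_allocation_conf = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.shuffleTracking.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "1",
    "spark.dynamicAllocation.maxExecutors": "4",
}

# Both flags must be "true" for dynamic allocation to work on Data Flow.
assert dynamic_allocation_conf["spark.dynamicAllocation.enabled"] == "true"
assert dynamic_allocation_conf["spark.dynamicAllocation.shuffleTracking.enabled"] == "true"
print(sorted(dynamic_allocation_conf))
```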
A Spark application with dynamic allocation enabled requests more executors when it has pending tasks waiting to be scheduled. This condition necessarily implies that the existing set of executors is insufficient to simultaneously saturate all tasks that have been submitted but not yet finished.
Spark requests executors in rounds. The actual request is triggered when there have been pending tasks for spark.dynamicAllocation.schedulerBacklogTimeout seconds, and, if the queue of pending tasks persists, is triggered again every spark.dynamicAllocation.sustainedSchedulerBacklogTimeout seconds thereafter. The number of executors requested in each round increases exponentially from the previous round: for example, an application adds 1 executor in the first round, then 2 executors, then 4 executors, and so on in later rounds.
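The ramp-up described above can be sketched with a short simulation (timings are omitted, and the task backlog is assumed to persist every round):

```python
def executors_requested(rounds, min_executors=1, max_executors=4):
    """Simulate Spark's exponential executor ramp-up: each backlog-triggered
    round requests double the previous round's count, with the running total
    capped at max_executors."""
    total = min_executors          # start at the lower bound
    request = 1                    # first round adds a single executor
    history = [total]
    for _ in range(rounds):
        total = min(total + request, max_executors)
        history.append(total)
        request *= 2               # 1, 2, 4, ... in later rounds
    return history

print(executors_requested(3))  # [1, 2, 4, 4]
```

With the default bounds of the minimum configuration (1 to 4 executors), the cap is reached after two rounds.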
An application removes an executor when the executor has been idle for more than spark.dynamicAllocation.executorIdleTimeout seconds. Under most circumstances, this condition is mutually exclusive with the request condition, in that an executor shouldn't be idle if pending tasks are still waiting to be scheduled.
Using Spark-Submit
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.minExecutors=1
--conf spark.dynamicAllocation.maxExecutors=4
--conf spark.dynamicAllocation.executorIdleTimeout=60
--conf spark.dynamicAllocation.shuffleTracking.enabled=true
Minimum Configuration
- spark.dynamicAllocation.enabled: true
- spark.dynamicAllocation.shuffleTracking.enabled: true
- spark.dynamicAllocation.minExecutors: 1
- spark.dynamicAllocation.maxExecutors: 4
- spark.dynamicAllocation.executorIdleTimeout: 60
- spark.dynamicAllocation.schedulerBacklogTimeout: 60
- spark.dataflow.dynamicAllocation.quotaPolicy: max
Property Descriptions
Descriptions of the Spark dynamic allocation properties you can use with Data Flow, and their possible values.
Property Name | Default Value | Value Range | Description | Supported Spark Versions
---|---|---|---|---
spark.dynamicAllocation.enabled | false | true, false | Whether to use dynamic resource allocation, which scales the number of executors registered with the application up and down based on the workload. | 3.x
spark.dynamicAllocation.shuffleTracking.enabled | true | true | Enables shuffle file tracking for executors, which allows dynamic allocation without the need for an external shuffle service. This option tries to keep alive executors that are storing shuffle data for active jobs. | 3.x
spark.dynamicAllocation.minExecutors | 1 | [1, maxExecutors] | The lower bound for the number of executors when dynamic allocation is enabled. | 3.x
spark.dynamicAllocation.maxExecutors | 4 | [minExecutors, 1000] | The upper bound for the number of executors when dynamic allocation is enabled. | 3.x
spark.dynamicAllocation.executorIdleTimeout | 60 | [60, 600] | If dynamic allocation is enabled, the time, in seconds, an executor must be idle before it's removed. | 3.x
spark.dynamicAllocation.schedulerBacklogTimeout | 60 | [60, 600] | If dynamic allocation is enabled, the time, in seconds, that tasks must be pending before new executors are requested. | 3.x
spark.dataflow.dynamicAllocation.quotaPolicy | min | min, max | If set to min, Data Flow reduces the tenancy quota during the start of the run by the value of spark.dynamicAllocation.minExecutors; if set to max, by the value of spark.dynamicAllocation.maxExecutors. | 3.x
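A quick way to catch out-of-range values before submitting a run is to check them against the value ranges in the table above. This is a hypothetical helper for illustration, not part of Data Flow or Spark:

```python
def validate_dynamic_allocation(conf):
    """Check a dynamic-allocation config dict against the documented ranges.

    Returns a list of problems; an empty list means the values look valid.
    Hypothetical helper -- the ranges mirror the property table, defaults
    are used for any missing key.
    """
    problems = []
    min_ex = int(conf.get("spark.dynamicAllocation.minExecutors", 1))
    max_ex = int(conf.get("spark.dynamicAllocation.maxExecutors", 4))
    if not 1 <= min_ex <= max_ex:
        problems.append("minExecutors must be in [1, maxExecutors]")
    if max_ex > 1000:
        problems.append("maxExecutors must be in [minExecutors, 1000]")
    for key in ("spark.dynamicAllocation.executorIdleTimeout",
                "spark.dynamicAllocation.schedulerBacklogTimeout"):
        if not 60 <= int(conf.get(key, 60)) <= 600:
            problems.append(f"{key} must be in [60, 600]")
    if conf.get("spark.dataflow.dynamicAllocation.quotaPolicy", "min") not in ("min", "max"):
        problems.append("quotaPolicy must be 'min' or 'max'")
    return problems

# The documented defaults pass; an oversized maxExecutors is flagged.
print(validate_dynamic_allocation({}))
print(validate_dynamic_allocation({"spark.dynamicAllocation.maxExecutors": "2000"}))
```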
For more information, see the Spark documentation on Dynamic Resource Allocation and the Dynamic Allocation configuration properties.