Adaptive Deployments¶
Motivation¶
Most Dask deployments are static, with a single scheduler and a fixed number of workers. This results in predictable behavior, but is wasteful of resources in two situations:
- The user may not be using the cluster; perhaps they are busy interpreting a recent result or plot, and so the workers sit idle, taking up valuable shared resources from other potential users
- The user may be very active and is limited by their original allocation
Particularly efficient users may learn to manually add and remove workers during their session, but this is rare. Instead, we would like the size of a Dask cluster to match the computational needs at any given time. This is the goal of the adaptive deployments discussed in this document. They are particularly helpful for interactive workloads, which are characterized by long periods of inactivity interrupted by short bursts of heavy activity. Adaptive deployments can result in faster analyses, give users more power, and put much less pressure on computational resources.
Adaptive¶
To make setting up adaptive deployments easy, some Dask deployment solutions offer an .adapt() method. Here is an example with dask_kubernetes.KubeCluster.
from dask_kubernetes import KubeCluster
cluster = KubeCluster()
cluster.adapt(minimum=0, maximum=100) # scale between 0 and 100 workers
For more keyword options, see the Adaptive class below:
Adaptive(scheduler, cluster[, interval, …]): Adaptively allocate workers based on scheduler load.
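For example, these keywords can be passed straight through .adapt(). The following is an illustrative sketch reusing the cluster object from above; the particular values are arbitrary:

cluster.adapt(
    minimum=0,
    maximum=100,
    target_duration="30s",  # aim for pending work to finish in about thirty seconds
    wait_count=6,           # require more consecutive checks before retiring a worker
)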
Dependence on a Resource Manager¶
The Dask scheduler does not know how to launch workers on its own; instead it relies on an external resource scheduler like Kubernetes above, or Yarn, SGE, SLURM, Mesos, or some other in-house system (see the setup documentation for options). In order to use adaptive deployments, you must provide some mechanism for the scheduler to launch new workers. Typically this is done by using one of the solutions listed in the setup documentation, or by subclassing from the Cluster superclass and implementing that API.
Cluster: Superclass for cluster objects
Scaling Heuristics¶
The Dask scheduler tracks a variety of information that is useful to correctly allocate the number of workers:
- The historical runtime of every function and task that it has seen, and all of the functions that it is currently able to run for users
- The amount of memory used and available on each worker
- Which workers are idle or saturated for various reasons, like the presence of specialized hardware
From these it is able to determine a target number of workers by dividing the cumulative expected runtime of all pending tasks by the target_duration parameter (which defaults to five seconds). This number of workers serves as a baseline request for the resource manager. It can then be adjusted for a variety of reasons, as sketched after this list:
- If the cluster needs more memory then it will choose either the target number of workers, or twice the current number of workers, whichever is larger.
- If the target is outside of the range of the minimum and maximum values then it is clipped to fit within that range.
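Putting these rules together, here is a rough, illustrative sketch of the calculation described above. It is not the actual implementation, and the function name and arguments are invented for this example:

import math

def adaptive_target(pending_task_runtimes, current_workers, needs_memory,
                    minimum=0, maximum=None, target_duration=5.0):
    ''' Illustrative only: approximate the worker target described above '''
    # Baseline: enough workers to finish pending work in about target_duration seconds
    target = math.ceil(sum(pending_task_runtimes) / target_duration)

    # If the cluster is short on memory, ask for at least twice the current size
    if needs_memory:
        target = max(target, 2 * current_workers)

    # Clip the request to the configured [minimum, maximum] range
    if maximum is not None:
        target = min(target, maximum)
    return max(target, minimum)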
Additionally, when scaling down, Dask preferentially chooses those workers that are idle and have the least data in memory. It moves that data to other machines before retiring the worker. To avoid rapid cycling of the cluster up and down in size, we only retire a worker after a few cycles have gone by where it has consistently been a good idea to retire it (controlled by the wait_count and interval parameters).
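As an illustration of that hysteresis (again, not the actual implementation; the function and its argument are invented here), a worker is only retired once it has been suggested for removal on wait_count consecutive checks, with one check per interval:

def should_retire(removal_suggestions, wait_count=3):
    ''' Illustrative only: removal_suggestions is a list of booleans, one per
    check interval, that is True when the worker was suggested for removal '''
    return (len(removal_suggestions) >= wait_count
            and all(removal_suggestions[-wait_count:]))

With the defaults of wait_count=3 and interval='1s', this corresponds to roughly three seconds of consistently being a good candidate for removal.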
API¶
class distributed.deploy.Adaptive(scheduler, cluster, interval='1s', startup_cost='1s', scale_factor=2, minimum=0, maximum=None, wait_count=3, target_duration='5s', **kwargs)¶

Adaptively allocate workers based on scheduler load. A superclass.
Contains logic to dynamically resize a Dask cluster based on current use. This class needs to be paired with a system that can create and destroy Dask workers using a cluster resource manager. Typically it is built into already existing solutions, rather than used directly by users. It is most commonly used from the .adapt(...) method of various Dask cluster classes.

Parameters:

scheduler: distributed.Scheduler
cluster: object
Must have scale_up and scale_down methods/coroutines
startup_cost : timedelta or str, default “1s”
Estimate of how long it takes to start an additional worker; affects how quickly the cluster adapts to high task-per-worker loads
interval : timedelta or str, default “1000 ms”
Milliseconds between checks
wait_count: int, default 3
Number of consecutive times that a worker should be suggested for removal before we remove it.
scale_factor : int, default 2
Factor to scale by when it’s determined additional workers are needed
target_duration: timedelta or str, default “5s”
Amount of time we want a computation to take. This affects how aggressively we scale up.
minimum: int
Minimum number of workers to keep around
maximum: int
Maximum number of workers to keep around
**kwargs:
Extra parameters to pass to Scheduler.workers_to_close
Notes

Subclasses can override Adaptive.should_scale_up() and Adaptive.workers_to_close() to control when the cluster should be resized. The default implementation checks if there are too many tasks per worker or too little memory available (see Adaptive.needs_cpu() and Adaptive.needs_memory()). The Adaptive.get_scale_up_kwargs() method controls the arguments passed to the cluster’s scale_up method.
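For instance, here is a minimal sketch of such a subclass. The policy and the scheduler attribute used below are assumptions made for illustration; check the installed version of distributed for the exact method signatures:

from distributed.deploy import Adaptive

class CappedAdaptive(Adaptive):
    ''' Illustrative subclass: never request more than fifty workers '''

    def should_scale_up(self):
        # Scheduler.workers is assumed here to map addresses to worker state
        if len(self.scheduler.workers) >= 50:
            return False
        # Otherwise fall back to the default CPU/memory checks
        return super().should_scale_up()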
Examples
This is commonly used from existing Dask classes, like KubeCluster
>>> from dask_kubernetes import KubeCluster
>>> cluster = KubeCluster()
>>> cluster.adapt(minimum=10, maximum=100)
Alternatively you can use it from your own Cluster class by subclassing from Dask’s Cluster superclass
>>> from distributed.deploy import Cluster
>>> class MyCluster(Cluster):
...     def scale_up(self, n):
...         """ Bring worker count up to n """
...     def scale_down(self, workers):
...         """ Remove worker addresses from cluster """

>>> cluster = MyCluster()
>>> cluster.adapt(minimum=10, maximum=100)
class distributed.deploy.Cluster¶

Superclass for cluster objects

This expects a local Scheduler defined on the object. It provides common methods and an IPython widget display.
Clusters inheriting from this class should provide the following:

- A local Scheduler object at .scheduler
- scale_up and scale_down methods, as defined below:

def scale_up(self, n: int):
    ''' Brings total worker count up to n '''

def scale_down(self, workers: List[str]):
    ''' Close the workers with the given addresses '''
This will provide a general scale method as well as an IPython widget for display.

See also

LocalCluster: a simple implementation with local workers
Examples
>>> from distributed.deploy import Cluster
>>> class MyCluster(Cluster):
...     def scale_up(self, n):
...         ''' Bring the total worker count up to n '''
...         pass
...     def scale_down(self, workers):
...         ''' Close the workers with the given addresses '''
...         pass
>>> cluster = MyCluster()
>>> cluster.scale(5)  # scale manually
>>> cluster.adapt(minimum=1, maximum=100)  # scale automatically