Setting up a Dask Client#
In this tutorial, we will:

- explain what the Dask client is
- initialize a Dask client using a context manager and a persistent client
- configure a Dask client using the most common arguments
If you would like to know more, see:

- our Dask cluster configuration page for LSDB-specific tips
- the official Dask documentation for more general information
- Dask's own best practices, which may also be useful to consult
Introduction#
Dask is a framework that lets us take advantage of distributed computing.
We recommend using Dask with LSDB; without it, LSDB runs on a single CPU core, which is extremely slow for large datasets.
1 - Launching a Dask client using a context manager#
This is most commonly seen when running HATS import pipelines.
Using a context manager is convenient because it ensures the client is automatically closed when the context block exits, preventing resource leaks.
This makes it well suited to temporary tasks where the client is only needed for a specific operation.
[1]:
from dask.distributed import Client
from hats_import.catalog.arguments import ImportArguments
from hats_import.pipeline import pipeline_with_client
def main():
    # Define arguments for the import pipeline
    args = ImportArguments(
        ra_column="ra",
        dec_column="dec",
        # ...
    )
    # Use a context manager to create and close the Dask client automatically
    with Client(
        n_workers=10,  # Number of workers
        threads_per_worker=1,  # Threads per worker
        # ...
    ) as client:
        pipeline_with_client(args, client)


# if __name__ == "__main__":
#     main()
2 - Launching a persistent Dask client#
Sometimes it’s easier to have the Dask client live for the entire notebook.
This is especially useful for workflows that span multiple cells and need the client to remain active throughout.
Note that if you use this approach, you’ll need to manually close the client when you’re done using it (typically, at the end of the notebook).
[2]:
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=1, memory_limit="auto")
client
[2]:
Client: Client-0fde9fe1-232f-11f1-8723-8e6cf3dc31f8
Connection method: Cluster object | Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

LocalCluster (2f914c15): 4 workers | 4 total threads | 13.09 GiB total memory | status: running
Each worker: 1 thread | 3.27 GiB memory | local directory under /tmp/dask-scratch-space/
[3]:
# Code to execute.
[4]:
client.close()
3 - Common arguments#
There are a few arguments the LSDB team finds most useful. They are briefly explained as follows; for more explanation, see our Dask cluster configuration page.
3.1 - n_workers#
The number of Dask workers (or Python processes) to run.
Increasing n_workers allows for more parallelism, but may also increase resource usage.
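As a rough sizing sketch, n_workers is often derived from the machine's CPU count. The helper below is hypothetical and not part of LSDB or Dask; it just makes the heuristic concrete.

```python
import os


def suggest_n_workers(threads_per_worker: int = 1) -> int:
    """Hypothetical helper: roughly one worker process per available CPU
    thread, divided by the threads each worker will run."""
    cpus = os.cpu_count() or 1
    return max(1, cpus // threads_per_worker)


# e.g. Client(n_workers=suggest_n_workers(1), threads_per_worker=1)
```

On a shared machine you would cap this further, since other users' processes also need CPU time.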
3.2 - threads_per_worker#
Specifies how many Python threads each worker can use.
It’s generally better to keep this low (1 or 2) to avoid contention between threads.
Instead, scale up the n_workers argument to increase parallelism.
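To make the trade-off concrete, here is an illustrative sketch (the dictionaries and helper are not Dask APIs) contrasting two configurations with the same total parallelism:

```python
# Two ways to get 8 concurrent tasks; the process-heavy layout avoids
# contention between Python threads and is usually preferred.
thread_heavy = {"n_workers": 2, "threads_per_worker": 4}
process_heavy = {"n_workers": 8, "threads_per_worker": 1}


def total_parallelism(cfg: dict) -> int:
    # Total concurrent tasks = worker processes x threads per worker.
    return cfg["n_workers"] * cfg["threads_per_worker"]
```

Either dictionary could be passed as Client(**process_heavy); both yield 8 task slots, but the second spreads them across separate processes.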
3.3 - memory_limit#
Specifies how much memory each worker is allocated.
Generally, we find diminishing returns beyond 10 GB per thread.
For example, if memory_limit="20GB" and threads_per_worker=2, the worker fails when the combined usage of its threads exceeds 20 GB.
You can also set memory_limit="auto" to let Dask allocate memory automatically based on the available system memory.
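One way to reason about the per-worker limit is to split a total memory budget evenly across workers. The helper below is a hypothetical sketch, not a Dask API:

```python
def per_worker_memory(total_gb: float, n_workers: int) -> str:
    # Split a total budget across workers, producing a string in the
    # format Dask's memory_limit argument accepts (e.g. "5.0GB").
    return f"{total_gb / n_workers:.1f}GB"


# A 52 GB machine with 4 workers gives memory_limit="13.0GB" per worker:
# Client(n_workers=4, memory_limit=per_worker_memory(52, 4))
```

In practice you would reserve some headroom for the scheduler and the operating system rather than dividing all of the machine's memory.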
3.4 - local_directory#
Specifies where Dask workers store temporary files.
Useful if the default system temp directory has limited space (e.g., pointing to a /data/ directory with more available disk space).
Using a temporary directory can also help manage cleanup automatically.
[5]:
import tempfile

# Create a temporary directory for the workers' scratch space.
tmp_path = tempfile.TemporaryDirectory()
tmp_dir = tmp_path.name

client = Client(local_directory=tmp_dir)

# Do things here.

client.close()
tmp_path.cleanup()  # Remove the temporary directory when finished.
About#
Author(s): Olivia Lynn and Melissa DeLucchi
Last updated on: May 22, 2025
If you use lsdb for published research, please cite it following the provided instructions.