Setting up a Dask Client#

In this tutorial, we will:

  • explain what the Dask client is

  • initialize a Dask client using a context manager and a persistent client

  • configure a Dask client using the most common arguments

Introduction#

Dask is a framework that lets us take advantage of distributed computing by spreading work across multiple processes or machines.

It is recommended to use Dask when using LSDB; otherwise, LSDB uses a single CPU core, which is extremely slow for large datasets.

1 - Launching a Dask client using a context manager#

This is most commonly seen when running HATS import pipelines.

Using a context manager is convenient because it ensures the client is automatically closed when the context block is exited, preventing resource leaks.

This is useful for temporary tasks where the client is only needed for a specific operation.

[1]:
from dask.distributed import Client
from hats_import.catalog.arguments import ImportArguments
from hats_import.pipeline import pipeline_with_client


def main():
    # Define arguments for the import pipeline
    args = ImportArguments(
        ra_column="ra",
        dec_column="dec",
        # ...
    )
    # Use a context manager to create and close the Dask client automatically
    with Client(
        n_workers=10,  # Number of workers
        threads_per_worker=1,  # Threads per worker
        # ...
    ) as client:
        pipeline_with_client(args, client)


# To run the pipeline, uncomment:
# if __name__ == '__main__':
#     main()

2 - Launching a persistent Dask client#

Sometimes it’s easier to have the Dask client live throughout the entire notebook.

This is especially useful for workflows that span multiple cells and require the client to remain active throughout the notebook.

Note that if you use this approach, you’ll need to manually close the client when you’re done using it (typically, at the end of the notebook).

[2]:
from dask.distributed import Client

client = Client(n_workers=4, threads_per_worker=1, memory_limit="auto")
client
[2]:

Client

Client-0fde9fe1-232f-11f1-8723-8e6cf3dc31f8

Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

[3]:
# Code to execute.
[4]:
client.close()

3 - Common arguments#

There are a few arguments the LSDB team finds most useful. They are briefly described below; for more detail, see our Dask cluster configuration page.

3.1 - n_workers#

  • The number of Dask workers (or Python processes) to run.

  • Increasing n_workers allows for more parallelism, but may also increase resource usage.

3.2 - threads_per_worker#

  • Specifies how many Python threads each worker can use.

  • It’s generally better to keep this low (1 or 2) to avoid contention between threads.

  • Instead, scale up the n_workers argument to increase parallelism.

3.3 - memory_limit#

  • Specifies how much memory each worker is allocated.

  • Generally, we find diminishing returns beyond 10 GB per thread.

  • For example, with memory_limit="20GB" and threads_per_worker=2, a worker fails when the combined memory usage of its threads exceeds 20 GB.

  • You can also set memory_limit="auto" to let Dask automatically allocate based on the available system memory.
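Putting the first three arguments together, here is a minimal sketch of a local client configured along the lines described above (the specific values are illustrative, not recommendations):

```python
from dask.distributed import Client

# Illustrative values; tune n_workers and memory_limit to your machine.
client = Client(
    n_workers=4,           # four worker processes for parallelism
    threads_per_worker=1,  # one thread per worker to avoid thread contention
    memory_limit="4GB",    # cap each worker at 4 GB of memory
)

# The scheduler reports each connected worker.
n_connected = len(client.scheduler_info()["workers"])
print(f"Connected workers: {n_connected}")

client.close()
```

Note that scaling parallelism by raising `n_workers` (rather than `threads_per_worker`) matches the guidance above.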

3.4 - local_directory#

  • Specifies where Dask workers store temporary files.

  • Useful if the default system temp directory has limited space (e.g., pointing to a /data/ directory with more available disk space).

  • Using a temporary directory can also help manage cleanup automatically.

[5]:
import tempfile

tmp_path = tempfile.TemporaryDirectory()
tmp_dir = tmp_path.name

client = Client(local_directory=tmp_dir)

# Do things here.

client.close()
tmp_path.cleanup()

About#

Author(s): Olivia Lynn and Melissa DeLucchi

Last updated on: May 22, 2025

If you use lsdb for published research, please cite it following our citation instructions.