Determining the Right Dask Cluster Parameters

In this tutorial, we will:

  • configure clusters based on the memory demands of larger data partitions

  • find the largest data partitions in a dataset

  • apply general strategies for scaling in-memory code to large out-of-memory runs

Introduction

Working at scale almost always demands that you, the user, understand the details of your workflow's memory usage in order to properly set the various parameters of your cluster. This can be a challenging task, as many users are not used to thinking about how much memory their code needs to execute. In this tutorial, we'll walk through a few strategies for determining how much memory your workers will need in the "worst case" and try to equip you with some code and heuristics for how to think about setting up a cluster for your workflow.

Install external packages needed for this notebook with the command below:

[ ]:
# pip install light-curve memory-profiler
[1]:
import lsdb
import light_curve as licu
import numpy as np

For this tutorial, we’ll be looking at ZTF DR22. The first thing we’ll do is find the largest pixel (not by spatial size, but by the amount of data stored within it).

Finding the largest pixel can be really helpful in setting our cluster up for success, as it will usually be the most challenging partition for a worker to handle. Below, we compute per_pixel_statistics for the catalog and display the top 5 largest pixels by nested row_count:

[11]:
ztf_cat = lsdb.open_catalog("https://data.lsdb.io/hats/ztf_dr22")
stats = ztf_cat.per_pixel_statistics(include_columns=["objectid", "hmjd"], include_stats=["row_count"])
stats.sort_values(["hmjd: row_count"], ascending=False).head(5)
[11]:
objectid: row_count hmjd: row_count
Order: 5, Pixel: 7457 911483.0 327483964.0
Order: 5, Pixel: 7460 888057.0 308832839.0
Order: 6, Pixel: 14533 950980.0 304487284.0
Order: 6, Pixel: 30526 984845.0 302272221.0
Order: 6, Pixel: 29868 931536.0 300487880.0

Some of our largest partitions in this dataset have just shy of a million objects, with 300 million timeseries observations in total. Let’s grab one of them into memory by searching for it via PixelSearch and then calling compute. We can work with this partition locally as a means to directly develop and test our analysis code.

[2]:
ztf_single_pix = lsdb.open_catalog(
    "https://data.lsdb.io/hats/ztf_dr22", search_filter=lsdb.PixelSearch((5, 7457))
)
ztf_single_pix = ztf_single_pix.nest_lists(
    list_columns=["hmjd", "mag", "magerr", "clrcoeff", "catflags"], name="lc"
).compute()
ztf_single_pix
[2]:
  objectid filterid objra objdec nepochs lc
2098958901716483082 385202300127741 2 288.283020 -13.245409 780
hmjd mag magerr clrcoeff catflags
58246.45959 18.74802 0.047087 0.101397 0

780 rows × 5 columns

2098958901716483896 385102300097293 1 288.283020 -13.245406 213
58246.46433 19.302635 0.068967 -0.020058 0

213 rows × 5 columns

2098958901746886855 385202300096509 2 288.281982 -13.245864 955
58246.45959 16.59124 0.015825 0.101397 0

955 rows × 5 columns

2098958901746923271 385102300085934 1 288.281982 -13.245849 298
58234.40196 17.127243 0.021141 -0.01969 32768

298 rows × 5 columns

2098958901946483197 385202300139959 2 288.280243 -13.245934 1
58716.21867 21.299936 0.219311 0.088564 0

1 rows × 5 columns

... ... ... ... ... ... ...
921107 rows x 6 columns

info is a nice way to check its characteristics; we see it’s almost 8 GB in memory. While not specifically mentioned in the output, the vast majority of that will be the nested “lc” data. As an initial takeaway, we know from this that just holding the data will at worst cost 8 GB, meaning we can immediately discount having many workers with ~8 GB of memory or less.

[3]:
ztf_single_pix.info()
<class 'nested_pandas.nestedframe.core.NestedFrame'>
Index: 921107 entries, 2098958901716483082 to 2099240376120221204
Data columns (total 6 columns):
 #   Column    Non-Null Count   Dtype
---  ------    --------------   -----
 0   objectid  921107 non-null  int64[pyarrow]
 1   filterid  921107 non-null  int8[pyarrow]
 2   objra     921107 non-null  float[pyarrow]
 3   objdec    921107 non-null  float[pyarrow]
 4   nepochs   921107 non-null  int64[pyarrow]
 5   lc        921107 non-null  nested<hmjd: [double], mag: [float], magerr: [float], clrcoeff: [float], catflags: [int32]>
dtypes: float[pyarrow](2), int64[pyarrow](2), int8[pyarrow](1), nested<hmjd: [double], mag: [float], magerr: [float], clrcoeff: [float], catflags: [int32]>(1)
memory usage: 7.7 GB

Now, let’s create an analysis function that calculates the periods for each of our lightcurves:

[ ]:
def calc_periods(pix_df):
    """Calculate the period of all objects in a pixel"""

    # pix_df is a nested-pandas NestedFrame, so we use the NestedFrame API within this function
    # First, cut our photometry on catflags
    pix_df = pix_df.query("lc.catflags == 0")

    # Drop any empty light curves
    pix_df = pix_df.dropna(subset=["lc"])

    # Now we can calculate the periods
    extractor = licu.Extractor(
        licu.Periodogram(
            peaks=1,
            max_freq_factor=1.0,
            fast=True,
        ),  # Would give two features: peak period and signal-to-noise ratio of the peak
    )

    # light-curve requires all arrays to be the same dtype.
    # It also requires the time array to be ordered and to have no duplicates.
    def _extract_features(mjd, mag, **kwargs):
        # Skip light curves with too few observations to estimate a period
        if len(mjd) < 50:
            return dict(zip(extractor.names, [np.nan] * len(extractor.names)))
        # We offset the dates so that float32 still keeps <1 second precision
        t = np.asarray(mjd - 60000, dtype=np.float32)
        _, sort_index = np.unique(t, return_index=True)
        features = extractor(
            t[sort_index],
            mag[sort_index],
            **kwargs,
        )
        # Return the features as a dictionary
        return dict(zip(extractor.names, features))

    features = pix_df.map_rows(
        _extract_features,
        columns=["lc.hmjd", "lc.mag"],
        row_container="args",  # Pass columns as individual arguments
    )

    return features
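As an aside, the np.unique call in _extract_features does double duty: with return_index=True it both deduplicates and sorts the time array, which is exactly what light-curve requires. A minimal standalone illustration of that trick:

```python
import numpy as np

t = np.array([3.0, 1.0, 2.0, 1.0])
mag = np.array([30.0, 10.0, 20.0, 11.0])

# return_index gives the index of the first occurrence of each unique time,
# already in ascending time order
_, sort_index = np.unique(t, return_index=True)

print(t[sort_index])    # [1. 2. 3.]
print(mag[sort_index])  # [10. 20. 30.]
```

Indexing both arrays with the same sort_index keeps the magnitudes aligned with their (now sorted and deduplicated) timestamps.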

It’s very useful to test our new analysis function on a single partition of the dataset, and we can easily use the one we just grabbed above. (For larger functions, it’s also a great way to iteratively build those functions!)

[5]:
# use the computed pixel to test the function
single_res = calc_periods(ztf_single_pix)
single_res
[5]:
period_0 period_s_to_n_0
_healpix_29
2098958901716483082 97.901375 9.427485
2098958901716483896 56.258545 5.917771
... ... ...
2099240375263563088 179.489136 4.851810
2099240376120221204 NaN NaN

909850 rows × 2 columns

We see above that our function works, and what it returns.

Understanding what cluster parameters to use (number of workers, worker memory limits, etc.) can be a very tricky task, especially when writing a custom function as we have above, since the needs of the cluster will directly depend on the memory demands of our function. Below, we use a memory profiler to estimate the amount of memory used when operating on a single partition. Above, we picked out one of the largest partitions in our sample, which positions this estimate as more of an upper bound on the needed worker memory.

IMPORTANT: Memory Profiling Caveats: It’s very easy to use this incorrectly, especially within a notebook environment. Modules like this almost always measure the kernel’s memory, meaning the result reflects not just the function run, but anything else in memory at the time. If you were to rerun this, you would see the number increase every time, until garbage collection eventually stepped in. Running the single pixel calculation in the cell above would also affect the result. In principle, this number serves as a reasonable estimate if you restart the kernel and run only the needed cells ahead of the memory profiler; more ideally, you would run the profiler from a dedicated script executed separately. With these caveats in mind, this is still a useful exercise for getting a rough sense of how much memory you will need.
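As a sketch of what a dedicated profiling script might look like, the standard-library tracemalloc module can stand in for memory_profiler (the script name and the peak_mib helper are hypothetical, and note that tracemalloc only counts Python-level allocations, not the total process RSS that memory_profiler reports, so it will underestimate memory held in compiled extensions):

```python
# Sketch of a hypothetical standalone script (e.g. profile_pixel.py); running
# the profiler in a fresh process avoids counting whatever else the notebook
# kernel already holds in memory.
import tracemalloc


def peak_mib(func, *args, **kwargs):
    """Run func and return the peak traced Python allocation in MiB."""
    tracemalloc.start()
    try:
        func(*args, **kwargs)
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return peak / 2**20


# Stand-in workload for illustration; in the real script you would import
# calc_periods, load the single pixel as above, and call
# peak_mib(calc_periods, ztf_single_pix) instead.
print(f"Peak: {peak_mib(lambda: bytearray(50 * 2**20)):.0f} MiB")
```

For the true process-level number, keep using memory_profiler as below, just from a fresh process.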

[6]:
# Assessing Memory Usage for a single pixel
from memory_profiler import memory_usage


def calc_periods_mem():
    """Calculate the period of all objects in a pixel, with memory profiling"""

    return calc_periods(ztf_single_pix)


mem = max(
    memory_usage(
        proc=calc_periods_mem,
    )
)
print("Maximum memory used: {} MiB".format(mem))
Maximum memory used: 32424.65234375 MiB

We see that for our largest partition, our analysis required a bit more than 32 GB of memory. As a rough heuristic, it’s good to take this number and multiply it by 1.5-2x when defining memory limits. This overhead exists for a couple of reasons: Dask will be storing intermediate results alongside these operations, and the largest partition we found may not be the largest partition in the full dataset. With a smaller number of workers, you might need closer to ~2x, as those workers take on more of the burden of storing intermediate results, whereas with a larger number of workers you might be able to operate comfortably in the ~1.5x regime. From a stability perspective, more worker memory is more reliable, but make sure to have at least a few workers.
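The 1.5-2x heuristic is simple enough to apply by hand, but spelling out the arithmetic makes the unit bookkeeping explicit (memory_profiler reports MiB, so we convert to GiB first):

```python
# Turn the profiled peak into a suggested per-worker memory range using the
# 1.5-2x overhead heuristic discussed above.
peak_mib = 32424.65  # measured by memory_profiler above
peak_gib = peak_mib / 1024

low, high = 1.5 * peak_gib, 2.0 * peak_gib
print(f"Suggested per-worker memory: {low:.0f}-{high:.0f} GiB")
# → Suggested per-worker memory: 47-63 GiB
```

The 48 GB per worker we use below sits at the lower end of this range, which is reasonable for a 4-worker cluster.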

Below, we use this information to set up our cluster, allocating 4 workers with 48 GB each. Very importantly, we set this to be single-threaded via threads_per_worker: multi-threading can introduce more opportunities for workers to trip up and fail, especially when we’re constructing our cluster parameters under the assumption that 1 worker is sized to handle about 1 (large) partition.

[4]:
# Load a small cone around our largest partition
ztf_cat = lsdb.open_catalog(
    "https://data.lsdb.io/hats/ztf_dr22", search_filter=lsdb.ConeSearch(288.0, -13.0, radius_arcsec=11000.0)
)
ztf_cat = ztf_cat.nest_lists(list_columns=["hmjd", "mag", "magerr", "clrcoeff", "catflags"], name="lc")
ztf_cat
[4]:
lsdb Catalog ztf_lc:
objectid filterid objra objdec nepochs lc
npartitions=39
Order: 5, Pixel: 7261 int64[pyarrow] int8[pyarrow] float[pyarrow] float[pyarrow] int64[pyarrow] nested<hmjd: [double], mag: [float], magerr: [...
Order: 6, Pixel: 29052 ... ... ... ... ... ...
... ... ... ... ... ... ...
Order: 6, Pixel: 29849 ... ... ... ... ... ...
Order: 6, Pixel: 29850 ... ... ... ... ... ...
6 out of 6 columns in the catalog have been loaded lazily, meaning no data has been read, only the catalog schema
[5]:
# Verify that the largest pixel is actually here
ztf_cat.pixel_search((5, 7457))
[5]:
lsdb Catalog ztf_lc:
objectid filterid objra objdec nepochs lc
npartitions=1
Order: 5, Pixel: 7457 int64[pyarrow] int8[pyarrow] float[pyarrow] float[pyarrow] int64[pyarrow] nested<hmjd: [double], mag: [float], magerr: [...
6 out of 6 columns in the catalog have been loaded lazily, meaning no data has been read, only the catalog schema
[6]:
# Now we can do this in parallel across all pixels
from dask.distributed import Client

with Client(
    dashboard_address="127.0.0.1:33709", n_workers=4, memory_limit="48GB", threads_per_worker=1
) as client:
    # We can use the map_partitions method to apply our function to each pixel
    # Results from single pixel operations can be used for the meta, but use head(0) to not pass along the full data
    full_res = ztf_cat.map_partitions(calc_periods, meta=single_res.head(0))
    full_res = full_res.compute()
2025-07-17 13:56:02,960 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 31.61 GiB -- Worker memory limit: 44.70 GiB
2025-07-17 13:56:07,478 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker.  Process memory: 35.85 GiB -- Worker memory limit: 44.70 GiB
2025-07-17 13:56:07,529 - distributed.worker.memory - WARNING - Worker is at 60% memory usage. Resuming worker. Process memory: 26.92 GiB -- Worker memory limit: 44.70 GiB

Despite our meticulous cluster crafting, we still get some worker memory warnings. These are acceptable, since we only reach 80% memory at peak, but if you are seeing a lot of these warnings, that may be a sign to increase the worker memory, especially for the stability of larger jobs.
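The thresholds behind these warnings are configurable through Dask's configuration system. The keys below are the real distributed settings, shown with their default values at the time of writing (0.8 is the pause threshold that triggered the warning above); raising them is rarely the right first move, but it helps to know where they live:

```python
import dask

# Fractions of the worker memory_limit at which each behavior triggers
# (values shown are distributed's defaults at the time of writing)
dask.config.set({
    "distributed.worker.memory.target": 0.6,      # start spilling to disk
    "distributed.worker.memory.spill": 0.7,       # spill more aggressively
    "distributed.worker.memory.pause": 0.8,       # pause accepting new tasks
    "distributed.worker.memory.terminate": 0.95,  # restart the worker
})
```

Set these before creating the Client so the workers pick them up at startup.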

Dask Tip: You can also use the Dashboard to track the memory usage over the course of the computation.

[7]:
full_res
[7]:
period_0 period_s_to_n_0
_healpix_29
2044053637065117179 NaN NaN
2044053641978180364 276.323242 5.332016
... ... ...
2100516122303696693 NaN NaN
2100516153621180863 172.133255 3.879301

10372056 rows × 2 columns

About

Authors: Doug Branton

Last updated on: July 17, 2025

If you use lsdb for published research, please cite it following these instructions.