Determining the Right Dask Cluster Parameters#
In this tutorial, we will:
configure clusters based on the memory demands of larger data partitions
find the largest data partitions in a dataset
apply general strategies for scaling in-memory code to large out-of-memory runs
Introduction#
Working at scale almost always demands that you, the user, understand the details of your workflow's memory usage in order to properly set the various parameters of your cluster. This can be a challenging task, as many users are not used to tracking how much memory their code needs to execute. In this tutorial, we'll walk through a few strategies for determining how much memory your workers will need in the "worst case," and try to equip you with some code and heuristics for thinking about how to set up a cluster for your workflow.
Install external packages needed for this notebook with the command below:
[ ]:
# pip install light-curve memory-profiler
[1]:
import lsdb
import light_curve as licu
import numpy as np
For this tutorial, we'll be looking at ZTF DR22. The first thing we'll do is find the largest pixel (not by spatial size, but by the amount of data stored within it).
Finding the largest pixel can be really helpful in setting our cluster up for success, as it will usually be the most challenging partition for a worker to handle. Below, we compute `per_pixel_statistics` for the catalog and display the top 5 largest pixels by nested `row_count`:
[11]:
ztf_cat = lsdb.open_catalog("https://data.lsdb.io/hats/ztf_dr22")
stats = ztf_cat.per_pixel_statistics(include_columns=["objectid", "hmjd"], include_stats=["row_count"])
stats.sort_values(["hmjd: row_count"], ascending=False).head(5)
[11]:
|  | objectid: row_count | hmjd: row_count |
|---|---|---|
| Order: 5, Pixel: 7457 | 911483.0 | 327483964.0 |
| Order: 5, Pixel: 7460 | 888057.0 | 308832839.0 |
| Order: 6, Pixel: 14533 | 950980.0 | 304487284.0 |
| Order: 6, Pixel: 30526 | 984845.0 | 302272221.0 |
| Order: 6, Pixel: 29868 | 931536.0 | 300487880.0 |
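As a back-of-the-envelope check, the `hmjd: row_count` column already lets us estimate how much memory the largest pixel will occupy before loading anything. This is a rough sketch, assuming the nested column dtypes reported by `info` later in this notebook (one float64 plus three float32 and one int32 per epoch):

```python
# Rough per-epoch cost of the nested light-curve columns:
# hmjd (float64) + mag, magerr, clrcoeff (float32) + catflags (int32)
bytes_per_epoch = 8 + 4 + 4 + 4 + 4  # 24 bytes

epochs = 327_483_964  # hmjd row_count of the largest pixel in the table above
est_gb = epochs * bytes_per_epoch / 1e9
print(f"~{est_gb:.1f} GB just for the nested photometry")  # prints "~7.9 GB just for the nested photometry"
```

This lines up well with the ~7.7 GiB that `info` reports for the loaded partition below, ignoring per-row overheads and the much smaller object-level columns.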
Some of the largest partitions in this dataset have just shy of a million objects, with around 300 million time-series observations in total. Let's load one of them into memory by searching for it via `PixelSearch` and then calling `compute`. We can work with this partition locally as a means to directly develop and test our analysis code.
[2]:
ztf_single_pix = lsdb.open_catalog(
    "https://data.lsdb.io/hats/ztf_dr22", search_filter=lsdb.PixelSearch((5, 7457))
)
ztf_single_pix = ztf_single_pix.nest_lists(
    list_columns=["hmjd", "mag", "magerr", "clrcoeff", "catflags"], name="lc"
).compute()
ztf_single_pix
[2]:
|  | objectid | filterid | objra | objdec | nepochs | lc |
|---|---|---|---|---|---|---|
| 2098958901716483082 | 385202300127741 | 2 | 288.283020 | -13.245409 | 780 | 780 rows × 5 columns |
| 2098958901716483896 | 385102300097293 | 1 | 288.283020 | -13.245406 | 213 | 213 rows × 5 columns |
| 2098958901746886855 | 385202300096509 | 2 | 288.281982 | -13.245864 | 955 | 955 rows × 5 columns |
| 2098958901746923271 | 385102300085934 | 1 | 288.281982 | -13.245849 | 298 | 298 rows × 5 columns |
| 2098958901946483197 | 385202300139959 | 2 | 288.280243 | -13.245934 | 1 | 1 rows × 5 columns |
| ... | ... | ... | ... | ... | ... | ... |
`info` is a nice way to check the frame's characteristics; we see it's almost 8 GB in memory. While not broken out in the output, the vast majority of that is the nested "lc" data. As an initial takeaway, we know that just holding the data will cost up to 8 GB in the worst case, so we can immediately rule out having many workers with ~8 GB of memory or less.
[3]:
ztf_single_pix.info()
<class 'nested_pandas.nestedframe.core.NestedFrame'>
Index: 921107 entries, 2098958901716483082 to 2099240376120221204
Data columns (total 6 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 objectid 921107 non-null int64[pyarrow]
1 filterid 921107 non-null int8[pyarrow]
2 objra 921107 non-null float[pyarrow]
3 objdec 921107 non-null float[pyarrow]
4 nepochs 921107 non-null int64[pyarrow]
5 lc 921107 non-null nested<hmjd: [double], mag: [float], magerr: [float], clrcoeff: [float], catflags: [int32]>
dtypes: float[pyarrow](2), int64[pyarrow](2), int8[pyarrow](1), nested<hmjd: [double], mag: [float], magerr: [float], clrcoeff: [float], catflags: [int32]>(1)
memory usage: 7.7 GB
Now, let’s create an analysis function that calculates the periods for each of our lightcurves:
[ ]:
def calc_periods(pix_df):
    """Calculate the period of all objects in a pixel"""
    # pix_df is a nested-pandas NestedFrame, so we use the NestedFrame API within this function
    # First, cut our photometry on catflags
    pix_df = pix_df.query("lc.catflags == 0")
    # Drop any empty light curves
    pix_df = pix_df.dropna(subset=["lc"])

    # Now we can calculate the periods
    extractor = licu.Extractor(
        licu.Periodogram(
            peaks=1,
            max_freq_factor=1.0,
            fast=True,
        ),  # Gives two features: peak period and signal-to-noise ratio of the peak
    )

    # light-curve requires all arrays to be the same dtype.
    # It also requires the time array to be ordered and to have no duplicates.
    def _extract_features(mjd, mag, **kwargs):
        # Skip light curves that are too short to yield a meaningful period
        if len(mjd) < 50:
            return dict(zip(extractor.names, [np.nan] * len(extractor.names)))
        # We offset the dates so that float32 still retains <1 second precision
        t = np.asarray(mjd - 60000, dtype=np.float32)
        # np.unique both sorts the times and removes duplicate epochs
        _, sort_index = np.unique(t, return_index=True)
        features = extractor(
            t[sort_index],
            mag[sort_index],
            **kwargs,
        )
        # Return the features as a dictionary
        return dict(zip(extractor.names, features))

    features = pix_df.map_rows(
        _extract_features,
        columns=["lc.hmjd", "lc.mag"],
        row_container="args",  # Pass columns as individual arguments
    )
    return features
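As an aside, the `np.unique(..., return_index=True)` call in `_extract_features` does double duty: it both sorts the time array and drops duplicate epochs, which is exactly what `light-curve` requires. A minimal standalone illustration of that trick:

```python
import numpy as np

t = np.array([3.0, 1.0, 2.0, 1.0, 3.0], dtype=np.float32)
mag = np.array([10.0, 11.0, 12.0, 13.0, 14.0], dtype=np.float32)

# np.unique returns the sorted unique values; return_index gives the index
# of the first occurrence of each, which we use to subset companion arrays
_, first_idx = np.unique(t, return_index=True)
t_sorted, mag_sorted = t[first_idx], mag[first_idx]
# t_sorted is strictly increasing with duplicates removed: [1., 2., 3.]
# mag_sorted keeps the magnitude measured at each kept epoch: [11., 12., 10.]
```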
It’s very useful to test our new analysis function on a single partition of the dataset, and we can easily use the one we just grabbed above. (For larger functions, this is also a great way to iteratively build them!)
[5]:
# use the computed pixel to test the function
single_res = calc_periods(ztf_single_pix)
single_res
[5]:
| _healpix_29 | period_0 | period_s_to_n_0 |
|---|---|---|
| 2098958901716483082 | 97.901375 | 9.427485 |
| 2098958901716483896 | 56.258545 | 5.917771 |
| ... | ... | ... |
| 2099240375263563088 | 179.489136 | 4.851810 |
| 2099240376120221204 | NaN | NaN |
909850 rows × 2 columns
We see above that our function works, and what it returns.
Understanding which cluster parameters to use (number of workers, worker memory limits, etc.) can be tricky, especially when writing a custom function as we have above, since the needs of the cluster depend directly on the memory demands of that function. Below, we use a memory profiler to estimate the amount of memory used when operating on a single partition. Since we picked out one of the largest partitions in our sample, this estimate serves as a rough upper bound on the needed worker memory.
IMPORTANT: Memory Profiling Caveats: It's very easy to use this incorrectly, especially within a notebook environment. Modules like this are almost always measuring the kernel's memory, meaning the result reflects not just the function run, but anything else in memory at the time. If you were to rerun this, you would see the number increase every time, until garbage collection eventually stepped in. Running the single-pixel calculation in the cell above would also affect the result. In principle, this number serves as a reasonable estimate if you restart the kernel and run only the needed cells before the memory profiler; more ideally, you would run the profiler from a dedicated script executed separately. With these caveats in mind, this is still a useful exercise to get a rough sense of how much memory you will need.
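If you want a measurement that is independent of kernel state, one option is to wrap it in a small helper you can run from a dedicated script. The sketch below uses the standard library's `tracemalloc` rather than `memory-profiler`; note that `tracemalloc` only tracks Python-level allocations (not the total process RSS that `memory-profiler` reports), so treat it as a complementary rough estimate. Here `make_payload` is a hypothetical stand-in for loading a partition and running `calc_periods` on it.

```python
import tracemalloc


def peak_python_alloc_mib(func, *args, **kwargs):
    """Return the peak Python-level allocation (in MiB) observed while running func."""
    tracemalloc.start()
    try:
        func(*args, **kwargs)
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return peak / 2**20


# Hypothetical stand-in for "load one partition and run the analysis"
def make_payload():
    return bytearray(50 * 2**20)  # allocate roughly 50 MiB


print(f"Peak allocation: {peak_python_alloc_mib(make_payload):.0f} MiB")
```

Saved as a script and executed with a fresh interpreter, nothing else contaminates the measurement.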
[6]:
# Assessing memory usage for a single pixel
from memory_profiler import memory_usage


def calc_periods_mem():
    """Calculate the period of all objects in a pixel, with memory profiling"""
    return calc_periods(ztf_single_pix)


mem = max(
    memory_usage(
        proc=calc_periods_mem,
    )
)
print("Maximum memory used: {} MiB".format(mem))
Maximum memory used: 32424.65234375 MiB
We see that for our largest partition, our analysis required a bit more than 32 GB of memory. As a rough heuristic, it's good to take this number and multiply it by 1.5-2x when defining memory limits. This overhead exists for a couple of reasons: Dask stores intermediate results alongside these operations, and the largest partition we found may not be the largest partition in the full dataset. With a smaller number of workers you might need closer to ~2x, as each worker takes on more of the burden of storing intermediate results, whereas with a larger number of workers you may be able to operate comfortably in the ~1.5x regime. From a stability perspective, more worker memory is more reliable, but make sure to have at least a few workers.
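The heuristic above can be written down directly. Treating the measured ~32 GiB peak as the per-partition requirement, the 1.5-2x rule brackets the per-worker memory limit:

```python
peak_mib = 32425  # measured peak from the profiling cell above, in MiB

low = peak_mib * 1.5 / 1024   # comfortable with many workers
high = peak_mib * 2.0 / 1024  # safer with only a few workers
print(f"Suggested per-worker memory limit: {low:.0f}-{high:.0f} GiB")
# prints "Suggested per-worker memory limit: 47-63 GiB"
```

The 48 GB per worker chosen below sits at the low end of this range, which is workable here because the four workers share the burden of intermediate results.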
Below, we use this information to set up our cluster, allocating 4 workers with 48 GB each. Very importantly, we make the workers single-threaded via `threads_per_worker`; multi-threading introduces more opportunities for workers to trip up and fail, especially when we're choosing cluster parameters under the assumption that one worker is sized to handle about one (large) partition.
[4]:
# Load a small cone around our largest partition
ztf_cat = lsdb.open_catalog(
    "https://data.lsdb.io/hats/ztf_dr22", search_filter=lsdb.ConeSearch(288.0, -13.0, radius_arcsec=11000.0)
)
ztf_cat = ztf_cat.nest_lists(list_columns=["hmjd", "mag", "magerr", "clrcoeff", "catflags"], name="lc")
ztf_cat
[4]:
|  | objectid | filterid | objra | objdec | nepochs | lc |
|---|---|---|---|---|---|---|
| npartitions=39 |  |  |  |  |  |  |
| Order: 5, Pixel: 7261 | int64[pyarrow] | int8[pyarrow] | float[pyarrow] | float[pyarrow] | int64[pyarrow] | nested<hmjd: [double], mag: [float], magerr: [... |
| Order: 6, Pixel: 29052 | ... | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... | ... |
| Order: 6, Pixel: 29849 | ... | ... | ... | ... | ... | ... |
| Order: 6, Pixel: 29850 | ... | ... | ... | ... | ... | ... |
[5]:
# Verify that the largest pixel is actually here
ztf_cat.pixel_search((5, 7457))
[5]:
|  | objectid | filterid | objra | objdec | nepochs | lc |
|---|---|---|---|---|---|---|
| npartitions=1 |  |  |  |  |  |  |
| Order: 5, Pixel: 7457 | int64[pyarrow] | int8[pyarrow] | float[pyarrow] | float[pyarrow] | int64[pyarrow] | nested<hmjd: [double], mag: [float], magerr: [... |
[6]:
# Now we can do this in parallel across all pixels
from dask.distributed import Client

with Client(
    dashboard_address="127.0.0.1:33709", n_workers=4, memory_limit="48GB", threads_per_worker=1
) as client:
    # We can use the map_partitions method to apply our function to each pixel
    # Results from single-pixel operations can be used for the meta, but use head(0) to avoid passing along the full data
    full_res = ztf_cat.map_partitions(calc_periods, meta=single_res.head(0))
    full_res = full_res.compute()
2025-07-17 13:56:02,960 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 31.61 GiB -- Worker memory limit: 44.70 GiB
2025-07-17 13:56:07,478 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 35.85 GiB -- Worker memory limit: 44.70 GiB
2025-07-17 13:56:07,529 - distributed.worker.memory - WARNING - Worker is at 60% memory usage. Resuming worker. Process memory: 26.92 GiB -- Worker memory limit: 44.70 GiB
Despite our meticulous cluster crafting, we still get some worker memory warnings. These are acceptable, since we only reach 80% memory at peak, but if you see a lot of these warnings, that may be a sign to increase the worker memory, especially for the stability of larger jobs.
Dask Tip: You can also use the Dashboard to track the memory usage over the course of the computation.
[7]:
full_res
[7]:
| _healpix_29 | period_0 | period_s_to_n_0 |
|---|---|---|
| 2044053637065117179 | NaN | NaN |
| 2044053641978180364 | 276.323242 | 5.332016 |
| ... | ... | ... |
| 2100516122303696693 | NaN | NaN |
| 2100516153621180863 | 172.133255 | 3.879301 |
10372056 rows × 2 columns
About#
Authors: Doug Branton
Last updated on: July 17, 2025
If you use lsdb for published research, please cite it following the project's citation instructions.