map_partitions#
In this tutorial we will:

- apply a user-defined function using distributed computing across an entire LSDB catalog’s partitions
- showcase when, why, and how to supply required Dask “meta” when it cannot be inferred from your user-defined function
Introduction#
LSDB/HATS catalogs are organized into partitions, and the number of partitions is reported as npartitions= in the header whenever you print a catalog. Each partition corresponds to a single pixel, and partitions are sized to hold approximately the same number of rows, which enables efficient parallel computation.
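Each pixel here is a HEALPix tile of the sky: at order k the sphere is divided into 12 × 4^k equal-area tiles, so higher orders give smaller tiles. A quick sketch of the pixel counts (plain Python, no LSDB needed):

```python
def n_healpix_pixels(order):
    # 12 base pixels at order 0, each subdivided into 4 at every
    # subsequent order; e.g., order 4 tiles the sky into 3072 pixels.
    return 12 * 4**order


print([n_healpix_pixels(k) for k in range(5)])  # [12, 48, 192, 768, 3072]
```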
The map_partitions method provides a means for users to execute their own analysis functions on each partition of the catalog data. The data will be passed to your function as a Pandas DataFrame (pd.DataFrame) object.
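As a mental model, the pattern can be sketched in plain pandas (toy column names and data; this emulates what map_partitions does rather than calling LSDB itself):

```python
import pandas as pd


def add_mag_sq(df):
    # Per-partition function: receives one partition as a pd.DataFrame
    # and returns a pd.DataFrame (here, the same rows plus a new column).
    df = df.copy()
    df["mag_sq"] = df["mag"] ** 2
    return df


# Two toy DataFrames standing in for two catalog partitions.
partitions = [
    pd.DataFrame({"mag": [18.0, 19.5]}),
    pd.DataFrame({"mag": [20.0]}),
]

# Conceptually, map_partitions applies the function to each partition
# independently (and in parallel) and stitches the results back together.
result = pd.concat([add_mag_sq(p) for p in partitions], ignore_index=True)
print(result["mag_sq"].tolist())  # [324.0, 380.25, 400.0]
```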
1. Open a catalog#
For this example, we will use a small cone of the Gaia DR3 catalog, and only specify columns of interest. This limits the overall memory requirements of the pipeline.
Additional Help
For additional information on dask client creation, please refer to the official Dask documentation and our Dask cluster configuration page for LSDB-specific tips. Note that dask also provides its own best practices, which may also be useful to consult.
For tips on accessing remote data, see our Accessing remote data guide
[4]:
# Dask puts out more advisory logging than we care for in this tutorial.
# It takes some doing to quiet all of it, but this recipe works.
import dask
dask.config.set({"logging.distributed": "critical"})
import logging
# This also has to be done, for the above to be effective
logger = logging.getLogger("distributed")
logger.setLevel(logging.CRITICAL)
import warnings
# Finally, suppress the specific warning about Dask dashboard port usage
warnings.filterwarnings("ignore", message="Port 8787 is already in use.")
[ ]:
import lsdb
# We run this code with a local LSDB catalog, to improve performance,
# but you can use the path in the line below to use the remote catalog.
# catalog_root = "https://data.lsdb.io/hats"
catalog_root = "/epyc/data3/hats/catalogs"
gaia3 = lsdb.open_catalog(
    f"{catalog_root}/gaia_dr3",
    search_filter=lsdb.ConeSearch(ra=280, dec=-60, radius_arcsec=2 * 3600),
    columns=[
        "source_id",
        "ra",
        "dec",
        "phot_g_mean_mag",
    ],
)
gaia3
| source_id | ra | dec | phot_g_mean_mag | |
|---|---|---|---|---|
| npartitions=4 | ||||
| Order: 4, Pixel: 2944 | int64[pyarrow] | double[pyarrow] | double[pyarrow] | float[pyarrow] |
| Order: 4, Pixel: 2945 | ... | ... | ... | ... |
| Order: 4, Pixel: 2946 | ... | ... | ... | ... |
| Order: 4, Pixel: 2947 | ... | ... | ... | ... |
[6]:
# You can get the number of partitions programmatically this way.
# This can be valuable when you want to choose the optimal number
# of workers to process the partitions.
gaia3.npartitions
[6]:
4
[7]:
# You can also access the individual HealpixPixel objects for
# each partition, inspecting their order and pixel if desired.
px = gaia3.get_healpix_pixels()[0]
px.order, px.pixel
[7]:
(np.int64(4), np.int64(2944))
2. Generating New Columns#
Since the partition’s pd.DataFrame is passed in to your custom function, you can augment it with new columns based on the existing columns, in ordinary Pandas style.
2.1 What you can map#
The trick is understanding what kind of custom function you can pass to .map_partitions. Your function is going to receive a Pandas DataFrame as its first parameter. Other parameters can be passed in as keyword arguments to .map_partitions, as you’ll see later on. For now, we’ll use a function that takes in one partition and produces a result that has the same shape.
Because the catalog is loaded lazily, .map_partitions also returns a lazy, or unevaluated, result. You can see the results the same way you can realize the original catalog, by any of these means:
- calling .compute() to produce a pd.DataFrame in memory;
- calling .to_hats() to serialize it to disk as a HATS-format file;
- calling .head() to see the first few rows, or .tail() to see the last few.
[8]:
def mean_sq(df, pixel):
    df["phot_g_mean_mag_sq"] = df["phot_g_mean_mag"] ** 2
    return df


unrealized = gaia3.map_partitions(mean_sq, include_pixel=True)
unrealized
[8]:
| source_id | ra | dec | phot_g_mean_mag | phot_g_mean_mag_sq | |
|---|---|---|---|---|---|
| npartitions=4 | |||||
| Order: 4, Pixel: 2944 | int64[pyarrow] | double[pyarrow] | double[pyarrow] | float[pyarrow] | float[pyarrow] |
| Order: 4, Pixel: 2945 | ... | ... | ... | ... | ... |
| Order: 4, Pixel: 2946 | ... | ... | ... | ... | ... |
| Order: 4, Pixel: 2947 | ... | ... | ... | ... | ... |
Let’s take a quick peek to see whether our function works correctly, and whether the results in our new column are about what we expect:
[9]:
head_5 = unrealized.head(5)
head_5
[9]:
| source_id | ra | dec | phot_g_mean_mag | phot_g_mean_mag_sq | |
|---|---|---|---|---|---|
| _healpix_29 | |||||
| 3315212135629220059 | 6630424242158614528 | 279.475941 | -61.973682 | 20.157476 | 406.323853 |
| 3315212197603296958 | 6630424379597543936 | 279.40738 | -61.977054 | 19.488537 | 379.80307 |
| 3315212213755151065 | 6630424413957281792 | 279.436314 | -61.979345 | 18.925087 | 358.158905 |
| 3315212218438134563 | 6630424418254234752 | 279.412417 | -61.979072 | 20.522877 | 421.188477 |
| 3315212218704706046 | 6630424418256341376 | 279.416172 | -61.977445 | 16.336718 | 266.888336 |
5 rows × 5 columns
Looks good! Now on to computing the complete result.
This unrealized result has a top-level property indicating how many partitions it has. We can use this to choose our number of workers directly.
However, it’s a good idea to bound the number of workers, in case the number of partitions is larger than we expect (or we move this code fragment elsewhere).
[10]:
%%time
from dask.distributed import Client
npartitions = gaia3.npartitions
# Create a client which will be implicitly used until we make a new one
client = Client(n_workers=min(4, npartitions), memory_limit="auto")
result = unrealized.compute()
result
CPU times: user 677 ms, sys: 105 ms, total: 782 ms
Wall time: 9.54 s
[10]:
| source_id | ra | dec | phot_g_mean_mag | phot_g_mean_mag_sq | |
|---|---|---|---|---|---|
| _healpix_29 | |||||
| 3315212135629220059 | 6630424242158614528 | 279.475941 | -61.973682 | 20.157476 | 406.323853 |
| 3315212197603296958 | 6630424379597543936 | 279.40738 | -61.977054 | 19.488537 | 379.80307 |
| ... | ... | ... | ... | ... | ... |
| 3318663093295462547 | 6637326190180387584 | 280.426265 | -58.015067 | 21.097507 | 445.104828 |
| 3318663093808668415 | 6637326185883831296 | 280.423503 | -58.013744 | 20.301455 | 412.149048 |
469102 rows × 5 columns
No reduction step is needed here since the operation is not a reducing operation. There are as many rows in the new output as there were in the input.
3. Functions that reduce#
The above works when your output rows are the same as your input rows. When you’re doing a reducing operation (such as calculating statistics), the process changes a little.
3.1. Your function’s parameters#
Again, your first input parameter is a pd.DataFrame holding one partition of the catalog, and your return value must also be a pd.DataFrame, even if your result has only a single row.
If you want to know which HEALPix pixel the partition corresponds to, call .map_partitions with include_pixel=True and the pixel will be passed as the second parameter to your function. We’ll do this in this example for demonstration purposes, though it isn’t strictly necessary for this task.
If you have any other parameters that your function requires, take them as keyword arguments, and you can pass their values in as such, when calling .map_partitions. Our example will do this, too, taking a target_column= argument.
3.2. What you get back#
The operation we’re going to do here is a reducing operation (min and max), and it will be run on each partition, reducing the many rows in each partition to a single value. This means that the output of .map_partitions in this case will contain one row per partition. Thus, you will need to do additional reduction on this output in order to get a single final result.
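This two-stage pattern can be sketched in plain pandas (toy data standing in for catalog partitions; not the LSDB API itself): each per-partition call returns a one-row summary frame, and a final reduction over those rows yields the global answer.

```python
import pandas as pd

# Toy stand-ins for two catalog partitions (hypothetical values).
parts = [
    pd.DataFrame({"mag": [18.0, 21.0]}),
    pd.DataFrame({"mag": [16.5, 19.0]}),
]


def find_min_max(df, target_column):
    # Reduce one partition to a single summary row.
    return pd.DataFrame(
        [
            {
                f"{target_column}_min": df[target_column].min(),
                f"{target_column}_max": df[target_column].max(),
            }
        ]
    )


# Stage 1: the map_partitions-style pass, one summary row per partition.
per_partition = pd.concat([find_min_max(p, "mag") for p in parts], ignore_index=True)

# Stage 2: reduce across the per-partition rows for the global answer.
global_min = per_partition["mag_min"].min()
global_max = per_partition["mag_max"].max()
print(global_min, global_max)  # 16.5 21.0
```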
[11]:
# Note that this function must work correctly when given an empty DataFrame
# as an input, too; if not, you're obliged to provide "meta", that is,
# information about output type and shape.
import pandas as pd


def find_stats(df, pixel, target_column=""):
    c = df[target_column]
    min_val = c.min()
    max_val = c.max()
    mean_val = c.mean()
    return pd.DataFrame(
        [
            {
                "pixel": pixel,
                f"{target_column}_min": min_val,
                f"{target_column}_max": max_val,
                f"{target_column}_mean": mean_val,
            }
        ]
    )
3.3 When You Need meta=#
The above definition of find_stats works even with an empty pd.DataFrame argument because .mean() is written to handle zero-row inputs without errors.
But your own custom function might not. As a trivial example, suppose you implemented the arithmetic mean yourself, as below.
[12]:
# This version of the function will NOT work with map_partitions as is, because
# of the attempt to divide by `c.count()`, which will be zero for an empty input.
import pandas as pd
def find_stats_needs_meta(df, pixel, target_column=""):
    c = df[target_column]
    min_val = c.min()
    max_val = c.max()
    # WARNING! c.count() == 0 when passed an empty DataFrame.
    # But meta= will come to the rescue.
    mean_val = c.sum() / c.count()
    return pd.DataFrame(
        [
            {
                "pixel": pixel,
                f"{target_column}_min": min_val,
                f"{target_column}_max": max_val,
                f"{target_column}_mean": mean_val,
            }
        ]
    )
In the above case, then, you need to indicate to Dask what type the output will be.
What Dask needs to know are the column names and their order, and so the below definition works, even though the types of the columns aren’t indicated. (The type of "pixel" will default to float64, which is wrong, but doesn’t matter in this case.)
[13]:
output_meta = pd.DataFrame(
    {
        "pixel": [],
        "phot_g_mean_mag_min": [],
        "phot_g_mean_mag_max": [],
        "phot_g_mean_mag_mean": [],
    }
)
Here’s another definition of output_meta that works equally well, and has the advantage that the way the pd.DataFrame is initialized is the same form as the return value in the custom function. The type of "pixel" will now be more correct:
[14]:
output_meta = pd.DataFrame(
    [
        {
            "pixel": lsdb.HealpixPixel(0, 0),
            "phot_g_mean_mag_min": 0.0,
            "phot_g_mean_mag_max": 0.0,
            "phot_g_mean_mag_mean": 0.0,
        }
    ]
)
[15]:
output_meta["pixel"].dtype
[15]:
dtype('O')
As we can see, though, pandas only records the column as a generic “object” dtype.
These can be complicated and error-prone to construct, and small mistakes can create confusing errors that show up late in the computation.
To help with these difficulties, Dask provides a make_meta function. If you call your custom function on a single valid row from your catalog data (that’s what head_5.head(1) supplies) and pass the result to make_meta, it will generate the meta for you.
NOTE: It’s much faster here to use the computed data from last time (head_5) than trying to use gaia3.head(1). The latter will still give the right answer, but will be much slower, since it obliges Dask to re-execute the computation.
[16]:
%%time
from dask.dataframe.utils import make_meta
output_meta = make_meta(
    find_stats_needs_meta(head_5.head(1), gaia3.get_healpix_pixels()[0], target_column="phot_g_mean_mag")
)
CPU times: user 1.87 ms, sys: 0 ns, total: 1.87 ms
Wall time: 2.47 ms
Passing a correct meta= to .map_partitions will allow Dask to skip the process of sending your function an empty pd.DataFrame, and so, in our case of find_stats_needs_meta (where we depend on a non-zero c.count()), it will succeed without error.
[17]:
unrealized = gaia3.map_partitions(
    find_stats_needs_meta,
    include_pixel=True,
    # Keyword arguments after 'include_pixel=' are passed to your function.
    target_column="phot_g_mean_mag",
    # Here we give Dask the hint it needs to avoid giving us an empty frame.
    meta=output_meta,
)
[18]:
%%time
result = unrealized.compute()
result
CPU times: user 49.1 ms, sys: 977 μs, total: 50.1 ms
Wall time: 176 ms
[18]:
| pixel | phot_g_mean_mag_min | phot_g_mean_mag_max | phot_g_mean_mag_mean | |
|---|---|---|---|---|
| 0 | Order: 4, Pixel: 2944 | 8.154537 | 22.033451 | 19.083840 |
| 0 | Order: 4, Pixel: 2945 | 5.508409 | 22.345751 | 19.094207 |
| 0 | Order: 4, Pixel: 2946 | 6.264917 | 22.076380 | 19.102860 |
| 0 | Order: 4, Pixel: 2947 | 6.154421 | 21.986425 | 19.119143 |
4 rows × 4 columns
The objects in the ‘pixel’ column are the same type as from get_healpix_pixels().
[19]:
type(result["pixel"].iloc[0])
[19]:
hats.pixel_math.healpix_pixel.HealpixPixel
Because the result is one row per partition, we need additional reduction to get our single answer.
[20]:
result["phot_g_mean_mag_min"].min(), result["phot_g_mean_mag_max"].max()
[20]:
(np.float64(5.508409023284912), np.float64(22.34575080871582))
What about searching not only the four partitions from our cone search, but the whole catalog? All that changes is the number of partitions.
NOTE that since we are now using the find_stats that doesn’t need meta=, we don’t need to provide it.
[21]:
gaia3_all = lsdb.open_catalog(
    f"{catalog_root}/gaia_dr3",
    margin_cache="gaia_10arcs",
    columns=[
        "source_id",
        "ra",
        "dec",
        "phot_g_mean_mag",
    ],
)

unrealized = gaia3_all.map_partitions(
    find_stats,
    include_pixel=True,
    # Keyword arguments after 'include_pixel=' are passed to your function.
    target_column="phot_g_mean_mag",
)
npartitions = unrealized.npartitions
[22]:
npartitions
[22]:
2016
That’s a lot of partitions! If we didn’t bound this value, we could easily overwhelm our cluster.
[23]:
%%time
# Close the old client and make a new one that takes into account the new number of partitions.
client.close()
client = Client(n_workers=min(8, npartitions), memory_limit="auto")
# Note that if you are using a remote catalog, you will effectively download the whole catalog to compute these values.
result = unrealized.compute()
result
CPU times: user 1min 1s, sys: 9.56 s, total: 1min 10s
Wall time: 13min 30s
[23]:
| pixel | phot_g_mean_mag_min | phot_g_mean_mag_max | phot_g_mean_mag_mean | |
|---|---|---|---|---|
| 0 | Order: 2, Pixel: 0 | 3.382374 | 22.452248 | 18.722443 |
| 0 | Order: 2, Pixel: 1 | 2.896132 | 22.465290 | 18.827720 |
| ... | ... | ... | ... | ... |
| 0 | Order: 3, Pixel: 766 | 3.774703 | 22.479549 | 19.061691 |
| 0 | Order: 3, Pixel: 767 | 3.449377 | 22.423697 | 19.033006 |
2016 rows × 4 columns
[24]:
# We need to do a final reduction step to get the true min and max
result["phot_g_mean_mag_min"].min(), result["phot_g_mean_mag_max"].max()
[24]:
(np.float64(1.7316069602966309), np.float64(22.956424713134766))
Since we just searched the whole catalog, we can check our answer against the statistics that were compiled at import time for the catalog. As you can see, they match what we got when using the .map_partitions method.
[25]:
gaia3_all.hc_structure.aggregate_column_statistics(include_columns="phot_g_mean_mag")
[25]:
| min_value | max_value | null_count | row_count | |
|---|---|---|---|---|
| column_names | ||||
| phot_g_mean_mag | 1.731607 | 22.956425 | 5457477 | 1812731847 |
Closing the Dask client#
[26]:
client.close()
About#
Authors: Derek Jones
Last updated/verified on: January 19, 2026
If you use LSDB for published research, please cite it following the instructions.