map_partitions#

In this tutorial we will:

  • apply a user-defined function using distributed computing across an entire LSDB catalog’s partitions

  • showcase when, why, and how to supply required Dask “meta” when it cannot be inferred from your user-defined function

Introduction#

LSDB/HATS catalogs are organized into partitions; the number of partitions is reported as npartitions= in the header whenever a catalog is printed. Each partition corresponds to a single HEALPix pixel, and partitions are sized to hold approximately the same number of rows, which enables efficient parallel computation.

The map_partitions method provides a means for users to execute their own analysis functions on each partition of the catalog data. The data will be passed to your function as a Pandas DataFrame (pd.DataFrame) object.
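As a minimal sketch of that contract (using plain pandas, with illustrative column names rather than the real Gaia schema): a mapped function receives one partition as a pd.DataFrame and returns a pd.DataFrame.

```python
import pandas as pd


# A hypothetical per-partition function: it receives one partition as a
# pd.DataFrame and returns a pd.DataFrame (here, with one extra column).
def add_mag_sq(df):
    df = df.copy()
    df["mag_sq"] = df["mag"] ** 2
    return df


# Simulate two partitions of a catalog as plain DataFrames.
partitions = [
    pd.DataFrame({"mag": [18.0, 19.5]}),
    pd.DataFrame({"mag": [20.1]}),
]

# Conceptually, map_partitions applies the function to each partition
# independently; LSDB/Dask does this lazily and in parallel.
results = [add_mag_sq(p) for p in partitions]
```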

1. Open a catalog#

For this example, we will use a small cone of the Gaia DR3 catalog, and only specify columns of interest. This limits the overall memory requirements of the pipeline.

Additional Help

For additional information on Dask client creation, please refer to the official Dask documentation and our Dask cluster configuration page for LSDB-specific tips. Note that Dask also provides its own best practices, which may be useful to consult.

For tips on accessing remote data, see our Accessing remote data guide.

[4]:
# Dask puts out more advisory logging than we care for in this tutorial.
# It takes some doing to quiet all of it, but this recipe works.

import dask

dask.config.set({"logging.distributed": "critical"})

import logging

# This also has to be done for the above to be effective
logger = logging.getLogger("distributed")
logger.setLevel(logging.CRITICAL)

import warnings

# Finally, suppress the specific warning about Dask dashboard port usage
warnings.filterwarnings("ignore", message="Port 8787 is already in use.")
[ ]:
import lsdb

# We run this code with a local LSDB catalog to improve performance,
# but you can use the path in the line below to use the remote catalog.
# catalog_root = "https://data.lsdb.io/hats"
catalog_root = "/epyc/data3/hats/catalogs"
gaia3 = lsdb.open_catalog(
    f"{catalog_root}/gaia_dr3",
    search_filter=lsdb.ConeSearch(ra=280, dec=-60, radius_arcsec=2 * 3600),
    columns=[
        "source_id",
        "ra",
        "dec",
        "phot_g_mean_mag",
    ],
)
gaia3
lsdb Catalog gaia:
source_id ra dec phot_g_mean_mag
npartitions=4
Order: 4, Pixel: 2944 int64[pyarrow] double[pyarrow] double[pyarrow] float[pyarrow]
Order: 4, Pixel: 2945 ... ... ... ...
Order: 4, Pixel: 2946 ... ... ... ...
Order: 4, Pixel: 2947 ... ... ... ...
4 out of 152 available columns in the catalog have been loaded lazily, meaning no data has been read, only the catalog schema
[6]:
# You can get the number of partitions programmatically this way.
# This can be valuable when you want to choose the optimal number
# of workers to process the partitions.
gaia3.npartitions
[6]:
4
[7]:
# You can also access the individual HealpixPixel objects for
# each partition this way, inspecting their order and pixel
# if desired.
px = gaia3.get_healpix_pixels()[0]
px.order, px.pixel
[7]:
(np.int64(4), np.int64(2944))

2. Generating New Columns#

Since the partition’s pd.DataFrame is passed in to your custom function, you can augment it with new columns based on the existing columns, in ordinary Pandas style.

2.1 What you can map#

The trick is understanding what kind of custom function you can pass to .map_partitions. Your function is going to receive a Pandas DataFrame as its first parameter. Other parameters can be passed in as keyword arguments to .map_partitions, as you’ll see later on. For now, we’ll use a function that takes in one partition and produces a result that has the same shape.

Because the catalog is loaded lazily, .map_partitions also returns a lazy, or unevaluated, result. You can realize it the same way you would realize the original catalog, by any of these means:

  • calling .compute() to produce a pd.DataFrame in memory;

  • calling .to_hats() to serialize it to disk as a HATS-format file;

  • calling .head() to see the first few rows, .tail() to see the last few.

[8]:
def mean_sq(df, pixel):
    df["phot_g_mean_mag_sq"] = df["phot_g_mean_mag"] ** 2
    return df


unrealized = gaia3.map_partitions(mean_sq, include_pixel=True)
unrealized
[8]:
lsdb Catalog gaia:
source_id ra dec phot_g_mean_mag phot_g_mean_mag_sq
npartitions=4
Order: 4, Pixel: 2944 int64[pyarrow] double[pyarrow] double[pyarrow] float[pyarrow] float[pyarrow]
Order: 4, Pixel: 2945 ... ... ... ... ...
Order: 4, Pixel: 2946 ... ... ... ... ...
Order: 4, Pixel: 2947 ... ... ... ... ...
5 out of 152 available columns in the catalog have been loaded lazily, meaning no data has been read, only the catalog schema

Let’s take a quick peek to see whether our function works correctly and whether the values in our new column are about what we expect:

[9]:
head_5 = unrealized.head(5)
head_5
[9]:
source_id ra dec phot_g_mean_mag phot_g_mean_mag_sq
_healpix_29
3315212135629220059 6630424242158614528 279.475941 -61.973682 20.157476 406.323853
3315212197603296958 6630424379597543936 279.40738 -61.977054 19.488537 379.80307
3315212213755151065 6630424413957281792 279.436314 -61.979345 18.925087 358.158905
3315212218438134563 6630424418254234752 279.412417 -61.979072 20.522877 421.188477
3315212218704706046 6630424418256341376 279.416172 -61.977445 16.336718 266.888336

5 rows × 5 columns

Looks good! Now on to computing the complete result.

This unrealized result has a top-level property indicating how many partitions it has. We can use this to choose our number of workers directly.

However, it’s a good idea to bound the number of workers, in case the number of partitions is larger than we expect (or we move this code fragment elsewhere).

[10]:
%%time
from dask.distributed import Client

npartitions = gaia3.npartitions

# Create a client which will be implicitly used until we make a new one
client = Client(n_workers=min(4, npartitions), memory_limit="auto")

result = unrealized.compute()
result
CPU times: user 677 ms, sys: 105 ms, total: 782 ms
Wall time: 9.54 s
[10]:
source_id ra dec phot_g_mean_mag phot_g_mean_mag_sq
_healpix_29
3315212135629220059 6630424242158614528 279.475941 -61.973682 20.157476 406.323853
3315212197603296958 6630424379597543936 279.40738 -61.977054 19.488537 379.80307
... ... ... ... ... ...
3318663093295462547 6637326190180387584 280.426265 -58.015067 21.097507 445.104828
3318663093808668415 6637326185883831296 280.423503 -58.013744 20.301455 412.149048

469102 rows × 5 columns

No reduction step is needed here, since the operation is not a reducing one: there are as many rows in the output as there were in the input.

3. Functions that reduce#

The above works when your output has one row for each input row. When you’re doing a reducing operation (such as calculating statistics), the process changes a little.

3.1. Your function’s parameters#

Again, your first input parameter is a pd.DataFrame holding one partition of the catalog, and the return value of your function needs to be a pd.DataFrame as well, even if your result has only a single row.

If you want to know the HEALPix number of the partition, calling .map_partitions with include_pixel=True will pass that as the second parameter to your function. We’ll do this in this example, for demonstration purposes, though it isn’t strictly necessary to this task.

If you have any other parameters that your function requires, take them as keyword arguments, and you can pass their values in as such, when calling .map_partitions. Our example will do this, too, taking a target_column= argument.

3.2. What you get back#

The operation we’re going to do here is a reducing operation (min and max), and it will be run on each partition, reducing the many rows in each partition to a single value. This means that the output of .map_partitions in this case will contain one row per partition. Thus, you will need to do additional reduction on this output in order to get a single final result.
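This two-stage pattern can be sketched with plain pandas (illustrative names, not the real schema): each partition reduces to a single-row DataFrame, and those rows then undergo a second, final reduction.

```python
import pandas as pd


def partition_stats(df, target_column):
    # Reduce one partition to a single-row DataFrame of statistics.
    c = df[target_column]
    return pd.DataFrame(
        [{f"{target_column}_min": c.min(), f"{target_column}_max": c.max()}]
    )


# Simulate two partitions of a catalog.
partitions = [
    pd.DataFrame({"mag": [18.0, 19.5]}),
    pd.DataFrame({"mag": [20.1, 17.2]}),
]

# Stage 1: one output row per partition (what map_partitions yields).
per_partition = pd.concat([partition_stats(p, "mag") for p in partitions])

# Stage 2: the additional reduction over the per-partition rows.
overall = (per_partition["mag_min"].min(), per_partition["mag_max"].max())
```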

[11]:
# Note that this function must work correctly when given an empty DataFrame
# as an input, too; if not, you're obliged to provide "meta", that is,
# information about output type and shape.
import pandas as pd


def find_stats(df, pixel, target_column=""):
    c = df[target_column]
    min_val = c.min()
    max_val = c.max()
    mean_val = c.mean()
    return pd.DataFrame(
        [
            {
                "pixel": pixel,
                f"{target_column}_min": min_val,
                f"{target_column}_max": max_val,
                f"{target_column}_mean": mean_val,
            }
        ]
    )

3.3 When You Need meta=#

The above definition of find_stats works even with an empty pd.DataFrame argument because .mean() is written to handle zero-row inputs without errors.

But your own custom function might not. As a trivial example, suppose you implemented the arithmetic mean yourself, as below.

[12]:
# This version of the function will NOT work with map_partitions as is, because
# of the attempt to divide by `c.count()`, which will be zero for an empty input.
import pandas as pd


def find_stats_needs_meta(df, pixel, target_column=""):
    c = df[target_column]
    min_val = c.min()
    max_val = c.max()
    # WARNING! c.count() == 0 when passed an empty DataFrame.
    # But meta= will come to the rescue.
    mean_val = c.sum() / c.count()
    return pd.DataFrame(
        [
            {
                "pixel": pixel,
                f"{target_column}_min": min_val,
                f"{target_column}_max": max_val,
                f"{target_column}_mean": mean_val,
            }
        ]
    )

In the above case, then, you need to indicate to Dask what type the output will be.

What Dask needs to know are the column names and their order, so the definition below works even though the column types aren’t indicated. (The type of "pixel" will default to float64, which is wrong, but doesn’t matter in this case.)

[13]:
output_meta = pd.DataFrame(
    {
        "pixel": [],
        "phot_g_mean_mag_min": [],
        "phot_g_mean_mag_max": [],
        "phot_g_mean_mag_mean": [],
    }
)

Here’s another definition of output_meta that works equally well, and has the advantage that the way the pd.DataFrame is initialized is the same form as the return value in the custom function. The type of "pixel" will now be more correct:

[14]:
output_meta = pd.DataFrame(
    [
        {
            "pixel": lsdb.HealpixPixel(0, 0),
            "phot_g_mean_mag_min": 0.0,
            "phot_g_mean_mag_max": 0.0,
            "phot_g_mean_mag_mean": 0.0,
        }
    ]
)
[15]:
output_meta["pixel"].dtype
[15]:
dtype('O')

although, as we can see, pandas only records it as a generic “object” dtype.

These meta objects can be complicated and error-prone to construct, and small mistakes can produce confusing errors that surface late in the computation.

To help with these difficulties, Dask provides a make_meta function. If you call your custom function on a single valid row of catalog data (which is what head_5.head(1) provides), make_meta can generate the meta from the output.

NOTE: It’s much faster here to use the computed data from last time (head_5) than trying to use gaia3.head(1). The latter will still give the right answer, but will be much slower, since it obliges Dask to re-execute the computation.

[16]:
%%time
from dask.dataframe.utils import make_meta

output_meta = make_meta(
    find_stats_needs_meta(head_5.head(1), gaia3.get_healpix_pixels()[0], target_column="phot_g_mean_mag")
)
CPU times: user 1.87 ms, sys: 0 ns, total: 1.87 ms
Wall time: 2.47 ms

Passing a correct meta= to .map_partitions allows Dask to skip sending your function an empty pd.DataFrame, so in our case of find_stats_needs_meta (where we depend on a non-zero c.count()), it will succeed without error.

[17]:
unrealized = gaia3.map_partitions(
    find_stats_needs_meta,
    include_pixel=True,
    # Keyword arguments after 'include_pixel=' are passed to your function.
    target_column="phot_g_mean_mag",
    # Here we give Dask the hint it needs to avoid giving us an empty frame
    meta=output_meta,
)
[18]:
%%time
result = unrealized.compute()
result
CPU times: user 49.1 ms, sys: 977 μs, total: 50.1 ms
Wall time: 176 ms
[18]:
pixel phot_g_mean_mag_min phot_g_mean_mag_max phot_g_mean_mag_mean
0 Order: 4, Pixel: 2944 8.154537 22.033451 19.083840
0 Order: 4, Pixel: 2945 5.508409 22.345751 19.094207
0 Order: 4, Pixel: 2946 6.264917 22.076380 19.102860
0 Order: 4, Pixel: 2947 6.154421 21.986425 19.119143

4 rows × 4 columns

The objects in the ‘pixel’ column are of the same type as those returned by get_healpix_pixels().

[19]:
type(result["pixel"].iloc[0])
[19]:
hats.pixel_math.healpix_pixel.HealpixPixel

Because the result is one row per partition, we need additional reduction to get our single answer.

[20]:
result["phot_g_mean_mag_min"].min(), result["phot_g_mean_mag_max"].max()
[20]:
(np.float64(5.508409023284912), np.float64(22.34575080871582))

What about computing statistics not just for the four partitions from our cone search, but for the whole catalog? All that changes is the number of partitions.

NOTE that since we are now using the original find_stats, which doesn’t need meta=, we don’t need to provide it.

[21]:
gaia3_all = lsdb.open_catalog(
    f"{catalog_root}/gaia_dr3",
    margin_cache="gaia_10arcs",
    columns=[
        "source_id",
        "ra",
        "dec",
        "phot_g_mean_mag",
    ],
)
unrealized = gaia3_all.map_partitions(
    find_stats,
    include_pixel=True,
    # Keyword arguments after 'include_pixel=' are passed to your
    # function
    target_column="phot_g_mean_mag",
)
npartitions = unrealized.npartitions
[22]:
npartitions
[22]:
2016

That’s a lot of partitions! If we didn’t bound this value, we could easily overwhelm our cluster.

[23]:
%%time
# Close the old client and make a new one that takes into account the new number of partitions.
client.close()
client = Client(n_workers=min(8, npartitions), memory_limit="auto")
# Note that if you are using a remote catalog, you will effectively download the whole catalog to get these values.
result = unrealized.compute()
result
CPU times: user 1min 1s, sys: 9.56 s, total: 1min 10s
Wall time: 13min 30s
[23]:
pixel phot_g_mean_mag_min phot_g_mean_mag_max phot_g_mean_mag_mean
0 Order: 2, Pixel: 0 3.382374 22.452248 18.722443
0 Order: 2, Pixel: 1 2.896132 22.465290 18.827720
... ... ... ... ...
0 Order: 3, Pixel: 766 3.774703 22.479549 19.061691
0 Order: 3, Pixel: 767 3.449377 22.423697 19.033006

2016 rows × 4 columns

[24]:
# We need to do a final reduction step to get the true min and max
result["phot_g_mean_mag_min"].min(), result["phot_g_mean_mag_max"].max()
[24]:
(np.float64(1.7316069602966309), np.float64(22.956424713134766))

Since we just searched the whole catalog, we can check our answer against the statistics that were compiled at import time for the catalog. As you can see, they match what we got when using the .map_partitions method.

[25]:
gaia3_all.hc_structure.aggregate_column_statistics(include_columns="phot_g_mean_mag")
[25]:
min_value max_value null_count row_count
column_names
phot_g_mean_mag 1.731607 22.956425 5457477 1812731847

Closing the Dask client#

[26]:
client.close()

About#

Authors: Derek Jones

Last updated/verified on: January 19, 2026

If you use lsdb for published research, please cite it following these instructions.