map_partitions

Catalog.map_partitions(func: Callable[[...], NestedFrame], *args, meta: DataFrame | Series | dict | Iterable | tuple | None = None, include_pixel: bool = False, compute_single_partition: bool = False, partition_index: int | HealpixPixel | None = None, **kwargs) → Catalog | Series

Applies a function to each partition in the catalog and respective margin.

The ra and dec of each row are assumed to remain unchanged. If the function returns a DataFrame, an LSDB Catalog is constructed and its margin, if one exists, is updated accordingly. Otherwise, only the main catalog Dask object is returned.

Parameters:
func : Callable

The function applied to each partition. It will be called as func(partition: npd.NestedFrame, *args, **kwargs), with the additional args and kwargs passed through from map_partitions. If the include_pixel parameter is set, the HealpixPixel of the partition is passed as the second positional argument: func(partition: npd.NestedFrame, healpix_pixel: HealpixPixel, *args, **kwargs)

*args

Additional positional arguments to call func with.

meta : pd.DataFrame | pd.Series | dict | Iterable | tuple | None, default None

An empty pandas DataFrame whose columns match the output of the function applied to a partition. Other types are accepted to describe the output DataFrame format; for full details see the Dask documentation: https://blog.dask.org/2022/08/09/understanding-meta-keyword-argument. If meta is None (default), LSDB will try to infer the output schema of the function by calling it with an empty DataFrame. If the function does not work with an empty DataFrame, this will raise an error and meta must be set. Note that some operations in LSDB will generate empty partitions; these can be removed by calling the Catalog.prune_empty_partitions method.
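When schema inference fails, meta can be built as an empty pandas DataFrame carrying the expected columns and dtypes. A minimal sketch, assuming the function produces the illustrative columns below (the names are placeholders, not part of the API):

```python
import pandas as pd

# An empty DataFrame whose columns and dtypes describe the function's
# output. The column names here are illustrative placeholders.
meta = pd.DataFrame(
    {
        "ra": pd.Series(dtype="float64"),
        "dec": pd.Series(dtype="float64"),
        "in_north": pd.Series(dtype="bool"),
    }
)
# meta has zero rows but carries the full output schema.
```

This DataFrame would then be passed as catalog.map_partitions(func, meta=meta).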

include_pixel : bool, default False

Whether to pass the HEALPix pixel of the partition, as a HealpixPixel object, to the second positional argument of the function.
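A sketch of a function suitable for include_pixel=True. A HealpixPixel carries the partition's HEALPix order and pixel number (attributes order and pixel); the SimpleNamespace below is only a stand-in for a real HealpixPixel, so the sketch runs without a catalog:

```python
from types import SimpleNamespace

import pandas as pd

def tag_with_pixel(partition, healpix_pixel):
    # Called as func(partition, healpix_pixel) when include_pixel=True.
    # Records the partition's HEALPix order and pixel number as columns.
    return partition.assign(
        norder=healpix_pixel.order,
        npix=healpix_pixel.pixel,
    )

# Stand-in pixel object for local illustration; a real call receives
# a HealpixPixel from LSDB.
pixel = SimpleNamespace(order=5, pixel=1234)
df = pd.DataFrame({"ra": [10.0], "dec": [-5.0]})
out = tag_with_pixel(df, pixel)
```

On a real catalog this would be invoked as catalog.map_partitions(tag_with_pixel, include_pixel=True).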

compute_single_partition : bool, default False

If True, runs the function on a single partition only, in the local thread and without going through Dask. This is useful for testing and debugging functions on a single partition, since all normal debugging tools can be used. When this is True, the partition to compute is selected by the partition_index parameter.

partition_index : int | HealpixPixel | None, default None

The index of the partition to compute when compute_single_partition is True. Also accepts a HealpixPixel object to specify the partition by its HEALPix order and pixel. If None, defaults to 0.
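Since compute_single_partition=True runs the function directly in the local thread, it behaves like applying the function to one partition's DataFrame by hand, which is how the call can be reproduced for debugging. A pandas-only sketch of that equivalence (the lsdb call itself is shown in a comment, since it needs a real Catalog):

```python
import pandas as pd

def add_flag(df):
    return df.assign(in_north=df["dec"] > 0)

# One partition's data, as a plain DataFrame:
partition = pd.DataFrame({"ra": [10.0, 20.0], "dec": [5.0, -5.0]})

# Debugging locally is just a direct call; breakpoints and pdb work:
result = add_flag(partition)

# The equivalent catalog call (requires an lsdb Catalog) would be:
# catalog.map_partitions(add_flag, compute_single_partition=True,
#                        partition_index=0)
```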

**kwargs

Additional keyword args to pass to the function. These are forwarded to Dask's dask.dataframe.map_partitions, so any of that function's keyword arguments, such as transform_divisions, will be passed through and work as described in the Dask documentation: https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.map_partitions.html

Returns:
Catalog | dd.Series

A new catalog with each partition replaced by the output of the function applied to the original partition. If the function returns a non-DataFrame output, a Dask Series is returned instead.

Examples

Apply a function to each partition (e.g., add a derived column):

>>> import lsdb
>>> from lsdb.nested.datasets import generate_data
>>> nf = generate_data(1000, 5, seed=0, ra_range=(0.0, 300.0), dec_range=(-50.0, 50.0))
>>> catalog = lsdb.from_dataframe(nf.compute()[["ra", "dec", "id"]])
>>> def add_flag(df):
...     return df.assign(in_north=df["dec"] > 0)
>>> catalog2 = catalog.map_partitions(add_flag)
>>> catalog2.head()
                        ra        dec    id in_north
_healpix_29
118362963675428450  52.696686  39.675892  8154     True
98504457942331510   89.913567  46.147079  3437     True
70433374600953220   40.528952  35.350965  8214     True
154968715224527848   17.57041    29.8936  9853     True
67780378363846894    45.08384   31.95611  8297     True