map_partitions
- Catalog.map_partitions(func: Callable[[...], NestedFrame], *args, meta: DataFrame | Series | dict | Iterable | tuple | None = None, include_pixel: bool = False, compute_single_partition: bool = False, partition_index: int | HealpixPixel | None = None, **kwargs) → Catalog | Series
Applies a function to each partition of the catalog and its respective margin.
The ra and dec of each row are assumed to remain unchanged. If the function returns a DataFrame, an LSDB Catalog is constructed and its respective margin, if one exists, is updated accordingly. Otherwise, only the main catalog Dask object is returned.
- Parameters:
- func : Callable
The function applied to each partition, called as func(partition: npd.NestedFrame, *args, **kwargs), with the additional args and kwargs passed through from map_partitions. If the include_pixel parameter is set, the Healpix pixel of the partition is passed as the second positional argument: func(partition: npd.NestedFrame, healpix_pixel: HealpixPixel, *args, **kwargs).
- *args
Additional positional arguments to call func with.
- meta : pd.DataFrame | pd.Series | Dict | Iterable | Tuple | None, default None
An empty pandas DataFrame whose columns match the output of the function applied to a partition. Other types are accepted to describe the output DataFrame format; for full details, see the Dask documentation (https://blog.dask.org/2022/08/09/understanding-meta-keyword-argument). If meta is None (default), LSDB will try to work out the output schema of the function by calling it with an empty DataFrame. If the function does not work with an empty DataFrame, this will raise an error, and meta must be set explicitly. Note that some operations in LSDB will generate empty partitions; these can be removed by calling the Catalog.prune_empty_partitions method.
- include_pixel : bool, default False
Whether to pass the Healpix pixel of the partition, as a HealpixPixel object, to the second positional argument of the function.
- compute_single_partition : bool, default False
If True, runs the function on a single partition only, in the local thread, without going through Dask. This is useful for testing and debugging functions on a single partition, since all normal debugging tools can be used. When this is True, the partition to compute is selected by the partition_index parameter.
- partition_index : int | HealpixPixel | None, default None
The index of the partition to compute when compute_single_partition is True. Also accepts a HealpixPixel object to specify the partition by its HEALPix order and pixel. If None, defaults to 0.
- **kwargs
Additional keyword args to pass to the function. These are passed through to the Dask dask.dataframe.map_partitions function, so any of that function's keyword arguments, such as transform_divisions, will work as described in the Dask documentation (https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.map_partitions.html).
- Returns:
- Catalog | dd.Series
A new catalog with each partition replaced by the output of the function applied to the original partition. If the function returns a non-DataFrame output, a Dask Series is returned.
Examples
Apply a function to each partition (e.g., add a derived column):
>>> import lsdb
>>> from lsdb.nested.datasets import generate_data
>>> nf = generate_data(1000, 5, seed=0, ra_range=(0.0, 300.0), dec_range=(-50.0, 50.0))
>>> catalog = lsdb.from_dataframe(nf.compute()[["ra", "dec", "id"]])
>>> def add_flag(df):
...     return df.assign(in_north=df["dec"] > 0)
>>> catalog2 = catalog.map_partitions(add_flag)
>>> catalog2.head()
                           ra        dec    id  in_north
_healpix_29
118362963675428450  52.696686  39.675892  8154      True
98504457942331510   89.913567  46.147079  3437      True
70433374600953220   40.528952  35.350965  8214      True
154968715224527848   17.57041    29.8936  9853      True
67780378363846894    45.08384   31.95611  8297      True
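When the function does not work on an empty DataFrame, the meta argument must be supplied explicitly. The sketch below shows one way to build a meta frame for the add_flag function above using plain pandas; the column names and dtypes here are illustrative and should match whatever your function actually returns:

```python
import pandas as pd

# An empty DataFrame whose columns and dtypes describe the function's
# output. This is the kind of object that can be passed as the `meta`
# argument when LSDB cannot infer the schema automatically.
meta = pd.DataFrame(
    {
        "ra": pd.Series(dtype="float64"),
        "dec": pd.Series(dtype="float64"),
        "id": pd.Series(dtype="int64"),
        "in_north": pd.Series(dtype="bool"),
    }
)
print(len(meta), list(meta.columns))  # 0 ['ra', 'dec', 'id', 'in_north']
```

This would then be passed as, e.g., catalog.map_partitions(add_flag, meta=meta).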
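With include_pixel=True, the mapped function receives the partition's pixel as its second positional argument. The sketch below illustrates the calling convention; FakePixel is a hypothetical stand-in for the HealpixPixel object (assumed here to expose order and pixel attributes), used only so the function can be exercised without a running catalog:

```python
from collections import namedtuple

import pandas as pd

# Hypothetical stand-in for HealpixPixel, assumed to expose `order` and
# `pixel`; the real object is supplied by LSDB when include_pixel=True.
FakePixel = namedtuple("FakePixel", ["order", "pixel"])

def tag_with_pixel(df, healpix_pixel):
    # The partition's pixel arrives as the second positional argument.
    return df.assign(
        partition_order=healpix_pixel.order,
        partition_pixel=healpix_pixel.pixel,
    )

# Calling the function directly on a small frame, as one might when
# debugging before handing it to map_partitions:
df = pd.DataFrame({"ra": [10.0, 20.0], "dec": [-5.0, 5.0]})
tagged = tag_with_pixel(df, FakePixel(order=3, pixel=42))
print(tagged["partition_order"].tolist())  # [3, 3]
```

On a catalog this would be invoked as catalog.map_partitions(tag_with_pixel, include_pixel=True); setting compute_single_partition=True as well runs it on one partition in the local thread for debugging.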