Source code for lsdb.catalog.catalog

from __future__ import annotations

import warnings
from pathlib import Path
from typing import Any, Callable, Iterable

import dask.dataframe as dd
import hats as hc
import nested_pandas as npd
import pandas as pd
from deprecated import deprecated  # type: ignore
from hats.catalog.catalog_collection import CatalogCollection
from hats.catalog.healpix_dataset.healpix_dataset import HealpixDataset as HCHealpixDataset
from hats.catalog.index.index_catalog import IndexCatalog as HCIndexCatalog
from hats.pixel_math import HealpixPixel
from pandas._typing import Renamer
from typing_extensions import Self
from upath import UPath

from lsdb import io
from lsdb.catalog.association_catalog import AssociationCatalog
from lsdb.catalog.dataset.healpix_dataset import HealpixDataset
from lsdb.catalog.map_catalog import MapCatalog
from lsdb.catalog.margin_catalog import MarginCatalog
from lsdb.core.crossmatch.abstract_crossmatch_algorithm import AbstractCrossmatchAlgorithm
from lsdb.core.crossmatch.kdtree_match import KdTreeCrossmatch
from lsdb.core.search.abstract_search import AbstractSearch
from lsdb.core.search.index_search import IndexSearch
from lsdb.dask.concat_catalog_data import _assert_same_ra_dec, concat_catalog_data, handle_margins_for_concat
from lsdb.dask.crossmatch_catalog_data import crossmatch_catalog_data, crossmatch_catalog_data_nested
from lsdb.dask.join_catalog_data import (
    join_catalog_data_nested,
    join_catalog_data_on,
    join_catalog_data_through,
    merge_asof_catalog_data,
)
from lsdb.dask.merge_catalog_functions import DEFAULT_SUFFIX_METHOD, create_merged_catalog_info
from lsdb.dask.merge_map_catalog_data import merge_map_catalog_data
from lsdb.io.schema import get_arrow_schema
from lsdb.loaders.hats.hats_loading_config import HatsLoadingConfig
from lsdb.nested.core import NestedFrame
from lsdb.types import DaskDFPixelMap


def _default_suffixes(left_name: str, right_name: str) -> tuple[str, str]:
    """Return the default pair of suffixes for left/right catalog names."""
    return (f"_{left_name}", f"_{right_name}")


# pylint: disable=protected-access,too-many-public-methods, too-many-lines
[docs] class Catalog(HealpixDataset): """LSDB Catalog to perform analysis of sky catalogs and efficient spatial operations.""" hc_structure: hc.catalog.Catalog """`hats.Catalog` object representing (only) the structure and metadata of the HATS catalog""" margin: MarginCatalog | None = None """Link to a ``MarginCatalog`` object that represents the objects in other partitions that are within a specified radius of the border with this partition. This is useful for finding best counterparts when crossmatching catalogs.""" hc_collection: CatalogCollection | None = None """`hats.CatalogCollection` object representing the structure and metadata of the HATS catalog, as well as links to affiliated tables like margins and indexes."""
def __init__(
    self,
    ddf: NestedFrame,
    ddf_pixel_map: DaskDFPixelMap,
    hc_structure: hc.catalog.Catalog,
    *,
    loading_config: HatsLoadingConfig | None = None,
    margin: MarginCatalog | None = None,
):
    """Initialise a Catalog object.

    Not to be used to load a catalog directly, use one of the `lsdb.from_...` or
    `lsdb.open_...` methods.

    Parameters
    ----------
    ddf : NestedFrame
        Dask Nested DataFrame with the source data of the catalog.
    ddf_pixel_map : DaskDFPixelMap
        Dictionary mapping HEALPix order and pixel to partition index of ddf.
    hc_structure : hc.catalog.Catalog
        Object with hats metadata of the catalog.
    loading_config : HatsLoadingConfig or None, default None
        The configuration used to read the catalog from disk.
    margin : MarginCatalog or None, default None
        The margin catalog.
    """
    super().__init__(ddf, ddf_pixel_map, hc_structure, loading_config)
    # The margin is specific to point catalogs, so it is stored here rather than
    # being passed to the generic HealpixDataset constructor.
    self.margin = margin
def _create_updated_dataset(
    self,
    ddf: NestedFrame | None = None,
    ddf_pixel_map: DaskDFPixelMap | None = None,
    hc_structure: HCHealpixDataset | None = None,
    updated_catalog_info_params: dict | None = None,
    margin: MarginCatalog | None = None,
) -> Self:
    """Return a copy of this catalog with selected components replaced.

    The data, pixel map, structure and catalog-info overrides are handed to the
    parent implementation unchanged. The copy's margin is then set to ``margin``,
    so a caller that omits it gets a catalog without a margin.
    """
    updated = super()._create_updated_dataset(
        ddf,
        ddf_pixel_map,
        hc_structure,
        updated_catalog_info_params,
    )
    updated.margin = margin
    return updated

@property
def iloc(self):
    """Positional indexing is not available; accessing it always raises."""
    message = "Access via .iloc is not supported since it would require computing the entire catalog."
    raise NotImplementedError(message)

@property
def loc(self):
    """Label indexing is not available; accessing it always raises.

    The error message points users to ``Catalog.id_search`` instead.
    """
    message = (
        "Access via .loc is not allowed. Please use `Catalog.id_search` instead."
        " For example, to retrieve a row for an object of ID 'GAIA_123' use"
        " catalog.id_search(values={'objid':'GAIA_123'}), where 'objid' is the"
        " column for which there is an index catalog. If `id_search` is targeted"
        " at a column other than the collection's default index column, or if"
        " working with a stand-alone catalog, use the `index_catalogs` argument"
        " to specify a HATS index catalog for the desired column."
    )
    raise NotImplementedError(message)
def query(self, expr: str) -> Catalog:
    """Filter the catalog, and its margin when present, with a query expression.

    Parameters
    ----------
    expr : str
        Query expression to evaluate. Column names that are not valid Python
        variable names should be wrapped in backticks, and any variable values
        can be injected using f-strings. Referencing variables with '@' is not
        supported. More information about pandas query strings is available
        `here <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.query.html>`__.

    Returns
    -------
    Catalog
        A catalog holding only the rows of the original catalog that satisfy
        the expression. If a margin exists, it is filtered with the same
        expression.

    Examples
    --------
    Filter a small synthetic catalog using a pandas-style query string:

    >>> import lsdb
    >>> from lsdb.nested.datasets import generate_data
    >>> nf = generate_data(1000, 5, seed=0, ra_range=(0.0, 300.0), dec_range=(-50.0, 50.0))
    >>> catalog = lsdb.from_dataframe(nf.compute()[["ra", "dec", "id"]])
    >>> filtered = catalog.query("ra < 100 and dec > 0")
    >>> filtered.head()  # doctest: +NORMALIZE_WHITESPACE
                               ra        dec    id
    _healpix_29
    118362963675428450  52.696686  39.675892  8154
    98504457942331510   89.913567  46.147079  3437
    70433374600953220   40.528952  35.350965  8214
    154968715224527848   17.57041    29.8936  9853
    67780378363846894    45.08384   31.95611  8297
    """
    filtered = super().query(expr)
    if self.margin is None:
        return filtered
    # Keep the margin consistent with the filtered main catalog.
    filtered.margin = self.margin.query(expr)
    return filtered
def rename(self, columns: Renamer) -> Catalog:
    """Rename catalog columns (not indices), and those of the margin if present.

    Parameters
    ----------
    columns : dict-like or function
        Mapping or function describing how column names are transformed.

    Returns
    -------
    Catalog
        A catalog with the original data under the new column names. If a
        margin exists, the same mapping is applied to it.
    """
    renamed = super().rename(columns)
    if self.margin is None:
        return renamed
    renamed.margin = self.margin.rename(columns)
    return renamed
def crossmatch(
    self,
    other: Catalog,
    *,
    n_neighbors: int | None = None,
    radius_arcsec: float | None = None,
    min_radius_arcsec: float | None = None,
    algorithm: AbstractCrossmatchAlgorithm | None = None,
    output_catalog_name: str | None = None,
    require_right_margin: bool = False,
    how: str = "inner",
    suffixes: tuple[str, str] | None = None,
    suffix_method: str | None = None,
    log_changes: bool = True,
) -> Catalog:
    """Perform a cross-match between two catalogs

    The pixels from each catalog are aligned via a `PixelAlignment`, and cross-matching is
    performed on each pair of overlapping pixels. The resulting catalog will have partitions
    matching an inner pixel alignment - using pixels that have overlap in both input catalogs
    and taking the smallest of any overlapping pixels.

    The resulting catalog will be partitioned using the left catalog's ra and dec, and the
    index for each row will be the same as the index from the corresponding row in the left
    catalog's index.

    Parameters
    ----------
    other : Catalog
        The right catalog to cross-match against
    n_neighbors : int, default 1
        The number of neighbors to find within each point. Only valid when
        `algorithm` is not given.
    radius_arcsec : float, default 1.0
        The threshold distance in arcseconds beyond which neighbors are not added.
        Only valid when `algorithm` is not given.
    min_radius_arcsec : float, default 0.0
        The threshold distance in arcseconds beyond which neighbors are added.
        Only valid when `algorithm` is not given.
    algorithm : AbstractCrossmatchAlgorithm | None, default `KDTreeCrossmatch`
        The instance of an algorithm used to perform the crossmatch. If None,
        the default KDTree crossmatch algorithm is used. If specified, the
        algorithm is defined by subclassing `AbstractCrossmatchAlgorithm`.

        Default algorithm:
            - `KdTreeCrossmatch`: find the k-nearest neighbors using a kd_tree

        Custom algorithm:
            To specify a custom algorithm, write a class that subclasses the
            `AbstractCrossmatchAlgorithm` class, and either overwrite the `crossmatch`
            or the `perform_crossmatch` function.

            The function should be able to perform a crossmatch on two pandas DataFrames
            from a partition from each catalog. It should return two 1d numpy arrays of
            equal lengths with the indices of the matching rows from the left and right
            dataframes, and a dataframe with any extra columns generated by the crossmatch
            algorithm, also with the same length. These columns are specified in
            `AbstractCrossmatchAlgorithm.extra_columns`, with their respective data types,
            by means of an empty pandas dataframe. As an example, the KdTreeCrossmatch
            algorithm outputs a "_dist_arcsec" column with the distance between data
            points. Its extra_columns attribute is specified as follows::

                pd.DataFrame({"_dist_arcsec": pd.Series(dtype=np.dtype("float64"))})

            The `crossmatch`/`perform_crossmatch` methods will receive an instance of
            `CrossmatchArgs` which includes the partitions and respective pixel
            information::

                - left_df: npd.NestedFrame
                - right_df: npd.NestedFrame
                - left_order: int
                - left_pixel: int
                - right_order: int
                - right_pixel: int
                - left_catalog_info: hc.catalog.TableProperties
                - right_catalog_info: hc.catalog.TableProperties
                - right_margin_catalog_info: hc.catalog.TableProperties

            Include any algorithm-specific parameters in the initialization of your
            object. These parameters should be validated in
            `AbstractCrossmatchAlgorithm.validate`, by overwriting the method.

    output_catalog_name : str, default {left_name}_x_{right_name}
        The name of the resulting catalog.
    require_right_margin : bool, default False
        If true, raises an error if the right margin is missing which could
        lead to incomplete crossmatches.
    how : str
        How to handle the crossmatch of the two catalogs.
        One of {'left', 'inner'}; defaults to 'inner'.
    suffixes : Tuple[str,str] or None
        A pair of suffixes to be appended to the end of each column name
        when they are joined. Default uses the name of the catalog for the suffix.
    suffix_method : str or None, default "all_columns"
        Method to use to add suffixes to columns. Options are:

        - "overlapping_columns": only add suffixes to columns that are present in both catalogs
        - "all_columns": add suffixes to all columns from both catalogs

        .. warning:: This default will change to "overlapping_columns" in a future release.

    log_changes : bool, default True
        If True, logs an info message for each column that is being renamed.
        This only applies when suffix_method is 'overlapping_columns'.

    Returns
    -------
    Catalog
        A Catalog with the data from the left and right catalogs merged with one row for
        each pair of neighbors found from cross-matching. The resulting table contains all
        columns from the left and right catalogs with their respective suffixes and,
        whenever specified, a set of extra columns generated by the crossmatch algorithm.

    Warns
    -----
    FutureWarning
        If `suffix_method` is not specified.

    Examples
    --------
    Crossmatch two small synthetic catalogs:

    >>> import lsdb
    >>> from lsdb.nested.datasets import generate_data
    >>> nf = generate_data(1000, 5, seed=0, ra_range=(0.0, 300.0), dec_range=(-50.0, 50.0))
    >>> df = nf.compute()[["ra", "dec", "id"]]
    >>> left = lsdb.from_dataframe(df, catalog_name="left")
    >>> right = lsdb.from_dataframe(df, catalog_name="right")
    >>> xmatch = left.crossmatch(right, n_neighbors=1, radius_arcsec=1.0,
    ...     suffix_method="overlapping_columns", log_changes=False)
    >>> xmatch.head()[  # doctest: +NORMALIZE_WHITESPACE
    ...     ["ra_left", "dec_left", "id_left", "_dist_arcsec"]
    ... ]
                          ra_left   dec_left  id_left  _dist_arcsec
    _healpix_29
    118362963675428450  52.696686  39.675892     8154           0.0
    98504457942331510   89.913567  46.147079     3437           0.0
    70433374600953220   40.528952  35.350965     8214           0.0
    154968715224527848   17.57041    29.8936     9853           0.0
    67780378363846894    45.08384   31.95611     8297           0.0

    Raises
    ------
    TypeError
        If the `other` catalog is not of type `Catalog`
    ValueError
        If `algorithm` is given together with any of `n_neighbors`, `radius_arcsec`
        or `min_radius_arcsec` (including zero values).
        If the `suffixes` provided is not a tuple of two strings.
        If the right catalog has no margin and `require_right_margin` is True.
    """
    if not isinstance(other, Catalog):
        raise TypeError(
            f"Expected `other` to be a Catalog instance, got {type(other)}. "
            "You may want `lsdb.crossmatch(frame_or_catalog, frame_or_catalog)` instead."
        )
    # Options for the default algorithm that the caller explicitly passed. They are
    # listed explicitly (in signature order) instead of being scraped from locals().
    default_kwargs = {
        name: value
        for name, value in (
            ("n_neighbors", n_neighbors),
            ("radius_arcsec", radius_arcsec),
            ("min_radius_arcsec", min_radius_arcsec),
        )
        if value is not None
    }
    if algorithm is None:
        algorithm = KdTreeCrossmatch(**default_kwargs)
    elif default_kwargs:
        # Check presence, not truthiness: an explicit 0 / 0.0 would otherwise be
        # silently ignored when a custom algorithm is supplied.
        raise ValueError(f"If you specify `algorithm`, do not set {list(default_kwargs.keys())}")
    if suffixes is None:
        suffixes = _default_suffixes(self.name, other.name)
    if len(suffixes) != 2:
        raise ValueError("`suffixes` must be a tuple with two strings")
    if suffix_method is None:
        suffix_method = DEFAULT_SUFFIX_METHOD
        warnings.warn(
            "The default suffix behavior will change from applying suffixes to all columns to only "
            "applying suffixes to overlapping columns in a future release. "
            "To maintain the current behavior, explicitly set `suffix_method='all_columns'`. "
            "To change to the new behavior, set `suffix_method='overlapping_columns'`.",
            FutureWarning,
            stacklevel=2,
        )
    if other.margin is None and require_right_margin:
        raise ValueError("Right catalog margin cache is required for cross-match.")
    if output_catalog_name is None:
        output_catalog_name = f"{self.name}_x_{other.name}"
    ddf, ddf_map, alignment = crossmatch_catalog_data(
        self,
        other,
        algorithm,
        how,
        suffixes,
        suffix_method,
        log_changes,
    )
    new_catalog_info = create_merged_catalog_info(
        self,
        other,
        output_catalog_name,
        suffixes,
        suffix_method,
    )
    hc_catalog = self.hc_structure.__class__(
        new_catalog_info, alignment.pixel_tree, schema=get_arrow_schema(ddf), moc=alignment.moc
    )
    return self.__class__(ddf, ddf_map, hc_catalog)
def crossmatch_nested(
    self,
    other: Catalog,
    *,
    n_neighbors: int | None = None,
    radius_arcsec: float | None = None,
    min_radius_arcsec: float | None = None,
    algorithm: AbstractCrossmatchAlgorithm | None = None,
    output_catalog_name: str | None = None,
    require_right_margin: bool = False,
    nested_column_name: str | None = None,
) -> Catalog:
    """Perform a cross-match between two catalogs, adding the result as a nested column

    For each row in the left catalog, the cross-matched rows from the right catalog are
    added in a new nested column. Any extra columns from the crossmatch like distance are
    added to this nested column too.

    The pixels from each catalog are aligned via a `PixelAlignment`, and cross-matching is
    performed on each pair of overlapping pixels. The resulting catalog will have partitions
    matching an inner pixel alignment - using pixels that have overlap in both input catalogs
    and taking the smallest of any overlapping pixels.

    The resulting catalog will be partitioned using the left catalog's ra and dec, and the
    index for each row will be the same as the index from the corresponding row in the left
    catalog's index.

    Parameters
    ----------
    other : Catalog
        The right catalog to cross-match against
    n_neighbors : int, default 1
        The number of neighbors to find within each point. Only valid when
        `algorithm` is not given.
    radius_arcsec : float, default 1.0
        The threshold distance in arcseconds beyond which neighbors are not added.
        Only valid when `algorithm` is not given.
    min_radius_arcsec : float, default 0.0
        The threshold distance in arcseconds beyond which neighbors are added.
        Only valid when `algorithm` is not given.
    algorithm : AbstractCrossmatchAlgorithm | None, default `KDTreeCrossmatch`
        The instance of an algorithm used to perform the crossmatch. If None,
        the default KDTree crossmatch algorithm is used. If specified, the
        algorithm is defined by subclassing `AbstractCrossmatchAlgorithm`.

        Default algorithm:
            - `KdTreeCrossmatch`: find the k-nearest neighbors using a kd_tree

        Custom algorithm:
            To specify a custom algorithm, write a class that subclasses the
            `AbstractCrossmatchAlgorithm` class, and either overwrite the `crossmatch`
            or the `perform_crossmatch` function.

            The function should be able to perform a crossmatch on two pandas DataFrames
            from a partition from each catalog. It should return two 1d numpy arrays of
            equal lengths with the indices of the matching rows from the left and right
            dataframes, and a dataframe with any extra columns generated by the crossmatch
            algorithm, also with the same length. These columns are specified in
            `AbstractCrossmatchAlgorithm.extra_columns`, with their respective data types,
            by means of an empty pandas dataframe. As an example, the KdTreeCrossmatch
            algorithm outputs a "_dist_arcsec" column with the distance between data
            points. Its extra_columns attribute is specified as follows::

                pd.DataFrame({"_dist_arcsec": pd.Series(dtype=np.dtype("float64"))})

            The `crossmatch`/`perform_crossmatch` methods will receive an instance of
            `CrossmatchArgs` which includes the partitions and respective pixel
            information::

                - left_df: npd.NestedFrame
                - right_df: npd.NestedFrame
                - left_order: int
                - left_pixel: int
                - right_order: int
                - right_pixel: int
                - left_catalog_info: hc.catalog.TableProperties
                - right_catalog_info: hc.catalog.TableProperties
                - right_margin_catalog_info: hc.catalog.TableProperties

            Include any algorithm-specific parameters in the initialization of your
            object. These parameters should be validated in
            `AbstractCrossmatchAlgorithm.validate`, by overwriting the method.

    output_catalog_name : str, default {left_name}_x_{right_name}
        The name of the resulting catalog.
    require_right_margin : bool, default False
        If true, raises an error if the right margin is missing which could
        lead to incomplete crossmatches.
    nested_column_name : str, default uses the name of the right catalog
        The name of the nested column that will contain the crossmatched rows
        from the right catalog.

    Returns
    -------
    Catalog
        A Catalog with the data from the left and right catalogs joined with the
        cross-matched rows from the right catalog added in a new nested column.
        The resulting table contains all columns from the left catalog and a new
        nested column with all the columns from the right catalog and any extra
        columns generated by the crossmatch algorithm.

    Raises
    ------
    ValueError
        If `algorithm` is given together with any of `n_neighbors`, `radius_arcsec`
        or `min_radius_arcsec` (including zero values).
        If the right catalog has no margin and `require_right_margin` is True.
    """
    # Options for the default algorithm that the caller explicitly passed. They are
    # listed explicitly (in signature order) instead of being scraped from locals().
    default_kwargs = {
        name: value
        for name, value in (
            ("n_neighbors", n_neighbors),
            ("radius_arcsec", radius_arcsec),
            ("min_radius_arcsec", min_radius_arcsec),
        )
        if value is not None
    }
    if algorithm is None:
        algorithm = KdTreeCrossmatch(**default_kwargs)
    elif default_kwargs:
        # Check presence, not truthiness: an explicit 0 / 0.0 would otherwise be
        # silently ignored when a custom algorithm is supplied.
        raise ValueError(f"If you specify `algorithm`, do not set {list(default_kwargs.keys())}")
    if nested_column_name is None:
        nested_column_name = other.name
    if other.margin is None and require_right_margin:
        raise ValueError("Right catalog margin cache is required for cross-match.")
    if output_catalog_name is None:
        output_catalog_name = f"{self.name}_x_{other.name}"
    ddf, ddf_map, alignment = crossmatch_catalog_data_nested(self, other, algorithm, nested_column_name)
    hc_catalog = self.hc_structure.__class__(
        self.hc_structure.catalog_info,
        alignment.pixel_tree,
        moc=alignment.moc,
    )
    return self._create_updated_dataset(
        ddf=ddf,
        ddf_pixel_map=ddf_map,
        hc_structure=hc_catalog,
        updated_catalog_info_params={"catalog_name": output_catalog_name},
    )
def concat(
    self,
    other: Catalog,
    *,
    ignore_empty_margins: bool = False,
    **kwargs,
) -> Catalog:
    """Concatenate two catalogs by aligned HEALPix pixels.

    Parameters
    ----------
    other : Catalog
        Catalog to concatenate with.
    ignore_empty_margins : bool, default False
        When only one of the two catalogs has a margin: if True, keep that margin
        (the result is treated as incomplete); if False, drop the margins.
    **kwargs
        Extra arguments forwarded to the internal `pandas.concat` calls.

    Returns
    -------
    Catalog
        New catalog built on an OUTER pixel alignment. It carries a concatenated
        margin when both inputs have one, or when `ignore_empty_margins=True` and
        at least one of them does.

    Raises
    ------
    ValueError
        If RA/Dec column names differ between the input catalogs, or between a
        catalog and its own margin.
    """
    # Validate RA/Dec naming up front, before any work is scheduled.
    _assert_same_ra_dec(self, other, context="Catalog concat")

    # The helper also checks each catalog against its own margin.
    combined_margin = handle_margins_for_concat(
        self,
        other,
        ignore_empty_margins=ignore_empty_margins,
        **kwargs,
    )

    combined_ddf, pixel_map, pixel_alignment = concat_catalog_data(self, other, **kwargs)
    structure_cls = type(self.hc_structure)
    new_structure = structure_cls(
        self.hc_structure.catalog_info,
        pixel_alignment.pixel_tree,
        moc=pixel_alignment.moc,
    )
    return self._create_updated_dataset(
        ddf=combined_ddf,
        ddf_pixel_map=pixel_map,
        hc_structure=new_structure,
        margin=combined_margin,
    )
def merge_map(
    self,
    map_catalog: MapCatalog,
    func: Callable[..., npd.NestedFrame],
    *args,
    meta: pd.DataFrame | pd.Series | dict | Iterable | tuple | None = None,
    **kwargs,
) -> Catalog:
    """Applies a function to each pair of partitions in this catalog and the map catalog.

    The pixels from each catalog are aligned via a `PixelAlignment`, and the respective
    dataframes are passed to the function. The resulting catalog will have the same
    partitions as the point source catalog.

    Parameters
    ----------
    map_catalog : MapCatalog
        The continuous map to merge.
    func : Callable
        The function applied to each catalog partition, which will be called with:
        `func(catalog_partition: npd.NestedFrame, map_partition: npd.NestedFrame,
        healpix_pixel: HealpixPixel, *args, **kwargs)`
        with the additional args and kwargs passed to the `merge_map` function.
    *args
        Additional positional arguments to call `func` with.
    meta : pd.DataFrame | pd.Series | Dict | Iterable | Tuple | None, default None
        An empty pandas DataFrame that has columns matching the output of the function
        applied to the catalog partition. Other types are accepted to describe the
        output dataframe format, for full details see the dask documentation
        https://blog.dask.org/2022/08/09/understanding-meta-keyword-argument
        If meta is None (default), LSDB will try to work out the output schema of the
        function by calling the function with an empty DataFrame. If the function does
        not work with an empty DataFrame, this will raise an error and meta must be set.
        Note that some operations in LSDB will generate empty partitions, though these
        can be removed by calling the `Catalog.prune_empty_partitions` method.
    **kwargs
        Additional keyword args to pass to the function. These are passed to the Dask
        DataFrame `dask.dataframe.map_partitions` function, so any of the dask
        function's keyword args such as `transform_divisions` will be passed through
        and work as described in the dask documentation
        https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.map_partitions.html

    Returns
    -------
    Catalog
        A Catalog with the same partitions as this catalog, where each partition is
        replaced by the output of `func` applied to it and its aligned map partition.
        The result does not carry this catalog's margin.
    """
    ddf, ddf_map, alignment = merge_map_catalog_data(self, map_catalog, func, *args, meta=meta, **kwargs)
    hc_catalog = self.hc_structure.__class__(
        self.hc_structure.catalog_info,
        alignment.pixel_tree,
        # The output columns come from `func`, so derive the schema from the new data.
        schema=get_arrow_schema(ddf),
        moc=alignment.moc,
    )
    return self._create_updated_dataset(ddf=ddf, ddf_pixel_map=ddf_map, hc_structure=hc_catalog)
def search(self, search: AbstractSearch):
    """Filter rows with a reusable search algorithm.

    Partitions are first narrowed to those matching a coarse criterion, then
    individual points are filtered with a finer one. If this catalog has a
    margin, the same search is applied to it.

    Parameters
    ----------
    search : AbstractSearch
        The search to apply.

    Returns
    -------
    Catalog
        A new Catalog with only the points matching the search parameters.
    """
    result = super().search(search)
    current_margin = self.margin
    if current_margin is None:
        result.margin = None
    else:
        result.margin = current_margin.search(search)
    return result
def map_partitions(
    self,
    func: Callable[..., npd.NestedFrame],
    *args,
    meta: pd.DataFrame | pd.Series | dict | Iterable | tuple | None = None,
    include_pixel: bool = False,
    compute_single_partition: bool = False,
    partition_index: int | HealpixPixel | None = None,
    **kwargs,
) -> Catalog | dd.Series:
    """Apply a function to every partition of the catalog and of its margin.

    The ra and dec of each row are assumed to be left unchanged. When the
    function yields DataFrames, an LSDB Catalog is built and its margin (if any)
    is transformed in the same way; otherwise only the main Dask object is
    returned.

    Parameters
    ----------
    func : Callable
        The function applied to each partition, called as
        `func(partition: npd.NestedFrame, *args, **kwargs)` with the extra args and
        kwargs given to `map_partitions`. If `include_pixel` is set, the partition's
        HEALPix pixel is passed as the second positional argument:
        `func(partition: npd.NestedFrame, healpix_pixel: HealpixPixel, *args, **kwargs)`
    *args
        Additional positional arguments to call `func` with.
    meta : pd.DataFrame | pd.Series | Dict | Iterable | Tuple | None, default None
        An empty pandas DataFrame whose columns match the output of the function
        applied to a partition. Other types are accepted to describe the output
        format, for full details see the dask documentation
        https://blog.dask.org/2022/08/09/understanding-meta-keyword-argument
        If meta is None (default), LSDB will try to work out the output schema by
        calling the function with an empty DataFrame. If the function does not work
        with an empty DataFrame, this will raise an error and meta must be set.
        Note that some operations in LSDB will generate empty partitions, though
        these can be removed by calling the `Catalog.prune_empty_partitions` method.
    include_pixel : bool, default False
        Whether to pass the partition's `HealpixPixel` as the second positional
        argument of the function.
    compute_single_partition : bool, default False
        If true, runs the function on a single partition only, in the local thread
        and without going through dask. This is useful for testing and debugging
        functions on a single partition, as all normal debugging tools can be used.
        The partition is chosen by `partition_index`.
    partition_index : int | HealpixPixel | None, default None
        The partition to compute when `compute_single_partition` is True, given
        either as an index or as a HealpixPixel. If None, defaults to 0.
    **kwargs
        Additional keyword args to pass to the function. These are passed to the
        Dask DataFrame `dask.dataframe.map_partitions` function, so any of the dask
        function's keyword args such as `transform_divisions` will be passed through
        and work as described in the dask documentation
        https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.map_partitions.html

    Returns
    -------
    Catalog | dd.Series
        A new catalog with each partition replaced by the function's output on the
        original partition. If the function returns a non-dataframe output, a dask
        Series is returned instead.

    Examples
    --------
    Apply a function to each partition (e.g., add a derived column):

    >>> import lsdb
    >>> from lsdb.nested.datasets import generate_data
    >>> nf = generate_data(1000, 5, seed=0, ra_range=(0.0, 300.0), dec_range=(-50.0, 50.0))
    >>> catalog = lsdb.from_dataframe(nf.compute()[["ra", "dec", "id"]])
    >>> def add_flag(df):
    ...     return df.assign(in_north=df["dec"] > 0)
    >>> catalog2 = catalog.map_partitions(add_flag)
    >>> catalog2.head()  # doctest: +NORMALIZE_WHITESPACE
                               ra        dec    id  in_north
    _healpix_29
    118362963675428450  52.696686  39.675892  8154      True
    98504457942331510   89.913567  46.147079  3437      True
    70433374600953220   40.528952  35.350965  8214      True
    154968715224527848   17.57041    29.8936  9853      True
    67780378363846894    45.08384   31.95611  8297      True
    """
    result = super().map_partitions(
        func,
        *args,
        meta=meta,
        include_pixel=include_pixel,
        compute_single_partition=compute_single_partition,
        partition_index=partition_index,
        **kwargs,
    )
    # Nothing more to do for non-catalog outputs or when there is no margin.
    if not isinstance(result, Catalog) or self.margin is None:
        return result

    if not compute_single_partition:
        # Transform every margin partition the same way as the main catalog.
        result.margin = self.margin.map_partitions(
            func,
            *args,
            meta=meta,
            include_pixel=include_pixel,
            **kwargs,
        )  # type: ignore[assignment]
        return result

    # Single-partition mode: only the margin partition of the same pixel (if the
    # margin has one) is computed.
    target_pixel = result.get_healpix_pixels()[0]
    if target_pixel in self.margin._ddf_pixel_map:
        margin_index = self.margin.get_partition_index(target_pixel.order, target_pixel.pixel)
        result.margin = self.margin.map_partitions(
            func,
            *args,
            meta=meta,
            include_pixel=include_pixel,
            compute_single_partition=True,
            partition_index=margin_index,
            **kwargs,
        )  # type: ignore[assignment]
    return result
def merge(
    self,
    other: Catalog,
    how: str = "inner",
    on: str | list | None = None,
    left_on: str | list | None = None,
    right_on: str | list | None = None,
    left_index: bool = False,
    right_index: bool = False,
    suffixes: tuple[str, str] | None = None,
) -> NestedFrame:
    """Performs the merge of two catalog Dataframes

    More information about pandas merge is available
    `here <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.merge.html>`__.

    Parameters
    ----------
    other : Catalog
        The right catalog to merge with.
    how : str
        How to handle the merge of the two catalogs.
        One of {'left', 'right', 'outer', 'inner', 'leftsemi'}, default 'inner'
    on : str | List
        Column or index names to join on. Defaults to the intersection of columns
        in both Dataframes if on is None and not merging on indexes.
    left_on : str | List
        Column to join on the left Dataframe. Lists are supported if their length
        is one.
    right_on : str | List
        Column to join on the right Dataframe. Lists are supported if their length
        is one.
    left_index : bool, default False
        Use the index of the left Dataframe as the join key.
    right_index : bool, default False
        Use the index of the right Dataframe as the join key.
    suffixes : tuple[str,str]
        A pair of suffixes to be appended to the end of each column name when
        they are joined. Defaults to using the name of the catalog for the suffix.

    Returns
    -------
    NestedFrame
        A new Dask (Nested) DataFrame containing the data points that result from
        the merge of the two catalogs. Note that this is not a Catalog.

    Raises
    ------
    ValueError
        If `suffixes` does not have exactly two elements.
    """
    if suffixes is None:
        suffixes = _default_suffixes(self.name, other.name)
    if len(suffixes) != 2:
        raise ValueError("`suffixes` must be a tuple with two strings")

    def make_strlist(col: str | list[str] | None) -> list[str]:
        """Normalize a column spec into a new list of column names."""
        if col is None:
            return []
        if isinstance(col, str):
            return [col]
        # Always copy: the result is extended in place below, and the caller's
        # list is also forwarded unchanged to the underlying merge.
        return list(col)

    # Only columns (not indices) used as keys need to be checked against the
    # columns that were not loaded.
    names_to_check = make_strlist(on)
    if not left_index:
        names_to_check += make_strlist(left_on)
    if not right_index:
        names_to_check += make_strlist(right_on)
    self._check_unloaded_columns(names_to_check)

    return self._ddf.merge(
        other._ddf,
        how=how,
        on=on,
        left_on=left_on,
        right_on=right_on,
        left_index=left_index,
        right_index=right_index,
        suffixes=suffixes,
    )
def merge_asof(
    self,
    other: Catalog,
    direction: str = "backward",
    suffixes: tuple[str, str] | None = None,
    output_catalog_name: str | None = None,
    suffix_method: str | None = None,
    log_changes: bool = True,
) -> Catalog:
    """Uses the pandas `merge_asof` function to merge two catalogs on their indices by distance of keys

    Must be along catalog indices, and does not include margin caches, meaning results
    may be incomplete for merging points.

    This function is intended for use in special cases such as Dust Map Catalogs, for
    general merges, the ``crossmatch`` and ``join`` functions should be used.

    Parameters
    ----------
    other : lsdb.Catalog
        The right catalog to merge to
    direction : str, default "backward"
        The direction to perform the merge_asof
    suffixes : tuple[str,str]
        The suffixes to apply to each partition's column names. Defaults to using
        the name of each catalog.
    output_catalog_name : str
        The name of the resulting catalog to be stored in metadata. Defaults to
        "{left_name}_merge_asof_{right_name}".
    suffix_method : str, default "all_columns"
        Method to use to add suffixes to columns. Options are:

        - "overlapping_columns": only add suffixes to columns that are present in both catalogs
        - "all_columns": add suffixes to all columns from both catalogs

        .. warning:: This default will change to "overlapping_columns" in a future release.

    log_changes : bool, default True
        If True, logs an info message for each column that is being renamed.
        This only applies when suffix_method is 'overlapping_columns'.

    Returns
    -------
    Catalog
        A new catalog with the columns from each of the input catalogs with their
        respective suffixes added, and the rows merged using merge_asof on the
        specified columns.

    Warns
    -----
    FutureWarning
        If `suffix_method` is not specified.

    Raises
    ------
    ValueError
        If `suffixes` does not have exactly two elements.
    """
    if suffixes is None:
        suffixes = _default_suffixes(self.name, other.name)
    if len(suffixes) != 2:
        raise ValueError("`suffixes` must be a tuple with two strings")
    if suffix_method is None:
        suffix_method = DEFAULT_SUFFIX_METHOD
        warnings.warn(
            "The default suffix behavior will change from applying suffixes to all columns to only "
            "applying suffixes to overlapping columns in a future release. "
            "To maintain the current behavior, explicitly set `suffix_method='all_columns'`. "
            "To change to the new behavior, set `suffix_method='overlapping_columns'`.",
            FutureWarning,
            stacklevel=2,
        )

    ddf, ddf_map, alignment = merge_asof_catalog_data(
        self,
        other,
        suffixes=suffixes,
        direction=direction,
        suffix_method=suffix_method,
        log_changes=log_changes,
    )

    if output_catalog_name is None:
        output_catalog_name = (
            f"{self.hc_structure.catalog_info.catalog_name}_merge_asof_"
            f"{other.hc_structure.catalog_info.catalog_name}"
        )

    new_catalog_info = create_merged_catalog_info(
        self,
        other,
        output_catalog_name,
        suffixes,
        suffix_method,
    )
    hc_catalog = hc.catalog.Catalog(
        new_catalog_info, alignment.pixel_tree, schema=get_arrow_schema(ddf), moc=alignment.moc
    )
    return Catalog(ddf, ddf_map, hc_catalog)
def join(
    self,
    other: Catalog,
    left_on: str | None = None,
    right_on: str | None = None,
    through: AssociationCatalog | None = None,
    suffixes: tuple[str, str] | None = None,
    output_catalog_name: str | None = None,
    suffix_method: str | None = None,
    log_changes: bool = True,
) -> Catalog:
    """Perform a spatial join to another catalog

    Joins two catalogs together on a shared column value, merging rows where they match.

    This is an inner join: only rows with matching join keys are returned (unmatched rows are dropped).

    The operation only joins data from matching partitions, and does not join rows that have a matching
    column value but are in separate partitions in the sky. For a more general join, see the `merge`
    function.

    Parameters
    ----------
    other : Catalog
        The right catalog to join to
    left_on : str
        The name of the column in the left catalog to join on. Required unless ``through`` is set.
    right_on : str
        The name of the column in the right catalog to join on. Required unless ``through`` is set.
    through : AssociationCatalog
        An association catalog that provides the alignment between pixels and individual rows.
        When given, ``left_on`` and ``right_on`` are not used.
    suffixes : tuple[str,str] or None
        Suffixes to apply to the columns of each table. If None, a default pair is derived from the
        names of the two catalogs.
    output_catalog_name : str or None
        The name of the resulting catalog to be stored in metadata. If None, the left catalog's
        name is used.
    suffix_method : str, default "all_columns"
        Method to use to add suffixes to columns. Options are:

        - "overlapping_columns": only add suffixes to columns that are present in both catalogs
        - "all_columns": add suffixes to all columns from both catalogs

        .. warning:: This default will change to "overlapping_columns" in a future release.
            Leaving this argument unset emits a ``FutureWarning``.

    log_changes : bool, default True
        If True, logs an info message for each column that is being renamed.
        This only applies when suffix_method is 'overlapping_columns'.

    Returns
    -------
    Catalog
        A new catalog with the columns from each of the input catalogs with their respective suffixes
        added, and the rows merged on the specified columns.

    Raises
    ------
    ValueError
        If ``suffixes`` does not contain exactly two entries, if ``through`` is not given and either
        ``left_on`` or ``right_on`` is missing, or if a join column is not present in its catalog.

    Examples
    --------
    Join two catalogs on a shared key within the same sky partitions:

    >>> import lsdb
    >>> from lsdb.nested.datasets import generate_data
    >>> nf = generate_data(1000, 5, seed=0, ra_range=(0.0, 300.0), dec_range=(-50.0, 50.0))
    >>> base = lsdb.from_dataframe(nf.compute()[["ra", "dec", "id"]])
    >>> left = base.rename({"ra": "ra_left", "dec": "dec_left"})
    >>> right = base.rename({"ra": "ra_right", "dec": "dec_right", "id": "id_right"}).map_partitions(
    ...     lambda df: df.assign(right_flag=True)
    ... )
    >>> joined = left.join(right, left_on="id", right_on="id_right", suffix_method="overlapping_columns")
    >>> joined.head()[  # doctest: +NORMALIZE_WHITESPACE
    ...     ["ra_left", "dec_left", "id", "right_flag"]
    ... ]
                          ra_left   dec_left    id  right_flag
    _healpix_29
    118362963675428450  52.696686  39.675892  8154        True
    98504457942331510   89.913567  46.147079  3437        True
    70433374600953220   40.528952  35.350965  8214        True
    154968715224527848   17.57041    29.8936  9853        True
    67780378363846894    45.08384   31.95611  8297        True
    """
    if suffixes is None:
        suffixes = _default_suffixes(self.name, other.name)
    if len(suffixes) != 2:
        raise ValueError("`suffixes` must be a tuple with two strings")
    if suffix_method is None:
        suffix_method = DEFAULT_SUFFIX_METHOD
        # stacklevel=2 attributes the warning to the caller's line rather than to lsdb internals.
        warnings.warn(
            "The default suffix behavior will change from applying suffixes to all columns to only "
            "applying suffixes to overlapping columns in a future release. "
            "To maintain the current behavior, explicitly set `suffix_method='all_columns'`. "
            "To change to the new behavior, set `suffix_method='overlapping_columns'`.",
            FutureWarning,
            stacklevel=2,
        )
    if through is not None:
        ddf, ddf_map, alignment = join_catalog_data_through(
            self, other, through, suffixes, suffix_method=suffix_method, log_changes=log_changes
        )
    else:
        if left_on is None or right_on is None:
            raise ValueError("Either both of left_on and right_on, or through must be set")
        # Each key is checked against the catalog that owns it: the right key must be looked up in
        # ``other``, not in ``self`` (matches the checks done by ``join_nested``).
        self._check_unloaded_columns([left_on])
        if left_on not in self._ddf.columns:
            raise ValueError("left_on must be a column in the left catalog")
        other._check_unloaded_columns([right_on])
        if right_on not in other._ddf.columns:
            raise ValueError("right_on must be a column in the right catalog")
        ddf, ddf_map, alignment = join_catalog_data_on(
            self, other, left_on, right_on, suffixes, suffix_method=suffix_method, log_changes=log_changes
        )

    if output_catalog_name is None:
        output_catalog_name = self.hc_structure.catalog_info.catalog_name

    new_catalog_info = create_merged_catalog_info(
        self,
        other,
        output_catalog_name,
        suffixes,
        suffix_method,
    )
    hc_catalog = hc.catalog.Catalog(
        new_catalog_info, alignment.pixel_tree, schema=get_arrow_schema(ddf), moc=alignment.moc
    )
    return Catalog(ddf, ddf_map, hc_catalog)
def join_nested(
    self,
    other: Catalog,
    left_on: str | None = None,
    right_on: str | None = None,
    nested_column_name: str | None = None,
    output_catalog_name: str | None = None,
    how: str = "inner",
) -> Catalog:
    """Perform a spatial join to another catalog by adding the other catalog as a nested column

    Rows of the two catalogs are matched on a shared column value. Instead of widening the
    result with the right catalog's columns, the matching right-hand rows are packed into a
    single nested dataframe column (see `nested-dask <https://github.com/lincc-frameworks/nested-dask>`__
    and its `documentation <https://nested-dask.readthedocs.io/en/latest/>`__ for the nested
    data model).

    Only data from matching partitions and their margin caches is joined; rows whose key values
    match but which live in different sky partitions are not joined. For a more general join,
    see the `merge` function.

    Parameters
    ----------
    other : Catalog
        The right catalog to join to
    left_on : str
        The name of the column in the left catalog to join on (required)
    right_on : str
        The name of the column in the right catalog to join on (required)
    nested_column_name : str
        The name of the nested column in the resulting dataframe storing the joined columns in
        the right catalog. (Default: name of right catalog)
    output_catalog_name : str or None
        The name of the resulting catalog to be stored in metadata. If None, the left catalog's
        name is used.
    how : str, {'inner', 'left'}, default 'inner'
        How to handle the alignment

    Returns
    -------
    Catalog
        A new catalog containing the left catalog's columns plus one nested column holding the
        matched rows of the right catalog.

    Raises
    ------
    ValueError
        If ``left_on`` or ``right_on`` is missing, or is not a column of its catalog.
    """
    if left_on is None or right_on is None:
        raise ValueError("Both of left_on and right_on must be set")

    # Validate each key against the catalog it belongs to, left first, then right.
    key_checks = (
        (self, left_on, "left_on must be a column in the left catalog"),
        (other, right_on, "right_on must be a column in the right catalog"),
    )
    for owner, key, error_message in key_checks:
        owner._check_unloaded_columns([key])
        if key not in owner._ddf.columns:
            raise ValueError(error_message)

    ddf, ddf_map, alignment = join_catalog_data_nested(
        self, other, left_on, right_on, nested_column_name=nested_column_name, how=how
    )

    resolved_name = (
        output_catalog_name
        if output_catalog_name is not None
        else self.hc_structure.catalog_info.catalog_name
    )
    new_catalog_info = self.hc_structure.catalog_info.copy_and_update(
        catalog_name=resolved_name, total_rows=None
    )
    arrow_schema = get_arrow_schema(ddf)
    structure = hc.catalog.Catalog(
        new_catalog_info, alignment.pixel_tree, schema=arrow_schema, moc=alignment.moc
    )
    return Catalog(ddf, ddf_map, structure)
def nest_lists(
    self,
    base_columns: list[str] | None = None,
    list_columns: list[str] | None = None,
    name: str = "nested",
) -> Catalog:
    """Creates a new catalog with a set of list columns packed into a nested column.

    The same packing is applied to the margin catalog, when one is attached.

    Parameters
    ----------
    base_columns : list-like or None
        Any columns that have non-list values in the input catalog. These will
        simply be kept as identical columns in the result. If None, is inferred
        to be all columns in the input catalog that are not considered list-value
        columns.
    list_columns : list-like or None
        The list-value columns that should be packed into a nested column.
        All columns in the list will attempt to be packed into a single
        nested column with the name provided in ``name``. All columns in
        list_columns must have pyarrow list dtypes, otherwise the operation
        will fail. If None, is defined as all columns not in ``base_columns``.
    name : str, default "nested"
        The name of the nested column created from ``list_columns``.

    Returns
    -------
    Catalog
        A new catalog with specified list columns nested into a new nested column.

    Notes
    -----
    As noted above, all columns in `list_columns` must have a pyarrow
    ListType dtype. This is needed for proper meta propagation. To convert
    a list column to this dtype, you can use this command structure::

        nf= nf.astype({"colname": pd.ArrowDtype(pa.list_(pa.int64()))})

    Where pa.int64 above should be replaced with the correct dtype of the
    underlying data accordingly.

    Additionally, it's a known issue in Dask
    (https://github.com/dask/dask/issues/10139) that columns with list
    values will by default be converted to the string type. This will
    interfere with the ability to recast these to pyarrow lists. We
    recommend setting the following dask config setting to prevent this::

        dask.config.set({"dataframe.convert-string":False})
    """
    # Identical arguments are used for the main catalog and its margin.
    nest_args = {"base_columns": base_columns, "list_columns": list_columns, "name": name}
    nested_catalog = super().nest_lists(**nest_args)
    if self.margin is not None:
        nested_catalog.margin = self.margin.nest_lists(**nest_args)
    return nested_catalog
def map_rows(
    self,
    func,
    columns=None,
    *,
    row_container="dict",
    output_names=None,
    infer_nesting=True,
    append_columns=False,
    meta=None,
    **kwargs,
) -> Catalog:
    """Takes a function and applies it to each top-level row of the Catalog.

    docstring copied from nested-pandas

    Nested columns are packaged alongside base columns and available for function use, where base columns
    are passed as scalars and nested columns are passed as numpy arrays. The way in which the row data is
    packaged is configurable (by default, a dictionary) and controlled by the `row_container` argument.

    The same function, with the same arguments, is also applied to the margin catalog when one is
    attached.

    Parameters
    ----------
    func : callable
        Function to apply to each nested dataframe. The first arguments to `func` should be which
        columns to apply the function to. See the Notes for recommendations
        on writing func outputs.
    columns : None | str | list of str, default None
        Specifies which columns to pass to the function in the row_container format.
        If None, all columns are passed. If list of str, those columns are passed.
        If str, a single column is passed or if the string is a nested column, then all nested sub-columns
        are passed (e.g. columns="nested" passes all columns of the nested dataframe "nested"). To pass
        individual nested sub-columns, use the hierarchical column name (e.g. columns=["nested.t",...]).
    row_container : 'dict' or 'args', default 'dict'
        Specifies how the row data will be packaged when passed as an input to the function.
        If 'dict', the function will be called as `func({"col1": value, ...}, **kwargs)`, so func should
        expect a single dictionary input with keys corresponding to column names.
        If 'args', the function will be called as `func(value, ..., **kwargs)`, so func should expect
        positional arguments corresponding to the columns specified in `columns`.
    output_names : None | str | list of str
        Specifies the names of the output columns in the resulting NestedFrame. If None, the function
        will return whatever names the user function returns. If specified will override any names
        returned by the user function provided the number of names matches the number of outputs. When not
        specified and the user function returns values without names (e.g. a list or tuple), the output
        columns will be enumerated (e.g. "0", "1", ...). (Default value = None)
    infer_nesting : bool, default True
        If True, the function will pack output columns into nested
        structures based on column names adhering to a nested naming
        scheme. E.g. "nested.b" and "nested.c" will be packed into a column
        called "nested" with columns "b" and "c". If False, all outputs
        will be returned as base columns. Note that this will trigger off of names specified in
        `output_names` in addition to names returned by the user function.
    append_columns : bool, default False
        if True, the output columns should be appended to those in the original NestedFrame
    meta : dataframe or series-like, optional, default None
        The dask meta of the output. If append_columns is True, the meta should specify just the
        additional columns output by func.
    kwargs : keyword arguments, optional
        Keyword arguments to pass to the function.

    Returns
    -------
    `Catalog`
        `Catalog` with the results of the function applied to the columns of the frame.

    Notes
    -----
    If concerned about performance, specify `columns` to only include the columns
    needed for the function, as this will avoid the overhead of packaging
    all columns for each row.

    By default, `map_rows` will produce a `NestedFrame` with enumerated
    column names for each returned value of the function. It's recommended
    to either specify `output_names` or have `func` return a dictionary
    where each key is an output column of the dataframe returned by
    `map_rows` (as shown in the examples below).

    Examples
    --------
    >>> import numpy as np
    >>> import lsdb
    >>> import pandas as pd
    >>> catalog = lsdb.from_dataframe(pd.DataFrame({"ra":[0, 10], "dec":[5, 15],
    ...                                             "mag":[21, 22], "mag_err":[.1, .2]}))

    Writing a function that takes a row as a dictionary:

    >>> def my_sigma(row):
    ...     '''map_rows will return a NestedFrame with two columns'''
    ...     return row["mag"] + row["mag_err"], row["mag"] - row["mag_err"]
    >>> meta = {"plus_one": np.float64, "minus_one": np.float64}
    >>> catalog.map_rows(my_sigma,
    ...                  columns=["mag","mag_err"],
    ...                  output_names=["plus_one", "minus_one"],
    ...                  meta=meta).compute().reset_index()  # doctest: +NORMALIZE_WHITESPACE
               _healpix_29  plus_one  minus_one
    0  1372475556631677955      21.1       20.9
    1  1389879706834706546      22.2       21.8

    Writing the same function using positional arguments:

    >>> def my_sigma(col1, col2):
    ...     '''map_rows will return a NestedFrame with two columns'''
    ...     return col1 + col2, col1 - col2
    >>> meta = {"plus_one": np.float64, "minus_one": np.float64}
    >>> catalog.map_rows(my_sigma,
    ...                  columns=["mag","mag_err"],
    ...                  row_container='args', # send columns as positional args
    ...                  output_names=["plus_one", "minus_one"],
    ...                  meta=meta).compute().reset_index()  # doctest: +NORMALIZE_WHITESPACE
               _healpix_29  plus_one  minus_one
    0  1372475556631677955      21.1       20.9
    1  1389879706834706546      22.2       21.8

    See more examples in the nested-pandas documentation.
    """
    # Collected once so the main catalog and its margin receive exactly the same options.
    # None of these keys can appear in **kwargs, since they are bound to named parameters.
    row_options = {
        "columns": columns,
        "row_container": row_container,
        "output_names": output_names,
        "infer_nesting": infer_nesting,
        "append_columns": append_columns,
        "meta": meta,
    }
    mapped = super().map_rows(func, **row_options, **kwargs)
    if self.margin is not None:
        mapped.margin = self.margin.map_rows(func, **row_options, **kwargs)
    return mapped
@deprecated(
    version="0.7.3", reason="`to_hats` will be removed in the future, " "use `write_catalog` instead."
)
def to_hats(
    self,
    base_catalog_path: str | Path | UPath,
    *,
    catalog_name: str | None = None,
    default_columns: list[str] | None = None,
    as_collection: bool = True,
    overwrite: bool = False,
    error_if_empty: bool = True,
    **kwargs,
):
    """Save the catalog to disk in the HATS format. See write_catalog().

    Deprecated alias kept for backward compatibility: every argument, including any extra
    keyword arguments, is forwarded unchanged to ``write_catalog``.
    """
    forwarded_options = {
        "catalog_name": catalog_name,
        "default_columns": default_columns,
        "as_collection": as_collection,
        "overwrite": overwrite,
        "error_if_empty": error_if_empty,
    }
    self.write_catalog(base_catalog_path, **forwarded_options, **kwargs)
def write_catalog(
    self,
    base_catalog_path: str | Path | UPath,
    *,
    catalog_name: str | None = None,
    default_columns: list[str] | None = None,
    as_collection: bool = True,
    overwrite: bool = False,
    create_thumbnail: bool = True,
    error_if_empty: bool = True,
    **kwargs,
):
    """Save the catalog to disk in HATS format.

    Parameters
    ----------
    base_catalog_path : str | Path | UPath
        Location where catalog is saved to
    catalog_name : str
        The name of the catalog to be saved
    default_columns : list[str]
        A metadata property with the list of the columns in the catalog to
        be loaded by default. By default, uses the default columns from the
        original hats catalog if they exist.
    as_collection : bool, default True
        If True, saves the catalog and its margin as a collection. Otherwise the
        catalog is written on its own through the base dataset writer.
    overwrite : bool, default False
        If True existing catalog is overwritten
    create_thumbnail : bool, default True
        Forwarded to the underlying writer; presumably controls whether a thumbnail
        of the data is written alongside the catalog (TODO confirm in the writer).
    error_if_empty : bool, default True
        If True, raises an error if the catalog is empty.
    **kwargs
        Arguments to pass to the parquet write operations

    Examples
    --------
    Write a small synthetic catalog to disk:

    >>> import lsdb
    >>> from lsdb.nested.datasets import generate_data
    >>> nf = generate_data(1000, 5, seed=0, ra_range=(0.0, 300.0), dec_range=(-50.0, 50.0))
    >>> catalog = lsdb.from_dataframe(nf.compute()[["ra", "dec", "id"]], catalog_name="demo")
    >>> catalog.write_catalog(<your path here> / "demo_catalog", overwrite=True) # doctest: +SKIP
    """
    # Options shared by both the collection writer and the single-catalog writer.
    writer_options = {
        "catalog_name": catalog_name,
        "default_columns": default_columns,
        "overwrite": overwrite,
        "error_if_empty": error_if_empty,
        "create_thumbnail": create_thumbnail,
    }
    if not as_collection:
        super().write_catalog(base_catalog_path, **writer_options, **kwargs)
        return
    self._check_unloaded_columns(default_columns)
    io.to_collection(self, base_collection_path=base_catalog_path, **writer_options, **kwargs)