Source code for lsdb.loaders.hats.read_hats

from __future__ import annotations

from pathlib import Path
from typing import Callable

import hats as hc
import nested_pandas as npd
import numpy as np
import pyarrow as pa
from fsspec.implementations.http import HTTPFileSystem
from hats.catalog import CatalogType
from hats.catalog.healpix_dataset.healpix_dataset import HealpixDataset as HCHealpixDataset
from hats.io.file_io import file_io
from hats.pixel_math import HealpixPixel
from hats.pixel_math.healpix_pixel_function import get_pixel_argsort
from hats.pixel_math.spatial_index import SPATIAL_INDEX_COLUMN
from nested_pandas.nestedframe.io import from_pyarrow
from upath import UPath

import lsdb.nested as nd
from lsdb.catalog.association_catalog import AssociationCatalog
from lsdb.catalog.catalog import Catalog, DaskDFPixelMap, MarginCatalog
from lsdb.catalog.dataset.healpix_dataset import HealpixDataset
from lsdb.catalog.map_catalog import MapCatalog
from lsdb.catalog.margin_catalog import _validate_margin_catalog
from lsdb.core.search.abstract_search import AbstractSearch
from lsdb.dask.divisions import get_pixels_divisions
from lsdb.io.schema import get_arrow_schema
from lsdb.loaders.hats.hats_loading_config import HatsLoadingConfig

MAX_PYARROW_FILTERS = 10


[docs] def open_catalog( path: str | Path | UPath, search_filter: AbstractSearch | None = None, columns: list[str] | str | None = None, margin_cache: str | Path | UPath | None = None, error_empty_filter: bool = True, filters: list[tuple[str]] | None = None, path_generator: Callable[[UPath, HealpixPixel, dict | None, str], UPath] = hc.io.pixel_catalog_file, **kwargs, ) -> Catalog: """Open a catalog from a HATS path. Catalogs exist in collections or stand-alone. Catalogs in a HATS collection are composed of a main catalog, and margin and index catalogs. LSDB will open exactly ONE main object catalog and at most ONE margin catalog. The `collection.properties` file specifies which margins and indexes are available, and which margin to use by default:: my_collection_dir/ ├── main_catalog/ ├── margin_catalog/ ├── margin_catalog_2/ ├── index_catalog/ ├── collection.properties All arguments passed to the `open_catalog` call are applied to the calls to open the main and margin catalogs. Typical usage example, where we open a collection with a subset of columns:: lsdb.open_catalog(path='./my_collection_dir', columns=['ra','dec']) Typical usage example, where we open a collection from a cone search:: lsdb.open_catalog( path='./my_collection_dir', columns=['ra','dec'], search_filter=lsdb.ConeSearch(ra, dec, radius_arcsec), ) Typical usage example, where we open a collection with a non-default margin:: lsdb.open_catalog(path='./my_collection_dir', margin_cache='margin_catalog_2') Note that this margin still needs to be specified in the `all_margins` attribute of the `collection.properties` file. We can also open each catalog separately, if needed:: lsdb.open_catalog(path='./my_collection_dir/main_catalog') Parameters ---------- path : path-like The path that locates the root of the HATS collection or stand-alone catalog. search_filter : type[AbstractSearch] or None, default None The spatial filter method to be applied. columns : list[str] or str or None, default None The set of columns to filter the catalog on. If None, the catalog's default columns will be loaded. To load all catalog columns, use `columns="all"`. margin_cache : path-like or None, default None The margin for the main catalog, provided as a path. error_empty_filter : bool, default True If loading the catalog with a filter results in an empty catalog, throw error. filters : list[tuple[str]] or None, default None Filters to apply when reading parquet files. These may be applied as pyarrow filters or URL parameters. path_generator : Callable[[UPath, HealpixPixel, dict | None, str], UPath], optional The function `f(catalog_base_dir, pixel, query_params, npix_suffix)` that translates HEALPix into partition data paths. Its arguments are the following: - catalog_base_dir: UPath - path passed to `open_catalog`/`read_hats` - pixel: HealpixPixel - pixel to generate path for - query_params: dict | None - dictionary used to generate HTTP query string - npix_suffix: str - "/" for leaf directory, filename suffix like ".parquet" for leaf file The catalog metadata files need to live where the HATS standard expects them. Defaults to `hats.io.pixel_catalog_file`. **kwargs Arguments to pass to the pandas parquet file reader Returns ------- Catalog The catalog loaded according to the specified arguments. """ hc_catalog = hc.read_hats(path) if not isinstance(hc_catalog, (hc.catalog.CatalogCollection, hc.catalog.Catalog)): raise TypeError("To load auxiliary datasets please use `lsdb.read_hats()`") return _read_dataset( hc_catalog, search_filter=search_filter, columns=columns, margin_cache=margin_cache, error_empty_filter=error_empty_filter, filters=filters, path_generator=path_generator, **kwargs, )
def read_hats( path: str | Path | UPath, search_filter: AbstractSearch | None = None, columns: list[str] | str | None = None, margin_cache: str | Path | UPath | None = None, error_empty_filter: bool = True, filters: list[tuple[str]] | None = None, path_generator: Callable[[UPath, HealpixPixel, dict | None, str], UPath] = hc.io.pixel_catalog_file, **kwargs, ) -> HealpixDataset: """Load dataset from a HATS path. Use this method to load auxiliary (margin, association, map) datasets. Parameters ---------- path : path-like The path that locates the root of the HATS collection or stand-alone catalog. search_filter : type[AbstractSearch] or None, default None The spatial filter method to be applied. columns : list[str] or str or None, default None The set of columns to filter the catalog on. If None, the catalog's default columns will be loaded. To load all catalog columns, use `columns="all"`. margin_cache : path-like or None, default None The margin for the main catalog, provided as a path. error_empty_filter : bool, default True If loading the catalog with a filter results in an empty catalog, throw error. filters : list[tuple[str]] or None, default None Filters to apply when reading parquet files. These may be applied as pyarrow filters or URL parameters. path_generator : Callable[[UPath, HealpixPixel, dict | None, str], UPath], optional The function `f(catalog_base_dir, pixel, query_params, npix_suffix)` that translates HEALPix into partition data paths. Its arguments are the following: - catalog_base_dir: UPath - path passed to `open_catalog`/`read_hats` - pixel: HealpixPixel - pixel to generate path for - query_params: dict | None - dictionary used to generate HTTP query string - npix_suffix: str - "/" for leaf directory, filename suffix like ".parquet" for leaf file The catalog metadata files need to live where the HATS standard expects them. Defaults to `hats.io.pixel_catalog_file`. **kwargs Arguments to pass to the pandas parquet file reader Returns ------- HealpixDataset A valid HATS dataset. """ hc_catalog = hc.read_hats(path) return _read_dataset( hc_catalog, search_filter=search_filter, columns=columns, margin_cache=margin_cache, error_empty_filter=error_empty_filter, filters=filters, path_generator=path_generator, **kwargs, ) def _read_dataset( hc_catalog: hc.catalog.CatalogCollection | hc.catalog.Dataset, *, search_filter: AbstractSearch | None = None, columns: list[str] | str | None = None, margin_cache: str | Path | UPath | None = None, error_empty_filter: bool = True, filters: list[tuple[str]] | None = None, path_generator: Callable[[UPath, HealpixPixel, dict | None, str], UPath] = hc.io.pixel_catalog_file, **kwargs, ): """Internal method to read any HATS collection/dataset""" config = HatsLoadingConfig( search_filter=search_filter, columns=columns, error_empty_filter=error_empty_filter, margin_cache=margin_cache, filters=filters, path_generator=path_generator, kwargs=kwargs, ) if isinstance(hc_catalog, hc.catalog.CatalogCollection): config.margin_cache = _get_collection_margin(hc_catalog, margin_cache) catalog = _load_catalog(hc_catalog.main_catalog, config) catalog.hc_collection = hc_catalog # type: ignore[attr-defined] else: catalog = _load_catalog(hc_catalog, config) return catalog def _get_collection_margin( collection: hc.catalog.CatalogCollection, margin_cache: str | Path | UPath | None ) -> UPath | None: """The path to the collection margin. The `margin_cache` should be provided as: - An identifier to the margin catalog name (it needs to be a string and be specified in the `all_margins` attribute of the `collection.properties`). - The absolute path to a margin, hosted locally or remote. By default, if no `margin_cache` is provided, the absolute path to the default collection margin is returned. """ if margin_cache is None: return collection.default_margin_catalog_dir margin_cache = file_io.get_upath(margin_cache) if margin_cache.path in collection.all_margins: return collection.collection_path / margin_cache.path return margin_cache def _load_catalog(hc_catalog: hc.catalog.Dataset, config: HatsLoadingConfig) -> HealpixDataset: config.set_columns_from_catalog_info(hc_catalog.catalog_info) if hc_catalog.schema is None: raise ValueError( "The catalog schema could not be loaded from metadata." " Ensure your catalog has _common_metadata or _metadata files" ) catalog_type = hc_catalog.catalog_info.catalog_type if catalog_type in (CatalogType.OBJECT, CatalogType.SOURCE): catalog = _load_object_catalog(hc_catalog, config) elif catalog_type == CatalogType.MARGIN: catalog = _load_margin_catalog(hc_catalog, config) elif catalog_type == CatalogType.ASSOCIATION: catalog = _load_association_catalog(hc_catalog, config) elif catalog_type == CatalogType.MAP: catalog = _load_map_catalog(hc_catalog, config) else: raise NotImplementedError(f"Cannot load catalog of type {catalog_type}") if ( config.search_filter is not None and len(catalog.get_healpix_pixels()) == 0 and config.error_empty_filter ): raise ValueError("The selected sky region has no coverage") catalog.hc_structure = _update_hc_structure(catalog) if isinstance(catalog, Catalog) and catalog.margin is not None: catalog.margin.hc_structure = _update_hc_structure(catalog.margin) return catalog def _update_hc_structure(catalog: HealpixDataset): """Create the modified schema of the catalog after all the processing on the `read_hats` call""" # pylint: disable=protected-access default_columns = None if catalog.hc_structure.catalog_info.default_columns is not None: default_columns = [ col for col in catalog.hc_structure.catalog_info.default_columns if col in catalog._ddf.exploded_columns ] return catalog._create_modified_hc_structure( updated_schema=get_arrow_schema(catalog._ddf), default_columns=default_columns, ) def _load_association_catalog(hc_catalog, config): """Load a catalog from the configuration specified when the loader was created Returns ------- AssociationCatalog Catalog object with data from the source given at loader initialization """ if hc_catalog.catalog_info.contains_leaf_files: dask_df, dask_df_pixel_map = _load_dask_df_and_map(hc_catalog, config) else: dask_meta_schema = _load_dask_meta_schema(hc_catalog, config) dask_df = nd.NestedFrame.from_single_partition(dask_meta_schema) dask_df_pixel_map = {} return AssociationCatalog(dask_df, dask_df_pixel_map, hc_catalog, loading_config=config) def _load_margin_catalog(hc_catalog, config): """Load a catalog from the configuration specified when the loader was created Returns ------- MarginCatalog Catalog object with data from the source given at loader initialization """ if config.search_filter: hc_catalog = config.search_filter.filter_hc_catalog(hc_catalog) pyarrow_filter = _generate_pyarrow_filters_from_moc(hc_catalog) if len(pyarrow_filter) > 0 and not config.filters: config.filters = pyarrow_filter dask_df, dask_df_pixel_map = _load_dask_df_and_map(hc_catalog, config) margin = MarginCatalog(dask_df, dask_df_pixel_map, hc_catalog, loading_config=config) if config.search_filter is not None: margin = margin.search(config.search_filter) return margin def _load_object_catalog(hc_catalog, config): """Load a catalog from the configuration specified when the loader was created Returns ------- Catalog Catalog object with data from the source given at loader initialization """ if config.search_filter: hc_catalog = config.search_filter.filter_hc_catalog(hc_catalog) if len(hc_catalog.get_healpix_pixels()) == 0 and config.error_empty_filter: raise ValueError("The selected sky region has no coverage") pyarrow_filter = _generate_pyarrow_filters_from_moc(hc_catalog) if len(pyarrow_filter) > 0 and not config.filters: config.filters = pyarrow_filter dask_df, dask_df_pixel_map = _load_dask_df_and_map(hc_catalog, config) catalog = Catalog(dask_df, dask_df_pixel_map, hc_catalog, loading_config=config) if config.search_filter is not None: catalog = catalog.search(config.search_filter) if config.margin_cache is not None: margin_hc_catalog = hc.read_hats(config.margin_cache) margin = _load_margin_catalog(margin_hc_catalog, config) _validate_margin_catalog(margin, catalog) catalog.margin = margin return catalog def _generate_pyarrow_filters_from_moc(filtered_catalog): pyarrow_filter = [] if not ( filtered_catalog.has_healpix_column() and filtered_catalog.catalog_info.healpix_column in filtered_catalog.schema.names ): return pyarrow_filter healpix_column = filtered_catalog.catalog_info.healpix_column healpix_order = filtered_catalog.catalog_info.healpix_order if filtered_catalog.moc is not None: moc = ( filtered_catalog.moc if healpix_order >= filtered_catalog.moc.max_order else filtered_catalog.moc.degrade_to_order(healpix_order) ) depth_array = moc.to_depth29_ranges depth_array = depth_array >> (2 * (29 - healpix_order)) if len(depth_array) > MAX_PYARROW_FILTERS: starts = depth_array.T[0] ends = depth_array.T[1] diffs = starts[1:] - ends[:-1] max_diff_inds = np.argpartition(diffs, -MAX_PYARROW_FILTERS)[-MAX_PYARROW_FILTERS:] max_diff_inds = np.sort(max_diff_inds) reduced_filters = [] for i_start, i_end in zip(np.concat(([0], max_diff_inds)), np.concat((max_diff_inds, [-1]))): reduced_filters.append([starts[i_start], ends[i_end]]) depth_array = np.array(reduced_filters) for hpx_range in depth_array: pyarrow_filter.append([(healpix_column, ">=", hpx_range[0]), (healpix_column, "<", hpx_range[1])]) return pyarrow_filter def _load_map_catalog(hc_catalog, config): """Load a catalog from the configuration specified when the loader was created Returns ------- MapCatalog Catalog object with data from the source given at loader initialization """ dask_df, dask_df_pixel_map = _load_dask_df_and_map(hc_catalog, config) return MapCatalog(dask_df, dask_df_pixel_map, hc_catalog) def _load_dask_meta_schema(hc_catalog, config) -> npd.NestedFrame: """Loads the Dask meta DataFrame from the parquet _metadata file""" columns = config.columns dask_meta_schema = from_pyarrow(hc_catalog.schema.empty_table()) if not hc_catalog.has_healpix_column(): if columns is not None: dask_meta_schema = dask_meta_schema[columns] return dask_meta_schema healpix_column = hc_catalog.catalog_info.healpix_column if columns is not None and healpix_column not in columns: columns = columns + [healpix_column] if columns is not None: dask_meta_schema = dask_meta_schema[columns] if dask_meta_schema.index.name != healpix_column and healpix_column in dask_meta_schema.columns: dask_meta_schema = dask_meta_schema.set_index(healpix_column) if ( config.columns is not None and healpix_column in config.columns and dask_meta_schema.index.name == healpix_column ): config.columns.remove(healpix_column) return dask_meta_schema def _load_dask_df_and_map(catalog: HCHealpixDataset, config) -> tuple[nd.NestedFrame, DaskDFPixelMap]: """Load Dask DF from parquet files and make dict of HEALPix pixel to partition index""" pixels = catalog.get_healpix_pixels() ordered_pixels = np.array(pixels)[get_pixel_argsort(pixels)] divisions = get_pixels_divisions(ordered_pixels) dask_meta_schema = _load_dask_meta_schema(catalog, config) index_column = dask_meta_schema.index.name query_url_params = None if isinstance(file_io.get_upath(catalog.catalog_base_dir).fs, HTTPFileSystem): query_url_params = config.make_query_url_params() npix_suffix = catalog.catalog_info.npix_suffix if len(ordered_pixels) > 0: ddf = nd.NestedFrame.from_map( read_pixel, ordered_pixels, path_generator=config.path_generator, catalog_base_dir=catalog.catalog_base_dir, npix_suffix=npix_suffix, query_url_params=query_url_params, columns=config.columns, schema=catalog.schema, filters=config.filters, index_column=index_column, divisions=divisions, meta=dask_meta_schema, is_dir=(npix_suffix == "/"), **config.kwargs, ) else: ddf = nd.NestedFrame.from_single_partition(dask_meta_schema) pixel_to_index_map = {pixel: index for index, pixel in enumerate(ordered_pixels)} return ddf, pixel_to_index_map def read_pixel( pixel: HealpixPixel, *, path_generator: Callable[[UPath, HealpixPixel, dict | None, str], UPath], catalog_base_dir: UPath, npix_suffix: str, query_url_params: dict | None = None, index_column: str = SPATIAL_INDEX_COLUMN, columns: list[str] | str | None = None, schema: pa.Schema | None = None, is_dir: bool = False, **kwargs, ) -> npd.NestedFrame: """Utility method to read a single pixel's parquet file from disk. NB: `columns` is necessary as an argument, even if None, so that dask-expr optimizes the execution plan. Parameters ---------- pixel : HealpixPixel The HEALPix file whose file is to be read. path_generator : Callable[[UPath, HealpixPixel, dict | None, str], UPath] The object that translates HEALPix to their respective files. index_column : str, default SPATIAL_INDEX_COLUMN The index column. columns: list[str] or str or None, default None The columns to load. schema: pa.Schema or None, default None The pyarrow schema expected for the file. is_dir : bool, optional (Default value = False) If True, the pixel data is stored in a directory. Returns ------- npd.NestedFrame The pixel data, as read from its parquet file. """ return _read_parquet_file( path_generator( catalog_base_dir, pixel, query_url_params, npix_suffix, ), columns=columns, schema=schema, index_column=index_column, is_dir=is_dir, **kwargs, ) def _read_parquet_file( path: UPath, *, columns=None, schema=None, index_column=None, is_dir=False, **kwargs, ) -> npd.NestedFrame: if ( columns is not None and schema is not None and index_column in schema.names and index_column not in columns ): columns = columns + [index_column] dataframe = file_io.read_parquet_file_to_pandas( path, columns=columns, schema=schema, is_dir=is_dir, **kwargs ) if dataframe.index.name != index_column and index_column in dataframe.columns: dataframe = dataframe.set_index(index_column) return dataframe