CatalogStream

class CatalogStream(catalog: Catalog, client: Client | None = None, partitions_per_chunk: int = 1, shuffle: bool = True, seed: int | None = None)

Stream partitions from a catalog.

Data is prefetched in the background, ‘n_workers’ partitions at a time, where ‘n_workers’ is derived from the client object.

Parameters:
catalog : lsdb.Catalog

A catalog to iterate over.

client : dask.distributed.Client or None

Dask client for distributed computation. If None, partitions are computed synchronously with dask.compute() instead of asynchronously with client.compute().

partitions_per_chunk : int

Number of partitions to yield per chunk. The value is clipped to the total number of partitions. Setting it above 1 holds multiple partitions in memory at once, which increases memory usage.

shuffle : bool

Whether to shuffle the partition order before streaming. If True (the default), the rows within each partition are shuffled as well. If False, the partitions are streamed in their original order.

seed : int or None

Random seed used when shuffling.
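The interaction of partitions_per_chunk, shuffle, and seed can be illustrated with a minimal pure-Python sketch. The helper plan_chunks is hypothetical and is not part of lsdb; it assumes that a final, shorter chunk holds any remainder, which the description above does not specify.

```python
import random


def plan_chunks(n_partitions, partitions_per_chunk=1, shuffle=True, seed=None):
    """Return lists of partition indices, one list per chunk (hypothetical helper)."""
    if partitions_per_chunk < 1:
        raise ValueError("partitions_per_chunk must be at least 1")
    if n_partitions == 0:
        return []
    # Clip the chunk size to the number of partitions available.
    size = min(partitions_per_chunk, n_partitions)
    order = list(range(n_partitions))
    if shuffle:
        # A seeded generator makes the shuffled order reproducible.
        random.Random(seed).shuffle(order)
    # Assumption: a final, shorter chunk holds any remainder.
    return [order[i:i + size] for i in range(0, n_partitions, size)]


chunks = plan_chunks(12, partitions_per_chunk=4, seed=1)
print(len(chunks))  # 3
```

With 12 partitions and 4 partitions per chunk, the plan contains 3 chunks, and repeating the call with the same seed gives the same order.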

Methods

get_next_partitions(partitions_left, rng)

Get the next set of partitions to iterate over.

submit_next_partitions(partitions)

Submit the next set of partitions for computation.
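The select-then-submit pattern suggested by these two methods can be sketched with the standard library alone. This is an illustration under stated assumptions, not lsdb's implementation: ThreadPoolExecutor stands in for a Dask client, and load_partition and stream_chunks are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor


def load_partition(index):
    """Hypothetical stand-in for computing one catalog partition."""
    return [index * 10 + row for row in range(3)]


def stream_chunks(chunks, executor=None):
    """Yield one list of rows per chunk, prefetching the next chunk if possible."""
    if executor is None:
        # Synchronous path: compute each chunk only when it is requested.
        for chunk in chunks:
            yield [row for index in chunk for row in load_partition(index)]
        return

    def submit(chunk):
        # Asynchronous path: schedule every partition of the chunk at once.
        return [executor.submit(load_partition, index) for index in chunk]

    plan = iter(chunks)
    pending = submit(next(plan, []))
    while pending:
        # Queue the following chunk before blocking on the current one.
        upcoming = submit(next(plan, []))
        yield [row for future in pending for row in future.result()]
        pending = upcoming


chunks = [[0, 1], [2, 3], [4]]
with ThreadPoolExecutor(max_workers=2) as pool:
    prefetched = list(stream_chunks(chunks, executor=pool))
print(prefetched == list(stream_chunks(chunks)))  # True
```

Both paths yield the same chunks in the same order. The only difference is that, with an executor, work on the next chunk starts while the current one is being consumed.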

Examples

Consider a toy catalog, which contains 12 data partitions:

>>> import lsdb
>>> from lsdb.streams import CatalogStream
>>> cat = lsdb.generate_catalog(500, 10, seed=1)
>>> cat.npartitions
12

The following streams the catalog in chunks of 4 randomly selected partitions, so the 12 partitions yield 3 chunks:

>>> cat_stream = CatalogStream(catalog=cat, partitions_per_chunk=4, seed=1)
>>> for chunk in cat_stream:
...     print(len(chunk))
171
154
175

__init__(catalog: Catalog, client: Client | None = None, partitions_per_chunk: int = 1, shuffle: bool = True, seed: int | None = None) -> None