InfiniteStream


class InfiniteStream(catalog: Catalog, client: Client | None = None, partitions_per_chunk: int = 1, seed: int | None = None)

Stream continuously yielding random subsets of partitions from a catalog.

The data is pre-fetched in the background, ‘n_workers’ partitions at a time (derived from the client object).

Parameters:
catalog : lsdb.Catalog

A catalog to iterate over.

client : dask.distributed.Client or None

Dask client for distributed computation. If None, partitions are computed synchronously with dask.compute() instead of asynchronously with client.compute().

partitions_per_chunk : int

Number of partitions to yield per chunk. It will be clipped to the total number of partitions. Be mindful when setting this value larger than 1, as holding multiple partitions in memory at once will increase memory usage.

seed : int or None

Random seed to use for observation sampling.
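The sampling semantics implied by these parameters (clip the chunk size, draw partitions without replacement, reshuffle once a pass over the catalog is exhausted) can be illustrated with a minimal pure-Python sketch. This is an assumption-level illustration, independent of lsdb; `sample_chunks` and its body are not the library's internals:

```python
import numpy as np

def sample_chunks(n_partitions, partitions_per_chunk, seed, n_chunks):
    """Yield random chunks of partition indices without replacement,
    reshuffling once all partitions have been consumed (illustrative only)."""
    rng = np.random.default_rng(seed)
    # Clip the chunk size to the total number of partitions.
    chunk_size = min(partitions_per_chunk, n_partitions)
    left = []
    for _ in range(n_chunks):
        if len(left) < chunk_size:
            # Loop over the data as needed: reshuffle all indices.
            left = list(rng.permutation(n_partitions))
        chunk, left = left[:chunk_size], left[chunk_size:]
        yield chunk

# 12 partitions, 4 per chunk: the first three chunks cover the whole catalog.
chunks = list(sample_chunks(12, 4, seed=1, n_chunks=5))
```

With this scheme, a fixed seed makes the partition order reproducible across runs.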

Methods

get_next_partitions(partitions_left, rng)

Get the next set of partitions to iterate over.

submit_next_partitions(partitions)

Submit the next set of partitions for computation.
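One plausible reading of how these two methods cooperate is sketched below. This is a toy stand-in, not lsdb code: the `ToyStream` class, its state handling, and the fake "computation" in `submit_next_partitions` are guesses meant only to show how the documented method signatures could fit together in an iteration loop:

```python
import random

class ToyStream:
    """Illustrative stand-in for the two-method iteration protocol."""

    def __init__(self, n_partitions, partitions_per_chunk=1, seed=None):
        self.n = n_partitions
        # Clip the chunk size to the total number of partitions.
        self.k = min(partitions_per_chunk, n_partitions)
        self.rng = random.Random(seed)

    def get_next_partitions(self, partitions_left, rng):
        """Get the next set of partitions to iterate over."""
        if len(partitions_left) < self.k:
            # Loop over the data as needed: reshuffle all indices.
            partitions_left = rng.sample(range(self.n), self.n)
        return partitions_left[:self.k], partitions_left[self.k:]

    def submit_next_partitions(self, partitions):
        """Submit the next set of partitions for computation."""
        # The real class would hand these to dask (client.compute() or
        # dask.compute()); here we fake a trivial per-partition result.
        return [p * 10 for p in partitions]

    def __iter__(self):
        left = []
        while True:
            chunk, left = self.get_next_partitions(left, self.rng)
            yield self.submit_next_partitions(chunk)

stream = iter(ToyStream(12, partitions_per_chunk=4, seed=1))
results = [next(stream) for _ in range(5)]
```

The real class additionally pre-fetches submitted partitions in the background when a Dask client is provided, which this sketch omits.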

Examples

Consider a toy catalog, which contains 12 data partitions:

>>> import lsdb
>>> from lsdb.streams import InfiniteStream
>>> cat = lsdb.generate_catalog(500, 10, seed=1)
>>> cat.npartitions
12

The following grabs 4 random partitions 5 times in a row, looping over the data as needed:

>>> inf_stream = InfiniteStream(catalog=cat, partitions_per_chunk=4, seed=1)
>>> cat_iter = iter(inf_stream)
>>> for _ in range(5):
...     chunk = next(cat_iter)
...     print(len(chunk))
157
185
165
169
185
__init__(catalog: Catalog, client: Client | None = None, partitions_per_chunk: int = 1, seed: int | None = None) → None
