InfiniteStream#
- class InfiniteStream(catalog: Catalog, client: Client | None = None, partitions_per_chunk: int = 1, seed: int | None = None)[source]#
Stream continuously yielding random subsets of partitions from a catalog.
The data is pre-fetched in the background, n_workers partitions at a time (derived from the client object).
- Parameters:
- catalog : lsdb.Catalog
A catalog to iterate over.
- client : dask.distributed.Client or None
Dask client for distributed computation. None means computing synchronously with dask.compute() instead of asynchronously with client.compute().
- partitions_per_chunk : int
Number of partitions to yield. It will be clipped to the total number of partitions. Be mindful when setting this value larger than 1, as holding multiple partitions in memory at once will increase memory usage.
- seed : int
Random seed to use for observation sampling.
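The sampling contract described by these parameters (a seeded RNG, a chunk size clipped to the partition count, and looping over the catalog once all partitions have been seen) can be sketched with a small stand-alone generator. This is a simplified stand-in, not lsdb's actual implementation; `infinite_partition_chunks` is a hypothetical helper, and how the real class handles a leftover partial pass may differ:

```python
import random


def infinite_partition_chunks(n_partitions, partitions_per_chunk=1, seed=None):
    """Yield random chunks of partition indices forever.

    Sketch of the InfiniteStream sampling pattern: draw partitions
    without replacement, reshuffling once a full pass is exhausted.
    """
    # Clip the chunk size to the total number of partitions, as documented.
    chunk_size = min(partitions_per_chunk, n_partitions)
    rng = random.Random(seed)
    partitions_left = []
    while True:
        if len(partitions_left) < chunk_size:
            # Pass exhausted: reshuffle all partition indices and start over.
            # (A leftover partial chunk is discarded here; the real
            # implementation may carry it over instead.)
            partitions_left = list(range(n_partitions))
            rng.shuffle(partitions_left)
        chunk = partitions_left[:chunk_size]
        partitions_left = partitions_left[chunk_size:]
        yield chunk
```

With 12 partitions and `partitions_per_chunk=4`, every three chunks cover each partition exactly once before the order is reshuffled.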
Methods
get_next_partitions(partitions_left, rng): Get the next set of partitions to iterate over.
submit_next_partitions(partitions): Submit the next set of partitions for computation.
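The submit/collect pair suggests a prefetch pipeline: the next chunk is submitted for computation before the current result is handed to the consumer. A minimal sketch of that pattern, using a ThreadPoolExecutor as a stand-in for the Dask client (`prefetching_stream` and `compute` are hypothetical names, not part of lsdb's API):

```python
from concurrent.futures import ThreadPoolExecutor


def prefetching_stream(chunks, compute):
    """Yield compute(chunk) for each chunk, overlapping computation
    of the next chunk with consumption of the current one."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        it = iter(chunks)
        try:
            # Submit the first chunk before anything is consumed.
            future = pool.submit(compute, next(it))
        except StopIteration:
            return
        for nxt in it:
            # Prefetch the next chunk, then hand over the current result.
            next_future = pool.submit(compute, nxt)
            yield future.result()
            future = next_future
        yield future.result()
```

The same shape maps onto Dask by replacing `pool.submit` with `client.compute` on the lazy partitions; with `client=None` the stream instead computes each chunk synchronously.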
Examples
Consider a toy catalog, which contains 12 data partitions:
>>> import lsdb
>>> from lsdb.streams import InfiniteStream
>>> cat = lsdb.generate_catalog(500, 10, seed=1)
>>> cat.npartitions
12
The following grabs 4 random partitions 5 times in a row, looping over the data as needed:
>>> inf_stream = InfiniteStream(catalog=cat, partitions_per_chunk=4, seed=1)
>>> cat_iter = iter(inf_stream)
>>> for _ in range(5):
...     chunk = next(cat_iter)
...     print(len(chunk))
157
185
165
169
185
- __init__(catalog: Catalog, client: Client | None = None, partitions_per_chunk: int = 1, seed: int | None = None) None[source]#