"""Pasteur's data utilities. The main funcitonality provided by this module
is `LazyPartition` and `LazyDataset`, with their specializations for `pandas`:
`LazyFrame`, `LazyChunk`.
These data types allow for loading dataset partitions on command, and when the
data is no longer useful, evacuating it from RAM using the `del` keyword."""
from __future__ import annotations
import logging
from functools import partial, update_wrapper
from itertools import chain
from typing import (
TYPE_CHECKING,
Any,
Callable,
Generic,
Mapping,
NamedTuple,
ParamSpec,
TypeVar,
overload,
Union,
)
if TYPE_CHECKING:
import pandas as pd
A = TypeVar("A")
B = TypeVar("B")
P = ParamSpec("P")
G = TypeVar("G", bound=Union[tuple, list, dict])
logger = logging.getLogger(__name__)
[docs]
class RawSource(NamedTuple):
"""Represents a raw data source that can be downloaded.
`files` is a list or a single URI that points to an S3 directory, index listing,
or file.
Files will be saved to `<raw_directory>/<save_name>` or the raw source name/dataset
name if not provided..
HTTP basic auth is supported and can be enabled by setting `credentials` to True.
`description` is shown by the command `pasteur download` and should contain
licensing information.
@warning: Experimental API, subject to change."""
files: str | list[str]
save_name: str | None = None
credentials: bool = False
desc: str | None = None
[docs]
class LazyPartition(Generic[A]):
def __init__(
self,
fun: Callable[..., A],
shape_fun: Callable[..., tuple[int, ...]] | None,
/,
*args,
**kwargs,
):
self.fun = fun
self.shape_fun = shape_fun
self.args = args
self.kwargs = kwargs
def __call__(
self, columns: list[str] | None = None, chunksize: int | None = None
) -> A:
new_kw = {}
if columns is not None:
new_kw["columns"] = columns
if chunksize is not None:
new_kw["chunksize"] = chunksize
try:
return self.fun(*self.args, **self.kwargs, **new_kw)
except TypeError:
return self.fun(*self.args, **self.kwargs)
@property
def partitioned(self):
return False
@property
def shape(self):
if self.shape_fun is not None:
return self.shape_fun(*self.args, **self.kwargs)
return self().shape # type: ignore
LazyChunk = LazyPartition["pd.DataFrame"]
def _extract_lazy(d) -> list[LazyDataset]:
out = []
if isinstance(d, tuple) or isinstance(d, list):
for k in d:
out.extend(_extract_lazy(k))
elif isinstance(d, dict):
for k in d.values():
out.extend(_extract_lazy(k))
elif isinstance(d, LazyDataset):
out.append(d)
# skip non-lazy arguments
return out
def _partition_lazy(d, key: str):
if isinstance(d, tuple):
return tuple([_partition_lazy(v, key) for v in d])
if isinstance(d, list):
return [_partition_lazy(v, key) for v in d]
if isinstance(d, dict):
return {k: _partition_lazy(v, key) for k, v in d.items()}
if isinstance(d, LazyDataset):
return d[key] if d.partitioned else d
return d
def _dummy_return(a, *b, **c):
return a
def _wrap_lazy(d, cls):
if isinstance(d, tuple):
return tuple([_wrap_lazy(v, cls) for v in d])
if isinstance(d, list):
return [_wrap_lazy(v, cls) for v in d]
if isinstance(d, dict):
return {k: _wrap_lazy(v, cls) for k, v in d.items()}
return cls(partial(_dummy_return, d))
def _load_partition(_partition, _cache, _id, _pid):
key = (_id, _pid)
if key in _cache:
return _cache[key]
data = _partition()
_cache[key] = data
return data
def _cache_partitions(d, cls, _cache=None, _ofs=0):
if _cache is None:
_cache = {}
if isinstance(d, tuple):
return tuple(
[_cache_partitions(v, cls, _cache, _ofs + i) for i, v in enumerate(d)]
)
if isinstance(d, list):
return [_cache_partitions(v, cls, _cache, _ofs + i) for i, v in enumerate(d)]
if isinstance(d, dict):
return {
k: _cache_partitions(v, cls, _cache, _ofs + i)
for i, (k, v) in enumerate(d.items())
}
_ofs += 1
if isinstance(d, LazyDataset):
merged = (
LazyPartition(
partial(
_load_partition,
_partition=d.merged_load,
_cache=_cache,
_id=_ofs,
_pid=None,
),
d.merged_load.shape_fun,
)
if d.merged_load is not None
else None
)
partitions = (
{
k: LazyPartition(
partial(
_load_partition, _partition=v, _cache=_cache, _id=_ofs, _pid=i
),
v.shape_fun,
)
for i, (k, v) in enumerate(d._partitions.items())
}
if d._partitions is not None
else None
)
return cls(merged, partitions)
if isinstance(d, LazyPartition):
return LazyPartition(
partial(_load_partition, _partition=d, _id=_ofs, _pid=None), d.shape_fun
)
return d
[docs]
class LazyDataset(Generic[A], LazyPartition[A]):
def __init__(
self,
merged_load: LazyPartition[A] | None,
partitions: dict[str, LazyPartition[A]] | None = None,
) -> None:
self._partitions = partitions
self.merged_load = merged_load
def __call__(
self, columns: list[str] | None = None, chunksize: int | None = None
) -> Any:
assert self.merged_load, f"Merged loading is not implemented for this dataset."
if self.partitioned:
logger.warning(
f"Loading partitioned dataset as a whole, this may cause memory issues."
)
return self.merged_load(columns=columns, chunksize=chunksize)
def __getitem__(self, pid: str) -> LazyPartition[A]:
assert self._partitions, "No partitions found."
return self._partitions[pid]
def __iter__(self):
if self._partitions:
return iter(self._partitions)
return iter([self.merged_load])
@property
def shape(self):
if self.merged_load is not None:
return self.merged_load.shape
partitions = list(self.values())
if len(partitions) == 0:
return tuple()
if len(partitions) == 1:
return partitions[0].shape
shape = list(partitions[0].shape)
for part in partitions[1:]:
shape[0] += part.shape[0]
return tuple(shape)
@property
def partitioned(self):
return self._partitions is not None and len(self._partitions) > 1
@property
def sample(self):
if self._partitions:
return next(iter(self._partitions.values()))
assert self.merged_load is not None, f"LazyDataset is empty."
return self.merged_load
[docs]
def keys(self):
assert self._partitions, f"Dataset not partitioned, check `.partitioned` first."
return self._partitions.keys()
[docs]
def values(self):
if not self._partitions:
assert self.merged_load
return [self.merged_load]
return self._partitions.values()
[docs]
def items(self):
assert self._partitions, f"Dataset not partitioned, check `.partitioned` first."
return self._partitions.items()
[docs]
@staticmethod
def are_partitioned(*positional, **keyword):
"""Returns whether the provided datasets are partitioned. If they are,
checks they have the same partitions."""
partitions = _extract_lazy(positional) + _extract_lazy(keyword)
# Check keys are correct
keys = None
for partition in partitions:
if not partition.partitioned:
continue
if keys == None:
keys = set(partition.keys())
else:
new_keys = set(partition.keys())
assert keys == new_keys, f"Found different keys {keys} != {new_keys}"
return keys is not None
@staticmethod
@overload
def zip(datasets: G, /) -> dict[str, G]: ...
@staticmethod
@overload
def zip(*positional: LazyDataset[B]) -> dict[str, list[LazyPartition[B]]]: ...
@staticmethod
@overload
def zip(**keyword: LazyDataset[B]) -> dict[str, dict[str, LazyPartition[B]]]: ...
@staticmethod
@overload
def zip(
*positional: LazyDataset[B], **keyword: LazyDataset[B]
) -> dict[str, tuple[list[LazyPartition[B]], dict[str, LazyPartition[B]]]]: ...
[docs]
@staticmethod
def zip(*positional, **keyword):
"""Aligns and returns a dictionary of partition ids to partitions.
Partitions can be a list, if positional arguments were provided, or a dictionary
if keyword arguments were provided.
@warning: all partitioned sets should have the same keys."""
if positional and keyword:
vals = [positional, keyword]
elif positional and len(positional) == 1:
vals = positional[0]
elif positional:
vals = positional
elif keyword:
vals = keyword
else:
vals = []
# Check keys are correct
keys = None
for partition in _extract_lazy(vals):
if not partition.partitioned:
continue
if keys == None:
keys = list(partition.keys())
else:
assert set(keys) == set(partition.keys())
assert (
keys
), f"None of the datasets are partitioned (or one is empty). Call `are_partitioned()` first."
return {key: _partition_lazy(vals, key) for key in sorted(keys)}
[docs]
@staticmethod
def zip_values(*positional, **keyword) -> list:
"""Same as zip, but doesn't return partition names and works even if
the datasets are not partitioned, by returning a single partition."""
if not LazyDataset.are_partitioned(positional, keyword):
if positional and keyword:
return [(positional, keyword)]
elif positional and len(positional) == 1:
return [positional[0]]
elif positional:
return [positional]
elif keyword:
return [keyword]
return [()]
return list(LazyDataset.zip(*positional, **keyword).values())
[docs]
def separate(
**datasets: LazyDataset[A],
) -> tuple[dict[str, LazyDataset[A]], dict[str, LazyDataset[A]]]:
"""Splits the datasets into partitioned and not partitioned and returns them.
`non_partitioned, partitioned = separate_partitioned(datasets)`"""
return {name: ds for name, ds in datasets.items() if not ds.partitioned}, {
name: ds for name, ds in datasets.items() if ds.partitioned
}
def __len__(self):
if self._partitions:
return len(self._partitions)
if self.merged_load:
return 1
return 0
[docs]
@classmethod
def wrap(cls, *positional, **keyword) -> Any:
"""Converts provided arguments to lazy. Tuples, dicts, and lists are traversed,
and every object found in them is wrapped in a LazyDataset."""
if positional and keyword:
return _wrap_lazy((positional, keyword), cls)
elif positional and len(positional) == 1:
return _wrap_lazy(positional[0], cls)
elif positional:
return _wrap_lazy(positional, cls)
elif keyword:
return _wrap_lazy(keyword, cls)
return None
[docs]
@classmethod
def cache(cls, *positional, **keyword) -> Any:
if positional and keyword:
return _cache_partitions((positional, keyword), cls)
elif positional and len(positional) == 1:
return _cache_partitions(positional[0], cls)
elif positional:
return _cache_partitions(positional, cls)
elif keyword:
return _cache_partitions(keyword, cls)
return None
LazyFrame = LazyDataset["pd.DataFrame"]
[docs]
def get_relative_fn(fn: str):
"""Returns the directory of a file relative to the script calling this function."""
import inspect
import os
script_fn = inspect.currentframe().f_back.f_globals["__file__"] # type: ignore
dirname = os.path.dirname(script_fn)
return os.path.join(dirname, fn)
[docs]
def list_unique(*args: list[A]) -> list[A]:
return list(dict.fromkeys(chain(*args)))
[docs]
class gen_closure(partial):
"""Creates a closure for function `fun`, by passing the positional arguments
provided in this function to `fun` before the ones given to the function and
by passing the sum of named arguments given to both functions.
The closure retains the original function name. If desired, it can
be renamed using the `_fn` parameter. If fn contains `%s`, it will be
replaced with the function name.
The `_eat` parameter can be used to consume keyword arguments passed to the
child function. Ex. if `_eat=["bob"]`, passing `bob=safd` will have no effect
on the bound function.
The `_return` parameter can override what the function returns."""
def __new__(
cls,
func,
/,
*args,
_fn: str | None = None,
_eat: list[str] | None = None,
_return: Any | None = None,
**keywords,
):
self = super().__new__(cls, func, *args, **keywords)
if hasattr(func, "func"):
_eat = _eat or func._eat
_return = _return or func._return
func = func.func
if _fn:
if hasattr(func, "__name__"):
self.__name__ = _fn.replace("%s", func.__name__) # type: ignore
else:
self.__name__ = _fn # type: ignore
else:
if hasattr(func, "__name__"):
self.__name__ = func.__name__ # type: ignore
else:
self.__name__ = "ukn" # type: ignore
# FIXME: probably get lost during multiprocessing
self._eat = _eat # type: ignore
self._return = _return # type: ignore
return self
def __call__(self, /, *args, **keywords):
kw = keywords.copy()
if self._eat: # type: ignore
for e in self._eat: # type: ignore
kw.pop(e, None)
keywords = {**self.keywords, **kw}
val = self.func(*self.args, *args, **keywords)
if self._return is not None: # type: ignore
return self._return # type: ignore
return val
def _chunk_fun(fun, *args: Any, **kwargs: Any) -> set[Callable]:
# If datasets are not partitioned, return a closure
if not LazyDataset.are_partitioned(*args, **kwargs):
return {gen_closure(fun, *args, _canary=True, **kwargs)}
# If they are, create proper arguments for each function call.
closures = set()
for pid, (cargs, ckwargs) in LazyDataset.zip([args, kwargs]).items():
closures.add(gen_closure(fun, *cargs, _canary=True, _partition=pid, **ckwargs))
return closures
def _to_partition(d, key):
if isinstance(d, dict):
return {k: _to_partition(v, key) for k, v in d.items()}
elif isinstance(d, list) or isinstance(d, tuple):
return [_to_partition(v, key) for v in d]
else:
return {key: d}
[docs]
def to_chunked(func: Callable[P, A], /) -> Callable[P, set[Callable[..., A]]]:
"""Makes wrapped function lazy evaluate. If args contain forms of
`LazyDataset`, a set of lazy functions are returned, each one loading
one partition."""
def wrapper(*args, _canary=False, _partition: str | None = None, **kwargs):
"""Wrapper acts like the real function when `_canary` is True, otherwise
it runs `_chunk_fun()`.
This way, the decorated function can both be used to create the lazy dictionary
when called by end users and perform the evaluation when ran by one of the
closures."""
if _canary:
out = func(*args, **kwargs) # type: ignore
# Try to partition output if partition is provided
# dictionaries will have their values become a nested dictionary
# with 1 key, the partition
# Similarly with lists
# Else, the output itself is wrapped
if _partition:
return _to_partition(out, _partition)
return out
else:
return _chunk_fun(wrapper, *args, **kwargs)
update_wrapper(wrapper, func)
# # Update annotations
# new_annotations = {}
# for param, orig in getattr(func, "__annotations__", {}).items():
# annotation = str(orig)
# if annotation == "LazyChunk":
# annotation = "LazyFrame"
# elif annotation == "LazyFrame":
# raise AttributeError("Can't wrap a LazyFrame with to_chunked")
# elif annotation.startswith("LazyDataset"):
# raise AttributeError("Can't wrap a LazyDataset with to_chunked")
# elif annotation.startswith("LazyPartition"):
# annotation = "LazyDataset" + annotation[len("LazyPartition") :]
# else:
# annotation = (
# orig # preserve annotation object if it's not one of the lazies
# )
# new_annotations[param] = annotation
# wrapper.__annotations__ = new_annotations
return wrapper # type: ignore
[docs]
def apply_fun(obj: Any, *args, _fun: str, **kwargs):
"""Runs function with name `_fun` of object `obj` with the provided arguments."""
return getattr(obj, _fun)(*args, **kwargs)
@overload
def data_to_tables(
data: Mapping[str, LazyDataset],
) -> tuple[dict[str, LazyFrame], dict[str, LazyFrame]]: ...
@overload
def data_to_tables(
data: Mapping[str, LazyPartition],
) -> tuple[dict[str, LazyChunk], dict[str, LazyChunk]]: ...
@overload
def data_to_tables(data: Mapping[str, A]) -> tuple[dict[str, A], dict[str, A]]: ...
[docs]
def data_to_tables(data):
# Use old format
ids = {}
tables = {}
for name, datum in data.items():
if name.endswith("_ids"):
ids[name[:-4]] = datum
else:
tables[name] = datum
return ids, tables
[docs]
def data_to_tables_ctx(data):
# Use old format
ids = {}
tables = {}
ctx = {}
for name, datum in data.items():
if name.endswith("_ids"):
ids[name[:-4]] = datum
elif name.endswith("_ctx"):
creator, parent = name[:-4].split("#")
if creator not in ctx:
ctx[creator] = {}
ctx[creator][parent] = datum
else:
tables[name] = datum
return ids, tables, ctx
[docs]
def tables_to_data(
ids: Mapping[str, Any],
tables: Mapping[str, Any],
ctx: Mapping[str, Mapping[str, Any]] = {},
):
ctx_out = {}
for creator, ctxc in ctx.items():
for parent, ctxv in ctxc.items():
ctx_out[f"{creator}#{parent}_ctx"] = ctxv
return {**{f"{n}_ids": v for n, v in ids.items()}, **tables, **ctx_out}
[docs]
def lazy_load_tables(tables: Mapping[str, LazyChunk | pd.DataFrame]):
"""Lazy loads partitions and keeps them in-memory in a closure.
Once the functions go out of scope, the partitions are released."""
cached_tables: dict[str, pd.DataFrame] = {}
def _get_table(name: str):
if name in cached_tables:
return cached_tables[name]
candidate = tables[name]
if callable(candidate):
table = candidate()
else:
table = candidate
cached_tables[name] = table
return table
return _get_table
[docs]
def get_relationships(
ids: dict[str, LazyFrame],
):
from collections import defaultdict
full_relationships = defaultdict(list)
# Find parents
for name, table_ids in ids.items():
for parent in table_ids.sample().columns:
full_relationships[parent].append(name)
# Trim leaf tables
relationships = {}
for name in list(full_relationships.keys()):
tmp = list(full_relationships[name])
for child in full_relationships[name]:
for k in full_relationships[child]:
if k in tmp:
tmp.remove(k)
relationships[name] = tmp
return relationships