Source code for pasteur.graph.loss
from functools import reduce
from typing import Callable, NamedTuple, Sequence
import numpy as np
from ..attribute import DatasetAttributes, get_dtype
from .beliefs import IndexArg, convert_sel, has_attr, is_same, deduplicate
from .hugin import CliqueMeta, get_attrs, get_clique_domain
[docs]
def sel_is_subset(source_sel, clique_sel):
if isinstance(source_sel, int):
# If source sel is an int, if clique sel is not it eclipses it,
# if it is we compare heights
if isinstance(clique_sel, int):
return source_sel >= clique_sel
else:
return True
# Source sel is not an int, if clique sel is, that means it does not
# eclipse source sel
if isinstance(clique_sel, int):
return False
for sn, sh in source_sel:
found = False
for cn, ch in clique_sel:
if cn == sn:
if sh < ch:
return False
found = True
if not found:
return False
return True
[docs]
def clique_is_subset(source, clique):
for c in source:
found = False
for s in clique:
if c.table == s.table and c.order == s.order and c.attr == s.attr:
found = True
if not sel_is_subset(c.sel, s.sel):
return False
if not found:
return False
return True
[docs]
def get_parents(source: CliqueMeta, cliques: Sequence[CliqueMeta]):
return [c for c in cliques if clique_is_subset(source, c)]
[docs]
def get_smallest_parent(
source: CliqueMeta, cliques: Sequence[CliqueMeta], attrs: DatasetAttributes
):
p = get_parents(source, cliques)
assert p, f"No parents found for clique: {source}. Should be impossible."
return min(p, key=lambda x: get_clique_domain(x, attrs))
[docs]
class LinearObservation(NamedTuple):
"""Observations against which the loss is computed.
Mapping can be any differentiable function, but in this case we limit it to
a matrix multiplication or none.
All cliques that superset completely the `source` clique will be adjusted to
it, and matrix multiplied to `mapping` to produce the expected `obs'`.
Then, the selected loss function will be applied to them.
`confidence` is a normalized value that signifies how trustworthy this
observation is from [0, 1] and is used to scale the loss produced by the
observation. It should be drawn from the expected SNR of the observation,
given the expected sampling noise and the noise added by differential privacy.
The `mapping` size is very large (O(N^2) of the clique), so it should be avoided
if possible. If set to None, a matrix multiplication is not performed.
"""
source: CliqueMeta
mapping: np.ndarray | None
obs: np.ndarray
confidence: float
[docs]
class ParentMeta(NamedTuple):
"""Contains the necessary metadata to align a parent clique to an observation
clique.
First, attributes that are not present in an observation clique are summed
out. Then the common dimensions that are different are transposed to the
front and indexed. Finally, the transpose is undone and the cliques are
aligned."""
sum_dims: tuple[int, ...]
idx: np.ndarray | None = None
b_doms: tuple[int, ...] = tuple()
transpose: tuple[int, ...] = tuple()
transpose_undo: tuple[int, ...] = tuple()
[docs]
def get_parent_meta(
source: CliqueMeta, parent: CliqueMeta, attrs: DatasetAttributes
) -> ParentMeta:
# Use the parent with the smallest domain as that should be faster
# computationally (?)
sum_dims = []
args: list[IndexArg | None] = []
for i in range(len(parent)):
if has_attr(source, parent[i]):
j = 0
while not is_same(parent[i], source[j]):
j += 1
if parent[i].sel != source[j].sel:
attr = get_attrs(attrs, parent[i].table, parent[i].order)[
parent[i].attr
]
a_map = attr.get_mapping(convert_sel(parent[i].sel))
a_dom = attr.get_domain(convert_sel(parent[i].sel))
b_map = attr.get_mapping(convert_sel(source[j].sel))
b_dom = attr.get_domain(convert_sel(source[j].sel))
assert len(a_map) == len(b_map)
assert np.max(a_map) < a_dom and np.max(b_map) < b_dom
args.append(IndexArg(tuple(a_map), tuple(b_map), a_dom, b_dom))
else:
args.append(None)
else:
sum_dims.append(i)
transpose_front = []
transpose_back = []
idx_a_dims = []
idx_a_doms = []
idx_b_dims = []
idx_b_doms = []
for i, arg in enumerate(args):
if arg:
# FIXME: Unique does nothing
# uniques = np.unique(np.stack([arg.a_map, arg.b_map]), axis=1)
uniques = deduplicate(arg.a_map, arg.b_map)
idx_a_dims.append(uniques[0, :])
idx_a_doms.append(arg.a_dom)
idx_b_dims.append(uniques[1, :])
idx_b_doms.append(arg.b_dom)
transpose_front.append(i)
else:
transpose_back.append(i)
# When reshaping is not needed, the `if arg` block
# of the loop won't run so arrays will be empty. SKip.
if not idx_a_dims:
return ParentMeta(tuple(sum_dims), None)
dtype_a = get_dtype(reduce(lambda a, b: a * b, idx_a_doms))
dtype_b = get_dtype(reduce(lambda a, b: a * b, idx_b_doms))
mesh_a = np.meshgrid(*idx_a_dims, indexing="ij")
mesh_b = np.meshgrid(*idx_b_dims, indexing="ij")
idx_a = np.zeros_like(mesh_a[0], dtype=dtype_a)
idx_b = np.zeros_like(mesh_a[0], dtype=dtype_b)
a_dom = 1
b_dom = 1
for i in reversed(list(range(len(idx_a_doms)))):
idx_a += a_dom * mesh_a[i].astype(dtype_a)
idx_b += b_dom * mesh_b[i].astype(dtype_b)
a_dom *= idx_a_doms[i]
b_dom *= idx_b_doms[i]
idx_a = idx_a.reshape(-1)
idx_b = idx_b.reshape(-1)
# Only a single index op is performed to align parent to source
# where each unique value in idx_a is mapped to idx_b
_, uidx = np.unique(idx_a, return_index=True)
idx = idx_b[uidx]
b_doms = tuple(idx_b_doms)
transpose = tuple(transpose_front + transpose_back)
transpose_undo = tuple(transpose.index(i) for i in range(len(transpose)))
return ParentMeta(tuple(sum_dims), idx, b_doms, transpose, transpose_undo)