from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any
import pandas as pd
from ....synth import Synth, make_deterministic
from ....utils import LazyFrame, data_to_tables, tables_to_data
if TYPE_CHECKING:
from ....attribute import Attributes
from ....marginal import MarginalOracle
[docs]
def compress_domain(data, measurements):
    """Merge weakly-supported cells of each measured column into one cell.

    For every measurement ``(Q, y, sigma, proj)`` the cells of ``y`` that
    reach ``3 * sigma`` are treated as supported.  When every cell is
    supported the measurement is kept unchanged.  Otherwise the supported
    cells are kept, the unsupported ones are summed into a single trailing
    cell, and both that cell and the matching diagonal entry of the new
    query matrix are scaled by ``1 / sqrt(k)``, where ``k`` is the number of
    unsupported cells that were merged.

    Args:
        data: Dataset forwarded to ``transform_data`` together with the
            per-column support masks.
        measurements: Iterable of ``(Q, y, sigma, proj)`` tuples.  ``proj[0]``
            is used as the column key, so each measurement is assumed to
            cover a single column -- TODO confirm with callers.

    Returns:
        A 3-tuple ``(compressed_data, new_measurements, undo_compress_fn)``
        where ``undo_compress_fn(d)`` calls ``reverse_data(d, supports)``.
    """
    # Imported at function scope, like the other functions in this module:
    # the module header does not import numpy or scipy.
    import numpy as np
    from scipy import sparse

    # NOTE(review): transform_data / reverse_data are not defined or imported
    # in the visible part of this module -- confirm they are in scope.
    supports = {}
    new_measurements = []
    for Q, y, sigma, proj in measurements:
        col = proj[0]
        sup = y >= 3 * sigma
        supports[col] = sup
        if sup.sum() == y.size:
            # Every cell is supported: nothing to merge.
            new_measurements.append((Q, y, sigma, proj))
            continue

        # Re-express the measurement over the compressed domain.
        y2 = np.append(y[sup], y[~sup].sum())
        # y.size - y2.size + 1 equals the number of merged (unsupported) cells.
        scale = np.sqrt(y.size - y2.size + 1.0)
        I2 = np.ones(y2.size)
        I2[-1] = 1.0 / scale
        y2[-1] /= scale
        new_measurements.append((sparse.diags(I2), y2, sigma, proj))

    def undo_compress_fn(data):
        """Map data over the compressed domain back via ``reverse_data``."""
        return reverse_data(data, supports)

    return transform_data(data, supports), new_measurements, undo_compress_fn
[docs]
def MST_out_of_core(data, epsilon, delta):
    """Measure, select and estimate a model using the ``mst`` building blocks.

    Steps, in order: derive ``rho`` from ``cdp_rho(epsilon, delta)``, measure
    every single-column clique, compress the domain, select further cliques
    with ``rho / 3``, measure those, and run ``FactoredInference`` (5000
    iterations) on the combined measurement log.

    Args:
        data: Dataset exposing a ``domain`` attribute, accepted by the
            ``mst`` helpers ``measure`` and ``select``.
        epsilon: Passed to ``cdp_rho``; presumably the privacy epsilon --
            confirm against the ``mst`` package.
        delta: Passed to ``cdp_rho``; presumably the privacy delta --
            confirm against the ``mst`` package.

    Returns:
        ``(est, undo_compress_fn)``: the estimated model over the compressed
        domain and the callable returned by ``compress_domain`` that maps
        data back to the original domain.
    """
    import numpy as np
    from mst import FactoredInference, cdp_rho, measure, select

    rho = cdp_rho(epsilon, delta)
    sigma = np.sqrt(3 / (2 * rho))

    # First pass: one measurement per single-column clique.
    one_way_cliques = [(col,) for col in data.domain]
    log1 = measure(data, one_way_cliques, sigma)
    data, log1, undo_compress_fn = compress_domain(data, log1)

    # Second pass: measure the cliques chosen by ``select``.
    cliques = select(data, rho / 3.0, log1)
    log2 = measure(data, cliques, sigma)

    engine = FactoredInference(data.domain, iters=5000)
    est = engine.estimate(log1 + log2)

    # The original returned an undefined name ``supports`` (NameError on every
    # call).  The undo callable is the only handle on the compression that
    # ``compress_domain`` exposes, so it is returned instead.
    return est, undo_compress_fn
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
[docs]
class MST(Synth):
    """``Synth`` implementation backed by the external ``mst.MST`` model.

    The first table named in the metadata is fitted through a
    ``MarginalOracle`` and sampled with ``model.synthetic_data``.

    Args:
        e: Forwarded to ``mst.MST`` as its second argument (presumably the
            privacy epsilon -- confirm against the ``mst`` package).
        delta: Forwarded to ``mst.MST`` as its third argument.
        num_marginals: When positive, number of candidate cliques drawn at
            random (without replacement) while building the workload.
        degree: Size of the column combinations considered for the workload.
        max_cells: Upper bound on ``domain.size(clique)`` for a candidate
            clique to be kept.
        marginal_mode: ``mode`` handed to ``MarginalOracle``.
        seed: Stored on the instance; not read inside this class.
        n: Rows per sampled partition; derived in ``fit`` when falsy.
        partitions: Partition count; derived in ``fit`` when falsy.
        **kwargs: Stored on the instance; not read inside this class.
    """

    name = "mst"
    type = "idx"
    tabular = True
    multimodal = False
    timeseries = False
    parallel = True

    def __init__(
        self,
        e: float = 1.0,
        delta: float = 1e-9,
        num_marginals: int = 0,
        degree: int = 2,
        max_cells: int = 2**16,
        marginal_mode: "MarginalOracle.MODES" = "out_of_core",
        seed: int | None = None,
        n: int | None = None,
        partitions: int | None = None,
        **kwargs,
    ) -> None:
        # Model parameters handed to ``mst.MST``.
        self.e = e
        self.delta = delta

        # Workload construction settings.
        self.num_marginals = num_marginals
        self.degree = degree
        self.max_cells = max_cells
        self.marginal_mode: "MarginalOracle.MODES" = marginal_mode

        # Sampling / bookkeeping settings.
        self.seed = seed
        self.n = n
        self.partitions = partitions
        self.kwargs = kwargs

    @make_deterministic
    def preprocess(self, meta: dict[str, Attributes], data: dict[str, LazyFrame]):
        """Record the metadata and the size of the first table in ``meta``."""
        self.attrs = meta
        self.table = next(iter(meta))

        frame = data[self.table]
        self._n = frame.shape[0]
        self._partitions = len(frame)

    @make_deterministic
    def bake(self, data: dict[str, LazyFrame]):
        """No-op: nothing is prepared ahead of ``fit``."""

    @make_deterministic
    def fit(self, data: dict[str, LazyFrame]):
        """Fit ``mst.MST`` on the selected table through a ``MarginalOracle``."""
        import itertools

        import numpy as np
        from mst import MST as MSTimpl

        from ....marginal import MarginalOracle
        from .common import OracleDataset

        _ids, tables = data_to_tables(data)
        table = tables[self.table]

        # Fall back to values derived from the table when none were supplied.
        if not self.partitions:
            self.partitions = len(table)
        if not self.n:
            self.n = table.shape[0] // self.partitions

        oracle_ctx = MarginalOracle(
            self.attrs[self.table], table, mode=self.marginal_mode
        )
        with oracle_ctx as oracle:
            dataset = OracleDataset(oracle)

            # Candidate cliques: every ``degree``-sized column combination
            # whose domain size does not exceed ``max_cells``.
            workload = [
                clique
                for clique in itertools.combinations(dataset.domain, self.degree)
                if dataset.domain.size(clique) <= self.max_cells
            ]
            if self.num_marginals > 0:
                chosen = np.random.choice(
                    len(workload), self.num_marginals, replace=False
                )
                workload = [workload[idx] for idx in chosen]
            # NOTE: the weighted workload is built but not handed to MSTimpl.
            workload = [(clique, 1.0) for clique in workload]

            self.model = MSTimpl(dataset, self.e, self.delta)

    @make_deterministic("i")
    def sample_partition(self, *, n: int, i: int = 0) -> dict[str, Any]:
        """Sample ``n`` rows (or ``self.n`` when ``n`` is falsy) from the model."""
        row_count = n or self.n
        synthetic = self.model.synthetic_data(row_count)

        ids = {self.table: pd.DataFrame()}
        tables = {self.table: synthetic.df}
        return tables_to_data(ids, tables)