Source code for pasteur.extras.synth.extern

from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any

from ...synth import Synth

if TYPE_CHECKING:
    import pandas as pd

    from ...attribute import Attributes, IdxValue

logger = logging.getLogger(__name__)


[docs]class ExternalPythonSynth(Synth, ABC): """Abstract class for calling synthesis algorithms written in python from other projects. Can use a custom virtual environment, custom project directory, and custom command. With parameters defined per execution.""" type = "idx" tabular = True multimodal = False timeseries = False gpu = True _pg_args = {"index": False} def __init__( self, cmd: str, venv: str | None = None, dir: str | None = None, rebalance_columns: bool = True, **_, ) -> None: super().__init__(**_) self.venv = venv self.dir = dir self.cmd = cmd self.rebalance_columns = rebalance_columns @property def _logger(self): return logging.getLogger(f"extern.{self.name if self.name else 'ukn'}")
[docs] def preprocess( self, attrs: dict[str, Attributes], data: dict[str, pd.DataFrame], ids: dict[str, pd.DataFrame], ): self.attrs = attrs
[docs] def bake( self, data: dict[str, pd.DataFrame], ids: dict[str, pd.DataFrame], ): pass
[docs] def fit(self, data: dict[str, pd.DataFrame], ids: dict[str, pd.DataFrame]): import json import shlex import signal import subprocess from os import path from tempfile import TemporaryDirectory from threading import Thread import pandas as pd param_dict, data_in, data_out = self._prepare_synthesis_data( self.attrs, data, ids ) with TemporaryDirectory(f"_{self.name}") as dir: fns = {} # Save input data to temporary files for name, (type, data) in data_in.items(): match type: case "json": fn = path.join(dir, f"{name}.json") with open(fn, "w") as f: json.dump(data, f) case "csv": fn = path.join(dir, f"{name}.csv") assert isinstance(data, pd.DataFrame) data.to_csv(fn, **self._pg_args) case _: assert False, "Unknown type" fns[name] = fn for name, type in data_out.items(): fns[name] = path.join(dir, f"{name}.{type}") # Create command # python3 or path/to/venv/bin/python3 python_bin = path.join(self.venv, "bin/python3") if self.venv else "python3" # can call a module with -m, or include a dir and make the command a # python script if self.dir: # path/to/proj/test.py cmd = path.join(self.dir, self.cmd) else: # -m kedro ... cmd = self.cmd # Unfold the dictionary params = "" if isinstance(param_dict, dict): for param, val in param_dict.items(): params += f"{param} {val} " else: for param in param_dict: params += f"{param} " full_cmd = f"{python_bin} {cmd} {params}" # Replace file names with directory full_cmd = full_cmd.format_map(fns) # Run command self._logger.info(f"Running command {self.cmd} as: \n{full_cmd}") def _log_pipe(proc, is_err): if is_err: f = self._logger.warn s = proc.stderr else: f = self._logger.info s = proc.stdout while True: line = s.readline() if not line: break f(line[:-1]) proc = None tout = None terr = None try: def preexec_function(): signal.signal(signal.SIGINT, signal.SIG_IGN) proc = subprocess.Popen( shlex.split(full_cmd), text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=preexec_function, ) tout = Thread(target=_log_pipe, args=(proc, False)) terr = Thread(target=_log_pipe, args=(proc, True)) tout.start() terr.start() proc.wait() if tout: tout.join() if terr: terr.join() except Exception: if proc and not proc.poll(): proc.terminate() if tout: tout.join() if terr: terr.join() raise # Load output data loaded_out = {} for name, type in data_out.items(): fn = fns[name] match type: case "csv": loaded_out[name] = pd.read_csv(fn) case "json": loaded_out[name] = json.load(fn) self.loaded_out = loaded_out
[docs] def sample( self, n: int | None = None ) -> tuple[dict[str, pd.DataFrame], dict[str, pd.DataFrame]]: assert n is None, "Variable n not supported" return self._return_synthetic_data(self.loaded_out)
@abstractmethod def _prepare_synthesis_data( self, attrs: dict[str, Attributes], data: dict[str, pd.DataFrame], ids: dict[str, pd.DataFrame], ) -> tuple[dict[str, Any] | list[Any], dict[str, tuple[str, Any]], dict[str, str]]: """Called during fit. Should return a parameter dictionary for the synthesis command, a dictionary of `<name>` to `(<type>,<data>)` tuples, where type is json or csv, and a dictionary of <name> to <type> tuples for output data The data in the input dictionary is saved to the filesystem in a temporary directory. The names in the output dictionary are associated with temporary files. Parameters containing `{name}` are substituted by the filename of the file with that name in the input/output dictionary. """ pass @abstractmethod def _return_synthetic_data( self, data: dict[str, Any] ) -> tuple[dict[str, pd.DataFrame], dict[str, pd.DataFrame]]: """Receives as input a dictionary of `<name>` to `<data>` pairs. This function is expected to output a data, ids pair of DataFrame dictionaries, in the same format the sample function expects.""" pass
[docs]class AimSynth(ExternalPythonSynth): name = "aim" type = "idx" tabular = True multimodal = False timeseries = False gpu = True def __init__( self, e: float = 1, delta: float | None = None, max_model_size: int = 80, rebalance_columns: bool = False, seed: int | None = None, deterministic_upsample: bool = False, **kwargs, ) -> None: super().__init__(cmd="mechanisms/aim.py", **kwargs) self.e = e self.delta = delta self.max_model_size = max_model_size self.seed = seed self.rebalance_columns = rebalance_columns self.deterministic_upsample = deterministic_upsample self.kwargs = kwargs def _prepare_synthesis_data( self, attrs: dict[str, Attributes], data: dict[str, pd.DataFrame], ids: dict[str, pd.DataFrame], ) -> tuple[dict[str, Any] | list[Any], dict[str, tuple[str, Any]], dict[str, str]]: import pandas as pd assert len(data) == 1 self.table_name = next(iter(data)) table = data[self.table_name] table_attrs = attrs[self.table_name] if self.rebalance_columns: from ...hierarchy import rebalance_attributes table_attrs = rebalance_attributes( table, table_attrs, reshape_domain=False, **self.kwargs ) self.table_attrs = table_attrs n = len(table) params = { "--dataset": "{dataset}", "--domain": "{domain}", "--save": "{save}", "--epsilon": self.e, "--delta": self.delta if self.delta != None else 1 / n / 10, "--max_model_size": self.max_model_size, } domain = {} downsampled_table = pd.DataFrame(index=table.index) for attr in table_attrs.values(): for name, col in attr.vals.items(): assert isinstance(col, IdxValue) h = col.select_height() domain[name] = col.get_domain(h) downsampled_table[name] = col.downsample(table[name].to_numpy(), h) data_in = {"dataset": ("csv", downsampled_table), "domain": ("json", domain)} data_out = {"save": "csv"} return params, data_in, data_out def _return_synthetic_data( self, data: dict[str, Any] ) -> tuple[dict[str, pd.DataFrame], dict[str, pd.DataFrame]]: import pandas as pd if self.rebalance_columns: downsampled_table = data["save"] table = pd.DataFrame(index=downsampled_table.index) for attr in self.table_attrs.values(): for name, col in attr.vals.items(): assert isinstance(col, IdxValue) h = col.select_height() table[name] = col.upsample( downsampled_table[name], h, deterministic=self.deterministic_upsample, ) else: table = data["save"] return {self.table_name: table}, {self.table_name: pd.DataFrame()}
[docs]class PrivMrfSynth(ExternalPythonSynth): """ Runs the PrivMrf algorithm externally. Place the following snippet in `<priv-mrf>/pasteur.py`: ``` from sys import argv import PrivMRF import PrivMRF.utils.tools as tools from PrivMRF.domain import Domain import numpy as np if __name__ == '__main__': fn_data, fn_domain, fn_synth, e = argv e = float(e) data, _ = tools.read_csv(fn_data) data = np.array(data, dtype=int) json_domain = tools.read_json_domain(fn_domain) domain = Domain(json_domain, list(range(data.shape[1]))) model = PrivMRF.run(data, domain, attr_hierarchy=None, \ exp_name='exp', epsilon=e, p_config={}) syn_data = model.synthetic_data(fn_synth) ``` """ name = "mrf" type = "idx" tabular = True multimodal = False timeseries = False gpu = True def __init__( self, e: float = 1.12, seed: int | None = None, rebalance_columns: bool | None = None, deterministic_upsample: bool = False, **kwargs, ) -> None: super().__init__(cmd="pasteur.py", **kwargs) self.e = e self.seed = seed self.rebalance_columns = rebalance_columns self.deterministic_upsample = deterministic_upsample self.kwargs = kwargs def _prepare_synthesis_data( self, attrs: dict[str, Attributes], data: dict[str, pd.DataFrame], ids: dict[str, pd.DataFrame], ): import pandas as pd assert len(data) == 1 self.table_name = next(iter(data)) table = data[self.table_name] table_attrs = attrs[self.table_name] params = ["{dataset}", "{domain}", "{out}", f"{self.e}"] if self.rebalance_columns: from ...hierarchy import rebalance_attributes table_attrs = rebalance_attributes( table, table_attrs, reshape_domain=False, **self.kwargs ) col_names = [] domain: dict[str, dict[str, Any]] = {} downsampled_table = pd.DataFrame(index=table.index) for attr in table_attrs.values(): for name, col in attr.vals.items(): assert isinstance(col, IdxValue) h = col.select_height() domain[str(len(col_names))] = { "type": "discrete", "domain": col.get_domain(h), } col_names.append(name) downsampled_table[name] = col.downsample(table[name].to_numpy(), h) self.id = table.index.name self.col_names = col_names self.table_attrs = table_attrs col_mapping = {name: i for i, name in enumerate(col_names)} dataset = downsampled_table.rename(columns=col_mapping) data_in = {"dataset": ("csv", dataset), "domain": ("json", domain)} data_out = {"out": "csv"} return params, data_in, data_out def _return_synthetic_data( self, data: dict[str, Any] ) -> tuple[dict[str, pd.DataFrame], dict[str, pd.DataFrame]]: import pandas as pd col_mapping = {str(i): name for i, name in enumerate(self.col_names)} downsampled_table = data["out"].rename(columns=col_mapping) if self.rebalance_columns: table = pd.DataFrame(index=downsampled_table.index) for attr in self.table_attrs.values(): for name, col in attr.vals.items(): assert isinstance(col, IdxValue) h = col.select_height() table[name] = col.upsample( downsampled_table[name], h, deterministic=self.deterministic_upsample, ) else: table = downsampled_table table.index.name = self.id return {self.table_name: table}, {self.table_name: pd.DataFrame()}