Source code for pasteur.extras.datasets.mimic

from __future__ import annotations

from typing import TYPE_CHECKING, Callable, cast

import numpy as np
import pandas as pd

from ....dataset import Dataset
from ....utils import (
    LazyChunk,
    LazyFrame,
    gen_closure,
    get_relative_fn,
    to_chunked,
)

if TYPE_CHECKING:
    from pandas.io.parsers.readers import TextFileReader


def _split_table(
    name: str, chunksize: int, keys: np.ndarray, table: "Callable[..., TextFileReader]"
):
    """Yield the chunks of ``table`` restricted to the given ``subject_id`` keys.

    Args:
        name: Table name; only used to trigger the ``hosp_pharmacy`` fix-up.
        chunksize: Forwarded to ``table`` as its ``chunksize`` keyword.
        keys: Values of ``subject_id`` to keep.
        table: Callable that, given ``chunksize``, returns an iterable of
            chunks (each chunk must support ``join`` on ``subject_id``).

    Yields:
        One filtered chunk per chunk produced by ``table``; a chunk with no
        matching rows is still yielded.
    """
    # An index-only frame: inner-joining against it keeps just the rows whose
    # ``subject_id`` appears in ``keys`` without adding any columns.
    key_frame = pd.DataFrame(index=keys)
    # Drop the local reference to the raw key array; only the frame is needed.
    del keys

    needs_poe_fix = name == "hosp_pharmacy"

    for raw in table(chunksize=chunksize):
        filtered = raw.join(key_frame, on="subject_id", how="inner")

        if needs_poe_fix:
            # Replace the string ``poe_id`` with a nullable-integer ``poe_seq``.
            # NOTE(review): ``.str[1]`` picks the character at position 1 of
            # each ``poe_id`` string -- confirm this is the intended extraction.
            filtered["poe_seq"] = filtered["poe_id"].str[1].astype("Int16")
            filtered = filtered.drop(columns=["poe_id"])

        yield filtered


def _partition_table(
    name: str, table: Callable, patients: LazyFrame, n_partition: int, chunksize: int
):
    """Build one lazy loader per patient partition for a chunked table.

    Args:
        name: Table name, forwarded to ``_split_table``.
        table: Callable producing the table's chunks, forwarded to
            ``_split_table``.
        patients: Lazy patients table; it is loaded with the ``subject_id``
            selection and its index supplies the partitioning keys.
        n_partition: Number of partitions the keys are split into.
        chunksize: Chunk size forwarded to ``_split_table``.

    Returns:
        Dict mapping the partition number (as a string) to the closure
        produced by ``gen_closure`` for that partition's keys.
    """
    # Every table derives its split from the same patients index, so all
    # tables end up with the same (deterministic) partitioning.
    subject_ids = patients(["subject_id"]).index.to_numpy()

    loaders = {}
    for idx, subset in enumerate(np.array_split(subject_ids, n_partition)):
        loaders[str(idx)] = gen_closure(_split_table, name, chunksize, subset, table)
    return loaders


class MimicDataset(Dataset):
    """Dataset definition for the MIMIC tables stored under ``mimiciv_2_2``.

    Tables in ``_mimic_tables_single`` are ingested unchanged. Tables in
    ``_mimic_tables_partitioned`` also depend on ``patients`` and are handed
    to ``_partition_table``, which splits them by patient ``subject_id``.
    """

    def __init__(self, n_partitions: int = 5, **_) -> None:
        """Store the partition count; remaining keywords go to ``Dataset``."""
        super().__init__(**_)
        self._n_partitions = n_partitions

    # Tables returned as-is by ``ingest``.
    _mimic_tables_single = [
        "patients",
        "transfers",
        "admissions",
        "d_hcpcs",
        "diagnoses_icd",
        "d_icd_diagnoses",
        "d_icd_procedures",
        "d_labitems",
        "drgcodes",
        "hcpcsevents",
        "microbiologyevents",
        "poe_detail",
        "procedures_icd",
        "services",
        "icu_datetimeevents",
        "icu_d_items",
        "icu_icustays",
        "icu_outputevents",
        "icu_procedureevents",
    ]

    # Tables split per patient partition; the value is the ``chunksize``
    # passed on to the table reader.
    _mimic_tables_partitioned = {
        "emar": 4_000_000,
        "emar_detail": 4_000_000,
        "labevents": 4_000_000,
        "pharmacy": 4_000_000,
        "poe": 4_000_000,
        "prescriptions": 4_000_000,
        "icu_chartevents": 4_000_000,
        "icu_inputevents": 4_000_000,
    }

    # Class-level default; ``__init__`` overrides it per instance.
    _n_partitions = 5

    name = "mimic"

    # Single tables depend only on themselves; partitioned tables also need
    # ``patients`` to obtain the partitioning keys.
    deps = {t: [t] for t in _mimic_tables_single}
    deps.update({t: [t, "patients"] for t in _mimic_tables_partitioned})

    key_deps = ["patients"]

    folder_name = "mimiciv_2_2"
    catalog = get_relative_fn("catalog.yml")

    def ingest(self, name, **tables: LazyFrame | Callable[[], TextFileReader]):
        """Return the ingested form of table ``name``.

        Args:
            name: Name of the table to ingest.
            **tables: The dependencies of ``name``, keyed by table name.

        Returns:
            The table itself for single tables, the partition dict built by
            ``_partition_table`` for partitioned tables, and ``None`` for any
            other name.
        """
        if name in self._mimic_tables_single:
            return cast("LazyFrame", tables[name])

        if name not in self._mimic_tables_partitioned:
            return None

        rows_per_chunk = self._mimic_tables_partitioned[name]
        reader = cast("Callable[[], TextFileReader]", tables[name])
        patients = cast("LazyFrame", tables["patients"])
        return _partition_table(
            name, reader, patients, self._n_partitions, rows_per_chunk
        )

    @to_chunked
    def keys(self, **tables: LazyChunk):
        """Return the loaded ``patients`` chunk with an empty column selection.

        NOTE(review): presumably the chunk is a DataFrame, in which case the
        result keeps only its index -- confirm against ``LazyChunk``.
        """
        patients = tables["patients"]()
        return patients[[]]