Source code for pasteur.kedro.pipelines.dataset
from __future__ import annotations
from kedro.pipeline import Pipeline as pipeline
from ...dataset import Dataset, TypedDataset
from .meta import TAGS_DATASET
from .meta import DatasetMeta as D
from .meta import PipelineMeta, node
[docs]
def create_dataset_pipeline(
    dataset: Dataset, tables: list[str] | None = None
) -> PipelineMeta:
    """Build the pipeline that ingests ``dataset`` and generates its keys.

    Two pipelines are built and combined with ``+``:

    * A tables pipeline, namespaced by ``dataset.name``.
      - When ``dataset`` is a :class:`TypedDataset`, it contains one
        ``type_<t>`` node per entry of ``dataset.raw_tables``. Each such node
        calls ``dataset.type`` on ``raw@<t>`` and writes ``typed.<t>``.
      - It then contains one ``ingest_<t>`` node per selected table. Each
        calls ``dataset.ingest`` with ``t`` as an extra argument. Its inputs
        are ``dataset.deps[t]``, read from ``typed.<dep>`` (typed datasets) or
        ``raw@<dep>`` (otherwise).
    * A keys pipeline holding a single ``gen_keys`` node. The node has
      namespace ``str(dataset)``, calls ``dataset.keys`` on the
      ``<dataset>.<dep>`` entries of ``dataset.key_deps`` and writes
      ``<dataset>.keys``.

    Args:
        dataset: Dataset whose ``type`` / ``ingest`` / ``keys`` callables
            become pipeline nodes.
        tables: Tables to ingest. ``None`` or an empty list falls back to
            ``dataset.tables``.

    Returns:
        The combined :class:`PipelineMeta`. It carries ``DatasetMeta`` entries
        for the typed tables (if any), the ingested ("interim") tables and the
        generated keys.
    """
    # An empty selection behaves exactly like no selection at all.
    selected = tables or dataset.tables

    is_typed = isinstance(dataset, TypedDataset)
    table_nodes = []
    table_outputs = []

    if is_typed:
        # Typed datasets get an extra stage: every raw table is passed through
        # `dataset.type` first, and ingestion then reads the typed result.
        for raw in dataset.raw_tables:
            table_nodes.append(
                node(
                    func=dataset.type,
                    name=f"type_{raw}",
                    inputs=[f"raw@{raw}"],
                    outputs=f"typed.{raw}",
                    tags=TAGS_DATASET,
                )
            )
            table_outputs.append(
                D(
                    "typed",
                    f"{dataset}.typed.{raw}",
                    ["ds", dataset, "typed", raw],
                    type="pq",
                )
            )

    # Ingest nodes pull their dependencies from the typed layer when one was
    # built above, and straight from the raw layer otherwise.
    source_prefix = "typed." if is_typed else "raw@"
    for table in selected:
        table_nodes.append(
            node(
                name=f"ingest_{table}",
                func=dataset.ingest,
                args=[table],
                inputs={
                    dep: f"{source_prefix}{dep}" for dep in dataset.deps[table]
                },
                outputs=table,
                tags=TAGS_DATASET,
            )
        )
        table_outputs.append(
            D(
                "interim",
                f"{dataset}.{table}",
                ["ds", dataset, "tables", table],
                type="pq",
            )
        )

    tables_meta = PipelineMeta(
        pipeline(table_nodes, namespace=dataset.name),
        table_outputs,
    )

    # Key generation is a separate single-node pipeline; its inputs are
    # addressed as `<dataset>.<dep>`.
    keys_output = f"{dataset}.keys"
    keys_node = node(
        func=dataset.keys,
        name="gen_keys",
        inputs={dep: f"{dataset}.{dep}" for dep in dataset.key_deps},
        namespace=str(dataset),
        outputs=keys_output,
        tags=TAGS_DATASET,
    )
    keys_meta = PipelineMeta(
        pipeline([keys_node]),
        [D("keys", keys_output, ["ds", dataset, "keys"])],
    )

    return tables_meta + keys_meta