Source code for twaml.data

# -*- coding: utf-8 -*-

"""twaml.data module

This module contains a classe to abstract datasets using
pandas.DataFrames as the payload for feeding to machine learning
frameworks and other general data investigating

"""

import uproot
import pandas as pd
import h5py
import numpy as np
import re
import yaml
from pathlib import PosixPath
from typing import List, Tuple, Optional, Union, Dict
from concurrent.futures import ThreadPoolExecutor
import logging

log = logging.getLogger(__name__)

__all__ = ["dataset", "scale_weight_sum"]


class dataset:
    """A class to define a dataset with a pandas.DataFrame as the
    payload of the class. The class provides a set of static functions
    to construct a dataset. The class constructor should be used only
    in very special cases; ``datasets`` should `always` be constructed
    from a staticmethod, of which there are currently 3 available:

    - :meth:`dataset.from_root`
    - :meth:`dataset.from_pytables`
    - :meth:`dataset.from_h5`

    Attributes
    ----------
    files: List[PosixPath]
      List of files delivering the dataset
    name: str
      Name for the dataset
    tree_name: str
      All of our datasets had to come from a ROOT tree at some point;
      this is the name of that tree
    weights: numpy.ndarray
      The array of event weights
    df: :class:`pandas.DataFrame`
      The payload of the class, a dataframe
    auxweights: Optional[pandas.DataFrame]
      Auxiliary weights to have access to
    label: Optional[int]
      Optional dataset label (as an int)
    auxlabel: Optional[int]
      Optional auxiliary label (as an int) - sometimes we need two labels
    label_asarray: Optional[numpy.ndarray]
      Optional dataset label (as an array of ints)
    auxlabel_asarray: Optional[numpy.ndarray]
      Optional dataset auxiliary label (as an array of ints)
    has_payload: bool
      Flag to know that the dataset actually wraps data
    cols: List[str]
      Column names as a list of strings
    shape: Tuple
      Shape of the main payload dataframe
    wtloop_metas: Optional[Dict[str, dict]]
      A dictionary mapping files to meta dictionaries
    was_selected_with: Optional[str]
      A string (in :meth:`pandas.DataFrame.eval` form) that all of the
      data in the dataset had to satisfy

    """

    _weights = None
    _df = None
    _auxweights = None
    files = None
    name = None
    weight_name = None
    tree_name = None
    _label = None
    _auxlabel = None
    wtloop_metas = None
    was_selected_with = None

    def _init(
        self,
        input_files: List[str],
        name: Optional[str] = None,
        tree_name: str = "WtLoop_nominal",
        weight_name: str = "weight_nominal",
        label: Optional[int] = None,
        auxlabel: Optional[int] = None,
    ) -> None:
        """Default initialization - should only be called by the internal
        staticmethods ``from_root``, ``from_pytables``, ``from_h5``

        Parameters
        ----------
        input_files: List[str]
          List of input files
        name: Optional[str]
          Name of the dataset (if None, use the first file name)
        tree_name: str
          Name of the tree which this dataset originated from
        weight_name: str
          Name of the weight branch
        label: Optional[int]
          Give the dataset an integer based label
        auxlabel: Optional[int]
          Give the dataset an integer based auxiliary label

        """
        self._weights = np.array([])
        self._df = pd.DataFrame({})
        self._auxweights = None
        self.files = [PosixPath(f) for f in input_files]
        for f in self.files:
            assert f.exists(), f"{f} does not exist"
        if name is None:
            self.name = str(self.files[0].parts[-1])
        else:
            self.name = name
        self.weight_name = weight_name
        self.tree_name = tree_name
        self._label = label
        self._auxlabel = auxlabel

    @staticmethod
    def _combine_wtloop_metas(meta1, meta2) -> Optional[dict]:
        if meta1 is not None and meta2 is not None:
            return {**meta1, **meta2}
        elif meta1 is None and meta2 is not None:
            return {**meta2}
        elif meta1 is not None and meta2 is None:
            return {**meta1}
        else:
            return None

    @property
    def has_payload(self) -> bool:
        has_df = not self._df.empty
        has_weights = self._weights.shape[0] > 0
        return has_df and has_weights

    @property
    def df(self) -> pd.DataFrame:
        return self._df

    @df.setter
    def df(self, new: pd.DataFrame) -> None:
        assert len(new) == len(self._weights), "df length != weight length"
        self._df = new

    @property
    def weights(self) -> np.ndarray:
        return self._weights

    @weights.setter
    def weights(self, new: np.ndarray) -> None:
        assert len(new) == len(self._df), "weight length != frame length"
        self._weights = new

    @property
    def auxweights(self) -> pd.DataFrame:
        return self._auxweights

    @auxweights.setter
    def auxweights(self, new: pd.DataFrame) -> None:
        if new is not None:
            assert len(new) == len(self._df), "auxweights length != frame length"
        self._auxweights = new

    @property
    def label(self) -> Optional[int]:
        return self._label

    @label.setter
    def label(self, new: int) -> None:
        self._label = new

    @property
    def label_asarray(self) -> Optional[np.ndarray]:
        if self._label is None:
            return None
        return np.ones_like(self.weights, dtype=np.int64) * self._label

    @property
    def auxlabel(self) -> Optional[int]:
        return self._auxlabel

    @auxlabel.setter
    def auxlabel(self, new: int) -> None:
        self._auxlabel = new

    @property
    def auxlabel_asarray(self) -> Optional[np.ndarray]:
        if self._auxlabel is None:
            return None
        return np.ones_like(self.weights, dtype=np.int64) * self._auxlabel

    @property
    def cols(self) -> List[str]:
        return list(self.df.columns)

    @property
    def shape(self) -> Tuple:
        return self.df.shape

    @shape.setter
    def shape(self, new) -> None:
        raise NotImplementedError("Cannot set shape manually")

    def _set_df_and_weights(
        self, df: pd.DataFrame, w: np.ndarray, auxw: Optional[pd.DataFrame] = None
    ) -> None:
        assert len(df) == len(w), "unequal length df and weights"
        self._df = df
        self._weights = w
        if auxw is not None:
            assert len(df) == len(auxw), "unequal length df and auxw weights"
            self._auxweights = auxw
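
    # A minimal sketch (hypothetical dataset ``ds`` with 4 events) of the
    # label machinery above: ``label_asarray`` broadcasts the scalar label
    # over the length of the weights array, which is handy for building
    # classifier targets.
    #
    #   >>> ds.label = 1
    #   >>> ds.label_asarray
    #   array([1, 1, 1, 1])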

    def keep_columns(self, cols: List[str]) -> None:
        """Drop all columns not included in ``cols``

        Parameters
        ----------
        cols: List[str]
          Columns to keep

        """
        self._df = self._df[cols]

    def keep_weights(self, weights: List[str]) -> None:
        """Drop all columns from the aux weights frame that are not in
        ``weights``

        Parameters
        ----------
        weights: List[str]
          Weights to keep in the aux weights frame

        """
        self._auxweights = self._auxweights[weights]

    def rm_weight_columns(self) -> None:
        """Remove all payload df columns which begin with ``weight_``

        If you are reading a dataset that was created while retaining
        weights in the main payload, this is a useful function to
        remove them. The design of ``twaml.data.dataset`` expects
        weights to be separated from the payload's main dataframe.

        Internally this is done by calling
        :meth:`pandas.DataFrame.drop` with ``inplace`` on the payload

        """
        pat = re.compile("^weight_")
        rmthese = [c for c in self._df.columns if re.match(pat, c)]
        self._df.drop(columns=rmthese, inplace=True)

    def rmcolumns_re(self, pattern: str) -> None:
        """Remove columns from the payload based on a regex pattern

        Internally this is done by calling
        :meth:`pandas.DataFrame.drop` with ``inplace`` on the payload

        Parameters
        ----------
        pattern : str
          Regex used to remove columns

        """
        pat = re.compile(pattern)
        rmthese = [c for c in self._df.columns if re.search(pat, c)]
        self._df.drop(columns=rmthese, inplace=True)

    def rmcolumns(self, cols: List[str]) -> None:
        """Remove columns from the dataset

        Internally this is done by calling
        :meth:`pandas.DataFrame.drop` with ``inplace`` on the payload

        Parameters
        ----------
        cols: List[str]
          List of column names to remove

        """
        self._df.drop(columns=cols, inplace=True)
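
    # A minimal sketch (hypothetical column names) contrasting the
    # column-pruning helpers above: ``keep_columns`` whitelists payload
    # columns, ``rmcolumns`` drops by exact name, ``rmcolumns_re`` drops by
    # regex, and ``rm_weight_columns`` is shorthand for dropping the
    # ``^weight_`` family.
    #
    #   >>> ds.keep_columns(["pT_lep1", "pT_lep2", "met"])  # keep only these three
    #   >>> ds.rmcolumns(["met"])                           # drop one by name
    #   >>> ds.rmcolumns_re(r"^pT_")                        # drop the two pT_ columns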

    def change_weights(self, wname: str) -> None:
        """Change the main weight of the dataset

        This function will swap the current main weight array of the
        dataset with one in the ``auxweights`` frame (based on its
        name in the ``auxweights`` frame).

        Parameters
        ----------
        wname: str
          Name of the weight in the ``auxweights`` DataFrame to turn
          into the main weight

        """
        assert self._auxweights is not None, "aux weights do not exist"
        old_name = self.weight_name
        old_weights = self.weights
        self._auxweights[old_name] = old_weights
        self.weights = self._auxweights[wname].to_numpy()
        self.weight_name = wname
        self._auxweights.drop(columns=[wname], inplace=True)
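
    # A usage sketch for ``change_weights`` (assumes the dataset was built
    # with ``detect_weights=True`` so systematic weights live in
    # ``auxweights``; the weight names are hypothetical): the old main
    # weight is parked in the aux frame and the requested one is promoted.
    #
    #   >>> ds.weight_name
    #   'weight_nominal'
    #   >>> ds.change_weights("weight_sys_radHi")
    #   >>> ds.weight_name
    #   'weight_sys_radHi'
    #   >>> "weight_nominal" in ds.auxweights.columns
    #   True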

    def append(self, other: "dataset") -> None:
        """Append a dataset to an existing one

        We perform concatenations of the dataframes and weights to
        update the existing dataset's payload. If one dataset has aux
        weights and the other doesn't, the aux weights are dropped.

        Parameters
        ----------
        other : twaml.data.dataset
          The dataset to append

        """
        assert self.has_payload, "Unconstructed df (self)"
        assert other.has_payload, "Unconstructed df (other)"
        assert self.weight_name == other.weight_name, "different weight names"
        assert self.shape[1] == other.shape[1], "different df columns"
        if self._auxweights is not None and other.auxweights is not None:
            assert (
                self._auxweights.shape[1] == other.auxweights.shape[1]
            ), "different number of aux weight columns"
        self._df = pd.concat([self._df, other.df])
        self._weights = np.concatenate([self._weights, other.weights])
        self.files = self.files + other.files
        self.wtloop_metas = self._combine_wtloop_metas(
            self.wtloop_metas, other.wtloop_metas
        )
        if self._auxweights is not None and other.auxweights is not None:
            self._auxweights = pd.concat([self._auxweights, other.auxweights])
        else:
            self._auxweights = None
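
    # In-place versus copying combination, as a sketch with hypothetical
    # datasets ``ds1`` (100 events) and ``ds2`` (50 events): ``append``
    # mutates the left-hand dataset, while ``__add__`` builds a new one.
    #
    #   >>> ds3 = ds1 + ds2    # ds1 untouched; ds3 has 150 events
    #   >>> ds1.append(ds2)    # ds1 itself now has 150 events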

    def to_pytables(self, file_name: str) -> None:
        """Write the dataset to disk as a pytables h5 file (with a
        strict twaml-compatible naming scheme)

        An existing dataset label **is not stored**. The properties of
        the class that are serialized to disk:

        - ``df`` as ``{name}_payload``
        - ``weights`` as ``{name}_{weight_name}``
        - ``auxweights`` as ``{name}_auxweights``
        - ``wtloop_metas`` as ``{name}_wtloop_metas``

        These properties are wrapped in a pandas DataFrame (if they
        are not already) to be stored in a .h5 file.
        :meth:`from_pytables` is designed to read in this output; the
        standard use case is to call this function to store a dataset
        that was initialized via :meth:`from_root`.

        Internally this function uses :meth:`pandas.DataFrame.to_hdf`
        on a number of structures.

        Parameters
        ----------
        file_name: str
          Output file name

        Examples
        --------
        >>> ds = twaml.dataset.from_root("file.root", name="myds",
        ...                              detect_weights=True, wtloop_meta=True)
        >>> ds.to_pytables("output.h5")
        >>> ds_again = twaml.dataset.from_pytables("output.h5")
        >>> ds_again.name
        'myds'

        """
        log.info(f"Creating pytables dataset with name '{self.name}' in {file_name}")
        log.info("  according to the dataset class the original source was:")
        for f in self.files:
            log.info(f"  - {f}")
        if PosixPath(file_name).exists():
            log.warning(f"{file_name} exists, overwriting")
        weights_frame = pd.DataFrame(dict(weights=self._weights))
        self._df.to_hdf(file_name, f"{self.name}_payload", mode="w")
        weights_frame.to_hdf(file_name, f"{self.name}_{self.weight_name}", mode="a")
        if self._auxweights is not None:
            self._auxweights.to_hdf(file_name, f"{self.name}_auxweights", mode="a")
        if self.wtloop_metas is not None:
            tempdict = {k: np.array([str(v)]) for k, v in self.wtloop_metas.items()}
            wtmetadf = pd.DataFrame.from_dict(tempdict)
            wtmetadf.to_hdf(file_name, f"{self.name}_wtloop_metas", mode="a")

    def __add__(self, other: "dataset") -> "dataset":
        """Add two datasets together

        We perform concatenations of the dataframes and weights to
        generate a new dataset with a combined payload. If one dataset
        has aux weights and the other doesn't, the aux weights are
        dropped.

        """
        assert self.has_payload, "Unconstructed df (self)"
        assert other.has_payload, "Unconstructed df (other)"
        assert self.weight_name == other.weight_name, "different weight names"
        assert self.shape[1] == other.shape[1], "different df columns"
        if self._auxweights is not None and other.auxweights is not None:
            assert (
                self._auxweights.shape[1] == other.auxweights.shape[1]
            ), "different number of aux weight columns"
        new_weights = np.concatenate([self.weights, other.weights])
        new_df = pd.concat([self.df, other.df])
        new_files = [str(f) for f in (self.files + other.files)]
        new_ds = dataset()
        new_ds._init(
            new_files,
            self.name,
            weight_name=self.weight_name,
            tree_name=self.tree_name,
            label=self._label,
            auxlabel=self._auxlabel,
        )
        new_ds.wtloop_metas = self._combine_wtloop_metas(
            self.wtloop_metas, other.wtloop_metas
        )
        if self._auxweights is not None and other.auxweights is not None:
            new_aw = pd.concat([self._auxweights, other.auxweights])
        else:
            new_aw = None
        new_ds._set_df_and_weights(new_df, new_weights, auxw=new_aw)
        return new_ds

    def __len__(self) -> int:
        """Length of the dataset"""
        return len(self.weights)

    def __repr__(self) -> str:
        """Standard repr"""
        return f"<twaml.data.dataset(name={self.name}, shape={self.shape})>"

    def __str__(self) -> str:
        """Standard str"""
        return f"dataset(name={self.name})"
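
    # The dunder methods above make a dataset behave like a sized
    # container; a sketch with a hypothetical 1000-event, 25-column dataset:
    #
    #   >>> len(ds)
    #   1000
    #   >>> ds
    #   <twaml.data.dataset(name=myds, shape=(1000, 25))>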

    @staticmethod
    def from_root(
        input_files: Union[str, List[str]],
        name: Optional[str] = None,
        tree_name: str = "WtLoop_nominal",
        weight_name: str = "weight_nominal",
        branches: Optional[List[str]] = None,
        selection: Optional[str] = None,
        label: Optional[int] = None,
        auxlabel: Optional[int] = None,
        allow_weights_in_df: bool = False,
        auxweights: Optional[List[str]] = None,
        detect_weights: bool = False,
        nthreads: Optional[int] = None,
        wtloop_meta: bool = False,
    ) -> "dataset":
        """Initialize a dataset from ROOT files

        Parameters
        ----------
        input_files: Union[str, List[str]]
          Single ROOT input file or list of ROOT input files to use
        name: Optional[str]
          Name of the dataset (if None, use the first file name)
        tree_name: str
          Name of the tree in the file to use
        weight_name: str
          Name of the weight branch
        branches: Optional[List[str]]
          List of branches to store in the dataset; if None, use all
        selection: Optional[str]
          A string passed to :meth:`pandas.DataFrame.eval` to apply a
          selection based on branch/column values. e.g.
          ``(reg1j1b == True) & (OS == True)`` requires the
          ``reg1j1b`` and ``OS`` branches to be ``True``.
        label: Optional[int]
          Give the dataset an integer label
        auxlabel: Optional[int]
          Give the dataset an integer auxiliary label
        allow_weights_in_df: bool
          Allow ``^weight_`` branches in the payload dataframe
        auxweights: Optional[List[str]]
          Auxiliary weights to store in a second dataframe
        detect_weights: bool
          If True, fill the auxweights df with all ``^weight_``
          branches. If ``auxweights`` is not None, this option is
          ignored.
        nthreads: Optional[int]
          Number of threads to use when reading the ROOT tree (see
          uproot.TTreeMethods_pandas.df)
        wtloop_meta: bool
          Grab and store the ``WtLoop_meta`` YAML entries, stored as a
          dictionary of the form ``{ str(filename) : dict(yaml) }`` in
          the class variable ``wtloop_metas``.

        Examples
        --------
        Example with a single file and two branches:

        >>> ds1 = dataset.from_root(["file.root"], name="myds",
        ...                         branches=["pT_lep1", "pT_lep2"], label=1)

        Example with multiple input files and a selection (uses all
        branches). The selection requires the branches ``nbjets == 1``
        and ``njets >= 1``; then we label the dataset 5:

        >>> flist = ["file1.root", "file2.root", "file3.root"]
        >>> ds = dataset.from_root(flist, selection='(nbjets == 1) & (njets >= 1)')
        >>> ds.label = 5

        Example using aux weights:

        >>> ds = dataset.from_root(flist, name="myds", weight_name="weight_nominal",
        ...                        auxweights=["weight_sys_radLo", "weight_sys_radHi"])

        Example where we detect aux weights automatically:

        >>> ds = dataset.from_root(flist, name="myds", weight_name="weight_nominal",
        ...                        detect_weights=True)

        Example using a ThreadPoolExecutor (16 threads):

        >>> ds = dataset.from_root(flist, name="myds", nthreads=16)

        """
        if isinstance(input_files, (str, bytes)):
            input_files = [input_files]
        else:
            try:
                iter(input_files)
            except TypeError:
                input_files = [input_files]
            else:
                input_files = list(input_files)

        executor = None
        if nthreads is not None:
            executor = ThreadPoolExecutor(nthreads)

        ds = dataset()
        ds._init(
            input_files,
            name,
            tree_name=tree_name,
            weight_name=weight_name,
            label=label,
            auxlabel=auxlabel,
        )

        if wtloop_meta:
            meta_trees = {
                file_name: uproot.open(file_name)["WtLoop_meta"]
                for file_name in input_files
            }
            ds.wtloop_metas = {
                fn: yaml.full_load(mt.array("meta_yaml")[0])
                for fn, mt in meta_trees.items()
            }

        uproot_trees = [uproot.open(file_name)[tree_name] for file_name in input_files]

        wpat = re.compile("^weight_")
        if auxweights is not None:
            w_branches = auxweights
        elif detect_weights:
            urtkeys = [k.decode("utf-8") for k in uproot_trees[0].keys()]
            w_branches = [k for k in urtkeys if re.match(wpat, k)]
            if weight_name in w_branches:
                w_branches.remove(weight_name)
        else:
            w_branches = None

        frame_list, weight_list, aux_frame_list = [], [], []
        for t in uproot_trees:
            raw_w = t.array(weight_name)
            raw_f = t.pandas.df(
                branches=branches, namedecode="utf-8", executor=executor
            )
            if not allow_weights_in_df:
                rmthese = [c for c in raw_f.columns if re.match(wpat, c)]
                raw_f.drop(columns=rmthese, inplace=True)
            if w_branches is not None:
                raw_aw = t.pandas.df(branches=w_branches, namedecode="utf-8")
            if selection is not None:
                iselec = raw_f.eval(selection)
                raw_w = raw_w[iselec]
                raw_f = raw_f[iselec]
                if w_branches is not None:
                    raw_aw = raw_aw[iselec]
            assert len(raw_w) == len(raw_f), "frame length and weight length different"
            weight_list.append(raw_w)
            frame_list.append(raw_f)
            if w_branches is not None:
                aux_frame_list.append(raw_aw)
                assert len(raw_w) == len(
                    raw_aw
                ), "aux weight length and weight length different"

        weights_array = np.concatenate(weight_list)
        df = pd.concat(frame_list)
        if w_branches is not None:
            aw_df = pd.concat(aux_frame_list)
        else:
            aw_df = None

        ds._set_df_and_weights(df, weights_array, auxw=aw_df)
        return ds

    @staticmethod
    def from_pytables(
        file_name: str,
        name: str = "auto",
        tree_name: str = "none",
        weight_name: str = "auto",
        label: Optional[int] = None,
        auxlabel: Optional[int] = None,
    ) -> "dataset":
        """Initialize a dataset from pytables output generated by
        :meth:`dataset.to_pytables`

        The payload is extracted from the .h5 pytables file using the
        name of the dataset and the weight name. If the given dataset
        name doesn't exist in the file, reading will fail. Aux weights
        are retrieved if available.

        Parameters
        ----------
        file_name: str
          Name of the h5 file containing the payload
        name: str
          Name of the dataset inside the h5 file. If ``"auto"``
          (default), we attempt to determine the name automatically
          from the h5 file.
        tree_name: str
          Name of the tree where the dataset originated (only for
          reference)
        weight_name: str
          Name of the weight array inside the h5 file. If ``"auto"``
          (default), we attempt to determine the name automatically
          from the h5 file.
        label: Optional[int]
          Give the dataset an integer label
        auxlabel: Optional[int]
          Give the dataset an integer auxiliary label

        Examples
        --------
        >>> ds1 = dataset.from_pytables("ttbar.h5", "ttbar")
        >>> ds1.label = 1  # label the dataset after the fact

        """
        with h5py.File(file_name, "r") as f:
            keys = list(f.keys())
            if name == "auto":
                for k in keys:
                    if "_payload" in k:
                        name = k.split("_payload")[0]
                        break
            if weight_name == "auto":
                for k in keys:
                    if "_weight" in k:
                        weight_name = k.split(f"{name}_")[-1]
                        break
        main_frame = pd.read_hdf(file_name, f"{name}_payload")
        main_weight_frame = pd.read_hdf(file_name, f"{name}_{weight_name}")
        with h5py.File(file_name, "r") as f:
            if f"{name}_auxweights" in f:
                aux_frame = pd.read_hdf(file_name, f"{name}_auxweights")
            else:
                aux_frame = None
        w_array = main_weight_frame.weights.to_numpy()
        ds = dataset()
        ds._init(
            [file_name],
            name,
            weight_name=weight_name,
            tree_name=tree_name,
            label=label,
            auxlabel=auxlabel,
        )
        ds._set_df_and_weights(main_frame, w_array, auxw=aux_frame)
        with h5py.File(file_name, "r") as f:
            if f"{name}_wtloop_metas" in f:
                wtloop_metas = pd.read_hdf(file_name, f"{name}_wtloop_metas")
                ds.wtloop_metas = {
                    fn: yaml.full_load(wtloop_metas[fn].to_numpy()[0])
                    for fn in wtloop_metas.columns
                }
        return ds

    @staticmethod
    def from_h5(
        file_name: str,
        name: str,
        columns: List[str],
        tree_name: str = "WtLoop_nominal",
        weight_name: str = "weight_nominal",
        label: Optional[int] = None,
        auxlabel: Optional[int] = None,
    ) -> "dataset":
        """Initialize a dataset from generic h5 input (loosely expected
        to be from the ATLAS Analysis Release utility ``ttree2hdf5``)

        The name of the HDF5 dataset inside the file is assumed to be
        ``tree_name``. The ``name`` argument is something *you choose*.

        Parameters
        ----------
        file_name: str
          Name of the h5 file containing the payload
        name: str
          Name of the dataset you would like to define
        columns: List[str]
          Names of columns (branches) to include in the payload
        tree_name: str
          Name of the tree the dataset originates from (the HDF5
          dataset name)
        weight_name: str
          Name of the weight array inside the h5 file
        label: Optional[int]
          Give the dataset an integer label
        auxlabel: Optional[int]
          Give the dataset an integer auxiliary label

        Examples
        --------
        >>> ds = dataset.from_h5('file.h5', 'dsname', tree_name='WtLoop_EG_RESOLUTION_ALL__1up')

        """
        ds = dataset()
        ds._init(
            [file_name],
            name=name,
            weight_name=weight_name,
            tree_name=tree_name,
            label=label,
            auxlabel=auxlabel,
        )
        with h5py.File(file_name, mode="r") as f:
            full_ds = f[tree_name]
            w_array = np.asarray(full_ds[weight_name])
            coldict = {col: np.asarray(full_ds[col]) for col in columns}
        frame = pd.DataFrame(coldict)
        ds._set_df_and_weights(frame, w_array)
        return ds

    def apply_selections(self, selections: Dict[str, str]) -> Dict[str, "dataset"]:
        """Based on a dictionary of selections, break the dataset into
        a set of multiple (finer grained) datasets.

        Parameters
        ----------
        selections: Dict[str, str]
          Dictionary of selections in the form ``{ name : selection }``

        Returns
        -------
        Dict[str, dataset]
          A dictionary of datasets satisfying the selections

        Examples
        --------
        A handful of selections, all requiring ``OS`` and ``elmu`` to
        be true while changing the ``reg{...}`` requirement:

        >>> selections = {'1j1b': '(reg1j1b == True) & (OS == True) & (elmu == True)',
        ...               '2j1b': '(reg2j1b == True) & (OS == True) & (elmu == True)',
        ...               '2j2b': '(reg2j2b == True) & (OS == True) & (elmu == True)',
        ...               '3j':   '(reg3j == True) & (OS == True) & (elmu == True)'}
        >>> selected_datasets = ds.apply_selections(selections)

        """
        breaks = {}
        for selk, selv in selections.items():
            mask = self.df.eval(selv)
            new_df = self.df[mask]
            new_weights = self.weights[mask]
            new_auxweights = None
            if self.auxweights is not None:
                new_auxweights = self.auxweights[mask]
            new_meta = self.wtloop_metas
            new_ds = dataset()
            new_ds._init(
                self.files,
                self.name,
                self.tree_name,
                self.weight_name,
                label=self.label,
                auxlabel=self.auxlabel,
            )
            new_ds._set_df_and_weights(new_df, new_weights, new_auxweights)
            new_ds.wtloop_metas = new_meta
            new_ds.was_selected_with = selv
            breaks[selk] = new_ds
        return breaks

def scale_weight_sum(to_update: "dataset", reference: "dataset") -> None:
    """Scale the weights of the ``to_update`` dataset such that its
    sum of weights is equal to the sum of weights of the ``reference``
    dataset.

    Parameters
    ----------
    to_update: dataset
      Dataset with weights to be scaled
    reference: dataset
      Dataset to scale to

    """
    assert to_update.has_payload, f"{to_update} is without payload"
    assert reference.has_payload, f"{reference} is without payload"
    sum_to_update = to_update.weights.sum()
    sum_reference = reference.weights.sum()
    to_update.weights *= sum_reference / sum_to_update
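
# A minimal end-to-end sketch (hypothetical file and dataset names) tying
# the module together: build two datasets, normalize one's weight sum to
# the other's, and combine them.
#
#   >>> from twaml.data import dataset, scale_weight_sum
#   >>> sig = dataset.from_pytables("tW.h5", label=1)
#   >>> bkg = dataset.from_pytables("ttbar.h5", label=0)
#   >>> scale_weight_sum(sig, bkg)  # sig.weights now sums to bkg.weights.sum()
#   >>> combined = sig + bkg        # assumes identical columns and weight name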