Source code for icenet.tools.io

# Input data containers and memory management
# 
# m.mieskolainen@imperial.ac.uk, 2024

import numpy as np
import awkward as ak

import numba
import copy
import torch
import os
import psutil
import subprocess
import re


# MVA imputation
from sklearn.impute import KNNImputer
from sklearn.impute import SimpleImputer

from sklearn.experimental import enable_iterative_imputer # Needs this
from sklearn.impute import IterativeImputer

# Command line arguments
from glob import glob
from braceexpand import braceexpand
import copy
import time

import hashlib
import base64

from icenet.tools import aux
from icenet.tools import stx

# ------------------------------------------
from icenet import print
# ------------------------------------------

def get_file_timestamp(file_path: str):
    """ Return file timestamp as a string """
    if os.path.exists(file_path):
        
        # Get the file last modification time
        timestamp = os.path.getmtime(file_path)
        
        # Convert it to a readable format
        readable_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))
        
        return readable_time
    else:
        return f"File '{file_path}' does not exist."

def rootsafe(txt):
    """ Replace characters problematic for ROOT """
    return txt.replace('-', '_').replace('+','_').replace('/','_').replace('*','_')

def safetxt(txt):
    """ Protection for '/' """
    if type(txt) is str:
        return txt.replace('/', '|')
    else:
        return txt

def count_files_in_dir(path):
    """ Count the number of files in a path """
    return len([name for name in os.listdir(path) if os.path.isfile(os.path.join(path, name))])

def make_hash_sha256_file(filename):
    """ Create SHA256 hash from a file """
    h  = hashlib.sha256()
    b  = bytearray(128*1024)
    mv = memoryview(b)
    with open(filename, 'rb', buffering=0) as f:
        while n := f.readinto(mv):
            h.update(mv[:n])
    return h.hexdigest()

def make_hash_sha256_object(o):
    """ Create SHA256 hash from an object
    
    Args:
        o: python object (e.g. dictionary)
    
    Returns:
        hash
    """
    hasher = hashlib.sha256()
    hasher.update(repr(make_hashable(o)).encode())
    hash_str = base64.b64encode(hasher.digest()).decode()
    
    # May cause problems with directories
    hash_str = hash_str.replace('/', '_')
    hash_str = hash_str.replace('\\', '__')
    hash_str = hash_str.replace('.', '___')
    
    return hash_str

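# Illustrative usage sketch (not part of the module API): hash a configuration
# dictionary into a string usable in file names, e.g. for caching. The dictionary
# contents and cache path below are hypothetical.
#
#   cfg = {'lr': 1.0e-3, 'layers': [64, 64], 'dataset': 'train_*.root'}
#   key = make_hash_sha256_object(cfg)
#   cachefile = f'./tmp/cache_{key}.pkl'
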
def make_hashable(o):
    """ Turn a python object into hashable type (recursively) """
    if isinstance(o, (tuple, list)):
        return tuple((make_hashable(e) for e in o))
    if isinstance(o, dict):
        return tuple(sorted((k, make_hashable(v)) for k,v in o.items()))
    if isinstance(o, (set, frozenset)):
        return tuple(sorted(make_hashable(e) for e in o))
    return o

def glob_expand_files(datasets, datapath, recursive_glob=False):
    """ Do glob / brace expansion of files
    
    Args:
        datasets: dataset filename with glob syntax (can be a list of files)
        datapath: root path to files
    
    Returns:
        files: full filenames including the path
    """
    print("")
    print(f"Supported syntax: <filename_*>, <filename_0>, <filename_[0-99]>, <filename_{{0,3,4}}>")
    print("See https://docs.python.org/3/library/glob.html and brace expansion (be careful, do not use [,] brackets in your filenames)")
    print("")
    
    # Remove unnecessary []
    if type(datasets) is list and len(datasets) == 1:
        datasets = datasets[0]
    
    # Try first to brace expand
    try:
        datasets = list(braceexpand(datasets))
    except:
        pass
    
    #print(__name__ + f'.glob_expand_files: After braceexpand: {datasets}')
    
    if (len(datasets) == 1) and ('[' in datasets[0]) and (']' in datasets[0]):
        
        print(f'Parsing of range [first-last] ...')
        
        res     = re.findall(r'\[.*?\]', datasets[0])[0]
        temp    = res[1:-1]
        numbers = temp.split('-')
        first   = int(numbers[0])
        last    = int(numbers[1])
        
        print(f'Obtained range of files: [{first}, {last}]')
        
        # Split and add
        parts = datasets[0].split(res)
        datasets[0] = parts[0] + '{'
        for i in range(first, last+1):
            datasets[0] += f'{i}'
            if i != last:
                datasets[0] += ','
        datasets[0] += '}' + parts[1]
        
        datasets = list(braceexpand(datasets[0]))
        
        #print(__name__ + f'.glob_expand_files: After expanding the range: {datasets}')
    
    # Parse input files into a list
    files = list()
    for data in datasets:
        x = datapath + '/' + data
        expanded_files = glob(x, recursive=recursive_glob) # This does e.g. _*.root expansion (finds the files)
        
        # Loop over expanded set of files
        if expanded_files != []:
            for i in range(len(expanded_files)):
                files.append(expanded_files[i])
    
    if files == []:
        files = [datapath]
    
    # Transform multiple slashes
    for i in range(len(files)):
        files[i] = files[i].replace('////','/').replace('///', '/').replace('//', '/')
    
    # Make them unique
    files = list(set(files))
    files.sort()
    
    #print(__name__ + f'.glob_expand_files: Final files: {files}')
    
    return files

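# Illustrative usage sketch (file names and the path are hypothetical):
#
#   # Expands to output_0.root ... output_9.root found under /data/mc
#   files = glob_expand_files(datasets='output_[0-9].root', datapath='/data/mc')
#
#   # Plain glob wildcards and explicit brace lists also work
#   files = glob_expand_files(datasets='output_*.root',       datapath='/data/mc')
#   files = glob_expand_files(datasets='output_{0,3,4}.root', datapath='/data/mc')
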
def showmem(color='red'):
    """ Print the process RAM usage """
    print(f"""Process RAM: {process_memory_use():0.2f} GB [total RAM in use {psutil.virtual_memory()[2]} %]""", color)

def showmem_cuda(device='cuda:0', color='red'):
    """ Print the process RAM and CUDA VRAM usage """
    print(f"Process RAM: {process_memory_use():0.2f} GB [total RAM in use {psutil.virtual_memory()[2]} %] | VRAM usage: {get_gpu_memory_map()} GB [total VRAM {torch_cuda_total_memory(device):0.2f} GB]", color)

def get_gpu_memory_map():
    """ Get the GPU VRAM use in GB.
    
    Returns:
        dictionary with keys as device ids [integers] and values the memory used by the GPU.
    """
    try:
        result = subprocess.check_output(
            [
                'nvidia-smi', '--query-gpu=memory.used',
                '--format=csv,nounits,noheader'
            ], encoding='utf-8')
        
        # into dictionary
        gpu_memory = [int(x)/1024.0 for x in result.strip().split('\n')]
        gpu_memory_map = dict(zip(range(len(gpu_memory)), gpu_memory))
        
        return gpu_memory_map
    
    except Exception as e:
        print(f'Error: Could not run nvidia-smi: {e}')
        return None

def torch_cuda_total_memory(device='cuda:0'):
    """ Return CUDA device VRAM available in GB. """
    return torch.cuda.get_device_properties(device).total_memory / 1024.0**3

def process_memory_use():
    """ Return system memory (RAM) used by the process in GB. """
    pid = os.getpid()
    py  = psutil.Process(pid)
    return py.memory_info()[0]/2.**30

def checkinfnan(x, value=0):
    """ Check for Inf and NaN values and replace them with a default value. """
    inf_ind = np.isinf(x)
    nan_ind = np.isnan(x)
    
    x[inf_ind] = value
    x[nan_ind] = value
    
    if np.sum(inf_ind) > 0:
        print(f'Inf found, replacing with {value}', 'red')
    if np.sum(nan_ind) > 0:
        print(f'NaN found, replacing with {value}', 'red')
    
    return x

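# Illustrative usage sketch (array values are hypothetical):
#
#   x = np.array([1.0, np.inf, 3.0, np.nan])
#   x = checkinfnan(x, value=0)   # -> array([1., 0., 3., 0.])
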
class fastarray1:
    """ 1D pre-allocated buffer arrays for histogramming etc. """
    def __init__(self, capacity=32):
        self.capacity = capacity
        self.data = np.zeros((self.capacity))
        self.size = 0
    
    # use with x.update([1,3,5,1])
    #
    def update(self, row):
        for x in row:
            self.add(x)
    
    # use with x.add(32)
    #
    def add(self, x):
        if self.size == self.capacity:
            print(f'increasing current capacity = {self.capacity} to 2x')
            self.capacity *= 2
            newdata = np.zeros((self.capacity))
            newdata[:self.size] = self.data
            self.data = newdata
        
        self.data[self.size] = x
        self.size += 1
    
    # Get values
    #
    def values(self):
        return self.data[0:self.size]
    
    # Reset index, keep the buffer size
    def reset(self):
        self.size = 0

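# Illustrative usage sketch:
#
#   buf = fastarray1(capacity=32)
#   buf.update([1, 3, 5, 1])   # append several values
#   buf.add(32)                # append a single value
#   vals = buf.values()        # view of the filled part of the buffer
#   buf.reset()                # keep the buffer, start over
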
def index_list(target_list, keys):
    """ Use e.g. x_subset = x[:, io.index_list(ids, variables)] """
    index = []
    for key in keys:
        index.append(target_list.index(key))
    return index

def pick_vars(data, set_of_vars):
    """ Choose the active set of input variables.
    
    Args:
        data:        IceXYW type object
        set_of_vars: Variables to pick
    
    Returns:
        newind:  Chosen indices
        newvars: Chosen variables
    """
    newind = np.where(np.isin(data.ids, set_of_vars))
    newind = np.array(newind).flatten()
    
    newvars = []
    for i in newind:
        newvars.append(data.ids[i])
    
    return newind, newvars

class IceXYW:
    """
    Args:
        x   : data object
        y   : target output data
        w   : weights
        ids : variable names of columns
    """
    # constructor
    def __init__(self, x=np.array([]), y=np.array([]), w=None, ids=None):
        self.x = x
        self.y = y
        self.w = w
        self.ids = ids
        
        if isinstance(x, np.ndarray):
            self.concat = np.concatenate
        elif isinstance(x, ak.Array):
            self.concat = ak.concatenate
        else:
            raise Exception(__name__ + f'.IceXYW.__init__: Unknown input array type')
    
    def find_ind(self, key):
        """ Return column index corresponding to key """
        return int(np.where(np.array(self.ids, dtype=np.object_) == key)[0])
    
    def __getitem__(self, key):
        """ Advanced indexing with a variable or a list of variables """
        
        if type(key) is not list:
            
            # access with a single variable name
            try:
                select = key
                new = IceXYW(x=self.x[select, ...], y=self.y[select], w=self.w[select], ids=self.ids)
                return new
            except:
                pass
            
            if key in self.ids: # direct access
                col = self.ids.index(key)
                ids = [key]
            
            elif isinstance(self.x, np.ndarray): # might be a cut string, try that
                select = stx.eval_boolean_syntax(expr=key, X=self.x, ids=self.ids, verbose=True)
                return IceXYW(x=self.x[select, ...], y=self.y[select], w=self.w[select], ids=self.ids)
            
            else:
                raise Exception(__name__ + f'[operator]: Cannot execute')
        
        else: # list of variables
            col, ids = pick_vars(data=self, set_of_vars=key)
        
        if isinstance(self.x, np.ndarray):
            return IceXYW(x=self.x[..., col], y=self.y, w=self.w, ids=ids)
        else:
            return IceXYW(x=self.x[col], y=self.y, w=self.w, ids=ids)
    
    # length operator
    def __len__(self):
        return len(self.x)
    
    # + operator
    def __add__(self, other):
        
        if (len(self.x) == 0): # still empty
            return other
        
        x = self.concat((self.x, other.x), axis=0)
        y = self.concat((self.y, other.y), axis=0)
        
        w = None # default if no weights
        if self.w is not None:
            w = self.concat((self.w, other.w), axis=0)
        
        return IceXYW(x, y, w)
    
    # += operator
    def __iadd__(self, other):
        
        if (len(self.x) == 0): # still empty
            return other
        
        self.x = self.concat((self.x, other.x), axis=0)
        self.y = self.concat((self.y, other.y), axis=0)
        
        if self.w is not None:
            self.w = self.concat((self.w, other.w), axis=0)
        
        return self
    
    # filter operator
    def classfilter(self, classid):
        
        ind = (self.y == classid)
        
        x = self.x[ind]
        y = self.y[ind]
        
        if self.w is not None:
            w = self.w[ind]
        else:
            w = self.w
        
        return IceXYW(x=x, y=y, w=w, ids=self.ids)
    
    # Permute events
    def permute(self, permutation):
        self.x = self.x[permutation]
        self.y = self.y[permutation]
        if self.w is not None:
            self.w = self.w[permutation]
        return self

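# Illustrative usage sketch (variable names are hypothetical; the exact cut-string
# grammar is defined by stx.eval_boolean_syntax, so the expression below is only indicative):
#
#   d = IceXYW(x=X, y=Y, w=W, ids=['pt', 'eta', 'phi'])
#
#   sub = d[['pt', 'eta']]             # pick a subset of columns
#   cut = d['pt > 20']                 # boolean cut string (numpy input only)
#   sig = d.classfilter(classid=1)     # events of a single class
#   d   = d.permute(np.random.permutation(len(d)))
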
def split_data_simple(X, frac, permute=True):
    """ Split machine learning data into train, validation, test sets
    
    Args:
        X:    data as a list of event objects (such as torch geometric Data)
        frac: split fraction
    """
    
    ### Permute events to have random mixing between events
    if permute:
        randi = np.random.permutation(len(X)).tolist()
        X = [X[i] for i in randi]
    
    N   = len(X)
    N_A = round(N * frac)
    N_B = N - N_A
    
    N_trn = N_A
    N_val = round(N_B / 2)
    N_tst = N - N_trn - N_val
    
    # Split
    X_trn = X[0:N_trn]
    X_val = X[N_trn:N_trn + N_val]
    X_tst = X[N - N_tst:]
    
    print(f"fractions [train: {len(X_trn)/N:0.3f}, validate: {len(X_val)/N:0.3f}, test: {len(X_tst)/N:0.3f}]")
    
    return X_trn, X_val, X_tst

def split_data(X, Y, W, ids, frac=[0.5, 0.1, 0.4], permute=True):
    """ Split machine learning data into train, validation, test sets
    
    Args:
        X:       data matrix
        Y:       target matrix
        W:       weight matrix
        ids:     variable names of columns
        frac:    fractions [train, validate, evaluate] (normalized to sum to 1)
        permute: permute events before splitting
    """
    
    ### Permute events to have random mixing between classes
    if permute:
        randi = np.random.permutation(len(X))
        X = X[randi]
        Y = Y[randi]
        if W is not None:
            W = W[randi]
    
    # --------------------------------------------------------------------
    frac = np.array(frac)
    frac = frac / np.sum(frac)
    
    # Get event counts
    N     = len(X)
    N_trn = int(round(N * frac[0]))
    N_tst = int(round(N * frac[2]))
    N_val = N - N_trn - N_tst
    
    # 1. Train
    X_trn = X[0:N_trn]
    Y_trn = Y[0:N_trn]
    if W is not None:
        W_trn = W[0:N_trn]
    else:
        W_trn = None
    
    # 2. Validation
    X_val = X[N_trn:N_trn + N_val]
    Y_val = Y[N_trn:N_trn + N_val]
    if W is not None:
        W_val = W[N_trn:N_trn + N_val]
    else:
        W_val = None
    
    # 3. Test
    X_tst = X[N - N_tst:]
    Y_tst = Y[N - N_tst:]
    if W is not None:
        W_tst = W[N - N_tst:]
    else:
        W_tst = None
    
    # --------------------------------------------------------
    # Construct
    trn = IceXYW(x=X_trn, y=Y_trn, w=W_trn, ids=ids)
    val = IceXYW(x=X_val, y=Y_val, w=W_val, ids=ids)
    tst = IceXYW(x=X_tst, y=Y_tst, w=W_tst, ids=ids)
    # --------------------------------------------------------
    
    print(f"fractions [train: {len(X_trn)/N:0.3f}, validate: {len(X_val)/N:0.3f}, test: {len(X_tst)/N:0.3f}]")
    
    return trn, val, tst

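# Illustrative usage sketch (shapes, names and fractions are hypothetical):
#
#   X   = np.random.randn(1000, 3)
#   Y   = np.random.randint(0, 2, size=1000)
#   W   = np.ones(1000)
#   ids = ['pt', 'eta', 'phi']
#
#   trn, val, tst = split_data(X, Y, W, ids, frac=[0.5, 0.1, 0.4])   # IceXYW objects
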
def impute_data(X, imputer=None, dim=None, values=[-999], labels=None, algorithm='iterative', fill_value=0, knn_k=6):
    """ Data imputation (treatment of missing values, NaN and Inf).
    
    NOTE: This function can impute only fixed dimensional input currently (not jagged numpy arrays)
    
    Args:
        X          : Input data matrix [N vectors x D dimensions]
        imputer    : Pre-trained imputer, default None
        dim        : Array of active dimensions to impute
        values     : List of special integer values indicating the need for imputation
        labels     : List containing textual description of input variables
        algorithm  : 'constant', 'mean', 'median', 'iterative', 'knn'
        fill_value : Fill value for the 'constant' algorithm
        knn_k      : knn k-nearest neighbour parameter
    
    Returns:
        X       : Imputed output data
        imputer : Fitted imputer
    """
    
    if dim is None:
        dim = np.arange(X.shape[1])
    
    if labels is None:
        labels = np.zeros(X.shape[1])
    
    N = X.shape[0]
    
    # Count NaN
    for j in dim:
        nan_ind = np.isnan(np.array(X[:,j], dtype=np.float32))
        found   = np.sum(nan_ind)
        if found > 0:
            print(f'Column {j} Number of {found} NaN found ({found/len(X):0.3E}) [{labels[j]}]', 'red')
    
    # Loop over dimensions
    for j in dim:
        
        # Set NaN for special values
        M_tot = 0
        for z in values:
            ind = np.isclose(np.array(X[:,j], dtype=np.float32), z)
            X[ind, j] = np.nan
            
            M      = np.sum(ind)
            M_tot += M
            
            if (M/N > 0):
                print(f'Column {j} fraction [{M/N:0.3E}] with value {z} [{labels[j]}]')
        
        if (M_tot == N): # Protection, if all are now NaN
            # Set to zero so the imputer below does not remove the full column!!
            X[:,j] = 0.0
    
    # Treat infinities (inf)
    for j in dim:
        inf_ind = np.isinf(np.array(X[:,j], dtype=np.float32))
        X[inf_ind, j] = np.nan
        found = np.sum(inf_ind)
        if found > 0:
            print(f'Column {j} Number of {found} Inf found ({found/len(X):0.3E}) [{labels[j]}]', 'red')
    
    if imputer is None:
        
        # Fill missing values
        if   algorithm == 'constant':
            imputer = SimpleImputer(missing_values=np.nan, strategy='constant', fill_value=fill_value)
        elif algorithm == 'mean':
            imputer = SimpleImputer(missing_values=np.nan, strategy='mean')
        elif algorithm == 'median':
            imputer = SimpleImputer(missing_values=np.nan, strategy='median')
        elif algorithm == 'iterative':
            imputer = IterativeImputer(missing_values=np.nan)
        elif algorithm == 'knn':
            imputer = KNNImputer(missing_values=np.nan, n_neighbors=knn_k)
        else:
            raise Exception(__name__ + '.impute_data: Unknown algorithm chosen')
    
    # Fit and transform
    imputer.fit(X[:,dim])
    X[:,dim] = imputer.transform(X[:,dim])
    
    print('[done] \n')
    
    return X, imputer

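# Illustrative usage sketch (the matrix and the -999 sentinel value are hypothetical):
#
#   X = np.array([[1.0, -999.0], [2.0, 4.0], [np.nan, 6.0]])
#
#   # Replace -999 sentinels, NaN and Inf with column medians
#   X_imp, imputer = impute_data(X, values=[-999], algorithm='median')
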
def calc_madscore(X : np.array):
    """ Calculate robust normalization.
    
    Args:
        X : Input with [# vectors x # dimensions]
    
    Returns:
        X_m   : Median vector
        X_mad : Median absolute deviation vector
    """
    X_m   = np.zeros((X.shape[1]))
    X_mad = np.zeros((X.shape[1]))
    
    # Calculate median and MAD based on the training data
    for i in range(X.shape[1]):
        X_m[i]   = np.median(X[:,i])
        X_mad[i] = np.median(np.abs(X[:,i] - X_m[i]))
        
        if (np.isnan(X_mad[i])):
            raise Exception(__name__ + f': Fatal error, MAD[index = {i}] is NaN')
        if (np.isinf(X_mad[i])):
            raise Exception(__name__ + f': Fatal error, MAD[index = {i}] is Inf')
    
    return X_m, X_mad

def calc_zscore_tensor(T):
    """ Compute z-score normalization for tensors.
    
    Args:
        T : input tensor data (events x channels x rows x cols, ...)
    
    Returns:
        mu, std tensors
    """
    Y = copy.deepcopy(T)
    Y[~np.isfinite(Y)] = 0
    
    mu  = np.mean(Y, axis=0)
    std = np.std(Y, axis=0)
    
    return mu, std

def apply_zscore_tensor(T, mu, std, EPS=1E-12):
    """ Apply z-score normalization for tensors. """
    Y = copy.deepcopy(T)
    Y[~np.isfinite(Y)] = 0
    
    # Over all events (np.maximum guards element-wise against zero std)
    for i in range(T.shape[0]):
        Y[i,...] = (Y[i,...] - mu) / np.maximum(std, EPS)
    
    return Y

def calc_zscore(X: np.array, weights: np.array = None):
    """ Calculate 0-mean & unit-variance normalization.
    
    Args:
        X       : Input with [N x dim]
        weights : Event weights
    
    Returns:
        X_mu  : Mean vector
        X_std : Standard deviation vector
    """
    X_mu  = np.zeros((X.shape[1]))
    X_std = np.zeros((X.shape[1]))
    
    # Calculate mean and std based on the training data
    for i in range(X.shape[1]):
        
        xval = X[:,i]
        
        if weights is not None:
            X_mu[i], X_std[i] = aux.weighted_avg_and_std(xval, weights)
        else:
            X_mu[i], X_std[i] = np.mean(xval), np.std(xval)
        
        if (np.isnan(X_std[i])):
            raise Exception(__name__ + f': Fatal error, std[index = {i}] is NaN')
        if (np.isinf(X_std[i])):
            raise Exception(__name__ + f': Fatal error, std[index = {i}] is Inf')
    
    return X_mu, X_std

@numba.njit(parallel=True)
def apply_zscore(X : np.array, X_mu, X_std, EPS=1E-12):
    """ Z-score normalization """
    Y = np.zeros(X.shape)
    for i in range(len(X_mu)):
        Y[:,i] = (X[:,i] - X_mu[i]) / max(X_std[i], EPS)
    return Y

@numba.njit(parallel=True)
def apply_madscore(X : np.array, X_m, X_mad, EPS=1E-12):
    """ MAD-score normalization """
    Y = np.zeros(X.shape)
    scale = 0.6745 # 0.75th quantile of the unit normal
    for i in range(len(X_m)):
        Y[:,i] = scale * (X[:,i] - X_m[i]) / max(X_mad[i], EPS)
    return Y

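# Illustrative usage sketch of the calc / apply normalization pairs
# (the matrices are hypothetical; the same pattern applies to calc_madscore / apply_madscore):
#
#   X_mu, X_std = calc_zscore(X_trn)           # fit statistics on training data
#   Z_trn = apply_zscore(X_trn, X_mu, X_std)   # apply to training data ...
#   Z_val = apply_zscore(X_val, X_mu, X_std)   # ... and to validation data with the same statistics
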