Source code for icenet.tools.iceroot

# ROOT file processing tools
#
# m.mieskolainen@imperial.ac.uk, 2024

import numpy as np
import uproot
import awkward as ak
import ray
import multiprocessing
import os
import copy
import gc
import time
from tqdm import tqdm

from icenet.tools import io
from icenet.tools import aux
from icenet.tools import iceroot

# ------------------------------------------
from icenet import print
# ------------------------------------------


def read_single(process_func, process, root_path, param, class_id,
                dtype=None, num_cpus=0, verbose=False):
    """
    Loop over different MC / data processes as defined in the yaml files
    [awkward compatible only]

    Args:
        process_func:  data processing function
        process:       MC / data process dictionary (from yaml)
        root_path:     main path of files
        param:         parameters of 'process_func'
        class_id:      class identifier (integer), e.g. 0, 1, 2 ...
        dtype:         data type (passed to load_tree)
        num_cpus:      number of CPUs used (set 0 for automatic)
        verbose:       verbose output print

    Returns:
        X, Y, W, ids, info (awkward array format)
    """
    print(f'{process}')

    # --------------------
    datasets        = process['path'] + '/' + process['files']
    xs              = process['xs']
    model_param     = process['model_param'] if 'model_param' in process.keys() else None
    force_xs        = process['force_xs']
    maxevents_scale = process['maxevents_scale']
    isMC            = process['isMC']
    # --------------------

    print(datasets)
    print(root_path)

    rootfile = io.glob_expand_files(datasets=datasets, datapath=root_path)

    # Custom scale event statistics
    if param['maxevents'] is None:
        maxevents = None
    else:
        maxevents = np.max([1, int(param['maxevents'] * maxevents_scale)])

    # Load file
    X_uncut, ids = iceroot.load_tree(rootfile=rootfile, tree=param['tree'],
        entry_start=param['entry_start'], entry_stop=param['entry_stop'],
        maxevents=maxevents, ids=param['load_ids'], library='ak',
        dtype=dtype, num_cpus=num_cpus, verbose=verbose)

    N_before = len(X_uncut)

    ## ** Here one could save X before any cuts **

    # Apply selections
    X, ids, stats = process_func(X=X_uncut, ids=ids, isMC=isMC, class_id=class_id, **param)
    N_after = len(X)

    eff_acc = N_after / N_before
    print(f'efficiency x acceptance = {eff_acc:0.6f}')

    Y = class_id * ak.Array(np.ones(N_after, dtype=np.int32))

    # -------------------------------------------------
    # Visible cross-section weights
    # (keep float64 to avoid possible precision problems -- conversion to float32 done later)

    if not force_xs:
        # Sum over W yields --> (eff_acc * xs)
        W = (ak.Array(np.ones(N_after, dtype=np.float64)) / N_after) * (xs * eff_acc)

    # 'force_xs' mode is useful e.g. in training with a mix of signals with different eff x acc,
    # but one wants to have equal proportions for e.g. theory conditional MVA training.
    # (then one uses the same input 'xs' value for each process in the steering .yaml file)
    else:
        # Sum over W yields --> xs
        W = (ak.Array(np.ones(N_after, dtype=np.float64)) / N_after) * xs

    # Save statistics information
    info = {'yaml': process, 'cut_stats': stats, 'eff_acc': eff_acc,
            'W_sum': ak.sum(W), 'W2_sum': ak.sum(W**2)}

    # -------------------------------------------------
    # Add conditional (theory param) variables
    if model_param is not None:

        print(f'Adding conditional theory (model) parameters')
        print(model_param)

        for var in model_param.keys():
            value = float(model_param[var]) if model_param[var] is not None else np.nan

            # Create a new 'record' (column) in the ak-array [input for MVA]
            col_name    = f'MODEL_{var}'
            X[col_name] = value  # Broadcasting happens to all events

            # A "mirror" copy
            col_name    = f'GEN_{var}'
            X[col_name] = value

    # This as the last step
    ids = ak.fields(X)

    return {'X': X, 'Y': Y, 'W': W, 'ids': ids, 'info': info}

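# ------------------------------------------------------------------
# Illustration (not part of the module): a minimal numpy sketch of the
# cross-section weight normalization used in read_single above, with purely
# hypothetical values of xs, eff_acc and N_after. With force_xs == False the
# per-event weights sum to the visible cross-section xs * eff_acc; with
# force_xs == True they sum to xs regardless of eff_acc.
#
#   import numpy as np
#
#   xs, eff_acc, N_after = 10.0, 0.25, 1000              # hypothetical values
#
#   W_visible = np.ones(N_after) / N_after * (xs * eff_acc)
#   W_forced  = np.ones(N_after) / N_after * xs
#
#   assert np.isclose(np.sum(W_visible), xs * eff_acc)   # 2.5
#   assert np.isclose(np.sum(W_forced),  xs)              # 10.0
# ------------------------------------------------------------------
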
def read_multiple(process_func, processes, root_path, param, class_id,
                  dtype=None, num_cpus=0, verbose=False):
    """
    Loop over different MC / data processes as defined in the yaml files

    Args:
        process_func:  data processing function
        processes:     MC processes dictionary (from yaml)
        root_path:     main path of files
        param:         parameters of 'process_func'
        class_id:      class identifier (integer), e.g. 0, 1, 2 ...
        dtype:         data type (passed to read_single)
        num_cpus:      number of CPUs used (set 0 for automatic)
        verbose:       verbose output print

    Returns:
        X, Y, W, ids, info (awkward array format)
    """
    # Combine results
    X_list, Y_list, W_list, ids, info = [], [], [], None, {}

    for i, key in enumerate(processes):

        data = read_single(
            process_func=process_func,
            process=processes[key],
            root_path=root_path,
            param=param,
            class_id=class_id,
            dtype=dtype,
            num_cpus=num_cpus,
            verbose=verbose
        )

        # Append data to the lists
        X_list.append(data['X'])
        Y_list.append(data['Y'])
        W_list.append(data['W'])

        # Assuming ids are the same for all processes
        if i == 0:
            ids = copy.deepcopy(data['ids'])

        info[key] = copy.deepcopy(data['info'])

        io.showmem()
        gc.collect()  #!

    # Concatenate all lists into final arrays
    tic = time.time()

    X = aux.concatenate_and_clean(X_list, axis=0)
    Y = aux.concatenate_and_clean(Y_list, axis=0)
    W = aux.concatenate_and_clean(W_list, axis=0)

    toc = time.time() - tic
    print(f'Final concatenation took: {toc:0.2f} sec')

    io.showmem()

    return X, Y, W, ids, info

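# ------------------------------------------------------------------
# Usage sketch (illustrative only): a hypothetical yaml-style 'processes'
# dictionary and 'param' dictionary compatible with read_multiple above.
# The paths, file patterns, cross-section value and the pass-through cut
# function below are placeholders, not part of the icenet configuration.
#
#   def my_cuts(X, ids, isMC, class_id, **kwargs):
#       return X, ids, {}                          # no selection applied
#
#   processes = {
#       'ttbar': {'path': 'mc', 'files': 'ttbar_*.root', 'xs': 831.8,
#                 'model_param': None, 'force_xs': False,
#                 'maxevents_scale': 1.0, 'isMC': True}
#   }
#   param = {'maxevents': 100000, 'tree': 'Events', 'entry_start': 0,
#            'entry_stop': None, 'load_ids': None}
#
#   X, Y, W, ids, info = read_multiple(process_func=my_cuts, processes=processes,
#                                      root_path='/data/rootfiles', param=param,
#                                      class_id=0)
# ------------------------------------------------------------------
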
def load_tree_stats(rootfile, tree, key=None, verbose=False):
    """
    Load the number of events in a list of rootfiles

    Args:
        rootfile: a list of rootfiles
        tree:     tree name to open
        key:      key (variable name) used to count the number of events; if None, the first key is used
        verbose:  verbose output print

    Returns:
        number of events
    """
    if type(rootfile) is not list:
        rootfile = [rootfile]

    num_events = np.zeros(len(rootfile), dtype=int)

    for i in range(len(rootfile)):
        with uproot.open(rootfile[i]) as file:
            events        = file[tree]
            key_name      = events.keys()[0] if key is None else key
            num_events[i] = len(events.arrays(key_name))

            if verbose:
                print(f'{rootfile[i]}')
                print(f'keys(): {events.keys()}')
                print(f'values(): {events.values()}')

    return num_events

def events_to_jagged_numpy(events, ids, entry_start=0, entry_stop=None, maxevents=None, label=None):
    """
    Process uproot tree to a jagged numpy (object) array

    Args:
        events:      uproot tree
        ids:         names of the variables to pick
        entry_start: first event to consider
        entry_stop:  last event to consider
        maxevents:   maximum number of events to read
        label:       label string for printouts

    Returns:
        X, ids
    """
    if label is not None:
        print(f'Loading: {label}', 'yellow')

    # -------------------------------
    if entry_start is None:
        entry_start = 0

    N_all = len(events.arrays(ids[0]))
    N     = len(events.arrays(ids[0], entry_start=entry_start, entry_stop=entry_stop))

    if (maxevents is not None) and N > maxevents:
        entry_stop_final = entry_start + maxevents
    else:
        entry_stop_final = entry_stop if entry_stop is not None else N
    # -------------------------------

    X = np.empty((entry_stop_final - entry_start, len(ids)), dtype=object)

    for j in tqdm(range(len(ids))):
        X[:,j] = np.asarray(events.arrays(ids[j], entry_start=entry_start,
                            entry_stop=entry_stop_final, library="np", how=list))

    print(f'Entry_start = {entry_start}, entry_stop = {entry_stop}, maxevents = {maxevents} | '
          f'realized = {len(X)} ({100*len(X)/N_all:0.3f} % | available = {N_all})', 'green')

    return X, ids

def load_tree(rootfile, tree, entry_start=0, entry_stop=None, maxevents=None,
              ids=None, library='np', dtype=None, num_cpus=0, verbose=False):
    """
    Load ROOT files

    Args:
        rootfile:    Name of root file paths (string or a list of strings)
        tree:        Tree to read out
        entry_start: First event to read per file
        entry_stop:  Last event to read per file
        maxevents:   Maximum number of events in total (over all files)
        ids:         Names of the variables to read out from the root tree
        library:     Return type 'np' (jagged numpy) or 'ak' (awkward) of the array
        dtype:       Data type
        num_cpus:    Number of processes used (set 0 for automatic)
        verbose:     Verbose output

    Returns:
        array of type 'library'
    """
    if type(rootfile) is not list:
        rootfile = [rootfile]

    print(f'Opening rootfile {rootfile} with a tree key <{tree}>', 'yellow')

    # Add Tree handles here using the string-syntax of uproot {file:tree}
    files = [rootfile[i] + f':{tree}' for i in range(len(rootfile))]

    # ----------------------------------------------------------
    ### Select variables
    with uproot.open(files[0]) as events:
        all_ids  = events.keys()
        load_ids = aux.process_regexp_ids(ids=ids, all_ids=all_ids)

    if verbose:
        print(f'All variables ({len(all_ids)}): \n{all_ids} \n', 'green')
        print('')
        print(f'Loading variables ({len(load_ids)}): \n{load_ids} \n', 'green')
        print('')
    else:
        print(f'All variables ({len(all_ids)}) | Loading variables ({len(load_ids)})', 'green')
        print('')

    print(f'Reading {len(files)} root files | Conversion to object library "{library}"')
    print('')

    if int(num_cpus) == 0:
        num_workers = min(len(files), multiprocessing.cpu_count() // 2)  # min handles the case #files < #cpu
    else:
        num_workers = int(num_cpus)

    if library == 'np':

        if maxevents is None:
            maxevents = int(1e10)
            print(f'maxevents is None, setting maxevents = {maxevents}', 'red')

        # Non-multiprocessed version for single files
        if len(files) == 1:

            with uproot.open(files[0]) as events:

                param = {'events': events, 'ids': load_ids, 'entry_start': entry_start,
                         'entry_stop': entry_stop, 'maxevents': maxevents, 'label': files[0]}

                X, _ = events_to_jagged_numpy(**param)

            # Clear
            gc.collect()
            io.showmem()

            if (maxevents is not None) and (len(X) >= maxevents):
                X = X[0:maxevents]
                print(f'Maximum event count {maxevents} reached', 'red')

            print(f'Total number of entries = {len(X)}')

            return X, load_ids

        else:
            # ======================================================
            # Multiprocessing version for multiple files

            ray.init(num_cpus=num_workers, _temp_dir=f'{os.getcwd()}/tmp/')

            chunk_ind    = aux.split_start_end(range(len(files)), num_workers)
            submaxevents = aux.split_size(range(maxevents), num_workers)

            futures = []
            print(f'submaxevents per ray process: {submaxevents}')

            for k in range(num_workers):
                futures.append(read_file_np.remote(files[chunk_ind[k][0]:chunk_ind[k][-1]],
                    load_ids, entry_start, entry_stop, submaxevents[k], dtype))

            print(f'Get futures')
            results = ray.get(futures)  # synchronous read-out
            ray.shutdown()

            # Combine future returned sub-arrays
            print(f'Concatenating results from futures')

            X = aux.concatenate_and_clean(results, axis=0)
            io.showmem()

            # Concatenate one-by-one
            #for k in tqdm(range(len(results))):
            #    X = copy.deepcopy(results[k]) if (k == 0) else np.concatenate((X, results[k]), axis=0)
            #
            #    results[k] = None  # free memory
            #    gc.collect()
            #    io.showmem()

            print(f'Total number of entries = {len(X)}')

            return X, load_ids

    elif library == 'ak':

        if maxevents is None:
            maxevents = int(1e10)
            print(f'maxevents is None, setting maxevents = {maxevents}', 'red')

        # Non-multiprocessed version for single files
        if len(files) == 1:

            # Get the number of events
            num_events = get_num_events(rootfile=files[0])

            with uproot.open(files[0]) as events:
                X = events.arrays(load_ids, entry_start=entry_start, entry_stop=entry_stop,
                                  library='ak', how='zip')

                if dtype is not None:
                    X = ak.values_astype(X, dtype)

            if (maxevents is not None) and (len(X) > maxevents):
                X = X[0:maxevents]
                print(f'Maximum event count {maxevents} reached (had {num_events})', 'red')

        else:
            # ======================================================
            # Multiprocessing version for multiple files

            ray.init(num_cpus=num_workers, _temp_dir=f'{os.getcwd()}/tmp/')

            chunk_ind    = aux.split_start_end(range(len(files)), num_workers)
            submaxevents = aux.split_size(range(maxevents), num_workers)

            futures = []
            print(f'submaxevents per ray process: {submaxevents}')

            for k in range(num_workers):
                futures.append(read_file_ak.remote(files[chunk_ind[k][0]:chunk_ind[k][-1]],
                    load_ids, entry_start, entry_stop, submaxevents[k], dtype))

            print(f'Get futures')
            results = ray.get(futures)  # synchronous read-out
            ray.shutdown()

            # Combine future returned sub-arrays
            print(f'Concatenating results from futures')

            X = aux.concatenate_and_clean(results, axis=0)
            io.showmem()

            # Concatenate one-by-one
            #for k in tqdm(range(len(results))):
            #    X = copy.deepcopy(results[k]) if (k == 0) else ak.concatenate((X, results[k]), axis=0)
            #
            #    results[k] = None  # free memory
            #    gc.collect()
            #    io.showmem()

        print(f'Total number of entries = {len(X)}')

        return X, ak.fields(X)

    else:
        raise Exception(__name__ + '.load_tree: Unknown library support')

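# ------------------------------------------------------------------
# Usage sketch (illustrative only, with hypothetical file and branch names):
# reading a subset of branches from several ROOT files into one awkward
# array. The 'ids' list is expanded against the branches found in the first
# file via aux.process_regexp_ids; the regexp-style patterns below are
# placeholders.
#
#   files = ['sample_A.root', 'sample_B.root']    # hypothetical files
#
#   X, ids = load_tree(rootfile=files, tree='Events',
#                      entry_start=0, entry_stop=None, maxevents=100000,
#                      ids=['Jet_.*', 'MET_pt'], library='ak', num_cpus=0)
#
#   print(f'Loaded {len(X)} events with {len(ids)} fields')
# ------------------------------------------------------------------
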
@ray.remote
def read_file_np(files, ids, entry_start, entry_stop, maxevents, dtype=None):
    """
    Ray multiprocess read-wrapper-function per root file.

    Remark:
        Multiprocessing within one file did not yield good performance empirically,
        but performance scales linearly for separate files.

    Args:
        files:       list of root filenames
        ids:         variables to read out
        entry_start: first entry to read
        entry_stop:  last entry to read
        maxevents:   maximum number of events (in total over all files)
        dtype:       data type

    Returns:
        jagged numpy (object) array
    """
    for i in range(len(files)):

        # Get the number of entries
        #num_entries = get_num_events(rootfile=files[i])
        #print(__name__ + f'.read_file_np: Found {num_entries} entries from the file: {files[i]}')

        with uproot.open(files[i]) as events:

            param = {'events': events, 'ids': ids, 'entry_start': entry_start,
                     'entry_stop': entry_stop, 'maxevents': maxevents, 'label': files[i]}

            data, loaded_ids = events_to_jagged_numpy(**param)

        # Concatenate with other file results
        X = copy.deepcopy(data) if (i == 0) else np.concatenate((X, data), axis=0)

        del data
        gc.collect()
        io.showmem()

        if (maxevents is not None) and (len(X) >= maxevents):
            X = X[0:maxevents]
            print(f'Maximum event count {maxevents} reached', 'red')
            break

    return X


@ray.remote
def read_file_ak(files, ids, entry_start, entry_stop, maxevents, dtype=None):
    """
    Ray multiprocess read-wrapper-function per root file.

    Remark:
        Multiprocessing within one file did not yield good performance empirically,
        but performance scales linearly for separate files.

    Args:
        files:       list of root filenames
        ids:         variables to read out
        entry_start: first entry to read
        entry_stop:  last entry to read
        maxevents:   maximum number of events (in total over all files)
        dtype:       data type

    Returns:
        awkward array
    """
    for i in range(len(files)):

        # Get the number of entries
        #num_entries = get_num_events(rootfile=files[i])
        #print(__name__ + f'.read_file_ak: Found {num_entries} entries from the file: {files[i]}')

        with uproot.open(files[i]) as events:
            data = events.arrays(ids, entry_start=entry_start, entry_stop=entry_stop,
                                 library='ak', how='zip')

            if dtype is not None:
                data = ak.values_astype(data, dtype)

        # Concatenate with other file results
        X = copy.deepcopy(data) if (i == 0) else ak.concatenate((X, data), axis=0)

        del data
        gc.collect()
        io.showmem()

        if (maxevents is not None) and (len(X) >= maxevents):
            X = X[0:maxevents]
            print(f'Maximum event count {maxevents} reached', 'red')
            break

    return X

def get_num_events(rootfile, key_index=0):
    """
    Get the number of entries in a rootfile by reading a key

    Args:
        rootfile:  rootfile string (with possible tree name appended with :)
        key_index: which variable to use as a dummy

    Returns:
        number of entries
    """
    with uproot.open(rootfile) as events:
        return len(events.arrays(events.keys()[key_index]))

@ray.remote
def events_to_jagged_ak(events, ids, entry_start=None, entry_stop=None, library='ak', how='zip'):
    """
    Wrapper for Ray
    """
    return events.arrays(ids, entry_start=entry_start, entry_stop=entry_stop, library=library, how=how)