# Source code for icedqcd.deploy

# Deploy MVA-model to data (or MC) files (work in progress, not all models supported ...)
#
# This code is compatible with grid (SGE, HTCondor) submission
#
# m.mieskolainen@imperial.ac.uk, 2024

import os
import numpy as np
import awkward as ak
import pickle
import uproot
import logging
import copy
import socket
import gc

from importlib import import_module
from datetime import datetime

from icedqcd import common
from icenet.tools import iceroot, io, aux, process
from icenet.deep import predict
from tqdm import tqdm

# ------------------------------------------
#from icenet import print # (reservation)
# ------------------------------------------

def f2s(value, decimals=2):
    """
    Convert a floating point number to a filename-safe string,
    e.g. 1.5 -> "1p5" (decimal point replaced with 'p').

    Args:
        value:    numeric value to convert
        decimals: number of decimals kept in the rounding

    Returns:
        string representation with '.' replaced by 'p'
    """
    rounded = np.round(value, decimals)
    return str(rounded).replace('.', 'p')
def generate_cartesian_param(ids, mc_param, decimals=2):
    """
    Generate cartesian N-dim array for the theory model parameter
    conditional sampling.

    Args:
        ids:      list of variable names (must contain 'MODEL_<key>' entries)
        mc_param: dict of {parameter name: list of lattice values}
        decimals: rounding applied to each parameter axis

    Returns:
        CAX:    cartesian product array over all parameter axes
        pindex: column indices of the 'MODEL_<key>' variables within ids
    """
    # One rounded value axis per theory parameter
    axes = [np.round(np.array(values), decimals) for values in mc_param.values()]
    CAX  = aux.cartesian_product(*axes)

    # Locate each conditional variable column in the input id list
    pindex = np.array([ids.index(f'MODEL_{key}') for key in mc_param.keys()], dtype=int)

    return CAX, pindex
def create_ID_label(ID, mc_param, lattice_values, decimals=2):
    """
    Create a parameter label string for the conditional model parameters,
    e.g. 'ID__m_1p5_ctau_10p0' for parameters {m, ctau}.

    Args:
        ID:             base model identifier
        mc_param:       dict whose keys name the theory parameters
        lattice_values: parameter values, ordered as mc_param keys
        decimals:       rounding used in the value-to-string conversion

    Returns:
        label string combining the ID and every parameter/value pair
    """
    suffix = ''.join(f'_{key}_{f2s(lattice_values[i], decimals=decimals)}'
                     for i, key in enumerate(mc_param.keys()))
    return f'{ID}_' + suffix
def zscore_normalization(X, args):
    """
    Z-score normalize the input array if activated in the configuration.

    Loads the pre-fitted mean/std from '<modeldir>/zscore.pkl' and applies
    (X - mu) / std via io.apply_zscore. If args['varnorm'] is not 'zscore',
    X is returned unchanged.

    Args:
        X:    input data array
        args: configuration dict (uses 'varnorm' and 'modeldir')

    Returns:
        normalized (or untouched) X
    """
    if args['varnorm'] == 'zscore':
        print(f'Z-score normalizing variables ...')

        # Fix: use a context manager so the pickle file handle is closed
        # (the original pickle.load(open(...)) leaked the handle)
        with open(args["modeldir"] + '/zscore.pkl', 'rb') as f:
            X_mu, X_std = pickle.load(f)

        print(f'X.shape = {X.shape} | X_mu.shape = {X_mu.shape} | X_std.shape = {X_std.shape}')
        X = io.apply_zscore(X, X_mu, X_std)

    return X
def process_data(args):
    """
    Main deployment processing loop.

    Reads the YAML input map, loads each ROOT file to awkward format,
    applies pre-selection cuts, evaluates every active MVA model
    (optionally on a cartesian lattice of conditional theory parameters)
    and writes per-event scores into '-icenet.root' output files.

    Args:
        args: runtime configuration dict; keys used here include
              'rootname', 'inputvars', 'inputmap', 'root_files', 'modeltag',
              'use_conditional', 'mc_param', 'grid_nodes', 'grid_id',
              'active_models', 'models', 'imputation_param', 'modeldir',
              'varnorm'
    """
    rootname  = args["rootname"]
    # Dynamically import the analysis-specific input variable definitions
    inputvars = import_module("configs." + rootname + "." + args["inputvars"])

    print(f'Processing data', 'green')
    print(args)

    ## ** Special YAML loader here **
    # Custom ruamel-based loader with include support
    from libs.ccorp.ruamel.yaml.include import YAML
    yaml = YAML()
    yaml.allow_duplicate_keys = True
    cwd = os.getcwd()

    with open(f'{cwd}/configs/{rootname}/{args["inputmap"]}', 'r') as f:
        try:
            inputmap = yaml.load(f)
            if 'includes' in inputmap:
                del inputmap['includes'] # ! crucial
        # NOTE(review): 'yaml' is a YAML() instance -- confirm it exposes a
        # YAMLError attribute; otherwise this except clause itself raises
        except yaml.YAMLError as exc:
            print(f'yaml.YAMLError: {exc}')
            exit()

    root_path = args['root_files']
    if type(root_path) is list:
        root_path = root_path[0] # Remove [] list

    # ------------------
    # Phase 0: Create output path

    path_str = f"{cwd}/output/{rootname}/deploy/modeltag__{args['modeltag']}"

    # Add conditional tag
    conditional_tag = f'--use_conditional__{args["use_conditional"]}' if args["use_conditional"] else ""
    path_str += conditional_tag
    basepath = aux.makedir(path_str)

    # ------------------
    # Phase 1: Read ROOT-file to awkward-format

    # Unique node identifier (input map + grid node + host + timestamp) for the log filename
    nodestr = (f"inputmap__{io.safetxt(args['inputmap'])}--grid_id__{args['grid_id']}--hostname__{socket.gethostname()}--time__{datetime.now()}").replace(' ', '')
    logging.basicConfig(filename=f'{basepath}/deploy--{nodestr}.log', encoding='utf-8',
        level=logging.DEBUG, format='%(asctime)s | %(message)s', datefmt='%d/%m/%Y %H:%M:%S')

    # Save to log-file
    logging.debug(__name__ + f'.process_data: {nodestr}')
    logging.debug('')
    logging.debug(f'{inputmap}')

    # Pick the model lattice definition
    if args['use_conditional']:
        mc_param = args['mc_param']

    for key in inputmap.keys():
        print(__name__ + f'.process_data: Processing "{key}"')

        # Write to log-file
        logging.debug('-------------------------------------------------------')
        logging.debug(f'Process: {key}')

        # Get all files
        datasets  = inputmap[key]['path'] + '/' + inputmap[key]['files']
        rootfiles = io.glob_expand_files(datasets=datasets, datapath=root_path)

        # Loop over the files
        total_num_events = 0

        # ----------------------------------
        # GRID (batch) processing: partition the file list over grid nodes
        all_file_id = aux.split_start_end(range(len(rootfiles)), args['grid_nodes'])
        try:
            file_id = all_file_id[args['grid_id']]
            print(f"file_id = {file_id} (grid_id = {args['grid_id']})")
        except Exception as e:
            logging.debug('Subfolder already saturated -- no processing under this grid PC node -- continue')
            continue # This subfolder is already saturated on the grid processing, i.e. grid_id > len(all_file_id)
        # ----------------------------------

        # NOTE(review): range end is exclusive -- assumes split_start_end
        # yields [start, end) style pairs; TODO confirm, otherwise the last
        # file index of each chunk is skipped
        for k in range(file_id[0], file_id[-1]):

            filename = rootfiles[k]

            param = {
                'rootfile'    : filename,
                'tree'        : 'Events',
                'entry_start' : None,
                'entry_stop'  : None,
                'maxevents'   : None,
                'ids'         : inputvars.LOAD_VARS,
                'library'     : 'ak'
            }

            # NOTE(review): the literal '(unknown)' in the log/print strings
            # below looks like a placeholder lost in extraction (probably the
            # filename) -- runtime strings kept verbatim here
            try:
                X_nocut, ids_nocut = iceroot.load_tree(**param)

                # Write to log-file
                logging.debug(f'(unknown) | Number of events: {len(X_nocut)}')
                total_num_events += len(X_nocut)
            except Exception as e:
                print(f'A fatal error in iceroot.load_tree with a file "(unknown)": ' + str(e), 'red')

                # Write to log-file
                logging.debug(f'(unknown) | A fatal error in iceroot.load_tree: ' + str(e))
                continue

            # -------------------------------------------------
            # Phase 1.5: Add conditional (theory MC param) variables
            if args['use_conditional']:
                print(f'Initializing conditional theory (model) parameters')

                for var in mc_param.keys():
                    # Create new 'record' (column) to ak-array and init to zero
                    col_name = f'MODEL_{var}'
                    X_nocut[col_name] = 0.0

                ids_nocut = ak.fields(X_nocut)

            # ------------------
            # Phase 2: Apply pre-selections to get an event mask
            mask = common.process_root(X=X_nocut, args=args, return_mask=True)

            if np.sum(mask) == 0:
                # Keep the uncut events so output length matches input;
                # scores are later forced to -1 via OK_event = False
                X = X_nocut
                logging.debug("No events left after pre-cuts -- scores will be -1 for all events")
                OK_event = False
            else:
                X = X_nocut[mask]
                OK_event = True

            # ------------------
            # Phase 3: Convert to icenet dataformat
            try:
                # Dequantization always off in deployment
                data = common.splitfactor(x=X, y=None, w=None, ids=ids_nocut,
                    args=args, skip_graph=True, use_dequantize=False)
            except Exception as e:
                # Something went wrong at OS level (e.g. memory), write to log-file and exit with error
                txt = f'(unknown) | A fatal error in common.splitfactor -- os._exit(os.EX_OSERR): ' + str(e)
                logging.debug(txt)
                print(txt)
                os._exit(os.EX_OSERR)

            # ------------------
            # ** Free memory **
            del X
            del X_nocut
            gc.collect()
            # ------------------

            # ------------------
            # Phase 4: Apply MVA-models

            ALL_scores = {}

            for i in range(len(args['active_models'])):
                gc.collect()

                ID    = args['active_models'][i]
                param = args['models'][ID]

                # xgb / iceboost
                if param['predict'] in ['xgb', 'xgb_logistic']:
                    print(f'Evaluating MVA-model "{ID}" \n')

                    ## 1. Impute data
                    # NOTE(review): pickle.load(open(...)) leaks the file
                    # handle -- consider a with-block
                    if args['imputation_param']['active']:
                        fmodel  = os.path.join(args["modeldir"], 'imputer.pkl')
                        imputer = pickle.load(open(fmodel, 'rb'))
                        data['data'], _ = process.impute_datasets(data=data['data'], features=None,
                            args=args['imputation_param'], imputer=imputer)

                    ## 2. Apply the input variable set reductor
                    X,ids = aux.red(X=data['data'].x, ids=data['data'].ids, param=param)

                    ## 3. Get the MVA-model
                    func_predict, model = get_predictor(args=args, param=param, feature_names=ids)

                    ## 4. ** Conditional model **
                    if args['use_conditional']:

                        ## Get conditional parameters
                        CAX, pindex = generate_cartesian_param(ids=ids, mc_param=mc_param)

                        ## Run the MVA-model on all the theory model points in CAX array
                        for z in tqdm(range(len(CAX))):
                            gc.collect()

                            # Set the new conditional model parameters to X
                            lattice_values = CAX[z,:]
                            ID_label = create_ID_label(ID=ID, mc_param=mc_param, lattice_values=lattice_values)

                            if OK_event:
                                # Deep copy so each lattice point starts from clean inputs
                                XX = copy.deepcopy(X)
                                XX[:, pindex] = lattice_values # Set new values

                                # Variable normalization
                                XX = zscore_normalization(X=XX, args=args)

                                # Predict
                                pred = func_predict(XX)
                                # Expand scores back to the uncut event length (-1 for cut events)
                                pred = aux.unmask(x=pred, mask=mask, default_value=-1)
                            else:
                                pred = (-1) * np.ones(len(mask)) # all -1

                            # Save
                            ALL_scores[io.rootsafe(ID_label)] = pred

                    # 4. ** Unconditional model **
                    else:
                        if OK_event:
                            # Variable normalization
                            XX = copy.deepcopy(X)
                            XX = zscore_normalization(X=XX, args=args)

                            # Predict
                            pred = func_predict(XX)
                            pred = aux.unmask(x=pred, mask=mask, default_value=-1)
                        else:
                            pred = (-1) * np.ones(len(mask)) # all -1

                        # Save
                        ALL_scores[io.rootsafe(ID)] = pred

                else:
                    # if param['predict'] == 'torch_graph': # Turned off for now
                    #     scores = func_predict(data['graph'])

                    # Write to log-file
                    logging.debug(f'Did not evaluate model (unsupported deployment): {ID}')
                    continue

            # ------------------
            # Phase 5: Write MVA-scores out

            aux.makedir(basepath + '/' + filename.rsplit('/', 1)[0]) # Create dir
            outputfile = basepath + '/' + filename.replace('.root', '-icenet.root')
            print(f'Saving root output to "{outputfile}"')

            with uproot.recreate(outputfile, compression=uproot.ZLIB(4)) as rfile:
                rfile[f"Events"] = ALL_scores

            gc.collect() #! Garbage collection

        # Write to log-file
        logging.debug(f'Total number of events: {total_num_events}')
def get_predictor(args, param, feature_names=None):
    """
    Load the model predictor function handle for the given parameters.

    Args:
        args:          global configuration dict forwarded to the predictor loader
        param:         model parameter dict; 'predict' selects the backend
        feature_names: optional list of input feature names

    Returns:
        (func_predict, model) tuple

    Raises:
        Exception: if param['predict'] is not a supported backend
    """
    model = None
    predictor_type = param['predict']

    # torch_vector / torch_graph predictors are currently disabled here
    if predictor_type == 'xgb':
        func_predict, model = predict.pred_xgb(args=args, param=param,
                                               feature_names=feature_names, return_model=True)
    elif predictor_type == 'xgb_logistic':
        func_predict, model = predict.pred_xgb_logistic(args=args, param=param,
                                                        feature_names=feature_names, return_model=True)
    else:
        raise Exception(__name__ + f'.get_predictor: Unknown param["predict"] = {param["predict"]}')

    return func_predict, model