# Common input & data reading routines for the DQCD analysis
#
# m.mieskolainen@imperial.ac.uk, 2024
import numpy as np
import awkward as ak
import copy
import os
import ray
from tqdm import tqdm
import time
import multiprocessing
from importlib import import_module
from icenet.tools import io, aux, prints, iceroot
from icenet.algo import analytic
from icefit import dequantize
from icedqcd import graphio
# ------------------------------------------
from icenet import print
# ------------------------------------------
# GLOBALS
from configs.dqcd.cuts import *
from configs.dqcd.filter import *
def load_root_file(root_path, ids=None, entry_start=0, entry_stop=None, maxevents=None, args=None):
""" Loads the root files
Args:
root_path: path to root files
Returns:
X: jagged columnar data
Y: class labels
W: event weights
ids: columnar variables string (list)
info: trigger, MC xs, pre-selection acceptance x efficiency information (dict)
"""
inputvars = import_module("configs." + args["rootname"] + "." + args["inputvars"])
    if isinstance(root_path, list):
        root_path = root_path[0] # Unpack one-element list
# -----------------------------------------------
param = {
"tree": "Events",
"entry_start": entry_start,
"entry_stop": entry_stop,
"maxevents": maxevents,
"args": args,
"load_ids": inputvars.LOAD_VARS
}
INFO = {}
X = {}
Y = {}
W = {}
# =================================================================
# *** BACKGROUND MC, SIGNAL MC, DOMAIN ADAPTATION ... ***
for key in args["input"].keys(): # input from yamlgen generated yml
class_id = int(key.split("_")[1])
proc = args["input"][key]
X[key], Y[key], W[key], _, INFO[key] = iceroot.read_multiple(class_id=class_id,
process_func=process_root, processes=proc, root_path=root_path, param=param,
num_cpus=args['num_cpus'])
# =================================================================
    # Sample conditional theory parameters as they are distributed in the signal sample
sig_class = f"class_1" # ** HARDCODED DEFINITION **
# Probability per event entry, float64 needed for precision
# See: https://stackoverflow.com/questions/46539431/np-random-choice-probabilities-do-not-sum-to-1
p = ak.to_numpy(W[sig_class]).squeeze().astype('float64')
p = p / np.sum(p)
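    # (np.random Generator.choice validates that p sums to 1 within tolerance;
    #  float32 rounding can fail that check, hence the float64 cast above)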
# We pick all variables at once
# (so N-dim random variable N-tuplets are sampled 1-to-1 as in the signal class)
var = inputvars.MODEL_VARS
    # Collect the signal-class parameter tuplets once into an (N x dim) numpy array
    N = len(X[sig_class])
    A = np.zeros((N, len(var)), dtype=np.float32)
    for i in range(len(var)):
        A[:,i] = ak.to_numpy(X[sig_class][var[i]]).squeeze()
    rng = np.random.default_rng()
    for key in X.keys(): # Loop over classes
        # Classes other than signal (e.g. Background ~ 0, DA-data ~ -2);
        # the signal class already carries MODEL_ and GEN_ (picked from the yaml under iceroot)
        if key != sig_class:
            # Sample whole tuplets (rows), weighted by the per-event probability p
            new = rng.choice(A, axis=0, size=len(X[key]), p=p, replace=True)
            print(f'Sampling theory conditional parameters {var} for class "{key}"')
# Set conditional variables 'MODEL_'
for i in range(len(var)):
X[key][var[i]] = new[:,i].squeeze().tolist()
# "Mirror" copy variables to 'GEN_', for ROC plots etc. diagnostics in the evaluation stage
var_new = [s.replace('MODEL', 'GEN') for s in var]
for i in range(len(var)):
X[key][var_new[i]] = new[:,i].squeeze().tolist()
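    # Minimal illustration (hypothetical numbers) of the tuplet sampling above:
    #   A = np.array([[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]]) # 3 events x 2 vars
    #   p = np.array([0.2, 0.3, 0.5])
    #   np.random.default_rng(0).choice(A, axis=0, size=4, p=p, replace=True)
    # draws 4 whole rows (tuplets) of A with per-row probability p, so the
    # N-dim parameter combinations are preserved exactly as in the signal class.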
# =================================================================
# *** Finally combine ***
X = ak.concatenate(X.values(), axis=0)
Y = ak.concatenate(Y.values(), axis=0)
W = ak.concatenate(W.values(), axis=0)
    # ** Crucial: randomize the event ordering so that downstream consumers
    #    do not see the per-class block structure **
rand = np.random.permutation(len(X))
X = X[rand]
Y = Y[rand]
W = W[rand]
    print('Event counts per class')
unique, counts = np.unique(Y, return_counts=True)
print(np.asarray((unique, counts)).T)
return {'X':X, 'Y':Y, 'W':W, 'ids': ak.fields(X), 'info': INFO}
def process_root(X, args, ids=None, isMC=None, return_mask=False, class_id=None, **kwargs):
"""
Apply selections
"""
FILTERFUNC = globals()[args['filterfunc']]
CUTFUNC = globals()[args['cutfunc']]
stats = {'filterfunc': None, 'cutfunc': None}
# @@ Filtering done here @@
fmask = FILTERFUNC(X=X, isMC=isMC, class_id=class_id, xcorr_flow=args['xcorr_flow'])
stats['filterfunc'] = {'before': len(X), 'after': sum(fmask)}
#plots.plot_selection(X=X, mask=mask, ids=ids, plotdir=args['plotdir'], label=f'<filterfunc>_{isMC}', varlist=CUT_VARS, library='ak')
print(f'isMC = {isMC} | <filterfunc> before: {len(X)}, after: {sum(fmask)} events ({sum(fmask)/(len(X)+1E-12):0.6f})', 'green')
prints.printbar()
X_new = X[fmask]
# @@ Observable cut selections done here @@
cmask = CUTFUNC(X=X_new, xcorr_flow=args['xcorr_flow'])
stats['cutfunc'] = {'before': len(X_new), 'after': sum(cmask)}
#plots.plot_selection(X=X, mask=mask, ids=ids, plotdir=args['plotdir'], label=f'<cutfunc>_{isMC}', varlist=CUT_VARS, library='ak')
print(f"isMC = {isMC} | <cutfunc> before: {len(X_new)}, after: {sum(cmask)} events ({sum(cmask)/(len(X_new)+1E-12):0.6f}) \n", 'green')
prints.printbar()
io.showmem()
X_final = X_new[cmask]
    if not return_mask:
return X_final, ids, stats
else:
fmask_np = fmask.to_numpy()
        fmask_np[fmask_np] = cmask # cmask was evaluated only on entries where fmask == True
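        # Worked example (illustrative values):
        #   fmask_np = [True, False, True, True] -> 3 entries pass the filter
        #   cmask    = [True, False, True]       -> evaluated on those 3 only
        #   fmask_np[fmask_np] = cmask gives [True, False, False, True]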
return fmask_np
def splitfactor(x, y, w, ids, args, skip_graph=True, use_dequantize=True):
"""
Transform data into different datatypes.
Args:
data: jagged arrays
args: arguments dictionary
Returns:
dictionary with different data representations
"""
inputvars = import_module("configs." + args["rootname"] + "." + args["inputvars"])
data = io.IceXYW(x=x, y=y, w=w, ids=ids)
if data.y is not None:
data.y = ak.to_numpy(data.y).astype(np.float32)
if data.w is not None:
data.w = ak.to_numpy(data.w).astype(np.float32)
# -------------------------------------------------------------------------
### Pick active variables out
    scalar_vars = aux.process_regexp_ids(all_ids=aux.unroll_ak_fields(x=x, order='first'),  ids=getattr(inputvars, args['inputvar_scalar']))
    jagged_vars = aux.process_regexp_ids(all_ids=aux.unroll_ak_fields(x=x, order='second'), ids=getattr(inputvars, args['inputvar_jagged']))
# Individually for GNNs
muon_vars = aux.process_regexp_ids(all_ids=aux.unroll_ak_fields(x=x, order='second'), ids=inputvars.MVA_MUON_VARS)
jet_vars = aux.process_regexp_ids(all_ids=aux.unroll_ak_fields(x=x, order='second'), ids=inputvars.MVA_JET_VARS)
sv_vars = aux.process_regexp_ids(all_ids=aux.unroll_ak_fields(x=x, order='second'), ids=inputvars.MVA_SV_VARS)
muonsv_vars = aux.process_regexp_ids(all_ids=aux.unroll_ak_fields(x=x, order='second'), ids=inputvars.MVA_MUONSV_VARS)
cpf_vars = aux.process_regexp_ids(all_ids=aux.unroll_ak_fields(x=x, order='second'), ids=inputvars.MVA_CPF_VARS)
npf_vars = aux.process_regexp_ids(all_ids=aux.unroll_ak_fields(x=x, order='second'), ids=inputvars.MVA_NPF_VARS)
pf_vars = aux.process_regexp_ids(all_ids=aux.unroll_ak_fields(x=x, order='second'), ids=inputvars.MVA_PF_VARS)
# -------------------------------------------------------------------------
### ** Conditional variables **
    if not args['use_conditional']:
        for var in inputvars.MODEL_VARS:
            try:
                scalar_vars.remove(var)
                print(f'Removing model conditional var "{var}" from scalar_vars', 'red')
            except ValueError:
                print(f'Model conditional var "{var}" did not exist in scalar_vars', 'red')
else:
# Dequantization for training (turn off for deployment)
if args['conditional_param']['dequantize_common'] and use_dequantize:
n_interp = args['conditional_param']['n_interp']
kind = args['conditional_param']['kind']
for var in inputvars.MODEL_VARS:
print(f'Dequantizing conditional "{var}" with iDQF', 'yellow')
data.x[var] = dequantize.iDQF(x=ak.to_numpy(ak.ravel(data.x[var])), n_interp=n_interp, kind=kind)
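            # (MC conditional parameters typically take values on a discrete grid;
            #  dequantization spreads them into a continuous distribution for
            #  training; see icefit.dequantize for the iDQF implementation)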
# -------------------------------------------------------------------------
## ** Collection filter **
for d in args['jagged_filter']:
expr = 'data.x.' + d['condition'].strip() # strip to remove leading/trailing spaces
print(f'Filtering collection {d} with {expr}', 'yellow')
filter_ind = eval(expr)
data.x[d['name']] = data.x[d['name']][filter_ind]
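    # Illustrative (hypothetical) 'jagged_filter' entry in the steering yaml:
    #   - name:      'muonSV'
    #     condition: 'muonSV.chi2 < 10'
    # which evaluates here to (data.x.muonSV.chi2 < 10) and keeps, per event,
    # only the collection entries that pass.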
# -------------------------------------------------------------------------
## ** Collection entry re-ordering sort **
for d in args['jagged_order']:
print(f'Collection re-ordering {d}', 'yellow')
sort_ind = ak.argsort(data.x[d['name']][d['var']], ascending=d['ascending'])
data.x[d['name']] = data.x[d['name']][sort_ind]
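    # Illustrative (hypothetical) 'jagged_order' entry:
    #   - name:      'muonSV'
    #     var:       'chi2'
    #     ascending: True
    # which sorts each event's muonSV entries by ascending fit chi2.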
# -------------------------------------------------------------------------
## ** Custom variables added to collections **
print(f"Adding custom muonSV variables ...")
## \DeltaR
data.x['muonSV', 'deltaR'] = \
analytic.deltaR(x=data.x['muonSV'], eta1='mu1eta', eta2='mu2eta', phi1='mu1phi', phi2='mu2phi')
jagged_vars.append('muonSV_deltaR')
muonsv_vars.append('muonSV_deltaR')
## Invariant Mass
data.x['muonSV', 'mass'] = \
analytic.invmass(x=data.x['muonSV'], pt1='mu1pt', pt2='mu2pt',
eta1='mu1eta', eta2='mu2eta', phi1='mu1phi', phi2='mu2phi',
m1_const=0.1057, m2_const=0.1057)
jagged_vars.append('muonSV_mass')
muonsv_vars.append('muonSV_mass')
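    # Standard definitions sketched below (assuming analytic.deltaR and
    # analytic.invmass follow the usual collider conventions, with the muon
    # mass m1 = m2 = 0.1057 GeV):
    #   dphi   = (phi1 - phi2 + pi) % (2*pi) - pi          # wrapped to [-pi, pi)
    #   deltaR = sqrt((eta1 - eta2)**2 + dphi**2)
    #   E_i    = sqrt(m_i**2 + (pt_i * cosh(eta_i))**2)
    #   mass   = sqrt(m1**2 + m2**2 + 2*(E1*E2
    #                 - pt1*pt2*(cos(phi1 - phi2) + sinh(eta1)*sinh(eta2))))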
print(f"muonSV.fields = {data.x['muonSV'].fields}", 'yellow')
# -------------------------------------------------------------------------
### Pick kinematic variables out
data_kin = None
if inputvars.KINEMATIC_VARS is not None:
kinematic_vars = aux.process_regexp_ids(all_ids=aux.unroll_ak_fields(x=x, order='first'),
ids=inputvars.KINEMATIC_VARS)
        print(f'kinematic_vars = {kinematic_vars}')
data_kin = copy.deepcopy(data)
data_kin.x = aux.ak2numpy(x=data.x, fields=kinematic_vars)
data_kin.ids = kinematic_vars
# -------------------------------------------------------------------------
## Graph representation
data_graph = None
if not skip_graph:
#node_features = {'muon': muon_vars, 'jet': jet_vars, 'cpf': cpf_vars, 'npf': npf_vars, 'sv': sv_vars}
node_features = {'muon': muon_vars, 'jet': jet_vars, 'sv': sv_vars}
"""
start_time = time.time()
data_graph_ = graphio.parse_graph_data(X=data.x, Y=data.y, weights=data.w, ids=data.ids,
features=scalar_vars, node_features=node_features, graph_param=args['graph_param'])
print(f'single_results: {time.time()-start_time:0.1f} sec')
"""
## ------------------------------------------
# Parallel processing of graph objects with Ray
start_time = time.time()
big_chunk_size = 10000
if int(args['num_cpus']) == 0:
num_workers = multiprocessing.cpu_count() // 2
else:
num_workers = int(args['num_cpus'])
big_chunks = int(np.ceil(len(data.x) / big_chunk_size))
chunk_ind = aux.split_start_end(range(len(data.x)), num_workers * big_chunks)
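        # e.g. len(data.x) = 25000, big_chunk_size = 10000, num_workers = 4
        # gives big_chunks = 3 and 12 consecutive [start, stop] index ranges
        # in chunk_ind (assuming aux.split_start_end returns such ranges),
        # one range per Ray worker job.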
        print(f'chunk_ind = {chunk_ind}')
data_graph = []
job_index = 0
for _ in tqdm(range(big_chunks)):
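            # A fresh Ray instance per big chunk; ray.shutdown() below releases
            # the object store memory between chunks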
ray.init(num_cpus=num_workers, _temp_dir=f'{os.getcwd()}/tmp/')
graph_futures = []
obj_ref = ray.put(data.x)
for _ in range(num_workers):
entry_start, entry_stop = chunk_ind[job_index][0], chunk_ind[job_index][-1]
graph_futures.append( \
graphio.parse_graph_data_ray.remote( \
obj_ref, data.ids, scalar_vars, node_features, args['graph_param'],
data.y, data.w, entry_start, entry_stop)
)
job_index += 1
data_graph += sum(ray.get(graph_futures), []) # Join split array results
ray.shutdown()
print(f'ray_results: {time.time() - start_time:0.1f} sec')
io.showmem()
# -------------------------------------------------------------------------
## Tensor representation
data_tensor = None
# -------------------------------------------------------------------------
## Turn jagged data to a "long-vector" zero-padded matrix representation
data.x, data.ids = aux.jagged_ak_to_numpy(arr=data.x, scalar_vars=scalar_vars,
jagged_vars=jagged_vars, jagged_maxdim=args['jagged_maxdim'],
null_value=args['imputation_param']['fill_value'])
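    # Illustrative layout (hypothetical variables): with scalar_vars = ['nsv'],
    # jagged_vars = ['muonSV_mass'] and jagged_maxdim 3 for 'muonSV', each event
    # becomes a fixed-length row
    #   [nsv, muonSV_mass[0], muonSV_mass[1], muonSV_mass[2]]
    # where missing jagged entries are filled with the imputation fill_value.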
io.showmem()
# -------------------------------------------------------------------------
# Mutual information regularization targets
    mi_vars = aux.process_regexp_ids(all_ids=data.ids, ids=inputvars.MI_VARS)
    data_MI = data[mi_vars].x.astype(np.float32)
# --------------------------------------------------------------------------
# Create DeepSet style representation from the "long-vector" content
data_deps = None
"""
## ** TBD. This should be generalized to handle multiple different length sets **
data_deps = copy.deepcopy(data)
M = len(jagged_vars) # Number of (jagged) variables per event
D = args['jagged_maxdim']['sv'] # Tuplet feature vector dimension
data_deps.x = aux.longvec2matrix(X=data.x[:, len(scalar_vars):], M=M, D=D)
data_deps.y = data_deps.y
data_deps.ids = all_jagged_vars
"""
# --------------------------------------------------------------------------
return {'data': data,
'data_MI': data_MI,
'data_kin': data_kin,
'data_deps': data_deps,
'data_tensor': data_tensor,
'data_graph': data_graph}