Source code for hydroBayesCal.surrogate.gpe_skl

"""

A new class is generated, which inherits all attributes from the GaussianProcessRegressor class from Scikit learn. This
is done to manually set the "max_iter" and "gtol" values for the optimization of hyperparameters in the GPR kernel.

ToDo: Check GPyTorch+lbfgs to see if results can be improved by changing initial values or with Adam ?
ToDo: Save each gp (for each loc) in a list, to call it later to do BAL+MCMC methods with them.
"""

import numpy as np
import sys
import math
import sklearn
import scipy
from sklearn.utils.optimize import _check_optimize_result
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import sklearn.gaussian_process.kernels
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF
import copy
from joblib import Parallel, delayed
import torch
import gpytorch
from pathlib import Path


# General GPR class
[docs] class MyGeneralGPR: """ Class assigns/creates the attributes which are constant for all GPR-library classes (such as SklTraining and GPyTraining) Parameters: collocation_points = np.array(number of TP, number of parameters per TP), with training points (parameter sets) model_evaluations = np.array(number of TP, number of points where the model is evaluated), with full-complexity model outputs in each location where the fcm was evaluated/in the locations being considered # xx prior_samples = np.array(MC, number of parameters per TP), with MC parameter sets in which to evaluate the trained GPE Attributes: self.n_obs = int, number of locations from the fcm where the GPE is to be trained. It is not necessarily the same as the number of true observations, since one could train the GPE in given locations (e.g. all grid points), where some locations coincide with the observation points. # xx self.surrogate_prediction = np.array(self.n_obs, self.prior_samples.shape[0]), where to save the mean for each GPE (n_obs) for each parameter set in prior_samples self.surrogate_std = np.array(self.n_obs, self.prior_samples.shape[0]), where to save the standard deviation for each GPE (n_obs) for each parameter set in prior_samples self.surrogate_up = np.array(self.n_obs, self.prior_samples.shape[0]), where to save the upper confidence level for each GPE (n_obs) for each parameter set in prior_samples self.surrogate_lc = np.array(self.n_obs, self.prior_samples.shape[0]), where to save the lower confidence level for each GPE (n_obs) for each parameter set in prior_samples """ def __init__(self, collocation_points, model_evaluations): self.training_points = collocation_points self.model_evaluations = model_evaluations self.n_obs = self.model_evaluations.shape[1] self.n_params = collocation_points.shape[1] self.gp_list = []
# Scikit-Learn -----------------------------------------------------------------------------------------------------
[docs] class MySklGPR(GaussianProcessRegressor): def __init__(self, max_iter=2e05, gtol=1e-06, **kwargs): super().__init__(**kwargs) self.max_iter = max_iter self.gtol = gtol def _constrained_optimization(self, obj_func, initial_theta, bounds): if self.optimizer == "fmin_l_bfgs_b": opt_res = scipy.optimize.minimize(obj_func, initial_theta, method="L-BFGS-B", jac=True, bounds=bounds, options={'maxiter':self.max_iter, 'gtol': self.gtol}) _check_optimize_result("lbfgs", opt_res) theta_opt, func_min = opt_res.x, opt_res.fun elif callable(self.optimizer): theta_opt, func_min = self.optimizer(obj_func, initial_theta, bounds=bounds) else: raise ValueError("Unknown optimizer %s." % self.optimizer) return theta_opt, func_min
[docs] class SklTraining(MyGeneralGPR): """Train a single-output Gaussian Process Emulator with scikit-learn. Uses scikit-learn's GP regression to build a GPE for a forward model, from collocation points produced by that model. See the scikit-learn `GaussianProcessRegressor <https://scikit-learn.org/stable/modules/generated/sklearn.gaussian_process.GaussianProcessRegressor.html>`_ for the underlying estimator. Parameters ---------- collocation_points : numpy.ndarray Training points (parameter sets), shape ``[n_tp, n_params]``. model_evaluations : numpy.ndarray Full-complexity model outputs at each evaluated location, shape ``[n_tp, n_locations]``. kernel : object or list of objects ``sklearn.gaussian_process.kernels`` instance(s) used to train the GPE; converted internally to a list. alpha : float or list of float Value added to the diagonal to avoid numerical errors. A scalar is broadcast to a list. n_restarts : int Number of optimiser restarts used to find the kernel hyper-parameters (avoids local minima). noise : bool, optional ``True`` (default) to add a white-noise kernel to the input kernel. y_normalization : bool, optional ``True`` (default) to normalise model outputs before training. tp_normalization : bool, optional ``True`` to normalise training-point parameter values before training (default ``False``). optimizer : str, optional Name of the optimiser to use (default scikit-learn optimiser). parallelize : bool, optional ``True`` to parallelise surrogate training, ``False`` to train sequentially. Notes ----- .. todo:: Accept the evaluation location as input and add a function that extracts the GPE predictions at the observation point (used in BAL). """ def __init__(self, collocation_points, model_evaluations, kernel, alpha, n_restarts, noise=True, y_normalization=True, y_log=False, tp_normalization=False, optimizer="fmin_l_bfgs_b", parallelize=False, n_jobs=-2): super(SklTraining, self).__init__(collocation_points=collocation_points, model_evaluations=model_evaluations) # Input for GPR library in sklearn: self.n_restart = n_restarts self.y_normalization_ = y_normalization self.y_log = y_log self.optimizer_ = optimizer self.noise = noise self.parallel = parallelize self.n_jobs = n_jobs # Options for GPR library: self.tp_norm = tp_normalization self._id_vectors(alpha, kernel) def _id_vectors(self, alpha, kernel): """ Function checks if the inputs for alpha and kernel are a single variable or a list. If they are a single value, the function generates a list filled with the same value/object, so it can be properly read in the train_ function. Args: alpha: <float> or <list of floats [n_obs]> with input alpha value(s). If list, there should be one value per observation. kernel: <object> or <list of objects [n_obs]> Scikit learn kernel objects, to send to the GPR training Returns: """ if isinstance(alpha, list): self.alpha = np.array(alpha) elif isinstance(alpha, float): self.alpha = np.full((self.training_points.shape[0], self.n_obs), alpha) elif isinstance(alpha, np.ndarray): if alpha.shape != (self.training_points.shape[0], self.n_obs): print('Using an alpha of 0') self.alpha = np.full((self.training_points.shape[0], self.n_obs), 0.0000001) else: self.alpha = alpha else: self.alpha = np.full((self.training_points.shape[0], self.n_obs), 0.0000001) if isinstance(kernel, list): self.kernel = kernel else: self.kernel = np.full(self.n_obs, kernel)
[docs] def train_(self): """ ToDo: Use joblib to parallelize training Returns: """ # train a surrogate for each output observation: if self.parallel and self.n_obs > 1: # out = Parallel(n_jobs=-1, backend='multiprocessing')(delayed(self._fit)(self.training_points, # self.model_evaluations[:, i], # self.kernel[i], # self.alpha[i]) # for i in range(self.n_obs)) out = Parallel(n_jobs=self.n_jobs, backend='threading')(delayed(self._fit)(self.training_points, self.model_evaluations[:, i], self.kernel[i], self.alpha[:, i]) for i in range(self.n_obs)) self.gp_list = out else: for i, model in enumerate(self.model_evaluations.T): out = self._fit(self.training_points, model, self.kernel[i], self.alpha[:, i]) self.gp_list.append(out)
def _fit(self, collocation_points, model_y, kernel, alpha): """ Function trains the Scikit-Learn surrogate model for each training location Args: collocation_points: array[n_tp, n_param] with training parameter sets model_y: array[n_tp,] with simulator outputs in training points kernel: object base kernel object, with constant*RBF_kernel Returns: dict with trained gp object, hyperparameters and normalization parameters (if needed) """ # Set Kernel: if self.noise: kernel = kernel + sklearn.gaussian_process.kernels.WhiteKernel(noise_level=np.std(model_y)/np.sqrt(2)) # 1.Initialize instance of sklearn GPR: gp = MySklGPR(kernel=kernel, alpha=alpha, normalize_y=self.y_normalization_, n_restarts_optimizer=self.n_restart, optimizer=self.optimizer_) if self.y_log: model_y = np.log(model_y) if self.tp_norm: # Normalize the training points (if the scales are very different) # scaler_x_train = MinMaxScaler() scaler_x_train = StandardScaler() scaler_x_train.fit(self.training_points) collocation_points_scaled = scaler_x_train.transform(self.training_points) # 2. Train GPR gp.fit(collocation_points_scaled, model_y) score = gp.score(collocation_points_scaled, model_y) else: # KEEP TP as they are # 2. Train GPR gp.fit(collocation_points, model_y) score = gp.score(collocation_points, model_y) return_out_dic = dict() return_out_dic['gp'] = gp hp = np.exp(gp.kernel_.theta) return_out_dic['c_hp'] = hp[0] if hp.shape[0] < self.n_params: return_out_dic['cl_hp'] = hp[1] else: return_out_dic['cl_hp'] = hp[1:self.n_params + 1] if self.noise: return_out_dic['noise_hp'] = hp[-1] return_out_dic['R2'] = score if self.tp_norm: return_out_dic['normalizer'] = scaler_x_train return return_out_dic
[docs] def predict_(self, input_sets, get_conf_int=False): """Evaluate the per-location surrogate models on all input sets. Parameters ---------- input_sets : numpy.ndarray Parameter sets to evaluate the surrogate models on, shape ``[MC, n_params]``. get_conf_int : bool, optional ``True`` to also estimate the upper and lower confidence intervals. Returns ------- dict Surrogate-model mean (``output``) and standard deviation (``std``) for each location, each of shape ``[n_obs, MC]``. """ # surrogate_prediction = np.zeros((len(self.gp_list), input_sets.shape[0])) # GPE mean, for each obs # surrogate_std = np.zeros((len(self.gp_list), input_sets.shape[0])) # GPE mean, for each obs # if get_conf_int: # upper_ci = np.zeros((len(self.gp_list), input_sets.shape[0])) # GPE mean, for each obs # lower_ci = np.zeros((len(self.gp_list), input_sets.shape[0])) # GPE mean, for each obs surrogate_prediction = np.zeros((input_sets.shape[0], len(self.gp_list))) # GPE mean, for each obs surrogate_std = np.zeros((input_sets.shape[0], len(self.gp_list))) # GPE mean, for each obs if get_conf_int: upper_ci = np.zeros((input_sets.shape[0], len(self.gp_list))) # GPE mean, for each obs lower_ci = np.zeros((input_sets.shape[0], len(self.gp_list))) # GPE mean, for each obs for i in range(0, self.n_obs): if self.tp_norm: input_scaled = self.gp_list[i]['normalizer'].transform(input_sets) surrogate_prediction[:, i], surrogate_std[:, i] = self.gp_list[i]['gp'].predict_(input_scaled) else: surrogate_prediction[:, i], surrogate_std[:, i] = self.gp_list[i]['gp'].predict_(input_sets) if get_conf_int: lower_ci[:, i] = surrogate_prediction[:, i] - (1.96 * surrogate_std[:, i]) upper_ci[:, i] = surrogate_prediction[:, i] + (1.96 * surrogate_std[:, i]) output_dic = dict() # if self.y_log: # surrogate_prediction = np.exp(surrogate_prediction) # surrogate_std = # TODO output_dic['output'] = surrogate_prediction output_dic['std'] = surrogate_std if get_conf_int: output_dic['upper_ci'] = upper_ci output_dic['lower_ci'] = lower_ci return output_dic
# ------------------------------------------------------------------------------------------------------------------
[docs] def validation_error(true_y, sim_y, output_names, n_per_type): """Estimate validation criteria for a surrogate model per output location. Results for each output type are stored under separate keys in a dictionary. Parameters ---------- true_y : numpy.ndarray Simulator outputs for the validation samples, shape ``[mc_valid, n_obs]``. sim_y : numpy.ndarray or dict Surrogate/emulator outputs for the validation samples, shape ``[mc_valid, n_obs]``. If a dict, it holds ``output`` and ``std`` keys. output_names : array-like of str Name of each output type, shape ``[n_types]``. n_per_type : int Number of observations per output type. Returns ------- tuple Validation criteria for each output location and output type. Notes ----- .. todo:: As in BayesValidRox, optionally estimate the surrogate predictions here by passing a surrogate object. .. todo:: Move into the GPR class and return a dictionary keyed by output type. """ criteria_dict = {'rmse': dict(), 'mse': dict(), 'nse': dict(), 'r2': dict(), 'mean_error': dict(), 'std_error': dict()} # criteria_dict = {'rmse': dict(), # 'valid_error': dict(), # 'nse': dict()} if isinstance(sim_y, dict): sm_out = sim_y['output'] sm_std = sim_y['std'] upper_ci = sim_y['upper_ci'] lower_ci = sim_y['lower_ci'] criteria_dict['norm_error'] = dict() criteria_dict['P95'] = dict() else: sm_out = sim_y # RMSE for each output location: not a dictionary (yet). [n_obs, ] rmse = sklearn.metrics.mean_squared_error(y_true=true_y, y_pred=sm_out, multioutput='raw_values', squared=False) c = 0 for i, key in enumerate(output_names): # RMSE criteria_dict['rmse'][key] = sklearn.metrics.mean_squared_error(y_true=true_y[:, c:c + n_per_type], y_pred=sm_out[:, c:c + n_per_type], multioutput='raw_values', squared=False) criteria_dict['mse'][key] = sklearn.metrics.mean_squared_error(y_true=true_y[:, c:c + n_per_type], y_pred=sm_out[:, c:c + n_per_type], multioutput='raw_values', squared=True) # # NSE criteria_dict['nse'][key] = sklearn.metrics.r2_score(y_true=true_y[:, c:c+n_per_type], y_pred=sm_out[:, c:c+n_per_type], multioutput='raw_values') # # Validation error: # criteria_dict['valid_error'][key] = criteria_dict['rmse'][key] ** 2 / np.var(true_y[:, c:c+n_per_type], # ddof=1, axis=0) # NSE criteria_dict['nse'][key] = sklearn.metrics.r2_score(y_true=true_y[:, c:c + n_per_type], y_pred=sm_out[:, c:c + n_per_type], multioutput='raw_values') # Mean errors criteria_dict['mean_error'][key] = np.abs( np.mean(true_y[:, c:c + n_per_type], axis=0) - np.mean(sm_out[:, c:c + n_per_type], axis=0)) / np.mean( true_y[:, c:c + n_per_type], axis=0) criteria_dict['std_error'][key] = np.abs( np.std(true_y[:, c:c + n_per_type], axis=0) - np.std(sm_out[:, c:c + n_per_type], axis=0)) / np.std( true_y[:, c:c + n_per_type], axis=0) # Norm error if isinstance(sim_y, dict): # Normalized error ind_val = np.divide(np.subtract(sm_out[:, c:c + n_per_type], true_y[:, c:c + n_per_type]), sm_std[:, c:c + n_per_type]) criteria_dict['norm_error'][key] = np.mean(ind_val ** 2, axis=0) # P95 p95 = np.where((true_y[:, c:c + n_per_type] <= upper_ci[:, c:c + n_per_type]) & ( true_y[:, c:c + n_per_type] >= lower_ci[:, c:c + n_per_type]), 1, 0) criteria_dict['P95'][key] = np.mean(p95, axis=0) criteria_dict['r2'][key] = np.zeros(n_per_type) for j in range(n_per_type): criteria_dict['r2'][key][j] = np.corrcoef(true_y[:, j+c], sm_out[:, j+c])[0, 1] c = c + n_per_type return rmse, criteria_dict
[docs] def save_valid_criteria(new_dict, old_dict, n_tp): """Append the current iteration's validation criteria to a results dict. Stores the validation criteria for the current iteration (``n_tp``) in an existing dictionary, so the results for all iterations live in one file. Each validation criterion has a key per output type, holding a vector with one value per output location. Parameters ---------- new_dict : dict Validation criteria for the current iteration. old_dict : dict Validation criteria for all previous iterations, including an ``N_tp`` key that tracks the iteration number. n_tp : int Number of training points for the current BAL iteration. Returns ------- dict The updated dictionary including the current iteration. """ if len(old_dict) == 0: old_dict = dict(new_dict) old_dict['N_tp'] = [n_tp] else: for key in old_dict: if key == 'N_tp': old_dict[key].append(n_tp) else: for out_type in old_dict[key]: old_dict[key][out_type] = np.vstack((old_dict[key][out_type], new_dict[key][out_type])) return old_dict