Source code for hydroBayesCal.surrogate.gpe_gpytorch

"""
This module inherits from the PyTorch library for training a Gaussian Process Emulator (GPE).
The module supersede the ExactGP base class from GPyTorch and extend the functionality by customizing the mean function,likelihoods and kernel (covariance function)
The MultitaskGPModel class also extends the ExactGP base class to handle multitask (multiple outputs) learning scenarios. It is designed to model multiple related tasks simultaneously
especially if they have similarities by sharing information across them using a common GP framework. (https://docs.gpytorch.ai/en/stable/examples/03_Multitask_Exact_GPs/Multitask_GP_Regression.html).
Author: Andres Heredia (2024)
"""
import numpy as np
import sys
import scipy
import sklearn
import copy
from joblib import Parallel, delayed
import torch
import gpytorch
from gpytorch.models import ExactGP
from gpytorch.constraints import GreaterThan
from gpytorch.likelihoods import MultitaskGaussianLikelihood
from gpytorch.means import MultitaskMean, ConstantMean
from gpytorch.kernels import MultitaskKernel, RBFKernel, LinearKernel, ScaleKernel, ProductKernel, AdditiveKernel, MaternKernel, PeriodicKernel
from scipy.optimize import dual_annealing, differential_evolution


[docs] class MyExactGPyModel(gpytorch.models.ExactGP): """ Instance of GPyTorch's "ExactGP" library, with custom likelihood, kernel, training points. The likelihood is kept constant: Gaussian Likelihood (https://docs.gpytorch.ai/en/latest/likelihoods.html) Parameters: :param train_x: <np.array[n_tp, n_p]> with parameter sets used to train GPR :param train_y: <np.array[n_tp, n_obs]> with forward model outputs used to train GPR :param kernel: <kernel instance> with kernel used in GPR :param likelihood <likelihood instance> to train noise in GPR """ def __init__( self, train_x, train_y, kernel, likelihood ): super(MyExactGPyModel, self).__init__(train_inputs=train_x, train_targets=train_y, likelihood=likelihood) # self.mean_module = gpytorch.means.ConstantMean() self.mean_module = gpytorch.means.ZeroMean() self.covar_module = kernel
[docs] def forward(self, x): """ Takes in the training data (x) and returns a multivariate normal distribution with mean and covariance (kernel) set in "__init__()" :param x: training data (parameter sets) :return: """ mean_x = self.mean_module(x) covar_x = self.covar_module(x) return gpytorch.distributions.MultivariateNormal(mean_x, covar_x)
[docs] class GPyTraining: """Train a single-output Gaussian Process Emulator with GPyTorch. Uses GPyTorch's exact GP regression to build a GPE for a forward model, from collocation points produced by that model. Parameters ---------- collocation_points : numpy.ndarray Training points (parameter sets), shape ``[n_tp, n_p]``. model_evaluations : numpy.ndarray Model outputs at each evaluated location, shape ``[n_tp, n_obs]``. kernel : gpytorch.kernels.Kernel Kernel used to train the GPE. May use default values or a user-defined anisotropy (``ard_num_dims = n_parameters``). likelihood : gpytorch.likelihoods.GaussianLikelihood Likelihood used to optimise the GPR. May use default constraints/initial values or be customised by the caller. training_iter : int Number of optimiser iterations used to train the GPE. optimizer : str, optional Optimiser to use: ``"adam"`` (default) or ``"lbfgs"``. loss : str, optional Loss function: ``"exact"`` or ``"loo"``. n_restarts : int, optional Number of optimisation restarts. tp_normalization : bool, optional ``True`` to normalise training-point parameter values before training (default ``False``). y_normalization : bool, optional ``True`` to normalise model outputs before training (predictions are de-normalised afterwards). parallelize : bool, optional ``True`` to parallelise surrogate training, ``False`` to train sequentially. Notes ----- .. todo:: Accept the evaluation location as input and add a function that extracts the GPE predictions at the observation point (used in BAL). .. todo:: For GPyTorch, check the GPU settings and other ``gpytorch.settings`` for prediction. """ def __init__( self, collocation_points, model_evaluations, kernel, training_iter, likelihood, y_normalization=True, tp_normalization=False, optimizer="adam", lr=0.5, loss='exact', n_restarts=1, weight_decay=0, gradient_free_start=False, verbose=True, parallelize=False ): # Basic attributed self.training_points = collocation_points self.model_evaluations = model_evaluations self.n_obs = self.model_evaluations.shape[1] self.n_params = collocation_points.shape[1] self.gp_list = [] # Input for GPR library in GPyTorch: self.kernel = kernel self.optimizer_ = optimizer self.training_iter = training_iter self.loss = loss self.n_restarts = n_restarts self.gradient_free_start = gradient_free_start self.lr = lr self.weight_decay = weight_decay # self.likelihood = likelihood self.parallel = parallelize self.verbose = verbose # Options for GPR library: self.tp_norm = tp_normalization self.y_norm = y_normalization self._id_vectors(likelihood, kernel) def _id_vectors(self, likelihood, kernel): """ Function checks if the inputs for alpha and kernel are a single variable or a list. If they are a single value, the function generates a list filled with the same value/object, so it can be properly read in the train_ function. Args: likelihood: <object> or <list of objects [n_obs]> with input alpha value(s). If list, there should be one value per observation. kernel: <object> or <list of objects [n_obs]> Scikit learn kernel objects, to send to the GPR training Returns: """ if isinstance(likelihood, list): self.likelihood = np.array(likelihood) else: self.likelihood = np.full(self.n_obs, likelihood)
[docs] @staticmethod def convert_to_tensor(array): """ Function to transform np.array to a tensor Args: array: <np.array> that you want to change to a tensor Returns: <tensor> data in np.array transformed to tensor format """ transformed = torch.tensor(array).float() return transformed
[docs] def normalize_tp(self, train_y): """ Function to normalize training points outputs before training Args: train_y: <np.array[tp_size, n_obs]> with model output values to normalize Returns: <tensor> with normalized input values. """ norm_y = (train_y - np.mean(train_y))/(np.std(train_y)) train_y = self.convert_to_tensor(norm_y) return train_y
[docs] def train_(self): """ Function trains the surrogate model using the GPyTorch library, using the given optimizer. Returns: ToDo: parallelize training """ # Convert training points and prior parameter sets to tensor: train_x = self.convert_to_tensor(self.training_points) if self.parallel and self.n_obs > 1: out = Parallel(n_jobs=-1, backend='multiprocessing')(delayed(self._fit_adam)(train_x=train_x, model_y=self.model_evaluations[:, i], likelihood=self.likelihood[i], kernel=self.kernel) for i in range(self.n_obs)) self.gp_list = out else: for i, y_model in enumerate(self.model_evaluations.T): out = self._fit_adam(train_x=train_x, model_y=y_model, likelihood=self.likelihood[i], kernel=self.kernel) self.gp_list.append(out)
[docs] @staticmethod def init_model_params(model): """ Function to initalize model hyperparameters, for multi-start optimizations Args: model: GPyTorch instance Returns: """ def initialize_tensor(param_): if len(param_.shape) < 2: # If the tensor has fewer than two dimensions, apply a different initialization method torch.nn.init.uniform_(param_) # Example: Uniform initialization for tensors with fewer than 2 dimensions else: torch.nn.init.xavier_uniform_(param_) for name, param in model.named_parameters(): initialize_tensor(param)
def _fit_adam(self, train_x, model_y, kernel, likelihood): """ Function trains the GPR for a given training location using the 'adam' optimizer from PyTorch Args: train_x: tensor [n_tp, n_param] with input parameter sets to use in training model_y: array[n_tp,] with simulator outputs in training points Returns: dict with trained gp object, trained likelihood object, hyperparameters and y_normalization parameters (if needed) """ # Normalize, if needed, and transform model_evaluations at loc "i" to a tensor if self.y_norm: train_y = self.normalize_tp(model_y) else: train_y = self.convert_to_tensor(model_y) best_loss = float('inf') best_params = None for i in range(self.n_restarts): # Initialize kernel and likelihood: kernel_ = copy.deepcopy(kernel) likelihood_ = copy.deepcopy(likelihood) # Initialize instance of GPyTorch GPR: gp = MyExactGPyModel(train_x, train_y, kernel_, likelihood_) # Start training gp.train() likelihood_.train() if i > 1: self.init_model_params(gp) # Setup Optimizer if self.optimizer_ == 'adam': optimizer = torch.optim.Adam(gp.parameters(), lr=self.lr, weight_decay=self.weight_decay) elif self.optimizer_ == 'lbfgs': optimizer = torch.optim.lbfgs(gp.parameters(), lr=self.lr) else: raise ValueError(f'There is no optimizer {self.optimizer_} available.') # Loss for GPs - log likelihood if 'exact' in self.loss.lower(): mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood_, gp) elif 'loo' in self.loss.lower(): mll = gpytorch.mlls.LeaveOneOutPseudoLikelihood(likelihood_, gp) # Change learning rate: scheduler = torch.optim.lr_scheduler.StepLR( optimizer, step_size=5, gamma=0.5 ) if i == 0 and self.gradient_free_start: def negative_log_likelihood(params): """ Gradient-free optimizer from SciPy""" # Set the model hyperparameters based on the optimization parameters gp.covar_module.base_kernel.lengthscale = torch.tensor(params[0:self.n_params]) gp.covar_module.outputscale = torch.tensor(params[-2]) gp.likelihood.noise = torch.tensor(params[-1]) # Zero out the gradients gp.zero_grad() # Forward pass to compute the negative log likelihood output = gp(train_x) loss = -mll(output, train_y) if self.verbose: print(f'Gradient-free start - Loss: {loss.item()}', f' Outputscale: {gp.covar_module.outputscale.item()}', f' lengthscale: {gp.covar_module.base_kernel.lengthscale[0]}', f' noise: {gp.likelihood.noise.item()}') # Return the negative log likelihood as a NumPy array return loss.item() # Define the bounds for the optimization parameters (lengthscale and outputscale) value = np.empty((), dtype=object) value[()] = (1e-3, 1e2) bounds_1 = list(np.full(self.n_params, value, dtype=object)) bounds_2 = [(1e-2, 1e1), (1e-6, 1)] bounds = bounds_1 + bounds_2 # Perform the global optimization using differential evolution # t1 = time.time() result = differential_evolution(negative_log_likelihood, bounds, maxiter=10) # Optimize parameters: for j in range(self.training_iter): # a. Zero gradients from previous iteration optimizer.zero_grad() # b. Output from model output = gp(train_x) # c. Calculate loss and back propagation gradients loss = -mll(output, train_y) loss.backward() optimizer.step() scheduler.step() # Change learning rate if self.verbose: print(f'Iter {j + 1}/{self.training_iter} - Loss: {loss.item()}', f' Outputscale: {gp.covar_module.outputscale.item()}', f' lengthscale: {gp.covar_module.base_kernel.lengthscale[0]}', f' noise: {gp.likelihood.noise.item()}') if loss < best_loss: best_loss = loss best_params = gp.state_dict() gp.load_state_dict(best_params) with torch.no_grad(): return_out_dic = dict() return_out_dic['gp'] = gp return_out_dic['likelihood'] = likelihood_ return_out_dic['c_hp'] = gp.covar_module.outputscale.item() return_out_dic['cl_hp'] = gp.covar_module.base_kernel.lengthscale.numpy()[0, :] return_out_dic['noise_hp'] = gp.likelihood.noise.item() if self.y_norm: return_out_dic['y_norm'] = [np.mean(model_y), np.std(model_y)] return return_out_dic
[docs] def predict_(self, input_sets, get_conf_int=False): """ TO DO: DESCRIPTION TO BE COMPLETED :param input_sets: :param get_conf_int: :return: """ prior_x = self.convert_to_tensor(input_sets) surrogate_prediction = np.zeros((input_sets.shape[0], len(self.gp_list))) # GPE mean, for each obs surrogate_std = np.zeros((input_sets.shape[0], len(self.gp_list))) # GPE mean, for each obs if get_conf_int: upper_ci = np.zeros((input_sets.shape[0], len(self.gp_list))) # GPE mean, for each obs lower_ci = np.zeros((input_sets.shape[0], len(self.gp_list))) # GPE mean, for each obs for i in range(0, self.n_obs): # Extract data gp = copy.deepcopy(self.gp_list[i]['gp']) likelihood_ = copy.deepcopy(self.gp_list[i]['likelihood']) # 5.1 Go into eval mode: gp.eval() likelihood_.eval() with torch.no_grad(): f_pred = likelihood_(gp(prior_x)) prediction = f_pred.mean.numpy() std = f_pred.stddev.numpy() if self.y_norm: # Back-transform # normalized results prediction = self.gp_list[i]['y_norm'][1] * prediction + self.gp_list[i]['y_norm'][0] std = std * self.gp_list[i]['y_norm'][1] surrogate_prediction[:, i] = prediction surrogate_std[:, i] = std # Calculate 95% confidence intervals. if get_conf_int: upper_ci[:, i] = surrogate_prediction[:, i] + 2 * surrogate_std[:, i] lower_ci[:, i] = surrogate_prediction[:, i] - 2 * surrogate_std[:, i] output_dic = dict() output_dic['output'] = surrogate_prediction output_dic['std'] = surrogate_std if get_conf_int: output_dic['upper_ci'] = upper_ci output_dic['lower_ci'] = lower_ci return output_dic
[docs] def validation_error(true_y, sim_y, output_names, n_per_type): """Estimate validation criteria for a surrogate model per output location. Results for each output type are stored under separate keys in a dictionary. Parameters ---------- true_y : numpy.ndarray Simulator outputs for the validation samples, shape ``[mc_valid, n_obs]``. sim_y : numpy.ndarray or dict Surrogate/emulator outputs for the validation samples, shape ``[mc_valid, n_obs]``. If a dict, it holds ``output`` and ``std`` keys. output_names : array-like of str Name of each output type, shape ``[n_types]``. n_per_type : int Number of observations per output type. Returns ------- tuple Validation criteria for each output location and output type. Notes ----- .. todo:: As in BayesValidRox, optionally estimate the surrogate predictions here by passing a surrogate object. .. todo:: Move into the GPR class and return a dictionary keyed by output type. """ criteria_dict = { 'rmse': dict(), 'mse': dict(), 'nse': dict(), 'r2': dict(), 'mean_error': dict(), 'std_error': dict() } if isinstance(sim_y, dict): sm_out = sim_y['output'] sm_std = sim_y['std'] upper_ci = sim_y['upper_ci'] lower_ci = sim_y['lower_ci'] criteria_dict['norm_error'] = dict() criteria_dict['P95'] = dict() else: sm_out = sim_y # RMSE for each output location: not a dictionary (yet). [n_obs, ] rmse = sklearn.metrics.mean_squared_error( y_true=true_y, y_pred=sm_out, multioutput='raw_values', squared=False ) c = 0 for i, key in enumerate(output_names): # RMSE criteria_dict['rmse'][key] = sklearn.metrics.mean_squared_error( y_true=true_y[:, c:c + n_per_type], y_pred=sm_out[:, c:c + n_per_type], multioutput='raw_values', squared=False ) # NSE criteria_dict['nse'][key] = sklearn.metrics.r2_score( y_true=true_y[:, c:c+n_per_type], y_pred=sm_out[:, c:c+n_per_type], multioutput='raw_values' ) # NSE criteria_dict['nse'][key] = sklearn.metrics.r2_score( y_true=true_y[:, c:c + n_per_type], y_pred=sm_out[:, c:c + n_per_type], multioutput='raw_values' ) criteria_dict['mse'][key] = sklearn.metrics.mean_squared_error( y_true=true_y[:, c:c + n_per_type], y_pred=sm_out[:, c:c + n_per_type], multioutput='raw_values', squared=True ) # Mean errors criteria_dict['mean_error'][key] = np.abs( np.mean(true_y[:, c:c + n_per_type], axis=0) - np.mean(sm_out[:, c:c + n_per_type], axis=0)) / np.mean( true_y[:, c:c + n_per_type], axis=0) criteria_dict['std_error'][key] = np.abs( np.std(true_y[:, c:c + n_per_type], axis=0) - np.std(sm_out[:, c:c + n_per_type], axis=0)) / np.std( true_y[:, c:c + n_per_type], axis=0) # Norm error if isinstance(sim_y, dict): # Normalized error ind_val = np.divide(np.subtract(sm_out[:, c:c + n_per_type], true_y[:, c:c + n_per_type]), sm_std[:, c:c + n_per_type]) criteria_dict['norm_error'][key] = np.mean(ind_val ** 2, axis=0) # P95 p95 = np.where((true_y[:, c:c + n_per_type] <= upper_ci[:, c:c + n_per_type]) & ( true_y[:, c:c + n_per_type] >= lower_ci[:, c:c + n_per_type]), 1, 0) criteria_dict['P95'][key] = np.mean(p95, axis=0) criteria_dict['r2'][key] = np.zeros(n_per_type) for j in range(n_per_type): criteria_dict['r2'][key][j] = np.corrcoef(true_y[:, j+c], sm_out[:, j+c])[0, 1] c = c + n_per_type return rmse, criteria_dict
[docs] def save_valid_criteria(new_dict, old_dict, n_tp): """Append the current iteration's validation criteria to a results dict. Stores the validation criteria for the current iteration (``n_tp``) in an existing dictionary, so the results for all iterations live in one file. Each validation criterion has a key per output type, holding a vector with one value per output location. Parameters ---------- new_dict : dict Validation criteria for the current iteration. old_dict : dict Validation criteria for all previous iterations, including an ``N_tp`` key that tracks the iteration number. n_tp : int Number of training points for the current BAL iteration. Returns ------- dict The updated dictionary including the current iteration. """ if len(old_dict) == 0: old_dict = dict(new_dict) old_dict['N_tp'] = [n_tp] else: for key in old_dict: if key == 'N_tp': old_dict[key].append(n_tp) else: for out_type in old_dict[key]: old_dict[key][out_type] = np.vstack((old_dict[key][out_type], new_dict[key][out_type])) return old_dict
[docs] class MultiGPyTraining: """ Class to train multiple Gaussian Process models using given collocation points and model evaluations. It uses the MultiGPyTraining class for multitask regression using GPyTorch. """
[docs] def __init__( self, collocation_points, model_evaluations, kernel, training_iter, likelihood, optimizer="adam", lr=0.5, n_restarts=1, parallelize=False, number_quantities=2, noise_constraint=GreaterThan(1e-6) ): """ Parameters ---------- See original class docstring for parameter details. """ # Basic attributes self.training_points = collocation_points self.model_evaluations = model_evaluations self.number_quantities = number_quantities self.n_obs = self.model_evaluations.shape[1] self.n_params = collocation_points.shape[1] self.gp_list = [] self.normalization_params = [] # Initialize likelihood and other hyperparameters self.likelihood = likelihood self.kernel = kernel self.optimizer_ = optimizer self.training_iter = training_iter self.n_restarts = n_restarts self.lr = lr self.parallel = parallelize self.noise_constraint = noise_constraint self.losses = [] # To store loss values for tracking convergence self.lengthscales = [] self.noise_values = [] # To track noise values self.multitask_cov = [] # To store multitask covariance matrices (Kronecker product of task kernel and input kernel)
[docs] def train_tasks_variables(self):#Training variables at each location separately """ Train multitask Gaussian Process models using the provided collocation points and model evaluations. """ X = torch.tensor(self.training_points, dtype=torch.float32) Y = torch.tensor(self.model_evaluations, dtype=torch.float32) # Number of locations num_locations = Y.shape[1] // self.number_quantities # Iterate over each location for loc in range(num_locations): # Extract the columns corresponding to the current location Y_loc = Y[:, self.number_quantities * loc : self.number_quantities * (loc + 1)].numpy() # Normalize outputs y_mean = np.mean(Y_loc, axis=0) y_std = np.std(Y_loc, axis=0) Y_loc_norm = (Y_loc - y_mean) / y_std # Save normalization parameters self.normalization_params.append((y_mean, y_std)) # Convert normalized Y_loc to torch Y_loc_norm = torch.tensor(Y_loc_norm, dtype=torch.float32) # Initialize multitask GP model model = MultitaskGPModel(X, Y_loc_norm, self.likelihood, self.kernel,self.number_quantities) # Set model and likelihood to training mode model.train() self.likelihood.train() # Set optimizer if self.optimizer_ == "adam": optimizer = torch.optim.Adam(model.parameters(), lr=self.lr) else: raise ValueError(f"Optimizer '{self.optimizer_}' not supported.") # Set MLL objective mll = gpytorch.mlls.ExactMarginalLogLikelihood(self.likelihood, model) converged = False plateau_count = 0 # Counter for consecutive small changes in loss threshold = 1e-3 # Convergence threshold for loss required_plateau = 5 # Number of consecutive stable steps required for convergence # For length scale tracking and convergence check lengthscale_converged = False lengthscale_plateau_count = 0 lengthscale_threshold = 1e-2 # Convergence threshold for length scale # Training loop for _ in range(self.training_iter): optimizer.zero_grad() output = model(X) loss = -mll(output, Y_loc_norm) loss.backward() optimizer.step() self.losses.append(loss.item()) noise = self.likelihood.noise.detach().cpu().numpy() self.noise_values.append(noise) # Track lengthscale try: lengthscale = model.covar_module.data_covar_module.base_kernel.lengthscale.detach().cpu().numpy() except AttributeError: lengthscale = None self.lengthscales.append(lengthscale) # Loss convergence check if len(self.losses) > 1: loss_diff = abs(self.losses[-1] - self.losses[-2]) if loss_diff < threshold: plateau_count += 1 else: plateau_count = 0 if plateau_count >= required_plateau: converged = True # Length scale convergence check if lengthscale is not None and len(self.lengthscales) > 1: prev_ls = self.lengthscales[-2] curr_ls = self.lengthscales[-1] ls_diff = np.abs(curr_ls - prev_ls) if np.all(ls_diff < lengthscale_threshold): lengthscale_plateau_count += 1 else: lengthscale_plateau_count = 0 if lengthscale_plateau_count >= required_plateau: lengthscale_converged = True self.gp_list.append({'gp': model, 'y_norm': (y_mean, y_std)})
[docs] def train_tasks_locations(self): """ Train multitask Gaussian Process models using the provided collocation points and model evaluations. Trains the model for each output variable (water depth OR velocity) at all locations simultaneously. """ X = torch.tensor(self.training_points, dtype=torch.float32) Y = torch.tensor(self.model_evaluations, dtype=torch.float32) # Number of locations num_locations = Y.shape[1] // self.number_quantities # Total number of locations # Reorganize the tensor so that: # - The 0, 2, 4, 6, ... columns are for the first output (water depth) # - The 1, 3, 5, 7, ... columns are for the second output (velocity) Y_output_1 = Y[:, ::2] # First output: water depth (even columns: 0, 2, 4, ...) Y_output_2 = Y[:, 1::2] # Second output: velocity (odd columns: 1, 3, 5, ...) # Train the multitask GP model for the first output (water depth) for var in range(2): # 0: water depth, 1: velocity if var == 0: Y_output_var = Y_output_1 y_mean = torch.mean(Y_output_var, dim=0) y_std = torch.std(Y_output_var, dim=0) Y_output_var_norm = (Y_output_var - y_mean) / y_std self.normalization_params.append(('output_1', y_mean, y_std)) else: Y_output_var = Y_output_2 y_mean = torch.mean(Y_output_var, dim=0) y_std = torch.std(Y_output_var, dim=0) Y_output_var_norm = (Y_output_var - y_mean) / y_std self.normalization_params.append(('output_2', y_mean, y_std)) # Normalize and convert to torch tensor Y_output_var_norm = torch.tensor(Y_output_var_norm, dtype=torch.float32) # Ensure the output has the correct shape # Initialize multitask GP model for the entire set of locations (tasks) model = MultitaskGPModel(X, Y_output_var_norm, self.likelihood, self.kernel, number_tasks=num_locations) # num_tasks = num_locations # Set model and likelihood to training mode model.train() self.likelihood.train() # Set optimizer optimizer = torch.optim.Adam(model.parameters(), lr=self.lr) # Set MLL objective mll = gpytorch.mlls.ExactMarginalLogLikelihood(self.likelihood, model) # Training loop for the entire set of N locations (tasks) for this variable for _ in range(self.training_iter): optimizer.zero_grad() output = model(X) # Ensure the output shape matches expected for MLL loss = -mll(output, Y_output_var_norm) loss.backward() optimizer.step() # Store trained model for the current variable (water depth or velocity) self.gp_list.append({'gp': model, 'y_norm': (y_mean, y_std)})
[docs] def train_tasks_all(self): """ Train a single multitask Gaussian Process model using all outputs (water depth and velocity) at all locations simultaneously. """ X = torch.tensor(self.training_points, dtype=torch.float32) Y = torch.tensor(self.model_evaluations, dtype=torch.float32) # Number of locations num_locations = Y.shape[1] // self.number_quantities # Total number of locations # Normalize Y y_mean = torch.mean(Y, dim=0) # Compute mean per (location, quantity) y_std = torch.std(Y, dim=0) # Compute std per (location, quantity) Y_norm = (Y - y_mean) / y_std # Normalize Y # Save normalization parameters self.normalization_params = {'y_mean': y_mean, 'y_std': y_std} # Initialize multitask GP model model = MultitaskGPModel(X, Y_norm, self.likelihood, self.kernel, number_tasks=self.n_obs) # Set model and likelihood to training mode model.train() self.likelihood.train() # Set optimizer optimizer = torch.optim.Adam(model.parameters(), lr=self.lr) # Set MLL objective mll = gpytorch.mlls.ExactMarginalLogLikelihood(self.likelihood, model) # Training loop for _ in range(self.training_iter): optimizer.zero_grad() output = model(X) # Compute negative marginal likelihood loss loss = -mll(output, Y_norm) loss.backward() optimizer.step() # Store trained model self.gp_list = [{'gp': model, 'y_norm': (y_mean, y_std)}]
[docs] def predict_(self, input_sets, get_conf_int=False, multitask_cov=False): """ Predict the outputs and their standard deviations for given input sets using the trained GP models. Automatically selects the appropriate method based on the structure of self.gp_list. """ input_sets = torch.tensor(input_sets, dtype=torch.float32) n_samples = input_sets.shape[0] n_obs = self.n_obs n_locations = n_obs // self.number_quantities # Initialize storage for predictions surrogate_prediction = np.zeros((n_samples, n_obs)) surrogate_std = np.zeros((n_samples, n_obs)) if get_conf_int: upper_ci = np.zeros((n_samples, n_obs)) lower_ci = np.zeros((n_samples, n_obs)) if len(self.gp_list) == 1: model_info = self.gp_list[0] gp = model_info['gp'] y_mean, y_std = model_info['y_norm'] gp.eval() self.likelihood.eval() with torch.no_grad(): predictions = self.likelihood(gp(input_sets)) mean = predictions.mean # PyTorch tensor of shape (n_samples, n_obs) std = predictions.stddev # PyTorch tensor of shape (n_samples, n_obs) # Back-transform normalized predictions mean = y_std * mean + y_mean # Undo normalization std = std * y_std # Convert to NumPy arrays and store in the respective matrices surrogate_prediction[:] = mean.numpy() surrogate_std[:] = std.numpy() if get_conf_int: upper_ci[:] = surrogate_prediction + 2 * surrogate_std lower_ci[:] = surrogate_prediction - 2 * surrogate_std elif len(self.gp_list) == self.number_quantities: # Use logic from predict__ for m, model_info in enumerate(self.gp_list): gp = model_info['gp'] y_mean, y_std = model_info['y_norm'] gp.eval() self.likelihood.eval() with torch.no_grad(): predictions = self.likelihood(gp(input_sets)) mean = predictions.mean # This is a PyTorch tensor std = predictions.stddev # This is a PyTorch tensor # Back-transform normalized predictions (make sure everything is a tensor) mean = y_std * mean + y_mean # y_std and y_mean should be tensors already # If y_std is a NumPy array, convert it to a PyTorch tensor (assuming y_std and y_mean are NumPy arrays) if isinstance(y_std, np.ndarray): y_std = torch.tensor(y_std, dtype=torch.float32) if isinstance(y_mean, np.ndarray): y_mean = torch.tensor(y_mean, dtype=torch.float32) # Now, std is a PyTorch tensor std = std * y_std # Store predictions in even and odd columns based on the iteration for i in range(n_locations): # Assuming mean has shape (20, 42) if m == 0: surrogate_prediction[:, 2 * (i)] = mean[:, i].numpy() else: surrogate_prediction[:, 2 * (i) + 1] = mean[:, i].numpy() # Store std similarly for i in range(n_locations): if m == 0: surrogate_std[:, 2 * (i)] = std[:, i].numpy() else: surrogate_std[:, 2 * (i) + 1] = std[:, i].numpy() if get_conf_int: for i in range(n_locations): if m == 0: upper_ci[:, 2 * i] = mean[:, i].numpy() + 2 * std[:, i].numpy() lower_ci[:, 2 * i] = mean[:, i].numpy() - 2 * std[:, i].numpy() else: upper_ci[:, 2 * i + 1] = mean[:, i].numpy() + 2 * std[:, i].numpy() lower_ci[:, 2 * i + 1] = mean[:, i].numpy() - 2 * std[:, i].numpy() elif len(self.gp_list) == n_locations: multitask_cov_list = [[None for _ in range(n_locations)] for _ in range(n_samples)] for i, model_info in enumerate(self.gp_list): gp = model_info['gp'] y_mean, y_std = model_info['y_norm'] D = np.diag(y_std) gp.eval() self.likelihood.eval() with torch.no_grad(): predictions = self.likelihood(gp(input_sets)) mean = predictions.mean.numpy() std = predictions.stddev.numpy() # Back-transform normalized predictions mean = y_std * mean + y_mean std = std * y_std # Store predictions surrogate_prediction[:, i * self.number_quantities:(i + 1) * self.number_quantities] = mean surrogate_std[:, i * self.number_quantities:(i + 1) * self.number_quantities] = std if get_conf_int: upper_ci[:, i * self.number_quantities:(i + 1) * self.number_quantities] = mean + 2 * std lower_ci[:, i * self.number_quantities:(i + 1) * self.number_quantities] = mean - 2 * std if multitask_cov: # Now compute multitask covariance for all samples at this location task_covar_module = gp.covar_module.task_covar_module base_kernel = gp.covar_module.data_covar_module.base_kernel task_indices = torch.arange(self.number_quantities) for j in range(n_samples): x = input_sets[j].unsqueeze(0) # (1, n_inputs) input_kernel = base_kernel(x, x).evaluate().squeeze() task_kernel = task_covar_module(task_indices).evaluate() full_cov = (input_kernel * task_kernel).detach().cpu().numpy() # (Q, Q) # Back-transform covariance to original scale D = np.diag(y_std if isinstance(y_std, np.ndarray) else y_std.numpy()) full_cov_orig = D @ full_cov @ D multitask_cov_list[j][i] = full_cov_orig # j: sample, i: location else: raise ValueError( f"Mismatch in gp_list length ({len(self.gp_list)}). Expected {n_locations} (locations) or {self.number_quantities} (quantities)." ) # Prepare output dictionary if multitask_cov: output_dic = { 'output': surrogate_prediction, 'std': surrogate_std, 'multitask_cov': multitask_cov_list } else: output_dic = { 'output': surrogate_prediction, 'std': surrogate_std } if get_conf_int: output_dic['upper_ci'] = upper_ci output_dic['lower_ci'] = lower_ci return output_dic
[docs] class MultitaskGPModel(ExactGP): """ Gaussian Process model for multitask regression using the GPyTorch library. This model handles multiple tasks (or quantities) simultaneously by using a multitask kernel and multitask mean function. """
[docs] def __init__( self, train_x, train_y, likelihood, kernel, number_tasks ): """ :param train_x: torch.Tensor The input training data. A tensor of shape (n_samples, n_params) where `n_samples` is the number of samples and `n_params` is the number of input model parameters. :param train_y: torch.Tensor The output training data. A tensor of shape (n_samples, n_tasks) where `n_samples` is the number of samples and `n_tasks` is the number of tasks or quantities. The output is typically organized so that each column corresponds to a different task. :param likelihood: gpytorch.likelihoods.MultitaskGaussianLikelihood A multitask likelihood function used with the GP model. :param kernel: tuple(gpytorch.kernels.Kernel, gpytorch.kernels.Kernel) A tuple of kernel components to be used in the GP model. The tuple should contain two kernel components. """ super(MultitaskGPModel, self).__init__(train_x, train_y, likelihood) self.mean_module = MultitaskMean(ConstantMean(), num_tasks=number_tasks) self.covar_module = MultitaskKernel( kernel, num_tasks=number_tasks, rank=2 )
[docs] def forward(self, x): """ Computes the mean and covariance of the Gaussian Process given the input data `x`. :param x: torch.Tensor A tensor containing the training data for the model. :return: gpytorch.distributions.MultitaskMultivariateNormal """ mean_x = self.mean_module(x) covar_x = self.covar_module(x) return gpytorch.distributions.MultitaskMultivariateNormal(mean_x, covar_x)