Source code for hydroBayesCal.telemac.control_telemac

# coding: utf-8
"""
Functional core for controlling Telemac simulations for coupling with the Surrogate-Assisted Bayesian inversion technique.
Authors: Andres Heredia, Sebastian Schwindt
"""
from scipy import spatial
import numpy as np
from hydroBayesCal.telemac import config_telemac
from datetime import datetime
from hydroBayesCal.telemac.pputils.ppmodules.selafin_io_pp import ppSELAFIN

from collections import OrderedDict
from hydroBayesCal.hysim import HydroSimulations
from hydroBayesCal.function_pool import *  # provides os, subprocess, logging


[docs] class TelemacModel(HydroSimulations):
[docs] def __init__( self, friction_file="", tm_xd="Telemac2d", gaia_steering_file=None, fortran_file=None, results_filename_base="", gaia_results_filename_base=None, stdout=6, python_shebang="#!/usr/bin/env python3", *args, **kwargs ): """ Constructor for the TelemacModel Class. The class contains all necessary methods for Telemac simulations,extractions of simulation outputs and iterative updating of the control files. Parameters ---------- friction_file : str, optional Name of the friction file to be used in Telemac simulations (should end with ".tbl"); do not include the directory path. tm_xd : str, Specifies the dimension of the Telemac hydrodynamic solver, either 'Telemac2d' or 'Telemac3d'. gaia_steering_file : str, optional Name of the Gaia steering file; should be provided if required. Not implemented on this HydroBayesCal version. results_filename_base : str, optional Base name for the results file, which will be iteratively updated in the .cas file. python_shebang : str, optional Shebang line for Python scripts (default is "#!/usr/bin/env python3"). *args : tuple Additional positional arguments. **kwargs : dict Additional keyword arguments. Attributes ---------- friction_file : str Name of the Telemac friction file .tbl. tm_xd : str Dimension of the Telemac simulation ('Telemac2d' or 'Telemac3d'). gaia_steering_file : str or None Gaia steering file name if provided; otherwise, None. Not implemented on this HydroBayesCal Version. results_filename_base : str Base name for the Telemac results file. python_shebang : str Shebang line for Python scripts. tm_cas : str Full path to the Telemac steering file (.cas). fr_tbl : str Full path to the friction file (.tbl). comm : MPI.Comm MPI communicator for parallel processing. shebang : str Shebang line for Python scripts. tm_xd_dict : dict Dictionary mapping 'Telemac2d' and 'Telemac3d' to their respective script names. bal_iteration : int Bayesian Active Learning iteration number based on max_runs. num_run : int Simulation number; iteratively updated based on collocation points. tm_results_filename : str File path for storing output data from Telemac simulations. Note ---- The attributes specific to Telemac are listed above. For attributes inherited from the `HydroSimulations` class, please refer to its documentation. """ super().__init__(*args, **kwargs) # Initialize subclass-specific attributes self.friction_file = friction_file self.tm_xd = tm_xd self.gaia_steering_file = gaia_steering_file self.gaia_results_filename_base = gaia_results_filename_base self.fortran_file = fortran_file self.results_filename_base = results_filename_base self.python_shebang = python_shebang self.tm_cas = "{}{}{}".format(self.model_dir, os.sep, self.control_file) self.fr_tbl = "{}{}{}".format(self.model_dir, os.sep, self.friction_file) if self.gaia_steering_file is not None: self.gaia_cas = "{}{}{}".format(self.model_dir, os.sep, self.gaia_steering_file) else: self.gaia_cas = None if self.fortran_file is not None: self.fortran_file = os.path.join(self.model_dir, "user_fortran", self.fortran_file) else: self.fortran_file = None self.shebang = python_shebang self.tm_xd_dict = { "Telemac2d": "telemac2d.py ", "Telemac3d": "telemac3d.py ", } self.stdout = stdout # Initializes the BAL iteration number self.bal_iteration = int() # Initializes the simulation number self.num_run = int() # Initializes the file where the output data from simulations will be stored self.tm_results_filename = '' self.gaia_results_filename = ''
[docs] def update_model_controls( self, collocation_point_values, calibration_parameters, auxiliary_file_path=None, gaia_file_path=None, simulation_id=0, ): """ Modifies the .cas steering file for each of the Telemac runs according to the values of the collocation points and the calibration parameters. If a "FRICTION DATA FILE" is provided for Telemac simulations, it is possible to consider any zone as a calibration parameter. The parameters must start with the prefix "zone" and the number of the friction zone. The .tbl file will be modified for this purpose. This method is called every time it is required that the .cas or .tbl files are modified. It also modifies the gaia cas file. If the parameter starts with the prefix "gaia", the method will look for the parameter in the gaia cas file and update it with the new value. If the parameter starts with "f.", the method will look for it in the fortran file and update it with the new value. The rest of the parameters will be updated in the telemac cas file. Parameters ---------- collocation_point_values : list Values for each of the calibration parameters. calibration_parameters : list Names of the calibration parameters. auxiliary_file_path : str, optional Path to the friction file (.tbl). gaia_file_path : str, optional Path to the GAIA steering file (.cas). If provided, GAIA calibration parameters will also be updated. simulation_id : int, optional Identifier of the current simulation. Used when generating or updating control files for multiple simulations. Default is 0. Returns ------- None Modified control files (telemac.cas, gaia.cas, fortran file, and/or friction .tbl) for Telemac simulations. """ # ============================================================ # TELEMAC RESULT FILE NAMES # ============================================================ self.tm_results_filename = ( self.results_filename_base + '_' + str(simulation_id) + '.slf' ) if self.tm_xd == 'Telemac3d': # Main 3D result file tm_result_keys = ['3D RESULT FILE'] # Additional 2D result file generated by TELEMAC-3D # Example: # results_3d.slf -> results_3d_2d.slf self.tm_2d_results_filename_from_3d = self.tm_results_filename.replace( '.slf', '_2d.slf' ) tm_result_keys.append('2D RESULT FILE') tm_result_values = [ self.tm_results_filename, self.tm_2d_results_filename_from_3d ] else: # Standard TELEMAC-2D result file tm_result_keys = ['RESULTS FILE'] tm_result_values = [self.tm_results_filename] # ============================================================ # GAIA RESULT FILE NAME # ============================================================ if self.gaia_cas is not None: self.gaia_results_filename = ( self.gaia_results_filename_base + '_' + str(simulation_id) + '.slf' ) params_with_results = ( calibration_parameters + tm_result_keys + ['GAIA RESULTS FILE'] ) values_with_results = [ round(val, 7) if isinstance(val, float) else val for val in ( collocation_point_values + tm_result_values + [self.gaia_results_filename] ) ] else: params_with_results = calibration_parameters + tm_result_keys values_with_results = [ round(val, 7) if isinstance(val, float) else val for val in collocation_point_values + tm_result_values ] logger.info(f'Results file name for this simulation: {self.tm_results_filename}') if self.tm_xd == 'Telemac3d': logger.info( f'2D results file name for this 3D simulation: ' f'{self.tm_2d_results_filename_from_3d}' ) friction_file_path = auxiliary_file_path try: for param, value in zip(params_with_results, values_with_results): if param.lower().startswith("zone"): zone_identifier = param[4:] self.tbl_creator(zone_identifier, value, friction_file_path) elif param.lower().startswith("vg_zone"): # Extract the zone identifier and vegetation parameter number parts = param.split("-") # Assuming the format is like "vg_zoneXX_X" if len(parts) == 2: zone_identifier = parts[0][7:] # Extracts the number after 'vg_zone' veg_param_value = parts[1] # Extracts the vegetation parameter number self.tbl_creator( zone_identifier, value, friction_file_path, veg_param_number=veg_param_value, veg_indicator=True ) elif param.lower().startswith("gaia"): gaia_param_name = param[4:].strip() parts_gaia_name = gaia_param_name.rsplit(" ", 1) # Check if the last part is a number if parts_gaia_name[-1].isdigit(): gaia_parameter_name = parts_gaia_name[0].strip() class_number = int(parts_gaia_name[1]) self.gaia_string_for_classes = parse_classes_keyword( self.gaia_cas, gaia_parameter_name ) gaia_string = update_gaia_class_line( self.gaia_string_for_classes, class_number - 1, value ) self.rewrite_steering_file( gaia_parameter_name, gaia_string, steering_module="gaia" ) else: gaia_string = self.create_cas_string(gaia_param_name, value) self.rewrite_steering_file( gaia_param_name, gaia_string, steering_module="gaia" ) elif param.lower().startswith("f."): fortran_param_name = param[2:].strip() fortran_string = " " + self.create_cas_string( fortran_param_name, value ) self.rewrite_steering_file( fortran_param_name, fortran_string, steering_module="fortran" ) else: cas_string = self.create_cas_string(param, value) self.rewrite_steering_file( param, cas_string, steering_module="telemac" ) except Exception as e: logger_error.error(f'Error occurred during CAS creation: {e}') raise RuntimeError
[docs] @staticmethod def create_cas_string( param_name, value ): """ Create string names with new values to be used in Telemac2d / Gaia steering files Parameters ---------- param_name: string Name of parameter to update value: int , float or string Value to be assigned to param_name Returns ------- None Update parameter line for a steering file """ if isinstance(value, (int, float, str)) or ':' in value: return param_name + " = " + str(value) else: try: return param_name + " = " + "; ".join(map(str, value)) except Exception as error: logger_error.error( "ERROR: could not generate cas-file string for {0} and value {1}:\n{2}".format(str(param_name), str(value), str(error)))
[docs] def rewrite_steering_file( self, param_name, updated_string, steering_module="telemac" ): """Rewrite the ``.cas`` steering file with updated parameters. Parameters ---------- param_name : str Name of the calibration parameter. updated_string : str Updated string written into the ``.cas`` file with the new value. steering_module : str, optional Steering module to rewrite: ``"telemac"`` (default) or ``"gaia"``. Returns ------- int ``0`` on success, ``-1`` on error. """ # check if telemac or gaia cas type if "telemac" in steering_module: steering_file_name = self.tm_cas elif "gaia" in steering_module: steering_file_name = self.gaia_cas elif "fortran" in steering_module: steering_file_name = self.fortran_file # save the variable of interest without unwanted spaces variable_interest = param_name.rstrip().lstrip() # open steering file with read permission and save a temporary copy if os.path.isfile(steering_file_name): cas_file = open(steering_file_name, "r") else: logger_error.error("ERROR: no such steering file:\n" + steering_file_name) return -1 read_steering = cas_file.readlines() # if the updated_string has more than 72 characters, then divide it into two if len(updated_string) >= 72: position = updated_string.find("=") + 1 updated_string = updated_string[:position].rstrip().lstrip() + "\n" + updated_string[ position:].rstrip().lstrip() # preprocess the steering file # if in a previous case, a line had more than 72 characters then it was split into 2 # this loop cleans up all lines that start with a number temp = [] for i, line in enumerate(read_steering): if not isinstance(line[0], int): temp.append(line) else: previous_line = read_steering[i - 1].split("=")[0].rstrip().lstrip() if previous_line != variable_interest: temp.append(line) # loop through all lines of the temp cas file, until it finds the line with the parameter of interest # and substitute it with the new formatted line for i, line in enumerate(temp): line_value = line.split("=")[0].rstrip().lstrip() if line_value == variable_interest: temp[i] = updated_string + "\n" # rewrite and close the steering file cas_file = open(steering_file_name, "w") cas_file.writelines(temp) cas_file.close() return 0
[docs] def run_single_simulation( self, control_file="tel.cas", ): """ Runs a Telemac2D or Telemac3D simulation with one or more processors. The number of processors to use is defined by self.nproc. Parameters ---------- control_file : str The name of the control file used to launch the simulation. Default is "tel.cas". This file should be located in the model directory. Returns ------- None The method executes the model run using a launcher command. """ logger.info("Running full complexity model " + str(self.num_run)) start_time = datetime.now() if self.nproc <= 1: logger.info("* Sequential run (single processor)") else: logger.info(f"* Parallel run on {self.nproc} processors") # Determine the launcher command for Telemac if self.tm_xd in self.tm_xd_dict: tm_launcher = self.tm_xd_dict[self.tm_xd] telemac_command = f"{tm_launcher} {os.path.join(self.model_dir, control_file)} --ncsize={self.nproc}" else: raise ValueError(f"Invalid launcher: {self.tm_xd}. Expected one of {list(self.tm_xd_dict.keys())}.") logger.info(f"Executing control file: {control_file}") try: # Run the simulation command process = subprocess.Popen(telemac_command, cwd=self.model_dir, shell=True, env=os.environ) stdout, stderr = process.communicate() if process.returncode != 0: logger.error(f"Simulation failed with error: {stderr}") else: logger.info("Simulation completed successfully.") except Exception as e: logger.error(f"An error occurred while running the simulation: {e}") logger.info("TELEMAC simulation time: " + str(datetime.now() - start_time))
[docs] def run_multiple_simulations( self, collocation_points=None, bal_new_set_parameters=None, bal_iteration=int(), complete_bal_mode=True, output_extraction="interpolated", output_extraction_time="last", n=40, validation=False, kill_process = True ): """ Runs multiple Telemac2d or Telemac3d simulations with a set of collocation points and a new set of calibration parameters when BAL mode is chosen. The number of processors to use is defined by self.nproc in user_inputs. Parameters ---------- collocation_points : array Numpy array of shape [No. init_runs x No. calibration parameters] which contains the initial collocation points (parameter combinations) for iterative Telemac runs. Default is None, and it is filled with values for the initial surrogate model phase. It remains None during the BAL phase. bal_new_set_parameters : array 2D array of shape [1 x No. parameters] containing the new set of values after each BAL iteration. bal_iteration : int The number of the BAL iteration. Default is 0. complete_bal_mode : bool Default is True when the code accounts for initial runs, surrogate construction and BAL phase. False when only initial runs are required. validation : bool If `True`, the method runs a separate set of simulations for validation purposes. output_extraction : str The mode for extracting model outputs. Options are "nearest", "index" or "interpolated". output_extraction_time : str The time mode for extracting model outputs. Options are "last", "index", or "mean_last". n : int The number of last time steps to consider when `output_extraction_time` is set to "mean_last". Default is 40. validation: bool If `True`, the method runs a separate set of simulations for validation purposes, and saves the collocation points used for validation in a separate CSV file. kill_process: bool If `True`, the method will attempt to kill any remaining Telemac processes after running the simulations. This is useful when preventing to running BAL after the initial runs. Returns ------- model_evaluations:array 2D array containing processed model outputs. Shape: `[num_runs, nloc * num_calibration_quantities]`, where: - `num_runs` is the total number of model evaluations, including both initial runs and Bayesian Active Learning iterations. - `nloc * num_calibration_quantities` represents the total number of outputs, with results interleaved in columns. Example: For two calibration quantities and two calibration locations: - Columns 1 and 2 correspond to the outputs (2 quantities) of the first calibration location. - Columns 3 and 4 correspond to the outputs of the second location, and so on. """ calibration_parameters = self.calibration_parameters res_dir = self.calibration_folder restart_data_path = self.restart_data_folder fr_tbl = self.fr_tbl init_runs = self.init_runs logger.info( "* Running multiple Telemac simulations can take time -- check CPU acitivity...") start_time = datetime.now() if complete_bal_mode: # This part of the code runs the initial runs for initial surrogate. if bal_new_set_parameters is None: # Convert collocation_points to a numpy array if it is not already if not isinstance(collocation_points, np.ndarray): collocation_points = np.array(collocation_points) # Ensure collocation_points is a 2D array if collocation_points.ndim == 1: collocation_points = collocation_points[:, np.newaxis] # Convert collocation_points to a list for saving to CSV array_list = collocation_points.tolist() if validation: with open(res_dir + os.sep + "/collocation-points-validation.csv", mode='w', newline='') as file: writer = csv.writer(file) writer.writerow(calibration_parameters) writer.writerows(array_list) # Write the array data else: quantities_str = '_'.join(self.calibration_quantities) with open(res_dir + os.sep + f"collocation-points-{quantities_str}.csv", mode='w', newline='') as file: writer = csv.writer(file) writer.writerow(calibration_parameters) writer.writerows(array_list) # Write the array data with open(restart_data_path + os.sep + "initial-collocation-points.csv", mode='w', newline='') as file: writer = csv.writer(file) writer.writerow(calibration_parameters) writer.writerows(array_list) # Write the array data collocation_points=collocation_points for i in range(init_runs): self.num_run = i + 1 collocation_point_sim_list = collocation_points[i].tolist() logger.info( f" Running full complexity model # {self.num_run} with collocation point : {collocation_point_sim_list} ") self.update_model_controls(collocation_point_values=collocation_point_sim_list, calibration_parameters=calibration_parameters, auxiliary_file_path=fr_tbl, simulation_id=self.num_run) self.run_single_simulation(self.control_file) self.extract_data_point(self.tm_results_filename, self.calibration_pts_df, self.dict_output_name, self.extraction_quantities, self.num_run, self.model_dir, res_dir,output_extraction=output_extraction,output_extraction_time=output_extraction_time,n=n,compute_wall_law_diagnostics=False) self.model_evaluations = self.output_processing(output_data_path=os.path.join(res_dir, f'{self.dict_output_name}-detailed.json'), delete_slf_files=self.delete_complex_outputs, validation=validation, save_extraction_outputs=True) if self.num_run == self.init_runs: self.model_evaluations = self.output_processing(output_data_path=os.path.join(res_dir, f'{self.dict_output_name}-detailed.json'), delete_slf_files=self.delete_complex_outputs, validation=validation, filter_outputs=True, save_extraction_outputs=True, run_range_filtering=(1, init_runs + 1)) logger.info("TELEMAC simulations time for initial runs: " + str(datetime.now() - start_time)) # This part of the code runs BAL else: # Convert collocation_points to a numpy array if it is not already if not isinstance(collocation_points, np.ndarray): collocation_points = np.array(collocation_points) # Ensure collocation_points is a 2D array if collocation_points.ndim == 1: collocation_points = collocation_points[:, np.newaxis] self.bal_iteration = bal_iteration self.num_run = bal_iteration + init_runs new_collocation_point=bal_new_set_parameters updated_collocation_points = np.vstack((collocation_points, new_collocation_point)) collocation_point_sim_list = updated_collocation_points[-1].tolist() array_list = updated_collocation_points.tolist() logger.info( f" Running full complexity model after BAL # {self.bal_iteration} with collocation point : {collocation_point_sim_list} ") if validation: update_collocation_pts_file(res_dir + "/collocation-points-validation.csv", new_collocation_point=array_list) else: quantities_str = '_'.join(self.calibration_quantities) with open(res_dir + os.sep + f"collocation-points-{quantities_str}.csv", mode='w', newline='') as file: writer = csv.writer(file) writer.writerow(calibration_parameters) writer.writerows(array_list) self.update_model_controls(collocation_point_values=collocation_point_sim_list, calibration_parameters=calibration_parameters, auxiliary_file_path=fr_tbl, simulation_id=self.num_run) self.run_single_simulation(self.control_file) output_name_calibration = f'{self.dict_output_name}{"_".join(self.calibration_quantities)}' self.extract_data_point(self.tm_results_filename, self.calibration_pts_df, self.dict_output_name, self.extraction_quantities, self.num_run, self.model_dir, res_dir,output_extraction=output_extraction,output_extraction_time=output_extraction_time,n=n) # In this first output processing, ALL the extraction quantities are saved as .csv file in the calibration folder. self.output_processing(output_data_path=os.path.join(res_dir,f'{self.dict_output_name}-detailed.json'), delete_slf_files=self.delete_complex_outputs, validation=validation, save_extraction_outputs=True, # When True, it saves ALL model outputs of ALL extraction quantities at ALL points as a .csv file extraction_mode=True) # This part extracts the calibration quantities from the detailed dictionary as a numpy array for BAL and creates a new filtered dictionary. self.model_evaluations = self.output_processing(output_data_path=os.path.join(res_dir, f'{self.dict_output_name}-detailed.json'), delete_slf_files=self.delete_complex_outputs, validation=validation, filter_outputs=True, save_extraction_outputs=False, run_range_filtering=(1, init_runs +bal_iteration)) quantities_str = '_'.join(self.calibration_quantities) self.output_processing(output_data_path=os.path.join(res_dir,f'{quantities_str}-detailed.json'), delete_slf_files=self.delete_complex_outputs, validation=validation, save_extraction_outputs=False, calibration_mode=True) logger.info("TELEMAC simulations time for initial runs: " + str(datetime.now() - start_time)) # This part of the code only runs iterative runs without performing BAL else: if collocation_points is not None: # Ensure collocation_points is a 2D NumPy array if not isinstance(collocation_points, np.ndarray): collocation_points = np.array(collocation_points) if collocation_points.ndim == 1: if collocation_points.size == 1: collocation_points = collocation_points[:, np.newaxis] else: collocation_points = collocation_points.reshape(1, -1) # Convert to list for CSV writing array_list = collocation_points.tolist() if validation: # Validation case — always write validation CSV with open(os.path.join(restart_data_path, "collocation-points-validation.csv"), mode='w', newline='') as file: writer = csv.writer(file) writer.writerow(calibration_parameters) writer.writerows(array_list) else: quantities_str = '_'.join(self.calibration_quantities) if self.user_collocation_points is not None: # USER case — create a separate CSV for user collocation points self.calibration_folder = self.restart_data_folder self.dict_output_name = "user-" + self.dict_output_name user_csv_path = os.path.join( res_dir, f"user-collocation-points-{quantities_str}.csv" ) with open(user_csv_path, mode='w', newline='') as file: writer = csv.writer(file) writer.writerow(calibration_parameters) writer.writerows(array_list) # Do NOT create or overwrite the default collocation-points.csv here else: # DEFAULT case — create standard collocation CSVs default_csv_path = os.path.join( res_dir, f"collocation-points-{quantities_str}.csv" ) with open(default_csv_path, mode='w', newline='') as file: writer = csv.writer(file) writer.writerow(calibration_parameters) writer.writerows(array_list) # Also create the initial-collocation-points.csv init_csv_path = os.path.join( restart_data_path, "initial-collocation-points.csv" ) with open(init_csv_path, mode='w', newline='') as file: writer = csv.writer(file) writer.writerow(calibration_parameters) writer.writerows(array_list) # Keep reference if needed later collocation_points = collocation_points for i in range(init_runs): self.num_run = i + 1 collocation_point_sim_list = collocation_points[i].tolist() logger.info( f" Running full complexity model # {self.num_run} with collocation point : {collocation_point_sim_list} ") self.update_model_controls(collocation_point_values=collocation_point_sim_list, calibration_parameters=calibration_parameters, auxiliary_file_path=fr_tbl, simulation_id=self.num_run) self.run_single_simulation(self.control_file) self.extract_data_point(self.tm_results_filename, self.calibration_pts_df, self.dict_output_name, self.extraction_quantities, self.num_run, self.model_dir, self.calibration_folder, self.validation, self.user_param_values, output_extraction=output_extraction, output_extraction_time=output_extraction_time, n=n ) if validation: output_data_path = os.path.join(restart_data_path, 'model-results-validation.json') else: output_data_path = os.path.join(self.calibration_folder, f'{self.dict_output_name}-detailed.json') self.model_evaluations = self.output_processing(output_data_path=output_data_path, delete_slf_files=self.delete_complex_outputs, validation=validation, save_extraction_outputs=True, # This option True saves ALL model outputs of ALL required quantities at ALL points as a .csv file calibration_mode=True, extraction_mode = True, #This option True extracts the data from the dictionary and populates the array with the values from the dictionary for the parameters in extraction_quantities ) logger.info("TELEMAC simulations time for initial runs: " + str(datetime.now() - start_time)) if kill_process: return self.model_evaluations return self.model_evaluations
[docs] def output_processing( self, output_data_path='', calibration_quantities='', delete_slf_files=False, validation=False, save_extraction_outputs = False, filter_outputs=False, run_range_filtering=None, extraction_mode = False, calibration_mode = False, ): """ Processes model output data from a JSON file into a 2D array format for Bayesian calibration and saves the results to a CSV file. This method reads a JSON file specified by `output_data_path`, extracts and processes the model outputs, and saves them in a CSV file format suitable for Bayesian calibration. Parameters ---------- output_data_path : str Path to the file (.json) containing the model outputs. The file should be structured such that its keys correspond to calibration points, and its values are lists of nested dictionaries having the output values for each run and quantity/ies. delete_complex_outputs: Boolean, Default: False Delete complex model output files from the results folder (e.g. auto-saved-results-HydroBayesCal/<variable>). Recommended when running several simulations of the full complexity model. validation: Boolean, Default: False If True, new files for collocation points and model results are created. This is done to keep the collocation points and model results obtained during the calibration process. Returns ------- model_results : numpy.ndarray A 2D array containing the processed model outputs. The shape of the array is [number of total runs, number of calibration points x number of quantities], where 'number of quantities' represents the calibration quantities processed, and 'number of total runs' is the sum of initial runs and Bayesian active learning iterations. The columns are intercalated to store the quantities outputs. This array is also saved to a CSV file in the specified directory. """ with open(output_data_path, "r") as file: output_data = json.load(file) # Get quantities from the first entry dynamically extraction_quantities = self.extraction_quantities # Extract quantities from the first run of the first calibration point calibration_quantities = self.calibration_quantities n_calibration_pts = self.nloc n_total_runs = self.init_runs + self.bal_iteration # Number of quantities per location for calibration purposes num_quantities_calibration = len(calibration_quantities) num_quantities_extraction = len(extraction_quantities) # Initialize a 2D NumPy array with zeros model_results_calibration = np.zeros((num_quantities_calibration * n_total_runs, n_calibration_pts)) model_results_extraction = np.zeros((num_quantities_extraction * n_total_runs, n_calibration_pts)) # Filtering if filter_outputs: quantities_str = '_'.join(calibration_quantities) filtered_output_data_extraction = filter_model_outputs(data_dict=output_data, quantities=extraction_quantities, run_range_filtering=run_range_filtering) filtered_output_data_calibration = filter_model_outputs(data_dict=output_data, quantities=calibration_quantities, run_range_filtering=run_range_filtering) if validation: pass else: with open(os.path.join(self.calibration_folder,f'{self.dict_output_name}-detailed.json'), 'w') as json_file: json.dump(filtered_output_data_extraction, json_file, indent=4) with open(os.path.join(self.calibration_folder, f'{quantities_str}-detailed.json'), 'w') as json_file: json.dump(filtered_output_data_calibration, json_file, indent=4) for i, (key, values) in enumerate(filtered_output_data_extraction.items()): # Iterate over calibration points for j, value_set in enumerate(values): # Iterate over runs for k, quantity in enumerate(calibration_quantities): # Calibrate quantities if quantity in value_set: model_results_calibration[k * n_total_runs + j, i] = value_set[quantity] else: raise ValueError(f"Quantity '{quantity}' not found in data for calibration point '{key}'.") model_results_calibration = rearrange_array(model_results_calibration, num_quantities_calibration) model_results_calibration=model_results_calibration[~np.all(model_results_calibration == 0, axis=1)] else: if extraction_mode: # This mode processes (extracts data from dictionary and populate the array with the values from the dictionary ) # the detailed dictionary data for ALL model parameters (extraction parameters). for i, (key, values) in enumerate(output_data.items()): # Iterate over calibration points for j, value_set in enumerate(values): # Iterate over runs for k, quantity in enumerate(extraction_quantities): # Extract quantities model_results_extraction[k * n_total_runs + j, i] = value_set[quantity] if calibration_mode: # This mode processes (extracts data from dictionary and transforms into numpy arrays ) # the detailed dictionary data for calibration parameters. for i, (key, values) in enumerate(output_data.items()): for j, value_set in enumerate(values): # Iterate over runs for k, quantity in enumerate(calibration_quantities): # Calibrate quantities if quantity in value_set: model_results_calibration[k * n_total_runs + j, i] = value_set[quantity] else: raise ValueError(f"Quantity '{quantity}' not found in data for calibration point '{key}'.") if not extraction_mode and not calibration_mode: # This mode processes (extracts data from dictionary and transforms into numpy arrays ) # the detailed dictionary data for the calibration parameters and ALL model outputs. for i, (key, values) in enumerate(output_data.items()): # Iterate over calibration points for j, value_set in enumerate(values): # Iterate over runs for k, quantity in enumerate(extraction_quantities): # Extract quantities model_results_extraction[k * n_total_runs + j, i] = value_set[quantity] for j, value_set in enumerate(values): # Iterate over runs for k, quantity in enumerate(calibration_quantities): # Calibrate quantities if quantity in value_set: model_results_calibration[k * n_total_runs + j, i] = value_set[quantity] if num_quantities_calibration == 1: column_headers_calibration = [] for i in range(1, n_calibration_pts + 1): # Calibration point indices for quantity in calibration_quantities: column_headers_calibration.append(f'PT{i}_{quantity}') if validation: np.savetxt( os.path.join(self.restart_data_folder, 'model-results-validation.csv'), model_results_calibration, delimiter=',', fmt='%.8f', header=','.join(column_headers_calibration), ) else: quantities_str = '_'.join(calibration_quantities) if self.user_param_values: extraction_csv_file_name = f"user-model-results-calibration-{quantities_str}.csv" else: extraction_csv_file_name = f"model-results-calibration-{quantities_str}.csv" np.savetxt( os.path.join(self.calibration_folder, extraction_csv_file_name), model_results_calibration, delimiter=',', fmt='%.8f', header=','.join(column_headers_calibration), ) if save_extraction_outputs and extraction_mode: model_results_extraction = rearrange_array(model_results_extraction, num_quantities_extraction) if self.user_param_values: extraction_csv_file_name = 'user-model-results-extraction.csv' else: extraction_csv_file_name = 'model-results-extraction.csv' column_headers_extraction = [] for i in range(1, n_calibration_pts + 1): # Calibration point indices for quantity in extraction_quantities: column_headers_extraction.append(f'PT{i}_{quantity}') np.savetxt( os.path.join(self.calibration_folder, extraction_csv_file_name), model_results_extraction, delimiter=',', fmt='%.8f', header=','.join(column_headers_extraction), ) else: model_results_extraction = rearrange_array(model_results_extraction,num_quantities_extraction) model_results_calibration = rearrange_array(model_results_calibration,num_quantities_calibration) num_columns_calibration = model_results_calibration.shape[1] num_columns_extraction = model_results_extraction.shape[1] column_headers_calibration = [] column_headers_extraction = [] for i in range(1, num_columns_calibration // num_quantities_calibration + 1): # Loop through each set of columns for quantity in calibration_quantities: # Loop through each calibration parameter column_headers_calibration.append(f'{i}_{quantity}') for i in range(1, num_columns_extraction // num_quantities_extraction + 1): # Loop through each set of columns for quantity in extraction_quantities: # Loop through each calibration parameter column_headers_extraction.append(f'{i}_{quantity}') if validation: np.savetxt( os.path.join(self.restart_data_folder, 'model-results-validation.csv'), model_results_extraction, delimiter=',', fmt='%.8f', header=','.join(column_headers_extraction), ) else: quantities_str = '_'.join(calibration_quantities) if self.user_param_values: extraction_csv_file_name = f"user-model-results-calibration-{quantities_str}.csv" else: extraction_csv_file_name = f"model-results-calibration-{quantities_str}.csv" np.savetxt( os.path.join(self.calibration_folder, extraction_csv_file_name), model_results_calibration, delimiter=',', fmt='%.8f', header=','.join(column_headers_calibration), ) if save_extraction_outputs and extraction_mode: if validation: pass else: if self.user_param_values: extraction_csv_file_name = 'user-model-results-extraction.csv' else: extraction_csv_file_name = 'model-results-extraction.csv' column_headers_extraction = [] for i in range(1, n_calibration_pts + 1): # Calibration point indices for quantity in extraction_quantities: column_headers_extraction.append(f'PT{i}_{quantity}') np.savetxt( os.path.join(self.calibration_folder, extraction_csv_file_name), model_results_extraction, delimiter=',', fmt='%.8f', header=','.join(column_headers_extraction), ) if delete_slf_files: delete_slf(self.calibration_folder) return model_results_calibration
[docs] def extract_data_point( self, input_file, calibration_pts_df, output_name, extraction_quantity, simulation_number, model_directory, results_folder_directory, validation=False, user_param_values=False, output_extraction="interpolated", # "nearest": nearest node; "interpolated": IDW interpolation k=3, output_extraction_time="last", # "last", "index", "mean_last" time_index=0, n=5, compute_wall_law_diagnostics=False ): """ Extract model results at specified calibration or validation points from TELEMAC and/or GAIA SELAFIN result files. The method supports extraction of scalar variables, vertical layer selection based on measurement height, inverse-distance interpolation, and optional wall-law diagnostics. Extracted values are written to JSON files and result files are moved to the designated results directory. Parameters ---------- input_file : str Name of the TELEMAC result file (.slf) to extract data from. calibration_pts_df : pandas.DataFrame DataFrame containing extraction locations. The first column must contain point identifiers. The following columns are expected to be: - column 1: x-coordinate - column 2: y-coordinate - column 3: vertical measurement offset (z) output_name : str Base name used for generated JSON output files. extraction_quantity : list of str Quantities to extract from the model results. Variables may originate from TELEMAC or GAIA according to the configuration mapping ``classification_tm_gaia_dict``. simulation_number : int Current simulation number within the calibration workflow. model_directory : str Directory containing TELEMAC and GAIA result files. results_folder_directory : str Directory where extracted results and moved result files are stored. validation : bool, optional If True, extracted values are treated as validation results and are written to validation-specific JSON files. Default is False. user_param_values : bool, optional Flag controlling restart-data generation. Default is False. output_extraction : {"nearest", "interpolated"}, optional Spatial extraction method. - ``"nearest"``: use the closest model node. - ``"interpolated"``: perform inverse-distance-weighted interpolation using the k nearest nodes. Default is ``"interpolated"``. k : int, optional Number of nearest nodes used for interpolation when ``output_extraction="interpolated"``. Ignored when using nearest-node extraction. Default is 3. output_extraction_time : {"last", "index", "mean_last"}, optional Temporal aggregation mode applied to the extracted time series. - ``"last"``: use the final time step. - ``"index"``: use the time step specified by ``time_index``. - ``"mean_last"``: average the last ``n`` time steps. Default is ``"last"``. time_index : int, optional Time-step index used when ``output_extraction_time="index"``. Default is 0. n : int, optional Number of final time steps used when ``output_extraction_time="mean_last"``. Default is 5. compute_wall_law_diagnostics : bool, optional If True, compute wall-law diagnostic quantities from TELEMAC 3D results and the generated 2D result file. Diagnostics include friction velocity, y-plus values, bottom friction parameters, near-bed velocity information, and the complete modeled vertical velocity profile. Default is False. Returns ------- None Results are written to JSON files and model result files are moved to the results directory. Notes ----- - If ``"3D VELOCITY MAGNITUDE"`` is requested, it is computed from ``VELOCITY U``, ``VELOCITY V``, and ``VELOCITY W``. - For 3D simulations, the vertical layer closest to the measurement elevation is automatically selected using ``ELEVATION Z``. - Wall-law diagnostics require at least two vertical planes (``NPLAN >= 2``). """ self.tm_results_filename = input_file classification_tm_gaia_dict = config_telemac.classification_tm_gaia_dict # ============================================================ # WALL-LAW CONSTANTS FROM CONFIG # ============================================================ kappa = config_telemac.von_Karman_constant nikuradse_log_factor = config_telemac.nikuradse_log_factor kinematic_viscosity = config_telemac.kinematic_viscosity_water bottom_friction_2d_variable_candidates = ( config_telemac.slf_2d_variables_from_3d ) # ============================================================ # DERIVED TELEMAC QUANTITY: 3D VELOCITY MAGNITUDE # ============================================================ velocity_magnitude_quantity = "3D VELOCITY MAGNITUDE" velocity_components = [ "VELOCITY U", "VELOCITY V", "VELOCITY W" ] compute_3d_velocity_magnitude = ( velocity_magnitude_quantity in extraction_quantity ) # ============================================================ # WALL-LAW DIAGNOSTIC QUANTITIES # ============================================================ # Since the modeled vertical profile is diagnostic information, # it is also excluded from the normal calibration-output dictionary. wall_law_required_3d_components = [ "VELOCITY U", "VELOCITY V", "VELOCITY W", "ELEVATION Z" ] wall_law_diagnostic_quantities = { "FRICTION VELOCITY", "Y PLUS", "DZ PLANE 1 2", "U PLANE 2", "BOTTOM FRICTION", "BOTTOM FRICTION SOURCE FILE", "BOTTOM FRICTION VARIABLE", "VELOCITY PROFILE" } # If these are accidentally included in extraction_quantity, # remove them from the calibration/model-output dictionary. # They are diagnostics and will be saved separately. if any(q in extraction_quantity for q in wall_law_diagnostic_quantities): compute_wall_law_diagnostics = True calibration_extraction_quantity = [ q for q in extraction_quantity if q not in wall_law_diagnostic_quantities ] # ============================================================ # CLASSIFY REQUESTED QUANTITIES # ============================================================ telemac_quantities = [ q for q in calibration_extraction_quantity if classification_tm_gaia_dict.get(q) == "telemac" ] gaia_quantities = [ q for q in calibration_extraction_quantity if classification_tm_gaia_dict.get(q) == "gaia" ] internal_telemac_helpers = set() # If 3D velocity magnitude is requested, internally add U, V, W. # These are helper variables. They will not be saved unless explicitly requested. if compute_3d_velocity_magnitude: for comp in velocity_components: if comp not in telemac_quantities: telemac_quantities.append(comp) internal_telemac_helpers.add(comp) # If wall-law diagnostics are requested, internally add the 3D variables needed. # Bottom friction itself is NOT read from the 3D SLF. # It is read from the generated 2D SLF. # # VELOCITY W is included here because the wall-law diagnostics JSON now also # receives the full modeled vertical velocity profile. if compute_wall_law_diagnostics: for comp in wall_law_required_3d_components: if comp not in telemac_quantities: telemac_quantities.append(comp) internal_telemac_helpers.add(comp) # ============================================================ # FILE PATHS # ============================================================ slf_files = { "telemac": os.path.join(model_directory, self.tm_results_filename) } if self.gaia_cas is not None: slf_files["gaia"] = os.path.join( model_directory, self.gaia_results_filename ) json_path = os.path.join( results_folder_directory, f"{output_name}.json" ) json_path_detailed = os.path.join( results_folder_directory, f"{output_name}-detailed.json" ) json_path_restart_data = os.path.join( self.restart_data_folder, "initial-model-outputs.json" ) if validation: json_path_wall_law_diagnostics = os.path.join( self.restart_data_folder, "model-results-validation-wall-law-diagnostics.json" ) else: json_path_wall_law_diagnostics = os.path.join( results_folder_directory, f"{output_name}-wall-law-diagnostics.json" ) keys = calibration_pts_df.iloc[:, 0].tolist() differentiated_dict = {} wall_law_diagnostics_dict = {} logger.info( f"Extracting from {input_file} using quantities: {extraction_quantity}" ) # ============================================================ # PRECOMPUTE MODELS # ============================================================ model_cache = {} for model_source, quantities in zip( ["telemac", "gaia"], [telemac_quantities, gaia_quantities] ): if not quantities: continue if model_source not in slf_files: continue slf = ppSELAFIN(slf_files[model_source]) slf.readHeader() slf.readTimes() variables = [' '.join(v.split()) for v in slf.getVarNames()] var_index = {v: i for i, v in enumerate(variables)} # -------------------------------------------------------- # CHECK AVAILABLE VARIABLES # -------------------------------------------------------- # "3D VELOCITY MAGNITUDE" is derived, so it is not expected # to exist directly inside the SLF file. missing_quantities = [ q for q in quantities if q not in var_index and q != velocity_magnitude_quantity ] if missing_quantities: raise ValueError( f"The following quantities were requested for {model_source.upper()} " f"but are not available in the SLF file: {missing_quantities}. " f"Available variables are: {variables}" ) # Check that U, V, W exist if 3D velocity magnitude is requested. if model_source == "telemac" and compute_3d_velocity_magnitude: missing_components = [ comp for comp in velocity_components if comp not in var_index ] if missing_components: raise ValueError( "Cannot compute 3D VELOCITY MAGNITUDE because the following " f"velocity components are missing in the TELEMAC SLF file: " f"{missing_components}. Available variables are: {variables}" ) NVAR = len(variables) NELEM, NPOIN, NDP, IKLE, IPOBO, x, y = slf.getMesh() NPLAN = slf.getNPLAN() # Check wall-law requirements after NPLAN is known. if model_source == "telemac" and compute_wall_law_diagnostics: missing_wall_law_components = [ comp for comp in wall_law_required_3d_components if comp not in var_index ] if missing_wall_law_components: raise ValueError( "Cannot compute wall-law diagnostics and velocity profile " "because these 3D SLF variables are missing: " f"{missing_wall_law_components}. " f"Available variables are: {variables}" ) if NPLAN < 2: raise ValueError( "Cannot compute wall-law diagnostics because the 3D result " f"has NPLAN={NPLAN}. At least 2 vertical planes are required." ) x2d = x[:len(x) // NPLAN] y2d = y[:len(x) // NPLAN] tree = spatial.cKDTree(np.column_stack((x2d, y2d))) step = NPOIN // NPLAN model_cache[model_source] = { "slf": slf, "variables": variables, "var_index": var_index, "NVAR": NVAR, "NPLAN": NPLAN, "step": step, "tree": tree, "x2d": x2d, "y2d": y2d, "quantities": quantities } # ============================================================ # LOAD GENERATED 2D SLF FOR WALL-LAW DIAGNOSTICS # ============================================================ if compute_wall_law_diagnostics: slf_2d_data = self._load_generated_2d_slf_for_bottom_friction( model_directory=model_directory, bottom_friction_2d_variable_candidates=bottom_friction_2d_variable_candidates ) else: slf_2d_data = None # ============================================================ # MAIN LOOP # ============================================================ for key_index, key in enumerate(keys): xu = calibration_pts_df.iloc[key_index, 1] yu = calibration_pts_df.iloc[key_index, 2] zu = calibration_pts_df.iloc[key_index, 3] differentiated_values = {} wall_law_diagnostic_values = {} for model_source in ["telemac", "gaia"]: if model_source not in model_cache: continue data = model_cache[model_source] slf = data["slf"] tree = data["tree"] var_index = data["var_index"] NPLAN = data["NPLAN"] step = data["step"] x2d = data["x2d"] y2d = data["y2d"] quantities = data["quantities"] if not quantities: continue # -------------------------------------------------------- # NODE SELECTION # -------------------------------------------------------- k_use = 1 if output_extraction == "nearest" else k d, idx = tree.query((xu, yu), k=k_use) if k_use == 1: idx_base = np.array([idx]) idx_coord = np.array([[x2d[idx], y2d[idx]]]) else: idx_base = np.array(idx) idx_coord = np.column_stack((x2d[idx], y2d[idx])) logger.info( f"[{model_source.upper()}] Point {key} ({xu:.3f},{yu:.3f})" ) # -------------------------------------------------------- # WALL-LAW DIAGNOSTICS + FULL MODELED VELOCITY PROFILE # -------------------------------------------------------- # Computed from: # - 3D SLF: plane 1 and plane 2 # - generated 2D SLF: bottom friction / Nikuradse ks # # Saved separately in the wall-law diagnostics JSON. # Not saved in differentiated_dict. if model_source == "telemac" and compute_wall_law_diagnostics: wall_law_diagnostic_values = ( self._compute_wall_law_from_3d_plane2_and_2d_bottom_friction( slf_3d=slf, var_index_3d=var_index, idx_base_3d=idx_base, idx_coord_3d=idx_coord, step_3d=step, k_use=k_use, target_xy=(xu, yu), key=key, slf_2d_data=slf_2d_data, output_extraction_time=output_extraction_time, time_index=time_index, n=n, kappa=kappa, nikuradse_log_factor=nikuradse_log_factor, kinematic_viscosity=kinematic_viscosity ) ) wall_law_diagnostic_values["VELOCITY PROFILE"] = ( self._vertical_velocity_profile( slf=slf, var_index=var_index, idx_base=idx_base, idx_coord=idx_coord, step=step, k_use=k_use, NPLAN=NPLAN, NVAR=data["NVAR"], target_xy=(xu, yu), output_extraction_time=output_extraction_time, time_index=time_index, n=n ) ) # -------------------------------------------------------- # VERTICAL PROFILE FOR NORMAL EXTRACTION # -------------------------------------------------------- # This block only selects the closest TELEMAC vertical layer # to the requested measurement height zu. # The full velocity profile is already saved above, but only # inside wall_law_diagnostic_values. if NPLAN == 1: p = 0 logger.info( f"[{model_source.upper()}] Point {key} � single layer model" ) else: if "ELEVATION Z" not in var_index: raise ValueError( f"'ELEVATION Z' is required to select the vertical layer, " f"but it is missing in the {model_source.upper()} SLF file." ) z_index = var_index["ELEVATION Z"] base_idx_for_z = idx_base[0] z_profile = np.zeros(NPLAN) for p_i in range(NPLAN): node_idx = base_idx_for_z + p_i * step slf.readVariablesAtNode(node_idx) all_times_outputs = slf.getVarValuesAtNode() results_z = self._apply_time_mode( all_times_outputs=all_times_outputs, output_extraction_time=output_extraction_time, time_index=time_index, n=n ) z_profile[p_i] = results_z[z_index] z_target = z_profile[0] + zu p = np.argmin(np.abs(z_profile - z_target)) logger.info( f"[{model_source.upper()}] Point {key} � selected layer " f"p={p + 1}/{NPLAN} (zu={zu:.3f})" ) # -------------------------------------------------------- # VALUE EXTRACTION # -------------------------------------------------------- results_all = np.zeros((k_use, data["NVAR"])) for j in range(k_use): node_idx = idx_base[j] + p * step logger.debug( f"[{model_source.upper()}] extracting node={node_idx}" ) slf.readVariablesAtNode(node_idx) all_times_outputs = slf.getVarValuesAtNode() vals = self._apply_time_mode( all_times_outputs=all_times_outputs, output_extraction_time=output_extraction_time, time_index=time_index, n=n ) results_all[j, :] = vals # -------------------------------------------------------- # INTERPOLATION # -------------------------------------------------------- if k_use != 1: results = interpolate_values( idx_coord, results_all, (xu, yu) ) else: results = results_all[0] # -------------------------------------------------------- # STORE NORMAL RESULTS # -------------------------------------------------------- for q in quantities: # Derived variable: compute internally from U, V, W. if q == velocity_magnitude_quantity: u = results[var_index["VELOCITY U"]] v = results[var_index["VELOCITY V"]] w = results[var_index["VELOCITY W"]] differentiated_values[q] = float( np.sqrt(u ** 2 + v ** 2 + w ** 2) ) # Internal helper components: # skip them if they were not explicitly requested by the user. elif ( q in internal_telemac_helpers and q not in calibration_extraction_quantity ): continue # Normal SLF variable extraction. else: differentiated_values[q] = results[var_index[q]] differentiated_dict[key] = differentiated_values if compute_wall_law_diagnostics: wall_law_diagnostics_dict[key] = wall_law_diagnostic_values # ============================================================ # JSON HANDLING # ============================================================ if simulation_number == 1: if os.path.exists(json_path): os.rename( json_path, json_path.replace(".json", "_old.json") ) if os.path.exists(json_path_detailed): os.rename( json_path_detailed, json_path_detailed.replace(".json", "_old.json") ) if ( compute_wall_law_diagnostics and os.path.exists(json_path_wall_law_diagnostics) ): os.rename( json_path_wall_law_diagnostics, json_path_wall_law_diagnostics.replace(".json", "_old.json") ) if validation: json_target = os.path.join( self.restart_data_folder, "model-results-validation.json" ) else: json_target = json_path_detailed update_json_file( json_path=json_target, modeled_values_dict=differentiated_dict, detailed_dict=True ) # Wall-law diagnostics are saved separately. # They are not calibration targets. # # The full modeled velocity profile is also saved here, and only here. if compute_wall_law_diagnostics: update_json_file( json_path=json_path_wall_law_diagnostics, modeled_values_dict=wall_law_diagnostics_dict, detailed_dict=True ) if ( simulation_number == self.init_runs and not validation and not user_param_values ): update_json_file( json_path=json_path_detailed, modeled_values_dict=differentiated_dict, detailed_dict=True, save_dict=True, saving_path=json_path_restart_data ) try: shutil.move( os.path.join(model_directory, self.tm_results_filename), results_folder_directory ) shutil.move( os.path.join(model_directory, self.tm_2d_results_filename_from_3d), results_folder_directory ) # Move generated 2D result file if it exists. if compute_wall_law_diagnostics: if ( hasattr(self, "tm_2d_results_filename") and self.tm_2d_results_filename_from_3d is not None ): tm_2d_results_filename = self.tm_2d_results_filename_from_3d else: tm_2d_results_filename = self._get_2d_result_filename_from_3d( self.tm_results_filename ) tm_2d_result_path = os.path.join( model_directory, tm_2d_results_filename ) if os.path.exists(tm_2d_result_path): shutil.move( tm_2d_result_path, results_folder_directory ) if self.gaia_cas is not None: shutil.move( os.path.join(model_directory, self.gaia_results_filename), results_folder_directory ) except Exception as error: print( "ERROR: could not move results file to " + self.res_dir + "\nREASON:\n" + str(error) )
[docs] @staticmethod def tbl_creator( zone_identifier, val, friction_file_path, veg_param_number=None, veg_indicator=False, ): """ Modifies the FRICTION DATA FILE (.tbl) for Telemac simulations based on the specified zone, value, and optional vegetation parameters. This method updates the friction values in the table for different zones as part of the calibration process and also the friction parameters for a previous selected vegetation friction rule. Parameters ---------- zone_identifier : str Identifier for the friction zone to be updated in the friction table. val : str The new friction value to be set for the specified zone. friction_file_path : str The file path to the existing friction file (.tbl) that will be modified. veg_param_number : str, optional The vegetation parameter number associated with the zone, if applicable. Default is None, indicating no vegetation parameter is to be updated. veg_indicator : bool, optional Indicator whether vegetation parameters should be modified in the friction file. Default is False, which means only friction values are updated. Returns ------- None The function updates the friction file in place and does not return any value. """ with open(friction_file_path, 'r') as file: file_lines = file.readlines() updated_lines = [] for line in file_lines: line_list = list(filter(None, line.split())) if line_list and line_list[0].startswith('*'): updated_lines.append(line) continue if line_list[0] == zone_identifier: if veg_indicator==False: if len(line_list) > 2 and line_list[2] != 'NULL': try: line_list[2] = str(val) except ValueError: pass else: param_column_index=4+int(veg_param_number) if len(line_list) > 2 and line_list[2] != 'NULL': try: line_list[param_column_index] = str(val) except ValueError: pass updated_zone_line = '\t'.join(line_list) updated_lines.append(updated_zone_line + '\n') else: updated_lines.append(line) with open(friction_file_path, 'w') as file: file.writelines(updated_lines)
# ============================================================ # TIME HELPER FOR TELEMAC EXTRACTION # ============================================================ def _apply_time_mode( self, all_times_outputs, output_extraction_time="last", time_index=0, n=5 ): if output_extraction_time == "last": return all_times_outputs[-1] elif output_extraction_time == "index": return all_times_outputs[time_index] elif output_extraction_time == "mean_last": return np.mean(all_times_outputs[-n:], axis=0) else: raise ValueError( "Invalid output_extraction_time. " "Use 'last', 'index', or 'mean_last'." ) # ============================================================ # GENERAL HELPERS FOR WALL-LAW DIAGNOSTICS # ============================================================ def _get_2d_result_filename_from_3d(self, input_file_3d): """ Converts: result.slf -> result_2d.slf """ root, ext = os.path.splitext(input_file_3d) return f"{root}_2d{ext}" def _normalize_var_name(self, name): return " ".join(str(name).split()).upper() def _interpolate_scalar(self, idx_coord, values, target_xy): values = np.asarray(values, dtype=float) if len(values) == 1: return float(values[0]) interpolated = interpolate_values( idx_coord, values.reshape(-1, 1), target_xy ) return float(np.asarray(interpolated).ravel()[0]) def _vertical_velocity_profile( self, slf, var_index, idx_base, idx_coord, step, k_use, NPLAN, NVAR, target_xy, output_extraction_time="last", time_index=0, n=5 ): """ Extracts the full modeled vertical velocity profile at one horizontal point. This is intended for the wall-law diagnostics JSON only. One profile entry is saved per TELEMAC vertical plane/layer. """ required_vars = [ "ELEVATION Z", "VELOCITY U", "VELOCITY V", "VELOCITY W" ] missing_vars = [ var for var in required_vars if var not in var_index ] if missing_vars: raise ValueError( "Cannot extract modeled vertical velocity profile because these " f"variables are missing: {missing_vars}. " f"Available variables are: {list(var_index.keys())}" ) z_index = var_index["ELEVATION Z"] u_index = var_index["VELOCITY U"] v_index = var_index["VELOCITY V"] w_index = var_index["VELOCITY W"] velocity_profile = [] bed_z = None for p_i in range(NPLAN): results_all = np.zeros((k_use, NVAR)) for j in range(k_use): node_idx = idx_base[j] + p_i * step slf.readVariablesAtNode(node_idx) all_times_outputs = slf.getVarValuesAtNode() vals = self._apply_time_mode( all_times_outputs=all_times_outputs, output_extraction_time=output_extraction_time, time_index=time_index, n=n ) results_all[j, :] = vals if k_use != 1: results = interpolate_values( idx_coord, results_all, target_xy ) else: results = results_all[0] z_abs = float(results[z_index]) if bed_z is None: bed_z = z_abs z_above_bed = float(z_abs - bed_z) u = float(results[u_index]) v = float(results[v_index]) w = float(results[w_index]) velocity_horizontal = float(np.sqrt(u ** 2 + v ** 2)) velocity_3d = float(np.sqrt(u ** 2 + v ** 2 + w ** 2)) velocity_profile.append({ "LAYER": int(p_i + 1), "Z": z_abs, "Z ABOVE BED": z_above_bed, "VELOCITY U": u, "VELOCITY V": v, "VELOCITY W": w, "HORIZONTAL VELOCITY MAGNITUDE": velocity_horizontal, "3D VELOCITY MAGNITUDE": velocity_3d }) return velocity_profile # ============================================================ # GENERATED 2D SLF HELPERS # ============================================================ def _find_2d_bottom_friction_variable( self, variables_2d, bottom_friction_2d_variable_candidates=None ): """ Finds the bottom friction / Nikuradse ks variable in the generated 2D SLF. """ if bottom_friction_2d_variable_candidates is None: bottom_friction_2d_variable_candidates = config_telemac.slf_2d_variables_from_3d normalized_var_lookup = { self._normalize_var_name(var_name): var_name for var_name in variables_2d } for candidate in bottom_friction_2d_variable_candidates: candidate_clean = self._normalize_var_name(candidate) if candidate_clean in normalized_var_lookup: return normalized_var_lookup[candidate_clean] raise ValueError( "Could not find a bottom-friction / Nikuradse variable in the " "generated 2D SLF file. " f"Tried these names: {bottom_friction_2d_variable_candidates}. " f"Available 2D SLF variables are: {variables_2d}" ) def _load_generated_2d_slf_for_bottom_friction( self, model_directory, bottom_friction_2d_variable_candidates=None ): """ Loads the 2D result file generated from the TELEMAC-3D simulation. Expected default: 3D file: result.slf 2D file: result_2d.slf If self.tm_2d_results_filename exists, it is used instead. """ if ( hasattr(self, "tm_2d_results_filename") and self.tm_2d_results_filename_from_3d is not None ): input_file_2d = self.tm_2d_results_filename_from_3d else: input_file_2d = self._get_2d_result_filename_from_3d( self.tm_results_filename ) slf_2d_path = os.path.join(model_directory, input_file_2d) if not os.path.exists(slf_2d_path): raise FileNotFoundError( "The generated 2D result file required for wall-law diagnostics " f"was not found: {slf_2d_path}. " "Expected the same name as the 3D SLF with '_2d' before '.slf', " "unless self.tm_2d_results_filename is explicitly defined." ) slf_2d = ppSELAFIN(slf_2d_path) slf_2d.readHeader() slf_2d.readTimes() variables_2d = [' '.join(v.split()) for v in slf_2d.getVarNames()] var_index_2d = {v: i for i, v in enumerate(variables_2d)} bottom_friction_variable = self._find_2d_bottom_friction_variable( variables_2d=variables_2d, bottom_friction_2d_variable_candidates=bottom_friction_2d_variable_candidates ) NELEM2D, NPOIN2D, NDP2D, IKLE2D, IPOBO2D, x2d_slf, y2d_slf = slf_2d.getMesh() tree_2d = spatial.cKDTree(np.column_stack((x2d_slf, y2d_slf))) logger.info( f"Using generated 2D SLF for wall-law diagnostics: {slf_2d_path}" ) logger.info( f"Using bottom-friction variable from generated 2D SLF: " f"{bottom_friction_variable}" ) return { "slf": slf_2d, "path": slf_2d_path, "filename": input_file_2d, "variables": variables_2d, "var_index": var_index_2d, "bottom_friction_variable": bottom_friction_variable, "bottom_friction_index": var_index_2d[bottom_friction_variable], "x": x2d_slf, "y": y2d_slf, "tree": tree_2d } def _extract_bottom_friction_from_2d_slf( self, slf_2d_data, target_xy, output_extraction_time="last", time_index=0, n=5 ): """ Extracts bottom friction / Nikuradse ks from the generated 2D SLF at a horizontal coordinate. Nearest-node extraction is used because the generated 2D SLF should share the same horizontal mesh as the 3D SLF. """ slf_2d = slf_2d_data["slf"] tree_2d = slf_2d_data["tree"] friction_index = slf_2d_data["bottom_friction_index"] d2, idx2 = tree_2d.query(target_xy, k=1) node_2d = int(idx2) slf_2d.readVariablesAtNode(node_2d) all_times_outputs_2d = slf_2d.getVarValuesAtNode() vals_2d = self._apply_time_mode( all_times_outputs=all_times_outputs_2d, output_extraction_time=output_extraction_time, time_index=time_index, n=n ) bottom_friction = float(vals_2d[friction_index]) if bottom_friction <= 0.0: raise ValueError( f"Invalid bottom friction / Nikuradse ks extracted from 2D SLF " f"at coordinate {target_xy}: value={bottom_friction}. " "It must be > 0." ) return bottom_friction # ============================================================ # WALL-LAW DIAGNOSTICS FROM 3D PLANE 2 + 2D BOTTOM FRICTION # ============================================================ def _compute_wall_law_from_3d_plane2_and_2d_bottom_friction( self, slf_3d, var_index_3d, idx_base_3d, idx_coord_3d, step_3d, k_use, target_xy, key, slf_2d_data, output_extraction_time="last", time_index=0, n=5, kappa=None, nikuradse_log_factor=None, kinematic_viscosity=None ): """ Computes TELEMAC-style friction velocity and y+. From 3D SLF: - VELOCITY U at plane 2 - VELOCITY V at plane 2 - ELEVATION Z at plane 1 and plane 2 From generated 2D SLF: - bottom friction / Nikuradse ks """ if kappa is None: kappa = config_telemac.von_Karman_constant if nikuradse_log_factor is None: nikuradse_log_factor = config_telemac.nikuradse_log_factor if kinematic_viscosity is None: kinematic_viscosity = config_telemac.kinematic_viscosity_water if kinematic_viscosity <= 0.0: raise ValueError( f"Cannot compute Y PLUS because " f"kinematic_viscosity={kinematic_viscosity}. It must be > 0." ) z_index = var_index_3d["ELEVATION Z"] u_index = var_index_3d["VELOCITY U"] v_index = var_index_3d["VELOCITY V"] friction_velocity_values = np.zeros(k_use) y_plus_values = np.zeros(k_use) dz_values = np.zeros(k_use) u_plane2_values = np.zeros(k_use) bottom_friction_values = np.zeros(k_use) for j in range(k_use): node_plane1 = idx_base_3d[j] node_plane2 = idx_base_3d[j] + step_3d xy_neighbor = ( float(idx_coord_3d[j, 0]), float(idx_coord_3d[j, 1]) ) # ---------------------------------------------------- # 2D SLF: bottom friction / Nikuradse ks # ---------------------------------------------------- bottom_friction = self._extract_bottom_friction_from_2d_slf( slf_2d_data=slf_2d_data, target_xy=xy_neighbor, output_extraction_time=output_extraction_time, time_index=time_index, n=n ) # ---------------------------------------------------- # 3D SLF plane 1: bed elevation # ---------------------------------------------------- slf_3d.readVariablesAtNode(node_plane1) all_times_outputs_plane1 = slf_3d.getVarValuesAtNode() vals_plane1 = self._apply_time_mode( all_times_outputs=all_times_outputs_plane1, output_extraction_time=output_extraction_time, time_index=time_index, n=n ) z_plane1 = vals_plane1[z_index] # ---------------------------------------------------- # 3D SLF plane 2: near-bed velocity and elevation # ---------------------------------------------------- slf_3d.readVariablesAtNode(node_plane2) all_times_outputs_plane2 = slf_3d.getVarValuesAtNode() vals_plane2 = self._apply_time_mode( all_times_outputs=all_times_outputs_plane2, output_extraction_time=output_extraction_time, time_index=time_index, n=n ) z_plane2 = vals_plane2[z_index] u_plane2 = vals_plane2[u_index] v_plane2 = vals_plane2[v_index] dz = float(z_plane2 - z_plane1) if dz <= 0.0: raise ValueError( f"Cannot compute wall-law diagnostics for point {key}: " f"Dz = Z_plane2 - Z_plane1 = {dz}. It must be > 0." ) u_horizontal_plane2 = float( np.sqrt(u_plane2**2 + v_plane2**2) ) log_argument = float( nikuradse_log_factor * dz / bottom_friction ) if log_argument <= 1.0: raise ValueError( f"Cannot compute wall-law diagnostics reliably for point {key}: " f"log argument = {log_argument:.6f}. " f"Computed as {nikuradse_log_factor} * Dz / bottom_friction, " f"with Dz={dz:.6e} m and bottom_friction={bottom_friction:.6e} m. " "The logarithm denominator must be positive. " "Check whether the 2D variable is really Nikuradse ks in meters." ) friction_velocity = float( u_horizontal_plane2 * kappa / np.log(log_argument) ) y_plus = float( friction_velocity * dz / kinematic_viscosity ) friction_velocity_values[j] = friction_velocity y_plus_values[j] = y_plus dz_values[j] = dz u_plane2_values[j] = u_horizontal_plane2 bottom_friction_values[j] = bottom_friction friction_velocity_interp = self._interpolate_scalar( idx_coord_3d, friction_velocity_values, target_xy ) y_plus_interp = self._interpolate_scalar( idx_coord_3d, y_plus_values, target_xy ) dz_interp = self._interpolate_scalar( idx_coord_3d, dz_values, target_xy ) u_plane2_interp = self._interpolate_scalar( idx_coord_3d, u_plane2_values, target_xy ) bottom_friction_interp = self._interpolate_scalar( idx_coord_3d, bottom_friction_values, target_xy ) return { "FRICTION VELOCITY": friction_velocity_interp, "Y PLUS": y_plus_interp, "DZ PLANE 1 2": dz_interp, "U PLANE 2": u_plane2_interp, "BOTTOM FRICTION": bottom_friction_interp, "BOTTOM FRICTION SOURCE FILE": slf_2d_data["filename"], "BOTTOM FRICTION VARIABLE": slf_2d_data["bottom_friction_variable"] }
[docs] @staticmethod def check_tm_inputs(user_inputs): # Helper function to check if a path exists and print success message def path_exists(path, path_name): if os.path.exists(path): print(f"{path_name} exists: {path}") else: raise ValueError(f"{path_name} does not exist: {path}") if not isinstance(path, str): raise TypeError(f"{path_name} should be a string") # Extract values from dictionary control_file = user_inputs['control_file_name'] friction_file = user_inputs['friction_file'] tm_solver = user_inputs['Telemac_solver'] model_simulation_path = user_inputs['model_simulation_path'] results_folder_path = user_inputs['results_folder_path'] calib_pts_csv_file = user_inputs['calib_pts_file_path'] n_cpus = user_inputs['n_cpus'] init_runs = user_inputs['init_runs'] calibration_parameters = user_inputs['calibration_parameters'] param_values = user_inputs['param_values'] calibration_quantities = user_inputs['calibration_quantities'] dict_output_name = user_inputs['dict_output_name'] results_filename_base = user_inputs['results_filename_base'] parameter_sampling_method = user_inputs['parameter_sampling_method'] n_max_tp = user_inputs['n_max_tp'] n_samples = user_inputs['n_samples'] mc_samples = user_inputs['mc_samples'] mc_exploration = user_inputs['mc_exploration'] eval_steps = user_inputs['eval_steps'] # Check model_simulation_path path_exists(model_simulation_path, "model_simulation_path") # Check control_file if os.path.exists(os.path.join(model_simulation_path, control_file)): print(f"Control file exists: {control_file}") else: raise ValueError(f"Control file does not exist: {control_file}") if not isinstance(os.path.join(model_simulation_path, control_file), str): raise TypeError("control_file should be a string") # Check telemac_solver if not isinstance(tm_solver, str): raise TypeError("Telemac_solver should be a string") if tm_solver not in ["1", "2"]: raise ValueError("Telemac_solver should be '1' (Telemac 2D) or '2' (Telemac 3D)") # Check results_folder_path path_exists(results_folder_path, "results_folder_path") # Check calib_pts_csv_file path_exists(calib_pts_csv_file, "calib_pts_csv_file") if not calib_pts_csv_file.endswith('.csv'): raise ValueError("calib_pts_csv_file should be a CSV file") # Check n_cpus if not isinstance(n_cpus, int): raise TypeError("CPUs should be an integer") # Check init_runs if not isinstance(init_runs, int): raise TypeError("init_runs should be an integer") # Check calibration_parameters and param_ranges if not isinstance(calibration_parameters, list): raise TypeError("calibration_parameters should be a list") if not isinstance(param_values, list): raise TypeError("param_ranges should be a list") if len(calibration_parameters) != len(param_values): raise ValueError("calibration_parameters and param_ranges must have the same length") for param in calibration_parameters: if not isinstance(param, str): raise TypeError("Each calibration parameter should be a string") # If all checks pass, print success message print("Calibration parameters and parameter ranges have been validated successfully.") # Check friction_file conditionally if any(param.lower().startswith('zone') for param in calibration_parameters): if isinstance(friction_file, str): print(f"Friction file is a valid string: {friction_file}") # Check if the friction file exists if os.path.exists(os.path.join(model_simulation_path, friction_file)): print(f"Friction file exists: {friction_file}") else: raise ValueError(f"Friction file does not exist: {friction_file}") else: raise TypeError("friction_file should be a string") # Check calibration_quantities if not isinstance(calibration_quantities, list): raise TypeError("calibration_quantities should be a list of strings") if len(calibration_quantities) > 2: raise ValueError("calibration_quantities can have a maximum of 2 quantities") for quantity in calibration_quantities: if not isinstance(quantity, str): raise TypeError("Each calibration quantity should be a string") if calibration_quantities: print(f"Calibration quantities are valid: {calibration_quantities}") # Check dict_output_name if not isinstance(dict_output_name, str): raise TypeError("dict_output_name should be a string") print(f"Dictionary output name is given as: {dict_output_name}.json") # Check results_filename_base if not isinstance(results_filename_base, str): raise TypeError("results_filename_base should be a string") print(f"Base Telemac results file name is gives as: {results_filename_base}") # Check parameter_sampling_method if not isinstance(parameter_sampling_method, str): raise TypeError("parameter_sampling_method should be a string") print(f"Parameter sampling method is selected as: {parameter_sampling_method}") # Check n_max_tp if not isinstance(n_max_tp, int): raise TypeError("n_max_tp should be an integer") if n_max_tp <= init_runs: raise ValueError("n_max_tp must be greater than init_runs") print(f"n_max_tp is valid: {n_max_tp}") # Check n_samples if not isinstance(n_samples, int): raise TypeError("n_samples should be an integer") print(f"n_samples is valid: {n_samples}") # Check mc_samples if not isinstance(mc_samples, int): raise TypeError("mc_samples should be an integer") if mc_samples > n_samples: raise ValueError("mc_samples must be less than or equal to n_samples") print(f"mc_samples is valid: {mc_samples}") # Check mc_exploration if not isinstance(mc_exploration, int): raise TypeError("mc_exploration should be an integer") if mc_exploration > mc_samples: raise ValueError("mc_exploration must be less than or equal to mc_samples") print(f"mc_exploration is valid: {mc_exploration}") # Check eval_steps if not isinstance(eval_steps, int): raise TypeError("eval_steps should be an integer") print(f"Surrogate evaluation steps is valid: {eval_steps}") print("All inputs are valid")
def __call__(self, *args, **kwargs): self.run_single_simulation()