"""Function pool for usage at different package levels"""
import subprocess, os, sys, logging
import numpy as _np
import pandas as _pd
import csv
import pickle
import h5py
import json
import glob
import matplotlib.tri as mtri
import pyvista as pv
import shutil
from numpy import linspace, dtype
from hydroBayesCal.telemac.pputils.ppmodules.readMesh import *
from hydroBayesCal.telemac.pputils.ppmodules.writeMesh import *
from hydroBayesCal.telemac.pputils.ppmodules.utilities import *
from hydroBayesCal.utils.config_logging import *
from hydroBayesCal.utils.config_physics import *
from hydroBayesCal.telemac.pputils.ppmodules.selafin_io_pp import *
[docs]
def append_new_line(file_name, text_to_append):
"""
Add new line to steering file
:param str file_name: path and name of the file to which the line should be appended
:param str text_to_append: text of the line to append
:return None:
"""
# Open the file in append & read mode ("a+")
with open(file_name, "a+") as file_object:
# Move read cursor to the start of file.
file_object.seek(0)
# If file is not empty then append "\n"
data = file_object.read(100)
if len(data) > 0:
file_object.write("\n")
# Append text at the end of file
file_object.write(text_to_append)
[docs]
def call_process(bash_command, environment=None):
"""
Call a terminal process via subprocess and return its exit status.
The process return code is checked and reported: a non-zero code (e.g. a
failed Telemac/OpenFOAM run) is logged together with the captured stderr and
returned to the caller, instead of silently reporting success.
:param str bash_command: terminal command to run
:param environment: optional environment mapping to run the process in
:return int: the process return code (0 on success, non-zero on failure,
-1 if the process could not be started)
"""
logger.info("* CALLING SUBROUTINE: %s", bash_command)
try:
result = subprocess.run(
bash_command,
shell=True,
capture_output=True,
env=environment,
)
except Exception as e:
logger.error("Command could not be started: %s", e)
return -1
if result.stdout:
print(result.stdout.decode(errors="replace"))
if result.returncode != 0:
stderr = result.stderr.decode(errors="replace") if result.stderr else ""
logger.error(
"Command failed (return code %s): %s\n%s",
result.returncode, bash_command, stderr,
)
else:
logger.info("* finished (return code 0)")
return result.returncode
[docs]
def calculate_settling_velocity(diameters):
"""
Calculate particle settling velocity as a function of diameter, densities of water and
sediment, and kinematic viscosity
:param np.array diameters: floats of sediment diameter in meters
:return np.array settling_vevlocity: settling velocities in m/s for every diameter in the diameters list
"""
settling_velocity = _np.zeros(diameters.shape[0])
s = SED_DENSITY / WATER_DENSITY
for i, d in enumerate(diameters):
if d <= 0.0001:
settling_velocity[i] = (s - 1) * GRAVITY * d ** 2 / (18 * KINEMATIC_VISCOSITY)
elif 0.0001 < d < 0.001:
settling_velocity[i] = 10 * KINEMATIC_VISCOSITY / d * (
_np.sqrt(1 + 0.01 * (s - 1) * GRAVITY * d ** 3 / KINEMATIC_VISCOSITY ** 2) - 1)
else:
settling_velocity[i] = 1.1 * _np.sqrt((s - 1) * GRAVITY * d)
return settling_velocity
[docs]
def concatenate_csv_pts(file_directory, *args):
"""Concatenate a csv-files with lists of XYZ points into one CSV file that is saved to the same directory where the
first CSV file name provided lives. The merged CSV file name starts with ``merged_`` and also ends with the name
of the first CSV file name provided.
:param file_directory: os.path of the directory where the CSV files live, and which must NOT end on '/' or '\\'
:param args: string or list of csv files (only names) containing comma-seperated XYZ coordinates without header
:return pandas.DataFrame: merged points
"""
point_file_names = []
# receive arguments (i.e. csv point file names)
for arg in args:
if type(arg) is str:
point_file_names.append(file_directory + os.sep + arg)
if type(arg) is list:
[point_file_names.append(file_directory + os.sep + e) for e in arg]
# read csv files
point_data = []
for file_name in point_file_names:
if os.path.isfile(file_name):
point_data.append(_pd.read_csv(file_name, names=["X", "Y", "Z"]))
else:
print("WARNING: Points CSV file does not exist: %s" % file_name)
# concatenate frames
merged_pts = _pd.concat(point_data)
# save concatenated points to a CSV file
merged_pts.to_csv(
# make sure to identify platform-independent separators
file_directory + "merged-" + str(point_file_names[0]).split("/")[-1].split("\\")[-1],
header=False,
index=False
)
return merged_pts
[docs]
def lookahead(iterable):
"""Pass through all values of an iterable, augmented by the information if there are more values to come
after the current one (True), or if it is the last value (False).
Source: Ferdinand Beyer (2015) on https://stackoverflow.com/questions/1630320/what-is-the-pythonic-way-to-detect-the-last-element-in-a-for-loop
"""
# Get an iterator and pull the first value.
it = iter(iterable)
last = next(it)
# Run the iterator to exhaustion (starting from the second value).
for val in it:
# Report the *previous* value (more to come).
yield last, True
last = val
# Report the last value.
yield last, False
[docs]
def str2seq(list_like_string, separator=",", return_type="tuple"):
"""Convert a list-like string into a tuple or list based on a separator such as comma or semi-column
:param str list_like_string: string to convert
:param str separator: separator to use
:param str return_type: defines if a list or tuple is returned (default: tuple)
:return: list or tuple
"""
seq = []
for number in list_like_string.split(separator):
try:
seq.append(float(number))
except ValueError:
print("WARNING: Could not interpret user parameter value range definition (%s)" % number)
print(" This Warning will probably cause an ERROR later in the script.")
if "tuple" in return_type:
return tuple(seq)
else:
return seq
[docs]
def log_actions(func):
"""
TODO: this is the logging wrapper!
:param func:
:return:
"""
def wrapper(*args, **kwargs):
func(*args, **kwargs)
for handler in logging.getLogger("HyBayesCal").handlers:
handler.close()
logging.getLogger("HyBayesCal").removeHandler(handler)
for handler in logging.getLogger("warnings").handlers:
handler.close()
logging.getLogger("warnings").removeHandler(handler)
for handler in logging.getLogger("errors").handlers:
handler.close()
logging.getLogger("errors").removeHandler(handler)
print("Check the logfiles: logfile.log, warnings.log, and errors.log.")
return wrapper
[docs]
def update_collocation_pts_file(
file_path,
new_collocation_point,
mode="update"
):
"""
Append a new row to a CSV file or create a new file depending on the mode.
:param file_path: Path to the CSV file.
:param new_collocation_point: List of values to be added as a new row.
:param mode: Mode to determine whether to 'update' (append) or 'generate' (overwrite) the file.
"""
# Ensure new_collocation_point is a list
if not isinstance(new_collocation_point, list):
raise ValueError("new_collocation_point must be a list")
# Check if the file exists
file_exists = os.path.isfile(file_path)
# Open the file in the appropriate mode: append for 'update', write for 'generate'
if mode == "generate" or not file_exists:
# Open file for writing (overwrite if exists)
with open(file_path, mode='w', newline='') as file:
writer = csv.writer(file)
writer.writerow(new_collocation_point) # Assuming the first row is the header
elif mode == "update":
# Open the file in append mode to add the new row
with open(file_path, mode='a', newline='') as file:
writer = csv.writer(file)
writer.writerow(new_collocation_point)
else:
raise ValueError("Invalid mode. Use 'update' or 'generate'.")
[docs]
def save_data(file_path, data):
"""
Save NumPy array data to a file based on the file extension in the file path.
:param file_path: Path to the file where data should be saved.
:param data: NumPy array data to be saved.
"""
try:
# Determine file format based on file extension
file_extension = os.path.splitext(file_path)[1].lower()
if file_extension == '.npy':
_np.save(file_path, data)
elif file_extension in ['.pkl', '.pickle']:
with open(file_path, 'wb') as file:
pickle.dump(data, file)
elif file_extension == '.json':
# Convert NumPy array to list for JSON serialization
data_list = data.tolist()
with open(file_path, 'w') as file:
json.dump(data_list, file, indent=4)
elif file_extension == '.csv':
if isinstance(data, _np.ndarray):
_np.savetxt(file_path, data, delimiter=',', fmt='%.8f')
else:
raise ValueError("For CSV format, data should be a NumPy array.")
elif file_extension in ['.xlsx', '.xls']:
df = _pd.DataFrame(data)
df.to_excel(file_path, index=False)
elif file_extension in ['.h5', '.hdf5']:
with h5py.File(file_path, 'w') as file:
file.create_dataset('dataset', data=data)
else:
raise ValueError(
f"Unsupported file format: {file_extension}. Supported formats are 'npy', 'pickle', 'json', 'csv', 'excel', 'h5py'.")
except Exception as e:
print(f"An error occurred while saving the file: {e}")
[docs]
def rearrange_array(data, num_quantities):
"""
Rearrange a NumPy array such that data from multiple quantities is interleaved by columns.
:param data: A NumPy array of shape (num_quantities * n, m) where n is the number of data points per quantity.
:param num_quantities: An integer indicating the number of quantities (e.g., velocity, water depth, etc.).
:return: A NumPy array with interleaved columns for all quantities.
"""
# Determine the number of rows and columns
num_rows, num_columns = data.shape
# Ensure the number of rows is divisible by num_quantities
if num_rows % num_quantities != 0:
raise ValueError(
f"The number of rows ({num_rows}) should be divisible by the number of quantities ({num_quantities})."
)
# Calculate the number of data points per quantity
n = num_rows // num_quantities
# Split the data into parts for each quantity
quantity_data = [data[i * n:(i + 1) * n, :] for i in range(num_quantities)]
# Initialize an empty array to store the rearranged data
rearranged_data = _np.empty((n, num_columns * num_quantities))
# Interleave the data columns for all quantities
for i in range(num_columns):
for j in range(num_quantities):
rearranged_data[:, i * num_quantities + j] = quantity_data[j][:, i]
return rearranged_data
#----------------------------------------------
[docs]
def update_json_file(json_path, modeled_values_dict=None, detailed_dict=False, save_dict=False, saving_path=None):
"""
Updates the JSON file at `json_path` with data from `modeled_values_dict`.
If the file exists, it appends new values to the existing data.
If the file does not exist, it creates a new file with the initial data.
Parameters
----------
json_path: str
The path to the JSON file to be updated or created.
modeled_values_dict: dict
A dictionary with data to be added or updated in the JSON file.
detailed_dict: bool, optional
Whether to handle the data as nested lists for detailed structures.
save_dict: bool, optional
If True, saves the entire `output_data` to the `saving_path`.
saving_path: str, optional
The path to save the final JSON file when `save_dict` is True.
If not provided, defaults to `json_path`.
"""
if save_dict is False:
if os.path.exists(json_path):
# File exists, so open it for writing
with open(json_path, "r") as file:
output_data = json.load(file)
for key, value in modeled_values_dict.items():
if key in output_data:
if detailed_dict:
if isinstance(output_data[key], list):
output_data[key].append(value)
else:
output_data[key] = [output_data[key], value]
else:
output_data[key].append(value)
else:
output_data[key] = [value]
with open(json_path, 'w') as file:
json.dump(output_data, file, indent=4)
else:
# Save the updated JSON file
with open(json_path, "w") as file:
for key in modeled_values_dict:
# Convert the existing list into a nested list with a single element
modeled_values_dict[key] = [modeled_values_dict[key]]
json.dump(modeled_values_dict, file, indent=4)
else:
if os.path.exists(json_path):
# File exists, so open it for reading
with open(json_path, "r") as file:
output_data = json.load(file)
# Save the data to the specified saving path
if saving_path:
with open(saving_path, "w") as file:
json.dump(output_data, file, indent=4)
else:
print(f"File at {json_path} does not exist. Cannot save to {saving_path}.")
[docs]
def delete_slf(folder_path):
"""
Deletes all files with the .slf extension in the specified folder.
Parameters
----------
folder_path : str
The path to the folder where the .slf files will be deleted.
Returns
-------
None
"""
path = folder_path
# Get all files with .slf extension in the specified folder
slf_files = glob.glob(os.path.join(path, '*.slf'))
# Delete each .slf file
for file_path in slf_files:
try:
os.remove(file_path)
print(f"Deleted: {file_path}")
except Exception as e:
print(f"Error deleting {file_path}: {e}")
[docs]
def filter_model_outputs(data_dict, quantities, run_range_filtering=None):
"""
Filters the data from the model outputs dictionary based on desired quantities
and optionally limits the runs included to a specific range.
Parameters
----------
data_dict : dict
Dictionary containing model outputs with points as keys and lists of run outputs as values.
quantities : list of str
List of quantities to extract from the model outputs.
run_range : tuple of int, optional
Range of runs to include (start, end). If None, includes all runs.
The range is inclusive of the start index and exclusive of the end index.
Returns
-------
dict
Filtered dictionary containing only the selected quantities and runs within the specified range.
"""
filtered_data = {}
for point, runs in data_dict.items():
# Extract the specified range of runs
if run_range_filtering is not None:
start, end = run_range_filtering
runs = runs[start - 1:end]
# Filter quantities from each run
filtered_runs_data = []
for run in runs:
filtered_run = {quantity: run[quantity] for quantity in quantities if quantity in run}
filtered_runs_data.append(filtered_run)
filtered_data[point] = filtered_runs_data
# pdb.set_trace()
# all_values = {}
# for point_name, data in filtered_data.items():
# # Extract inner list of dictionaries (removing one level of nesting)
# all_values[point_name] = data[0]
return filtered_data
[docs]
def interpolate_values(coords, values, point):
"""
Interpolates values at a given point using Inverse Distance Weighting.
Parameters:
coords (np.ndarray): Coordinates of the triangle's vertices, shape (3, 2),
where each row is [X, Y] for a vertex.
values (np.ndarray): Values at each vertex for each variable, shape (3, num_variables).
point (tuple): Coordinates of the point where interpolation is desired, (px, py).
Returns:
np.ndarray: Interpolated values at the given point for each variable, shape (num_variables,).
"""
px, py = point
# Calculate distances from the point to each vertex
distances = _np.linalg.norm(coords - _np.array([px, py]), axis=1)
# Avoid division by zero for vertices that might be at the same location as the point
distances[distances == 0] = _np.finfo(float).eps # Small value to prevent division by zero
# Calculate inverse distance weights
weights = 1 / distances
weights /= _np.sum(weights) # Normalize weights to sum to 1
# Interpolate each variable using weighted sum
interpolated_values = _np.dot(weights, values) # Dot product for weighted sum
return interpolated_values.flatten() # Return as a 1D array
[docs]
def rasterize(saving_folder, slf_file_name, desired_variables, spacing):
# Define the full path for the SLF file
slf_file = os.path.join(saving_folder, slf_file_name)
print("The input file being probed: " + slf_file)
# constructor for pp_SELAFIN class
slf = ppSELAFIN(slf_file)
slf.readHeader()
slf.readTimes()
times = slf.getTimes()
vnames = slf.getVarNames()
vunits = slf.getVarUnits()
ftype, fsize = slf.getPrecision()
nplan = slf.getNPLAN()
NELEM = slf.getNELEM()
NPOIN = slf.getNPOIN()
IPOBO = slf.getIPOBO()
DATE = slf.getDATE()
if nplan > 1:
slf_type = '3d'
else:
slf_type = '2d'
if ftype == 'f' and fsize == 4:
precision = 'single'
elif ftype == 'd' and fsize == 8:
precision = 'double'
else:
precision = 'unknown'
# prints variable names
print('Precision: ' + precision)
print('File type: ' + slf_type)
if slf_type == '3d':
print('Number of planes: ' + str(nplan))
print('Number of elements: ' + str(NELEM))
print('Number of nodes: ' + str(NPOIN))
print('Date: ' + str(DATE[0]) + '-' + str(DATE[1]).zfill(2) + '-'
+ str(DATE[2]).zfill(2) + ' ' + str(DATE[3]).zfill(2) + ':'
+ str(DATE[4]).zfill(2) + ':' + str(DATE[5]).zfill(2))
print(' ')
print('#################################')
print('Variables in ' + slf_file + ' are: ')
print('---------------------------------')
print(' v variable unit')
print('---------------------------------')
desired_var_indices = [] # Initialize to a default value, e.g., -1, to indicate that no variable has been found yet.
for var in desired_variables:
for i in range(len(vnames)):
print(f" {i} --> {vnames[i]} [{vunits[i].strip()}]")
# Strip spaces from vnames[i] and check if it matches the current desired variable
if vnames[i].strip() == var: # Checks if the stripped vnames[i] matches the current desired_var
desired_var_indices.append(i) # Add the index to the list if there's a match
# Print out the desired variable indices in the order of desired_var
print("Desired variable indices in order:", desired_var_indices)
print(' ')
print('#################################')
# prints times
records = np.arange(len(times))
nrecs = len(times)
if len(times) < 2:
print(" t time (s)")
print('---------------------------------')
print(str(records[0]) + " -->" + str("{:10.1f}".format(times[0])))
elif len(times) < 3:
print(" t time (s)")
print('---------------------------------')
print(str(records[0]) + " -->" + str("{:10.1f}".format(times[0])))
print(str(records[1]) + " -->" + str("{:10.1f}".format(times[1])))
elif len(times) < 4:
print(" t time (s)")
print('---------------------------------')
print(str(records[0]) + " -->" + str("{:10.1f}".format(times[0])))
print(str(records[1]) + " -->" + str("{:10.1f}".format(times[1])))
print(str(records[2]) + " -->" + str("{:10.1f}".format(times[2])))
else:
print("t time (s)")
print('---------------------------------')
print(str(records[0]) + " -->" + str("{:10.1f}".format(times[0])))
print(str(records[1]) + " -->" + str("{:10.1f}".format(times[1])))
print(str(records[2]) + " -->" + str("{:10.1f}".format(times[2])))
print(str(records[3]) + " -->" + str("{:10.1f}".format(times[3])))
print(' ......')
print(str(records[nrecs - 1]) + "-->" + str("{:10.1f}".format(times[nrecs - 1])))
print('#################################')
last_time_step_index = nrecs - 1
# index number of grided output variable
t = last_time_step_index # index of time record of the output to use in griding (integer, 0 to n)
# Read the header of the selafin result file and get geometry and
# variable names and units
slf.readVariables(t)
# gets some of the mesh properties from the *.slf file
NELEM, NPOIN, NDP, IKLE, IPOBO, x, y = slf.getMesh()
# the IKLE array starts at element 1, but matplotlib needs it to start
# at zero
IKLE[:,:] = IKLE[:,:] - 1
# these are the results for all variables, for time step t
master_results = slf.getVarValues()
# creates a triangulation grid using matplotlib function Triangulation
triang = mtri.Triangulation(x, y, IKLE)
# determine the spacing of the regular grid
range_in_x = x.max() - x.min()
range_in_y = y.max() - y.min()
max_range = max(range_in_x, range_in_y)
# first index is integer divider, second is remainder
num_x_pts = divmod(range_in_x, spacing)
num_y_pts = divmod(range_in_y, spacing)
print("Size of output matrix is : " + str(int(num_x_pts[0])) + " x " + str(int(num_y_pts[0])))
print("Grid resolution is : " + str(spacing) + " m")
# creates the regular grid
xreg, yreg = np.meshgrid(np.linspace(x.min(), x.max(), int(num_x_pts[0])),
np.linspace(y.min(), y.max(), int(num_y_pts[0])))
x_regs = xreg[1, :]
y_regs = yreg[:, 1]
raster_data = {}
# to interpolate to a reg grid
for var_index in desired_var_indices:
interpolator = mtri.LinearTriInterpolator(triang, master_results[var_index])
z = interpolator(xreg, yreg)
print("Shape of array z: " + str(z.shape[0]))
print("Shape of arrays xreg and yreg: " + str(x_regs.shape) + " " + str(y_regs.shape))
where_are_NaNs = np.isnan(z)
z[where_are_NaNs] = -999.0
# open the output *.asc file, and write the header info
header_str = "NCOLS " + str(z.shape[1]) + "\n"
header_str = header_str + "NROWS " + str(z.shape[0]) + "\n"
header_str = header_str + "XLLCORNER " + str(x_regs[0]) + "\n"
header_str = header_str + "YLLCORNER " + str(y_regs[0]) + "\n"
header_str = header_str + "CELLSIZE " + str(spacing) + "\n"
header_str = header_str + "NODATA_VALUE " + str(-999.00) + "\n"
# Save the output raster to a file
output_file = os.path.join(saving_folder, f"interpolated_{vnames[var_index]}.asc")
np.savetxt(output_file, np.flipud(z), fmt='%10.3f', header=header_str, comments='', delimiter='')
raster_data[vnames[var_index].strip()] = {
"data": np.flipud(z),
"x_regs": x_regs,
"y_regs": y_regs,
"spacing": spacing
}
return raster_data
[docs]
def classify_mu(raster_data, classification, output_folder, output_filename):
"""
Classify the morphological units (MU) based on velocity and depth and save as a raster file.
Parameters:
raster_data (dict): Dictionary containing 'velocity' and 'depth' raster data as numpy arrays.
classification (dict): Dictionary of classification criteria for different MUs.
output_folder (str): Folder path where the output file will be saved.
output_filename (str): The filename for the output raster file (without extension).
Returns:
None: The function will save the classified MU raster as an ASCII file in the output folder.
"""
# Extract velocity and depth data from the raster_data dictionary
velocity_data = raster_data['SCALAR VELOCITY']['data'] # Access 'data' if the raster data is a masked array
depth_data = raster_data['WATER DEPTH']['data']
# Ensure that both velocity and depth are numpy arrays, not dictionaries
assert isinstance(velocity_data, np.ndarray), f"Expected numpy array for velocity_data, but got {type(velocity_data)}"
assert isinstance(depth_data, np.ndarray), f"Expected numpy array for depth_data, but got {type(depth_data)}"
# Create an empty array for the output MU classification
mu_raster = np.zeros_like(velocity_data, dtype=int) # Initialize with zeros (no classification)
# Iterate through each classification and assign values to the raster
for mu, criteria in classification.items():
velocity_range = criteria['velocity_range']
depth_range = criteria['depth_range']
velocity_masked = velocity_data.mask
depth_masked = depth_data.mask
# Classify cells where velocity and depth are within the specified ranges
mu_mask = (velocity_data >= velocity_range[0]) & (velocity_data <= velocity_range[1]) & \
(depth_data >= depth_range[0]) & (depth_data <= depth_range[1])
mu_mask &= ~velocity_masked & ~depth_masked
# Assign the classification index (or any desired value) for the morphological unit
mu_raster[mu_mask] = mu # Use the classification index directly from the dictionary (e.g., 0, 1, 2...)
# Save the classified MU raster as an ASCII file
header_str = f"NCOLS {mu_raster.shape[1]}\n"
header_str += f"NROWS {mu_raster.shape[0]}\n"
header_str += f"XLLCORNER {raster_data['SCALAR VELOCITY']['x_regs'].min()}\n" # Assuming min x-coordinate for the corner
header_str += f"YLLCORNER {raster_data['WATER DEPTH']['y_regs'].min()}\n" # Assuming min y-coordinate for the corner
header_str += f"CELLSIZE {raster_data['WATER DEPTH']['spacing']}\n" # You can adjust the cell size depending on your data
header_str += "NODATA_VALUE -999\n"
# Construct output file path
output_file = os.path.join(output_folder, f"{output_filename}.asc")
# Save the classified MU raster as an ASCII file
np.savetxt(output_file, mu_raster, fmt='%d', header=header_str, comments='', delimiter=' ')
print(f"Classified MU raster saved to {output_file}")
[docs]
def parse_classes_keyword(file_path, keyword):
with open(file_path, 'r') as file:
for line in file:
stripped = line.strip()
# Skip empty lines
if not stripped:
continue
# Check for ":" or "="
for symbol in [":", "="]:
if symbol in stripped:
key_part = stripped.split(symbol, 1)[0].strip()
if keyword in key_part:
return stripped #
return None # If not found
[docs]
def update_gaia_class_line(line, index, new_value):
# Split at ":" or "="
if ':' in line:
key, values = line.split(':', 1)
separator = ':'
elif '=' in line:
key, values = line.split('=', 1)
separator = '='
else:
raise ValueError("Line must contain ':' or '='")
# Split values and remove extra spaces
values_list = [v.strip() for v in values.strip().split(';')]
# Check if index is valid
if index < 0 or index >= len(values_list):
raise IndexError("Index out of range")
# Update the value at the specified index
values_list[index] = f"{new_value:.3f}" # Format with 3 decimals if you want
# Reconstruct the line
updated_line = f"{key.strip()} {separator} {';'.join(values_list)}"
return updated_line
[docs]
def classify_parameters_tm_gaia(elements, classification_dict):
telemac_vars = []
gaia_vars = []
for element in elements:
source = classification_dict.get(element)
if source == "telemac":
telemac_vars.append(element)
elif source == "gaia":
gaia_vars.append(element)
else:
raise ValueError(f"Element '{element}' not recognized in CLASSIFICATION_DICT.")
return telemac_vars, gaia_vars
[docs]
def vtk_to_2dm(input_file, output_file):
"""
Convert a VTK/VTP mesh to a 2DM file.
Parameters
----------
input_file : str
Path to the input VTK or VTP file (ASCII, Binary, or Compressed).
output_file : str
Path to the output 2DM file.
"""
input_file = os.path.abspath(input_file)
if not os.path.exists(input_file):
raise FileNotFoundError(f"File not found: {input_file}")
mesh = pv.read(input_file)
# Get node coordinates
nodes = mesh.points
# Handle cells/faces depending on mesh type
tris = []
if hasattr(mesh, "faces") and len(mesh.faces) > 0: # PolyData
faces = mesh.faces
offset = 0
while offset < len(faces):
n = faces[offset] # number of vertices in this face
if n == 3:
tris.append(faces[offset+1:offset+4])
elif n == 4: # quad → split into 2 triangles
quad = faces[offset+1:offset+5]
tris.append([quad[0], quad[1], quad[2]])
tris.append([quad[0], quad[2], quad[3]])
offset += n + 1
elif hasattr(mesh, "cells") and len(mesh.cells) > 0: # UnstructuredGrid
cells = mesh.cells
offset = 0
while offset < len(cells):
n = cells[offset]
if n == 3:
tris.append(cells[offset+1:offset+4])
elif n == 4:
quad = cells[offset+1:offset+5]
tris.append([quad[0], quad[1], quad[2]])
tris.append([quad[0], quad[2], quad[3]])
offset += n + 1
else:
raise ValueError("Mesh has no faces or cells to export.")
tris = np.array(tris)
# Ensure output folder exists
os.makedirs(os.path.dirname(output_file), exist_ok=True)
# Write to 2DM
with open(output_file, "w") as f:
# Header
f.write("MESH2D\n")
# Elements
for i, tri in enumerate(tris):
f.write(f"E3T {i+1} {tri[0]+1} {tri[1]+1} {tri[2]+1}\n")
# Nodes
for i, (x, y, z) in enumerate(nodes):
f.write(f"ND {i+1} {x:.6f} {y:.6f} {z:.6f}\n")
print(f"✅ 2DM file saved: {output_file}")
print(f" Nodes: {len(nodes)}, Elements: {len(tris)}")
[docs]
def twodm2SLF(input_file_2dm, output_file_adcirc, output_file_slf):
# read the 2dm file
n, e, x, y, z, ikle = read2dm(input_file_2dm)
print(f"Length of x: {len(x)}, y: {len(y)}, z: {len(z)}, expected n: {n}")
print(f"Last node index: {n - 1}")
print(f"Last node values: x={x[n - 1]}, y={y[n - 1]}, z={z[n - 1]}")
# write the adcirc file (in the original location)
writeAdcirc(n, e, x, y, z, ikle, output_file_adcirc)
# --- Create a temporary copy in the main working directory ---
cwd = os.getcwd()
temp_adcirc = os.path.join(cwd, os.path.basename(output_file_adcirc))
shutil.copy(output_file_adcirc, temp_adcirc)
precision = "double"
# use the temporary adcirc file instead of the original
n, e, x, y, z, ikle, ppIPOB = getIPOBO_IKLE(temp_adcirc)
# the above method generates a file called temp.cli, which we rename here
cli_file = output_file_slf.split('.', 1)[0] + '.cli'
os.rename('temp.cli', cli_file)
# now we can write the *.slf file
#######################################################################
if precision == 'single':
ftype = 'f'
fsize = 4
elif precision == 'double':
ftype = 'd'
fsize = 8
else:
print('Precision unknown! Exiting!')
sys.exit(0)
NELEM = e
NPOIN = n
NDP = 3 # always 3 for triangular elements
IKLE = ikle
IPOBO = ppIPOB
slf = ppSELAFIN(output_file_slf)
slf.setPrecision(ftype, fsize)
slf.setTitle('created with pputils')
slf.setVarNames(['BOTTOM ', 'BOTTOM FRICTION '])
slf.setVarUnits(['M ', ' ']) # ND = nondimensional
slf.setIPARAM([1, 0, 0, 0, 0, 0, 0, 0, 0, 1])
slf.setMesh(NELEM, NPOIN, NDP, IKLE, IPOBO, x, y)
slf.writeHeader()
# --- Define variables ---
zz = np.zeros((2, NPOIN)) # two variables now
zz[0, :] = z # bottom elevation
zz[1, :] = 0.03 # roughness example (constant value)
# You can also load roughness from file → e.g. np.loadtxt(...)
# --- Write variables ---
slf.writeVariables(0.0, zz)
# --- Cleanup: remove the temporary adcirc file ---
if os.path.exists(temp_adcirc):
os.remove(temp_adcirc)