import os
import re
import glob
import json
import sys
import shutil
import subprocess
import csv
import logging
from pathlib import Path
from typing import Any, Dict, Iterable, Optional, Tuple
import numpy as np
import pandas as pd
import pyvista as pv
from scipy import spatial
[docs]
class OpenFOAMController:
def __init__(self, case_dir: str):
self.case_dir = os.path.normpath(case_dir)
# Turns a relative path into an absolute one inside the case directory.
# Strips any leading case or template folder name to avoid double-nesting.
def _case_path(self, relpath: str) -> str:
if os.path.isabs(relpath):
return relpath
rp = relpath.replace("\\", "/").lstrip("/")
if rp.startswith("./"):
rp = rp[2:]
template_folder = os.path.basename(os.path.dirname(self.case_dir))
case_folder = os.path.basename(self.case_dir)
for prefix in (template_folder, case_folder):
if rp == prefix:
rp = ""
elif rp.startswith(prefix + "/"):
rp = rp[len(prefix) + 1 :]
return os.path.join(self.case_dir, rp)
# Runs a command and streams output line by line to stdout and optionally to a log file.
# Raises RuntimeError if the process exits with a non-zero return code.
def _run_streamed(self, cmd: Iterable[str], log_path: Optional[str] = None) -> None:
env = os.environ.copy()
env.pop("DISPLAY", None)
log_fh = open(log_path, "w") if log_path else None
try:
process = subprocess.Popen(
list(cmd),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
env=env,
cwd=self.case_dir,
)
assert process.stdout is not None
for line in process.stdout:
print(line.rstrip())
if log_fh:
log_fh.write(line)
process.wait()
if process.returncode != 0:
raise RuntimeError(f"Command failed with return code {process.returncode}: {' '.join(cmd)}")
finally:
if log_fh:
log_fh.close()
# Runs command silently, raises CalledProcessError on failure.
def _run_checked(self, cmd: Iterable[str]) -> None:
env = os.environ.copy()
env.pop("DISPLAY", None)
subprocess.run(list(cmd), check=True, env=env, cwd=self.case_dir)
# Removes all processorN directories from the case folder before decomposePar.
def _clean_processor_dirs(self) -> None:
for p in glob.glob(os.path.join(self.case_dir, "processor[0-9]*")):
if os.path.isdir(p):
shutil.rmtree(p)
# Checks that system/decomposeParDict exists and returns its path.
def _ensure_decompose_dict(self) -> str:
path = self._case_path("system/decomposeParDict")
if not os.path.isfile(path):
raise FileNotFoundError(
f"Missing system/decomposeParDict in case: {self.case_dir}\n"
"Parallel runs need this file so decomposePar can create processor directories."
)
return path
# Updates numberOfSubdomains in decomposeParDict to match the requested processor count.
def _set_number_of_subdomains(self, nprocs: int) -> None:
path = self._ensure_decompose_dict()
with open(path, "r") as f:
lines = f.readlines()
pat = re.compile(r"^(\s*numberOfSubdomains\s+)\d+(\s*;\s*)$")
replaced = False
new_lines = []
for line in lines:
m = pat.match(line)
if m:
new_lines.append(f"{m.group(1)}{int(nprocs)}{m.group(2)}\n")
replaced = True
else:
new_lines.append(line)
if not replaced:
insert_at = len(new_lines)
for i, line in enumerate(new_lines):
if line.strip() == "}":
insert_at = i
break
new_lines.insert(insert_at, f"\nnumberOfSubdomains {int(nprocs)};\n")
with open(path, "w") as f:
f.writelines(new_lines)
# Cleans processor directories, sets the subdomain count, and runs decomposePar.
[docs]
def decompose_parallel_case(self, nprocs: int) -> None:
self._clean_processor_dirs()
self._set_number_of_subdomains(nprocs)
self._run_checked(["decomposePar", "-case", self.case_dir, "-force"])
# Reassembles the decomposed case into a single directory via reconstructPar.
[docs]
def reconstruct_parallel_case(self) -> None:
self._run_checked(["reconstructPar", "-case", self.case_dir])
# Modifies type, Ks, or value for a given patch in an OpenFOAM field file.
[docs]
def update_boundary_condition(self, file: str, patch: str, field_type: str, bc_type: str, value: Any) -> None:
path = self._case_path(file)
if not os.path.isfile(path):
raise FileNotFoundError(f"File not found: {path}")
with open(path, "r") as f:
lines = f.readlines()
new_lines = []
inside_boundary_field = False
inside_patch = False
patch_found = False
brace_level = 0
for i, line in enumerate(lines):
stripped = line.strip()
if not inside_boundary_field and stripped.startswith("boundaryField"):
inside_boundary_field = True
new_lines.append(line)
continue
if inside_boundary_field:
brace_level += line.count("{") - line.count("}")
if not inside_patch:
if re.match(rf"^\s*{re.escape(patch)}\s*\{{\s*$", line):
inside_patch = True
patch_found = True
new_lines.append(line)
continue
if re.match(rf"^\s*{re.escape(patch)}\s*$", line):
if i + 1 < len(lines) and lines[i + 1].strip() == "{":
inside_patch = True
patch_found = True
new_lines.append(line)
continue
if inside_patch:
if re.match(r"\s*type\s+", line):
new_lines.append(f" type {bc_type};\n")
continue
if re.match(r"\s*Ks\s+", line):
if value is None:
raise ValueError(f"Missing 'value' for patch '{patch}' in {file}")
new_lines.append(f" Ks uniform {float(value):.5f};\n")
continue
if re.match(r"\s*value\s+", line):
new_lines.append(" value uniform 0;\n")
continue
if "}" in line:
inside_patch = False
new_lines.append(line)
continue
new_lines.append(line)
if not patch_found:
raise ValueError(f"Patch '{patch}' not found in {file}")
with open(path, "w") as f:
f.writelines(new_lines)
# Updates a scalar key inside a named subdictionary in an OpenFOAM dict file.
[docs]
def update_dictionary_entry(self, file: str, subdict: str, key: str, value: float) -> None:
path = self._case_path(file)
if not os.path.isfile(path):
raise FileNotFoundError(f"File not found: {path}")
with open(path, "r") as f:
lines = f.readlines()
new_lines = []
inside_subdict = False
pending_subdict = False
brace_level = 0
key_updated = False
subdict_inline = re.compile(rf"^\s*{re.escape(subdict)}\s*\{{\s*$")
subdict_nameonly = re.compile(rf"^\s*{re.escape(subdict)}\s*$")
key_re = re.compile(rf"^\s*{re.escape(key)}\s+")
for line in lines:
stripped = line.strip()
if not inside_subdict and not pending_subdict:
if subdict_inline.match(line):
inside_subdict = True
brace_level = 1
new_lines.append(line)
continue
if subdict_nameonly.match(line):
pending_subdict = True
new_lines.append(line)
continue
if pending_subdict:
if stripped == "{":
inside_subdict = True
pending_subdict = False
brace_level = 1
new_lines.append(line)
continue
pending_subdict = False
if inside_subdict:
brace_level += line.count("{") - line.count("}")
if key_re.match(line):
new_lines.append(f" {key} {float(value):.5f};\n")
key_updated = True
continue
new_lines.append(line)
if brace_level <= 0:
inside_subdict = False
continue
new_lines.append(line)
if not key_updated:
raise ValueError(f"Key '{key}' not found in {file}::{subdict}")
with open(path, "w") as f:
f.writelines(new_lines)
# Dispatches all calibration parameter updates: Cmu goes to turbulenceProperties,
# everything else is written as a boundary condition; alpha.water fields are skipped.
[docs]
def update_model_controls(self, params: Dict[str, Dict[str, Any]]) -> None:
skip_fields = {"alpha.water", "alpha.water.orig"}
for patch, param in params.items():
file = param["file"]
if patch == "Cmu":
value = float(param["value"])
print("Updating model coefficient 'Cmu' in constant/turbulenceProperties...")
self.update_dictionary_entry(
file="constant/turbulenceProperties",
subdict="kEpsilonCoeffs",
key="Cmu",
value=value,
)
continue
if patch == "ks":
value = float(param["value"])
print(f"Updating Ks={value} on patch 'bottom' in 0/nut...")
self.update_boundary_condition(
file=param["file"],
patch=param["patch"],
field_type=param["field_type"],
bc_type=param["bc_type"],
value=value,
)
continue
field_type = param.get("field_type", "")
bc_type = param["bc_type"]
value = param.get("value", None)
field_name = os.path.basename(file)
if field_name in skip_fields:
print(f"Skipping field '{field_name}' (managed by setFields).")
continue
print(f"Updating boundary condition for patch '{patch}' in {file}...")
self.update_boundary_condition(file, patch, field_type, bc_type, value)
# Runs the solver; if nprocs > 1 decomposes first and reconstructs after.
[docs]
def run_simulation(self, nprocs: int = 8, solver: str = "interFoam") -> None:
print("Running OpenFOAM simulation...")
run_log = os.path.join(self.case_dir, "log.run")
if int(nprocs) > 1:
self.decompose_parallel_case(int(nprocs))
mpi_cmd = (
f"source $HOME/OpenFOAM/OpenFOAM-v2412/etc/bashrc && "
f"mpirun -np {int(nprocs)} {solver} -parallel"
)
self._run_streamed(
["bash", "-c", mpi_cmd],
log_path=run_log,
)
self.reconstruct_parallel_case()
else:
self._run_streamed([solver, "-case", self.case_dir], log_path=run_log)
print(f"Simulation finished. Log saved to {run_log}")
# Converts only the latest time directory to VTK format using foamToVTK.
[docs]
def convert_to_vtk(self) -> None:
print("Converting to VTK...")
cmd = (
f"source $HOME/OpenFOAM/OpenFOAM-v2412/etc/bashrc && "
f"foamToVTK -case {self.case_dir}"
)
env = os.environ.copy()
env.pop("DISPLAY", None)
result = subprocess.run(["bash", "-c", cmd], env=env, cwd=self.case_dir,
capture_output=True, text=True)
if result.returncode != 0:
print(f"foamToVTK stderr: {result.stderr}", flush=True)
print(f"foamToVTK stdout: {result.stdout}", flush=True)
raise RuntimeError(f"foamToVTK failed with exit code {result.returncode}")
print("VTK conversion completed.")
# =============================================================================
# OpenFOAMModel - BAL-compatible wrapper around OpenFOAMController
# =============================================================================
from hydroBayesCal.hysim import HydroSimulations
from hydroBayesCal.function_pool import logger
[docs]
class OpenFOAMModel(HydroSimulations):
"""
BAL-compatible wrapper around OpenFOAMController.
Provides the interface expected by bal_openfoam.py while using
your existing OpenFOAMController for the actual OpenFOAM operations.
"""
def __init__(
self,
case_template_dir,
solver_name="interFoam",
n_processors=8,
results_filename_base="results_interfoam",
alpha_water_name="alpha.water",
water_surface_alpha=0.5,
reference_z=0.0,
control_file="system/controlDict",
model_dir="",
res_dir="",
calibration_pts_file_path="",
n_cpus=8,
init_runs=5,
calibration_parameters=None,
param_values=None,
extraction_quantities=None,
calibration_quantities=None,
dict_output_name="extraction-data",
user_param_values=False,
max_runs=50,
complete_bal_mode=True,
only_bal_mode=False,
delete_complex_outputs=False,
validation=False,
multitask_selection="variables",
n_avg_timesteps=1,
*args,
**kwargs
):
# OpenFOAM-specific directory defaults, resolved before the base
# constructor runs so the standard result layout is built on them.
case_template_dir = os.path.abspath(case_template_dir)
model_dir = os.path.abspath(model_dir) if model_dir else os.path.dirname(case_template_dir)
res_dir = os.path.abspath(res_dir) if res_dir else os.path.join(model_dir, "results")
# Fall back to a minimal single-parameter k-epsilon Cmu setup so a bare
# OpenFOAM case is still usable without an explicit calibration config.
calibration_parameters = calibration_parameters or ["Cmu"]
param_values = param_values or [[0.06, 0.12]]
extraction_quantities = extraction_quantities or ["U_x", "U_y", "U_z"]
calibration_quantities = calibration_quantities or ["U_x", "U_y", "U_z"]
# The base class owns the common state: parameters, observations and
# variances (from the calibration CSV), and the standard result folder
# layout (asr_dir, calibration-data/<quantities>, restart_data, plots,
# surrogate-gpe). This keeps the OpenFOAM binding aligned with Telemac.
os.makedirs(model_dir, exist_ok=True)
super().__init__(
control_file=control_file,
model_dir=model_dir,
res_dir=res_dir,
calibration_pts_file_path=calibration_pts_file_path,
n_cpus=n_cpus,
init_runs=init_runs,
calibration_parameters=calibration_parameters,
param_values=param_values,
calibration_quantities=calibration_quantities,
extraction_quantities=extraction_quantities,
dict_output_name=dict_output_name,
user_param_values=user_param_values,
max_runs=max_runs,
complete_bal_mode=complete_bal_mode,
only_bal_mode=only_bal_mode,
delete_complex_outputs=delete_complex_outputs,
validation=validation,
multitask_selection=multitask_selection,
)
# OpenFOAM-specific attributes
self.case_template_dir = case_template_dir
self.solver_name = solver_name
self.n_processors = n_processors
self.results_filename_base = results_filename_base
self.alpha_water_name = alpha_water_name
self.water_surface_alpha = water_surface_alpha
self.reference_z = reference_z
self.n_avg_timesteps = max(1, int(n_avg_timesteps)) # minimum 1
# Alias consumed by the GP layer (see bal_openfoam.py).
self.parameter_ranges = self.param_values
# The base class sets these only when a calibration file is present;
# provide robust fallbacks so the BAL driver never sees None.
if self.num_calibration_quantities is None:
self.num_calibration_quantities = len(self.calibration_quantities)
if self.nloc is None:
self.nloc = 0
# XYZ coordinates of the calibration points. The base class stores the
# dataframe and the observations/variances; the OpenFOAM field
# extraction additionally needs the point coordinates.
self._load_control_points()
# Check that k is written to VTK output
self._check_k_in_controldict()
# Results storage
self.model_evaluations = None
logger.info(f"OpenFOAMModel initialized: {self.ndim} parameter(s), {self.nloc} locations")
def _check_k_in_controldict(self):
"""Check that the k field is written in the case template controlDict.
OpenFOAM does not always write k to VTK by default. If k is missing,
TKE and velocity fluctuations will be NaN in all outputs.
"""
controldict_path = os.path.join(self.case_template_dir, "system", "controlDict")
if not os.path.isfile(controldict_path):
logger.warning(
f"controlDict not found at {controldict_path} -- "
"cannot verify that 'k' is written to VTK. "
"TKE and fluctuations will be NaN if 'k' is absent from VTK output."
)
return
with open(controldict_path, "r") as f:
content = f.read()
# k is written if it appears in writeFields or if writeFormat includes it,
# or if there is no writeFields restriction (i.e. all fields are written)
has_write_fields = "writeFields" in content
k_explicitly_listed = bool(re.search(r'\bk\b', content))
if has_write_fields and not k_explicitly_listed:
logger.warning(
"controlDict contains 'writeFields' but 'k' does not appear to be listed. "
"TKE and velocity fluctuations will be NaN. "
"Add 'k' to writeFields in system/controlDict to fix this."
)
else:
logger.info("controlDict check passed: 'k' field appears to be available for VTK output.")
def _load_control_points(self):
"""Read the XYZ coordinates of the calibration points.
Observations, variances and ``nloc`` are set by the base class from the
calibration CSV (``<quantity>_DATA`` / ``<quantity>_ERROR`` columns).
The OpenFOAM field extraction additionally needs the point coordinates,
which are read here from the same dataframe the base class stored.
"""
df = self.calibration_pts_df
if df is None:
self.control_points = np.array([])
return
x_col = next((c for c in df.columns if c.lower() == 'x'), None)
y_col = next((c for c in df.columns if c.lower() == 'y'), None)
z_col = next((c for c in df.columns if c.lower() == 'z'), None)
if x_col and y_col and z_col:
self.control_points = df[[x_col, y_col, z_col]].values
else:
self.control_points = np.array([])
[docs]
def run_multiple_simulations(
self,
collocation_points,
complete_bal_mode=True,
validation=False,
bal_iteration=None,
bal_new_set_parameters=None
):
"""Run multiple simulations - BAL interface."""
if bal_new_set_parameters is not None:
params_to_run = np.atleast_2d(bal_new_set_parameters)
start_idx = collocation_points.shape[0]
else:
params_to_run = np.atleast_2d(collocation_points)
start_idx = 0
all_results = []
all_detailed_results = [] # for comprehensive CSV: one row per run x control point
for i, params in enumerate(params_to_run):
run_idx = start_idx + i
case_name = f"run_{run_idx:04d}"
case_dir = os.path.join(self.model_dir, case_name)
logger.info(f"\n{'='*60}")
logger.info(f"Simulation {run_idx + 1}: {case_name}")
logger.info(f"Parameters: {dict(zip(self.calibration_parameters, params))}")
# Copy template
if os.path.exists(case_dir):
shutil.rmtree(case_dir)
shutil.copytree(self.case_template_dir, case_dir)
# Note: nut internalField is already uniform 0 in the case template.
# No reset needed.
# Create controller for this run
foam = OpenFOAMController(case_dir)
# Build params dict for update_model_controls
update_params = {}
for pname, pval in zip(self.calibration_parameters, params):
if pname.lower() == "cmu":
update_params["Cmu"] = {
"file": "constant/turbulenceProperties",
"value": float(pval),
}
elif pname.lower() == "ks":
update_params["ks"] = {
"file": "0/nut",
"patch": "bottom",
"field_type": "scalar",
"bc_type": "nutkRoughWallFunction",
"value": float(pval),
}
else:
raise NotImplementedError(f"No dispatch defined for calibration parameter '{pname}'. "
f"Add a branch in run_multiple_simulations.")
try:
# Update parameters
foam.update_model_controls(update_params)
# Run simulation
foam.run_simulation(nprocs=self.n_processors, solver=self.solver_name)
# Convert to VTK
foam.convert_to_vtk()
# Extract fields from VTK (averaged over n_avg_timesteps)
coords, U, k = foam.extract_fields_from_vtk(n_avg_timesteps=self.n_avg_timesteps)
# Sanity check: verify fields differ from previous run.
# Identical fields across runs indicate a VTK extraction bug (e.g. missing -latestTime).
if run_idx > start_idx:
prev_U_path = os.path.join(self.restart_data_folder, "raw_fields", f"U_{run_idx - 1:04d}.npy")
if os.path.isfile(prev_U_path):
prev_U = np.load(prev_U_path)
if prev_U.shape == U.shape and np.allclose(prev_U, U, rtol=1e-6):
raise RuntimeError(
f"SANITY CHECK FAILED: Run {run_idx} produced identical U fields to run {run_idx - 1}. "
f"This almost certainly means foamToVTK is reading the wrong time directory. "
f"Check that -latestTime is being applied and that the simulation actually ran."
)
else:
logger.info(f"Sanity check passed: run {run_idx} fields differ from run {run_idx - 1}.")
if self.nloc == 0:
# No measurements file yet save raw fields to disk so they can
# be re-extracted at control points once measurements.csv is ready.
# This allows the initial runs to complete and free disk space
# without needing measurement coordinates.
raw_dir = os.path.join(self.restart_data_folder, "raw_fields")
os.makedirs(raw_dir, exist_ok=True)
np.save(os.path.join(raw_dir, f"coords_{run_idx:04d}.npy"), coords)
np.save(os.path.join(raw_dir, f"U_{run_idx:04d}.npy"), U)
if k is not None:
np.save(os.path.join(raw_dir, f"k_{run_idx:04d}.npy"), k)
logger.info(f"No measurements file raw fields saved to {raw_dir} for run {run_idx}.")
# Save collocation points so they can be reloaded later
current_cp = params_to_run[:i + 1]
np.save(os.path.join(self.restart_data_folder, "collocation_points.npy"), current_cp)
else:
# Measurements file exists interpolate to control points as normal
results = self._extract_at_control_points(coords, U, k)
run_results = []
for loc_idx in range(self.nloc):
for qty in self.calibration_quantities:
if qty in results:
run_results.append(float(results[qty][loc_idx]))
else:
run_results.append(np.nan)
all_results.append(run_results)
# Store detailed results: one row per control point
for pt_idx, cp in enumerate(self.control_points):
row = {
"run_idx": run_idx,
**{p: float(v) for p, v in zip(self.calibration_parameters, params)},
"x": float(cp[0]), "y": float(cp[1]), "z": float(cp[2]),
"U_x": float(results["U_x"][pt_idx]),
"U_y": float(results["U_y"][pt_idx]),
"U_z": float(results["U_z"][pt_idx]),
"U_magnitude": float(results["U_magnitude"][pt_idx]),
"u_fluct": float(results["u_fluct"][pt_idx]),
"v_fluct": float(results["v_fluct"][pt_idx]),
"w_fluct": float(results["w_fluct"][pt_idx]),
"TKE": float(results["TKE"][pt_idx]),
}
all_detailed_results.append(row)
# Save per-run JSON inside the run folder
self._save_run_results(case_dir, run_idx, params, results)
# Update model_evaluations
current_results = np.array(all_results)
if bal_new_set_parameters is None:
self.model_evaluations = current_results
current_cp = params_to_run[:i + 1]
else:
self.model_evaluations = np.vstack([
self.model_evaluations, current_results
]) if self.model_evaluations is not None else current_results
current_cp = np.vstack([collocation_points, params_to_run[:i + 1]])
# Save cumulative results after every run
self._save_all_results(current_cp, all_detailed_results)
# Delete the run folder immediately to free disk space
if self.delete_complex_outputs:
self._cleanup_run(case_dir)
except Exception as e:
import traceback
tb = traceback.format_exc()
logger.error(f"Simulation {case_name} failed: {e}")
logger.error(tb)
print(f"\n{'='*60}", flush=True)
print(f"EXCEPTION IN RUN {case_name}: {e}", flush=True)
print(tb, flush=True)
print(f"{'='*60}\n", flush=True)
if self.nloc > 0:
all_results.append([np.nan] * (self.nloc * len(self.calibration_quantities)))
for pt_idx, cp in enumerate(self.control_points):
row = {
"run_idx": run_idx,
**{p: float(v) for p, v in zip(self.calibration_parameters, params)},
"x": float(cp[0]), "y": float(cp[1]), "z": float(cp[2]),
"U_x": np.nan, "U_y": np.nan, "U_z": np.nan,
"U_magnitude": np.nan,
"u_fluct": np.nan, "v_fluct": np.nan, "w_fluct": np.nan,
"TKE": np.nan,
}
all_detailed_results.append(row)
# model_evaluations is already up to date from the per-run updates above.
# Final save to ensure all results are on disk after the full batch.
# Skip if nloc=0 (no measurements file yet) raw fields were saved per-run instead.
if self.nloc > 0:
if bal_new_set_parameters is not None:
all_cp = np.vstack([collocation_points, params_to_run])
else:
all_cp = params_to_run
self._save_all_results(all_cp, all_detailed_results)
def _extract_at_control_points(self, coords, U, k=None):
"""Extract velocity and TKE at control points.
TKE is read directly from the OpenFOAM k field (RANS turbulent kinetic energy).
Args:
coords: Array of VTK point coordinates (water phase only)
U: Velocity array at VTK points, shape (n_points, 3)
k: RANS turbulent kinetic energy array from OpenFOAM k field, or None
Returns:
dict with U_x, U_y, U_z, U_magnitude, u_fluct, v_fluct, w_fluct, TKE at control points
"""
if len(self.control_points) == 0:
return {
"U_x": np.array([]), "U_y": np.array([]), "U_z": np.array([]),
"U_magnitude": np.array([]),
"u_fluct": np.array([]), "v_fluct": np.array([]), "w_fluct": np.array([]),
"TKE": np.array([]),
}
tree = spatial.cKDTree(coords)
_, indices = tree.query(self.control_points)
U_cp = U[indices]
# TKE read directly from OpenFOAM k field
# Isotropic fluctuation components estimated as sqrt(2k/3) for reporting only
if k is not None:
k_cp = k[indices]
fluct = np.sqrt(np.maximum(2.0 / 3.0 * k_cp, 0.0))
else:
k_cp = np.full(len(self.control_points), np.nan)
fluct = np.full(len(self.control_points), np.nan)
return {
"U_x": U_cp[:, 0],
"U_y": U_cp[:, 1],
"U_z": U_cp[:, 2],
"U_magnitude": np.linalg.norm(U_cp, axis=1),
"u_fluct": fluct,
"v_fluct": fluct,
"w_fluct": fluct,
"TKE": k_cp, # k from OpenFOAM k-epsilon field
}
def _save_run_results(self, case_dir, run_idx, params, results):
"""Save individual run results."""
output = {
"run_index": int(run_idx),
"parameters": {k: float(v) for k, v in zip(self.calibration_parameters, params)},
"results": {k: v.tolist() for k, v in results.items()}
}
with open(os.path.join(case_dir, f"{self.dict_output_name}.json"), 'w') as f:
json.dump(output, f, indent=2)
def _save_all_results(self, collocation_points, detailed_results=None):
"""Save all results to calibration folder.
Saves:
- collocation_points.npy: calibration parameter values tested, shape (n_runs, n_params)
- model_evaluations.npy: flat model outputs, shape (n_runs, nloc * n_quantities)
- initial-model-outputs.json: same data as JSON
- collocation-points-{quantities}.csv: calibration parameter values as CSV
- results-detailed-{quantities}.csv: comprehensive table with one row per
run x control point, including coordinates, U, fluctuations and TKE
- results-detailed-{quantities}.npy: same data as structured numpy array
"""
np.save(os.path.join(self.calibration_folder, "collocation_points.npy"), collocation_points)
np.save(os.path.join(self.calibration_folder, "model_evaluations.npy"), self.model_evaluations)
output = {
"collocation_points": collocation_points.tolist(),
"model_evaluations": self.model_evaluations.tolist(),
"calibration_parameters": self.calibration_parameters,
"calibration_quantities": self.calibration_quantities,
"n_runs": int(collocation_points.shape[0])
}
with open(os.path.join(self.calibration_folder, "initial-model-outputs.json"), 'w') as f:
json.dump(output, f, indent=2)
# Also save to restart_data_folder so only_bal_mode can find it
with open(os.path.join(self.restart_data_folder, "initial-model-outputs.json"), 'w') as f:
json.dump(output, f, indent=2)
logger.info(f"Saved initial-model-outputs.json to restart_data folder for BAL restart.")
# Collocation points CSV (calibration parameter values)
quantities_str = '_'.join(self.calibration_quantities)
csv_path = os.path.join(self.calibration_folder, f"collocation-points-{quantities_str}.csv")
with open(csv_path, mode='w', newline='') as f:
writer = csv.writer(f)
writer.writerow(self.calibration_parameters)
writer.writerows(collocation_points.tolist())
logger.info(f"Saved collocation points CSV to {csv_path}")
# Comprehensive results CSV and npy (run x control point rows)
if detailed_results:
detailed_csv_path = os.path.join(
self.calibration_folder, f"results-detailed-{quantities_str}.csv"
)
fieldnames = list(detailed_results[0].keys())
# Append if file exists, write fresh if not
file_exists = os.path.isfile(detailed_csv_path)
with open(detailed_csv_path, mode='a', newline='') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
if not file_exists:
writer.writeheader()
writer.writerows(detailed_results)
logger.info(f"Saved detailed results CSV to {detailed_csv_path}")
# Also save as npy structured array
dtype = [(k, 'f8') for k in fieldnames]
arr = np.array([tuple(r[k] for k in fieldnames) for r in detailed_results], dtype=dtype)
npy_path = os.path.join(
self.calibration_folder, f"results-detailed-{quantities_str}.npy"
)
np.save(npy_path, arr)
logger.info(f"Saved detailed results npy to {npy_path}")
[docs]
def save_calibration_data(self, it, collocation_points, bayesian_dict):
"""Write per-iteration CSV files to ``calibration-data/<quantities>/``.
Called once per BAL iteration from ``bal_openfoam.py`` after
``estimate_bme()``. Produces three files per iteration::
collocation_points_N{n_tp}.csv parameter values tested so far
model_results_N{n_tp}.csv simulation outputs (model_evaluations)
bayesian_scores.csv BME, RE, IE, ELPD for all iterations
``bayesian_scores.csv`` is appended on each call (one row per iteration).
The posterior is saved as a separate ``.npy`` file because it is a
variable-length array (rejection sampling keeps only accepted samples).
"""
n_tp = int(collocation_points.shape[0])
folder = self.calibration_folder
# 1. Collocation points CSV
cp_path = os.path.join(folder, f"collocation_points_N{n_tp:03d}.csv")
cp_df = pd.DataFrame(collocation_points, columns=self.calibration_parameters)
cp_df.index.name = "run_idx"
cp_df.to_csv(cp_path)
# 2. Model evaluations CSV
quantities_str = '_'.join(self.calibration_quantities)
col_names = [
f"{qty}_z{i}"
for i in range(self.nloc)
for qty in self.calibration_quantities
]
if self.model_evaluations is not None:
me_path = os.path.join(folder, f"model_results_N{n_tp:03d}.csv")
me_df = pd.DataFrame(self.model_evaluations, columns=col_names)
me_df.index.name = "run_idx"
me_df.to_csv(me_path)
# 3. Posterior npy (variable size - one file per iteration)
posterior = bayesian_dict['posterior'][it]
if posterior is not None and len(posterior) > 0:
post_path = os.path.join(folder, f"posterior_N{n_tp:03d}.npy")
np.save(post_path, posterior)
# 4. Bayesian scores CSV (one growing file, appended each call)
scores_path = os.path.join(folder, "bayesian_scores.csv")
scores_row = {
"iteration": it,
"N_tp": n_tp,
"BME": bayesian_dict['BME'][it],
"RE": bayesian_dict['RE'][it],
"IE": bayesian_dict['IE'][it],
"ELPD": bayesian_dict['ELPD'][it],
"post_size": int(bayesian_dict['post_size'][it]),
}
scores_df = pd.DataFrame([scores_row])
write_header = not os.path.isfile(scores_path)
scores_df.to_csv(scores_path, mode='a', header=write_header, index=False)
logger.info(
f"Saved calibration-data for iteration {it} "
f"(N_tp={n_tp}, BME={bayesian_dict['BME'][it]:.4e}, "
f"RE={bayesian_dict['RE'][it]:.4f})"
)
def _cleanup_run(self, case_dir):
"""Delete the entire run folder to free disk space.
All results have already been saved to:
- model_evaluations.npy (GP training data)
- collocation_points.npy
- results-detailed-*.csv / .npy
- initial-model-outputs.json
so the raw OpenFOAM output is no longer needed.
"""
if os.path.isdir(case_dir):
shutil.rmtree(case_dir)
logger.info(f"Deleted run folder to free disk space: {case_dir}")
[docs]
def output_processing(self, output_data_path=None, **kwargs):
"""Load existing results for BAL restart."""
if output_data_path and os.path.isfile(output_data_path):
with open(output_data_path, 'r') as f:
data = json.load(f)
self.restart_collocation_points = np.array(data["collocation_points"])
self.model_evaluations = np.array(data["model_evaluations"])
return self.model_evaluations
raise FileNotFoundError(f"Output data not found: {output_data_path}")