DSEP Automated Testing Suite Documentation

This document describes the DSEP Automated Testing Suite, a script that automatically generates and compares solar, RGB, highmass, and lowmass models using the DSEP executable. The suite checks whether the generated model results fall within an acceptable tolerance of stored standard results. It covers the prerequisites (notably a working pysep installation), the functions provided for setting up the environment, running model simulations, and comparing results, as well as instructions for generating comparison plots.
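
A typical invocation, shown here as a sketch (the script's filename does not appear in this listing, so test_dsep.py is assumed), is run from the directory containing the nml/, opac/, atm/, prems/, and standardoutput/ folders referenced below:

    python test_dsep.py --tol 0.0001 -t s r l h --plot --xy log_Teff log_L --figurePath ./figures

The flags map directly onto the argparse options defined at the bottom of the script; omitting -t runs all four tests, and --plot requires --figurePath.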

"""
DSEP Automated Testing Suite
 
Authors: Thomas M. Boudreaux, Brian Chaboyer
Created: July 2021
Last Modified: July 2021
 
Automated testing suite for DSEP. This script will automatically generate a
solar model, RGB model, highmass model, and lowmass model using the currently
built dsep executable on your computer (@ $DSEPRoot/dsepX/dsep). The results of
these 4 models will be saved and then compared against stored "standard"
results. If the results are within the given tolerance (default=0.0001) the
test will pass; otherwise the test will report as having failed. If a test
fails, the model(s) it failed on will be printed along with the difference
between the expected value and observed value for each column where they
diverge at each model.
 
Note: for this to work you must have pysep installed and working!
 
Functions
---------
    setup_output_env
        Clear output folders if they exist and make a new folder to store
        output in.
    setup_run
        Setup the pysep.dsep.stellarModel object for a particular set of input
        files.
    read_with_warn
        Wrapper around pysep.io.trk.read.read_trk that catches warnings
        associated with the final track record not being written to disk
        properly.
    did_pass
        Check if the results generated from the dsep executable match the saved
        "standard results" to within the tolerance given.
    fail_report
        Print out some information to standard output about how the two track
        files being compared diverge.
    report
        Print out whether or not the two track files matched. If not call
        fail_report to show more information about how they diverged.
    plot
        Generate a plot of a chosen column pair for both the standard and
        observed output and save the figure to disk.
    solar
        Evolve a solar model using the dsep executable on disk.
    FeH_2
        Evolve an RGB model with [Fe/H] = -2.0 using the dsep executable on
        disk.
    highmass
        Evolve a highmass model using the dsep executable on disk.
    lowmass
        Evolve a lowmass model using the dsep executable on disk.
    compSolar
        Compare the results of the solar run from the dsep executable on disk
        to the stored "standard" solar results.
    compFeH_2
        Compare the results of the RGB run from the dsep executable on disk
        to the stored "standard" RGB results.
    compHighmass
        Compare the results of the highmass run from the dsep executable on disk
        to the stored "standard" highmass results.
    compLowmass
        Compare the results of the lowmass run from the dsep executable on disk
        to the stored "standard" lowmass results.
    get_test_bools
        Build a dictionary of booleans defining which tests should be run.
    main
        Run all the dsep runs and comparisons in order automatically (called
        when invoked from the command line).
"""
import pysep.dm.filetypes as ft
 
from pysep.dsep import stellarModel as sm
 
from pysep.io.nml.physics import load as load_pnml
from pysep.io.nml.control import load as load_cnml
 
from pysep.io.trk.read import read_trk
 
import shutil
import os
from os.path import join as pjoin
import argparse
 
from typing import Tuple, Union
import pandas as pd
 
import warnings
 
# ANSI color codes
RED="\u001b[31m"
YELLOW="\u001b[33m"
GREEN="\u001b[32m"
RESET="\u001b[0m"
 
def setup_output_env(outputPath : str):
    """
    Clear output folders if they exist and make a new folder to store
    output in.
 
    Parameters
    ----------
        outputPath : str
            Path to remove if it already exists; a fresh folder is created at
            this path either way.
    """
    if os.path.exists(outputPath):
        shutil.rmtree(outputPath)
    os.mkdir(outputPath)
 
 
def setup_run(
        output : str,
        physics : str,
        control : str,
        fermi : str,
        opacf : str,
        bckur : str,
        bcphx95 : str,
        bcphx96 : str,
        bcphx97 : str,
        bcphx98 : str,
        bcphx99 : str,
        premsf : str
        ) -> sm:
    """
    Setup the pysep.dsep.stellarModel object for a particular set of input
    files.
 
    Parameters
    ----------
        output : str
            Path to save results to.
        physics : str
            Path to physics namelist file.
        control : str
            Path to control namelist file.
        fermi : str
            Path to fermi opacity file.
        opacf : str
            Path to high temperature opacity file.
        bckur : str
            Path to Kurucz model atmosphere boundary conditions table.
        bcphx95 : str
            Path to Phoenix model atmosphere 95 boundary conditions table.
        bcphx96 : str
            Path to Phoenix model atmosphere 96 boundary conditions table.
        bcphx97 : str
            Path to Phoenix model atmosphere 97 boundary conditions table.
        bcphx98 : str
            Path to Phoenix model atmosphere 98 boundary conditions table.
        bcphx99 : str
            Path to Phoenix model atmosphere 99 boundary conditions table.
        premsf : str
            Path to pre-main-sequence model to use.
 
    Returns
    -------
        model : pysep.dsep.stellarModel
            Fully configured but not yet evolved stellar model.
    """
    physics = load_pnml(physics)
    control = load_cnml(control)
 
    fermi = ft.fermi_file(fermi)
    opacf = ft.opacf_file(opacf)
 
    bckur = ft.bckur_file(bckur)
 
    bcphx95 = ft.bcphx95_file(bcphx95)
    bcphx96 = ft.bcphx96_file(bcphx96)
    bcphx97 = ft.bcphx97_file(bcphx97)
    bcphx98 = ft.bcphx98_file(bcphx98)
    bcphx99 = ft.bcphx99_file(bcphx99)
 
    premsf = ft.premsf_file(premsf)
 
    model = sm(output, control, physics, opacf, premsf, fermi=fermi,
               bckur=bckur, bcphx95=bcphx95, bcphx96=bcphx96,
               bcphx97=bcphx97, bcphx98=bcphx98, bcphx99=bcphx99)
    return model
 
def read_with_warn(path : str) -> Tuple[pd.DataFrame, bool]:
    """
    Read in a track file, accepting that the final record may be corrupted.
    If it is, set a flag to ignore the final record when calculating
    residuals.
 
    Parameters
    ----------
        path : str
            Path to track file to read in
 
    Returns
    -------
        read : pd.DataFrame
            DataFrame of the track file at path.
        ignoreFinalRecord : bool
            True if the final record of the read in track file is corrupted,
            False otherwise.
 
    Raises
    ------
        RuntimeError
            If warning does not match known warning for corrupted final record.
    """
    ignoreFinalRecord = False
    with warnings.catch_warnings(record=True) as w:
        warnings.simplefilter("always")
        read,_ = read_trk(path)
        assert len(w) == 0 or len(w) == 1
        if len(w) == 1:
            try:
                assert "Track file is non rectangular" in str(w[0].message)
                print(f"{YELLOW}Read Error with final record in track file at "
                      f"{path}{RESET}")
                ignoreFinalRecord = True
            except AssertionError:
                raise RuntimeError(f"Error! Unrecognized warning thrown! "
                                   f"{w[0].message}")
    return read, ignoreFinalRecord
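
# A minimal usage sketch: if the final track record was cut off mid-write,
# read_with_warn prints the yellow warning above and returns
# ignoreFinalRecord=True so the caller can drop that row (the path here is
# where the solar test writes its track):
#
#   trk, ignoreFinal = read_with_warn("./output/solar/m100.GS98.track")
#   if ignoreFinal:
#       trk = trk.iloc[:-1]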
 
 
def did_pass(
        expf : str,
        obsf : str,
        outputDir : str,
        tol : float
        ) -> Tuple[bool, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Check if the results generated from the dsep executable match the saved
    "standard results" to within the tolerance given.
 
    Parameters
    ----------
        expf : str
            File name of track file with expected output. File must be in the
            ./standardoutput directory.
        obsf : str
            File name of the track file with the observed output.
        outputDir : str
            Directory containing obsf.
        tol : float
            Tolerance to compare expected and observed to within.
 
    Returns
    -------
        didPass : bool
            True if the element-wise difference between the observed and
            expected dataframes from the track files is less than the
            tolerance for every entry in every row, False otherwise.
        observed : pd.DataFrame
            observed track dataframe
        expected : pd.DataFrame
            expected track dataframe
        residuals : pd.DataFrame
            observed track dataframe - expected track dataframe
    """
    expected, eIgnoreFinalRecord = read_with_warn(pjoin("./standardoutput",
                                                        expf))
    observed, oIgnoreFinalRecord = read_with_warn(pjoin(outputDir, obsf))
 
    if eIgnoreFinalRecord or oIgnoreFinalRecord:
        print(f"{YELLOW}Ignoring final record when calculating residuals"
              f"{RESET}")
        expected = expected.iloc[:-1]
        observed = observed.iloc[:-1]
    if abs(expected.shape[0] - observed.shape[0]) > 1:
        print(f"{YELLOW}Shapes of track files do not match!{RESET}")
    residuals = observed-expected
    # a model diverges if |observed - expected| exceeds the tolerance anywhere
    didPass = not (residuals.abs() > tol).any().any()
    return didPass, observed, expected, residuals
 
def fail_report(
        residuals : pd.DataFrame,
        tol : float,
        name : str = "test.flog"
        ):
    """
    Print out some information to standard output about how the two track
    files being compared diverge.
 
    Parameters
    ----------
        residuals : pd.DataFrame
            observed track dataframe - expected track dataframe
        tol : float
            Tolerance to compare against.
        name : str, default = test.flog
            Name of log file to write to.
    """
    numericResiduals = residuals.select_dtypes(include=['number'])
    badRecords = residuals[(numericResiduals != 0.0).any(axis=1)]
    print(f"{RED}Writing fail report to {name}{RESET}")
    with open(name, 'w') as f:
        for modelN, badRecord in badRecords.iterrows():
            for column in residuals:
                if abs(badRecord[column]) >= tol:
                    f.write(f"In model # {modelN+1} entry {column} diverges "
                            f"from expected value by {badRecord[column]}.\n")
 
def report(
        testOkay : bool,
        residuals : pd.DataFrame,
        testName : str,
        tol : float
        ):
    """
    Print out whether or not the two track files matched. If not call
    fail_report to show more information about how they diverged.
 
    Parameters
    ----------
        testOkay : bool
            Boolean flag defining whether the comparison showed that the two
            track files were consistent (True) or inconsistent (False).
        residuals : pd.DataFrame
            observed track dataframe - expected track dataframe
        testName : str
            Test name label so that the user can easily see which test the
            report is for.
        tol : float
            Tolerance to compare against.
    """
    if testOkay:
        print(f"{GREEN}{testName} Test Passed!{RESET}")
    else:
        print(f"{RED}{testName} Test Failed!{RESET}")
        fail_report(residuals, tol, name=f"{testName}.flog")
 
def plot(
        trkO : pd.DataFrame,
        trkC : pd.DataFrame,
        res : pd.DataFrame,
        x : str,
        y : str,
        path : str
        ):
    """
    Generate a plot of x vs y for both the standard and observed output. Save
    that to path.
 
    Parameters
    ----------
        trkO : pd.DataFrame
            Observed track file (the file generated by the test suite)
        trkC : pd.DataFrame
            Calculated track file (the file from standardOutput)
        res : pd.DataFrame
            Residuals between trkO and trkC (trkO - trkC)
        x : str
            Column name to plot on the x axis from trkO, trkC, and res
        y : str
            Column name to plot on the y axis from trkO, trkC, and res
        path : str
            Path to save the resultant figure to.
    """
    import matplotlib.pyplot as plt
 
    fig, axs = plt.subplots(2, 1, figsize=(10, 7))
    axs[0].set_title('Observed and Calculated')
    axs[1].set_title('Residuals')
 
    axs[1].set_xlabel(x)
    axs[0].set_ylabel(y)
    axs[1].set_ylabel(f"{y}$_O$ - {x}$_C$")
 
    axs[0].plot(trkO[x].values,
                trkO[y].values,
                color='black',
                linestyle='solid',
                label="Observed")
    axs[0].plot(trkC[x].values,
                trkC[y].values,
                color='grey',
                linestyle='dashed',
                label="Calculated")
 
    axs[1].plot(res[x].values,
                res[y].values,
                color='black',
                linestyle='dashed',
                label='Residuals')
 
    axs[0].legend(frameon=False)
 
    fig.savefig(path, bbox_inches="tight")
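
# A minimal usage sketch: the comp* functions below return the
# (observed, expected, residuals) tuple that plot expects, so a single figure
# can be produced with, e.g.
#
#   obs, exp, res = compSolar(True)
#   plot(obs, exp, res, "log_Teff", "log_L", "./solar-log_Teffvs.log_L.pdf")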
 
 
def solar(run : bool, output : str='./output/solar'):
    """
    Evolve a solar model using the dsep executable on disk. Pysep model object
    will also be stashed to disk.
 
    Parameters
    ----------
        run : bool
            Boolean flag, if true test will run, otherwise it will skip
        output : str, default=./output/solar
            Path to store output from model evolution.
    """
    if run:
        model = setup_run(output,
                          "./nml/phys1.nml",
                          "./nml/solar.nml",
                          "./opac/FERMI.TAB",
                          "./opac/GS98hz",
                          "./atm/atmk1990p00.tab",
                          "./atm/z_p0d0.afe_p0d0.dat",
                          "./atm/z_m0d5.afe_p0d0.dat",
                          "./atm/z_m0d7.afe_p0d0.dat",
                          "./atm/z_m1d0.afe_p0d0.dat",
                          "./atm/z_m1d5.afe_p0d0.dat",
                          "./prems/m100.GS98")
        setup_output_env(output)
        model.evolve()
        model.stash()
 
def FeH_2(run : bool, output : str='./output/feh-2.0'):
    """
    Evolve an RGB model using the dsep executable on disk. Pysep model object
    will also be stashed to disk.
 
    Parameters
    ----------
        run : bool
            Boolean flag, if true test will run, otherwise it will skip
        output : str, default=./output/feh-2.0
            Path to store output from model evolution.
    """
    if run:
        model = setup_run(output,
                          "./nml/phys.rgb.nml",
                          "./nml/control.rgb.nml",
                          "./opac/FERMI.TAB",
                          "./opac/GS98hz",
                          "./atm/atmk1990p00.tab",
                          "./atm/z_m1d5.afe_p0d0.dat",
                          "./atm/z_m2d0.afe_p0d0.dat",
                          "./atm/z_m2d5.afe_p0d0.dat",
                          "./atm/z_m3d0.afe_p0d0.dat",
                          "./atm/z_m3d5.afe_p0d0.dat",
                          "./prems/m080.m2d0")
        setup_output_env(output)
        model.evolve()
        model.stash()
 
def highmass(run : bool, output : str='./output/highmass'):
    """
    Evolve a highmass model using the dsep executable on disk. Pysep model
    object will also be stashed to disk.
 
    Parameters
    ----------
        run : bool
            Boolean flag, if true test will run, otherwise it will skip
        output : str, default=./output/highmass
            Path to store output from model evolution.
    """
    if run:
        model = setup_run(output,
                          "./nml/phys1.high.nml",
                          "./nml/control.high.nml",
                          "./opac/FERMI.TAB",
                          "./opac/GS98hz",
                          "./atm/atmk1990p00.tab",
                          "./atm/z_p0d0.afe_p0d0.dat",
                          "./atm/z_m0d5.afe_p0d0.dat",
                          "./atm/z_m0d7.afe_p0d0.dat",
                          "./atm/z_m1d0.afe_p0d0.dat",
                          "./atm/z_m1d5.afe_p0d0.dat",
                          "./prems/m280.GS98")
        setup_output_env(output)
        model.evolve()
        model.stash()
 
def lowmass(run : bool, output : str='./output/lowmass'):
    """
    Evolve a lowmass model using the dsep executable on disk. Pysep model object
    will also be stashed to disk.
 
    Parameters
    ----------
        run : bool
            Boolean flag, if true test will run, otherwise it will skip
        output : str, default=./output/lowmass
            Path to store output from model evolution.
    """
    if run:
        model = setup_run(output,
                          "./nml/phys1.low.nml",
                          "./nml/control.low.nml",
                          "./opac/FERMI.TAB",
                          "./opac/GS98hz",
                          "./atm/atmk1990p00.tab",
                          "./atm/z_p0d0.afe_p0d0.dat",
                          "./atm/z_m0d5.afe_p0d0.dat",
                          "./atm/z_m0d7.afe_p0d0.dat",
                          "./atm/z_m1d0.afe_p0d0.dat",
                          "./atm/z_m1d5.afe_p0d0.dat",
                          "./prems/m030.GS98")
        setup_output_env(output)
        model.evolve()
        model.stash()
 
def compSolar(
        run : bool,
        output : str="./output/solar",
        tol : float=0.0001
        ) -> Union[Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame], None]:
    """
    Compare the results of the solar run from the dsep executable on disk
    to the stored "standard" solar results.
 
    Parameters
    ----------
        run : bool
            Boolean flag, if true test will run, otherwise it will skip
        output : str, default=./output/solar
            Path where output from model evolution is stored.
        tol : float, default=0.0001
            Tolerance to compare observed and expected models to within.
    Returns
    -------
        observed : pd.DataFrame
            observed track dataframe
        expected : pd.DataFrame
            expected track dataframe
        residuals : pd.DataFrame
            observed track dataframe - expected track dataframe
    """
    if run:
 
        print("=======================")
        print("Testing Solar")
        testOkay, observed, expected, residuals = did_pass("m100solar.track",
                                                           "m100.GS98.track",
                                                           output,
                                                           tol)
        report(testOkay, residuals, "Solar", tol)
        return observed, expected, residuals
 
def compFeH_2(
        run : bool,
        output : str="./output/feh-2.0",
        tol : float=0.0001
        ) -> Union[Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame], None]:
    """
    Compare the results of the RGB run from the dsep executable on disk
    to the stored "standard" RGB results.
 
    Parameters
    ----------
        run : bool
            Boolean flag, if true test will run, otherwise it will skip
        output : str, default=./output/feh-2.0
            Path where output from model evolution is stored.
        tol : float, default=0.0001
            Tolerance to compare observed and expected models to within.
    Returns
    -------
        observed : pd.DataFrame
            observed track dataframe
        expected : pd.DataFrame
            expected track dataframe
        residuals : pd.DataFrame
            observed track dataframe - expected track dataframe
    """
    if run:
        print("=======================")
        print("Testing RGB")
        testOkay, observed, expected, residuals = did_pass("m078feh-2.0.track",
                                                           "m080.m2d0.track",
                                                           output,
                                                           tol)
        report(testOkay, residuals, "RGB", tol)
        return observed, expected, residuals
 
def compHighmass(
        run : bool,
        output : str="./output/highmass",
        tol : float=0.0001
        ) -> Union[Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame], None]:
    """
    Compare the results of the highmass run from the dsep executable on disk
    to the stored "standard" highmass results.
 
    Parameters
    ----------
        run : bool
            Boolean flag, if true test will run, otherwise it will skip
        output : str, default=./output/highmass
            Path where output from model evolution is stored.
        tol : float, default=0.0001
            Tolerance to compare observed and expected models to within.
    Returns
    -------
        observed : pd.DataFrame
            observed track dataframe
        expected : pd.DataFrame
            expected track dataframe
        residuals : pd.DataFrame
            observed track dataframe - expected track dataframe
    """
    if run:
        print("=======================")
        print("Testing Highmass")
        testOkay, observed, expected, residuals = did_pass("m280solar.track",
                                                           "m280.GS98.track",
                                                           output,
                                                           tol)
        report(testOkay, residuals, "Highmass", tol)
        return observed, expected, residuals
 
def compLowmass(
        run : bool,
        output : str="./output/lowmass",
        tol : float=0.0001
        ) -> Union[Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame], None]:
    """
    Compare the results of the lowmass run from the dsep executable on disk
    to the stored "standard" lowmass results.
 
    Parameters
    ----------
        run : bool
            Boolean flag, if true test will run, otherwise it will skip
        output : str, default=./output/lowmass
            Path where output from model evolution is stored.
        tol : float, default=0.0001
            Tolerance to compare observed and expected models to within.
    Returns
    -------
        observed : pd.DataFrame
            observed track dataframe
        expected : pd.DataFrame
            expected track dataframe
        residuals : pd.DataFrame
            observed track dataframe - expected track dataframe
    """
    if run:
        print("=======================")
        print("Testing Lowmass")
        testOkay, observed, expected, residuals = did_pass("m030solar.track",
                                                           "m030.GS98.track",
                                                           output,
                                                           tol)
        report(testOkay, residuals, "Lowmass", tol)
        return observed, expected, residuals
 
def get_test_bools(tests : list) -> dict:
    """
    Get a dictionary of bools defining if each test should be run
 
    Parameters
    ----------
        tests : list
            List of strings, each string in the list is a test that should be
            run
 
    Returns
    -------
        testDict : dict
            Dictionary with s, h, l, and r keys; each value is True if the
            same key appeared in tests and False otherwise.
    """
    target = ['s', 'h', 'l', 'r']
    testDict = {key: key in tests for key in target}
    return testDict
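
# A quick worked example of the mapping above:
#   get_test_bools(["s", "l"]) -> {'s': True, 'h': False, 'l': True, 'r': False}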
 
def main(
        noGen : bool,
        tol : float,
        testDict : dict,
        doPlot : bool,
        xy : list,
        figurePath : str
        ):
    """
    Run all the dsep runs and comparisons in order automatically (called
    when invoked from the command line)
 
    Parameters
    ----------
        noGen : bool
            Flag controlling whether new track files will be generated or
            whether the script will assume there are already track files in
            the output directory. Can be helpful for debugging this script,
            but in general should be left as False.
        tol : float
            Tolerance to compare observed and expected track files to within.
            This is needed because dsep is not necessarily bit-for-bit
            consistent across different computers / architectures / compilers.
            Rather, it is consistent in its output values down to some user
            defined tolerance. Whatever tolerance you run dsep with should be
            the same tolerance used here.
        testDict : dict
            Dictionary of which tests to run; each test has a string key which
            is True if the test should be run and False otherwise.
        doPlot : bool
            Generate plots. This will also import all required modules.
        xy : list of lists of strings
            List of size n where each element is a list of size 2. Each inner
            list should have the variable to plot on the x axis as its first
            element and the variable to plot on the y axis as its second
            element. By the end of the function's execution (and if doPlot is
            True) one figure per (x, y) pair will be saved to disk for each
            test.
        figurePath : str
            Directory to save figures to.
 
    """
    if not noGen:
        solar(testDict['s'])
        FeH_2(testDict['r'])
        lowmass(testDict['l'])
        highmass(testDict['h'])
 
    solarResults = compSolar(testDict['s'], tol=tol)
    RGBResults = compFeH_2(testDict['r'], tol=tol)
    lowmassResults = compLowmass(testDict['l'], tol=tol)
    highmassResults = compHighmass(testDict['h'], tol=tol)
 
    if doPlot:
        named = {"solar": solarResults, "RGB": RGBResults,
                 "lowmass": lowmassResults, "highmass": highmassResults}
        for x, y in xy:
            # skipped tests return None, so only plot the runs that were made
            for label, results in named.items():
                if results is not None:
                    plot(*results, x, y,
                         pjoin(figurePath, f"{label}-{x}vs.{y}.pdf"))
 
 
 
 
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Automatic tests for DSEP")
    parser.add_argument("--noGen", help="Do not regenerate output, just use "
                        "what is already stored in the output directory to run "
                        "just the comparisions", action='store_true',
                        default=False)
    parser.add_argument("--tol", help="Tolerance to check observerd to "
                        "expected output to within", type=float,
                        default=0.0001)
    parser.add_argument("-t", "--tests", default=["s", "r", "l", "h"],
                        help="Which tests to run, s for solar, r for rgb, l "
                        "for low mass, and h for highmass", nargs='*')
    parser.add_argument("-p", "--plot", action="store_true", help="Generate "
                        "plots for visual comparision")
    parser.add_argument("--xy", action="append", nargs=2,
                        help="sets of x and y axes to plot")
    parser.add_argument("--figurePath", help="path to save figures too",
                        type=str)
 
    args = parser.parse_args()
    if args.plot and not args.xy:
        args.xy = [["log_Teff", "log_L"]]
    if args.plot and not args.figurePath:
        parser.error("Error! --plot requires --figurePath to be set!")
 
    if len(args.tests) > 4:
        raise RuntimeError("Error! Only 4 tests are defined")
    if any(map(lambda x: not(x in ["s", "r", "l", "h"]), args.tests)):
        raise RuntimeError("Error! Only tests solar (s), RGB (r), Lowmass (l), "
                           "and Highmass (h) are defined")
    testDict = get_test_bools(args.tests)
    main(args.noGen, args.tol, testDict, args.plot, args.xy, args.figurePath)
    if os.path.exists("fort.23"):
        os.remove("fort.23")
    if os.path.exists("fort.33"):
        os.remove("fort.33")