Bolometrically Correct DSEP Evolutionary Tracks

This script provides utility to scan a directory, build a queryable in memory datastore for that directory, then bolometrically correct those tracks


from pysep.dsep import load_model                                                                                                                                                                                                                                                           
from pysep.io.trk import read_trk                                                                                                                                                                                                                                                           
import os                                                                                                                                                                                                                                                                                   
import pickle                                                                                                                                                                                                                                                                               
 
import matplotlib.pyplot as plt                                                                                                                                                                                                                                                             
import pandas as pd                                                                                                                                                                                                                                                                         
import numpy as np                                                                                                                                                                                                                                                                          
 
from scipy.interpolate import interp1d                                                                                                                                                                                                                                                      
 
from fidanka.bolometric.bctab import BolometricCorrector                                                                                                                                                                                                                                    
 
def scan_simulation_data(root_dir):                                                                                                                                                                                                                                                         
    sim_data = SimulationData()                                                                                                                                                                                                                                                             
    for population_dir in os.listdir(root_dir):                                                                                                                                                                                                                                             
        population_path = os.path.join(root_dir, population_dir)                                                                                                                                                                                                                            
        if not os.path.isdir(population_path):                                                                                                                                                                                                                                              
            continue                                                                                                                                                                                                                                                                        
        for mixing_length_dir in os.listdir(population_path):                                                                                                                                                                                                                               
            mixing_length_path = os.path.join(population_path, mixing_length_dir)                                                                                                                                                                                                           
            if not os.path.isdir(mixing_length_path):                                                                                                                                                                                                                                       
                continue                                                                                                                                                                                                                                                                    
            for mass_dir in os.listdir(mixing_length_path):                                                                                                                                                                                                                                 
                mass_path = os.path.join(mixing_length_path, mass_dir)                                                                                                                                                                                                                      
                if not os.path.isdir(mass_path):                                                                                                                                                                                                                                            
                    continue                                                                                                                                                                                                                                                                
                model_path = os.path.join(mass_path, 'model.dsep')                                                                                                                                                                                                                          
                if os.path.exists(model_path):                                                                                                                                                                                                                                              
                    if 'alpha' in mixing_length_dir: cut = 6                                                                                                                                                                                                                                
                    elif 'FeH' in mixing_length_dir: cut = 4                                                                                                                                                                                                                                
                    mixing_length = float(mixing_length_dir[cut:])                                                                                                                                                                                                                          
                    mass = float(mass_dir[2:])                                                                                                                                                                                                                                              
                    sim_data.add_entry(population_dir,mixing_length, mass, model_path)                                                                                                                                                                                                      
    return sim_data                                                                                                                                                                                                                                                                         
 
# mixing length comes up a lot in this class as I wrote it to scan through a grid                                                                                                                                                                                                           
# which was over mixing length. Here I use it for FeH. So in the class definition                                                                                                                                                                                                           
# where mixing length is refered to that should really be FeH. Note that this does                                                                                                                                                                                                          
# not actually change the internal logic of this class as it is really just a way                                                                                                                                                                                                           
# to query data.
class SimulationData:                                                                                                                                                                                                                                                               [66/173]
    def __init__(self):                                                                                                                                                                                                                                                                     
        self.data = {}  # Dictionary to hold the simulation data structure                                                                                                                                                                                                                  
 
    def add_entry(self, population, mixing_length, mass, path):                                                                                                                                                                                                                             
        """Add an entry to the data structure."""                                                                                                                                                                                                                                           
        if population not in self.data:                                                                                                                                                                                                                                                     
            self.data[population] = {}                                                                                                                                                                                                                                                      
        if mixing_length not in self.data[population]:                                                                                                                                                                                                                                      
            self.data[population][mixing_length] = {}                                                                                                                                                                                                                                       
        self.data[population][mixing_length][mass] = path                                                                                                                                                                                                                                   
 
    def get_path(self, population, mixing_length, mass):                                                                                                                                                                                                                                    
        """Retrieve the path for a given set of parameters."""                                                                                                                                                                                                                              
        return self.data.get(population, {}).get(mixing_length, {}).get(mass)                                                                                                                                                                                                               
 
    def save_to_file(self, filename):                                                                                                                                                                                                                                                       
        """Save the structure to a pickle file."""                                                                                                                                                                                                                                          
        with open(filename, 'wb') as f:                                                                                                                                                                                                                                                     
            pickle.dump(self.data, f)                                                                                                                                                                                                                                                       
 
    def load_from_file(self, filename):                                                                                                                                                                                                                                                     
        """Load the structure from a pickle file."""                                                                                                                                                                                                                                        
        with open(filename, 'rb') as f:                                                                                                                                                                                                                                                     
            self.data = pickle.load(f)                                                                                                                                                                                                                                                      
 
    def query(self, population, mixing_length, mass):                                                                                                                                                                                                                                       
        """Query the closest match for mixing length and mass within the specified population."""                                                                                                                                                                                           
        # Check if the population exists; if not, return None or an error message                                                                                                                                                                                                           
        if population not in self.data:                                                                                                                                                                                                                                                     
            return None, "Population not found"                                                                                                                                                                                                                                             
 
        mixing_length = float(mixing_length)                                                                                                                                                                                                                                                
        mass = float(mass)                                                                                                                                                                                                                                                                  
 
        if population in self.data:                                                                                                                                                                                                                                                         
            closest_mixing_length = self._find_closest(mixing_length,self.data[population].keys())                                                                                                                                                                                          
            closest_mass = self._find_closest(mass, self.data[population][closest_mixing_length].keys())                                                                                                                                                                                    
            path = self.data[population][closest_mixing_length][closest_mass]                                                                                                                                                                                                               
            return path, (population, closest_mixing_length, closest_mass)                                                                                                                                                                                                                  
        else:                                                                                                                                                                                                                                                                               
            return None, "Population not found"                                                                                                                                                                                                                                             
 
    def query_and_load(self, population, mixing_length, mass):                                                                                                                                                                                                                              
        queryResults = self.query(population, mixing_length, mass)                                                                                                                                                                                                                          
        return load_model(queryResults[0]), queryResults                                                                                                                                                                                                                                    
 
    def query_and_load_trk(self, population, val, mass):                                                                                                                                                                                                                                    
        queryResults = self.query(population, val, mass)                                                                                                                                                                                                                                    
        newPath = os.path.dirname(queryResults[0])                                                                                                                                                                                                                                          
        trkFile = filter(lambda x: x.endswith(".track"), os.listdir(newPath))                                                                                                                                                                                                               
        trkFilePath = os.path.join(newPath, next(trkFile))                                                                                                                                                                                                                                  
        return read_trk(trkFilePath), queryResults                                                                                                                                                                                                                                          
 
    def _find_closest(self, value, options):                                                                                                                                                                                                                                                
        """Find the closest numerical match to 'value' in 'options'."""                                                                                                                                                                                                                     
        options = [float(option) for option in options]                                                                                                                                                                                                                                     
        closest_value = min(options, key=lambda x: abs(x - value))                                                                                                                                                                                                                          
        return closest_value                                                                                                                                                                                                                                                                
 
def find_bump_lum(l, age):                                                                                                                                                                                                                                                                  
    dlda = np.gradient(l, age)                                                                                                                                                                                                                                                              
    bumpAge = age[dlda == dlda.min()]                                                                                                                                                                                                                                                       
    lf = interp1d(age, l)                                                                                                                                                                                                                                                                   
    bumpLum = lf(bumpAge)                                                                                                                                                                                                                                                                   
    return bumpLum[0]
 
if __name__ == "__main__":
    if not os.path.exists("./simulationData.OPAL.pkl"):
        models = scan_simulation_data("../data/outputs.feH.OPAL/")
        models.save_to_file("simulationData.OPAL.pkl")
    else:
        models = SimulationData()
        models.load_from_file("simulationData.OPAL.pkl")
 
    # Masses calibrated to place the start of the red giant branch at about 9.1 Gyr for all models
    # the range is between 9.1 and 9.4 Gyr due to the spacing of the mass grid
    # This may be able to be improved with mass interpolation between the tracks
    FeHMasses = {
        -2.4: 0.86,
        -2.2: 0.86,
        -2.0: 0.86,
        -1.8: 0.86,
        -1.6: 0.86,
        -1.4: 0.88,
        -1.2: 0.9,
        -1.0: 0.92,
        -0.8: 0.96
    }
    trks = dict()
    for FeH, mass in FeHMasses.items():
        model, queryResults = models.query_and_load("GS98-afe0", FeH, mass)
        trks[FeH] = (model.trk(), queryResults[1][2])
 
    bumpLoc = list()
    for FeH, trk in trks.items():
        logL = trk[0]['log_L']
        Teff = 10**trk[0]['log_Teff']
        age = trk[0]['AGE']
        cond = age > 4
        bumpL = find_bump_lum(logL[cond], age[cond])
        bumpLoc.append((FeH, bumpL))
 
    # the RGB bump location in Log(L)
    bumpLoc = np.array(bumpLoc)
 
    # Apply bolometric corrections to find it in Mv
    bcs = {feh: BolometricCorrector("ubv(ri)c", feh) for feh in FeHMasses}
    apparentMags = {feh: 
    bcs[feh].apparent_mags(
            10**trk[0]['log_Teff'].values,
            trk[0]['log_g'].values,
            trk[0]['log_L'].values,
            )
        for feh, trk in trks.items()
    }
    dfs = {feh:
            pd.DataFrame(
                {
                    'Vmag': apparentMags[feh]['Bessell_V'].values,
                    'Bmag': apparentMags[feh]['Bessell_B'].values,
                    'log_g': trk[0]['log_g'].values,
                    'Teff': 10**trk[0]['log_Teff'].values,
                    'log_L': trk[0]['log_L'].values,
                    'Age': trk[0]['AGE'].values,
                })
           for feh, trk in trks.items()
           }
    with open("apparentMags.OPAL.pkl", "wb") as f:
        pickle.dump(dfs, f)
 
    print(dfs)