Astrophysical Photometry Analysis with Python

photometry.py

This Python script, photometry.py, appears to be designed for advanced photometric analysis in astrophysics, specifically tailored for analyzing data from the Hubble Space Telescope's Ultraviolet Legacy Library of Young Stars as Essential Standards (HUGS). It includes functions for reading, cleaning, and processing HUGS data; extracting features such as the main sequence, red giant branch, and pseudo-color ridgelines from color-magnitude diagrams (CMDs); and identifying stellar populations using machine learning techniques with HDBSCAN. The script applies various astrophysical data analysis techniques including sigma clipping, curve fitting, gradient calculation, and CMD verticalization, indicating a comprehensive tool for astrophysical data exploration and analysis.

import re
import os
import pathlib
import numpy as np
import pandas as pd
from tqdm import tqdm
 
from scipy.signal import savgol_filter
from scipy.optimize import minimize, newton, curve_fit
from scipy.interpolate import interp1d, UnivariateSpline
from scipy.integrate import quad
 
from astropy.convolution import convolve, Box1DKernel, Gaussian1DKernel
from astropy.stats import freedman_bin_width
from astropy.stats import sigma_clip
 
from itertools import permutations
 
import hdbscan
 
def read_HUGS_data(path):
    colNames = ["X","Y",
                "F275W","F275W_RMS","F275W_QFP","F275W_SP","F275W_NI","F275W_NG",
                "F336W","F336W_RMS","F336W_QFP","F336W_SP","F336W_NI","F336W_NG",
                "F438W","F438W_RMS","F438W_QFP","F438W_SP","F438W_NI","F438W_NG",
                "F606W","F606W_RMS","F606W_QFP","F606W_SP","F606W_NI","F606W_NG",
                "F814W","F814W_RMS","F814W_QFP","F814W_SP","F814W_NI","F814W_NG",
                "MP", "RA", "Dec", "ID", "Itr"
               ]
    df = pd.read_csv(path, names=colNames,comment="#",sep=r'\s+').set_index("ID")
    filters = ["F275W", "F336W", "F438W", "F606W", "F814W"]
    for f in filters:
        df = df[abs(df[f]) != 99.9999]
    names = set()
    for C in permutations(filters):
        name = f"C_{C[0][1:-1]}_{C[1][1:-1]}_{C[2][1:-1]}"
        preLen = len(names)
        names.add(name)
        postLen = len(names)
        if preLen != postLen:
            df[name] = (df[C[0]] - df[C[1]]) - (df[C[1]] - df[C[2]])
    return df
 
 
def clean_HUGS_data(df, filters, binNum=12, removeGiantBranch=False):
    """
    Clean HUGS data based on the procedudure laied out in section 3 of 10.1093/mnras/sty2515
 
    Parameters
    ----------
        df : pd.DataFrame
            Pandas dataframe with HUGS data, keys must be in the form of filterName, filterName_QFP, filterName_RMS, filterName_SP
            so that for the F606W filter for example you would have the keys: F606W, F606W_QFP, F606W_RMS, and F606W_SP
        filters : list[str]
            List of filter names to clean based on. All filters will be cleaned simultaniously so that the final
            resultant dataframe only retains targets which pass all quality checks in all requested filters.
        binNum : int, default=12
            Number of magnitude bins to break filter into. Default is 12 to match the filtering done in
            10.1093/mnras/sty2515
        removeGiantBranch : bool, default=True
            If True filter the giant branch out of the data
    Returns
    -------
        pd.DataFrame
            clenaed dataframe with only targets that match the quality parameter an all filters
 
    Examples
    --------
        Say there is some loaded HUGS data in the varaiable meth1DF, then to filter based on 606 and 814
 
        >>> cleaned = clean_HUGS_data(meth1DF, ["F606W", "F814W"])
 
        if you just want to clean based on one filter then that still needs to be in a list
 
        >>> cleaned = clean_HUGS_data(meth1DF, ["F606W"])
    """
    line = lambda x, m, b: m*x + b
    above = lambda x, y, f: y >= f(x) 
    below = lambda x, y, f: y < f(x)
    cut = np.ones(shape=df.shape[0],dtype=bool)
    for fID, filterName in enumerate(filters):
        X = df[filterName]
        YQFP = df[f"{filterName}_QFP"]
        YRMS = df[f"{filterName}_RMS"]
        RDAX = df[f"{filterName}_SP"]
 
        binLeft = np.linspace(X.min(),X.max(),binNum)
        meanBinSize = np.mean(binLeft[1:]-binLeft[:-1])
        binRight = binLeft+meanBinSize
 
        bounds = np.zeros(shape=(3,binLeft.shape[0]))
        for binID, (left, right) in enumerate(zip(binLeft, binRight)):
            binCut = (X >= left) & (X < right)
            subDataX = X[binCut]
            subDataYQFP = YQFP[binCut]
            subDataYRMS = YRMS[binCut]
            subData = np.vstack((subDataX,subDataYQFP,subDataYRMS))
            clippedQFP = sigma_clip(subDataYQFP,sigma=3.5,maxiters=5)
            clippedRMS = sigma_clip(subDataYRMS,sigma=3.5,maxiters=5)
 
            clippedQFPX = subDataX[~clippedQFP.mask]
            clippedQFPY = subDataYQFP[~clippedQFP.mask]
 
            clippedRMSX = subDataX[~clippedRMS.mask]
            clippedRMSY = subDataYRMS[~clippedRMS.mask]
 
            sigmaMeanQFP = np.mean(clippedQFPY)
            sigmaMeanRMS = np.mean(clippedRMSY)
 
            sigmaQFP = np.std(clippedQFPY)
            sigmaRMS = np.std(clippedRMSY)
 
            lowerBoundQFP = sigmaMeanQFP-3.5*sigmaQFP
            lowerBoundRMS = sigmaMeanRMS+3.5*sigmaRMS
 
            bounds[0,binID] = (left+right)/2
            bounds[1,binID] = lowerBoundQFP
            bounds[2,binID] = lowerBoundRMS
 
        interpFQFP = interp1d(bounds[0], bounds[1],bounds_error=False,fill_value='extrapolate')
        interpFRMS = interp1d(bounds[0], bounds[2],bounds_error=False,fill_value='extrapolate')
 
        cutQFP = above(X.values,YQFP.values,interpFQFP)
        cutRMS = below(X.values,YRMS.values,interpFRMS)
 
        cutRDX = (RDAX.values > -0.15) & (RDAX.values < 0.15)
 
        cut &= cutQFP & cutRMS & cutRDX
        if removeGiantBranch:
            color = df["F275W"] - df["F814W"]
            mag = df["F814W"]
            giantBranchCut = above(color,mag,lambda x: line(x, -1.5, 20))
            cut &= giantBranchCut
    return df[cut]
 
def load_HUGS(root):
    methods = pathlib.Path(root).rglob("*.txt")
    methods = filter(lambda x: not(str(os.path.basename(x)).startswith('.')), methods)
    methods = list(filter(lambda x: 'catalog-meth' in str(x), methods))
    methodNums = [int(re.findall(r'meth(\d)', str(path))[0]) for path in methods]
 
    out = dict()
    for methID, methPath in zip(methodNums, methods):
        df = read_HUGS_data(methPath)
        out[methID] = clean_HUGS_data(df, ["F275W", "F336W", "F438W", "F814W"], removeGiantBranch=True)
    return out
 
def ridge_bounding(a1,a2,binsLeft,histBins=50,smooth=True,boxCarWidth=3):
    gaus = lambda x, a, b, c: a*np.exp(-(x-b)**2/(2*c**2))
    binSize = binsLeft[1] - binsLeft[0]
    binsRight = binsLeft + binSize
    ridgeLine = np.zeros(shape=(4,binsLeft.shape[0]))
    for binID, (left, right) in enumerate(zip(binsLeft, binsRight)):
        cut = (a2 >= left) & (a2 < right)
        Xss = a1[cut]
        Yss = a2[cut]
        histV, histLE = np.histogram(Xss,bins=histBins)
        histC = (histLE + (histLE[1] - histLE[0])/2)[:-1]
        try:
            fit, covar = curve_fit(gaus,histC,histV,p0=[max(histV),histC[histV.argmax()],0.01])
            ridgeLine[0,binID] = fit[1]
            ridgeLine[1,binID] = (left+right)/2
            ridgeLine[2,binID] = np.percentile(Xss,4)
            ridgeLine[3,binID] = np.percentile(Xss,96)
        except RuntimeError:
            print(f"Unable to fit for bin {binID} from {left}->{right} with {cut.shape[0]} targets")
            for i in range(4):
                ridgeLine[i,binID] = np.nan
    if smooth:
        ridgeLine[2] = convolve(ridgeLine[2], [1,1,1])
        ridgeLine[3] = convolve(ridgeLine[3], [1,1,1])
    return ridgeLine[:, 1:-1]
 
def get_mag_and_color_ranges(color, mag, lowerSig, upperSig):
    colorRange = (np.percentile(color, lowerSig), np.percentile(color, upperSig))
    magRange = (np.percentile(mag, lowerSig), np.percentile(mag, upperSig))
    return colorRange, magRange
 
def CMD_ridge_line_array(CMD, f1, f2, f3, lowerSig=1, upperSig=99, fiducialSigma=2, binSize=0.2, histBins=50):
    color = CMD[f1] - CMD[f2]
    colorRange, magRange = get_mag_and_color_ranges(color, CMD[f3], lowerSig, upperSig)
    binsLeft = np.arange(magRange[0], magRange[1], binSize)
    colorCut = CMD[(color >= colorRange[0]) & (color < colorRange[1])]
    ridgeLine = ridge_bounding(colorCut[f1]-colorCut[f2],colorCut[f3],binsLeft, histBins=histBins)
    return ridgeLine
 
def PseudoColor_ridge_line_array(CMD, pf, f2, magRange = (12, 18), fiducialSigma=2, binSize=0.1, histBins=50):
    binsLeft = np.arange(magRange[0], magRange[1], binSize)
    pseudoColorRange = (np.percentile(CMD[pf].values,0.25), np.percentile(CMD[pf].values, 100-0.25))
    pseudoColorCut = CMD[(CMD[pf] > pseudoColorRange[0]) & (CMD[pf] < pseudoColorRange[1])]
    ridgeLine = ridge_bounding(pseudoColorCut[pf], pseudoColorCut[f2], binsLeft, smooth=True,boxCarWidth=6)
    return ridgeLine
 
def find_steepest_root(x, y):
    f = interp1d(x,y,bounds_error=False,fill_value='extrapolate')
    df = interp1d(x,np.gradient(y)/np.gradient(x))
    roots = set()
    for x0 in x:
        root = newton(f,x0)
        if x.min() <= root <= x.max():
            slope = df(root)[()]
            roots.add((root, slope))
 
    roots = sorted(roots, key=lambda x: abs(x[1]))
    return roots[-1][0]
 
def extract_sequences(CMD, f1, f2, reverse_order=False, sigDepth=0.25):
    upperSig = 100-sigDepth
    lowerSig = sigDepth
 
    if not reverse_order:
        f3 = f1
    else:
        f3 = f2
 
    ridgeLine = CMD_ridge_line_array(CMD, f1, f2, f3,lowerSig=lowerSig, upperSig=upperSig)
    MSdCdM = np.gradient(ridgeLine[0])/np.gradient(ridgeLine[1])
    df = interp1d(ridgeLine[1], MSdCdM)
    initGuess = ridgeLine[1][abs(MSdCdM) == min(abs(MSdCdM))]
    MSroot = newton(df, initGuess)[0]
 
 
    RGB = MSdCdM[ridgeLine[1] < MSroot]
    RGBMag = ridgeLine[1][ridgeLine[1] < MSroot]
 
    RGBdCdM = np.gradient(RGB)/np.gradient(RGBMag)
    RGBRoot = find_steepest_root(RGBMag, RGBdCdM)
 
    _, magRange = get_mag_and_color_ranges(CMD[f1]-CMD[f2], CMD[f3], lowerSig, upperSig)
 
    MSMagRange = (MSroot, magRange[1])
    # RGBMagRange = (magRange[0], RGBRoot)
    RGBMagRange = (15.25, 18)
 
    color = CMD[f1].values-CMD[f2].values
    mag = CMD[f3].values
    MSColorMagCut = color[(mag >= MSMagRange[0]) & (mag < MSMagRange[1])]
    RGBColorMagCut = color[(mag >= RGBMagRange[0]) & (mag < RGBMagRange[1])]
 
    MSColorRange = (np.percentile(MSColorMagCut,sigDepth), np.percentile(MSColorMagCut, 100-sigDepth))
    RGBColorRange = (np.percentile(RGBColorMagCut,sigDepth), np.percentile(RGBColorMagCut, 100-sigDepth))
 
    out = {
        "MS": {
            "mag": MSMagRange,
            "color": MSColorRange
        },
        "RGB": {
            "mag": RGBMagRange,
            "color": RGBColorRange
        }
    }
    return out
 
def interpolate_CMD_ridge_line(df, f1, f2, f3, fiducialSigma=2, binSize=0.2, histBins=50):
    ridgeLine = CMD_ridge_line_array(df, f1, f2, f3, fiducialSigma=fiducialSigma, binSize=binSize, histBins=histBins)
    rlF = interp1d(ridgeLine[1], ridgeLine[0], bounds_error=False, fill_value="extrapolate")
    rlpsF = interp1d(ridgeLine[1], ridgeLine[2], bounds_error=False, fill_value="extrapolate")
    rlssF = interp1d(ridgeLine[1], ridgeLine[3], bounds_error=False, fill_value="extrapolate")
    return rlF, rlpsF, rlssF
 
def interpolate_PsuedoColor_ridge_line(df, pf, f2, magRange = (12, 18), fiducialSigma=2, binSize=0.5, histBins=50):
    ridgeLine = PseudoColor_ridge_line_array(
        df,
        pf,
        f2,
        fiducialSigma=fiducialSigma,
        magRange=magRange,
        binSize=binSize,
        histBins=histBins
        )
    rlF = interp1d(ridgeLine[1], ridgeLine[0],bounds_error=False,fill_value="extrapolate")
    rlpsF = interp1d(ridgeLine[1], ridgeLine[2],bounds_error=False,fill_value="extrapolate")
    rlssF = interp1d(ridgeLine[1], ridgeLine[3],bounds_error=False,fill_value="extrapolate")
    return rlF, rlpsF, rlssF
 
def cut_sequence(CMD, f1, f2, f3, magRange, colorRange):
    sequenceCut = (CMD[f3] >= magRange[0]) & (CMD[f3] < magRange[1])
    sequenceCut &= (CMD[f1] - CMD[f2] >= colorRange[0])
    sequenceCut &= (CMD[f1] - CMD[f2] < colorRange[1])
    s = CMD[sequenceCut]
    return s
 
 
def RGB_chromosome_map(CMD, f1, f2, pf, binSize=0.1, histBins=50, reverse_order=False):
    if reverse_order:
        f3 = f2
    else:
        f3 = f1
    sequences = extract_sequences(CMD, f1, f2, reverse_order=reverse_order)
    RGBMagRange = sequences['RGB']['mag']
    RGBcolorRange = sequences['RGB']['color']
    rgb = cut_sequence(CMD, f1, f2, f3, RGBMagRange, RGBcolorRange)
 
 
    CMDRidgeLine, CMDBlueFiducial, CMDRedFiducial = interpolate_CMD_ridge_line(
        CMD,
        f1,
        f2,
        f3,
        binSize=binSize,
        histBins=histBins
        )
    PSRidgeLine, PSBlueFiducial, PSRedFiducial = interpolate_PsuedoColor_ridge_line(
        CMD,
        pf,
        f2,
        magRange=RGBMagRange,
        binSize=binSize,
        histBins=histBins
        )
 
    CMDcolorWidth = CMDRedFiducial(rgb[f2]) - CMDBlueFiducial(rgb[f2])
    PSWidth = PSRedFiducial(rgb[f2])-PSBlueFiducial(rgb[f2])
 
    rgb['CW1'] = CMDcolorWidth * ((rgb[f1].values-rgb[f2].values) - \
                CMDRedFiducial(rgb[f2].values))/(CMDRedFiducial(rgb[f2].values)\
                - CMDBlueFiducial(rgb[f2].values))
    rgb['CW2'] = PSWidth * ((PSRedFiducial(rgb[f2].values) -\
                rgb[pf].values)/(PSRedFiducial(rgb[f2].values) - \
                PSBlueFiducial(rgb[f2].values)))
 
    return rgb
 
def extract_pop(CMD, n, f1="F275W", f2="F814W", pf="C_275_336_438", binSize=0.4, reverse_order=True):
    RGBchromosomeMap = RGB_chromosome_map(CMD, f1, f2, pf, reverse_order=reverse_order, binSize=binSize)
    filtered = RGBchromosomeMap[ # Automate these cuts
        (RGBchromosomeMap["CW1"] >= -0.66) & 
        (RGBchromosomeMap["CW1"] < 0.3) & 
        (RGBchromosomeMap["CW2"] >= -0.2) &
        (RGBchromosomeMap["CW2"] < 0.5)
    ][["CW1", "CW2"]]
    clusters = hdbscan.HDBSCAN(min_cluster_size=30, min_samples=40).fit(filtered.values)
    filtered["cluster"] = clusters.labels_
    merged = CMD.merge(filtered, how='outer', on="ID")
    assert n in merged["cluster"].unique()
    return (merged[(merged["cluster"] == n) | (np.isnan(merged["cluster"]))])
 
 
def verticalize_CMD(CMD, f1, f2, binSize=0.2, histBins=50, reverseOrder=False):
    newCMD = CMD.copy()
    if reverseOrder:
        f3 = f2
    else:
        f3 = f1
    sigDepth = 0.25
    upperSig = 100-sigDepth
    lowerSig = sigDepth
 
 
    rff, _, _ = interpolate_CMD_ridge_line(CMD, f1, f2, f3, binSize=binSize, histBins=histBins)
    vertColor = (CMD[f1] - CMD[f2]) - rff(CMD[f2])
    newCMD.insert(0, f"{f1}-{f2}_vert", vertColor.values)
    return newCMD