Astronomical Data Processing for HUGS Project
This Python code is designed for processing and analyzing astronomical data, particularly focusing on the Hubble Space Telescope's UV Legacy Survey of Galactic Globular Clusters (HUGS) project. It includes functions for reading the survey data, cleaning it based on specific criteria, and performing various analysis tasks such as generating color-magnitude diagrams (CMDs), extracting specific stellar sequences, and identifying stellar populations using machine learning clustering algorithms. The code leverages scientific libraries such as NumPy, pandas, and SciPy for data manipulation and calculations, astropy for astronomical computations, and hdbscan for density-based clustering.
import re import os import pathlib import numpy as np import pandas as pd from tqdm import tqdm from scipy.signal import savgol_filter from scipy.optimize import minimize, newton, curve_fit from scipy.interpolate import interp1d, UnivariateSpline from scipy.integrate import quad from astropy.convolution import convolve, Box1DKernel, Gaussian1DKernel from astropy.stats import freedman_bin_width from astropy.stats import sigma_clip from itertools import permutations import hdbscan def read_HUGS_data(path): colNames = ["X","Y", "F275W","F275W_RMS","F275W_QFP","F275W_SP","F275W_NI","F275W_NG", "F336W","F336W_RMS","F336W_QFP","F336W_SP","F336W_NI","F336W_NG", "F438W","F438W_RMS","F438W_QFP","F438W_SP","F438W_NI","F438W_NG", "F606W","F606W_RMS","F606W_QFP","F606W_SP","F606W_NI","F606W_NG", "F814W","F814W_RMS","F814W_QFP","F814W_SP","F814W_NI","F814W_NG", "MP", "RA", "Dec", "ID", "Itr" ] df = pd.read_csv(path, names=colNames,comment="#",sep=r'\s+').set_index("ID") filters = ["F275W", "F336W", "F438W", "F606W", "F814W"] for f in filters: df = df[abs(df[f]) != 99.9999] names = set() for C in permutations(filters): name = f"C_{C[0][1:-1]}_{C[1][1:-1]}_{C[2][1:-1]}" preLen = len(names) names.add(name) postLen = len(names) if preLen != postLen: df[name] = (df[C[0]] - df[C[1]]) - (df[C[1]] - df[C[2]]) return df def clean_HUGS_data(df, filters, binNum=12, removeGiantBranch=False): """ Clean HUGS data based on the procedudure laied out in section 3 of 10.1093/mnras/sty2515 Parameters ---------- df : pd.DataFrame Pandas dataframe with HUGS data, keys must be in the form of filterName, filterName_QFP, filterName_RMS, filterName_SP so that for the F606W filter for example you would have the keys: F606W, F606W_QFP, F606W_RMS, and F606W_SP filters : list[str] List of filter names to clean based on. All filters will be cleaned simultaniously so that the final resultant dataframe only retains targets which pass all quality checks in all requested filters. binNum : int, default=12 Number of magnitude bins to break filter into. Default is 12 to match the filtering done in 10.1093/mnras/sty2515 removeGiantBranch : bool, default=True If True filter the giant branch out of the data Returns ------- pd.DataFrame clenaed dataframe with only targets that match the quality parameter an all filters Examples -------- Say there is some loaded HUGS data in the varaiable meth1DF, then to filter based on 606 and 814 >>> cleaned = clean_HUGS_data(meth1DF, ["F606W", "F814W"]) if you just want to clean based on one filter then that still needs to be in a list >>> cleaned = clean_HUGS_data(meth1DF, ["F606W"]) """ line = lambda x, m, b: m*x + b above = lambda x, y, f: y >= f(x) below = lambda x, y, f: y < f(x) cut = np.ones(shape=df.shape[0],dtype=bool) for fID, filterName in enumerate(filters): X = df[filterName] YQFP = df[f"{filterName}_QFP"] YRMS = df[f"{filterName}_RMS"] RDAX = df[f"{filterName}_SP"] binLeft = np.linspace(X.min(),X.max(),binNum) meanBinSize = np.mean(binLeft[1:]-binLeft[:-1]) binRight = binLeft+meanBinSize bounds = np.zeros(shape=(3,binLeft.shape[0])) for binID, (left, right) in enumerate(zip(binLeft, binRight)): binCut = (X >= left) & (X < right) subDataX = X[binCut] subDataYQFP = YQFP[binCut] subDataYRMS = YRMS[binCut] subData = np.vstack((subDataX,subDataYQFP,subDataYRMS)) clippedQFP = sigma_clip(subDataYQFP,sigma=3.5,maxiters=5) clippedRMS = sigma_clip(subDataYRMS,sigma=3.5,maxiters=5) clippedQFPX = subDataX[~clippedQFP.mask] clippedQFPY = subDataYQFP[~clippedQFP.mask] clippedRMSX = subDataX[~clippedRMS.mask] clippedRMSY = subDataYRMS[~clippedRMS.mask] sigmaMeanQFP = np.mean(clippedQFPY) sigmaMeanRMS = np.mean(clippedRMSY) sigmaQFP = np.std(clippedQFPY) sigmaRMS = np.std(clippedRMSY) lowerBoundQFP = sigmaMeanQFP-3.5*sigmaQFP lowerBoundRMS = sigmaMeanRMS+3.5*sigmaRMS bounds[0,binID] = (left+right)/2 bounds[1,binID] = lowerBoundQFP bounds[2,binID] = lowerBoundRMS interpFQFP = interp1d(bounds[0], bounds[1],bounds_error=False,fill_value='extrapolate') interpFRMS = interp1d(bounds[0], bounds[2],bounds_error=False,fill_value='extrapolate') cutQFP = above(X.values,YQFP.values,interpFQFP) cutRMS = below(X.values,YRMS.values,interpFRMS) cutRDX = (RDAX.values > -0.15) & (RDAX.values < 0.15) cut &= cutQFP & cutRMS & cutRDX if removeGiantBranch: color = df["F275W"] - df["F814W"] mag = df["F814W"] giantBranchCut = above(color,mag,lambda x: line(x, -1.5, 20)) cut &= giantBranchCut return df[cut] def load_HUGS(root): methods = pathlib.Path(root).rglob("*.txt") methods = filter(lambda x: not(str(os.path.basename(x)).startswith('.')), methods) methods = list(filter(lambda x: 'catalog-meth' in str(x), methods)) methodNums = [int(re.findall(r'meth(\d)', str(path))[0]) for path in methods] out = dict() for methID, methPath in zip(methodNums, methods): df = read_HUGS_data(methPath) out[methID] = clean_HUGS_data(df, ["F275W", "F336W", "F438W", "F814W"], removeGiantBranch=True) return out def ridge_bounding(a1,a2,binsLeft,histBins=50,smooth=True,boxCarWidth=3): gaus = lambda x, a, b, c: a*np.exp(-(x-b)**2/(2*c**2)) binSize = binsLeft[1] - binsLeft[0] binsRight = binsLeft + binSize ridgeLine = np.zeros(shape=(4,binsLeft.shape[0])) for binID, (left, right) in enumerate(zip(binsLeft, binsRight)): cut = (a2 >= left) & (a2 < right) Xss = a1[cut] Yss = a2[cut] histV, histLE = np.histogram(Xss,bins=histBins) histC = (histLE + (histLE[1] - histLE[0])/2)[:-1] try: fit, covar = curve_fit(gaus,histC,histV,p0=[max(histV),histC[histV.argmax()],0.01]) ridgeLine[0,binID] = fit[1] ridgeLine[1,binID] = (left+right)/2 ridgeLine[2,binID] = np.percentile(Xss,4) ridgeLine[3,binID] = np.percentile(Xss,96) except RuntimeError: print(f"Unable to fit for bin {binID} from {left}->{right} with {cut.shape[0]} targets") for i in range(4): ridgeLine[i,binID] = np.nan if smooth: ridgeLine[2] = convolve(ridgeLine[2], [1,1,1]) ridgeLine[3] = convolve(ridgeLine[3], [1,1,1]) return ridgeLine[:, 1:-1] def get_mag_and_color_ranges(color, mag, lowerSig, upperSig): colorRange = (np.percentile(color, lowerSig), np.percentile(color, upperSig)) magRange = (np.percentile(mag, lowerSig), np.percentile(mag, upperSig)) return colorRange, magRange def CMD_ridge_line_array(CMD, f1, f2, f3, lowerSig=1, upperSig=99, fiducialSigma=2, binSize=0.2, histBins=50): color = CMD[f1] - CMD[f2] colorRange, magRange = get_mag_and_color_ranges(color, CMD[f3], lowerSig, upperSig) binsLeft = np.arange(magRange[0], magRange[1], binSize) colorCut = CMD[(color >= colorRange[0]) & (color < colorRange[1])] ridgeLine = ridge_bounding(colorCut[f1]-colorCut[f2],colorCut[f3],binsLeft, histBins=histBins) return ridgeLine def PseudoColor_ridge_line_array(CMD, pf, f2, magRange = (12, 18), fiducialSigma=2, binSize=0.1, histBins=50): binsLeft = np.arange(magRange[0], magRange[1], binSize) pseudoColorRange = (np.percentile(CMD[pf].values,0.25), np.percentile(CMD[pf].values, 100-0.25)) pseudoColorCut = CMD[(CMD[pf] > pseudoColorRange[0]) & (CMD[pf] < pseudoColorRange[1])] ridgeLine = ridge_bounding(pseudoColorCut[pf], pseudoColorCut[f2], binsLeft, smooth=True,boxCarWidth=6) return ridgeLine def find_steepest_root(x, y): f = interp1d(x,y,bounds_error=False,fill_value='extrapolate') df = interp1d(x,np.gradient(y)/np.gradient(x)) roots = set() for x0 in x: root = newton(f,x0) if x.min() <= root <= x.max(): slope = df(root)[()] roots.add((root, slope)) roots = sorted(roots, key=lambda x: abs(x[1])) return roots[-1][0] def extract_sequences(CMD, f1, f2, reverse_order=False, sigDepth=0.25): upperSig = 100-sigDepth lowerSig = sigDepth if not reverse_order: f3 = f1 else: f3 = f2 ridgeLine = CMD_ridge_line_array(CMD, f1, f2, f3,lowerSig=lowerSig, upperSig=upperSig) MSdCdM = np.gradient(ridgeLine[0])/np.gradient(ridgeLine[1]) df = interp1d(ridgeLine[1], MSdCdM) initGuess = ridgeLine[1][abs(MSdCdM) == min(abs(MSdCdM))] MSroot = newton(df, initGuess)[0] RGB = MSdCdM[ridgeLine[1] < MSroot] RGBMag = ridgeLine[1][ridgeLine[1] < MSroot] RGBdCdM = np.gradient(RGB)/np.gradient(RGBMag) RGBRoot = find_steepest_root(RGBMag, RGBdCdM) _, magRange = get_mag_and_color_ranges(CMD[f1]-CMD[f2], CMD[f3], lowerSig, upperSig) MSMagRange = (MSroot, magRange[1]) # RGBMagRange = (magRange[0], RGBRoot) RGBMagRange = (15.25, 18) color = CMD[f1].values-CMD[f2].values mag = CMD[f3].values MSColorMagCut = color[(mag >= MSMagRange[0]) & (mag < MSMagRange[1])] RGBColorMagCut = color[(mag >= RGBMagRange[0]) & (mag < RGBMagRange[1])] MSColorRange = (np.percentile(MSColorMagCut,sigDepth), np.percentile(MSColorMagCut, 100-sigDepth)) RGBColorRange = (np.percentile(RGBColorMagCut,sigDepth), np.percentile(RGBColorMagCut, 100-sigDepth)) out = { "MS": { "mag": MSMagRange, "color": MSColorRange }, "RGB": { "mag": RGBMagRange, "color": RGBColorRange } } return out def interpolate_CMD_ridge_line(df, f1, f2, f3, fiducialSigma=2, binSize=0.2, histBins=50): ridgeLine = CMD_ridge_line_array(df, f1, f2, f3, fiducialSigma=fiducialSigma, binSize=binSize, histBins=histBins) rlF = interp1d(ridgeLine[1], ridgeLine[0], bounds_error=False, fill_value="extrapolate") rlpsF = interp1d(ridgeLine[1], ridgeLine[2], bounds_error=False, fill_value="extrapolate") rlssF = interp1d(ridgeLine[1], ridgeLine[3], bounds_error=False, fill_value="extrapolate") return rlF, rlpsF, rlssF def interpolate_PsuedoColor_ridge_line(df, pf, f2, magRange = (12, 18), fiducialSigma=2, binSize=0.5, histBins=50): ridgeLine = PseudoColor_ridge_line_array( df, pf, f2, fiducialSigma=fiducialSigma, magRange=magRange, binSize=binSize, histBins=histBins ) rlF = interp1d(ridgeLine[1], ridgeLine[0],bounds_error=False,fill_value="extrapolate") rlpsF = interp1d(ridgeLine[1], ridgeLine[2],bounds_error=False,fill_value="extrapolate") rlssF = interp1d(ridgeLine[1], ridgeLine[3],bounds_error=False,fill_value="extrapolate") return rlF, rlpsF, rlssF def cut_sequence(CMD, f1, f2, f3, magRange, colorRange): sequenceCut = (CMD[f3] >= magRange[0]) & (CMD[f3] < magRange[1]) sequenceCut &= (CMD[f1] - CMD[f2] >= colorRange[0]) sequenceCut &= (CMD[f1] - CMD[f2] < colorRange[1]) s = CMD[sequenceCut] return s def RGB_chromosome_map(CMD, f1, f2, pf, binSize=0.1, histBins=50, reverse_order=False): if reverse_order: f3 = f2 else: f3 = f1 sequences = extract_sequences(CMD, f1, f2, reverse_order=reverse_order) RGBMagRange = sequences['RGB']['mag'] RGBcolorRange = sequences['RGB']['color'] rgb = cut_sequence(CMD, f1, f2, f3, RGBMagRange, RGBcolorRange) CMDRidgeLine, CMDBlueFiducial, CMDRedFiducial = interpolate_CMD_ridge_line( CMD, f1, f2, f3, binSize=binSize, histBins=histBins ) PSRidgeLine, PSBlueFiducial, PSRedFiducial = interpolate_PsuedoColor_ridge_line( CMD, pf, f2, magRange=RGBMagRange, binSize=binSize, histBins=histBins ) CMDcolorWidth = CMDRedFiducial(rgb[f2]) - CMDBlueFiducial(rgb[f2]) PSWidth = PSRedFiducial(rgb[f2])-PSBlueFiducial(rgb[f2]) rgb['CW1'] = CMDcolorWidth * ((rgb[f1].values-rgb[f2].values) - \ CMDRedFiducial(rgb[f2].values))/(CMDRedFiducial(rgb[f2].values)\ - CMDBlueFiducial(rgb[f2].values)) rgb['CW2'] = PSWidth * ((PSRedFiducial(rgb[f2].values) -\ rgb[pf].values)/(PSRedFiducial(rgb[f2].values) - \ PSBlueFiducial(rgb[f2].values))) return rgb def extract_pop(CMD, n, f1="F275W", f2="F814W", pf="C_275_336_438", binSize=0.4, reverse_order=True): RGBchromosomeMap = RGB_chromosome_map(CMD, f1, f2, pf, reverse_order=reverse_order, binSize=binSize) filtered = RGBchromosomeMap[ # Automate these cuts (RGBchromosomeMap["CW1"] >= -0.66) & (RGBchromosomeMap["CW1"] < 0.3) & (RGBchromosomeMap["CW2"] >= -0.2) & (RGBchromosomeMap["CW2"] < 0.5) ][["CW1", "CW2"]] clusters = hdbscan.HDBSCAN(min_cluster_size=30, min_samples=40).fit(filtered.values) filtered["cluster"] = clusters.labels_ merged = CMD.merge(filtered, how='outer', on="ID") assert n in merged["cluster"].unique() return (merged[(merged["cluster"] == n) | (np.isnan(merged["cluster"]))]) def verticalize_CMD(CMD, f1, f2, binSize=0.2, histBins=50, reverseOrder=False): newCMD = CMD.copy() if reverseOrder: f3 = f2 else: f3 = f1 sigDepth = 0.25 upperSig = 100-sigDepth lowerSig = sigDepth rff, _, _ = interpolate_CMD_ridge_line(CMD, f1, f2, f3, binSize=binSize, histBins=histBins) vertColor = (CMD[f1] - CMD[f2]) - rff(CMD[f2]) newCMD.insert(0, f"{f1}-{f2}_vert", vertColor.values) return newCMD