class TriangulatedSphere: _SLUM = 3.846e33 _SRAD = 6.957e+10 def __init__(self, nTheta, nPhi, times, model, modelAge=5, noiseScale=0.1): trk = interpTRK(modelAge, model.trk) self.radius = 10**trk.log_R.values[0] * self._SRAD self.lum = 10**trk.log_L.values[0] * self._SLUM self.nTheta = nTheta self.nPhi = nPhi self.times = times self.fluxMap = np.zeros((times.shape[0], nTheta, nPhi)) self.thetaValues = np.linspace(0, np.pi, nTheta ) self.phiValues = np.linspace(0, 2 * np.pi, nPhi) self.surfaceArea = 4 * np.pi * self.radius**2 self.triangleArea = self.surfaceArea / (nTheta * nPhi) self.meanFlux = (self.lum)/(4*np.pi * (self.radius)**2) self.set_overall_flux(self.meanFlux) self.noiseLevel = noiseScale*self.meanFlux def set_overall_flux(self, flux): """Set the same flux value for the entire surface.""" self.fluxMap[:, :, :] = flux def set_overall_flux_at_time(self, flux, time): tID = self.time_to_index(time) self.fluxMap[tID, :, :] = flux def incriment_overall_flux_at_time(self, df, time): tID = self.time_to_index(time) self.fluxMap[tID, :, :] += df def set_flux_at(self, theta, phi, flux, time): iTheta, iPhi = self.theta_phi_to_index(theta, phi) tID = self.time_to_index(time) self.fluxMap[tID, iTheta, iPhi] = flux def get_flux_at(self, theta, phi, time): iTheta, iPhi = self.theta_phi_to_index(theta, phi) tID = self.time_to_index(time) return self.fluxMap[tID, iTheta, iPhi] def get_luminosity_over_solid_angle(self, thetaCenter, phiCenter, omega, time, solar=False): # Convert time to indices if isinstance(time, float): tIDs = np.array([self.time_to_index(time)]) elif isinstance(time, slice): tIDs = np.array([self.time_to_index(t) for t in self.times[time]]) elif isinstance(time, str) and time == '*': tIDs = np.arange(len(self.times)) lums = np.zeros(len(tIDs)) thetaGrid, phiGrid = np.meshgrid(self.thetaValues, self.phiValues, indexing='ij') distance = self.angular_distance_vectorized(thetaGrid, phiGrid, thetaCenter, phiCenter) withinSolidAngle = distance < np.sqrt(omega / np.pi) for i, tID in enumerate(tIDs): if solar: lums[i] = np.sum(self.fluxMap[tID, withinSolidAngle] * self.triangleArea / self._SLUM) else: lums[i] = np.sum(self.fluxMap[tID, withinSolidAngle] * self.triangleArea) return lums @staticmethod def angular_distance_vectorized(theta1, phi1, theta2, phi2): """Vectorized calculation of angular distance.""" return np.arccos(np.sin(theta1) * np.sin(theta2) + np.cos(theta1) * np.cos(theta2) * np.cos(phi1 - phi2)) def index_to_theta_phi(self, iTheta, iPhi): """Convert grid indices to theta and phi values.""" return self.thetaValues[iTheta], self.phiValues[iPhi] def theta_phi_to_index(self, theta, phi): iTheta = np.argmin(np.abs(self.thetaValues - theta)) iPhi = np.argmin(np.abs(self.phiValues - phi)) return iTheta, iPhi def index_to_time(self, tID): return self.times[tID] def time_to_index(self, time): return np.argmin(np.abs(self.times - time)) def evolve(self, flareProb, flareMeanDuration, flareMeanAmplitude, flareMeanLat, flareDurationScale, flareAmplitudeScale, flareLatScale): flareDecisions = np.random.uniform(0, 1, size=len(self.times)) < flareProb flareDurations = np.abs(np.random.normal(loc=flareMeanDuration, scale=flareDurationScale, size=flareDecisions.sum())) flareAmplitudes = np.abs(np.random.normal(loc=flareMeanAmplitude, scale=flareAmplitudeScale, size=flareDecisions.sum())) flarePhis = np.radians(np.random.choice([-1, 1], size=flareDecisions.sum())) * np.abs(np.random.normal(loc=flareMeanLat, scale=flareLatScale, size=flareDecisions.sum())) flareThetas = np.random.uniform(0, 2*np.pi, size=flareDecisions.sum()) noise = np.random.normal(loc=0, scale=self.noiseLevel, size=len(self.times)) fID = 0 for tid, (time, doFlare, s) in tqdm(enumerate(zip(self.times, flareDecisions, noise)), total=len(self.times), desc="Evolving"): if doFlare: timeCond = (self.times >= time) & (self.times <= time + flareDurations[fID]) flareTime = np.linspace(-1, 7, timeCond[timeCond==True].shape[0]) flareData = build_flare(flareTime) timeCond = (self.times >= time) & (self.times <= time + flareDurations[fID]) flare = self.meanFlux + (self.meanFlux * flareAmplitudes[fID] * normalize(flareData)) for flareTimeId, (flareFlux, dft) in enumerate(zip(flare, self.times[timeCond])): self.set_flux_at(flareThetas[fID], flarePhis[fID], flareFlux, time + dft) fID += 1 self.incriment_overall_flux_at_time(s, time)