import json
import logging
import os
import warnings
from statistics import mean, stdev
import astropy.units as u
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from astropy.coordinates import SkyCoord
from dustmaps.sfd import SFDQuery
from extinction import fm07 as fm
from caat.utils import ROOT_DIR, WLE, colors
from .CAAT import CAAT
from .Plot import Plot
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
warnings.filterwarnings("ignore")
[docs]
class SN:
    """
    A Supernova object, taking a classification (i.e. SN II, SESNe, FBOT, etc.),
    a subtype (i.e., SN IIP, SN IIb, SN Ibn, etc.), and a name (i.e. SN2022acko).
    Provides routines for the extraction and transformation of photometric data
    that are uniformly run and saved within the `DataCube` class.
    """

    # Root of the on-disk photometry archive, laid out as
    # <ROOT_DIR>/data/<Type>/<Subtype>/<Name>/
    base_path = os.path.join(ROOT_DIR, "data/")
    ### All ZPs for AB mags, in 1e-11 erg/s/cm**2/A
    # NOTE: class-level dict shared by all instances; populated in __init__
    # from the per-filter effective wavelengths below.
    zps = {}
    # Filter name -> effective wavelength (Angstroms), imported from caat.utils
    wle = WLE
def __init__(
self,
name: str,
data: dict = None,
type: str = None,
subtype: str = None,
info: dict = {},
):
"""
Initialize a SN object. Can initialize through either a name
or through a dictionary of data.
Args:
name (str): The name of the SN.
If it exists in the CAAT file, the data
will automatically be searched for and loaded.
data (dict, optional): A dictionary of data. Allows for loading
a new SN object that has not already been saved.
Defaults to None.
type (str, optional): The type (or classification) of the object.
If one is not provided, it will be searched for within the
CAAT database file and data directory automatically.
Defaults to None.
subtype (str, optional): The subtype of the object.
If one is not provided, it will be searched for within the
CAAT database file and data directory automatically.
Defaults to None.
info (dict, optional): A dictionary containing metadata of the transient.
This is useful if a SN object is initialized before it has been added
to the CAAT file. Users can pass information such as coordinates,
redshift, and light curve peak information directly, which can then later
be saved back to the CAAT file for easy re-initialization. Defaults to `{}`.
Raises:
Exception: If `name` is given, but is not found in the
CAAT archive.
"""
self.name = name
self.data = data if data else {}
if type and subtype:
self.classification = type
self.subtype = subtype
else:
found = False
for typ in os.listdir(self.base_path):
if os.path.isdir(os.path.join(self.base_path, typ)):
for subtyp in os.listdir(os.path.join(self.base_path, typ)):
if os.path.isdir(os.path.join(self.base_path, typ, subtyp)):
for snname in os.listdir(
os.path.join(self.base_path, typ, subtyp)
):
if name == snname:
self.classification = typ
self.subtype = subtyp
found = True
if not found:
# Type may have changed, check caat file for type
caat = CAAT().caat
row = caat[caat["Name"] == self.name]
if len(row) > 0:
self.classification = row["Type"].values[0]
self.subtype = row["Subtype"].values[0]
found = True
if not found:
raise Exception(f"No SN named {name} found in our archives")
if info:
self.info = info
else:
self.read_info_from_caat_file()
if isinstance(data, dict):
self.data = data
self.shifted_data = {}
else:
self.load_shifted_data()
for filt, wl in self.wle.items():
self.zps[filt] = (10**-23 * 3e18 / wl) * 1e11
    def __repr__(self):
        """
        Override the `__repr__` method to output
        the name of the SN object.

        Returns:
            str: The name of the SN object.
        """
        return self.name
[docs]
def write_info_to_caat_file(self, force=False):
"""
Save light curve peak information to the CAAT file.
This method is usually run after the light curve peak
is fit manually or automatically.
Args:
force (bool, optional): Overwrite the peak info,
if it already exists in the CAAT file. Defaults to False.
"""
caat = CAAT().caat
row = caat[caat["Name"] == self.name]
if not len(row):
# Does not already exist in the CAAT file, so add a new row
new_row = [
self.name,
self.classification,
self.subtype,
self.info.get("z", np.nan),
self.info.get("ra", np.nan),
self.info.get("dec", np.nan),
self.info.get("peak_mjd", np.nan),
self.info.get("peak_mag", np.nan),
self.info.get("peak_filt", ""),
]
caat.loc[len(caat)] = new_row
else:
row["Tmax"] = self.info.get("peak_mjd", np.nan)
row["Magmax"] = self.info.get("peak_mag", np.nan)
row["Filtmax"] = self.info.get("peak_filt", "")
caat[caat["Name"] == self.name] = row
### Save back to the csv file
CAAT().save_db_file(
os.path.join(ROOT_DIR, "data/", "caat.csv"), caat, force=force
)
[docs]
def read_info_from_caat_file(self):
"""
Load information about the SN object from the CAAT file.
Will search by name for the SN object in the CAAT file.
If it is found, the redshift, coordinates, and light curve
peak information are loaded as a dictionary attribute.
"""
caat = CAAT().caat
row = caat[caat["Name"] == self.name]
if (
np.isnan(row["Tmax"].values)
or np.isnan(row["Magmax"].values)
or not row["Filtmax"].values
):
self.info = {}
else:
info_dict = {}
info_dict["peak_mjd"] = row["Tmax"].values[0]
info_dict["peak_mag"] = row["Magmax"].values[0]
info_dict["peak_filt"] = row["Filtmax"].values[0]
info_dict["searched"] = True
info_dict["z"] = row["Redshift"].values[0]
info_dict["ra"] = row["RA"].values[0]
info_dict["dec"] = row["Dec"].values[0]
self.info = info_dict
[docs]
def load_swift_data(self):
"""
Load Swift data for this object.
Searches for the corresponding SOUSA output
.dat file. If one is found, reads the file
as a pandas dataframe, extracts detection and
nondetection information, and transforms the
magnitudes to the Vega system. Loaded photometry is saved as a
dictionary attribute to this class.
"""
if not os.path.exists(
os.path.join(
self.base_path,
self.classification,
self.subtype,
self.name,
self.name + "_uvotB15.1.dat",
)
):
logger.debug(f"No Swift file for {self.name}")
return
### Magnitudes in the SOUSA output file are in Vega mags
### We need to convert them to AB mags
### From here: https://swift.gsfc.nasa.gov/analysis/uvot_digest/zeropts.html
ab_minus_vega = {
"V": -0.01,
"B": -0.13,
"U": 1.02,
"UVW1": 1.51,
"UVM2": 1.69,
"UVW2": 1.73,
}
df = pd.read_csv(
os.path.join(
self.base_path,
self.classification,
self.subtype,
self.name,
self.name + "_uvotB15.1.dat",
),
delim_whitespace=True,
comment="#",
names=[
"Filter",
"MJD",
"Mag",
"MagErr",
"3SigMagLim",
"0.98SatLim",
"Rate",
"RateErr",
"Ap",
"Frametime",
"Exp",
"Telapse",
],
)
for i, row in df.iterrows():
if not np.isnan(row["Mag"]):
self.data.setdefault(row["Filter"], []).append(
{
"mag": row["Mag"] + ab_minus_vega[row["Filter"]],
"err": row["MagErr"],
"mjd": row["MJD"],
}
)
else:
self.data.setdefault(row["Filter"], []).append(
{
"mag": row["3SigMagLim"] + ab_minus_vega[row["Filter"]],
"err": 0.1,
"mjd": row["MJD"],
"nondetection": True,
}
)
[docs]
def load_json_data(self):
"""
Load data saved as a JSON file. This applies to data from
ZTF, ATLAS, OpenSN, and ASAS-SN>. This method searches for .json
data files and loads the photometry from any that it finds.
Loaded photometry is saved as a dictionary attribute to this class.
"""
if not os.path.exists(
os.path.join(self.base_path, self.classification, self.subtype, self.name)
):
logger.info(f"No additional data files for {self.name}")
return
dirfiles = os.listdir(
os.path.join(self.base_path, self.classification, self.subtype, self.name)
)
for f in dirfiles:
### Trying to filter out info file and shifted data file, should do this better
if ".json" in f and "_info.json" not in f and "_shifted_data.json" not in f:
with open(
os.path.join(
self.base_path, self.classification, self.subtype, self.name, f
),
"r",
) as jsonf:
d = json.load(jsonf)
for filt, mag_list in d.items():
self.data.setdefault(filt, []).extend(
[mag for mag in mag_list if mag["err"] < 9999]
)
self.data.setdefault(filt, []).extend(
[
mag | {"err": 0.1, "nondetection": True}
for mag in mag_list
if mag["err"] == 9999 and not np.isnan(mag["mag"])
]
)
[docs]
def write_json_data(self, dry_run=True):
"""
Writes photometry that has not been shifted relative to light curve
peak to a .json file. Runs before the photometry has been shifted.
Args:
dry_run (bool, optional): Run the method by logging what new directories
and files would be created or saved, without actually doing so.
Defaults to True.
"""
if not os.path.exists(
os.path.join(
self.base_path,
self.classification,
self.subtype,
self.name,
)
):
if not dry_run:
os.mkdir(
os.path.join(
self.base_path,
self.classification,
self.subtype,
self.name,
)
)
else:
logger.info("SN directory does not exist. This will make one if `dry_run=False`.")
if not dry_run:
with open(
os.path.join(
self.base_path,
self.classification,
self.subtype,
self.name,
self.name + "_data.json",
),
"w+",
) as f:
json.dump(self.data, f, indent=4)
else:
logger.info("This will save the data as a new file, or overwrite an existing one. To do so, specify `dry_run=False`.")
[docs]
def load_shifted_data(self):
"""
Loads any photometry for this SN object that has been
shifted relative to light curve peak and saved to file.
"""
if not os.path.exists(
os.path.join(
self.base_path,
self.classification,
self.subtype,
self.name,
self.name + "_shifted_data.json",
)
):
self.shifted_data = {}
else:
with open(
os.path.join(
self.base_path,
self.classification,
self.subtype,
self.name,
self.name + "_shifted_data.json",
),
"r",
) as f:
shifted_data = json.load(f)
self.shifted_data = shifted_data
[docs]
    def convert_all_mags_to_fluxes(self):
        """
        Converts all photometry in magnitudes to the corresponding
        flux values. Uses AB zeropoints. Runs on both shifted and unshifted
        photometry, relative to light curve peak.
        """
        # Unshifted photometry: direct AB mag -> flux conversion.
        # Iterate over a snapshot of the keys because filters with no
        # zeropoint entry are deleted from the dict mid-loop.
        for filt in list(self.data.keys()):
            new_phot = []
            if filt in self.zps.keys():
                for phot in self.data[filt]:
                    # zps values carry a 1e11 scale factor; the 1e-11 undoes it
                    phot["flux"] = (
                        self.zps[filt] * 1e-11 * 10 ** (-0.4 * phot["mag"])
                    )  # * 1e15
                    # NOTE(review): "fluxerr" is simply the magnitude error; the
                    # proper flux-error propagation is left commented out
                    phot["fluxerr"] = phot["err"]  # 1.086 * phot['err'] * phot['flux']
                    new_phot.append(phot)
                self.data[filt] = new_phot
            else:
                # raise Exception(f"No zeropoint information found for filter {filt}")
                logger.warning(f"No zeropoint information found for filter {filt}")
                del self.data[filt]
        # Shifted photometry: "mag" is stored relative to the light curve peak,
        # so the peak magnitude is added back before converting. The stored
        # "flux" is then the log10 ratio of this flux to the flux at peak in
        # the peak filter (dimensionless), not an absolute flux.
        for filt in list(self.shifted_data.keys()):
            new_phot = []
            if filt in self.zps.keys():
                for phot in self.shifted_data[filt]:
                    unshifted_mag = phot["mag"] + self.info["peak_mag"]
                    shifted_flux = np.log10(
                        self.zps[filt] * 1e-11 * 10 ** (-0.4 * unshifted_mag)
                    ) - np.log10(
                        self.zps[self.info["peak_filt"]]
                        * 1e-11
                        * 10 ** (-0.4 * self.info["peak_mag"])
                    )  # * 1e15
                    phot["flux"] = shifted_flux
                    # "shiftedmag" flips the sign so brighter-than-peak is positive
                    phot["shiftedmag"] = -1 * phot["mag"]
                    # phot["flux"] = -1*phot["mag"]
                    phot["fluxerr"] = phot["err"]  # 1.086 * phot['err'] * phot['flux']
                    new_phot.append(phot)
                self.shifted_data[filt] = new_phot
            else:
                # raise Exception(f"No zeropoint information found for filter {filt}")
                logger.warning(f"No zeropoint information found for filter {filt}")
                del self.shifted_data[filt]
[docs]
def correct_for_galactic_extinction(self):
"""
Uses the coordinates of the SN from the CAAT file
to find and correct for MW extinction.
NOTE: Must be run before convert_to_fluxes() is ran.
"""
sfd = SFDQuery()
if not self.info.get("ra", "") or not self.info.get("dec", ""):
logger.warning(
f"No info for {self.name}, either no coordinates or no peak info"
)
return
coord = SkyCoord(ra=self.info["ra"] * u.deg, dec=self.info["dec"] * u.deg)
exts = {}
for filt in self.data.keys():
if filt in self.wle.keys():
try:
exts[filt] = fm(
self.wle[filt] * (1 + self.info.get("z", 0)), sfd(coord)
)
except:
### First input needs to be an array
exts[filt] = fm(
np.asarray([self.wle[filt] * (1 + self.info.get("z", 0))]),
sfd(coord),
)
for filt in self.data.keys():
if filt in self.wle.keys():
new_phot = []
for phot in self.data[filt]:
if not phot.get("ext_corrected", False):
phot["mag"] -= exts[filt][0]
phot["ext_corrected"] = True
new_phot.append(phot)
self.data[filt] = new_phot
else:
self.data[filt] = []
if self.shifted_data:
for filt in self.shifted_data:
if filt in self.wle.keys():
new_phot = []
for phot in self.shifted_data[filt]:
if not phot.get("ext_corrected", False):
phot["mag"] -= exts[filt][0]
phot["ext_corrected"] = True
new_phot.append(phot)
self.shifted_data[filt] = new_phot
else:
self.shifted_data[filt] = []
[docs]
def plot_data(
self,
filts_to_plot=["all"],
shifted_data_exists=False,
view_shifted_data=False,
offset=0,
plot_fluxes=False,
):
"""
Plot photometry for this SN object. A user may control
the type of photometry that is plotted (e.g., filters,
magnitudes or fluxes, shifted or unshifted). If no data
is found, this method will attempt to load data first.
Args:
filts_to_plot (list, optional): A list of the filters
to plot. Defaults to ["all"].
shifted_data_exists (bool, optional): Flag to control
whether or not to load new shifted data. Defaults to False.
view_shifted_data (bool, optional): Flag to control plotting
data shifted relative to light curve peak. Defaults to False.
offset (int, optional): An offset to pass to the `shift_to_max` method
used for fitting the light curve peak. Defaults to 0.
plot_fluxes (bool, optional): Flag used to control plotting
photometry in magnitude or flux space. Defaults to False.
"""
if (
filts_to_plot[0] == "all"
): # if individual filters not specified, plot all by default
filts_to_plot = colors.keys()
if (
not self.data
): # check if data/SN has not been previously read in/initialized
self.load_swift_data()
self.load_json_data()
if shifted_data_exists:
if plot_fluxes:
self.convert_all_mags_to_fluxes()
data_to_plot = self.shifted_data
elif view_shifted_data:
for f in filts_to_plot:
self.shift_to_max(f, offset=offset)
if plot_fluxes:
self.convert_all_mags_to_fluxes()
data_to_plot = self.shifted_data
else:
if plot_fluxes:
self.convert_all_mags_to_fluxes()
data_to_plot = self.data
Plot().plot_sn_data(
sn_class=self,
data_to_plot=data_to_plot,
filts_to_plot=filts_to_plot,
plot_fluxes=plot_fluxes,
)
[docs]
    def fit_for_max(
        self, filt, shift_array=[-3, -2, -1, 0, 1, 2, 3], plot=False, offset=0
    ):
        """
        Fit for the light curve peak.
        This method identifies a preliminary light curve peak, selects
        photometry within a 30 day window of the preliminary peak, and iteratively
        fits a third-order polynomial to a random subset of that data. This process
        only runs if more than 4 light curve points are within the preliminary peak in
        the given filter. Writes the peak info to the `info` attribute on this class.

        Args:
            filt (str): The filter of the photometry to fit.
            shift_array (list, optional): An array of phases to shift the
                selected photometry by, during the iterative peak fitting.
                Defaults to [-3, -2, -1, 0, 1, 2, 3].
            plot (bool, optional): Plot the fit to the peak. Defaults to False.
            offset (int, optional): A number of days relative to the preliminary
                peak to fit. Useful if the identified preliminary peak is not
                close to the true light curve peak. Defaults to 0.
        """
        # Collect detections only (nondetections are excluded from the fit)
        mjd_array = np.asarray(
            [
                phot["mjd"]
                for phot in self.data[filt]
                if not phot.get("nondetection", False)
            ]
        )
        mag_array = np.asarray(
            [
                phot["mag"]
                for phot in self.data[filt]
                if not phot.get("nondetection", False)
            ]
        )
        err_array = np.asarray(
            [
                phot["err"]
                for phot in self.data[filt]
                if not phot.get("nondetection", False)
            ]
        )
        # Need at least 4 points for the cubic preliminary fit below
        if len(mag_array) < 4:  # == 0:
            return
        # Preliminary peak guess: epoch of the brightest (minimum) magnitude,
        # optionally nudged by the user-supplied offset
        initial_guess_mjd_max = (
            mjd_array[np.where((mag_array == min(mag_array)))[0]][0] + offset
        )
        # Keep only points within a 30-day window around the preliminary peak
        fit_inds = np.where((abs(mjd_array - initial_guess_mjd_max) < 30))[0]
        if len(fit_inds) < 4:
            return
        # Cubic fit over the window to refine the peak epoch estimate
        fit_coeffs = np.polyfit(mjd_array[fit_inds], mag_array[fit_inds], 3)
        guess_phases = np.arange(min(mjd_array[fit_inds]), max(mjd_array[fit_inds]), 1)
        p = np.poly1d(fit_coeffs)
        guess_best_fit = p(guess_phases)
        if len(guess_best_fit) == 0:
            return
        # Refined guess: epoch where the cubic model is brightest
        guess_mjd_max = guess_phases[
            np.where((guess_best_fit == min(guess_best_fit)))[0]
        ][0]
        ### Do this because the array might not be ordered
        inds_to_fit = np.where(
            (mjd_array > guess_mjd_max - 10) & (mjd_array < guess_mjd_max + 10)
        )
        if len(inds_to_fit[0]) < 4:
            return
        # Number of Monte-Carlo iterations scales with the data volume,
        # with a floor of 200 iterations
        numdata = len(mjd_array[inds_to_fit])
        numiter = max(int(numdata * np.log(numdata) ** 2), 200)
        fit_mjds = mjd_array[inds_to_fit]
        fit_mags = mag_array[inds_to_fit]
        fit_errs = err_array[inds_to_fit]
        if plot:
            Plot().plot_fit_for_max(
                sn_class=self,
                mjd_array=mjd_array,
                mag_array=mag_array,
                err_array=err_array,
                fit_mjds=fit_mjds,
                fit_mags=fit_mags,
                fit_errs=fit_errs,
                inds_to_fit=inds_to_fit,
            )
        # Monte-Carlo loop: resample the photometry within its errors and
        # refit a parabola each time, accumulating peak epoch/mag estimates
        peak_mags = []
        peak_mjds = []
        for num in range(numiter):
            simulated_points = []
            ### Shift by a certain number of days to randomly sample the light curve
            sim_shift = np.random.choice(shift_array)
            inds_to_fit = np.where(
                (mjd_array > guess_mjd_max - 5 + sim_shift)
                & (mjd_array < guess_mjd_max + 5 + sim_shift)
            )[0]
            if len(inds_to_fit) > 0:
                fit_mjds = mjd_array[inds_to_fit]
                fit_mags = mag_array[inds_to_fit]
                fit_errs = err_array[inds_to_fit]
                # Perturb each magnitude by a Gaussian draw of its error
                for i in range(len(fit_mjds)):
                    simulated_points.append(np.random.normal(fit_mags[i], fit_errs[i]))
                fit = np.polyfit(fit_mjds, simulated_points, 2)
                f = np.poly1d(fit)
                fit_time = np.linspace(min(fit_mjds), max(fit_mjds), 100)
                # Overplot a thin sample of the Monte-Carlo fits
                if num % 25 == 0 and plot:
                    plt.plot(fit_time, f(fit_time), color="black", linewidth=0.5)
                peak_mag = min(f(fit_time))
                peak_mags.append(peak_mag)
                peak_mjds.append(fit_time[np.argmin(f(fit_time))])
        if plot:
            # Mean of the Monte-Carlo peaks with their scatter as error bars
            plt.errorbar(
                mean(peak_mjds),
                mean(peak_mags),
                xerr=stdev(peak_mjds),
                yerr=stdev(peak_mags),
                color="red",
                fmt="o",
                label="Best Fit Peak",
            )
            plt.xlim(guess_mjd_max - 10, guess_mjd_max + 10)
        # Record the ensemble-mean peak as this SN's adopted peak
        self.info["peak_mjd"] = mean(peak_mjds)
        self.info["peak_mag"] = mean(peak_mags)
        self.info["peak_filt"] = filt
        self.info["searched"] = True
[docs]
    def shift_to_max(
        self,
        filt: str,
        shift_array=[-3, -2, -1, 0, 1, 2, 3],
        plot=False,
        offset=0,
        shift_fluxes=False,
        try_other_filts=True,
        return_wls=False,
    ):
        """
        Shift the SN photometry relative to the light curve peak.
        This method will load the SN photometry if it is not
        already loaded.
        If no peak info is found, this method will attempt to fit for
        the light curve peak.

        Args:
            filt (str): The filter of the photometry to fit.
            shift_array (list, optional): An array of phases to shift the
                selected photometry by, during the iterative peak fitting.
                Defaults to [-3, -2, -1, 0, 1, 2, 3].
            plot (bool, optional): Plot the fit to the peak. Defaults to False.
            offset (int, optional): A number of days relative to the preliminary
                peak to fit. Useful if the identified preliminary peak is not
                close to the true light curve peak. Defaults to 0.
            shift_fluxes (bool, optional): Shift the photometry in
                flux units relative to light curve peak. Defaults to False.
            try_other_filts (bool, optional): If the light curve peak cannot be
                found for the input `filt`, try fitting for the peak in
                a predefined list of filters. Defaults to True.
            return_wls (bool, optional): Return wavelength information for
                the photometry. Defaults to False.

        Returns:
            list, list, list, list, list (optional): Lists of shifted
                time, magnitudes, error, nondetections, and optional wavelengths.
                Empty lists are returned when no data or no peak is available.
        """
        # Lazily load photometry on first use
        if not self.data:
            self.load_swift_data()
            self.load_json_data()
        # Nothing to shift if there is no data or no wavelength for this filter
        if filt not in self.data.keys() or filt not in self.wle.keys():
            if return_wls:
                return [], [], [], [], []
            return [], [], [], []
        # Fit for the peak if it has not already been determined
        if not self.info.get("peak_mjd") and not self.info.get("peak_mag"):
            self.fit_for_max(filt, shift_array=shift_array, plot=plot, offset=offset)
        # If that failed, fall back through a predefined filter priority list
        if not self.info.get("peak_mjd", 0) > 0 and try_other_filts:
            for newfilt in ["V", "g", "c", "B", "r", "o", "U", "i", "UVW1"]:
                if newfilt in self.data.keys() and newfilt != filt:
                    self.fit_for_max(
                        newfilt, shift_array=shift_array, plot=plot, offset=offset
                    )
                    if self.info.get("peak_mjd", 0) > 0:
                        break
                if newfilt == "UVW1" and not self.info.get("peak_mjd", 0) > 0:
                    # Exhausted the fallback list without finding a peak;
                    # mark as searched so we don't refit endlessly
                    logger.debug(
                        f"Reached last filter and could not fit for peak for {self.name}"
                    )
                    self.info["searched"] = True
        # Still no usable peak magnitude: bail out with empty results
        if not self.info.get("peak_mag", 0) > 0:
            if return_wls:
                return [], [], [], [], []
            return [], [], [], []
        # Phase and magnitude relative to the adopted peak
        mjds = (
            np.asarray([phot["mjd"] for phot in self.data[filt]])
            - self.info["peak_mjd"]
        )
        mags = (
            np.asarray([phot["mag"] for phot in self.data[filt]])
            - self.info["peak_mag"]
        )
        errs = np.asarray([phot["err"] for phot in self.data[filt]])
        nondets = np.asarray(
            [phot.get("nondetection", False) for phot in self.data[filt]]
        )
        # Per-point wavelengths; default to the filter's redshifted
        # effective wavelength when not stored on the photometry point
        wls = np.asarray(
            [
                phot.get("wle", self.wle[filt] * (1 + self.info.get("z", 0.0)))
                for phot in self.data[filt]
            ]
        )
        if plot:
            Plot().plot_shift_to_max(
                sn_class=self,
                mjds=mjds,
                mags=mags,
                errs=errs,
                nondets=nondets,
                filt=filt,
            )
        # Append (not replace) the shifted points for this filter
        self.shifted_data.setdefault(filt, []).extend(
            [
                {
                    "mjd": mjds[i],
                    "mag": mags[i],
                    "err": errs[i],
                    "nondetection": nondets[i],
                    "wle": wls[i],
                }
                for i in range(len(mjds))
            ]
        )
        if shift_fluxes:
            # Convert to fluxes and return flux-space arrays instead;
            # note "shiftedmag"/"fluxerr" are set by convert_all_mags_to_fluxes
            self.convert_all_mags_to_fluxes()
            shifted_mjd = np.asarray([phot["mjd"] for phot in self.shifted_data[filt]])
            shifted_flux = np.asarray(
                [phot["shiftedmag"] for phot in self.shifted_data[filt]]
            )
            shifted_err = np.asarray(
                [phot["fluxerr"] for phot in self.shifted_data[filt]]
            )
            nondets = np.asarray(
                [phot.get("nondetection", False) for phot in self.shifted_data[filt]]
            )
            wls = np.asarray(
                [
                    phot.get("wle", self.wle[filt] * (1 + self.info.get("z", 0.0)))
                    for phot in self.shifted_data[filt]
                ]
            )
            if return_wls:
                return shifted_mjd, shifted_flux, shifted_err, nondets, wls
            return shifted_mjd, shifted_flux, shifted_err, nondets
        if return_wls:
            return mjds, mags, errs, nondets, wls
        return mjds, mags, errs, nondets
[docs]
    def interactively_fit_for_max(
        self,
        filt="",
        shift_array=[-3, -2, -1, 0, 1, 2, 3],
        plot=True,
        offset=0,
        save_to_caat=False,
        force=False,
    ):
        """
        Iteratively fit for the SN light curve peak.
        This method will load the SN photometry automatically.
        At each step of the fitting process, a plot is generated and
        the user is prompted to supply the filter to fit, an offset,
        whether to accept the fit, or refit with new parameters.

        Args:
            filt (str, optional): The filter to fit. If one is not passed,
                the user is shown the full light curve and prompted to pick
                a filter. Defaults to "".
            shift_array (list, optional): An array of phases to shift the
                selected photometry by, during the iterative peak fitting.
                Defaults to [-3, -2, -1, 0, 1, 2, 3].
            plot (bool, optional): Plot the fit to the peak and all
                intermediate plots. Defaults to True.
            offset (int, optional): A number of days relative to the preliminary
                peak to fit. Useful if the identified preliminary peak is not
                close to the true light curve peak. Defaults to 0.
            save_to_caat (bool, optional): Save the best-fit peak information to
                the CAAT file. Defaults to False.
            force (bool, optional): Overwrite the existing peak information in the
                CAAT file. Defaults to False.
        """
        # Always start from freshly loaded, unshifted photometry
        self.load_json_data()
        self.load_swift_data()
        self.shifted_data = {}
        if not filt:
            # Show the full light curve so the user can choose a filter
            self.plot_data()
            logger.info("Data in filters {}\n".format(list(self.data.keys())))
            filt = input(
                'Which filter would you like to use to fit for max? To skip, type "skip"\n'
            )
            if filt == "skip":
                return
        # The peak fit happens as a side effect of shifting to max
        mjds, _, _, _ = self.shift_to_max(
            filt, shift_array=shift_array, plot=plot, offset=offset
        )
        if len(mjds) == 0:
            refit = input("No photometry found for this filter. Try to refit? y/n \n")
        else:
            self.plot_data(view_shifted_data=True)
            refit = input("Refit the data with new filter or offset? y/n \n")
        if refit == "n" and save_to_caat:
            self.write_info_to_caat_file(force=force)
        elif refit == "n" and not save_to_caat:
            logger.info('To save these parameters, rerun with "save_to_caat=True"')
        elif refit == "y":
            # Discard the previous peak fit and recurse with new parameters
            self.info = {}
            newfilt = input(
                "Try fitting a new filter? If so, enter the filter here. If not, leave blank to pick new offset\n"
            )
            if newfilt:
                self.interactively_fit_for_max(
                    newfilt,
                    shift_array=shift_array,
                    plot=plot,
                    offset=offset,
                    save_to_caat=save_to_caat,
                    force=force,
                )
            else:
                newoffset = input("Enter new offset here\n")
                if newoffset:
                    self.interactively_fit_for_max(
                        filt,
                        shift_array=shift_array,
                        plot=plot,
                        offset=float(newoffset),
                        save_to_caat=save_to_caat,
                        force=force,
                    )