| import numpy as np |
| from transitleastsquares import transitleastsquares |
|
|
def run_tls(time: np.ndarray, flux: np.ndarray, deep_recovery_mode: bool = False):
    """
    Run Transit Least Squares (TLS) and summarise the strongest periodic signal.

    Args:
        time: Observation timestamps.
        flux: Flux values, one per timestamp.
        deep_recovery_mode: When True, and more than 100 samples spanning a
            positive, finite time range are available, search periods up to
            ``baseline / 2.01`` with ``oversampling_factor=3``. Otherwise the
            TLS default search grid is used.

    Returns:
        A dict with the keys ``transit_detected`` (SDE above 7.0), ``period``,
        ``depth`` (``1 - results.depth``), ``duration``, ``sde``,
        ``tls_confidence`` (SDE * 10 clamped to 0..100), ``power_spectrum``
        (``periods`` and ``power`` as lists) and ``transit_times`` (a list,
        empty when TLS reports none).
    """
    sde_threshold = 7.0
    model = transitleastsquares(time, flux)

    # Only widen the search when the time span is usable: max - min does not
    # depend on the samples being sorted, and the nan-aware reductions ignore
    # missing timestamps at the array ends.
    baseline = float("nan")
    if deep_recovery_mode and len(time) > 100:
        timestamps = np.asarray(time, dtype=float)
        baseline = float(np.nanmax(timestamps) - np.nanmin(timestamps))

    if np.isfinite(baseline) and baseline > 0.0:
        results = model.power(
            period_max=baseline / 2.01,
            oversampling_factor=3,
            use_threads=4,
        )
    else:
        results = model.power(use_threads=4)

    sde = float(results.SDE)
    transit_detected = bool(sde > sde_threshold)

    # min()/max() silently keep their first argument when compared with NaN,
    # so an undefined SDE must be mapped to zero confidence explicitly.
    if np.isnan(sde):
        tls_confidence = 0.0
    else:
        tls_confidence = float(min(100.0, max(0.0, sde * 10.0)))

    raw_times = results.transit_times
    if raw_times is None:
        t_times = []
    elif isinstance(raw_times, list):
        t_times = raw_times
    else:
        t_times = raw_times.tolist()

    return {
        "transit_detected": transit_detected,
        "period": float(results.period),
        # NOTE(review): presumably results.depth is the in-transit flux level,
        # making this the fractional dip - confirm against the TLS docs.
        "depth": float(1.0 - results.depth),
        "duration": float(results.duration),
        "sde": sde,
        "tls_confidence": tls_confidence,
        "power_spectrum": {
            "periods": results.periods.tolist(),
            "power": results.power.tolist(),
        },
        "transit_times": t_times,
    }
|
|
| from transitleastsquares import transit_mask |
|
|
def run_multi_tls(time: np.ndarray, flux: np.ndarray, max_planets: int = 3):
    """
    Run TLS iteratively to look for several transiting candidates.

    After each detection the in-transit samples of that signal are removed
    (using 1.5x the reported duration) and the search is repeated on what is
    left. The inputs are copied and never modified.

    Args:
        time: Observation timestamps.
        flux: Flux values, one per timestamp.
        max_planets: Upper bound on the number of search passes; zero or a
            negative value yields an empty list.

    Returns:
        A list of ``run_tls`` result dicts, each extended with a 1-based
        ``candidate_number``, in order of detection.
    """
    candidates = []
    current_time = np.copy(time)
    current_flux = np.copy(flux)

    for planet_index in range(max_planets):
        result = run_tls(current_time, current_flux)
        if not result["transit_detected"]:
            break

        candidate = result.copy()
        candidate["candidate_number"] = planet_index + 1
        candidates.append(candidate)

        # Anchor the mask on the first usable mid-transit time; a NaN epoch
        # would produce a mask that removes nothing.
        t0 = next(
            (t for t in result["transit_times"] if np.isfinite(t)),
            None,
        )
        if t0 is None:
            break

        intransit = np.asarray(
            transit_mask(current_time, result["period"], result["duration"] * 1.5, t0),
            dtype=bool,
        )

        # If nothing was masked the next pass would see identical data and
        # report the same signal again, so stop instead of duplicating it.
        if not intransit.any():
            break

        keep = ~intransit
        current_time = current_time[keep]
        current_flux = current_flux[keep]

        # Too few samples left for another search pass.
        if len(current_time) < 100:
            break

    return candidates
|
|