Spaces:
Sleeping
Sleeping
| # Generated by Claude Code -- 2026-02-13 | |
| """Firebase Firestore client for prediction logging. | |
| Stores daily conjunction predictions and maneuver detection outcomes. | |
| Uses the Firestore REST API to avoid heavy SDK dependencies. | |
| Falls back to local JSONL logging if Firebase is not configured. | |
| Environment variables: | |
| FIREBASE_SERVICE_ACCOUNT: JSON string of the service account key | |
| FIREBASE_PROJECT_ID: Project ID (auto-detected from service account if not set) | |
| """ | |
| import os | |
| import json | |
| import time | |
| import numpy as np | |
| from pathlib import Path | |
| from datetime import datetime, timezone | |
| def _json_default(obj): | |
| """Handle numpy types that json.dumps can't serialize.""" | |
| if isinstance(obj, (np.integer,)): | |
| return int(obj) | |
| if isinstance(obj, (np.floating,)): | |
| return float(obj) | |
| if isinstance(obj, (np.bool_,)): | |
| return bool(obj) | |
| if isinstance(obj, np.ndarray): | |
| return obj.tolist() | |
| raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable") | |
# Try to import google-cloud-firestore (lightweight).
# The SDK is optional: when it is absent, HAS_FIRESTORE stays False and
# PredictionLogger falls back to local JSONL logging only.
try:
    from google.cloud.firestore import Client as FirestoreClient
    from google.oauth2.service_account import Credentials
    HAS_FIRESTORE = True
except ImportError:
    HAS_FIRESTORE = False
| class PredictionLogger: | |
| """Log predictions to Firebase Firestore with local JSONL fallback.""" | |
| def __init__(self, local_dir: Path = None): | |
| self.db = None | |
| self.local_dir = local_dir or Path("data/prediction_logs") | |
| self.local_dir.mkdir(parents=True, exist_ok=True) | |
| self._init_firebase() | |
| def _init_firebase(self): | |
| """Initialize Firebase Firestore client from environment.""" | |
| sa_json = os.environ.get("FIREBASE_SERVICE_ACCOUNT", "") | |
| if not sa_json or not HAS_FIRESTORE: | |
| if not HAS_FIRESTORE: | |
| print(" Firebase SDK not installed (pip install google-cloud-firestore)") | |
| print(" Using local JSONL logging only") | |
| return | |
| try: | |
| sa_info = json.loads(sa_json) | |
| creds = Credentials.from_service_account_info(sa_info) | |
| project_id = sa_info.get("project_id", os.environ.get("FIREBASE_PROJECT_ID", "")) | |
| self.db = FirestoreClient(project=project_id, credentials=creds) | |
| print(f" Firebase Firestore connected (project: {project_id})") | |
| except Exception as e: | |
| print(f" Firebase init failed: {e}") | |
| print(" Falling back to local JSONL logging") | |
| def log_predictions(self, date_str: str, predictions: list[dict]): | |
| """Log a batch of daily predictions. | |
| Args: | |
| date_str: Date string (YYYY-MM-DD) | |
| predictions: List of prediction dicts with keys: | |
| sat1_norad, sat2_norad, sat1_name, sat2_name, | |
| risk_score, altitude_km, model_used | |
| """ | |
| # Always save locally | |
| local_file = self.local_dir / f"{date_str}.jsonl" | |
| with open(local_file, "a") as f: | |
| for pred in predictions: | |
| pred["date"] = date_str | |
| pred["logged_at"] = datetime.now(timezone.utc).isoformat() | |
| f.write(json.dumps(pred, default=_json_default) + "\n") | |
| print(f" Saved {len(predictions)} predictions to {local_file}") | |
| # Firebase upload | |
| if self.db: | |
| try: | |
| batch = self.db.batch() | |
| collection = self.db.collection("predictions").document(date_str) | |
| collection.set({"date": date_str, "count": len(predictions)}) | |
| for i, pred in enumerate(predictions): | |
| doc_ref = self.db.collection("predictions").document(date_str) \ | |
| .collection("pairs").document(f"pair_{i:04d}") | |
| batch.set(doc_ref, pred) | |
| batch.commit() | |
| print(f" Uploaded {len(predictions)} predictions to Firebase") | |
| except Exception as e: | |
| print(f" Firebase upload failed: {e}") | |
| def log_outcomes(self, date_str: str, outcomes: list[dict]): | |
| """Log maneuver detection outcomes for a previous prediction date. | |
| Args: | |
| date_str: Original prediction date (YYYY-MM-DD) | |
| outcomes: List of outcome dicts with keys: | |
| sat1_norad, sat2_norad, sat1_maneuvered, sat2_maneuvered, | |
| sat1_delta_a_m, sat2_delta_a_m, validated_at | |
| """ | |
| local_file = self.local_dir / f"{date_str}_outcomes.jsonl" | |
| with open(local_file, "a") as f: | |
| for outcome in outcomes: | |
| outcome["prediction_date"] = date_str | |
| outcome["validated_at"] = datetime.now(timezone.utc).isoformat() | |
| f.write(json.dumps(outcome, default=_json_default) + "\n") | |
| print(f" Saved {len(outcomes)} outcomes to {local_file}") | |
| if self.db: | |
| try: | |
| batch = self.db.batch() | |
| for i, outcome in enumerate(outcomes): | |
| doc_ref = self.db.collection("outcomes").document(date_str) \ | |
| .collection("results").document(f"result_{i:04d}") | |
| batch.set(doc_ref, outcome) | |
| batch.commit() | |
| print(f" Uploaded {len(outcomes)} outcomes to Firebase") | |
| except Exception as e: | |
| print(f" Firebase upload failed: {e}") | |
| def log_daily_summary(self, date_str: str, summary: dict): | |
| """Log a daily summary (n_predictions, n_maneuvers_detected, accuracy, etc).""" | |
| local_file = self.local_dir / "daily_summaries.jsonl" | |
| summary["date"] = date_str | |
| with open(local_file, "a") as f: | |
| f.write(json.dumps(summary, default=_json_default) + "\n") | |
| if self.db: | |
| try: | |
| self.db.collection("daily_summaries").document(date_str).set(summary) | |
| print(f" Uploaded daily summary to Firebase") | |
| except Exception as e: | |
| print(f" Firebase summary upload failed: {e}") | |
| def get_predictions_for_date(self, date_str: str) -> list[dict]: | |
| """Retrieve predictions for a date (from local files).""" | |
| local_file = self.local_dir / f"{date_str}.jsonl" | |
| if not local_file.exists(): | |
| return [] | |
| predictions = [] | |
| with open(local_file) as f: | |
| for line in f: | |
| line = line.strip() | |
| if line: | |
| predictions.append(json.loads(line)) | |
| return predictions | |