# Generated by Claude Code -- 2026-02-13
"""Firebase Firestore client for prediction logging.
Stores daily conjunction predictions and maneuver detection outcomes.
Uses the Firestore REST API to avoid heavy SDK dependencies.
Falls back to local JSONL logging if Firebase is not configured.
Environment variables:
FIREBASE_SERVICE_ACCOUNT: JSON string of the service account key
FIREBASE_PROJECT_ID: Project ID (auto-detected from service account if not set)
"""
import os
import json
import time
import numpy as np
from pathlib import Path
from datetime import datetime, timezone
def _json_default(obj):
"""Handle numpy types that json.dumps can't serialize."""
if isinstance(obj, (np.integer,)):
return int(obj)
if isinstance(obj, (np.floating,)):
return float(obj)
if isinstance(obj, (np.bool_,)):
return bool(obj)
if isinstance(obj, np.ndarray):
return obj.tolist()
raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
# Try to import google-cloud-firestore (lightweight)
try:
from google.cloud.firestore import Client as FirestoreClient
from google.oauth2.service_account import Credentials
HAS_FIRESTORE = True
except ImportError:
HAS_FIRESTORE = False
class PredictionLogger:
"""Log predictions to Firebase Firestore with local JSONL fallback."""
def __init__(self, local_dir: Path = None):
self.db = None
self.local_dir = local_dir or Path("data/prediction_logs")
self.local_dir.mkdir(parents=True, exist_ok=True)
self._init_firebase()
def _init_firebase(self):
"""Initialize Firebase Firestore client from environment."""
sa_json = os.environ.get("FIREBASE_SERVICE_ACCOUNT", "")
if not sa_json or not HAS_FIRESTORE:
if not HAS_FIRESTORE:
print(" Firebase SDK not installed (pip install google-cloud-firestore)")
print(" Using local JSONL logging only")
return
try:
sa_info = json.loads(sa_json)
creds = Credentials.from_service_account_info(sa_info)
project_id = sa_info.get("project_id", os.environ.get("FIREBASE_PROJECT_ID", ""))
self.db = FirestoreClient(project=project_id, credentials=creds)
print(f" Firebase Firestore connected (project: {project_id})")
except Exception as e:
print(f" Firebase init failed: {e}")
print(" Falling back to local JSONL logging")
def log_predictions(self, date_str: str, predictions: list[dict]):
"""Log a batch of daily predictions.
Args:
date_str: Date string (YYYY-MM-DD)
predictions: List of prediction dicts with keys:
sat1_norad, sat2_norad, sat1_name, sat2_name,
risk_score, altitude_km, model_used
"""
# Always save locally
local_file = self.local_dir / f"{date_str}.jsonl"
with open(local_file, "a") as f:
for pred in predictions:
pred["date"] = date_str
pred["logged_at"] = datetime.now(timezone.utc).isoformat()
f.write(json.dumps(pred, default=_json_default) + "\n")
print(f" Saved {len(predictions)} predictions to {local_file}")
# Firebase upload
if self.db:
try:
batch = self.db.batch()
collection = self.db.collection("predictions").document(date_str)
collection.set({"date": date_str, "count": len(predictions)})
for i, pred in enumerate(predictions):
doc_ref = self.db.collection("predictions").document(date_str) \
.collection("pairs").document(f"pair_{i:04d}")
batch.set(doc_ref, pred)
batch.commit()
print(f" Uploaded {len(predictions)} predictions to Firebase")
except Exception as e:
print(f" Firebase upload failed: {e}")
def log_outcomes(self, date_str: str, outcomes: list[dict]):
"""Log maneuver detection outcomes for a previous prediction date.
Args:
date_str: Original prediction date (YYYY-MM-DD)
outcomes: List of outcome dicts with keys:
sat1_norad, sat2_norad, sat1_maneuvered, sat2_maneuvered,
sat1_delta_a_m, sat2_delta_a_m, validated_at
"""
local_file = self.local_dir / f"{date_str}_outcomes.jsonl"
with open(local_file, "a") as f:
for outcome in outcomes:
outcome["prediction_date"] = date_str
outcome["validated_at"] = datetime.now(timezone.utc).isoformat()
f.write(json.dumps(outcome, default=_json_default) + "\n")
print(f" Saved {len(outcomes)} outcomes to {local_file}")
if self.db:
try:
batch = self.db.batch()
for i, outcome in enumerate(outcomes):
doc_ref = self.db.collection("outcomes").document(date_str) \
.collection("results").document(f"result_{i:04d}")
batch.set(doc_ref, outcome)
batch.commit()
print(f" Uploaded {len(outcomes)} outcomes to Firebase")
except Exception as e:
print(f" Firebase upload failed: {e}")
def log_daily_summary(self, date_str: str, summary: dict):
"""Log a daily summary (n_predictions, n_maneuvers_detected, accuracy, etc)."""
local_file = self.local_dir / "daily_summaries.jsonl"
summary["date"] = date_str
with open(local_file, "a") as f:
f.write(json.dumps(summary, default=_json_default) + "\n")
if self.db:
try:
self.db.collection("daily_summaries").document(date_str).set(summary)
print(f" Uploaded daily summary to Firebase")
except Exception as e:
print(f" Firebase summary upload failed: {e}")
def get_predictions_for_date(self, date_str: str) -> list[dict]:
"""Retrieve predictions for a date (from local files)."""
local_file = self.local_dir / f"{date_str}.jsonl"
if not local_file.exists():
return []
predictions = []
with open(local_file) as f:
for line in f:
line = line.strip()
if line:
predictions.append(json.loads(line))
return predictions