OBD_Logger / data /firebase_saver.py
LiamKhoaLe's picture
Upd blob
bb3c76a
# firebase_saver.py
import os
import io
import re
import json
import logging
from datetime import datetime
from typing import Optional, Tuple, List
import pandas as pd
logger = logging.getLogger("firebase-saver")
logger.setLevel(logging.INFO)
if not logger.handlers:
_h = logging.StreamHandler()
_h.setFormatter(logging.Formatter("[%(levelname)s] %(asctime)s - %(message)s"))
logger.addHandler(_h)
# ---------- Constants (fixed as requested) ----------
FIXED_BUCKET = "skyledge-36b56.firebasestorage.app"
FIXED_PREFIX = "skyledge/processed" # no trailing slash
# Pattern: NNN_YYYY-MM-DD_processed.csv
FILENAME_RE = re.compile(r"^(?P<num>\d{3})_(?P<date>\d{4}-\d{2}-\d{2})_processed\.csv$")
def _parse_gs_uri(uri: Optional[str]):
if not uri or not uri.startswith("gs://"):
return None, None
path = uri[len("gs://"):]
parts = path.split("/", 1)
bucket = parts[0]
prefix = parts[1] if len(parts) > 1 else ""
return bucket, prefix
def _maybe_default_firebase_bucket(name: Optional[str]) -> Optional[str]:
# If user passed a project ID (no dot), convert to <project>.appspot.com
if name and "." not in name:
return f"{name}.appspot.com"
return name
# -------------------- Low-level clients --------------------
class _AdminClient:
"""Firebase Admin SDK storage client."""
def __init__(self, bucket: str):
import firebase_admin
from firebase_admin import credentials, storage as fb_storage
raw = os.getenv("FIREBASE_ADMIN_JSON")
if not raw:
raise RuntimeError("FIREBASE_ADMIN_JSON not set")
info = json.loads(raw)
client_email = info.get("client_email")
cred = credentials.Certificate(info)
if not firebase_admin._apps:
firebase_admin.initialize_app(cred, {"storageBucket": bucket})
# fb_storage.bucket returns a google.cloud.storage.bucket.Bucket
self.bucket = fb_storage.bucket(bucket)
self._bucket_name = bucket
logger.info(f"✅ Firebase Admin initialized | bucket={bucket} as {client_email}")
# Uploads
def upload_from_filename(self, local_path: str, dest_path: str, content_type: str):
blob = self.bucket.blob(dest_path)
blob.cache_control = "no-store"
blob.upload_from_filename(local_path, content_type=content_type)
def upload_from_bytes(self, data: bytes, dest_path: str, content_type: str):
blob = self.bucket.blob(dest_path)
blob.cache_control = "no-store"
blob.upload_from_string(data, content_type=content_type)
# Listing (needs storage.objects.list permission)
def list_names(self, prefix: str) -> List[str]:
# Bucket.list_blobs works via the underlying GCS client
blobs = self.bucket.list_blobs(prefix=prefix)
return [b.name for b in blobs]
# Existence check (for collision-safe retry)
def blob_exists(self, path: str) -> bool:
blob = self.bucket.blob(path)
return blob.exists()
class _GCSClient:
"""google-cloud-storage client."""
def __init__(self, bucket: str):
from google.cloud import storage
from google.oauth2 import service_account
raw = os.getenv("FIREBASE_SERVICE_ACCOUNT_JSON")
if not raw:
raise RuntimeError("FIREBASE_SERVICE_ACCOUNT_JSON not set")
info = json.loads(raw)
client_email = info.get("client_email")
creds = service_account.Credentials.from_service_account_info(info)
project_id = info.get("project_id")
self.client = storage.Client(credentials=creds, project=project_id)
self.bucket = self.client.bucket(bucket)
self._bucket_name = bucket
logger.info(f"✅ GCS client initialized | bucket={bucket} as {client_email}")
def upload_from_filename(self, local_path: str, dest_path: str, content_type: str):
blob = self.bucket.blob(dest_path)
blob.cache_control = "no-store"
blob.upload_from_filename(local_path, content_type=content_type)
def upload_from_bytes(self, data: bytes, dest_path: str, content_type: str):
blob = self.bucket.blob(dest_path)
blob.cache_control = "no-store"
blob.upload_from_string(data, content_type=content_type)
def list_names(self, prefix: str) -> List[str]:
blobs = self.client.list_blobs(self._bucket_name, prefix=prefix)
return [b.name for b in blobs]
def blob_exists(self, path: str) -> bool:
blob = self.bucket.blob(path)
return blob.exists(self.client)
# -------------------- Saver (high level) --------------------
class FirebaseSaver:
"""
Fixed target:
Bucket: skyledge-36b56.firebasestorage.app
Prefix: skyledge/processed
Filename convention: NNN_YYYY-MM-DD_processed.csv (NNN is 001-based, zero-padded).
Auto-increments by listing current objects and picking max+1.
"""
def __init__(self):
# Force fixed location regardless of env (as requested)
bucket_name = FIXED_BUCKET
self.prefix = FIXED_PREFIX
# Try Admin SDK first; fallback to GCS client
self.client = None
self.mode = None
try:
if os.getenv("FIREBASE_ADMIN_JSON"):
self.client = _AdminClient(bucket_name)
self.mode = "admin"
except Exception as e:
logger.warning(f"⚠️ Admin SDK init failed: {e}")
if self.client is None:
try:
self.client = _GCSClient(bucket_name)
self.mode = "gcs"
except Exception as e:
logger.error(f"❌ GCS client init failed: {e}")
raise
logger.info(f"📦 FirebaseSaver ready | mode={self.mode} bucket={bucket_name} prefix={self.prefix}")
def is_available(self) -> bool:
return self.client is not None
# ---------- Incremental naming helpers ----------
def _list_existing_filenames(self) -> List[str]:
"""List object names under the fixed prefix, return just basenames under that folder."""
names = self.client.list_names(prefix=self.prefix + "/")
# keep only items immediately under prefix (not subfolders) & matching our filename pattern
base_names = []
for full in names:
# full looks like 'skyledge/processed/NNN_YYYY-MM-DD_processed.csv'
if not full.startswith(self.prefix + "/"):
continue
base = full[len(self.prefix) + 1:] # strip 'prefix/'
if "/" in base:
# skip nested items (none expected)
continue
if FILENAME_RE.match(base):
base_names.append(base)
return base_names
def _max_existing_id(self) -> int:
"""Return max NNN found under prefix, or 0 if none."""
try:
base_names = self._list_existing_filenames()
except Exception as e:
logger.warning(f"⚠️ Unable to list existing objects; defaulting max_id=0: {e}")
return 0
max_id = 0
for name in base_names:
m = FILENAME_RE.match(name)
if not m:
continue
try:
num = int(m.group("num"))
if num > max_id:
max_id = num
except ValueError:
continue
return max_id
@staticmethod
def _format_id(n: int) -> str:
return f"{n:03d}"
@staticmethod
def _today_au() -> str:
# Use Australia/Melbourne local date; if zoneinfo unavailable, fall back to UTC date.
try:
from zoneinfo import ZoneInfo
dt = datetime.now(ZoneInfo("Australia/Melbourne"))
except Exception:
dt = datetime.utcnow()
return dt.strftime("%Y-%m-%d")
def _build_filename(self, n_int: int, date_str: Optional[str] = None) -> str:
date_val = (date_str or self._today_au())
return f"{self._format_id(n_int)}_{date_val}_processed.csv"
def _dest_path(self, filename: str) -> str:
return f"{self.prefix}/{filename}"
def _next_available_name(self, date_str: Optional[str] = None, max_retries: int = 5) -> Tuple[str, str]:
"""
Compute the next file name by listing existing ones and incrementing.
Includes a collision check (exists) and retries if necessary.
Returns: (filename, full_gcs_path)
"""
start = self._max_existing_id() + 1
n = start
for _ in range(max_retries):
candidate = self._build_filename(n, date_str=date_str)
dest_path = self._dest_path(candidate)
# collision check
if not self.client.blob_exists(dest_path):
return candidate, dest_path
n += 1
# As a final fallback, return the last tried (very unlikely to collide repeatedly)
candidate = self._build_filename(n, date_str=date_str)
return candidate, self._dest_path(candidate)
# ---------- Public save methods (incremental) ----------
def upload_file_with_increment(
self,
local_path: str,
date_str: Optional[str] = None,
content_type: str = "text/csv",
) -> str:
"""
Upload a local file using the next incremental name.
Returns the gs:// URL of the uploaded object (string) or "" on failure.
"""
if not self.is_available():
logger.warning("⚠️ Firebase saver unavailable")
return ""
try:
filename, dest_path = self._next_available_name(date_str=date_str)
self.client.upload_from_filename(local_path, dest_path, content_type)
logger.info(f"✅ Uploaded file to gs://{FIXED_BUCKET}/{dest_path}")
return f"gs://{FIXED_BUCKET}/{dest_path}"
except Exception as e:
logger.error(f"❌ Firebase upload failed: {e}")
return ""
def upload_dataframe_with_increment(
self,
df: pd.DataFrame,
date_str: Optional[str] = None,
content_type: str = "text/csv",
) -> str:
"""
Upload a DataFrame (as CSV) using the next incremental name.
Returns the gs:// URL of the uploaded object (string) or "" on failure.
"""
if not self.is_available():
logger.warning("⚠️ Firebase saver unavailable")
return ""
try:
buf = io.StringIO()
df.to_csv(buf, index=False)
data = buf.getvalue().encode("utf-8")
filename, dest_path = self._next_available_name(date_str=date_str)
self.client.upload_from_bytes(data, dest_path, content_type)
logger.info(f"✅ Uploaded DataFrame to gs://{FIXED_BUCKET}/{dest_path}")
return f"gs://{FIXED_BUCKET}/{dest_path}"
except Exception as e:
logger.error(f"❌ Firebase DF upload failed: {e}")
return ""
def save_efficiency_data(self, filename: str, efficiency_score: float) -> bool:
"""
Save efficiency data to efficiency.json file in Firebase.
Args:
filename: The processed filename (e.g., "001_2024-12-01_processed.csv")
efficiency_score: The fuel efficiency score (0-100)
Returns:
bool: True if successful, False otherwise
"""
try:
# Load existing efficiency data
efficiency_data = self.load_efficiency_data()
# Add new entry
efficiency_data[filename] = {
"efficiency_score": efficiency_score,
"timestamp": datetime.now().isoformat(),
"filename": filename
}
# Convert to JSON
json_data = json.dumps(efficiency_data, indent=2)
# Upload to Firebase
dest_path = f"{FIXED_PREFIX}/efficiency.json"
self.client.upload_from_bytes(
json_data.encode("utf-8"),
dest_path,
"application/json"
)
logger.info(f"✅ Efficiency data saved for {filename}: {efficiency_score}%")
return True
except Exception as e:
logger.error(f"❌ Failed to save efficiency data: {e}")
return False
def load_efficiency_data(self) -> dict:
"""
Load efficiency data from efficiency.json file in Firebase.
Returns:
dict: Efficiency data or empty dict if file doesn't exist
"""
try:
dest_path = f"{FIXED_PREFIX}/efficiency.json"
# Try to download the file
blob = self.client.bucket.blob(dest_path)
if not blob.exists():
logger.info("📄 efficiency.json not found, returning empty data")
return {}
# Download and parse JSON
json_data = blob.download_as_text()
efficiency_data = json.loads(json_data)
logger.info(f"✅ Loaded efficiency data: {len(efficiency_data)} entries")
return efficiency_data
except Exception as e:
logger.warning(f"⚠️ Failed to load efficiency data: {e}")
return {}
def get_efficiency_by_filename(self, filename: str) -> Optional[dict]:
"""
Get efficiency data for a specific filename.
Args:
filename: The processed filename
Returns:
dict: Efficiency data or None if not found
"""
try:
efficiency_data = self.load_efficiency_data()
return efficiency_data.get(filename)
except Exception as e:
logger.error(f"❌ Failed to get efficiency data for {filename}: {e}")
return None
# ---------- Convenience free functions ----------
def save_csv_increment(csv_path: str, date_str: Optional[str] = None) -> str:
"""
Upload local CSV with auto-incremented name 'NNN_YYYY-MM-DD_processed.csv'.
Returns gs:// URL or "".
"""
saver = FirebaseSaver()
return saver.upload_file_with_increment(csv_path, date_str=date_str)
def save_dataframe_increment(df: pd.DataFrame, date_str: Optional[str] = None) -> str:
"""
Upload DataFrame with auto-incremented name 'NNN_YYYY-MM-DD_processed.csv'.
Returns gs:// URL or "".
"""
saver = FirebaseSaver()
return saver.upload_dataframe_with_increment(df, date_str=date_str)
def save_efficiency_data(filename: str, efficiency_score: float) -> bool:
"""
Save efficiency data to Firebase efficiency.json file.
Args:
filename: The processed filename
efficiency_score: The fuel efficiency score (0-100)
Returns:
bool: True if successful, False otherwise
"""
saver = FirebaseSaver()
return saver.save_efficiency_data(filename, efficiency_score)
def get_efficiency_by_filename(filename: str) -> Optional[dict]:
"""
Get efficiency data for a specific filename from Firebase.
Args:
filename: The processed filename
Returns:
dict: Efficiency data or None if not found
"""
saver = FirebaseSaver()
return saver.get_efficiency_by_filename(filename)