"""Storage and metadata helpers backed by Supabase.

Responsibilities:
  * staging FastAPI uploads into a local temp directory,
  * uploading files to Supabase Storage and generating signed URLs,
  * reading/writing job rows in the ``job_metadata`` table,
  * a background thread that purges expired rows.

If the Supabase client fails to initialize (missing .env values),
``supabase`` is left as ``None`` and every helper degrades to a no-op
instead of crashing the importing application.
"""

import os
import shutil
import tempfile
import time
from typing import Dict, List, Optional, Tuple

from supabase import create_client, Client

from .core import config as core_config

# --- CONFIGURATION ---
_cfg = core_config.load_config()

# Initialize Supabase client.
# Ensure SUPABASE_URL and SUPABASE_KEY are set in your .env file.
try:
    supabase: Optional[Client] = create_client(
        _cfg["SUPABASE_URL"], _cfg["SUPABASE_KEY"]
    )
    SUPABASE_BUCKET: str = _cfg["STORAGE_BUCKET"]
except Exception as e:
    print(f"CRITICAL: Supabase client failed to initialize. Check .env. Error: {e}")
    supabase = None
    SUPABASE_BUCKET = "documents"  # Fallback

# Local data dir for temp processing only.
DATA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "data"))
os.makedirs(DATA_DIR, exist_ok=True)


# --- LOCAL FILE HELPERS ---

def save_upload_file_tmp(upload_file) -> Tuple[str, str]:
    """Save a FastAPI UploadFile to a temporary file on disk.

    The temp file is created inside DATA_DIR and keeps the original
    extension so downstream tools can sniff the file type.

    Returns:
        (absolute_path_to_temp_file, original_filename)

    Raises:
        Exception: re-raised from any failure while writing the file.
    """
    try:
        suffix = os.path.splitext(upload_file.filename)[1]
        fd, tmp_path = tempfile.mkstemp(suffix=suffix, dir=DATA_DIR)
        # Close the low-level descriptor immediately; we reopen the
        # path with a regular file object below.
        os.close(fd)
        with open(tmp_path, "wb") as out:
            shutil.copyfileobj(upload_file.file, out)
        return tmp_path, upload_file.filename
    except Exception as e:
        print(f"Error saving temp file: {e}")
        # Bare `raise` preserves the original traceback.
        raise


# --- SUPABASE STORAGE HELPERS ---

def upload_file_to_bucket(file_path: str, destination_path: str) -> str:
    """Upload a local file to Supabase Storage.

    Args:
        file_path: Absolute path of the local file to upload.
        destination_path: Target path within the bucket.

    Returns:
        ``destination_path`` on success, or ``""`` on failure (the
        pipeline treats a failed upload as non-fatal).
    """
    if not supabase:
        return ""
    try:
        with open(file_path, 'rb') as f:
            supabase.storage.from_(SUPABASE_BUCKET).upload(
                path=destination_path,
                file=f,
                # supabase-py expects header-style string values here,
                # hence "true" rather than a Python bool.
                file_options={"content-type": "application/octet-stream", "upsert": "true"},
            )
        return destination_path
    except Exception as e:
        print(f"Supabase Upload failed for {destination_path}: {e}")
        # We return "" so the pipeline knows upload failed but doesn't crash strictly.
        return ""


def get_signed_url(bucket_path: str, expiry_seconds=3600) -> Optional[str]:
    """Generate a secure, temporary download link for a stored file.

    Args:
        bucket_path: Path of the object within the bucket.
        expiry_seconds: Lifetime of the signed URL (default 1 hour).

    Returns:
        The signed URL string, or ``None`` when the client is missing
        or the request fails.
    """
    if not supabase:
        return None
    try:
        res = supabase.storage.from_(SUPABASE_BUCKET).create_signed_url(
            bucket_path, expiry_seconds
        )
        return res.get("signedURL")
    except Exception as e:
        print(f"Failed to generate signed URL for {bucket_path}: {e}")
        return None


# --- SUPABASE DATABASE HELPERS ---

def append_metadata_entry(entry: dict) -> None:
    """Insert a new job entry into the ``job_metadata`` table.

    Expected schema: job_id (uuid), original_filename (text),
    report_path (text), type (text), created_at (int8),
    expires_at (int8).

    Missing timestamps default to "now" / "now + 24h". Failures are
    logged and swallowed so metadata writes never break the pipeline.
    """
    if not supabase:
        return
    try:
        now = time.time()
        data = {
            "job_id": str(entry.get("uuid")),
            "original_filename": entry.get("original_filename"),
            # This is the BUCKET PATH (e.g., jobs/123/report.md).
            "report_path": entry.get("report"),
            "type": entry.get("type", "single"),  # 'single' or 'batch'
            "created_at": int(entry.get("created_at", now)),
            "expires_at": int(entry.get("expires_at", now + 86400)),
        }
        supabase.table("job_metadata").insert(data).execute()
    except Exception as e:
        print(f"Error writing metadata to Supabase: {e}")


def _row_to_entry(row: Dict) -> Dict:
    """Map a raw job_metadata DB row to the dict shape used internally."""
    return {
        "uuid": row["job_id"],
        "original_filename": row["original_filename"],
        "report": row["report_path"],
        "type": row.get("type", "single"),
        "created_at": row["created_at"],
        "expires_at": row["expires_at"],
    }


def read_metadata(limit: int = 100) -> List[Dict]:
    """Fetch recent metadata entries, newest first.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        A list of job dicts (see ``_row_to_entry``); empty on any
        failure or when the client is unavailable.
    """
    if not supabase:
        return []
    try:
        response = (
            supabase.table("job_metadata")
            .select("*")
            .order("created_at", desc=True)
            .limit(limit)
            .execute()
        )
        return [_row_to_entry(row) for row in response.data]
    except Exception as e:
        print(f"Error reading metadata from Supabase: {e}")
        return []


def get_job_by_filename(filename: str) -> Optional[Dict]:
    """Check whether a file with this name has already been processed.

    Used for duplicate detection.

    Returns:
        The matching job dict, or ``None`` when not found / on error.
    """
    if not supabase:
        return None
    try:
        response = (
            supabase.table("job_metadata")
            .select("*")
            .eq("original_filename", filename)
            .limit(1)
            .execute()
        )
        if response.data and len(response.data) > 0:
            row = response.data[0]
            return {
                "uuid": row["job_id"],
                "original_filename": row["original_filename"],
                "report": row["report_path"],
                "created_at": row["created_at"],
            }
    except Exception as e:
        # Include the filename so failed lookups are debuggable
        # (previously logged a literal "(unknown)" placeholder).
        print(f"Error checking duplicate for {filename}: {e}")
    return None


def list_all_jobs(limit: int = 100) -> Dict[str, List[str]]:
    """Aggregate metadata into simple lists of PDFs and reports.

    Used by the /files/list endpoint.

    Returns:
        A dict with keys ``pdf_files``, ``md_files`` and ``full_data``
        (all present even on failure, so callers can index safely).
    """
    try:
        data = read_metadata(limit)
        # Extract the original filenames of every known job.
        pdf_files = [
            item["original_filename"] for item in data if item.get("original_filename")
        ]
        # Derive display names for jobs that actually have a report.
        # NOTE: str.replace substitutes EVERY ".pdf" occurrence, not
        # just a trailing extension — preserved for compatibility.
        md_files = [
            item["original_filename"].replace(".pdf", "_report.md")
            for item in data
            if item.get("report")
        ]
        return {
            "pdf_files": pdf_files,
            "md_files": md_files,
            "full_data": data,
        }
    except Exception as e:
        print(f"Error listing jobs: {e}")
        # Keep the same key set as the success path so callers that
        # index "full_data" don't raise KeyError on the error path.
        return {"pdf_files": [], "md_files": [], "full_data": []}


# --- BACKGROUND TASKS ---

def cleanup_expired_reports(retention_seconds: int = 24 * 3600,
                            interval_seconds: int = 60 * 60) -> None:
    """Periodically delete expired entries from the DB (blocking loop).

    Rows are deleted when their stored ``expires_at`` is in the past.
    ``retention_seconds`` is currently unused (retention is baked into
    each row at insert time) but kept for interface compatibility.

    Intended to run in a daemon thread via ``start_cleanup_thread``.
    """
    if not supabase:
        return
    while True:
        try:
            now = int(time.time())
            supabase.table("job_metadata").delete().lt("expires_at", now).execute()
        except Exception as e:
            # Best-effort cleanup: log and retry on the next tick
            # instead of silently swallowing the error.
            print(f"Cleanup pass failed: {e}")
        time.sleep(interval_seconds)


_cleanup_thread_started = False


def start_cleanup_thread(retention_seconds: int = 24 * 3600,
                         interval_seconds: int = 60 * 60) -> None:
    """Start the cleanup loop in a daemon thread, at most once per process."""
    global _cleanup_thread_started
    if _cleanup_thread_started:
        return
    import threading as _th
    t = _th.Thread(
        target=cleanup_expired_reports,
        args=(retention_seconds, interval_seconds),
        daemon=True,
    )
    t.start()
    _cleanup_thread_started = True