|
|
import os |
|
|
import tempfile |
|
|
import time |
|
|
import shutil |
|
|
from typing import Tuple, List, Dict, Optional |
|
|
from supabase import create_client, Client |
|
|
from .core import config as core_config |
|
|
|
|
|
|
|
|
# Application configuration (Supabase credentials, bucket name) loaded once
# at import time from the core config module.
_cfg = core_config.load_config()
|
|
|
|
|
|
|
|
|
|
|
# Initialize the Supabase client at import time. If initialization fails
# (missing/invalid credentials in .env), fall back to `supabase = None` so the
# module still imports; every helper below guards on `supabase` before use.
try:
    supabase: Client = create_client(_cfg["SUPABASE_URL"], _cfg["SUPABASE_KEY"])
    SUPABASE_BUCKET = _cfg["STORAGE_BUCKET"]
except Exception as e:
    print(f"CRITICAL: Supabase client failed to initialize. Check .env. Error: {e}")
    supabase = None
    SUPABASE_BUCKET = "documents"  # fallback bucket name so later references don't NameError
|
|
|
|
|
|
|
|
# Local scratch directory for temporary uploads: <package_root>/../data.
# Created eagerly so save_upload_file_tmp can assume it exists.
DATA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "data"))
os.makedirs(DATA_DIR, exist_ok=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def save_upload_file_tmp(upload_file) -> Tuple[str, str]:
    """
    Save a FastAPI UploadFile to a temporary file on disk.

    A unique file is created with tempfile.mkstemp inside DATA_DIR; the
    original extension is preserved so downstream tooling can detect the type.

    Args:
        upload_file: FastAPI UploadFile (only .filename and .file are used).

    Returns:
        (absolute_path_to_temp_file, original_filename)

    Raises:
        Re-raises any I/O error after logging it.
    """
    try:
        suffix = os.path.splitext(upload_file.filename)[1]
        fd, tmp_path = tempfile.mkstemp(suffix=suffix, dir=DATA_DIR)
        # Write through the descriptor mkstemp returned instead of closing it
        # and re-opening by path: avoids an extra open() and the window in
        # which the path could be replaced out from under us.
        with os.fdopen(fd, "wb") as out:
            shutil.copyfileobj(upload_file.file, out)
        return tmp_path, upload_file.filename
    except Exception as e:
        print(f"Error saving temp file: {e}")
        # Bare raise preserves the original traceback for the caller.
        raise
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def upload_file_to_bucket(file_path: str, destination_path: str) -> str:
    """
    Push a local file into the configured Supabase Storage bucket.

    Args:
        file_path: Path of the local file to upload.
        destination_path: Target object path inside the bucket.

    Returns:
        destination_path on success; "" on any failure (including when the
        Supabase client never initialized).
    """
    if not supabase:
        return ""
    try:
        bucket = supabase.storage.from_(SUPABASE_BUCKET)
        options = {"content-type": "application/octet-stream", "upsert": "true"}
        with open(file_path, 'rb') as handle:
            bucket.upload(path=destination_path, file=handle, file_options=options)
        return destination_path
    except Exception as e:
        print(f"Supabase Upload failed for {destination_path}: {e}")
        return ""
|
|
|
|
|
def get_signed_url(bucket_path: str, expiry_seconds=3600) -> Optional[str]:
    """
    Generates a secure, temporary download link for a file in Supabase Storage.

    Args:
        bucket_path: Object path inside the bucket.
        expiry_seconds: Link lifetime in seconds (default: one hour).

    Returns:
        The signed URL string, or None when the client is unavailable or the
        request fails.
    """
    if not supabase:
        return None
    try:
        storage = supabase.storage.from_(SUPABASE_BUCKET)
        result = storage.create_signed_url(bucket_path, expiry_seconds)
        # NOTE(review): the "signedURL" key casing follows the installed
        # supabase client's response shape — confirm on client upgrades.
        return result.get("signedURL")
    except Exception as e:
        print(f"Failed to generate signed URL for {bucket_path}: {e}")
        return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def append_metadata_entry(entry: dict):
    """
    Insert a new job entry into the 'job_metadata' table.

    Schema expected: job_id (uuid), original_filename (text),
    report_path (text), created_at (int8), etc. Failures are logged and
    swallowed (best-effort persistence).

    Args:
        entry: Job dict using the app-side keys (uuid, original_filename,
            report, type, created_at, expires_at).
    """
    if not supabase:
        return
    try:
        # Map app-side keys onto the table's column names; timestamps default
        # to "now" / "now + 24h" when the entry doesn't carry them.
        row = {
            "job_id": str(entry.get("uuid")),
            "original_filename": entry.get("original_filename"),
            "report_path": entry.get("report"),
            "type": entry.get("type", "single"),
            "created_at": int(entry.get("created_at", time.time())),
            "expires_at": int(entry.get("expires_at", time.time() + 86400)),
        }
        supabase.table("job_metadata").insert(row).execute()
    except Exception as e:
        print(f"Error writing metadata to Supabase: {e}")
|
|
|
|
|
def read_metadata(limit: int = 100) -> List[Dict]:
    """
    Fetch recent metadata entries from Supabase 'job_metadata' table.

    Args:
        limit: Maximum number of rows to return (newest first).

    Returns:
        A list of dicts keyed the way the rest of the app expects
        (uuid / original_filename / report / type / created_at / expires_at);
        empty list when the client is unavailable or the query fails.
    """
    if not supabase:
        return []
    try:
        query = (
            supabase.table("job_metadata")
            .select("*")
            .order("created_at", desc=True)
            .limit(limit)
        )
        response = query.execute()
        # Translate table column names back into the app-side key names.
        return [
            {
                "uuid": row["job_id"],
                "original_filename": row["original_filename"],
                "report": row["report_path"],
                "type": row.get("type", "single"),
                "created_at": row["created_at"],
                "expires_at": row["expires_at"],
            }
            for row in response.data
        ]
    except Exception as e:
        print(f"Error reading metadata from Supabase: {e}")
        return []
|
|
|
|
|
def get_job_by_filename(filename: str) -> Optional[Dict]:
    """
    Check if a file with this name has already been processed.

    Used for duplicate detection.

    Args:
        filename: Original upload filename to look up.

    Returns:
        A dict with uuid/original_filename/report/created_at for the first
        matching job; None when there is no match, the client is unavailable,
        or the query fails.
    """
    if not supabase:
        return None
    try:
        response = supabase.table("job_metadata")\
            .select("*")\
            .eq("original_filename", filename)\
            .limit(1)\
            .execute()

        if response.data and len(response.data) > 0:
            row = response.data[0]
            return {
                "uuid": row["job_id"],
                "original_filename": row["original_filename"],
                "report": row["report_path"],
                "created_at": row["created_at"]
            }
    except Exception as e:
        # Bug fix: this log line previously printed the literal "(unknown)"
        # instead of the filename actually being checked.
        print(f"Error checking duplicate for {filename}: {e}")
    return None
|
|
|
|
|
def list_all_jobs(limit: int = 100) -> Dict[str, List[str]]:
    """
    Aggregates metadata to return simple lists of PDFs and Reports.

    Used by the /files/list endpoint.

    Args:
        limit: Maximum number of metadata rows to aggregate.

    Returns:
        Dict with "pdf_files" (original filenames), "md_files" (derived report
        filenames), and "full_data" (the raw metadata rows). On failure all
        three entries are empty.
    """
    try:
        data = read_metadata(limit)

        pdf_files = [item["original_filename"] for item in data if item.get("original_filename")]

        # Report filename is derived by convention: "<name>.pdf" -> "<name>_report.md".
        md_files = [item["original_filename"].replace(".pdf", "_report.md") for item in data if item.get("report")]

        return {
            "pdf_files": pdf_files,
            "md_files": md_files,
            "full_data": data
        }
    except Exception as e:
        print(f"Error listing jobs: {e}")
        # Bug fix: keep the failure payload shape-consistent with success —
        # callers indexing "full_data" previously hit a KeyError here.
        return {"pdf_files": [], "md_files": [], "full_data": []}
|
|
|
|
|
|
|
|
|
|
|
def cleanup_expired_reports(retention_seconds: int = 24 * 3600, interval_seconds: int = 60 * 60):
    """
    Periodically deletes expired entries from the DB.

    Runs forever; intended to be launched on a daemon thread
    (see start_cleanup_thread). Returns immediately if the Supabase client
    never initialized.

    Args:
        retention_seconds: Accepted for interface compatibility but currently
            unused — expiry is driven entirely by each row's "expires_at".
        interval_seconds: Seconds to sleep between sweeps.
    """
    if not supabase:
        return
    while True:
        try:
            now = int(time.time())
            supabase.table("job_metadata").delete().lt("expires_at", now).execute()
        except Exception as e:
            # Best-effort sweep: log instead of silently swallowing so that
            # persistent failures are at least visible, then keep looping.
            print(f"Cleanup sweep failed: {e}")
        time.sleep(interval_seconds)
|
|
|
|
|
# Guard so start_cleanup_thread() spawns at most one background worker.
_cleanup_thread_started = False
|
|
|
|
|
def start_cleanup_thread(retention_seconds: int = 24 * 3600, interval_seconds: int = 60 * 60):
    """
    Launch the background cleanup loop exactly once.

    Subsequent calls are no-ops thanks to the module-level
    _cleanup_thread_started flag.

    Args:
        retention_seconds: Forwarded to cleanup_expired_reports.
        interval_seconds: Forwarded to cleanup_expired_reports.
    """
    global _cleanup_thread_started
    if _cleanup_thread_started:
        return
    import threading as _th
    worker = _th.Thread(
        target=cleanup_expired_reports,
        args=(retention_seconds, interval_seconds),
        daemon=True,
    )
    worker.start()
    _cleanup_thread_started = True