# ingestion/app/utils.py
# Supabase-backed storage and job-metadata helpers for the ingestion app.
import os
import tempfile
import time
import shutil
from typing import Tuple, List, Dict, Optional
from supabase import create_client, Client
from .core import config as core_config
# --- CONFIGURATION ---
_cfg = core_config.load_config()

# Initialize Supabase Client
# Ensure SUPABASE_URL and SUPABASE_KEY are set in your .env file
try:
    supabase: Client = create_client(_cfg["SUPABASE_URL"], _cfg["SUPABASE_KEY"])
    SUPABASE_BUCKET = _cfg["STORAGE_BUCKET"]
except Exception as e:
    # Degraded mode: every helper below checks `if not supabase` and no-ops,
    # so the app can still start without working storage credentials.
    print(f"CRITICAL: Supabase client failed to initialize. Check .env. Error: {e}")
    supabase = None
    SUPABASE_BUCKET = "documents" # Fallback

# Local data dir for temp processing only
DATA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "data"))
os.makedirs(DATA_DIR, exist_ok=True)
# --- LOCAL FILE HELPERS ---
def save_upload_file_tmp(upload_file) -> Tuple[str, str]:
    """
    Save a FastAPI UploadFile to a temporary file on disk.

    Args:
        upload_file: FastAPI UploadFile-like object; must expose `.filename`
            and a readable `.file` stream.

    Returns:
        (absolute_path_to_temp_file, original_filename)

    Raises:
        Exception: re-raises whatever failed while creating or writing the
            temp file, after removing any partially written file.
    """
    tmp_path = None
    try:
        # Preserve the original extension so downstream parsers can sniff it.
        suffix = os.path.splitext(upload_file.filename)[1]
        fd, tmp_path = tempfile.mkstemp(suffix=suffix, dir=DATA_DIR)
        # Close the file descriptor immediately so we can use the path safely
        os.close(fd)
        with open(tmp_path, "wb") as out:
            shutil.copyfileobj(upload_file.file, out)
        return tmp_path, upload_file.filename
    except Exception as e:
        # BUGFIX: don't leak a half-written temp file when the copy fails.
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass
        print(f"Error saving temp file: {e}")
        # BUGFIX: bare `raise` preserves the original traceback
        # (`raise e` re-raises from this frame and loses context).
        raise
# --- SUPABASE STORAGE HELPERS ---
def upload_file_to_bucket(file_path: str, destination_path: str) -> str:
    """
    Upload a local file to Supabase Storage.

    Returns the destination path on success, or an empty string on failure so
    the pipeline can detect a failed upload without crashing.
    """
    if not supabase:
        return ""
    try:
        options = {"content-type": "application/octet-stream", "upsert": "true"}
        with open(file_path, 'rb') as handle:
            supabase.storage.from_(SUPABASE_BUCKET).upload(
                path=destination_path,
                file=handle,
                file_options=options,
            )
    except Exception as exc:
        print(f"Supabase Upload failed for {destination_path}: {exc}")
        return ""
    return destination_path
def get_signed_url(bucket_path: str, expiry_seconds=3600) -> Optional[str]:
    """
    Generate a secure, temporary download link for a file in Supabase Storage.

    Returns None when the client is unavailable or the request fails.
    """
    if not supabase:
        return None
    try:
        bucket = supabase.storage.from_(SUPABASE_BUCKET)
        return bucket.create_signed_url(bucket_path, expiry_seconds).get("signedURL")
    except Exception as exc:
        print(f"Failed to generate signed URL for {bucket_path}: {exc}")
        return None
# --- SUPABASE DATABASE HELPERS ---
def append_metadata_entry(entry: dict):
    """
    Insert a new job entry into the 'job_metadata' table.
    Schema expected: job_id (uuid), original_filename (text), report_path (text), created_at (int8), etc.
    """
    if not supabase:
        return
    try:
        record = {
            "job_id": str(entry.get("uuid")),
            "original_filename": entry.get("original_filename"),
            # "report" holds the BUCKET PATH (e.g., jobs/123/report.md)
            "report_path": entry.get("report"),
            "type": entry.get("type", "single"),  # 'single' or 'batch'
            "created_at": int(entry.get("created_at", time.time())),
            "expires_at": int(entry.get("expires_at", time.time() + 86400)),
        }
        supabase.table("job_metadata").insert(record).execute()
    except Exception as exc:
        print(f"Error writing metadata to Supabase: {exc}")
def read_metadata(limit: int = 100) -> List[Dict]:
    """
    Fetch recent metadata entries from the Supabase 'job_metadata' table,
    newest first, remapped to the in-memory entry shape used elsewhere.
    """
    if not supabase:
        return []
    try:
        query = (
            supabase.table("job_metadata")
            .select("*")
            .order("created_at", desc=True)
            .limit(limit)
        )
        response = query.execute()
        # Remap DB column names to the keys the rest of the app expects.
        return [
            {
                "uuid": row["job_id"],
                "original_filename": row["original_filename"],
                "report": row["report_path"],
                "type": row.get("type", "single"),
                "created_at": row["created_at"],
                "expires_at": row["expires_at"],
            }
            for row in response.data
        ]
    except Exception as exc:
        print(f"Error reading metadata from Supabase: {exc}")
        return []
def get_job_by_filename(filename: str) -> Optional[Dict]:
    """
    Check if a file with this name has already been processed.
    Used for duplicate detection.

    Args:
        filename: original filename to look up in job_metadata.

    Returns:
        The first matching entry (uuid, original_filename, report, created_at),
        or None when there is no match, the client is down, or the query fails.
    """
    if not supabase:
        return None
    try:
        response = supabase.table("job_metadata")\
            .select("*")\
            .eq("original_filename", filename)\
            .limit(1)\
            .execute()
        if response.data and len(response.data) > 0:
            row = response.data[0]
            return {
                "uuid": row["job_id"],
                "original_filename": row["original_filename"],
                "report": row["report_path"],
                "created_at": row["created_at"]
            }
    except Exception as e:
        # BUGFIX: the message previously printed the literal placeholder
        # "(unknown)" instead of the filename actually being checked.
        print(f"Error checking duplicate for {filename}: {e}")
    return None
def list_all_jobs(limit: int = 100) -> Dict[str, List[str]]:
    """
    Aggregate metadata into simple lists of PDFs and report filenames.
    Used by the /files/list endpoint.

    Returns:
        {"pdf_files": [...], "md_files": [...], "full_data": [...]} — all
        lists are empty when metadata cannot be read.
    """
    try:
        data = read_metadata(limit)
        # Extract unique filenames and report paths
        pdf_files = [item["original_filename"] for item in data if item.get("original_filename")]
        # We only want to list reports that actually exist.
        # BUGFIX: only rewrite a trailing ".pdf" — str.replace would also
        # mangle a ".pdf" occurring in the middle of the filename.
        md_files = []
        for item in data:
            if item.get("report"):
                name = item["original_filename"]
                if name.endswith(".pdf"):
                    name = name[:-len(".pdf")] + "_report.md"
                md_files.append(name)
        return {
            "pdf_files": pdf_files,
            "md_files": md_files,
            "full_data": data
        }
    except Exception as e:
        print(f"Error listing jobs: {e}")
        # BUGFIX: include "full_data" so the error path matches the success
        # path's shape and callers can index it unconditionally.
        return {"pdf_files": [], "md_files": [], "full_data": []}
# --- BACKGROUND TASKS ---
def cleanup_expired_reports(retention_seconds: int = 24 * 3600, interval_seconds: int = 60 * 60):
    """
    Periodically delete expired entries from the 'job_metadata' table.

    Runs forever; intended to be launched on a daemon thread via
    start_cleanup_thread().

    Args:
        retention_seconds: kept for backward compatibility; expiry is driven
            entirely by the stored `expires_at` column, so this value is not
            consulted here.
        interval_seconds: seconds to sleep between sweeps.
    """
    if not supabase:
        return
    while True:
        try:
            now = int(time.time())
            supabase.table("job_metadata").delete().lt("expires_at", now).execute()
        except Exception as e:
            # BUGFIX: log instead of silently swallowing the error so sweep
            # failures are visible; the loop stays alive either way.
            print(f"Error during expired-report cleanup: {e}")
        time.sleep(interval_seconds)
# Module-level guard so the cleanup loop is launched at most once per process.
_cleanup_thread_started = False

def start_cleanup_thread(retention_seconds: int = 24 * 3600, interval_seconds: int = 60 * 60):
    """Launch cleanup_expired_reports on a daemon thread, at most once."""
    global _cleanup_thread_started
    if _cleanup_thread_started:
        return
    import threading as _th
    worker = _th.Thread(
        target=cleanup_expired_reports,
        args=(retention_seconds, interval_seconds),
        daemon=True,
    )
    worker.start()
    _cleanup_thread_started = True