File size: 7,293 Bytes
d2654d6 9f61641 383c7b0 a2b18d6 d2654d6 383c7b0 a2b18d6 383c7b0 a2b18d6 d2654d6 383c7b0 d2654d6 383c7b0 d2654d6 383c7b0 a2b18d6 383c7b0 a2b18d6 383c7b0 a2b18d6 383c7b0 a2b18d6 383c7b0 a2b18d6 383c7b0 a2b18d6 383c7b0 a2b18d6 383c7b0 d2654d6 383c7b0 a2b18d6 383c7b0 a2b18d6 383c7b0 a2b18d6 383c7b0 a2b18d6 906c82a 383c7b0 906c82a 383c7b0 906c82a 383c7b0 906c82a 383c7b0 906c82a 383c7b0 906c82a 383c7b0 906c82a 383c7b0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 |
import os
import tempfile
import time
import shutil
from typing import Tuple, List, Dict, Optional
from supabase import create_client, Client
from .core import config as core_config
# --- CONFIGURATION ---
# Load project configuration (reads .env via the core config helper).
_cfg = core_config.load_config()
# Initialize Supabase Client
# Ensure SUPABASE_URL and SUPABASE_KEY are set in your .env file
try:
    supabase: Client = create_client(_cfg["SUPABASE_URL"], _cfg["SUPABASE_KEY"])
    SUPABASE_BUCKET = _cfg["STORAGE_BUCKET"]
except Exception as e:
    # Deliberately degrade instead of crashing at import time: every helper
    # below checks `if not supabase` and becomes a no-op when init failed.
    print(f"CRITICAL: Supabase client failed to initialize. Check .env. Error: {e}")
    supabase = None
    SUPABASE_BUCKET = "documents" # Fallback
# Local data dir for temp processing only
# Resolves to <package_parent>/data relative to this file.
DATA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "data"))
os.makedirs(DATA_DIR, exist_ok=True)
# --- LOCAL FILE HELPERS ---
def save_upload_file_tmp(upload_file, target_dir: Optional[str] = None) -> Tuple[str, str]:
    """
    Save a FastAPI UploadFile to a temporary file on disk.

    Args:
        upload_file: object exposing ``.filename`` (str) and ``.file``
            (binary file-like), e.g. FastAPI's UploadFile.
        target_dir: directory to create the temp file in; defaults to the
            module-level DATA_DIR when omitted (backward compatible).

    Returns:
        (absolute_path_to_temp_file, original_filename)

    Raises:
        OSError (and friends) from temp-file creation or copying; the
        error is logged and re-raised for the caller to handle.
    """
    if target_dir is None:
        target_dir = DATA_DIR
    try:
        # Preserve the original extension so downstream tools can sniff type.
        suffix = os.path.splitext(upload_file.filename)[1]
        fd, tmp_path = tempfile.mkstemp(suffix=suffix, dir=target_dir)
        # Close the file descriptor immediately so we can use the path safely
        os.close(fd)
        with open(tmp_path, "wb") as out:
            shutil.copyfileobj(upload_file.file, out)
        return tmp_path, upload_file.filename
    except Exception as e:
        print(f"Error saving temp file: {e}")
        # Bare `raise` preserves the original traceback (`raise e` resets it).
        raise
# --- SUPABASE STORAGE HELPERS ---
def upload_file_to_bucket(file_path: str, destination_path: str) -> str:
    """
    Upload a local file to Supabase Storage.

    Returns the destination path on success, or an empty string on failure
    so the pipeline can detect the failed upload without crashing.
    """
    if not supabase:
        return ""
    options = {"content-type": "application/octet-stream", "upsert": "true"}
    try:
        with open(file_path, 'rb') as src:
            bucket = supabase.storage.from_(SUPABASE_BUCKET)
            bucket.upload(path=destination_path, file=src, file_options=options)
    except Exception as e:
        print(f"Supabase Upload failed for {destination_path}: {e}")
        # Empty string signals "upload failed" to the caller.
        return ""
    return destination_path
def get_signed_url(bucket_path: str, expiry_seconds=3600) -> Optional[str]:
    """
    Generate a secure, temporary download link for a file in Supabase Storage.

    Returns the signed URL string, or None when the client is unavailable
    or the request fails.
    """
    if not supabase:
        return None
    try:
        bucket = supabase.storage.from_(SUPABASE_BUCKET)
        response = bucket.create_signed_url(bucket_path, expiry_seconds)
        return response.get("signedURL")
    except Exception as e:
        print(f"Failed to generate signed URL for {bucket_path}: {e}")
        return None
# --- SUPABASE DATABASE HELPERS ---
def append_metadata_entry(entry: dict):
    """
    Insert a new job entry into the 'job_metadata' table.

    Expected table schema: job_id (uuid), original_filename (text),
    report_path (text), type (text), created_at (int8), expires_at (int8).
    Failures are logged and swallowed (best-effort write).
    """
    if not supabase:
        return
    try:
        record = {
            "job_id": str(entry.get("uuid")),
            "original_filename": entry.get("original_filename"),
            # Bucket path of the report (e.g., jobs/123/report.md).
            "report_path": entry.get("report"),
            # Job kind: 'single' or 'batch'.
            "type": entry.get("type", "single"),
            "created_at": int(entry.get("created_at", time.time())),
            # Default expiry: 24h from now.
            "expires_at": int(entry.get("expires_at", time.time() + 86400)),
        }
        supabase.table("job_metadata").insert(record).execute()
    except Exception as e:
        print(f"Error writing metadata to Supabase: {e}")
def read_metadata(limit: int = 100) -> List[Dict]:
    """
    Fetch recent metadata entries from Supabase 'job_metadata' table.

    Rows are returned newest-first, renamed to the internal key names
    ('uuid', 'report', ...). Returns [] if the client is unavailable
    or the query fails.
    """
    if not supabase:
        return []
    try:
        response = (
            supabase.table("job_metadata")
            .select("*")
            .order("created_at", desc=True)
            .limit(limit)
            .execute()
        )
        # Map DB column names onto the internal metadata-entry shape.
        return [
            {
                "uuid": row["job_id"],
                "original_filename": row["original_filename"],
                "report": row["report_path"],
                "type": row.get("type", "single"),
                "created_at": row["created_at"],
                "expires_at": row["expires_at"],
            }
            for row in response.data
        ]
    except Exception as e:
        print(f"Error reading metadata from Supabase: {e}")
        return []
def get_job_by_filename(filename: str) -> Optional[Dict]:
    """
    Check if a file with this name has already been processed.
    Used for duplicate detection.

    Args:
        filename: the original filename to look up.

    Returns:
        The first matching job record (keys: uuid, original_filename,
        report, created_at), or None when there is no match, the client
        is unavailable, or the query fails.
    """
    if not supabase:
        return None
    try:
        response = supabase.table("job_metadata")\
            .select("*")\
            .eq("original_filename", filename)\
            .limit(1)\
            .execute()
        if response.data and len(response.data) > 0:
            row = response.data[0]
            return {
                "uuid": row["job_id"],
                "original_filename": row["original_filename"],
                "report": row["report_path"],
                "created_at": row["created_at"]
            }
    except Exception as e:
        # Fixed: the log previously printed the literal "(unknown)" instead of
        # the filename, making failed lookups impossible to trace.
        print(f"Error checking duplicate for {filename}: {e}")
    return None
def list_all_jobs(limit: int = 100) -> Dict[str, List[str]]:
    """
    Aggregates metadata to return simple lists of PDFs and Reports.
    Used by the /files/list endpoint.

    Returns:
        Dict with keys "pdf_files" (original filenames), "md_files"
        (derived report names, only for jobs that have a report), and
        "full_data" (the raw metadata entries). The same three keys are
        present on the error path so callers always see one shape.
    """
    try:
        data = read_metadata(limit)
        # Extract unique filenames and report paths
        pdf_files = [item["original_filename"] for item in data if item.get("original_filename")]
        # We only want to list reports that actually exist
        md_files = [
            item["original_filename"].replace(".pdf", "_report.md")
            for item in data
            if item.get("report")
        ]
        return {
            "pdf_files": pdf_files,
            "md_files": md_files,
            "full_data": data
        }
    except Exception as e:
        print(f"Error listing jobs: {e}")
        # Fixed: the error path previously omitted "full_data", giving callers
        # an inconsistent dict shape compared to the success path.
        return {"pdf_files": [], "md_files": [], "full_data": []}
# --- BACKGROUND TASKS ---
def cleanup_expired_reports(retention_seconds: int = 24 * 3600, interval_seconds: int = 60 * 60):
    """
    Periodically deletes expired entries from the DB.

    Runs forever (intended to be launched on a daemon thread via
    start_cleanup_thread): every ``interval_seconds`` it deletes rows
    whose ``expires_at`` timestamp is already in the past.

    NOTE(review): ``retention_seconds`` is accepted but never used here —
    expiry is driven entirely by the stored ``expires_at`` column. Confirm
    whether the parameter should be removed from callers or wired in.
    """
    # No-op when the Supabase client failed to initialize at import time.
    if not supabase: return
    while True:
        try:
            now = int(time.time())
            supabase.table("job_metadata").delete().lt("expires_at", now).execute()
        except Exception:
            # Best-effort: swallow transient DB errors and retry next cycle.
            pass
        time.sleep(interval_seconds)
# Module-level guard so the cleanup worker is launched at most once.
_cleanup_thread_started = False


def start_cleanup_thread(retention_seconds: int = 24 * 3600, interval_seconds: int = 60 * 60):
    """Launch cleanup_expired_reports on a daemon thread (idempotent)."""
    global _cleanup_thread_started
    if _cleanup_thread_started:
        return
    # Imported lazily to keep module import side-effect free.
    import threading as _th
    worker = _th.Thread(
        target=cleanup_expired_reports,
        args=(retention_seconds, interval_seconds),
        daemon=True,
    )
    worker.start()
    _cleanup_thread_started = True