File size: 7,293 Bytes
d2654d6
 
9f61641
383c7b0
a2b18d6
 
 
d2654d6
383c7b0
a2b18d6
383c7b0
 
 
 
 
 
 
 
 
 
a2b18d6
 
d2654d6
 
 
383c7b0
 
 
d2654d6
383c7b0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d2654d6
383c7b0
a2b18d6
 
383c7b0
 
 
 
 
a2b18d6
 
383c7b0
a2b18d6
 
 
 
 
 
383c7b0
 
a2b18d6
 
 
383c7b0
 
 
 
a2b18d6
383c7b0
a2b18d6
 
383c7b0
a2b18d6
 
383c7b0
 
d2654d6
 
383c7b0
 
 
 
 
a2b18d6
 
 
 
383c7b0
 
a2b18d6
 
 
 
 
 
 
 
383c7b0
 
 
 
a2b18d6
 
 
 
 
 
 
 
 
 
 
 
 
383c7b0
a2b18d6
 
 
 
 
 
 
 
906c82a
383c7b0
 
 
 
 
906c82a
 
 
 
 
 
 
 
 
 
 
 
 
383c7b0
906c82a
 
383c7b0
906c82a
 
 
383c7b0
 
 
 
906c82a
 
383c7b0
 
 
 
 
 
 
 
 
 
 
906c82a
 
 
383c7b0
906c82a
 
383c7b0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import os
import tempfile
import time
import shutil
from typing import Tuple, List, Dict, Optional
from supabase import create_client, Client
from .core import config as core_config

# --- CONFIGURATION ---
# Loaded once at import time; expected keys: SUPABASE_URL, SUPABASE_KEY, STORAGE_BUCKET.
_cfg = core_config.load_config()

# Initialize Supabase Client
# Ensure SUPABASE_URL and SUPABASE_KEY are set in your .env file
try:
    supabase: Client = create_client(_cfg["SUPABASE_URL"], _cfg["SUPABASE_KEY"])
    SUPABASE_BUCKET = _cfg["STORAGE_BUCKET"]
except Exception as e:
    # Degrade gracefully: every helper below checks `supabase` before use,
    # so a misconfigured environment logs once at import instead of crashing.
    print(f"CRITICAL: Supabase client failed to initialize. Check .env. Error: {e}")
    supabase = None
    SUPABASE_BUCKET = "documents" # Fallback

# Local data dir for temp processing only
# Resolves to <package_parent>/data relative to this file.
DATA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "data"))
os.makedirs(DATA_DIR, exist_ok=True)


# --- LOCAL FILE HELPERS ---

def save_upload_file_tmp(upload_file) -> Tuple[str, str]:
    """
    Save a FastAPI UploadFile to a temporary file on disk.

    Args:
        upload_file: FastAPI UploadFile-like object exposing ``.filename``
            and a binary ``.file`` stream.

    Returns:
        (absolute_path_to_temp_file, original_filename)

    Raises:
        OSError: if the temp file cannot be created or written (re-raised
            after logging and cleanup).
    """
    suffix = os.path.splitext(upload_file.filename)[1]
    fd, tmp_path = tempfile.mkstemp(suffix=suffix, dir=DATA_DIR)

    # Close the file descriptor immediately so we can use the path safely
    os.close(fd)

    try:
        with open(tmp_path, "wb") as out:
            shutil.copyfileobj(upload_file.file, out)
        return tmp_path, upload_file.filename
    except Exception as e:
        print(f"Error saving temp file: {e}")
        # Fix: the original leaked the orphaned temp file on write failure;
        # remove it best-effort before propagating the error.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        # Bare `raise` preserves the original traceback (vs. `raise e`).
        raise


# --- SUPABASE STORAGE HELPERS ---

def upload_file_to_bucket(file_path: str, destination_path: str) -> str:
    """
    Upload a local file to Supabase Storage.

    Args:
        file_path: Path of the local file to send.
        destination_path: Target object path inside the storage bucket.

    Returns:
        ``destination_path`` on success, or "" on failure so the pipeline
        can detect the problem without crashing.
    """
    if not supabase:
        return ""
    try:
        bucket = supabase.storage.from_(SUPABASE_BUCKET)
        with open(file_path, 'rb') as handle:
            bucket.upload(
                path=destination_path,
                file=handle,
                file_options={"content-type": "application/octet-stream", "upsert": "true"},
            )
        return destination_path
    except Exception as exc:
        print(f"Supabase Upload failed for {destination_path}: {exc}")
        # We return "" so the pipeline knows upload failed but doesn't crash strictly
        return ""

def get_signed_url(bucket_path: str, expiry_seconds=3600) -> Optional[str]:
    """
    Generate a secure, temporary download link for a file in Supabase Storage.

    Args:
        bucket_path: Object path inside the bucket.
        expiry_seconds: Lifetime of the link (defaults to one hour).

    Returns:
        The signed URL string, or None when the client is unavailable or
        the request fails.
    """
    if not supabase:
        return None
    try:
        storage = supabase.storage.from_(SUPABASE_BUCKET)
        result = storage.create_signed_url(bucket_path, expiry_seconds)
        return result.get("signedURL")
    except Exception as exc:
        print(f"Failed to generate signed URL for {bucket_path}: {exc}")
        return None


# --- SUPABASE DATABASE HELPERS ---

def append_metadata_entry(entry: dict):
    """
    Insert a new job entry into the 'job_metadata' table.

    Schema expected: job_id (uuid), original_filename (text),
    report_path (text), created_at (int8), etc. Errors are logged and
    swallowed so the calling pipeline is never interrupted.
    """
    if not supabase:
        return
    try:
        row = {
            "job_id": str(entry.get("uuid")),
            "original_filename": entry.get("original_filename"),
            # Bucket path of the report (e.g., jobs/123/report.md)
            "report_path": entry.get("report"),
            # 'single' or 'batch'
            "type": entry.get("type", "single"),
            "created_at": int(entry.get("created_at", time.time())),
            # Default retention: 24 hours from now.
            "expires_at": int(entry.get("expires_at", time.time() + 86400)),
        }
        supabase.table("job_metadata").insert(row).execute()
    except Exception as exc:
        print(f"Error writing metadata to Supabase: {exc}")

def read_metadata(limit: int = 100) -> List[Dict]:
    """
    Fetch recent metadata entries from the Supabase 'job_metadata' table.

    Args:
        limit: Maximum number of rows to fetch, newest first.

    Returns:
        A list of normalized job dicts; empty list when the client is
        unavailable or the query fails.
    """
    if not supabase:
        return []
    try:
        response = (
            supabase.table("job_metadata")
            .select("*")
            .order("created_at", desc=True)
            .limit(limit)
            .execute()
        )
        # Normalize DB column names back to the internal entry shape.
        return [
            {
                "uuid": row["job_id"],
                "original_filename": row["original_filename"],
                "report": row["report_path"],
                "type": row.get("type", "single"),
                "created_at": row["created_at"],
                "expires_at": row["expires_at"],
            }
            for row in response.data
        ]
    except Exception as exc:
        print(f"Error reading metadata from Supabase: {exc}")
        return []

def get_job_by_filename(filename: str) -> Optional[Dict]:
    """
    Check if a file with this name has already been processed.
    Used for duplicate detection.

    Args:
        filename: Original filename to look up in 'job_metadata'.

    Returns:
        A dict with uuid/original_filename/report/created_at for the first
        match, or None when there is no match, the client is unavailable,
        or the query fails.
    """
    if not supabase: return None
    try:
        response = supabase.table("job_metadata")\
            .select("*")\
            .eq("original_filename", filename)\
            .limit(1)\
            .execute()

        # Truthiness alone covers both None and the empty list.
        if response.data:
            row = response.data[0]
            return {
                "uuid": row["job_id"],
                "original_filename": row["original_filename"],
                "report": row["report_path"],
                "created_at": row["created_at"]
            }
    except Exception as e:
        # Fix: log the actual filename (original printed the literal "(unknown)").
        print(f"Error checking duplicate for {filename}: {e}")
    return None

def list_all_jobs(limit: int = 100) -> Dict[str, List[str]]:
    """
    Aggregates metadata to return simple lists of PDFs and Reports.
    Used by the /files/list endpoint.

    Args:
        limit: Maximum number of jobs to aggregate.

    Returns:
        {"pdf_files": [...], "md_files": [...], "full_data": [...]}.
        All lists are empty on error — the shape is identical on the
        success and error paths.
    """
    try:
        data = read_metadata(limit)

        # Extract unique filenames and report paths
        pdf_files = [item["original_filename"] for item in data if item.get("original_filename")]

        # We only want to list reports that actually exist
        md_files = [
            item["original_filename"].replace(".pdf", "_report.md")
            for item in data
            if item.get("report")
        ]

        return {
            "pdf_files": pdf_files,
            "md_files": md_files,
            "full_data": data
        }
    except Exception as e:
        print(f"Error listing jobs: {e}")
        # Fix: the original omitted "full_data" here, so the error-path dict
        # had a different shape and callers indexing ["full_data"] would crash.
        return {"pdf_files": [], "md_files": [], "full_data": []}

# --- BACKGROUND TASKS ---

def cleanup_expired_reports(retention_seconds: int = 24 * 3600, interval_seconds: int = 60 * 60):
    """
    Periodically delete expired entries from the 'job_metadata' table.

    Runs forever; intended to be launched on a daemon thread via
    start_cleanup_thread.

    Args:
        retention_seconds: Unused here — expiry is driven by each row's
            'expires_at' column; kept for interface compatibility.
        interval_seconds: Seconds to sleep between sweeps.
    """
    if not supabase:
        return
    while True:
        try:
            cutoff = int(time.time())
            supabase.table("job_metadata").delete().lt("expires_at", cutoff).execute()
        except Exception:
            # Best-effort sweep: transient failures are ignored and
            # retried on the next cycle.
            pass
        time.sleep(interval_seconds)

# Guard so repeated calls (e.g., from multiple app startups) spawn one thread.
_cleanup_thread_started = False

def start_cleanup_thread(retention_seconds: int = 24 * 3600, interval_seconds: int = 60 * 60):
    """
    Launch the expired-report cleanup loop on a daemon thread, at most once.

    Args:
        retention_seconds: Forwarded to cleanup_expired_reports.
        interval_seconds: Forwarded to cleanup_expired_reports.
    """
    global _cleanup_thread_started
    if _cleanup_thread_started:
        return
    import threading as _th
    worker = _th.Thread(
        target=cleanup_expired_reports,
        args=(retention_seconds, interval_seconds),
        daemon=True,
    )
    worker.start()
    _cleanup_thread_started = True