# NOTE(review): "Spaces: Sleeping" banner below was Hugging Face Spaces UI
# residue captured with the paste — not part of the source; kept as a comment.
"""
supabase_ingest.py
- Recursively downloads resumes from Supabase Storage
- Extractor: Uses Google Gemini (via src.extraction.person_details_extraction_gemini)
- Database: Maps JSON to 'profiles' table columns
- Updates: Uses User ID (id) as the unique key to prevent errors
"""
| from __future__ import annotations | |
| import argparse | |
| import os | |
| import hashlib | |
| from typing import List, Dict, Any | |
| from dotenv import load_dotenv | |
| from supabase import create_client | |
| # ✅ CORRECT IMPORT based on your file structure | |
| from src.extraction.person_details_extraction_gemini import process_single_resume | |
| from src.embeddings.local_embedder import safe_generate_and_store_embeddings | |
# ---------------------------------------------------------------------
# ENV SETUP
# ---------------------------------------------------------------------
from pathlib import Path

# Explicitly load .env from the backend directory (the directory containing
# this file) so the script works regardless of the current working directory.
# FIX: this load was previously duplicated verbatim (including a second
# `from pathlib import Path`); a single load is sufficient.
env_path = Path(__file__).resolve().parent / ".env"
load_dotenv(dotenv_path=env_path)

SUPABASE_URL = os.environ.get("SUPABASE_URL")
# Prefer the service-role key; fall back to the generic key if present.
SUPABASE_KEY = os.environ.get("SUPABASE_SERVICE_ROLE_KEY") or os.environ.get("SUPABASE_KEY")

# Only build a global client when credentials exist. When this module is
# imported as a library, the API provides its own client, so missing
# credentials are a warning rather than a hard error.
client = None
if SUPABASE_URL and SUPABASE_KEY:
    if not SUPABASE_URL.endswith("/"):
        SUPABASE_URL += "/"
    try:
        client = create_client(SUPABASE_URL, SUPABASE_KEY)
    except Exception as e:
        print(f"⚠️ Warning: Failed to create global Supabase client: {e}")
else:
    print("⚠️ Warning: Supabase Credentials not found in environment. Only library functions will fail if called without a client.")

# Resume file types accepted by the ingest pipeline.
ALLOWED_EXTENSIONS = {".pdf", ".docx", ".doc"}
| # --------------------------------------------------------------------- | |
| # UTILS | |
| # --------------------------------------------------------------------- | |
| def ensure_dir(path: str) -> None: | |
| os.makedirs(path, exist_ok=True) | |
def compute_file_hash(path: str) -> str:
    """Return the SHA-256 hex digest of the file at *path*, streamed in 8 KiB chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        while chunk := fh.read(8192):
            digest.update(chunk)
    return digest.hexdigest()
def extract_user_id(object_path: str) -> str:
    """Return the first path segment (storage layout is 'user_id/filename.ext')."""
    user_id, _, _ = object_path.partition("/")
    return user_id
| # --------------------------------------------------------------------- | |
| # SUPABASE STORAGE HELPERS | |
| # --------------------------------------------------------------------- | |
def list_all_objects(client, bucket: str, prefix: str = "") -> List[str]:
    """Recursively collect full object paths under *prefix* in *bucket*.

    Entries carrying both 'id' and a truthy 'metadata' are treated as files;
    anything else is assumed to be a folder and descended into.
    """
    storage = client.storage.from_(bucket)
    found: List[str] = []
    for entry in storage.list(prefix) or []:
        name = entry.get("name")
        if not name:
            continue
        path = f"{prefix}/{name}".strip("/")
        if entry.get("id") and entry.get("metadata"):
            found.append(path)
        else:
            found.extend(list_all_objects(client, bucket, path))
    return found
def download_object(client, bucket: str, object_path: str, dest_root: str) -> str:
    """Download *object_path* from *bucket* into *dest_root*, mirroring the
    remote folder structure locally. Returns the local file path."""
    blob = client.storage.from_(bucket).download(object_path)
    target = os.path.join(dest_root, object_path.replace("/", os.sep))
    os.makedirs(os.path.dirname(target), exist_ok=True)
    with open(target, "wb") as out:
        out.write(blob)
    return target
| # --------------------------------------------------------------------- | |
| # DATABASE & MAPPING HELPERS (The Logic Core) | |
| # --------------------------------------------------------------------- | |
def is_resume_processed(client, user_id: str, file_hash: str) -> bool:
    """Return True when this user already has a processed=True profile row
    carrying *file_hash* (i.e. the identical file was ingested before)."""
    query = (
        client.table("profiles")
        .select("id")
        .eq("id", user_id)
        .eq("file_hash", file_hash)
        .eq("processed", True)
    )
    result = query.execute()
    return len(result.data or []) > 0
def build_resume_payload(user_id: str, extracted: Dict[str, Any], resume_path: str, file_hash: str) -> Dict[str, Any]:
    """Translate Gemini-extracted JSON into a row for the 'profiles' table.

    Only non-empty values are copied in, so an upsert never clobbers existing
    columns with blanks. Returns the payload dict keyed by DB column names.
    """
    # 1. Base bookkeeping columns, always present.
    payload: Dict[str, Any] = {
        "id": user_id,
        "resume_url": resume_path,
        "file_hash": file_hash,
        "processed": True,
        "updated_at": "now()",
    }
    # 2. Mapping Dictionary (Gemini JSON Key -> DB Column Name)
    FIELD_MAP = {
        # Identity
        "full_name": "full_name",
        "role": "role",
        "headline": "headline",
        "summary": "summary",
        # Contact & Socials
        "phone": "phone",
        "email": "email",
        "linkedin": "linkedin",
        "github": "github",
        "portfolio": "portfolio",
        # Arrays & JSONB
        "skills": "skills",
        "technical_skills": "technical_skills",
        "education": "education",
        "current_position": "current_position",
        # Experience
        "work_experience": "work_experience",
        "experience_years": "experience_years",
        # Extra
        "certifications": "certifications",
        "languages": "languages",
        "projects": "projects",
    }
    # Columns stored as plain text: list values are flattened to a
    # comma-separated string before insertion.
    LIST_TO_TEXT = {"certifications", "languages", "technical_skills"}
    # 3. Dynamic Mapping
    for json_key, db_col in FIELD_MAP.items():
        val = extracted.get(json_key)
        # Skip missing/empty values so we never overwrite data with blanks.
        if val in (None, "", [], {}):
            continue
        if json_key in LIST_TO_TEXT and isinstance(val, list):
            # FIX: coerce items to str — Gemini may emit non-string entries
            # (e.g. a certification year as an int), which would make a plain
            # ", ".join(val) raise TypeError.
            val = ", ".join(str(item) for item in val)
        payload[db_col] = val
    return payload
def upsert_profile(client, payload: Dict[str, Any]):
    """Insert or update the user's row in 'profiles'.

    on_conflict='id' keys the upsert on the user id, so a re-upload updates
    the same row instead of failing on duplicate file hashes. Errors are
    logged and re-raised for the caller to handle.
    """
    try:
        table = client.table("profiles")
        table.upsert(payload, on_conflict="id").execute()
        print(f"✅ Database updated for User ID: {payload['id']}")
    except Exception as err:
        print(f"❌ DB Upsert Error for {payload['id']}: {err}")
        raise err
| # --------------------------------------------------------------------- | |
| # UNIFIED PROCESSING FUNCTION (Called by API and Main) | |
| # --------------------------------------------------------------------- | |
def process_resume(client, user_id: str, file_path: str, bucket: str = "resume", temp_dir: str = "data/resumes/raw") -> Dict[str, Any]:
    """Run the full pipeline for one resume: download → Gemini extraction →
    profile upsert → embeddings → temp-file cleanup.

    Shared by the API (real-time) and main() (batch). Returns the extracted
    JSON dict; logs and re-raises on any failure.
    """
    try:
        print(f"⬇️ Downloading {file_path} from bucket '{bucket}'...")
        saved_copy = download_object(client, bucket, file_path, temp_dir)

        print("🧠 Sending to Gemini...")
        details = process_single_resume(saved_copy)
        if not details:
            raise ValueError("Gemini returned empty data")

        digest = compute_file_hash(saved_copy)

        row = build_resume_payload(user_id, details, file_path, digest)
        upsert_profile(client, row)

        # Embeddings are best-effort; a failure here must not fail the upload.
        try:
            safe_generate_and_store_embeddings(client, user_id)
        except Exception as embed_err:
            print(f"⚠️ Embedding generation failed (non-critical): {embed_err}")

        # Drop the local temp copy now that extraction and hashing are done.
        if os.path.exists(saved_copy):
            os.remove(saved_copy)

        return details
    except Exception as err:
        print(f"❌ Error processing resume {file_path}: {err}")
        raise err
def main():
    """Batch entry point: scan the storage bucket and ingest every resume."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--bucket", default="resume")
    parser.add_argument("--prefix", default="")
    parser.add_argument("--dest", default="data/resumes/raw")
    args = parser.parse_args()

    ensure_dir(args.dest)
    print(f"🔍 Scanning bucket '{args.bucket}'...")
    remote_paths = list_all_objects(client, args.bucket, args.prefix)
    if not remote_paths:
        print("⚠️ No resumes found in Supabase storage.")
        return
    print(f"found {len(remote_paths)} files.")

    for remote_path in remote_paths:
        # Guard clause: skip anything that isn't an accepted resume type.
        if os.path.splitext(remote_path)[1].lower() not in ALLOWED_EXTENSIONS:
            continue

        user_id = extract_user_id(remote_path)
        print(f"\n⬇️ Processing User: {user_id} | File: {remote_path}")

        downloaded = download_object(client, args.bucket, remote_path, args.dest)

        # Skip unchanged files before calling Gemini (saves API cost).
        digest = compute_file_hash(downloaded)
        if is_resume_processed(client, user_id, digest):
            print(" ⏭️ Skipped: Resume already processed and unchanged.")
            continue

        print(" 🧠 Sending to Gemini for extraction...")
        try:
            details = process_single_resume(downloaded)
            print(details)
            if not details:
                print(" ⚠️ Gemini returned no data. Skipping DB update.")
                continue

            row = build_resume_payload(user_id, details, remote_path, digest)
            upsert_profile(client, row)

            # Embeddings are best-effort; failures here are non-fatal.
            try:
                safe_generate_and_store_embeddings(client, user_id)
            except Exception as embed_err:
                print(f" ⚠️ Embedding generation failed (non-critical): {embed_err}")

            if os.path.exists(downloaded):
                os.remove(downloaded)
                print(" 🗑️ Cleaned up temporary file.")
        except Exception as err:
            print(f" ❌ Pipeline failed for this file: {err}")


if __name__ == "__main__":
    main()