"""
supabase_ingest.py
- Recursively downloads resumes from Supabase Storage
- Extractor: Uses Google Gemini (via src.preprocess.person_details_extraction_gemini)
- Database: Maps JSON to 'profiles' table columns
- Updates: Uses User ID (id) as the unique key to prevent errors
"""
from __future__ import annotations
import argparse
import os
import hashlib
from pathlib import Path
from typing import List, Dict, Any
from dotenv import load_dotenv
from supabase import create_client
# Gemini extractor and local embedder (paths match this repo's src/ layout)
from src.extraction.person_details_extraction_gemini import process_single_resume
from src.embeddings.local_embedder import safe_generate_and_store_embeddings
# ---------------------------------------------------------------------
# ENV SETUP
# ---------------------------------------------------------------------
# Explicitly load .env from the backend directory so the script works
# regardless of the current working directory.
env_path = Path(__file__).resolve().parent / ".env"
load_dotenv(dotenv_path=env_path)
SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_KEY = os.environ.get("SUPABASE_SERVICE_ROLE_KEY") or os.environ.get("SUPABASE_KEY")

# NOTE: Missing credentials are not fatal at import time. When this module is
# imported as a library (e.g. by the API, which provides its own client), we
# only warn here; main() raises if run as a script without credentials.
client = None
if SUPABASE_URL and SUPABASE_KEY:
    if not SUPABASE_URL.endswith("/"):
        SUPABASE_URL += "/"
    try:
        client = create_client(SUPABASE_URL, SUPABASE_KEY)
    except Exception as e:
        print(f"⚠️ Warning: Failed to create global Supabase client: {e}")
else:
    print("⚠️ Warning: Supabase credentials not found in environment; functions that need a client will fail if called without one.")
ALLOWED_EXTENSIONS = {".pdf", ".docx", ".doc"}
# ---------------------------------------------------------------------
# UTILS
# ---------------------------------------------------------------------
def ensure_dir(path: str) -> None:
    os.makedirs(path, exist_ok=True)

def compute_file_hash(path: str) -> str:
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            h.update(chunk)
    return h.hexdigest()

def extract_user_id(object_path: str) -> str:
    # Assumes folder structure: user_id/filename.pdf
    return object_path.split("/", 1)[0]
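# Example of the assumed path convention ("user-123" and the file names are
# hypothetical placeholders):
#
#     extract_user_id("user-123/resume.pdf")       # -> "user-123"
#     extract_user_id("user-123/old/resume.pdf")   # -> "user-123" (splits on the first "/")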
# ---------------------------------------------------------------------
# SUPABASE STORAGE HELPERS
# ---------------------------------------------------------------------
def list_all_objects(client, bucket: str, prefix: str = "") -> List[str]:
    storage = client.storage.from_(bucket)
    results = []
    resp = storage.list(prefix) or []
    for item in resp:
        name = item.get("name")
        if not name:
            continue
        full_path = f"{prefix}/{name}".strip("/")
        # Files come back with an 'id' and 'metadata'; folders have neither,
        # so recurse into them.
        if item.get("id") and item.get("metadata"):
            results.append(full_path)
        else:
            results.extend(list_all_objects(client, bucket, full_path))
    return results
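# Usage sketch (assumes a configured client; the returned paths are
# hypothetical examples):
#
#     paths = list_all_objects(client, "resume")
#     # -> ["user-123/resume.pdf", "user-456/cv.docx", ...]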
def download_object(client, bucket: str, object_path: str, dest_root: str) -> str:
    storage = client.storage.from_(bucket)
    data = storage.download(object_path)
    local_path = os.path.join(dest_root, object_path.replace("/", os.sep))
    os.makedirs(os.path.dirname(local_path), exist_ok=True)
    with open(local_path, "wb") as f:
        f.write(data)
    return local_path
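# Usage sketch (the object path is a hypothetical example):
#
#     local = download_object(client, "resume", "user-123/resume.pdf", "data/resumes/raw")
#     # -> "data/resumes/raw/user-123/resume.pdf" (with OS-native separators)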
# ---------------------------------------------------------------------
# DATABASE & MAPPING HELPERS (The Logic Core)
# ---------------------------------------------------------------------
def is_resume_processed(client, user_id: str, file_hash: str) -> bool:
    """
    Checks if this specific user already has a processed resume with this hash.
    """
    resp = (
        client.table("profiles")
        .select("id")
        .eq("id", user_id)
        .eq("file_hash", file_hash)
        .eq("processed", True)
        .execute()
    )
    return bool(resp.data)
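# Roughly equivalent SQL for the filter chain above (a sketch, not issued
# verbatim by the client):
#
#     SELECT id FROM profiles
#     WHERE id = :user_id AND file_hash = :file_hash AND processed = TRUE;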
def build_resume_payload(user_id: str, extracted: Dict[str, Any], resume_path: str, file_hash: str) -> Dict[str, Any]:
    """
    Translates Gemini JSON keys to Supabase 'profiles' table columns.
    """
    # 1. Base payload
    payload = {
        "id": user_id,
        "resume_url": resume_path,
        "file_hash": file_hash,
        "processed": True,
        "updated_at": "now()",
    }
    # 2. Mapping dictionary (Gemini JSON key -> DB column name)
    FIELD_MAP = {
        # Identity
        "full_name": "full_name",
        "role": "role",
        "headline": "headline",
        "summary": "summary",
        # Contact & socials
        "phone": "phone",
        "email": "email",
        "linkedin": "linkedin",
        "github": "github",
        "portfolio": "portfolio",
        # Arrays & JSONB
        "skills": "skills",
        "technical_skills": "technical_skills",
        "education": "education",
        "current_position": "current_position",
        # Experience
        "work_experience": "work_experience",
        "experience_years": "experience_years",
        # Extras
        "certifications": "certifications",
        "languages": "languages",
        "projects": "projects",
    }
    # 3. Dynamic mapping
    for json_key, db_col in FIELD_MAP.items():
        val = extracted.get(json_key)
        # Only update if the value is meaningful (not None or empty)
        if val not in (None, "", [], {}):
            # Special handling: flatten lists into comma-separated strings
            # for columns stored as plain text.
            if json_key in ["certifications", "languages", "technical_skills"] and isinstance(val, list):
                val = ", ".join(val)
            payload[db_col] = val
    return payload
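# A worked sketch of the mapping (the 'extracted' dict is a hypothetical
# Gemini response, not real output):
#
#     extracted = {"full_name": "Jane Doe", "languages": ["English", "Hindi"], "summary": ""}
#     payload = build_resume_payload("user-123", extracted, "user-123/resume.pdf", "abc123")
#     # payload["languages"] == "English, Hindi"   (list flattened to text)
#     # "summary" is skipped: empty values never overwrite existing columns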
def upsert_profile(client, payload: Dict[str, Any]):
    """
    Inserts or updates the profile row for the user, keyed on 'id'.
    """
    try:
        # on_conflict='id' updates the existing user row instead of failing
        # on a duplicate file_hash.
        client.table("profiles").upsert(
            payload,
            on_conflict="id"
        ).execute()
        print(f"✅ Database updated for User ID: {payload['id']}")
    except Exception as e:
        print(f"❌ DB Upsert Error for {payload['id']}: {e}")
        raise
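# Note: because the upsert is keyed on 'id', re-running ingestion for the
# same user updates their single profile row rather than inserting duplicates.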
# ---------------------------------------------------------------------
# UNIFIED PROCESSING FUNCTION (Called by API and Main)
# ---------------------------------------------------------------------
def process_resume(client, user_id: str, file_path: str, bucket: str = "resume", temp_dir: str = "data/resumes/raw") -> Dict[str, Any]:
    """
    Downloads, extracts, and upserts a resume.
    Used by both the API (real-time) and the main script (batch).
    """
    try:
        # 1. Download
        print(f"⬇️ Downloading {file_path} from bucket '{bucket}'...")
        local_path = download_object(client, bucket, file_path, temp_dir)
        # 2. Extract
        print("🧠 Sending to Gemini...")
        extracted_data = process_single_resume(local_path)
        if not extracted_data:
            raise ValueError("Gemini returned empty data")
        # 3. Hash
        file_hash = compute_file_hash(local_path)
        # 4. Payload & upsert
        payload = build_resume_payload(user_id, extracted_data, file_path, file_hash)
        upsert_profile(client, payload)
        # 5. Generate embeddings (best-effort)
        try:
            safe_generate_and_store_embeddings(client, user_id)
        except Exception as e:
            print(f"⚠️ Embedding generation failed (non-critical): {e}")
        # 6. Cleanup
        if os.path.exists(local_path):
            os.remove(local_path)
        return extracted_data
    except Exception as e:
        print(f"❌ Error processing resume {file_path}: {e}")
        raise
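# Usage sketch for the real-time path (user ID and object path are
# hypothetical; assumes the module-level client was created):
#
#     data = process_resume(client, "user-123", "user-123/resume.pdf")
#     print(data.get("full_name"))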
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--bucket", default="resume")
    parser.add_argument("--prefix", default="")
    parser.add_argument("--dest", default="data/resumes/raw")
    args = parser.parse_args()
    # Running as a script requires a working client (see warning at the top).
    if client is None:
        raise SystemExit("❌ Supabase credentials not found; cannot run batch ingest.")
    ensure_dir(args.dest)
    print(f"🔍 Scanning bucket '{args.bucket}'...")
    objects = list_all_objects(client, args.bucket, args.prefix)
    if not objects:
        print("⚠️ No resumes found in Supabase storage.")
        return
    print(f"Found {len(objects)} files.")
    for obj in objects:
        # 1. Filter by extension
        if os.path.splitext(obj)[1].lower() not in ALLOWED_EXTENSIONS:
            continue
        user_id = extract_user_id(obj)
        print(f"\n⬇️ Processing User: {user_id} | File: {obj}")
        # 2. Download
        local_path = download_object(client, args.bucket, obj, args.dest)
        # 3. Hash check (skip unchanged resumes to avoid paying for extraction)
        current_hash = compute_file_hash(local_path)
        if is_resume_processed(client, user_id, current_hash):
            print(" ⏭️ Skipped: Resume already processed and unchanged.")
            continue
        # 4. Extract (Gemini)
        print(" 🧠 Sending to Gemini for extraction...")
        try:
            extracted_data = process_single_resume(local_path)
            print(extracted_data)
            if not extracted_data:
                print(" ⚠️ Gemini returned no data. Skipping DB update.")
                continue
            # 5. Build payload (map keys)
            payload = build_resume_payload(user_id, extracted_data, obj, current_hash)
            # 6. Upsert to DB
            upsert_profile(client, payload)
            # 7. Generate embeddings
            try:
                safe_generate_and_store_embeddings(client, user_id)
            except Exception as e:
                print(f" ⚠️ Embedding generation failed (non-critical): {e}")
            # 8. Cleanup
            if os.path.exists(local_path):
                os.remove(local_path)
            print(" 🗑️ Cleaned up temporary file.")
        except Exception as e:
            print(f" ❌ Pipeline failed for this file: {e}")
if __name__ == "__main__":
    main()