# iris_backend/backend/src/extraction/job_extractor.py
# Saandraahh's picture
# Pushing to IRIS_latest repository
# ad01d65
import os
import re
import json
import time
from dotenv import load_dotenv
from typing import Any, Dict, List, Optional
from google import genai
from google.genai import types
from supabase import create_client
from src.embeddings.job_embed import safe_generate_and_store_job_embeddings
# ------------------ CONFIGURATION ------------------
# Directory constants. RAW_DIR is not referenced by the code visible in this
# module; PROCESSED_DIR is created by process_jobs_from_db().
RAW_DIR = "data/jobs/raw"
PROCESSED_DIR = "data/jobs/entities"

# ------------------ SETUP ------------------
# Pull variables from a .env file (if any) into the process environment
# before reading them below.
load_dotenv()

SUPABASE_URL = os.environ.get("SUPABASE_URL")
# The service-role key takes precedence; SUPABASE_KEY is the fallback.
SUPABASE_KEY = os.environ.get("SUPABASE_SERVICE_ROLE_KEY") or os.environ.get("SUPABASE_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

# `client` stays None whenever Gemini cannot be used; the extraction code
# checks for that and returns early instead of raising.
client = None
if not GEMINI_API_KEY:
    print("⚠️ GEMINI_API_KEY not set; extraction will be disabled.")
else:
    try:
        client = genai.Client(api_key=GEMINI_API_KEY)
    except Exception as e:
        print(f"⚠️ Failed to initialize Gemini client: {e}")
def clean_text(text: str) -> str:
    """Strip markup and normalise whitespace in a raw job description.

    The steps run in this order:

    1. Every ``<...>`` tag is replaced by a space.
    2. Every run of non-ASCII characters is replaced by a space.
    3. Whitespace runs are collapsed to one space and the ends are trimmed.

    Args:
        text: Raw description text. ``None`` or an empty string is accepted
            and treated as "nothing to clean".

    Returns:
        The cleaned, single-line, ASCII-only string ("" when nothing is left).
    """
    if not text:
        return ""

    # `[^<>]*` instead of `.*?`:
    #   * it matches newlines, so a tag whose attributes wrap onto the next
    #     line is removed as well;
    #   * it refuses to run across another "<", so a stray comparison sign
    #     ("5 < 10 <b>x</b>") no longer drags the following text into the
    #     match.
    text = re.sub(r"<[^<>]*>", " ", text)
    text = re.sub(r"[^\x00-\x7F]+", " ", text)
    text = re.sub(r"\s+", " ", text)
    return text.strip()
# Instruction block sent ahead of every job description.
_JOB_ENTITY_PROMPT = """
You are an intelligent information extractor specialized in job descriptions.
Your task is to extract ONLY what is explicitly mentioned and categorize them into the following JSON structure.
Output JSON Schema:
{
"skills": ["List of soft skills, general competencies..."],
"technical_skills": ["List of technical skills, programming languages, tools..."],
"qualification": ["List of educational qualifications..."],
"work_experience": ["List of work experience requirements..."],
"preferred_skills": ["List of preferred/nice-to-have skills..."]
}
Rules:
- Extract exact text as it appears.
- Do NOT infer or add anything not stated.
- If no data for a category, return an empty list [].
- Output MUST be valid JSON.
"""


def _strip_markdown_fences(payload: str) -> str:
    """Drop a leading ```json (or bare ```) fence and a trailing ``` fence."""
    payload = payload.strip()
    if payload.startswith("```json"):
        payload = payload[7:]
    elif payload.startswith("```"):
        payload = payload[3:]
    if payload.endswith("```"):
        payload = payload[:-3]
    return payload.strip()


def extract_job_entities_gemini(
    text: str,
    *,
    model: str = "gemini-2.5-flash-lite",
    max_retries: int = 3,
) -> Dict[str, Any]:
    """Ask Gemini to extract structured entities from a job description.

    The description is cleaned with ``clean_text`` and appended to the
    extraction prompt. The call requests a JSON response, which is parsed
    and returned.

    Args:
        text: Raw job description.
        model: Gemini model name passed to ``generate_content``.
        max_retries: Maximum number of API attempts. Only errors whose message
            contains "503" or "overloaded" are retried, with a wait of
            ``2 ** (attempt + 1)`` seconds between attempts.

    Returns:
        The parsed JSON object as a dict. An empty dict is returned when the
        client is unavailable, nothing is left after cleaning, the API call
        fails, or the response is not a JSON object.
    """
    # Check the client first so no work is done when extraction is disabled.
    if client is None:
        print("❌ Extraction disabled (no Client).")
        return {}

    cleaned_text = clean_text(text)
    if not cleaned_text:
        # Nothing but markup/whitespace: an API call could only yield noise.
        print("⚠️ Description is empty after cleaning; skipping extraction.")
        return {}

    request_body = _JOB_ENTITY_PROMPT + "\n\nJOB DESCRIPTION:\n" + cleaned_text
    request_config = types.GenerateContentConfig(
        temperature=0.1,
        response_mime_type="application/json",
    )

    for attempt in range(max_retries):
        try:
            response = client.models.generate_content(
                model=model,
                contents=request_body,
                config=request_config,
            )
        except Exception as e:
            error_str = str(e)
            overloaded = "503" in error_str or "overloaded" in error_str.lower()
            # Only wait when another attempt will actually follow; the last
            # attempt falls through to the failure path below.
            if overloaded and attempt < max_retries - 1:
                wait_time = 2 ** (attempt + 1)
                print(f"⚠️ Model overloaded. Retrying in {wait_time}s...")
                time.sleep(wait_time)
                continue
            print(f"❌ Gemini Extraction failed: {e}")
            return {}

        # NOTE(review): `response.text` is presumably None/empty when the
        # response carries no text part — confirm against the SDK docs.
        raw_text = response.text
        if not raw_text:
            print("❌ Gemini Extraction failed: empty response text")
            return {}

        # Parsing is kept outside the API try-block so a JSON error message
        # that happens to contain "503" is never mistaken for an overload.
        try:
            parsed = json.loads(_strip_markdown_fences(raw_text))
        except ValueError as e:
            print(f"❌ Gemini Extraction failed: {e}")
            return {}

        # Callers use dict access (`.get`) on the result, so any other JSON
        # value (list, string, null, ...) is treated as a failed extraction.
        if not isinstance(parsed, dict):
            print(
                "❌ Gemini Extraction failed: expected a JSON object, "
                f"got {type(parsed).__name__}"
            )
            return {}
        return parsed

    return {}
def upsert_job_entities(sb, job_id: str, experience_level: Optional[str], data: Dict[str, Any]) -> None:
    """Upsert extracted entities into ``jobs_entities`` and refresh embeddings.

    Args:
        sb: Supabase client used for the upsert; it is also handed to the
            embedding helper.
        job_id: Identifier of the job the entities belong to.
        experience_level: Experience level stored with the entities; may be
            ``None``.
        data: Extraction result. The five entity categories are read from it;
            a missing or ``null`` category is stored as an empty list.

    Returns:
        None. Database and embedding errors are printed, not raised.
    """
    entity_fields = (
        "skills",
        "technical_skills",
        "qualification",
        "work_experience",
        "preferred_skills",
    )

    payload: Dict[str, Any] = {
        "job_id": job_id,
        "experience_level": experience_level,
    }
    for field in entity_fields:
        # `or []` (not a `.get` default) also covers a key that is present
        # but explicitly null in the model's JSON.
        payload[field] = data.get(field) or []
    # NOTE(review): sent as the literal string "now()"; presumably the
    # database resolves it to the current timestamp — confirm against the
    # column definition.
    payload["updated_at"] = "now()"

    try:
        sb.table("jobs_entities").upsert(payload).execute()
        print(f"✅ Database updated for Job ID: {job_id}")
        # Embedding generation is best-effort: a failure is reported but does
        # not undo or fail the upsert that already succeeded.
        try:
            safe_generate_and_store_job_embeddings(sb, job_id)
        except Exception as e:
            print(f"⚠️ Job embedding generation failed (non-critical): {e}")
    except Exception as e:
        print(f"❌ DB Upsert Error for {job_id}: {e}")
def process_single_job(sb, job_id: str, description: str, experience_level: str = None) -> None:
    """Run extraction for one job and store the result.

    A missing or whitespace-only description is skipped with a warning.
    Otherwise the description goes to ``extract_job_entities_gemini``; a
    non-empty result is handed to ``upsert_job_entities``, and an empty
    result is only reported.
    """
    if not (description and description.strip()):
        print(f"⚠️ Skipping empty description for job {job_id}")
        return

    print(f"🔍 Processing Job ID: {job_id}")
    entities = extract_job_entities_gemini(description)

    if entities:
        upsert_job_entities(sb, job_id, experience_level, entities)
        return

    print("⚠️ No entities extracted.")
def process_jobs_from_db() -> None:
    """Fetch every row of the ``jobs`` table and process each one.

    The function returns early, with a printed warning, when the Supabase
    credentials are missing, the client cannot be created, the query fails,
    or no rows come back. Otherwise each row's ``id``, ``description`` and
    ``experience_level`` are passed to ``process_single_job``.
    """
    if not (SUPABASE_URL and SUPABASE_KEY):
        print("⚠️ SUPABASE_URL or SUPABASE_KEY not set; skipping job fetch")
        return

    try:
        sb = create_client(SUPABASE_URL, SUPABASE_KEY)
    except Exception as e:
        print(f"⚠️ Failed to create Supabase client: {e}")
        return

    # Fetch jobs from 'jobs' table
    try:
        resp = sb.table("jobs").select("id, description, experience_level").execute()
    except Exception as e:
        print(f"⚠️ Supabase query failed: {e}")
        return

    rows = getattr(resp, "data", None) or []
    if not rows:
        print("⚠️ No job descriptions returned from Supabase.")
        return

    print(f"found {len(rows)} jobs to process.")
    # Nothing in this function writes to PROCESSED_DIR; the directory is
    # still created so the existing on-disk side effect is preserved.
    os.makedirs(PROCESSED_DIR, exist_ok=True)

    for row in rows:
        job_id = row.get("id")
        desc = row.get("description") or ""
        # Read from the row: the column is selected above, and no other
        # `experience_level` name exists in this scope.
        experience_level = row.get("experience_level")
        process_single_job(sb, job_id, desc, experience_level)
# Script entry point: runs only when the file is executed directly.
if __name__ == "__main__":
    # Start banner, one pass over the jobs table, then a closing line.
    print("🧪 Starting job entity extraction (DB -> Gemini -> DB)...\n")
    process_jobs_from_db()
    print("\n🎯 All jobs processed.")