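"""
Job entity extraction pipeline.

Fetches job descriptions from the Supabase `jobs` table, extracts structured
entities with Gemini, and upserts the results into the `jobs_entities` table
(DB -> Gemini -> DB). Embedding generation is triggered after each upsert.
"""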
import os
import re
import json
import time
from dotenv import load_dotenv
from typing import Any, Dict, List, Optional
from google import genai
from google.genai import types
from supabase import create_client
from src.embeddings.job_embed import safe_generate_and_store_job_embeddings

# ------------------ CONFIGURATION ------------------
RAW_DIR = "data/jobs/raw"
PROCESSED_DIR = "data/jobs/entities"

# ------------------ SETUP ------------------
load_dotenv()
SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_KEY = os.environ.get("SUPABASE_SERVICE_ROLE_KEY") or os.environ.get("SUPABASE_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

if GEMINI_API_KEY:
    try:
        client = genai.Client(api_key=GEMINI_API_KEY)
    except Exception as e:
        client = None
        print(f"⚠️ Failed to initialize Gemini client: {e}")
else:
    client = None
    print("⚠️ GEMINI_API_KEY not set; extraction will be disabled.")


def clean_text(text: str) -> str:
    """Strip HTML tags, non-ASCII characters, and redundant whitespace."""
    text = re.sub(r"<.*?>", " ", text)
    text = re.sub(r"[^\x00-\x7F]+", " ", text)
    text = re.sub(r"\s+", " ", text)
    return text.strip()


def extract_job_entities_gemini(text: str) -> Dict[str, Any]:
    """Extract structured entities from a job description via Gemini.

    Returns a dict with the keys skills, technical_skills, qualification,
    work_experience, and preferred_skills, or an empty dict on failure.
    """
    cleaned_text = clean_text(text)
    system_prompt = """
You are an intelligent information extractor specialized in job descriptions.
Your task is to extract ONLY what is explicitly mentioned and categorize it into the following JSON structure.

Output JSON Schema:
{
  "skills": ["List of soft skills, general competencies..."],
  "technical_skills": ["List of technical skills, programming languages, tools..."],
  "qualification": ["List of educational qualifications..."],
  "work_experience": ["List of work experience requirements..."],
  "preferred_skills": ["List of preferred/nice-to-have skills..."]
}

Rules:
- Extract exact text as it appears.
- Do NOT infer or add anything not stated.
- If no data for a category, return an empty list [].
- Output MUST be valid JSON.
"""
    if client is None:
        print("❌ Extraction disabled (no Gemini client).")
        return {}

    max_retries = 3
    for attempt in range(max_retries):
        try:
            response = client.models.generate_content(
                model="gemini-2.5-flash-lite",
                contents=system_prompt + "\n\nJOB DESCRIPTION:\n" + cleaned_text,
                config=types.GenerateContentConfig(
                    temperature=0.1,
                    response_mime_type="application/json"
                )
            )
            extracted_text = response.text.strip()
            # Strip markdown fences if present (response_mime_type usually prevents them)
            if extracted_text.startswith("```json"):
                extracted_text = extracted_text[7:]
            if extracted_text.startswith("```"):
                extracted_text = extracted_text[3:]
            if extracted_text.endswith("```"):
                extracted_text = extracted_text[:-3]
            return json.loads(extracted_text)
        except Exception as e:
            error_str = str(e)
            if "503" in error_str or "overloaded" in error_str.lower():
                # Exponential backoff on transient overload errors (2s, 4s, 8s)
                wait_time = 2 ** (attempt + 1)
                print(f"⚠️ Model overloaded. Retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                print(f"❌ Gemini extraction failed: {e}")
                return {}
    return {}


def upsert_job_entities(sb, job_id: str, experience_level: Optional[str], data: Dict[str, Any]) -> None:
    """
    Upserts the extracted entities into the jobs_entities table.
    """
    payload = {
        "job_id": job_id,
        "experience_level": experience_level,
        "skills": data.get("skills", []),
        "technical_skills": data.get("technical_skills", []),
        "qualification": data.get("qualification", []),
        "work_experience": data.get("work_experience", []),
        "preferred_skills": data.get("preferred_skills", []),
        "updated_at": "now()"
    }
    try:
        sb.table("jobs_entities").upsert(payload).execute()
        print(f"✅ Database updated for Job ID: {job_id}")
        # Generate embeddings for the freshly stored entities (failure is non-critical)
        try:
            safe_generate_and_store_job_embeddings(sb, job_id)
        except Exception as e:
            print(f"⚠️ Job embedding generation failed (non-critical): {e}")
    except Exception as e:
        print(f"❌ DB upsert error for {job_id}: {e}")


def process_single_job(sb, job_id: str, description: str, experience_level: Optional[str] = None) -> None:
    """
    Processes a single job: extracts entities and upserts them to the DB.
    """
    if not description or not description.strip():
        print(f"⚠️ Skipping empty description for job {job_id}")
        return
    print(f"🔍 Processing Job ID: {job_id}")
    extracted_data = extract_job_entities_gemini(description)
    if not extracted_data:
        print("⚠️ No entities extracted.")
        return
    upsert_job_entities(sb, job_id, experience_level, extracted_data)


def process_jobs_from_db() -> None:
    """Fetches all jobs from Supabase and runs entity extraction on each one."""
    if not SUPABASE_URL or not SUPABASE_KEY:
        print("⚠️ SUPABASE_URL or SUPABASE_KEY not set; skipping job fetch.")
        return
    try:
        sb = create_client(SUPABASE_URL, SUPABASE_KEY)
    except Exception as e:
        print(f"⚠️ Failed to create Supabase client: {e}")
        return

    # Fetch jobs from the 'jobs' table
    try:
        resp = sb.table("jobs").select("id, description, experience_level").execute()
    except Exception as e:
        print(f"⚠️ Supabase query failed: {e}")
        return

    data = resp.data if hasattr(resp, "data") else []
    if not data:
        print("⚠️ No job descriptions returned from Supabase.")
        return

    print(f"Found {len(data)} jobs to process.")
    os.makedirs(PROCESSED_DIR, exist_ok=True)
    for row in data:
        job_id = row.get("id")
        desc = row.get("description") or ""
        process_single_job(sb, job_id, desc, row.get("experience_level"))


if __name__ == '__main__':
    print("🧪 Starting job entity extraction (DB -> Gemini -> DB)...\n")
    process_jobs_from_db()
    print("\n🎯 All jobs processed.")