import os
import re
import json
import time
from dotenv import load_dotenv
from typing import Any, Dict, List, Optional
from google import genai
from google.genai import types
from supabase import create_client
from src.embeddings.job_embed import safe_generate_and_store_job_embeddings
# ------------------ CONFIGURATION ------------------
# Directory names used by the job pipeline (relative paths).
RAW_DIR = "data/jobs/raw"
PROCESSED_DIR = "data/jobs/entities"

# ------------------ SETUP ------------------
# Pull variables from a .env file into the process environment before reading them.
load_dotenv()

SUPABASE_URL = os.getenv("SUPABASE_URL")
# The service-role key wins when both keys are present.
SUPABASE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY") or os.getenv("SUPABASE_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

# `client` stays None whenever Gemini cannot be used; the extraction code
# checks for None and skips the request instead of failing at import time.
client = None
if not GEMINI_API_KEY:
    print("⚠️ GEMINI_API_KEY not set; extraction will be disabled.")
else:
    try:
        client = genai.Client(api_key=GEMINI_API_KEY)
    except Exception as e:
        print(f"⚠️ Failed to initialize Gemini client: {e}")
def clean_text(text: str) -> str:
    """Strip markup and non-ASCII characters from *text* and collapse whitespace.

    Steps, in order:

    1. Replace every ``<...>`` tag with a space. The tag body may not contain
       ``<`` or ``>``, so a tag whose attributes wrap onto another line is
       still removed, and a stray ``<`` in running text (e.g. ``"< 5 years"``)
       cannot swallow everything up to the next tag's ``>``.
    2. Replace each run of non-ASCII characters with a space.
    3. Collapse each whitespace run (including newlines) to one space and
       trim both ends.

    Args:
        text: Raw job description, possibly containing HTML.

    Returns:
        The cleaned, single-line ASCII string.
    """
    text = re.sub(r"<[^<>]*>", " ", text)
    text = re.sub(r"[^\x00-\x7F]+", " ", text)
    text = re.sub(r"\s+", " ", text)
    return text.strip()
# Instruction block placed ahead of every job description sent to the model.
_JOB_ENTITY_PROMPT = """
You are an intelligent information extractor specialized in job descriptions.
Your task is to extract ONLY what is explicitly mentioned and categorize them into the following JSON structure.
Output JSON Schema:
{
"skills": ["List of soft skills, general competencies..."],
"technical_skills": ["List of technical skills, programming languages, tools..."],
"qualification": ["List of educational qualifications..."],
"work_experience": ["List of work experience requirements..."],
"preferred_skills": ["List of preferred/nice-to-have skills..."]
}
Rules:
- Extract exact text as it appears.
- Do NOT infer or add anything not stated.
- If no data for a category, return an empty list [].
- Output MUST be valid JSON.
"""


def _strip_json_fences(raw: str) -> str:
    """Drop a leading ```json / ``` fence and a trailing ``` fence, if present."""
    if raw.startswith("```json"):
        raw = raw[7:]
    if raw.startswith("```"):
        raw = raw[3:]
    if raw.endswith("```"):
        raw = raw[:-3]
    return raw


def extract_job_entities_gemini(text: str) -> Dict[str, Any]:
    """Extract structured entities from a job description using Gemini.

    The description is cleaned with ``clean_text``, appended to the extraction
    prompt, and sent to the ``gemini-2.5-flash-lite`` model with a JSON
    response MIME type. Errors whose message contains ``503`` or
    ``overloaded`` are retried with exponential backoff (2s, 4s); any other
    error ends the attempt immediately.

    Args:
        text: Raw job description text.

    Returns:
        The parsed JSON object as a dict, or ``{}`` when the client is not
        configured, the request fails, the retries are exhausted, or the
        model's reply is not a JSON object.
    """
    # Check the client first so no work is done when extraction is disabled.
    if client is None:
        print("❌ Extraction disabled (no Client).")
        return {}

    cleaned_text = clean_text(text)
    prompt = _JOB_ENTITY_PROMPT + "\n\nJOB DESCRIPTION:\n" + cleaned_text

    max_retries = 3
    for attempt in range(max_retries):
        try:
            response = client.models.generate_content(
                model="gemini-2.5-flash-lite",
                contents=prompt,
                config=types.GenerateContentConfig(
                    temperature=0.1,
                    response_mime_type="application/json",
                ),
            )
            # Guard against a reply with no text; an empty string then fails
            # JSON parsing and is reported through the handler below.
            extracted_text = (response.text or "").strip()
            # Clean potential markdown fences if present (though
            # response_mime_type usually handles it).
            parsed = json.loads(_strip_json_fences(extracted_text))
        except Exception as e:
            error_str = str(e)
            overloaded = "503" in error_str or "overloaded" in error_str.lower()
            if not overloaded:
                print(f"❌ Gemini Extraction failed: {e}")
                return {}
            if attempt == max_retries - 1:
                # No point sleeping when there is no further attempt.
                break
            wait_time = 2 ** (attempt + 1)
            print(f"⚠️ Model overloaded. Retrying in {wait_time}s...")
            time.sleep(wait_time)
            continue

        # Callers use dict methods on the result, so reject valid JSON that
        # is not an object (for example a bare list).
        if isinstance(parsed, dict):
            return parsed
        print(
            "❌ Gemini Extraction failed: expected a JSON object, "
            f"got {type(parsed).__name__}"
        )
        return {}

    print(f"❌ Gemini Extraction failed: model still overloaded after {max_retries} attempts")
    return {}
def upsert_job_entities(sb, job_id: str, experience_level: str, data: Dict[str, Any]) -> None:
    """Write one job's extracted entities to ``jobs_entities`` and refresh its embeddings.

    Args:
        sb: Supabase client used for the upsert and passed on to the
            embedding helper.
        job_id: Identifier of the job the entities belong to.
        experience_level: Stored as-is in the ``experience_level`` column.
        data: Extracted entities; any missing list column defaults to ``[]``.

    Database and embedding errors are printed, never raised. Embeddings are
    only attempted after a successful upsert.
    """
    list_columns = (
        "skills",
        "technical_skills",
        "qualification",
        "work_experience",
        "preferred_skills",
    )
    row: Dict[str, Any] = {"job_id": job_id, "experience_level": experience_level}
    for column in list_columns:
        row[column] = data.get(column, [])
    # Sent as a literal string; presumably the database resolves it to the
    # current timestamp -- confirm against the table definition.
    row["updated_at"] = "now()"

    try:
        sb.table("jobs_entities").upsert(row).execute()
        print(f"✅ Database updated for Job ID: {job_id}")
    except Exception as db_err:
        print(f"❌ DB Upsert Error for {job_id}: {db_err}")
        return

    # ⚠️ NEW: Generate Embeddings
    # An embedding failure is reported but does not undo the upsert.
    try:
        safe_generate_and_store_job_embeddings(sb, job_id)
    except Exception as embed_err:
        print(f"⚠️ Job embedding generation failed (non-critical): {embed_err}")
def process_single_job(sb, job_id: str, description: str, experience_level: str = None) -> None:
    """Run extraction for one job and store the result.

    Args:
        sb: Supabase client handed through to ``upsert_job_entities``.
        job_id: Identifier of the job being processed.
        description: Job description text; empty or whitespace-only values
            are skipped.
        experience_level: Optional value stored alongside the entities.

    Nothing is written when the description is blank or when extraction
    yields no data.
    """
    has_text = bool(description) and bool(description.strip())
    if not has_text:
        print(f"⚠️ Skipping empty description for job {job_id}")
        return

    print(f"🔍 Processing Job ID: {job_id}")
    entities = extract_job_entities_gemini(description)
    if entities:
        upsert_job_entities(sb, job_id, experience_level, entities)
        return

    print("⚠️ No entities extracted.")
def process_jobs_from_db() -> None:
    """Fetch jobs from the ``jobs`` table and run entity extraction on each row.

    Reads ``id``, ``description`` and ``experience_level`` for every returned
    row and hands them to ``process_single_job``. Missing Supabase
    credentials, client creation failures, query failures and an empty result
    are each reported with a printed message and end the run early; nothing
    is raised for those cases.
    """
    if not SUPABASE_URL or not SUPABASE_KEY:
        print("⚠️ SUPABASE_URL or SUPABASE_KEY not set; skipping job fetch")
        return

    try:
        sb = create_client(SUPABASE_URL, SUPABASE_KEY)
    except Exception as e:
        print(f"⚠️ Failed to create Supabase client: {e}")
        return

    # Fetch jobs from 'jobs' table
    try:
        resp = sb.table("jobs").select("id, description, experience_level").execute()
    except Exception as e:
        print(f"⚠️ Supabase query failed: {e}")
        return

    # Treat a response without a usable `data` attribute as "no rows".
    data = getattr(resp, "data", None) or []
    if not data:
        print("⚠️ No job descriptions returned from Supabase.")
        return

    print(f"found {len(data)} jobs to process.")
    # Kept from the original flow; nothing in this function writes to the directory.
    os.makedirs(PROCESSED_DIR, exist_ok=True)

    for row in data:
        job_id = row.get("id")
        desc = row.get("description") or ""
        # Take the level from the row itself; the name was previously
        # undefined here and raised NameError on the first job.
        experience_level = row.get("experience_level")
        process_single_job(sb, job_id, desc, experience_level)
if __name__ == '__main__':
    # Script entry point: process every job once, then print a closing line.
    # The closing message is printed regardless of how many jobs were handled.
    print("🧪 Starting job entity extraction (DB -> Gemini -> DB)...\n")
    process_jobs_from_db()
    print("\n🎯 All jobs processed.")