Job-Application-Assistant / utils /langextractor_enhanced.py
Noo88ear's picture
πŸš€ Initial deployment of Multi-Agent Job Application Assistant
7498f2c
"""
Enhanced LangExtract Integration
Leverages full capabilities: structured extraction, visualization, parallel processing
"""
from __future__ import annotations
import os
import logging
from typing import List, Dict, Any, Optional, Union
from dataclasses import dataclass
from pathlib import Path
import json
import langextract as lx
from pydantic import BaseModel, Field
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Configure logging
logger = logging.getLogger(__name__)
# Set up LangExtract API key
if not os.getenv('LANGEXTRACT_API_KEY'):
os.environ['LANGEXTRACT_API_KEY'] = os.getenv('GEMINI_API_KEY', '')
# =====================================
# Pydantic Models for Structured Extraction
# =====================================
class JobExtraction(BaseModel):
"""Structured job posting extraction"""
title: str = Field(description="Job title")
company: str = Field(description="Company name")
location: Optional[str] = Field(None, description="Job location")
salary_range: Optional[str] = Field(None, description="Salary information")
required_skills: List[str] = Field(default_factory=list, description="Required skills/technologies")
nice_to_have_skills: List[str] = Field(default_factory=list, description="Preferred but not required skills")
years_experience: Optional[str] = Field(None, description="Years of experience required")
education: Optional[str] = Field(None, description="Education requirements")
benefits: List[str] = Field(default_factory=list, description="Benefits offered")
remote_work: Optional[bool] = Field(None, description="Remote work availability")
application_deadline: Optional[str] = Field(None, description="Application deadline")
class ResumeExtraction(BaseModel):
"""Structured resume content extraction"""
name: Optional[str] = Field(None, description="Candidate name")
email: Optional[str] = Field(None, description="Email address")
phone: Optional[str] = Field(None, description="Phone number")
summary: Optional[str] = Field(None, description="Professional summary")
skills: List[str] = Field(default_factory=list, description="Technical and soft skills")
experience: List[Dict[str, Any]] = Field(default_factory=list, description="Work experience entries")
education: List[Dict[str, Any]] = Field(default_factory=list, description="Education entries")
certifications: List[str] = Field(default_factory=list, description="Certifications")
achievements: List[str] = Field(default_factory=list, description="Key achievements")
class ATSKeyword(BaseModel):
"""ATS keyword with context and importance"""
keyword: str = Field(description="The keyword or phrase")
context: str = Field(description="Context where keyword appears")
importance: str = Field(description="Importance level: high/medium/low")
frequency: int = Field(default=1, description="How often it appears")
class CompanyInsights(BaseModel):
"""Structured company research extraction"""
company_name: str = Field(description="Company name")
culture_values: List[str] = Field(default_factory=list, description="Company culture and values")
recent_news: List[str] = Field(default_factory=list, description="Recent news and achievements")
tech_stack: List[str] = Field(default_factory=list, description="Technologies used")
interview_process: Optional[str] = Field(None, description="Interview process details")
employee_reviews: List[str] = Field(default_factory=list, description="Key points from reviews")
# =====================================
# Example Data for Different Extraction Types
# =====================================
def get_extraction_examples(extraction_type: str) -> List[Dict[str, Any]]:
"""Get relevant examples for each extraction type"""
examples = {
"job_details": [
{
"title": "Senior Software Engineer",
"company": "TechCorp",
"required_skills": ["Python", "React", "AWS"],
"years_experience": "5+ years",
"remote_work": True
}
],
"resume_content": [
{
"skills": ["Python", "JavaScript", "Docker"],
"experience": [
{
"company": "TechCorp",
"position": "Software Engineer",
"duration": "2020-2023",
"achievements": ["Led team of 5", "Reduced costs by 30%"]
}
]
}
],
"ats_keywords": [
{
"keyword": "Python",
"context": "5+ years Python experience required",
"importance": "high",
"frequency": 3
}
],
"company_insights": [
{
"company_name": "TechCorp",
"culture_values": ["Innovation", "Work-life balance"],
"tech_stack": ["Python", "React", "AWS", "Kubernetes"]
}
],
"key_points": [
"Implemented microservices architecture",
"Led cross-functional team of 10 engineers",
"Reduced system latency by 40%"
]
}
return examples.get(extraction_type, examples["key_points"])
# =====================================
# Core Extraction Functions
# =====================================
def extract_job_details(
job_text: str,
visualize: bool = False,
parallel: bool = False
) -> JobExtraction:
"""
Extract structured job details from job posting text
"""
try:
result = lx.extract(
text_or_documents=job_text,
prompt_description="""
Extract job posting details including:
- Job title and company
- Location and remote work options
- Required and nice-to-have skills
- Years of experience needed
- Education requirements
- Benefits and salary information
- Application deadline
""",
examples=get_extraction_examples("job_details"),
model_id="gemini-2.0-flash-exp",
extraction_passes=2 if not parallel else 1,
max_workers=10 if parallel else 1
)
# Parse result based on actual LangExtract response format
if isinstance(result, dict):
# Create JobExtraction from the result
job_data = result.get('extraction', result)
if isinstance(job_data, dict):
return JobExtraction(**job_data)
else:
# Fallback: create minimal extraction
return JobExtraction(
title="Unknown Position",
company="Unknown Company",
required_skills=[]
)
# Visualize if requested
if visualize:
visualize_extraction(result, "job_extraction.html")
return JobExtraction(
title="Unknown Position",
company="Unknown Company"
)
except Exception as e:
logger.error(f"Job extraction failed: {e}")
return JobExtraction(
title="Error extracting job",
company="Unknown"
)
def extract_resume_content(
resume_text: str,
visualize: bool = False
) -> ResumeExtraction:
"""
Extract structured content from resume text
"""
try:
result = lx.extract(
text_or_documents=resume_text,
prompt_description="""
Extract resume information including:
- Contact information (name, email, phone)
- Professional summary
- Skills (technical and soft)
- Work experience with companies, positions, dates, and achievements
- Education details
- Certifications and achievements
""",
examples=get_extraction_examples("resume_content"),
model_id="gemini-2.0-flash-exp",
extraction_passes=3
)
# Parse and return structured data
if isinstance(result, dict):
resume_data = result.get('extraction', result)
if isinstance(resume_data, dict):
return ResumeExtraction(**resume_data)
if visualize:
visualize_extraction(result, "resume_extraction.html")
return ResumeExtraction()
except Exception as e:
logger.error(f"Resume extraction failed: {e}")
return ResumeExtraction()
def extract_ats_keywords(
job_description: str,
context_window: int = 50
) -> List[ATSKeyword]:
"""
Extract ATS-critical keywords with context and importance
"""
try:
result = lx.extract(
text_or_documents=job_description,
prompt_description="""
Extract important keywords for ATS (Applicant Tracking Systems):
- Technical skills and technologies
- Certifications and qualifications
- Industry-specific terms
- Action verbs and achievements
Include the context where each keyword appears and rate its importance.
""",
examples=get_extraction_examples("ats_keywords"),
model_id="gemini-2.0-flash-exp",
extraction_passes=2
)
# Parse keywords
if isinstance(result, dict):
keywords_data = result.get('extraction', result)
if isinstance(keywords_data, list):
return [ATSKeyword(**kw) if isinstance(kw, dict) else
ATSKeyword(keyword=str(kw), context="", importance="medium")
for kw in keywords_data]
return []
except Exception as e:
logger.error(f"ATS keyword extraction failed: {e}")
return []
def extract_company_insights(
research_text: str,
visualize: bool = False
) -> CompanyInsights:
"""
Extract structured insights from company research
"""
try:
result = lx.extract(
text_or_documents=research_text,
prompt_description="""
Extract company information including:
- Company culture and values
- Recent news and achievements
- Technology stack and tools
- Interview process details
- Key points from employee reviews
""",
examples=get_extraction_examples("company_insights"),
model_id="gemini-2.0-flash-exp"
)
if isinstance(result, dict):
insights_data = result.get('extraction', result)
if isinstance(insights_data, dict):
return CompanyInsights(**insights_data)
if visualize:
visualize_extraction(result, "company_insights.html")
return CompanyInsights(company_name="Unknown Company")
except Exception as e:
logger.error(f"Company insights extraction failed: {e}")
return CompanyInsights(company_name="Unknown Company")
# =====================================
# Parallel Processing
# =====================================
def extract_multiple_jobs(
job_texts: List[str],
max_workers: int = 20
) -> List[JobExtraction]:
"""
Process multiple job descriptions in parallel
"""
try:
results = lx.extract(
text_or_documents=job_texts,
prompt_description="Extract job details from each posting",
examples=get_extraction_examples("job_details"),
model_id="gemini-2.0-flash-exp",
max_workers=max_workers,
extraction_passes=2
)
# Parse all results
extractions = []
if isinstance(results, list):
for result in results:
if isinstance(result, dict):
job_data = result.get('extraction', result)
if isinstance(job_data, dict):
extractions.append(JobExtraction(**job_data))
return extractions
except Exception as e:
logger.error(f"Parallel job extraction failed: {e}")
return []
# =====================================
# Visualization
# =====================================
def visualize_extraction(
result: Any,
output_file: str = "extraction_viz.html",
show_sources: bool = True
) -> str:
"""
Generate interactive HTML visualization of extraction results
"""
try:
output_path = Path(output_file)
# Try to use LangExtract's visualization
try:
lx.visualize(
result,
output_file=str(output_path),
show_sources=show_sources,
highlight_entities=True
)
logger.info(f"Visualization saved to {output_path}")
return str(output_path)
except:
# Fallback: Create simple HTML visualization
html_content = create_fallback_visualization(result)
output_path.write_text(html_content)
logger.info(f"Fallback visualization saved to {output_path}")
return str(output_path)
except Exception as e:
logger.error(f"Visualization failed: {e}")
return ""
def create_fallback_visualization(result: Any) -> str:
"""
Create a simple HTML visualization as fallback
"""
html = """
<!DOCTYPE html>
<html>
<head>
<title>Extraction Results</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.extraction { background: #f0f0f0; padding: 10px; margin: 10px 0; border-radius: 5px; }
.field { margin: 5px 0; }
.field-name { font-weight: bold; color: #333; }
.field-value { color: #666; }
.list-item { margin-left: 20px; }
</style>
</head>
<body>
<h1>Extraction Results</h1>
<div class="extraction">
"""
if isinstance(result, dict):
for key, value in result.items():
html += f'<div class="field"><span class="field-name">{key}:</span> '
if isinstance(value, list):
html += '<ul>'
for item in value:
html += f'<li class="list-item">{item}</li>'
html += '</ul>'
else:
html += f'<span class="field-value">{value}</span>'
html += '</div>'
else:
html += f'<div class="field">{result}</div>'
html += """
</div>
</body>
</html>
"""
return html
# =====================================
# Enhanced Distillation (Backward Compatible)
# =====================================
def distill_text_enhanced(
text: str,
max_points: int = 10,
extraction_type: str = "key_points",
visualize: bool = False
) -> List[str]:
"""
Enhanced version of distill_text using LangExtract's full capabilities
Backward compatible with original distill_text function
"""
if not text or not text.strip():
return []
try:
prompt_map = {
"key_points": "Extract the main points and key information as bullet points",
"achievements": "Extract key achievements and accomplishments",
"requirements": "Extract key requirements and qualifications",
"skills": "Extract technical and soft skills mentioned"
}
result = lx.extract(
text_or_documents=text,
prompt_description=prompt_map.get(extraction_type, prompt_map["key_points"]),
examples=get_extraction_examples(extraction_type),
model_id="gemini-2.0-flash-exp",
extraction_passes=2
)
# Parse result
if isinstance(result, dict):
extraction = result.get('extraction', result)
if isinstance(extraction, list):
bullets = [str(item) for item in extraction][:max_points]
if bullets:
if visualize:
visualize_extraction(result, f"{extraction_type}_viz.html")
return bullets
# Fallback to original implementation
from .langextractor import distill_text
return distill_text(text, max_points)
except Exception as e:
logger.warning(f"Enhanced extraction failed, using fallback: {e}")
from .langextractor import distill_text
return distill_text(text, max_points)
# =====================================
# Batch Processing with Caching
# =====================================
class ExtractionCache:
"""Simple cache for extraction results"""
def __init__(self, cache_dir: str = ".langextract_cache"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True)
def get_cache_key(self, text: str, extraction_type: str) -> str:
"""Generate cache key from text and type"""
import hashlib
text_hash = hashlib.md5(text.encode()).hexdigest()
return f"{extraction_type}_{text_hash}"
def get(self, text: str, extraction_type: str) -> Optional[Any]:
"""Get cached extraction if exists"""
cache_key = self.get_cache_key(text, extraction_type)
cache_file = self.cache_dir / f"{cache_key}.json"
if cache_file.exists():
try:
with open(cache_file, 'r') as f:
return json.load(f)
except:
pass
return None
def set(self, text: str, extraction_type: str, result: Any):
"""Cache extraction result"""
cache_key = self.get_cache_key(text, extraction_type)
cache_file = self.cache_dir / f"{cache_key}.json"
try:
# Convert Pydantic models to dict
if hasattr(result, 'dict'):
result = result.dict()
elif isinstance(result, list) and result and hasattr(result[0], 'dict'):
result = [item.dict() for item in result]
with open(cache_file, 'w') as f:
json.dump(result, f)
except:
pass
# Global cache instance
extraction_cache = ExtractionCache()
# =====================================
# Main Interface
# =====================================
def extract_structured_info(
text: str,
extraction_type: str = "key_points",
use_cache: bool = True,
visualize: bool = False,
parallel: bool = False
) -> Dict[str, Any]:
"""
Main interface for structured extraction with caching
Args:
text: Input text to extract from
extraction_type: Type of extraction (job_details, resume_content, etc.)
use_cache: Whether to use caching
visualize: Generate HTML visualization
parallel: Use parallel processing (for multiple documents)
Returns:
Dictionary with extraction results
"""
# Check cache first
if use_cache:
cached = extraction_cache.get(text, extraction_type)
if cached:
logger.info(f"Using cached extraction for {extraction_type}")
return cached
# Perform extraction based on type
result = None
if extraction_type == "job_details":
result = extract_job_details(text, visualize, parallel)
elif extraction_type == "resume_content":
result = extract_resume_content(text, visualize)
elif extraction_type == "ats_keywords":
result = extract_ats_keywords(text)
elif extraction_type == "company_insights":
result = extract_company_insights(text, visualize)
else:
# Default to key points extraction
points = distill_text_enhanced(text, extraction_type=extraction_type, visualize=visualize)
result = {"key_points": points}
# Cache result
if use_cache and result:
extraction_cache.set(text, extraction_type, result)
# Convert Pydantic models to dict for JSON serialization
if hasattr(result, 'dict'):
return result.dict()
elif isinstance(result, list) and result and hasattr(result[0], 'dict'):
return {"results": [item.dict() for item in result]}
return result if isinstance(result, dict) else {"result": result}
# =====================================
# Backward Compatibility
# =====================================
# Keep original function name for backward compatibility
distill_text = distill_text_enhanced
if __name__ == "__main__":
# Test the enhanced extraction
sample_job = """
Senior Software Engineer - TechCorp
Location: San Francisco, CA (Remote Available)
Salary: $150,000 - $200,000
We're looking for a Senior Software Engineer with 5+ years of experience.
Required Skills:
- Python, React, TypeScript
- AWS or GCP experience
- Microservices architecture
Nice to have:
- Kubernetes, Docker
- Machine Learning experience
Benefits:
- Health insurance
- 401k matching
- Unlimited PTO
"""
print("Testing enhanced LangExtract integration...")
# Test job extraction
job = extract_job_details(sample_job, visualize=True)
print(f"Extracted job: {job.title} at {job.company}")
print(f"Required skills: {job.required_skills}")
# Test ATS keywords
keywords = extract_ats_keywords(sample_job)
print(f"ATS Keywords: {[kw.keyword for kw in keywords]}")
print("\nβœ… Enhanced LangExtract is ready!")