""" Enhanced LangExtract Integration Leverages full capabilities: structured extraction, visualization, parallel processing """ from __future__ import annotations import os import logging from typing import List, Dict, Any, Optional, Union from dataclasses import dataclass from pathlib import Path import json import langextract as lx from pydantic import BaseModel, Field from dotenv import load_dotenv # Load environment variables load_dotenv() # Configure logging logger = logging.getLogger(__name__) # Set up LangExtract API key if not os.getenv('LANGEXTRACT_API_KEY'): os.environ['LANGEXTRACT_API_KEY'] = os.getenv('GEMINI_API_KEY', '') # ===================================== # Pydantic Models for Structured Extraction # ===================================== class JobExtraction(BaseModel): """Structured job posting extraction""" title: str = Field(description="Job title") company: str = Field(description="Company name") location: Optional[str] = Field(None, description="Job location") salary_range: Optional[str] = Field(None, description="Salary information") required_skills: List[str] = Field(default_factory=list, description="Required skills/technologies") nice_to_have_skills: List[str] = Field(default_factory=list, description="Preferred but not required skills") years_experience: Optional[str] = Field(None, description="Years of experience required") education: Optional[str] = Field(None, description="Education requirements") benefits: List[str] = Field(default_factory=list, description="Benefits offered") remote_work: Optional[bool] = Field(None, description="Remote work availability") application_deadline: Optional[str] = Field(None, description="Application deadline") class ResumeExtraction(BaseModel): """Structured resume content extraction""" name: Optional[str] = Field(None, description="Candidate name") email: Optional[str] = Field(None, description="Email address") phone: Optional[str] = Field(None, description="Phone number") summary: Optional[str] = Field(None, description="Professional summary") skills: List[str] = Field(default_factory=list, description="Technical and soft skills") experience: List[Dict[str, Any]] = Field(default_factory=list, description="Work experience entries") education: List[Dict[str, Any]] = Field(default_factory=list, description="Education entries") certifications: List[str] = Field(default_factory=list, description="Certifications") achievements: List[str] = Field(default_factory=list, description="Key achievements") class ATSKeyword(BaseModel): """ATS keyword with context and importance""" keyword: str = Field(description="The keyword or phrase") context: str = Field(description="Context where keyword appears") importance: str = Field(description="Importance level: high/medium/low") frequency: int = Field(default=1, description="How often it appears") class CompanyInsights(BaseModel): """Structured company research extraction""" company_name: str = Field(description="Company name") culture_values: List[str] = Field(default_factory=list, description="Company culture and values") recent_news: List[str] = Field(default_factory=list, description="Recent news and achievements") tech_stack: List[str] = Field(default_factory=list, description="Technologies used") interview_process: Optional[str] = Field(None, description="Interview process details") employee_reviews: List[str] = Field(default_factory=list, description="Key points from reviews") # ===================================== # Example Data for Different Extraction Types # 
# =====================================
# Example Data for Different Extraction Types
# =====================================

def get_extraction_examples(extraction_type: str) -> List[Dict[str, Any]]:
    """Get relevant examples for each extraction type"""
    examples = {
        "job_details": [
            {
                "title": "Senior Software Engineer",
                "company": "TechCorp",
                "required_skills": ["Python", "React", "AWS"],
                "years_experience": "5+ years",
                "remote_work": True
            }
        ],
        "resume_content": [
            {
                "skills": ["Python", "JavaScript", "Docker"],
                "experience": [
                    {
                        "company": "TechCorp",
                        "position": "Software Engineer",
                        "duration": "2020-2023",
                        "achievements": ["Led team of 5", "Reduced costs by 30%"]
                    }
                ]
            }
        ],
        "ats_keywords": [
            {
                "keyword": "Python",
                "context": "5+ years Python experience required",
                "importance": "high",
                "frequency": 3
            }
        ],
        "company_insights": [
            {
                "company_name": "TechCorp",
                "culture_values": ["Innovation", "Work-life balance"],
                "tech_stack": ["Python", "React", "AWS", "Kubernetes"]
            }
        ],
        "key_points": [
            "Implemented microservices architecture",
            "Led cross-functional team of 10 engineers",
            "Reduced system latency by 40%"
        ]
    }
    return examples.get(extraction_type, examples["key_points"])


# =====================================
# Core Extraction Functions
# =====================================

def extract_job_details(
    job_text: str,
    visualize: bool = False,
    parallel: bool = False
) -> JobExtraction:
    """
    Extract structured job details from job posting text
    """
    try:
        result = lx.extract(
            text_or_documents=job_text,
            prompt_description="""
            Extract job posting details including:
            - Job title and company
            - Location and remote work options
            - Required and nice-to-have skills
            - Years of experience needed
            - Education requirements
            - Benefits and salary information
            - Application deadline
            """,
            examples=get_extraction_examples("job_details"),
            model_id="gemini-2.0-flash-exp",
            extraction_passes=2 if not parallel else 1,
            max_workers=10 if parallel else 1
        )

        # Visualize if requested (done before parsing so every return path is covered)
        if visualize:
            visualize_extraction(result, "job_extraction.html")

        # Parse result based on actual LangExtract response format
        if isinstance(result, dict):
            # Create JobExtraction from the result
            job_data = result.get('extraction', result)
            if isinstance(job_data, dict):
                return JobExtraction(**job_data)
            # Fallback: create minimal extraction
            return JobExtraction(
                title="Unknown Position",
                company="Unknown Company",
                required_skills=[]
            )

        return JobExtraction(
            title="Unknown Position",
            company="Unknown Company"
        )

    except Exception as e:
        logger.error(f"Job extraction failed: {e}")
        return JobExtraction(
            title="Error extracting job",
            company="Unknown"
        )


def extract_resume_content(
    resume_text: str,
    visualize: bool = False
) -> ResumeExtraction:
    """
    Extract structured content from resume text
    """
    try:
        result = lx.extract(
            text_or_documents=resume_text,
            prompt_description="""
            Extract resume information including:
            - Contact information (name, email, phone)
            - Professional summary
            - Skills (technical and soft)
            - Work experience with companies, positions, dates, and achievements
            - Education details
            - Certifications and achievements
            """,
            examples=get_extraction_examples("resume_content"),
            model_id="gemini-2.0-flash-exp",
            extraction_passes=3
        )

        # Visualize if requested
        if visualize:
            visualize_extraction(result, "resume_extraction.html")

        # Parse and return structured data
        if isinstance(result, dict):
            resume_data = result.get('extraction', result)
            if isinstance(resume_data, dict):
                return ResumeExtraction(**resume_data)

        return ResumeExtraction()

    except Exception as e:
        logger.error(f"Resume extraction failed: {e}")
        return ResumeExtraction()

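# A minimal usage sketch for the two extractors above, assuming a valid
# LANGEXTRACT_API_KEY (or GEMINI_API_KEY) is set; "posting.txt" and
# "resume.txt" are placeholder file names:
#
#     job = extract_job_details(open("posting.txt").read(), visualize=True)
#     print(job.title, job.required_skills)
#
#     resume = extract_resume_content(open("resume.txt").read())
#     print(resume.skills)
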
def extract_ats_keywords(
    job_description: str,
    context_window: int = 50
) -> List[ATSKeyword]:
    """
    Extract ATS-critical keywords with context and importance
    """
    try:
        result = lx.extract(
            text_or_documents=job_description,
            prompt_description="""
            Extract important keywords for ATS (Applicant Tracking Systems):
            - Technical skills and technologies
            - Certifications and qualifications
            - Industry-specific terms
            - Action verbs and achievements
            Include the context where each keyword appears and rate its importance.
            """,
            examples=get_extraction_examples("ats_keywords"),
            model_id="gemini-2.0-flash-exp",
            extraction_passes=2
        )

        # Parse keywords
        if isinstance(result, dict):
            keywords_data = result.get('extraction', result)
            if isinstance(keywords_data, list):
                return [
                    ATSKeyword(**kw) if isinstance(kw, dict)
                    else ATSKeyword(keyword=str(kw), context="", importance="medium")
                    for kw in keywords_data
                ]

        return []

    except Exception as e:
        logger.error(f"ATS keyword extraction failed: {e}")
        return []


def extract_company_insights(
    research_text: str,
    visualize: bool = False
) -> CompanyInsights:
    """
    Extract structured insights from company research
    """
    try:
        result = lx.extract(
            text_or_documents=research_text,
            prompt_description="""
            Extract company information including:
            - Company culture and values
            - Recent news and achievements
            - Technology stack and tools
            - Interview process details
            - Key points from employee reviews
            """,
            examples=get_extraction_examples("company_insights"),
            model_id="gemini-2.0-flash-exp"
        )

        # Visualize if requested
        if visualize:
            visualize_extraction(result, "company_insights.html")

        if isinstance(result, dict):
            insights_data = result.get('extraction', result)
            if isinstance(insights_data, dict):
                return CompanyInsights(**insights_data)

        return CompanyInsights(company_name="Unknown Company")

    except Exception as e:
        logger.error(f"Company insights extraction failed: {e}")
        return CompanyInsights(company_name="Unknown Company")


# =====================================
# Parallel Processing
# =====================================

def extract_multiple_jobs(
    job_texts: List[str],
    max_workers: int = 20
) -> List[JobExtraction]:
    """
    Process multiple job descriptions in parallel
    """
    try:
        results = lx.extract(
            text_or_documents=job_texts,
            prompt_description="Extract job details from each posting",
            examples=get_extraction_examples("job_details"),
            model_id="gemini-2.0-flash-exp",
            max_workers=max_workers,
            extraction_passes=2
        )

        # Parse all results
        extractions = []
        if isinstance(results, list):
            for result in results:
                if isinstance(result, dict):
                    job_data = result.get('extraction', result)
                    if isinstance(job_data, dict):
                        extractions.append(JobExtraction(**job_data))
        return extractions

    except Exception as e:
        logger.error(f"Parallel job extraction failed: {e}")
        return []


# =====================================
# Visualization
# =====================================

def visualize_extraction(
    result: Any,
    output_file: str = "extraction_viz.html",
    show_sources: bool = True
) -> str:
    """
    Generate interactive HTML visualization of extraction results
    """
    try:
        output_path = Path(output_file)

        # Try to use LangExtract's visualization; if this call does not match
        # the installed langextract version, the fallback below takes over.
        try:
            lx.visualize(
                result,
                output_file=str(output_path),
                show_sources=show_sources,
                highlight_entities=True
            )
            logger.info(f"Visualization saved to {output_path}")
            return str(output_path)
        except Exception:
            # Fallback: create a simple HTML visualization
            html_content = create_fallback_visualization(result)
            output_path.write_text(html_content)
            logger.info(f"Fallback visualization saved to {output_path}")
            return str(output_path)

    except Exception as e:
        logger.error(f"Visualization failed: {e}")
        return ""

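# A rough sketch of batch usage, assuming `postings` is a list of raw job
# description strings (the variable name and worker count are illustrative):
#
#     extractions = extract_multiple_jobs(postings, max_workers=10)
#     print([job.title for job in extractions])
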
failed: {e}") return "" def create_fallback_visualization(result: Any) -> str: """ Create a simple HTML visualization as fallback """ html = """ Extraction Results

Extraction Results

""" if isinstance(result, dict): for key, value in result.items(): html += f'
{key}: ' if isinstance(value, list): html += '' else: html += f'{value}' html += '
' else: html += f'
{result}
' html += """
""" return html # ===================================== # Enhanced Distillation (Backward Compatible) # ===================================== def distill_text_enhanced( text: str, max_points: int = 10, extraction_type: str = "key_points", visualize: bool = False ) -> List[str]: """ Enhanced version of distill_text using LangExtract's full capabilities Backward compatible with original distill_text function """ if not text or not text.strip(): return [] try: prompt_map = { "key_points": "Extract the main points and key information as bullet points", "achievements": "Extract key achievements and accomplishments", "requirements": "Extract key requirements and qualifications", "skills": "Extract technical and soft skills mentioned" } result = lx.extract( text_or_documents=text, prompt_description=prompt_map.get(extraction_type, prompt_map["key_points"]), examples=get_extraction_examples(extraction_type), model_id="gemini-2.0-flash-exp", extraction_passes=2 ) # Parse result if isinstance(result, dict): extraction = result.get('extraction', result) if isinstance(extraction, list): bullets = [str(item) for item in extraction][:max_points] if bullets: if visualize: visualize_extraction(result, f"{extraction_type}_viz.html") return bullets # Fallback to original implementation from .langextractor import distill_text return distill_text(text, max_points) except Exception as e: logger.warning(f"Enhanced extraction failed, using fallback: {e}") from .langextractor import distill_text return distill_text(text, max_points) # ===================================== # Batch Processing with Caching # ===================================== class ExtractionCache: """Simple cache for extraction results""" def __init__(self, cache_dir: str = ".langextract_cache"): self.cache_dir = Path(cache_dir) self.cache_dir.mkdir(exist_ok=True) def get_cache_key(self, text: str, extraction_type: str) -> str: """Generate cache key from text and type""" import hashlib text_hash = hashlib.md5(text.encode()).hexdigest() return f"{extraction_type}_{text_hash}" def get(self, text: str, extraction_type: str) -> Optional[Any]: """Get cached extraction if exists""" cache_key = self.get_cache_key(text, extraction_type) cache_file = self.cache_dir / f"{cache_key}.json" if cache_file.exists(): try: with open(cache_file, 'r') as f: return json.load(f) except: pass return None def set(self, text: str, extraction_type: str, result: Any): """Cache extraction result""" cache_key = self.get_cache_key(text, extraction_type) cache_file = self.cache_dir / f"{cache_key}.json" try: # Convert Pydantic models to dict if hasattr(result, 'dict'): result = result.dict() elif isinstance(result, list) and result and hasattr(result[0], 'dict'): result = [item.dict() for item in result] with open(cache_file, 'w') as f: json.dump(result, f) except: pass # Global cache instance extraction_cache = ExtractionCache() # ===================================== # Main Interface # ===================================== def extract_structured_info( text: str, extraction_type: str = "key_points", use_cache: bool = True, visualize: bool = False, parallel: bool = False ) -> Dict[str, Any]: """ Main interface for structured extraction with caching Args: text: Input text to extract from extraction_type: Type of extraction (job_details, resume_content, etc.) 
# =====================================
# Main Interface
# =====================================

def extract_structured_info(
    text: str,
    extraction_type: str = "key_points",
    use_cache: bool = True,
    visualize: bool = False,
    parallel: bool = False
) -> Dict[str, Any]:
    """
    Main interface for structured extraction with caching

    Args:
        text: Input text to extract from
        extraction_type: Type of extraction (job_details, resume_content, etc.)
        use_cache: Whether to use caching
        visualize: Generate HTML visualization
        parallel: Use parallel processing (for multiple documents)

    Returns:
        Dictionary with extraction results
    """
    # Check cache first
    if use_cache:
        cached = extraction_cache.get(text, extraction_type)
        if cached:
            logger.info(f"Using cached extraction for {extraction_type}")
            return cached

    # Perform extraction based on type
    result = None
    if extraction_type == "job_details":
        result = extract_job_details(text, visualize, parallel)
    elif extraction_type == "resume_content":
        result = extract_resume_content(text, visualize)
    elif extraction_type == "ats_keywords":
        result = extract_ats_keywords(text)
    elif extraction_type == "company_insights":
        result = extract_company_insights(text, visualize)
    else:
        # Default to key points extraction
        points = distill_text_enhanced(text, extraction_type=extraction_type, visualize=visualize)
        result = {"key_points": points}

    # Cache result
    if use_cache and result:
        extraction_cache.set(text, extraction_type, result)

    # Convert Pydantic models to dict for JSON serialization
    if hasattr(result, 'dict'):
        return result.dict()
    elif isinstance(result, list) and result and hasattr(result[0], 'dict'):
        return {"results": [item.dict() for item in result]}

    return result if isinstance(result, dict) else {"result": result}


# =====================================
# Backward Compatibility
# =====================================

# Keep the original function name for backward compatibility
distill_text = distill_text_enhanced


if __name__ == "__main__":
    # Test the enhanced extraction
    sample_job = """
    Senior Software Engineer - TechCorp
    Location: San Francisco, CA (Remote Available)
    Salary: $150,000 - $200,000

    We're looking for a Senior Software Engineer with 5+ years of experience.

    Required Skills:
    - Python, React, TypeScript
    - AWS or GCP experience
    - Microservices architecture

    Nice to have:
    - Kubernetes, Docker
    - Machine Learning experience

    Benefits:
    - Health insurance
    - 401k matching
    - Unlimited PTO
    """

    print("Testing enhanced LangExtract integration...")

    # Test job extraction
    job = extract_job_details(sample_job, visualize=True)
    print(f"Extracted job: {job.title} at {job.company}")
    print(f"Required skills: {job.required_skills}")

    # Test ATS keywords
    keywords = extract_ats_keywords(sample_job)
    print(f"ATS Keywords: {[kw.keyword for kw in keywords]}")

    print("\n✅ Enhanced LangExtract is ready!")
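
    # A further sketch: the cached, dict-returning entry point. The returned
    # keys depend on what the model extracts, so the printout is illustrative.
    info = extract_structured_info(sample_job, extraction_type="job_details")
    print(f"Structured info keys: {list(info.keys())}")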