#!/usr/bin/env python3 """ Multi-Agent Job Application Assistant - HuggingFace Spaces Deployment Production-ready system with Gemini 2.5 Flash, A2A Protocol, and MCP Integration Features: Resume/Cover Letter Generation, Job Matching, Document Export, Advanced AI Agents """ import os import uuid import time import logging import asyncio from typing import List, Optional, Dict, Any from dataclasses import dataclass, field import webbrowser from datetime import datetime, timedelta import json from pathlib import Path import gradio as gr from dotenv import load_dotenv import nest_asyncio # Apply nest_asyncio for async support in Gradio try: nest_asyncio.apply() except: pass # Load environment variables load_dotenv(override=True) # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # ======================= # Try to import from system, fall back to standalone mode if not available # ======================= USE_SYSTEM_AGENTS = True ADVANCED_FEATURES = False LANGEXTRACT_AVAILABLE = False try: from agents.orchestrator import OrchestratorAgent from models.schemas import JobPosting, OrchestrationResult logger.info("System agents loaded - full functionality available") # Try to import LangExtract service try: from services.langextract_service import ( extract_job_info, extract_ats_keywords, optimize_for_ats, create_extraction_summary, create_ats_report ) LANGEXTRACT_AVAILABLE = True logger.info("šŸ“Š LangExtract service loaded for enhanced extraction") except ImportError: LANGEXTRACT_AVAILABLE = False # Try to import advanced AI agent features try: from agents.parallel_executor import ParallelAgentExecutor, ParallelJobProcessor, MetaAgent from agents.temporal_tracker import TemporalApplicationTracker, TemporalKnowledgeGraph from agents.observability import AgentTracer, AgentMonitor, TriageAgent, global_tracer from agents.context_engineer import ContextEngineer, DataFlywheel from agents.context_scaler import ContextScalingOrchestrator ADVANCED_FEATURES = True logger.info("✨ Advanced AI agent features loaded successfully!") except ImportError as e: logger.info(f"Advanced features not available: {e}") # Try to import knowledge graph service try: from services.knowledge_graph_service import get_knowledge_graph_service kg_service = get_knowledge_graph_service() KG_AVAILABLE = kg_service.is_enabled() if KG_AVAILABLE: logger.info("šŸ“Š Knowledge Graph service initialized - tracking enabled") except ImportError: KG_AVAILABLE = False kg_service = None logger.info("Knowledge graph service not available") USE_SYSTEM_AGENTS = True except ImportError: logger.info("Running in standalone mode - using simplified agents") USE_SYSTEM_AGENTS = False # Define minimal data structures for standalone operation @dataclass class JobPosting: id: str title: str company: str description: str location: Optional[str] = None url: Optional[str] = None source: Optional[str] = None saved_by_user: bool = False @dataclass class ResumeDraft: job_id: str text: str keywords_used: List[str] = field(default_factory=list) @dataclass class CoverLetterDraft: job_id: str text: str keywords_used: List[str] = field(default_factory=list) @dataclass class OrchestrationResult: job: JobPosting resume: ResumeDraft cover_letter: CoverLetterDraft metrics: Optional[Dict[str, Any]] = None # Simplified orchestrator for standalone operation class OrchestratorAgent: def __init__(self): self.mock_jobs = [ JobPosting( id="example_1", title="Senior Software Engineer", company="Tech Corp", location="Remote", description="We need a Senior Software Engineer with Python, AWS, Docker experience.", saved_by_user=True ) ] def get_saved_jobs(self): return self.mock_jobs def run_for_jobs(self, jobs, **kwargs): results = [] for job in jobs: resume = ResumeDraft( job_id=job.id, text=f"Professional Resume for {job.title}\n\nExperienced professional with skills matching {job.company} requirements.", keywords_used=["Python", "AWS", "Docker"] ) cover = CoverLetterDraft( job_id=job.id, text=f"Dear Hiring Manager,\n\nI am excited to apply for the {job.title} position at {job.company}.", keywords_used=["leadership", "innovation"] ) results.append(OrchestrationResult( job=job, resume=resume, cover_letter=cover, metrics={ "salary": {"USD": {"low": 100000, "high": 150000}}, "p_resume": 0.75, "p_cover": 0.80, "overall_p": 0.60 } )) return results def regenerate_for_job(self, job, **kwargs): return self.run_for_jobs([job], **kwargs)[0] # Initialize orchestrator and advanced features try: orch = OrchestratorAgent() logger.info("Orchestrator initialized successfully") # Initialize advanced features if available if ADVANCED_FEATURES: # Initialize parallel executor parallel_executor = ParallelAgentExecutor(max_workers=4) parallel_processor = ParallelJobProcessor() meta_agent = MetaAgent() # Initialize temporal tracker temporal_tracker = TemporalApplicationTracker() # Initialize observability agent_tracer = AgentTracer() agent_monitor = AgentMonitor() triage_agent = TriageAgent(agent_tracer) # Initialize context engineering context_engineer = ContextEngineer() context_scaler = ContextScalingOrchestrator() logger.info("āœ… All advanced AI agent features initialized") else: parallel_executor = None temporal_tracker = None agent_tracer = None context_engineer = None except Exception as e: logger.error(f"Failed to initialize orchestrator: {e}") raise # Session state STATE = { "user_id": "default_user", "cv_seed": None, "cover_seed": None, "agent2_notes": "", "custom_jobs": [], "cv_chat": "", "cover_chat": "", "results": [], "inspiration_url": "https://www.careeraddict.com/7-funniest-cover-letters", "use_inspiration": False, "linkedin_authenticated": False, "linkedin_profile": None, "parallel_mode": False, "track_applications": True, "enable_observability": True, "use_context_engineering": True, "execution_timeline": None, "application_history": [], } # Check LinkedIn OAuth configuration LINKEDIN_CLIENT_ID = os.getenv("LINKEDIN_CLIENT_ID") LINKEDIN_CLIENT_SECRET = os.getenv("LINKEDIN_CLIENT_SECRET") MOCK_MODE = os.getenv("MOCK_MODE", "true").lower() == "true" # Check Adzuna configuration ADZUNA_APP_ID = os.getenv("ADZUNA_APP_ID") ADZUNA_APP_KEY = os.getenv("ADZUNA_APP_KEY") def add_custom_job(title: str, company: str, location: str, url: str, desc: str): """Add a custom job with validation""" try: if not title or not company or not desc: return gr.update(value="āŒ Title, Company, and Description are required"), None job = JobPosting( id=f"custom_{uuid.uuid4().hex[:8]}", title=title.strip(), company=company.strip(), location=location.strip() if location else None, description=desc.strip(), url=url.strip() if url else None, source="custom", saved_by_user=True, ) STATE["custom_jobs"].append(job) logger.info(f"Added custom job: {job.title} at {job.company}") return gr.update(value=f"āœ… Added: {job.title} at {job.company}"), "" except Exception as e: logger.error(f"Error adding job: {e}") return gr.update(value=f"āŒ Error: {str(e)}"), None def get_linkedin_auth_url(): """Get LinkedIn OAuth URL""" if USE_SYSTEM_AGENTS and not MOCK_MODE and LINKEDIN_CLIENT_ID: try: from services.linkedin_client import LinkedInClient client = LinkedInClient() return client.get_authorize_url() except Exception as e: logger.error(f"LinkedIn OAuth error: {e}") return None def linkedin_login(): """Handle LinkedIn login""" auth_url = get_linkedin_auth_url() if auth_url: webbrowser.open(auth_url) return "āœ… Opening LinkedIn login in browser...", True else: return "āš ļø LinkedIn OAuth not configured or in mock mode", False def search_adzuna_jobs(query: str = "Software Engineer", location: str = "London"): """Search jobs using Adzuna API""" if ADZUNA_APP_ID and ADZUNA_APP_KEY: try: from services.job_aggregator import JobAggregator aggregator = JobAggregator() # Handle SSL issues for corporate networks import requests import urllib3 old_get = requests.get def patched_get(*args, **kwargs): if 'adzuna' in str(args[0]): kwargs['verify'] = False urllib3.disable_warnings() return old_get(*args, **kwargs) requests.get = patched_get jobs = aggregator.search_adzuna(query, location) return jobs, f"āœ… Found {len(jobs)} jobs from Adzuna" except Exception as e: logger.error(f"Adzuna search error: {e}") return [], f"āŒ Adzuna search failed: {str(e)}" return [], "āš ļø Adzuna API not configured" def list_jobs_options(): """Get list of available jobs with enhanced sources""" try: all_jobs = [] # Get LinkedIn/mock jobs saved_jobs = orch.get_saved_jobs() all_jobs.extend(saved_jobs) # Add custom jobs custom_jobs = STATE.get("custom_jobs", []) all_jobs.extend(custom_jobs) # Try to add Adzuna jobs if configured if ADZUNA_APP_ID and ADZUNA_APP_KEY: adzuna_jobs, _ = search_adzuna_jobs("Software Engineer", "Remote") all_jobs.extend(adzuna_jobs[:10]) # Add top 10 Adzuna jobs labels = [f"{j.title} — {j.company} ({j.location or 'N/A'}) [{j.source or 'custom'}]" for j in all_jobs] return labels except Exception as e: logger.error(f"Error listing jobs: {e}") return [] def generate(selected_labels: List[str]): """Generate documents with advanced AI features""" try: if not selected_labels: return "āš ļø Please select at least one job to process", None, gr.update(visible=False), gr.update(visible=False), gr.update(visible=False) # Triage the request if observability is enabled if ADVANCED_FEATURES and STATE.get("enable_observability") and agent_tracer: routing = triage_agent.triage_request(f"Generate documents for {len(selected_labels)} jobs") logger.info(f"Triage routing: {routing}") # Map labels to job objects all_jobs = orch.get_saved_jobs() + STATE.get("custom_jobs", []) # Update label mapping to handle source tags label_to_job = {} for j in all_jobs: label = f"{j.title} — {j.company} ({j.location or 'N/A'})" label_with_source = f"{label} [{j.source or 'custom'}]" # Map both versions label_to_job[label] = j label_to_job[label_with_source] = j jobs = [label_to_job[l] for l in selected_labels if l in label_to_job] if not jobs: return "āŒ No valid jobs found", None, None logger.info(f"Generating documents for {len(jobs)} jobs") # Use context engineering if enabled if ADVANCED_FEATURES and STATE.get("use_context_engineering") and context_engineer: for job in jobs: # Engineer optimal context for each job context = context_engineer.engineer_context( query=f"Generate resume and cover letter for {job.title} at {job.company}", raw_sources=[ ("job_description", job.description), ("cv_seed", STATE.get("cv_seed") or ""), ("notes", STATE.get("agent2_notes") or "") ] ) # Store engineered context job.metadata = job.metadata or {} job.metadata['engineered_context'] = context # Run generation (parallel or sequential) start = time.time() if ADVANCED_FEATURES and STATE.get("parallel_mode") and parallel_executor: # Use parallel processing logger.info("Using parallel processing for document generation") results = asyncio.run(parallel_processor.process_jobs_parallel( jobs=jobs, cv_agent_func=lambda j: orch.cv_agent.get_draft(j, STATE.get("cv_seed")), cover_agent_func=lambda j: orch.cover_letter_agent.get_draft(j, STATE.get("cover_seed")) )) else: # Standard sequential processing results = orch.run_for_jobs( jobs, user_id=STATE.get("user_id", "default_user"), cv_chat=STATE.get("cv_chat"), cover_chat=STATE.get("cover_chat"), cv_seed=STATE.get("cv_seed"), cover_seed=STATE.get("cover_seed"), agent2_notes=STATE.get("agent2_notes"), inspiration_url=(STATE.get("inspiration_url") if STATE.get("use_inspiration") else None), ) total_time = time.time() - start STATE["results"] = results # Track applications temporally if enabled if ADVANCED_FEATURES and STATE.get("track_applications") and temporal_tracker: for result in results: temporal_tracker.track_application(result.job, "generated", { 'generation_time': total_time, 'parallel_mode': STATE.get("parallel_mode", False) }) # Track in knowledge graph if available if 'kg_service' in globals() and kg_service and kg_service.is_enabled(): for result in results: try: # Extract skills from job description skills = [] if hasattr(result, 'matched_keywords'): skills = result.matched_keywords elif hasattr(result.job, 'description'): # Simple skill extraction from job description common_skills = ['python', 'java', 'javascript', 'react', 'node', 'aws', 'azure', 'docker', 'kubernetes', 'sql', 'machine learning', 'ai', 'data science'] job_desc_lower = result.job.description.lower() skills = [s for s in common_skills if s in job_desc_lower] # Track the application kg_service.track_application( user_name=STATE.get("user_name", "User"), company=result.job.company, job_title=result.job.title, job_description=result.job.description, cv_text=result.resume.text, cover_letter=result.cover_letter.text, skills_matched=skills, score=getattr(result, 'match_score', 0.0) ) logger.info(f"Tracked application in knowledge graph: {result.job.title} @ {result.job.company}") except Exception as e: logger.warning(f"Failed to track in knowledge graph: {e}") # Record to context engineering flywheel if ADVANCED_FEATURES and context_engineer: for result in results: if hasattr(result.job, 'metadata') and 'engineered_context' in result.job.metadata: context_engineer.record_feedback( result.job.metadata['engineered_context'], result.resume.text[:500], # Sample output 0.8 # Success score (could be calculated) ) # Build preview blocks = [f"āœ… Generated {len(results)} documents in {total_time:.2f}s\n"] pptx_buttons = [] for i, res in enumerate(results): blocks.append(f"### šŸ“„ {res.job.title} — {res.job.company}") blocks.append("**Resume Preview:**") blocks.append("```") blocks.append(res.resume.text[:1500] + "...") blocks.append("```") blocks.append("\n**Cover Letter Preview:**") blocks.append("```") blocks.append(res.cover_letter.text[:1000] + "...") blocks.append("```") # Add PowerPoint export option blocks.append(f"\n**[šŸ“Š Export as PowerPoint CV - Job #{i+1}]**") pptx_buttons.append((res.resume, res.job)) STATE["pptx_candidates"] = pptx_buttons return "\n".join(blocks), total_time, gr.update(visible=True), gr.update(visible=True), gr.update(visible=True) except Exception as e: logger.error(f"Error generating documents: {e}") return f"āŒ Error: {str(e)}", None, gr.update(visible=False), gr.update(visible=False), gr.update(visible=False) def regenerate_one(job_label: str): """Regenerate documents for a single job""" try: if not job_label: return "āš ļø Please select a job to regenerate", None all_jobs = orch.get_saved_jobs() + STATE.get("custom_jobs", []) label_to_job = {f"{j.title} — {j.company} ({j.location or 'N/A'})": j for j in all_jobs} job = label_to_job.get(job_label) if not job: return f"āŒ Job not found: {job_label}", None start = time.time() result = orch.regenerate_for_job( job, user_id=STATE.get("user_id", "default_user"), cv_chat=STATE.get("cv_chat"), cover_chat=STATE.get("cover_chat"), cv_seed=STATE.get("cv_seed"), cover_seed=STATE.get("cover_seed"), agent2_notes=STATE.get("agent2_notes"), inspiration_url=(STATE.get("inspiration_url") if STATE.get("use_inspiration") else None), ) elapsed = time.time() - start # Update state new_results = [] for r in STATE.get("results", []): if r.job.id == job.id: new_results.append(result) else: new_results.append(r) STATE["results"] = new_results preview = f"### šŸ”„ Regenerated: {result.job.title} — {result.job.company}\n\n" preview += "**Resume:**\n```\n" + result.resume.text[:1500] + "\n...```\n\n" preview += "**Cover Letter:**\n```\n" + result.cover_letter.text[:1000] + "\n...```" return preview, elapsed except Exception as e: logger.error(f"Error regenerating: {e}") return f"āŒ Error: {str(e)}", None def export_to_powerpoint(job_index: int, template: str = "modern_blue"): """Export resume to PowerPoint CV""" try: candidates = STATE.get("pptx_candidates", []) if not candidates or job_index >= len(candidates): return "āŒ No resume available for export", None resume, job = candidates[job_index] # Import the PowerPoint CV generator try: from services.powerpoint_cv import convert_resume_to_powerpoint pptx_path = convert_resume_to_powerpoint(resume, job, template) if pptx_path: return f"āœ… PowerPoint CV created: {pptx_path}", pptx_path except ImportError: # Fallback to local generation from pptx import Presentation from pptx.util import Inches, Pt prs = Presentation() # Title slide slide = prs.slides.add_slide(prs.slide_layouts[0]) slide.shapes.title.text = resume.sections.get("name", "Professional CV") slide.placeholders[1].text = f"{resume.sections.get('title', '')}\n{resume.sections.get('email', '')}" # Summary slide slide = prs.slides.add_slide(prs.slide_layouts[1]) slide.shapes.title.text = "Professional Summary" slide.placeholders[1].text = resume.sections.get("summary", "")[:500] # Experience slide slide = prs.slides.add_slide(prs.slide_layouts[1]) slide.shapes.title.text = "Professional Experience" exp_text = [] for exp in resume.sections.get("experience", [])[:3]: exp_text.append(f"• {exp.get('title', '')} @ {exp.get('company', '')}") exp_text.append(f" {exp.get('dates', '')}") slide.placeholders[1].text = "\n".join(exp_text) # Skills slide slide = prs.slides.add_slide(prs.slide_layouts[1]) slide.shapes.title.text = "Core Skills" skills_text = [] for category, items in resume.sections.get("skills", {}).items(): if isinstance(items, list): skills_text.append(f"{category}: {', '.join(items[:5])}") slide.placeholders[1].text = "\n".join(skills_text) # Save output_path = f"cv_{job.company.replace(' ', '_')}_{template}.pptx" prs.save(output_path) return f"āœ… PowerPoint CV created: {output_path}", output_path except Exception as e: logger.error(f"PowerPoint export error: {e}") return f"āŒ Export failed: {str(e)}", None def extract_from_powerpoint(file_path: str): """Extract content from uploaded PowerPoint""" try: from pptx import Presentation prs = Presentation(file_path) extracted_text = [] for slide in prs.slides: for shape in slide.shapes: if hasattr(shape, "text"): text = shape.text.strip() if text: extracted_text.append(text) combined_text = "\n".join(extracted_text) # Use as CV seed STATE["cv_seed"] = combined_text return f"āœ… Extracted {len(extracted_text)} text blocks from PowerPoint\n\nPreview:\n{combined_text[:500]}..." except Exception as e: logger.error(f"PowerPoint extraction error: {e}") return f"āŒ Extraction failed: {str(e)}" def summary_table(): """Generate summary table""" try: import pandas as pd res = STATE.get("results", []) if not res: return pd.DataFrame({"Status": ["No results yet. Generate documents first."]}) rows = [] for r in res: m = r.metrics or {} sal = m.get("salary", {}) # Handle different salary formats usd = sal.get("USD", {}) gbp = sal.get("GBP", {}) rows.append({ "Job": f"{r.job.title} — {r.job.company}", "Location": r.job.location or "N/A", "USD": f"${usd.get('low', 0):,}-${usd.get('high', 0):,}" if usd else "N/A", "GBP": f"Ā£{gbp.get('low', 0):,}-Ā£{gbp.get('high', 0):,}" if gbp else "N/A", "Resume Score": f"{m.get('p_resume', 0):.1%}", "Cover Score": f"{m.get('p_cover', 0):.1%}", "Overall": f"{m.get('overall_p', 0):.1%}", }) return pd.DataFrame(rows) except ImportError: # If pandas not available, return simple dict return {"Error": ["pandas not installed - table view unavailable"]} except Exception as e: logger.error(f"Error generating summary: {e}") return {"Error": [str(e)]} def build_app(): """Build the Gradio interface with LinkedIn OAuth and Adzuna integration""" with gr.Blocks( title="Job Application Assistant", theme=gr.themes.Soft(), css=""" .gradio-container { max-width: 1400px; margin: auto; } """ ) as demo: gr.Markdown(""" # šŸš€ Multi-Agent Job Application Assistant ### AI-Powered Resume & Cover Letter Generation with ATS Optimization ### Now with LinkedIn OAuth + Adzuna Job Search! """) # System Status status_items = [] if USE_SYSTEM_AGENTS: status_items.append("āœ… **Full System Mode**") else: status_items.append("āš ļø **Standalone Mode**") if ADVANCED_FEATURES: status_items.append("šŸš€ **Advanced AI Features**") if LANGEXTRACT_AVAILABLE: status_items.append("šŸ“Š **LangExtract Enhanced**") if not MOCK_MODE and LINKEDIN_CLIENT_ID: status_items.append("āœ… **LinkedIn OAuth Ready**") else: status_items.append("āš ļø **LinkedIn in Mock Mode**") if ADZUNA_APP_ID and ADZUNA_APP_KEY: status_items.append("āœ… **Adzuna API Active** (5000 jobs/month)") else: status_items.append("āš ļø **Adzuna Not Configured**") gr.Markdown(" | ".join(status_items)) # Show advanced features if available if ADVANCED_FEATURES: advanced_features = [] if 'parallel_executor' in locals(): advanced_features.append("⚔ Parallel Processing") if 'temporal_tracker' in locals(): advanced_features.append("šŸ“Š Temporal Tracking") if 'agent_tracer' in locals(): advanced_features.append("šŸ” Observability") if 'context_engineer' in locals(): advanced_features.append("🧠 Context Engineering") if advanced_features: gr.Markdown(f"**Advanced Features Available:** {' | '.join(advanced_features)}") # Import enhanced UI components try: from services.enhanced_ui import ( create_enhanced_ui_components, handle_resume_upload, handle_linkedin_import, handle_job_matching, handle_document_export, populate_ui_from_data, format_job_matches_for_display, generate_recommendations_markdown, generate_skills_gap_analysis ) ENHANCED_UI_AVAILABLE = True except ImportError: ENHANCED_UI_AVAILABLE = False logger.warning("Enhanced UI components not available") with gr.Row(): # Left column - Configuration with gr.Column(scale=2): gr.Markdown("## āš™ļø Configuration") # Enhanced Resume Upload Section (if available) if ENHANCED_UI_AVAILABLE: ui_components = create_enhanced_ui_components() # Create a wrapper function that properly handles the response def process_resume_and_populate(file_path): """Process resume upload and return extracted data for UI fields""" if not file_path: return populate_ui_from_data({}) try: # Call handle_resume_upload to extract data response = handle_resume_upload(file_path) # Extract the data from the response if response and isinstance(response, dict): data = response.get('data', {}) # Return the populated fields return populate_ui_from_data(data) else: return populate_ui_from_data({}) except Exception as e: logger.error(f"Error processing resume: {e}") return populate_ui_from_data({}) # Wire up the handlers - single function call ui_components['extract_btn'].click( fn=process_resume_and_populate, inputs=[ui_components['resume_upload']], outputs=[ ui_components['contact_name'], ui_components['contact_email'], ui_components['contact_phone'], ui_components['contact_linkedin'], ui_components['contact_location'], ui_components['summary_text'], ui_components['experience_data'], ui_components['skills_list'], ui_components['education_data'] ] ) ui_components['linkedin_auto_fill'].click( fn=handle_linkedin_import, inputs=[ui_components['linkedin_url'], gr.State()], outputs=[gr.State()] ).then( fn=populate_ui_from_data, inputs=[gr.State()], outputs=[ ui_components['contact_name'], ui_components['contact_email'], ui_components['contact_phone'], ui_components['contact_linkedin'], ui_components['contact_location'], ui_components['summary_text'], ui_components['experience_data'], ui_components['skills_list'], ui_components['education_data'] ] ) # LinkedIn OAuth Section (keep existing) elif not MOCK_MODE and LINKEDIN_CLIENT_ID: with gr.Accordion("šŸ” LinkedIn Authentication", open=True): linkedin_status = gr.Textbox( label="Status", value="Not authenticated", interactive=False ) linkedin_btn = gr.Button("šŸ”— Sign in with LinkedIn", variant="primary") linkedin_btn.click( fn=linkedin_login, outputs=[linkedin_status, gr.State()] ) # Advanced AI Features Section if ADVANCED_FEATURES: with gr.Accordion("šŸš€ Advanced AI Features", open=True): gr.Markdown("### AI Agent Enhancements") with gr.Row(): parallel_mode = gr.Checkbox( label="⚔ Parallel Processing (3-5x faster)", value=STATE.get("parallel_mode", False) ) track_apps = gr.Checkbox( label="šŸ“Š Temporal Tracking", value=STATE.get("track_applications", True) ) with gr.Row(): observability = gr.Checkbox( label="šŸ” Observability & Tracing", value=STATE.get("enable_observability", True) ) context_eng = gr.Checkbox( label="🧠 Context Engineering", value=STATE.get("use_context_engineering", True) ) def update_features(parallel, track, observe, context): STATE["parallel_mode"] = parallel STATE["track_applications"] = track STATE["enable_observability"] = observe STATE["use_context_engineering"] = context features = [] if parallel: features.append("Parallel") if track: features.append("Tracking") if observe: features.append("Observability") if context: features.append("Context Engineering") return f"āœ… Features enabled: {', '.join(features) if features else 'None'}" features_status = gr.Textbox(label="Features Status", interactive=False) parallel_mode.change( fn=lambda p: update_features(p, track_apps.value, observability.value, context_eng.value), inputs=[parallel_mode], outputs=features_status ) track_apps.change( fn=lambda t: update_features(parallel_mode.value, t, observability.value, context_eng.value), inputs=[track_apps], outputs=features_status ) observability.change( fn=lambda o: update_features(parallel_mode.value, track_apps.value, o, context_eng.value), inputs=[observability], outputs=features_status ) context_eng.change( fn=lambda c: update_features(parallel_mode.value, track_apps.value, observability.value, c), inputs=[context_eng], outputs=features_status ) with gr.Accordion("šŸ“ Profile & Notes", open=True): agent2_notes = gr.Textbox( label="Additional Context", value=STATE["agent2_notes"], lines=4, placeholder="E.g., visa requirements, years of experience, preferred technologies..." ) def set_notes(n): STATE["agent2_notes"] = n or "" return "āœ… Notes saved" notes_result = gr.Textbox(label="Status", interactive=False) agent2_notes.change(set_notes, inputs=agent2_notes, outputs=notes_result) with gr.Accordion("šŸ“„ Resume Settings", open=False): cv_chat = gr.Textbox( label="Resume Instructions", value=STATE["cv_chat"], lines=3, placeholder="E.g., Emphasize leadership experience..." ) # PowerPoint Upload gr.Markdown("### šŸ“Š Upload PowerPoint to Extract Content") pptx_upload = gr.File( label="Upload PowerPoint (.pptx)", file_types=[".pptx"], type="filepath" ) pptx_extract_btn = gr.Button("šŸ“„ Extract from PowerPoint") pptx_extract_status = gr.Textbox(label="Extraction Status", interactive=False) cv_seed = gr.Textbox( label="Resume Template (optional)", value=STATE["cv_seed"] or "", lines=10, placeholder="Paste your existing resume here or extract from PowerPoint..." ) def set_cv(c, s): STATE["cv_chat"] = c or "" STATE["cv_seed"] = s or None return "āœ… Resume settings updated" def handle_pptx_upload(file): if file: status = extract_from_powerpoint(file) return status, STATE.get("cv_seed", "") return "No file uploaded", STATE.get("cv_seed", "") pptx_extract_btn.click( fn=handle_pptx_upload, inputs=pptx_upload, outputs=[pptx_extract_status, cv_seed] ) cv_info = gr.Textbox(label="Status", interactive=False) cv_chat.change(lambda x: set_cv(x, cv_seed.value), inputs=cv_chat, outputs=cv_info) cv_seed.change(lambda x: set_cv(cv_chat.value, x), inputs=cv_seed, outputs=cv_info) with gr.Accordion("āœ‰ļø Cover Letter Settings", open=False): cover_chat = gr.Textbox( label="Cover Letter Instructions", value=STATE["cover_chat"], lines=3, placeholder="E.g., Professional tone, mention relocation..." ) cover_seed = gr.Textbox( label="Cover Letter Template (optional)", value=STATE["cover_seed"] or "", lines=10, placeholder="Paste your existing cover letter here..." ) def set_cover(c, s): STATE["cover_chat"] = c or "" STATE["cover_seed"] = s or None return "āœ… Cover letter settings updated" cover_info = gr.Textbox(label="Status", interactive=False) cover_chat.change(lambda x: set_cover(x, cover_seed.value), inputs=cover_chat, outputs=cover_info) cover_seed.change(lambda x: set_cover(cover_chat.value, x), inputs=cover_seed, outputs=cover_info) gr.Markdown("## šŸ’¼ Jobs") # Adzuna Job Search if ADZUNA_APP_ID and ADZUNA_APP_KEY: with gr.Accordion("šŸ” Search Adzuna Jobs", open=True): with gr.Row(): adzuna_query = gr.Textbox( label="Job Title", value="Software Engineer", placeholder="e.g., Python Developer" ) adzuna_location = gr.Textbox( label="Location", value="London", placeholder="e.g., New York, Remote" ) adzuna_search_btn = gr.Button("šŸ” Search Adzuna", variant="primary") adzuna_results = gr.Textbox( label="Search Results", lines=3, interactive=False ) def search_and_display(query, location): jobs, message = search_adzuna_jobs(query, location) # Add jobs to state if jobs: STATE["custom_jobs"].extend(jobs[:5]) # Add top 5 to available jobs return message adzuna_search_btn.click( fn=search_and_display, inputs=[adzuna_query, adzuna_location], outputs=adzuna_results ) with gr.Accordion("āž• Add Custom Job", open=True): c_title = gr.Textbox(label="Job Title*", placeholder="e.g., Senior Software Engineer") c_company = gr.Textbox(label="Company*", placeholder="e.g., Google") c_loc = gr.Textbox(label="Location", placeholder="e.g., Remote, New York") c_url = gr.Textbox(label="Job URL", placeholder="https://...") c_desc = gr.Textbox( label="Job Description*", lines=8, placeholder="Paste the complete job description here..." ) with gr.Row(): add_job_btn = gr.Button("āž• Add Job", variant="primary") load_example_btn = gr.Button("šŸ“ Load Example") add_job_info = gr.Textbox(label="Status", interactive=False) def load_example(): return ( "Senior Software Engineer", "Tech Corp", "Remote", "", "We are looking for a Senior Software Engineer with 5+ years of experience in Python, AWS, and Docker. You will lead technical initiatives and build scalable systems." ) load_example_btn.click( fn=load_example, outputs=[c_title, c_company, c_loc, c_url, c_desc] ) add_job_btn.click( fn=add_custom_job, inputs=[c_title, c_company, c_loc, c_url, c_desc], outputs=[add_job_info, c_title] ) job_select = gr.CheckboxGroup( choices=list_jobs_options(), label="šŸ“‹ Select Jobs to Process" ) refresh_jobs = gr.Button("šŸ”„ Refresh Job List") refresh_jobs.click(lambda: gr.update(choices=list_jobs_options()), outputs=job_select) # Right column - Generation with gr.Column(scale=3): gr.Markdown("## šŸ“„ Document Generation") gen_btn = gr.Button("šŸš€ Generate Documents", variant="primary", size="lg") out_preview = gr.Markdown("Ready to generate documents...") out_time = gr.Number(label="Processing Time (seconds)") # PowerPoint Export Section with gr.Accordion("šŸ“Š Export to PowerPoint CV", open=False, visible=False) as pptx_section: gr.Markdown("### Convert your resume to a professional PowerPoint presentation") with gr.Row(): pptx_job_select = gr.Number( label="Job Index (1, 2, 3...)", value=1, minimum=1, step=1 ) pptx_template = gr.Dropdown( choices=["modern_blue", "corporate_gray", "elegant_green", "warm_red"], value="modern_blue", label="Template Style" ) export_pptx_btn = gr.Button("šŸ“Š Create PowerPoint CV", variant="primary") pptx_status = gr.Textbox(label="Export Status", interactive=False) pptx_file = gr.File(label="Download PowerPoint", visible=False) def handle_pptx_export(job_idx, template): status, file_path = export_to_powerpoint(int(job_idx) - 1, template) if file_path: return status, gr.update(visible=True, value=file_path) return status, gr.update(visible=False) export_pptx_btn.click( fn=handle_pptx_export, inputs=[pptx_job_select, pptx_template], outputs=[pptx_status, pptx_file] ) # Word Document Export Section with gr.Accordion("šŸ“ Export to Word Documents", open=False, visible=False) as word_section: gr.Markdown("### Generate professional Word documents") with gr.Row(): word_job_select = gr.Number( label="Job Index (1, 2, 3...)", value=1, minimum=1, step=1 ) word_template = gr.Dropdown( choices=["modern", "executive", "creative", "minimal", "academic"], value="modern", label="Document Style" ) with gr.Row(): export_word_resume_btn = gr.Button("šŸ“„ Export Resume as Word", variant="primary") export_word_cover_btn = gr.Button("āœ‰ļø Export Cover Letter as Word", variant="primary") word_status = gr.Textbox(label="Export Status", interactive=False) word_files = gr.File(label="Download Word Documents", visible=False, file_count="multiple") def handle_word_export(job_idx, template, doc_type="resume"): try: from services.word_cv import WordCVGenerator generator = WordCVGenerator() candidates = STATE.get("pptx_candidates", []) if not candidates or job_idx > len(candidates): return "āŒ No documents available", gr.update(visible=False) resume, job = candidates[int(job_idx) - 1] files = [] if doc_type == "resume" or doc_type == "both": resume_path = generator.create_resume_document(resume, job, template) if resume_path: files.append(resume_path) if doc_type == "cover" or doc_type == "both": # Get cover letter from results results = STATE.get("results", []) cover_letter = None for r in results: if r.job.id == job.id: cover_letter = r.cover_letter break if cover_letter: cover_path = generator.create_cover_letter_document(cover_letter, job, template) if cover_path: files.append(cover_path) if files: return f"āœ… Created {len(files)} Word document(s)", gr.update(visible=True, value=files) return "āŒ Failed to create documents", gr.update(visible=False) except Exception as e: return f"āŒ Error: {str(e)}", gr.update(visible=False) export_word_resume_btn.click( fn=lambda idx, tmpl: handle_word_export(idx, tmpl, "resume"), inputs=[word_job_select, word_template], outputs=[word_status, word_files] ) export_word_cover_btn.click( fn=lambda idx, tmpl: handle_word_export(idx, tmpl, "cover"), inputs=[word_job_select, word_template], outputs=[word_status, word_files] ) # Excel Tracker Export with gr.Accordion("šŸ“Š Export Excel Tracker", open=False, visible=False) as excel_section: gr.Markdown("### Create comprehensive job application tracker") export_excel_btn = gr.Button("šŸ“ˆ Generate Excel Tracker", variant="primary") excel_status = gr.Textbox(label="Export Status", interactive=False) excel_file = gr.File(label="Download Excel Tracker", visible=False) def handle_excel_export(): try: from services.excel_tracker import ExcelTracker tracker = ExcelTracker() results = STATE.get("results", []) if not results: return "āŒ No results to track", gr.update(visible=False) tracker_path = tracker.create_tracker(results) if tracker_path: return f"āœ… Excel tracker created with {len(results)} applications", gr.update(visible=True, value=tracker_path) return "āŒ Failed to create tracker", gr.update(visible=False) except Exception as e: return f"āŒ Error: {str(e)}", gr.update(visible=False) export_excel_btn.click( fn=handle_excel_export, outputs=[excel_status, excel_file] ) gen_btn.click(fn=generate, inputs=[job_select], outputs=[out_preview, out_time, pptx_section, word_section, excel_section]) gr.Markdown("## šŸ”„ Regenerate Individual Job") with gr.Row(): job_single = gr.Dropdown(choices=list_jobs_options(), label="Select Job") refresh_single = gr.Button("šŸ”„") refresh_single.click(lambda: gr.update(choices=list_jobs_options()), outputs=job_single) regen_btn = gr.Button("šŸ”„ Regenerate Selected Job") regen_preview = gr.Markdown() regen_time = gr.Number(label="Regeneration Time (seconds)") regen_btn.click(fn=regenerate_one, inputs=[job_single], outputs=[regen_preview, regen_time]) gr.Markdown("## šŸ“Š Results Summary") update_summary = gr.Button("šŸ“Š Update Summary") table = gr.Dataframe(value=summary_table(), interactive=False) update_summary.click(fn=summary_table, outputs=table) # Knowledge Graph Section if 'kg_service' in globals() and kg_service and kg_service.is_enabled(): with gr.Accordion("šŸ“Š Knowledge Graph & Application Tracking", open=False): gr.Markdown(""" ### 🧠 Application Knowledge Graph Track your job applications, skills, and patterns over time. """) with gr.Row(): with gr.Column(scale=1): kg_user_name = gr.Textbox( label="Your Name", value=STATE.get("user_name", "User"), placeholder="Enter your name for tracking" ) def update_user_name(name): STATE["user_name"] = name return f"Tracking as: {name}" kg_user_status = gr.Markdown("Enter your name to start tracking") kg_user_name.change(update_user_name, inputs=[kg_user_name], outputs=[kg_user_status]) gr.Markdown("### šŸ“ˆ Quick Actions") show_history_btn = gr.Button("šŸ“œ Show My History", variant="primary", size="sm") show_trends_btn = gr.Button("šŸ“Š Show Skill Trends", variant="secondary", size="sm") show_insights_btn = gr.Button("šŸ’” Company Insights", variant="secondary", size="sm") with gr.Column(scale=2): kg_output = gr.JSON(label="Knowledge Graph Data", visible=True) def show_user_history(user_name): if kg_service and kg_service.is_enabled(): history = kg_service.get_user_history(user_name) return history return {"error": "Knowledge graph not available"} def show_skill_trends(): if kg_service and kg_service.is_enabled(): trends = kg_service.get_skill_trends() return trends return {"error": "Knowledge graph not available"} def show_company_insights(): if kg_service and kg_service.is_enabled(): # Get insights for all companies user applied to history = kg_service.get_user_history(STATE.get("user_name", "User")) companies = set() for app in history.get("applications", []): if isinstance(app, dict) and "properties" in app: company = app["properties"].get("company") if company: companies.add(company) insights = {} for company in list(companies)[:5]: # Limit to 5 companies insights[company] = kg_service.get_company_insights(company) return insights if insights else {"message": "No companies found in history"} return {"error": "Knowledge graph not available"} show_history_btn.click( show_user_history, inputs=[kg_user_name], outputs=[kg_output] ) show_trends_btn.click( show_skill_trends, inputs=[], outputs=[kg_output] ) show_insights_btn.click( show_company_insights, inputs=[], outputs=[kg_output] ) gr.Markdown(""" ### šŸ“Š Features: - **Application History**: Track all your job applications - **Skill Analysis**: See which skills are in demand - **Company Insights**: Learn about companies you've applied to - **Pattern Recognition**: Identify successful application patterns - All data stored locally in SQLite - no external dependencies! """) # Enhanced Extraction with LangExtract if LANGEXTRACT_AVAILABLE: with gr.Accordion("šŸ” Enhanced Job Analysis (LangExtract)", open=False): gr.Markdown("### AI-Powered Job & Resume Analysis") with gr.Tabs(): # Job Analysis Tab with gr.TabItem("šŸ“‹ Job Analysis"): job_analysis_text = gr.Textbox( label="Paste Job Description", lines=10, placeholder="Paste the full job description here for analysis..." ) analyze_job_btn = gr.Button("šŸ” Analyze Job", variant="primary") job_analysis_output = gr.Markdown() def analyze_job(text): if not text: return "Please paste a job description" job = extract_job_info(text) keywords = extract_ats_keywords(text) output = create_extraction_summary(job) output += "\n\n### šŸŽÆ ATS Keywords\n" output += f"**High Priority:** {', '.join(keywords.high_priority[:10]) or 'None'}\n" output += f"**Medium Priority:** {', '.join(keywords.medium_priority[:10]) or 'None'}\n" return output analyze_job_btn.click( fn=analyze_job, inputs=job_analysis_text, outputs=job_analysis_output ) # ATS Optimization Tab with gr.TabItem("šŸŽÆ ATS Optimizer"): gr.Markdown("Compare your resume against job requirements") with gr.Row(): ats_resume = gr.Textbox( label="Your Resume", lines=10, placeholder="Paste your resume text..." ) ats_job = gr.Textbox( label="Job Description", lines=10, placeholder="Paste the job description..." ) optimize_btn = gr.Button("šŸŽÆ Optimize for ATS", variant="primary") ats_report = gr.Markdown() def run_ats_optimization(resume, job): if not resume or not job: return "Please provide both resume and job description" result = optimize_for_ats(resume, job) return create_ats_report(result) optimize_btn.click( fn=run_ats_optimization, inputs=[ats_resume, ats_job], outputs=ats_report ) # Bulk Analysis Tab with gr.TabItem("šŸ“Š Bulk Analysis"): gr.Markdown("Analyze multiple jobs at once") bulk_jobs_text = gr.Textbox( label="Paste Multiple Job Descriptions (separated by ---)", lines=15, placeholder="Job 1...\n---\nJob 2...\n---\nJob 3..." ) bulk_analyze_btn = gr.Button("šŸ“Š Analyze All Jobs", variant="primary") bulk_output = gr.Markdown() def analyze_bulk_jobs(text): if not text: return "Please paste job descriptions" jobs = text.split("---") results = [] for i, job_text in enumerate(jobs, 1): if job_text.strip(): job = extract_job_info(job_text) results.append(f"### Job {i}: {job.title or 'Unknown'}") results.append(f"**Company:** {job.company or 'Unknown'}") results.append(f"**Skills:** {', '.join(job.skills[:5]) or 'None detected'}") results.append("") return "\n".join(results) if results else "No valid jobs found" bulk_analyze_btn.click( fn=analyze_bulk_jobs, inputs=bulk_jobs_text, outputs=bulk_output ) # Advanced Features Results if ADVANCED_FEATURES: with gr.Accordion("šŸŽÆ Advanced Analytics", open=False): with gr.Tabs(): # Execution Timeline Tab with gr.TabItem("⚔ Execution Timeline"): show_timeline_btn = gr.Button("šŸ“Š Generate Timeline") timeline_image = gr.Image(label="Parallel Execution Timeline", visible=False) def show_execution_timeline(): if parallel_executor and hasattr(parallel_executor, 'execution_history'): try: import matplotlib.pyplot as plt fig = parallel_executor.plot_timeline() timeline_path = "execution_timeline.png" fig.savefig(timeline_path) plt.close() return gr.update(visible=True, value=timeline_path) except Exception as e: logger.error(f"Timeline generation error: {e}") return gr.update(visible=False) show_timeline_btn.click(fn=show_execution_timeline, outputs=timeline_image) # Application History Tab with gr.TabItem("šŸ“œ Application History"): history_btn = gr.Button("šŸ“‹ Show History") history_text = gr.Textbox(label="Application Timeline", lines=10, interactive=False) def show_application_history(): if temporal_tracker: try: active = temporal_tracker.get_active_applications() patterns = temporal_tracker.analyze_patterns() history = "šŸ“Š Application Patterns:\n" history += f"• Total applications: {patterns.get('total_applications', 0)}\n" history += f"• This week: {patterns.get('applications_this_week', 0)}\n" history += f"• Response rate: {patterns.get('response_rate', '0%')}\n\n" history += "šŸ“‹ Active Applications:\n" for app in active[:5]: history += f"• {app['company']} - {app['position']} ({app['status']})\n" return history except Exception as e: return f"Error retrieving history: {e}" return "Temporal tracking not available" history_btn.click(fn=show_application_history, outputs=history_text) # Observability Tab with gr.TabItem("šŸ” Agent Tracing"): trace_btn = gr.Button("šŸ“ Show Agent Trace") trace_text = gr.Textbox(label="Agent Interaction Flow", lines=15, interactive=False) def show_agent_trace(): if agent_tracer: try: import io from contextlib import redirect_stdout f = io.StringIO() with redirect_stdout(f): agent_tracer.print_interaction_flow() trace_output = f.getvalue() # Also get metrics metrics = agent_tracer.get_metrics() trace_output += f"\n\nšŸ“Š Metrics:\n" trace_output += f"• Total events: {metrics['total_events']}\n" trace_output += f"• Agents involved: {metrics['agents_involved']}\n" trace_output += f"• Tool calls: {metrics['tool_calls']}\n" trace_output += f"• Errors: {metrics['errors']}\n" return trace_output except Exception as e: return f"Error generating trace: {e}" return "Observability not available" trace_btn.click(fn=show_agent_trace, outputs=trace_text) # Context Engineering Tab with gr.TabItem("🧠 Context Insights"): context_btn = gr.Button("šŸ“Š Show Context Stats") context_text = gr.Textbox(label="Context Engineering Insights", lines=10, interactive=False) def show_context_insights(): if context_engineer: try: # Get flywheel recommendations sample_query = "Generate resume for software engineer" recommended = context_engineer.flywheel.get_recommended_sources(sample_query) insights = "🧠 Context Engineering Insights:\n\n" insights += f"šŸ“Š Flywheel Learning:\n" insights += f"• Successful contexts: {len(context_engineer.flywheel.successful_contexts)}\n" insights += f"• Pattern cache size: {len(context_engineer.flywheel.pattern_cache)}\n\n" if recommended: insights += f"šŸ’” Recommended sources for '{sample_query}':\n" for source in recommended: insights += f" • {source}\n" # Memory hierarchy stats insights += f"\nšŸ“š Memory Hierarchy:\n" insights += f"• L1 Cache: {len(context_engineer.memory.l1_cache)} items\n" insights += f"• L2 Memory: {len(context_engineer.memory.l2_memory)} items\n" insights += f"• L3 Storage: {len(context_engineer.memory.l3_index)} indexed\n" return insights except Exception as e: return f"Error getting insights: {e}" return "Context engineering not available" context_btn.click(fn=show_context_insights, outputs=context_text) # Configuration status config_status = [] # LinkedIn OAuth if not MOCK_MODE and LINKEDIN_CLIENT_ID: config_status.append(f"āœ… LinkedIn OAuth ({LINKEDIN_CLIENT_ID[:8]}...)") # Adzuna if ADZUNA_APP_ID and ADZUNA_APP_KEY: config_status.append(f"āœ… Adzuna API ({ADZUNA_APP_ID})") # Gemini if os.getenv("GEMINI_API_KEY"): config_status.append("āœ… Gemini AI") # Tavily if os.getenv("TAVILY_API_KEY"): config_status.append("āœ… Tavily Research") if not config_status: config_status.append("ā„¹ļø Add API keys to .env for full functionality") gr.Markdown(f""" --- ### šŸ”§ Active Services: {' | '.join(config_status)} ### šŸ’” Quick Start: 1. **Sign in** with LinkedIn (if configured) 2. **Search** for jobs on Adzuna or add custom jobs 3. **Configure** advanced features (if available) 4. **Select** jobs and click "Generate Documents" 5. **Review** AI-generated resume and cover letter 6. **Export** to Word/PowerPoint/Excel 7. **Analyze** with advanced analytics (if enabled) ### šŸ“Š Current Capabilities: - **Job Sources**: { 'Adzuna (5000/month)' if ADZUNA_APP_ID else 'Mock Data' } - **Authentication**: { 'LinkedIn OAuth' if not MOCK_MODE and LINKEDIN_CLIENT_ID else 'Mock Mode' } - **AI Generation**: { 'Gemini' if os.getenv("GEMINI_API_KEY") else 'Template Mode' } - **Advanced AI**: { 'Parallel + Temporal + Observability + Context' if ADVANCED_FEATURES else 'Not Available' } ### šŸš€ Performance Enhancements: - **Parallel Processing**: 3-5x faster document generation - **Temporal Tracking**: Complete application history with versioning - **Observability**: Full agent tracing and debugging - **Context Engineering**: Continuous learning and optimization - **Memory Hierarchy**: L1/L2/L3 caching for instant retrieval - **Compression**: Handle 1M+ tokens with intelligent scaling """) return demo if __name__ == "__main__": print("=" * 60) print("Job Application Assistant - Gradio Interface") print("=" * 60) # Check configuration if USE_SYSTEM_AGENTS: print("āœ… Full system mode - all features available") else: print("āš ļø Standalone mode - basic features only") print(" Place this file in the project directory for full features") if ADVANCED_FEATURES: print("šŸš€ Advanced AI Agent Features Loaded:") print(" ⚔ Parallel Processing (3-5x faster)") print(" šŸ“Š Temporal Tracking (complete history)") print(" šŸ” Observability (full tracing)") print(" 🧠 Context Engineering (continuous learning)") print(" šŸ“ˆ Context Scaling (1M+ tokens)") if os.getenv("GEMINI_API_KEY"): print("āœ… Gemini API configured") else: print("ā„¹ļø No Gemini API key - using fallback generation") if os.getenv("TAVILY_API_KEY"): print("āœ… Tavily API configured for web research") if ADZUNA_APP_ID: print("āœ… Adzuna API configured for job search") if LINKEDIN_CLIENT_ID: print("āœ… LinkedIn OAuth configured") print("\nStarting Gradio app...") print("=" * 60) try: app = build_app() app.launch( server_name="0.0.0.0", server_port=int(os.getenv("PORT", 7860)), share=False, show_error=True ) except Exception as e: logger.error(f"Failed to start app: {e}") print(f"\nāŒ Error: {e}") print("\nTroubleshooting:") print("1. Install required packages: pip install gradio pandas python-dotenv") print("2. Check your .env file exists and is valid") print("3. Ensure port 7860 is not in use") raise