from __future__ import annotations from typing import List, Optional, Dict import logging from models.schemas import JobPosting, UserProfile from services.linkedin_client import LinkedInClient from services.mcp_linkedin_client import mcp_linkedin_client from utils.salary import estimate_salary_range logger = logging.getLogger(__name__) class LinkedInManagerAgent: def __init__(self) -> None: self.client = LinkedInClient() self.user_profile: Optional[UserProfile] = None def get_login_url(self) -> str: return self.client.get_authorize_url() def handle_oauth_callback(self, code: str, state: Optional[str] = None) -> bool: """Handle OAuth callback with state validation.""" ok = self.client.exchange_code_for_token(code, state) if ok: self.user_profile = self.client.get_profile() return ok def get_profile(self) -> UserProfile: if not self.user_profile: # Try MCP first if available if mcp_linkedin_client.enabled: try: import asyncio prof = asyncio.run(mcp_linkedin_client.get_profile()) if prof: self.user_profile = prof except Exception: self.user_profile = None if not self.user_profile: self.user_profile = self.client.get_profile() return self.user_profile def set_profile(self, profile: UserProfile) -> None: """Update the stored profile with new data.""" self.user_profile = profile logger.info(f"Profile updated: {profile.full_name}") def update_profile_fields(self, **kwargs) -> None: """Update specific profile fields.""" if not self.user_profile: self.user_profile = UserProfile() for key, value in kwargs.items(): if hasattr(self.user_profile, key): setattr(self.user_profile, key, value) logger.debug(f"Updated profile.{key}") def get_saved_jobs(self) -> List[JobPosting]: all_jobs = [] # Try MCP client first if mcp_linkedin_client.enabled: try: import asyncio jobs = asyncio.run(mcp_linkedin_client.get_saved_jobs()) if jobs: all_jobs.extend(jobs) except Exception: pass # Try LinkedIn API linkedin_jobs = self.client.get_saved_jobs() all_jobs.extend(linkedin_jobs) # If in mock mode or no real LinkedIn jobs, supplement with job aggregators if self.client.mock_mode or len(all_jobs) < 5: # Try JobSpy MCP Server first (most comprehensive) try: from services.jobspy_client import JobSpyClient jobspy = JobSpyClient() jobspy_jobs = jobspy.search_jobs_sync( search_term="software engineer", location="Remote", site_names="indeed,linkedin,glassdoor", results_wanted=15 ) all_jobs.extend(jobspy_jobs) except Exception as e: import logging logging.getLogger(__name__).info(f"JobSpy not available: {e}") # Fall back to basic job aggregator if len(all_jobs) < 5: try: from services.job_aggregator import JobAggregator aggregator = JobAggregator() aggregated_jobs = aggregator.search_all("software engineer", "Remote") all_jobs.extend(aggregated_jobs[:10]) except Exception as e: import logging logging.getLogger(__name__).info(f"Job aggregator not available: {e}") # Deduplicate jobs seen = set() unique_jobs = [] for job in all_jobs: key = (job.title.lower(), job.company.lower()) if key not in seen: seen.add(key) unique_jobs.append(job) return unique_jobs def get_job(self, job_id: str) -> Optional[JobPosting]: return self.client.get_job_details(job_id) def estimate_salary(self, job: JobPosting) -> Dict[str, Dict[str, int]]: profile = self.get_profile() industry = None return estimate_salary_range(job.title, job.location, industry, profile.skills)