Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| from typing import List, Optional, Dict | |
| import logging | |
| from models.schemas import JobPosting, UserProfile | |
| from services.linkedin_client import LinkedInClient | |
| from services.mcp_linkedin_client import mcp_linkedin_client | |
| from utils.salary import estimate_salary_range | |
| logger = logging.getLogger(__name__) | |
| class LinkedInManagerAgent: | |
| def __init__(self) -> None: | |
| self.client = LinkedInClient() | |
| self.user_profile: Optional[UserProfile] = None | |
| def get_login_url(self) -> str: | |
| return self.client.get_authorize_url() | |
| def handle_oauth_callback(self, code: str, state: Optional[str] = None) -> bool: | |
| """Handle OAuth callback with state validation.""" | |
| ok = self.client.exchange_code_for_token(code, state) | |
| if ok: | |
| self.user_profile = self.client.get_profile() | |
| return ok | |
| def get_profile(self) -> UserProfile: | |
| if not self.user_profile: | |
| # Try MCP first if available | |
| if mcp_linkedin_client.enabled: | |
| try: | |
| import asyncio | |
| prof = asyncio.run(mcp_linkedin_client.get_profile()) | |
| if prof: | |
| self.user_profile = prof | |
| except Exception: | |
| self.user_profile = None | |
| if not self.user_profile: | |
| self.user_profile = self.client.get_profile() | |
| return self.user_profile | |
| def set_profile(self, profile: UserProfile) -> None: | |
| """Update the stored profile with new data.""" | |
| self.user_profile = profile | |
| logger.info(f"Profile updated: {profile.full_name}") | |
| def update_profile_fields(self, **kwargs) -> None: | |
| """Update specific profile fields.""" | |
| if not self.user_profile: | |
| self.user_profile = UserProfile() | |
| for key, value in kwargs.items(): | |
| if hasattr(self.user_profile, key): | |
| setattr(self.user_profile, key, value) | |
| logger.debug(f"Updated profile.{key}") | |
| def get_saved_jobs(self) -> List[JobPosting]: | |
| all_jobs = [] | |
| # Try MCP client first | |
| if mcp_linkedin_client.enabled: | |
| try: | |
| import asyncio | |
| jobs = asyncio.run(mcp_linkedin_client.get_saved_jobs()) | |
| if jobs: | |
| all_jobs.extend(jobs) | |
| except Exception: | |
| pass | |
| # Try LinkedIn API | |
| linkedin_jobs = self.client.get_saved_jobs() | |
| all_jobs.extend(linkedin_jobs) | |
| # If in mock mode or no real LinkedIn jobs, supplement with job aggregators | |
| if self.client.mock_mode or len(all_jobs) < 5: | |
| # Try JobSpy MCP Server first (most comprehensive) | |
| try: | |
| from services.jobspy_client import JobSpyClient | |
| jobspy = JobSpyClient() | |
| jobspy_jobs = jobspy.search_jobs_sync( | |
| search_term="software engineer", | |
| location="Remote", | |
| site_names="indeed,linkedin,glassdoor", | |
| results_wanted=15 | |
| ) | |
| all_jobs.extend(jobspy_jobs) | |
| except Exception as e: | |
| import logging | |
| logging.getLogger(__name__).info(f"JobSpy not available: {e}") | |
| # Fall back to basic job aggregator | |
| if len(all_jobs) < 5: | |
| try: | |
| from services.job_aggregator import JobAggregator | |
| aggregator = JobAggregator() | |
| aggregated_jobs = aggregator.search_all("software engineer", "Remote") | |
| all_jobs.extend(aggregated_jobs[:10]) | |
| except Exception as e: | |
| import logging | |
| logging.getLogger(__name__).info(f"Job aggregator not available: {e}") | |
| # Deduplicate jobs | |
| seen = set() | |
| unique_jobs = [] | |
| for job in all_jobs: | |
| key = (job.title.lower(), job.company.lower()) | |
| if key not in seen: | |
| seen.add(key) | |
| unique_jobs.append(job) | |
| return unique_jobs | |
| def get_job(self, job_id: str) -> Optional[JobPosting]: | |
| return self.client.get_job_details(job_id) | |
| def estimate_salary(self, job: JobPosting) -> Dict[str, Dict[str, int]]: | |
| profile = self.get_profile() | |
| industry = None | |
| return estimate_salary_range(job.title, job.location, industry, profile.skills) |