from __future__ import annotations import os from typing import Any, Dict, List, Optional, Tuple from models.schemas import UserProfile, WorkExperience, Education, JobPosting class MCPLinkedInClient: """Optional MCP client for a LinkedIn MCP server. Configuration via env: - MCP_LINKEDIN_SERVER_URL: SSE endpoint (e.g., http://localhost:3333/sse) - MCP_LINKEDIN_TOOL_PROFILE: tool name to fetch profile (default: linkedin.get_profile) - MCP_LINKEDIN_TOOL_SAVED_JOBS: tool name to fetch saved jobs (default: linkedin.get_saved_jobs) """ def __init__(self) -> None: self.url = os.getenv("MCP_LINKEDIN_SERVER_URL") self.tool_profile = os.getenv("MCP_LINKEDIN_TOOL_PROFILE", "linkedin.get_profile") self.tool_saved_jobs = os.getenv("MCP_LINKEDIN_TOOL_SAVED_JOBS", "linkedin.get_saved_jobs") self.enabled = bool(self.url) self._session = None async def _ensure_session(self): if not self.enabled: return None if self._session is not None: return self._session try: # Lazy import to avoid hard dependency if not used from mcp.client.sse import connect_sse session = await connect_sse(self.url) self._session = session return session except Exception: self.enabled = False return None async def _call_tool(self, tool: str, args: Dict[str, Any] | None = None) -> Any: session = await self._ensure_session() if not session: return None try: # Some MCP clients use session.call_tool(name, arguments) result = await session.call_tool(tool, args or {}) # type: ignore # Expect a JSON-serializable result return result except Exception: return None @staticmethod def _map_profile(data: Dict[str, Any]) -> Optional[UserProfile]: try: exps_raw = data.get("experiences", []) or [] exps = [ WorkExperience( title=e.get("title", ""), company=e.get("company", ""), start_date=e.get("start_date"), end_date=e.get("end_date"), location=e.get("location"), achievements=e.get("achievements") or [], technologies=e.get("technologies") or [], ) for e in exps_raw ] edus_raw = data.get("education", []) or [] edus = [ Education( school=ed.get("school", ""), degree=ed.get("degree"), field_of_study=ed.get("field_of_study"), start_date=ed.get("start_date"), end_date=ed.get("end_date"), ) for ed in edus_raw ] return UserProfile( full_name=data.get("full_name", data.get("name", "")), headline=data.get("headline"), summary=data.get("summary"), email=data.get("email"), phone=data.get("phone"), location=data.get("location"), skills=data.get("skills") or [], experiences=exps, education=edus, links=data.get("links") or {}, ) except Exception: return None @staticmethod def _map_jobs(items: List[Dict[str, Any]]) -> List[JobPosting]: jobs: List[JobPosting] = [] for it in items or []: try: jobs.append( JobPosting( id=str(it.get("id") or it.get("job_id") or "job_mcp"), title=it.get("title", ""), company=it.get("company", ""), location=it.get("location"), description=it.get("description") or "", url=it.get("url"), source=it.get("source") or "mcp", saved_by_user=bool(it.get("saved_by_user", True)), seniority=it.get("seniority"), employment_type=it.get("employment_type"), ) ) except Exception: continue return jobs async def get_profile(self) -> Optional[UserProfile]: if not self.enabled: return None res = await self._call_tool(self.tool_profile, {}) if not res: return None # Accept dict or embedded structure data = res if isinstance(res, dict) else getattr(res, "data", None) if not isinstance(data, dict): return None return self._map_profile(data) async def get_saved_jobs(self) -> List[JobPosting]: if not self.enabled: return [] res = await self._call_tool(self.tool_saved_jobs, {}) items = None if isinstance(res, list): items = res elif isinstance(res, dict): items = res.get("jobs") or res.get("data") or [] return self._map_jobs(items or []) mcp_linkedin_client = MCPLinkedInClient()