Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import os | |
| from typing import Any, Dict, List, Optional, Tuple | |
| from models.schemas import UserProfile, WorkExperience, Education, JobPosting | |
class MCPLinkedInClient:
    """Optional MCP client for a LinkedIn MCP server.

    The client is best-effort: when it is not configured, or when connecting
    or calling a tool fails, the public methods return an empty result
    (``None`` / ``[]``) instead of raising. Failures are logged so that a
    silently disabled client can still be diagnosed.

    Configuration via env:
    - MCP_LINKEDIN_SERVER_URL: SSE endpoint (e.g., http://localhost:3333/sse)
    - MCP_LINKEDIN_TOOL_PROFILE: tool name to fetch profile (default: linkedin.get_profile)
    - MCP_LINKEDIN_TOOL_SAVED_JOBS: tool name to fetch saved jobs (default: linkedin.get_saved_jobs)
    """

    def __init__(self) -> None:
        """Read the configuration from the environment; no connection is opened here."""
        self.url = os.getenv("MCP_LINKEDIN_SERVER_URL")
        self.tool_profile = os.getenv("MCP_LINKEDIN_TOOL_PROFILE", "linkedin.get_profile")
        self.tool_saved_jobs = os.getenv("MCP_LINKEDIN_TOOL_SAVED_JOBS", "linkedin.get_saved_jobs")
        # An unset or empty URL leaves the client disabled.
        self.enabled = bool(self.url)
        # Created lazily by _ensure_session() and then reused.
        self._session = None

    @staticmethod
    def _warn(message: str, *args: Any) -> None:
        """Log a swallowed failure together with the active exception's traceback.

        Must be called from inside an ``except`` block. ``logging`` is imported
        locally so this class does not depend on a module-level import.
        """
        import logging

        logging.getLogger(__name__).warning(message, *args, exc_info=True)

    async def _ensure_session(self):
        """Return the cached session, connecting on first use.

        Returns:
            The session object, or ``None`` when the client is disabled or the
            connection attempt failed. A failed attempt sets ``enabled`` to
            ``False``, so later calls return ``None`` without retrying.
        """
        if not self.enabled:
            return None
        if self._session is not None:
            return self._session
        try:
            # Lazy import to avoid hard dependency if not used.
            # NOTE(review): the official MCP Python SDK appears to expose
            # `sse_client` (an async context manager) in `mcp.client.sse`, not
            # `connect_sse` -- confirm against the installed package; if the
            # name is missing this import fails and the client disables itself.
            from mcp.client.sse import connect_sse

            session = await connect_sse(self.url)
        except Exception:
            self._warn("MCP LinkedIn: could not connect to %s; client disabled", self.url)
            self.enabled = False
            return None
        self._session = session
        return session

    async def _call_tool(self, tool: str, args: Dict[str, Any] | None = None) -> Any:
        """Invoke ``tool`` on the MCP session.

        Args:
            tool: Name of the tool to call.
            args: Tool arguments; ``None`` is sent as an empty dict.

        Returns:
            Whatever ``session.call_tool`` returns, or ``None`` when no session
            is available or the call raised.
        """
        session = await self._ensure_session()
        if not session:
            return None
        try:
            # Some MCP clients use session.call_tool(name, arguments)
            return await session.call_tool(tool, args or {})  # type: ignore
        except Exception:
            self._warn("MCP LinkedIn: tool call %r failed", tool)
            return None

    # Declared static: it takes no ``self`` but is invoked as
    # ``self._map_profile(data)``; without the decorator that call raises
    # TypeError because the instance is passed as an extra argument.
    @staticmethod
    def _map_profile(data: Dict[str, Any]) -> Optional[UserProfile]:
        """Build a ``UserProfile`` from a raw profile dict.

        Missing keys fall back to empty strings / lists / dicts or ``None`` as
        coded below; ``full_name`` falls back to the ``name`` key.

        Returns:
            The mapped profile, or ``None`` if mapping raised for any reason.
        """
        try:
            exps_raw = data.get("experiences", []) or []
            exps = [
                WorkExperience(
                    title=e.get("title", ""),
                    company=e.get("company", ""),
                    start_date=e.get("start_date"),
                    end_date=e.get("end_date"),
                    location=e.get("location"),
                    achievements=e.get("achievements") or [],
                    technologies=e.get("technologies") or [],
                )
                for e in exps_raw
            ]
            edus_raw = data.get("education", []) or []
            edus = [
                Education(
                    school=ed.get("school", ""),
                    degree=ed.get("degree"),
                    field_of_study=ed.get("field_of_study"),
                    start_date=ed.get("start_date"),
                    end_date=ed.get("end_date"),
                )
                for ed in edus_raw
            ]
            return UserProfile(
                full_name=data.get("full_name", data.get("name", "")),
                headline=data.get("headline"),
                summary=data.get("summary"),
                email=data.get("email"),
                phone=data.get("phone"),
                location=data.get("location"),
                skills=data.get("skills") or [],
                experiences=exps,
                education=edus,
                links=data.get("links") or {},
            )
        except Exception:
            MCPLinkedInClient._warn("MCP LinkedIn: could not map profile payload")
            return None

    # Static for the same reason as _map_profile (called as ``self._map_jobs``).
    @staticmethod
    def _map_jobs(items: List[Dict[str, Any]]) -> List[JobPosting]:
        """Build ``JobPosting`` objects from raw job dicts.

        Items that cannot be mapped are logged and skipped. The id falls back
        from ``id`` to ``job_id`` to the literal ``"job_mcp"``; ``source``
        defaults to ``"mcp"`` and ``saved_by_user`` to ``True``.

        Returns:
            The successfully mapped postings (empty for ``None`` / empty input).
        """
        jobs: List[JobPosting] = []
        for it in items or []:
            try:
                jobs.append(
                    JobPosting(
                        id=str(it.get("id") or it.get("job_id") or "job_mcp"),
                        title=it.get("title", ""),
                        company=it.get("company", ""),
                        location=it.get("location"),
                        description=it.get("description") or "",
                        url=it.get("url"),
                        source=it.get("source") or "mcp",
                        saved_by_user=bool(it.get("saved_by_user", True)),
                        seniority=it.get("seniority"),
                        employment_type=it.get("employment_type"),
                    )
                )
            except Exception:
                MCPLinkedInClient._warn("MCP LinkedIn: skipping unmappable job item")
                continue
        return jobs

    async def get_profile(self) -> Optional[UserProfile]:
        """Fetch the user's profile via the configured profile tool.

        Returns:
            The mapped ``UserProfile``, or ``None`` when the client is disabled,
            the tool returned nothing, or the payload is not a dict.
        """
        if not self.enabled:
            return None
        res = await self._call_tool(self.tool_profile, {})
        if not res:
            return None
        # Accept dict or embedded structure
        data = res if isinstance(res, dict) else getattr(res, "data", None)
        if not isinstance(data, dict):
            return None
        return self._map_profile(data)

    async def get_saved_jobs(self) -> List[JobPosting]:
        """Fetch the user's saved jobs via the configured saved-jobs tool.

        Accepts a bare list, a dict carrying the list under ``"jobs"`` or
        ``"data"``, or an object with a ``data`` attribute (the same embedded
        shape ``get_profile`` accepts).

        Returns:
            The mapped postings; ``[]`` when the client is disabled or the
            payload holds no list of jobs.
        """
        if not self.enabled:
            return []
        res = await self._call_tool(self.tool_saved_jobs, {})
        if isinstance(res, list):
            items = res
        elif isinstance(res, dict):
            items = res.get("jobs") or res.get("data") or []
        else:
            items = getattr(res, "data", None)
        # A non-sequence payload cannot be iterated as job items.
        if not isinstance(items, (list, tuple)):
            return []
        return self._map_jobs(items)
# Module-level shared client instance, created at import time. Construction
# only reads environment variables; no session is opened until a tool call.
mcp_linkedin_client = MCPLinkedInClient()