Job-Application-Assistant / services /mcp_linkedin_client.py
Noo88ear's picture
🚀 Initial deployment of Multi-Agent Job Application Assistant
7498f2c
from __future__ import annotations

import json
import logging
import os
from contextlib import AsyncExitStack
from typing import Any, Dict, List, Optional, Tuple

from models.schemas import UserProfile, WorkExperience, Education, JobPosting
logger = logging.getLogger(__name__)


class MCPLinkedInClient:
    """Optional, best-effort MCP client for a LinkedIn MCP server (SSE transport).

    Configuration via env:
    - MCP_LINKEDIN_SERVER_URL: SSE endpoint (e.g., http://localhost:3333/sse)
    - MCP_LINKEDIN_TOOL_PROFILE: tool name to fetch profile (default: linkedin.get_profile)
    - MCP_LINKEDIN_TOOL_SAVED_JOBS: tool name to fetch saved jobs (default: linkedin.get_saved_jobs)

    The client is disabled when no server URL is configured, and disables
    itself after a failed connection attempt. Public methods never raise:
    they return ``None`` / ``[]`` on any failure and log the cause instead.
    """

    def __init__(self) -> None:
        """Read configuration from the environment; no network I/O happens here."""
        self.url = os.getenv("MCP_LINKEDIN_SERVER_URL")
        self.tool_profile = os.getenv("MCP_LINKEDIN_TOOL_PROFILE", "linkedin.get_profile")
        self.tool_saved_jobs = os.getenv("MCP_LINKEDIN_TOOL_SAVED_JOBS", "linkedin.get_saved_jobs")
        self.enabled = bool(self.url)
        self._session = None
        # Owns the SSE transport and the session so both can be closed together.
        self._exit_stack: Optional[AsyncExitStack] = None

    async def _ensure_session(self):
        """Return the cached MCP session, connecting on first use.

        Returns:
            The initialized session, or ``None`` when the client is disabled
            or the connection attempt fails (which also disables the client
            so later calls do not retry).
        """
        if not self.enabled:
            return None
        if self._session is not None:
            return self._session

        stack = AsyncExitStack()
        try:
            # Lazy import to avoid hard dependency if not used
            from mcp import ClientSession
            from mcp.client.sse import sse_client

            # sse_client is an async context manager yielding the transport
            # streams; ClientSession speaks the protocol on top of them and
            # must be initialized before any tool call.
            read_stream, write_stream = await stack.enter_async_context(sse_client(self.url))
            session = await stack.enter_async_context(ClientSession(read_stream, write_stream))
            await session.initialize()
        except Exception:
            logger.warning(
                "Could not connect to LinkedIn MCP server at %s; disabling MCP client",
                self.url,
                exc_info=True,
            )
            self.enabled = False
            try:
                # Release anything that was partially opened.
                await stack.aclose()
            except Exception:
                logger.debug("Error while cleaning up failed MCP connection", exc_info=True)
            return None

        self._exit_stack = stack
        self._session = session
        return session

    async def aclose(self) -> None:
        """Close the session and its transport, if open. Safe to call repeatedly."""
        stack = self._exit_stack
        self._exit_stack = None
        self._session = None
        if stack is None:
            return
        try:
            await stack.aclose()
        except Exception:
            logger.debug("Error while closing MCP session", exc_info=True)

    async def _call_tool(self, tool: str, args: Dict[str, Any] | None = None) -> Any:
        """Invoke *tool* on the server and return the raw result.

        Args:
            tool: Name of the MCP tool to call.
            args: Tool arguments; ``None`` is sent as an empty mapping.

        Returns:
            Whatever the session's ``call_tool`` returns, or ``None`` when no
            session is available or the call raises.
        """
        session = await self._ensure_session()
        if session is None:
            return None
        try:
            return await session.call_tool(tool, args or {})  # type: ignore
        except Exception:
            logger.warning("MCP tool call %r failed", tool, exc_info=True)
            return None

    @staticmethod
    def _extract_payload(result: Any) -> Any:
        """Pull the JSON-like payload (dict or list) out of a tool result.

        Accepted shapes, in order of precedence:
        - a plain ``dict`` / ``list`` (returned unchanged);
        - an object exposing the payload as ``data``, ``structuredContent``
          or ``structured_content``;
        - an object whose ``content`` is a list of blocks, where the first
          block with a ``text`` attribute that parses as a JSON object or
          array wins.

        Returns ``None`` for ``None``, for results flagged as errors
        (``isError`` / ``is_error``), and when nothing usable is found.
        """
        if result is None or isinstance(result, (dict, list)):
            return result
        if getattr(result, "isError", False) or getattr(result, "is_error", False):
            return None

        for attr in ("data", "structuredContent", "structured_content"):
            value = getattr(result, attr, None)
            if isinstance(value, (dict, list)):
                return value

        blocks = getattr(result, "content", None)
        if not isinstance(blocks, (list, tuple)):
            return None
        for block in blocks:
            text = getattr(block, "text", None)
            if not isinstance(text, str):
                continue
            try:
                value = json.loads(text)
            except ValueError:
                # Plain-text block, not a JSON payload.
                continue
            if isinstance(value, (dict, list)):
                return value
        return None

    @staticmethod
    def _map_profile(data: Dict[str, Any]) -> Optional[UserProfile]:
        """Build a ``UserProfile`` from a profile dict.

        Returns ``None`` (after logging) if the payload cannot be mapped,
        e.g. an entry is not a mapping or model construction fails.
        """
        try:
            exps = [
                WorkExperience(
                    title=e.get("title", ""),
                    company=e.get("company", ""),
                    start_date=e.get("start_date"),
                    end_date=e.get("end_date"),
                    location=e.get("location"),
                    achievements=e.get("achievements") or [],
                    technologies=e.get("technologies") or [],
                )
                for e in data.get("experiences") or []
            ]
            edus = [
                Education(
                    school=ed.get("school", ""),
                    degree=ed.get("degree"),
                    field_of_study=ed.get("field_of_study"),
                    start_date=ed.get("start_date"),
                    end_date=ed.get("end_date"),
                )
                for ed in data.get("education") or []
            ]
            return UserProfile(
                # Fall back to "name" when "full_name" is missing, null or empty.
                full_name=data.get("full_name") or data.get("name") or "",
                headline=data.get("headline"),
                summary=data.get("summary"),
                email=data.get("email"),
                phone=data.get("phone"),
                location=data.get("location"),
                skills=data.get("skills") or [],
                experiences=exps,
                education=edus,
                links=data.get("links") or {},
            )
        except Exception:
            # Do not log the payload itself: it contains personal data.
            logger.warning("Could not map MCP profile payload to UserProfile", exc_info=True)
            return None

    @staticmethod
    def _map_jobs(items: List[Dict[str, Any]]) -> List[JobPosting]:
        """Build ``JobPosting`` objects from job dicts, skipping unmappable entries."""
        jobs: List[JobPosting] = []
        for it in items or []:
            try:
                jobs.append(
                    JobPosting(
                        id=str(it.get("id") or it.get("job_id") or "job_mcp"),
                        title=it.get("title", ""),
                        company=it.get("company", ""),
                        location=it.get("location"),
                        description=it.get("description") or "",
                        url=it.get("url"),
                        source=it.get("source") or "mcp",
                        # Entries come from the saved-jobs tool, so default to saved.
                        saved_by_user=bool(it.get("saved_by_user", True)),
                        seniority=it.get("seniority"),
                        employment_type=it.get("employment_type"),
                    )
                )
            except Exception:
                logger.debug("Skipping unmappable MCP job entry", exc_info=True)
                continue
        return jobs

    async def get_profile(self) -> Optional[UserProfile]:
        """Fetch the user's profile via the configured profile tool.

        Returns ``None`` when the client is disabled, the call fails, or the
        result does not carry a non-empty dict payload.
        """
        if not self.enabled:
            return None
        res = await self._call_tool(self.tool_profile, {})
        data = self._extract_payload(res)
        if not isinstance(data, dict) or not data:
            return None
        return self._map_profile(data)

    async def get_saved_jobs(self) -> List[JobPosting]:
        """Fetch the user's saved jobs via the configured saved-jobs tool.

        The payload may be a list of job dicts, or a dict holding that list
        under ``"jobs"`` or ``"data"``. Returns ``[]`` on any failure.
        """
        if not self.enabled:
            return []
        res = await self._call_tool(self.tool_saved_jobs, {})
        payload = self._extract_payload(res)
        items = None
        if isinstance(payload, list):
            items = payload
        elif isinstance(payload, dict):
            items = payload.get("jobs") or payload.get("data") or []
        return self._map_jobs(items or [])
# Shared module-level client instance. Construction only reads the
# MCP_LINKEDIN_* environment variables; no connection is opened here.
mcp_linkedin_client = MCPLinkedInClient()