import os import uuid from datetime import datetime, timezone from fastmcp import FastMCP from typing import List, Dict, Any, Optional, Tuple import httpx import pandas as pd from dateutil import parser as dateparser import feedparser from youtube_transcript_api import ( YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound, VideoUnavailable, ) import re # ---------------------------- # App & HTTP client # ---------------------------- mcp = FastMCP("CampusTools") HTTP_TIMEOUT = 20.0 client = httpx.AsyncClient( timeout=HTTP_TIMEOUT, headers={"User-Agent": "CampusTools/1.0"} ) def receipt() -> Dict[str, Any]: return { "tool_used": True, "server_time": datetime.now(timezone.utc).isoformat(), "request_id": str(uuid.uuid4()), } # ---------------------------- # CSVs pre-load (fail-soft) # ---------------------------- BASE_DIR = os.path.dirname(os.path.abspath(__file__)) COURSES_CSV = os.path.join(BASE_DIR, "campus_courses.csv") TIMETABLE_CSV = os.path.join(BASE_DIR, "timetable.csv") _courses_df: Optional[pd.DataFrame] = None _timetable_df: Optional[pd.DataFrame] = None def _load_csvs(): global _courses_df, _timetable_df try: _courses_df = pd.read_csv(COURSES_CSV) except Exception: _courses_df = None try: _timetable_df = pd.read_csv(TIMETABLE_CSV) except Exception: _timetable_df = None # Load CSVs at startup _load_csvs() # ---------------------------- # Utilities # ---------------------------- def _norm_country(code: str) -> str: return (code or "").strip().upper() def _parse_hhmm(s: str) -> Tuple[int, int]: hh, mm = s.split(":") return int(hh), int(mm) def _overlap(a_start: str, a_end: str, b_start: str, b_end: str) -> bool: a0 = _parse_hhmm(a_start) a1 = _parse_hhmm(a_end) b0 = _parse_hhmm(b_start) b1 = _parse_hhmm(b_end) return max(a0, b0) < min(a1, b1) # strict overlap # Accept bare IDs or URLs _YT_ID_RX = re.compile( r"""(?ix) (?:https?://(?:www\.)?youtu(?:\.be|be\.com)/(?:watch\?v=|embed/|shorts/)?([A-Za-z0-9_-]{11})) |(^([A-Za-z0-9_-]{11})$) """ ) def _extract_video_id(s: str) -> Optional[str]: if not s: return None m = _YT_ID_RX.search(s.strip()) if not m: return None return (m.group(1) or m.group(3) or "").strip() # ---------------------------- async def _geocode_city(city: str) -> Optional[Tuple[float, float]]: url = "https://geocoding-api.open-meteo.com/v1/search" r = await client.get(url, params={"name": city, "count": 1}) r.raise_for_status() data = r.json() if data.get("results"): lat = data["results"][0]["latitude"] lon = data["results"][0]["longitude"] return float(lat), float(lon) return None # ---------------------------- # 2) Weather now — Open-Meteo # ---------------------------- @mcp.tool async def get_weather(city: str) -> Dict[str, Any]: """ Current weather by city name. Returns: {temperature_c, precipitation, wind_kph, note} """ geo = await _geocode_city(city) if not geo: return {"error": f"City not found: {city}", "_receipt": receipt()} lat, lon = geo url = "https://api.open-meteo.com/v1/forecast" params = { "latitude": lat, "longitude": lon, "current": "temperature_2m,precipitation,wind_speed_10m", } r = await client.get(url, params=params) r.raise_for_status() cur = (r.json() or {}).get("current", {}) temp = cur.get("temperature_2m") precip = cur.get("precipitation") wind = cur.get("wind_speed_10m") note = "Carry an umbrella." if (precip or 0) > 0 else "No precipitation reported." return { "city": city, "temperature_c": temp, "precipitation_mm": precip, "wind_kph": wind, "note": note, "_receipt": receipt(), } # ---------------------------- # 3) Currency rate — Frankfurter # ---------------------------- @mcp.tool async def fx_rate(base: str, target: str) -> Dict[str, Any]: """ Get latest FX rate between two currencies (e.g., base='MYR', target='USD'). Returns: {rate, date} """ url = "https://api.frankfurter.app/latest" params = {"from": base.upper(), "to": target.upper()} r = await client.get(url, params=params) r.raise_for_status() data = r.json() rate = (data.get("rates") or {}).get(target.upper()) return { "base": base.upper(), "target": target.upper(), "rate": rate, "date": data.get("date"), "_receipt": receipt(), } # ---------------------------- # 4) Scholarly search — OpenAlex # ---------------------------- @mcp.tool async def openalex_search(query: str, limit: int = 3) -> dict: """ Search scholarly works on OpenAlex. Returns: {"results": [{title, first_author, year, url}], "_receipt": {...}} """ url = "https://api.openalex.org/works" params = {"search": query, "per_page": max(1, min(limit, 10))} try: r = await client.get(url, params=params) r.raise_for_status() data = r.json() except Exception as e: return {"error": f"OpenAlex request failed: {e}", "_receipt": receipt()} results = [] for w in data.get("results") or []: title = w.get("title") # Be robust to missing/invalid dates year = w.get("publication_year") if not year: from_publication_date = w.get("from_publication_date") if from_publication_date: try: year = dateparser.parse(from_publication_date).year except Exception: year = None # Prefer OpenAlex work URL; fall back to DOI or primary location url_w = ( w.get("id") or w.get("doi") or (w.get("primary_location") or {}).get("source", {}).get("url") ) # First author (if present) first_author = None auths = w.get("authorships") or [] if auths: author = auths[0].get("author") or {} first_author = author.get("display_name") results.append( { "title": title, "first_author": first_author, "year": year, "url": url_w, } ) return {"results": results, "_receipt": receipt()} # ---------------------------- # 5) DOI → formatted citation — Crossref (content negotiation) # ---------------------------- @mcp.tool async def format_citation(doi: str, style: str = "APA") -> Dict[str, Any]: """ Get a formatted citation for a DOI. Styles commonly: APA, MLA, IEEE, Chicago. Returns: {formatted} """ # Crossref content negotiation via doi.org # Example: Accept: text/x-bibliography; style=apa style_header = style.lower() headers = {"Accept": f"text/x-bibliography; style={style_header}"} url = f"https://doi.org/{doi}" r = await client.get(url, headers=headers) if r.status_code >= 400: return {"error": f"DOI not found or unsupported: {doi}", "_receipt": receipt()} return { "doi": doi, "style": style.upper(), "formatted": r.text.strip(), "_receipt": receipt(), } # ---------------------------- # 6) Campus lookup — local CSV # ---------------------------- @mcp.tool def campus_lookup(course_code: str) -> Dict[str, Any]: """ Look up a course by code from campus_courses.csv. Returns: {course, lecturer, room, time} """ if _courses_df is None: return {"error": "campus_courses.csv not loaded.", "_receipt": receipt()} code = (course_code or "").strip().upper() row = _courses_df[_courses_df["course_code"].str.upper() == code] if row.empty: return {"error": f"Course not found: {course_code}", "_receipt": receipt()} rec = row.iloc[0].to_dict() return { "course_code": rec.get("course_code"), "course": rec.get("course"), "lecturer": rec.get("lecturer"), "room": rec.get("room"), "time": rec.get("time"), "_receipt": receipt(), } # ---------------------------- # 7) Clash checker — local CSVs # ---------------------------- @mcp.tool def timetable_conflicts() -> dict: """ Detect room-time conflicts in timetable.csv. Returns: list of {course_code, room, weekday, start, end, conflict_with} """ if _timetable_df is None: return {"error": "timetable.csv not loaded.", "_receipt": receipt()} df = _timetable_df.copy() df["weekday"] = df["weekday"].str.strip() conflicts = [] # Compare each pair that shares room+weekday for i in range(len(df)): a = df.iloc[i] for j in range(i + 1, len(df)): b = df.iloc[j] if a["room"] == b["room"] and a["weekday"] == b["weekday"]: if _overlap(a["start"], a["end"], b["start"], b["end"]): conflicts.append( { "course_code": a["course_code"], "room": a["room"], "weekday": a["weekday"], "start": a["start"], "end": a["end"], "conflict_with": { "course_code": b["course_code"], "start": b["start"], "end": b["end"], }, } ) return {"conflicts": conflicts, "_receipt": receipt()} if __name__ == "__main__": port = int(os.environ.get("PORT", 7860)) mcp.run(transport="http", host="0.0.0.0", port=port, path="/mcp")