Spaces:
Sleeping
Sleeping
| import os | |
| import uuid | |
| from datetime import datetime, timezone | |
| from fastmcp import FastMCP | |
| from typing import List, Dict, Any, Optional, Tuple | |
| import httpx | |
| import pandas as pd | |
| from dateutil import parser as dateparser | |
| import feedparser | |
| from youtube_transcript_api import ( | |
| YouTubeTranscriptApi, | |
| TranscriptsDisabled, | |
| NoTranscriptFound, | |
| VideoUnavailable, | |
| ) | |
| import re | |
# ----------------------------
# App & HTTP client
# ----------------------------
# MCP server object; the __main__ guard at the end of this file calls mcp.run().
mcp = FastMCP("CampusTools")

# Timeout handed to the shared HTTP client below.
HTTP_TIMEOUT = 20.0

# Single AsyncClient reused by every network helper in this module.
client = httpx.AsyncClient(
    headers={"User-Agent": "CampusTools/1.0"},
    timeout=HTTP_TIMEOUT,
)
def receipt() -> Dict[str, Any]:
    """
    Build the metadata stamp that is attached to responses as "_receipt".

    Returns: {tool_used, server_time, request_id} where server_time is the
    current UTC time in ISO-8601 form and request_id is a fresh UUID4 string.
    """
    stamped_at = datetime.now(timezone.utc)
    token = uuid.uuid4()
    return dict(
        tool_used=True,
        server_time=stamped_at.isoformat(),
        request_id=str(token),
    )
# ----------------------------
# CSVs pre-load (fail-soft)
# ----------------------------
# Both data files are looked up in the directory that contains this module.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
COURSES_CSV, TIMETABLE_CSV = (
    os.path.join(BASE_DIR, filename)
    for filename in ("campus_courses.csv", "timetable.csv")
)

# Module-level caches filled by _load_csvs(); None means "not loaded".
_courses_df: Optional[pd.DataFrame] = None
_timetable_df: Optional[pd.DataFrame] = None
def _load_csvs():
    """
    Load campus_courses.csv and timetable.csv into the module-level caches.

    Loading is fail-soft: any error while reading a file leaves the matching
    cache set to None instead of raising, so importing this module never fails
    because of a missing or malformed CSV. Each failure is logged as a warning
    so that a later "not loaded" tool error can be traced to its cause.
    """
    global _courses_df, _timetable_df
    import logging

    log = logging.getLogger(__name__)

    # The two files are handled independently: one bad file must not block the other.
    try:
        _courses_df = pd.read_csv(COURSES_CSV)
    except Exception as exc:  # deliberate catch-all: start-up must not crash
        _courses_df = None
        log.warning("campus_courses.csv could not be loaded: %s", exc)

    try:
        _timetable_df = pd.read_csv(TIMETABLE_CSV)
    except Exception as exc:  # deliberate catch-all: start-up must not crash
        _timetable_df = None
        log.warning("timetable.csv could not be loaded: %s", exc)


# Load CSVs at startup
_load_csvs()
# ----------------------------
# Utilities
# ----------------------------
def _norm_country(code: str) -> str:
    """Return *code* trimmed and upper-cased; falsy input (None, "") gives ""."""
    if not code:
        return ""
    trimmed = code.strip()
    return trimmed.upper()
def _parse_hhmm(s: str) -> Tuple[int, int]:
    """
    Split an "HH:MM" string into an (hour, minute) tuple of ints.

    Raises ValueError when the text does not consist of exactly two
    colon-separated integer fields. No range check is applied to the values.
    """
    hour, minute = map(int, s.split(":"))
    return hour, minute
def _overlap(a_start: str, a_end: str, b_start: str, b_end: str) -> bool:
    """
    Tell whether two "HH:MM" intervals share any time.

    The comparison is strict: intervals that only touch (one ends exactly
    when the other starts) do not count as overlapping.
    """
    first_begin = _parse_hhmm(a_start)
    first_finish = _parse_hhmm(a_end)
    second_begin = _parse_hhmm(b_start)
    second_finish = _parse_hhmm(b_end)
    # (hour, minute) tuples compare lexicographically, i.e. chronologically.
    latest_begin = max(first_begin, second_begin)
    earliest_finish = min(first_finish, second_finish)
    return latest_begin < earliest_finish
# Accept bare IDs or URLs.
# Group layout is kept stable: group 1 = ID found in a URL, group 3 = bare ID.
_YT_ID_RX = re.compile(
    r"""(?ix)
    (?:
        (?<![\w.-])                               # not in the middle of another host name
        (?:https?://)?                            # scheme is optional
        (?:(?:www|m|music)\.)?                    # common sub-domains
        (?:youtu\.be|youtube(?:-nocookie)?\.com)/
        (?:watch\?(?:[^\s\#]*&)?v=|embed/|shorts/|live/|v/)?
        ([A-Za-z0-9_-]{11})                       # group 1: the video ID
        (?![A-Za-z0-9_-])                         # exactly 11 chars, not a longer path word
    )
    |(^([A-Za-z0-9_-]{11})$)                      # group 3: the input is just an ID
    """
)


def _extract_video_id(s: str) -> Optional[str]:
    """
    Pull the 11-character video ID out of a bare ID or a YouTube URL.

    Recognised URL shapes: youtu.be/<id>, and youtube.com (optionally with a
    www./m./music. prefix or the -nocookie domain) followed by watch?v=<id>,
    embed/<id>, shorts/<id>, live/<id> or v/<id>. In a watch URL the `v`
    parameter may appear after other query parameters. Surrounding whitespace
    is ignored.

    Returns None for empty input or when no ID can be recognised. A path
    segment that merely starts with 11 ID-like characters (for example
    ".../subscriptions") is not mistaken for an ID.
    """
    if not s:
        return None
    found = _YT_ID_RX.search(s.strip())
    if found is None:
        return None
    # Exactly one of the two capture groups is set on a successful match.
    return found.group(1) or found.group(3)
# ----------------------------
async def _geocode_city(city: str) -> Optional[Tuple[float, float]]:
    """
    Look up a city name with the Open-Meteo geocoding search endpoint.

    Only the first hit is used. Returns (latitude, longitude) as floats, or
    None when the response has no "results". An HTTP error status raises via
    raise_for_status().
    """
    response = await client.get(
        "https://geocoding-api.open-meteo.com/v1/search",
        params={"name": city, "count": 1},
    )
    response.raise_for_status()
    payload = response.json()
    if not payload.get("results"):
        return None
    top_hit = payload["results"][0]
    latitude, longitude = top_hit["latitude"], top_hit["longitude"]
    return float(latitude), float(longitude)
# ----------------------------
# 2) Weather now — Open-Meteo
# ----------------------------
async def get_weather(city: str) -> Dict[str, Any]:
    """
    Current weather by city name.

    The city is resolved to coordinates with _geocode_city() and the current
    conditions are then requested from the Open-Meteo forecast endpoint.

    Returns: {city, temperature_c, precipitation_mm, wind_kph, note, _receipt}
    On failure (blank or unknown city, HTTP/network error, undecodable
    response) returns {error, _receipt} instead of raising, matching the
    error style of the other functions in this module.
    """
    name = (city or "").strip()
    if not name:
        return {"error": "City name is required.", "_receipt": receipt()}

    try:
        geo = await _geocode_city(name)
        if not geo:
            return {"error": f"City not found: {city}", "_receipt": receipt()}
        lat, lon = geo
        # No unit parameters are sent, so the values come back in the API's
        # default units (documented by Open-Meteo as °C, mm and km/h).
        r = await client.get(
            "https://api.open-meteo.com/v1/forecast",
            params={
                "latitude": lat,
                "longitude": lon,
                "current": "temperature_2m,precipitation,wind_speed_10m",
            },
        )
        r.raise_for_status()
        # `or {}` also covers a response where "current" is present but null.
        cur = (r.json() or {}).get("current") or {}
    except (httpx.HTTPError, ValueError) as e:
        # ValueError covers a body that is not valid JSON.
        return {"error": f"Weather request failed: {e}", "_receipt": receipt()}

    precip = cur.get("precipitation")
    # A missing precipitation value is treated as "none reported".
    note = "Carry an umbrella." if (precip or 0) > 0 else "No precipitation reported."
    return {
        "city": city,
        "temperature_c": cur.get("temperature_2m"),
        "precipitation_mm": precip,
        "wind_kph": cur.get("wind_speed_10m"),
        "note": note,
        "_receipt": receipt(),
    }
# ----------------------------
# 3) Currency rate — Frankfurter
# ----------------------------
async def fx_rate(base: str, target: str) -> Dict[str, Any]:
    """
    Get latest FX rate between two currencies (e.g., base='MYR', target='USD').

    Codes are trimmed and upper-cased before use.

    Returns: {base, target, rate, date, _receipt}
    On failure (blank code, HTTP/network error, undecodable response, or a
    response that carries no rate for the target) returns {error, _receipt}
    instead of raising.
    """
    src = (base or "").strip().upper()
    dst = (target or "").strip().upper()
    if not src or not dst:
        return {
            "error": "Both base and target currency codes are required.",
            "_receipt": receipt(),
        }

    try:
        r = await client.get(
            "https://api.frankfurter.app/latest",
            params={"from": src, "to": dst},
        )
        r.raise_for_status()
        data = r.json() or {}
    except (httpx.HTTPError, ValueError) as e:
        # ValueError covers a body that is not valid JSON.
        return {"error": f"FX request failed ({src}->{dst}): {e}", "_receipt": receipt()}

    rate = (data.get("rates") or {}).get(dst)
    if rate is None:
        # Do not report success with an empty rate.
        return {"error": f"No rate available for {src}->{dst}", "_receipt": receipt()}
    return {
        "base": src,
        "target": dst,
        "rate": rate,
        "date": data.get("date"),
        "_receipt": receipt(),
    }
# ----------------------------
# 4) Scholarly search — OpenAlex
# ----------------------------
def _openalex_summary(w: Dict[str, Any]) -> Dict[str, Any]:
    """Reduce one OpenAlex work record to {title, first_author, year, url}."""
    # Year: prefer the explicit field, otherwise derive it from the full date.
    year = w.get("publication_year")
    if not year:
        published = w.get("publication_date")
        if published:
            try:
                year = dateparser.parse(published).year
            except (ValueError, OverflowError, TypeError):
                year = None

    # Prefer the OpenAlex work URL; fall back to the DOI, then to the landing
    # page of the primary location (which may be null in the record).
    location = w.get("primary_location") or {}
    link = w.get("id") or w.get("doi") or location.get("landing_page_url")

    # First author (if present); either level may be missing or null.
    first_author = None
    authorships = w.get("authorships") or []
    if authorships:
        first_author = (authorships[0].get("author") or {}).get("display_name")

    return {
        "title": w.get("title"),
        "first_author": first_author,
        "year": year,
        "url": link,
    }


async def openalex_search(query: str, limit: int = 3) -> dict:
    """
    Search scholarly works on OpenAlex.

    `limit` is clamped to the range 1..10.

    Returns: {"results": [{title, first_author, year, url}], "_receipt": {...}}
    On a failed request returns {"error": ..., "_receipt": {...}}.
    """
    url = "https://api.openalex.org/works"
    # "per-page" (with a hyphen) is the paging parameter documented by OpenAlex.
    params = {"search": query, "per-page": max(1, min(limit, 10))}
    try:
        r = await client.get(url, params=params)
        r.raise_for_status()
        data = r.json() or {}
    except Exception as e:  # deliberate catch-all: report any failure as an error payload
        return {"error": f"OpenAlex request failed: {e}", "_receipt": receipt()}

    results = [_openalex_summary(w) for w in (data.get("results") or [])]
    return {"results": results, "_receipt": receipt()}
# ----------------------------
# 5) DOI → formatted citation — Crossref (content negotiation)
# ----------------------------
# Friendly style names mapped to the CSL style identifiers sent in the Accept
# header; names that are not listed are passed through in lower case.
_CSL_STYLE_ALIASES = {
    "mla": "modern-language-association",
    "chicago": "chicago-author-date",
}


async def format_citation(doi: str, style: str = "APA") -> Dict[str, Any]:
    """
    Get a formatted citation for a DOI. Styles commonly: APA, MLA, IEEE, Chicago.

    The DOI may be given bare ("10.1000/xyz"), with a "doi:" prefix, or as a
    https://doi.org/ URL. The citation is requested through DOI content
    negotiation (Accept: text/x-bibliography; style=<csl-style>).

    Returns: {doi, style, formatted, _receipt}
    On failure (empty DOI, network error, HTTP status >= 400) returns
    {error, _receipt} instead of raising.
    """
    from urllib.parse import quote

    # Strip a resolver URL or "doi:" prefix so the DOI is not doubled in the URL.
    bare_doi = re.sub(
        r"(?i)^(?:https?://(?:dx\.)?doi\.org/|doi:\s*)", "", (doi or "").strip()
    )
    if not bare_doi:
        return {"error": "DOI is required.", "_receipt": receipt()}

    style_key = (style or "APA").strip().lower()
    csl_style = _CSL_STYLE_ALIASES.get(style_key, style_key)
    headers = {"Accept": f"text/x-bibliography; style={csl_style}"}
    # Percent-encode characters such as '#' or '?' that would otherwise be read
    # as URL syntax; '/' stays literal because it separates DOI prefix and suffix.
    url = f"https://doi.org/{quote(bare_doi, safe='/')}"

    try:
        # doi.org answers with a redirect, and httpx does not follow redirects
        # unless asked to — without this the redirect body would be returned.
        r = await client.get(url, headers=headers, follow_redirects=True)
    except httpx.HTTPError as e:
        return {"error": f"Citation request failed: {e}", "_receipt": receipt()}

    if r.status_code >= 400:
        return {"error": f"DOI not found or unsupported: {doi}", "_receipt": receipt()}
    return {
        "doi": bare_doi,
        "style": style_key.upper(),
        "formatted": r.text.strip(),
        "_receipt": receipt(),
    }
# ----------------------------
# 6) Campus lookup — local CSV
# ----------------------------
def campus_lookup(course_code: str) -> Dict[str, Any]:
    """
    Look up a course by code from campus_courses.csv.

    Matching ignores case and surrounding whitespace on both the argument and
    the CSV values; the first matching row wins.

    Returns: {course_code, course, lecturer, room, time, _receipt}
    Empty CSV cells are returned as None. Returns {error, _receipt} when the
    CSV is not loaded, has no "course_code" column, or no row matches.
    """
    if _courses_df is None:
        return {"error": "campus_courses.csv not loaded.", "_receipt": receipt()}
    if "course_code" not in _courses_df.columns:
        return {
            "error": "campus_courses.csv has no 'course_code' column.",
            "_receipt": receipt(),
        }

    code = (course_code or "").strip().upper()
    column = _courses_df["course_code"]
    # astype(str) keeps the comparison working when pandas inferred a
    # non-string dtype; notna() stops empty cells (which become "nan") matching.
    normalized = column.astype(str).str.strip().str.upper()
    matches = _courses_df[column.notna() & (normalized == code)]
    if matches.empty:
        return {"error": f"Course not found: {course_code}", "_receipt": receipt()}

    def _plain(value: Any) -> Any:
        """Turn missing cells into None and NumPy scalars into Python values."""
        if pd.isna(value):
            return None
        return value.item() if hasattr(value, "item") else value

    rec = matches.iloc[0].to_dict()
    return {
        "course_code": _plain(rec.get("course_code")),
        "course": _plain(rec.get("course")),
        "lecturer": _plain(rec.get("lecturer")),
        "room": _plain(rec.get("room")),
        "time": _plain(rec.get("time")),
        "_receipt": receipt(),
    }
# ----------------------------
# 7) Clash checker — local CSVs
# ----------------------------
def timetable_conflicts() -> dict:
    """
    Detect room-time conflicts in timetable.csv.

    Two rows conflict when they share room and weekday (compared ignoring
    case and surrounding whitespace) and their start/end intervals strictly
    overlap; back-to-back slots do not conflict.

    Returns: {"conflicts": [{course_code, room, weekday, start, end,
    conflict_with: {course_code, start, end}}], "skipped_rows": [...],
    "_receipt": {...}}. Conflicts are listed in row order. "skipped_rows"
    holds the 0-based indices of data rows left out of the check because room
    or weekday is empty or a time is not in HH:MM form.
    Returns {error, _receipt} when the CSV is not loaded or lacks a column.
    """
    from itertools import combinations

    if _timetable_df is None:
        return {"error": "timetable.csv not loaded.", "_receipt": receipt()}

    required = ("course_code", "room", "weekday", "start", "end")
    missing = [name for name in required if name not in _timetable_df.columns]
    if missing:
        return {
            "error": f"timetable.csv is missing column(s): {', '.join(missing)}",
            "_receipt": receipt(),
        }

    # Bucket rows by (room, weekday) so only rows that can clash are compared,
    # and parse each time once instead of once per pair.
    buckets: Dict[Tuple[str, str], list] = {}
    skipped = []
    for idx, row in enumerate(_timetable_df.to_dict("records")):
        room, weekday = row["room"], row["weekday"]
        if pd.isna(room) or pd.isna(weekday):
            skipped.append(idx)
            continue
        try:
            begin = _parse_hhmm(str(row["start"]).strip())
            finish = _parse_hhmm(str(row["end"]).strip())
        except ValueError:
            # One malformed row must not abort the whole check.
            skipped.append(idx)
            continue
        day = str(weekday).strip()
        key = (str(room).strip().casefold(), day.casefold())
        buckets.setdefault(key, []).append((idx, begin, finish, day, row))

    found = []
    for slots in buckets.values():
        for first, second in combinations(slots, 2):
            i, a0, a1, a_day, a = first
            j, b0, b1, _, b = second
            # Strict overlap on (hour, minute) tuples — same rule as _overlap().
            if max(a0, b0) < min(a1, b1):
                found.append(
                    (
                        i,
                        j,
                        {
                            "course_code": a["course_code"],
                            "room": a["room"],
                            "weekday": a_day,
                            "start": a["start"],
                            "end": a["end"],
                            "conflict_with": {
                                "course_code": b["course_code"],
                                "start": b["start"],
                                "end": b["end"],
                            },
                        },
                    )
                )

    # Restore row order (first row index, then second) across buckets.
    found.sort(key=lambda item: (item[0], item[1]))
    return {
        "conflicts": [conflict for _, _, conflict in found],
        "skipped_rows": skipped,
        "_receipt": receipt(),
    }
if __name__ == "__main__":
    # NOTE(review): no tool registration (e.g. an @mcp.tool decorator) is
    # visible on the functions in this view of the file — confirm that the
    # functions are registered with `mcp` before relying on this server.
    # The listening port comes from the PORT environment variable, default 7860.
    listen_port = int(os.getenv("PORT", "7860"))
    mcp.run(transport="http", host="0.0.0.0", port=listen_port, path="/mcp")