# MCP_SERVER_01 / app.py
# thalhathai's picture
# Add application file
# e78c46c
import os
import uuid
from datetime import datetime, timezone
from fastmcp import FastMCP
from typing import List, Dict, Any, Optional, Tuple
import httpx
import pandas as pd
from dateutil import parser as dateparser
import feedparser
from youtube_transcript_api import (
YouTubeTranscriptApi,
TranscriptsDisabled,
NoTranscriptFound,
VideoUnavailable,
)
import re
# ----------------------------
# App & HTTP client
# ----------------------------
# MCP server instance named "CampusTools"; the @mcp.tool decorators in this
# file attach to it.
mcp = FastMCP("CampusTools")
# Timeout value handed to the shared HTTP client below.
HTTP_TIMEOUT = 20.0
# One module-level AsyncClient, created at import time and reused by the
# network-backed tools; every request carries the CampusTools User-Agent.
client = httpx.AsyncClient(
timeout=HTTP_TIMEOUT, headers={"User-Agent": "CampusTools/1.0"}
)
def receipt() -> Dict[str, Any]:
    """Build the provenance stamp attached to tool responses.

    Returns:
        A dict with ``tool_used`` (always True), ``server_time`` (the current
        UTC time as an ISO-8601 string) and ``request_id`` (a fresh random
        UUID4 rendered as a string).
    """
    now_utc = datetime.now(timezone.utc)
    stamp: Dict[str, Any] = {"tool_used": True}
    stamp["server_time"] = now_utc.isoformat()
    stamp["request_id"] = str(uuid.uuid4())
    return stamp
# ----------------------------
# CSVs pre-load (fail-soft)
# ----------------------------
# Directory that contains this file; both CSVs are looked up next to it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
COURSES_CSV = os.path.join(BASE_DIR, "campus_courses.csv")
TIMETABLE_CSV = os.path.join(BASE_DIR, "timetable.csv")
# Module-level DataFrame caches. They start as None and stay None when the
# corresponding CSV could not be read.
_courses_df: Optional[pd.DataFrame] = None
_timetable_df: Optional[pd.DataFrame] = None
def _load_csvs():
    """Populate the module-level DataFrame caches from the two CSV files.

    Loading is fail-soft: a file that cannot be read leaves its cache set to
    None instead of raising, so the server can still start.
    """
    global _courses_df, _timetable_df

    def _read_or_none(csv_path):
        # Any read/parse problem is deliberately reduced to "not loaded".
        try:
            return pd.read_csv(csv_path)
        except Exception:
            return None

    _courses_df = _read_or_none(COURSES_CSV)
    _timetable_df = _read_or_none(TIMETABLE_CSV)
# Load CSVs at startup (runs at import time). _load_csvs() swallows read
# errors, so a missing file leaves the matching cache as None rather than
# aborting the import.
_load_csvs()
# ----------------------------
# Utilities
# ----------------------------
def _norm_country(code: str) -> str:
return (code or "").strip().upper()
def _parse_hhmm(s: str) -> Tuple[int, int]:
hh, mm = s.split(":")
return int(hh), int(mm)
def _overlap(a_start: str, a_end: str, b_start: str, b_end: str) -> bool:
    """Tell whether two ``"HH:MM"`` intervals strictly overlap.

    Intervals that merely touch (one ends exactly when the other starts) do
    not count as overlapping.
    """
    first_begin = _parse_hhmm(a_start)
    first_finish = _parse_hhmm(a_end)
    second_begin = _parse_hhmm(b_start)
    second_finish = _parse_hhmm(b_end)
    # (hour, minute) tuples compare chronologically, so max/min pick the
    # later start and the earlier end.
    later_begin = max(first_begin, second_begin)
    earlier_finish = min(first_finish, second_finish)
    return later_begin < earlier_finish
# Accept bare IDs or URLs
_YT_ID_RX = re.compile(
r"""(?ix)
(?:https?://(?:www\.)?youtu(?:\.be|be\.com)/(?:watch\?v=|embed/|shorts/)?([A-Za-z0-9_-]{11}))
|(^([A-Za-z0-9_-]{11})$)
"""
)
def _extract_video_id(s: str) -> Optional[str]:
if not s:
return None
m = _YT_ID_RX.search(s.strip())
if not m:
return None
return (m.group(1) or m.group(3) or "").strip()
# ----------------------------
async def _geocode_city(city: str) -> Optional[Tuple[float, float]]:
url = "https://geocoding-api.open-meteo.com/v1/search"
r = await client.get(url, params={"name": city, "count": 1})
r.raise_for_status()
data = r.json()
if data.get("results"):
lat = data["results"][0]["latitude"]
lon = data["results"][0]["longitude"]
return float(lat), float(lon)
return None
# ----------------------------
# 2) Weather now — Open-Meteo
# ----------------------------
@mcp.tool
async def get_weather(city: str) -> Dict[str, Any]:
    """
    Current weather by city name (Open-Meteo).

    Returns: {city, temperature_c, precipitation_mm, wind_kph, note, _receipt}
    on success, or {error, _receipt} when the city cannot be resolved or an
    upstream request fails.
    """
    # Upstream failures are reported as an error payload (same shape the
    # other tools use) instead of escaping as an exception.
    try:
        geo = await _geocode_city(city)
    except (httpx.HTTPError, ValueError) as e:
        return {"error": f"Geocoding request failed: {e}", "_receipt": receipt()}
    if not geo:
        return {"error": f"City not found: {city}", "_receipt": receipt()}
    lat, lon = geo
    url = "https://api.open-meteo.com/v1/forecast"
    params = {
        "latitude": lat,
        "longitude": lon,
        "current": "temperature_2m,precipitation,wind_speed_10m",
    }
    try:
        r = await client.get(url, params=params)
        r.raise_for_status()
        payload = r.json()
    except (httpx.HTTPError, ValueError) as e:
        # ValueError covers a body that is not valid JSON.
        return {"error": f"Weather request failed: {e}", "_receipt": receipt()}
    cur = (payload or {}).get("current") or {}
    temp = cur.get("temperature_2m")
    precip = cur.get("precipitation")
    wind = cur.get("wind_speed_10m")
    # A missing precipitation reading is treated as "none reported".
    note = "Carry an umbrella." if (precip or 0) > 0 else "No precipitation reported."
    return {
        "city": city,
        "temperature_c": temp,
        "precipitation_mm": precip,
        "wind_kph": wind,
        "note": note,
        "_receipt": receipt(),
    }
# ----------------------------
# 3) Currency rate — Frankfurter
# ----------------------------
@mcp.tool
async def fx_rate(base: str, target: str) -> Dict[str, Any]:
    """
    Get latest FX rate between two currencies (e.g., base='MYR', target='USD').

    Currency codes are trimmed and upper-cased before use.

    Returns: {base, target, rate, date, _receipt} on success (rate is None
    when the response carries no entry for the target), or {error, _receipt}
    when a code is blank or the upstream request fails.
    """
    # Normalise once instead of calling .upper() at every use site.
    base_code = (base or "").strip().upper()
    target_code = (target or "").strip().upper()
    if not base_code or not target_code:
        return {
            "error": "Both base and target currency codes are required.",
            "_receipt": receipt(),
        }
    url = "https://api.frankfurter.app/latest"
    params = {"from": base_code, "to": target_code}
    try:
        r = await client.get(url, params=params)
        r.raise_for_status()
        data = r.json()
    except (httpx.HTTPError, ValueError) as e:
        # ValueError covers a body that is not valid JSON.
        return {"error": f"FX request failed: {e}", "_receipt": receipt()}
    data = data or {}
    rate = (data.get("rates") or {}).get(target_code)
    return {
        "base": base_code,
        "target": target_code,
        "rate": rate,
        "date": data.get("date"),
        "_receipt": receipt(),
    }
# ----------------------------
# 4) Scholarly search — OpenAlex
# ----------------------------
def _openalex_work_summary(w: Dict[str, Any]) -> Dict[str, Any]:
    """Reduce one OpenAlex work record to {title, first_author, year, url}."""
    year = w.get("publication_year")
    if not year:
        # Work records carry the full date under `publication_date`;
        # `from_publication_date` is a filter name and never appears on a work.
        raw_date = w.get("publication_date")
        if raw_date:
            try:
                year = dateparser.parse(raw_date).year
            except (ValueError, OverflowError, TypeError):
                year = None

    # Prefer the OpenAlex work URL, then the DOI, then the primary location.
    # Both `primary_location` and its `source` may be null in the payload.
    location = w.get("primary_location") or {}
    source = location.get("source") or {}
    url_w = (
        w.get("id")
        or w.get("doi")
        or location.get("landing_page_url")
        or source.get("url")
    )

    first_author = None
    authorships = w.get("authorships") or []
    if authorships:
        author = (authorships[0] or {}).get("author") or {}
        first_author = author.get("display_name")

    return {
        "title": w.get("title"),
        "first_author": first_author,
        "year": year,
        "url": url_w,
    }


@mcp.tool
async def openalex_search(query: str, limit: int = 3) -> dict:
    """
    Search scholarly works on OpenAlex.

    `limit` is clamped to the range 1..10.

    Returns: {"results": [{title, first_author, year, url}], "_receipt": {...}}
    or {"error": ..., "_receipt": {...}} when the request fails.
    """
    url = "https://api.openalex.org/works"
    params = {"search": query, "per_page": max(1, min(limit, 10))}
    try:
        r = await client.get(url, params=params)
        r.raise_for_status()
        data = r.json()
    except Exception as e:
        return {"error": f"OpenAlex request failed: {e}", "_receipt": receipt()}
    works = (data or {}).get("results") or []
    results = [_openalex_work_summary(w) for w in works]
    return {"results": results, "_receipt": receipt()}
# ----------------------------
# 5) DOI → formatted citation — Crossref (content negotiation)
# ----------------------------
@mcp.tool
async def format_citation(doi: str, style: str = "APA") -> Dict[str, Any]:
    """
    Get a formatted citation for a DOI. Styles commonly: APA, MLA, IEEE, Chicago.

    The DOI may be given bare ("10.1000/xyz"), with a "doi:" prefix, or as a
    full https://doi.org/... URL.

    Returns: {doi, style, formatted, _receipt} on success, or
    {error, _receipt} when the DOI is blank, unknown, or the request fails.
    """
    # Short names advertised above mapped to CSL style identifiers.
    # NOTE(review): ids taken from the CSL style repository naming; confirm
    # that the content-negotiation service still accepts them.
    csl_aliases = {
        "mla": "modern-language-association",
        "chicago": "chicago-author-date",
    }
    doi_clean = re.sub(
        r"(?i)^(?:https?://(?:dx\.)?doi\.org/|doi:\s*)", "", (doi or "").strip()
    )
    if not doi_clean:
        return {"error": "DOI is required.", "_receipt": receipt()}
    style_key = (style or "APA").strip().lower()
    style_header = csl_aliases.get(style_key, style_key)
    # Content negotiation via doi.org, e.g. Accept: text/x-bibliography; style=apa
    headers = {"Accept": f"text/x-bibliography; style={style_header}"}
    url = f"https://doi.org/{doi_clean}"
    try:
        # doi.org answers with a redirect to the registration agency. httpx
        # does not follow redirects unless asked, and a 3xx would otherwise
        # pass the status check below and be returned as the "citation".
        r = await client.get(url, headers=headers, follow_redirects=True)
    except httpx.HTTPError as e:
        return {"error": f"Citation request failed: {e}", "_receipt": receipt()}
    if r.status_code >= 400:
        return {"error": f"DOI not found or unsupported: {doi}", "_receipt": receipt()}
    return {
        "doi": doi_clean,
        "style": style_key.upper(),
        "formatted": r.text.strip(),
        "_receipt": receipt(),
    }
# ----------------------------
# 6) Campus lookup — local CSV
# ----------------------------
@mcp.tool
def campus_lookup(course_code: str) -> Dict[str, Any]:
    """
    Look up a course by code from campus_courses.csv.

    Matching ignores case and surrounding whitespace on both the query and
    the CSV values. Empty CSV cells are returned as None.

    Returns: {course_code, course, lecturer, room, time, _receipt}, or
    {error, _receipt} when the CSV is unavailable, lacks a course_code
    column, or has no matching row.
    """
    if _courses_df is None:
        return {"error": "campus_courses.csv not loaded.", "_receipt": receipt()}
    if "course_code" not in _courses_df.columns:
        return {
            "error": "campus_courses.csv has no course_code column.",
            "_receipt": receipt(),
        }
    code = (course_code or "").strip().upper()
    codes = _courses_df["course_code"]
    # astype(str) tolerates a column pandas parsed as numeric; notna() keeps
    # empty cells from matching a query for the literal text "NAN".
    mask = codes.notna() & (codes.astype(str).str.strip().str.upper() == code)
    row = _courses_df[mask]
    if row.empty:
        return {"error": f"Course not found: {course_code}", "_receipt": receipt()}
    rec = row.iloc[0].to_dict()

    def _cell(key: str) -> Any:
        # Missing columns and empty (NaN) cells both surface as None.
        value = rec.get(key)
        return None if pd.isna(value) else value

    fields = ("course_code", "course", "lecturer", "room", "time")
    result: Dict[str, Any] = {name: _cell(name) for name in fields}
    result["_receipt"] = receipt()
    return result
# ----------------------------
# 7) Clash checker — local CSVs
# ----------------------------
@mcp.tool
def timetable_conflicts() -> dict:
    """
    Detect room-time conflicts in timetable.csv.

    Two entries conflict when they share room and weekday and their
    start/end intervals strictly overlap (back-to-back slots do not).

    Returns: {"conflicts": [{course_code, room, weekday, start, end,
    conflict_with: {course_code, start, end}}], "_receipt": {...}}, or
    {"error": ..., "_receipt": {...}} when timetable.csv is not loaded or
    lacks a required column.
    """
    if _timetable_df is None:
        return {"error": "timetable.csv not loaded.", "_receipt": receipt()}
    required = ("course_code", "room", "weekday", "start", "end")
    missing = [col for col in required if col not in _timetable_df.columns]
    if missing:
        return {
            "error": f"timetable.csv is missing columns: {', '.join(missing)}",
            "_receipt": receipt(),
        }
    df = _timetable_df.copy()
    df["weekday"] = df["weekday"].str.strip()
    # Materialise the rows once; indexing the frame with .iloc inside the
    # nested loop would rebuild a Series for every comparison.
    rows = df.to_dict("records")
    conflicts = []
    # Visit each unordered pair once, in the same (i, j) order as the file.
    for i, a in enumerate(rows):
        for b in rows[i + 1:]:
            if a["room"] != b["room"] or a["weekday"] != b["weekday"]:
                continue
            if not _overlap(a["start"], a["end"], b["start"], b["end"]):
                continue
            conflicts.append(
                {
                    "course_code": a["course_code"],
                    "room": a["room"],
                    "weekday": a["weekday"],
                    "start": a["start"],
                    "end": a["end"],
                    "conflict_with": {
                        "course_code": b["course_code"],
                        "start": b["start"],
                        "end": b["end"],
                    },
                }
            )
    return {"conflicts": conflicts, "_receipt": receipt()}
if __name__ == "__main__":
    # Script entry point: serve the MCP app over HTTP at /mcp on all
    # interfaces. The PORT environment variable overrides the default 7860.
    port = int(os.getenv("PORT", 7860))
    mcp.run(
        transport="http",
        host="0.0.0.0",
        port=port,
        path="/mcp",
    )