Spaces:
Sleeping
Sleeping
| """FastAPI backend for Northwestern CS Kiosk - API only (no frontend).""" | |
| from __future__ import annotations | |
| import csv | |
| import json | |
| import logging | |
| import os | |
| import threading | |
| import time | |
| import warnings | |
| try: | |
| from huggingface_hub import CommitScheduler, hf_hub_download | |
| except ImportError: | |
| CommitScheduler = None # type: ignore | |
| hf_hub_download = None # type: ignore | |
| from functools import lru_cache | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| from fastapi import FastAPI, HTTPException, Query | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from .data import load_default_catalog | |
| from .tools import ( | |
| AnalysisEngine, | |
| FacultyByTopicBlueprint, | |
| LocationBlueprint, | |
| CenterBlueprint, | |
| AdvisorshipBlueprint, | |
| PersonLookupBlueprint, | |
| StaffSupportBlueprint, | |
| UpcomingEventsBlueprint, | |
| OfficeHoursBlueprint, | |
| BlueprintResult, | |
| ) | |
| from .responders import LLMResponder, Responder | |
| from .providers import ( | |
| BaseLLM, | |
| ProviderConfig, | |
| available_providers, | |
| get_provider, | |
| normalize_provider_name, | |
| ) | |
| from .data.utils import canonicalize_name | |
| from .mcp import ( | |
| Action, | |
| PlannerContext, | |
| LLMActionPlanner, | |
| ) | |
| from .mcp.tool_schemas import get_all_tool_schemas | |
| from .mcp.tool_executor import ToolExecutor | |
| from .mcp.context_resolver import ( | |
| is_affirmation, | |
| resolve as resolve_context, | |
| strip_context_on_topic_switch, | |
| ) | |
# Filesystem layout, all derived from the directory containing this module.
BASE_DIR = Path(__file__).resolve().parent
# Source data lives in an "Archive" folder next to the package directory.
ARCHIVE_DIR = BASE_DIR.parent.joinpath("Archive")
# Runtime logs are written under "storage"; the folder is created at import.
DATA_DIR = BASE_DIR.joinpath("storage")
DATA_DIR.mkdir(parents=True, exist_ok=True)
HISTORY_FILE = DATA_DIR.joinpath("chat_history.jsonl")
USAGE_FILE = DATA_DIR.joinpath("usage_metrics.jsonl")
# Session id used when a request does not supply one.
DEFAULT_SESSION = "default"
# The FastAPI application object (referenced by main() as "backend.main:app").
app = FastAPI(
    title="Northwestern CS Kiosk API",
    description="REST API for the Northwestern CS Department Kiosk",
    version="1.0.0",
)
# Enable CORS for external integrations
# NOTE(review): every origin is allowed while credentials are also enabled;
# confirm this is intended before exposing the API beyond the kiosk network.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure as needed for your integration
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Held by the query endpoint while it selects a responder and answers.
_orchestrator_lock = threading.Lock()
logger = logging.getLogger(__name__)
# Set to the CommitScheduler by _maybe_start_hf_sync(); None until then.
_hf_scheduler = None
# Names scraped from the Archive folder; filled in by _load_entity_names().
_entity_names: List[str] = []
def _load_entity_names() -> None:
    """Scrape entity names from the Archive folder and store them in memory.

    Every ``.csv`` and ``.txt`` file directly inside ``ARCHIVE_DIR`` is read.
    The de-duplicated, sorted names are stored in the module-level
    ``_entity_names`` list. The function is best-effort: a file that cannot
    be read is logged and skipped, and any other failure leaves
    ``_entity_names`` empty instead of raising.
    """
    global _entity_names

    def _cell(row: Dict[str, Any], column: str) -> str:
        # DictReader fills missing trailing columns with None, so a short row
        # must not be allowed to break ``.strip()``.
        return (row.get(column) or "").strip()

    def _extract_names_from_csv(filepath: Path) -> List[str]:
        """Return the names found in one CSV file (possibly empty)."""
        names: List[str] = []
        try:
            # utf-8-sig drops a leading BOM so the first header still matches;
            # newline="" is what the csv module expects for file objects.
            with open(filepath, "r", encoding="utf-8-sig", newline="") as f:
                reader = csv.DictReader(f)
                if reader.fieldnames is None:
                    return names
                # Either one full-name column, or [last, first] when the
                # file splits the name across two columns.
                name_columns: List[str] = []
                for field in reader.fieldnames:
                    field_lower = field.lower()
                    if field_lower in ("name", "assignee name"):
                        name_columns = [field]
                        break
                    if field_lower == "first name":
                        name_columns.append(field)
                    elif field_lower == "last name":
                        name_columns.insert(0, field)
                for row in reader:
                    if len(name_columns) == 1:
                        name = _cell(row, name_columns[0])
                        if name and name.upper() != "NA":
                            names.append(name)
                    elif len(name_columns) == 2:
                        last_name = _cell(row, name_columns[0])
                        first_name = _cell(row, name_columns[1])
                        if not (last_name or first_name):
                            continue
                        if last_name.upper() == "NA" or first_name.upper() == "NA":
                            continue
                        if last_name and first_name:
                            full_name = f"{first_name} {last_name}".strip()
                        else:
                            full_name = first_name or last_name
                        if full_name:
                            names.append(full_name)
        except Exception as e:
            logger.warning("Error reading CSV %s: %s", filepath, e)
        return names

    def _extract_names_from_text(filepath: Path) -> List[str]:
        """Return the values of every ``Name:`` line in a text file."""
        prefix = "Name:"
        names: List[str] = []
        try:
            with open(filepath, "r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if line.startswith(prefix):
                        # Drop only the leading marker; a later "Name:" inside
                        # the value is part of the data.
                        name = line[len(prefix):].strip()
                        if name:
                            names.append(name)
        except Exception as e:
            logger.warning("Error reading text file %s: %s", filepath, e)
        return names

    try:
        archive_dir = ARCHIVE_DIR
        if not archive_dir.exists():
            logger.warning("Archive directory not found at %s", archive_dir)
            _entity_names = []
            return
        all_names: set = set()
        file_count = 0
        for filepath in sorted(archive_dir.iterdir()):
            if not filepath.is_file():
                continue
            suffix = filepath.suffix.lower()
            if suffix == ".csv":
                all_names.update(_extract_names_from_csv(filepath))
                file_count += 1
            elif suffix == ".txt":
                all_names.update(_extract_names_from_text(filepath))
                file_count += 1
        _entity_names = sorted(all_names)
        logger.info("Scraped %d unique entity names from %d files in Archive", len(_entity_names), file_count)
    except Exception as e:
        logger.error("Failed to scrape entity names from Archive: %s", e)
        _entity_names = []


# Populate the name list once, at import time.
_load_entity_names()
class QueryPayload(BaseModel):
    """Body accepted by the /api/query endpoint.

    Only ``question`` is mandatory; the other two fields default to ``None``.
    """

    # Text of the question to answer.
    question: str
    # Conversation to attach the question to.
    session_id: Optional[str] = None
    # LLM provider requested for this call.
    provider: Optional[str] = None
# Per-provider configuration. "api_key", "model" and "base_url" hold the NAMES
# of the environment variables to read (None when the provider has no such
# setting); "default_model" is the model used when nothing else is specified.
PROVIDER_ENV_SETTINGS: Dict[str, Dict[str, Optional[str]]] = {
    "claude": dict(
        api_key="ANTHROPIC_API_KEY",
        model="ANTHROPIC_MODEL",
        base_url="ANTHROPIC_BASE_URL",
        default_model="claude-haiku-4-5",
    ),
    "gpt": dict(
        api_key="OPENAI_API_KEY",
        model="OPENAI_MODEL",
        base_url="OPENAI_BASE_URL",
        default_model="gpt-4.1-mini",
    ),
    "gemini": dict(
        api_key="GEMINI_API_KEY",
        model="GEMINI_MODEL",
        base_url="GEMINI_BASE_URL",
        default_model="gemini-2.0-flash",
    ),
    "echo": dict(
        api_key=None,
        model=None,
        base_url=None,
        default_model="echo",
    ),
}
| def _load_env_once() -> None: | |
| """Load environment variables from .env exactly once.""" | |
| if getattr(_load_env_once, "_loaded", False): | |
| return | |
| env_path = os.getenv("KIOSK_ENV_FILE") | |
| if not env_path: | |
| default_path = BASE_DIR / ".env" | |
| env_path = str(default_path) if default_path.exists() else ".env" | |
| try: | |
| from dotenv import load_dotenv | |
| except ImportError: | |
| _load_env_once._loaded = True | |
| return | |
| load_dotenv(env_path, override=False) | |
| _load_env_once._loaded = True | |
| def _get_env_value(name: Optional[str]) -> str: | |
| """ | |
| Read environment variables with an HF Spaces secret fallback. | |
| HF Secrets expose values as HF_<NAME>, so check both keys. | |
| """ | |
| if not name: | |
| return "" | |
| direct = os.getenv(name, "").strip() | |
| if direct: | |
| return direct | |
| return os.getenv(f"HF_{name}", "").strip() | |
def _maybe_download_existing_metrics() -> None:
    """Download existing usage metrics from the HF dataset on startup.

    Does nothing when ``huggingface_hub`` is not installed or
    ``KIOSK_HF_DATASET_REPO`` is empty. On success the downloaded file
    replaces the local ``USAGE_FILE``. Any failure is logged at info level
    and otherwise ignored, leaving the local file as it was.
    """
    if hf_hub_download is None:
        return
    # Load the dotenv file before reading any setting, so a repository that
    # is configured only in .env is not mistaken for "sync disabled".
    _load_env_once()
    repo_id = os.getenv("KIOSK_HF_DATASET_REPO", "").strip()
    if not repo_id:
        return
    token = _get_env_value("KIOSK_HF_TOKEN") or os.getenv("HF_TOKEN", "").strip()
    path_in_repo = os.getenv("KIOSK_HF_DATASET_PATH", "chat_history").strip()
    filename = f"{path_in_repo}/{USAGE_FILE.name}" if path_in_repo else USAGE_FILE.name
    try:
        import shutil

        downloaded = hf_hub_download(
            repo_id=repo_id,
            repo_type="dataset",
            filename=filename,
            token=token or None,
        )
        USAGE_FILE.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy(downloaded, USAGE_FILE)
        logger.info("Downloaded usage metrics from HF: repo=%s file=%s", repo_id, filename)
    except Exception as exc:
        logger.info("No existing metrics to download (starting fresh): %s", exc)
def _maybe_download_existing_history() -> None:
    """Download existing chat history from the HF dataset on startup.

    Does nothing when ``huggingface_hub`` is not installed or
    ``KIOSK_HF_DATASET_REPO`` is empty. On success the downloaded file
    replaces the local ``HISTORY_FILE``. Any failure is logged at info level
    and otherwise ignored, leaving the local file as it was.
    """
    if hf_hub_download is None:
        return
    # Load the dotenv file before reading any setting, so a repository that
    # is configured only in .env is not mistaken for "sync disabled".
    _load_env_once()
    repo_id = os.getenv("KIOSK_HF_DATASET_REPO", "").strip()
    if not repo_id:
        return
    token = _get_env_value("KIOSK_HF_TOKEN") or os.getenv("HF_TOKEN", "").strip()
    path_in_repo = os.getenv("KIOSK_HF_DATASET_PATH", "chat_history").strip()
    filename = f"{path_in_repo}/{HISTORY_FILE.name}" if path_in_repo else HISTORY_FILE.name
    try:
        import shutil

        downloaded = hf_hub_download(
            repo_id=repo_id,
            repo_type="dataset",
            filename=filename,
            token=token or None,
        )
        HISTORY_FILE.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy(downloaded, HISTORY_FILE)
        logger.info(
            "Downloaded chat history from HF dataset: repo=%s file=%s",
            repo_id,
            filename,
        )
    except Exception as exc:
        logger.info("No existing chat history to download (starting fresh): %s", exc)
def _maybe_start_hf_sync() -> None:
    """Start optional HF dataset syncing for chat history and usage metrics.

    A ``CommitScheduler`` is created over ``DATA_DIR``, limited to the
    history and usage files, and kept in the module-level ``_hf_scheduler``.
    Nothing happens when a scheduler already exists, ``huggingface_hub`` is
    not installed, or ``KIOSK_HF_DATASET_REPO`` is empty. A scheduler that
    fails to start produces a warning, not an exception.
    """
    global _hf_scheduler
    if _hf_scheduler is not None or CommitScheduler is None:
        return
    # Load the dotenv file before reading any setting, so a repository that
    # is configured only in .env is not mistaken for "sync disabled".
    _load_env_once()
    repo_id = os.getenv("KIOSK_HF_DATASET_REPO", "").strip()
    if not repo_id:
        return
    token = _get_env_value("KIOSK_HF_TOKEN") or os.getenv("HF_TOKEN", "").strip()
    path_in_repo = os.getenv("KIOSK_HF_DATASET_PATH", "chat_history").strip()
    raw_interval = os.getenv("KIOSK_HF_SYNC_INTERVAL_MINUTES", "10")
    try:
        interval_minutes = float(raw_interval)
    except ValueError:
        # A typo in the interval should not disable syncing altogether.
        logger.warning(
            "Invalid KIOSK_HF_SYNC_INTERVAL_MINUTES=%r; using 10 minutes", raw_interval
        )
        interval_minutes = 10.0
    try:
        _hf_scheduler = CommitScheduler(
            repo_id=repo_id,
            repo_type="dataset",
            folder_path=str(DATA_DIR),
            path_in_repo=path_in_repo,
            token=token or None,
            allow_patterns=[HISTORY_FILE.name, USAGE_FILE.name],
            every=interval_minutes,
        )
        logger.info(
            "Started HF CommitScheduler for chat_history and usage_metrics: repo=%s path=%s interval=%s",
            repo_id, path_in_repo or ".", interval_minutes,
        )
    except Exception as exc:
        warnings.warn(f"Unable to start HF sync: {exc}")
def _run_startup_tasks_in_background() -> None:
    """Kick off the HF download and sync steps on a daemon thread.

    The steps run in order (metrics download, history download, sync start);
    the first exception stops the sequence and is logged as a warning, so
    the caller is never blocked and never sees a failure.
    """

    def _worker() -> None:
        try:
            for step in (
                _maybe_download_existing_metrics,
                _maybe_download_existing_history,
                _maybe_start_hf_sync,
            ):
                step()
        except Exception as exc:
            logger.warning("Background startup tasks failed: %s", exc)

    threading.Thread(target=_worker, daemon=True).start()


# Started at import time so the server does not wait on the network.
_run_startup_tasks_in_background()
| def _is_placeholder(value: Optional[str]) -> bool: | |
| if not value: | |
| return True | |
| lowered = value.strip().lower() | |
| return lowered.startswith("your-") or lowered in {"changeme", "placeholder"} | |
def _build_client_from_env(provider: str, model_override: Optional[str]) -> Optional[BaseLLM]:
    """Create an LLM client for *provider* from environment configuration.

    Returns ``None`` (after a warning) when the provider is unknown, its API
    key is missing or a placeholder, or the provider factory rejects the
    configuration with ``ValueError``. The model is chosen from
    *model_override*, then the provider's model variable, then its default.
    """
    canonical = normalize_provider_name(provider)
    env_names = PROVIDER_ENV_SETTINGS.get(canonical)
    if not env_names:
        warnings.warn(f"Provider '{provider}' not recognized; falling back to echo responder.")
        return None

    timeout_sec = int(os.getenv("KIOSK_LLM_TIMEOUT", "60"))
    # A token cap that is not a plain non-negative integer is treated as unset.
    token_cap_text = os.getenv("KIOSK_LLM_MAX_TOKENS", "").strip()
    token_cap = int(token_cap_text) if token_cap_text.isdigit() else None

    api_env = env_names.get("api_key")
    model_env = env_names.get("model")
    base_url_env = env_names.get("base_url")
    default_model = env_names.get("default_model") or ""

    if not api_env:
        # Providers without a key variable get a fixed dummy key.
        api_key = "local-echo"
    else:
        api_key = _get_env_value(api_env)
        if not api_key or _is_placeholder(api_key):
            warnings.warn(f"{api_env} not set; falling back to echo responder.")
            return None

    chosen_model = model_override
    if not chosen_model and model_env:
        chosen_model = _get_env_value(model_env)
    if not chosen_model:
        chosen_model = default_model

    endpoint = _get_env_value(base_url_env) if base_url_env else ""

    config = ProviderConfig(
        api_key=api_key,
        model=chosen_model,
        timeout_sec=timeout_sec,
        base_url=endpoint or None,
        max_tokens=token_cap,
    )
    try:
        return get_provider(canonical, config)
    except ValueError as exc:
        warnings.warn(str(exc))
        return None
def _build_responder(
    provider: Optional[str],
    model_override: Optional[str],
) -> LLMResponder:
    """Build the responder for *provider*, or an echo responder as fallback.

    With no explicit provider, ``KIOSK_LLM_PROVIDER`` (default
    ``"anthropic"``) is used and ``KIOSK_LLM_MODEL`` supplies the model when
    no override is given. System prompt and style come from
    ``KIOSK_LLM_SYSTEM_PROMPT`` / ``KIOSK_LLM_STYLE`` with built-in defaults.
    """
    _load_env_once()
    prompt = os.getenv(
        "KIOSK_LLM_SYSTEM_PROMPT",
        "You are a conversational receptionist for the Northwestern CS Kiosk whose responses are spoken aloud. Speak naturally and never include stage directions or annotations.",
    )
    guidelines = os.getenv(
        "KIOSK_LLM_STYLE",
        "Be very brief. One or two sentences max. No long lists—summarize top 2-3 items only.",
    )
    provider_name = provider or os.getenv("KIOSK_LLM_PROVIDER", "anthropic")
    # The global model setting only applies to the default provider.
    if not provider and not model_override:
        model_override = os.getenv("KIOSK_LLM_MODEL")

    client = _build_client_from_env(provider_name, model_override)
    canonical = normalize_provider_name(provider_name)
    shared = {"system_prompt": prompt, "style_guidelines": guidelines}
    if client:
        return LLMResponder(client=client, provider_id=canonical, **shared)

    warnings.warn("LLM provider not configured; using echo responder for kiosk responses.")
    return LLMResponder(provider_id="echo", **shared)
def _default_responder_from_env() -> Responder:
    """Return the environment-configured responder, or echo on RuntimeError."""
    try:
        responder: Responder = _build_responder(None, None)
    except RuntimeError as exc:
        warnings.warn(f"Failed to initialize LLM responder: {exc}")
        responder = LLMResponder(provider_id="echo")
    return responder
def _create_planner() -> LLMActionPlanner:
    """Build the LLM action planner from environment configuration.

    ``KIOSK_PLANNER_PROVIDER`` / ``KIOSK_PLANNER_MODEL`` take precedence over
    ``KIOSK_LLM_PROVIDER`` / ``KIOSK_LLM_MODEL``; the provider defaults to
    ``"anthropic"``.

    Raises:
        RuntimeError: when no client can be built for the chosen provider.
    """
    planner_provider = os.getenv("KIOSK_PLANNER_PROVIDER")
    if not planner_provider:
        planner_provider = os.getenv("KIOSK_LLM_PROVIDER", "anthropic")
    planner_model = os.getenv("KIOSK_PLANNER_MODEL") or os.getenv("KIOSK_LLM_MODEL")

    client = _build_client_from_env(planner_provider, planner_model)
    if not client:
        raise RuntimeError("LLM planner requires a configured provider (set KIOSK_LLM_PROVIDER/KEY).")
    return LLMActionPlanner(
        client,
        schemas=get_all_tool_schemas(),
        entity_names=_entity_names,
    )
class ConversationOrchestrator:
    """Glue class that ties planner, executor, and responder together.

    A call to :meth:`answer` plans one or more actions for a question, runs
    them through the tool executor, and renders the outcome with the
    responder. The most recent subject is remembered on the instance.
    """

    def __init__(self, engine: AnalysisEngine, responder: Optional[Responder] = None) -> None:
        """Wire the collaborators around *engine*.

        Args:
            engine: analysis engine handed to the tool executor; its catalog
                is also used to build the faculty/student name lookups.
            responder: responder to use; when omitted one is built from the
                environment.

        Raises:
            RuntimeError: from ``_create_planner`` when no planner provider
                is configured.
        """
        _load_env_once()
        self.engine = engine
        self.responder = responder or _default_responder_from_env()
        self.executor = ToolExecutor(engine)
        self.planner = _create_planner()
        self.last_subject: Optional[str] = None
        # canonical name -> name as written in the catalog
        self._faculty_lookup = self._build_name_lookup("faculty")
        self._student_lookup = self._build_name_lookup("students")
        self.provider_id = getattr(self.responder, "provider_id", None)

    def answer(
        self,
        question: str,
        context: Optional[PlannerContext] = None,
        resolved_input: Optional[Any] = None,
    ) -> Tuple[str, BlueprintResult, Action]:
        """Plan, execute and render an answer for *question*.

        Args:
            question: the (already resolved) user question.
            context: planner context; when omitted a context carrying only
                ``self.last_subject`` is created.
            resolved_input: optional object whose ``resolved_day`` attribute,
                if set, fills in a missing ``day`` on office-hours actions.

        Returns:
            ``(response_text, result, action)``. When several actions were
            planned, *result* and *action* are ``"composite"`` objects that
            merge the individual outcomes.
        """
        if context is None:
            context = PlannerContext(last_subject=self.last_subject)

        if is_affirmation(question) and context.last_subject:
            # A bare "yes" after an offer to look something up is turned
            # into a location lookup for the previous subject.
            last_answer = ""
            if context.short_history:
                last_answer = (context.short_history[-1].get("answer") or "").lower()
            if any(
                x in last_answer
                for x in ("would you like", "look up", "find", "room number", "office")
            ):
                actions = [Action("lookup_location", {"use_last_subject": True})]
            else:
                actions = self.planner.plan(question, context)
        else:
            actions = self.planner.plan(question, context)

        if not actions:
            actions = [Action("noop", {"message": "I'm not sure how to help with that yet."})]

        # Inject resolved day (e.g. "F" -> "friday", "today" -> "wednesday")
        # when the planner returns lookup_office_hours without a day.
        if actions and resolved_input and getattr(resolved_input, "resolved_day", None):
            for act in actions:
                if act.type == "lookup_office_hours" and not act.arguments.get("day"):
                    act.arguments["day"] = resolved_input.resolved_day

        if len(actions) > 1:
            merged_facts: List = []
            merged_notes: List[str] = []
            ran: List[str] = []
            for act in actions:
                ran.append(act.type)
                sub_result = self.executor.execute(act, context)
                merged_facts.extend(sub_result.facts)
                for note in sub_result.notes:
                    if note not in merged_notes:
                        merged_notes.append(note)
            result = BlueprintResult("composite", {}, facts=merged_facts, notes=merged_notes)
            action = Action("composite", {"actions": [a.to_dict() for a in actions], "merged_actions": ran})
        else:
            action = actions[0]
            name_like = None
            if isinstance(action.arguments, dict):
                for key in ("name", "person", "student", "faculty"):
                    val = action.arguments.get(key)
                    if val:
                        name_like = val
                        break
            if name_like:
                result = self.executor.execute(action, context)
                if not result.facts:
                    # Nothing found: retry with the catalog's spelling when
                    # the name maps to exactly one faculty member or student.
                    canonical = canonicalize_name(name_like)
                    faculty_match = self._faculty_lookup.get(canonical)
                    student_match = self._student_lookup.get(canonical)
                    if faculty_match and not student_match:
                        action.arguments.pop("name", None)
                        action.arguments.pop("person", None)
                        action.arguments["faculty"] = faculty_match
                        result = self.executor.execute(action, context)
                    elif student_match and not faculty_match:
                        action.arguments.pop("name", None)
                        action.arguments.pop("person", None)
                        action.arguments["student"] = student_match
                        result = self.executor.execute(action, context)
                    elif faculty_match and student_match:
                        # Ambiguous: ask the user instead of guessing.
                        action = Action(
                            "noop",
                            {
                                "message": (
                                    f"I found both a faculty member and a student named {name_like}. "
                                    "Do you mean the faculty member or the student?"
                                )
                            },
                        )
                        result = BlueprintResult("noop", action.arguments, facts=[], notes=[action.arguments.get("message")])
            else:
                result = self.executor.execute(action, context)

        response_text = self.responder.render(question, result.name, result)

        # Remember who the conversation is about for follow-up questions.
        subject = self._select_subject_from_result(result)
        if subject:
            self.last_subject = subject
        else:
            for key in ("name", "student", "faculty"):
                if key in action.arguments and action.arguments[key]:
                    self.last_subject = action.arguments[key]
                    break
        return response_text, result, action

    def ensure_responder(self, provider: Optional[str], model_override: Optional[str] = None) -> None:
        """Switch the responder when a different provider is requested.

        No-op when *provider* normalizes to the current responder's
        ``provider_id``. When *provider* is empty, the responder is rebuilt
        only if its current ``provider_id`` is ``"unknown"``.
        """
        canonical = normalize_provider_name(provider) if provider else None
        if canonical and canonical == getattr(self.responder, "provider_id", None):
            return
        if not canonical and getattr(self.responder, "provider_id", None) != "unknown":
            return
        self.responder = _build_responder(provider, model_override)
        self.provider_id = getattr(self.responder, "provider_id", canonical)

    @staticmethod
    def _infer_subject(result: BlueprintResult) -> Optional[str]:
        """Return the subject of the first fact, or ``None`` without facts.

        Declared static because it takes no ``self`` yet is invoked as
        ``self._infer_subject(result)``; without the decorator that call
        raises ``TypeError``.
        """
        if not result.facts:
            return None
        return result.facts[0].subject

    def _build_name_lookup(self, entity_name: str) -> Dict[str, str]:
        """Map canonicalized names to the ``Name`` values of a catalog entity.

        Returns an empty mapping when the catalog has no such entity; records
        without a ``Name`` are skipped.
        """
        mapping: Dict[str, str] = {}
        entity = self.engine.catalog.try_get(entity_name)
        if not entity:
            return mapping
        for row in entity.records:
            name = row.get("Name")
            if not name:
                continue
            mapping[canonicalize_name(name)] = name
        return mapping

    def _select_subject_from_result(self, result: BlueprintResult) -> Optional[str]:
        """Pick the person a result is about.

        String subjects and values of the facts are matched against the
        faculty lookup first, then the student lookup; with no match the
        first fact's subject is returned (``None`` when there are no facts).
        """
        candidates: List[str] = []
        for fact in result.facts:
            if isinstance(fact.subject, str):
                candidates.append(fact.subject)
            if isinstance(fact.value, str):
                candidates.append(fact.value)
        for candidate in candidates:
            canonical = canonicalize_name(candidate)
            if canonical in self._faculty_lookup:
                return self._faculty_lookup[canonical]
        for candidate in candidates:
            canonical = canonicalize_name(candidate)
            if canonical in self._student_lookup:
                return self._student_lookup[canonical]
        return self._infer_subject(result)
| def _append_json_line(path: Path, payload: Dict[str, Any]) -> None: | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| with path.open("a", encoding="utf-8") as handle: | |
| handle.write(json.dumps(payload, ensure_ascii=False) + "\n") | |
def record_history(
    *,
    session_id: str,
    question: str,
    answer: str,
    blueprint: str,
    metadata: Dict[str, Any],
    facts: List[Dict[str, Any]],
    notes: List[str],
    action: Dict[str, Any],
) -> float:
    """Log one question/answer exchange and return its timestamp.

    The full record is appended to ``HISTORY_FILE`` (with *metadata* stored
    under ``"usage"``); a reduced entry carrying the action type is appended
    to ``USAGE_FILE``. Both entries share the same ``time.time()`` value.
    """
    recorded_at = time.time()
    _append_json_line(
        HISTORY_FILE,
        {
            "timestamp": recorded_at,
            "session_id": session_id,
            "question": question,
            "answer": answer,
            "blueprint": blueprint,
            "facts": facts,
            "notes": notes,
            "usage": metadata,
            "action": action,
        },
    )
    _append_json_line(
        USAGE_FILE,
        {
            "timestamp": recorded_at,
            "session_id": session_id,
            "blueprint": blueprint,
            "question": question,
            "action_type": action.get("type"),
        },
    )
    return recorded_at
def load_history(session_id: str) -> List[Dict[str, Any]]:
    """Return the logged records of *session_id*, oldest first.

    Reads ``HISTORY_FILE`` line by line. Lines that are not valid JSON, or
    that decode to something other than a JSON object, are skipped. Returns
    an empty list when the file does not exist.
    """
    if not HISTORY_FILE.exists():
        return []
    rows: List[Dict[str, Any]] = []
    with HISTORY_FILE.open(encoding="utf-8") as handle:
        for line in handle:
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue
            # A line such as `[]` or `42` parses fine but has no `.get`.
            if not isinstance(record, dict):
                continue
            if record.get("session_id") == session_id:
                rows.append(record)
    # `or 0` also covers an explicit null timestamp, which cannot be
    # compared with numbers.
    rows.sort(key=lambda row: row.get("timestamp") or 0)
    return rows
def build_planner_context_from_history(history: List[Dict[str, Any]]) -> PlannerContext:
    """Derive the planner context for a session from its logged records.

    The context carries the last 20 question/answer pairs, a short window of
    the last 3 exchanges (with their actions), and follow-up state (topic,
    subject, last class) taken from the most recent record only. An empty
    history yields a default ``PlannerContext``.
    """
    if not history:
        return PlannerContext()

    full_history: List[Dict[str, Any]] = [
        {"question": rec.get("question", ""), "answer": rec.get("answer", "")}
        for rec in history[-20:]
    ]
    short_history: List[Dict[str, Any]] = [
        {
            "question": rec.get("question", ""),
            "answer": rec.get("answer", ""),
            "action": rec.get("action"),
        }
        for rec in history[-3:]
    ]

    latest = history[-1]
    action = latest.get("action") or {}
    args = action.get("arguments") or {}
    facts = latest.get("facts") or []
    action_type = (action.get("type") or "").lower()

    # Subject: first fact's subject, else the first populated name-like argument.
    subject: Optional[str] = None
    if facts and isinstance(facts[0], dict) and facts[0].get("subject"):
        subject = facts[0]["subject"]
    if not subject:
        subject = next(
            (
                args[key]
                for key in ("name", "person", "student", "faculty", "class_name", "course")
                if args.get(key)
            ),
            None,
        )

    topic: Optional[str] = None
    last_class: Optional[str] = None
    person_lookups = (
        "lookup_person",
        "lookup_location",
        "lookup_center",
        "lookup_advisorship",
        "lookup_faculty_topic",
    )
    if action_type == "lookup_office_hours":
        topic = "office_hours"
        explicit_class = args.get("class_name") or args.get("course") or ""
        if explicit_class:
            last_class = explicit_class
        elif subject and (any(ch.isdigit() for ch in subject) or subject.upper().startswith("CS")):
            # No class argument: treat a course-looking subject as the class.
            last_class = subject
    elif action_type in person_lookups:
        about_student = action_type == "lookup_advisorship" and args.get("student")
        topic = "student" if about_student else "professor"

    return PlannerContext(
        full_history=full_history,
        short_history=short_history,
        topic=topic,
        subject=subject,
        last_class=last_class,
        last_subject=subject,
    )
| def _display_name_from_timestamp(ts: float) -> str: | |
| from datetime import datetime | |
| dt = datetime.fromtimestamp(ts) | |
| return dt.strftime("Chat – %b %d, %I:%M %p") | |
def summarize_sessions() -> List[Dict[str, Any]]:
    """Summarize every session in ``HISTORY_FILE``, most recently active first.

    Each summary holds ``session_id``, ``created_at`` (earliest timestamp),
    ``updated_at`` (latest timestamp) and a ``title`` derived from
    ``created_at``. Lines that are not valid JSON objects, or that lack a
    session id or timestamp, are ignored. Returns an empty list when the
    file does not exist.
    """
    if not HISTORY_FILE.exists():
        return []
    sessions: Dict[str, Dict[str, Any]] = {}
    with HISTORY_FILE.open(encoding="utf-8") as handle:
        for line in handle:
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue
            # A line such as `[]` or `42` parses fine but has no `.get`.
            if not isinstance(record, dict):
                continue
            session_id = record.get("session_id")
            ts = record.get("timestamp")
            if not session_id or not ts:
                continue
            session = sessions.setdefault(
                session_id,
                {"session_id": session_id, "created_at": ts, "updated_at": ts},
            )
            session["created_at"] = min(session["created_at"], ts)
            session["updated_at"] = max(session["updated_at"], ts)
    for session in sessions.values():
        session["title"] = _display_name_from_timestamp(session["created_at"])
    return sorted(sessions.values(), key=lambda item: item["updated_at"], reverse=True)
def get_session_summary(session_id: str) -> Optional[Dict[str, Any]]:
    """Return the summary of *session_id*, or ``None`` if it has no records."""
    matches = (
        summary
        for summary in summarize_sessions()
        if summary["session_id"] == session_id
    )
    return next(matches, None)
def describe_providers() -> Dict[str, Dict[str, Any]]:
    """Expose provider metadata and configuration status.

    Returns one entry per provider from ``available_providers()``, each a
    copy of the provider's metadata extended with:

    * ``configured`` - ``True`` when the provider needs no API key, or its
      key is set to a non-placeholder value;
    * ``note`` - present only when unconfigured, naming the variable to set;
    * ``default_model`` - filled from ``PROVIDER_ENV_SETTINGS`` if absent.
    """
    _load_env_once()
    inventory: Dict[str, Dict[str, Any]] = {}
    for name, meta in available_providers().items():
        entry = dict(meta)
        settings = PROVIDER_ENV_SETTINGS.get(name, {})
        api_env = settings.get("api_key")
        configured = True
        note = ""
        if api_env:
            # Use the same lookup as the client builder (plain name first,
            # then HF_<NAME>) so a key supplied as an HF Spaces secret is
            # not reported as missing.
            value = _get_env_value(api_env)
            configured = bool(value) and not _is_placeholder(value)
            if not configured:
                note = f"Set {api_env} before using this provider."
        entry["configured"] = configured
        if note:
            entry["note"] = note
        entry.setdefault("default_model", settings.get("default_model"))
        inventory[name] = entry
    return inventory
def get_orchestrator() -> ConversationOrchestrator:
    """Build a conversation orchestrator over the default catalog.

    The catalog is loaded from ``ARCHIVE_DIR`` and wrapped in an
    ``AnalysisEngine`` with all kiosk blueprints. Refreshing events is
    best-effort: a failure is logged and the orchestrator is still returned.

    Raises:
        RuntimeError: from the orchestrator constructor when no planner
            provider is configured.
    """
    # NOTE(review): as written, every call rebuilds the catalog, engine and
    # planner. `lru_cache` is imported by this module but unused - confirm
    # whether this function was meant to be cached.
    catalog = load_default_catalog(ARCHIVE_DIR)
    engine = AnalysisEngine(
        catalog,
        [
            FacultyByTopicBlueprint(),
            LocationBlueprint(),
            CenterBlueprint(),
            AdvisorshipBlueprint(),
            StaffSupportBlueprint(),
            UpcomingEventsBlueprint(),
            OfficeHoursBlueprint(),
            PersonLookupBlueprint(),
        ],
    )
    try:
        engine.refresh_events()
    except Exception:
        # Keep serving, but leave a trace instead of hiding the failure.
        logger.warning("Event refresh failed; continuing without refreshed events", exc_info=True)
    return ConversationOrchestrator(engine)
| # ============================================================================= | |
| # API ENDPOINTS | |
| # ============================================================================= | |
def root() -> Dict[str, str]:
    """Health check: report that the service is up."""
    # NOTE(review): no route decorator is visible on this function in this
    # view of the file - confirm it is registered on `app`.
    return dict(status="ok", service="Northwestern CS Kiosk API")
def providers_endpoint() -> Dict[str, Any]:
    """Report the available LLM providers and which one is the default.

    The default comes from ``KIOSK_LLM_PROVIDER`` (``"anthropic"`` when
    unset), normalized to its canonical provider name.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # view of the file - confirm it is registered on `app`.
    inventory = describe_providers()
    configured_default = os.getenv("KIOSK_LLM_PROVIDER", "anthropic")
    return {
        "providers": inventory,
        "default_provider": normalize_provider_name(configured_default),
    }
def query(payload: QueryPayload) -> Dict[str, Any]:
    """
    Main query endpoint - answer one question within a session.

    Intended for speech-to-text integration: the question is the
    transcribed text and the returned ``answer`` is meant to be spoken.

    The exchange is appended to the history and usage logs before the
    response is returned.

    Raises:
        HTTPException: status 400 when the question is blank, or the
            requested provider is unknown or not configured.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # view of the file - confirm it is registered on `app`.
    question = (payload.question or "").strip()
    if not question:
        raise HTTPException(status_code=400, detail="Question is required.")

    session_id = (payload.session_id or DEFAULT_SESSION).strip() or DEFAULT_SESSION

    # Validate an explicitly requested provider before doing any work.
    requested_provider = (payload.provider or "").strip().lower() or None
    canonical_provider = None
    if requested_provider:
        canonical_provider = normalize_provider_name(requested_provider)
    if canonical_provider:
        provider_meta = describe_providers().get(canonical_provider)
        if not provider_meta:
            raise HTTPException(status_code=400, detail=f"Unknown provider '{requested_provider}'.")
        if not provider_meta.get("configured", True):
            detail = provider_meta.get("note")
            if not detail:
                detail = f"The provider '{provider_meta.get('name', canonical_provider)}' is not configured."
            raise HTTPException(status_code=400, detail=detail)

    # Rebuild the conversational context from the session's logged turns.
    prior_turns = load_history(session_id)
    planner_context = strip_context_on_topic_switch(
        question,
        build_planner_context_from_history(prior_turns),
    )
    resolved = resolve_context(question, planner_context)

    orchestrator = get_orchestrator()
    with _orchestrator_lock:
        orchestrator.ensure_responder(canonical_provider)
        answer, result, action = orchestrator.answer(
            resolved.question,
            context=planner_context,
            resolved_input=resolved,
        )
        metadata: Dict[str, Any] = {}
        if hasattr(orchestrator, "responder") and hasattr(orchestrator.responder, "get_metadata"):
            metadata = orchestrator.responder.get_metadata()

    metadata.setdefault("planner_action", action.to_dict())
    facts_payload = [fact.__dict__ for fact in result.facts]

    # The original question (not the resolved one) is what gets logged.
    record_history(
        session_id=session_id,
        question=question,
        answer=answer,
        blueprint=result.name,
        metadata=metadata,
        facts=facts_payload,
        notes=result.notes,
        action=action.to_dict(),
    )

    summary = get_session_summary(session_id)
    if not summary:
        summary = {
            "session_id": session_id,
            "title": _display_name_from_timestamp(time.time()),
        }
    return {
        "session_id": session_id,
        "session_title": summary.get("title"),
        "question": question,
        "answer": answer,
        "blueprint": result.name,
        "facts": facts_payload,
        "notes": result.notes,
        "usage": metadata,
        "action": action.to_dict(),
    }
def history(session_id: str = Query(DEFAULT_SESSION)) -> Dict[str, Any]:
    """Return the logged exchanges of a session together with its title.

    A session with no summary gets a title generated from the current time.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # view of the file - confirm it is registered on `app`.
    entries = load_history(session_id)
    summary = get_session_summary(session_id)
    if summary:
        title = summary.get("title")
    else:
        title = _display_name_from_timestamp(time.time())
    return {"session_id": session_id, "title": title, "history": entries}
def sessions() -> Dict[str, Any]:
    """Return the summaries of all logged sessions under the ``sessions`` key."""
    # NOTE(review): no route decorator is visible on this function in this
    # view of the file - confirm it is registered on `app`.
    summaries = summarize_sessions()
    return {"sessions": summaries}
def main() -> None:
    """Serve the API with uvicorn.

    Host and port come from ``KIOSK_HOST`` (default ``0.0.0.0``) and
    ``KIOSK_PORT`` (default ``8000``); auto-reload is disabled.
    """
    import uvicorn

    bind_host = os.getenv("KIOSK_HOST", "0.0.0.0")
    bind_port = int(os.getenv("KIOSK_PORT", "8000"))
    uvicorn.run("backend.main:app", host=bind_host, port=bind_port, reload=False)


if __name__ == "__main__":
    main()