glof / chatbot /chat_server.py
Freate16's picture
Fix hf_hub_download to handle nested directory structure
20b22b9
Raw
History Blame Contribute Delete
8.08 kB
from __future__ import annotations

import argparse
import contextlib
import io
import json
import os
import sys
import threading
import traceback
from functools import partial
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from urllib.parse import urlparse
# Directory two levels above this file (the parent of this file's directory).
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# Static site that the HTTP handler serves.
WEBSITE_DIR = PROJECT_ROOT.joinpath("website")
# RAG package directory; it is added to sys.path further down in this module.
RAG_DIR = PROJECT_ROOT.joinpath("chatbot", "rag")
def download_heavy_files():
    """Fetch missing model weights and GIS files from the Hugging Face dataset.

    Each entry of the table below maps a destination path under
    ``PROJECT_ROOT`` to a filename inside the ``freate16/glof-database``
    dataset repository. Destinations that already exist are skipped; missing
    ones are fetched with ``hf_hub_download`` and copied into place.

    The function is best-effort: a missing ``huggingface_hub`` package or a
    failed download is reported on stdout and is not raised to the caller.
    """
    try:
        from huggingface_hub import hf_hub_download
    except ImportError:
        print("Warning: huggingface_hub not installed. Cannot download heavy files.")
        return
    else:
        print("Checking for heavy model weights and GIS databases...")

    import shutil  # imported once here instead of on every loop iteration

    repo_id = "freate16/glof-database"

    # Dictionary of destination paths to HF Dataset filenames
    files_to_download = {
        PROJECT_ROOT / "gis_data" / "final_merged_clipped_buildings.gpkg": "final_merged_clipped_buildings.gpkg",
        PROJECT_ROOT / "gis_data" / "river_network.gpkg": "river_network.gpkg",
        PROJECT_ROOT / "gis_data" / "stage2_dataset_15.gpkg": "stage2_dataset_15.gpkg",
        PROJECT_ROOT / "chatbot" / "graphrag" / "glof_kg_populated_codex.ttl": "glof_kg_populated_codex.ttl",
        PROJECT_ROOT / "models" / "lake_segmentation" / "best_model.pth": "lake_segmentation/best_model.pth",
        PROJECT_ROOT / "models" / "gnn" / "gnn" / "glof_gnn_weights_final.pth": "glof_gnn_weights_final.pth",
    }

    for local_path, hf_filename in files_to_download.items():
        if local_path.exists():
            print(f"File already exists locally: {hf_filename}")
            continue

        # Copy to a sibling ".part" file first and rename it afterwards. The
        # existence check above only looks at the final path, so a copy that
        # is interrupted or fails half-way must never leave a truncated file
        # there, otherwise it would be treated as complete on the next start.
        partial_path = local_path.with_name(local_path.name + ".part")
        try:
            print(f"Downloading {hf_filename} from Hugging Face Dataset '{repo_id}'...")
            local_path.parent.mkdir(parents=True, exist_ok=True)
            downloaded_path = hf_hub_download(
                repo_id=repo_id,
                repo_type="dataset",
                filename=hf_filename,
            )
            shutil.copy2(downloaded_path, partial_path)
            os.replace(partial_path, local_path)
            print(f"Successfully downloaded to {local_path}")
        except Exception as e:
            print(f"CRITICAL ERROR: Failed to download '{hf_filename}' from your Dataset! Did you upload it? Error: {e}")
            # Best-effort removal of whatever was copied before the failure.
            with contextlib.suppress(OSError):
                partial_path.unlink()
def build_kg_store_if_needed():
    """Create the Pyoxigraph store from the populated TTL file when it is absent.

    The store directory counts as present only when it exists and contains at
    least one entry; in that case nothing is built. Otherwise ``KgStore.build``
    is called with the TTL path and the store directory. Any exception raised
    during the build is printed rather than propagated.
    """
    store_path = PROJECT_ROOT.joinpath("chatbot", "rag", "kg_store")
    turtle_file = PROJECT_ROOT.joinpath("chatbot", "graphrag", "glof_kg_populated_codex.ttl")

    store_is_populated = store_path.exists() and any(store_path.iterdir())
    if store_is_populated:
        print("Pyoxigraph DB already exists locally.")
        return

    print(f"Building Pyoxigraph DB into {store_path} from {turtle_file}... This may take 2-3 minutes...")
    try:
        # The project root must be importable before `chatbot.rag` can be loaded.
        root_entry = str(PROJECT_ROOT)
        if root_entry not in sys.path:
            sys.path.insert(0, root_entry)
        from chatbot.rag.kg_store import KgStore

        store_path.mkdir(parents=True, exist_ok=True)
        KgStore.build(turtle_file, store_path)
        print("Pyoxigraph DB built successfully!")
    except Exception as build_error:
        print(f"CRITICAL ERROR: Failed to build Pyoxigraph DB: {build_error}")
# Startup side effects, executed when this module is loaded: fetch any missing
# heavy assets first, then build the knowledge-graph store if it is absent.
download_heavy_files()
build_kg_store_if_needed()

# Make the RAG directory and the project root importable. Each directory is
# inserted at the front of sys.path only when it is not already listed;
# RAG_DIR is handled first, PROJECT_ROOT second.
for _import_root in (RAG_DIR, PROJECT_ROOT):
    if str(_import_root) not in sys.path:
        sys.path.insert(0, str(_import_root))
del _import_root
class ChatHandler(SimpleHTTPRequestHandler):
    """Serve the static website and the JSON chat API from one handler.

    ``GET /api/health`` returns a small JSON status document and every other
    GET is delegated to ``SimpleHTTPRequestHandler``. ``POST /api/chat`` runs
    the reasoning agent on the submitted question. Every response carries
    permissive CORS headers.
    """

    server_version = "GLOFChatServer/1.0"

    # Largest request body, in bytes, that the chat endpoint accepts.
    max_body_bytes = 20000

    # contextlib.redirect_stdout replaces the process-wide sys.stdout. Handlers
    # run on separate threads, so two overlapping agent runs would capture
    # each other's output (and therefore each other's answers) and could leave
    # sys.stdout pointing at a discarded buffer. This lock is shared by all
    # handler instances and lets only one agent run at a time.
    _agent_lock = threading.Lock()

    def end_headers(self) -> None:
        """Append the CORS headers to every response before finishing headers."""
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        super().end_headers()

    def do_OPTIONS(self) -> None:
        """Answer CORS preflight requests with an empty 204 response."""
        self.send_response(204)
        self.end_headers()

    def do_GET(self) -> None:
        """Serve the health endpoint, or fall back to static file handling."""
        path = urlparse(self.path).path
        if path == "/api/health":
            self._send_json({"ok": True, "service": "glof-chatbot"})
            return
        super().do_GET()

    def do_POST(self) -> None:
        """Handle ``POST /api/chat``; any other path gets a 404.

        A malformed request (bad Content-Length, oversized body, invalid JSON,
        non-object JSON, missing question) is answered with status 400 and an
        ``error`` message. Failures while running the agent are answered with
        status 500.
        """
        path = urlparse(self.path).path
        if path != "/api/chat":
            self.send_error(404, "Unknown endpoint")
            return

        # Problems with the request itself are the client's fault: report them
        # as 400 with a plain message instead of a 500 with a server traceback.
        try:
            payload = self._read_json()
        except ValueError as exc:
            self._send_json({"error": str(exc)}, status=400)
            return

        try:
            # `or` maps a JSON null (and other empty values) to the default
            # instead of turning it into the literal string "None".
            question = str(payload.get("question") or "").strip()
            route = str(payload.get("route") or "auto").strip() or "auto"
            if not question:
                self._send_json({"error": "Question is required."}, status=400)
                return
            response = self._run_chatbot(question, route)
            self._send_json(response)
        except SystemExit as exc:
            # SystemExit is not an Exception subclass, so it is caught here
            # separately and reported as a 500 instead of propagating.
            self._send_json({"error": str(exc)}, status=500)
        except Exception as exc:
            # NOTE(review): the traceback is returned to the client. It is kept
            # because existing clients may display it, but it exposes server
            # internals and should be removed if this server is ever exposed
            # beyond local use.
            self._send_json(
                {
                    "error": str(exc),
                    "traceback": traceback.format_exc(),
                },
                status=500,
            )

    def _read_json(self) -> dict:
        """Read the request body and return it as a JSON object.

        An empty body is treated as ``{}``.

        Raises:
            ValueError: if Content-Length is not a non-negative integer, the
                body exceeds ``max_body_bytes``, the body is not valid UTF-8
                JSON, or the decoded JSON value is not an object.
        """
        raw_length = self.headers.get("Content-Length", "0")
        try:
            length = int(raw_length)
        except (TypeError, ValueError):
            raise ValueError("Invalid Content-Length header.") from None
        # A negative length would make rfile.read() wait for end-of-stream,
        # blocking this handler thread until the client closes the socket.
        if length < 0:
            raise ValueError("Invalid Content-Length header.")
        if length > self.max_body_bytes:
            raise ValueError("Request body is too large.")

        try:
            raw = self.rfile.read(length).decode("utf-8")
            payload = json.loads(raw or "{}")
        except (UnicodeDecodeError, json.JSONDecodeError) as exc:
            raise ValueError(f"Request body is not valid JSON: {exc}") from exc
        if not isinstance(payload, dict):
            raise ValueError("Request body must be a JSON object.")
        return payload

    def _run_chatbot(self, question: str, route: str) -> dict:
        """Run the reasoning agent on *question* and build the API response.

        The agent's stdout is captured and the text after the last
        ``Final Answer:`` marker (up to the first ``=========``) is returned
        as the answer. *route* is accepted for interface compatibility but is
        not used; the response always reports the route ``"agentic"``.

        Returns:
            A dict with the keys ``answer``, ``route``, ``has_kg_context`` and
            ``logs`` (the last 40 captured log lines).
        """
        from chatbot.agentic_rag.agent import run_reasoning_agent

        log_buffer = io.StringIO()
        # Hold the lock for the whole redirect so no other agent run can swap
        # sys.stdout while this one is capturing it.
        with self._agent_lock, contextlib.redirect_stdout(log_buffer):
            try:
                run_reasoning_agent(question, max_steps=8)
                final_answer = "No final answer reached by the agent."
            except Exception as e:
                print(f"Agent crashed: {e}")
                final_answer = f"**Agent Crashed!**\nError: {e}\n\n**Internal Logs:**\n```text\n{log_buffer.getvalue()[-1000:]}\n```"

        logs = log_buffer.getvalue()
        if "Final Answer:" in logs:
            final_answer = logs.split("Final Answer:")[-1].strip()
            final_answer = final_answer.split("=========")[0].strip()
        elif "Agent Crashed!" not in final_answer:
            final_answer = f"No final answer reached by the agent.\n\n**Internal Logs:**\n```text\n{logs[-1500:]}\n```"

        return {
            "answer": final_answer,
            "route": "agentic",
            "has_kg_context": True,
            "logs": logs.splitlines()[-40:],
        }

    def _send_json(self, payload: dict, status: int = 200) -> None:
        """Serialise *payload* as UTF-8 JSON and send it with *status*."""
        body = json.dumps(payload, ensure_ascii=False, indent=2).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)
def parse_args() -> argparse.Namespace:
    """Parse the command line and return the ``host`` and ``port`` options.

    ``--host`` defaults to ``127.0.0.1`` and ``--port`` (an integer) to 8000.
    """
    cli = argparse.ArgumentParser(description="Serve the GLOF website and chatbot API locally.")
    # Each entry is (flag, keyword arguments for add_argument).
    option_table = (
        ("--host", {"default": "127.0.0.1"}),
        ("--port", {"type": int, "default": 8000}),
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    namespace = cli.parse_args()
    return namespace
def main() -> None:
    """Start the threaded HTTP server and serve until interrupted.

    Exits with a message when ``WEBSITE_DIR`` does not exist. Ctrl-C stops the
    server, and the listening socket is closed on the way out in every case.
    """
    options = parse_args()
    if not WEBSITE_DIR.exists():
        raise SystemExit(f"Website directory not found: {WEBSITE_DIR}")

    # Bind the static-file root into the handler so it serves WEBSITE_DIR.
    handler_factory = partial(ChatHandler, directory=str(WEBSITE_DIR))
    bind_address = (options.host, options.port)
    httpd = ThreadingHTTPServer(bind_address, handler_factory)

    base_url = f"http://{options.host}:{options.port}"
    banner = (
        f"Serving website: {WEBSITE_DIR}",
        f"Chat API: {base_url}/api/chat",
        f"Open: {base_url}/",
    )
    for banner_line in banner:
        print(banner_line)

    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("\nServer stopped.")
    finally:
        httpd.server_close()
# Start the HTTP server only when this file is executed as a script. The asset
# download and knowledge-graph build earlier in this module are module-level
# calls and therefore also run when the module is merely imported.
if __name__ == "__main__":
    main()