Spaces:
Sleeping
Sleeping
| import os | |
| from typing import Any, Dict, List, Optional, Tuple | |
| from fastapi import FastAPI, HTTPException, Response | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| import folium | |
| import json | |
| from collections import OrderedDict | |
| try: | |
| # When Backend is treated as a package (e.g., uvicorn Backend.api:app from repo root) | |
| from .chatbot_backend import GroqRAGChatbot | |
| except Exception: | |
| # When running inside Backend directory (e.g., uvicorn api:app) | |
| from chatbot_backend import GroqRAGChatbot | |
# Shared chatbot service instance used by the handlers in this module.
chatbot = GroqRAGChatbot()

app = FastAPI(title="SIH Groundwater API", version="1.0.0")

# CORS for the Next.js app: the configured origin plus the local dev hosts.
frontend_origin = os.getenv("FRONTEND_ORIGIN", "http://localhost:3000")
_cors_options: Dict[str, Any] = {
    "allow_origins": [frontend_origin, "http://localhost:3000", "http://127.0.0.1:3000"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
class ChatRequest(BaseModel):
    # Payload accepted by `chat`.
    # query: the user's query text; `chat` rejects it when blank.
    query: str
def _normalize_query(q: Optional[str]) -> str:
    """Return *q* stripped and lower-cased; None or empty input yields ""."""
    if not q:
        return ""
    trimmed = q.strip()
    return trimmed.lower()
# Small LRU cache of chat result rows, keyed by the normalized
# (stripped, lower-cased) query text. Oldest entries sit at the front.
_CHAT_CACHE_MAX = 50  # maximum number of cached queries
_chat_cache: "OrderedDict[str, List[Dict[str, Any]]]" = OrderedDict()
def _cache_put(query: str, rows: List[Dict[str, Any]]) -> None:
    """
    Store *rows* under the normalized form of *query* as the newest entry.

    A query that normalizes to "" is ignored. Falsy *rows* are stored as an
    empty list. Once the cache exceeds _CHAT_CACHE_MAX entries, the oldest
    ones are evicted.
    """
    cache_key = _normalize_query(query)
    if cache_key:
        # Drop any existing entry first so the re-insert lands at the end.
        if cache_key in _chat_cache:
            del _chat_cache[cache_key]
        _chat_cache[cache_key] = rows if rows else []
        # Evict from the front (least recently used) until within bounds.
        while len(_chat_cache) > _CHAT_CACHE_MAX:
            _chat_cache.popitem(last=False)
def _cache_get(query: Optional[str]) -> List[Dict[str, Any]]:
    """
    Return the rows cached for *query*, or [] when nothing is cached.

    The lookup key is the normalized query; a hit is re-inserted at the end
    of the cache so it counts as most recently used.
    """
    cache_key = _normalize_query(query)
    if not cache_key:
        return []
    cached = _chat_cache.get(cache_key)
    if cached is not None:
        # Re-insert to mark the entry as recently used.
        del _chat_cache[cache_key]
        _chat_cache[cache_key] = cached
        return cached
    return []
def health() -> Dict[str, Any]:
    """Return {"ok": <result of chatbot.get_db_connection()>}."""
    return {"ok": chatbot.get_db_connection()}
def chat(req: ChatRequest) -> Dict[str, Any]:
    """
    Run the chatbot on the request's query and return its result dict.

    On success the result's "results" rows are cached under the query so
    that later lookups for the same query can reuse them.

    Raises:
        HTTPException: 400 when the query is missing or blank; 502 when the
            chatbot result has no truthy "success" entry.
    """
    question = req.query.strip() if req.query else ""
    if not question:
        raise HTTPException(status_code=400, detail="Query is required")

    result = chatbot.chat(question)
    if not result.get("success"):
        failure_detail = result.get("response") or "Failed to process query"
        raise HTTPException(status_code=502, detail=failure_detail)

    # Caching is best-effort: a cache failure must not fail the request.
    try:
        cached_rows = result.get("results") or []
        _cache_put(req.query, cached_rows)
    except Exception:
        pass
    return result
def stats() -> Dict[str, Any]:
    """Return whatever chatbot.get_quick_stats() reports."""
    quick_stats = chatbot.get_quick_stats()
    return quick_stats
class MapQuery(BaseModel):
    # Payload accepted by `map_data`.
    # query: optional query text; defaults to None.
    query: Optional[str] = None
    # limit: optional maximum number of rows; defaults to 100.
    limit: Optional[int] = 100
def map_data(req: MapQuery) -> Dict[str, Any]:
    """
    Returns lightweight map-ready data from Supabase rows.

    Each feature carries:
    - id: synthetic id (1-based position in the result)
    - name: district (title-cased) or None
    - state: state (title-cased) or None
    - area: st_area_shape (finite float or None)
    - perimeter: st_length_shape (finite float or None)
    - geometry: WKT/GeoJSON string stored in DB (passed through)

    Args:
        req: Query text and row limit. A blank query falls back to
            "top districts by area"; a truthy int limit is clamped to 1..500,
            otherwise the limit of the built query is left untouched.

    Returns:
        {"count": <number of features>, "features": [<feature>, ...]}
    """
    import math  # function-scope import: only the numeric helper needs it

    def to_float(x: Any) -> Optional[float]:
        """Best-effort numeric parsing; None for blank, unparseable or non-finite input."""
        try:
            if x in (None, ""):
                return None
            value = float(x)
        except Exception:
            # Deliberately broad: parsing is best-effort and must never fail the request.
            return None
        # NaN/inf cannot be represented in a JSON response, so treat them as missing.
        return value if math.isfinite(value) else None

    def title_or_none(value: Any) -> Optional[str]:
        """Title-case a truthy value; anything falsy becomes None."""
        return value.title() if value else None

    user_query = (req.query or "").strip() or "top districts by area"
    intent = chatbot.analyze_user_intent(user_query)
    # Ensure geography focus for better map ranking
    intent["intent_type"] = "geographic"
    query = chatbot.build_supabase_query(user_query, intent)
    # Override limit if provided
    if req.limit and isinstance(req.limit, int):
        query = query.limit(max(1, min(500, req.limit)))
    rows = chatbot.execute_supabase_query(query) or []

    features: List[Dict[str, Any]] = [
        {
            "id": position,
            "name": title_or_none(r.get("district")),
            "state": title_or_none(r.get("state")),
            "area": to_float(r.get("st_area_shape")),
            "perimeter": to_float(r.get("st_length_shape")),
            "geometry": r.get("geometry"),
        }
        for position, r in enumerate(rows, start=1)
    ]
    return {
        "count": len(features),
        "features": features,
    }
| # Uvicorn entrypoint: `python -m uvicorn Backend.api:app --reload --host 0.0.0.0 --port 8000` | |
def map_html(query: Optional[str] = None, limit: int = 100) -> Response:
    """
    Builds an HTML map using folium and returns it as a text/html response.

    Data source, in order of preference:
      1. rows cached by an earlier chat call for the same (normalized) query;
      2. features built by `map_data` for the query;
      3. a fresh `chatbot.chat(query)` call, whose rows are then cached.

    A feature whose geometry can be parsed (GeoJSON-like dict/list, JSON
    string, or WKT when shapely can be imported) is drawn as a GeoJson layer.
    Any other feature gets a marker at the map centre, so bad or missing
    geometry never fails the render.

    Frontend component `MapPlaceholder` expects this endpoint to return HTML.

    Args:
        query: Query text. Blank/None yields no cached rows and skips the
            chat fallback.
        limit: Maximum number of features; cached/chat rows are clamped to 1..500.

    Returns:
        The rendered map HTML, or a status-500 HTML page carrying the
        (HTML-escaped) error message when anything fails.
    """
    # Function-scope imports keep this handler self-contained.
    import math
    from html import escape

    india_center = [22.9734, 78.6569]
    wkt_prefixes = ("POLYGON", "MULTIPOLYGON", "LINESTRING", "MULTILINESTRING", "POINT", "MULTIPOINT")
    layer_style = {"fillColor": "#3186cc", "color": "#3186cc", "weight": 1, "fillOpacity": 0.4}

    def to_float(x: Any) -> Optional[float]:
        """Best-effort numeric parsing; None for blank, unparseable or non-finite input."""
        try:
            if x in (None, ""):
                return None
            value = float(x)
        except Exception:
            # Deliberately broad: parsing is best-effort.
            return None
        return value if math.isfinite(value) else None

    def title_or_none(value: Any) -> Optional[str]:
        """Title-case a truthy value; anything falsy becomes None."""
        return value.title() if value else None

    def shown(value: Any) -> Any:
        """Popup display value: 'N/A' only when the value is missing (0.0 is kept)."""
        return "N/A" if value is None else value

    def parse_geometry(geom: Any) -> Optional[Any]:
        """Turn a stored geometry into a GeoJSON-like object, or None if not possible."""
        if geom is None:
            return None
        # Already a mapping (GeoJSON-like)
        if isinstance(geom, (dict, list)):
            return geom
        if not isinstance(geom, str):
            return None
        s = geom.strip()
        # JSON string
        if s.startswith(("{", "[")):
            try:
                return json.loads(s)
            except Exception:
                pass  # not valid JSON; fall through to WKT detection
        # WKT detection
        if s.upper().startswith(wkt_prefixes):
            try:
                # shapely is optional; without it WKT geometries are skipped
                from shapely import wkt as _wkt
                from shapely.geometry import mapping as _mapping
                return _mapping(_wkt.loads(s))
            except Exception:
                return None
        return None

    try:
        user_query = (query or "").strip()
        features_data: List[Dict[str, Any]] = []

        # 1) Primary path: rows previously cached for the same query
        rows: List[Dict[str, Any]] = _cache_get(user_query)

        # 2) Fallback: if no cached rows, reuse the map-data builder
        if not rows:
            data = map_data(MapQuery(query=user_query, limit=limit))
            features_data = data.get("features") or []

        # 3) If still nothing, as a last attempt, run chat now and cache it
        if not rows and not features_data and user_query:
            chat_out = chatbot.chat(user_query)
            rows = chat_out.get("results") or []
            _cache_put(user_query, rows)

        # Convert rows -> features if we have rows
        if rows and not features_data:
            max_rows = max(1, min(500, limit))
            features_data = [
                {
                    "id": position,
                    "name": title_or_none(r.get("district")),
                    "state": title_or_none(r.get("state")),
                    "area": to_float(r.get("st_area_shape")),
                    "perimeter": to_float(r.get("st_length_shape")),
                    "geometry": r.get("geometry"),
                }
                for position, r in enumerate(rows[:max_rows], start=1)
            ]

        # Initialize map centered on India
        fmap = folium.Map(location=india_center, zoom_start=5, tiles="OpenStreetMap")
        fg = folium.FeatureGroup(name="Underground Coverage")

        # Add features; draw GeoJSON when available, otherwise add a label-only marker
        for f in features_data:
            name = f.get("name") or "Unknown"
            state = f.get("state") or ""
            area = f.get("area")
            perimeter = f.get("perimeter")
            # The popup body is inserted as raw HTML, so escape the DB-sourced text.
            popup = folium.Popup(
                f"<b>{escape(str(name))}</b>, {escape(str(state))}"
                f"<br/>Area: {shown(area)}<br/>Perimeter: {shown(perimeter)}",
                max_width=300,
            )
            # Try to parse geometry (JSON or WKT -> GeoJSON-like) and render
            parsed = parse_geometry(f.get("geometry"))
            if parsed is not None:
                try:
                    folium.GeoJson(
                        parsed,
                        name=name,
                        tooltip=name,
                        popup=popup,
                        style_function=lambda _: dict(layer_style),
                    ).add_to(fg)
                    continue
                except Exception:
                    pass  # unusable geometry; fall back to the marker below
            # Fallback: no usable geometry - add a generic marker at India center (avoids failure)
            folium.Marker(
                location=india_center,
                tooltip=name,
                popup=popup,
                icon=folium.Icon(color="blue", icon="info-sign"),
            ).add_to(fg)

        fg.add_to(fmap)
        folium.LayerControl().add_to(fmap)
        page = fmap.get_root().render()
        return Response(content=page, media_type="text/html")
    except Exception as e:
        # Escape the message: it may echo request or database text into the page.
        return Response(
            content=f"<html><body><pre>Failed to render map: {escape(str(e))}</pre></body></html>",
            media_type="text/html",
            status_code=500,
        )
def results(query: Optional[str] = None, limit: int = 100) -> Dict[str, Any]:
    """
    Returns the exact rows used by chat for a given query. If absent, runs chat once.
    Also returns a 'table' projection suited for the Explore page.

    Args:
        query: Query text. Blank/None yields an empty result without calling chat.
        limit: Maximum number of rows returned; clamped to 1..500.

    Returns:
        {"count": <number of rows>, "rows": <raw rows>, "table": <projection>},
        where each table entry has district, state, development_stage
        (rounded to 1 decimal, or None), draft_total, availability,
        underground_area and status.
    """
    import math  # function-scope import: only the numeric helper needs it

    def to_float(x: Any) -> Optional[float]:
        """Best-effort numeric parsing; None for blank, unparseable or non-finite input."""
        try:
            if x in (None, ""):
                return None
            value = float(x)
        except Exception:
            # Deliberately broad: parsing is best-effort.
            return None
        # NaN/inf cannot be represented in a JSON response, so treat them as missing.
        return value if math.isfinite(value) else None

    def derive_status(stage: Optional[float]) -> str:
        """Map a development-stage value to a status label."""
        # A missing stage is reported as "safe" (existing behaviour, kept for callers).
        if stage is None:
            return "safe"
        if stage > 100:
            return "over-exploited"
        if stage >= 80:
            return "critical"
        if stage >= 60:
            return "semi-critical"
        return "safe"

    def title_or_empty(value: Any) -> str:
        """Title-case a truthy value; anything falsy becomes ""."""
        return value.title() if value else ""

    user_query = (query or "").strip()
    rows: List[Dict[str, Any]] = _cache_get(user_query)
    if not rows and user_query:
        chat_out = chatbot.chat(user_query)
        rows = chat_out.get("results") or []
        _cache_put(user_query, rows)

    # Clamp and normalize
    rows = (rows or [])[: max(1, min(500, limit))]

    table = []
    for r in rows:
        stage = to_float(r.get("stage_of_development"))
        table.append({
            "district": title_or_empty(r.get("district")),
            "state": title_or_empty(r.get("state")),
            "development_stage": None if stage is None else round(stage, 1),
            "draft_total": to_float(r.get("annual_gw_draft_total")),
            "availability": to_float(r.get("net_gw_availability")),
            "underground_area": to_float(r.get("st_area_shape")),
            "status": derive_status(stage),
        })
    return {"count": len(rows), "rows": rows, "table": table}