import os
import sys

from dotenv import load_dotenv

# The .env file is loaded before the project imports below are executed.
load_dotenv()

# Put the parent of this file's directory on sys.path (presumably the project
# root) so the `api`, `processing`, `config` and `graph` packages resolve.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from contextlib import asynccontextmanager
from datetime import datetime

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from loguru import logger

from api.dependencies import get_driver, close_driver
from api.routes import (
    search,
    profile,
    graph,
    risk,
    multilingual,
    export,
    admin,
    investigation,
    affidavit,
    biography,
    benami,
    sources,
    procurement,
    conflict,
    linguistic,
    policy,
    adversarial,
    debate,
    runtime,
    resolve,
    timeline,
    forensics,
    self_learning,
    case_memory,
)
from api.models import HealthResponse, StatsResponse
from processing.alias_graph import AliasGraph  # Phase 32
from processing.entity_resolver_v2 import EntityResolverV2  # Phase 32
from config.runtime_profile import PROFILE as _RUNTIME_PROFILE


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: prepare Neo4j and the alias graph, then clean up.

    Startup is best-effort: a missing driver, a failing schema setup, or a
    failing alias-graph load is logged as a warning and the app still starts.
    On shutdown the Neo4j driver is closed via ``close_driver()``.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    logger.info("[API] Starting up -- attempting Neo4j connection")
    driver = get_driver()
    if driver:
        logger.success("[API] Neo4j ready")
        # SEARCH-1 FIX: ensure fulltext index exists on every startup.
        # setup_schema() uses IF NOT EXISTS -- safe to call repeatedly.
        try:
            from graph.loader import GraphLoader

            GraphLoader(driver=driver).setup_schema()
            logger.info("[API] Schema/index verified")
        except Exception as _se:
            # Only the exception type is logged, not its message.
            logger.warning(f"[API] Schema setup skipped: {type(_se).__name__}")
    else:
        logger.warning("[API] Starting without Neo4j -- set secrets to enable")

    # Phase 32: load alias graph so /resolve/alias works immediately.
    # _alias_graph is a module-level object defined further down this file;
    # it exists by the time the server invokes this lifespan.
    try:
        _alias_graph.load()
        logger.info(f"[Startup] AliasGraph: {len(_alias_graph)} aliases loaded")
    except Exception as _ag_err:
        logger.warning(f"[Startup] AliasGraph load skipped: {type(_ag_err).__name__}")

    try:
        yield
    finally:
        # Run cleanup even when an exception or cancellation is delivered at
        # the yield point; otherwise the driver would be left open.
        logger.info("[API] Shutting down")
        close_driver()


app = FastAPI(
    title="BharatGraph API",
    description=(
        "Public transparency and institutional intelligence platform for India. "
        "All data sourced from official government records. "
        "Outputs are structural indicators, not legal findings."
    ),
    version="0.35.0",
    lifespan=lifespan,
)

# Allowed CORS origins: comma-separated list from CORS_ORIGINS, with blank
# entries dropped and surrounding whitespace stripped.
_cors_origins_raw = os.getenv(
    "CORS_ORIGINS",
    "http://localhost:8000,http://127.0.0.1:8000,https://abinaze.github.io"
)
_cors_origins = [o.strip() for o in _cors_origins_raw.split(",") if o.strip()]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)

# BUG-26 FIX: middleware registered before routes (conventional ordering).
# In FastAPI/Starlette, middleware wraps the entire ASGI app so the
# functional behavior is identical regardless of order, but conventional
# placement before routers is clearer for debugging.
from fastapi.middleware.gzip import GZipMiddleware
from api.middleware.rate_limiter import SlidingWindowRateLimiter
from api.middleware.security_headers import SecurityHeadersMiddleware
from api.middleware.input_validator import InputValidatorMiddleware
from api.middleware.audit_logger import AuditLoggerMiddleware

app.add_middleware(GZipMiddleware, minimum_size=1000)
app.add_middleware(SlidingWindowRateLimiter)
app.add_middleware(SecurityHeadersMiddleware)
app.add_middleware(InputValidatorMiddleware)
app.add_middleware(AuditLoggerMiddleware)

app.include_router(search.router, tags=["Search"])
app.include_router(profile.router, tags=["Profile"])
app.include_router(graph.router, tags=["Graph"])
app.include_router(risk.router, tags=["Risk"])
app.include_router(multilingual.router, tags=["Multilingual"])
app.include_router(export.router, tags=["Export"])
app.include_router(admin.router, tags=["Admin"])
app.include_router(investigation.router, tags=["Investigation"])
app.include_router(affidavit.router, tags=["Affidavit"])
app.include_router(biography.router, tags=["Biography"])
app.include_router(benami.router, tags=["Benami"])
app.include_router(sources.router, tags=["Sources"])
app.include_router(procurement.router, tags=["Procurement"])
app.include_router(conflict.router, tags=["Conflict"])
app.include_router(linguistic.router, tags=["Linguistic"])
app.include_router(policy.router, tags=["Policy"])
app.include_router(adversarial.router, tags=["Adversarial"])
app.include_router(debate.router, tags=["Debate"])
app.include_router(runtime.router, tags=["Runtime"])
app.include_router(resolve.router, tags=["Resolve"])  # Phase 32
app.include_router(timeline.router, tags=["Timeline"])  # Phase 33
app.include_router(forensics.router, tags=["Forensics"])  # Phase 34
app.include_router(self_learning.router, tags=["SelfLearning"])  # Phase 34
app.include_router(case_memory.router, tags=["CaseMemory"])  # Phase 34


@app.get("/")
def root():
    """Return a static descriptor of the service with pointers to docs/health."""
    return {
        "name": "BharatGraph API",
        # Read from the app object so this value cannot drift from the version
        # declared on the FastAPI instance (it previously said "0.31.5").
        "version": app.version,
        "status": "running",
        "docs": "/docs",
        "health": "/health",
        "description": "Public transparency intelligence platform for India.",
        "profile": _RUNTIME_PROFILE.name,
    }


@app.api_route("/health", methods=["GET", "HEAD"], response_model=HealthResponse)
def health_check():
    """Report service health based on Neo4j driver availability.

    Returns:
        HealthResponse whose status is "ok" when a driver exists and
        ``verify_connectivity()`` does not raise, otherwise "degraded".
    """
    driver = get_driver()
    connected = driver is not None
    try:
        if driver:
            driver.verify_connectivity()
    except Exception:
        # Any connectivity failure is reported as "degraded", never as a 5xx.
        connected = False
    return HealthResponse(
        status="ok" if connected else "degraded",
        neo4j_connected=connected,
        version=app.version,
        generated_at=datetime.now().isoformat(),
    )


import threading as _threading  # B-01 FIX: was missing entirely
import time as _time

# H-07 FIX: module-level cache vars (must be at module scope, not inside function)
_stats_cache = None
_stats_cached_at = 0.0  # time.monotonic() value at which _stats_cache was stored
_STATS_TTL = 60.0  # seconds a cached /stats response stays valid
_stats_lock = _threading.Lock()  # B-01 FIX: NameError on every GET /stats

# Phase 32: shared AliasGraph instance (loaded once at startup)
_alias_graph = AliasGraph()
_resolver_v2 = EntityResolverV2(threshold=0.82)


def _stats_cache_is_fresh():
    """Return True when a cached /stats response exists and is within its TTL."""
    return (
        _stats_cache is not None
        and (_time.monotonic() - _stats_cached_at) < _STATS_TTL
    )


@app.get("/stats", response_model=StatsResponse)
def get_stats():
    """Return node and relationship counts per type, cached for ``_STATS_TTL`` seconds.

    The freshness check is done once without the lock (fast path) and once
    more after acquiring it, so concurrent callers that find a stale cache
    do not each run the full graph scan.

    Returns:
        StatsResponse. The counts are empty dicts when there is no driver or
        the queries fail.
    """
    global _stats_cache, _stats_cached_at

    if _stats_cache_is_fresh():
        return _stats_cache

    # BACKEND-2 FIX: lock prevents two concurrent requests both
    # seeing _stats_cache is None and both running the full graph scan
    with _stats_lock:
        if _stats_cache_is_fresh():
            return _stats_cache

        driver = get_driver()
        node_counts = {}
        rel_counts = {}
        last_run = None
        if driver:
            try:
                with driver.session() as session:
                    n_rows = session.run(
                        "MATCH (n) RETURN labels(n)[0] AS t, count(n) AS c"
                    ).data()
                    node_counts = {r["t"]: r["c"] for r in n_rows if r["t"]}
                    r_rows = session.run(
                        "MATCH ()-[r]->() RETURN type(r) AS t, count(r) AS c"
                    ).data()
                    rel_counts = {r["t"]: r["c"] for r in r_rows if r["t"]}
                    meta = session.run(
                        "MATCH (m:PipelineMeta) "
                        "RETURN m.last_run AS ts ORDER BY m.last_run DESC LIMIT 1"
                    ).single()
                    last_run = meta["ts"] if meta else None
            except Exception as e:
                # NOTE(review): whatever was collected before the failure is
                # still returned and cached for the TTL -- confirm that is
                # the desired behavior.
                logger.debug(f"[Stats] Query error: {e}")

        result = StatsResponse(
            nodes=node_counts,
            relationships=rel_counts,
            last_pipeline_run=last_run,
            generated_at=datetime.now().isoformat(),
        )
        _stats_cache = result
        _stats_cached_at = _time.monotonic()
        return result


# NEW-3 FIX: expanded from 6 to 14 types -- Politician, Company, CourtCase etc
# were never appearing in the live feed even when the graph was fully populated
_FEED_LABELS = [
    "AuditReport", "EnforcementAction", "RegulatoryOrder",
    "ElectoralBond", "Contract", "VigilanceCircular",
    "Politician", "Company", "CourtCase", "Tender",
    "NGO", "PressRelease", "InsolvencyOrder", "SanctionedEntity",
]

# Seconds between feed polls / pushes on the websocket.
_FEED_POLL_SECONDS = 15


def _query_feed(driver):
    """Fetch the newest feed rows, or per-label node counts when there are none.

    This function is synchronous and blocks on the Neo4j driver; the
    websocket handler runs it in an executor thread.

    Args:
        driver: Neo4j driver object exposing ``session()``.

    Returns:
        ``{"rows": [...], "stats": {}}`` when at least one row matched,
        otherwise ``{"rows": [], "stats": {label: count, ...}}``.
    """
    with driver.session() as s:
        rows = s.run(
            "MATCH (n) WHERE labels(n)[0] IN $labels "
            "AND n.scraped_at IS NOT NULL "
            "RETURN labels(n)[0] AS label, "
            "coalesce(n.name, n.title, n.company_name, n.id) AS name, "
            "n.scraped_at AS scraped_at, n.id AS id, n.source AS source "
            "ORDER BY n.scraped_at DESC LIMIT 8",
            labels=_FEED_LABELS
        ).data()
        if rows:
            return {"rows": rows, "stats": {}}
        stats = s.run(
            "MATCH (n) RETURN labels(n)[0] AS t, count(n) AS c"
        ).data()
        return {"rows": [], "stats": {r["t"]: r["c"] for r in stats if r["t"]}}


@app.websocket("/ws/feed")
async def websocket_feed(websocket: WebSocket):
    """Push the latest scraped entities to a websocket client every 15 seconds.

    Each cycle queries the newest rows and sends a JSON payload. When the set
    of row ids equals the set sent in the previous cycle, nothing is sent and
    the handler just sleeps. Database errors and a missing driver produce a
    payload carrying only a status message.

    Args:
        websocket: The client connection; accepted here and closed on exit.
    """
    import asyncio

    await websocket.accept()
    logger.info("[WS] Feed client connected")
    loop = asyncio.get_running_loop()  # FEED-3 FIX: get_event_loop deprecated in 3.10+
    last_sent_ids = None  # WS cursor: only push when data changes
    try:
        while True:
            driver = get_driver()
            payload = {"type": "feed", "at": datetime.now().isoformat()}
            if driver:
                try:
                    # C-03 / BUG-C3 FIX: synchronous Neo4j driver MUST run
                    # in a thread pool -- calling driver.session() directly
                    # inside async def blocks the entire uvicorn event loop
                    # while waiting for network I/O to Neo4j AuraDB.
                    feed_result = await loop.run_in_executor(None, _query_feed, driver)
                    feed_rows = feed_result["rows"]
                    # M-13 FIX: compare full set of IDs, not just first scraped_at
                    current_ids = frozenset(r.get("id", "") for r in feed_rows)
                    if current_ids and current_ids == last_sent_ids:
                        await asyncio.sleep(_FEED_POLL_SECONDS)
                        continue
                    last_sent_ids = current_ids
                    if feed_rows:
                        newest = feed_rows[0]
                        payload["items"] = feed_rows
                        # The query always returns these keys, but their
                        # values may be null; dict.get's default does not
                        # apply to a present-but-None key, so fall back
                        # with `or`. The f-string also tolerates non-string
                        # values, where `+` raised TypeError.
                        label = newest.get("label") or "Entity"
                        name = newest.get("name") or "-"
                        payload["message"] = f"{label}: {name}"
                    else:
                        payload["stats"] = feed_result["stats"]
                        payload["message"] = "Feed active -- run /admin/pipeline to ingest data"
                except Exception as db_err:
                    logger.debug(f"[WS] DB query error: {db_err}")
                    payload["message"] = "Feed active -- database query pending"
            else:
                payload["message"] = "Feed active -- no database connection"
            await websocket.send_json(payload)
            await asyncio.sleep(_FEED_POLL_SECONDS)
    except WebSocketDisconnect:
        logger.info("[WS] Feed client disconnected")
    except Exception as e:
        logger.warning(f"[WS] Feed error: {e}")
    finally:
        try:
            await websocket.close()
        except Exception:
            # Best-effort close: the socket may already be closed (e.g. after
            # a client disconnect), in which case closing again can raise.
            pass


# BUG-7 FIX: only register debug routes when DEBUG_MODE is explicitly true.
# Routes registered unconditionally (even returning 404) reveal their existence
# to attackers via path enumeration. Move into a conditional block entirely.
if os.getenv("DEBUG_MODE", "").lower() in ("1", "true", "yes"):
    from fastapi import APIRouter as _DebugRouter

    _debug_router = _DebugRouter(prefix="/debug", tags=["Debug"])

    @_debug_router.get("/env")
    def debug_env():
        """Report which Neo4j environment variables are set, without their values.

        Only booleans and the first 15 characters of NEO4J_URI are returned.
        """
        neo4j_uri = os.getenv("NEO4J_URI")
        return {
            "neo4j_uri_set": bool(neo4j_uri),
            "neo4j_user_set": bool(os.getenv("NEO4J_USER")),
            "neo4j_password_set": bool(os.getenv("NEO4J_PASSWORD")),
            "neo4j_uri_prefix": (neo4j_uri[:15] + "...") if neo4j_uri else "NOT SET",
        }

    @_debug_router.get("/neo4j")
    def debug_neo4j():
        """Open a throwaway Neo4j driver from the environment and test connectivity.

        Returns:
            ``{"status": "connected", ...}`` on success, or
            ``{"status": "failed", ...}`` with a generic error message when
            creating, verifying or closing the driver raises.
        """
        from neo4j import GraphDatabase

        uri = os.getenv("NEO4J_URI", "")
        user = os.getenv("NEO4J_USER", "neo4j")
        password = os.getenv("NEO4J_PASSWORD", "")
        try:
            driver = GraphDatabase.driver(uri, auth=(user, password))
            try:
                driver.verify_connectivity()
            finally:
                # Close the temporary driver even when the connectivity check
                # raises; previously it was leaked on that path.
                driver.close()
            return {"status": "connected", "uri_prefix": uri[:20]}
        except Exception as e:
            # Only the exception class name is logged, not its message.
            logger.warning(f"[Debug] Neo4j test failed: {e.__class__.__name__}")
            return {
                "status": "failed",
                "error": "Connection failed -- check environment secrets",
                "uri_prefix": uri[:20] if uri else "NOT SET",
            }

    app.include_router(_debug_router)