Spaces:
Running
Running
import os
import sys
from contextlib import asynccontextmanager
from datetime import datetime

from dotenv import load_dotenv

# Populate os.environ from a .env file first, so every module imported below
# sees those values if it reads the environment at import time.
load_dotenv()

# Put the parent of this file's directory at the front of sys.path so the
# absolute `api.*`, `processing.*` and `config.*` imports below resolve.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from loguru import logger

from api.dependencies import get_driver, close_driver
from api.routes import (
    search, profile, graph, risk, multilingual, export, admin,
    investigation, affidavit, biography, benami, sources, procurement,
    conflict, linguistic, policy, adversarial, debate, runtime, resolve,
    timeline, forensics, self_learning, case_memory,
)
from api.models import HealthResponse, StatsResponse
from processing.alias_graph import AliasGraph  # Phase 32
from processing.entity_resolver_v2 import EntityResolverV2  # Phase 32
from config.runtime_profile import PROFILE as _RUNTIME_PROFILE
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: startup work before ``yield``, shutdown after.

    Startup:
      * obtain the Neo4j driver via ``get_driver()``;
      * when a driver is available, run ``GraphLoader.setup_schema()`` so the
        fulltext index exists (failures are logged and startup continues);
      * load the shared ``_alias_graph`` (failures are logged and ignored).

    Shutdown: always calls ``close_driver()``, even if the lifespan context is
    exited with an exception.
    """
    logger.info("[API] Starting up -- attempting Neo4j connection")
    driver = get_driver()
    if driver:
        logger.success("[API] Neo4j ready")
        # SEARCH-1 FIX: ensure fulltext index exists on every startup.
        # setup_schema() uses IF NOT EXISTS -- safe to call repeatedly.
        try:
            from graph.loader import GraphLoader
            GraphLoader(driver=driver).setup_schema()
            logger.info("[API] Schema/index verified")
        except Exception as _se:
            logger.warning(f"[API] Schema setup skipped: {type(_se).__name__}")
    else:
        logger.warning("[API] Starting without Neo4j -- set secrets to enable")

    # Phase 32: load alias graph so /resolve/alias works immediately
    try:
        _alias_graph.load()
        logger.info(f"[Startup] AliasGraph: {len(_alias_graph)} aliases loaded")
    except Exception as _ag_err:
        logger.warning(f"[Startup] AliasGraph load skipped: {type(_ag_err).__name__}")

    try:
        yield
    finally:
        # Release the driver even when the app is torn down by an error.
        logger.info("[API] Shutting down")
        close_driver()
# The ASGI application object; `lifespan` handles startup/shutdown work.
app = FastAPI(
    title="BharatGraph API",
    version="0.35.0",
    lifespan=lifespan,
    description=(
        "Public transparency and institutional intelligence platform for India. "
        "All data sourced from official government records. "
        "Outputs are structural indicators, not legal findings."
    ),
)
# Browser origins allowed by CORS. CORS_ORIGINS is a comma-separated list;
# surrounding whitespace is trimmed and empty entries are dropped.
_cors_origins_raw = os.getenv(
    "CORS_ORIGINS",
    "http://localhost:8000,http://127.0.0.1:8000,https://abinaze.github.io",
)
_cors_origins = list(filter(None, map(str.strip, _cors_origins_raw.split(","))))

app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["GET", "POST"],
    allow_origins=_cors_origins,
)
# BUG-26 FIX: middleware is registered before the routers (conventional order).
# Where add_middleware() sits relative to include_router() does not change
# behavior -- the middleware stack wraps the whole ASGI app -- but keeping it
# first makes the stack easier to follow when debugging.
# NOTE(review): order AMONG middleware does matter. With Starlette's
# add_middleware(), the most recently added class becomes the outermost layer,
# so AuditLoggerMiddleware (added last) sees each request first and the
# CORSMiddleware registered earlier sits innermost. Confirm against the
# installed Starlette version before reordering.
from fastapi.middleware.gzip import GZipMiddleware
from api.middleware.rate_limiter import SlidingWindowRateLimiter
from api.middleware.security_headers import SecurityHeadersMiddleware
from api.middleware.input_validator import InputValidatorMiddleware
from api.middleware.audit_logger import AuditLoggerMiddleware

for _mw_cls, _mw_kwargs in (
    (GZipMiddleware, {"minimum_size": 1000}),
    (SlidingWindowRateLimiter, {}),
    (SecurityHeadersMiddleware, {}),
    (InputValidatorMiddleware, {}),
    (AuditLoggerMiddleware, {}),
):
    app.add_middleware(_mw_cls, **_mw_kwargs)
del _mw_cls, _mw_kwargs
# Mount each feature router under a single OpenAPI tag. Registration order is
# kept exactly as before.
for _route_module, _route_tag in (
    (search, "Search"),
    (profile, "Profile"),
    (graph, "Graph"),
    (risk, "Risk"),
    (multilingual, "Multilingual"),
    (export, "Export"),
    (admin, "Admin"),
    (investigation, "Investigation"),
    (affidavit, "Affidavit"),
    (biography, "Biography"),
    (benami, "Benami"),
    (sources, "Sources"),
    (procurement, "Procurement"),
    (conflict, "Conflict"),
    (linguistic, "Linguistic"),
    (policy, "Policy"),
    (adversarial, "Adversarial"),
    (debate, "Debate"),
    (runtime, "Runtime"),
    (resolve, "Resolve"),              # Phase 32
    (timeline, "Timeline"),            # Phase 33
    (forensics, "Forensics"),          # Phase 34
    (self_learning, "SelfLearning"),   # Phase 34
    (case_memory, "CaseMemory"),       # Phase 34
):
    app.include_router(_route_module.router, tags=[_route_tag])
del _route_module, _route_tag
@app.get("/")
def root():
    """Return a short description of the API and where to find docs/health.

    The version is read from ``app.version`` so this endpoint always reports
    the same version as the FastAPI app and its OpenAPI schema.
    """
    return {
        "name": "BharatGraph API",
        "version": app.version,
        "status": "running",
        "docs": "/docs",
        "health": "/health",
        "description": "Public transparency intelligence platform for India.",
        "profile": _RUNTIME_PROFILE.name,
    }
@app.get("/health", response_model=HealthResponse)
def health_check():
    """Report API health and Neo4j reachability.

    ``status`` is ``"ok"`` only when ``get_driver()`` returns a driver AND
    ``driver.verify_connectivity()`` succeeds; otherwise ``"degraded"``.
    ``version`` mirrors ``app.version``.
    """
    driver = get_driver()
    connected = False
    if driver is not None:
        try:
            driver.verify_connectivity()
            connected = True
        except Exception as exc:
            # Degraded, not an error response: report it and keep serving.
            logger.debug(f"[Health] Neo4j connectivity check failed: {type(exc).__name__}")
    return HealthResponse(
        status="ok" if connected else "degraded",
        neo4j_connected=connected,
        version=app.version,
        generated_at=datetime.now().isoformat(),
    )
# H-07 FIX: the /stats cache lives at module scope (not inside the handler)
# so every request shares it.
# B-01 FIX: the lock used by get_stats() must exist at import time; it was
# previously missing, which raised NameError on every GET /stats.
import threading as _threading

_STATS_TTL = 60.0                 # seconds a cached stats response stays fresh
_stats_cache = None               # last cached StatsResponse; None = nothing cached
_stats_cached_at = 0.0            # time.monotonic() value when _stats_cache was stored
_stats_lock = _threading.Lock()   # lets only one request refill the cache at a time

# Phase 32: shared instances. _alias_graph is populated by lifespan() at startup.
_alias_graph = AliasGraph()
_resolver_v2 = EntityResolverV2(threshold=0.82)
@app.get("/stats", response_model=StatsResponse)
def get_stats():
    """Return per-label node counts, per-type relationship counts and last pipeline run.

    Results are cached in module globals for ``_STATS_TTL`` seconds. The cache
    is checked once without the lock (fast path) and again after acquiring
    ``_stats_lock``, so concurrent requests on a stale cache run the
    full-graph count queries only once (BACKEND-2 FIX).

    Only results from a successful query are cached. If no driver is available
    or a query fails, this request gets an empty response, but it is not
    stored. The next request retries instead of serving empty counts for a
    whole TTL.
    """
    global _stats_cache, _stats_cached_at
    import time as _time

    if _stats_cache is not None and (_time.monotonic() - _stats_cached_at) < _STATS_TTL:
        return _stats_cache

    with _stats_lock:
        # Another request may have refilled the cache while we waited.
        if _stats_cache is not None and (_time.monotonic() - _stats_cached_at) < _STATS_TTL:
            return _stats_cache

        driver = get_driver()
        node_counts = {}
        rel_counts = {}
        last_run = None
        query_ok = False
        if driver:
            try:
                with driver.session() as session:
                    n_rows = session.run(
                        "MATCH (n) RETURN labels(n)[0] AS t, count(n) AS c"
                    ).data()
                    # Unlabelled nodes come back with t = null; skip them.
                    node_counts = {r["t"]: r["c"] for r in n_rows if r["t"]}
                    r_rows = session.run(
                        "MATCH ()-[r]->() RETURN type(r) AS t, count(r) AS c"
                    ).data()
                    rel_counts = {r["t"]: r["c"] for r in r_rows if r["t"]}
                    meta = session.run(
                        "MATCH (m:PipelineMeta) "
                        "RETURN m.last_run AS ts ORDER BY m.last_run DESC LIMIT 1"
                    ).single()
                    last_run = meta["ts"] if meta else None
                query_ok = True
            except Exception as e:
                logger.warning(f"[Stats] Query error: {e}")

        result = StatsResponse(
            nodes=node_counts,
            relationships=rel_counts,
            last_pipeline_run=last_run,
            generated_at=datetime.now().isoformat(),
        )
        if query_ok:
            _stats_cache = result
            _stats_cached_at = _time.monotonic()
        return result
| # NEW-3 FIX: expanded from 6 to 14 types -- Politician, Company, CourtCase etc | |
| # were never appearing in the live feed even when the graph was fully populated | |
| _FEED_LABELS = [ | |
| "AuditReport", "EnforcementAction", "RegulatoryOrder", | |
| "ElectoralBond", "Contract", "VigilanceCircular", | |
| "Politician", "Company", "CourtCase", "Tender", | |
| "NGO", "PressRelease", "InsolvencyOrder", "SanctionedEntity", | |
| ] | |
async def websocket_feed(websocket: WebSocket):
    """Push a live feed of recently scraped entities to a WebSocket client.

    Every 15 seconds the handler queries Neo4j for the 8 most recently scraped
    nodes whose first label is in ``_FEED_LABELS``. It then sends a JSON
    payload with ``type``, ``at`` (timestamp) and ``message`` keys, plus:

    * ``items`` -- the rows, when any were found;
    * ``stats`` -- per-label node counts, when no feed rows exist yet.

    If a poll returns the same non-empty set of row ids as the previous poll,
    nothing is sent for that cycle. The socket is closed when the client
    disconnects or an unexpected error escapes the loop.

    NOTE(review): no route decorator for this handler is visible in this
    file -- confirm it is registered (e.g. via ``app.websocket(...)``).
    """
    import asyncio

    def _query_feed(driver):
        # C-03 / BUG-C3 FIX: the Neo4j driver is synchronous, so this runs in
        # the default thread pool via run_in_executor; calling
        # driver.session() directly inside this coroutine would block the
        # event loop while waiting on network I/O.
        with driver.session() as s:
            rows = s.run(
                "MATCH (n) WHERE labels(n)[0] IN $labels "
                "AND n.scraped_at IS NOT NULL "
                "RETURN labels(n)[0] AS label, "
                "coalesce(n.name, n.title, n.company_name, n.id) AS name, "
                "n.scraped_at AS scraped_at, n.id AS id, n.source AS source "
                "ORDER BY n.scraped_at DESC LIMIT 8",
                labels=_FEED_LABELS,
            ).data()
            if not rows:
                # Nothing to show yet: send per-label node counts instead.
                stats = s.run(
                    "MATCH (n) RETURN labels(n)[0] AS t, count(n) AS c"
                ).data()
                return {"rows": [], "stats": {r["t"]: r["c"] for r in stats if r["t"]}}
            return {"rows": rows, "stats": {}}

    await websocket.accept()
    logger.info("[WS] Feed client connected")
    last_ids = None  # WS cursor: row ids sent on the previous poll
    try:
        while True:
            driver = get_driver()
            payload = {"type": "feed", "at": datetime.now().isoformat()}
            if driver:
                try:
                    loop = asyncio.get_running_loop()  # FEED-3 FIX: get_event_loop deprecated in 3.10+
                    feed_result = await loop.run_in_executor(None, _query_feed, driver)
                    feed_rows = feed_result["rows"]
                    # M-13 FIX: compare full set of IDs, not just first scraped_at
                    current_ids = frozenset(r.get("id", "") for r in feed_rows)
                    if current_ids and current_ids == last_ids:
                        await asyncio.sleep(15)
                        continue
                    last_ids = current_ids
                    if feed_rows:
                        # NOTE(review): rows (incl. scraped_at) are forwarded as
                        # returned by Neo4j; if scraped_at is stored as a temporal
                        # type rather than a string, send_json cannot serialize it.
                        payload["items"] = feed_rows
                        top = feed_rows[0]
                        label = top.get("label") or "Entity"
                        name = top.get("name")
                        # coalesce(...) can yield null or a non-string id; format
                        # instead of concatenating so that never raises TypeError.
                        shown_name = "-" if name is None else name
                        payload["message"] = f"{label}: {shown_name}"
                    else:
                        payload["stats"] = feed_result["stats"]
                        payload["message"] = "Feed active -- run /admin/pipeline to ingest data"
                except Exception as db_err:
                    logger.debug(f"[WS] DB query error: {db_err}")
                    payload["message"] = "Feed active -- database query pending"
            else:
                payload["message"] = "Feed active -- no database connection"
            await websocket.send_json(payload)
            await asyncio.sleep(15)
    except WebSocketDisconnect:
        logger.info("[WS] Feed client disconnected")
    except Exception as e:
        logger.warning(f"[WS] Feed error: {e}")
    finally:
        # The socket may already be closed (e.g. after a disconnect); ignore.
        try:
            await websocket.close()
        except Exception:
            pass
# BUG-7 FIX: only register debug routes when DEBUG_MODE is explicitly true.
# Routes registered unconditionally (even ones returning 404) reveal their
# existence to attackers via path enumeration, so the whole debug router is
# created and mounted only inside this conditional block.
if os.getenv("DEBUG_MODE", "").lower() in ("1", "true", "yes"):
    from fastapi import APIRouter as _DebugRouter

    _debug_router = _DebugRouter(prefix="/debug", tags=["Debug"])

    @_debug_router.get("/env")
    def debug_env():
        """Report which Neo4j env vars are set, exposing only a short URI prefix."""
        neo4j_uri = os.getenv("NEO4J_URI")
        return {
            "neo4j_uri_set": bool(neo4j_uri),
            "neo4j_user_set": bool(os.getenv("NEO4J_USER")),
            "neo4j_password_set": bool(os.getenv("NEO4J_PASSWORD")),
            "neo4j_uri_prefix": (neo4j_uri[:15] + "...") if neo4j_uri else "NOT SET",
        }

    @_debug_router.get("/neo4j")
    def debug_neo4j():
        """Open a throwaway Neo4j driver from env vars and test connectivity.

        The driver is used as a context manager, so it is closed whether or
        not ``verify_connectivity()`` succeeds. Failure details are only
        logged (exception class name); the response stays generic.
        """
        from neo4j import GraphDatabase
        uri = os.getenv("NEO4J_URI", "")
        user = os.getenv("NEO4J_USER", "neo4j")
        password = os.getenv("NEO4J_PASSWORD", "")
        try:
            with GraphDatabase.driver(uri, auth=(user, password)) as driver:
                driver.verify_connectivity()
            return {"status": "connected", "uri_prefix": uri[:20]}
        except Exception as e:
            logger.warning(f"[Debug] Neo4j test failed: {e.__class__.__name__}")
            return {
                "status": "failed",
                "error": "Connection failed -- check environment secrets",
                "uri_prefix": uri[:20] if uri else "NOT SET",
            }

    app.include_router(_debug_router)