bharatgraph / api /main.py
abinazebinoy's picture
chore: bump version to 0.35.0 -- Phase 34 complete
820c26f
Raw
History Blame Contribute Delete
13.7 kB
import os
import sys
from dotenv import load_dotenv
load_dotenv()
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from contextlib import asynccontextmanager
from datetime import datetime
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from loguru import logger
from api.dependencies import get_driver, close_driver
from api.routes import search, profile, graph, risk, multilingual, export, admin, investigation, affidavit, biography, benami, sources, procurement, conflict, linguistic, policy, adversarial, debate, runtime, resolve, timeline, forensics, self_learning, case_memory
from api.models import HealthResponse, StatsResponse
from processing.alias_graph import AliasGraph # Phase 32
from processing.entity_resolver_v2 import EntityResolverV2 # Phase 32
from config.runtime_profile import PROFILE as _RUNTIME_PROFILE
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: prepare shared resources, then release them.

    Startup:
      * obtains the shared Neo4j driver via ``get_driver()``; when one is
        available, runs ``GraphLoader.setup_schema()`` so the schema/fulltext
        index exists;
      * loads the module-level ``_alias_graph`` so /resolve/alias works
        immediately.
    Failures in either step are logged (exception type name only) and startup
    continues in a degraded mode.

    Shutdown:
      ``close_driver()`` runs in a ``finally`` clause, so the driver is released
      even when an exception or cancellation is thrown into this generator at
      the ``yield`` point.
    """
    logger.info("[API] Starting up -- attempting Neo4j connection")
    driver = get_driver()
    if driver:
        logger.success("[API] Neo4j ready")
        # SEARCH-1 FIX: ensure fulltext index exists on every startup.
        # setup_schema() uses IF NOT EXISTS -- safe to call repeatedly.
        try:
            from graph.loader import GraphLoader
            GraphLoader(driver=driver).setup_schema()
            logger.info("[API] Schema/index verified")
        except Exception as _se:
            # Only the type name is logged (presumably to keep connection
            # details out of the logs).
            logger.warning(f"[API] Schema setup skipped: {type(_se).__name__}")
    else:
        logger.warning("[API] Starting without Neo4j -- set secrets to enable")
    # Phase 32: load alias graph so /resolve/alias works immediately
    try:
        _alias_graph.load()
        logger.info(f"[Startup] AliasGraph: {len(_alias_graph)} aliases loaded")
    except Exception as _ag_err:
        logger.warning(f"[Startup] AliasGraph load skipped: {type(_ag_err).__name__}")
    try:
        yield
    finally:
        # Always release the driver, even if shutdown is abnormal.
        logger.info("[API] Shutting down")
        close_driver()
# Text shown as the API description in the generated OpenAPI docs.
_API_DESCRIPTION = (
    "Public transparency and institutional intelligence platform for India. "
    "All data sourced from official government records. "
    "Outputs are structural indicators, not legal findings."
)

# The ASGI application. `lifespan` handles startup (Neo4j, schema, alias
# graph) and shutdown (driver close).
app = FastAPI(
    title="BharatGraph API",
    description=_API_DESCRIPTION,
    version="0.35.0",
    lifespan=lifespan,
)
from fastapi.middleware.gzip import GZipMiddleware
from api.middleware.rate_limiter import SlidingWindowRateLimiter
from api.middleware.security_headers import SecurityHeadersMiddleware
from api.middleware.input_validator import InputValidatorMiddleware
from api.middleware.audit_logger import AuditLoggerMiddleware

# Allowed browser origins come from the comma-separated CORS_ORIGINS env var;
# surrounding whitespace and empty entries are ignored. The default covers
# local development plus the GitHub Pages frontend.
_cors_origins_raw = os.getenv(
    "CORS_ORIGINS",
    "http://localhost:8000,http://127.0.0.1:8000,https://abinaze.github.io"
)
_cors_origins = [o.strip() for o in _cors_origins_raw.split(",") if o.strip()]

# BUG-26 FIX: middleware is registered before the routers. Starlette builds
# the middleware stack lazily when the app starts, so registering before or
# after include_router() behaves the same; doing it first is just clearer.
#
# Order *between* middleware, however, does matter: every add_middleware()
# call wraps the middleware added before it, so the LAST one added is the
# OUTERMOST layer. CORSMiddleware is therefore added last, so that:
#   * CORS preflight (OPTIONS) requests are answered before they reach the
#     audit logger, input validator or rate limiter;
#   * responses produced by the inner middleware (e.g. a rejection from the
#     rate limiter or validator, if they short-circuit) still carry the
#     Access-Control-Allow-* headers and are readable by browser clients.
# Relative order of the remaining middleware is unchanged.
app.add_middleware(GZipMiddleware, minimum_size=1000)
app.add_middleware(SlidingWindowRateLimiter)
app.add_middleware(SecurityHeadersMiddleware)
app.add_middleware(InputValidatorMiddleware)
app.add_middleware(AuditLoggerMiddleware)
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)
# (route module, OpenAPI tag) pairs, registered in this exact order.
# Trailing comments record the phase in which a router was introduced.
_ROUTERS = (
    (search, "Search"),
    (profile, "Profile"),
    (graph, "Graph"),
    (risk, "Risk"),
    (multilingual, "Multilingual"),
    (export, "Export"),
    (admin, "Admin"),
    (investigation, "Investigation"),
    (affidavit, "Affidavit"),
    (biography, "Biography"),
    (benami, "Benami"),
    (sources, "Sources"),
    (procurement, "Procurement"),
    (conflict, "Conflict"),
    (linguistic, "Linguistic"),
    (policy, "Policy"),
    (adversarial, "Adversarial"),
    (debate, "Debate"),
    (runtime, "Runtime"),
    (resolve, "Resolve"),              # Phase 32
    (timeline, "Timeline"),            # Phase 33
    (forensics, "Forensics"),          # Phase 34
    (self_learning, "SelfLearning"),   # Phase 34
    (case_memory, "CaseMemory"),       # Phase 34
)

for _route_module, _tag in _ROUTERS:
    app.include_router(_route_module.router, tags=[_tag])
# Drop the loop variables so they do not linger as module attributes.
del _route_module, _tag
@app.get("/")
def root():
    """Return a small service banner for the API root.

    The version is read from the FastAPI application object, so it always
    matches the version advertised in the OpenAPI docs. Previously it was
    hard-coded here as "0.31.5" while the app declared "0.35.0".
    """
    return {
        "name": "BharatGraph API",
        "version": app.version,
        "status": "running",
        "docs": "/docs",
        "health": "/health",
        "description": "Public transparency intelligence platform for India.",
        "profile": _RUNTIME_PROFILE.name,
    }
@app.api_route("/health", methods=["GET", "HEAD"], response_model=HealthResponse)
def health_check():
    """Report API liveness and Neo4j connectivity.

    Returns a ``HealthResponse`` whose fields are:
      * ``status``: "ok" when a driver exists and ``verify_connectivity()``
        succeeds, otherwise "degraded";
      * ``neo4j_connected``: the result of that connectivity check;
      * ``version``: taken from the FastAPI app so it cannot drift from /docs;
      * ``generated_at``: the time the response was built.
    The endpoint never raises because of Neo4j: connectivity errors are logged
    at debug level and reported as "degraded".
    """
    driver = get_driver()
    connected = False
    if driver is not None:
        try:
            driver.verify_connectivity()
            connected = True
        except Exception as exc:
            logger.debug(f"[Health] Neo4j connectivity check failed: {type(exc).__name__}")
    return HealthResponse(
        status="ok" if connected else "degraded",
        neo4j_connected=connected,
        version=app.version,
        generated_at=datetime.now().isoformat(),
    )
import threading as _threading  # B-01 FIX: needed for _stats_lock below

# Phase 32: shared entity-resolution objects, created once at import time.
# _alias_graph is filled in by lifespan() during application startup.
_alias_graph = AliasGraph()
_resolver_v2 = EntityResolverV2(threshold=0.82)

# H-07 FIX: GET /stats cache state lives at module scope so it persists
# across requests (a function-local cache would reset on every call).
_STATS_TTL = 60.0                  # seconds a cached stats response stays fresh
_stats_cached_at = 0.0             # time.monotonic() value of the last refresh
_stats_cache = None                # last StatsResponse; None until first refresh
_stats_lock = _threading.Lock()    # serialises cache refreshes in get_stats()
@app.get("/stats", response_model=StatsResponse)
def get_stats():
    """Return per-label node counts, per-type relationship counts and the
    most recent ``PipelineMeta.last_run`` value.

    Results are cached for ``_STATS_TTL`` seconds. A fast path checks the cache
    without the lock; on a miss the lock is taken and the cache is re-checked,
    so concurrent requests do not all run the full graph scan.

    If a Neo4j query fails, the (empty or partial) response is still returned
    but is NOT cached. Otherwise a transient error would pin bad counts for the
    whole TTL. The trade-off is that, while Neo4j keeps failing, each request
    retries the queries (one at a time, under the lock).
    """
    global _stats_cache, _stats_cached_at
    import time

    if _stats_cache is not None and (time.monotonic() - _stats_cached_at) < _STATS_TTL:
        return _stats_cache
    # BACKEND-2 FIX: lock prevents two concurrent requests both
    # seeing _stats_cache is None and both running the full graph scan
    with _stats_lock:
        # Another thread may have refreshed the cache while we waited.
        if _stats_cache is not None and (time.monotonic() - _stats_cached_at) < _STATS_TTL:
            return _stats_cache
        driver = get_driver()
        node_counts = {}
        rel_counts = {}
        last_run = None
        query_failed = False
        if driver:
            try:
                with driver.session() as session:
                    n_rows = session.run(
                        "MATCH (n) RETURN labels(n)[0] AS t, count(n) AS c"
                    ).data()
                    # Unlabelled nodes yield t = None; skip them.
                    node_counts = {r["t"]: r["c"] for r in n_rows if r["t"]}
                    r_rows = session.run(
                        "MATCH ()-[r]->() RETURN type(r) AS t, count(r) AS c"
                    ).data()
                    rel_counts = {r["t"]: r["c"] for r in r_rows if r["t"]}
                    meta = session.run(
                        "MATCH (m:PipelineMeta) "
                        "RETURN m.last_run AS ts ORDER BY m.last_run DESC LIMIT 1"
                    ).single()
                    last_run = meta["ts"] if meta else None
            except Exception as e:
                query_failed = True
                logger.warning(f"[Stats] Query error: {type(e).__name__}")
                logger.debug(f"[Stats] Query error: {e}")
        result = StatsResponse(
            nodes=node_counts,
            relationships=rel_counts,
            last_pipeline_run=last_run,
            generated_at=datetime.now().isoformat(),
        )
        if not query_failed:
            _stats_cache = result
            _stats_cached_at = time.monotonic()
        return result
# Node labels eligible for the /ws/feed live stream (matched against each
# node's first label). NEW-3 FIX: widened from the original 6 record types to
# 14 so Politician, Company, CourtCase, etc. also appear once the graph is
# populated.
_FEED_LABELS = [
    # original record types
    "AuditReport",
    "EnforcementAction",
    "RegulatoryOrder",
    "ElectoralBond",
    "Contract",
    "VigilanceCircular",
    # added by NEW-3
    "Politician",
    "Company",
    "CourtCase",
    "Tender",
    "NGO",
    "PressRelease",
    "InsolvencyOrder",
    "SanctionedEntity",
]
def _query_feed(driver):
    """Fetch the latest live-feed data from Neo4j.

    Blocking: this uses the synchronous driver, so async callers must run it in
    an executor (websocket_feed does).

    Returns a dict with:
      * ``"rows"``: up to 8 nodes whose first label is in ``_FEED_LABELS`` and
        whose ``scraped_at`` is set, newest first. Each row has ``label``,
        ``name``, ``scraped_at``, ``id`` and ``source``. ``name`` may be None
        when the node has none of name/title/company_name/id.
      * ``"stats"``: ``{first_label: node_count}`` over all nodes when there are
        no such rows; ``{}`` otherwise.
    """
    with driver.session() as s:
        rows = s.run(
            "MATCH (n) WHERE labels(n)[0] IN $labels "
            "AND n.scraped_at IS NOT NULL "
            "RETURN labels(n)[0] AS label, "
            "coalesce(n.name, n.title, n.company_name, n.id) AS name, "
            "n.scraped_at AS scraped_at, n.id AS id, n.source AS source "
            "ORDER BY n.scraped_at DESC LIMIT 8",
            labels=_FEED_LABELS
        ).data()
        if not rows:
            stats = s.run(
                "MATCH (n) RETURN labels(n)[0] AS t, count(n) AS c"
            ).data()
            return {"rows": [], "stats": {r["t"]: r["c"] for r in stats if r["t"]}}
        return {"rows": rows, "stats": {}}


def _feed_headline(row):
    """Build the ``"<label>: <name>"`` summary line for one feed row.

    ``dict.get(key, default)`` does not apply its default when the key is
    present with a None value, and the Cypher coalesce() for ``name`` returns
    null when every candidate property is missing. Hence the explicit None
    checks instead of concatenating None, which raised TypeError.
    """
    label = row.get("label")
    name = row.get("name")
    if label is None:
        label = "Entity"
    if name is None:
        name = "-"
    return f"{label}: {name}"


@app.websocket("/ws/feed")
async def websocket_feed(websocket: WebSocket):
    """Stream recently scraped entities to a websocket client every ~15 s.

    Each cycle builds ``{"type": "feed", "at": <timestamp>, "message": ...}``
    and adds either ``items`` (the newest feed rows) or ``stats`` (per-label
    counts when nothing has been scraped yet). When the set of row ids is
    unchanged since the last push, nothing is sent for that cycle.
    Database errors are reported in ``message`` and do not end the stream.
    """
    import asyncio
    await websocket.accept()
    logger.info("[WS] Feed client connected")
    last_ids = None  # WS cursor: ids pushed last time; only push when data changes
    try:
        while True:
            driver = get_driver()
            payload = {"type": "feed", "at": datetime.now().isoformat()}
            if driver:
                try:
                    loop = asyncio.get_running_loop()  # FEED-3 FIX: get_event_loop deprecated in 3.10+
                    # C-03 / BUG-C3 FIX: the synchronous Neo4j driver MUST run
                    # in a thread pool -- calling it directly inside async def
                    # blocks the entire event loop while waiting on network I/O.
                    feed_result = await loop.run_in_executor(None, _query_feed, driver)
                    feed_rows = feed_result["rows"]
                    # M-13 FIX: compare full set of IDs, not just first scraped_at
                    current_ids = frozenset(r.get("id", "") for r in feed_rows)
                    if current_ids and current_ids == last_ids:
                        await asyncio.sleep(15)
                        continue
                    last_ids = current_ids
                    if feed_rows:
                        # NOTE(review): send_json serialises with the json
                        # module; if scraped_at is stored as a Neo4j temporal
                        # type rather than a string this would fail -- confirm
                        # how the loader writes scraped_at.
                        payload["items"] = feed_rows
                        payload["message"] = _feed_headline(feed_rows[0])
                    else:
                        payload["stats"] = feed_result["stats"]
                        payload["message"] = "Feed active -- run /admin/pipeline to ingest data"
                except Exception as db_err:
                    logger.debug(f"[WS] DB query error: {db_err}")
                    payload["message"] = "Feed active -- database query pending"
            else:
                payload["message"] = "Feed active -- no database connection"
            await websocket.send_json(payload)
            await asyncio.sleep(15)
    except WebSocketDisconnect:
        logger.info("[WS] Feed client disconnected")
    except Exception as e:
        logger.warning(f"[WS] Feed error: {e}")
    finally:
        # The socket may already be closed (e.g. after a disconnect); closing
        # again is best-effort.
        try:
            await websocket.close()
        except Exception:
            pass
# BUG-7 FIX: only register debug routes when DEBUG_MODE is explicitly true.
# Routes registered unconditionally (even returning 404) reveal their existence
# to attackers via path enumeration, so the whole router is created inside
# this conditional block.
if os.getenv("DEBUG_MODE", "").lower() in ("1", "true", "yes"):
    from fastapi import APIRouter as _DebugRouter

    _debug_router = _DebugRouter(prefix="/debug", tags=["Debug"])

    @_debug_router.get("/env")
    def debug_env():
        """Report which Neo4j env vars are set.

        Values are not exposed, except the first 15 characters of NEO4J_URI.
        """
        uri = os.getenv("NEO4J_URI")
        return {
            "neo4j_uri_set": bool(uri),
            "neo4j_user_set": bool(os.getenv("NEO4J_USER")),
            "neo4j_password_set": bool(os.getenv("NEO4J_PASSWORD")),
            "neo4j_uri_prefix": (uri[:15] + "...") if uri else "NOT SET",
        }

    @_debug_router.get("/neo4j")
    def debug_neo4j():
        """Open a throw-away Neo4j driver from env vars and verify connectivity.

        The driver is always closed, including when verify_connectivity()
        raises. Failures return a generic message; only the exception type
        name is logged.
        """
        from neo4j import GraphDatabase
        uri = os.getenv("NEO4J_URI", "")
        user = os.getenv("NEO4J_USER", "neo4j")
        password = os.getenv("NEO4J_PASSWORD", "")
        driver = None
        try:
            driver = GraphDatabase.driver(uri, auth=(user, password))
            driver.verify_connectivity()
            return {"status": "connected", "uri_prefix": uri[:20]}
        except Exception as e:
            logger.warning(f"[Debug] Neo4j test failed: {e.__class__.__name__}")
            return {
                "status": "failed",
                "error": "Connection failed -- check environment secrets",
                "uri_prefix": uri[:20] if uri else "NOT SET",
            }
        finally:
            if driver is not None:
                try:
                    driver.close()
                except Exception as close_err:
                    logger.debug(f"[Debug] Neo4j driver close failed: {close_err.__class__.__name__}")

    app.include_router(_debug_router)