File size: 13,668 Bytes
a7cebac
 
b70b0c6
 
5949a90
 
 
 
 
 
 
 
 
70e41ad
5949a90
d0cb2c0
 
64b244e
5949a90
 
 
 
a7cebac
5237303
 
 
aa36a1a
 
 
 
 
 
 
 
5237303
a7cebac
02e886f
 
 
 
 
 
5949a90
5237303
5949a90
 
 
 
 
 
 
 
 
 
820c26f
5949a90
 
 
3e6838c
 
 
 
 
 
5949a90
 
3e6838c
cb19a24
5949a90
 
 
06c3703
 
 
 
4d25967
b1a489c
 
 
 
4d25967
b1a489c
 
 
 
4d25967
3e6838c
 
 
 
 
 
971c184
 
eca3358
da10aa3
3e6838c
 
eece913
3e6838c
 
 
 
 
64b244e
02e886f
5457f68
70e41ad
 
 
5949a90
e021102
 
 
 
3e6838c
fc19538
3e6838c
 
 
a7cebac
64b244e
e021102
 
a7cebac
5237303
5949a90
cb19a24
5949a90
 
 
 
 
 
 
 
 
820c26f
5949a90
 
 
 
b20b3a7
 
 
 
05ed738
 
5ead1fe
d0cb2c0
 
 
 
5ead1fe
5949a90
 
903be0d
 
5ead1fe
 
 
903be0d
112882a
 
 
 
 
 
cb19a24
5949a90
 
3e6838c
e95e7d1
 
7b4461e
e95e7d1
 
 
 
 
 
 
 
 
7b4461e
 
 
e95e7d1
 
 
903be0d
 
5949a90
 
7b4461e
5949a90
 
903be0d
 
 
5949a90
 
1a8094a
 
3e6838c
 
 
1a8094a
 
3e6838c
 
 
5949a90
 
e95e7d1
5949a90
 
5a3112a
5949a90
 
e95e7d1
a7cebac
e95e7d1
 
c157cb6
a73c56c
 
 
 
 
 
 
3e6838c
a73c56c
 
 
 
 
 
 
3e6838c
a73c56c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3e6838c
 
a7cebac
3e6838c
a7cebac
3e6838c
e95e7d1
5949a90
 
e95e7d1
 
3e6838c
 
 
 
 
 
6fd8298
88ebdc6
 
 
 
 
 
a7cebac
88ebdc6
 
3e6838c
88ebdc6
 
 
 
3e6838c
88ebdc6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
import os
import sys
from dotenv import load_dotenv
# Load .env before any project module below is imported -- presumably some of
# them read environment variables at import time (NOTE(review): confirm).
load_dotenv()
# Prepend the parent of this file's directory to sys.path, presumably so the
# project's top-level packages (api, processing, config, graph) resolve no
# matter which working directory the server is launched from.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from contextlib import asynccontextmanager
from datetime import datetime
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from loguru import logger

from api.dependencies import get_driver, close_driver
from api.routes import search, profile, graph, risk, multilingual, export, admin, investigation, affidavit, biography, benami, sources, procurement, conflict, linguistic, policy, adversarial, debate, runtime, resolve, timeline, forensics, self_learning, case_memory
from api.models import HealthResponse, StatsResponse
from processing.alias_graph import AliasGraph          # Phase 32
from processing.entity_resolver_v2 import EntityResolverV2  # Phase 32
from config.runtime_profile import PROFILE as _RUNTIME_PROFILE


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run application startup tasks, then guarantee cleanup on shutdown.

    Startup (each step is best effort and never aborts startup):
      * obtain the Neo4j driver via ``get_driver()``; when one is available,
        make sure the schema / fulltext index exists;
      * load the module-level ``_alias_graph`` so /resolve/alias works
        immediately.

    Shutdown always logs and calls ``close_driver()``, even when the
    application exits with an exception while running.
    """
    logger.info("[API] Starting up -- attempting Neo4j connection")
    driver = get_driver()
    if driver:
        logger.success("[API] Neo4j ready")
        # SEARCH-1 FIX: ensure fulltext index exists on every startup.
        # setup_schema() uses IF NOT EXISTS -- safe to call repeatedly.
        try:
            from graph.loader import GraphLoader
            GraphLoader(driver=driver).setup_schema()
            logger.info("[API] Schema/index verified")
        except Exception as _se:
            # A schema failure is logged but must not block startup.
            logger.warning(f"[API] Schema setup skipped: {type(_se).__name__}")
    else:
        logger.warning("[API] Starting without Neo4j -- set secrets to enable")
    # Phase 32: load alias graph so /resolve/alias works immediately
    try:
        _alias_graph.load()
        logger.info(f"[Startup] AliasGraph: {len(_alias_graph)} aliases loaded")
    except Exception as _ag_err:
        logger.warning(f"[Startup] AliasGraph load skipped: {type(_ag_err).__name__}")
    try:
        yield
    finally:
        # With a bare `yield`, an exception thrown into this context manager
        # would skip the lines after it and leave the driver open.
        logger.info("[API] Shutting down")
        close_driver()


# The ASGI application object. Startup/shutdown work is delegated to
# `lifespan`; the description text surfaces in the generated API docs.
app = FastAPI(
    lifespan=lifespan,
    version="0.35.0",
    title="BharatGraph API",
    description=(
        "Public transparency and institutional intelligence platform for India. "
        "All data sourced from official government records. "
        "Outputs are structural indicators, not legal findings."
    ),
)

# Browser origins allowed by CORS, read from the comma-separated CORS_ORIGINS
# env var (with a local-dev + GitHub Pages default). Surrounding whitespace
# is trimmed and empty entries are dropped.
_cors_origins_raw = os.getenv(
    "CORS_ORIGINS",
    "http://localhost:8000,http://127.0.0.1:8000,https://abinaze.github.io",
)
_cors_origins = [
    origin for origin in map(str.strip, _cors_origins_raw.split(",")) if origin
]

app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["GET", "POST"],
    allow_origins=_cors_origins,
)

# BUG-26 FIX: middleware registered before routes (conventional ordering).
# Whether middleware is added before or after the include_router() calls
# makes no functional difference, because middleware wraps the entire ASGI
# app; placing it first is only for readability while debugging.
#
# NOTE(review): the order of the add_middleware() calls relative to EACH
# OTHER does matter. In Starlette each call wraps the existing stack, so the
# last one added (AuditLoggerMiddleware) presumably runs outermost and
# GZipMiddleware innermost of this group; CORSMiddleware, added earlier,
# would then sit inside all of them (e.g. responses produced by the rate
# limiter might lack CORS headers). Confirm against the installed Starlette
# version before reordering anything here.
from fastapi.middleware.gzip import GZipMiddleware
from api.middleware.rate_limiter import SlidingWindowRateLimiter
from api.middleware.security_headers import SecurityHeadersMiddleware
from api.middleware.input_validator import InputValidatorMiddleware
from api.middleware.audit_logger import AuditLoggerMiddleware
app.add_middleware(GZipMiddleware, minimum_size=1000)
app.add_middleware(SlidingWindowRateLimiter)
app.add_middleware(SecurityHeadersMiddleware)
app.add_middleware(InputValidatorMiddleware)
app.add_middleware(AuditLoggerMiddleware)

# Mount every feature router on the app, in this exact order, each with a
# single tag. Driven by a (module, tag) table instead of 24 near-identical
# calls; every module listed exposes a `router` attribute.
for _router_module, _router_tag in (
    (search,        "Search"),
    (profile,       "Profile"),
    (graph,         "Graph"),
    (risk,          "Risk"),
    (multilingual,  "Multilingual"),
    (export,        "Export"),
    (admin,         "Admin"),
    (investigation, "Investigation"),
    (affidavit,     "Affidavit"),
    (biography,     "Biography"),
    (benami,        "Benami"),
    (sources,       "Sources"),
    (procurement,   "Procurement"),
    (conflict,      "Conflict"),
    (linguistic,    "Linguistic"),
    (policy,        "Policy"),
    (adversarial,   "Adversarial"),
    (debate,        "Debate"),
    (runtime,       "Runtime"),
    (resolve,       "Resolve"),        # Phase 32
    (timeline,      "Timeline"),       # Phase 33
    (forensics,     "Forensics"),      # Phase 34
    (self_learning, "SelfLearning"),   # Phase 34
    (case_memory,   "CaseMemory"),     # Phase 34
):
    app.include_router(_router_module.router, tags=[_router_tag])
# Keep the module namespace free of the loop variables.
del _router_module, _router_tag


@app.get("/")
def root():
    """Describe the running service.

    Returns a JSON-serialisable dict with the service name, version, status,
    paths to the docs and health endpoints, a one-line description and the
    name of the active runtime profile.
    """
    return {
        "name":        "BharatGraph API",
        # Taken from the FastAPI app so it cannot drift from the declared API
        # version again (this endpoint used to report "0.31.5" while the app
        # and /health reported "0.35.0").
        "version":     app.version,
        "status":      "running",
        "docs":        "/docs",
        "health":      "/health",
        "description": "Public transparency intelligence platform for India.",
        "profile":     _RUNTIME_PROFILE.name,
    }


@app.api_route("/health", methods=["GET", "HEAD"], response_model=HealthResponse)
def health_check():
    """Report service health and Neo4j reachability.

    ``status`` is "ok" only when a driver exists and
    ``driver.verify_connectivity()`` succeeds; otherwise it is "degraded".
    Any exception from the connectivity probe is reported as
    ``neo4j_connected=False`` rather than propagated.
    """
    driver = get_driver()
    connected = driver is not None
    if driver:
        try:
            driver.verify_connectivity()
        except Exception:
            connected = False
    return HealthResponse(
        status="ok" if connected else "degraded",
        neo4j_connected=connected,
        # Single source of truth for the version string: the FastAPI app.
        version=app.version,
        generated_at=datetime.now().isoformat(),
    )


import threading as _threading          # B-01 FIX: was missing entirely

# H-07 FIX: /stats response cache. These must live at module scope so they
# persist across requests; get_stats() rebinds them via `global`.
_STATS_TTL        = 60.0                 # cache lifetime in seconds (compared with time.monotonic() deltas)
_stats_cache      = None                 # last StatsResponse built by get_stats(), None until first fill
_stats_cached_at  = 0.0                  # time.monotonic() value at the last fill
_stats_lock       = _threading.Lock()   # B-01 FIX: NameError on every GET /stats

# Phase 32: shared AliasGraph instance; its data is loaded once in lifespan().
_alias_graph   = AliasGraph()
# NOTE(review): not referenced in this chunk -- presumably used elsewhere.
_resolver_v2   = EntityResolverV2(threshold=0.82)


@app.get("/stats", response_model=StatsResponse)
def get_stats():
    """Return per-type node and relationship counts plus the last pipeline run.

    The response is cached for ``_STATS_TTL`` seconds. A cache miss is
    handled under ``_stats_lock`` with a second freshness check, so concurrent
    requests that miss at the same time run the full-graph count only once.

    Returns:
        StatsResponse: ``nodes`` maps each node's first label to a count,
        ``relationships`` maps relationship type to a count, and
        ``last_pipeline_run`` is the newest ``PipelineMeta.last_run`` (or
        None). If Neo4j is unavailable or a query fails, whatever was
        collected so far (possibly nothing) is returned and cached; the
        error is logged at debug level.
    """
    global _stats_cache, _stats_cached_at
    import time as _time

    # Fast path: a fresh cached value is served without taking the lock.
    if _stats_cache is not None and (_time.monotonic() - _stats_cached_at) < _STATS_TTL:
        return _stats_cache

    # BACKEND-2 FIX: the graph scan and the cache update must both happen
    # while the lock is held. Previously the `with` block only re-checked the
    # cache and released the lock before querying, so concurrent misses still
    # ran the full graph scan in parallel.
    with _stats_lock:
        # Another thread may have refilled the cache while we were waiting.
        if _stats_cache is not None and (_time.monotonic() - _stats_cached_at) < _STATS_TTL:
            return _stats_cache

        node_counts = {}
        rel_counts  = {}
        last_run    = None
        driver      = get_driver()
        if driver:
            try:
                with driver.session() as session:
                    n_rows = session.run(
                        "MATCH (n) RETURN labels(n)[0] AS t, count(n) AS c"
                    ).data()
                    # Unlabelled nodes yield a null label and are skipped.
                    node_counts = {r["t"]: r["c"] for r in n_rows if r["t"]}
                    r_rows = session.run(
                        "MATCH ()-[r]->() RETURN type(r) AS t, count(r) AS c"
                    ).data()
                    rel_counts = {r["t"]: r["c"] for r in r_rows if r["t"]}
                    meta = session.run(
                        "MATCH (m:PipelineMeta) "
                        "RETURN m.last_run AS ts ORDER BY m.last_run DESC LIMIT 1"
                    ).single()
                    last_run = meta["ts"] if meta else None
            except Exception as e:
                logger.debug(f"[Stats] Query error: {e}")

        result = StatsResponse(
            nodes=node_counts,
            relationships=rel_counts,
            last_pipeline_run=last_run,
            generated_at=datetime.now().isoformat(),
        )
        _stats_cache = result
        _stats_cached_at = _time.monotonic()
        return result


# NEW-3 FIX: expanded from 6 to 14 types -- Politician, Company, CourtCase etc
# were never appearing in the live feed even when the graph was fully populated
_FEED_LABELS = [
    "AuditReport", "EnforcementAction", "RegulatoryOrder",
    "ElectoralBond", "Contract", "VigilanceCircular",
    "Politician", "Company", "CourtCase", "Tender",
    "NGO", "PressRelease", "InsolvencyOrder", "SanctionedEntity",
]


@app.websocket("/ws/feed")
async def websocket_feed(websocket: WebSocket):
    """Push the most recently scraped entities to a WebSocket client.

    Roughly every 15 seconds the 8 newest nodes (by ``scraped_at``) whose
    first label is in ``_FEED_LABELS`` are fetched. A payload is sent when
    that set of node ids differs from the previous push. An empty result is
    always sent, carrying per-label node counts instead of items. The loop
    runs until the client disconnects or an unexpected error occurs, and the
    socket is then closed (best effort).
    """
    import asyncio
    await websocket.accept()
    logger.info("[WS] Feed client connected")
    # M-13 cursor: frozenset of node ids included in the previous payload.
    # A poll returning the same non-empty id set is skipped.
    last_feed_ids = None
    try:
        while True:
            driver = get_driver()
            payload = {"type": "feed", "at": datetime.now().isoformat()}
            if driver:
                try:
                    loop = asyncio.get_running_loop()  # FEED-3 FIX: get_event_loop deprecated in 3.10+

                    def _query_feed():
                        # C-03 / BUG-C3 FIX: synchronous Neo4j driver MUST run
                        # in a thread pool -- calling driver.session() directly
                        # inside async def blocks the entire uvicorn event loop
                        # while waiting for network I/O to Neo4j AuraDB.
                        with driver.session() as s:
                            rows = s.run(
                                "MATCH (n) WHERE labels(n)[0] IN $labels "
                                "AND n.scraped_at IS NOT NULL "
                                "RETURN labels(n)[0] AS label, "
                                "coalesce(n.name, n.title, n.company_name, n.id) AS name, "
                                "n.scraped_at AS scraped_at, n.id AS id, n.source AS source "
                                "ORDER BY n.scraped_at DESC LIMIT 8",
                                labels=_FEED_LABELS
                            ).data()
                            if not rows:
                                # No recent feed rows: fall back to per-label
                                # node counts for the payload.
                                stats = s.run(
                                    "MATCH (n) RETURN labels(n)[0] AS t, count(n) AS c"
                                ).data()
                                return {"rows": [], "stats": {r["t"]: r["c"] for r in stats if r["t"]}}
                            return {"rows": rows, "stats": {}}

                    feed_result = await loop.run_in_executor(None, _query_feed)
                    feed_rows   = feed_result["rows"]

                    # M-13 FIX: compare full set of IDs, not just first scraped_at
                    current_ids = frozenset(r.get("id", "") for r in feed_rows)
                    if current_ids and current_ids == last_feed_ids:
                        await asyncio.sleep(15)
                        continue
                    last_feed_ids = current_ids

                    if feed_rows:
                        newest = feed_rows[0]
                        payload["items"] = feed_rows
                        # coalesce() in the query can still yield null, and
                        # dict.get(key, default) only covers a *missing* key.
                        # Concatenating None (or a non-str id) with "+" raised
                        # TypeError, which the handler below misreported as
                        # "database query pending". The f-string plus `or`
                        # fallbacks avoids that.
                        payload["message"] = (
                            f"{newest.get('label') or 'Entity'}: "
                            f"{newest.get('name') or '-'}"
                        )
                    else:
                        payload["stats"]   = feed_result["stats"]
                        payload["message"] = "Feed active -- run /admin/pipeline to ingest data"
                except Exception as db_err:
                    logger.debug(f"[WS] DB query error: {db_err}")
                    payload["message"] = "Feed active -- database query pending"
            else:
                payload["message"] = "Feed active -- no database connection"
            await websocket.send_json(payload)
            await asyncio.sleep(15)
    except WebSocketDisconnect:
        logger.info("[WS] Feed client disconnected")
    except Exception as e:
        logger.warning(f"[WS] Feed error: {e}")
    finally:
        # The socket may already be closed (e.g. after a disconnect); a
        # failure to close it again is deliberately ignored.
        try:
            await websocket.close()
        except Exception:
            pass


# BUG-7 FIX: only register debug routes when DEBUG_MODE is explicitly true.
# Routes registered unconditionally (even returning 404) reveal their existence
# to attackers via path enumeration. Move into a conditional block entirely.
if os.getenv("DEBUG_MODE", "").lower() in ("1", "true", "yes"):
    from fastapi import APIRouter as _DebugRouter
    _debug_router = _DebugRouter(prefix="/debug", tags=["Debug"])

    @_debug_router.get("/env")
    def debug_env():
        """Report which Neo4j environment variables are set.

        Only booleans are returned, plus the first 15 characters of
        NEO4J_URI (or "NOT SET"). No credential values are returned.
        """
        uri = os.getenv("NEO4J_URI")
        return {
            "neo4j_uri_set":      bool(uri),
            "neo4j_user_set":     bool(os.getenv("NEO4J_USER")),
            "neo4j_password_set": bool(os.getenv("NEO4J_PASSWORD")),
            "neo4j_uri_prefix":   (uri[:15] + "...") if uri else "NOT SET",
        }

    @_debug_router.get("/neo4j")
    def debug_neo4j():
        """Probe Neo4j with a throwaway driver built from the environment.

        Returns ``{"status": "connected", ...}`` on success. On any error it
        returns a generic "failed" payload and logs only the exception class
        name, presumably to avoid echoing connection details to the caller.
        The probe driver is always closed.
        """
        from neo4j import GraphDatabase
        uri      = os.getenv("NEO4J_URI", "")
        user     = os.getenv("NEO4J_USER", "neo4j")
        password = os.getenv("NEO4J_PASSWORD", "")
        driver = None
        try:
            driver = GraphDatabase.driver(uri, auth=(user, password))
            driver.verify_connectivity()
            return {"status": "connected", "uri_prefix": uri[:20]}
        except Exception as e:
            logger.warning(f"[Debug] Neo4j test failed: {e.__class__.__name__}")
            return {
                "status":     "failed",
                "error":      "Connection failed -- check environment secrets",
                "uri_prefix": uri[:20] if uri else "NOT SET",
            }
        finally:
            # Previously a failing verify_connectivity() skipped close(),
            # leaving the probe driver open on every failed request.
            if driver is not None:
                try:
                    driver.close()
                except Exception:
                    pass

    app.include_router(_debug_router)