Spaces:
Running
Running
fix(SEC-6): reject empty-string ADMIN_SECRET -- empty string was falsy and bypassed authentication check
4040e0f | import os, sys | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) | |
| import threading | |
| from datetime import datetime | |
| from fastapi import APIRouter, Depends, BackgroundTasks, HTTPException, Header | |
| from loguru import logger | |
| from api.dependencies import get_db | |
router = APIRouter()

# Module-level record of the pipeline's state. It is read and written by the
# endpoint functions and by the background task defined in this module.
_pipeline_status = dict(
    running=False,
    last_run=None,
    last_summary=None,
    last_error=None,
)

# NEW-A6 FIX: guards the check-then-set on _pipeline_status["running"] so that
# two concurrent POST /admin/pipeline requests cannot both observe
# running=False before either of them flips it to True.
_pipeline_lock = threading.Lock()
def _require_admin(x_admin_secret: str = Header(default="")):
    """
    Dependency that gates admin endpoints behind the X-Admin-Secret header.

    H-04 FIX: even in DEBUG_MODE, require a matching X-Admin-Secret header
    if ADMIN_SECRET is set. Previously DEBUG_MODE=true bypassed ALL auth,
    allowing anyone to POST /admin/pipeline (database wipe/reseed) on the
    production HuggingFace server if DEBUG_MODE was accidentally left on.

    SEC-6 FIX: an empty or whitespace-only ADMIN_SECRET counts as "not set".

    Args:
        x_admin_secret: value of the X-Admin-Secret request header
            (empty string when the header is absent).

    Returns:
        None when the request is allowed to proceed.

    Raises:
        HTTPException: 503 when ADMIN_SECRET is unset/blank and DEBUG_MODE
            is off; 403 when ADMIN_SECRET is set and the header does not
            match it.
    """
    # Function-scope import: the only new dependency this check needs.
    import hmac

    secret = os.getenv("ADMIN_SECRET", "")
    is_debug = os.getenv("DEBUG_MODE", "").lower() in ("1", "true", "yes")

    if not secret.strip():  # SEC-6 FIX: empty/blank string is not a valid secret
        if not is_debug:
            raise HTTPException(
                status_code=503,
                detail="Admin endpoints disabled: set ADMIN_SECRET env var",
            )
        # DEBUG_MODE without ADMIN_SECRET: warn but allow (local dev only)
        logger.warning("[Admin] No ADMIN_SECRET set -- auth bypassed (DEBUG_MODE)")
        return

    # ADMIN_SECRET is set: always enforce it regardless of DEBUG_MODE.
    # Compare in constant time so response timing does not reveal how many
    # leading characters of a guess were correct. compare_digest() rejects
    # non-ASCII str, so compare bytes; "surrogatepass" keeps encode() from
    # raising on any input string.
    supplied = x_admin_secret.encode("utf-8", "surrogatepass")
    expected = secret.encode("utf-8", "surrogatepass")
    if not hmac.compare_digest(supplied, expected):
        raise HTTPException(
            status_code=403,
            detail="Forbidden: invalid or missing X-Admin-Secret header",
        )
def seed_database(
    driver=Depends(get_db),
    _auth=Depends(_require_admin),
):
    """
    Seed Neo4j with the bundled sample nodes and relationships (demo data).

    Loads politicians, companies, contracts, audit reports and the
    politician-company links, in that order, through GraphLoader and
    returns whatever each load call returned together with usage hints.
    """
    from graph.seed import (
        SAMPLE_AUDIT_REPORTS,
        SAMPLE_COMPANIES,
        SAMPLE_CONTRACTS,
        SAMPLE_DIRECTOR_LINKS,
        SAMPLE_POLITICIANS,
    )
    from graph.loader import GraphLoader

    graph_loader = GraphLoader(driver=driver)

    # Load order is kept as-is: nodes first, relationship links last.
    politician_count = graph_loader.load_politicians(SAMPLE_POLITICIANS)
    company_count = graph_loader.load_companies(SAMPLE_COMPANIES)
    contract_count = graph_loader.load_contracts(SAMPLE_CONTRACTS)
    audit_count = graph_loader.load_audit_reports(SAMPLE_AUDIT_REPORTS)
    link_count = graph_loader.load_politician_company_links(SAMPLE_DIRECTOR_LINKS)

    suggested_searches = [
        "Modi", "Gandhi", "Adani", "Tata", "Infosys",
        "Amit Shah", "Anurag Thakur", "road construction", "audit Maharashtra",
    ]
    note = (
        "Sample data loaded with DIRECTOR_OF and WON_CONTRACT relationships. "
        "Search any name above to see the investigation graph."
    )

    return {
        "status": "seeded",
        "politicians": politician_count,
        "companies": company_count,
        "contracts": contract_count,
        "audit_reports": audit_count,
        "director_links": link_count,
        "try_searching": suggested_searches,
        "note": note,
        "next": "POST /admin/pipeline to ingest all 21 live government sources",
    }
def trigger_pipeline(
    background_tasks: BackgroundTasks,
    scrapers: str = "all",
    driver=Depends(get_db),
    _auth=Depends(_require_admin),
):
    """
    Trigger the full pipeline as a background task.

    Args:
        background_tasks: FastAPI task queue the pipeline run is added to.
        scrapers: "all" (case-insensitive) to run every scraper, or a
            comma-separated list of scraper names. Whitespace around names
            is ignored and empty entries are dropped.
        driver: database driver handed on to the background task.
        _auth: admin-auth dependency; only its side effect (raising) matters.

    Returns:
        A dict with status "started", or status "already_running" when a
        run is in progress.

    Raises:
        HTTPException: 400 when `scrapers` names no scraper at all.
    """
    # Parse and validate BEFORE touching shared state, so a bad request can
    # never leave running=True with no background task scheduled.
    if scrapers.strip().lower() == "all":
        scraper_list = None
    else:
        scraper_list = [name.strip() for name in scrapers.split(",") if name.strip()]
        if not scraper_list:
            raise HTTPException(
                status_code=400,
                detail="No scrapers specified: pass 'all' or a comma-separated list",
            )

    # NEW-A6 FIX: lock prevents two simultaneous requests both passing the
    # running=False check before either sets running=True
    with _pipeline_lock:
        if _pipeline_status["running"]:
            return {
                "status": "already_running",
                "started": _pipeline_status["last_run"],
                "message": "Pipeline already running. Check /admin/pipeline/status",
            }
        _pipeline_status["running"] = True
        _pipeline_status["last_run"] = datetime.now().isoformat()

    background_tasks.add_task(_run_pipeline_background, scraper_list, driver)
    return {
        "status": "started",
        "scrapers": scrapers,
        "message": "Pipeline running in background. Check /admin/pipeline/status",
        "note": "Full run takes 3-8 minutes depending on network.",
    }
def pipeline_status():
    """Return a fresh dict holding the four tracked pipeline status fields."""
    reported_fields = ("running", "last_run", "last_summary", "last_error")
    return {field: _pipeline_status[field] for field in reported_fields}
# Maps each scraper key in the pipeline's raw output to the name of the
# GraphLoader method used to load that scraper's records.
_SCRAPER_TO_LOADER = {
    "myneta": "load_politicians",
    "mca": "load_companies",
    "gem": "load_contracts",
    "cag": "load_audit_reports",
    "pib": "load_press_releases",
    "sebi": "load_regulatory_orders",
    "ed": "load_enforcement_actions",
    "electoral_bond": "load_electoral_bonds",
    "ibbi": "load_insolvency_orders",
    "ngo_darpan": "load_ngos",
    "cppp": "load_tenders",
    "loksabha": "load_parliament_questions",
    "cvc": "load_vigilance_circulars",
    "icij": "load_icij_entities",
    "opensanctions": "load_sanctioned_entities",
    "njdg": "load_court_cases",
    "lgd": "load_local_bodies",
    "ncrb": "load_crime_reports",
    "wikidata": "load_wikidata_enrichments",
    "datagov": "load_datagov_documents",
}


def _load_scraped_records(loader, raw):
    """
    Feed each scraper's records to its mapped loader method.

    Scrapers with no records, or whose loader method does not exist, are
    skipped. A loader that raises is logged and recorded as 0 so one bad
    dataset does not abort the others.

    Returns:
        dict mapping scraper key -> value returned by its loader method
        (0 for a loader that raised).
    """
    loaded = {}
    for scraper_key, method_name in _SCRAPER_TO_LOADER.items():
        records = raw.get(scraper_key, [])
        if not records:
            logger.debug(f"[Admin] No records for {scraper_key}, skipping")
            continue
        method = getattr(loader, method_name, None)
        if method is None:
            logger.warning(f"[Admin] loader.{method_name}() not found -- skipping {scraper_key}")
            continue
        try:
            n = method(records)
        except Exception as load_err:
            # logger.exception keeps the traceback, which a plain error line loses.
            logger.exception(f"[Admin] Failed loading {scraper_key}: {load_err}")
            loaded[scraper_key] = 0
        else:
            loaded[scraper_key] = n
            logger.info(f"[Admin] Loaded {n} {scraper_key} records via {method_name}()")
    return loaded


def _run_pipeline_background(scraper_list, driver):
    """
    Background task -- runs pipeline then loads ALL scraped data into Neo4j.

    Args:
        scraper_list: list of scraper names to run, or None; passed straight
            to BharatGraphPipeline.run(scrapers=...).
        driver: database driver passed to GraphLoader.

    Side effects:
        Writes _pipeline_status["last_summary"] on success and
        _pipeline_status["last_error"] on failure; always resets
        _pipeline_status["running"] to False when finished. Never raises.
    """
    # NOTE(review): `driver` comes from a request-scoped dependency; if get_db
    # closes it when the request ends, it may be unusable here -- confirm.
    try:
        # Start from a clean slate so a successful run does not keep
        # reporting the previous run's failure.
        _pipeline_status["last_error"] = None
        logger.info("[Admin] Background pipeline started")
        from processing.pipeline import BharatGraphPipeline
        from graph.loader import GraphLoader

        results = BharatGraphPipeline().run(scrapers=scraper_list, parallel=True)
        summary = results["summary"]
        # Tolerate an explicit "raw": None as well as a missing key.
        raw = results.get("raw") or {}

        logger.info("[Admin] Pipeline done -- loading into Neo4j...")
        loader = GraphLoader(driver=driver)
        loaded = _load_scraped_records(loader, raw)

        total_loaded = sum(loaded.values())
        _pipeline_status["last_summary"] = {
            **summary,
            "loaded": loaded,
            "total_loaded": total_loaded,
            "completed_at": datetime.now().isoformat(),
        }
        logger.success(
            f"[Admin] Pipeline complete -- "
            f"{total_loaded} total records loaded across {len(loaded)} datasets"
        )
    except Exception as e:
        # Top-level boundary of the background task: record and log with
        # traceback rather than let the exception escape.
        logger.exception(f"[Admin] Pipeline failed: {e}")
        _pipeline_status["last_error"] = str(e)
    finally:
        _pipeline_status["running"] = False