"""Ultimate Search Engine API (Hyper-Mega Edition).

FastAPI backend providing vector search over a LanceDB index, HMAC-signed
ingestion from crawler spiders, JWT issuance, WebSocket live updates, and
basic infrastructure monitoring endpoints (serverless-friendly: no Kafka,
no Redis).
"""

from fastapi import (
    FastAPI,
    Query,
    HTTPException,
    Depends,
    Request,
    WebSocket,
    WebSocketDisconnect,
    BackgroundTasks,
)
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
import lancedb
import pyarrow as pa
import os
from typing import List, Optional, Dict, Any
import json
import time
import logging
import hmac
import hashlib
import base64
import jwt
import httpx
from datetime import datetime, timedelta
from cryptography.fernet import Fernet  # unused here, kept: other modules may import it from this one
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from sentence_transformers import SentenceTransformer
from supervisor import supervisor

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("api_hyper_mega")

app = FastAPI(
    title="Ultimate Search Engine API (Hyper-Mega Edition)",
    description="Backend API with Hybrid Search, Tor OSINT Integration, and Infrastructure Monitoring (Serverless Edition).",
    version="3.0.0"
)

# CORS configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:3000",
        "http://localhost:3005",
        "https://wrzzzrzr-hypermega-c2-matrix.hf.space",
        "https://wrzzzrzr-hypermega-c2-admin.hf.space",
        "https://huggingface.co"
    ],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Environment Variables
INDEX_NAME = os.getenv("INDEX_NAME", "news_articles")
HF_SPACE_ID = os.getenv("SPACE_ID", "")
DB_PATH = os.getenv("DB_PATH", "./lancedb_data")

# Security constants.
# NOTE(review): the hard-coded fallbacks are insecure defaults — deployments
# MUST override these via environment variables.
SECRET_KEY = os.getenv('GOD_MODE_SECRET_KEY', 'default_hyper_mega_secret_key_change_in_prod')
HMAC_SECRET = os.getenv('GOD_MODE_HMAC_SECRET', 'hmac_signature_key_for_api_requests')
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60

# AES-256-GCM key material (authenticated encryption for stored content).
AES_KEY_B64 = os.getenv('GOD_MODE_AES256_KEY', 'vD9bZq3HwZ8xJ2K+mN5P/rA8bNlT2dY1sC4eR+uL1gQ=')
try:
    aes_key_bytes = base64.b64decode(AES_KEY_B64)
    if len(aes_key_bytes) != 32:
        raise ValueError("AES-256 key must decode to exactly 32 bytes")
    aesgcm = AESGCM(aes_key_bytes)
except Exception:
    # Fallback key is ephemeral: rows encrypted with it become unreadable
    # after a restart. Log string kept verbatim (German).
    logger.warning("[!] Ungültiger AES-256 Key. Generiere Fallback-Key.")
    aes_key_bytes = AESGCM.generate_key(bit_length=256)
    aesgcm = AESGCM(aes_key_bytes)

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Initialize LanceDB
db = lancedb.connect(DB_PATH)

# Arrow schema for the article table.
schema = pa.schema([
    pa.field("id", pa.string()),
    pa.field("url", pa.string()),
    pa.field("title", pa.string()),
    pa.field("content", pa.string()),
    pa.field("domain", pa.string()),
    pa.field("category", pa.string()),
    pa.field("crawled_at", pa.string()),
    pa.field("metadata", pa.string()),  # stored as a serialized JSON string
    pa.field("is_deep_web", pa.bool_()),
    pa.field("content_vector", pa.list_(pa.float32(), 384)),  # all-MiniLM-L6-v2 embedding dim
    pa.field("content_hash", pa.string())
])


class IngestPayload(BaseModel):
    """Single crawled article as delivered by a spider."""
    id: str
    url: str
    title: str
    content: str
    domain: str
    category: str
    crawled_at: str
    metadata: dict
    is_deep_web: bool
    content_hash: str


def get_table():
    """Open the article table, creating it empty on first use."""
    if INDEX_NAME in db.table_names():
        return db.open_table(INDEX_NAME)
    # Create empty table initially
    return db.create_table(INDEX_NAME, schema=schema)


table = get_table()

logger.info("Loading SentenceTransformer model for query encoding...")
model = SentenceTransformer('all-MiniLM-L6-v2')


@app.on_event("startup")
async def startup_event():
    """Log readiness once the ASGI app starts."""
    logger.info(f"LanceDB initialized at {DB_PATH}. Table {INDEX_NAME} active.")


# 📡 WebSocket Manager
class ConnectionManager:
    """Tracks live WebSocket clients and fans out event messages."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the socket and register it for broadcasts."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Deregister a socket; tolerate double-disconnect (remove() would raise)."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        """Best-effort fan-out: a dead client must not break delivery to others."""
        for connection in self.active_connections:
            try:
                await connection.send_text(message)
            except Exception:  # was a bare except; keep best-effort semantics
                pass


manager = ConnectionManager()


# --- Security Endpoints ---
@app.middleware("http")
async def security_black_box_middleware(request: Request, call_next):
    """Validate HMAC signatures on mutating endpoints before routing.

    NOTE(review): validation only runs when an X-Signature header is present,
    so unsigned requests pass through. /api/ingest re-validates and rejects
    missing signatures itself, but /api/crawl does not — confirm intent.
    """
    # 1. Rate Limiting (Redis bypassed for Serverless Hub, using simple count if needed)
    client_ip = request.client.host if request.client else "unknown"

    # 2. HMAC Signature Validation
    if request.url.path in ["/api/crawl", "/api/ingest", "/api/crawl/bulk"] and request.method == "POST":
        signature = request.headers.get("X-Signature")
        body = await request.body()

        # Re-wrap the request so route handlers can read the body again
        # (Starlette consumes the receive channel on the first read).
        async def receive():
            return {"type": "http.request", "body": body}
        request._receive = receive

        if signature:
            expected_signature = hmac.new(HMAC_SECRET.encode(), body, hashlib.sha256).hexdigest()
            if not hmac.compare_digest(expected_signature, signature):
                logger.warning(f"[🚨] INVALID Signature from {client_ip} on {request.url.path}")
                return JSONResponse(status_code=403, content={"detail": "Signature Verification Failed"})

    response = await call_next(request)
    return response


@app.post("/token")
async def login_for_access_token(request: Request):
    """Issue a JWT bound to a client fingerprint (IP + User-Agent).

    NOTE(review): no credentials are verified — any caller receives an
    admin-subject token. Confirm this is intentional before exposing publicly.
    """
    client_ip = request.client.host if request.client else "unknown"
    user_agent = request.headers.get('user-agent', 'unknown')
    # MD5 is used only as a non-cryptographic fingerprint here.
    fingerprint = hashlib.md5(f"{client_ip}:{user_agent}".encode()).hexdigest()

    # TODO(review): datetime.utcnow() is deprecated since Python 3.12; migrate
    # to datetime.now(timezone.utc) (makes the exp claim timezone-aware).
    expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode = {"sub": "god_eye_admin", "exp": expire, "fingerprint": fingerprint}
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    logger.info(f"[🔐] HF GOD MODE JWT ausgestellt für {client_ip}")
    return {"access_token": encoded_jwt, "token_type": "bearer"}


# --- Infrastructure Monitoring ---
@app.post("/api/ingest")
async def ingest_data(payload: IngestPayload, background_tasks: BackgroundTasks, request: Request):
    """Replaces Kafka for Serverless environments: Direct data ingestion from Spiders."""
    # HMAC signature is mandatory here (unlike the middleware, which skips
    # validation when the header is absent).
    signature = request.headers.get("X-Signature")
    if not signature:
        raise HTTPException(status_code=401, detail="Missing Cryptographic Signature")
    body = await request.body()
    expected_signature = hmac.new(HMAC_SECRET.encode(), body, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(expected_signature, signature):
        raise HTTPException(status_code=403, detail="Signature Verification Failed")

    try:
        # Encrypt content at rest with AES-256-GCM; the nonce is prepended so
        # the stored blob is self-contained for later decryption.
        nonce = os.urandom(12)
        ciphertext = aesgcm.encrypt(nonce, payload.content.encode('utf-8'), None)
        encrypted_payload_b64 = base64.b64encode(nonce + ciphertext).decode('utf-8')

        # Generate the semantic vector from the plaintext at ingestion time.
        vector = model.encode(payload.content).tolist()

        row = {
            "id": payload.id,
            "url": payload.url,
            "title": payload.title,
            "content": f"AES-256-GCM_ENC::{encrypted_payload_b64}",
            "domain": payload.domain,
            "category": payload.category,
            "crawled_at": payload.crawled_at,
            "metadata": json.dumps(payload.metadata),
            "is_deep_web": payload.is_deep_web,
            "content_vector": vector,
            "content_hash": payload.content_hash
        }
        table = get_table()
        table.add([row])

        # Notify live dashboards without blocking the response.
        message = json.dumps({"event": "new_article", "data": {"url": payload.url, "title": payload.title}})
        background_tasks.add_task(manager.broadcast, message)

        return {"status": "success", "id": payload.id}
    except Exception as e:
        logger.error(f"Ingestion failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))


class CrawlRequest(BaseModel):
    """Crawl request accepting either `targets` or the legacy `urls` field."""
    targets: Optional[List[str]] = None
    urls: Optional[List[str]] = None

    @property
    def final_targets(self) -> List[str]:
        # `urls` takes precedence over `targets`; defaults to an empty list.
        return self.urls or self.targets or []


@app.post("/api/crawl")
@app.post("/api/crawl/bulk")
async def start_crawl(request: CrawlRequest, raw_req: Request):
    """Dispatch spiders against the requested targets (both routes share this handler)."""
    targets = request.final_targets
    if not targets:
        raise HTTPException(status_code=400, detail="No targets provided.")
    supervisor.deploy_spiders(targets)
    return {"status": "crawl_initiated", "count": len(targets)}


@app.get("/api/health")
async def health_check():
    """Enterprise health monitoring for core infrastructure"""
    health = {
        "status": "healthy",
        "timestamp": datetime.utcnow().isoformat(),
        "components": {},
        "crawlers_active": supervisor.active_count()
    }

    # Check LanceDB health with a cheap single-row read.
    try:
        table = get_table()
        table.head(1)
        health["components"]["lancedb"] = "operational"
    except Exception:
        health["status"] = "degraded"
        health["components"]["lancedb"] = "error"

    return health


@app.get("/api/stats")
async def get_stats():
    """Real-time system statistics for the operation dashboard"""
    try:
        table = get_table()
        doc_count = len(table)

        # Rough on-disk size: walk the DB directory, skipping symlinks.
        size_bytes = 0
        if os.path.exists(DB_PATH):
            for dirpath, _, filenames in os.walk(DB_PATH):
                for f in filenames:
                    fp = os.path.join(dirpath, f)
                    if not os.path.islink(fp):
                        size_bytes += os.path.getsize(fp)

        return {
            "status": "online",
            "total_documents_indexed": doc_count,
            "index_size_mb": round(size_bytes / (1024 * 1024), 2),
            "index_name": INDEX_NAME
        }
    except Exception as e:
        return {"status": "error", "message": str(e)}


# --- Search Logic ---
@app.get("/api/search")
async def search(
    q: str = Query(..., min_length=1),
    semantic: bool = False,
    page: int = 1,
    size: int = 40
):
    """Paginated vector search with manual snippet highlighting.

    Both modes run a pure vector search (the old LanceDB FTS does not combine
    with vector search); `semantic` only changes the snippet heuristic.
    """
    start_time = time.time()
    from_idx = (page - 1) * size
    table = get_table()

    if len(table) == 0:
        return {
            "total": 0,
            "results": [],
            "execution_time_ms": 0,
            "semantic_enabled": semantic
        }

    try:
        # The original semantic / non-semantic branches were byte-identical;
        # collapsed into a single vector search (behavior unchanged).
        query_vector = model.encode(q).tolist()
        arrow_res = table.search(query_vector).offset(from_idx).limit(size).to_arrow()
        results_dict = arrow_res.to_pylist()

        # Parse results
        results = []
        for row in results_dict:
            # Build a snippet around the first literal query match (keyword
            # mode only); semantic mode snips from the start of the content.
            content = row.get("content", "")
            lower_content = content.lower()
            lower_q = q.lower()
            snippet_start = 0
            if lower_q in lower_content and not semantic:
                idx = lower_content.find(lower_q)
                snippet_start = max(0, idx - 50)
            content_snippet = content[snippet_start:snippet_start + 250] + "..."

            # Metadata is stored as a JSON string; tolerate malformed rows.
            meta = {}
            if row.get("metadata"):
                try:
                    meta = json.loads(row["metadata"])
                except (ValueError, TypeError):  # was a bare except
                    pass

            results.append({
                "id": str(row.get("id", "")),
                "url": row.get("url", ""),
                "title": row.get("title", ""),
                "content_snippet": content_snippet,
                "domain": row.get("domain", ""),
                "category": row.get("category", ""),
                "crawled_at": row.get("crawled_at", ""),
                "metadata": meta,
                "is_deep_web": row.get("is_deep_web", False),
                # LanceDB distance (lower is better usually)
                "score": float(row.get("_distance", 0.0))
            })

        execution_time = (time.time() - start_time) * 1000
        return {
            "total": len(table),  # approximate: full table size, not match count
            "results": results,
            "execution_time_ms": round(execution_time, 2),
            "semantic_enabled": semantic
        }
    except Exception as e:
        logger.error(f"Search error: {e}")
        raise HTTPException(status_code=500, detail="Search engine internal error")


class SummarizeRequest(BaseModel):
    """Query plus the search results to be summarized."""
    query: str
    results: List[Dict[str, Any]]


@app.post("/api/ai/summarize")
async def summarize_intel(req: SummarizeRequest):
    """Summarize the top results via the HF Inference API, with a
    deterministic local fallback when the API is unavailable."""
    if not req.results:
        return {"summary": "Keine relevanten Intel-Daten gefunden."}

    # Concatenate text from the top 3 results (max 1000 chars each).
    combined_text = " ".join([r.get('content', '')[:1000] for r in req.results[:3]])

    # HF Inference API for summarization.
    hf_token = os.environ.get("HF_API_TOKEN")
    if hf_token and len(combined_text) > 50:
        try:
            async with httpx.AsyncClient() as client:
                res = await client.post(
                    "https://api-inference.huggingface.co/models/facebook/bart-large-cnn",
                    headers={"Authorization": f"Bearer {hf_token}"},
                    json={"inputs": combined_text[:3000], "parameters": {"max_length": 150, "min_length": 40}},
                    timeout=15.0
                )
                hf_data = res.json()
                if isinstance(hf_data, list) and len(hf_data) > 0 and 'summary_text' in hf_data[0]:
                    return {"summary": hf_data[0]['summary_text']}
        except Exception as e:
            logger.error(f"[-] HF API Error: {e}")

    # Autonomous fallback (German strings are user-facing; kept verbatim).
    summary_text = f"Die Analyse zu '{req.query}' basierend auf {len(req.results)} C2-Knotenpunkten zeigt signifikante Anomalien auf. "
    if any(".onion" in str(r.get('url', '')) for r in req.results):
        summary_text += "Deep Web Signatur detektiert. "
    summary_text += "Lokale KI-Kerne raten zu erhöhter Wachsamkeit und weiterer Korrelationsanalyse durch God Mode. " + combined_text[:100] + "..."
    return {"summary": summary_text}


@app.get("/api/sources")
async def get_sources():
    """Serve the crawl-source config; return an empty list when missing/unreadable."""
    try:
        config_path = os.path.join(os.path.dirname(__file__), "..", "config", "sources.json")
        with open(config_path, "r") as f:
            return json.load(f)
    except Exception:
        return []


@app.websocket("/api/live")
async def websocket_endpoint(websocket: WebSocket):
    """Hold a live-events connection open until the client disconnects."""
    await manager.connect(websocket)
    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        manager.disconnect(websocket)