Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, Query, HTTPException, Depends, Request, WebSocket, WebSocketDisconnect, BackgroundTasks | |
| from fastapi.responses import JSONResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.security import OAuth2PasswordBearer | |
| from pydantic import BaseModel | |
| import lancedb | |
| import pyarrow as pa | |
| import os | |
| from typing import List, Optional, Dict, Any | |
| import json | |
| import time | |
| import logging | |
| import hmac | |
| import hashlib | |
| import jwt | |
| from datetime import datetime, timedelta | |
| from cryptography.fernet import Fernet | |
| from sentence_transformers import SentenceTransformer | |
| from supervisor import supervisor | |
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("api_hyper_mega")

app = FastAPI(
    title="Ultimate Search Engine API (Hyper-Mega Edition)",
    description="Backend API with Hybrid Search, Tor OSINT Integration, and Infrastructure Monitoring (Serverless Edition).",
    version="3.0.0"
)

# Browser origins allowed to call this API (local dev frontends + HF Spaces).
_ALLOWED_ORIGINS = [
    "http://localhost:3000",
    "http://localhost:3005",
    "https://wrzzzrzr-hypermega-c2-matrix.hf.space",
    "https://wrzzzrzr-hypermega-c2-admin.hf.space",
    "https://huggingface.co",
]

# CORS: explicit origin allow-list, all methods/headers, cookies permitted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=_ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# --- Environment configuration ---
INDEX_NAME = os.getenv("INDEX_NAME", "news_articles")
HF_SPACE_ID = os.getenv("SPACE_ID", "")
DB_PATH = os.getenv("DB_PATH", "./lancedb_data")

# 🔐 GOD UNIVERSE SECURITY CONSTANTS
# NOTE(review): the fallback secrets below are hard-coded in source; they MUST
# be overridden via environment variables in any real deployment.
SECRET_KEY = os.getenv('GOD_MODE_SECRET_KEY', 'default_hyper_mega_secret_key_change_in_prod')
HMAC_SECRET = os.getenv('GOD_MODE_HMAC_SECRET', 'hmac_signature_key_for_api_requests')
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60

# AES-256-GCM (secure authenticated encryption) key setup.
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import base64

AES_KEY_B64 = os.getenv('GOD_MODE_AES256_KEY', 'vD9bZq3HwZ8xJ2K+mN5P/rA8bNlT2dY1sC4eR+uL1gQ=')
try:
    aes_key_bytes = base64.b64decode(AES_KEY_B64)
    if len(aes_key_bytes) != 32:
        raise ValueError("AES-256 key must decode to exactly 32 bytes")
    aesgcm = AESGCM(aes_key_bytes)
except Exception:
    # Invalid or undecodable key: generate a fresh ephemeral one so the
    # service still starts (previously-encrypted data becomes unreadable).
    logger.warning("[!] Ungültiger AES-256 Key. Generiere Fallback-Key.")
    aes_key_bytes = AESGCM.generate_key(bit_length=256)
    aesgcm = AESGCM(aes_key_bytes)

# OAuth2 bearer-token extraction for protected endpoints.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Initialize LanceDB (embedded, file-backed)
db = lancedb.connect(DB_PATH)

# Arrow schema for the article table. The 384-dim vector width matches the
# all-MiniLM-L6-v2 sentence-transformer used for encoding.
_SCHEMA_FIELDS = [
    ("id", pa.string()),
    ("url", pa.string()),
    ("title", pa.string()),
    ("content", pa.string()),
    ("domain", pa.string()),
    ("category", pa.string()),
    ("crawled_at", pa.string()),
    ("metadata", pa.string()),  # stored as a JSON string
    ("is_deep_web", pa.bool_()),
    ("content_vector", pa.list_(pa.float32(), 384)),
    ("content_hash", pa.string()),
]
schema = pa.schema([pa.field(name, dtype) for name, dtype in _SCHEMA_FIELDS])
class IngestPayload(BaseModel):
    """Request body for article ingestion (one document per call from a spider)."""
    id: str
    url: str
    title: str
    content: str       # plaintext article body (encrypted with AES-GCM before storage)
    domain: str
    category: str
    crawled_at: str    # timestamp string as supplied by the crawler; format not enforced here
    metadata: dict     # free-form; persisted as a JSON string
    is_deep_web: bool  # flags .onion / deep-web origin
    content_hash: str  # caller-supplied content digest
def get_table():
    """Open the configured LanceDB table, creating it empty on first use."""
    if INDEX_NAME not in db.table_names():
        # First run: create an empty table from the module-level schema.
        return db.create_table(INDEX_NAME, schema=schema)
    return db.open_table(INDEX_NAME)
# Materialize the table handle and the embedding model at import time.
table = get_table()
logger.info("Loading SentenceTransformer model for query encoding...")
# NOTE(review): model download/load happens at module import — this blocks
# process startup; confirm that is acceptable for the serving environment.
model = SentenceTransformer('all-MiniLM-L6-v2')
async def startup_event():
    # Logs storage readiness. NOTE(review): no @app.on_event/lifespan
    # registration is visible in this chunk — confirm it is wired up elsewhere.
    logger.info(f"LanceDB initialized at {DB_PATH}. Table {INDEX_NAME} active.")
# 📡 WebSocket Manager
class ConnectionManager:
    """Tracks live WebSocket connections and fans messages out to all of them."""

    def __init__(self):
        # Connections registered via connect(). Quoted annotation so the class
        # definition does not require FastAPI's WebSocket type at import time.
        self.active_connections: "List[WebSocket]" = []

    async def connect(self, websocket: "WebSocket"):
        """Accept the handshake and register the connection for broadcasts."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: "WebSocket"):
        """Forget a connection (normally after WebSocketDisconnect)."""
        # FIX: tolerate a double-disconnect instead of raising ValueError.
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        """Best-effort text send to every registered connection.

        Per-socket send failures are swallowed so one dead client cannot
        block delivery to the others.
        """
        # Iterate over a snapshot so concurrent connect/disconnect cannot
        # mutate the list mid-iteration.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                # FIX: was a bare `except:` which also swallowed
                # asyncio.CancelledError (a BaseException since 3.8) and
                # KeyboardInterrupt; only runtime send errors are ignored now.
                pass


manager = ConnectionManager()
# --- Security Endpoints ---
# POST paths whose bodies must carry a valid HMAC-SHA256 signature.
_SIGNED_PATHS = {"/api/crawl", "/api/ingest", "/api/crawl/bulk"}

async def security_black_box_middleware(request: Request, call_next):
    """HTTP middleware enforcing HMAC body signatures on write endpoints.

    NOTE(review): not visibly registered via @app.middleware in this chunk —
    confirm it is attached to the app elsewhere.
    """
    # 1. Rate limiting (Redis bypassed for Serverless Hub; IP kept for logging).
    client_ip = request.client.host if request.client else "unknown"

    # 2. HMAC signature validation on protected POST routes.
    if request.url.path in _SIGNED_PATHS and request.method == "POST":
        signature = request.headers.get("X-Signature")
        body = await request.body()

        # Re-wrap the request stream so route handlers can read the body again.
        async def receive():
            return {"type": "http.request", "body": body}
        request._receive = receive

        # FIX: previously a request with NO X-Signature header skipped
        # verification entirely (fail-open bypass). Missing signatures are now
        # rejected, matching the check inside ingest_data.
        if not signature:
            logger.warning(f"[🚨] MISSING Signature from {client_ip} on {request.url.path}")
            return JSONResponse(status_code=401, content={"detail": "Missing Cryptographic Signature"})

        expected_signature = hmac.new(HMAC_SECRET.encode(), body, hashlib.sha256).hexdigest()
        if not hmac.compare_digest(expected_signature, signature):
            logger.warning(f"[🚨] INVALID Signature from {client_ip} on {request.url.path}")
            return JSONResponse(status_code=403, content={"detail": "Signature Verification Failed"})

    response = await call_next(request)
    return response
async def login_for_access_token(request: Request):
    """Issue a short-lived admin JWT bound to a client IP + User-Agent fingerprint."""
    client_ip = request.client.host if request.client else "unknown"
    user_agent = request.headers.get('user-agent', 'unknown')

    # Weak device fingerprint: md5 over "ip:user-agent". Not security-critical
    # on its own — it is only embedded in the token claims.
    fingerprint = hashlib.md5(f"{client_ip}:{user_agent}".encode()).hexdigest()

    claims = {
        "sub": "god_eye_admin",
        "exp": datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
        "fingerprint": fingerprint,
    }
    token = jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
    logger.info(f"[🔐] HF GOD MODE JWT ausgestellt für {client_ip}")
    return {"access_token": token, "token_type": "bearer"}
# --- Infrastructure Monitoring ---
async def ingest_data(payload: IngestPayload, background_tasks: BackgroundTasks, request: Request):
    """Replaces Kafka for Serverless environments: Direct data ingestion from Spiders.

    Verifies the HMAC body signature, encrypts the article content with
    AES-256-GCM, embeds the plaintext into a 384-dim vector, stores the row in
    LanceDB, and notifies WebSocket listeners via a background task.

    Raises:
        HTTPException 401: no X-Signature header.
        HTTPException 403: signature does not match the request body.
        HTTPException 500: encryption/storage failure (details logged, not
            returned to the caller).
    """
    # 🛡️ HMAC Signature Validation (defense in depth; middleware also checks).
    signature = request.headers.get("X-Signature")
    if not signature:
        raise HTTPException(status_code=401, detail="Missing Cryptographic Signature")
    body = await request.body()
    expected_signature = hmac.new(HMAC_SECRET.encode(), body, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(expected_signature, signature):
        raise HTTPException(status_code=403, detail="Signature Verification Failed")

    try:
        # 🔒 GOD MODE AES-256-GCM: random 96-bit nonce is prepended to the
        # ciphertext so it can be recovered for decryption later.
        nonce = os.urandom(12)
        ciphertext = aesgcm.encrypt(nonce, payload.content.encode('utf-8'), None)
        encrypted_payload_b64 = base64.b64encode(nonce + ciphertext).decode('utf-8')

        # Semantic vector is computed from the PLAINTEXT, so vector search
        # still works even though the stored content column is encrypted.
        vector = model.encode(payload.content).tolist()

        row = {
            "id": payload.id,
            "url": payload.url,
            "title": payload.title,
            "content": f"AES-256-GCM_ENC::{encrypted_payload_b64}",
            "domain": payload.domain,
            "category": payload.category,
            "crawled_at": payload.crawled_at,
            "metadata": json.dumps(payload.metadata),
            "is_deep_web": payload.is_deep_web,
            "content_vector": vector,
            "content_hash": payload.content_hash,
        }
        table = get_table()
        table.add([row])

        # Notify live dashboards without blocking the response.
        message = json.dumps({"event": "new_article", "data": {"url": payload.url, "title": payload.title}})
        background_tasks.add_task(manager.broadcast, message)
        return {"status": "success", "id": payload.id}
    except Exception as e:
        # FIX: log the full traceback but do NOT echo internal exception text
        # back to the client (the old `detail=str(e)` leaked internals).
        logger.exception(f"Ingestion failed: {e}")
        raise HTTPException(status_code=500, detail="Ingestion failed")
class CrawlRequest(BaseModel):
    """Crawl request accepting either `urls` (preferred) or legacy `targets`."""
    targets: Optional[List[str]] = None
    urls: Optional[List[str]] = None

    def final_targets(self) -> List[str]:
        """Return the effective target list: non-empty `urls` wins, then `targets`, else []."""
        if self.urls:
            return self.urls
        if self.targets:
            return self.targets
        return []
async def start_crawl(request: CrawlRequest, raw_req: Request):
    """Deploy spiders against the requested target URLs.

    Raises:
        HTTPException 400: neither `urls` nor `targets` yielded any entries.
    """
    # FIX: `final_targets` is a method. The original assigned the bound method
    # object (always truthy), so the empty-target check never fired and
    # `len(targets)` raised TypeError — it must be CALLED.
    targets = request.final_targets()
    if not targets:
        raise HTTPException(status_code=400, detail="No targets provided.")
    supervisor.deploy_spiders(targets)
    return {"status": "crawl_initiated", "count": len(targets)}
async def health_check():
    """Enterprise health monitoring for core infrastructure."""
    report = {
        "status": "healthy",
        "timestamp": datetime.utcnow().isoformat(),
        "components": {},
        "crawlers_active": supervisor.active_count(),
    }
    # Probe LanceDB by touching the table; any failure marks the service degraded.
    try:
        get_table().head(1)
    except Exception:
        report["status"] = "degraded"
        report["components"]["lancedb"] = "error"
    else:
        report["components"]["lancedb"] = "operational"
    return report
async def get_stats():
    """Real-time system statistics for the operation dashboard."""
    try:
        doc_count = len(get_table())

        # Rough on-disk footprint: sum of regular-file sizes under DB_PATH,
        # skipping symlinks so nothing is double-counted.
        disk_bytes = 0
        if os.path.exists(DB_PATH):
            for root, _, names in os.walk(DB_PATH):
                for name in names:
                    full_path = os.path.join(root, name)
                    if not os.path.islink(full_path):
                        disk_bytes += os.path.getsize(full_path)

        return {
            "status": "online",
            "total_documents_indexed": doc_count,
            "index_size_mb": round(disk_bytes / (1024 * 1024), 2),
            "index_name": INDEX_NAME,
        }
    except Exception as e:
        # Dashboard endpoint degrades to an error payload rather than a 500.
        return {"status": "error", "message": str(e)}
# --- Search Logic ---
async def search(
    q: str = Query(..., min_length=1),
    semantic: bool = False,
    page: int = 1,
    size: int = 40
):
    """Vector search over the article index with manual snippet highlighting.

    Both modes currently run the same ANN vector query — the `semantic` flag
    only changes snippet positioning; FTS/hybrid ranking is not implemented.

    Returns a dict with `total`, `results`, `execution_time_ms` and
    `semantic_enabled`. Raises HTTPException 500 on engine errors.
    """
    start_time = time.time()
    from_idx = (page - 1) * size
    table = get_table()

    # Empty index: short-circuit with an empty result page.
    if len(table) == 0:
        return {
            "total": 0,
            "results": [],
            "execution_time_ms": 0,
            "semantic_enabled": semantic
        }

    try:
        # FIX: the original had two branches (semantic / non-semantic) with
        # byte-identical bodies — collapsed into a single vector-search path.
        query_vector = model.encode(q).tolist()
        arrow_res = table.search(query_vector).offset(from_idx).limit(size).to_arrow()
        results_dict = arrow_res.to_pylist()

        results = []
        lower_q = q.lower()  # hoisted: invariant across rows
        for row in results_dict:
            # Manual snippet: in keyword mode, start a 250-char window 50 chars
            # before the first query hit; semantic mode always starts at 0.
            # NOTE(review): `content` may hold the AES-encrypted blob written by
            # ingest_data, in which case keyword matching cannot hit — confirm.
            content = row.get("content", "")
            snippet_start = 0
            if not semantic:
                hit = content.lower().find(lower_q)
                if hit != -1:
                    snippet_start = max(0, hit - 50)
            content_snippet = content[snippet_start:snippet_start + 250] + "..."

            # Metadata is stored as a JSON string; tolerate bad/missing JSON.
            meta = {}
            if row.get("metadata"):
                try:
                    meta = json.loads(row["metadata"])
                except (ValueError, TypeError):
                    # FIX: was a bare `except:`; only decode failures are ignored.
                    pass

            results.append({
                "id": str(row.get("id", "")),
                "url": row.get("url", ""),
                "title": row.get("title", ""),
                "content_snippet": content_snippet,
                "domain": row.get("domain", ""),
                "category": row.get("category", ""),
                "crawled_at": row.get("crawled_at", ""),
                "metadata": meta,
                "is_deep_web": row.get("is_deep_web", False),
                "score": float(row.get("_distance", 0.0))  # LanceDB distance (lower is closer)
            })

        execution_time = (time.time() - start_time) * 1000
        return {
            "total": len(table),  # index-wide count, not the filtered hit count
            "results": results,
            "execution_time_ms": round(execution_time, 2),
            "semantic_enabled": semantic
        }
    except Exception as e:
        logger.error(f"Search error: {e}")
        raise HTTPException(status_code=500, detail="Search engine internal error")
| from typing import List, Dict, Any | |
| import httpx | |
class SummarizeRequest(BaseModel):
    """Request body for AI summarization of a set of search results."""
    query: str                     # the original search query being summarized
    results: List[Dict[str, Any]]  # search hits; entries may carry 'content' and 'url'
async def summarize_intel(req: SummarizeRequest):
    """Summarize the top search results, preferring the HF Inference API.

    Falls back to a canned heuristic summary when no HF token is configured,
    the combined text is too short, or the remote call fails for any reason.
    """
    if not req.results:
        return {"summary": "Keine relevanten Intel-Daten gefunden."}

    # Concatenate (truncated) text of the top 3 results.
    combined_text = " ".join([r.get('content', '')[:1000] for r in req.results[:3]])

    # Remote summarization via HF Inference API (facebook/bart-large-cnn).
    hf_token = os.environ.get("HF_API_TOKEN")
    if hf_token and len(combined_text) > 50:
        try:
            async with httpx.AsyncClient() as client:
                res = await client.post(
                    "https://api-inference.huggingface.co/models/facebook/bart-large-cnn",
                    headers={"Authorization": f"Bearer {hf_token}"},
                    json={"inputs": combined_text[:3000], "parameters": {"max_length": 150, "min_length": 40}},
                    timeout=15.0
                )
                # FIX: surface non-2xx responses as errors instead of trying to
                # parse an API error payload as if it were a summary.
                res.raise_for_status()
                hf_data = res.json()
            if isinstance(hf_data, list) and len(hf_data) > 0 and 'summary_text' in hf_data[0]:
                return {"summary": hf_data[0]['summary_text']}
        except Exception as e:
            logger.error(f"[-] HF API Error: {e}")

    # Autonomous local fallback (no network required).
    summary_text = f"Die Analyse zu '{req.query}' basierend auf {len(req.results)} C2-Knotenpunkten zeigt signifikante Anomalien auf. "
    if any(".onion" in str(r.get('url','')) for r in req.results):
        summary_text += "Deep Web Signatur detektiert. "
    summary_text += "Lokale KI-Kerne raten zu erhöhter Wachsamkeit und weiterer Korrelationsanalyse durch God Mode. " + combined_text[:100] + "..."
    return {"summary": summary_text}
async def get_sources():
    """Load the crawl-source catalogue from ../config/sources.json.

    Returns [] when the file is missing, unreadable, or not valid JSON —
    deliberate best-effort so the endpoint never fails hard.
    """
    config_path = os.path.join(os.path.dirname(__file__), "..", "config", "sources.json")
    try:
        # FIX: explicit UTF-8 (was platform-default encoding) and narrowed
        # exception handling (was a blanket `except Exception`).
        with open(config_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable file; ValueError: JSONDecodeError parent.
        return []
async def websocket_endpoint(websocket: WebSocket):
    # Hold a client WebSocket open for server-push broadcasts. Inbound frames
    # are read and discarded — reading is what lets us detect the disconnect.
    # NOTE(review): no @app.websocket decorator is visible in this chunk;
    # confirm the route is registered elsewhere.
    await manager.connect(websocket)
    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        manager.disconnect(websocket)