# HyperMega-C2-API / main.py
# Wrzzzrzr's picture
# Upload folder using huggingface_hub
# 4c193b6 verified
import hashlib
import hmac
import json
import logging
import os
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import jwt
import lancedb
import pyarrow as pa
from cryptography.fernet import Fernet
from fastapi import (
    BackgroundTasks,
    Depends,
    FastAPI,
    HTTPException,
    Query,
    Request,
    WebSocket,
    WebSocketDisconnect,
)
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
from sentence_transformers import SentenceTransformer

from supervisor import supervisor
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("api_hyper_mega")
app = FastAPI(
title="Ultimate Search Engine API (Hyper-Mega Edition)",
description="Backend API with Hybrid Search, Tor OSINT Integration, and Infrastructure Monitoring (Serverless Edition).",
version="3.0.0"
)
# CORS configuration
app.add_middleware(
CORSMiddleware,
allow_origins=[
"http://localhost:3000",
"http://localhost:3005",
"https://wrzzzrzr-hypermega-c2-matrix.hf.space",
"https://wrzzzrzr-hypermega-c2-admin.hf.space",
"https://huggingface.co"
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Environment Variables
INDEX_NAME = os.getenv("INDEX_NAME", "news_articles")
HF_SPACE_ID = os.getenv("SPACE_ID", "")
DB_PATH = os.getenv("DB_PATH", "./lancedb_data")
# 🔐 GOD UNIVERSE SECURITY CONSTANTS
SECRET_KEY = os.getenv('GOD_MODE_SECRET_KEY', 'default_hyper_mega_secret_key_change_in_prod')
HMAC_SECRET = os.getenv('GOD_MODE_HMAC_SECRET', 'hmac_signature_key_for_api_requests')
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60
# AES-256 GCM Upgrade (Secure Authenticated Encryption)
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import base64
AES_KEY_B64 = os.getenv('GOD_MODE_AES256_KEY', 'vD9bZq3HwZ8xJ2K+mN5P/rA8bNlT2dY1sC4eR+uL1gQ=')
try:
aes_key_bytes = base64.b64decode(AES_KEY_B64)
if len(aes_key_bytes) != 32: raise ValueError
aesgcm = AESGCM(aes_key_bytes)
except Exception:
logger.warning("[!] Ungültiger AES-256 Key. Generiere Fallback-Key.")
aes_key_bytes = AESGCM.generate_key(bit_length=256)
aesgcm = AESGCM(aes_key_bytes)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Initialize LanceDB
db = lancedb.connect(DB_PATH)
# Define schemas
schema = pa.schema([
pa.field("id", pa.string()),
pa.field("url", pa.string()),
pa.field("title", pa.string()),
pa.field("content", pa.string()),
pa.field("domain", pa.string()),
pa.field("category", pa.string()),
pa.field("crawled_at", pa.string()),
pa.field("metadata", pa.string()), # store as JSON string
pa.field("is_deep_web", pa.bool_()),
pa.field("content_vector", pa.list_(pa.float32(), 384)),
pa.field("content_hash", pa.string())
])
class IngestPayload(BaseModel):
id: str
url: str
title: str
content: str
domain: str
category: str
crawled_at: str
metadata: dict
is_deep_web: bool
content_hash: str
def get_table():
if INDEX_NAME in db.table_names():
return db.open_table(INDEX_NAME)
else:
# Create empty table initially
return db.create_table(INDEX_NAME, schema=schema)
table = get_table()
logger.info("Loading SentenceTransformer model for query encoding...")
model = SentenceTransformer('all-MiniLM-L6-v2')
@app.on_event("startup")
async def startup_event():
logger.info(f"LanceDB initialized at {DB_PATH}. Table {INDEX_NAME} active.")
# 📡 WebSocket Manager
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def broadcast(self, message: str):
for connection in self.active_connections:
try:
await connection.send_text(message)
except:
pass
manager = ConnectionManager()
# --- Security Endpoints ---
@app.middleware("http")
async def security_black_box_middleware(request: Request, call_next):
# 1. Rate Limiting (Redis Bypassed for Serverless Hub, using simple count if needed)
client_ip = request.client.host if request.client else "unknown"
# 2. HMAC Signature Validation
if request.url.path in ["/api/crawl", "/api/ingest", "/api/crawl/bulk"] and request.method == "POST":
signature = request.headers.get("X-Signature")
body = await request.body()
# FIX: Re-wrap the request so route handlers can read the body again
async def receive():
return {"type": "http.request", "body": body}
request._receive = receive
if signature:
expected_signature = hmac.new(HMAC_SECRET.encode(), body, hashlib.sha256).hexdigest()
if not hmac.compare_digest(expected_signature, signature):
logger.warning(f"[🚨] INVALID Signature from {client_ip} on {request.url.path}")
return JSONResponse(status_code=403, content={"detail": "Signature Verification Failed"})
response = await call_next(request)
return response
@app.post("/token")
async def login_for_access_token(request: Request):
client_ip = request.client.host if request.client else "unknown"
user_agent = request.headers.get('user-agent', 'unknown')
fingerprint = hashlib.md5(f"{client_ip}:{user_agent}".encode()).hexdigest()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode = {"sub": "god_eye_admin", "exp": expire, "fingerprint": fingerprint}
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
logger.info(f"[🔐] HF GOD MODE JWT ausgestellt für {client_ip}")
return {"access_token": encoded_jwt, "token_type": "bearer"}
# --- Infrastructure Monitoring ---
@app.post("/api/ingest")
async def ingest_data(payload: IngestPayload, background_tasks: BackgroundTasks, request: Request):
"""Replaces Kafka for Serverless environments: Direct data ingestion from Spiders."""
# 🛡️ HMAC Signature Validation
signature = request.headers.get("X-Signature")
if not signature:
raise HTTPException(status_code=401, detail="Missing Cryptographic Signature")
body = await request.body()
expected_signature = hmac.new(HMAC_SECRET.encode(), body, hashlib.sha256).hexdigest()
if not hmac.compare_digest(expected_signature, signature):
raise HTTPException(status_code=403, detail="Signature Verification Failed")
try:
# 🔒 GOD MODE AES-256-GCM
nonce = os.urandom(12)
ciphertext = aesgcm.encrypt(nonce, payload.content.encode('utf-8'), None)
encrypted_payload_b64 = base64.b64encode(nonce + ciphertext).decode('utf-8')
# Generate semantic vector on ingestion
vector = model.encode(payload.content).tolist()
row = {
"id": payload.id,
"url": payload.url,
"title": payload.title,
"content": f"AES-256-GCM_ENC::{encrypted_payload_b64}",
"domain": payload.domain,
"category": payload.category,
"crawled_at": payload.crawled_at,
"metadata": json.dumps(payload.metadata),
"is_deep_web": payload.is_deep_web,
"content_vector": vector,
"content_hash": payload.content_hash
}
table = get_table()
table.add([row])
# Broadcast to WebSockets
message = json.dumps({"event": "new_article", "data": {"url": payload.url, "title": payload.title}})
background_tasks.add_task(manager.broadcast, message)
return {"status": "success", "id": payload.id}
except Exception as e:
logger.error(f"Ingestion failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
class CrawlRequest(BaseModel):
targets: Optional[List[str]] = None
urls: Optional[List[str]] = None
@property
def final_targets(self) -> List[str]:
return self.urls or self.targets or []
@app.post("/api/crawl")
@app.post("/api/crawl/bulk")
async def start_crawl(request: CrawlRequest, raw_req: Request):
targets = request.final_targets
if not targets:
raise HTTPException(status_code=400, detail="No targets provided.")
supervisor.deploy_spiders(targets)
return {"status": "crawl_initiated", "count": len(targets)}
@app.get("/api/health")
async def health_check():
"""Enterprise health monitoring for core infrastructure"""
health = {
"status": "healthy",
"timestamp": datetime.utcnow().isoformat(),
"components": {},
"crawlers_active": supervisor.active_count()
}
# Check LanceDB health
try:
table = get_table()
table.head(1)
health["components"]["lancedb"] = "operational"
except Exception:
health["status"] = "degraded"
health["components"]["lancedb"] = "error"
return health
@app.get("/api/stats")
async def get_stats():
"""Real-time system statistics for the operation dashboard"""
try:
table = get_table()
doc_count = len(table)
# Calculate size roughly
size_bytes = 0
if os.path.exists(DB_PATH):
for dirpath, _, filenames in os.walk(DB_PATH):
for f in filenames:
fp = os.path.join(dirpath, f)
if not os.path.islink(fp):
size_bytes += os.path.getsize(fp)
return {
"status": "online",
"total_documents_indexed": doc_count,
"index_size_mb": round(size_bytes / (1024 * 1024), 2),
"index_name": INDEX_NAME
}
except Exception as e:
return {"status": "error", "message": str(e)}
# --- Search Logic ---
@app.get("/api/search")
async def search(
q: str = Query(..., min_length=1),
semantic: bool = False,
page: int = 1,
size: int = 40
):
start_time = time.time()
from_idx = (page - 1) * size
table = get_table()
if len(table) == 0:
return {
"total": 0,
"results": [],
"execution_time_ms": 0,
"semantic_enabled": semantic
}
try:
# Hybrid Search Logic
if semantic:
query_vector = model.encode(q).tolist()
# FTS doesn't natively combine with vector easily in old LanceDB,
# so we just do vector search for semantic
arrow_res = table.search(query_vector).offset(from_idx).limit(size).to_arrow()
results_dict = arrow_res.to_pylist()
else:
# Fallback to pure vector search for now, ensuring offset works for pagination
query_vector = model.encode(q).tolist()
arrow_res = table.search(query_vector).offset(from_idx).limit(size).to_arrow()
results_dict = arrow_res.to_pylist()
# Parse results
results = []
for row in results_dict:
# Highlight snippet manually
content = row.get("content", "")
lower_content = content.lower()
lower_q = q.lower()
snippet_start = 0
if lower_q in lower_content and not semantic:
idx = lower_content.find(lower_q)
snippet_start = max(0, idx - 50)
content_snippet = content[snippet_start:snippet_start+250] + "..."
# parse metadata
meta = {}
if row.get("metadata"):
try:
meta = json.loads(row["metadata"])
except:
pass
results.append({
"id": str(row.get("id", "")),
"url": row.get("url", ""),
"title": row.get("title", ""),
"content_snippet": content_snippet,
"domain": row.get("domain", ""),
"category": row.get("category", ""),
"crawled_at": row.get("crawled_at", ""),
"metadata": meta,
"is_deep_web": row.get("is_deep_web", False),
"score": float(row.get("_distance", 0.0)) # LanceDB distance (lower is better usually)
})
execution_time = (time.time() - start_time) * 1000
return {
"total": len(table), # Approximate limit
"results": results,
"execution_time_ms": round(execution_time, 2),
"semantic_enabled": semantic
}
except Exception as e:
logger.error(f"Search error: {e}")
raise HTTPException(status_code=500, detail="Search engine internal error")
from typing import List, Dict, Any
import httpx
class SummarizeRequest(BaseModel):
query: str
results: List[Dict[str, Any]]
@app.post("/api/ai/summarize")
async def summarize_intel(req: SummarizeRequest):
if not req.results:
return {"summary": "Keine relevanten Intel-Daten gefunden."}
# Text-Extrahierung der Top 3 Ergebnisse
combined_text = " ".join([r.get('content', '')[:1000] for r in req.results[:3]])
# HF Inference API for Summarization
hf_token = os.environ.get("HF_API_TOKEN")
if hf_token and len(combined_text) > 50:
try:
async with httpx.AsyncClient() as client:
res = await client.post(
"https://api-inference.huggingface.co/models/facebook/bart-large-cnn",
headers={"Authorization": f"Bearer {hf_token}"},
json={"inputs": combined_text[:3000], "parameters": {"max_length": 150, "min_length": 40}},
timeout=15.0
)
hf_data = res.json()
if isinstance(hf_data, list) and len(hf_data) > 0 and 'summary_text' in hf_data[0]:
return {"summary": hf_data[0]['summary_text']}
except Exception as e:
logger.error(f"[-] HF API Error: {e}")
# Autonomer Fallback
summary_text = f"Die Analyse zu '{req.query}' basierend auf {len(req.results)} C2-Knotenpunkten zeigt signifikante Anomalien auf. "
if any(".onion" in str(r.get('url','')) for r in req.results):
summary_text += "Deep Web Signatur detektiert. "
summary_text += "Lokale KI-Kerne raten zu erhöhter Wachsamkeit und weiterer Korrelationsanalyse durch God Mode. " + combined_text[:100] + "..."
return {"summary": summary_text}
@app.get("/api/sources")
async def get_sources():
try:
config_path = os.path.join(os.path.dirname(__file__), "..", "config", "sources.json")
with open(config_path, "r") as f:
return json.load(f)
except Exception:
return []
@app.websocket("/api/live")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
await websocket.receive_text()
except WebSocketDisconnect:
manager.disconnect(websocket)