# Source: aegis-swarm-api/src/main.py
# Author: Your Name
# Last commit: "Fix event loop blocking in background tasks" (bf35b03)
"""
AEGIS-SWARM FastAPI Backend (CORRECTED)
"""
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
import joblib
import cv2
import numpy as np
import re
from scipy.sparse import hstack, csr_matrix
import pandas as pd
import uvicorn
import os
import asyncio
import imaplib
import smtplib
import email
from email.message import EmailMessage
from sentence_transformers import SentenceTransformer
from supabase import create_client
# ==========================================
# APP INITIALIZATION
# ==========================================
app = FastAPI(title="AEGIS-SWARM API", version="2.5.0")

# CORS is fully open (any origin, method and header) but credentialed
# requests are disabled.
_cors_options = {
    "allow_origins": ["*"],
    "allow_credentials": False,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
# Email settings.
# Mail hosts are fixed; the account and password come from the environment.
# The literal "your-app-password" is a placeholder default: the email poller
# compares against it and stays idle while it is in effect.
IMAP_SERVER = "imap.gmail.com"
SMTP_SERVER = "smtp.gmail.com"
EMAIL_ACCOUNT = os.environ.get("EMAIL_ACCOUNT", "aegisswarm@gmail.com")
EMAIL_PASSWORD = os.environ.get("EMAIL_PASSWORD", "your-app-password")
# ==========================================
# SUPABASE CONNECTION
# ==========================================
# Supabase endpoint and API key, overridable through the environment.
# NOTE(review): a concrete project URL and key are committed here as fallback
# defaults. The key's JWT payload appears to carry role "anon" -- confirm that
# row-level security protects the tables used by this service, and prefer
# supplying both values through the environment only.
SUPABASE_URL = os.getenv("SUPABASE_URL", "https://fpvmqjsnqakhiqbscjle.supabase.co")
SUPABASE_KEY = os.getenv("SUPABASE_KEY", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImZwdm1xanNucWFraGlxYnNjamxlIiwicm9sZSI6ImFub24iLCJpYXQiOjE3NzgwOTgzNTAsImV4cCI6MjA5MzY3NDM1MH0.q11ue7nFAraaRtVcABYKKXemUIraEMG8Ets2q-89yA0")
# Module-level client shared by the memory-core helpers below.
supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
# NOTE(review): presumably create_client only builds the client object, so this
# message may not prove that a request to Supabase succeeded -- TODO confirm.
print("[SUCCESS] Supabase connected")
# ==========================================
# LOAD ALL MODELS
# ==========================================
# The "models" directory sits next to the directory that contains this file
# (i.e. <parent of this file's directory>/models).
MODELS_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "models")
# All artefacts are loaded once at import time and shared by every request.
# NOTE(review): joblib files are pickle-based as far as I know; only load
# artefacts from a trusted location -- confirm how these files are produced.
print("Loading ML Classifiers...")
# Text (phishing) classifier and its TF-IDF vectoriser.
nlp_model = joblib.load(f"{MODELS_DIR}/nlp_agent.pkl")
nlp_tfidf = joblib.load(f"{MODELS_DIR}/tfidf_vectorizer.pkl")
# URL classifier and its TF-IDF vectoriser.
url_model = joblib.load(f"{MODELS_DIR}/url_classifier.pkl")
url_tfidf = joblib.load(f"{MODELS_DIR}/url_tfidf.pkl")
print("[SUCCESS] Classifiers loaded")
print("Loading Memory Core Embedder...")
# Sentence embedder used for the similarity search and for stored embeddings.
# NOTE(review): presumably fetched by model name on first use -- confirm the
# deployment has the model cached or has network access.
embedder = SentenceTransformer('all-MiniLM-L6-v2')
print("[SUCCESS] Embedder loaded")
# ==========================================
# FEATURE EXTRACTORS
# ==========================================
def extract_text_features(texts):
    """Build the handcrafted text-feature table, one row per input item.

    Each item is coerced with ``str()`` and lower-cased before any feature is
    measured, so every keyword test is case-insensitive.

    Args:
        texts: Iterable of items to describe.

    Returns:
        pandas.DataFrame with one row per item and the columns ``length``,
        ``num_urls``, ``num_digits``, ``has_urgent``, ``has_money``,
        ``has_action``, ``exclamation_count``, ``question_count``,
        ``uppercase_ratio``, ``num_words``, ``has_phone``,
        ``suspicious_chars`` and ``has_suspicious_url``, in that order.
        An empty input yields an empty frame.
    """
    urgent_terms = ['urgent', 'immediate', 'alert', 'warning', 'suspended', 'blocked', 'verify now']
    money_terms = ['reward', 'won', 'prize', 'cash', 'payment', 'refund', '$', 'usd', 'free', 'win']
    action_terms = ['click', 'verify', 'confirm', 'update', 'login', 'password', 'authenticate', 'sign in']

    def _mentions_any(haystack, needles):
        # 1 when at least one needle occurs as a substring, else 0.
        return int(any(needle in haystack for needle in needles))

    def _describe(item):
        lowered = str(item).lower()
        # NOTE(review): this ratio is taken on the already lower-cased text,
        # so for ordinary cased input it comes out as 0.0. Left untouched
        # because the classifier consumes these columns as-is -- confirm
        # against the training pipeline before changing it.
        upper_total = sum(1 for ch in lowered if ch.isupper())

        row = {}
        row['length'] = len(lowered)
        row['num_urls'] = len(re.findall(r'http[s]?://\S+', lowered))
        row['num_digits'] = sum(1 for ch in lowered if ch.isdigit())
        row['has_urgent'] = _mentions_any(lowered, urgent_terms)
        row['has_money'] = _mentions_any(lowered, money_terms)
        row['has_action'] = _mentions_any(lowered, action_terms)
        row['exclamation_count'] = lowered.count('!')
        row['question_count'] = lowered.count('?')
        row['uppercase_ratio'] = upper_total / max(len(lowered), 1)
        row['num_words'] = len(lowered.split())
        row['has_phone'] = int(bool(re.search(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', lowered)))
        row['suspicious_chars'] = len(re.findall(r'[@#$%^&*]', lowered))
        row['has_suspicious_url'] = int(bool(re.search(r'bit\.ly|tinyurl|t\.co|goo\.gl|ow\.ly', lowered)))
        return row

    return pd.DataFrame([_describe(item) for item in texts])
def extract_url_features(urls):
    """Build the handcrafted URL-feature table, one row per input item.

    Each item is coerced with ``str()`` and lower-cased. The "host" used by
    several features is whatever precedes the first ``/`` once a leading
    ``http://`` or ``https://`` has been removed, so it still includes any
    ``:port`` suffix.

    Args:
        urls: Iterable of items to describe.

    Returns:
        pandas.DataFrame with one row per item and the columns ``length``,
        ``num_dots``, ``num_slashes``, ``num_digits``, ``has_https``,
        ``has_ip``, ``has_shortener``, ``has_suspicious_kw``,
        ``num_subdomains``, ``has_port``, ``has_at``, ``has_query``,
        ``has_encoded``, ``tld_length`` and ``path_length``, in that order.
    """
    shorteners = ['bit.ly', 'tinyurl', 't.co', 'goo.gl']
    risky_words = ['login', 'verify', 'account', 'update', 'secure', 'password', 'confirm']

    def _describe(item):
        link = str(item).lower()
        host = re.sub(r'^https?://', '', link).split('/')[0]
        labels = host.split('.')

        # The last dot-separated label of the host; because the host keeps
        # its ":port" suffix, so does this label.
        if '.' in host:
            tld_length = len(labels[-1])
        else:
            tld_length = 0

        # Length of the final piece after splitting on at most three slashes.
        if '/' in link:
            path_length = len(link.split('/', 3)[-1])
        else:
            path_length = 0

        row = {}
        row['length'] = len(link)
        row['num_dots'] = link.count('.')
        row['num_slashes'] = link.count('/')
        row['num_digits'] = sum(1 for ch in link if ch.isdigit())
        row['has_https'] = int(link.startswith('https'))
        row['has_ip'] = int(bool(re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', host)))
        # Plain substring tests against the host.
        row['has_shortener'] = int(any(name in host for name in shorteners))
        row['has_suspicious_kw'] = int(any(word in link for word in risky_words))
        row['num_subdomains'] = len(labels) - 2
        row['has_port'] = int(':' in host)
        row['has_at'] = int('@' in link)
        row['has_query'] = int('?' in link)
        row['has_encoded'] = int('%' in link)
        row['tld_length'] = tld_length
        row['path_length'] = path_length
        return row

    return pd.DataFrame([_describe(item) for item in urls])
# ==========================================
# MEMORY CORE FUNCTIONS
# ==========================================
def store_threat(content, embedding, threat_type, confidence, agent_source):
    """Insert one analysed item into the Supabase ``threats`` table.

    Best-effort: any failure is printed and swallowed, so callers never see
    an exception from this function.

    Args:
        content: The analysed text or URL.
        embedding: Embedding of ``content`` (callers in this file pass a list).
        threat_type: Verdict label stored alongside the content.
        confidence: Confidence value (callers in this file pass a 0-1 fraction).
        agent_source: Name of the component that produced the verdict.
    """
    record = {
        "content": content,
        "embedding": embedding,
        "threat_type": threat_type,
        "confidence": confidence,
        "agent_source": agent_source,
    }
    try:
        insert_query = supabase.table("threats").insert(record)
        insert_query.execute()
    except Exception as err:
        print(f"Memory store error: {err}")
def search_similar_threats(content, threshold=0.60, k=3):
    """Look up stored items similar to ``content`` via the ``match_threats`` RPC.

    Best-effort: any failure is printed and an empty list is returned.

    Args:
        content: Text or URL to embed and search for.
        threshold: Value sent to the RPC as ``match_threshold``.
        k: Value sent to the RPC as ``match_count``.

    Returns:
        The ``data`` of the RPC response, or ``[]`` when it is empty or the
        lookup failed.
    """
    try:
        query_vector = embedder.encode(content, normalize_embeddings=True).tolist()
        rpc_args = {
            "query_embedding": query_vector,
            "match_threshold": threshold,
            "match_count": k,
        }
        response = supabase.rpc("match_threats", rpc_args).execute()
        if response.data:
            return response.data
        return []
    except Exception as err:
        print(f"Memory search error: {err}")
        return []
# ==========================================
# API ENDPOINTS
# ==========================================
# Request body for POST /analyze/text.
class TextRequest(BaseModel):
    # Raw text to classify.
    text: str
@app.get("/")
async def root():
    """Return the service name, a static running status and the API version.

    The version is read from the FastAPI application object so that this
    endpoint and the generated OpenAPI schema always report the same value.
    """
    return {"name": "AEGIS-SWARM", "status": "running", "version": app.version}
@app.get("/health")
async def health():
    # Static liveness payload: nothing is probed here, so "connected" is a
    # fixed label rather than the result of a Supabase check.
    payload = {"status": "healthy", "memory_core": "connected"}
    return payload
@app.post("/analyze/text")
async def analyze_text(request: TextRequest):
    """Classify a piece of text and record it in the memory core.

    The text is vectorised (TF-IDF plus handcrafted features), scored by the
    NLP model, compared against previously stored items and then stored
    itself. All of that is synchronous CPU / network work, so it runs in a
    worker thread to keep the event loop free for other requests.

    Args:
        request: Body carrying the ``text`` to analyse.

    Returns:
        dict with ``verdict`` ("HIGH" or "LOW"), ``confidence`` (percent for
        the predicted class), ``model_used``, ``memory_matches`` (count),
        ``similar_threats`` (rows from the similarity search) and
        ``details.phishing_probability`` (percent).

    Raises:
        HTTPException: 500 carrying the error text if any step fails.
    """

    def _run_pipeline(text):
        # 1. NLP Classification
        tfidf_vec = nlp_tfidf.transform([text])
        handcrafted = extract_text_features([text])
        combined = hstack([tfidf_vec, csr_matrix(handcrafted.values)])
        proba = nlp_model.predict_proba(combined)[0]
        pred = nlp_model.predict(combined)[0]
        # Assumes column 1 / label 1 is the phishing class -- TODO confirm
        # against the training code. Cast to a plain float for serialisation.
        phishing_prob = float(proba[1]) * 100
        verdict = "HIGH" if pred == 1 else "LOW"
        confidence = phishing_prob if pred == 1 else (100 - phishing_prob)

        # 2. Memory Search (before storing, so the lookup is made against
        #    what was already in memory)
        memory_matches = search_similar_threats(text, threshold=0.55, k=3)

        # 3. Store in Memory
        embedding = embedder.encode(text, normalize_embeddings=True).tolist()
        store_threat(text, embedding, verdict, confidence / 100, "ShieldAI_NLP")

        return {
            "verdict": verdict,
            "confidence": round(confidence, 2),
            "model_used": "ShieldAI NLP",
            "memory_matches": len(memory_matches),
            "similar_threats": memory_matches,
            "details": {"phishing_probability": round(phishing_prob, 2)}
        }

    try:
        return await asyncio.to_thread(_run_pipeline, request.text)
    except HTTPException:
        # Never rewrap a deliberate HTTP error as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/analyze/qr")
async def analyze_qr(file: UploadFile = File(...)):
    """Decode a QR code from an uploaded image and classify the URL it holds.

    Image decoding, model inference and the memory-core calls are all
    synchronous, so they run in a worker thread instead of on the event loop.

    Args:
        file: Uploaded image expected to contain a QR code.

    Returns:
        dict with ``verdict`` ("HIGH", "LOW", or "UNKNOWN" when no payload
        starting with ``http`` was decoded), ``confidence`` (percent),
        ``model_used`` and ``details``; for classified URLs also
        ``memory_matches`` and ``similar_threats``.

    Raises:
        HTTPException: 400 if the upload is empty or is not a decodable
            image; 500 carrying the error text for any other failure.
    """

    def _run_pipeline(image_bytes):
        if not image_bytes:
            raise HTTPException(status_code=400, detail="Invalid image")
        nparr = np.frombuffer(image_bytes, np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if img is None:
            raise HTTPException(status_code=400, detail="Invalid image")

        # Use OpenCV QR detector (no pyzbar needed)
        detector = cv2.QRCodeDetector()
        data, _bbox, _ = detector.detectAndDecode(img)
        if not data or not data.startswith('http'):
            return {"verdict": "UNKNOWN", "confidence": 0.0, "model_used": "QR Decoder", "details": {"message": "No URL found in QR"}}
        url = data

        # URL Classification
        tfidf_vec = url_tfidf.transform([url])
        handcrafted = extract_url_features([url])
        combined = hstack([tfidf_vec, csr_matrix(handcrafted.values)])
        proba = url_model.predict_proba(combined)[0]
        pred = url_model.predict(combined)[0]
        # Assumes column 1 / label 1 is the malicious class -- TODO confirm.
        malicious_prob = float(proba[1]) * 100
        verdict = "HIGH" if pred == 1 else "LOW"
        confidence = malicious_prob if pred == 1 else (100 - malicious_prob)

        # Memory: search first, then store this URL.
        memory_matches = search_similar_threats(url, threshold=0.55, k=3)
        embedding = embedder.encode(url, normalize_embeddings=True).tolist()
        store_threat(url, embedding, verdict, confidence / 100, "QR_URL_Classifier")

        return {
            "verdict": verdict,
            "confidence": round(confidence, 2),
            "model_used": "QR URL Classifier",
            "memory_matches": len(memory_matches),
            "similar_threats": memory_matches,
            "details": {"decoded_url": url[:100], "malicious_probability": round(malicious_prob, 2)}
        }

    try:
        image_bytes = await file.read()
        return await asyncio.to_thread(_run_pipeline, image_bytes)
    except HTTPException:
        # Let the deliberate 400 through; the blanket handler below used to
        # turn it into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/analyze/file")
async def analyze_file(file: UploadFile = File(...)):
    """Extract text from an uploaded file and run it through the NLP classifier.

    Files whose name ends in ``.eml`` (any letter case) are parsed as email
    and their ``text/plain`` parts are concatenated; anything else is decoded
    as text with undecodable bytes dropped. Parsing, inference and the
    memory-core calls run in a worker thread so the event loop is not blocked.

    Args:
        file: Uploaded file to analyse.

    Returns:
        dict with ``verdict`` ("HIGH" or "LOW"), ``confidence`` (percent for
        the predicted class), ``model_used``, ``memory_matches``,
        ``similar_threats`` and ``details.phishing_probability`` (percent).

    Raises:
        HTTPException: 400 if no readable text is found; 500 carrying the
            error text for any other failure.
    """

    def _run_pipeline(filename, content_bytes):
        if filename.lower().endswith('.eml'):
            msg = email.message_from_bytes(content_bytes)
            chunks = []
            for part in msg.walk():
                if part.get_content_type() == "text/plain":
                    payload = part.get_payload(decode=True)
                    if payload:
                        chunks.append(payload.decode(errors='ignore'))
            text_content = "".join(chunks)
        else:
            text_content = content_bytes.decode(errors='ignore')

        if not text_content.strip():
            raise HTTPException(status_code=400, detail="No readable text found in file")

        # Re-use the NLP classification pipeline
        tfidf_vec = nlp_tfidf.transform([text_content])
        handcrafted = extract_text_features([text_content])
        combined = hstack([tfidf_vec, csr_matrix(handcrafted.values)])
        proba = nlp_model.predict_proba(combined)[0]
        pred = nlp_model.predict(combined)[0]
        # Assumes column 1 / label 1 is the phishing class -- TODO confirm.
        phishing_prob = float(proba[1]) * 100
        verdict = "HIGH" if pred == 1 else "LOW"
        confidence = phishing_prob if pred == 1 else (100 - phishing_prob)

        # Memory: search first, then store this text.
        memory_matches = search_similar_threats(text_content, threshold=0.55, k=3)
        embedding = embedder.encode(text_content, normalize_embeddings=True).tolist()
        store_threat(text_content, embedding, verdict, confidence / 100, "File_Parser")

        return {
            "verdict": verdict,
            "confidence": round(confidence, 2),
            "model_used": "ShieldAI NLP (File)",
            "memory_matches": len(memory_matches),
            "similar_threats": memory_matches,
            "details": {"phishing_probability": round(phishing_prob, 2)}
        }

    try:
        content_bytes = await file.read()
        # An upload may arrive without a filename; treat that as "not .eml".
        filename = file.filename or ""
        return await asyncio.to_thread(_run_pipeline, filename, content_bytes)
    except HTTPException:
        # Let the deliberate 400 through; the blanket handler below used to
        # turn it into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# ==========================================
# BACKGROUND EMAIL WORKER
# ==========================================
def send_email_reply(to_addr, verdict, confidence, details):
    """Email an analysis report to ``to_addr`` over SMTP-over-SSL (port 465).

    Best-effort: any failure is printed and swallowed. The connection is
    opened with a context manager so it is shut down even when login or
    sending raises.

    Args:
        to_addr: Recipient, used verbatim as the ``To`` header.
        verdict: Verdict label shown in the subject and body.
        confidence: Confidence figure, rendered followed by ``%``.
        details: Free-form detail text for the body.
    """
    try:
        msg = EmailMessage()
        msg.set_content(f"AEGIS-SWARM Threat Analysis Report\n\nVerdict: {verdict}\nConfidence: {confidence}%\nDetails: {details}\n\nStay secure,\nAEGIS-SWARM")
        msg['Subject'] = f"[AEGIS-SWARM] Analysis Result: {verdict}"
        msg['From'] = EMAIL_ACCOUNT
        msg['To'] = to_addr

        with smtplib.SMTP_SSL(SMTP_SERVER, 465) as server:
            server.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
            server.send_message(msg)
    except Exception as e:
        print(f"Failed to send email reply: {e}")
def _extract_plain_text(msg):
    """Return the decoded text of *msg*: its text/plain parts, or its sole payload."""
    if msg.is_multipart():
        parts = [part for part in msg.walk() if part.get_content_type() == "text/plain"]
    else:
        parts = [msg]
    chunks = []
    for part in parts:
        payload = part.get_payload(decode=True)
        if payload:  # no decodable payload -> nothing to add
            chunks.append(payload.decode(errors='ignore'))
    return "".join(chunks)


def poll_emails_blocking():
    """Fetch unseen inbox messages, classify each one and reply with the verdict.

    This is a plain synchronous function on purpose: it is handed to
    ``asyncio.to_thread``, which calls it in a worker thread. Declared as
    ``async def`` it would only produce a coroutine object there and none of
    the work below would ever run.

    Does nothing while the placeholder password is configured. Best-effort:
    any failure is printed and swallowed; the IMAP connection is logged out
    on every exit path.
    """
    try:
        if EMAIL_PASSWORD == "your-app-password":
            return

        # The context manager logs out when the block exits, also on errors.
        with imaplib.IMAP4_SSL(IMAP_SERVER) as mail:
            mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
            mail.select('inbox')
            _, data = mail.search(None, 'UNSEEN')
            message_ids = data[0].split() if data and data[0] else []

            for num in message_ids:
                _, msg_data = mail.fetch(num, '(RFC822)')
                for response_part in msg_data:
                    if not isinstance(response_part, tuple):
                        continue
                    msg = email.message_from_bytes(response_part[1])
                    sender = msg['From']
                    text_content = _extract_plain_text(msg)
                    # Nothing to analyse, or nobody to answer.
                    if not sender or not text_content.strip():
                        continue

                    # Analyze
                    tfidf_vec = nlp_tfidf.transform([text_content])
                    handcrafted = extract_text_features([text_content])
                    combined = hstack([tfidf_vec, csr_matrix(handcrafted.values)])
                    pred = nlp_model.predict(combined)[0]
                    proba = nlp_model.predict_proba(combined)[0]
                    verdict = "HIGH RISK" if pred == 1 else "SAFE"
                    conf = proba[1]*100 if pred == 1 else (100 - proba[1]*100)
                    send_email_reply(sender, verdict, round(conf, 2), "Analyzed via IMAP Poller")

            mail.close()
    except Exception as e:
        print(f"IMAP Error: {e}")
async def poll_emails():
    """Background loop that runs the IMAP poll forever.

    While the placeholder password is configured the loop only idles,
    re-checking every 60 seconds; otherwise it runs one poll and then waits
    30 seconds before the next.
    """
    while True:
        if EMAIL_PASSWORD == "your-app-password":
            pause_seconds = 60
        else:
            # Hand the poll to a worker thread so the blocking IMAP work does
            # not freeze Uvicorn.
            # NOTE(review): asyncio.to_thread expects a plain (non-async)
            # callable -- confirm poll_emails_blocking is one.
            await asyncio.to_thread(poll_emails_blocking)
            pause_seconds = 30
        await asyncio.sleep(pause_seconds)
def keep_alive_ping_blocking():
    """Send one GET request to the keep-alive URL and print the outcome.

    The target is the ``SPACE_URL`` environment variable, falling back to
    the local ``/health`` endpoint. Best-effort: any failure is printed and
    swallowed. The request carries a timeout and the response is closed, so
    a hung or half-open connection cannot pin the worker thread or leak a
    socket.
    """
    import urllib.request

    space_url = os.getenv("SPACE_URL", "http://localhost:8000/health")
    try:
        req = urllib.request.Request(space_url, headers={'User-Agent': 'KeepAlivePing/1.0'})
        # Only reachability matters; the body is not read.
        with urllib.request.urlopen(req, timeout=10):
            pass
        print(f"Keep-alive ping sent to {space_url}")
    except Exception as e:
        print(f"Keep-alive ping failed: {e}")
async def keep_alive_ping():
    """Internal ping loop meant to keep the Hugging Face Space from sleeping.

    Waits five minutes, then runs the blocking ping in a worker thread, and
    repeats forever.
    """
    interval_seconds = 5 * 60  # five minutes between pings
    while True:
        await asyncio.sleep(interval_seconds)
        await asyncio.to_thread(keep_alive_ping_blocking)
# Strong references to the background tasks. The event loop itself only
# keeps weak references, so a task whose handle is dropped may be
# garbage-collected before it finishes.
_background_tasks = set()


@app.on_event("startup")
async def startup_event():
    """Start the email poller and the keep-alive pinger as background tasks."""
    for worker in (poll_emails, keep_alive_ping):
        task = asyncio.create_task(worker())
        _background_tasks.add(task)
        # Drop the reference once the task ends (cancelled or crashed).
        task.add_done_callback(_background_tasks.discard)
if __name__ == "__main__":
    # Local development entry point: print a banner, then serve with
    # auto-reload on all interfaces, port 8000.
    print("\n" + "="*50)
    # Take the version from the app object so the banner cannot drift from
    # the version declared on the FastAPI application.
    print(f" AEGIS-SWARM API v{app.version}")
    print(" Local: http://localhost:8000")
    print(" Docs: http://localhost:8000/docs")
    print("="*50 + "\n")
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)