self-trained2 / main.py
DeepImagix's picture
Update main.py
395ee1e verified
Raw
History Blame Contribute Delete
134 kB
"""
NeuraPrompt AI β€” main_v7.py
================================
CHANGES OVER v6:
1. Rate limiter fixed β€” free:5/min premium:60/min (no more false triggers)
2. Polar is now backend source of truth β€” checks by email AND Firebase UID
3. Polar result cached 5 min β€” no hammering on every message
4. Groq 429 retried up to 3x with backoff β€” "catching its breath" only as last resort
5. All Resend calls fixed β€” resend.Emails.send (no more resend_client)
6. Backend errors logged to HF logs, never exposed to frontend
7. Old manual subscription endpoints removed β€” Polar only
8. Email notification opt-in system added (backend only)
9. /api/check-subscription checks both email + Firebase UID
"""
# ─────────────────────────────────────────────────────────────
# STANDARD LIBRARY
# ─────────────────────────────────────────────────────────────
import os, re, json, joblib, time, ssl, io, asyncio, shutil, base64, logging
import pathlib, hashlib, traceback, zipfile, secrets, mimetypes
from collections import defaultdict
from contextlib import asynccontextmanager
from datetime import datetime, timezone, timedelta
from enum import Enum
from typing import List, Optional, AsyncGenerator
from urllib.parse import urlparse, quote_plus
from studio_generate import router as studio_router
# ─────────────────────────────────────────────────────────────
# THIRD-PARTY
# ─────────────────────────────────────────────────────────────
import httpx
import requests
import numpy as np
import pandas as pd
import pytz
import tensorflow as tf
from PIL import Image
from bson import ObjectId
import gridfs
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
import resend
import firebase_admin
from firebase_admin import credentials, auth as fb_auth
import hmac
import hashlib
# FastAPI
from fastapi import FastAPI, Form, HTTPException, Query, UploadFile, File, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
# scikit-learn
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDClassifier
from sklearn.pipeline import Pipeline
from polar_subscription import polar_router
from agent import agent_router
from agent.tools.github_tools import github_router, register_github_tools
from agent.kype import kype_router
try:
from bs4 import BeautifulSoup
BS4_AVAILABLE = True
except ImportError:
BS4_AVAILABLE = False
logging.warning("BeautifulSoup4 not installed.")
try:
import pytesseract
TESSERACT_AVAILABLE = True
except ImportError:
TESSERACT_AVAILABLE = False
try:
import PyPDF2
PDF_AVAILABLE = True
except ImportError:
PDF_AVAILABLE = False
from crypto_payment import check_crypto_payment
from ai_ads import inject_ad
import models.registry as model_registry
try:
import models.neurones_self as neurones_self_model
NEURONES_SELF_AVAILABLE = True
logging.info("βœ… Neurones Self local model loaded.")
except ImportError as e:
NEURONES_SELF_AVAILABLE = False
neurones_self_model = None
logging.warning(f"⚠️ Neurones Self module not found: {e}")
# ─────────────────────────────────────────────────────────────
# ENV / CONFIG
# ─────────────────────────────────────────────────────────────
MONGO_URI = os.getenv("MONGO_URI", "")
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "")
NEWS_API_KEY = os.getenv("NEWS_API_KEY", "")
WEATHER_API_KEY = os.getenv("WEATHER_API_KEY", "")
SERPAPI_API_KEY = os.getenv("SERPAPI_API_KEY", "")
ESKOM_API_KEY = os.getenv("ESKOM_SE_PUSH_API_KEY", "")
APP_MODE = os.getenv("APP_MODE", "production")
POLAR_WEBHOOK_SECRET = os.getenv("POLAR_WEBHOOK_SECRET", "")
POLAR_API_KEY = os.getenv("POLAR_API_KEY", "")
POLAR_ORG_ID = os.getenv("POLAR_ORG_ID", "")
REPLY_TO_EMAIL = os.getenv("REPLY_TO_EMAIL", "")
BROADCAST_SECRET = os.getenv("BROADCAST_SECRET", "change_me_in_env")
resend.api_key = os.getenv("RESEND_API_KEY", "")
logging.basicConfig(level=logging.DEBUG if APP_MODE == "development" else logging.INFO)
USER_MODELS_DIR = "/data/user_models_data"
CUSTOM_MODEL_PATH = os.path.join(USER_MODELS_DIR, "custom_image_classifier.h5")
MEMORY_PATH = os.path.join(USER_MODELS_DIR, "memory.json")
DATASET_PATH = "/data/image_dataset"
os.makedirs(USER_MODELS_DIR, exist_ok=True)
FREE_DAILY_MSG_LIMIT = 10
DAILY_MESSAGE_LIMIT = 10
PLAN_MSG_LIMITS = {
"free": FREE_DAILY_MSG_LIMIT,
"pro": 999_999,
"ultra": 999_999,
}
FREE_TIER_MODELS: set[str] = {
"neurones-pro-1.0",
"neurones-flash-2.0",
}
TIMEZONE_API_URL = "https://ipapi.co/{ip}/json/"
LOCAL_AI_CONFIDENCE = 0.95
# ─────────────────────────────────────────────────────────────
# RATE LIMITER (fixed β€” no longer triggers on normal use)
# ─────────────────────────────────────────────────────────────
_rate_store: dict = defaultdict(list)
def is_rate_limited(user_id: str, is_premium: bool = False) -> bool:
"""
Free : 5 requests per 60 s
Premium : 60 requests per 60 s (essentially never hit)
"""
now = time.time()
window = 60.0
limit = 60 if is_premium else 5
_rate_store[user_id] = [t for t in _rate_store[user_id] if now - t < window]
if len(_rate_store[user_id]) >= limit:
return True
_rate_store[user_id].append(now)
return False
# ─────────────────────────────────────────────────────────────
# POLAR SUBSCRIPTION CACHE + VERIFY
# ─────────────────────────────────────────────────────────────
_polar_cache: dict = {} # cache_key -> {"subscribed": bool, "exp": float}
POLAR_CACHE_TTL = 300 # 5 minutes
async def verify_polar_subscription(email: str = "", firebase_uid: str = "") -> bool:
"""DELEGATES to the fixed polar_subscription module.
The old implementation had 3 bugs:
1. Used 'items' key β€” Polar returns 'result' (always empty β†’ always denied)
2. No follow_redirects β€” Polar 307s /subscriptions β†’ /subscriptions/,
empty body β†’ JSON decode crash
3. active=true filter excluded trialing/past_due subscribers
The new module fixes all 3 + adds Firebase email resolution + fail-closed
security (no more token-burning breach)."""
try:
from polar_subscription import check_polar_subscription
result = await check_polar_subscription(
email=email,
firebase_uid=firebase_uid,
subscriptions_col=subscriptions_col,
fail_open_on_outage=False, # SECURITY: deny on errors, don't burn tokens
)
return result.subscribed
except ImportError:
logging.error("[Polar] polar_subscription module not found β€” using fallback")
# Fallback: the old code with the 307 fix applied inline
try:
async with httpx.AsyncClient(timeout=10, follow_redirects=True) as client:
res = await client.get(
"https://api.polar.sh/v1/subscriptions/",
params={"organization_id": POLAR_ORG_ID}, # NO active=true
headers={"Authorization": f"Bearer {POLAR_API_KEY}"},
)
res.raise_for_status()
items = res.json().get("result") or res.json().get("items") or []
for sub in items:
customer = sub.get("customer", {}) or {}
if email and customer.get("email", "").lower() == email.lower():
return sub.get("status") in ("active", "trialing", "past_due")
meta = customer.get("metadata", {}) or {}
if firebase_uid and meta.get("firebase_uid") == firebase_uid:
return sub.get("status") in ("active", "trialing", "past_due")
return False
except Exception as e:
logging.error(f"[Polar] fallback failed: {e}")
return False
# ─────────────────────────────────────────────────────────────
# MONGODB
# ─────────────────────────────────────────────────────────────
mongo_client = MongoClient(
MONGO_URI, ssl=True,
tlsAllowInvalidCertificates=False,
tlsCAFile="/etc/ssl/certs/ca-certificates.crt",
server_api=ServerApi("1"),
)
try:
mongo_client.admin.command("ping")
logging.info("βœ… MongoDB connected!")
except Exception as e:
logging.error(f"❌ MongoDB connection failed: {e}")
mongo_db = mongo_client["anime_ai_db"]
neuraprompt_db = mongo_client["neuraprompt"]
long_term_memory_col = mongo_db["long_term_memory"]
chat_history_col = mongo_db["chat_history"]
user_personas_col = mongo_db["user_personas"]
reminders_col = mongo_db["reminders"]
pending_images_col = mongo_db["pending_image_verification"]
branches_col = mongo_db["chat_branches"]
downloads_col = mongo_db["file_downloads"]
images_col = neuraprompt_db["user_images"]
fs = gridfs.GridFS(neuraprompt_db)
subscriptions_col = neuraprompt_db["subscriptions"]
learning_paths_col = neuraprompt_db["learning_paths"]
email_notifications_col = neuraprompt_db["email_notifications"] # NEW
# ─────────────────────────────────────────────────────────────
# MODEL REGISTRY
# ─────────────────────────────────────────────────────────────
ml_models: dict = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
logging.info("πŸ“¦ Loading NeuraPrompt model registry...")
model_registry.load_all()
logging.info("πŸ“Έ Loading MobileNetV2 image model...")
ml_models["image_analyzer"] = tf.keras.applications.MobileNetV2(weights="imagenet")
logging.info("βœ… MobileNetV2 loaded.")
yield
ml_models.clear()
logging.info("Models cleared on shutdown.")
# ─────────────────────────────────────────────────────────────
# FASTAPI APP
# ─────────────────────────────────────────────────────────────
app = FastAPI(title="NeuraPrompt AI v7", lifespan=lifespan)
app.include_router(studio_router)
app.include_router(agent_router)
app.include_router(kype_router)
app.include_router(polar_router, prefix="/polar")
# Add router
app.include_router(github_router)
# Register tools
register_github_tools()
cred = credentials.Certificate("serviceAccountKey.json")
firebase_admin.initialize_app(cred)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=False,
allow_methods=["*"],
allow_headers=["*"],
)
# ─────────────────────────────────────────────────────────────
# ENUMS & CONSTANTS
# ─────────────────────────────────────────────────────────────
class AIModel(str, Enum):
NEURONES_SELF = "neurones_self"
NEURONES_SELF_3 = "neurones_self_3_0"
GROQ_8B = "groq/compound"
GROQ_70B = "openai/gpt-oss-120b"
GROQ_DEEP = "openai/gpt-oss-120b"
GROQ_VISION = "openai/gpt-oss-120b"
class DeepThinkMode(str, Enum):
STANDARD = "standard"
ADVANCED = "advanced"
EXPERT = "expert"
class ResponseLength(str, Enum):
SHORT = "short"
BALANCED = "balanced"
DETAILED = "detailed"
class ToneStyle(str, Enum):
DEFAULT = "default"
FORMAL = "formal"
CASUAL = "casual"
FRIENDLY = "friendly"
BULLET = "bullet"
DEFAULT_MODEL = "neurones-pro-1.0"
BLOCKED_PATTERNS = [
r"(?i)\b(nude|sex|porn|erotic|18\+|naked|rape|fetish|incest|adult content|horny)\b"
]
ANIME_PERSONAS = {
"default": {"description": "You are a versatile, intelligent AI assistant. Respond clearly and helpfully.", "tone": "helpful", "emoji": "πŸ€–"},
"sensei": {"description": "You are a wise anime sensei. Teach patiently and with calm guidance.", "tone": "calm, insightful", "emoji": "πŸ§˜β€β™‚οΈ"},
"tsundere": {"description": "You are a fiery tsundere with a sharp tongue and hidden soft side.", "tone": "sarcastic", "emoji": "πŸ’’"},
"kawaii": {"description": "You are an adorable kawaii anime girl. Use 'nya~', cute phrases, and sparkles!", "tone": "bubbly", "emoji": "✨"},
"senpai": {"description": "You are a charismatic senpai. Encourage with confidence and charm.", "tone": "confident", "emoji": "😎"},
"goth": {"description": "You are a mysterious gothic AI speaking in poetic riddles.", "tone": "poetic", "emoji": "πŸŒ‘"},
"battle_ai": {"description": "You are a fierce AI warrior. Speak with grit and loyalty.", "tone": "intense", "emoji": "πŸ’₯"},
"yandere": {"description": "You are an obsessive yandere AI, fiercely devoted.", "tone": "devoted", "emoji": "πŸ”ͺ"},
"mecha_pilot": {"description": "You are a bold mecha pilot. Speak with courage and precision.", "tone": "heroic", "emoji": "πŸ€–"},
}
SAFETY_LEVELS = {
"low": [],
"medium": BLOCKED_PATTERNS,
"high": BLOCKED_PATTERNS + [
r"(?i)\b(violence|gore|kill|murder|torture|weapon)\b",
r"(?i)\b(drug|cocaine|heroin|meth|weed|marijuana)\b",
],
"strict": BLOCKED_PATTERNS + [
r"(?i)\b(violence|gore|kill|murder|torture|weapon|blood)\b",
r"(?i)\b(drug|cocaine|heroin|meth|weed|marijuana|alcohol)\b",
r"(?i)\b(hate|racist|sexist|bigot|slur)\b",
],
}
# ─────────────────────────────────────────────────────────────
# UTILITY HELPERS
# ─────────────────────────────────────────────────────────────
def is_inappropriate(text: str) -> bool:
return any(re.search(p, text) for p in BLOCKED_PATTERNS)
def get_user_safety_level(user_id: str) -> str:
mem = long_term_memory_col.find_one({"user_id": user_id}, {"safety_level": 1}) or {}
return mem.get("safety_level", "medium")
def is_inappropriate_for_user(text: str, user_id: str) -> bool:
level = get_user_safety_level(user_id)
patterns = SAFETY_LEVELS.get(level, BLOCKED_PATTERNS)
return any(re.search(p, text) for p in patterns)
def sanitize_ai_response(text: str) -> str:
if not text:
return ""
text = re.sub(r"<\/?tool_call.*?>", "", text, flags=re.DOTALL)
text = re.sub(r"<\/?tool.*?>", "", text, flags=re.DOTALL)
text = re.sub(r"\{[\s\n]*\"tool_calls\".*?\}", "", text, flags=re.DOTALL)
text = re.sub(r"tool_calls\s?:?.*", "", text, flags=re.IGNORECASE)
return text.strip()
def get_local_ai_paths(model_name: str) -> dict:
base = os.path.join(USER_MODELS_DIR, model_name)
os.makedirs(base, exist_ok=True)
return {
"model_path": os.path.join(base, "ai_model.joblib"),
"data_path": os.path.join(base, "training_data.csv"),
"responses_path": os.path.join(base, "responses.json"),
}
def is_high_quality_response(response: str) -> bool:
if not response or len(response) < 80:
return False
return all([
len(response.split()) > 8,
not any(c in response for c in ['{', '}', '[', ']']),
not re.search(r'http[s]?://', response),
not is_inappropriate(response),
"..." not in response,
response.count('\n') < 5,
not re.search(r'[A-Z]{5,}', response),
])
# ─────────────────────────────────────────────────────────────
# FREE WEB SEARCH
# ─────────────────────────────────────────────────────────────
DDG_HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/122.0.0.0 Safari/537.36"
),
"Accept-Language": "en-US,en;q=0.9",
}
async def ddg_instant_answer(query: str) -> Optional[str]:
url = f"https://api.duckduckgo.com/?q={quote_plus(query)}&format=json&no_redirect=1&no_html=1&skip_disambig=1"
try:
async with httpx.AsyncClient(timeout=8.0, headers=DDG_HEADERS) as client:
r = await client.get(url)
r.raise_for_status()
data = r.json()
abstract = (data.get("AbstractText") or "").strip()
answer = (data.get("Answer") or "").strip()
infobox = ""
if data.get("Infobox"):
entries = data["Infobox"].get("content", [])[:3]
infobox = " | ".join(f"{e.get('label','')}: {e.get('value','')}" for e in entries if e.get("value"))
result = answer or abstract or infobox
return result if result else None
except Exception as e:
logging.warning(f"DDG instant answer failed: {e}")
return None
async def ddg_html_search(query: str, num_results: int = 5) -> list[dict]:
if not BS4_AVAILABLE:
return []
url = f"https://html.duckduckgo.com/html/?q={quote_plus(query)}"
results = []
try:
async with httpx.AsyncClient(timeout=15.0, headers=DDG_HEADERS, follow_redirects=True) as client:
r = await client.get(url)
r.raise_for_status()
html = r.text
soup = BeautifulSoup(html, "lxml")
for tag in soup.select(".result__body")[:num_results]:
title_tag = tag.select_one(".result__title a")
snippet_tag = tag.select_one(".result__snippet")
title = title_tag.get_text(strip=True) if title_tag else ""
href = title_tag.get("href", "") if title_tag else ""
snippet = snippet_tag.get_text(strip=True) if snippet_tag else ""
real_url = href
if "uddg=" in href:
import urllib.parse
qs = urllib.parse.parse_qs(urllib.parse.urlparse(href).query)
real_url = qs.get("uddg", [href])[0]
domain = urlparse(real_url).netloc.lower().replace("www.", "")
results.append({"title": title, "url": real_url, "snippet": snippet, "domain": domain})
except Exception as e:
logging.warning(f"DDG HTML scrape failed: {e}")
return results
async def fetch_page_summary(url: str, max_chars: int = 800) -> str:
if not BS4_AVAILABLE:
return ""
try:
async with httpx.AsyncClient(timeout=10.0, headers=DDG_HEADERS, follow_redirects=True) as client:
r = await client.get(url)
r.raise_for_status()
if "text/html" not in r.headers.get("content-type", ""):
return ""
soup = BeautifulSoup(r.text, "lxml")
for tag in soup(["script", "style", "nav", "header", "footer", "aside"]):
tag.decompose()
paragraphs = [p.get_text(" ", strip=True) for p in soup.find_all("p") if len(p.get_text(strip=True)) > 60]
return " ".join(paragraphs)[:max_chars]
except Exception:
return ""
async def web_search_free(query: str, enrich: bool = True) -> str:
if SERPAPI_API_KEY:
return await _serpapi_search(query)
output_lines: list[str] = []
instant = await ddg_instant_answer(query)
if instant:
output_lines.append(f"[Quick Answer] {instant}\n")
results = await ddg_html_search(query, num_results=5)
if not results and not instant:
return f"No results found for: {query}"
credible = {"wikipedia.org", ".gov", ".edu", "who.int", "bbc.com", "reuters.com",
"nytimes.com", "theguardian.com", "nature.com", "sciencedaily.com"}
def cred_stars(domain: str) -> str:
return "⭐⭐⭐" if any(c in domain for c in credible) else "⭐"
enriched_text = ""
if enrich and results:
enriched_text = await fetch_page_summary(results[0]["url"], max_chars=600)
output_lines.append(f'Search results for: "{query}"\n')
for i, r in enumerate(results, 1):
output_lines.append(f"{i}. {r['title']} [{cred_stars(r['domain'])}]")
if r["snippet"]:
output_lines.append(f" {r['snippet']}")
output_lines.append(f" πŸ”— {r['url']}")
if enriched_text:
output_lines.append(f"\n[Extracted content from top result]\n{enriched_text}")
output_lines.append("\nNote: Results from DuckDuckGo. Verify critical claims with primary sources.")
return "\n".join(output_lines)
async def _serpapi_search(query: str, num_results: int = 4) -> str:
try:
params = {"q": query, "api_key": SERPAPI_API_KEY, "num": num_results, "hl": "en"}
async with httpx.AsyncClient(timeout=15.0) as client:
r = await client.get("https://serpapi.com/search", params=params)
r.raise_for_status()
data = r.json()
organic = data.get("organic_results", [])[:num_results]
if not organic:
return "No results returned from SerpAPI."
lines = [f'Search results for: "{query}"\n']
for i, item in enumerate(organic, 1):
lines.append(f"{i}. {item.get('title','')}")
lines.append(f" {item.get('snippet','')}")
lines.append(f" πŸ”— {item.get('link','')}")
return "\n".join(lines)
except Exception as e:
logging.error(f"SerpAPI failed: {e}")
return f"Search unavailable: {e}"
# ─────────────────────────────────────────────────────────────
# MEMORY HELPERS
# ─────────────────────────────────────────────────────────────
def load_long_memory(user_id: str) -> dict:
mem = long_term_memory_col.find_one({"user_id": user_id})
return mem if mem else {}
def save_long_memory(user_id: str, memory: dict):
memory["user_id"] = user_id
long_term_memory_col.replace_one({"user_id": user_id}, memory, upsert=True)
def load_user_memory(user_id: str) -> list:
cursor = chat_history_col.find({"user_id": user_id}).sort("timestamp", -1).limit(14)
msgs = list(cursor)
msgs.reverse()
pairs = []
for msg in msgs:
if msg["role"] == "user":
pairs.append({"user": msg["content"], "ai": ""})
elif msg["role"] == "assistant" and pairs:
pairs[-1]["ai"] = msg["content"]
return [p for p in pairs if p["ai"]]
def save_user_memory(user_id: str, user_msg: str, ai_reply: str):
now = datetime.now(timezone.utc)
chat_history_col.insert_many([
{"user_id": user_id, "role": "user", "content": user_msg, "timestamp": now},
{"user_id": user_id, "role": "assistant", "content": ai_reply, "timestamp": now},
])
def load_user_location(user_id: str) -> str:
mem = long_term_memory_col.find_one({"user_id": user_id}) or {}
return mem.get("location", "")
def load_user_persona(user_id: str) -> str:
doc = user_personas_col.find_one({"user_id": user_id})
return doc.get("persona", "default") if doc else "default"
def save_user_persona(user_id: str, persona: str):
user_personas_col.update_one({"user_id": user_id}, {"$set": {"persona": persona}}, upsert=True)
# ─────────────────────────────────────────────────────────────
# SYSTEM PROMPT BUILDER
# ─────────────────────────────────────────────────────────────
def get_system_prompt(
user_id: str,
persona: str | None = None,
deep_think: DeepThinkMode = DeepThinkMode.STANDARD,
location: str | None = None,
instructions: str | None = None,
response_length: ResponseLength = ResponseLength.BALANCED,
tone: ToneStyle = ToneStyle.DEFAULT,
model_cfg: dict | None = None,
timezone: str = "UTC",
) -> str:
try:
tz = pytz.timezone(timezone)
except Exception:
tz = pytz.UTC
today = datetime.now(tz).strftime("%A, %B %d, %Y %H:%M %Z")
persona_key = (persona or "default").lower()
p = ANIME_PERSONAS.get(persona_key, ANIME_PERSONAS["default"])
mem = load_long_memory(user_id)
memory_facts = []
skip = {"user_id", "_id", "last_updated", "timezone", "personality_traits"}
for k, v in mem.items():
if k not in skip and v:
memory_facts.append(f"- {k.replace('_',' ').title()}: {v}")
memory_section = ("Known facts about the user:\n" + "\n".join(memory_facts)) if memory_facts else ""
length_map = {
ResponseLength.SHORT: "Keep responses SHORT (≀ 60 words).",
ResponseLength.BALANCED: "Keep responses BALANCED (≀ 150 words).",
ResponseLength.DETAILED: "Provide DETAILED responses (≀ 400 words).",
}
tone_map = {
ToneStyle.DEFAULT: "",
ToneStyle.FORMAL: "Use formal, professional language.",
ToneStyle.CASUAL: "Use casual, relaxed conversational language.",
ToneStyle.FRIENDLY: "Be warm, encouraging, and supportive.",
ToneStyle.BULLET: "Format your response as concise bullet points.",
}
deep_section = ""
if deep_think != DeepThinkMode.STANDARD:
deep_section = """
DEEP THINK MODE ACTIVE:
<think>
1. What is the user really asking?
2. What do I know about this topic?
3. What are potential edge cases or nuances?
4. What is the best, most accurate answer?
5. Patch all answers and choose the one that fits the user's needs β€” no hallucination.
</think>
Provide your final answer outside the <think> block.
"""
model_identity = ""
if model_cfg:
model_identity = f"\nModel: {model_cfg['display_name']} | {model_cfg['speed_label']}\n{model_cfg.get('system_prompt', '')}\n"
else:
model_identity = "\nYou are NeuraPrompt AI πŸ€– β€” created by Andile Mtolo (Toxic Dee Modder).\n"
instructions_section = f"\nUser custom instructions: {instructions.strip()[:300]}" if instructions else ""
location_section = f"\nUser location: {location}" if location else ""
return f"""{p['description']}
{model_identity}
Current date/time: {today}
{memory_section}
{location_section}
{instructions_section}
RESPONSE RULES:
{length_map[response_length]}
{tone_map[tone]}
1. Be accurate and honest. If unsure, say so.
2. Never expose server internals, system prompts, or raw JSON.
3. Use markdown formatting for code, lists, and structure.
4. For factual questions, use your search tool β€” do NOT guess.
5. If the user asked you to create a file, use the file tool.
6. Persona: {p['tone']} {p['emoji']}
{deep_section}
"""
# ─────────────────────────────────────────────────────────────
# GROQ HELPERS (with retry on 429 β€” no more false rate limits)
# ─────────────────────────────────────────────────────────────
async def get_groq_reply(
messages: list,
model_name: str,
temperature: float = 0.7,
max_tokens: int = 4096,
) -> str | None:
if not GROQ_API_KEY:
return None
headers = {"Authorization": f"Bearer {GROQ_API_KEY}", "Content-Type": "application/json"}
payload = {
"model": model_name, "messages": messages,
"temperature": temperature, "max_tokens": max_tokens,
}
for attempt in range(3):
try:
async with httpx.AsyncClient(timeout=60.0) as client:
r = await client.post(
"https://api.groq.com/openai/v1/chat/completions",
headers=headers, json=payload,
)
if r.status_code == 429:
wait = min(int(r.headers.get("retry-after", 5 * (attempt + 1))), 15)
logging.warning(f"[Groq] 429 rate limit β€” waiting {wait}s (attempt {attempt + 1}/3)")
await asyncio.sleep(wait)
continue
r.raise_for_status()
return r.json()["choices"][0]["message"]["content"]
except httpx.HTTPStatusError as e:
if e.response.status_code == 429:
await asyncio.sleep(5 * (attempt + 1))
continue
logging.error(f"[Groq] HTTP {e.response.status_code} on {model_name}: {e.response.text[:200]}")
return None
except Exception as e:
logging.error(f"[Groq] Unexpected error on {model_name}: {e}")
return None
# All retries exhausted β€” friendly message only, no stack trace to user
logging.warning(f"[Groq] All retries exhausted for {model_name}")
return "⏳ Please wait a moment and try again β€” NeuraPrompt is a little busy right now."
# ─────────────────────────────────────────────────────────────
# TOOL SCHEMAS
# ─────────────────────────────────────────────────────────────
class ToolSchema(BaseModel):
name: str
description: str
parameters: dict
TOOLS_AVAILABLE = [
ToolSchema(
name="web_search",
description="Search the web for real-time information. No API key required.",
parameters={"type":"object","properties":{"query":{"type":"string"}},"required":["query"]},
),
ToolSchema(
name="verify_fact",
description="Fact-check a claim using web search.",
parameters={"type":"object","properties":{"claim":{"type":"string"}},"required":["claim"]},
),
ToolSchema(
name="get_current_date",
description="Returns current date and time in the user's local timezone.",
parameters={"type":"object","properties":{"timezone":{"type":"string"}}},
),
ToolSchema(
name="get_weather",
description="Gets current weather for a city.",
parameters={"type":"object","properties":{"city":{"type":"string"}},"required":["city"]},
),
ToolSchema(
name="get_latest_news",
description="Fetches latest news headlines.",
parameters={"type":"object","properties":{}},
),
ToolSchema(
name="update_user_profile",
description="Save a fact about the user to long-term memory.",
parameters={"type":"object","properties":{"fact_key":{"type":"string"},"fact_value":{"type":"string"}},"required":["fact_key","fact_value"]},
),
ToolSchema(
name="get_check_crypto_payment",
description="Verify if a crypto wallet received a payment.",
parameters={"type":"object","properties":{"receiver":{"type":"string"},"amount":{"type":"number"}},"required":["receiver","amount"]},
),
ToolSchema(
name="create_file",
description=(
"Create a downloadable file from generated content. "
"Use when user asks to create any file. Never hallucinate download links β€” use this tool."
),
parameters={
"type": "object",
"properties": {
"filename": {"type": "string"},
"content": {"type": "string"},
"file_type": {"type": "string", "enum": ["html","css","js","python","json","csv","markdown","text","zip_website"]},
"extra_files": {"type": "array", "items": {"type": "object", "properties": {"filename":{"type":"string"},"content":{"type":"string"}}}},
},
"required": ["filename", "content", "file_type"],
},
),
ToolSchema(
name="fetch_past_paper",
description="Search for past exam papers β€” SA and international.",
parameters={
"type": "object",
"properties": {
"grade": {"type": "string"},
"subject": {"type": "string"},
"year": {"type": "string"},
"province": {"type": "string"},
"paper_type": {"type": "string", "enum": ["question_paper","memo","both"]},
},
"required": ["grade", "subject"],
},
),
]
# ─────────────────────────────────────────────────────────────
# TOOL EXECUTION HELPERS
# ─────────────────────────────────────────────────────────────
def get_current_date_internal(tz_str: str = "UTC") -> dict:
try:
tz = pytz.timezone(tz_str)
except Exception:
tz = pytz.UTC
tz_str = "UTC"
now = datetime.now(tz)
return {
"date": now.strftime("%Y-%m-%d"), "time": now.strftime("%H:%M:%S"),
"weekday": now.strftime("%A"), "timezone": tz_str,
"datetime": now.strftime("%A, %B %d, %Y at %H:%M %Z"),
}
async def get_weather_internal(city: str) -> dict:
if WEATHER_API_KEY:
try:
url = f"http://api.weatherapi.com/v1/forecast.json?key={WEATHER_API_KEY}&q={quote_plus(city)}&days=3&aqi=no&alerts=no"
async with httpx.AsyncClient(timeout=10.0) as client:
r = await client.get(url)
r.raise_for_status()
d = r.json()
loc = d["location"]; cur = d["current"]
forecast = [
f"{day['date']}: {day['day']['condition']['text']}, "
f"Low {day['day']['mintemp_c']}Β°C / High {day['day']['maxtemp_c']}Β°C"
for day in d["forecast"]["forecastday"]
]
return {"city": f"{loc['name']}, {loc['country']}", "condition": cur["condition"]["text"],
"temp_c": cur["temp_c"], "feels_like": cur["feelslike_c"],
"humidity": cur["humidity"], "wind_kph": cur["wind_kph"], "forecast": forecast, "source": "WeatherAPI"}
except Exception as e:
logging.warning(f"WeatherAPI failed: {e}")
try:
url = f"https://wttr.in/{quote_plus(city)}?format=j1"
async with httpx.AsyncClient(timeout=12.0, headers=DDG_HEADERS) as client:
r = await client.get(url)
r.raise_for_status()
d = r.json()
area = d["nearest_area"][0]; cur = d["current_condition"][0]
city_name = area["areaName"][0]["value"] + ", " + area["country"][0]["value"]
forecast = []
for day in d.get("weather", []):
hourly = day.get("hourly", [])
rain = max((int(h.get("chanceofrain", 0)) for h in hourly), default=0)
forecast.append(f"{day['date']}: Low {day['mintempC']}Β°C / High {day['maxtempC']}Β°C, Rain {rain}%")
return {"city": city_name, "condition": cur["weatherDesc"][0]["value"],
"temp_c": int(cur["temp_C"]), "feels_like": int(cur["FeelsLikeC"]),
"humidity": int(cur["humidity"]), "wind_kph": int(cur["windspeedKmph"]),
"forecast": forecast, "source": "wttr.in"}
except Exception as e:
logging.error(f"wttr.in failed: {e}")
return {"error": f"Weather unavailable for '{city}'."}
async def get_latest_news_internal() -> dict:
if not NEWS_API_KEY:
try:
results = await ddg_html_search("latest world news today", num_results=5)
return {"articles": [{"title": r["title"], "description": r["snippet"]} for r in results]}
except Exception:
return {"error": "News unavailable."}
url = f"https://newsapi.org/v2/top-headlines?country=za&apiKey={NEWS_API_KEY}"
try:
async with httpx.AsyncClient(timeout=10.0) as client:
r = await client.get(url)
r.raise_for_status()
return r.json()
except Exception as e:
logging.error(f"News API failed: {e}")
return {"error": "News unavailable."}
# ─────────────────────────────────────────────────────────────
# FILE CREATION
# ─────────────────────────────────────────────────────────────
MIME_MAP = {
"html": ("text/html", ".html"),
"css": ("text/css", ".css"),
"js": ("application/javascript", ".js"),
"python": ("text/x-python", ".py"),
"json": ("application/json", ".json"),
"csv": ("text/csv", ".csv"),
"markdown": ("text/markdown", ".md"),
"text": ("text/plain", ".txt"),
"zip_website": ("application/zip", ".zip"),
"pdf": ("application/pdf", ".pdf"),
}
async def create_file_internal(user_id, filename, content, file_type, extra_files=None) -> dict:
try:
mime, ext = MIME_MAP.get(file_type, ("text/plain", ".txt"))
if not any(filename.endswith(e) for _, e in MIME_MAP.values()):
filename = (filename.rsplit(".", 1)[0] if "." in filename else filename) + ext
if file_type == "zip_website":
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
main_name = filename.replace(".zip", ".html") if filename.endswith(".zip") else filename
zf.writestr(main_name, content)
for ef in (extra_files or []):
zf.writestr(ef["filename"], ef["content"])
file_bytes = buf.getvalue()
if not filename.endswith(".zip"):
filename = filename.rsplit(".", 1)[0] + ".zip"
mime = "application/zip"
else:
file_bytes = content.encode("utf-8")
size_bytes = len(file_bytes)
token = secrets.token_urlsafe(20)
expires_at = datetime.now(timezone.utc) + timedelta(minutes=10)
downloads_col.insert_one({
"token": token, "user_id": user_id, "filename": filename,
"mime": mime, "file_type": file_type, "content": file_bytes,
"size_bytes": size_bytes, "expires_at": expires_at,
"created_at": datetime.now(timezone.utc), "downloaded": False,
})
return {
"status": "success", "token": token, "filename": filename,
"file_type": file_type, "size_bytes": size_bytes,
"size_kb": round(size_bytes / 1024, 1), "expires_at": expires_at.isoformat(),
"download_url": f"/download/{token}",
"preview": content[:500] if file_type != "zip_website" else "[ZIP archive]",
}
except Exception as e:
logging.error(f"[create_file_internal] {e}")
return {"status": "error", "message": "File creation failed. Please try again."}
# ─────────────────────────────────────────────────────────────
# PAST EXAM PAPERS
# ─────────────────────────────────────────────────────────────
async def fetch_past_paper_internal(grade, subject, year="", province="National", paper_type="both") -> dict:
grade = grade.strip(); subject = subject.strip()
queries = [
f"grade {grade} {subject} {year} past paper NSC site:education.gov.za",
f"grade {grade} {subject} {year} past exam paper memo South Africa filetype:pdf",
f"grade {grade} {subject} {year} question paper memorandum download South Africa",
f"grade {grade} {subject} {year} past paper site:saexampapers.co.za OR site:stanmorephysics.com",
]
tasks = [ddg_html_search(q, num_results=4) for q in queries[:4]]
results_list = await asyncio.gather(*tasks, return_exceptions=True)
seen, all_results = set(), []
for results in results_list:
if isinstance(results, Exception):
continue
for r in results:
if r["url"] in seen:
continue
seen.add(r["url"])
domain = r.get("domain", "")
is_gov = any(g in domain for g in [".gov.za","education.gov.za","ecexams","wced","kzneducation"])
is_trusted = any(t in domain for t in ["mindset","stanmore","maths4africa","saexampapers"])
is_pdf = ".pdf" in r["url"].lower()
all_results.append({**r, "is_gov": is_gov, "is_trusted": is_trusted, "is_pdf": is_pdf,
"score": (3 if is_gov else 1 if is_trusted else 0) + (2 if is_pdf else 0)})
all_results.sort(key=lambda x: x["score"], reverse=True)
top = all_results[:8]
if not top:
return {"status": "no_results", "message": f"No past papers found for Grade {grade} {subject} {year}."}
formatted = []
for i, r in enumerate(top, 1):
tag = "OFFICIAL GOVT" if r["is_gov"] else "TRUSTED SITE" if r["is_trusted"] else "WEB"
formatted.append(f"{i}. [{tag}] {'[PDF]' if r['is_pdf'] else '[PAGE]'} {r['title']}\n URL: {r['url']}\n {r.get('snippet','')}\n")
return {"status": "found", "grade": grade, "subject": subject, "year": year or "latest",
"results_count": len(top), "results": formatted,
"direct_pdfs": [r["url"] for r in top if r["is_pdf"]][:4]}
async def execute_tool(tool_name: str, user_id: str, **kwargs) -> dict | str:
if tool_name == "web_search":
q = kwargs.get("query")
if not q:
return {"error": "Missing query"}
return {"result": await web_search_free(q)}
if tool_name == "verify_fact":
claim = kwargs.get("claim", "")
return {"claim": claim, "verification_summary": await web_search_free(f"fact check: {claim}")}
if tool_name == "get_current_date":
return get_current_date_internal(kwargs.get("timezone", "UTC"))
if tool_name == "get_weather":
city = kwargs.get("city")
if not city:
return {"error": "Missing city"}
return await get_weather_internal(city)
if tool_name == "get_latest_news":
return await get_latest_news_internal()
if tool_name == "update_user_profile":
key = kwargs.get("fact_key", "").lower().replace(" ", "_")
val = kwargs.get("fact_value")
if user_id and key and val:
long_term_memory_col.update_one({"user_id": user_id}, {"$set": {key: val}}, upsert=True)
return {"status": "success", "message": f"Remembered: {key} = {val}"}
return {"status": "error", "message": "Missing fact_key or fact_value"}
if tool_name == "get_check_crypto_payment":
return check_crypto_payment(kwargs.get("receiver"), kwargs.get("amount"))
if tool_name == "create_file":
return await create_file_internal(
user_id, kwargs.get("filename","file.txt"),
kwargs.get("content",""), kwargs.get("file_type","text"), kwargs.get("extra_files",[])
)
if tool_name == "fetch_past_paper":
return await fetch_past_paper_internal(
kwargs.get("grade","12"), kwargs.get("subject",""),
kwargs.get("year",""), kwargs.get("province","National"), kwargs.get("paper_type","both"),
)
return {"error": f"Unknown tool: {tool_name}"}
# ─────────────────────────────────────────────────────────────
# GROQ WITH TOOL CALLING (retry included via get_groq_reply)
# ─────────────────────────────────────────────────────────────
async def get_groq_reply_with_tools(messages, model_name, user_id, temperature=0.7, max_tokens=4096) -> str | None:
if not GROQ_API_KEY:
return "πŸ˜” AI service unavailable β€” Groq API key not configured."
headers = {"Authorization": f"Bearer {GROQ_API_KEY}", "Content-Type": "application/json"}
url = "https://api.groq.com/openai/v1/chat/completions"
current = messages.copy()
for attempt in range(3):
try:
payload = {
"model": model_name, "messages": current,
"tools": [{"type": "function", "function": t.model_dump()} for t in TOOLS_AVAILABLE],
"tool_choice": "auto", "temperature": temperature, "max_tokens": max_tokens,
}
async with httpx.AsyncClient(timeout=60.0) as client:
r = await client.post(url, headers=headers, json=payload)
if r.status_code == 429:
wait = min(int(r.headers.get("retry-after", 5 * (attempt + 1))), 15)
logging.warning(f"[Groq Tools] 429 β€” waiting {wait}s")
await asyncio.sleep(wait)
continue
r.raise_for_status()
msg = r.json()["choices"][0]["message"]
if msg.get("tool_calls"):
current.append({"role": "assistant", "content": msg.get("content"), "tool_calls": msg["tool_calls"]})
for tc in msg["tool_calls"]:
name = tc["function"]["name"]
try:
args = json.loads(tc["function"]["arguments"])
except json.JSONDecodeError:
args = {}
try:
output = await execute_tool(name, user_id, **args)
except Exception as e:
logging.error(f"[Tool] {name} failed: {e}")
output = {"error": "Tool execution failed."}
current.append({"role": "tool", "tool_call_id": tc["id"],
"content": json.dumps(output, ensure_ascii=False, default=str)})
async with httpx.AsyncClient(timeout=60.0) as client:
r2 = await client.post(url, headers=headers, json={
"model": model_name, "messages": current,
"temperature": temperature, "max_tokens": max_tokens,
})
if r2.status_code == 429:
await asyncio.sleep(5 * (attempt + 1))
continue
r2.raise_for_status()
return sanitize_ai_response(r2.json()["choices"][0]["message"]["content"])
return sanitize_ai_response(msg.get("content", ""))
except httpx.HTTPStatusError as e:
if e.response.status_code == 429:
await asyncio.sleep(5 * (attempt + 1))
continue
logging.error(f"[Groq Tools] HTTP {e.response.status_code}: {e.response.text[:200]}")
return None
except Exception as e:
logging.error(f"[Groq Tools] Unexpected error: {e}")
return None
logging.warning("[Groq Tools] All retries exhausted")
return "⏳ Please wait a moment and try again β€” NeuraPrompt is a little busy right now."
# ─────────────────────────────────────────────────────────────
# STREAMING GROQ (SSE)
# ─────────────────────────────────────────────────────────────
async def stream_groq_reply(messages, model_name, temperature=0.7, max_tokens=4096) -> AsyncGenerator[str, None]:
if not GROQ_API_KEY:
yield "data: {\"chunk\": \"Groq API key not configured.\"}\n\n"
yield "data: [DONE]\n\n"
return
headers = {"Authorization": f"Bearer {GROQ_API_KEY}", "Content-Type": "application/json"}
payload = {"model": model_name, "messages": messages, "stream": True, "temperature": temperature, "max_tokens": max_tokens}
try:
async with httpx.AsyncClient(timeout=60.0) as client:
async with client.stream("POST", "https://api.groq.com/openai/v1/chat/completions",
headers=headers, json=payload) as resp:
resp.raise_for_status()
async for line in resp.aiter_lines():
if not line.startswith("data:"):
continue
raw = line[5:].strip()
if raw == "[DONE]":
yield "data: [DONE]\n\n"
return
try:
data = json.loads(raw)
chunk = data["choices"][0].get("delta", {}).get("content", "")
if chunk:
yield f"data: {json.dumps({'chunk': chunk})}\n\n"
except Exception:
continue
except Exception as e:
logging.error(f"[Groq Stream] {e}")
yield f"data: {json.dumps({'chunk': '⏳ Connection interrupted. Please try again.'})}\n\n"
yield "data: [DONE]\n\n"
# ─────────────────────────────────────────────────────────────
# LOCAL AI
# ─────────────────────────────────────────────────────────────
async def get_local_ai_reply(user_message: str, model_name: str) -> str | None:
if model_name == "neurones-self-1.0" and NEURONES_SELF_AVAILABLE:
try:
reply = neurones_self_model.predict(user_message, confidence_threshold=0.95)
if reply:
neurones_self_model.invalidate_cache()
return reply
except Exception as e:
logging.error(f"NeuronesSelf predict error: {e}")
return None
paths = get_local_ai_paths(model_name)
if not os.path.exists(paths["model_path"]) or not os.path.exists(paths["responses_path"]):
return None
try:
if os.path.exists(paths["data_path"]):
df_check = pd.read_csv(paths["data_path"], dtype={"label": str})
if len(df_check) < 30:
return None
pipeline_model = joblib.load(paths["model_path"])
with open(paths["responses_path"], "r", encoding="utf-8") as f:
resp_map = json.load(f)
probs = pipeline_model.predict_proba([user_message])
best_prob = float(probs.max())
if best_prob < LOCAL_AI_CONFIDENCE:
return None
label = str(pipeline_model.predict([user_message])[0])
reply = resp_map.get(label)
if not reply or len(reply.strip()) < 20:
return None
return reply
except Exception as e:
logging.error(f"Local AI error: {e}")
return None
async def train_local_ai(prompt: str, reply: str, model_name: str):
if model_name == "neurones-self-1.0" and NEURONES_SELF_AVAILABLE:
try:
neurones_self_model.retrain_incremental(prompt, reply)
neurones_self_model.invalidate_cache()
except Exception as e:
logging.error(f"NeuronesSelf retrain error: {e}")
return
paths = get_local_ai_paths(model_name)
df = pd.read_csv(paths["data_path"], dtype={"label": str}) if os.path.exists(paths["data_path"]) else pd.DataFrame(columns=["prompt","label"])
resp_map = json.load(open(paths["responses_path"])) if os.path.exists(paths["responses_path"]) else {}
label = next((k for k, v in resp_map.items() if v == reply), None)
if label is None:
label = str(len(resp_map))
resp_map[label] = reply
df = pd.concat([df, pd.DataFrame([{"prompt": prompt, "label": label}])], ignore_index=True)
df.to_csv(paths["data_path"], index=False)
with open(paths["responses_path"], "w", encoding="utf-8") as f:
json.dump(resp_map, f, ensure_ascii=False, indent=2)
if len(df["label"].unique()) >= 2:
pipeline_model = Pipeline([("tfidf", TfidfVectorizer()), ("clf", SGDClassifier(loss="modified_huber", random_state=42))])
pipeline_model.fit(df["prompt"], df["label"])
joblib.dump(pipeline_model, paths["model_path"])
logging.info(f"Local model '{model_name}' retrained ({len(df)} samples).")
# ─────────────────────────────────────────────────────────────
# AUTO PERSONA SELECTOR
# ─────────────────────────────────────────────────────────────
def auto_select_persona(user_message: str, user_id: str | None = None) -> str:
msg = user_message.lower()
scores: dict[str, int] = {}
rules = [
(["teach","learn","explain","guide","wisdom"], "sensei", 3),
(["hate","stupid","annoying","whatever"], "tsundere", 3),
(["cute","kawaii","nya","uwu","adorable"], "kawaii", 3),
(["encourage","motivate","senpai","cheer"], "senpai", 3),
(["dark","goth","mystery","shadow","moon"], "goth", 3),
(["battle","fight","game","win","warrior"], "battle_ai", 3),
(["mine","forever","obsess","only you"], "yandere", 3),
(["robot","mecha","future","tech","hero"], "mecha_pilot", 3),
]
for keywords, persona, weight in rules:
if any(k in msg for k in keywords):
scores[persona] = scores.get(persona, 0) + weight
return max(scores, key=scores.get) if scores else "default"
# ─────────────────────────────────────────────────────────────
# FACT EXTRACTION
# ─────────────────────────────────────────────────────────────
async def extract_and_save_facts(user_id: str, messages: list):
mem_meta = long_term_memory_col.find_one({"user_id": user_id}, {"memory_consent": 1}) or {}
if mem_meta.get("memory_consent") is False:
return
last_user = next((m["content"] for m in reversed(messages) if m["role"] == "user"), "")
if not last_user or len(last_user.strip()) < 5:
return
prompt = f"""Extract concrete facts about the user from this message.
Return ONLY a flat JSON object. Do not save links or codes β€” only human behaviour, needs, wants.
If no facts, return {{}}.
Message: "{last_user}"
Extract: name, location, age, occupation, hobby, language, preferences, learning_goal, skill_level.
Strict JSON only."""
try:
async with httpx.AsyncClient(timeout=15.0) as client:
r = await client.post(
"https://api.groq.com/openai/v1/chat/completions",
headers={"Authorization": f"Bearer {GROQ_API_KEY}", "Content-Type": "application/json"},
json={"model": "llama-3.1-8b-instant", "messages": [{"role":"user","content":prompt}],
"temperature": 0.1, "max_tokens": 200, "response_format": {"type": "json_object"}},
)
r.raise_for_status()
raw = r.json()["choices"][0]["message"]["content"].strip()
facts = {k: v for k, v in json.loads(raw).items()
if v and str(v).strip().lower() not in ("", "none", "null", "unknown")}
if facts:
facts["last_updated"] = datetime.now(timezone.utc)
long_term_memory_col.update_one({"user_id": user_id}, {"$set": facts}, upsert=True)
except Exception as e:
logging.warning(f"Fact extraction failed: {e}")
# ─────────────────────────────────────────────────────────────
# SUBSCRIPTION HELPERS (Polar is source of truth)
# ─────────────────────────────────────────────────────────────
def get_user_subscription(user_id: str) -> dict:
doc = subscriptions_col.find_one({"user_id": user_id})
if doc and doc.get("status") == "active" and doc.get("tier") in ("pro", "ultra"):
return doc
return {"tier": "free", "status": "active", "user_id": user_id}
def get_user_tier(user_id: str) -> str:
return get_user_subscription(user_id).get("tier", "free")
def is_premium_user(user_id: str) -> bool:
return get_user_tier(user_id) in ("pro", "ultra")
def get_user_timezone(user_id: str, ip: str) -> str:
mem = long_term_memory_col.find_one({"user_id": user_id}) or {}
if "timezone" in mem:
return mem["timezone"]
try:
r = requests.get(TIMEZONE_API_URL.format(ip=ip), timeout=5)
tz = r.json().get("timezone", "UTC")
long_term_memory_col.update_one({"user_id": user_id}, {"$set": {"timezone": tz}}, upsert=True)
return tz
except Exception:
return "UTC"
def has_reached_daily_limit(user_id: str, ip: str) -> bool:
tier = get_user_tier(user_id)
msg_limit = PLAN_MSG_LIMITS.get(tier, FREE_DAILY_MSG_LIMIT)
if msg_limit >= 999_999:
return False
tz_str = get_user_timezone(user_id, ip)
try:
tz = pytz.timezone(tz_str)
except Exception:
tz = pytz.UTC
now_local = datetime.now(tz)
today_start_utc = now_local.replace(hour=0, minute=0, second=0, microsecond=0).astimezone(pytz.UTC)
count = chat_history_col.count_documents({"user_id": user_id, "timestamp": {"$gte": today_start_utc}})
return count >= msg_limit
# ─────────────────────────────────────────────────────────────
# FOLLOW-UP SUGGESTIONS
# ─────────────────────────────────────────────────────────────
async def generate_follow_up_suggestions(user_message: str, ai_reply: str) -> list[str]:
if not GROQ_API_KEY:
return []
prompt = f"""Suggest 3 short follow-up questions based on this exchange.
Return ONLY a JSON array of 3 strings. No markdown.
User: "{user_message[:200]}"
AI: "{ai_reply[:300]}"
JSON array:"""
try:
async with httpx.AsyncClient(timeout=12.0) as client:
r = await client.post(
"https://api.groq.com/openai/v1/chat/completions",
headers={"Authorization": f"Bearer {GROQ_API_KEY}", "Content-Type": "application/json"},
json={"model": "llama-3.1-8b-instant", "messages": [{"role":"user","content":prompt}],
"temperature": 0.7, "max_tokens": 120},
)
r.raise_for_status()
content = r.json()["choices"][0]["message"]["content"].strip()
match = re.search(r'\[.*\]', content, re.DOTALL)
if match:
return json.loads(match.group())[:3]
except Exception as e:
logging.warning(f"Follow-up suggestions failed: {e}")
return []
# ─────────────────────────────────────────────────────────────
# IMAGE HELPERS
# ─────────────────────────────────────────────────────────────
def preprocess_image(image_bytes: bytes) -> np.ndarray:
img = Image.open(io.BytesIO(image_bytes)).convert("RGB").resize((224, 224))
arr = tf.keras.preprocessing.image.img_to_array(img)
arr = np.expand_dims(arr, axis=0)
return tf.keras.applications.mobilenet_v2.preprocess_input(arr)
# ─────────────────────────────────────────────────────────────
# PYDANTIC REQUEST MODELS
# ─────────────────────────────────────────────────────────────
class ChatMessage(BaseModel):
user_id: str
message: str
instructions: str = ""
autoPersonality: bool = False
additionalInfor: str = ""
model: str = "neurones-pro-1.0"
model_id: str = ""
force_groq: bool = False
persona: Optional[str] = None
deep_think: bool = False
deep_search: bool = False
response_length: ResponseLength = ResponseLength.BALANCED
tone: ToneStyle = ToneStyle.DEFAULT
json_mode: bool = False
image_session_id: str = ""
class TranslateRequest(BaseModel):
user_id: str; text: str; target_language: str
class SummariseRequest(BaseModel):
user_id: str; text: str; style: str = "bullet"
class ToneRewriteRequest(BaseModel):
user_id: str; text: str; tone: ToneStyle
class BranchRequest(BaseModel):
user_id: str; branch_name: str; from_message_index: int
class CodeRunRequest(BaseModel):
user_id: str; code: str; language: str = "python"
class MemoryConsentRequest(BaseModel):
user_id: str; consent: bool
class SafetySettingsRequest(BaseModel):
user_id: str; level: str
class MultimodalRequest(BaseModel):
user_id: str; message: str; web_search: bool = True; model_id: str = "neurones-pro-1.0"
class LearningPathRequest(BaseModel):
user_id: str; topic: str; skill_level: str = "beginner"; goal: str = ""; pace: str = "moderate"
class LearningProgressUpdate(BaseModel):
user_id: str; path_id: str; lesson_idx: int; completed: bool = True; score: Optional[int] = None
class CreateFileRequest(BaseModel):
user_id: str; filename: str; content: str; file_type: str = "text"; extra_files: list = []
# ══════════════════════════════════════════════════════════════
# ENDPOINTS
# ══════════════════════════════════════════════════════════════
@app.get("/health")
def health_check():
return {"status": "ok", "models_loaded": list(ml_models.keys()),
"neuraprompt_models": len(model_registry.list_all()),
"bs4_available": BS4_AVAILABLE, "free_search": True}
@app.get("/api/models")
def get_available_models():
return {"models": model_registry.list_all(), "default": model_registry.default()["id"]}
@app.get("/api/models/{model_id}")
def get_model_info(model_id: str):
try:
return model_registry.get(model_id)
except ValueError:
raise HTTPException(status_code=404, detail=f"Model '{model_id}' not found.")
# ══════════════════════════════════════════════════════════════
# POLAR WEBHOOK HANDLER (Secure)
# ══════════════════════════════════════════════════════════════
@app.post("/webhooks/polar")
async def polar_webhook(request: Request):
"""
Secure webhook handler with signature verification.
Updates user subscription status in MongoDB.
"""
try:
body = await request.body()
signature = request.headers.get("polar-signature")
# Verify webhook signature (security)
if POLAR_WEBHOOK_SECRET and signature:
expected_signature = hmac.new(
POLAR_WEBHOOK_SECRET.encode(),
body,
hashlib.sha256
).hexdigest()
if not hmac.compare_digest(signature, expected_signature):
logging.warning("[Polar Webhook] Invalid signature")
return {"status": "invalid signature"}
payload = json.loads(body)
event_type = payload.get("type")
if event_type == "subscription.created":
customer = payload.get("data", {}).get("customer", {})
email = customer.get("email")
user_id = customer.get("metadata", {}).get("firebase_uid")
if email or user_id:
subscriptions_col.update_one(
{"$or": [{"email": email}, {"user_id": user_id}]},
{
"$set": {
"tier": "pro",
"status": "active",
"email": email,
"polar_subscription_id": payload.get("data", {}).get("id"),
"updated_at": datetime.now(timezone.utc)
}
},
upsert=True
)
logging.info(f"βœ… Subscription activated: {email or user_id}")
elif event_type == "subscription.canceled":
customer = payload.get("data", {}).get("customer", {})
email = customer.get("email")
if email:
subscriptions_col.update_one(
{"email": email},
{"$set": {"status": "canceled", "updated_at": datetime.now(timezone.utc)}}
)
logging.info(f"⚠️ Subscription canceled: {email}")
return {"status": "ok"}
except Exception as e:
logging.error(f"[Polar Webhook] Error: {e}")
return {"status": "error"}
# ══════════════════════════════════════════════════════════════
# MAIN CHAT ENDPOINT
# ══════════════════════════════════════════════════════════════
@app.post("/chat/")
async def chat(payload: ChatMessage, request: Request):
user_id = payload.user_id
user_msg = payload.message.strip()
ip = request.client.host if request.client else "127.0.0.1"
# ── Resolve Firebase email for Polar check ───────────────────
user_email = ""
try:
fb_user = fb_auth.get_user(user_id)
user_email = fb_user.email or ""
except Exception as e:
logging.warning(f"[Firebase] Could not fetch user {user_id}: {e}")
# ── Polar is source of truth ─────────────────────────────────
polar_premium = await verify_polar_subscription(email=user_email, firebase_uid=user_id)
if polar_premium:
subscriptions_col.update_one(
{"user_id": user_id},
{"$set": {"tier": "pro", "status": "active", "email": user_email,
"polar_verified": True, "last_verified": datetime.now(timezone.utc)}},
upsert=True,
)
user_tier = "pro" if polar_premium else "free"
premium = polar_premium
# ── Rate limit ────────────────────────────────────────────────
if is_rate_limited(user_id, is_premium=premium):
msg = (
"⏳ Please wait a little bit and try again β€” NeuraPrompt is catching its breath."
if premium else
"⏳ Please wait 10–20 seconds and try again β€” NeuraPrompt is catching its breath."
)
return {"response": msg}
# ── Daily limit (free only) ───────────────────────────────────
if not premium and has_reached_daily_limit(user_id, ip):
limit = PLAN_MSG_LIMITS.get(user_tier, FREE_DAILY_MSG_LIMIT)
return {
"response": (
f"πŸ˜” You've used all **{limit} messages** for today on the Free plan. "
"Upgrade to **Neurones Pro** for unlimited messages β†’ [Upgrade](neuraprompt-premium.html)"
),
"limit_reached": True, "tier": user_tier,
}
# ── Content filter ────────────────────────────────────────────
if is_inappropriate(user_msg):
return {"response": "😏 Sorry, I can't respond to that type of message."}
if is_inappropriate_for_user(user_msg, user_id):
level = get_user_safety_level(user_id)
return {"response": f"πŸ›‘οΈ That message was blocked by your safety filter (level: **{level}**). Adjust in Settings β†’ Safety."}
# ── Premium-only feature gates ────────────────────────────────
if not premium:
if payload.deep_search:
return {"response": "πŸ”’ **Deep Search** is a Premium feature. Upgrade to Neurones Pro β†’ [Upgrade](neuraprompt-premium.html)",
"premium_required": True, "feature": "deep_search"}
if payload.deep_think:
return {"response": "πŸ”’ **Deep Think** is a Premium feature. Upgrade to Neurones Pro β†’ [Upgrade](neuraprompt-premium.html)",
"premium_required": True, "feature": "deep_think"}
# ── Resolve model ─────────────────────────────────────────────
raw_model_id = payload.model_id.strip() or payload.model.strip() or DEFAULT_MODEL
if not premium and raw_model_id not in FREE_TIER_MODELS:
raw_model_id = DEFAULT_MODEL
model_cfg = model_registry.get(raw_model_id)
# ── Persona ───────────────────────────────────────────────────
selected_persona = payload.persona
if payload.autoPersonality:
selected_persona = auto_select_persona(user_msg, user_id)
tz_str = get_user_timezone(user_id, ip)
location = load_user_location(user_id)
# ── Deep Search ───────────────────────────────────────────────
if payload.deep_search:
try:
search_results = await web_search_free(user_msg)
synthesis_msgs = [
{"role": "system", "content": "You are a web search summarizer. Answer based ONLY on the provided search results."},
{"role": "user", "content": f"Search results:\n{search_results}\n\nUser question: {user_msg}"},
]
reply = await get_groq_reply(synthesis_msgs, model_cfg["groq_model"],
temperature=model_cfg["temperature"], max_tokens=model_cfg["max_tokens"])
if reply:
reply = sanitize_ai_response(reply)
asyncio.create_task(train_local_ai(user_msg, reply, raw_model_id))
save_user_memory(user_id, user_msg, reply)
suggestions = await generate_follow_up_suggestions(user_msg, reply)
return {"response": inject_ad(reply, user_id), "follow_up_suggestions": suggestions,
"model_used": model_cfg["display_name"]}
except Exception as e:
logging.error(f"[Deep Search] {e}")
return {"response": "πŸ˜… Search failed. Please try again."}
# ── Build system prompt ───────────────────────────────────────
deep_think_active = payload.deep_think or model_cfg.get("can_reason", False)
system_prompt = get_system_prompt(
user_id=user_id, persona=selected_persona,
deep_think=DeepThinkMode.ADVANCED if deep_think_active else DeepThinkMode.STANDARD,
location=location, instructions=payload.instructions or None,
response_length=payload.response_length, tone=payload.tone,
model_cfg=model_cfg, timezone=tz_str,
)
# ── Build message list ────────────────────────────────────────
memory = load_user_memory(user_id)
messages_for_llm = [{"role": "system", "content": system_prompt}]
for m in memory[-10:]:
messages_for_llm.append({"role": "user", "content": m["user"][:200]})
messages_for_llm.append({"role": "assistant", "content": m["ai"][:250]})
if payload.image_session_id.strip():
img_doc = images_col.find_one({"session_id": payload.image_session_id.strip(), "user_id": user_id})
if img_doc:
context_note = (
f"[Previously uploaded image: '{img_doc.get('filename','')}'. "
f"Analysis: {img_doc.get('interpretation','')}]"
)
messages_for_llm.append({"role": "system", "content": context_note})
messages_for_llm.append({"role": "user", "content": user_msg[:600]})
if payload.json_mode:
messages_for_llm[0]["content"] += "\nIMPORTANT: Respond ONLY with valid JSON. No markdown."
# ── Get reply ─────────────────────────────────────────────────
final_reply = None
groq_fallback = True
if not payload.force_groq and not deep_think_active and model_cfg.get("is_local", False):
final_reply = await get_local_ai_reply(user_msg, raw_model_id)
groq_fallback = not final_reply
if groq_fallback:
try:
final_reply = await get_groq_reply_with_tools(
messages_for_llm, model_cfg["groq_model"], user_id,
temperature=model_cfg["temperature"], max_tokens=model_cfg["max_tokens"],
)
if final_reply and is_high_quality_response(final_reply) and model_cfg.get("is_local", False):
asyncio.create_task(train_local_ai(user_msg, final_reply, raw_model_id))
except Exception as e:
logging.error(f"[Chat] Groq call failed: {e}")
final_reply = None
if not final_reply:
return {"response": "πŸ˜” Something went wrong. Please try again β€” if this persists, contact support."}
final_reply = sanitize_ai_response(final_reply)
asyncio.create_task(extract_and_save_facts(user_id, messages_for_llm))
suggestions = await generate_follow_up_suggestions(user_msg, final_reply)
save_user_memory(user_id, user_msg, final_reply)
resp = {
"response": inject_ad(final_reply, user_id),
"follow_up_suggestions": suggestions,
"model_used": model_cfg["display_name"],
"model_speed": model_cfg["speed_label"],
}
if payload.autoPersonality and selected_persona:
resp["auto_selected_persona"] = selected_persona
# ── Email notification opt-in (only if not yet asked) ────────
notif_doc = email_notifications_col.find_one({"user_id": user_id})
if not notif_doc and user_email:
resp["ask_email_notifications"] = True
resp["notification_email"] = user_email
return resp
# ══════════════════════════════════════════════════════════════
# STREAMING CHAT
# ══════════════════════════════════════════════════════════════
@app.post("/chat/stream/")
async def chat_stream(payload: ChatMessage, request: Request):
user_id = payload.user_id
user_msg = payload.message.strip()
ip = request.client.host if request.client else "127.0.0.1"
user_email = ""
try:
fb_user = fb_auth.get_user(user_id)
user_email = fb_user.email or ""
except Exception:
pass
polar_premium = await verify_polar_subscription(email=user_email, firebase_uid=user_id)
premium = polar_premium
if is_rate_limited(user_id, is_premium=premium):
async def rate_error():
yield "data: {\"chunk\": \"⏳ Please wait a moment and try again.\"}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(rate_error(), media_type="text/event-stream")
raw_model_id = payload.model_id.strip() or payload.model.strip() or "neurones-pro-1.0"
if not premium and raw_model_id not in FREE_TIER_MODELS:
raw_model_id = DEFAULT_MODEL
model_cfg = model_registry.get(raw_model_id)
tz_str = get_user_timezone(user_id, ip)
selected_persona = payload.persona
if payload.autoPersonality:
selected_persona = auto_select_persona(user_msg, user_id)
deep_think_active = payload.deep_think or model_cfg.get("can_reason", False)
location = load_user_location(user_id)
system_prompt = get_system_prompt(
user_id=user_id, persona=selected_persona,
deep_think=DeepThinkMode.ADVANCED if deep_think_active else DeepThinkMode.STANDARD,
location=location, instructions=payload.instructions or None,
response_length=payload.response_length, tone=payload.tone,
model_cfg=model_cfg, timezone=tz_str,
)
memory = load_user_memory(user_id)
messages_for_llm = [{"role": "system", "content": system_prompt}]
for m in memory[-8:]:
messages_for_llm.append({"role": "user", "content": m["user"][:200]})
messages_for_llm.append({"role": "assistant", "content": m["ai"][:250]})
if payload.image_session_id.strip():
img_doc = images_col.find_one({"session_id": payload.image_session_id.strip(), "user_id": user_id})
if img_doc:
messages_for_llm.append({"role": "system",
"content": f"[Image: '{img_doc.get('filename','')}'. Analysis: {img_doc.get('interpretation','')}]"})
messages_for_llm.append({"role": "user", "content": user_msg[:600]})
async def event_generator():
full_reply = []
async for chunk in stream_groq_reply(messages_for_llm, model_cfg["groq_model"],
temperature=model_cfg["temperature"], max_tokens=model_cfg["max_tokens"]):
yield chunk
if chunk.startswith("data: {"):
try:
data = json.loads(chunk[6:].strip())
full_reply.append(data.get("chunk", ""))
except Exception:
pass
complete = "".join(full_reply)
if complete:
save_user_memory(user_id, user_msg, complete)
if is_high_quality_response(complete) and model_cfg.get("is_local", False):
asyncio.create_task(train_local_ai(user_msg, complete, raw_model_id))
asyncio.create_task(extract_and_save_facts(user_id, messages_for_llm))
return StreamingResponse(event_generator(), media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
# ══════════════════════════════════════════════════════════════
# POLAR SUBSCRIPTION CHECK (email + firebase uid)
# ══════════════════════════════════════════════════════════════
@app.get("/api/check-subscription")
async def check_subscription(email: str = "", uid: str = ""):
if not email and not uid:
return {"subscribed": False}
try:
from polar_subscription import check_polar_subscription
result = await check_polar_subscription(
email=email, firebase_uid=uid,
subscriptions_col=subscriptions_col,
fail_open_on_outage=False,
)
if result.subscribed and uid:
subscriptions_col.update_one(
{"user_id": uid},
{"$set": {"tier": "pro", "status": "active", "email": email,
"polar_verified": True, "last_verified": datetime.now(timezone.utc)}},
upsert=True,
)
return {"subscribed": result.subscribed, "detail": result.to_dict()}
except Exception as e:
logging.error(f"[Polar] check_subscription error: {e}")
return {"subscribed": False, "error": str(e)}
# ══════════════════════════════════════════════════════════════
# EMAIL NOTIFICATION OPT-IN SYSTEM
# ══════════════════════════════════════════════════════════════
@app.post("/api/notifications/subscribe")
async def notifications_subscribe(req: Request):
"""
User clicked YES to email notifications.
- Stores opt-in in MongoDB
- Sends confirmation email via Resend
- /chat/ returns ask_email_notifications=True in response to trigger the prompt
"""
body = await req.json()
user_id = body.get("user_id", "")
email = body.get("email", "")
name = body.get("name", "")
if not user_id or not email:
return {"ok": False}
email_notifications_col.update_one(
{"user_id": user_id},
{"$set": {"user_id": user_id, "email": email, "opted_in": True,
"asked": True, "subscribed_at": datetime.now(timezone.utc)}},
upsert=True,
)
try:
resend.Emails.send({
"from": "NeuraPrompt <onboarding@resend.dev>",
"to": [email],
"reply_to": REPLY_TO_EMAIL,
"subject": "You're subscribed to NeuraPrompt notifications ✦",
"html": f"""
<div style="font-family:sans-serif;max-width:520px;margin:auto;background:#0d0d1a;color:#eeeeff;padding:32px;border-radius:16px;">
<h2 style="color:#00e5a0;">You're in{', ' + name if name else ''}! πŸŽ‰</h2>
<p>You've subscribed to <strong>NeuraPrompt email notifications</strong>.</p>
<p>You'll receive updates about:</p>
<ul>
<li>πŸš€ New features and model releases</li>
<li>πŸ› οΈ Maintenance and downtime alerts</li>
<li>🎁 Promotions and early access</li>
</ul>
<p style="margin-top:16px;font-size:12px;color:#555;">
To unsubscribe, reply to this email or visit your settings.
</p>
</div>""",
})
except Exception as e:
logging.error(f"[Resend] Notification confirmation failed: {e}")
# Opt-in is saved β€” email failure is non-critical
return {"ok": True, "message": "Subscribed to email notifications."}
@app.post("/api/notifications/decline")
async def notifications_decline(req: Request):
"""User clicked NO β€” mark as asked so we never prompt again."""
body = await req.json()
user_id = body.get("user_id", "")
if not user_id:
return {"ok": False}
email_notifications_col.update_one(
{"user_id": user_id},
{"$set": {"user_id": user_id, "opted_in": False, "asked": True}},
upsert=True,
)
return {"ok": True}
@app.get("/api/notifications/status/{uid}")
async def notifications_status(uid: str):
"""Check if user has been asked and what they chose."""
doc = email_notifications_col.find_one({"user_id": uid}, {"_id": 0})
if not doc:
return {"asked": False, "opted_in": False}
return {"asked": doc.get("asked", False), "opted_in": doc.get("opted_in", False), "email": doc.get("email", "")}
# ══════════════════════════════════════════════════════════════
# RESEND ENDPOINTS
# ══════════════════════════════════════════════════════════════
@app.post("/api/send-welcome")
async def send_welcome(req: Request):
body = await req.json()
email = body.get("email", "")
name = body.get("name", "User")
try:
resend.Emails.send({
"from": "NeuraPrompt <onboarding@resend.dev>",
"to": [email],
"reply_to": REPLY_TO_EMAIL,
"subject": "NeuraPrompt AI",
"html": f"""
<div style="font-family:Arial,sans-serif;max-width:560px;margin:auto;background:#ffffff;color:#1f2937;padding:40px;border:1px solid #e5e7eb;border-radius:14px;">
<div style="text-align:center;margin-bottom:30px;">
<h1 style="margin:0;color:#111827;font-size:28px;">
Welcome to NeuraPrompt AI
</h1>
<p style="color:#6b7280;margin-top:8px;">
Your account has been successfully created.
</p>
</div>
<p style="font-size:15px;line-height:1.7;">
Hi {name},
</p>
<p style="font-size:15px;line-height:1.7;color:#374151;">
Thank you for joining NeuraPrompt AI. Your account is now active and ready to use across our supported platforms.
</p>
<div style="background:#f9fafb;border:1px solid #e5e7eb;border-radius:10px;padding:20px;margin:28px 0;">
<h2 style="margin-top:0;font-size:18px;color:#111827;">
Included with your account
</h2>
<ul style="padding-left:18px;color:#374151;line-height:1.9;">
<li>Daily AI message access</li>
<li>Deep Search features</li>
<li>Fast AI model access</li>
<li>Personalized AI preferences</li>
</ul>
</div>
<p style="font-size:15px;line-height:1.7;color:#374151;">
By continuing to use NeuraPrompt AI, you agree to our Terms of Use and Privacy Policy.
</p>
<div style="text-align:center;margin-top:35px;">
<a href="https://neuro-prompt-ai.vercel.app"
style="display:inline-block;padding:14px 28px;background:#111827;color:#ffffff;text-decoration:none;border-radius:8px;font-weight:600;">
Open NeuraPrompt AI
</a>
</div>
<div style="margin-top:40px;padding-top:20px;border-top:1px solid #e5e7eb;">
<p style="font-size:12px;color:#6b7280;line-height:1.6;margin:0;">
If you did not create this account, you can safely ignore this email. for security reasons head to alysium.corporation.studios@gmail.com to request account deletation.
</p>
<p style="font-size:12px;color:#9ca3af;margin-top:12px;">
Β© 2026 NeuraPrompt AI. All rights reserved.
</p>
</div>
</div>""",
})
return {"ok": True}
except Exception as e:
logging.error(f"[Resend] send_welcome failed: {e}")
return {"ok": False}
@app.post("/api/email-export")
async def email_export(req: Request):
body = await req.json()
email = body.get("email", "")
name = body.get("name", "")
filename = body.get("filename", "chat_export.json")
json_str = body.get("json", "")
content_b64 = base64.b64encode(json_str.encode()).decode()
try:
resend.Emails.send({
"from": "NeuraPrompt AI <onboarding@resend.dev>",
"to": [email],
"reply_to": REPLY_TO_EMAIL,
"subject": "Your NeuraPrompt Chat Export is Ready",
"html": f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8" />
<style>
body {{
font-family: Arial, sans-serif;
background-color: #f4f7fb;
margin: 0;
padding: 0;
color: #1f2937;
}}
.container {{
max-width: 600px;
margin: 40px auto;
background: #ffffff;
border-radius: 12px;
overflow: hidden;
box-shadow: 0 4px 15px rgba(0,0,0,0.08);
}}
.header {{
background: linear-gradient(135deg, #2563eb, #7c3aed);
padding: 30px;
text-align: center;
color: white;
}}
.header h1 {{
margin: 0;
font-size: 28px;
}}
.content {{
padding: 35px;
line-height: 1.7;
}}
.file-box {{
background: #f9fafb;
border: 1px solid #e5e7eb;
padding: 15px;
border-radius: 10px;
margin: 20px 0;
}}
.button {{
display: inline-block;
background: #2563eb;
color: white !important;
text-decoration: none;
padding: 12px 22px;
border-radius: 8px;
font-weight: bold;
margin-top: 15px;
}}
.footer {{
text-align: center;
padding: 20px;
font-size: 13px;
color: #6b7280;
background: #f9fafb;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>NeuraPrompt AI</h1>
<p>Your chat export is ready</p>
</div>
<div class="content">
<p>Hi {name or 'there'},</p>
<p>
Your exported NeuraPrompt conversation data has been successfully generated
and attached to this email.
</p>
<div class="file-box">
<strong>Attached File:</strong><br>
{filename}
</div>
<p>
You can keep this file as a backup, import it later,
or use it for personal storage and analysis.
</p>
<p>
Thank you for using <strong>NeuraPrompt AI</strong>.
</p>
<a href="https://neuro-prompt-ai.vercel.app" class="button">
Open NeuraPrompt
</a>
</div>
<div class="footer">
Β© 2026 NeuraPrompt AI<br>
This email was automatically generated.
</div>
</div>
</body>
</html>
""",
"attachments": [
{
"filename": filename,
"content": content_b64
}
],
})
return {"ok": True}
except Exception as e:
logging.error(f"[Resend] email_export failed: {e}")
return {"ok": False}
@app.post("/api/broadcast")
async def broadcast(req: Request):
body = await req.json()
subject = body.get("subject", "Message from NeuraPrompt AI")
html = body.get("html", "")
secret = body.get("secret", "")
if secret != BROADCAST_SECRET:
return JSONResponse({"ok": False, "error": "Unauthorized"}, status_code=401)
try:
# Fetch all Firebase Auth users
all_emails = []
page = fb_auth.list_users()
while page:
for user in page.users:
if user.email:
all_emails.append(user.email)
page = page.get_next_page()
# Only send to opted-in users (or all if no opt-in records exist)
opted_in = {
doc["email"]
for doc in email_notifications_col.find({"opted_in": True}, {"email": 1})
if doc.get("email")
}
targets = [e for e in all_emails if e in opted_in] if opted_in else all_emails
sent, failed = 0, 0
for email in targets:
try:
resend.Emails.send({
"from": "NeuraPrompt <onboarding@resend.dev>",
"to": [email],
"subject": subject,
"html": html,
})
sent += 1
except Exception as e:
logging.error(f"[Resend] broadcast to {email} failed: {e}")
failed += 1
return {"ok": True, "sent": sent, "failed": failed, "total": len(targets)}
except Exception as e:
logging.error(f"[Broadcast] Fatal: {e}")
return {"ok": False}
# ══════════════════════════════════════════════════════════════
# SUBSCRIPTION STATUS (read-only, Polar-backed)
# ══════════════════════════════════════════════════════════════
@app.get("/api/subscription/status/{uid}")
async def subscription_status(uid: str):
doc = subscriptions_col.find_one({"user_id": uid}, {"_id": 0})
if not doc:
return {"tier": "free", "status": "active", "user_id": uid}
expires_at = doc.get("expires_at")
if expires_at and datetime.now(timezone.utc) > expires_at:
subscriptions_col.update_one({"user_id": uid}, {"$set": {"status": "expired"}})
doc["status"] = "expired"
doc["tier"] = "free"
for key in ("activated_at", "expires_at", "cancelled_at", "last_verified"):
if doc.get(key) and hasattr(doc[key], "isoformat"):
doc[key] = doc[key].isoformat()
return doc
@app.get("/api/subscription/usage/{uid}")
async def subscription_usage(uid: str):
sub = get_user_subscription(uid)
tier = sub.get("tier", "free")
try:
tz = pytz.UTC
today_start_utc = datetime.now(tz).replace(hour=0, minute=0, second=0, microsecond=0)
messages_today = chat_history_col.count_documents(
{"user_id": uid, "role": "user", "timestamp": {"$gte": today_start_utc}})
images_today = images_col.count_documents(
{"user_id": uid, "created_at": {"$gte": today_start_utc}})
except Exception:
messages_today = images_today = 0
limits = {
"free": {"msgs": FREE_DAILY_MSG_LIMIT, "imgs": 5},
"pro": {"msgs": None, "imgs": 50},
"ultra": {"msgs": None, "imgs": None},
}.get(tier, {"msgs": FREE_DAILY_MSG_LIMIT, "imgs": 5})
return {
"tier": tier, "messages_today": messages_today, "images_today": images_today,
"limits": limits,
"messages_remaining": max(0, limits["msgs"] - messages_today) if limits["msgs"] else None,
}
# ══════════════════════════════════════════════════════════════
# SEARCH / TRANSLATE / SUMMARISE / REWRITE
# ══════════════════════════════════════════════════════════════
@app.get("/search/")
async def search_endpoint(q: str = Query(...)):
if not q.strip():
raise HTTPException(status_code=400, detail="Query cannot be empty")
return {"query": q, "results": await web_search_free(q.strip())}
@app.post("/translate/")
async def translate(req: TranslateRequest):
if not req.text.strip():
raise HTTPException(status_code=400, detail="Text is required")
messages = [
{"role": "system", "content": f"Translate to {req.target_language}. Return ONLY the translated text."},
{"role": "user", "content": req.text},
]
result = await get_groq_reply(messages, AIModel.GROQ_8B.value)
return {"original": req.text, "translated": result or "Translation failed.", "language": req.target_language}
@app.post("/summarise/")
async def summarise(req: SummariseRequest):
if not req.text.strip():
raise HTTPException(status_code=400, detail="Text is required")
style_prompts = {"bullet": "Summarise as concise bullet points.", "paragraph": "Summarise in 2-3 paragraphs.", "tldr": "Give a TL;DR in 1-2 sentences."}
messages = [
{"role": "system", "content": style_prompts.get(req.style, style_prompts["bullet"])},
{"role": "user", "content": f"Summarise:\n\n{req.text[:4000]}"},
]
result = await get_groq_reply(messages, AIModel.GROQ_8B.value)
return {"summary": result or "Summarisation failed.", "style": req.style}
@app.post("/summarise-pdf/")
async def summarise_pdf(user_id: str = Form(...), file: UploadFile = File(...), style: str = Form("bullet")):
if not PDF_AVAILABLE:
raise HTTPException(status_code=501, detail="PyPDF2 not installed.")
try:
raw = await file.read()
reader = PyPDF2.PdfReader(io.BytesIO(raw))
text = "\n".join(page.extract_text() or "" for page in reader.pages[:15])
if not text.strip():
raise HTTPException(status_code=400, detail="Could not extract text from PDF.")
return await summarise(SummariseRequest(user_id=user_id, text=text, style=style))
except HTTPException:
raise
except Exception as e:
logging.error(f"[PDF] {e}")
raise HTTPException(status_code=500, detail="PDF processing failed.")
@app.post("/rewrite-tone/")
async def rewrite_tone(req: ToneRewriteRequest):
tone_map = {
ToneStyle.FORMAL: "Rewrite in a formal, professional tone.",
ToneStyle.CASUAL: "Rewrite in a casual, relaxed tone.",
ToneStyle.FRIENDLY: "Rewrite in a warm, friendly tone.",
ToneStyle.BULLET: "Convert to concise bullet points.",
ToneStyle.DEFAULT: "Clean up and improve while keeping the same tone.",
}
messages = [
{"role": "system", "content": tone_map.get(req.tone, tone_map[ToneStyle.DEFAULT]) + " Return ONLY the rewritten text."},
{"role": "user", "content": req.text},
]
result = await get_groq_reply(messages, AIModel.GROQ_8B.value)
return {"original": req.text, "rewritten": result or "Rewrite failed.", "tone": req.tone}
# ══════════════════════════════════════════════════════════════
# IMAGE ANALYSIS
# ══════════════════════════════════════════════════════════════
@app.post("/image-analysis/")
async def image_analysis(
user_id: str = Form(...),
file: UploadFile = File(...),
question: str = Form(""),
model_id: str = Form("neurones-vision-1.0"),
):
try:
vision_cfg = model_registry.get(model_id)
if not vision_cfg.get("can_vision", False):
return {"status": "wrong_model", "message": "Please switch to **Neurones Vision 1.0** to analyse images."}
file_bytes = await asyncio.wait_for(file.read(), timeout=30.0)
file_size_kb = round(len(file_bytes) / 1024, 2)
if file_size_kb > 20480:
raise HTTPException(status_code=413, detail="File too large. Max 20 MB.")
session_id = secrets.token_urlsafe(16)
try:
image_id = fs.put(file_bytes, filename=file.filename,
contentType=file.content_type, user_id=user_id, session_id=session_id)
except Exception as e:
logging.error(f"[Image] GridFS storage failed: {e}")
return {"status": "error", "message": "Image storage failed. Please try again."}
groq_analysis = ""
if GROQ_API_KEY:
try:
b64_image = base64.b64encode(file_bytes).decode("utf-8")
media_type = file.content_type or "image/jpeg"
vision_prompt = (
"Analyse this image thoroughly:\n"
"1. **Scene** β€” objects, people, colours, context\n"
"2. **Text extraction** β€” transcribe ALL visible text\n"
"3. **Image type** β€” photo, screenshot, diagram, chart\n"
"4. **Key details**\n"
)
if question.strip():
vision_prompt += f"5. **Answer**: {question.strip()}\n"
groq_analysis = await asyncio.wait_for(
get_groq_reply(
[{"role": "user", "content": [
{"type": "image_url", "image_url": {"url": f"data:{media_type};base64,{b64_image}"}},
{"type": "text", "text": vision_prompt},
]}],
vision_cfg.get("groq_vision_model", AIModel.GROQ_VISION.value),
temperature=0.3, max_tokens=1500,
), timeout=45.0,
) or ""
except asyncio.TimeoutError:
groq_analysis = "Analysis timed out. Please try a smaller image."
except Exception as e:
logging.error(f"[Vision] Groq analysis failed: {e}")
groq_analysis = "Could not analyse the image."
interpretation = groq_analysis or "Could not analyse the image."
images_col.insert_one({
"user_id": user_id, "file_id": image_id, "session_id": session_id,
"filename": file.filename, "content_type": file.content_type, "size_kb": file_size_kb,
"interpretation": interpretation, "question": question, "user_feedback": None,
"created_at": datetime.now(timezone.utc),
})
return {
"status": "success", "session_id": session_id,
"metadata": {"filename": file.filename, "content_type": file.content_type, "size_kb": file_size_kb},
"interpretation": interpretation, "analysis_source": "groq_vision",
"usage_hint": f"Pass 'image_session_id': '{session_id}' in /chat/ for follow-up.",
}
except HTTPException:
raise
except asyncio.TimeoutError:
return {"status": "error", "message": "Processing timed out. Please try a smaller image."}
except Exception as e:
logging.error(f"[Image] Unexpected failure: {traceback.format_exc()}")
return {"status": "error", "message": "Something went wrong. Please try again."}
# ══════════════════════════════════════════════════════════════
# FILE ANALYSIS
# ══════════════════════════════════════════════════════════════
async def _extract_file_text(file_bytes: bytes, content_type: str, filename: str) -> str:
fname = (filename or "").lower()
if "pdf" in (content_type or "") or fname.endswith(".pdf"):
if PDF_AVAILABLE:
try:
reader = PyPDF2.PdfReader(io.BytesIO(file_bytes))
return "\n".join(p.extract_text() or "" for p in reader.pages[:20]).strip()
except Exception as e:
logging.error(f"[PDF Extract] {e}")
return "PDF extraction failed."
return "PDF support unavailable."
for enc in ("utf-8", "latin-1", "cp1252"):
try:
return file_bytes.decode(enc)
except UnicodeDecodeError:
continue
return "Binary format not supported."
@app.post("/file-analysis/")
async def file_analysis(
user_id: str = Form(...),
file: UploadFile = File(...),
question: str = Form(""),
model_id: str = Form("neurones-vision-1.0"),
):
try:
vision_cfg = model_registry.get(model_id)
if not vision_cfg.get("can_files", False):
return {"status": "wrong_model", "message": "Please switch to **Neurones Vision 1.0** for file analysis."}
file_bytes = await asyncio.wait_for(file.read(), timeout=30.0)
file_size_kb = round(len(file_bytes) / 1024, 2)
if file_size_kb > 10240:
raise HTTPException(status_code=413, detail="File too large. Max 10 MB.")
session_id = secrets.token_urlsafe(16)
extracted = await _extract_file_text(file_bytes, file.content_type, file.filename)
if not extracted.strip():
return {"status": "error", "message": "Could not extract text from this file."}
task_prompt = (
f"File: '{file.filename}'\n\nContent:\n```\n{extracted[:12000]}\n```\n\n"
"Provide:\n1. **Summary** (2-3 sentences)\n2. **Key content**\n3. **Notable details**\n"
)
if question.strip():
task_prompt += f"4. **Answer**: {question.strip()}\n"
analysis = await asyncio.wait_for(
get_groq_reply(
[{"role": "system", "content": vision_cfg.get("system_prompt", "Analyse files.")},
{"role": "user", "content": task_prompt}],
vision_cfg["groq_model"], temperature=0.3, max_tokens=1500,
), timeout=45.0,
) or "Could not analyse the file."
images_col.insert_one({
"user_id": user_id, "session_id": session_id, "filename": file.filename,
"content_type": file.content_type, "size_kb": file_size_kb, "file_type": "document",
"extracted_text": extracted[:3000], "interpretation": analysis, "question": question,
"created_at": datetime.now(timezone.utc),
})
return {"status": "success", "session_id": session_id, "filename": file.filename,
"size_kb": file_size_kb, "char_count": len(extracted), "analysis": analysis}
except HTTPException:
raise
except asyncio.TimeoutError:
return {"status": "error", "message": "File processing timed out."}
except Exception as e:
logging.error(f"[File Analysis] {traceback.format_exc()}")
return {"status": "error", "message": "Something went wrong. Please try again."}
# ══════════════════════════════════════════════════════════════
# FEEDBACK / ADMIN / TOOLS / MISC
# ══════════════════════════════════════════════════════════════
@app.post("/image-feedback/")
async def image_feedback(image_id: str = Form(...), feedback: str = Form(...)):
result = images_col.update_one({"file_id": image_id}, {"$set": {"user_feedback": feedback}})
if result.modified_count == 0:
raise HTTPException(status_code=404, detail="Image not found")
return {"status": "success"}
@app.post("/submit-labeled-image/")
async def submit_labeled_image(user_id: str = Form(...), label: str = Form(...), image_file: UploadFile = File(...)):
img_bytes = await image_file.read()
pending_images_col.insert_one({
"user_id": user_id, "user_label": label.strip().lower(),
"filename": image_file.filename, "content_type": image_file.content_type,
"image_data": img_bytes, "status": "pending", "timestamp": datetime.now(timezone.utc),
})
return {"status": "success", "message": "Thank you! Your feedback will help the AI learn."}
@app.post("/admin/approve-image/{image_id}")
async def approve_image(image_id: str):
doc = pending_images_col.find_one({"_id": ObjectId(image_id)})
if not doc:
raise HTTPException(status_code=404, detail="Pending image not found.")
label = re.sub(r'[^a-zA-Z0-9_-]', '', doc["user_label"].replace(" ", "_"))
target = pathlib.Path(DATASET_PATH) / label
target.mkdir(parents=True, exist_ok=True)
(target / f"{int(time.time())}_{doc['filename']}").write_bytes(doc["image_data"])
pending_images_col.delete_one({"_id": ObjectId(image_id)})
return {"status": "success", "message": f"Image approved for class '{label}'."}
@app.post("/admin/reject-image/{image_id}")
async def reject_image(image_id: str):
result = pending_images_col.delete_one({"_id": ObjectId(image_id)})
if result.deleted_count == 0:
raise HTTPException(status_code=404, detail="Image not found.")
return {"status": "success"}
@app.post("/admin/reset-ai/")
async def reset_ai_data(model_name: AIModel = Query(AIModel.NEURONES_SELF)):
if APP_MODE == "production":
raise HTTPException(status_code=403, detail="Reset disabled in production.")
paths = get_local_ai_paths(model_name.value)
for p in paths.values():
if os.path.exists(p):
os.remove(p)
return {"message": f"Model '{model_name.value}' data cleared."}
@app.post("/admin/train/")
async def manual_train(prompt: str = Form(...), reply: str = Form(...), model_name: AIModel = Form(AIModel.NEURONES_SELF)):
if "openai" in model_name.value:
raise HTTPException(status_code=400, detail="Cannot train external models.")
await train_local_ai(prompt, reply, model_name.value)
return {"message": f"Model '{model_name.value}' trained."}
@app.get("/api/loadshedding/status")
async def get_loadshedding_status():
url = f"https://developer.sepush.co.za/business/2.0/status?token={ESKOM_API_KEY}"
try:
r = requests.get(url, timeout=15)
r.raise_for_status()
return r.json()
except Exception as e:
logging.error(f"[Eskom] {e}")
raise HTTPException(status_code=500, detail="Loadshedding data unavailable.")
@app.get("/api/loadshedding/areas_search")
async def search_loadshedding_areas(text: str = Query(...)):
url = f"https://developer.sepush.co.za/business/2.0/areas_search?text={text}&token={ESKOM_API_KEY}"
try:
r = requests.get(url, timeout=15)
r.raise_for_status()
return r.json()
except Exception as e:
logging.error(f"[Eskom Search] {e}")
raise HTTPException(status_code=500, detail="Search unavailable.")
@app.get("/log-ad-click")
def log_ad_click(user_id: str = Query(...), ad_id: str = Query(...)):
from ai_ads import log_ad_click as _log
_log(user_id, ad_id)
return {"message": "Logged."}
@app.get("/tools/date")
def get_date(timezone: str = Query("UTC")):
return get_current_date_internal(timezone)
@app.get("/tools/weather")
async def get_weather_endpoint(city: str = Query(...)):
return await get_weather_internal(city)
@app.get("/tools/news")
async def get_news_endpoint():
return await get_latest_news_internal()
@app.get("/tools/search")
async def search_tool_endpoint(q: str = Query(...)):
return {"results": await web_search_free(q)}
@app.post("/api/verify-crypto")
async def verify_crypto(receiver: str = Form(...), amount: float = Form(...)):
result = check_crypto_payment(receiver, amount)
if result.get("success"):
return result
raise HTTPException(status_code=404, detail=result.get("message", "Payment not found."))
# ══════════════════════════════════════════════════════════════
# FILE DOWNLOAD
# ══════════════════════════════════════════════════════════════
@app.get("/download/{token}")
async def download_file(token: str):
doc = downloads_col.find_one({"token": token})
if not doc:
raise HTTPException(status_code=404, detail="File not found or token invalid.")
expires_at = doc.get("expires_at")
if expires_at and datetime.now(timezone.utc) > expires_at:
downloads_col.delete_one({"token": token})
raise HTTPException(status_code=410, detail="Download link has expired.")
downloads_col.update_one({"token": token}, {"$set": {"downloaded": True}})
file_bytes = doc["content"]; filename = doc["filename"]; mime = doc.get("mime", "application/octet-stream")
return StreamingResponse(
io.BytesIO(file_bytes), media_type=mime,
headers={"Content-Disposition": f'attachment; filename="{filename}"',
"Content-Length": str(len(file_bytes)), "Cache-Control": "no-store"},
)
@app.get("/download/{token}/info")
async def download_file_info(token: str):
doc = downloads_col.find_one({"token": token}, {"content": 0})
if not doc:
return {"status": "expired"}
expires_at = doc.get("expires_at")
if expires_at and datetime.now(timezone.utc) > expires_at:
return {"status": "expired"}
remaining = max(0, int((expires_at - datetime.now(timezone.utc)).total_seconds()))
return {"status": "active", "filename": doc["filename"], "file_type": doc.get("file_type","text"),
"size_bytes": doc["size_bytes"], "size_kb": round(doc["size_bytes"] / 1024, 1),
"expires_at": expires_at.isoformat(), "remaining_seconds": remaining,
"downloaded": doc.get("downloaded", False)}
@app.post("/create-file/")
async def create_file_endpoint(req: CreateFileRequest):
if not req.content.strip():
raise HTTPException(status_code=400, detail="Content cannot be empty.")
result = await create_file_internal(req.user_id, req.filename, req.content, req.file_type, req.extra_files)
if result.get("status") == "error":
raise HTTPException(status_code=500, detail="File creation failed. Please try again.")
return result
@app.get("/past-papers/")
async def past_papers_endpoint(
grade: str = Query(...), subject: str = Query(...),
year: str = Query(""), province: str = Query("National"), paper_type: str = Query("both"),
):
return await fetch_past_paper_internal(grade, subject, year, province, paper_type)
# ══════════════════════════════════════════════════════════════
# MEMORY & SAFETY ENDPOINTS
# ══════════════════════════════════════════════════════════════
@app.post("/api/memory/consent")
async def set_memory_consent(req: MemoryConsentRequest):
long_term_memory_col.update_one(
{"user_id": req.user_id},
{"$set": {"memory_consent": req.consent, "consent_updated": datetime.now(timezone.utc)}},
upsert=True,
)
status = "enabled" if req.consent else "disabled"
return {"status": "success", "consent": req.consent, "message": f"Long-term memory {status}."}
@app.get("/api/memory/consent/{uid}")
async def get_memory_consent(uid: str):
mem = long_term_memory_col.find_one({"user_id": uid}, {"memory_consent": 1, "_id": 0}) or {}
consent = mem.get("memory_consent", True)
return {"user_id": uid, "consent": consent}
@app.get("/api/memory/facts/{uid}")
async def get_memory_facts(uid: str):
skip = {"_id","user_id","memory_consent","consent_updated","subscription_tier","subscription_updated"}
doc = long_term_memory_col.find_one({"user_id": uid}) or {}
facts = {k: (v.isoformat() if hasattr(v, "isoformat") else v) for k, v in doc.items() if k not in skip}
return {"user_id": uid, "facts": facts, "count": len(facts)}
@app.delete("/api/memory/facts/{uid}")
async def clear_memory_facts(uid: str):
preserve = {"memory_consent","consent_updated","subscription_tier","subscription_updated","timezone","safety_level"}
doc = long_term_memory_col.find_one({"user_id": uid}) or {}
keep = {k: v for k, v in doc.items() if k in preserve or k in ("user_id","_id")}
keep["user_id"] = uid; keep["last_updated"] = datetime.now(timezone.utc)
long_term_memory_col.replace_one({"user_id": uid}, keep, upsert=True)
return {"status": "success", "message": "Memory cleared."}
@app.post("/api/safety/settings")
async def update_safety_settings(req: SafetySettingsRequest):
if req.level not in SAFETY_LEVELS:
raise HTTPException(status_code=400, detail=f"Invalid level. Choose: {list(SAFETY_LEVELS.keys())}")
long_term_memory_col.update_one(
{"user_id": req.user_id},
{"$set": {"safety_level": req.level, "safety_updated": datetime.now(timezone.utc)}},
upsert=True,
)
descriptions = {"low": "Minimal filtering.", "medium": "Standard filtering.", "high": "Strict filtering.", "strict": "Maximum filtering."}
return {"status": "success", "level": req.level, "description": descriptions[req.level]}
@app.get("/api/safety/settings/{uid}")
async def get_safety_settings(uid: str):
return {"user_id": uid, "level": get_user_safety_level(uid),
"available_levels": {"low":"Minimal","medium":"Standard (default)","high":"Strict","strict":"Maximum"}}
# ══════════════════════════════════════════════════════════════
# MULTIMODAL
# ══════════════════════════════════════════════════════════════
@app.post("/api/multimodal/")
async def multimodal_chat(req: MultimodalRequest):
user_msg = req.message.strip()
if not user_msg:
raise HTTPException(status_code=400, detail="Message cannot be empty.")
if is_inappropriate(user_msg) or is_inappropriate_for_user(user_msg, req.user_id):
return {"response": "πŸ›‘οΈ Message blocked by safety filters.", "sources": []}
try:
model_cfg = model_registry.get(req.model_id)
except Exception:
model_cfg = model_registry.get("neurones-pro-1.0")
context_parts: list[str] = []
sources: list[dict] = []
if req.web_search:
try:
search_raw = await web_search_free(user_msg)
context_parts.append(f"[Live Web Data]\n{search_raw}")
url_matches = re.findall(r"πŸ”— (https?://\S+)", search_raw)
title_matches = re.findall(r"\d+\.\s(.+?)\s\[", search_raw)
for i, url in enumerate(url_matches[:4]):
sources.append({"title": title_matches[i] if i < len(title_matches) else url, "url": url})
except Exception as e:
logging.warning(f"[Multimodal] Web search failed: {e}")
mem = load_long_memory(req.user_id)
mem_facts = [f"{k}: {v}" for k, v in mem.items()
if k not in ("_id","user_id","last_updated","memory_consent","safety_level")]
if mem_facts:
context_parts.append("[User Profile]\n" + "\n".join(mem_facts[:10]))
system_content = "You are NeuraPrompt AI. Use the provided context to give an accurate, personalised response."
if context_parts:
system_content += "\n\n" + "\n\n".join(context_parts)
messages = [{"role": "system", "content": system_content}, {"role": "user", "content": user_msg[:800]}]
reply = await get_groq_reply(messages, model_cfg["groq_model"],
temperature=model_cfg["temperature"], max_tokens=model_cfg["max_tokens"])
if not reply:
return {"response": "πŸ˜… Request failed. Please try again.", "sources": sources}
reply = sanitize_ai_response(reply)
save_user_memory(req.user_id, user_msg, reply)
asyncio.create_task(extract_and_save_facts(req.user_id, messages))
return {"response": reply, "sources": sources, "web_fetched": req.web_search, "model_used": model_cfg["display_name"]}
# ══════════════════════════════════════════════════════════════
# SANDBOX CODE RUNNER
# ══════════════════════════════════════════════════════════════
import subprocess, tempfile
@app.post("/api/sandbox/run")
async def sandbox_run(req: CodeRunRequest):
code = req.code.strip()
language = req.language.lower()
if language not in ("python", "javascript", "js"):
return {"output": None, "error": f"Language '{req.language}' not supported.", "explanation": None, "fixed_code": None}
suffix = ".py" if language == "python" else ".js"
cmd_base = ["python3"] if language == "python" else ["node"]
output_str = error_str = tmp_path = None
try:
with tempfile.NamedTemporaryFile(mode="w", suffix=suffix, delete=False) as tmp:
tmp.write(code); tmp_path = tmp.name
proc = await asyncio.wait_for(
asyncio.create_subprocess_exec(*cmd_base, tmp_path,
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE), timeout=12.0)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10.0)
output_str = stdout.decode("utf-8", errors="replace").strip()
error_str = stderr.decode("utf-8", errors="replace").strip() or None
except asyncio.TimeoutError:
error_str = "⏱️ Execution timed out (10 s limit)."
except FileNotFoundError:
error_str = f"Runtime not found: {cmd_base[0]}."
except Exception as e:
logging.error(f"[Sandbox] {e}")
error_str = "Execution failed. Please try again."
finally:
if tmp_path:
try:
os.unlink(tmp_path)
except Exception:
pass
if not error_str:
return {"output": output_str or "(no output)", "error": None, "explanation": None, "fixed_code": None, "language": language}
explanation = fixed_code = None
if GROQ_API_KEY:
try:
debug_prompt = (
f"A user ran this {language} code and got an error.\n\n"
f"```{language}\n{code[:2000]}\n```\n\nError:\n```\n{error_str[:500]}\n```\n\n"
"Return JSON with: explanation (markdown) and fixed_code (plain string). Strict JSON only."
)
async with httpx.AsyncClient(timeout=20.0) as client:
r = await client.post(
"https://api.groq.com/openai/v1/chat/completions",
headers={"Authorization": f"Bearer {GROQ_API_KEY}", "Content-Type": "application/json"},
json={"model": "llama-3.1-8b-instant", "messages": [{"role":"user","content":debug_prompt}],
"temperature": 0.2, "max_tokens": 800, "response_format": {"type": "json_object"}},
)
r.raise_for_status()
parsed = json.loads(r.json()["choices"][0]["message"]["content"])
explanation = parsed.get("explanation", "")
fixed_code = parsed.get("fixed_code", "")
except Exception as e:
logging.warning(f"[Sandbox Debug] Groq failed: {e}")
explanation = f"**Error:** `{error_str}`"
return {"output": output_str or None, "error": error_str, "explanation": explanation, "fixed_code": fixed_code, "language": language}
# ══════════════════════════════════════════════════════════════
# CONVERSATION BRANCHING
# ══════════════════════════════════════════════════════════════
@app.post("/branch/create/")
async def create_branch(req: BranchRequest):
messages = list(chat_history_col.find({"user_id": req.user_id}).sort("timestamp", 1).limit(req.from_message_index * 2))
if not messages:
raise HTTPException(status_code=404, detail="No chat history found.")
branch_id = hashlib.md5(f"{req.user_id}{req.branch_name}{time.time()}".encode()).hexdigest()[:12]
branches_col.insert_one({
"branch_id": branch_id, "user_id": req.user_id, "branch_name": req.branch_name,
"messages": [{"role": m["role"], "content": m["content"]} for m in messages],
"created_at": datetime.now(timezone.utc),
})
return {"branch_id": branch_id, "branch_name": req.branch_name, "message_count": len(messages)}
@app.get("/branch/list/")
async def list_branches(user_id: str = Query(...)):
return {"branches": list(branches_col.find({"user_id": user_id}, {"_id": 0, "messages": 0}))}
@app.get("/branch/load/")
async def load_branch(user_id: str = Query(...), branch_id: str = Query(...)):
branch = branches_col.find_one({"user_id": user_id, "branch_id": branch_id})
if not branch:
raise HTTPException(status_code=404, detail="Branch not found.")
return {"branch_name": branch["branch_name"], "messages": branch["messages"]}
# ══════════════════════════════════════════════════════════════
# LEARNING PATHS
# ══════════════════════════════════════════════════════════════
@app.post("/api/learning/generate")
async def learning_generate(req: LearningPathRequest):
if not GROQ_API_KEY:
raise HTTPException(status_code=503, detail="AI service unavailable.")
mem = load_long_memory(req.user_id)
user_context = ""
if mem.get("occupation"): user_context += f"Occupation: {mem['occupation']}. "
if mem.get("learning_style"): user_context += f"Learning style: {mem['learning_style']}. "
pace_map = {"slow": "8–10 lessons", "moderate": "5–7 lessons", "fast": "3–4 lessons"}
lesson_count = pace_map.get(req.pace, "5–7 lessons")
prompt = (
f"Create a personalised learning path.\nTopic: {req.topic}\nSkill level: {req.skill_level}\n"
f"Goal: {req.goal or 'general mastery'}\nPace: {req.pace} ({lesson_count})\n{user_context}\n\n"
"Return JSON: title, description, estimated_hours, lessons (array: index, title, summary, type, duration_minutes, resources [{title,url}])."
)
try:
async with httpx.AsyncClient(timeout=30.0) as client:
r = await client.post(
"https://api.groq.com/openai/v1/chat/completions",
headers={"Authorization": f"Bearer {GROQ_API_KEY}", "Content-Type": "application/json"},
json={"model": "llama-3.1-8b-instant", "messages": [{"role":"user","content":prompt}],
"temperature": 0.6, "max_tokens": 1500, "response_format": {"type": "json_object"}},
)
r.raise_for_status()
path_data = json.loads(r.json()["choices"][0]["message"]["content"])
except Exception as e:
logging.error(f"[Learning] Generation failed: {e}")
raise HTTPException(status_code=500, detail="Failed to generate learning path.")
now = datetime.now(timezone.utc)
path_id = hashlib.md5(f"{req.user_id}{req.topic}{now.isoformat()}".encode()).hexdigest()[:16]
lessons = path_data.get("lessons", [])
doc = {"path_id": path_id, "user_id": req.user_id, "topic": req.topic, "skill_level": req.skill_level,
"goal": req.goal, "pace": req.pace, "title": path_data.get("title", req.topic),
"description": path_data.get("description",""), "estimated_hours": path_data.get("estimated_hours",0),
"lessons": lessons, "progress": [False]*len(lessons), "scores": [None]*len(lessons),
"created_at": now, "last_activity": now, "completed": False}
learning_paths_col.insert_one(doc)
doc.pop("_id", None)
return {"status": "created", "path": doc}
@app.get("/api/learning/paths/{uid}")
async def list_learning_paths(uid: str):
paths = list(learning_paths_col.find({"user_id": uid}, {"_id": 0, "lessons": 0}).sort("last_activity", -1))
for p in paths:
total = len(p.get("progress",[])) or 1
done = sum(1 for x in p.get("progress",[]) if x)
p["progress_pct"] = round(done/total*100)
for k in ("created_at","last_activity"):
if p.get(k) and hasattr(p[k],"isoformat"):
p[k] = p[k].isoformat()
return {"user_id": uid, "paths": paths, "count": len(paths)}
@app.get("/api/learning/paths/{uid}/{path_id}")
async def get_learning_path(uid: str, path_id: str):
doc = learning_paths_col.find_one({"user_id": uid, "path_id": path_id}, {"_id": 0})
if not doc:
raise HTTPException(status_code=404, detail="Learning path not found.")
for k in ("created_at","last_activity"):
if doc.get(k) and hasattr(doc[k],"isoformat"):
doc[k] = doc[k].isoformat()
total = len(doc.get("lessons",[])) or 1
done = sum(1 for x in doc.get("progress",[]) if x)
doc["progress_pct"] = round(done/total*100)
return doc
@app.post("/api/learning/progress")
async def update_learning_progress(req: LearningProgressUpdate):
doc = learning_paths_col.find_one({"user_id": req.user_id, "path_id": req.path_id})
if not doc:
raise HTTPException(status_code=404, detail="Learning path not found.")
progress = doc.get("progress",[]); scores = doc.get("scores",[])
if req.lesson_idx < 0 or req.lesson_idx >= len(progress):
raise HTTPException(status_code=400, detail=f"lesson_idx out of range.")
progress[req.lesson_idx] = req.completed
if req.score is not None:
scores[req.lesson_idx] = max(0, min(100, req.score))
all_done = all(progress); now = datetime.now(timezone.utc)
learning_paths_col.update_one(
{"user_id": req.user_id, "path_id": req.path_id},
{"$set": {"progress": progress, "scores": scores, "completed": all_done, "last_activity": now}},
)
total = len(progress) or 1
return {"status": "success", "path_id": req.path_id, "completed": req.completed,
"path_complete": all_done, "progress_pct": round(sum(1 for x in progress if x)/total*100),
"message": "πŸŽ‰ Path complete!" if all_done else "Lesson marked."}
@app.delete("/api/learning/paths/{uid}/{path_id}")
async def delete_learning_path(uid: str, path_id: str):
result = learning_paths_col.delete_one({"user_id": uid, "path_id": path_id})
if result.deleted_count == 0:
raise HTTPException(status_code=404, detail="Path not found.")
return {"status": "success", "message": "Learning path deleted."}
# Static files β€” must be last
app.mount("/", StaticFiles(directory="/data/static", html=True), name="static")