Nexa.ai / app.py
umar8902's picture
Update app.py
888bc06 verified
Raw
History Blame Contribute Delete
71.5 kB
import os
import re
import time
import random
import logging
import io
import json
import requests
import urllib.parse
import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageFont
from flask import Flask, request, jsonify, Response
from flask_cors import CORS
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from datetime import datetime
from dotenv import load_dotenv
from llama_cpp import Llama
from huggingface_hub import hf_hub_download
import pdfplumber
import docx
import pytesseract
from moviepy import VideoFileClip, TextClip, CompositeVideoClip
import tempfile
import uuid
from gradio_client import Client
import shutil
import sqlite3
import jwt
import groq
from functools import wraps
from bs4 import BeautifulSoup
from passlib.context import CryptContext
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Load settings from a .env file (if present) before any os.environ lookups.
load_dotenv()

# Initialize password context (bcrypt hashes for the users table)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Initialize Multi-Agent System.
# The import is optional: any failure leaves `multi_agent_system` as None so
# the rest of the app can still start.
try:
    from agents.multi_agent_system import MultiAgentSystem
    multi_agent_system = MultiAgentSystem()
    logger.info("Multi-Agent System initialized")
except Exception as e:
    logger.error(f"Failed to initialize Multi-Agent System: {e}")
    multi_agent_system = None

app = Flask(__name__)
# NOTE(review): credentials are allowed for every origin here; consider
# restricting `origins` to the real frontend host(s).
CORS(app, supports_credentials=True)

# The secret signs the JWT session cookie.  A hard-coded fallback would let
# anyone who has read this file forge tokens, so when SECRET_KEY is not
# configured we generate a random one for this process instead.
_secret_key = os.environ.get("SECRET_KEY")
if not _secret_key:
    _secret_key = os.urandom(32).hex()
    logger.warning(
        "SECRET_KEY is not set; using a random per-process key. Sessions will "
        "not survive a restart or be shared between workers - set SECRET_KEY."
    )
app.config['SECRET_KEY'] = _secret_key
# --- Database Setup ---
DB_PATH = "nexa_ai.db"


def get_db_connection():
    """Open a fresh SQLite connection to ``DB_PATH``.

    Rows come back as ``sqlite3.Row`` so callers can index columns by name
    and convert a row with ``dict(row)``.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def _ensure_admin_account(conn):
    """Provision the ``admin`` user from the NEXA_ADMIN_PASSWORD env variable.

    An existing admin is never overwritten on boot, with one exception: if it
    still carries the old, publicly known default password and a replacement
    is configured, the hash is rotated.  Passwords and hashes are never
    logged.  Failures are logged and swallowed so startup can continue.
    """
    legacy_default = "admin123"  # old hard-coded password, checked only to detect it
    admin_password = os.environ.get("NEXA_ADMIN_PASSWORD")
    try:
        admin_user = conn.execute(
            'SELECT id, password_hash FROM users WHERE username = ?', ('admin',)
        ).fetchone()
        if admin_user is None:
            if not admin_password:
                logger.warning(
                    "No admin account exists and NEXA_ADMIN_PASSWORD is not set; "
                    "skipping admin provisioning."
                )
                return
            conn.execute(
                'INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)',
                ('admin', 'admin@nexa.ai', pwd_context.hash(admin_password))
            )
            logger.info("Default admin account created from NEXA_ADMIN_PASSWORD.")
            return
        if pwd_context.verify(legacy_default, admin_user['password_hash']):
            if admin_password:
                conn.execute(
                    'UPDATE users SET password_hash = ? WHERE username = ?',
                    (pwd_context.hash(admin_password), 'admin')
                )
                logger.info("Admin password rotated away from the legacy default.")
            else:
                logger.warning(
                    "Admin account still uses the publicly known default password; "
                    "set NEXA_ADMIN_PASSWORD or change it immediately."
                )
    except Exception as e:
        logger.error(f"Error creating/resetting default admin: {e}")


def init_db():
    """Create the users, conversations and messages tables if missing.

    Also makes sure an admin account is provisioned (see
    ``_ensure_admin_account``) and commits the result.
    """
    with get_db_connection() as conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                email TEXT UNIQUE NOT NULL,
                password_hash TEXT NOT NULL,
                avatar TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        conn.execute('''
            CREATE TABLE IF NOT EXISTS conversations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                title TEXT NOT NULL,
                model TEXT,
                pinned INTEGER DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (user_id) REFERENCES users (id)
            )
        ''')
        conn.execute('''
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                conversation_id INTEGER NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                feedback INTEGER, -- 1 for up, -1 for down
                feedback_text TEXT,
                FOREIGN KEY (conversation_id) REFERENCES conversations (id)
            )
        ''')
        _ensure_admin_account(conn)
        conn.commit()


init_db()
# --- Auth Helpers ---
def token_required(f):
    """Decorator that authenticates a request via the ``nexa_token`` cookie.

    The cookie must hold an HS256 JWT signed with ``app.config['SECRET_KEY']``
    and carrying a ``user_id`` claim.  On success the wrapped view is called
    with that user id as its first positional argument; otherwise a 401 JSON
    response is returned and the view is not invoked.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.cookies.get('nexa_token')
        if not token:
            return jsonify({'message': 'Token is missing!'}), 401
        try:
            data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
            current_user_id = data['user_id']
        except (jwt.InvalidTokenError, KeyError):
            # Bad signature, expired, malformed, or missing the user_id claim.
            # Anything else (e.g. a programming error) is allowed to surface
            # instead of being masked as an auth failure.
            return jsonify({'message': 'Token is invalid!'}), 401
        return f(current_user_id, *args, **kwargs)
    return decorated
# Output directory for generated media (created at import time if missing)
OUTPUT_DIR = os.path.join(os.getcwd(), "static", "outputs")
os.makedirs(OUTPUT_DIR, exist_ok=True)
# --- Rate Limiting (Disabled for Free Forever) ---
# NOTE: despite the heading, limiting is NOT disabled: the defaults below are
# simply generous, and individual routes add their own @limiter.limit rules.
# Clients are keyed by remote address.
# NOTE(review): "memory://" storage presumably keeps counters per process, so
# limits would not be shared across multiple workers - confirm for deployment.
limiter = Limiter(
    get_remote_address,
    app=app,
    default_limits=["10000 per day", "1000 per hour"], # Effectively unlimited
    storage_uri="memory://",
)
# --- Configuration & Model Selection ---
OLLAMA_URL = "http://localhost:11434"
# NOTE: Swapped from TinyLlama-1.1B to Phi-3-mini (3.8B). TinyLlama is a very
# weak model and was the main reason replies looked low quality whenever Groq
# was unavailable. Phi-3-mini is a much stronger instruction-following model
# while still being small enough to run via llama_cpp on CPU.
HF_FALLBACK_REPO = "microsoft/Phi-3-mini-4k-instruct-gguf"
HF_FALLBACK_FILE = "Phi-3-mini-4k-instruct-q4.gguf"
# True when the SPACE_ID environment variable is set.
IS_HF_SPACE = os.environ.get("SPACE_ID") is not None
# Load Primary Local Model (Phi-3-mini fallback engine).
# `llm_primary` stays None when loading fails, so callers must check it.
llm_primary = None
try:
    logger.info("Initializing Phi-3-mini Primary Engine...")
    # Prefer a GGUF file sitting next to the app; download only when absent.
    local_model_path = os.path.join(os.getcwd(), HF_FALLBACK_FILE)
    if not os.path.exists(local_model_path):
        logger.info("Local model not found, downloading from Hugging Face...")
        model_path = hf_hub_download(
            repo_id=HF_FALLBACK_REPO,
            filename=HF_FALLBACK_FILE,
        )
    else:
        logger.info(f"Using local model file: {local_model_path}")
        model_path = local_model_path
    llm_primary = Llama(
        model_path=model_path,
        n_ctx=4096,
        n_threads=4,
        verbose=False,
    )
    logger.info("Phi-3-mini engine loaded successfully.")
except Exception as load_error:
    logger.error(f"Failed to load Phi-3-mini model: {load_error}")
# Base system prompt for chat replies.
# This is a str.format template: chat() fills the single {date} placeholder.
# Keep any other literal braces out of this text (or double them), otherwise
# the .format(date=...) call will fail.
SYSTEM_PROMPT = """You are Nexa AI, a professional technical execution engine.
### **WRITING PRINCIPLES**:
1. Match response depth and length to the question. A simple question gets a short, direct answer in plain prose — no headers, no bullet template. A complex technical request can use structure (headers, numbered steps, tables, code blocks) where it genuinely improves clarity.
2. Write in clear, natural language. Avoid filler words ("just", "really", "very", "basically") but do not strip the response down to unnatural telegraphic phrasing either.
3. Never simulate dialogue, never include "AI:"/"User:" labels, never narrate what you're about to do — just answer.
4. Don't force emoji, bold labels, or section headers onto every message. Use them only when they add real value (e.g., a multi-step technical walkthrough, a comparison table).
5. If the user asks for code, give clean, correct, runnable code with only as much explanation as is useful — don't pad it with restating what the code obviously does.
6. Be honest about uncertainty instead of inventing confident-sounding details.
### **WHEN WEB RESULTS ARE PROVIDED**:
- Synthesize the sources into your own words; cite specific claims with [N] tied to the source list.
- End with a short **Reference** section listing each [N] as a markdown link.
- Never cite Wikipedia.
### **WHEN A TOOL FAILS**:
Briefly state what failed and what you'll try next (or what the user can do), in one or two sentences — no need for a formatted alert block.
Current Date: {date}
"""
# Prompt template for the IDE planner: asks the model for a JSON execution
# plan and carries two placeholders, {files} and {command}.
# NOTE(review): the template also contains literal JSON braces, so calling
# str.format() on it would raise instead of substituting. The consumer is not
# visible here - confirm it uses str.replace() (or escape the braces as {{ }}).
IDE_PLAN_PROMPT = """You are the Nexa AI IDE Engine.
Your task is to convert a high-level user request into a concrete, multi-step technical execution plan.
### **OUTPUT FORMAT**:
You MUST return a JSON object with the following structure:
{
"plan": [
{
"id": 1,
"action": "create_file" | "edit_file" | "run_command",
"path": "relative/path/to/file",
"description": "Short explanation of what this step does",
"content": "The code content (only for create_file/edit_file)",
"command": "The shell command (only for run_command)"
}
],
"stats": {
"costSavings": "e.g., $15"
}
}
### **GUIDELINES**:
1. **Be Precise**: Use exact file paths.
2. **Be Modular**: Each step should do exactly one thing.
3. **Be Safe**: Do not suggest commands that delete system files.
4. **Project Context**: The user is working on a Flask/Python/React project.
Current Files: {files}
User Command: {command}
"""
# ============ BULLETPROOF SEARCH SYSTEM ============
class SearchManager:
    """Multi-engine search with automatic fallback. Never fails completely.

    ``search()`` tries DuckDuckGo and then Bing (HTML scraping).  Every engine
    method catches its own errors and returns ``[]`` so the next engine can be
    tried.  ``search()`` always returns a dict with the keys ``engine``,
    ``results``, ``success`` and ``error``; each result is a dict with
    ``title``, ``href`` and ``body``.
    """

    # Conversational lead-ins that are removed before querying an engine.
    _QUERY_PREFIX_RE = re.compile(
        r'^(search for|find|look up|what is|who is|where is)\s+',
        re.IGNORECASE,
    )

    def __init__(self):
        self.last_search_time = 0
        self.min_delay = 2  # minimum seconds between two outbound requests
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.0",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.0",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.0",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) Gecko/20100101 Firefox/120.0"
        ]

    def _get_headers(self):
        """Return request headers with a randomly chosen User-Agent."""
        return {"User-Agent": random.choice(self.user_agents)}

    def _rate_limit(self):
        """Sleep as needed so calls are at least ``min_delay`` seconds apart."""
        elapsed = time.time() - self.last_search_time
        if elapsed < self.min_delay:
            time.sleep(self.min_delay - elapsed)
        self.last_search_time = time.time()

    @staticmethod
    def _plain_text(fragment):
        """Strip HTML tags from *fragment* and decode entities like ``&amp;``."""
        from html import unescape  # local import keeps this block self-contained
        return unescape(re.sub(r'<[^>]+>', '', fragment)).strip()

    @staticmethod
    def _clean_query(query):
        """Remove a leading phrase such as "search for" from *query*.

        Falls back to the stripped original when removing the prefix would
        leave nothing (e.g. ``"what is "``), so an engine is never asked to
        search for an empty string.
        """
        stripped = query.strip()
        cleaned = SearchManager._QUERY_PREFIX_RE.sub('', stripped).strip()
        return cleaned or stripped

    @staticmethod
    def _parse_bing_html(page, max_results):
        """Extract up to *max_results* organic results from a Bing HTML page.

        Titles, links and snippets are entity-decoded; relative links are
        made absolute against ``https://www.bing.com``.
        """
        from html import unescape  # local import keeps this block self-contained
        results = []
        result_blocks = re.findall(r'<li class="b_algo"[^>]*>(.*?)</li>', page, re.DOTALL)
        for block in result_blocks[:max_results]:
            title_match = re.search(r'<a[^>]*href="([^"]*)"[^>]*>(.*?)</a>', block, re.DOTALL)
            if not title_match:
                continue
            href = unescape(title_match.group(1))
            title = SearchManager._plain_text(title_match.group(2))
            snippet_match = re.search(r'<p[^>]*>(.*?)</p>', block, re.DOTALL)
            body = SearchManager._plain_text(snippet_match.group(1)) if snippet_match else ""
            if href.startswith('/'):
                href = 'https://www.bing.com' + href
            results.append({"title": title, "href": href, "body": body})
        return results

    def search_duckduckgo(self, query, max_results=5):
        """Query DuckDuckGo; return a list of results or ``[]`` on any failure."""
        try:
            self._rate_limit()
            # Imported lazily so a missing package only disables this engine.
            from duckduckgo_search import DDGS
            with DDGS(headers=self._get_headers(), timeout=15) as ddgs:
                results = list(ddgs.text(query, max_results=max_results))
            if results:
                logger.info(f"DuckDuckGo: {len(results)} results")
                return [{"title": r["title"], "href": r["href"], "body": r["body"]} for r in results]
        except Exception as e:
            logger.warning(f"DuckDuckGo failed: {e}")
        return []

    def search_bing(self, query, max_results=5):
        """Scrape Bing's result page; return results or ``[]`` on any failure."""
        try:
            self._rate_limit()
            encoded_query = urllib.parse.quote(query)
            url = f"https://www.bing.com/search?q={encoded_query}"
            response = requests.get(url, headers=self._get_headers(), timeout=10)
            response.raise_for_status()
            results = self._parse_bing_html(response.text, max_results)
            if results:
                logger.info(f"Bing: {len(results)} results")
                return results
        except Exception as e:
            logger.warning(f"Bing failed: {e}")
        return []

    def search_wikipedia(self, query, max_results=3):
        """Query the Wikipedia search API; return results or ``[]`` on failure.

        Not part of the default engine list used by ``search()``.
        """
        try:
            self._rate_limit()
            url = "https://en.wikipedia.org/w/api.php"
            params = {
                "action": "query",
                "list": "search",
                "srsearch": query,
                "format": "json",
                "srlimit": max_results
            }
            response = requests.get(url, params=params, headers=self._get_headers(), timeout=10)
            response.raise_for_status()
            data = response.json()
            results = []
            for item in data.get("query", {}).get("search", []):
                title = item["title"]
                # Percent-encode so titles containing '?', '#', '%' etc. still
                # produce a valid link.
                page_url = "https://en.wikipedia.org/wiki/" + urllib.parse.quote(title.replace(' ', '_'))
                snippet = self._plain_text(item["snippet"])
                results.append({"title": title, "href": page_url, "body": snippet})
            if results:
                logger.info(f"Wikipedia: {len(results)} results")
                return results
        except Exception as e:
            logger.warning(f"Wikipedia failed: {e}")
        return []

    def visit_website(self, url, timeout=15):
        """Fetches and extracts text from a URL for deeper research.

        Returns at most the first 10,000 characters of visible text, or
        ``None`` when the page cannot be fetched or parsed.
        """
        try:
            self._rate_limit()
            response = requests.get(url, headers=self._get_headers(), timeout=timeout)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            # Remove noise
            for script in soup(["script", "style", "nav", "footer", "header", "aside"]):
                script.decompose()
            text = soup.get_text(separator=' ', strip=True)
            # Basic cleaning
            text = re.sub(r'\s+', ' ', text)
            return text[:10000]  # Return first 10k chars
        except Exception as e:
            logger.error(f"Failed to visit {url}: {e}")
            return None

    def search(self, query, max_results=5):
        """Run *query* through the engines in order and return the first hit.

        Results pointing at wikipedia.org are filtered out; if an engine only
        returns such results the next engine is tried.
        """
        if not query or not query.strip():
            return {"engine": "None", "results": [], "success": False, "error": "Empty query"}
        # Strip common prefixes for better engine results
        clean_query = self._clean_query(query)
        logger.info(f"Searching for: {clean_query}")
        # REMOVED Wikipedia as per Autonomous Research Agent protocol
        engines = [self.search_duckduckgo, self.search_bing]
        for engine_func in engines:
            engine_name = engine_func.__name__.replace('search_', '').capitalize()
            try:
                results = engine_func(clean_query, max_results)
                if results:
                    # Filter out any accidentally returned Wikipedia results
                    filtered_results = [r for r in results if 'wikipedia.org' not in r.get('href', '').lower()]
                    if filtered_results:
                        return {"engine": engine_name, "results": filtered_results, "success": True, "error": None}
            except Exception as e:
                logger.warning(f"Engine {engine_name} crashed: {e}")
        logger.error("All search engines failed or were blocked")
        return {"engine": "None", "results": [], "success": False, "error": "Search unavailable or results blocked. Please try again later."}


search_manager = SearchManager()
# ============ TOOL DETECTION ============
def _kw_in(text, keyword):
"""Match a keyword as a whole word/phrase, not a raw substring.
Plain `in` matching caused false positives like 'photo' firing
inside 'photosynthesis', or 'now' firing inside 'know' — both of
which made the bot misfire image-gen/search on ordinary questions."""
if " " in keyword:
return keyword in text
return re.search(rf"\b{re.escape(keyword)}\b", text) is not None
def detect_tools(message):
    """Guess which tools a chat message needs.

    Returns a list drawn from "search", "diagram", "image", "video" and
    "file", in that order.  An explicit search command short-circuits and
    yields ``["search"]`` only.
    """
    text = message.lower().strip()

    def mentions(keywords):
        # True when at least one keyword appears as a whole word/phrase.
        return any(_kw_in(text, kw) for kw in keywords)

    def has_both(first, second):
        # Loose substring co-occurrence, e.g. "create ... image".
        return first in text and second in text

    # 1. Explicit Search Commands (User clearly wants a search)
    if mentions(["/search", "/news", "/youtube", "search for", "look up", "browse the web for"]):
        return ["search"]

    detected = []

    # 2. Real-time / News Indicators (likely needs up-to-date info),
    #    regardless of whether the message is phrased as a question.
    if mentions([
        "latest", "news", "today", "current", "weather", "stock", "price",
        "now", "recent", "yesterday", "tonight", "2024", "2025", "2026",
        "score", "game", "match", "live"
    ]):
        detected.append("search")

    # 3. Other tools (Image/File/Video/Diagram)
    wants_diagram = mentions([
        "diagram", "mind map", "mindmap", "flowchart", "flow chart",
        "architecture diagram", "sequence diagram", "er diagram",
        "entity relationship", "gantt chart", "org chart", "tree diagram",
        "visualize the structure", "show me the steps visually"
    ]) or mentions(["bar chart", "pie chart", "line chart", "plot the data", "graph the numbers", "graph of"])
    if wants_diagram:
        detected.append("diagram")

    image_words = ["image", "photo", "paint", "picture", "generate image", "thumbnail"]
    if not wants_diagram:
        # "draw"/"visualize" only mean image generation when no diagram was asked for.
        image_words.extend(["draw", "visualize"])
    if mentions(image_words) or has_both("create", "image") or has_both("make", "image"):
        detected.append("image")

    if (
        mentions(["video", "movie", "animation", "generate video", "create video", "make a video"])
        or has_both("create", "video")
        or has_both("make", "video")
    ):
        detected.append("video")

    if mentions(["pdf", "document", "file", "analyze file", "read file", "upload"]):
        detected.append("file")

    return detected
# ============ FORMATTING ============
def format_search_results(data):
    """Render a ``SearchManager.search`` result dict as prompt text.

    Failed searches become a one-line warning, empty result sets a short
    notice; otherwise each source is listed (snippet collapsed to single
    spaces and capped at 600 characters) followed by the citation protocol.
    """
    if not data.get("success"):
        return f"Warning: {data.get('error', 'Search failed')}"

    hits = data.get("results", [])
    if not hits:
        return "No results found for this query."

    lines = [f"Web Search Results from {data['engine']}:", "-" * 30]
    for index, hit in enumerate(hits, 1):
        heading = hit.get("title", "No title")
        link = hit.get("href", "#")
        snippet = hit.get("body", "No description")
        snippet = re.sub(r'[\s]+', ' ', snippet).strip()[:600]  # Increased context
        lines.extend([
            f"Source [{index}]: {heading}",
            f"URL: {link}",
            f"Content: {snippet}",
            "-" * 15,
        ])

    lines.extend([
        "\n### MANDATORY RESEARCH PROTOCOL ###",
        "1. Compose a well-structured answer based on the provided sources.",
        "2. Insert numbered citations [N] immediately after every fact, statistic, or quote.",
        "3. After the response, output a dedicated Reference section formatted exactly like this:",
        "\n---\n**Reference**\n\n[1] [Source Title](URL)\n[2] [Source Title](URL)\n---",
        "\nStrictly adhere to this format. Do not use Wikipedia.",
    ])
    return "\n".join(lines)
def clean_search_query(query):
    """Lower-case *query* and strip leading command phrases from it.

    Prefixes are applied in list order, so stacked lead-ins such as
    "look up what is python" reduce to "python".  A prefix is only removed
    when it ends at a word boundary (or ends in ':'), so "checkers rules"
    is no longer mangled into "ers rules" by the "check" prefix.
    """
    prefixes = [
        "search for", "find out about", "look up", "check",
        "tell me about", "what is", "who is", "where is",
        "/search", "search:", "find:"
    ]
    query = query.lower().strip()
    for prefix in prefixes:
        if not query.startswith(prefix):
            continue
        rest = query[len(prefix):]
        at_boundary = (
            prefix.endswith(":")
            or not rest
            or not (rest[0].isalnum() or rest[0] == "_")
        )
        if at_boundary:
            query = rest.strip()
    return query
# ============ TIME BOT ============
def get_time_response(message):
    """Answer clock/date questions locally, or return ``None``.

    Triggers on one of a few exact phrases, or when at least two distinct
    time-related words appear as whole words in the message.
    """
    lowered = message.lower()
    exact_phrases = ("what time is it", "current time", "what's the date", "whats the date")
    vocabulary = ("time", "date", "clock", "hour", "minute")

    phrase_hit = any(phrase in lowered for phrase in exact_phrases)
    word_hits = sum(1 for word in vocabulary if re.search(rf"\b{word}\b", lowered))
    if not phrase_hit and word_hits < 2:
        return None

    now = datetime.now()
    return f"The current time is **{now.strftime('%H:%M:%S')}**. Today is **{now.strftime('%A, %Y-%m-%d')}**."
# ============ AUTH ROUTES ============
@app.route('/api/auth/signup', methods=['POST'])
def signup():
    """Create a user from JSON ``{username?, email, password}``.

    Returns 201 on success, 400 when email/password are missing (including
    a missing, non-JSON or non-object body), 409 when the username or email
    is already taken, and 500 on unexpected database errors.
    """
    # silent=True: a missing/invalid JSON body yields None instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    def _field(name):
        # Non-string values (null, numbers, lists) are treated as absent.
        value = data.get(name)
        return value.strip() if isinstance(value, str) else ''

    username = _field('username')
    email = _field('email')
    # NOTE: the password is stripped here because login() strips it too;
    # change both together or existing accounts will stop matching.
    password = _field('password')
    if not email or not password:
        return jsonify({"error": "Missing required fields"}), 400
    if not username:
        username = email.split('@')[0]
    password_hash = pwd_context.hash(password)
    try:
        with get_db_connection() as conn:
            conn.execute(
                'INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)',
                (username, email, password_hash)
            )
            conn.commit()
        return jsonify({"message": "User created successfully"}), 201
    except sqlite3.IntegrityError:
        return jsonify({"error": "Username or email already exists"}), 409
    except Exception as e:
        logger.error(f"Signup error: {e}")
        return jsonify({"error": "Internal server error"}), 500
@app.route('/api/auth/login', methods=['POST'])
def login():
    """Authenticate with JSON ``{email, password}`` and set the session cookie.

    The ``email`` field may hold either the email address or the username.
    On success a 24-hour HS256 JWT is stored in the HttpOnly ``nexa_token``
    cookie and the user's public profile is returned.  Returns 400 when
    credentials are missing and 401 when they do not match.

    There is deliberately no hard-coded fallback account: recreating a
    user with a publicly known password on demand is a backdoor.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    def _field(name):
        # Non-string values (null, numbers, lists) are treated as absent.
        value = data.get(name)
        return value.strip() if isinstance(value, str) else ''

    # Allow login with either email or username
    login_id = _field('email')
    password = _field('password')
    if not login_id or not password:
        return jsonify({"error": "Missing credentials"}), 400

    with get_db_connection() as conn:
        # Check both email and username columns
        user = conn.execute(
            'SELECT * FROM users WHERE email = ? OR username = ?',
            (login_id, login_id)
        ).fetchone()

    if not user:
        logger.warning(f"User not found for login ID: {login_id}")
        return jsonify({"error": "Invalid credentials"}), 401

    logger.info(f"Login attempt for user: {user['username']} (ID: {user['id']})")
    try:
        is_valid = pwd_context.verify(password, user['password_hash'])
    except (ValueError, TypeError) as e:
        # A malformed stored hash must fail the login, not crash the route.
        logger.error(f"Password hash for {user['username']} could not be verified: {e}")
        is_valid = False
    # Log only the outcome, never the password itself.
    logger.info(f"Password verification result for {user['username']}: {is_valid}")
    if not is_valid:
        logger.warning(f"Invalid password for user: {user['username']}")
        return jsonify({"error": "Invalid credentials"}), 401

    token = jwt.encode({
        'user_id': user['id'],
        'username': user['username'],
        'exp': int(time.time()) + (24 * 3600)  # 24 hours
    }, app.config['SECRET_KEY'], algorithm="HS256")
    resp = jsonify({
        "message": "Login successful",
        "user": {
            "id": user['id'],
            "username": user['username'],
            "email": user['email'],
            "avatar": user['avatar']
        }
    })
    resp.set_cookie(
        'nexa_token', token,
        httponly=True,
        samesite='Lax',
        secure=request.is_secure,  # Secure flag whenever the request came over HTTPS
        max_age=24 * 3600,
    )
    return resp
@app.route('/api/auth/logout', methods=['POST'])
def logout():
    """End the session by overwriting ``nexa_token`` with an expired, empty cookie."""
    response = jsonify({"message": "Logged out successfully"})
    response.set_cookie('nexa_token', '', expires=0)
    return response
@app.route('/api/auth/me', methods=['GET'])
@token_required
def get_me(current_user_id):
    """Return id, username, email and avatar of the authenticated user (404 if gone)."""
    with get_db_connection() as conn:
        profile = conn.execute(
            'SELECT id, username, email, avatar FROM users WHERE id = ?',
            (current_user_id,),
        ).fetchone()
    if not profile:
        return jsonify({"error": "User not found"}), 404
    return jsonify(dict(profile))
# ============ VIDEO ROUTES ============
def _fetch_video_to_path(url, dest_path, timeout=60):
    """Stream *url* into *dest_path*; return True only for an HTTP 200 download."""
    with requests.get(url, stream=True, timeout=timeout) as remote:
        if remote.status_code != 200:
            return False
        with open(dest_path, 'wb') as out:
            for chunk in remote.iter_content(8192):
                out.write(chunk)
    return True


@app.route('/api/video/generate', methods=['POST'])
@limiter.limit("100 per minute")
def generate_video():
    """Unified video generation with fallback (Upsampler -> HF Spaces -> Pollinations).

    Expects JSON ``{"prompt": str}``.  Each provider is tried in turn and a
    provider only counts as successful once a video file has actually been
    written under ``static/outputs``.  The last resort returns a Pollinations
    image URL flagged with ``is_motion_image: True``.
    """
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        prompt = str(data.get("prompt") or "").strip()
        if not prompt:
            return jsonify({"status": "error", "error": "Prompt is required"}), 400
        # Clean prompt
        clean_prompt = re.sub(r'^(generate|create|make)\s+(a\s+)?video\s+(about|of|for)?\s*', '', prompt, flags=re.IGNORECASE).strip()
        logger.info(f"Generating video for: {clean_prompt}")
        video_dir = os.path.join(os.getcwd(), "static", "outputs")
        os.makedirs(video_dir, exist_ok=True)

        # Step 1: Try Upsampler API if key exists
        api_key = os.environ.get("UPSAMPLER_API_KEY")
        if api_key:
            try:
                headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
                body = {"prompt": clean_prompt, "model": "wan-2.2-5b-fast", "width": 1024, "height": 576}
                response = requests.post("https://upsampler.com/api/v1/video/generate", headers=headers, json=body, timeout=60)
                if response.status_code == 200:
                    video_url = response.json().get("video_url")
                    if video_url:
                        video_filename = f"vid_{uuid.uuid4().hex}.mp4"
                        local_path = os.path.join(video_dir, video_filename)
                        if _fetch_video_to_path(video_url, local_path):
                            return jsonify({"status": "success", "video_url": f"/static/outputs/{video_filename}", "is_motion_image": False, "provider": "Upsampler", "prompt": clean_prompt})
            except Exception as e:
                logger.error(f"Upsampler failed: {e}")

        # Step 2: Try Hugging Face Spaces (Gradio)
        video_spaces = ["THUDM/CogVideoX-5b", "ali-vilas/text-to-video-ms-1.7b", "fffiloni/zeroscope-v2-xl"]
        for space in video_spaces:
            try:
                logger.info(f"🎬 Attempting Video Gen via {space}")
                client = Client(space)
                result = client.predict(clean_prompt, api_name="/predict")
                if isinstance(result, (list, tuple)):
                    result = result[0] if result else None
                if not isinstance(result, str) or not result:
                    logger.warning(f"Space {space} returned no usable video reference")
                    continue
                output_filename = f"ai_video_{uuid.uuid4().hex}.mp4"
                output_path = os.path.join(video_dir, output_filename)
                if result.startswith("http"):
                    saved = _fetch_video_to_path(result, output_path, timeout=30)
                elif os.path.exists(result):
                    shutil.copy(result, output_path)
                    saved = True
                else:
                    saved = False
                if saved:
                    return jsonify({"status": "success", "video_url": f"/static/outputs/{output_filename}", "is_motion_image": False, "provider": space, "prompt": clean_prompt})
                # Do not report success for a file that was never written.
                logger.warning(f"Space {space} produced no retrievable video")
            except Exception as e:
                logger.warning(f"Space {space} failed: {e}")

        # Step 3: Final Fallback: Pollinations "Motion Image".
        # Only a URL is built here; nothing is fetched server-side.
        logger.info(f"🚀 Falling back to Pollinations Motion")
        seed = int(time.time())
        motion_url = f"https://image.pollinations.ai/prompt/{requests.utils.quote(clean_prompt)}?seed={seed}&nologo=true&width=1024&height=576"
        return jsonify({"status": "success", "video_url": motion_url, "is_motion_image": True, "provider": "Pollinations AI (Motion)", "prompt": clean_prompt})
    except Exception as e:
        logger.error(f"Video generation crash: {e}")
        return jsonify({"status": "error", "error": str(e)}), 500
@app.route('/api/video/overlay-url', methods=['POST'])
@limiter.limit("100 per minute")
def overlay_video_url():
    """Burn a text overlay into a video referenced by URL.

    Expects JSON with ``video_url`` and ``text`` (required) plus optional
    ``pos``, ``font_size``, ``color`` and ``bg_opacity``.  A ``video_url``
    starting with '/' is resolved against the app's working directory and
    must stay inside it; anything else is downloaded over HTTP.  The
    temporary input copy is always removed, whether processing succeeds
    or not.
    """
    input_path = None
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        video_url = str(data.get("video_url") or "").strip()
        text = str(data.get("text") or "").strip()
        pos = data.get("pos", "Bottom Center")
        color = data.get("color", "#FFFFFF")
        try:
            font_size = int(data.get("font_size", 60))
            bg_opacity = float(data.get("bg_opacity", 0.5))
        except (TypeError, ValueError):
            # Bad client input is a 400, not a server error.
            return jsonify({"status": "error", "error": "font_size and bg_opacity must be numeric"}), 400
        if not video_url or not text:
            return jsonify({"status": "error", "error": "Video URL and text are required"}), 400

        video_dir = os.environ.get("VIDEO_TEMP_DIR", "./temp/videos")
        os.makedirs(video_dir, exist_ok=True)

        # 1. Obtain a private copy of the source video
        input_path = os.path.join(video_dir, f"input_{uuid.uuid4()}.mp4")
        if video_url.startswith('/'):
            # Internal URL: map it to a file, refusing anything that escapes
            # the app directory (e.g. "/../../etc/passwd").
            app_root = os.path.realpath(os.getcwd())
            local_input = os.path.realpath(os.path.join(app_root, video_url.lstrip('/')))
            inside_root = os.path.commonpath([app_root, local_input]) == app_root
            if not inside_root or not os.path.isfile(local_input):
                return jsonify({"status": "error", "error": "Internal video source not found"}), 404
            shutil.copy(local_input, input_path)
        else:
            # NOTE(review): this fetches an arbitrary client-supplied URL
            # (SSRF risk); consider an allow-list of hosts.
            with requests.get(video_url, stream=True, timeout=60) as v_res:
                if v_res.status_code != 200:
                    return jsonify({"status": "error", "error": "Failed to fetch source video"}), 400
                with open(input_path, 'wb') as f:
                    for chunk in v_res.iter_content(8192):
                        f.write(chunk)

        # 2. Process with MoviePy
        output_filename = process_video_overlay(
            input_path=input_path,
            overlay_text=text,
            position=pos,
            font_size=font_size,
            text_color=color,
            bg_opacity=bg_opacity
        )
        if output_filename:
            return jsonify({
                "status": "success",
                "video_url": f"/static/outputs/{output_filename}"
            })
        return jsonify({"status": "error", "error": "MoviePy processing failed"}), 500
    except Exception as e:
        logger.error(f"Overlay error: {e}")
        return jsonify({"status": "error", "error": str(e)}), 500
    finally:
        # Always drop the temporary input so failed runs do not fill the disk.
        if input_path and os.path.exists(input_path):
            try:
                os.remove(input_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temp video {input_path}: {cleanup_error}")
# ============ CONVERSATION ROUTES ============
@app.route('/api/conversations', methods=['GET'])
@token_required
def get_conversations(current_user_id):
    """List the caller's conversations, pinned first, then most recently updated."""
    query = 'SELECT * FROM conversations WHERE user_id = ? ORDER BY pinned DESC, updated_at DESC'
    with get_db_connection() as conn:
        rows = conn.execute(query, (current_user_id,)).fetchall()
    return jsonify([dict(row) for row in rows])
@app.route('/api/conversations', methods=['POST'])
@token_required
def create_conversation(current_user_id):
    """Create a conversation for the caller and return the new row (201).

    Accepts optional JSON ``title`` (default "New Chat") and ``model``
    (default "phi3:mini").  A missing or non-object body falls back to the
    defaults instead of crashing.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    # `or` also covers an explicit null/empty title, which would otherwise
    # violate the NOT NULL constraint on conversations.title.
    title = data.get('title') or 'New Chat'
    model = data.get('model', 'phi3:mini')
    with get_db_connection() as conn:
        cursor = conn.execute(
            'INSERT INTO conversations (user_id, title, model) VALUES (?, ?, ?)',
            (current_user_id, title, model)
        )
        conn.commit()
        conv_id = cursor.lastrowid
        conv = conn.execute('SELECT * FROM conversations WHERE id = ?', (conv_id,)).fetchone()
    return jsonify(dict(conv)), 201
@app.route('/api/messages', methods=['POST'])
@token_required
def save_message(current_user_id):
    """Append a message to one of the caller's conversations.

    Expects JSON ``{conversation_id, content, role?}`` (role defaults to
    "user").  Returns 400 when fields are missing, 404 when the conversation
    does not belong to the caller, 201 on success.  The conversation's
    ``updated_at`` is bumped so it moves up in the list, which is sorted by
    that column.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    conv_id = data.get('conversation_id')
    role = data.get('role', 'user')
    content = data.get('content')
    if not conv_id or not content:
        return jsonify({"error": "Missing fields"}), 400
    with get_db_connection() as conn:
        # Verify ownership
        conv = conn.execute('SELECT * FROM conversations WHERE id = ? AND user_id = ?', (conv_id, current_user_id)).fetchone()
        if not conv:
            return jsonify({"error": "Conversation not found"}), 404
        conn.execute(
            'INSERT INTO messages (conversation_id, role, content) VALUES (?, ?, ?)',
            (conv_id, role, content)
        )
        conn.execute(
            'UPDATE conversations SET updated_at = CURRENT_TIMESTAMP WHERE id = ?',
            (conv_id,)
        )
        conn.commit()
    return jsonify({"status": "success"}), 201
@app.route('/api/conversations/<int:conv_id>', methods=['GET'])
@token_required
def get_conversation_messages(current_user_id, conv_id):
    """Return a conversation owned by the caller together with its messages.

    Messages are ordered oldest first.  ``id`` is used as a tie-breaker
    because ``timestamp`` only has one-second resolution, so a question and
    its reply saved within the same second would otherwise have no defined
    order.
    """
    with get_db_connection() as conn:
        # Verify ownership
        conv = conn.execute('SELECT * FROM conversations WHERE id = ? AND user_id = ?', (conv_id, current_user_id)).fetchone()
        if not conv:
            return jsonify({"error": "Conversation not found"}), 404
        messages = conn.execute(
            'SELECT * FROM messages WHERE conversation_id = ? ORDER BY timestamp ASC, id ASC',
            (conv_id,)
        ).fetchall()
    return jsonify({
        "conversation": dict(conv),
        "messages": [dict(m) for m in messages]
    })
@app.route('/api/conversations/<int:conv_id>', methods=['PUT'])
@token_required
def update_conversation(current_user_id, conv_id):
    """Rename and/or (un)pin one of the caller's conversations.

    Accepts JSON with optional ``title`` and ``pinned``; only the fields
    present are changed.  A title change also refreshes ``updated_at``.
    Returns the updated row, or 404 when the conversation is not the
    caller's.  A missing or non-object body is treated as "no changes".
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    title = data.get('title')
    pinned = data.get('pinned')
    with get_db_connection() as conn:
        # Verify ownership
        conv = conn.execute('SELECT * FROM conversations WHERE id = ? AND user_id = ?', (conv_id, current_user_id)).fetchone()
        if not conv:
            return jsonify({"error": "Conversation not found"}), 404
        if title is not None:
            conn.execute('UPDATE conversations SET title = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?', (title, conv_id))
        if pinned is not None:
            conn.execute('UPDATE conversations SET pinned = ? WHERE id = ?', (pinned, conv_id))
        conn.commit()
        updated = conn.execute('SELECT * FROM conversations WHERE id = ?', (conv_id,)).fetchone()
    return jsonify(dict(updated))
@app.route('/api/conversations/<int:conv_id>', methods=['DELETE'])
@token_required
def delete_conversation(current_user_id, conv_id):
    """Delete one of the caller's conversations along with all its messages."""
    with get_db_connection() as conn:
        # Only the owner may delete; unknown or foreign ids look the same (404).
        owned = conn.execute(
            'SELECT * FROM conversations WHERE id = ? AND user_id = ?',
            (conv_id, current_user_id),
        ).fetchone()
        if not owned:
            return jsonify({"error": "Conversation not found"}), 404
        # Messages first, then the conversation row they reference.
        for statement in (
            'DELETE FROM messages WHERE conversation_id = ?',
            'DELETE FROM conversations WHERE id = ?',
        ):
            conn.execute(statement, (conv_id,))
        conn.commit()
    return jsonify({"message": "Conversation deleted"})
# ============ API ROUTES ============
@app.route('/')
@app.route('/chat')
@app.route('/login')
@app.route('/signup')
@app.route('/pricing')
def home(path=None):
    """Serve ``index.html`` for every front-end route.

    Falls back to a plain status string when the file cannot be read.
    """
    try:
        with open('index.html', 'r', encoding='utf-8') as page:
            markup = page.read()
        return Response(markup, mimetype='text/html')
    except Exception as read_error:
        logger.error(f"Home route failed: {read_error}")
        return "Nexa AI Backend is running."
@app.route('/api/search', methods=['POST'])
@limiter.limit("200 per minute")
def search_api():
    """Run a web search for JSON ``{"query": str, "max_results"?: int}``.

    ``max_results`` defaults to 5, falls back to 5 when not numeric, and is
    clamped to 1..20 so a client cannot request an unbounded result set.
    Returns 400 for a missing or non-string query and 500 on unexpected
    failures.
    """
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        raw_query = data.get("query")
        query = raw_query.strip() if isinstance(raw_query, str) else ""
        if not query:
            return jsonify({"error": "No query provided", "success": False}), 400
        try:
            max_results = int(data.get("max_results", 5))
        except (TypeError, ValueError):
            max_results = 5
        max_results = max(1, min(max_results, 20))
        search_data = search_manager.search(query, max_results)
        return jsonify({
            "success": search_data["success"],
            "query": query,
            "engine": search_data["engine"],
            "results": search_data["results"],
            "formatted": format_search_results(search_data),
            "result_count": len(search_data["results"])
        })
    except Exception as e:
        logger.error(f"Search API error: {e}")
        return jsonify({"error": "Search service temporarily unavailable", "success": False}), 500
# Characters removed from user-supplied memory before it is injected into the
# system prompt, and keys that must never be injected at all.
_CHAT_MEMORY_STRIP = str.maketrans('', '', '[]<>')
_CHAT_MEMORY_BLOCKED_KEYS = frozenset({"role", "system", "instruction"})


def _chat_save_message(user_id, conv_id, role, content):
    """Store one chat message, but only in a conversation the user owns.

    Returns True when the row was inserted. Returns False (after logging)
    when there is nothing to save, the conversation does not belong to
    ``user_id``, or the database raises. Never raises, so a persistence
    problem cannot break or restart a reply that is already being delivered.
    """
    if not (user_id and conv_id and content):
        return False
    try:
        with get_db_connection() as conn:
            # Same ownership test the conversation routes use: without it any
            # logged-in user could write into someone else's conversation.
            owned = conn.execute(
                'SELECT 1 FROM conversations WHERE id = ? AND user_id = ?',
                (conv_id, user_id)
            ).fetchone()
            if owned is None:
                logger.warning(
                    f"Not saving {role} message: conversation {conv_id} "
                    f"does not belong to user {user_id}"
                )
                return False
            conn.execute(
                'INSERT INTO messages (conversation_id, role, content) VALUES (?, ?, ?)',
                (conv_id, role, content)
            )
            conn.commit()
        return True
    except Exception as e:
        logger.error(f"Failed to save {role} message: {e}")
        return False


def _chat_history_turns(history, limit):
    """Return the last ``limit`` history entries as ``(role, content)`` pairs.

    Entries whose ``role`` is ``'ai'`` map to ``assistant``; everything else
    maps to ``user``. Non-list histories and non-dict entries are ignored and
    content is coerced to ``str`` so malformed client data cannot crash the
    stream.
    """
    if not isinstance(history, list):
        return []
    turns = []
    for entry in history[-limit:]:
        if not isinstance(entry, dict):
            continue
        role = "assistant" if entry.get('role') == 'ai' else "user"
        turns.append((role, str(entry.get('content') or '')))
    return turns


def _chat_sanitize_memory(memory):
    """Return a prompt-safe copy of the client-supplied memory dict.

    Keys and values have ``[``, ``]``, ``<`` and ``>`` removed and are cut to
    100 characters; blocked keys (role/system/instruction) and keys that end
    up empty are dropped. Anything that is not a dict yields ``{}``.
    """
    if not isinstance(memory, dict):
        return {}
    cleaned = {}
    for key, value in memory.items():
        clean_key = str(key).translate(_CHAT_MEMORY_STRIP)[:100].strip()
        if not clean_key or clean_key.lower() in _CHAT_MEMORY_BLOCKED_KEYS:
            continue
        cleaned[clean_key] = str(value).translate(_CHAT_MEMORY_STRIP)[:100]
    return cleaned


@app.route('/api/chat', methods=['POST'])
@limiter.limit("500 per minute")
def chat():
    """Answer a chat message, normally as a newline-delimited JSON stream.

    Request JSON fields read here: ``message`` (required), ``history``,
    ``memory``, ``attachments``, ``mode``, ``deep_research`` and
    ``conversation_id``. A ``model`` field is ignored: every reply comes from
    Groq when ``GROQ_API_KEY`` is set, with ``llm_primary`` as the fallback.

    A valid ``nexa_token`` cookie identifies the user; guests may chat, but
    messages are only persisted for a logged-in user and a conversation that
    user owns.

    Responses:
      * 400 JSON when ``message`` is empty.
      * Plain JSON for planner prompts, the ``status_check`` probe and
        time questions.
      * Otherwise ``application/x-ndjson``: one line with ``tools_used``
        (and ``search_results`` when a search ran), then one line per token,
        or a line with ``error``.
      * 500 JSON if anything outside the stream raises.
    """
    try:
        # Optional token check to allow guest chat but require it for history
        current_user_id = None
        auth_cookie = request.cookies.get('nexa_token')
        if auth_cookie:
            try:
                claims = jwt.decode(auth_cookie, app.config['SECRET_KEY'], algorithms=["HS256"])
                current_user_id = claims['user_id']
            except Exception as auth_err:
                # A bad or expired token simply means "guest". (A bare
                # ``except:`` here would also swallow SystemExit and
                # KeyboardInterrupt.)
                logger.debug(f"Ignoring unusable nexa_token cookie: {auth_err}")

        # Tolerate missing/invalid JSON and wrongly typed fields instead of
        # letting them surface as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        message = str(data.get("message") or "").strip()
        history = data.get("history")
        memory = data.get("memory")
        attachments = str(data.get("attachments") or "").strip()
        mode = data.get("mode", "instant")
        if not isinstance(mode, str):
            mode = ""  # unknown mode -> generic task instruction below
        is_deep_research = bool(data.get("deep_research", False))
        conv_id = data.get("conversation_id")

        if not message:
            return jsonify({"error": "No message"}), 400

        # Build the base system prompt up front so every branch below
        # (including planner mode) can safely reference it.
        current_date = datetime.now().strftime("%Y-%m-%d")
        dynamic_system_prompt = SYSTEM_PROMPT.format(date=current_date)

        # Task: Handle Planner Mode (Non-streaming JSON)
        if 'Format the response as a valid JSON array' in message:
            logger.info("Planner Mode detected: Switching to non-streaming response")
            planner_prompt = f"{dynamic_system_prompt}\n\nUSER QUESTION: {message}"
            planner_response = ""
            groq_key = os.environ.get("GROQ_API_KEY")
            if groq_key:
                try:
                    client = groq.Groq(api_key=groq_key)
                    resp = client.chat.completions.create(
                        model="llama-3.3-70b-versatile",
                        messages=[{"role": "user", "content": planner_prompt}],
                        stream=False
                    )
                    planner_response = resp.choices[0].message.content
                except Exception as e:
                    logger.warning(f"Groq planner mode failed: {e}")
            if not planner_response and llm_primary:
                output = llm_primary(
                    f"<|system|>\n{dynamic_system_prompt}<|end|>\n<|user|>\n{message}<|end|>\n<|assistant|>\n",
                    max_tokens=1024,
                    stop=["<|end|>", "<|user|>"]
                )
                planner_response = output["choices"][0]["text"]
            return jsonify({"response": planner_response, "status": "success"})

        if message == "status_check":
            return jsonify({"status": "Online", "version": "1.0.0"}), 200

        # Save user message to DB if conversation exists (and is owned).
        _chat_save_message(current_user_id, conv_id, 'user', message)

        time_reply = get_time_response(message)
        if time_reply:
            _chat_save_message(current_user_id, conv_id, 'assistant', time_reply)
            return jsonify({"response": time_reply, "provider": "TimeBot", "status": "Online"})

        tools_needed = detect_tools(message)
        tool_context = ""
        search_data_for_frontend = None

        if "search" in tools_needed:
            search_query = clean_search_query(message)
            search_data = search_manager.search(search_query, max_results=5)
            if search_data["success"]:
                # --- DEEP RESEARCH: Visit Sources ---
                deep_parts = []
                if is_deep_research or mode == "research":
                    logger.info(f"Deep Research: Visiting top 3 sources for {search_query}")
                    for i, source in enumerate(search_data["results"][:3], 1):
                        url = source.get("href")
                        if not url:
                            continue
                        try:
                            content = search_manager.visit_website(url)
                        except Exception as visit_err:
                            # One unreachable source must not abort the chat.
                            logger.warning(f"Deep Research: could not read {url}: {visit_err}")
                            continue
                        if content:
                            deep_parts.append(
                                f"\n\n[EXTRACTED FROM SOURCE {i} ({url})]:\n{content[:3000]}\n"
                            )
                deep_context = "".join(deep_parts)
                tool_context += f"\n\n[Web Search Results from {search_data['engine']}]:\n{format_search_results(search_data)}"
                if deep_context:
                    tool_context += f"\n\n### DEEP RESEARCH DATA ###{deep_context}"
                search_data_for_frontend = {
                    "engine": search_data["engine"],
                    "results": search_data["results"]
                }
            else:
                tool_context += f"\n\n[Search Status]: {search_data.get('error', 'Search temporarily unavailable')}"

        if is_deep_research:
            dynamic_system_prompt += "\n[DEEP RESEARCH ACTIVE]: Use a multi-step reasoning approach. Analyze thoroughly."

        if "diagram" in tools_needed:
            dynamic_system_prompt += (
                "\n[DIAGRAM MODE]: The user wants a visual diagram, mind map, or chart. "
                "Respond with a single valid Mermaid.js diagram inside a fenced code block "
                "labeled mermaid, e.g. ```mermaid ... ```. Choose whichever Mermaid diagram "
                "type best fits the content (mindmap, flowchart TD, graph TD, sequenceDiagram, "
                "classDiagram, gantt, pie, etc.). Keep node labels short and the syntax strictly "
                "valid so it renders without errors. Follow the diagram with 2-4 sentences of "
                "plain-language explanation, not a restatement of every node."
            )

        # Mode-specific direct instructions
        mode_instr = {
            "instant": "Provide a direct, zero-fluff answer.",
            "study": "Explain like a tutor, use simple analogies.",
            "dev": "Provide clean, production-ready code with minimal explanation.",
            "research": "Provide a deep, structured technical report.",
            "creative": "Write with evocative, artistic language."
        }
        dynamic_system_prompt += f"\n[Task Instruction]: {mode_instr.get(mode, 'Answer directly.')}"

        # Task 8: Memory injection security
        sanitized_memory = _chat_sanitize_memory(memory)
        if sanitized_memory:
            dynamic_system_prompt += f"\n[User Memory]: {json.dumps(sanitized_memory)}"

        full_prompt = ""
        if attachments:
            full_prompt += f"{attachments}\n\n"
        if tool_context:
            full_prompt += f"{tool_context}\n\n"
        full_prompt += f"USER QUESTION: {message}"

        def generate():
            """Yield the NDJSON lines of the reply: Groq first, then local."""
            try:
                # First chunk to provide search data and tools used
                initial_payload = {"tools_used": tools_needed}
                if search_data_for_frontend:
                    initial_payload["search_results"] = search_data_for_frontend
                yield json.dumps(initial_payload) + "\n"

                full_ai_response = ""
                groq_key = os.environ.get("GROQ_API_KEY")

                # --- STEP 1: TRY GROQ API ---
                if groq_key:
                    groq_error = None
                    try:
                        logger.info("Attempting response via Groq API...")
                        client = groq.Groq(api_key=groq_key)
                        messages = [{"role": "system", "content": dynamic_system_prompt}]
                        # Include more history for Groq
                        messages.extend(
                            {"role": role, "content": content}
                            for role, content in _chat_history_turns(history, 10)
                        )
                        messages.append({"role": "user", "content": full_prompt})
                        completion = client.chat.completions.create(
                            model="llama-3.3-70b-versatile",  # High-performance model
                            messages=messages,
                            stream=True,
                            max_tokens=2048,
                            temperature=0.7
                        )
                        for chunk in completion:
                            piece = chunk.choices[0].delta.content or ""
                            if piece:
                                full_ai_response += piece
                                yield json.dumps({"token": piece, "source": "groq", "provider": "Groq (Llama 3.3)", "tools_used": tools_needed}) + "\n"
                    except Exception as e:
                        groq_error = e

                    if groq_error is None:
                        # Saving happens outside the try above so that a DB
                        # problem can no longer trigger a second, local answer.
                        _chat_save_message(current_user_id, conv_id, 'assistant', full_ai_response)
                        return  # Success with Groq

                    if full_ai_response:
                        # Tokens already reached the client; starting the
                        # local model now would append a second answer to the
                        # partial one, so report the failure instead.
                        logger.warning(f"Groq API failed mid-stream: {groq_error}")
                        yield json.dumps({"error": "AI service encountered an error."}) + "\n"
                        return

                    logger.warning(f"Groq API failed: {groq_error}. Falling back to local Phi-3-mini engine...")

                # --- STEP 2: FALLBACK TO LOCAL PHI-3-MINI ENGINE ---
                if not llm_primary:
                    yield json.dumps({"error": "No AI engine available."}) + "\n"
                    return

                try:
                    # Trim history and prompt so the request fits the local
                    # model's context window.
                    llama_prompt = f"<|system|>\n{dynamic_system_prompt}<|end|>\n"
                    for role, content in _chat_history_turns(history, 8):
                        llama_prompt += f"<|{role}|>\n{content[:800]}<|end|>\n"
                    llama_prompt += f"<|user|>\n{full_prompt[:2000]}<|end|>\n<|assistant|>\n"

                    logger.info("Generating response using local Phi-3-mini engine...")
                    output = llm_primary(
                        llama_prompt,
                        max_tokens=1024,
                        stop=["<|end|>", "<|user|>", "<|system|>", "User:", "Assistant:"],
                        stream=True,
                        repeat_penalty=1.1,
                        temperature=0.7
                    )
                    for chunk in output:
                        if "choices" in chunk and len(chunk["choices"]) > 0:
                            piece = chunk["choices"][0].get("text", "")
                            if piece:
                                full_ai_response += piece
                                yield json.dumps({"token": piece, "source": "phi3-mini", "provider": "Phi-3-mini (Local Fallback)", "tools_used": tools_needed}) + "\n"
                except Exception as hf_err:
                    logger.error(f"Phi-3-mini Primary crashed: {hf_err}")
                    yield json.dumps({"error": "AI service encountered an error."}) + "\n"
                    return

                # Save to DB after stream finishes
                _chat_save_message(current_user_id, conv_id, 'assistant', full_ai_response)
            except Exception as gen_err:
                logger.error(f"Generator error: {gen_err}")
                yield json.dumps({"error": f"Internal system error: {str(gen_err)}"}) + "\n"

        return Response(generate(), mimetype='application/x-ndjson')
    except Exception as e:
        logger.error(f"Chat route crashed: {e}")
        return jsonify({"error": "Fatal system error. Please refresh and try again."}), 500
@app.route('/api/image', methods=['POST'])
@limiter.limit("100 per minute")
def generate_image_api():
    """Build a Pollinations image URL for the prompt and check that it answers.

    Request JSON: ``prompt`` (required), ``width`` and ``height`` (optional,
    default 1024, must be positive integers).

    Returns 200 with ``image_url`` when the probe request answers 200, 400
    for a missing prompt or invalid dimensions, 503 when the image service
    does not answer 200, and 500 on any other failure.
    """
    data = request.json or {}
    if not isinstance(data, dict):
        data = {}
    prompt = str(data.get("prompt") or "").strip()
    if not prompt:
        return jsonify({"error": "No prompt provided"}), 400

    # width/height go straight into the query string, so they must be plain
    # integers; otherwise a value like "1&nologo=false" would inject
    # parameters into the upstream request.
    try:
        width = int(data.get("width", 1024))
        height = int(data.get("height", 1024))
    except (TypeError, ValueError, OverflowError):
        return jsonify({"error": "width and height must be integers", "status": "error"}), 400
    if width <= 0 or height <= 0:
        return jsonify({"error": "width and height must be positive", "status": "error"}), 400

    try:
        # Step 1: Attempt Pollinations with a random seed for cache-busting.
        seed = random.randint(0, 2147483647)
        # safe='' also encodes "/" so a prompt cannot add path segments.
        encoded_prompt = urllib.parse.quote(prompt, safe='')
        image_url = f"https://image.pollinations.ai/prompt/{encoded_prompt}?width={width}&height={height}&seed={seed}&nologo=true"

        # Verify URL is alive. stream=True returns once the headers arrive,
        # so the image body is not downloaded just to read the status code.
        try:
            with requests.get(image_url, timeout=10, stream=True) as check:
                if check.status_code == 200:
                    return jsonify({"status": "success", "image_url": image_url, "prompt": prompt})
                body_text = check.text
                if check.status_code == 402 or "Queue full" in body_text:
                    logger.warning("Pollinations rate limited. Falling back to static image generation...")
                else:
                    logger.error(f"Pollinations error {check.status_code}: {body_text}")
        except Exception as e:
            logger.error(f"Pollinations check failed: {e}")

        # Step 2: No fallback generator exists yet; report a friendly error.
        return jsonify({
            "error": "Image generation service is temporarily busy. Please wait 10 seconds and try again.",
            "status": "error"
        }), 503
    except Exception as e:
        logger.error(f"Image generation failed: {e}")
        return jsonify({"error": "Failed to generate image. Please try again later."}), 500
# ============ VIDEO OVERLAY SYSTEM ============
def process_video_overlay(
    input_path,
    overlay_text,
    position="Bottom Center",
    font_size=60,
    text_color="#FFFFFF",
    stroke_width=2,
    stroke_color="#000000",
    bg_color="#000000",
    bg_opacity=0.5,
    start_time=0,
    end_time=None
):
    """
    Render a text overlay onto a video with MoviePy and save it in OUTPUT_DIR.

    Args:
        input_path: Path of the source video.
        overlay_text: Caption text to draw.
        position: One of the nine names in the position table below
            ("Top Left" ... "Bottom Right"); unknown names fall back to
            bottom centre.
        font_size, text_color, stroke_width, stroke_color: Passed to TextClip.
        bg_color: "#RRGGBB" colour of the box drawn behind the text.
        bg_opacity: Opacity of that box; 0 (or less) disables it.
        start_time: Second at which the overlay appears.
        end_time: Second at which it disappears; None or a value past the
            end of the video means "until the end".

    Returns:
        The generated file name (inside OUTPUT_DIR), or None if anything
        fails; the error is logged.
    """
    # Every clip created below is recorded here so it can be closed even when
    # rendering fails part-way through.
    open_clips = []
    try:
        from moviepy import VideoFileClip, TextClip, CompositeVideoClip, ColorClip

        # Load clip
        clip = VideoFileClip(input_path)
        open_clips.append(clip)
        duration = clip.duration
        if end_time is None or end_time > duration:
            end_time = duration

        # Position mapping
        pos_map = {
            "Top Left": ("left", "top"), "Top Center": ("center", "top"), "Top Right": ("right", "top"),
            "Center Left": ("left", "center"), "Center": ("center", "center"), "Center Right": ("right", "center"),
            "Bottom Left": ("left", "bottom"), "Bottom Center": ("center", "bottom"), "Bottom Right": ("right", "bottom")
        }
        actual_pos = pos_map.get(position, ("center", "bottom"))

        # Create Text Clip. The width is truncated to an int because it is a
        # pixel dimension (NOTE(review): a float width is believed to break
        # image allocation inside TextClip -- confirm against MoviePy).
        txt_clip = TextClip(
            text=overlay_text,
            font_size=font_size,
            color=text_color,
            font="Arial",  # Standard fallback
            method="caption",
            size=(int(clip.w * 0.8), None),
            stroke_color=stroke_color,
            stroke_width=stroke_width
        ).with_start(start_time).with_end(end_time).with_position(actual_pos)
        open_clips.append(txt_clip)

        # Background for readability
        clips_to_composite = [clip]
        if bg_opacity > 0:
            hex_digits = bg_color.lstrip('#')
            bg_rgb = tuple(int(hex_digits[i:i + 2], 16) for i in (0, 2, 4))
            bg_clip = ColorClip(
                size=(txt_clip.w + 20, txt_clip.h + 20),
                color=bg_rgb
            ).with_opacity(bg_opacity).with_start(start_time).with_end(end_time).with_position(actual_pos)
            open_clips.append(bg_clip)
            clips_to_composite.append(bg_clip)
        clips_to_composite.append(txt_clip)

        final_clip = CompositeVideoClip(clips_to_composite)
        open_clips.append(final_clip)

        output_filename = f"overlay_{uuid.uuid4().hex}.mp4"
        output_path = os.path.join(OUTPUT_DIR, output_filename)
        # A per-render temp audio file: a fixed name in the working directory
        # would be shared (and clobbered) by concurrent renders.
        temp_audio_path = os.path.join(
            tempfile.gettempdir(), f"overlay_audio_{uuid.uuid4().hex}.m4a"
        )

        # Write file with optimized parameters
        final_clip.write_videofile(
            output_path,
            codec="libx264",
            audio_codec="aac",
            temp_audiofile=temp_audio_path,
            remove_temp=True,
            threads=4
        )
        return output_filename
    except Exception as e:
        logger.error(f"MoviePy overlay failed: {e}")
        return None
    finally:
        # Cleanup on success and on failure, newest clip first.
        for opened in reversed(open_clips):
            try:
                opened.close()
            except Exception as close_err:
                logger.warning(f"Failed to close clip: {close_err}")
# ============ NEXUS EXPERT ENDPOINTS ============
@app.route('/api/nexus/mermaid', methods=['POST'])
def nexus_mermaid():
    """Turn a nested JSON ``structure`` into Mermaid diagram text.

    The first output line is the diagram ``type`` (default ``mindmap``).
    Dict keys become lines and their values are rendered one level deeper;
    list items become lines at the current level; any other value adds
    nothing. Failures are reported as a 500 with the exception text.
    """
    try:
        body = request.json or {}
        diagram_type = body.get("type", "mindmap")
        structure = body.get("structure", {})

        def render(node, pad=" "):
            # Each nesting level is indented by one more pad character.
            if isinstance(node, dict):
                return "".join(
                    f"{pad}{label}\n" + render(child, pad + " ")
                    for label, child in node.items()
                )
            if isinstance(node, list):
                return "".join(f"{pad}{entry}\n" for entry in node)
            return ""

        syntax = f"{diagram_type}\n" + render(structure)
        return jsonify({
            "status": "success",
            "syntax": syntax,
            "type": diagram_type
        })
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/nexus/chart', methods=['POST'])
def nexus_chart():
    """Wrap posted values in a chart configuration object.

    Reads ``type`` (default ``bar``), ``data``, ``labels`` and ``title``
    (default ``Nexa Analytics``) from the JSON body and returns them inside
    a ``chart_config`` with a single dataset. Failures are reported as a 500
    with the exception text.
    """
    try:
        body = request.json or {}
        chart_type = body.get("type", "bar")
        raw_data = body.get("data", [])
        labels = body.get("labels", [])
        title = body.get("title", "Nexa Analytics")

        dataset = {
            "label": title,
            "data": raw_data,
            "backgroundColor": "rgba(16, 163, 127, 0.2)",
            "borderColor": "rgba(16, 163, 127, 1)",
            "borderWidth": 1
        }
        options = {
            "responsive": True,
            "plugins": {
                "legend": {"position": "top"},
                "title": {"display": True, "text": title}
            }
        }
        chart_config = {
            "type": chart_type,
            "data": {"labels": labels, "datasets": [dataset]},
            "options": options
        }
        return jsonify({"status": "success", "chart_config": chart_config})
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/api/video/overlay', methods=['POST'])
def video_overlay_api():
    """Add a text overlay to a previously generated video.

    Request JSON: ``video_url`` (must start with ``/static/outputs/``),
    ``text``, plus optional ``position``, ``font_size``, ``text_color``,
    ``stroke_width``, ``stroke_color``, ``bg_color``, ``bg_opacity``,
    ``start_time`` and ``end_time``.

    Returns the URL of the new video on success, 400 for missing or invalid
    input (including a ``video_url`` that resolves outside OUTPUT_DIR), 404
    when the source file does not exist, and 500 when processing fails.
    """
    try:
        data = request.json or {}
        video_url = data.get("video_url")
        text = str(data.get("text") or "").strip()
        if not video_url or not text:
            return jsonify({"error": "Missing video_url or text"}), 400

        # Resolve path
        url_prefix = "/static/outputs/"
        if not isinstance(video_url, str) or not video_url.startswith(url_prefix):
            # Handle full URL or other paths
            return jsonify({"error": "Invalid video source"}), 400

        # The remainder is client-controlled: "../" segments or a leading "/"
        # would otherwise let the request point at any file on disk. Resolve
        # it fully and require the result to stay inside OUTPUT_DIR.
        base_dir = os.path.realpath(OUTPUT_DIR)
        local_path = os.path.realpath(os.path.join(base_dir, video_url[len(url_prefix):]))
        try:
            inside_outputs = os.path.commonpath([base_dir, local_path]) == base_dir
        except ValueError:
            # Raised e.g. for paths on different drives.
            inside_outputs = False
        if not inside_outputs:
            return jsonify({"error": "Invalid video source"}), 400

        if not os.path.isfile(local_path):
            return jsonify({"error": "Video file not found"}), 404

        try:
            params = {
                "overlay_text": text,
                "position": data.get("position", "Bottom Center"),
                "font_size": int(data.get("font_size", 60)),
                "text_color": data.get("text_color", "#FFFFFF"),
                "stroke_width": int(data.get("stroke_width", 2)),
                "stroke_color": data.get("stroke_color", "#000000"),
                "bg_color": data.get("bg_color", "#000000"),
                "bg_opacity": float(data.get("bg_opacity", 0.5)),
                "start_time": float(data.get("start_time", 0)),
                "end_time": float(data.get("end_time", 9999))
            }
        except (TypeError, ValueError):
            # Malformed numbers are a client error, not a server fault.
            return jsonify({"error": "Invalid overlay parameter"}), 400

        output_file = process_video_overlay(local_path, **params)
        if output_file:
            return jsonify({
                "status": "success",
                "video_url": f"/static/outputs/{output_file}",
                "message": "Text overlay added successfully"
            })
        else:
            return jsonify({"error": "Processing failed"}), 500
    except Exception as e:
        logger.error(f"Overlay API error: {e}")
        return jsonify({"error": str(e)}), 500
@app.route('/api/analyze-file', methods=['POST'])
def analyze_file():
    """Extract text from an uploaded file and return the first 4000 characters.

    Expects a multipart upload named ``file``. Supported extensions: pdf
    (pdfplumber), docx (paragraph text), txt (decoded as UTF-8, undecodable
    bytes ignored) and png/jpg/jpeg/webp (OCR via pytesseract).

    Returns 400 when no file is sent, the type is unsupported or no text is
    found; 500 when extraction raises.
    """
    if 'file' not in request.files:
        return jsonify({"error": "No file uploaded"}), 400
    file = request.files['file']
    # Guard against an upload without a file name.
    filename = file.filename or ""
    ext = filename.rsplit('.', 1)[-1].lower()
    # Task 10: work on an independent copy of the uploaded bytes.
    raw_bytes = file.read()
    try:
        text_content = ""
        if ext == 'pdf':
            with pdfplumber.open(io.BytesIO(raw_bytes)) as pdf:
                # Extract each page once (it used to run twice per page:
                # once to filter, once to collect).
                page_texts = (page.extract_text() for page in pdf.pages)
                text_content = "\n".join(text for text in page_texts if text)
        elif ext == 'docx':
            doc = docx.Document(io.BytesIO(raw_bytes))
            text_content = "\n".join(para.text for para in doc.paragraphs)
        elif ext == 'txt':
            text_content = raw_bytes.decode('utf-8', errors='ignore')
        elif ext in ['png', 'jpg', 'jpeg', 'webp']:
            # Context manager releases the image once OCR is done.
            with Image.open(io.BytesIO(raw_bytes)) as img:
                text_content = pytesseract.image_to_string(img)
        else:
            return jsonify({"error": f"Unsupported file type: {ext}"}), 400

        if not text_content.strip():
            return jsonify({"error": "Could not extract text from the file."}), 400

        truncated_text = text_content[:4000]
        return jsonify({"filename": filename, "content": truncated_text, "full_length": len(text_content), "status": "success"})
    except Exception as e:
        logger.error(f"File analysis failed: {e}")
        return jsonify({"error": f"Failed to process file: {str(e)}"}), 500
@app.route('/static/outputs/<path:filename>')
def serve_output(filename):
    """Serve ``filename`` from OUTPUT_DIR."""
    import flask
    return flask.send_from_directory(OUTPUT_DIR, filename)
@app.route('/api/ide/execute', methods=['POST'])
def ide_execute():
    """Run one IDE action: ``list_files``, ``create_file`` or ``run_command``.

    Request JSON: ``action`` plus a ``payload`` dict (``path``/``content``
    for ``create_file``, ``command`` for ``run_command``).

    SECURITY(review): this route has no authentication decorator and
    ``run_command`` passes client text to a shell. The substring blacklist
    below is not a sandbox and is easy to bypass. Protect or disable this
    endpoint before exposing the service to untrusted clients.

    Returns a JSON object with ``status``; errors carry ``error`` and an
    HTTP status of 400 (bad input), 403 (rejected path/command) or 500.
    """
    try:
        data = request.json or {}
        action = data.get("action")
        payload = data.get("payload", {})
        if not isinstance(payload, dict):
            payload = {}

        if action == "list_files":
            files = []
            for root, dirs, filenames in os.walk(os.getcwd()):
                if "venv" in root or ".git" in root or "__pycache__" in root:
                    continue
                for name in filenames:
                    rel_path = os.path.relpath(os.path.join(root, name), os.getcwd())
                    files.append(rel_path)
            return jsonify({"status": "success", "files": files})

        elif action == "create_file":
            path = payload.get("path")
            content = payload.get("content", "")
            if not path or not isinstance(path, str):
                return jsonify({"status": "error", "error": "Path required"}), 400

            # Security: Ensure path is within project. A plain
            # startswith(cwd) test also accepts sibling directories such as
            # "<cwd>-other" and ignores symlinks, so resolve both paths and
            # compare whole path components instead.
            project_root = os.path.realpath(os.getcwd())
            abs_path = os.path.realpath(os.path.join(project_root, path))
            try:
                inside_project = os.path.commonpath([project_root, abs_path]) == project_root
            except ValueError:
                # Raised e.g. for paths on different drives.
                inside_project = False
            if not inside_project:
                return jsonify({"status": "error", "error": "Invalid path"}), 403

            os.makedirs(os.path.dirname(abs_path), exist_ok=True)
            with open(abs_path, 'w', encoding='utf-8') as f:
                f.write(content)
            return jsonify({"status": "success", "message": f"File {path} created"})

        elif action == "run_command":
            command = payload.get("command")
            if not command or not isinstance(command, str):
                return jsonify({"status": "error", "error": "Command required"}), 400

            # Security: Basic command filtering (see SECURITY note above).
            forbidden = ["rm -rf", "format", "mkfs", "> /dev/"]
            if any(fragment in command for fragment in forbidden):
                return jsonify({"status": "error", "error": "Forbidden command"}), 403

            import subprocess
            try:
                # shell=True is kept so existing callers' shell syntax keeps
                # working; it is the reason this endpoint must be protected.
                result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=30)
                return jsonify({
                    "status": "success",
                    "stdout": result.stdout,
                    "stderr": result.stderr,
                    "code": result.returncode
                })
            except Exception as e:
                return jsonify({"status": "error", "error": str(e)}), 500

        return jsonify({"status": "error", "error": "Unknown action"}), 400
    except Exception as e:
        logger.error(f"IDE Execute error: {e}")
        return jsonify({"status": "error", "error": str(e)}), 500
@app.route('/api/ide/plan', methods=['POST'])
def ide_plan():
    """Ask an LLM for a JSON execution plan for the posted ``command``.

    The prompt is built from IDE_PLAN_PROMPT with up to 50 project file
    paths. Groq is tried first when ``GROQ_API_KEY`` is set, then
    ``llm_primary``. The first ``{...}`` span of the reply is parsed as
    JSON and returned. If that fails, commands mentioning "overlay" get a
    canned plan and everything else a 500 error.
    """
    try:
        body = request.json or {}
        command = body.get("command", "").strip()
        if not command:
            return jsonify({"error": "No command provided"}), 400

        # Get file tree for context
        skip_markers = ("venv", ".git", "__pycache__")
        files = []
        for root, _subdirs, filenames in os.walk(os.getcwd()):
            if any(marker in root for marker in skip_markers):
                continue
            files.extend(
                os.path.relpath(os.path.join(root, name), os.getcwd())
                for name in filenames
            )

        # Use LLM to generate plan
        prompt = IDE_PLAN_PROMPT.format(files=", ".join(files[:50]), command=command)
        plan_response = ""

        # Try Groq first for high-quality planning
        groq_key = os.environ.get("GROQ_API_KEY")
        if groq_key:
            try:
                groq_client = groq.Groq(api_key=groq_key)
                reply = groq_client.chat.completions.create(
                    model="llama-3.3-70b-versatile",
                    messages=[{"role": "user", "content": prompt}],
                    stream=False,
                    response_format={"type": "json_object"}
                )
                plan_response = reply.choices[0].message.content
            except Exception as groq_err:
                logger.warning(f"Groq planning failed: {groq_err}")

        # Fallback to local if Groq fails or no key
        if not plan_response and llm_primary:
            local_prompt = f"<|system|>\n{prompt}<|end|>\n<|user|>\nGenerate the plan in JSON format.<|end|>\n<|assistant|>\n"
            local_output = llm_primary(
                local_prompt,
                max_tokens=1024,
                stop=["<|end|>", "<|user|>"]
            )
            plan_response = local_output["choices"][0]["text"]

        try:
            # Clean response to find JSON
            json_match = re.search(r'\{.*\}', plan_response, re.DOTALL)
            if not json_match:
                raise ValueError("No JSON found in response")
            return jsonify(json.loads(json_match.group(0)))
        except Exception:
            # Fallback hardcoded plan if LLM fails for common tasks
            if "overlay" not in command.lower():
                return jsonify({"error": "Failed to generate structured plan. Please try a simpler command."}), 500
            canned_steps = [
                {"id": 1, "action": "run_command", "description": "Check MoviePy version", "command": "pip show moviepy"},
                {"id": 2, "action": "edit_file", "path": "app.py", "description": "Ensure overlay endpoint is optimized"}
            ]
            return jsonify({"plan": canned_steps, "stats": {"costSavings": "$10"}})
    except Exception as exc:
        logger.error(f"IDE Plan error: {exc}")
        return jsonify({"error": str(exc)}), 500
@app.route('/api/ide/debug', methods=['POST'])
def ide_debug():
    """Placeholder endpoint: always reports that debugging is not available."""
    placeholder = {
        "error": "Local debugging is coming soon.",
        "suggestions": [],
        "root_cause": "N/A",
    }
    return jsonify(placeholder)
@app.route('/api/status', methods=['GET'])
def status_check():
    """Report a static status payload plus the current server time.

    ``multi_agent_system`` is "available" only when the module-level
    ``multi_agent_system`` object is truthy.
    """
    if multi_agent_system:
        agents_state = "available"
    else:
        agents_state = "unavailable"
    report = {
        "status": "online",
        "backend": "Nexa AI Unified",
        "timestamp": time.ctime(),
        "multi_agent_system": agents_state,
        "search_engines": ["DuckDuckGo", "Bing"],
    }
    return jsonify(report)
# Multi-Agent System Endpoints
@app.route('/api/multi-agent/process', methods=['POST'])
def multi_agent_process():
    """Hand the posted ``request`` text to the multi-agent system.

    Returns whatever ``multi_agent_system.process_request`` produces as
    JSON; 400 when ``request`` is missing or empty, 503 when the system was
    not initialised, and 500 when processing raises.
    """
    try:
        body = request.json or {}
        user_request = body.get("request", "")

        if not user_request:
            return jsonify({"error": "Missing 'request' parameter"}), 400
        if not multi_agent_system:
            return jsonify({"error": "Multi-Agent System not initialized"}), 503

        # Only the first 100 characters are logged.
        preview = user_request[:100]
        logger.info(f"Received multi-agent request: {preview}")
        return jsonify(multi_agent_system.process_request(user_request))
    except Exception as exc:
        logger.error(f"Multi-agent processing failed: {exc}")
        return jsonify({"error": "Internal server error during multi-agent processing"}), 500
@app.route('/api/multi-agent/agents', methods=['GET'])
def list_agents():
    """Return the fixed list of agent names and how many there are."""
    roles = (
        "Intake", "Planning", "Orchestrator", "Research", "Analysis",
        "Coding", "Writing", "Validation", "Feedback", "Optimization",
    )
    # Every agent name is its role followed by "Agent".
    agents = [f"{role}Agent" for role in roles]
    return jsonify({"agents": agents, "total": len(agents)})
@app.errorhandler(500)
def internal_error(error):
    """Answer HTTP 500 errors with a generic JSON body (``error`` is unused)."""
    body = {
        "error": "Internal Server Error",
        "message": "An unexpected error occurred on the server.",
    }
    return jsonify(body), 500
if __name__ == "__main__":
    # Started directly as a script: listen on all interfaces, port 7860.
    app.run(port=7860, host="0.0.0.0")