| import os |
| import re |
| import time |
| import random |
| import logging |
| import io |
| import json |
| import requests |
| import urllib.parse |
| import cv2 |
| import numpy as np |
| from PIL import Image, ImageDraw, ImageFont |
| from flask import Flask, request, jsonify, Response |
| from flask_cors import CORS |
| from flask_limiter import Limiter |
| from flask_limiter.util import get_remote_address |
| from datetime import datetime |
| from dotenv import load_dotenv |
| from llama_cpp import Llama |
| from huggingface_hub import hf_hub_download |
| import pdfplumber |
| import docx |
| import pytesseract |
| from moviepy import VideoFileClip, TextClip, CompositeVideoClip |
| import tempfile |
| import uuid |
| from gradio_client import Client |
| import shutil |
| import sqlite3 |
| import jwt |
| import groq |
| from functools import wraps |
| from bs4 import BeautifulSoup |
|
|
| from passlib.context import CryptContext |
|
|
| |
# Root logging configuration; every module-level logger inherits this format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Pull variables from a local .env file into the environment before any
# os.environ lookups below.
load_dotenv()

# bcrypt-backed hashing context used for storing and verifying passwords.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# The multi-agent subsystem is optional: any failure while importing or
# constructing it is logged and the app continues with it disabled (None).
try:
    from agents.multi_agent_system import MultiAgentSystem
    multi_agent_system = MultiAgentSystem()
    logger.info("Multi-Agent System initialized")
except Exception as e:
    logger.error(f"Failed to initialize Multi-Agent System: {e}")
    multi_agent_system = None

app = Flask(__name__)
# NOTE(review): credentials are allowed for every origin here; consider
# restricting `origins` to the known front-end hosts.
CORS(app, supports_credentials=True)

# The secret signs the login JWTs, so it must never be a value published in
# the source tree. When SECRET_KEY is not configured, fall back to a random
# per-process key: tokens then stop validating after a restart (and are not
# shared between worker processes), which is the safe failure mode.
_configured_secret = os.environ.get("SECRET_KEY")
if not _configured_secret:
    _configured_secret = os.urandom(32).hex()
    logger.warning(
        "SECRET_KEY is not set; using an ephemeral random key. "
        "Set SECRET_KEY to keep sessions valid across restarts and workers."
    )
app.config['SECRET_KEY'] = _configured_secret

# SQLite database file, resolved relative to the process working directory.
DB_PATH = "nexa_ai.db"
|
|
def get_db_connection():
    """Open a fresh SQLite connection to ``DB_PATH``.

    Rows come back as ``sqlite3.Row`` objects, so callers can address columns
    by name (``row["email"]``) and convert a row with ``dict(row)``.

    Returns:
        sqlite3.Connection: a new connection; the caller owns its lifetime.
        Note that ``with connection:`` only commits or rolls back the
        transaction, it does not close the connection.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
|
|
def init_db():
    """Create the application tables and bootstrap the default admin account.

    Creates ``users``, ``conversations`` and ``messages`` if they do not exist
    yet. The ``admin`` user is inserted only when it is missing: an existing
    admin row (including a password changed by the operator) is left alone
    instead of being reset on every start.

    The bootstrap password is read from the ``NEXA_ADMIN_PASSWORD`` environment
    variable; the legacy default is used only when that variable is unset, and
    a warning is logged in that case. No password or hash material is logged.

    Failures while bootstrapping the admin are logged and do not prevent the
    schema from being committed.
    """
    with get_db_connection() as conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                email TEXT UNIQUE NOT NULL,
                password_hash TEXT NOT NULL,
                avatar TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        conn.execute('''
            CREATE TABLE IF NOT EXISTS conversations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                title TEXT NOT NULL,
                model TEXT,
                pinned INTEGER DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (user_id) REFERENCES users (id)
            )
        ''')
        conn.execute('''
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                conversation_id INTEGER NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                feedback INTEGER, -- 1 for up, -1 for down
                feedback_text TEXT,
                FOREIGN KEY (conversation_id) REFERENCES conversations (id)
            )
        ''')

        try:
            admin_row = conn.execute(
                'SELECT id FROM users WHERE username = ?', ('admin',)
            ).fetchone()
            if admin_row is None:
                admin_password = os.environ.get("NEXA_ADMIN_PASSWORD")
                if not admin_password:
                    # Legacy default kept so existing setups still get a
                    # working admin login; operators should override it.
                    admin_password = "admin123"
                    logger.warning(
                        "NEXA_ADMIN_PASSWORD is not set; the default admin account "
                        "was created with the built-in default password. Change it."
                    )
                # Hash only when an insert is actually needed (bcrypt is slow).
                conn.execute(
                    'INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)',
                    ('admin', 'admin@nexa.ai', pwd_context.hash(admin_password))
                )
                logger.info("Default admin account created")
        except Exception as e:
            logger.error(f"Error creating default admin: {e}")

        conn.commit()


# Run the schema bootstrap once at import time, before any route is served.
init_db()
|
|
| |
def token_required(f):
    """Decorator that authenticates a request from the ``nexa_token`` cookie.

    The cookie must hold an HS256 JWT signed with the app's ``SECRET_KEY`` and
    carrying a ``user_id`` claim. On success the wrapped view is called with
    that user id as its first positional argument, followed by the original
    arguments. A missing, malformed, expired or claim-less token yields a
    401 JSON response instead.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.cookies.get('nexa_token')
        if not token:
            return jsonify({'message': 'Token is missing!'}), 401
        try:
            data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
            current_user_id = data['user_id']
        except (jwt.InvalidTokenError, KeyError):
            # Only token problems map to 401; a bare `except:` here would also
            # swallow programming errors and interpreter-exit signals.
            return jsonify({'message': 'Token is invalid!'}), 401
        return f(current_user_id, *args, **kwargs)
    return decorated
|
|
| |
# Generated media is written below <cwd>/static/outputs; make sure the
# directory exists as soon as the module is imported.
OUTPUT_DIR = os.path.join(os.getcwd(), "static", "outputs")
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Per-client-address rate limiting with in-process (non-shared) counters.
limiter = Limiter(
    get_remote_address,
    app=app,
    default_limits=["10000 per day", "1000 per hour"],
    storage_uri="memory://",
)

OLLAMA_URL = "http://localhost:11434"

# GGUF model used by the local llama.cpp engine, and where to download it.
HF_FALLBACK_REPO = "microsoft/Phi-3-mini-4k-instruct-gguf"
HF_FALLBACK_FILE = "Phi-3-mini-4k-instruct-q4.gguf"
# True when the SPACE_ID environment variable is present.
IS_HF_SPACE = "SPACE_ID" in os.environ

# Local LLM handle; stays None when loading fails so callers can test it.
llm_primary = None
try:
    logger.info("Initializing Phi-3-mini Primary Engine...")

    # Prefer a model file sitting in the working directory; otherwise fetch
    # it from the Hugging Face hub.
    local_model_path = os.path.join(os.getcwd(), HF_FALLBACK_FILE)
    if not os.path.exists(local_model_path):
        logger.info("Local model not found, downloading from Hugging Face...")
        model_path = hf_hub_download(repo_id=HF_FALLBACK_REPO, filename=HF_FALLBACK_FILE)
    else:
        logger.info(f"Using local model file: {local_model_path}")
        model_path = local_model_path

    llm_primary = Llama(model_path=model_path, n_ctx=4096, n_threads=4, verbose=False)
    logger.info("Phi-3-mini engine loaded successfully.")
except Exception as e:
    logger.error(f"Failed to load Phi-3-mini model: {e}")
|
|
# Base system prompt for chat completions. It is a str.format template with
# a single placeholder, {date}, which the chat route fills with the current
# date (YYYY-MM-DD) on every request. Keep any new literal braces escaped
# ({{ }}) so the .format call keeps working.
SYSTEM_PROMPT = """You are Nexa AI, a professional technical execution engine.
### **WRITING PRINCIPLES**:
1. Match response depth and length to the question. A simple question gets a short, direct answer in plain prose — no headers, no bullet template. A complex technical request can use structure (headers, numbered steps, tables, code blocks) where it genuinely improves clarity.
2. Write in clear, natural language. Avoid filler words ("just", "really", "very", "basically") but do not strip the response down to unnatural telegraphic phrasing either.
3. Never simulate dialogue, never include "AI:"/"User:" labels, never narrate what you're about to do — just answer.
4. Don't force emoji, bold labels, or section headers onto every message. Use them only when they add real value (e.g., a multi-step technical walkthrough, a comparison table).
5. If the user asks for code, give clean, correct, runnable code with only as much explanation as is useful — don't pad it with restating what the code obviously does.
6. Be honest about uncertainty instead of inventing confident-sounding details.
### **WHEN WEB RESULTS ARE PROVIDED**:
- Synthesize the sources into your own words; cite specific claims with [N] tied to the source list.
- End with a short **Reference** section listing each [N] as a markdown link.
- Never cite Wikipedia.
### **WHEN A TOOL FAILS**:
Briefly state what failed and what you'll try next (or what the user can do), in one or two sentences — no need for a formatted alert block.
Current Date: {date}
"""
|
|
# Prompt template for the IDE planning feature: it asks the model for a JSON
# execution plan and carries two substitution markers, {files} and {command}.
# NOTE(review): the JSON example below contains literal, unescaped braces, so
# calling str.format() on this template would raise; presumably callers
# substitute the markers with str.replace — confirm against the call site
# before changing either the braces or the substitution method.
IDE_PLAN_PROMPT = """You are the Nexa AI IDE Engine.
Your task is to convert a high-level user request into a concrete, multi-step technical execution plan.
### **OUTPUT FORMAT**:
You MUST return a JSON object with the following structure:
{
"plan": [
{
"id": 1,
"action": "create_file" | "edit_file" | "run_command",
"path": "relative/path/to/file",
"description": "Short explanation of what this step does",
"content": "The code content (only for create_file/edit_file)",
"command": "The shell command (only for run_command)"
}
],
"stats": {
"costSavings": "e.g., $15"
}
}
### **GUIDELINES**:
1. **Be Precise**: Use exact file paths.
2. **Be Modular**: Each step should do exactly one thing.
3. **Be Safe**: Do not suggest commands that delete system files.
4. **Project Context**: The user is working on a Flask/Python/React project.
Current Files: {files}
User Command: {command}
"""
|
|
| |
|
|
class SearchManager:
    """Multi-engine web search with automatic fallback between engines.

    Each ``search_*`` method returns a list of ``{"title", "href", "body"}``
    dicts and never raises: any failure is logged and reported as an empty
    list. ``search`` tries the engines in order and returns the first
    non-empty, Wikipedia-free result set; when every engine comes back empty
    it reports ``success: False`` rather than raising.
    """

    def __init__(self):
        # Timestamp of the last outbound request and the minimum spacing (in
        # seconds) enforced between two requests by _rate_limit().
        self.last_search_time = 0
        self.min_delay = 2
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.0",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.0",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.0",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) Gecko/20100101 Firefox/120.0"
        ]

    def _get_headers(self):
        """Return request headers with a randomly chosen User-Agent."""
        return {"User-Agent": random.choice(self.user_agents)}

    def _rate_limit(self):
        """Sleep as needed so requests are at least ``min_delay`` seconds apart."""
        elapsed = time.time() - self.last_search_time
        if elapsed < self.min_delay:
            time.sleep(self.min_delay - elapsed)
        self.last_search_time = time.time()

    def search_duckduckgo(self, query, max_results=5):
        """Search DuckDuckGo through ``duckduckgo_search``; return [] on failure."""
        try:
            self._rate_limit()
            from duckduckgo_search import DDGS
            with DDGS(headers=self._get_headers(), timeout=15) as ddgs:
                results = list(ddgs.text(query, max_results=max_results))
            if results:
                logger.info(f"DuckDuckGo: {len(results)} results")
                # .get() so one hit with a missing field does not discard
                # the whole result set via KeyError.
                return [
                    {
                        "title": r.get("title", ""),
                        "href": r.get("href", ""),
                        "body": r.get("body", ""),
                    }
                    for r in results
                ]
        except Exception as e:
            logger.warning(f"DuckDuckGo failed: {e}")
        return []

    def search_bing(self, query, max_results=5):
        """Scrape Bing's HTML result page; return [] on failure."""
        # Function-scope import: the html module is not imported at file level.
        from html import unescape
        try:
            self._rate_limit()
            encoded_query = urllib.parse.quote(query)
            url = f"https://www.bing.com/search?q={encoded_query}"
            response = requests.get(url, headers=self._get_headers(), timeout=10)
            response.raise_for_status()
            page = response.text
            results = []
            result_blocks = re.findall(r'<li class="b_algo"[^>]*>(.*?)</li>', page, re.DOTALL)
            for block in result_blocks[:max_results]:
                title_match = re.search(r'<a[^>]*href="([^"]*)"[^>]*>(.*?)</a>', block, re.DOTALL)
                if not title_match:
                    continue
                # Attribute values and text scraped from HTML still carry
                # entities (&amp;, &quot;, ...); decode them after tag removal.
                href = unescape(title_match.group(1))
                title = unescape(re.sub(r'<[^>]+>', '', title_match.group(2))).strip()
                snippet_match = re.search(r'<p[^>]*>(.*?)</p>', block, re.DOTALL)
                body = ""
                if snippet_match:
                    body = unescape(re.sub(r'<[^>]+>', '', snippet_match.group(1))).strip()
                if href.startswith('/'):
                    href = 'https://www.bing.com' + href
                results.append({"title": title, "href": href, "body": body})
            if results:
                logger.info(f"Bing: {len(results)} results")
                return results
        except Exception as e:
            logger.warning(f"Bing failed: {e}")
        return []

    def search_wikipedia(self, query, max_results=3):
        """Query the MediaWiki search API; return [] on failure."""
        from html import unescape
        try:
            self._rate_limit()
            url = "https://en.wikipedia.org/w/api.php"
            params = {
                "action": "query",
                "list": "search",
                "srsearch": query,
                "format": "json",
                "srlimit": max_results
            }
            response = requests.get(url, params=params, headers=self._get_headers(), timeout=10)
            # Fail fast on HTTP errors, matching search_bing/visit_website.
            response.raise_for_status()
            data = response.json()
            results = []
            for item in data.get("query", {}).get("search", []):
                title = item["title"]
                # Percent-encode the title so '?', '#', '%' and non-ASCII
                # characters produce a valid article URL.
                page_url = "https://en.wikipedia.org/wiki/" + urllib.parse.quote(title.replace(' ', '_'))
                snippet = re.sub(r'<span class="searchmatch">|</span>', '', item["snippet"])
                results.append({"title": title, "href": page_url, "body": unescape(snippet)})
            if results:
                logger.info(f"Wikipedia: {len(results)} results")
                return results
        except Exception as e:
            logger.warning(f"Wikipedia failed: {e}")
        return []

    def visit_website(self, url, timeout=15):
        """Fetch ``url`` and return up to 10000 characters of its visible text.

        Script, style and page-chrome elements are removed before the text is
        extracted and whitespace is collapsed. Returns None on any failure.
        NOTE(review): the URL is fetched as given; callers passing
        user-supplied URLs should validate the target host first.
        """
        try:
            self._rate_limit()
            response = requests.get(url, headers=self._get_headers(), timeout=timeout)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'html.parser')

            for element in soup(["script", "style", "nav", "footer", "header", "aside"]):
                element.decompose()

            text = soup.get_text(separator=' ', strip=True)
            text = re.sub(r'\s+', ' ', text)
            return text[:10000]
        except Exception as e:
            logger.error(f"Failed to visit {url}: {e}")
            return None

    def search(self, query, max_results=5):
        """Run ``query`` through the engines in order and return the first hit set.

        Returns:
            dict with keys ``engine`` (name or "None"), ``results`` (list of
            result dicts with Wikipedia links removed), ``success`` (bool) and
            ``error`` (message or None).
        """
        if not query or not query.strip():
            return {"engine": "None", "results": [], "success": False, "error": "Empty query"}

        # A non-numeric or non-positive limit falls back to a sane value
        # instead of breaking the engines' slicing.
        try:
            max_results = max(1, int(max_results))
        except (TypeError, ValueError):
            max_results = 5

        # Strip first: the prefix pattern is anchored at the start, and a
        # trailing space after a bare prefix ("what is ") must not reduce the
        # query to an empty string.
        clean_query = re.sub(
            r'^(search for|find|look up|what is|who is|where is)\s+',
            '',
            query.strip(),
            flags=re.IGNORECASE,
        ).strip()
        logger.info(f"Searching for: {clean_query}")

        engines = [self.search_duckduckgo, self.search_bing]

        for engine_func in engines:
            engine_name = engine_func.__name__.replace('search_', '').capitalize()
            try:
                results = engine_func(clean_query, max_results)
                if results:
                    filtered_results = [r for r in results if 'wikipedia.org' not in r.get('href', '').lower()]
                    if filtered_results:
                        return {"engine": engine_name, "results": filtered_results, "success": True, "error": None}
            except Exception as e:
                logger.warning(f"Engine {engine_name} crashed: {e}")

        logger.error("All search engines failed or were blocked")
        return {"engine": "None", "results": [], "success": False, "error": "Search unavailable or results blocked. Please try again later."}


# Shared instance used by the routes in this module.
search_manager = SearchManager()
|
|
| |
|
|
| def _kw_in(text, keyword): |
| """Match a keyword as a whole word/phrase, not a raw substring. |
| Plain `in` matching caused false positives like 'photo' firing |
| inside 'photosynthesis', or 'now' firing inside 'know' — both of |
| which made the bot misfire image-gen/search on ordinary questions.""" |
| if " " in keyword: |
| return keyword in text |
| return re.search(rf"\b{re.escape(keyword)}\b", text) is not None |
|
|
|
|
def detect_tools(message):
    """Decide which tools a chat message should trigger.

    Returns a list drawn from "search", "diagram", "image", "video" and
    "file", in that order. An explicit search command short-circuits and
    yields just ``["search"]``.
    """
    text = message.lower().strip()

    def mentions(keywords):
        # True when any keyword matches as a word/phrase (see _kw_in).
        return any(_kw_in(text, kw) for kw in keywords)

    # Explicit commands win outright and suppress every other tool.
    if mentions(["/search", "/news", "/youtube", "search for", "look up", "browse the web for"]):
        return ["search"]

    selected = []

    # Words hinting that the answer needs fresh, real-time information.
    if mentions([
        "latest", "news", "today", "current", "weather", "stock", "price",
        "now", "recent", "yesterday", "tonight", "2024", "2025", "2026",
        "score", "game", "match", "live"
    ]):
        selected.append("search")

    diagram_terms = [
        "diagram", "mind map", "mindmap", "flowchart", "flow chart",
        "architecture diagram", "sequence diagram", "er diagram",
        "entity relationship", "gantt chart", "org chart", "tree diagram",
        "visualize the structure", "show me the steps visually"
    ]
    chart_terms = ["bar chart", "pie chart", "line chart", "plot the data", "graph the numbers", "graph of"]
    wants_diagram = mentions(diagram_terms) or mentions(chart_terms)
    if wants_diagram:
        selected.append("diagram")

    image_terms = ["image", "photo", "paint", "picture", "generate image", "thumbnail"]
    # "draw"/"visualize" only count as image requests when no diagram or
    # chart was asked for.
    if not wants_diagram:
        image_terms = image_terms + ["draw", "visualize"]
    has_image_word = "image" in text
    if mentions(image_terms) or ("create" in text and has_image_word) or ("make" in text and has_image_word):
        selected.append("image")

    has_video_word = "video" in text
    if (
        mentions(["video", "movie", "animation", "generate video", "create video", "make a video"])
        or ("create" in text and has_video_word)
        or ("make" in text and has_video_word)
    ):
        selected.append("video")

    if mentions(["pdf", "document", "file", "analyze file", "read file", "upload"]):
        selected.append("file")

    return selected
|
|
| |
|
|
|
|
def format_search_results(data):
    """Render a search result dict as a plain-text block for the LLM prompt.

    Args:
        data: dict as produced by ``SearchManager.search`` (keys ``success``,
            ``error``, ``engine``, ``results``).

    Returns:
        A "Warning: ..." line when the search failed, a fixed message when
        there are no results, otherwise the numbered sources (each snippet
        whitespace-collapsed and cut to 600 characters) followed by the
        citation protocol instructions.
    """
    if not data.get("success"):
        return f"Warning: {data.get('error', 'Search failed')}"

    hits = data.get("results", [])
    if not hits:
        return "No results found for this query."

    lines = [f"Web Search Results from {data['engine']}:", "-" * 30]
    for number, hit in enumerate(hits, 1):
        heading = hit.get("title", "No title")
        link = hit.get("href", "#")
        snippet = re.sub(r'[\s]+', ' ', hit.get("body", "No description")).strip()[:600]
        lines.extend([
            f"Source [{number}]: {heading}",
            f"URL: {link}",
            f"Content: {snippet}",
            "-" * 15,
        ])

    # Fixed instructions telling the model how to cite the sources above.
    lines.extend([
        "\n### MANDATORY RESEARCH PROTOCOL ###",
        "1. Compose a well-structured answer based on the provided sources.",
        "2. Insert numbered citations [N] immediately after every fact, statistic, or quote.",
        "3. After the response, output a dedicated Reference section formatted exactly like this:",
        "\n---\n**Reference**\n\n[1] [Source Title](URL)\n[2] [Source Title](URL)\n---",
        "\nStrictly adhere to this format. Do not use Wikipedia.",
    ])
    return "\n".join(lines)
|
|
def clean_search_query(query):
    """Lower-case ``query`` and strip leading command/question prefixes.

    Prefixes are tried in list order and each one that matches the start of
    the (already shortened) query is removed, so stacked prefixes such as
    "tell me about what is X" reduce to "X".

    A prefix ending in a letter or digit is only removed when it is a whole
    word: "check the logs" becomes "the logs", but "checkers rules" and
    "what issues exist" are left intact instead of being cut mid-word.

    Args:
        query: raw user query string.

    Returns:
        The cleaned, lower-cased query (possibly empty).
    """
    prefixes = [
        "search for", "find out about", "look up", "check",
        "tell me about", "what is", "who is", "where is",
        "/search", "search:", "find:"
    ]
    query = query.lower().strip()
    for prefix in prefixes:
        if not query.startswith(prefix):
            continue
        remainder = query[len(prefix):]
        # Word-like prefixes need a word boundary after them; punctuation
        # prefixes ("search:") may be followed directly by the query text.
        if prefix[-1].isalnum() and remainder and (remainder[0].isalnum() or remainder[0] == "_"):
            continue
        query = remainder.strip()
    return query
|
|
| |
|
|
def get_time_response(message):
    """Answer a clock/date question locally, or return None.

    A message counts as a time question when it contains one of the fixed
    phrases, or when at least two distinct time-related words appear in it as
    whole words. The reply uses the server's local time.

    Returns:
        A markdown-formatted sentence with the current time and date, or
        None when the message is not a time question.
    """
    lowered = message.lower()

    phrase_hit = any(
        phrase in lowered
        for phrase in ("what time is it", "current time", "what's the date", "whats the date")
    )
    # Count how many different time words occur as standalone words.
    word_hits = sum(
        1
        for word in ("time", "date", "clock", "hour", "minute")
        if re.search(rf"\b{word}\b", lowered)
    )

    if not phrase_hit and word_hits < 2:
        return None

    now = datetime.now()
    return f"The current time is **{now.strftime('%H:%M:%S')}**. Today is **{now.strftime('%A, %Y-%m-%d')}**."
|
|
| |
|
|
@app.route('/api/auth/signup', methods=['POST'])
def signup():
    """Register a new user from a JSON body with email, password and optional username.

    Responses:
        201 on success, 400 when email or password is missing (including a
        missing or non-JSON body), 409 when the username or email is already
        taken, 500 on unexpected database errors.
    """
    # silent=True: a missing or malformed JSON body is treated as empty and
    # answered with the 400 below instead of crashing on None.get().
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # `or ''` also covers explicit nulls; str() guards non-string values.
    username = str(data.get('username') or '').strip()
    email = str(data.get('email') or '').strip()
    password = str(data.get('password') or '').strip()

    if not email or not password:
        return jsonify({"error": "Missing required fields"}), 400

    # Default the username to the local part of the email address.
    if not username:
        username = email.split('@')[0]

    password_hash = pwd_context.hash(password)

    try:
        with get_db_connection() as conn:
            conn.execute(
                'INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)',
                (username, email, password_hash)
            )
            conn.commit()
        return jsonify({"message": "User created successfully"}), 201
    except sqlite3.IntegrityError:
        # UNIQUE constraint on username or email.
        return jsonify({"error": "Username or email already exists"}), 409
    except Exception as e:
        logger.error(f"Signup error: {e}")
        return jsonify({"error": "Internal server error"}), 500
|
|
@app.route('/api/auth/login', methods=['POST'])
def login():
    """Authenticate a user and set the ``nexa_token`` session cookie.

    The JSON field ``email`` may hold either the email address or the
    username. On success a 24-hour HS256 JWT is issued in an HTTP-only
    cookie and the user's public profile is returned.

    Responses:
        200 with the profile, 400 when credentials are missing (including a
        missing or non-JSON body), 401 for an unknown user or wrong password.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    login_id = str(data.get('email') or '').strip()
    password = str(data.get('password') or '').strip()

    if not login_id or not password:
        return jsonify({"error": "Missing credentials"}), 400

    with get_db_connection() as conn:
        user = conn.execute(
            'SELECT * FROM users WHERE email = ? OR username = ?',
            (login_id, login_id)
        ).fetchone()

    # No credential-based account creation here: recreating an admin from a
    # hard-coded username/password pair is a backdoor for anyone who knows
    # the pair. The default admin is bootstrapped by init_db() at startup.
    if user is None:
        logger.warning(f"User not found for login ID: {login_id}")
        return jsonify({"error": "Invalid credentials"}), 401

    logger.info(f"Login attempt for user: {user['username']} (ID: {user['id']})")

    try:
        is_valid = pwd_context.verify(password, user['password_hash'])
    except (ValueError, TypeError) as e:
        # A stored hash in an unknown/corrupt format must read as a failed
        # login, not as a 500.
        logger.error(f"Password hash for user {user['username']} could not be verified: {e}")
        is_valid = False

    if not is_valid:
        logger.warning(f"Invalid password for user: {user['username']}")
        return jsonify({"error": "Invalid credentials"}), 401

    token = jwt.encode({
        'user_id': user['id'],
        'username': user['username'],
        'exp': int(time.time()) + (24 * 3600)
    }, app.config['SECRET_KEY'], algorithm="HS256")

    resp = jsonify({
        "message": "Login successful",
        "user": {
            "id": user['id'],
            "username": user['username'],
            "email": user['email'],
            "avatar": user['avatar']
        }
    })
    # NOTE(review): consider secure=True once the app is served over HTTPS.
    resp.set_cookie('nexa_token', token, httponly=True, samesite='Lax', max_age=24*3600)
    return resp
|
|
@app.route('/api/auth/logout', methods=['POST'])
def logout():
    """Log the client out by overwriting the session cookie with an expired, empty one."""
    response = jsonify({"message": "Logged out successfully"})
    # expires=0 makes the browser discard the cookie immediately.
    response.set_cookie('nexa_token', '', expires=0)
    return response
|
|
@app.route('/api/auth/me', methods=['GET'])
@token_required
def get_me(current_user_id):
    """Return the authenticated user's public profile, or 404 if the row is gone."""
    with get_db_connection() as conn:
        profile = conn.execute(
            'SELECT id, username, email, avatar FROM users WHERE id = ?',
            (current_user_id,),
        ).fetchone()

    # The token can outlive the account, so a missing row is a valid outcome.
    if not profile:
        return jsonify({"error": "User not found"}), 404
    return jsonify(dict(profile))
|
|
| |
|
|
def _save_remote_file(url, dest_path, timeout=60):
    """Stream ``url`` into ``dest_path``; return True only if the download succeeded."""
    with requests.get(url, stream=True, timeout=timeout) as remote:
        if remote.status_code != 200:
            return False
        with open(dest_path, 'wb') as fh:
            # iter_content applies the response's content decoding, which a
            # raw socket copy would skip.
            for chunk in remote.iter_content(chunk_size=8192):
                fh.write(chunk)
    return True


@app.route('/api/video/generate', methods=['POST'])
@limiter.limit("100 per minute")
def generate_video():
    """Unified video generation with fallback (Upsampler -> HF Spaces -> Pollinations).

    Expects a JSON body with ``prompt``. Providers are tried in order and the
    first one that actually produces a file wins:

    1. Upsampler (only when ``UPSAMPLER_API_KEY`` is set).
    2. A list of Hugging Face Spaces via ``gradio_client``.
    3. A Pollinations image URL, flagged with ``is_motion_image: True``.

    Responses:
        200 with ``video_url``/``provider``, 400 for a missing prompt,
        503 when the last fallback fails, 500 on an unexpected error.
    """
    try:
        data = request.json or {}
        prompt = data.get("prompt", "").strip()
        if not prompt:
            return jsonify({"status": "error", "error": "Prompt is required"}), 400

        # Drop a leading "generate/create/make a video about ..." command.
        clean_prompt = re.sub(r'^(generate|create|make)\s+(a\s+)?video\s+(about|of|for)?\s*', '', prompt, flags=re.IGNORECASE).strip()
        logger.info(f"Generating video for: {clean_prompt}")

        video_dir = os.path.join(os.getcwd(), "static", "outputs")
        os.makedirs(video_dir, exist_ok=True)

        # --- Provider 1: Upsampler -------------------------------------
        api_key = os.environ.get("UPSAMPLER_API_KEY")
        if api_key:
            try:
                headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
                body = {"prompt": clean_prompt, "model": "wan-2.2-5b-fast", "width": 1024, "height": 576}
                response = requests.post("https://upsampler.com/api/v1/video/generate", headers=headers, json=body, timeout=60)
                if response.status_code == 200:
                    video_url = response.json().get("video_url")
                    if video_url:
                        video_filename = f"vid_{uuid.uuid4().hex}.mp4"
                        local_path = os.path.join(video_dir, video_filename)
                        if _save_remote_file(video_url, local_path, timeout=60):
                            return jsonify({"status": "success", "video_url": f"/static/outputs/{video_filename}", "is_motion_image": False, "provider": "Upsampler", "prompt": clean_prompt})
            except Exception as e:
                logger.error(f"Upsampler failed: {e}")

        # --- Provider 2: Hugging Face Spaces ---------------------------
        video_spaces = ["THUDM/CogVideoX-5b", "ali-vilas/text-to-video-ms-1.7b", "fffiloni/zeroscope-v2-xl"]
        for space in video_spaces:
            try:
                logger.info(f"🎬 Attempting Video Gen via {space}")
                client = Client(space)
                result = client.predict(clean_prompt, api_name="/predict")
                if isinstance(result, list):
                    result = result[0] if result else None
                if not result:
                    continue
                if not isinstance(result, str):
                    logger.warning(f"Space {space} failed: unexpected result type {type(result).__name__}")
                    continue

                output_filename = f"ai_video_{uuid.uuid4().hex}.mp4"
                output_path = os.path.join(video_dir, output_filename)
                if result.startswith("http"):
                    saved = _save_remote_file(result, output_path, timeout=30)
                elif os.path.exists(result):
                    shutil.copy(result, output_path)
                    saved = True
                else:
                    saved = False

                # Only report success when a file was really written;
                # otherwise the client would receive a dead URL.
                if not saved:
                    logger.warning(f"Space {space} failed: result could not be saved")
                    continue
                return jsonify({"status": "success", "video_url": f"/static/outputs/{output_filename}", "is_motion_image": False, "provider": space, "prompt": clean_prompt})
            except Exception as e:
                logger.warning(f"Space {space} failed: {e}")

        # --- Provider 3: Pollinations still image presented as motion ---
        try:
            logger.info(f"🚀 Falling back to Pollinations Motion")
            seed = int(time.time())
            motion_url = f"https://image.pollinations.ai/prompt/{requests.utils.quote(clean_prompt)}?seed={seed}&nologo=true&width=1024&height=576"
            return jsonify({"status": "success", "video_url": motion_url, "is_motion_image": True, "provider": "Pollinations AI (Motion)", "prompt": clean_prompt})
        except Exception as e:
            logger.error(f"❌ All video methods failed: {e}")
            return jsonify({"status": "error", "error": "Our video generation servers are temporarily unavailable."}), 503

    except Exception as e:
        logger.error(f"Video generation crash: {e}")
        return jsonify({"status": "error", "error": str(e)}), 500
|
|
@app.route('/api/video/overlay-url', methods=['POST'])
@limiter.limit("100 per minute")
def overlay_video_url():
    """Burn a text overlay onto a video referenced by URL.

    JSON body: ``video_url`` (an app-relative path starting with '/' or an
    http(s) URL), ``text``, and optional ``pos``, ``font_size``, ``color``,
    ``bg_opacity``. The source is copied or downloaded into a temporary
    input file, handed to ``process_video_overlay`` and the temporary file is
    removed afterwards whether or not processing succeeded.

    Responses:
        200 with the new ``video_url``; 400 for missing or invalid input or
        an unfetchable source; 404 when an app-relative source does not exist
        inside the static directory; 500 when processing fails.
    """
    input_path = None
    try:
        data = request.json or {}
        video_url = str(data.get("video_url") or "").strip()
        text = str(data.get("text") or "").strip()
        pos = data.get("pos", "Bottom Center")
        color = data.get("color", "#FFFFFF")
        try:
            font_size = int(data.get("font_size", 60))
            bg_opacity = float(data.get("bg_opacity", 0.5))
        except (TypeError, ValueError):
            # Bad client input is a 400, not a server error.
            return jsonify({"status": "error", "error": "font_size and bg_opacity must be numeric"}), 400

        if not video_url or not text:
            return jsonify({"status": "error", "error": "Video URL and text are required"}), 400

        video_dir = os.environ.get("VIDEO_TEMP_DIR", "./temp/videos")
        os.makedirs(video_dir, exist_ok=True)

        candidate_path = os.path.join(video_dir, f"input_{uuid.uuid4()}.mp4")

        if video_url.startswith('/'):
            # App-relative source. Resolve it and refuse anything outside
            # <cwd>/static so '/../..' sequences cannot reach arbitrary files.
            static_root = os.path.realpath(os.path.join(os.getcwd(), "static"))
            local_input = os.path.realpath(os.path.join(os.getcwd(), video_url.lstrip('/')))
            inside_static = local_input.startswith(static_root + os.sep)
            if not inside_static or not os.path.isfile(local_input):
                return jsonify({"status": "error", "error": "Internal video source not found"}), 404
            input_path = candidate_path
            shutil.copy(local_input, input_path)
        else:
            if not video_url.lower().startswith(("http://", "https://")):
                return jsonify({"status": "error", "error": "Failed to fetch source video"}), 400
            # NOTE(review): the host is not restricted; consider an
            # allow-list if this endpoint is exposed to untrusted clients.
            with requests.get(video_url, stream=True, timeout=30) as v_res:
                if v_res.status_code != 200:
                    return jsonify({"status": "error", "error": "Failed to fetch source video"}), 400
                input_path = candidate_path
                with open(input_path, 'wb') as f:
                    for chunk in v_res.iter_content(8192):
                        f.write(chunk)

        output_filename = process_video_overlay(
            input_path=input_path,
            overlay_text=text,
            position=pos,
            font_size=font_size,
            text_color=color,
            bg_opacity=bg_opacity
        )

        if not output_filename:
            return jsonify({"status": "error", "error": "MoviePy processing failed"}), 500
        return jsonify({
            "status": "success",
            "video_url": f"/static/outputs/{output_filename}"
        })

    except Exception as e:
        logger.error(f"Overlay error: {e}")
        return jsonify({"status": "error", "error": str(e)}), 500
    finally:
        # Always drop the temporary input, also when processing failed.
        if input_path and os.path.exists(input_path):
            try:
                os.remove(input_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temp video {input_path}: {cleanup_error}")
|
|
@app.route('/api/conversations', methods=['GET'])
@token_required
def get_conversations(current_user_id):
    """List the caller's conversations, pinned ones first, then most recently updated."""
    query = 'SELECT * FROM conversations WHERE user_id = ? ORDER BY pinned DESC, updated_at DESC'
    with get_db_connection() as conn:
        rows = conn.execute(query, (current_user_id,)).fetchall()
    # sqlite3.Row objects are not JSON-serialisable; convert each to a dict.
    return jsonify(list(map(dict, rows)))
|
|
@app.route('/api/conversations', methods=['POST'])
@token_required
def create_conversation(current_user_id):
    """Create a conversation for the caller and return the stored row (201).

    Optional JSON fields: ``title`` (default "New Chat") and ``model``
    (default "phi3:mini"). A missing or non-JSON body uses the defaults.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # `or` also replaces an explicit null/empty title, which would otherwise
    # violate the NOT NULL constraint on conversations.title.
    title = data.get('title') or 'New Chat'
    model = data.get('model') or 'phi3:mini'

    with get_db_connection() as conn:
        cursor = conn.execute(
            'INSERT INTO conversations (user_id, title, model) VALUES (?, ?, ?)',
            (current_user_id, title, model)
        )
        conn.commit()
        conv_id = cursor.lastrowid
        # Re-read the row so the response includes DB-generated columns.
        conv = conn.execute('SELECT * FROM conversations WHERE id = ?', (conv_id,)).fetchone()
    return jsonify(dict(conv)), 201
|
|
@app.route('/api/messages', methods=['POST'])
@token_required
def save_message(current_user_id):
    """Append a message to one of the caller's conversations.

    JSON body: ``conversation_id``, ``content`` and optional ``role``
    (default "user"). The conversation's ``updated_at`` is refreshed so the
    conversation list, which sorts on that column, reflects recent activity.

    Responses:
        201 on success, 400 when a field is missing (including a missing or
        non-JSON body), 404 when the conversation does not belong to the
        caller.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    conv_id = data.get('conversation_id')
    role = data.get('role') or 'user'
    content = data.get('content')

    if not conv_id or not content:
        return jsonify({"error": "Missing fields"}), 400

    with get_db_connection() as conn:
        # Ownership check: the conversation must belong to the caller.
        conv = conn.execute('SELECT * FROM conversations WHERE id = ? AND user_id = ?', (conv_id, current_user_id)).fetchone()
        if not conv:
            return jsonify({"error": "Conversation not found"}), 404

        conn.execute(
            'INSERT INTO messages (conversation_id, role, content) VALUES (?, ?, ?)',
            (conv_id, role, content)
        )
        conn.execute(
            'UPDATE conversations SET updated_at = CURRENT_TIMESTAMP WHERE id = ?',
            (conv_id,)
        )
        conn.commit()
    return jsonify({"status": "success"}), 201
|
|
@app.route('/api/conversations/<int:conv_id>', methods=['GET'])
@token_required
def get_conversation_messages(current_user_id, conv_id):
    """Return one of the caller's conversations together with its messages.

    Messages are ordered oldest first. ``CURRENT_TIMESTAMP`` has one-second
    resolution, so the autoincrement ``id`` is used as a tie-breaker to keep
    messages stored within the same second in insertion order.

    Responses:
        200 with ``conversation`` and ``messages``; 404 when the
        conversation does not exist or belongs to another user.
    """
    with get_db_connection() as conn:
        conv = conn.execute('SELECT * FROM conversations WHERE id = ? AND user_id = ?', (conv_id, current_user_id)).fetchone()
        if not conv:
            return jsonify({"error": "Conversation not found"}), 404

        messages = conn.execute(
            'SELECT * FROM messages WHERE conversation_id = ? ORDER BY timestamp ASC, id ASC',
            (conv_id,)
        ).fetchall()
    return jsonify({
        "conversation": dict(conv),
        "messages": [dict(m) for m in messages]
    })
|
|
@app.route('/api/conversations/<int:conv_id>', methods=['PUT'])
@token_required
def update_conversation(current_user_id, conv_id):
    """Rename and/or (un)pin one of the caller's conversations.

    Optional JSON fields: ``title`` (non-empty string) and ``pinned``
    (boolean or 0/1). Fields that are absent or null are left unchanged.

    Responses:
        200 with the updated row; 400 for an invalid ``title`` or ``pinned``
        value; 404 when the conversation does not belong to the caller.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    title = data.get('title')
    pinned = data.get('pinned')

    # Validate before touching the database so bad input cannot surface as a
    # constraint or binding error (HTTP 500).
    if title is not None and (not isinstance(title, str) or not title.strip()):
        return jsonify({"error": "Invalid title"}), 400
    if pinned is not None and not isinstance(pinned, (bool, int)):
        return jsonify({"error": "Invalid pinned value"}), 400

    with get_db_connection() as conn:
        conv = conn.execute('SELECT * FROM conversations WHERE id = ? AND user_id = ?', (conv_id, current_user_id)).fetchone()
        if not conv:
            return jsonify({"error": "Conversation not found"}), 404

        if title is not None:
            conn.execute('UPDATE conversations SET title = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?', (title, conv_id))
        if pinned is not None:
            # Store a plain 0/1 flag regardless of bool/int input.
            conn.execute('UPDATE conversations SET pinned = ? WHERE id = ?', (int(bool(pinned)), conv_id))
        conn.commit()

        updated = conn.execute('SELECT * FROM conversations WHERE id = ?', (conv_id,)).fetchone()
    return jsonify(dict(updated))
|
|
@app.route('/api/conversations/<int:conv_id>', methods=['DELETE'])
@token_required
def delete_conversation(current_user_id, conv_id):
    """Delete one of the caller's conversations along with all of its messages."""
    with get_db_connection() as conn:
        owned = conn.execute(
            'SELECT * FROM conversations WHERE id = ? AND user_id = ?',
            (conv_id, current_user_id),
        ).fetchone()
        if not owned:
            return jsonify({"error": "Conversation not found"}), 404

        # Child rows go first, then the conversation itself.
        for statement in (
            'DELETE FROM messages WHERE conversation_id = ?',
            'DELETE FROM conversations WHERE id = ?',
        ):
            conn.execute(statement, (conv_id,))
        conn.commit()
    return jsonify({"message": "Conversation deleted"})
|
|
| |
|
|
@app.route('/')
@app.route('/chat')
@app.route('/login')
@app.route('/signup')
@app.route('/pricing')
def home(path=None):
    """Serve ``index.html`` for every front-end route.

    The file is read from the working directory on each request. If it
    cannot be read, a plain-text status line is returned instead.
    """
    try:
        with open('index.html', 'r', encoding='utf-8') as page:
            markup = page.read()
        return Response(markup, mimetype='text/html')
    except Exception as e:
        logger.error(f"Home route failed: {e}")
        return "Nexa AI Backend is running."
|
|
@app.route('/api/search', methods=['POST'])
@limiter.limit("200 per minute")
def search_api():
    """Run a web search and return raw plus prompt-formatted results.

    JSON body: ``query`` (required) and ``max_results`` (optional; coerced
    to an integer and clamped to 1..20, default 5).

    Responses:
        200 with ``success``, ``engine``, ``results``, ``formatted`` and
        ``result_count``; 400 when no query is given; 500 on unexpected
        errors.
    """
    try:
        data = request.json or {}
        if not isinstance(data, dict):
            data = {}
        query = str(data.get("query") or "").strip()
        if not query:
            return jsonify({"error": "No query provided", "success": False}), 400

        # Client-supplied limit: a string or an absurdly large value must not
        # reach the engines' slicing / request parameters unchecked.
        try:
            max_results = int(data.get("max_results", 5))
        except (TypeError, ValueError):
            max_results = 5
        max_results = max(1, min(max_results, 20))

        search_data = search_manager.search(query, max_results)
        return jsonify({
            "success": search_data["success"],
            "query": query,
            "engine": search_data["engine"],
            "results": search_data["results"],
            "formatted": format_search_results(search_data),
            "result_count": len(search_data["results"])
        })
    except Exception as e:
        logger.error(f"Search API error: {e}")
        return jsonify({"error": "Search service temporarily unavailable", "success": False}), 500
|
|
def _chat_user_id_from_cookie():
    """Return the user id carried by the ``nexa_token`` cookie, or None if absent/invalid."""
    token = request.cookies.get('nexa_token')
    if not token:
        return None
    try:
        claims = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
        return claims['user_id']
    except (jwt.InvalidTokenError, KeyError, TypeError):
        # An expired, forged or malformed token simply means "anonymous".
        return None


def _chat_user_owns_conversation(user_id, conv_id):
    """Return True when conversation ``conv_id`` belongs to ``user_id``."""
    try:
        conn = get_db_connection()
        try:
            row = conn.execute(
                'SELECT 1 FROM conversations WHERE id = ? AND user_id = ?',
                (conv_id, user_id),
            ).fetchone()
        finally:
            conn.close()
        return row is not None
    except Exception as e:
        logger.error(f"Failed to verify conversation ownership: {e}")
        return False


def _chat_store_message(conv_id, role, content):
    """Best-effort insert of one chat message; failures are logged, never raised."""
    try:
        conn = get_db_connection()
        try:
            with conn:  # commit on success, rollback on error
                conn.execute(
                    'INSERT INTO messages (conversation_id, role, content) VALUES (?, ?, ?)',
                    (conv_id, role, content)
                )
        finally:
            conn.close()
    except Exception as e:
        logger.error(f"Failed to save {role} message: {e}")


@app.route('/api/chat', methods=['POST'])
@limiter.limit("500 per minute")
def chat():
    """Answer a chat message, streaming the reply as newline-delimited JSON.

    Request JSON fields read here: ``message`` (required), ``history``,
    ``memory``, ``attachments``, ``mode``, ``deep_research`` and
    ``conversation_id``. A ``model`` field may be sent but is not used: the
    reply always comes from Groq when ``GROQ_API_KEY`` is set, falling back
    to the local ``llm_primary`` engine.

    Special cases answered with a single JSON object instead of a stream:
      * planner mode (message contains the "Format the response as a valid
        JSON array" marker),
      * the literal message ``status_check``,
      * messages for which ``get_time_response`` returns a reply.

    Messages are persisted only when the ``nexa_token`` cookie identifies a
    user AND that user owns ``conversation_id``; persistence is best-effort
    and never affects the reply.

    Returns:
        An ``application/x-ndjson`` streaming response, a JSON object for the
        special cases, 400 when no message is given, or 500 on an unexpected
        failure.
    """
    try:
        current_user_id = _chat_user_id_from_cookie()

        data = request.json or {}
        if not isinstance(data, dict):
            data = {}

        # Tolerate wrongly typed fields instead of crashing into a 500.
        raw_message = data.get("message", "")
        message = raw_message.strip() if isinstance(raw_message, str) else ""
        raw_history = data.get("history", [])
        history = [h for h in raw_history if isinstance(h, dict)] if isinstance(raw_history, list) else []
        memory = data.get("memory", {})
        raw_attachments = data.get("attachments", "")
        attachments = raw_attachments.strip() if isinstance(raw_attachments, str) else ""
        mode = data.get("mode", "instant")
        if not isinstance(mode, str):
            mode = "instant"
        is_deep_research = data.get("deep_research", False)
        conv_id = data.get("conversation_id")

        if not message:
            return jsonify({"error": "No message"}), 400

        current_date = datetime.now().strftime("%Y-%m-%d")
        dynamic_system_prompt = SYSTEM_PROMPT.format(date=current_date)

        # Planner mode: the caller needs one complete JSON document, so answer
        # without streaming.
        if 'Format the response as a valid JSON array' in message:
            logger.info("Planner Mode detected: Switching to non-streaming response")
            full_prompt = f"{dynamic_system_prompt}\n\nUSER QUESTION: {message}"
            planner_response = ""
            groq_key = os.environ.get("GROQ_API_KEY")

            if groq_key:
                try:
                    client = groq.Groq(api_key=groq_key)
                    resp = client.chat.completions.create(
                        model="llama-3.3-70b-versatile",
                        messages=[{"role": "user", "content": full_prompt}],
                        stream=False
                    )
                    planner_response = resp.choices[0].message.content
                except Exception as e:
                    logger.warning(f"Groq planner mode failed: {e}")

            if not planner_response and llm_primary:
                output = llm_primary(
                    f"<|system|>\n{dynamic_system_prompt}<|end|>\n<|user|>\n{message}<|end|>\n<|assistant|>\n",
                    max_tokens=1024,
                    stop=["<|end|>", "<|user|>"]
                )
                planner_response = output["choices"][0]["text"]

            return jsonify({"response": planner_response, "status": "success"})

        if message == "status_check":
            return jsonify({"status": "Online", "version": "1.0.0"}), 200

        # Only persist into conversations the caller actually owns; otherwise
        # any logged-in user could write into someone else's conversation.
        persist = bool(current_user_id and conv_id) and _chat_user_owns_conversation(current_user_id, conv_id)
        if current_user_id and conv_id and not persist:
            logger.warning("Chat message not persisted: conversation is not owned by the current user")

        if persist:
            _chat_store_message(conv_id, 'user', message)

        time_reply = get_time_response(message)
        if time_reply:
            if persist:
                _chat_store_message(conv_id, 'assistant', time_reply)
            return jsonify({"response": time_reply, "provider": "TimeBot", "status": "Online"})

        tools_needed = detect_tools(message)
        tool_context = ""
        search_data_for_frontend = None

        if "search" in tools_needed:
            search_query = clean_search_query(message)
            search_data = search_manager.search(search_query, max_results=5)
            if search_data["success"]:
                deep_context = ""
                if is_deep_research or mode == "research":
                    logger.info(f"Deep Research: Visiting top 3 sources for {search_query}")
                    top_sources = search_data["results"][:3]
                    for i, source in enumerate(top_sources, 1):
                        url = source.get("href")
                        if url:
                            content = search_manager.visit_website(url)
                            if content:
                                deep_context += f"\n\n[EXTRACTED FROM SOURCE {i} ({url})]:\n{content[:3000]}\n"

                tool_context += f"\n\n[Web Search Results from {search_data['engine']}]:\n{format_search_results(search_data)}"
                if deep_context:
                    tool_context += f"\n\n### DEEP RESEARCH DATA ###{deep_context}"

                search_data_for_frontend = {
                    "engine": search_data["engine"],
                    "results": search_data["results"]
                }
            else:
                tool_context += f"\n\n[Search Status]: {search_data.get('error', 'Search temporarily unavailable')}"

        if is_deep_research:
            dynamic_system_prompt += "\n[DEEP RESEARCH ACTIVE]: Use a multi-step reasoning approach. Analyze thoroughly."

        if "diagram" in tools_needed:
            dynamic_system_prompt += (
                "\n[DIAGRAM MODE]: The user wants a visual diagram, mind map, or chart. "
                "Respond with a single valid Mermaid.js diagram inside a fenced code block "
                "labeled mermaid, e.g. ```mermaid ... ```. Choose whichever Mermaid diagram "
                "type best fits the content (mindmap, flowchart TD, graph TD, sequenceDiagram, "
                "classDiagram, gantt, pie, etc.). Keep node labels short and the syntax strictly "
                "valid so it renders without errors. Follow the diagram with 2-4 sentences of "
                "plain-language explanation, not a restatement of every node."
            )

        mode_instr = {
            "instant": "Provide a direct, zero-fluff answer.",
            "study": "Explain like a tutor, use simple analogies.",
            "dev": "Provide clean, production-ready code with minimal explanation.",
            "research": "Provide a deep, structured technical report.",
            "creative": "Write with evocative, artistic language."
        }
        dynamic_system_prompt += f"\n[Task Instruction]: {mode_instr.get(mode, 'Answer directly.')}"

        if isinstance(memory, dict) and memory:
            sanitized_memory = {}
            for k, v in memory.items():
                # Drop keys that could masquerade as prompt directives.
                if k.lower() in ["role", "system", "instruction"]:
                    continue
                # Strip bracket characters used by the prompt's own section
                # markers and cap the length of each remembered value.
                clean_val = str(v).replace("[", "").replace("]", "").replace("<", "").replace(">", "")[:100]
                sanitized_memory[k] = clean_val
            if sanitized_memory:
                dynamic_system_prompt += f"\n[User Memory]: {json.dumps(sanitized_memory)}"

        full_prompt = ""
        if attachments:
            full_prompt += f"{attachments}\n\n"
        if tool_context:
            full_prompt += f"{tool_context}\n\n"
        full_prompt += f"USER QUESTION: {message}"

        def generate():
            """Yield NDJSON lines: one metadata object, then one object per token."""
            try:
                initial_payload = {"tools_used": tools_needed}
                if search_data_for_frontend:
                    initial_payload["search_results"] = search_data_for_frontend
                yield json.dumps(initial_payload) + "\n"

                full_ai_response = ""
                groq_key = os.environ.get("GROQ_API_KEY")

                if groq_key:
                    groq_succeeded = False
                    try:
                        logger.info("Attempting response via Groq API...")
                        client = groq.Groq(api_key=groq_key)

                        messages = [{"role": "system", "content": dynamic_system_prompt}]
                        for h in history[-10:]:
                            role = "assistant" if h.get('role') == 'ai' else "user"
                            messages.append({"role": role, "content": str(h.get('content') or '')})
                        messages.append({"role": "user", "content": full_prompt})

                        completion = client.chat.completions.create(
                            model="llama-3.3-70b-versatile",
                            messages=messages,
                            stream=True,
                            max_tokens=2048,
                            temperature=0.7
                        )

                        for chunk in completion:
                            token = chunk.choices[0].delta.content or ""
                            if token:
                                full_ai_response += token
                                yield json.dumps({"token": token, "source": "groq", "provider": "Groq (Llama 3.3)", "tools_used": tools_needed}) + "\n"
                        groq_succeeded = True
                    except Exception as e:
                        logger.warning(f"Groq API failed: {e}. Falling back to local Phi-3-mini engine...")

                    if groq_succeeded:
                        # Persisting happens outside the try above so a database
                        # error can no longer be mistaken for a Groq failure and
                        # trigger a second, duplicate generation.
                        if persist and full_ai_response:
                            _chat_store_message(conv_id, 'assistant', full_ai_response)
                        return

                    # Discard any partial Groq text so the stored reply is only
                    # what the fallback engine produces.
                    full_ai_response = ""

                if llm_primary:
                    try:
                        trimmed_history = history[-8:]

                        llama_prompt = f"<|system|>\n{dynamic_system_prompt}<|end|>\n"
                        for h in trimmed_history:
                            role = "assistant" if h.get('role') == 'ai' else "user"
                            llama_prompt += f"<|{role}|>\n{str(h.get('content') or '')[:800]}<|end|>\n"

                        llama_prompt += f"<|user|>\n{full_prompt[:2000]}<|end|>\n<|assistant|>\n"

                        logger.info("Generating response using local Phi-3-mini engine...")
                        output = llm_primary(
                            llama_prompt,
                            max_tokens=1024,
                            stop=["<|end|>", "<|user|>", "<|system|>", "User:", "Assistant:"],
                            stream=True,
                            repeat_penalty=1.1,
                            temperature=0.7
                        )

                        for chunk in output:
                            if "choices" in chunk and len(chunk["choices"]) > 0:
                                token = chunk["choices"][0].get("text", "")
                                if token:
                                    full_ai_response += token
                                    yield json.dumps({"token": token, "source": "phi3-mini", "provider": "Phi-3-mini (Local Fallback)", "tools_used": tools_needed}) + "\n"

                        if persist and full_ai_response:
                            _chat_store_message(conv_id, 'assistant', full_ai_response)
                        return
                    except Exception as hf_err:
                        logger.error(f"Phi-3-mini Primary crashed: {hf_err}")
                        yield json.dumps({"error": "AI service encountered an error."}) + "\n"
                else:
                    yield json.dumps({"error": "No AI engine available."}) + "\n"
            except Exception as gen_err:
                logger.error(f"Generator error: {gen_err}")
                yield json.dumps({"error": f"Internal system error: {str(gen_err)}"}) + "\n"

        return Response(generate(), mimetype='application/x-ndjson')
    except Exception as e:
        logger.error(f"Chat route crashed: {e}")
        return jsonify({"error": "Fatal system error. Please refresh and try again."}), 500
|
|
|
|
@app.route('/api/image', methods=['POST'])
@limiter.limit("100 per minute")
def generate_image_api():
    """Build a Pollinations image URL for a prompt and confirm it is reachable.

    Request JSON:
        prompt (str): required text prompt.
        width, height (int, optional): image size in pixels, default 1024.
            Non-integer values fall back to 1024; values are clamped to
            64..2048 so nothing but a number can reach the URL.

    Returns:
        200 with ``image_url`` when the upstream service answers 200;
        400 when no prompt is given; 503 when the upstream is busy or
        unreachable; 500 on an unexpected failure.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    raw_prompt = data.get("prompt", "")
    prompt = raw_prompt.strip() if isinstance(raw_prompt, str) else ""
    if not prompt:
        return jsonify({"error": "No prompt provided"}), 400

    def _dimension(key):
        """Read an image dimension from the request as a bounded integer."""
        try:
            value = int(data.get(key, 1024))
        except (TypeError, ValueError):
            return 1024
        return min(max(value, 64), 2048)

    try:
        # A fresh random seed makes repeated identical prompts yield new images.
        seed = random.randint(0, 2147483647)
        width = _dimension("width")
        height = _dimension("height")

        image_url = f"https://image.pollinations.ai/prompt/{requests.utils.quote(prompt)}?width={width}&height={height}&seed={seed}&nologo=true"

        try:
            # stream=True: only the status line/headers are needed for the
            # check, so the image body is not downloaded unless inspected.
            with requests.get(image_url, timeout=10, stream=True) as check:
                if check.status_code == 200:
                    return jsonify({"status": "success", "image_url": image_url, "prompt": prompt})
                if check.status_code == 402 or "Queue full" in check.text:
                    logger.warning("Pollinations rate limited. Falling back to static image generation...")
                else:
                    logger.error(f"Pollinations error {check.status_code}: {check.text}")
        except requests.RequestException as e:
            logger.error(f"Pollinations check failed: {e}")

        # No alternative generator exists here: report the service as busy.
        return jsonify({
            "error": "Image generation service is temporarily busy. Please wait 10 seconds and try again.",
            "status": "error"
        }), 503

    except Exception as e:
        logger.error(f"Image generation failed: {e}")
        return jsonify({"error": "Failed to generate image. Please try again later."}), 500
|
|
| |
|
|
def _overlay_hex_to_rgb(hex_color):
    """Convert ``#RRGGBB`` or ``#RGB`` (leading ``#`` optional) to an (r, g, b) tuple."""
    digits = str(hex_color).strip().lstrip('#')
    if len(digits) == 3:
        # Expand CSS shorthand: "0fa" -> "00ffaa".
        digits = "".join(ch * 2 for ch in digits)
    if len(digits) != 6:
        raise ValueError(f"Invalid hex color: {hex_color!r}")
    return tuple(int(digits[i:i + 2], 16) for i in (0, 2, 4))


def process_video_overlay(
    input_path,
    overlay_text,
    position="Bottom Center",
    font_size=60,
    text_color="#FFFFFF",
    stroke_width=2,
    stroke_color="#000000",
    bg_color="#000000",
    bg_opacity=0.5,
    start_time=0,
    end_time=None
):
    """Render a text caption over a video using MoviePy compositing.

    Args:
        input_path: path of the source video file.
        overlay_text: caption text.
        position: one of the nine named anchors ("Top Left" … "Bottom Right");
            unknown names fall back to bottom-center.
        font_size: caption font size.
        text_color: caption colour.
        stroke_width: outline width of the caption.
        stroke_color: outline colour of the caption.
        bg_color: hex colour (``#RRGGBB`` or ``#RGB``) of the box drawn
            behind the caption.
        bg_opacity: opacity of that box; 0 or less disables it.
        start_time: second at which the caption appears (negative values are
            treated as 0).
        end_time: second at which the caption disappears; ``None`` or a value
            past the end of the video means "until the end".

    Returns:
        The file name (inside ``OUTPUT_DIR``) of the rendered ``.mp4``, or
        ``None`` if anything fails; the failure is logged.
    """
    clip = txt_clip = bg_clip = final_clip = None
    try:
        from moviepy import VideoFileClip, TextClip, CompositeVideoClip, ColorClip

        clip = VideoFileClip(input_path)
        duration = clip.duration
        if end_time is None or end_time > duration:
            end_time = duration
        start_time = max(0, start_time)
        if start_time >= end_time:
            raise ValueError(f"start_time ({start_time}) must be before end_time ({end_time})")

        pos_map = {
            "Top Left": ("left", "top"), "Top Center": ("center", "top"), "Top Right": ("right", "top"),
            "Center Left": ("left", "center"), "Center": ("center", "center"), "Center Right": ("right", "center"),
            "Bottom Left": ("left", "bottom"), "Bottom Center": ("center", "bottom"), "Bottom Right": ("right", "bottom")
        }
        actual_pos = pos_map.get(position, ("center", "bottom"))

        txt_clip = TextClip(
            text=overlay_text,
            font_size=font_size,
            color=text_color,
            font="Arial",
            method="caption",
            # Pixel sizes must be whole numbers; wrap the caption at 80% of the frame width.
            size=(int(clip.w * 0.8), None),
            stroke_color=stroke_color,
            stroke_width=stroke_width
        ).with_start(start_time).with_end(end_time).with_position(actual_pos)

        # Layer order matters: video, then the background box, then the text on top.
        clips_to_composite = [clip]
        if bg_opacity > 0:
            bg_clip = ColorClip(
                size=(txt_clip.w + 20, txt_clip.h + 20),
                color=_overlay_hex_to_rgb(bg_color)
            ).with_opacity(bg_opacity).with_start(start_time).with_end(end_time).with_position(actual_pos)
            clips_to_composite.append(bg_clip)

        clips_to_composite.append(txt_clip)

        final_clip = CompositeVideoClip(clips_to_composite)

        output_filename = f"overlay_{uuid.uuid4().hex}.mp4"
        output_path = os.path.join(OUTPUT_DIR, output_filename)

        # A per-call temp audio file: a fixed name would be clobbered when two
        # renders run at the same time.
        temp_audio_path = os.path.join(tempfile.gettempdir(), f"overlay-audio-{uuid.uuid4().hex}.m4a")

        final_clip.write_videofile(
            output_path,
            codec="libx264",
            audio_codec="aac",
            temp_audiofile=temp_audio_path,
            remove_temp=True,
            threads=4
        )

        return output_filename
    except Exception as e:
        logger.error(f"MoviePy overlay failed: {e}")
        return None
    finally:
        # Release readers/ffmpeg handles on every path, including failures.
        for opened in (final_clip, txt_clip, bg_clip, clip):
            if opened is None:
                continue
            try:
                opened.close()
            except Exception as close_err:
                logger.debug(f"Ignoring error while closing clip: {close_err}")
|
|
| |
|
|
@app.route('/api/nexus/mermaid', methods=['POST'])
def nexus_mermaid():
    """Generate indentation-based Mermaid syntax from a nested structure.

    Request JSON:
        type (str, optional): Mermaid diagram keyword placed on the first
            line; default ``"mindmap"``.
        structure (dict | list, optional): tree to render. Dict keys become
            nodes whose values are rendered one level deeper; list items are
            rendered as siblings; any other non-empty value is a leaf node.

    Returns:
        200 with ``syntax`` (the Mermaid text) and ``type``; 500 with the
        error text if rendering fails.
    """
    try:
        data = request.json or {}
        diagram_type = data.get("type", "mindmap")
        structure = data.get("structure", {})

        def build_mermaid(obj, indent="  "):
            """Return the Mermaid lines for ``obj``, each prefixed with ``indent``."""
            if isinstance(obj, dict):
                return "".join(
                    f"{indent}{key}\n" + build_mermaid(val, indent + "  ")
                    for key, val in obj.items()
                )
            if isinstance(obj, list):
                # Recurse so dicts/lists nested inside a list are expanded
                # instead of being printed as their Python repr.
                return "".join(build_mermaid(item, indent) for item in obj)
            if obj is None or obj == "":
                return ""
            # Scalar leaf (e.g. {"Root": "child"}): keep it as a node rather
            # than silently dropping it.
            return f"{indent}{obj}\n"

        mermaid_code = f"{diagram_type}\n" + build_mermaid(structure)

        return jsonify({
            "status": "success",
            "syntax": mermaid_code,
            "type": diagram_type
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
|
|
@app.route('/api/nexus/chart', methods=['POST'])
def nexus_chart():
    """Wrap the posted series in a Chart.js configuration object.

    Reads ``type`` (default "bar"), ``data``, ``labels`` and ``title``
    (default "Nexa Analytics") from the request JSON and returns them inside
    a single-dataset ``chart_config``.
    """
    try:
        body = request.json or {}
        heading = body.get("title", "Nexa Analytics")

        # One dataset, styled with the fixed green palette.
        dataset = {
            "label": heading,
            "data": body.get("data", []),
            "backgroundColor": "rgba(16, 163, 127, 0.2)",
            "borderColor": "rgba(16, 163, 127, 1)",
            "borderWidth": 1
        }
        display_options = {
            "responsive": True,
            "plugins": {
                "legend": {"position": "top"},
                "title": {"display": True, "text": heading}
            }
        }
        chart_config = {
            "type": body.get("type", "bar"),
            "data": {"labels": body.get("labels", []), "datasets": [dataset]},
            "options": display_options
        }
        return jsonify({"status": "success", "chart_config": chart_config})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
|
|
|
|
@app.route('/api/video/overlay', methods=['POST'])
def video_overlay_api():
    """Add a text overlay to a previously generated video.

    Request JSON:
        video_url (str): must start with ``/static/outputs/`` and resolve to
            a file inside ``OUTPUT_DIR``.
        text (str): caption text; required.
        position, font_size, text_color, stroke_width, stroke_color,
        bg_color, bg_opacity, start_time, end_time: optional styling/timing
            values forwarded to ``process_video_overlay``.

    Returns:
        200 with the new ``video_url``; 400 for missing/invalid input;
        404 when the source video does not exist; 500 when processing fails.
    """
    try:
        data = request.json or {}
        video_url = data.get("video_url")
        raw_text = data.get("text", "")
        text = raw_text.strip() if isinstance(raw_text, str) else ""

        if not video_url or not text:
            return jsonify({"error": "Missing video_url or text"}), 400

        url_prefix = "/static/outputs/"
        if not isinstance(video_url, str) or not video_url.startswith(url_prefix):
            return jsonify({"error": "Invalid video source"}), 400

        # Resolve the path and make sure it is still inside OUTPUT_DIR, so
        # "../" segments or absolute paths cannot reach other files.
        output_root = os.path.realpath(OUTPUT_DIR)
        local_path = os.path.realpath(os.path.join(output_root, video_url[len(url_prefix):]))
        try:
            inside_root = os.path.commonpath([output_root, local_path]) == output_root
        except ValueError:
            # Raised e.g. when the two paths are on different drives.
            inside_root = False
        if not inside_root:
            return jsonify({"error": "Invalid video source"}), 400

        if not os.path.isfile(local_path):
            return jsonify({"error": "Video file not found"}), 404

        try:
            raw_end = data.get("end_time", 9999)
            params = {
                "overlay_text": text,
                "position": data.get("position", "Bottom Center"),
                "font_size": int(data.get("font_size", 60)),
                "text_color": data.get("text_color", "#FFFFFF"),
                "stroke_width": int(data.get("stroke_width", 2)),
                "stroke_color": data.get("stroke_color", "#000000"),
                "bg_color": data.get("bg_color", "#000000"),
                "bg_opacity": float(data.get("bg_opacity", 0.5)),
                "start_time": float(data.get("start_time", 0)),
                # None means "until the end of the video" downstream.
                "end_time": None if raw_end is None else float(raw_end)
            }
        except (TypeError, ValueError):
            return jsonify({"error": "Invalid overlay parameters"}), 400

        output_file = process_video_overlay(local_path, **params)

        if output_file:
            return jsonify({
                "status": "success",
                "video_url": f"/static/outputs/{output_file}",
                "message": "Text overlay added successfully"
            })
        return jsonify({"error": "Processing failed"}), 500

    except Exception as e:
        logger.error(f"Overlay API error: {e}")
        return jsonify({"error": str(e)}), 500
|
|
@app.route('/api/analyze-file', methods=['POST'])
def analyze_file():
    """Extract text from an uploaded PDF, DOCX, TXT or image file.

    The upload is read from the ``file`` form field. The type is chosen from
    the file-name extension: PDFs go through pdfplumber, DOCX through
    python-docx, TXT is decoded as UTF-8 (undecodable bytes ignored) and
    PNG/JPG/JPEG/WEBP images are passed to Tesseract OCR.

    Returns:
        200 with ``filename``, ``content`` (first 4000 characters) and
        ``full_length``; 400 when no file is sent, the type is unsupported or
        no text could be extracted; 500 when a parser raises.
    """
    if 'file' not in request.files:
        return jsonify({"error": "No file uploaded"}), 400
    file = request.files['file']
    # filename may be None/empty when the client omits it.
    filename = file.filename or ""
    ext = filename.split('.')[-1].lower()

    raw_bytes = file.read()

    try:
        text_content = ""
        if ext == 'pdf':
            with pdfplumber.open(io.BytesIO(raw_bytes)) as pdf:
                # Extract each page once; extraction is the expensive step.
                page_texts = (page.extract_text() for page in pdf.pages)
                text_content = "\n".join(text for text in page_texts if text)
        elif ext == 'docx':
            doc = docx.Document(io.BytesIO(raw_bytes))
            text_content = "\n".join(para.text for para in doc.paragraphs)
        elif ext == 'txt':
            text_content = raw_bytes.decode('utf-8', errors='ignore')
        elif ext in ['png', 'jpg', 'jpeg', 'webp']:
            with Image.open(io.BytesIO(raw_bytes)) as img:
                text_content = pytesseract.image_to_string(img)
        else:
            return jsonify({"error": f"Unsupported file type: {ext}"}), 400

        if not text_content.strip():
            return jsonify({"error": "Could not extract text from the file."}), 400

        truncated_text = text_content[:4000]
        return jsonify({"filename": filename, "content": truncated_text, "full_length": len(text_content), "status": "success"})
    except Exception as e:
        logger.error(f"File analysis failed: {e}")
        return jsonify({"error": f"Failed to process file: {str(e)}"}), 500
|
|
@app.route('/static/outputs/<path:filename>')
def serve_output(filename):
    """Send the requested file from ``OUTPUT_DIR``; ``filename`` is the path below it."""
    import flask
    return flask.send_from_directory(OUTPUT_DIR, filename)
|
|
@app.route('/api/ide/execute', methods=['POST'])
def ide_execute():
    """Execute one IDE action: ``list_files``, ``create_file`` or ``run_command``.

    Request JSON:
        action (str): which action to run.
        payload (dict, optional): action arguments — ``path``/``content`` for
            ``create_file``, ``command`` for ``run_command``.

    Returns:
        A JSON object with ``status`` plus action-specific fields; 400 for a
        missing argument or unknown action; 403 for a path outside the
        working directory or a forbidden command; 500 on failure.

    NOTE(security): this route has no authentication decorator and
    ``run_command`` passes caller-supplied text to a shell. The substring
    blacklist below is not a sandbox (it is trivially bypassed). The endpoint
    should be gated behind authentication/authorization before it is exposed
    to untrusted clients.
    """
    try:
        data = request.json or {}
        action = data.get("action")
        payload = data.get("payload", {})
        if not isinstance(payload, dict):
            payload = {}

        project_root = os.path.realpath(os.getcwd())

        if action == "list_files":
            # Prune by directory NAME. Substring-matching the absolute path
            # would hide every file whenever the working directory itself
            # contains "venv" or ".git", and still walks the ignored trees.
            skip_dirs = {"venv", ".venv", ".git", "__pycache__"}
            files = []
            for root, dirs, filenames in os.walk(project_root):
                dirs[:] = [d for d in dirs if d not in skip_dirs]
                files.extend(
                    os.path.relpath(os.path.join(root, f), project_root)
                    for f in filenames
                )
            return jsonify({"status": "success", "files": files})

        elif action == "create_file":
            path = payload.get("path")
            content = payload.get("content", "")
            if not path or not isinstance(path, str):
                return jsonify({"status": "error", "error": "Path required"}), 400

            # Compare against "root + separator": a bare prefix test would
            # accept sibling directories such as "/app-backup" for root "/app".
            abs_path = os.path.realpath(os.path.join(project_root, path))
            if not abs_path.startswith(project_root + os.sep):
                return jsonify({"status": "error", "error": "Invalid path"}), 403

            os.makedirs(os.path.dirname(abs_path), exist_ok=True)
            with open(abs_path, 'w', encoding='utf-8') as f:
                f.write(content)
            return jsonify({"status": "success", "message": f"File {path} created"})

        elif action == "run_command":
            command = payload.get("command")
            if not command or not isinstance(command, str):
                return jsonify({"status": "error", "error": "Command required"}), 400

            # Best-effort guard only — see the security note in the docstring.
            forbidden = ["rm -rf", "format", "mkfs", "> /dev/"]
            if any(f in command for f in forbidden):
                return jsonify({"status": "error", "error": "Forbidden command"}), 403

            import subprocess
            try:
                result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=30)
                return jsonify({
                    "status": "success",
                    "stdout": result.stdout,
                    "stderr": result.stderr,
                    "code": result.returncode
                })
            except Exception as e:
                return jsonify({"status": "error", "error": str(e)}), 500

        return jsonify({"status": "error", "error": "Unknown action"}), 400
    except Exception as e:
        logger.error(f"IDE Execute error: {e}")
        return jsonify({"status": "error", "error": str(e)}), 500
|
|
@app.route('/api/ide/plan', methods=['POST'])
def ide_plan():
    """Ask the language model for a structured JSON plan for an IDE command.

    Request JSON:
        command (str): natural-language instruction; required.

    The prompt is built from ``IDE_PLAN_PROMPT`` with up to 50 project file
    paths. Groq is tried first when ``GROQ_API_KEY`` is set, then the local
    ``llm_primary`` engine. The first ``{...}`` span in the reply is parsed
    as JSON and returned.

    Returns:
        200 with the parsed plan (or a canned plan for "overlay" commands
        when parsing fails); 400 when no command is given; 500 otherwise.
    """
    try:
        data = request.json or {}
        raw_command = data.get("command", "")
        command = raw_command.strip() if isinstance(raw_command, str) else ""
        if not command:
            return jsonify({"error": "No command provided"}), 400

        # Collect project files for context. Directories are pruned by NAME:
        # substring-matching the absolute path would hide everything when the
        # working directory itself contains "venv" or ".git".
        max_files = 50
        skip_dirs = {"venv", ".venv", ".git", "__pycache__"}
        project_root = os.getcwd()
        files = []
        for root, dirs, filenames in os.walk(project_root):
            dirs[:] = [d for d in dirs if d not in skip_dirs]
            files.extend(
                os.path.relpath(os.path.join(root, f), project_root)
                for f in filenames
            )
            if len(files) >= max_files:
                # Only the first 50 paths go into the prompt; stop walking.
                break

        prompt = IDE_PLAN_PROMPT.format(files=", ".join(files[:max_files]), command=command)

        plan_response = ""
        groq_key = os.environ.get("GROQ_API_KEY")

        if groq_key:
            try:
                client = groq.Groq(api_key=groq_key)
                resp = client.chat.completions.create(
                    model="llama-3.3-70b-versatile",
                    messages=[{"role": "user", "content": prompt}],
                    stream=False,
                    response_format={"type": "json_object"}
                )
                plan_response = resp.choices[0].message.content
            except Exception as e:
                logger.warning(f"Groq planning failed: {e}")

        if not plan_response and llm_primary:
            output = llm_primary(
                f"<|system|>\n{prompt}<|end|>\n<|user|>\nGenerate the plan in JSON format.<|end|>\n<|assistant|>\n",
                max_tokens=1024,
                stop=["<|end|>", "<|user|>"]
            )
            plan_response = output["choices"][0]["text"]

        try:
            # The model may wrap the JSON in prose; take the outermost {...} span.
            # "or ''" guards against a None reply reaching re.search.
            json_match = re.search(r'\{.*\}', plan_response or "", re.DOTALL)
            if not json_match:
                raise ValueError("No JSON found in response")
            return jsonify(json.loads(json_match.group(0)))
        except ValueError as e:
            # json.JSONDecodeError is a ValueError subclass.
            logger.warning(f"IDE plan response could not be parsed: {e}")
            if "overlay" in command.lower():
                return jsonify({
                    "plan": [
                        {"id": 1, "action": "run_command", "description": "Check MoviePy version", "command": "pip show moviepy"},
                        {"id": 2, "action": "edit_file", "path": "app.py", "description": "Ensure overlay endpoint is optimized"}
                    ],
                    "stats": {"costSavings": "$10"}
                })
            return jsonify({"error": "Failed to generate structured plan. Please try a simpler command."}), 500

    except Exception as e:
        logger.error(f"IDE Plan error: {e}")
        return jsonify({"error": str(e)}), 500
|
|
@app.route('/api/ide/debug', methods=['POST'])
def ide_debug():
    """Placeholder endpoint: always answers with a fixed "coming soon" payload."""
    placeholder = {
        "error": "Local debugging is coming soon.",
        "suggestions": [],
        "root_cause": "N/A",
    }
    return jsonify(placeholder)
|
|
@app.route('/api/status', methods=['GET'])
def status_check():
    """Report backend liveness, the current server time and multi-agent availability."""
    agents_state = "unavailable"
    if multi_agent_system:
        agents_state = "available"

    report = {
        "status": "online",
        "backend": "Nexa AI Unified",
        "timestamp": time.ctime(),
        "multi_agent_system": agents_state,
        "search_engines": ["DuckDuckGo", "Bing"],
    }
    return jsonify(report)
|
|
| |
@app.route('/api/multi-agent/process', methods=['POST'])
def multi_agent_process():
    """Hand the posted ``request`` text to the multi-agent system and return its result.

    Responds 400 when ``request`` is missing/empty, 503 when the multi-agent
    system was not initialized, and 500 when processing raises.
    """
    try:
        body = request.json or {}
        user_request = body.get("request", "")

        if not user_request:
            return jsonify({"error": "Missing 'request' parameter"}), 400
        if not multi_agent_system:
            return jsonify({"error": "Multi-Agent System not initialized"}), 503

        # Log only a prefix so long requests do not flood the log.
        logger.info(f"Received multi-agent request: {user_request[:100]}")

        return jsonify(multi_agent_system.process_request(user_request))
    except Exception as e:
        logger.error(f"Multi-agent processing failed: {e}")
        return jsonify({"error": "Internal server error during multi-agent processing"}), 500
|
|
@app.route('/api/multi-agent/agents', methods=['GET'])
def list_agents():
    """Return the fixed list of agent names together with its length."""
    roles = (
        "Intake", "Planning", "Orchestrator", "Research", "Analysis",
        "Coding", "Writing", "Validation", "Feedback", "Optimization",
    )
    # Every agent name is its role followed by the "Agent" suffix.
    agents = [f"{role}Agent" for role in roles]
    return jsonify({"agents": agents, "total": len(agents)})
|
|
@app.errorhandler(500)
def internal_error(error):
    """Answer unhandled server errors with a generic JSON body and status 500."""
    body = {
        "error": "Internal Server Error",
        "message": "An unexpected error occurred on the server.",
    }
    return jsonify(body), 500
|
|
if __name__ == "__main__":
    # Script entry point: listen on every interface, port 7860.
    app.run(port=7860, host="0.0.0.0")