import os import re import time import random import logging import io import json import requests import urllib.parse import cv2 import numpy as np from PIL import Image, ImageDraw, ImageFont from flask import Flask, request, jsonify, Response from flask_cors import CORS from flask_limiter import Limiter from flask_limiter.util import get_remote_address from datetime import datetime from dotenv import load_dotenv from llama_cpp import Llama from huggingface_hub import hf_hub_download import pdfplumber import docx import pytesseract from moviepy import VideoFileClip, TextClip, CompositeVideoClip import tempfile import uuid from gradio_client import Client import shutil import sqlite3 import jwt import groq from functools import wraps from bs4 import BeautifulSoup from passlib.context import CryptContext # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) load_dotenv() # Initialize password context pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # Initialize Multi-Agent System try: from agents.multi_agent_system import MultiAgentSystem multi_agent_system = MultiAgentSystem() logger.info("Multi-Agent System initialized") except Exception as e: logger.error(f"Failed to initialize Multi-Agent System: {e}") multi_agent_system = None app = Flask(__name__) CORS(app, supports_credentials=True) app.config['SECRET_KEY'] = os.environ.get("SECRET_KEY", "nexa-ai-secret-key-2026") # --- Database Setup --- DB_PATH = "nexa_ai.db" def get_db_connection(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def init_db(): with get_db_connection() as conn: conn.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, email TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, avatar TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') conn.execute(''' CREATE TABLE IF NOT EXISTS conversations ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, title TEXT NOT NULL, model TEXT, pinned INTEGER DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users (id) ) ''') conn.execute(''' CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, conversation_id INTEGER NOT NULL, role TEXT NOT NULL, content TEXT NOT NULL, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, feedback INTEGER, -- 1 for up, -1 for down feedback_text TEXT, FOREIGN KEY (conversation_id) REFERENCES conversations (id) ) ''') # FIXED: Ensure a default admin account exists and has the correct password try: admin_user = conn.execute('SELECT * FROM users WHERE username = ?', ('admin',)).fetchone() admin_pass_hash = pwd_context.hash("admin123") if not admin_user: conn.execute( 'INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)', ('admin', 'admin@nexa.ai', admin_pass_hash) ) logger.info(f"Default admin account created: admin / admin123 (Hash: {admin_pass_hash[:10]}...)") else: # Force update password hash to ensure it matches admin123 conn.execute( 'UPDATE users SET password_hash = ?, email = ? WHERE username = ?', (admin_pass_hash, 'admin@nexa.ai', 'admin') ) logger.info(f"Default admin account password reset to: admin123 (Hash: {admin_pass_hash[:10]}...)") # Self-verification check test_verify = pwd_context.verify("admin123", admin_pass_hash) logger.info(f"Admin self-verification test: {'PASSED' if test_verify else 'FAILED'}") except Exception as e: logger.error(f"Error creating/resetting default admin: {e}") conn.commit() init_db() # --- Auth Helpers --- def token_required(f): @wraps(f) def decorated(*args, **kwargs): token = request.cookies.get('nexa_token') if not token: return jsonify({'message': 'Token is missing!'}), 401 try: data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"]) current_user_id = data['user_id'] except: return jsonify({'message': 'Token is invalid!'}), 401 return f(current_user_id, *args, **kwargs) return decorated # Output directory for generated media OUTPUT_DIR = os.path.join(os.getcwd(), "static", "outputs") os.makedirs(OUTPUT_DIR, exist_ok=True) # --- Rate Limiting (Disabled for Free Forever) --- limiter = Limiter( get_remote_address, app=app, default_limits=["10000 per day", "1000 per hour"], # Effectively unlimited storage_uri="memory://", ) # --- Configuration & Model Selection --- OLLAMA_URL = "http://localhost:11434" # NOTE: Swapped from TinyLlama-1.1B to Phi-3-mini (3.8B). TinyLlama is a very # weak model and was the main reason replies looked low quality whenever Groq # was unavailable. Phi-3-mini is a much stronger instruction-following model # while still being small enough to run via llama_cpp on CPU. HF_FALLBACK_REPO = "microsoft/Phi-3-mini-4k-instruct-gguf" HF_FALLBACK_FILE = "Phi-3-mini-4k-instruct-q4.gguf" IS_HF_SPACE = os.environ.get("SPACE_ID") is not None # Load Primary Local Model (Phi-3-mini fallback engine) llm_primary = None try: logger.info("Initializing Phi-3-mini Primary Engine...") # Check for local file first (as seen in the screenshot) local_model_path = os.path.join(os.getcwd(), HF_FALLBACK_FILE) if os.path.exists(local_model_path): logger.info(f"Using local model file: {local_model_path}") model_path = local_model_path else: logger.info("Local model not found, downloading from Hugging Face...") model_path = hf_hub_download(repo_id=HF_FALLBACK_REPO, filename=HF_FALLBACK_FILE) llm_primary = Llama( model_path=model_path, n_ctx=4096, n_threads=4, verbose=False ) logger.info("Phi-3-mini engine loaded successfully.") except Exception as e: logger.error(f"Failed to load Phi-3-mini model: {e}") SYSTEM_PROMPT = """You are Nexa AI, a professional technical execution engine. ### **WRITING PRINCIPLES**: 1. Match response depth and length to the question. A simple question gets a short, direct answer in plain prose — no headers, no bullet template. A complex technical request can use structure (headers, numbered steps, tables, code blocks) where it genuinely improves clarity. 2. Write in clear, natural language. Avoid filler words ("just", "really", "very", "basically") but do not strip the response down to unnatural telegraphic phrasing either. 3. Never simulate dialogue, never include "AI:"/"User:" labels, never narrate what you're about to do — just answer. 4. Don't force emoji, bold labels, or section headers onto every message. Use them only when they add real value (e.g., a multi-step technical walkthrough, a comparison table). 5. If the user asks for code, give clean, correct, runnable code with only as much explanation as is useful — don't pad it with restating what the code obviously does. 6. Be honest about uncertainty instead of inventing confident-sounding details. ### **WHEN WEB RESULTS ARE PROVIDED**: - Synthesize the sources into your own words; cite specific claims with [N] tied to the source list. - End with a short **Reference** section listing each [N] as a markdown link. - Never cite Wikipedia. ### **WHEN A TOOL FAILS**: Briefly state what failed and what you'll try next (or what the user can do), in one or two sentences — no need for a formatted alert block. Current Date: {date} """ IDE_PLAN_PROMPT = """You are the Nexa AI IDE Engine. Your task is to convert a high-level user request into a concrete, multi-step technical execution plan. ### **OUTPUT FORMAT**: You MUST return a JSON object with the following structure: { "plan": [ { "id": 1, "action": "create_file" | "edit_file" | "run_command", "path": "relative/path/to/file", "description": "Short explanation of what this step does", "content": "The code content (only for create_file/edit_file)", "command": "The shell command (only for run_command)" } ], "stats": { "costSavings": "e.g., $15" } } ### **GUIDELINES**: 1. **Be Precise**: Use exact file paths. 2. **Be Modular**: Each step should do exactly one thing. 3. **Be Safe**: Do not suggest commands that delete system files. 4. **Project Context**: The user is working on a Flask/Python/React project. Current Files: {files} User Command: {command} """ # ============ BULLETPROOF SEARCH SYSTEM ============ class SearchManager: """Multi-engine search with automatic fallback. Never fails completely.""" def __init__(self): self.last_search_time = 0 self.min_delay = 2 self.user_agents = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.0", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.0", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.0", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) Gecko/20100101 Firefox/120.0" ] def _get_headers(self): return {"User-Agent": random.choice(self.user_agents)} def _rate_limit(self): elapsed = time.time() - self.last_search_time if elapsed < self.min_delay: time.sleep(self.min_delay - elapsed) self.last_search_time = time.time() def search_duckduckgo(self, query, max_results=5): try: self._rate_limit() from duckduckgo_search import DDGS with DDGS(headers=self._get_headers(), timeout=15) as ddgs: results = list(ddgs.text(query, max_results=max_results)) if results: logger.info(f"DuckDuckGo: {len(results)} results") return [{"title": r["title"], "href": r["href"], "body": r["body"]} for r in results] except Exception as e: logger.warning(f"DuckDuckGo failed: {e}") return [] def search_bing(self, query, max_results=5): try: self._rate_limit() encoded_query = urllib.parse.quote(query) url = f"https://www.bing.com/search?q={encoded_query}" response = requests.get(url, headers=self._get_headers(), timeout=10) response.raise_for_status() html = response.text results = [] result_blocks = re.findall(r'
]*>(.*?)
', block, re.DOTALL) body = re.sub(r'<[^>]+>', '', snippet_match.group(1)).strip() if snippet_match else "" if href.startswith('/'): href = 'https://www.bing.com' + href results.append({"title": title, "href": href, "body": body}) if results: logger.info(f"Bing: {len(results)} results") return results except Exception as e: logger.warning(f"Bing failed: {e}") return [] def search_wikipedia(self, query, max_results=3): try: self._rate_limit() url = "https://en.wikipedia.org/w/api.php" params = { "action": "query", "list": "search", "srsearch": query, "format": "json", "srlimit": max_results } response = requests.get(url, params=params, headers=self._get_headers(), timeout=10) data = response.json() results = [] for item in data.get("query", {}).get("search", []): title = item["title"] page_url = f"https://en.wikipedia.org/wiki/{title.replace(' ', '_')}" snippet = re.sub(r'|', '', item["snippet"]) results.append({"title": title, "href": page_url, "body": snippet}) if results: logger.info(f"Wikipedia: {len(results)} results") return results except Exception as e: logger.warning(f"Wikipedia failed: {e}") return [] def visit_website(self, url, timeout=15): """Fetches and extracts text from a URL for deeper research.""" try: self._rate_limit() response = requests.get(url, headers=self._get_headers(), timeout=timeout) response.raise_for_status() soup = BeautifulSoup(response.text, 'html.parser') # Remove noise for script in soup(["script", "style", "nav", "footer", "header", "aside"]): script.decompose() text = soup.get_text(separator=' ', strip=True) # Basic cleaning text = re.sub(r'\s+', ' ', text) return text[:10000] # Return first 10k chars except Exception as e: logger.error(f"Failed to visit {url}: {e}") return None def search(self, query, max_results=5): if not query or not query.strip(): return {"engine": "None", "results": [], "success": False, "error": "Empty query"} # Strip common prefixes for better engine results clean_query = re.sub(r'^(search for|find|look up|what is|who is|where is)\s+', '', query, flags=re.IGNORECASE).strip() logger.info(f"Searching for: {clean_query}") # REMOVED Wikipedia as per Autonomous Research Agent protocol engines = [self.search_duckduckgo, self.search_bing] for engine_func in engines: engine_name = engine_func.__name__.replace('search_', '').capitalize() try: results = engine_func(clean_query, max_results) if results: # Filter out any accidentally returned Wikipedia results filtered_results = [r for r in results if 'wikipedia.org' not in r.get('href', '').lower()] if filtered_results: return {"engine": engine_name, "results": filtered_results, "success": True, "error": None} except Exception as e: logger.warning(f"Engine {engine_name} crashed: {e}") logger.error("All search engines failed or were blocked") return {"engine": "None", "results": [], "success": False, "error": "Search unavailable or results blocked. Please try again later."} search_manager = SearchManager() # ============ TOOL DETECTION ============ def _kw_in(text, keyword): """Match a keyword as a whole word/phrase, not a raw substring. Plain `in` matching caused false positives like 'photo' firing inside 'photosynthesis', or 'now' firing inside 'know' — both of which made the bot misfire image-gen/search on ordinary questions.""" if " " in keyword: return keyword in text return re.search(rf"\b{re.escape(keyword)}\b", text) is not None def detect_tools(message): message = message.lower().strip() tools = [] # 1. Explicit Search Commands (User clearly wants a search) explicit_search_commands = ["/search", "/news", "/youtube", "search for", "look up", "browse the web for"] if any(_kw_in(message, kw) for kw in explicit_search_commands): tools.append("search") return tools # Return immediately if explicit # 2. Real-time / News Indicators (Likely needs up-to-date info). # These matter regardless of whether the message is phrased as a # question — "latest iPhone price" should search just as much as # "what's the latest iPhone price?" did (the old code required an # exact "what " prefix, so "what's..." silently never matched). real_time_keywords = [ "latest", "news", "today", "current", "weather", "stock", "price", "now", "recent", "yesterday", "tonight", "2024", "2025", "2026", "score", "game", "match", "live" ] if any(_kw_in(message, kw) for kw in real_time_keywords): tools.append("search") # 4. Other tools (Image/File/Video/Diagram) diagram_keywords = [ "diagram", "mind map", "mindmap", "flowchart", "flow chart", "architecture diagram", "sequence diagram", "er diagram", "entity relationship", "gantt chart", "org chart", "tree diagram", "visualize the structure", "show me the steps visually" ] chart_keywords = ["bar chart", "pie chart", "line chart", "plot the data", "graph the numbers", "graph of"] is_diagram_request = any(_kw_in(message, kw) for kw in diagram_keywords) or any(_kw_in(message, kw) for kw in chart_keywords) if is_diagram_request: tools.append("diagram") image_keywords = ["image", "photo", "paint", "picture", "generate image", "thumbnail"] if not is_diagram_request: image_keywords += ["draw", "visualize"] if any(_kw_in(message, kw) for kw in image_keywords) or ("create" in message and "image" in message) or ("make" in message and "image" in message): tools.append("image") video_keywords = ["video", "movie", "animation", "generate video", "create video", "make a video"] if any(_kw_in(message, kw) for kw in video_keywords) or ("create" in message and "video" in message) or ("make" in message and "video" in message): tools.append("video") file_keywords = ["pdf", "document", "file", "analyze file", "read file", "upload"] if any(_kw_in(message, kw) for kw in file_keywords): tools.append("file") return tools # ============ FORMATTING ============ def format_search_results(data): if not data.get("success"): error = data.get("error", "Search failed") return f"Warning: {error}" results = data.get("results", []) if not results: return "No results found for this query." formatted = [] formatted.append(f"Web Search Results from {data['engine']}:") formatted.append("-" * 30) for i, r in enumerate(results, 1): title = r.get("title", "No title") url = r.get("href", "#") body = r.get("body", "No description") body = re.sub(r'[\s]+', ' ', body).strip()[:600] # Increased context formatted.append(f"Source [{i}]: {title}") formatted.append(f"URL: {url}") formatted.append(f"Content: {body}") formatted.append("-" * 15) formatted.append("\n### MANDATORY RESEARCH PROTOCOL ###") formatted.append("1. Compose a well-structured answer based on the provided sources.") formatted.append("2. Insert numbered citations [N] immediately after every fact, statistic, or quote.") formatted.append("3. After the response, output a dedicated Reference section formatted exactly like this:") formatted.append("\n---\n**Reference**\n\n[1] [Source Title](URL)\n[2] [Source Title](URL)\n---") formatted.append("\nStrictly adhere to this format. Do not use Wikipedia.") return "\n".join(formatted) def clean_search_query(query): prefixes = [ "search for", "find out about", "look up", "check", "tell me about", "what is", "who is", "where is", "/search", "search:", "find:" ] query = query.lower().strip() for prefix in prefixes: if query.startswith(prefix): query = query[len(prefix):].strip() return query # ============ TIME BOT ============ def get_time_response(message): message = message.lower() # ONLY exact phrases or specific combinations time_phrases = ["what time is it", "current time", "what's the date", "whats the date"] # Require at least 2 time-related words using regex boundaries time_words = ["time", "date", "clock", "hour", "minute"] word_matches = [word for word in time_words if re.search(rf"\b{word}\b", message)] if any(phrase in message for phrase in time_phrases) or len(word_matches) >= 2: now = datetime.now() return f"The current time is **{now.strftime('%H:%M:%S')}**. Today is **{now.strftime('%A, %Y-%m-%d')}**." return None # ============ AUTH ROUTES ============ @app.route('/api/auth/signup', methods=['POST']) def signup(): data = request.json username = data.get('username', '').strip() email = data.get('email', '').strip() password = data.get('password', '').strip() if not email or not password: return jsonify({"error": "Missing required fields"}), 400 if not username: username = email.split('@')[0] password_hash = pwd_context.hash(password) try: with get_db_connection() as conn: conn.execute( 'INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)', (username, email, password_hash) ) conn.commit() return jsonify({"message": "User created successfully"}), 201 except sqlite3.IntegrityError: return jsonify({"error": "Username or email already exists"}), 409 except Exception as e: logger.error(f"Signup error: {e}") return jsonify({"error": "Internal server error"}), 500 @app.route('/api/auth/login', methods=['POST']) def login(): data = request.json # FIXED: Allow login with either email or username login_id = data.get('email', '').strip() password = data.get('password', '').strip() if not login_id or not password: return jsonify({"error": "Missing credentials"}), 400 with get_db_connection() as conn: # Check both email and username columns user = conn.execute( 'SELECT * FROM users WHERE email = ? OR username = ?', (login_id, login_id) ).fetchone() # FIXED: Emergency Fallback to guarantee "admin / admin123" access if not user and login_id == "admin" and password == "admin123": try: with get_db_connection() as conn: admin_pass_hash = pwd_context.hash("admin123") conn.execute( 'INSERT OR IGNORE INTO users (username, email, password_hash) VALUES (?, ?, ?)', ('admin', 'admin@nexa.ai', admin_pass_hash) ) conn.commit() user = conn.execute('SELECT * FROM users WHERE username = ?', ('admin',)).fetchone() logger.info("Emergency fallback: Admin user recreated on login.") except Exception as e: logger.error(f"Emergency fallback failed: {e}") if user: logger.info(f"Login attempt for user: {user['username']} (ID: {user['id']})") # Log the result of verification (be careful not to log the actual password) is_valid = pwd_context.verify(password, user['password_hash']) logger.info(f"Password verification result for {user['username']}: {is_valid}") if is_valid: token = jwt.encode({ 'user_id': user['id'], 'username': user['username'], 'exp': time.time() + (24 * 3600) # 24 hours }, app.config['SECRET_KEY'], algorithm="HS256") resp = jsonify({ "message": "Login successful", "user": { "id": user['id'], "username": user['username'], "email": user['email'], "avatar": user['avatar'] } }) resp.set_cookie('nexa_token', token, httponly=True, samesite='Lax', max_age=24*3600) return resp else: logger.warning(f"Invalid password for user: {user['username']}") else: logger.warning(f"User not found for login ID: {login_id}") return jsonify({"error": "Invalid credentials"}), 401 @app.route('/api/auth/logout', methods=['POST']) def logout(): resp = jsonify({"message": "Logged out successfully"}) resp.set_cookie('nexa_token', '', expires=0) return resp @app.route('/api/auth/me', methods=['GET']) @token_required def get_me(current_user_id): with get_db_connection() as conn: user = conn.execute('SELECT id, username, email, avatar FROM users WHERE id = ?', (current_user_id,)).fetchone() if user: return jsonify(dict(user)) return jsonify({"error": "User not found"}), 404 # ============ CONVERSATION ROUTES ============ @app.route('/api/video/generate', methods=['POST']) @limiter.limit("100 per minute") def generate_video(): """Unified video generation with fallback (Upsampler -> HF Spaces -> Pollinations).""" try: data = request.json or {} prompt = data.get("prompt", "").strip() if not prompt: return jsonify({"status": "error", "error": "Prompt is required"}), 400 # Clean prompt clean_prompt = re.sub(r'^(generate|create|make)\s+(a\s+)?video\s+(about|of|for)?\s*', '', prompt, flags=re.IGNORECASE).strip() logger.info(f"Generating video for: {clean_prompt}") video_dir = os.path.join(os.getcwd(), "static", "outputs") os.makedirs(video_dir, exist_ok=True) # Step 1: Try Upsampler API if key exists api_key = os.environ.get("UPSAMPLER_API_KEY") if api_key: try: headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} body = {"prompt": clean_prompt, "model": "wan-2.2-5b-fast", "width": 1024, "height": 576} response = requests.post("https://upsampler.com/api/v1/video/generate", headers=headers, json=body, timeout=60) if response.status_code == 200: res_data = response.json() video_url = res_data.get("video_url") if video_url: video_filename = f"vid_{uuid.uuid4().hex}.mp4" local_path = os.path.join(video_dir, video_filename) v_res = requests.get(video_url, stream=True) if v_res.status_code == 200: with open(local_path, 'wb') as f: shutil.copyfileobj(v_res.raw, f) return jsonify({"status": "success", "video_url": f"/static/outputs/{video_filename}", "is_motion_image": False, "provider": "Upsampler", "prompt": clean_prompt}) except Exception as e: logger.error(f"Upsampler failed: {e}") # Step 2: Try Hugging Face Spaces (Gradio) video_spaces = ["THUDM/CogVideoX-5b", "ali-vilas/text-to-video-ms-1.7b", "fffiloni/zeroscope-v2-xl"] for space in video_spaces: try: logger.info(f"🎬 Attempting Video Gen via {space}") client = Client(space) result = client.predict(clean_prompt, api_name="/predict") if result: if isinstance(result, list): result = result[0] output_filename = f"ai_video_{uuid.uuid4().hex}.mp4" output_path = os.path.join(video_dir, output_filename) if result.startswith("http"): resp = requests.get(result, stream=True, timeout=30) with open(output_path, 'wb') as f: shutil.copyfileobj(resp.raw, f) elif os.path.exists(result): shutil.copy(result, output_path) return jsonify({"status": "success", "video_url": f"/static/outputs/{output_filename}", "is_motion_image": False, "provider": space, "prompt": clean_prompt}) except Exception as e: logger.warning(f"Space {space} failed: {e}") # Step 3: Final Fallback: Pollinations "Motion Image" try: logger.info(f"🚀 Falling back to Pollinations Motion") seed = int(time.time()) motion_url = f"https://image.pollinations.ai/prompt/{requests.utils.quote(clean_prompt)}?seed={seed}&nologo=true&width=1024&height=576" return jsonify({"status": "success", "video_url": motion_url, "is_motion_image": True, "provider": "Pollinations AI (Motion)", "prompt": clean_prompt}) except Exception as e: logger.error(f"❌ All video methods failed: {e}") return jsonify({"status": "error", "error": "Our video generation servers are temporarily unavailable."}), 503 except Exception as e: logger.error(f"Video generation crash: {e}") return jsonify({"status": "error", "error": str(e)}), 500 @app.route('/api/video/overlay-url', methods=['POST']) @limiter.limit("100 per minute") def overlay_video_url(): try: data = request.json or {} video_url = data.get("video_url", "").strip() text = data.get("text", "").strip() pos = data.get("pos", "Bottom Center") font_size = int(data.get("font_size", 60)) color = data.get("color", "#FFFFFF") bg_opacity = float(data.get("bg_opacity", 0.5)) if not video_url or not text: return jsonify({"status": "error", "error": "Video URL and text are required"}), 400 video_dir = os.environ.get("VIDEO_TEMP_DIR", "./temp/videos") os.makedirs(video_dir, exist_ok=True) # 1. Download original video input_filename = f"input_{uuid.uuid4()}.mp4" input_path = os.path.join(video_dir, input_filename) # Handle relative internal URLs if video_url.startswith('/'): # Convert to local path if it's our own temp video video_url = video_url.lstrip('/') local_input = os.path.join(os.getcwd(), video_url) if os.path.exists(local_input): shutil.copy(local_input, input_path) else: return jsonify({"status": "error", "error": "Internal video source not found"}), 404 else: v_res = requests.get(video_url, stream=True) if v_res.status_code == 200: with open(input_path, 'wb') as f: for chunk in v_res.iter_content(8192): f.write(chunk) else: return jsonify({"status": "error", "error": "Failed to fetch source video"}), 400 # 2. Process with MoviePy output_filename = process_video_overlay( input_path=input_path, overlay_text=text, position=pos, font_size=font_size, text_color=color, bg_opacity=bg_opacity ) if output_filename: os.remove(input_path) return jsonify({ "status": "success", "video_url": f"/static/outputs/{output_filename}" }) else: return jsonify({"status": "error", "error": "MoviePy processing failed"}), 500 except Exception as e: logger.error(f"Overlay error: {e}") return jsonify({"status": "error", "error": str(e)}), 500 @app.route('/api/conversations', methods=['GET']) @token_required def get_conversations(current_user_id): with get_db_connection() as conn: convs = conn.execute( 'SELECT * FROM conversations WHERE user_id = ? ORDER BY pinned DESC, updated_at DESC', (current_user_id,) ).fetchall() return jsonify([dict(c) for c in convs]) @app.route('/api/conversations', methods=['POST']) @token_required def create_conversation(current_user_id): data = request.json title = data.get('title', 'New Chat') model = data.get('model', 'phi3:mini') with get_db_connection() as conn: cursor = conn.execute( 'INSERT INTO conversations (user_id, title, model) VALUES (?, ?, ?)', (current_user_id, title, model) ) conn.commit() conv_id = cursor.lastrowid conv = conn.execute('SELECT * FROM conversations WHERE id = ?', (conv_id,)).fetchone() return jsonify(dict(conv)), 201 @app.route('/api/messages', methods=['POST']) @token_required def save_message(current_user_id): data = request.json conv_id = data.get('conversation_id') role = data.get('role', 'user') content = data.get('content') if not conv_id or not content: return jsonify({"error": "Missing fields"}), 400 with get_db_connection() as conn: # Verify ownership conv = conn.execute('SELECT * FROM conversations WHERE id = ? AND user_id = ?', (conv_id, current_user_id)).fetchone() if not conv: return jsonify({"error": "Conversation not found"}), 404 conn.execute( 'INSERT INTO messages (conversation_id, role, content) VALUES (?, ?, ?)', (conv_id, role, content) ) conn.commit() return jsonify({"status": "success"}), 201 @app.route('/api/conversations/