# PicMagic Pro — AI-powered cinemagraph generator (Flask app, Hugging Face Space).
| import os | |
| import sys | |
| import cv2 | |
| import time | |
| import uuid | |
| import json | |
| import logging | |
| import re | |
| import shutil | |
| import atexit | |
| import subprocess | |
| import tempfile | |
| import requests | |
| import numpy as np | |
| import itertools | |
| import base64 | |
| from flask import Flask, request, send_file, render_template_string, jsonify | |
| from dotenv import load_dotenv | |
# Pull configuration (API keys, etc.) from a local .env file when present.
load_dotenv()

# ==========================================
# 1. LOGGING & SYSTEM SETUP
# ==========================================
# Everything logs to stdout so container platforms capture it.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger("PicMagicPro_Stable")

# Scratch directories for uploaded images and rendered videos.
UPLOAD_FOLDER = os.path.join(tempfile.gettempdir(), 'picmagic_uploads')
OUTPUT_FOLDER = os.path.join(tempfile.gettempdir(), 'picmagic_outputs')
for _scratch_dir in (UPLOAD_FOLDER, OUTPUT_FOLDER):
    os.makedirs(_scratch_dir, exist_ok=True)

app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024  # 50MB Limit
| # Automatic cleanup of old files (older than 1 hour) | |
# Cleanup of old temporary files (older than 1 hour).
def start_cleanup():
    """
    Delete uploads/outputs whose mtime is more than one hour old.

    NOTE(review): despite the name, this is not a background task — it is
    registered with atexit below, so it fires once at interpreter shutdown.
    Confirm whether periodic in-process cleanup was intended.
    """
    cutoff = time.time() - 3600
    for directory in (UPLOAD_FOLDER, OUTPUT_FOLDER):
        if not os.path.exists(directory):
            continue
        for name in os.listdir(directory):
            full_path = os.path.join(directory, name)
            try:
                if os.stat(full_path).st_mtime < cutoff and os.path.isfile(full_path):
                    os.remove(full_path)
                    logger.info(f"Cleaned up old file: {name}")
            except Exception as e:
                logger.error(f"Cleanup error for {name}: {e}")

atexit.register(start_cleanup)
| # ========================================== | |
| # 2. KEY MANAGEMENT & MODEL HUNTING (APP.PY STYLE) | |
| # ========================================== | |
# 1. API key discovery: collect every environment variable whose name
#    starts with "gmni" (case-insensitive) and has a non-empty value.
GEMINI_API_KEYS = [
    val for key, val in os.environ.items()
    if key.lower().startswith("gmni") and val.strip()
]

# Fallback: a single conventional GEMINI_API_KEY variable (useful for testing).
if not GEMINI_API_KEYS:
    logger.warning("No keys found starting with 'gmni'. Checking for 'GEMINI_API_KEY'...")
    # FIX: read the env var once instead of calling os.environ.get twice.
    _fallback_key = os.environ.get("GEMINI_API_KEY")
    if _fallback_key:
        GEMINI_API_KEYS.append(_fallback_key)

logger.info(f"Loaded {len(GEMINI_API_KEYS)} Gemini API Keys.")

# 2. Round-robin key cycler: each request grabs the next key, spreading
#    quota usage across all configured keys; the cycle never exhausts.
if GEMINI_API_KEYS:
    gemini_key_cycler = itertools.cycle(GEMINI_API_KEYS)
else:
    # Keep the app importable even with zero keys; generation will fail loudly.
    logger.critical("CRITICAL: No API keys found! App will fail on generation.")
    gemini_key_cycler = itertools.cycle(["placeholder_key_to_prevent_crash"])
# 3. Best-model discovery — executed ONCE at startup, never per request.
def find_best_gemini_vision_model(api_keys):
    """
    Pick a vision-capable Gemini model name by querying the public
    models-list endpoint with the first available key.

    Falls back to "gemini-1.5-flash" when no keys are configured, the
    request fails, or no suitable model is listed.
    """
    if not api_keys:
        return "gemini-1.5-flash"  # Safe default

    probe_key = api_keys[0]
    try:
        listing = requests.get(
            f"https://generativelanguage.googleapis.com/v1beta/models?key={probe_key}",
            timeout=10,
        )
        if listing.status_code == 200:
            available = listing.json().get('models', [])
            # First choice: a 1.5-flash variant (fast, free-tier friendly).
            for entry in available:
                short_name = entry['name'].split('/')[-1]
                if "gemini-1.5-flash" in short_name:
                    return short_name
            # Otherwise settle for any vision- or pro-flavored model.
            for entry in available:
                if "vision" in entry['name'] or "pro" in entry['name']:
                    return entry['name'].split('/')[-1]
    except Exception as e:
        logger.error(f"Model discovery failed: {e}")
    return "gemini-1.5-flash"
# Global Model Variable — resolved once at import time and reused by every request.
CURRENT_MODEL_NAME = find_best_gemini_vision_model(GEMINI_API_KEYS)
logger.info(f"Global Model Selected: {CURRENT_MODEL_NAME}")
| # ========================================== | |
| # 3. HELPER FUNCTIONS | |
| # ========================================== | |
def robust_json_parser(raw_str):
    """
    Extract a JSON object from a raw AI response string.

    Handles responses wrapped in Markdown code fences or surrounded by
    prose, and tolerates Python-style single-quoted dicts — the vision
    prompt's own example format uses single quotes, which strict JSON
    rejects.

    Returns the parsed object, or None when nothing parseable is found.
    """
    if not raw_str:
        return None
    # 1. Fast path: the whole string is already valid JSON.
    #    (FIX: narrowed the original bare `except:` clauses.)
    try:
        return json.loads(raw_str)
    except (ValueError, TypeError):
        pass
    # 2. Pull out the outermost {...} span (strips code fences / prose).
    match = re.search(r'(\{.*\})', raw_str, re.DOTALL)
    if not match:
        return None
    candidate = match.group(1)
    try:
        return json.loads(candidate)
    except ValueError:
        pass
    # 3. FIX: last resort for Python-style dict literals like {'ymin': 10} —
    #    the prompt's example uses single quotes, so the model may echo them
    #    and json.loads would fail.
    import ast  # local import: only needed on this rare fallback path
    try:
        parsed = ast.literal_eval(candidate)
        return parsed if isinstance(parsed, dict) else None
    except (ValueError, SyntaxError) as e:
        logger.error(f"JSON Parsing failed on string: {raw_str[:50]}... Error: {e}")
        return None
| # ========================================== | |
| # 4. AI VISION LOGIC (REFACTORED TO APP.PY STYLE) | |
| # ========================================== | |
def get_smart_mask_coords(image_path, effect_type):
    """
    Ask Gemini for a bounding box marking where motion should occur.

    image_path: path to the image on disk (sent inline as base64 JPEG).
    effect_type: effect keyword injected into the prompt (e.g. 'water').

    Returns a dict with 'ymin'/'xmin'/'ymax'/'xmax' percentage values,
    or None if every attempt fails. Uses the module-level key cycler and
    pre-resolved model name; rotates to the next key on rate limits.
    """
    global gemini_key_cycler, CURRENT_MODEL_NAME
    # 1. Read and base64-encode the image for the inline_data part.
    try:
        with open(image_path, "rb") as f:
            img_data = base64.b64encode(f.read()).decode('utf-8')
    except Exception as e:
        logger.error(f"Failed to read image file: {e}")
        return None
    # 2. Prompt demanding a bare JSON bounding box (no prose).
    prompt_text = (
        f"Analyze this image for a {effect_type} cinematic motion effect. "
        "Identify the specific area where motion (like water flow, sky movement, or breathing) should happen. "
        "Return ONLY a JSON object with percentages for the bounding box. "
        "Format: {'ymin': 0-100, 'xmin': 0-100, 'ymax': 0-100, 'xmax': 0-100}. "
        "Do not include any other text."
    )
    # 3. Retry loop: each attempt grabs the next key from the infinite cycle.
    max_retries = 3
    for attempt in range(max_retries):
        current_key = next(gemini_key_cycler)
        url = f"https://generativelanguage.googleapis.com/v1beta/models/{CURRENT_MODEL_NAME}:generateContent?key={current_key}"
        payload = {
            "contents": [{
                "parts": [
                    {"text": prompt_text},
                    {"inline_data": {"mime_type": "image/jpeg", "data": img_data}}
                ]
            }],
            "generationConfig": {
                "temperature": 0.4,  # Low temp for deterministic JSON
                "maxOutputTokens": 200
            }
        }
        try:
            logger.info(f"AI Request Attempt {attempt+1}/{max_retries} using key ...{current_key[-4:]}")
            response = requests.post(url, json=payload, timeout=25)
            if response.status_code == 200:
                try:
                    candidates = response.json().get('candidates', [])
                    if candidates:
                        raw_text = candidates[0]['content']['parts'][0]['text']
                        coords = robust_json_parser(raw_text)
                        if coords:
                            logger.info(f"AI Success: {coords}")
                            return coords
                        else:
                            logger.warning("AI returned invalid JSON. Retrying...")
                # FIX: safety-blocked responses can return candidates that lack
                # a 'content' key, raising KeyError/TypeError rather than
                # IndexError; catch all three so we retry immediately instead of
                # falling through to the generic handler and sleeping.
                except (IndexError, KeyError, TypeError):
                    logger.warning("AI returned empty candidates.")
            elif response.status_code == 429:
                logger.warning(f"Rate Limit (429) on key ...{current_key[-4:]}. Rotating to next key immediately.")
                # No sleep needed — just continue the loop to grab the next key.
                continue
            else:
                logger.error(f"API Error {response.status_code}: {response.text}")
        except Exception as e:
            logger.error(f"Request Exception on Attempt {attempt+1}: {e}")
            time.sleep(1)  # Small pause on connection error
    logger.error("All attempts failed to get mask coordinates.")
    return None
| # ========================================== | |
| # 5. MOTION GENERATION ENGINE (UNCHANGED CORE LOGIC) | |
| # ========================================== | |
def process_motion_video(input_path, output_path, effect, intensity):
    """
    Render a looping cinemagraph MP4 from a still image.

    input_path: source image path on disk.
    output_path: destination MP4 path.
    effect: 'water', 'cosmic', or 'pulse' (any other value yields an
        effectively static video — the identity displacement map).
    intensity: numeric slider value; 50 maps to strength 1.0 below.

    Raises ValueError if the image is unreadable and Exception if FFmpeg
    encoding fails.
    """
    logger.info(f"Starting motion processing for {effect}...")
    # Load Image
    img = cv2.imread(input_path)
    if img is None: raise ValueError("Image not found or unreadable.")
    # Downscale to at most 1080px on the long side to bound memory/CPU.
    h, w = img.shape[:2]
    max_res = 1080
    if max(h, w) > max_res:
        scale = max_res / max(h, w)
        img = cv2.resize(img, (int(w * scale), int(h * scale)), interpolation=cv2.INTER_LANCZOS4)
        h, w = img.shape[:2]
    # H.264/yuv420p requires even dimensions; crop off any stray row/column.
    h, w = h - (h % 2), w - (w % 2)
    img = img[:h, :w]
    # Ask the AI which region should move (percent bounding box, may be None).
    coords = get_smart_mask_coords(input_path, effect)
    # Motion mask in [0,1]: 1 = full motion, 0 = static.
    mask = np.ones((h, w), dtype=np.float32)  # Default: Full motion if AI fails
    if coords:
        try:
            mask = np.zeros((h, w), dtype=np.float32)
            # Convert percentage box to pixel coordinates.
            y1_p = float(coords.get('ymin', 0))
            x1_p = float(coords.get('xmin', 0))
            y2_p = float(coords.get('ymax', 100))
            x2_p = float(coords.get('xmax', 100))
            y1, x1 = int(y1_p * h / 100), int(x1_p * w / 100)
            y2, x2 = int(y2_p * h / 100), int(x2_p * w / 100)
            # Clamp the box to the image bounds.
            y1, x1, y2, x2 = max(0,y1), max(0,x1), min(h,y2), min(w,x2)
            # Mark the active (moving) region.
            mask[y1:y2, x1:x2] = 1.0
            # Large Gaussian blur feathers the mask edge for seamless blending.
            mask = cv2.GaussianBlur(mask, (151, 151), 0)
            logger.info("Mask applied successfully.")
        except Exception as e:
            # Bad coords from the AI are non-fatal: fall back to full-frame motion.
            logger.error(f"Error applying mask coordinates: {e}. Using full image.")
            mask = np.ones((h, w), dtype=np.float32)
    # Animation parameters: a 3-second loop at 30 fps (90 frames).
    fps = 30
    duration = 3
    total_frames = fps * duration
    # Slider value 50 maps to strength 1.0.
    strength = float(intensity) / 50.0
    # Pixel coordinate grids, reused for every frame's displacement maps.
    x_grid, y_grid = np.meshgrid(np.arange(w), np.arange(h))
    x_grid = x_grid.astype(np.float32)
    y_grid = y_grid.astype(np.float32)
    with tempfile.TemporaryDirectory() as frame_dir:
        logger.info("Rendering frames...")
        for i in range(total_frames):
            t = i / total_frames
            # Phase completes exactly one 2*pi cycle, so the clip loops seamlessly.
            phase = 2 * np.pi * t
            # Start from the identity mapping; effects add displacement below.
            map_x, map_y = x_grid.copy(), y_grid.copy()
            if effect == 'water':
                # Horizontal sine ripple whose offset varies with row position.
                shift = (7.0 * strength) * np.sin(y_grid * 0.04 + phase)
                map_x += shift * mask
            elif effect == 'cosmic':
                # Orbital drift: out-of-phase sin/cos displacement per axis.
                map_x += (5.0 * strength * np.sin(x_grid * 0.015 + phase)) * mask
                map_y += (5.0 * strength * np.cos(y_grid * 0.015 + phase)) * mask
            elif effect == 'pulse':
                # Breathing zoom about the image center, blended by the mask.
                zoom = 1.0 + (0.025 * strength * np.sin(phase))
                # Center point
                cx, cy = w/2, h/2
                map_x = ((x_grid - cx) / zoom + cx) * mask + x_grid * (1-mask)
                map_y = ((y_grid - cy) / zoom + cy) * mask + y_grid * (1-mask)
            # Warp the source image through the displacement maps.
            rendered = cv2.remap(img, map_x, map_y, cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT)
            # Save frame (zero-padded so FFmpeg's %03d pattern matches).
            frame_path = os.path.join(frame_dir, f"frame_{i:03d}.jpg")
            cv2.imwrite(frame_path, rendered, [cv2.IMWRITE_JPEG_QUALITY, 90])
        logger.info("Encoding video with FFmpeg...")
        # FFmpeg: compile frames to MP4.
        # libx264 + yuv420p for maximum compatibility across browsers/phones.
        ffmpeg_cmd = [
            'ffmpeg', '-y',
            '-framerate', str(fps),
            '-i', os.path.join(frame_dir, 'frame_%03d.jpg'),
            '-c:v', 'libx264',
            '-pix_fmt', 'yuv420p',
            '-crf', '23',  # Balance between quality and size
            '-preset', 'fast',  # Faster encoding
            output_path
        ]
        result = subprocess.run(ffmpeg_cmd, capture_output=True, text=True)
        if result.returncode != 0:
            logger.error(f"FFmpeg Error: {result.stderr}")
            raise Exception("Video encoding failed. Check server FFmpeg installation.")
    logger.info(f"Video generated at {output_path}")
| # ========================================== | |
| # 6. WEB INTERFACE (FLASK) | |
| # ========================================== | |
# Single-file frontend: dark-themed upload UI with effect picker, rotating
# progress messages, and inline video preview/download. Rendered through
# render_template_string; Jinja substitutes {{model_name}} in the footer.
INDEX_HTML = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>PicMagic AI - Pro Motion</title>
<style>
:root { --accent: #00e5ff; --bg: #121212; --card: #1e1e1e; --text: #e0e0e0; }
body { background: var(--bg); color: var(--text); font-family: 'Segoe UI', sans-serif; margin: 0; padding: 20px; display: flex; flex-direction: column; align-items: center; min-height: 100vh; }
.container { width: 100%; max-width: 500px; padding-bottom: 50px; }
.header { text-align: center; margin-bottom: 30px; }
h1 { color: var(--accent); margin: 0; font-size: 2.5rem; text-shadow: 0 0 10px rgba(0, 229, 255, 0.3); }
.subtitle { color: #888; font-size: 0.9rem; margin-top: 5px; }
.card { background: var(--card); border-radius: 24px; padding: 25px; box-shadow: 0 20px 40px rgba(0,0,0,0.6); border: 1px solid #333; }
.upload-area { border: 2px dashed #444; border-radius: 16px; padding: 40px 20px; text-align: center; cursor: pointer; transition: all 0.3s ease; position: relative; overflow: hidden; }
.upload-area:hover { border-color: var(--accent); background: rgba(0, 229, 255, 0.03); }
.upload-icon { font-size: 40px; margin-bottom: 10px; opacity: 0.7; }
#preview-img { width: 100%; height: auto; border-radius: 12px; display: none; margin-top: 0; }
.hidden { display: none; }
.controls { margin-top: 25px; }
label { display: block; margin: 15px 0 8px; font-size: 0.9rem; color: #aaa; letter-spacing: 0.5px; }
select, input[type=range] { width: 100%; background: #2a2a2a; border: 1px solid #444; color: #fff; padding: 12px; border-radius: 10px; font-size: 1rem; outline: none; transition: 0.3s; box-sizing: border-box; }
select:focus { border-color: var(--accent); }
.btn-gen { background: linear-gradient(135deg, var(--accent), #00b8d4); color: #000; width: 100%; padding: 16px; border: none; border-radius: 12px; font-weight: 800; font-size: 1.1rem; margin-top: 30px; cursor: pointer; transition: transform 0.2s, opacity 0.2s; text-transform: uppercase; letter-spacing: 1px; }
.btn-gen:hover { transform: translateY(-3px); opacity: 0.95; box-shadow: 0 10px 20px rgba(0, 229, 255, 0.2); }
.btn-gen:disabled { background: #444; color: #888; cursor: not-allowed; transform: none; box-shadow: none; }
.loader { display: none; margin-top: 25px; text-align: center; }
.spinner { width: 40px; height: 40px; border: 4px solid rgba(255,255,255,0.1); border-top-color: var(--accent); border-radius: 50%; animation: spin 1s linear infinite; margin: 0 auto 15px; }
@keyframes spin { to { transform: rotate(360deg); } }
.status-text { color: var(--accent); font-size: 0.9rem; animation: pulse 1.5s infinite; }
@keyframes pulse { 0%, 100% { opacity: 0.6; } 50% { opacity: 1; } }
video { width: 100%; border-radius: 16px; margin-top: 25px; display: none; border: 1px solid #444; box-shadow: 0 10px 30px rgba(0,0,0,0.5); }
.download-btn { display: none; background: #333; color: #fff; text-decoration: none; padding: 15px; border-radius: 12px; text-align: center; margin-top: 15px; font-weight: bold; transition: 0.3s; border: 1px solid #444; }
.download-btn:hover { background: #444; border-color: #fff; }
.footer { margin-top: 30px; text-align: center; font-size: 0.8rem; color: #555; }
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>PicMagic Pro</h1>
<div class="subtitle">AI-Powered Cinemagraph Generator</div>
</div>
<div class="card">
<div class="upload-area" onclick="document.getElementById('imageInput').click()">
<input type="file" id="imageInput" accept="image/*" class="hidden" onchange="handlePreview(this)">
<div id="upload-placeholder">
<div class="upload-icon">📸</div>
<div>Click to Upload Image</div>
</div>
<img id="preview-img">
</div>
<div class="controls">
<label>Choose Effect</label>
<select id="effectType">
<option value="water">🌊 Water Flow / River</option>
<option value="cosmic">✨ Cosmic / Star Drift</option>
<option value="pulse">💓 Breathing / Pulse</option>
</select>
<label>Motion Intensity: <span id="intVal">50</span>%</label>
<input type="range" id="intensity" min="10" max="100" value="50" oninput="document.getElementById('intVal').innerText = this.value">
</div>
<button id="genBtn" class="btn-gen" onclick="startGeneration()">Generate Video</button>
<div class="loader" id="loadingArea">
<div class="spinner"></div>
<div class="status-text" id="statusText">AI is analyzing scene structure...</div>
</div>
<video id="outputVideo" controls loop playsinline></video>
<a id="downloadLink" class="download-btn" href="#" download>📥 Download MP4</a>
</div>
<div class="footer">
System: Stable Cycle Logic Active • Model: {{model_name}}
</div>
</div>
<script>
function handlePreview(input) {
if (input.files && input.files[0]) {
const reader = new FileReader();
reader.onload = function(e) {
const img = document.getElementById('preview-img');
img.src = e.target.result;
img.style.display = 'block';
document.getElementById('upload-placeholder').style.display = 'none';
}
reader.readAsDataURL(input.files[0]);
}
}
async function startGeneration() {
const fileInput = document.getElementById('imageInput');
if (!fileInput.files[0]) return alert("Please upload an image first.");
const btn = document.getElementById('genBtn');
const loader = document.getElementById('loadingArea');
const status = document.getElementById('statusText');
const video = document.getElementById('outputVideo');
const dl = document.getElementById('downloadLink');
// Reset UI
btn.disabled = true;
btn.innerText = "Processing...";
loader.style.display = 'block';
video.style.display = 'none';
dl.style.display = 'none';
const messages = [
"AI is analyzing scene structure...",
"Calculating motion vectors...",
"Applying cinematic effects...",
"Rendering high-quality frames...",
"Compiling video output..."
];
let msgIdx = 0;
const msgInterval = setInterval(() => {
if(msgIdx < messages.length) status.innerText = messages[msgIdx++];
}, 3000);
const formData = new FormData();
formData.append('image', fileInput.files[0]);
formData.append('effect', document.getElementById('effectType').value);
formData.append('intensity', document.getElementById('intensity').value);
try {
const response = await fetch('/generate', { method: 'POST', body: formData });
const result = await response.json();
clearInterval(msgInterval);
if (result.url) {
video.src = result.url;
video.style.display = 'block';
dl.href = result.url;
dl.style.display = 'block';
video.play();
status.innerText = "Done!";
} else {
alert("Error: " + (result.error || "Unknown error occurred"));
}
} catch (err) {
clearInterval(msgInterval);
console.error(err);
alert("Server error. Please check your connection.");
} finally {
btn.disabled = false;
btn.innerText = "GENERATE VIDEO";
loader.style.display = 'none';
}
}
</script>
</body>
</html>
"""
# FIX: the route decorator was missing, so this view was never registered
# and the app returned 404 at '/'.
@app.route('/')
def index():
    """Serve the single-page UI, injecting the active model name into the footer."""
    return render_template_string(INDEX_HTML, model_name=CURRENT_MODEL_NAME)
# FIX: missing route decorator — the frontend POSTs to /generate, but this
# view was never registered with Flask.
@app.route('/generate', methods=['POST'])
def handle_generate():
    """
    Accept an uploaded image, render the motion video, and return JSON
    with a download URL ({"url": ...}) or an error ({"error": ...}, 400/500).
    """
    try:
        image_file = request.files.get('image')
        effect = request.form.get('effect', 'water')
        intensity = request.form.get('intensity', 50)
        if not image_file:
            return jsonify({"error": "No image provided"}), 400
        # One job id keys both the temp input image and the output video.
        job_id = str(uuid.uuid4())[:12]
        input_path = os.path.join(UPLOAD_FOLDER, f"{job_id}.jpg")
        output_path = os.path.join(OUTPUT_FOLDER, f"{job_id}.mp4")
        # Save uploaded image
        image_file.save(input_path)
        # Core generation logic
        process_motion_video(input_path, output_path, effect, intensity)
        # The input image is no longer needed once the video exists.
        if os.path.exists(input_path):
            os.remove(input_path)
        return jsonify({"url": f"/download/{job_id}.mp4"})
    except Exception as e:
        logger.exception("Processing error:")
        return jsonify({"error": str(e)}), 500
# FIX: missing route decorator — download links pointed at an unregistered
# view. Also reject anything other than a bare file name so a crafted
# filename cannot traverse outside OUTPUT_FOLDER.
@app.route('/download/<filename>')
def download_file(filename):
    """Stream a rendered MP4 from the output folder as an attachment."""
    safe_name = os.path.basename(filename)
    path = os.path.join(OUTPUT_FOLDER, safe_name)
    if safe_name == filename and os.path.exists(path):
        return send_file(path, mimetype='video/mp4', as_attachment=True)
    return "File not found", 404
if __name__ == '__main__':
    # Standard Flask dev-server run (no WSGI wrapper).
    # Port 7860 is the default port Hugging Face Spaces maps to the web.
    logger.info("Starting PicMagic Server...")
    app.run(host='0.0.0.0', port=7860, debug=False)