import os
import sys
import cv2
import time
import uuid
import json
import logging
import re
import shutil
import atexit
import subprocess
import tempfile
import requests
import numpy as np
import itertools
import base64
from flask import Flask, request, send_file, render_template_string, jsonify
from dotenv import load_dotenv

# Load environment variables if available
load_dotenv()

# ==========================================
# 1. LOGGING & SYSTEM SETUP
# ==========================================
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger("PicMagicPro_Stable")

# Temporary folders setup: uploads and rendered videos live under the system
# temp dir so the app never needs a writable project directory.
UPLOAD_FOLDER = os.path.join(tempfile.gettempdir(), 'picmagic_uploads')
OUTPUT_FOLDER = os.path.join(tempfile.gettempdir(), 'picmagic_outputs')
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024  # 50MB Limit


# Automatic cleanup of old files (older than 1 hour)
def start_cleanup():
    """
    Remove temporary images and videos older than one hour from both the
    upload and output folders, so server storage does not fill up.

    NOTE(review): despite the name and the original "background cleaner"
    wording, this is registered with atexit below, so it runs exactly once
    at interpreter shutdown — not periodically while the server is running.
    Confirm whether a recurring timer/thread was intended.
    """
    now = time.time()
    for folder in [UPLOAD_FOLDER, OUTPUT_FOLDER]:
        if not os.path.exists(folder):
            continue
        for f in os.listdir(folder):
            path = os.path.join(folder, f)
            try:
                # Anything not modified in the last 3600s is a stale artifact.
                if os.stat(path).st_mtime < now - 3600:
                    if os.path.isfile(path):
                        os.remove(path)
                        logger.info(f"Cleaned up old file: {f}")
            except Exception as e:
                # Best-effort sweep: one bad file must not abort the rest.
                logger.error(f"Cleanup error for {f}: {e}")


atexit.register(start_cleanup)

# ==========================================
# 2. KEY MANAGEMENT & MODEL HUNTING (APP.PY STYLE)
# ==========================================
# 1. Collect keys (dynamic + app.py hybrid approach).
# We pick up every environment variable whose name starts with 'gmni'.
# Re-fetch the shared application logger. logging.getLogger is idempotent,
# so this is the exact same object configured at the top of the file; it
# just makes this section self-contained.
logger = logging.getLogger("PicMagicPro_Stable")

# Every environment variable whose name starts with 'gmni' (case-insensitive)
# and has a non-blank value is treated as a Gemini API key.
GEMINI_API_KEYS = [
    val for key, val in os.environ.items()
    if key.lower().startswith("gmni") and val.strip()
]

# If the environment held no 'gmni*' keys, fall back to the conventional
# GEMINI_API_KEY variable (useful for local testing).
if not GEMINI_API_KEYS:
    logger.warning("No keys found starting with 'gmni'. Checking for 'GEMINI_API_KEY'...")
    if os.environ.get("GEMINI_API_KEY"):
        GEMINI_API_KEYS.append(os.environ.get("GEMINI_API_KEY"))

logger.info(f"Loaded {len(GEMINI_API_KEYS)} Gemini API Keys.")

# 2. Key cycler setup (itertools.cycle): an endless round-robin over the
# keys, so a rate-limited key is simply rotated past on the next call.
if GEMINI_API_KEYS:
    gemini_key_cycler = itertools.cycle(GEMINI_API_KEYS)
else:
    logger.critical("CRITICAL: No API keys found! App will fail on generation.")
    # Cycle over a dummy value so next() never raises at request time;
    # the API call itself will then fail with an auth error instead.
    gemini_key_cycler = itertools.cycle(["placeholder_key_to_prevent_crash"])


# 3. Best Model Finder (run ONCE at startup)
def find_best_gemini_vision_model(api_keys):
    """
    Probe the Gemini model list with the first key and pick a model name.

    Preference order: any 'gemini-1.5-flash' variant (fastest and most
    free-tier friendly), then any model whose name contains 'vision' or
    'pro'. Falls back to 'gemini-1.5-flash' on any failure so the app
    always has a usable model name. Runs once at import, NOT per request.
    """
    if not api_keys:
        return "gemini-1.5-flash"  # Safe default
    test_key = api_keys[0]
    try:
        url = f"https://generativelanguage.googleapis.com/v1beta/models?key={test_key}"
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            data = response.json()
            models = data.get('models', [])
            # Priority pass: flash variants first.
            for m in models:
                name = m['name'].split('/')[-1]
                if "gemini-1.5-flash" in name:
                    return name
            # Fallback pass: anything that looks vision- or pro-capable.
            for m in models:
                if "vision" in m['name'] or "pro" in m['name']:
                    return m['name'].split('/')[-1]
    except Exception as e:
        logger.error(f"Model discovery failed: {e}")
    return "gemini-1.5-flash"


# Global model variable, resolved exactly once at startup.
CURRENT_MODEL_NAME = find_best_gemini_vision_model(GEMINI_API_KEYS)
logger.info(f"Global Model Selected: {CURRENT_MODEL_NAME}")

# ==========================================
# 3. HELPER FUNCTIONS
# ==========================================
def robust_json_parser(raw_str):
    """
    AI response cleaner.

    Extracts a JSON object from text even if it is wrapped in Markdown
    code blocks or surrounded by prose.

    Returns the parsed object, or None when nothing parseable is found.
    """
    if not raw_str:
        return None
    try:
        # 1. Try a direct parse first.
        return json.loads(raw_str)
    # FIX: was a bare `except:`, which would also swallow KeyboardInterrupt
    # and SystemExit. json.loads raises ValueError (JSONDecodeError) for bad
    # JSON and TypeError for non-string input — catch exactly those.
    except (TypeError, ValueError):
        pass
    try:
        # 2. Regex: pull the outermost {...} span out of any wrapper text.
        match = re.search(r'(\{.*\})', raw_str, re.DOTALL)
        if match:
            clean_json = match.group(1)
            return json.loads(clean_json)
    except Exception as e:
        logger.error(f"JSON Parsing failed on string: {raw_str[:50]}... Error: {e}")
    return None


# ==========================================
# 4. AI VISION LOGIC (REFACTORED TO APP.PY STYLE)
# ==========================================
def get_smart_mask_coords(image_path, effect_type):
    """
    Ask Gemini for a bounding box (in percent coordinates) of the image
    region that should animate for the given effect.

    Uses the global key cycler and global model name — no per-request
    model hunting. Retries up to three times, rotating to the next key
    immediately on HTTP 429.

    Returns a dict like {'ymin': .., 'xmin': .., 'ymax': .., 'xmax': ..}
    (values 0-100), or None when every attempt fails.
    """
    global gemini_key_cycler, CURRENT_MODEL_NAME

    # 1. Read and base64-encode the image for the inline_data part.
    try:
        with open(image_path, "rb") as f:
            img_data = base64.b64encode(f.read()).decode('utf-8')
    except Exception as e:
        logger.error(f"Failed to read image file: {e}")
        return None

    # 2. Prompt with a strict JSON-only instruction.
    prompt_text = (
        f"Analyze this image for a {effect_type} cinematic motion effect. "
        "Identify the specific area where motion (like water flow, sky movement, or breathing) should happen. "
        "Return ONLY a JSON object with percentages for the bounding box. "
        "Format: {'ymin': 0-100, 'xmin': 0-100, 'ymax': 0-100, 'xmax': 0-100}. "
        "Do not include any other text."
    )

    # 3. Retry logic (simple loop over the infinite key cycle).
    max_retries = 3
    for attempt in range(max_retries):
        current_key = next(gemini_key_cycler)
        url = f"https://generativelanguage.googleapis.com/v1beta/models/{CURRENT_MODEL_NAME}:generateContent?key={current_key}"
        payload = {
            "contents": [{
                "parts": [
                    {"text": prompt_text},
                    {"inline_data": {"mime_type": "image/jpeg", "data": img_data}}
                ]
            }],
            "generationConfig": {
                "temperature": 0.4,  # Low temp for deterministic JSON
                "maxOutputTokens": 200
            }
        }
        try:
            logger.info(f"AI Request Attempt {attempt+1}/{max_retries} using key ...{current_key[-4:]}")
            response = requests.post(url, json=payload, timeout=25)
            if response.status_code == 200:
                try:
                    candidates = response.json().get('candidates', [])
                    if candidates:
                        raw_text = candidates[0]['content']['parts'][0]['text']
                        coords = robust_json_parser(raw_text)
                        if coords:
                            logger.info(f"AI Success: {coords}")
                            return coords
                        else:
                            logger.warning("AI returned invalid JSON. Retrying...")
                # FIX: walking candidates[0]['content']['parts'][0]['text'] can
                # raise KeyError or TypeError (missing/odd keys) as well as
                # IndexError; only IndexError was caught before, so malformed
                # responses fell through to the generic handler below.
                except (KeyError, IndexError, TypeError):
                    logger.warning("AI returned empty candidates.")
            elif response.status_code == 429:
                logger.warning(f"Rate Limit (429) on key ...{current_key[-4:]}. Rotating to next key immediately.")
                # No sleep needed; the next loop iteration grabs the next key.
                continue
            else:
                logger.error(f"API Error {response.status_code}: {response.text}")
        except Exception as e:
            logger.error(f"Request Exception on Attempt {attempt+1}: {e}")
            time.sleep(1)  # Small pause on connection error

    logger.error("All attempts failed to get mask coordinates.")
    return None

# ==========================================
# 5.
# MOTION GENERATION ENGINE (UNCHANGED CORE LOGIC)
# ==========================================
def process_motion_video(input_path: str, output_path: str, effect: str, intensity) -> None:
    """
    Render a short looping cinemagraph MP4 from a single still image.

    input_path:  source image file, read with OpenCV.
    output_path: destination .mp4 (H.264, yuv420p for broad playback support).
    effect:      'water', 'cosmic' or 'pulse'; any other value matches no
                 displacement branch and produces a static clip.
    intensity:   0-100 slider value (may arrive as a str from the web form;
                 float() handles both). Divided by 50 to get the strength.

    Raises ValueError when the image cannot be read, and Exception when
    FFmpeg fails to encode the rendered frames.
    """
    logger.info(f"Starting motion processing for {effect}...")

    # Load Image
    img = cv2.imread(input_path)
    if img is None:
        raise ValueError("Image not found or unreadable.")

    # High-quality resize to prevent memory overload: cap the longest side
    # at 1080px, preserving aspect ratio.
    h, w = img.shape[:2]
    max_res = 1080
    if max(h, w) > max_res:
        scale = max_res / max(h, w)
        img = cv2.resize(img, (int(w * scale), int(h * scale)), interpolation=cv2.INTER_LANCZOS4)
        h, w = img.shape[:2]

    # Normalize dimensions for FFmpeg (yuv420p requires even width/height).
    h, w = h - (h % 2), w - (w % 2)
    img = img[:h, :w]

    # Get AI masking — the region that should move (None when the AI fails).
    coords = get_smart_mask_coords(input_path, effect)

    # Initialize mask. Default: full-frame motion if the AI gave nothing.
    mask = np.ones((h, w), dtype=np.float32)
    if coords:
        try:
            mask = np.zeros((h, w), dtype=np.float32)
            # Parse percentage coordinates safely, with permissive defaults.
            y1_p = float(coords.get('ymin', 0))
            x1_p = float(coords.get('xmin', 0))
            y2_p = float(coords.get('ymax', 100))
            x2_p = float(coords.get('xmax', 100))
            # Percent -> pixel coordinates.
            y1, x1 = int(y1_p * h / 100), int(x1_p * w / 100)
            y2, x2 = int(y2_p * h / 100), int(x2_p * w / 100)
            # Apply safety bounds (clamp into the image).
            y1, x1, y2, x2 = max(0, y1), max(0, x1), min(h, y2), min(w, x2)
            # Set active area
            mask[y1:y2, x1:x2] = 1.0
            # Soften edges for seamless blending between moving/static areas.
            mask = cv2.GaussianBlur(mask, (151, 151), 0)
            logger.info("Mask applied successfully.")
        except Exception as e:
            # Any bad coordinate data -> fall back to animating everything.
            logger.error(f"Error applying mask coordinates: {e}. Using full image.")
            mask = np.ones((h, w), dtype=np.float32)

    # Animation parameters: 3 seconds at 30fps; strength scales with slider.
    fps = 30
    duration = 3
    total_frames = fps * duration
    strength = float(intensity) / 50.0

    # Pre-calculate pixel coordinate grids once; reused for every frame.
    x_grid, y_grid = np.meshgrid(np.arange(w), np.arange(h))
    x_grid = x_grid.astype(np.float32)
    y_grid = y_grid.astype(np.float32)

    with tempfile.TemporaryDirectory() as frame_dir:
        logger.info("Rendering frames...")
        for i in range(total_frames):
            t = i / total_frames
            # Cyclic phase (0..2π over the clip) for seamless looping.
            phase = 2 * np.pi * t

            # Displacement maps start as the identity mapping.
            map_x, map_y = x_grid.copy(), y_grid.copy()

            if effect == 'water':
                # Sine-wave horizontal flow, masked to the active region.
                shift = (7.0 * strength) * np.sin(y_grid * 0.04 + phase)
                map_x += shift * mask
            elif effect == 'cosmic':
                # Circular orbital drift in both axes.
                map_x += (5.0 * strength * np.sin(x_grid * 0.015 + phase)) * mask
                map_y += (5.0 * strength * np.cos(y_grid * 0.015 + phase)) * mask
            elif effect == 'pulse':
                # Zoom in/out "breathing" around the image center; blended
                # with the identity map outside the mask.
                zoom = 1.0 + (0.025 * strength * np.sin(phase))
                cx, cy = w/2, h/2
                map_x = ((x_grid - cx) / zoom + cx) * mask + x_grid * (1-mask)
                map_y = ((y_grid - cy) / zoom + cy) * mask + y_grid * (1-mask)

            # Warp the still image through the displacement maps.
            rendered = cv2.remap(img, map_x, map_y, cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT)

            # Save frame (90 = JPEG quality)
            frame_path = os.path.join(frame_dir, f"frame_{i:03d}.jpg")
            cv2.imwrite(frame_path, rendered, [cv2.IMWRITE_JPEG_QUALITY, 90])

        logger.info("Encoding video with FFmpeg...")
        # FFmpeg: compile frames to MP4. libx264 + yuv420p for maximum
        # compatibility across browsers/phones.
        ffmpeg_cmd = [
            'ffmpeg', '-y',
            '-framerate', str(fps),
            '-i', os.path.join(frame_dir, 'frame_%03d.jpg'),
            '-c:v', 'libx264',
            '-pix_fmt', 'yuv420p',
            '-crf', '23',       # Balance between quality and size
            '-preset', 'fast',  # Faster encoding
            output_path
        ]
        result = subprocess.run(ffmpeg_cmd, capture_output=True, text=True)
        if result.returncode != 0:
            logger.error(f"FFmpeg Error: {result.stderr}")
            raise Exception("Video encoding failed. Check server FFmpeg installation.")

    logger.info(f"Video generated at {output_path}")


# ==========================================
# 6. WEB INTERFACE (FLASK)
# ==========================================
INDEX_HTML = """ PicMagic AI - Pro Motion

PicMagic Pro

AI-Powered Cinemagraph Generator
📸
Click to Upload Image
AI is analyzing scene structure...
📥 Download MP4
""" @app.route('/') def index(): # Pass the current model name to the template for transparency return render_template_string(INDEX_HTML, model_name=CURRENT_MODEL_NAME) @app.route('/generate', methods=['POST']) def handle_generate(): try: image_file = request.files.get('image') effect = request.form.get('effect', 'water') intensity = request.form.get('intensity', 50) if not image_file: return jsonify({"error": "No image provided"}), 400 # Create unique filenames job_id = str(uuid.uuid4())[:12] input_path = os.path.join(UPLOAD_FOLDER, f"{job_id}.jpg") output_path = os.path.join(OUTPUT_FOLDER, f"{job_id}.mp4") # Save uploaded image image_file.save(input_path) # Core generation logic process_motion_video(input_path, output_path, effect, intensity) # Cleanup input file to save space immediately if os.path.exists(input_path): os.remove(input_path) return jsonify({"url": f"/download/{job_id}.mp4"}) except Exception as e: logger.exception("Processing error:") return jsonify({"error": str(e)}), 500 @app.route('/download/') def download_file(filename): path = os.path.join(OUTPUT_FOLDER, filename) if os.path.exists(path): return send_file(path, mimetype='video/mp4', as_attachment=True) return "File not found", 404 if __name__ == '__main__': # Standard Flask Run # Hugging Face usually maps port 7860 logger.info("Starting PicMagic Server...") app.run(host='0.0.0.0', port=7860, debug=False)