# picmagic / app.py
# Diwashbarla's picture
# Update app.py
# 8e1365c verified
import os
import sys
import cv2
import time
import uuid
import json
import logging
import re
import shutil
import atexit
import subprocess
import tempfile
import requests
import numpy as np
import itertools
import base64
from flask import Flask, request, send_file, render_template_string, jsonify
from dotenv import load_dotenv
# Pull in variables from a local .env file when one is present
load_dotenv()

# ==========================================
# 1. LOGGING & SYSTEM SETUP
# ==========================================
logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)],
    level=logging.INFO,
)
logger = logging.getLogger("PicMagicPro_Stable")

# Scratch folders for uploads and rendered videos live under the system temp dir
UPLOAD_FOLDER = os.path.join(tempfile.gettempdir(), 'picmagic_uploads')
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
OUTPUT_FOLDER = os.path.join(tempfile.gettempdir(), 'picmagic_outputs')
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

app = Flask(__name__)
# Reject request bodies larger than 50MB
app.config.update(MAX_CONTENT_LENGTH=50 * 1024 * 1024)
# Cleanup of stale temporary files (older than 1 hour by default)
def start_cleanup(max_age_seconds=3600):
    """
    Delete stale files from the upload and output folders.

    Performs a single synchronous pass over UPLOAD_FOLDER and OUTPUT_FOLDER
    and removes regular files whose modification time is older than
    ``max_age_seconds``. It is registered with ``atexit`` below, so a pass
    runs at interpreter shutdown; it can also be called directly.

    Args:
        max_age_seconds: Minimum age, in seconds, a file must have to be
            removed. Defaults to one hour.
    """
    cutoff = time.time() - max_age_seconds
    for folder in (UPLOAD_FOLDER, OUTPUT_FOLDER):
        if not os.path.isdir(folder):
            continue
        try:
            entries = os.listdir(folder)
        except OSError as e:
            # The folder can disappear between the check and the listing.
            logger.error(f"Cleanup error for {folder}: {e}")
            continue
        for f in entries:
            path = os.path.join(folder, f)
            try:
                if os.path.isfile(path) and os.stat(path).st_mtime < cutoff:
                    os.remove(path)
                    logger.info(f"Cleaned up old file: {f}")
            except OSError as e:
                # Best effort: one undeletable file must not stop the pass.
                logger.error(f"Cleanup error for {f}: {e}")


atexit.register(start_cleanup)
# ==========================================
# 2. KEY MANAGEMENT & MODEL HUNTING (APP.PY STYLE)
# ==========================================
# 1. Collect keys (dynamic, environment-driven)
# Every environment variable whose name starts with 'gmni' (case-insensitive)
# contributes one key. Values are stored stripped so that stray whitespace or
# a trailing newline in a secret never ends up inside a request.
GEMINI_API_KEYS = [
    val.strip() for key, val in os.environ.items()
    if key.lower().startswith("gmni") and val.strip()
]
# If no such keys were found, fall back to GEMINI_API_KEY (optional, for testing)
if not GEMINI_API_KEYS:
    logger.warning("No keys found starting with 'gmni'. Checking for 'GEMINI_API_KEY'...")
    if os.environ.get("GEMINI_API_KEY", "").strip():
        GEMINI_API_KEYS.append(os.environ["GEMINI_API_KEY"].strip())
logger.info(f"Loaded {len(GEMINI_API_KEYS)} Gemini API Keys.")
# 2. Key cycler setup (itertools.cycle)
# The cycle never runs out; it keeps going round the key list forever.
if GEMINI_API_KEYS:
    gemini_key_cycler = itertools.cycle(GEMINI_API_KEYS)
else:
    logger.critical("CRITICAL: No API keys found! App will fail on generation.")
    # A dummy entry keeps next() working so the failure surfaces as an API
    # error at generation time instead of a crash at import time.
    gemini_key_cycler = itertools.cycle(["placeholder_key_to_prevent_crash"])
# 3. Best Model Finder (run ONCE at startup)
def _select_model_name(models):
    """Return the preferred short model name from a models.list payload, or None."""
    usable = []
    for entry in models:
        full_name = entry.get('name') if isinstance(entry, dict) else None
        if not full_name:
            continue
        methods = entry.get('supportedGenerationMethods')
        # Skip models that explicitly cannot serve generateContent; entries
        # that do not report the field at all are kept.
        if methods is not None and 'generateContent' not in methods:
            continue
        usable.append(full_name.split('/')[-1])
    # Priority: Flash first (usually fastest and cheapest/free-tier friendly)
    for name in usable:
        if "gemini-1.5-flash" in name:
            return name
    # Fallback checks
    for name in usable:
        if "vision" in name or "pro" in name:
            return name
    return None


def find_best_gemini_vision_model(api_keys):
    """
    Determine which model name to use by listing models with the first key.

    Intended to run once at startup, not on every request. Any failure
    (no keys, network error, non-200 reply, no suitable model) falls back to
    the safe default so that importing the module never crashes.

    Args:
        api_keys: Sequence of API keys; only the first one is used.

    Returns:
        The short model name (without the "models/" prefix), or
        "gemini-1.5-flash" when discovery is not possible.
    """
    default_model = "gemini-1.5-flash"  # Safe default
    if not api_keys:
        return default_model
    test_key = api_keys[0]
    try:
        # The key travels in a header rather than the query string so that it
        # cannot leak through URLs embedded in logged request exceptions.
        response = requests.get(
            "https://generativelanguage.googleapis.com/v1beta/models",
            headers={"x-goog-api-key": test_key},
            timeout=10,
        )
        if response.status_code == 200:
            chosen = _select_model_name(response.json().get('models', []))
            if chosen:
                return chosen
        else:
            logger.warning(
                f"Model discovery returned HTTP {response.status_code}; using default model."
            )
    except Exception as e:
        # Startup boundary: discovery is best-effort and must never abort import.
        logger.error(f"Model discovery failed: {e}")
    return default_model
# Global model name, resolved once at import time from the configured keys.
# It is read when building the generateContent URL and when rendering the
# index page footer.
CURRENT_MODEL_NAME = find_best_gemini_vision_model(GEMINI_API_KEYS)
logger.info(f"Global Model Selected: {CURRENT_MODEL_NAME}")
# ==========================================
# 3. HELPER FUNCTIONS
# ==========================================
def robust_json_parser(raw_str):
    """
    AI response cleaner.

    Extracts a JSON object from model output even when it is wrapped in
    Markdown code fences or surrounded by extra prose.

    Parsing strategy, in order:
      1. Parse the whole input as JSON.
      2. Parse the text between the first '{' and the last '}' as JSON.
      3. Parse that same text as a Python literal, which covers
         single-quoted dicts such as {'ymin': 10}.

    Args:
        raw_str: Raw text returned by the model.

    Returns:
        The parsed value (normally a dict), or None when the input is empty,
        is not text, or contains nothing parseable.
    """
    import ast  # local import: only needed for the single-quote fallback

    if not raw_str:
        return None
    # 1. Try direct parse
    try:
        return json.loads(raw_str)
    except (ValueError, TypeError, RecursionError):
        pass
    if not isinstance(raw_str, str):
        # The regex fallback below only makes sense for text.
        return None
    # 2. Regex to find content between curly braces
    match = re.search(r'(\{.*\})', raw_str, re.DOTALL)
    if not match:
        return None
    candidate = match.group(1)
    try:
        return json.loads(candidate)
    except (ValueError, RecursionError) as json_err:
        # 3. Single-quoted "JSON" is a valid Python literal; literal_eval
        #    accepts literals only, so model output is never executed.
        try:
            parsed = ast.literal_eval(candidate)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            parsed = None
        if isinstance(parsed, dict):
            return parsed
        logger.error(f"JSON Parsing failed on string: {raw_str[:50]}... Error: {json_err}")
        return None
# ==========================================
# 4. AI VISION LOGIC (REFACTORED TO APP.PY STYLE)
# ==========================================
def _sniff_image_mime(header):
    """Guess the image MIME type from its leading bytes (defaults to JPEG)."""
    if header.startswith(b"\x89PNG\r\n\x1a\n"):
        return "image/png"
    if header[:4] == b"RIFF" and header[8:12] == b"WEBP":
        return "image/webp"
    return "image/jpeg"


def get_smart_mask_coords(image_path, effect_type):
    """
    Ask the vision model where motion should be applied in an image.

    Uses the global key cycler and the global model name; there is no
    per-request model hunting. Up to three attempts are made, each with the
    next key from the cycle.

    Args:
        image_path: Path of the image file to analyse.
        effect_type: Name of the motion effect, interpolated into the prompt.

    Returns:
        A dict with bounding-box percentages (keys such as 'ymin', 'xmin',
        'ymax', 'xmax') as parsed from the model reply, or None when the
        image cannot be read or every attempt fails.
    """
    # 1. Prepare Image
    try:
        with open(image_path, "rb") as f:
            raw_bytes = f.read()
    except (OSError, TypeError, ValueError) as e:
        logger.error(f"Failed to read image file: {e}")
        return None
    # Uploads are stored with a .jpg name whatever their real format, so the
    # declared MIME type is derived from the file signature instead.
    mime_type = _sniff_image_mime(raw_bytes[:16])
    img_data = base64.b64encode(raw_bytes).decode('utf-8')

    # 2. Prepare Prompt (Strict JSON instruction)
    # The example uses double quotes so that the model is shown valid JSON.
    prompt_text = (
        f"Analyze this image for a {effect_type} cinematic motion effect. "
        "Identify the specific area where motion (like water flow, sky movement, or breathing) should happen. "
        "Return ONLY a JSON object with percentages for the bounding box. "
        'Format: {"ymin": 0-100, "xmin": 0-100, "ymax": 0-100, "xmax": 0-100}. '
        "Do not include any other text."
    )

    # URL and payload are identical for every attempt; only the key changes.
    url = f"https://generativelanguage.googleapis.com/v1beta/models/{CURRENT_MODEL_NAME}:generateContent"
    payload = {
        "contents": [{
            "parts": [
                {"text": prompt_text},
                {"inline_data": {"mime_type": mime_type, "data": img_data}}
            ]
        }],
        "generationConfig": {
            "temperature": 0.4,  # Low temp for deterministic JSON
            "maxOutputTokens": 200
        }
    }

    # 3. Retry Logic (Simple Loop)
    max_retries = 3
    for attempt in range(max_retries):
        # Get next key from the infinite cycle
        current_key = next(gemini_key_cycler)
        try:
            logger.info(f"AI Request Attempt {attempt+1}/{max_retries} using key ...{current_key[-4:]}")
            # The key is sent as a header, not in the URL, so that a logged
            # request exception can never contain it.
            response = requests.post(
                url,
                headers={"x-goog-api-key": current_key},
                json=payload,
                timeout=25,
            )
        except Exception as e:
            logger.error(f"Request Exception on Attempt {attempt+1}: {e}")
            time.sleep(1)  # Small pause on connection error
            continue

        if response.status_code == 429:
            logger.warning(f"Rate Limit (429) on key ...{current_key[-4:]}. Rotating to next key immediately.")
            # No sleep needed, just continue loop to grab next key
            continue
        if response.status_code != 200:
            logger.error(f"API Error {response.status_code}: {response.text}")
            continue

        try:
            candidates = response.json().get('candidates', [])
            raw_text = candidates[0]['content']['parts'][0]['text']
        except (ValueError, AttributeError, KeyError, IndexError, TypeError):
            # Covers a non-JSON body, an empty candidate list and candidates
            # that carry no text part.
            logger.warning("AI returned empty candidates.")
            continue

        coords = robust_json_parser(raw_text)
        if isinstance(coords, dict) and coords:
            logger.info(f"AI Success: {coords}")
            return coords
        logger.warning("AI returned invalid JSON. Retrying...")

    logger.error("All attempts failed to get mask coordinates.")
    return None
# ==========================================
# 5. MOTION GENERATION ENGINE (UNCHANGED CORE LOGIC)
# ==========================================
def _build_motion_mask(coords, h, w):
    """Turn bounding-box percentages into a feathered float32 (h, w) motion mask."""
    full_mask = np.ones((h, w), dtype=np.float32)  # Default: Full motion if AI fails
    if not coords:
        return full_mask
    try:
        # Parse coordinates safely
        y_a = float(coords.get('ymin', 0))
        x_a = float(coords.get('xmin', 0))
        y_b = float(coords.get('ymax', 100))
        x_b = float(coords.get('xmax', 100))
        # Tolerate swapped min/max values
        top_pct, bottom_pct = min(y_a, y_b), max(y_a, y_b)
        left_pct, right_pct = min(x_a, x_b), max(x_a, x_b)
        # Clamp both ends into the image so negative or oversized values
        # cannot turn into wrap-around slice indices.
        y1 = int(np.clip(top_pct * h / 100, 0, h))
        y2 = int(np.clip(bottom_pct * h / 100, 0, h))
        x1 = int(np.clip(left_pct * w / 100, 0, w))
        x2 = int(np.clip(right_pct * w / 100, 0, w))
        if y2 <= y1 or x2 <= x1:
            raise ValueError("bounding box is empty")
        mask = np.zeros((h, w), dtype=np.float32)
        # Set Active Area
        mask[y1:y2, x1:x2] = 1.0
        # Soften edges for seamless blending
        mask = cv2.GaussianBlur(mask, (151, 151), 0)
        logger.info("Mask applied successfully.")
        return mask
    except (AttributeError, TypeError, ValueError, OverflowError, cv2.error) as e:
        logger.error(f"Error applying mask coordinates: {e}. Using full image.")
        return full_mask


def process_motion_video(input_path, output_path, effect, intensity):
    """
    Render a short looping motion video from a still image.

    The image is loaded (and downscaled so its longest side is at most 1080
    px), a motion mask is obtained from the AI helper, 90 frames (3 s at
    30 fps) are produced by warping the image with cv2.remap, and FFmpeg
    encodes the frames into an H.264 MP4.

    Args:
        input_path: Path of the source image.
        output_path: Path where the MP4 is written.
        effect: 'water', 'cosmic' or 'pulse'; any other value leaves the
            frames unwarped.
        intensity: Motion strength; a value of 50 corresponds to strength 1.0.
            Must be convertible with float().

    Raises:
        ValueError: If the image cannot be read, is too small, or intensity
            is not numeric.
        RuntimeError: If a frame cannot be written or FFmpeg is missing or
            fails.
    """
    logger.info(f"Starting motion processing for {effect}...")
    # Fail fast on a bad intensity before any expensive work or API call.
    strength = float(intensity) / 50.0

    # Load Image
    img = cv2.imread(input_path)
    if img is None:
        raise ValueError("Image not found or unreadable.")

    # High-quality resize to prevent memory overload
    h, w = img.shape[:2]
    max_res = 1080
    if max(h, w) > max_res:
        scale = max_res / max(h, w)
        new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
        img = cv2.resize(img, new_size, interpolation=cv2.INTER_LANCZOS4)
        h, w = img.shape[:2]

    # Normalize dimensions for FFmpeg (must be divisible by 2)
    h, w = h - (h % 2), w - (w % 2)
    if h < 2 or w < 2:
        raise ValueError("Image is too small to animate.")
    img = img[:h, :w]

    # Get AI Masking (falls back to full-frame motion when unavailable)
    coords = get_smart_mask_coords(input_path, effect)
    mask = _build_motion_mask(coords, h, w)

    # Animation Parameters
    fps = 30
    duration = 3
    total_frames = fps * duration

    # Pre-calculate Grids
    x_grid, y_grid = np.meshgrid(np.arange(w), np.arange(h))
    x_grid = x_grid.astype(np.float32)
    y_grid = y_grid.astype(np.float32)
    # Center point for the zoom effect
    cx, cy = w / 2, h / 2

    with tempfile.TemporaryDirectory() as frame_dir:
        logger.info("Rendering frames...")
        for i in range(total_frames):
            # Cyclic phase for seamless looping
            phase = 2 * np.pi * (i / total_frames)

            # Displacement Maps
            if effect == 'water':
                # Sine wave horizontal flow
                shift = (7.0 * strength) * np.sin(y_grid * 0.04 + phase)
                map_x = x_grid + shift * mask
                map_y = y_grid
            elif effect == 'cosmic':
                # Circular orbital drift
                map_x = x_grid + (5.0 * strength * np.sin(x_grid * 0.015 + phase)) * mask
                map_y = y_grid + (5.0 * strength * np.cos(y_grid * 0.015 + phase)) * mask
            elif effect == 'pulse':
                # Zoom in/out breathing. zoom is kept a plain Python float:
                # under NumPy 2 promotion rules a NumPy float64 scalar would
                # upcast the maps to float64, which cv2.remap rejects.
                zoom = 1.0 + 0.025 * strength * float(np.sin(phase))
                map_x = ((x_grid - cx) / zoom + cx) * mask + x_grid * (1 - mask)
                map_y = ((y_grid - cy) / zoom + cy) * mask + y_grid * (1 - mask)
            else:
                # Unknown effect: identity mapping (static frames)
                map_x, map_y = x_grid, y_grid

            # cv2.remap needs single-precision maps whatever the arithmetic produced.
            map_x = np.ascontiguousarray(map_x, dtype=np.float32)
            map_y = np.ascontiguousarray(map_y, dtype=np.float32)

            # Render frame using Remap
            rendered = cv2.remap(img, map_x, map_y, cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT)

            # Save frame
            frame_path = os.path.join(frame_dir, f"frame_{i:03d}.jpg")
            if not cv2.imwrite(frame_path, rendered, [cv2.IMWRITE_JPEG_QUALITY, 90]):
                raise RuntimeError(f"Failed to write frame {i}.")

        logger.info("Encoding video with FFmpeg...")
        # FFmpeg: Compile frames to MP4
        # Using libx264 and yuv420p for maximum compatibility across browsers/phones
        ffmpeg_cmd = [
            'ffmpeg', '-y',
            '-framerate', str(fps),
            '-i', os.path.join(frame_dir, 'frame_%03d.jpg'),
            '-c:v', 'libx264',
            '-pix_fmt', 'yuv420p',
            '-crf', '23',       # Balance between quality and size
            '-preset', 'fast',  # Faster encoding
            output_path
        ]
        try:
            result = subprocess.run(ffmpeg_cmd, capture_output=True, text=True)
        except FileNotFoundError as e:
            # The ffmpeg binary itself is missing from PATH.
            raise RuntimeError("Video encoding failed. Check server FFmpeg installation.") from e
        if result.returncode != 0:
            logger.error(f"FFmpeg Error: {result.stderr}")
            raise RuntimeError("Video encoding failed. Check server FFmpeg installation.")

    logger.info(f"Video generated at {output_path}")
# ==========================================
# 6. WEB INTERFACE (FLASK)
# ==========================================
# Single-page UI template. It is rendered with render_template_string, which
# fills the {{model_name}} placeholder in the footer. The embedded script
# POSTs the image, effect and intensity to /generate and then plays and links
# the URL returned in the JSON response.
INDEX_HTML = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>PicMagic AI - Pro Motion</title>
<style>
:root { --accent: #00e5ff; --bg: #121212; --card: #1e1e1e; --text: #e0e0e0; }
body { background: var(--bg); color: var(--text); font-family: 'Segoe UI', sans-serif; margin: 0; padding: 20px; display: flex; flex-direction: column; align-items: center; min-height: 100vh; }
.container { width: 100%; max-width: 500px; padding-bottom: 50px; }
.header { text-align: center; margin-bottom: 30px; }
h1 { color: var(--accent); margin: 0; font-size: 2.5rem; text-shadow: 0 0 10px rgba(0, 229, 255, 0.3); }
.subtitle { color: #888; font-size: 0.9rem; margin-top: 5px; }
.card { background: var(--card); border-radius: 24px; padding: 25px; box-shadow: 0 20px 40px rgba(0,0,0,0.6); border: 1px solid #333; }
.upload-area { border: 2px dashed #444; border-radius: 16px; padding: 40px 20px; text-align: center; cursor: pointer; transition: all 0.3s ease; position: relative; overflow: hidden; }
.upload-area:hover { border-color: var(--accent); background: rgba(0, 229, 255, 0.03); }
.upload-icon { font-size: 40px; margin-bottom: 10px; opacity: 0.7; }
#preview-img { width: 100%; height: auto; border-radius: 12px; display: none; margin-top: 0; }
.hidden { display: none; }
.controls { margin-top: 25px; }
label { display: block; margin: 15px 0 8px; font-size: 0.9rem; color: #aaa; letter-spacing: 0.5px; }
select, input[type=range] { width: 100%; background: #2a2a2a; border: 1px solid #444; color: #fff; padding: 12px; border-radius: 10px; font-size: 1rem; outline: none; transition: 0.3s; box-sizing: border-box; }
select:focus { border-color: var(--accent); }
.btn-gen { background: linear-gradient(135deg, var(--accent), #00b8d4); color: #000; width: 100%; padding: 16px; border: none; border-radius: 12px; font-weight: 800; font-size: 1.1rem; margin-top: 30px; cursor: pointer; transition: transform 0.2s, opacity 0.2s; text-transform: uppercase; letter-spacing: 1px; }
.btn-gen:hover { transform: translateY(-3px); opacity: 0.95; box-shadow: 0 10px 20px rgba(0, 229, 255, 0.2); }
.btn-gen:disabled { background: #444; color: #888; cursor: not-allowed; transform: none; box-shadow: none; }
.loader { display: none; margin-top: 25px; text-align: center; }
.spinner { width: 40px; height: 40px; border: 4px solid rgba(255,255,255,0.1); border-top-color: var(--accent); border-radius: 50%; animation: spin 1s linear infinite; margin: 0 auto 15px; }
@keyframes spin { to { transform: rotate(360deg); } }
.status-text { color: var(--accent); font-size: 0.9rem; animation: pulse 1.5s infinite; }
@keyframes pulse { 0%, 100% { opacity: 0.6; } 50% { opacity: 1; } }
video { width: 100%; border-radius: 16px; margin-top: 25px; display: none; border: 1px solid #444; box-shadow: 0 10px 30px rgba(0,0,0,0.5); }
.download-btn { display: none; background: #333; color: #fff; text-decoration: none; padding: 15px; border-radius: 12px; text-align: center; margin-top: 15px; font-weight: bold; transition: 0.3s; border: 1px solid #444; }
.download-btn:hover { background: #444; border-color: #fff; }
.footer { margin-top: 30px; text-align: center; font-size: 0.8rem; color: #555; }
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>PicMagic Pro</h1>
<div class="subtitle">AI-Powered Cinemagraph Generator</div>
</div>
<div class="card">
<div class="upload-area" onclick="document.getElementById('imageInput').click()">
<input type="file" id="imageInput" accept="image/*" class="hidden" onchange="handlePreview(this)">
<div id="upload-placeholder">
<div class="upload-icon">📸</div>
<div>Click to Upload Image</div>
</div>
<img id="preview-img">
</div>
<div class="controls">
<label>Choose Effect</label>
<select id="effectType">
<option value="water">🌊 Water Flow / River</option>
<option value="cosmic">✨ Cosmic / Star Drift</option>
<option value="pulse">💓 Breathing / Pulse</option>
</select>
<label>Motion Intensity: <span id="intVal">50</span>%</label>
<input type="range" id="intensity" min="10" max="100" value="50" oninput="document.getElementById('intVal').innerText = this.value">
</div>
<button id="genBtn" class="btn-gen" onclick="startGeneration()">Generate Video</button>
<div class="loader" id="loadingArea">
<div class="spinner"></div>
<div class="status-text" id="statusText">AI is analyzing scene structure...</div>
</div>
<video id="outputVideo" controls loop playsinline></video>
<a id="downloadLink" class="download-btn" href="#" download>📥 Download MP4</a>
</div>
<div class="footer">
System: Stable Cycle Logic Active • Model: {{model_name}}
</div>
</div>
<script>
function handlePreview(input) {
if (input.files && input.files[0]) {
const reader = new FileReader();
reader.onload = function(e) {
const img = document.getElementById('preview-img');
img.src = e.target.result;
img.style.display = 'block';
document.getElementById('upload-placeholder').style.display = 'none';
}
reader.readAsDataURL(input.files[0]);
}
}
async function startGeneration() {
const fileInput = document.getElementById('imageInput');
if (!fileInput.files[0]) return alert("Please upload an image first.");
const btn = document.getElementById('genBtn');
const loader = document.getElementById('loadingArea');
const status = document.getElementById('statusText');
const video = document.getElementById('outputVideo');
const dl = document.getElementById('downloadLink');
// Reset UI
btn.disabled = true;
btn.innerText = "Processing...";
loader.style.display = 'block';
video.style.display = 'none';
dl.style.display = 'none';
const messages = [
"AI is analyzing scene structure...",
"Calculating motion vectors...",
"Applying cinematic effects...",
"Rendering high-quality frames...",
"Compiling video output..."
];
let msgIdx = 0;
const msgInterval = setInterval(() => {
if(msgIdx < messages.length) status.innerText = messages[msgIdx++];
}, 3000);
const formData = new FormData();
formData.append('image', fileInput.files[0]);
formData.append('effect', document.getElementById('effectType').value);
formData.append('intensity', document.getElementById('intensity').value);
try {
const response = await fetch('/generate', { method: 'POST', body: formData });
const result = await response.json();
clearInterval(msgInterval);
if (result.url) {
video.src = result.url;
video.style.display = 'block';
dl.href = result.url;
dl.style.display = 'block';
video.play();
status.innerText = "Done!";
} else {
alert("Error: " + (result.error || "Unknown error occurred"));
}
} catch (err) {
clearInterval(msgInterval);
console.error(err);
alert("Server error. Please check your connection.");
} finally {
btn.disabled = false;
btn.innerText = "GENERATE VIDEO";
loader.style.display = 'none';
}
}
</script>
</body>
</html>
"""
@app.route('/')
def index():
    """Serve the single-page UI, exposing the active model name in the footer."""
    page = render_template_string(INDEX_HTML, model_name=CURRENT_MODEL_NAME)
    return page
@app.route('/generate', methods=['POST'])
def handle_generate():
    """
    Accept an uploaded image and render a motion video from it.

    Form fields:
        image: The uploaded image file (required).
        effect: 'water' (default), 'cosmic' or 'pulse'.
        intensity: Number, default 50; clamped to the 0-100 range.

    Returns:
        JSON {"url": "/download/<job>.mp4"} on success, or JSON
        {"error": ...} with status 400 for invalid input and 500 when
        processing fails.
    """
    input_path = None
    output_path = None
    succeeded = False
    try:
        image_file = request.files.get('image')
        if not image_file:
            return jsonify({"error": "No image provided"}), 400

        # The effect name ends up inside the AI prompt, so only the known
        # values are accepted.
        effect = request.form.get('effect', 'water')
        if effect not in ('water', 'cosmic', 'pulse'):
            return jsonify({"error": "Unsupported effect"}), 400

        try:
            intensity = float(request.form.get('intensity', 50))
        except (TypeError, ValueError):
            return jsonify({"error": "Intensity must be a number"}), 400
        if intensity != intensity:  # NaN is the only value unequal to itself
            return jsonify({"error": "Intensity must be a number"}), 400
        intensity = min(max(intensity, 0.0), 100.0)

        # Opportunistic purge of stale files; without it old outputs are only
        # removed when the process exits.
        try:
            start_cleanup()
        except OSError as cleanup_err:
            logger.warning(f"Stale file cleanup skipped: {cleanup_err}")

        # Create unique filenames
        job_id = str(uuid.uuid4())[:12]
        input_path = os.path.join(UPLOAD_FOLDER, f"{job_id}.jpg")
        output_path = os.path.join(OUTPUT_FOLDER, f"{job_id}.mp4")

        # Save uploaded image
        image_file.save(input_path)

        # Core generation logic
        process_motion_video(input_path, output_path, effect, intensity)
        succeeded = True
        return jsonify({"url": f"/download/{job_id}.mp4"})
    except Exception as e:
        # Request boundary: report the failure to the client as JSON.
        logger.exception("Processing error:")
        return jsonify({"error": str(e)}), 500
    finally:
        # The upload is always removed; a partial output only on failure.
        leftovers = [input_path] if succeeded else [input_path, output_path]
        for leftover in leftovers:
            if leftover and os.path.exists(leftover):
                try:
                    os.remove(leftover)
                except OSError as remove_err:
                    logger.warning(f"Could not remove temporary file {leftover}: {remove_err}")
@app.route('/download/<filename>')
def download_file(filename):
    """
    Send a rendered video from the output folder as an attachment.

    Args:
        filename: Name of the MP4 file, as returned by /generate.

    Returns:
        The file as a video/mp4 attachment, or ("File not found", 404) when
        the name is not a plain .mp4 file name or no such file exists.
    """
    # Accept plain file names only: no path separators, and only .mp4 files.
    is_plain_name = (
        os.path.basename(filename) == filename
        and '/' not in filename
        and '\\' not in filename
    )
    if not is_plain_name or not filename.lower().endswith('.mp4'):
        return "File not found", 404
    path = os.path.join(OUTPUT_FOLDER, filename)
    # isfile (not exists) so a directory can never be handed to send_file.
    if os.path.isfile(path):
        return send_file(path, mimetype='video/mp4', as_attachment=True)
    return "File not found", 404
if __name__ == '__main__':
    # Plain Flask development server; 7860 is the port Hugging Face usually maps.
    logger.info("Starting PicMagic Server...")
    app.run(debug=False, port=7860, host='0.0.0.0')