import os
import time
import threading
import uuid
import shutil
import json
import traceback
import logging
import string
import random
from concurrent.futures import ThreadPoolExecutor
from flask import Flask, render_template, request, jsonify, send_from_directory, send_file
# --- HUGGING FACE ZEROGPU SETUP ---
# Probe for the Hugging Face `spaces` package. HAS_ZEROGPU records whether the
# import succeeded so that GPU-decorated code can fall back to plain calls.
try:
    import spaces
except ImportError:
    HAS_ZEROGPU = False
    print("⚠️ 'spaces' module not found. Running in standard mode (CPU/GPU without ZeroGPU allocator).")
else:
    HAS_ZEROGPU = True
    print("✅ Hugging Face Spaces 'spaces' module detected.")
# Conditional Decorator for ZeroGPU
def gpu_task(duration=120):
    """Build a decorator that wraps a function with ``spaces.GPU`` when available.

    Args:
        duration: Value forwarded as ``spaces.GPU(duration=...)``.

    Returns:
        A decorator. If ``HAS_ZEROGPU`` is true it returns the function wrapped
        by ``spaces.GPU``; otherwise it returns the function unchanged.
    """
    def decorator(func):
        # The flag is consulted once, when the decorator is applied.
        wrapped = spaces.GPU(duration=duration)(func) if HAS_ZEROGPU else func
        return wrapped

    return decorator
# --- 0. CONFIG & LOGGING ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- HUGGING FACE FILESYSTEM SETUP ---
# Pick the storage root: /data when it is writable, otherwise /tmp.
if not os.access('/data', os.W_OK):
    BASE_PATH = '/tmp'
    print("✅ Using Temporary Storage at /tmp")
else:
    BASE_PATH = '/data'
    print("✅ Using Persistent Storage at /data")

# Per-session working files and saved comics live in sibling directories.
BASE_USER_DIR, SAVED_COMICS_DIR = (
    os.path.join(BASE_PATH, sub) for sub in ("userdata", "saved_comics")
)

# Both directories must exist before any request is handled.
os.makedirs(BASE_USER_DIR, exist_ok=True)
os.makedirs(SAVED_COMICS_DIR, exist_ok=True)
# --- 1. CORE DEPENDENCY CHECKS ---
# Import the image/subtitle libraries as a group: if any one of them is
# missing, all four names are bound to None so callers can test for that.
try:
    import cv2
    import numpy as np
    from PIL import Image
    import srt
except ImportError as e:
    print(f"❌ CRITICAL ERROR: Missing python library. {e}")
    cv2 = np = Image = srt = None
# --- 2. BACKEND IMPORTS WITH FALLBACKS ---
def dummy_func(*args, **kwargs):
    """Stand-in for a backend function that could not be imported.

    Accepts and ignores any arguments and always returns the 4-tuple
    ``(0, 0, None, None)``.
    """
    placeholder = (0, 0, None, None)
    return placeholder
# Use the real black-bar cropper when it imports; otherwise bind the name to
# dummy_func so later calls still succeed.
try:
    from backend.keyframes.keyframes import black_bar_crop
except Exception as e:
    print(f"⚠️ Could not load black_bar_crop: {e}. Cropping will be SKIPPED.")
    black_bar_crop = dummy_func
else:
    print("✅ Black bar cropping module loaded.")
# Prefer the backend enhancer; fall back to a class with the same method names
# whose methods do nothing.
try:
    from backend.simple_color_enhancer import SimpleColorEnhancer
except Exception as e:
    print(f"⚠️ Could not load SimpleColorEnhancer: {e}.")

    class SimpleColorEnhancer:
        """No-op replacement used when the backend enhancer cannot be imported."""

        def enhance_batch(self, *args, **kwargs):
            return None

        def enhance_single(self, *args, **kwargs):
            return None
else:
    print("✅ SimpleColorEnhancer loaded.")
# Prefer the backend enhancer; fall back to a class with the same method names
# whose methods do nothing.
try:
    from backend.quality_color_enhancer import QualityColorEnhancer
except Exception as e:
    print(f"⚠️ Could not load QualityColorEnhancer: {e}.")

    class QualityColorEnhancer:
        """No-op replacement used when the backend enhancer cannot be imported."""

        def batch_enhance(self, *args, **kwargs):
            return None

        def enhance_single(self, *args, **kwargs):
            return None
else:
    print("✅ QualityColorEnhancer loaded.")
# Load the comic data classes from the backend, or define minimal local
# equivalents (plain dicts for bubbles/panels, a two-attribute Page).
try:
    from backend.class_def import bubble, panel, Page
    print("✅ Core class definitions loaded.")
except Exception as e:
    # Report the cause, as the other fallbacks in this file do, instead of
    # discarding the exception.
    print(f"⚠️ Using fallback class definitions: {e}")

    def bubble(dialog="", bubble_offset_x=50, bubble_offset_y=20, lip_x=-1, lip_y=-1, emotion='normal'):
        """Return a speech-bubble record as a plain dict of the given fields."""
        return {
            'dialog': dialog,
            'bubble_offset_x': bubble_offset_x,
            'bubble_offset_y': bubble_offset_y,
            'lip_x': lip_x,
            'lip_y': lip_y,
            'emotion': emotion,
        }

    def panel(image=""):
        """Return a panel record as a dict holding the image name."""
        return {'image': image}

    class Page:
        """One comic page: a list of panels and the matching list of bubbles."""

        def __init__(self, panels, bubbles):
            self.panels = panels
            self.bubbles = bubbles
# Load the AI/utility helpers from the backend. If any import fails, define
# local stand-ins for the names this file calls: subtitle and keyframe
# generators that do nothing, a detector that finds no faces, and a placer that
# always returns the fixed offset (50, 20).
try:
    from backend.ai_enhanced_core import image_processor, comic_styler, face_detector, layout_optimizer
    from backend.ai_bubble_placement import ai_bubble_placer
    from backend.subtitles.subs_real import get_real_subtitles
    from backend.keyframes.keyframes_simple import generate_keyframes_simple
except Exception as e:
    print(f"⚠️ Could not load utility modules: {e}")

    def get_real_subtitles(v):
        return None

    def generate_keyframes_simple(*args, **kwargs):
        return None

    class DummyDetector:
        """Face detector stand-in: reports no faces and no lip position."""

        def detect_faces(self, p):
            return []

        def get_lip_position(self, p, f):
            return -1, -1

    class DummyPlacer:
        """Bubble placer stand-in: always returns the default offset."""

        def place_bubble_ai(self, p, l):
            return 50, 20

    face_detector = DummyDetector()
    ai_bubble_placer = DummyPlacer()
else:
    print("✅ Core utility modules loaded.")
# --- FLASK APP SETUP ---
# Flask application object; the route handlers in this file register on it.
app = Flask(__name__)
def generate_save_code(length=8):
    """Generate a save code that is not yet used under ``SAVED_COMICS_DIR``.

    The code is built from uppercase ASCII letters and digits. Because a save
    code is the only thing needed to load a saved comic, it is drawn from the
    ``secrets`` generator rather than the predictable ``random`` module.

    Args:
        length: Number of characters in the code.

    Returns:
        A code for which no entry currently exists in ``SAVED_COMICS_DIR``.

    Raises:
        ValueError: If ``length`` is less than 1. An empty code would map to
            the (existing) directory itself and the search would never end.
    """
    import secrets  # local import keeps this function self-contained

    if length < 1:
        raise ValueError("length must be at least 1")

    chars = string.ascii_uppercase + string.digits
    while True:
        code = ''.join(secrets.choice(chars) for _ in range(length))
        # Retry on the rare collision with an already saved comic.
        if not os.path.exists(os.path.join(SAVED_COMICS_DIR, code)):
            return code
# --- FULL HTML INTERFACE ---
# Page body returned by the index route.
# NOTE(review): the text below contains only plain text lines, with no HTML
# tags, scripts or styles; confirm the markup was not lost from this template.
INDEX_HTML = '''
Movie to Comic Generator
✏️ Interactive Editor
Current Save Code:
✅ Comic Saved!
Your unique save code is:
XXXXXXXX
Write this code down or copy it.
Anyone can load this comic using this code.
'''
# --- 3. ENHANCED COMIC GENERATOR CLASS ---
class EnhancedComicGenerator:
    """Build a comic (frames, bubbles, pages) from one session's uploaded video.

    Files are kept under ``BASE_USER_DIR/<sid>``: the video at ``uploaded.mp4``,
    extracted frames plus ``frame_metadata.json`` in ``frames/``, and
    ``status.json`` / ``pages.json`` in ``output/``.

    NOTE: ``sid`` is joined into filesystem paths as given; callers should pass
    an identifier that is a single, trusted path component.
    """

    def __init__(self, sid):
        """Compute the session's paths and create its frames/output directories."""
        self.sid = sid
        self.user_dir = os.path.join(BASE_USER_DIR, sid)
        self.video_path = os.path.join(self.user_dir, 'uploaded.mp4')
        self.frames_dir = os.path.join(self.user_dir, 'frames')
        self.output_dir = os.path.join(self.user_dir, 'output')
        self.status_file = os.path.join(self.output_dir, 'status.json')
        self.metadata_path = os.path.join(self.frames_dir, 'frame_metadata.json')
        os.makedirs(self.frames_dir, exist_ok=True)
        os.makedirs(self.output_dir, exist_ok=True)
        self.video_fps = None
        self.frame_metadata = {}

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    def _frame_files(self):
        """Return the sorted names of the PNG files in the frames directory."""
        return sorted(f for f in os.listdir(self.frames_dir) if f.endswith('.png'))

    @staticmethod
    def _remove_files(directory, keep=()):
        """Delete the files directly inside ``directory`` except names in ``keep``."""
        if not os.path.isdir(directory):
            return
        for name in os.listdir(directory):
            if name in keep:
                continue
            try:
                os.remove(os.path.join(directory, name))
            except OSError:
                # Best effort: sub-directories and undeletable files are left.
                pass

    def _apply_enhancer(self, enhancer, single_image_path=None):
        """Run ``enhancer.enhance_single`` on one image or on every frame."""
        if single_image_path:
            enhancer.enhance_single(single_image_path)
            return
        frame_paths = [os.path.join(self.frames_dir, f) for f in self._frame_files()]
        with ThreadPoolExecutor() as executor:
            # list() drains the iterator so worker exceptions surface here.
            list(executor.map(enhancer.enhance_single, frame_paths))

    def _read_frame_at(self, seconds):
        """Return the frame decoded at ``seconds`` into the video, or None."""
        cap = cv2.VideoCapture(self.video_path)
        try:
            cap.set(cv2.CAP_PROP_POS_MSEC, seconds * 1000)
            ret, frame = cap.read()
        finally:
            cap.release()
        return frame if ret else None

    # ------------------------------------------------------------------
    # Status and cleanup
    # ------------------------------------------------------------------
    def update_status(self, message, progress):
        """Write ``{'message', 'progress'}`` to the status file (best effort).

        The record is written to a temporary file and then moved into place so
        a reader of ``status.json`` never sees a half-written file. Failures
        are reported but never raised.
        """
        tmp_path = self.status_file + '.tmp'
        try:
            with open(tmp_path, 'w') as f:
                json.dump({'message': message, 'progress': progress}, f)
            os.replace(tmp_path, self.status_file)
        except Exception as e:
            print(f"⚠️ Could not update status: {e}")

    def cleanup_previous_run(self):
        """Remove old frames, old outputs (except status.json) and the old SRT."""
        print(f"🧹 Cleaning up for session {self.sid}...")
        self._remove_files(self.frames_dir)
        self._remove_files(self.output_dir, keep=('status.json',))
        user_srt = os.path.join(self.user_dir, 'subs.srt')
        if os.path.exists(user_srt):
            os.remove(user_srt)
        print("✅ Cleanup complete.")

    # ------------------------------------------------------------------
    # Pipeline stages
    # ------------------------------------------------------------------
    def generate_keyframes_from_moments(self, key_moments, max_frames=48):
        """Save one frame per subtitle moment and write the frame metadata file.

        Args:
            key_moments: Dicts with ``'start'``, ``'end'`` (seconds) and
                ``'text'``. The list itself is not modified.
            max_frames: Maximum number of moments (earliest first) to process.

        Returns:
            True on success, False if anything failed (the error is printed).
        """
        cap = None
        try:
            cap = cv2.VideoCapture(self.video_path)
            if not cap.isOpened():
                raise Exception("Cannot open video for keyframe extraction")
            # video_fps is only set by generate_comic/regenerate_frame; fall
            # back to the container's value so this method works on its own.
            fps = self.video_fps or cap.get(cv2.CAP_PROP_FPS) or 25
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            duration = total_frames / fps
            moments = sorted(key_moments, key=lambda m: m['start'])[:max_frames]
            total = len(moments)
            frame_metadata = {}
            frame_count = 0
            for i, moment in enumerate(moments):
                self.update_status(f"Extracting frame {i+1}/{total}...",
                                   25 + int(20 * (i / total)))
                # Sample the middle of the subtitle's time span.
                frame_time = (moment['start'] + moment['end']) / 2
                if frame_time > duration:
                    continue
                cap.set(cv2.CAP_PROP_POS_FRAMES, int(frame_time * fps))
                ret, frame = cap.read()
                if not ret:
                    continue
                frame_filename = f"frame_{frame_count:04d}.png"
                cv2.imwrite(os.path.join(self.frames_dir, frame_filename), frame)
                frame_metadata[frame_filename] = {
                    'time': frame_time,
                    'dialogue': moment['text'],
                    'start': moment['start'],
                    'end': moment['end'],
                }
                frame_count += 1
            with open(self.metadata_path, 'w') as f:
                json.dump(frame_metadata, f, indent=2)
            print(f"✅ Extracted {frame_count} keyframes from video")
            return True
        except Exception as e:
            print(f"❌ Error extracting keyframes: {e}")
            traceback.print_exc()
            return False
        finally:
            # Release the capture on every path, including failures.
            if cap is not None:
                cap.release()

    def _enhance_all_images(self, single_image_path=None):
        """Apply SimpleColorEnhancer to one image, or to all frames if None."""
        try:
            self._apply_enhancer(SimpleColorEnhancer(), single_image_path)
            print("✅ Simple color enhancement complete")
        except Exception as e:
            print(f"⚠️ Simple enhancement failed: {e}")

    def _enhance_quality_colors(self, single_image_path=None):
        """Apply QualityColorEnhancer to one image, or to all frames if None."""
        try:
            self._apply_enhancer(QualityColorEnhancer(), single_image_path)
            print("✅ Quality color enhancement complete")
        except Exception as e:
            print(f"⚠️ Quality enhancement failed: {e}")

    def _process_bubble_for_frame(self, frame_file):
        """Return the speech bubble for one frame, using defaults on failure."""
        frame_path = os.path.join(self.frames_dir, frame_file)
        meta = self.frame_metadata.get(frame_file, {})
        dialogue = meta.get('dialogue', '') if isinstance(meta, dict) else ''
        try:
            faces = face_detector.detect_faces(frame_path)
            if faces:
                lip_x, lip_y = face_detector.get_lip_position(frame_path, faces[0])
            else:
                lip_x, lip_y = -1, -1
            bubble_x, bubble_y = ai_bubble_placer.place_bubble_ai(frame_path, (lip_x, lip_y))
            return bubble(
                bubble_offset_x=bubble_x,
                bubble_offset_y=bubble_y,
                lip_x=lip_x,
                lip_y=lip_y,
                dialog=dialogue,
                emotion='normal'
            )
        except Exception as e:
            print(f"-> Could not place bubble for {frame_file}: {e}. Using default.")
            return bubble(
                bubble_offset_x=50,
                bubble_offset_y=20,
                lip_x=-1,
                lip_y=-1,
                dialog=dialogue,
                emotion='normal'
            )

    def _create_ai_bubbles_from_moments(self):
        """Return one bubble per frame, in sorted frame order."""
        frame_files = self._frame_files()
        if not os.path.exists(self.metadata_path):
            return [bubble(dialog="") for _ in frame_files]
        with open(self.metadata_path, 'r') as f:
            self.frame_metadata = json.load(f)
        with ThreadPoolExecutor() as executor:
            # executor.map preserves input order, so bubbles align with frames.
            return list(executor.map(self._process_bubble_for_frame, frame_files))

    def _generate_pages(self, bubbles_list):
        """Lay frames out into pages; fall back to four panels per page."""
        frame_files = self._frame_files()
        try:
            from backend.fixed_12_pages_800x1080 import generate_12_pages_800x1080
            return generate_12_pages_800x1080(frame_files, bubbles_list)
        except ImportError:
            pages = []
            for start in range(0, len(frame_files), 4):
                end = start + 4
                page_panels = [panel(image=f) for f in frame_files[start:end]]
                pages.append(Page(panels=page_panels, bubbles=bubbles_list[start:end]))
            return pages

    # --- APPLY ZEROGPU DECORATOR HERE ---
    @gpu_task(duration=180)  # Allocate GPU for 3 minutes for this task
    def generate_comic(self):
        """Run the whole pipeline for this session.

        Progress is reported through ``update_status``; on failure the status
        progress is set to -1 with the error message.

        Returns:
            True if every stage completed, False otherwise.
        """
        start_time = time.time()
        try:
            if cv2 is None:
                raise Exception("OpenCV not installed")
            self.update_status("Cleaning up previous run...", 0)
            self.cleanup_previous_run()

            self.update_status("Analyzing video...", 5)
            cap = cv2.VideoCapture(self.video_path)
            try:
                if not cap.isOpened():
                    raise Exception("Cannot open video")
                self.video_fps = cap.get(cv2.CAP_PROP_FPS) or 25
            finally:
                cap.release()
            print(f"✅ Video FPS detected: {self.video_fps:.2f}")

            self.update_status("Generating subtitles (this may take a while)...", 10)
            user_srt = os.path.join(self.user_dir, 'subs.srt')
            need_fallback = False
            try:
                get_real_subtitles(self.video_path)
                # NOTE(review): 'test1.srt' is a fixed name relative to the
                # working directory, so concurrent sessions would share it.
                if os.path.exists('test1.srt'):
                    shutil.move('test1.srt', user_srt)
            except Exception as e:
                print(f"⚠️ Subtitle generation failed: {e}. Creating fallback.")
                need_fallback = True
            if not need_fallback and not os.path.exists(user_srt):
                # The generator returned without producing a file; without a
                # fallback the parse step below would fail on a missing file.
                print("⚠️ No subtitle file was produced. Creating fallback.")
                need_fallback = True
            if need_fallback:
                with open(user_srt, 'w', encoding='utf-8') as f:
                    f.write("1\n00:00:01,000 --> 00:00:04,000\nHello\n")

            self.update_status("Parsing subtitles...", 20)
            with open(user_srt, 'r', encoding='utf-8') as f:
                all_subs = list(srt.parse(f.read()))
            key_moments = [{
                'index': s.index,
                'text': s.content,
                'start': s.start.total_seconds(),
                'end': s.end.total_seconds()
            } for s in all_subs]

            self.update_status("Extracting keyframes...", 25)
            if not self.generate_keyframes_from_moments(key_moments, max_frames=48):
                raise Exception("Keyframe extraction failed")

            self.update_status("Cropping black bars...", 45)
            try:
                # The returned values are not used here; a failure only skips
                # the cropping step.
                black_bar_crop()
            except Exception as e:
                print(f"⚠️ Black bar cropping skipped: {e}")

            self.update_status("Enhancing images...", 50)
            self._enhance_all_images()
            self.update_status("Applying quality color enhancement...", 60)
            self._enhance_quality_colors()

            self.update_status("Placing speech bubbles...", 75)
            bubbles = self._create_ai_bubbles_from_moments()

            self.update_status("Assembling comic pages...", 90)
            pages = self._generate_pages(bubbles)

            self.update_status("Saving results...", 95)
            if not self._save_results(pages):
                # Do not report "Complete!" when pages.json was not written.
                raise Exception("Saving results failed")

            execution_time = (time.time() - start_time) / 60
            print(f"✅ Comic generation completed in {execution_time:.2f} minutes")
            self.update_status("Complete!", 100)
            return True
        except Exception as e:
            print(f"❌ Comic generation failed: {e}")
            traceback.print_exc()
            self.update_status(f"Error: {str(e)}", -1)
            return False

    def _save_results(self, pages):
        """Write ``pages.json`` to the output directory.

        Panels and bubbles are stored as their ``__dict__`` when they have one,
        otherwise as-is.

        Returns:
            True if the file was written, False if saving failed.
        """
        try:
            pages_data = []
            for page in pages:
                panels = [p.__dict__ if hasattr(p, '__dict__') else p for p in page.panels]
                bubbles_data = [b.__dict__ if hasattr(b, '__dict__') else b for b in page.bubbles]
                pages_data.append({'panels': panels, 'bubbles': bubbles_data})
            with open(os.path.join(self.output_dir, 'pages.json'), 'w', encoding='utf-8') as f:
                json.dump(pages_data, f, indent=2)
            print("✅ Results saved successfully!")
            return True
        except Exception as e:
            print(f"❌ Save results failed: {e}")
            return False

    # ------------------------------------------------------------------
    # Interactive frame adjustments
    # ------------------------------------------------------------------
    @gpu_task(duration=60)
    def regenerate_frame(self, fname, direction):
        """Replace a frame with the one a single video frame earlier or later.

        Args:
            fname: Frame file name; must be a key of the frame metadata.
            direction: ``'forward'`` steps ahead; any other value steps back.

        Returns:
            A dict with ``success`` (bool) and ``message`` (str).
        """
        try:
            if not os.path.exists(self.metadata_path):
                return {"success": False, "message": "Frame metadata missing."}
            with open(self.metadata_path, 'r') as f:
                meta = json.load(f)
            if fname not in meta:
                return {"success": False, "message": "Panel not linked to video."}

            # Metadata values are either dicts with a 'time' key or bare times.
            entry = meta[fname]
            curr_time = entry['time'] if isinstance(entry, dict) else entry

            if not self.video_fps:
                cap = cv2.VideoCapture(self.video_path)
                try:
                    self.video_fps = cap.get(cv2.CAP_PROP_FPS) or 25
                finally:
                    cap.release()

            step = 1.0 / self.video_fps
            offset = step if direction == 'forward' else -step
            new_time = max(0, curr_time + offset)

            frame = self._read_frame_at(new_time)
            if frame is None:
                return {"success": False, "message": "End of video"}

            frame_path = os.path.join(self.frames_dir, fname)
            cv2.imwrite(frame_path, frame)
            print(f"🎨 Applying enhancements to new frame: {fname}")
            self._enhance_all_images(single_image_path=frame_path)
            self._enhance_quality_colors(single_image_path=frame_path)

            if isinstance(meta[fname], dict):
                meta[fname]['time'] = new_time
            else:
                meta[fname] = new_time
            with open(self.metadata_path, 'w') as f:
                json.dump(meta, f, indent=2)

            message = f"Adjusted {direction} to {new_time:.3f}s"
            print(f"✅ {message}")
            return {"success": True, "message": message}
        except Exception as e:
            traceback.print_exc()
            return {"success": False, "message": str(e)}

    @gpu_task(duration=60)
    def get_frame_at_timestamp(self, fname, ts):
        """Replace a frame with the video frame at an absolute timestamp.

        Args:
            fname: Target file name inside the frames directory. Names with a
                directory part are rejected so the write cannot leave it.
            ts: Timestamp in seconds; must lie within the video's duration.

        Returns:
            A dict with ``success`` (bool) and ``message`` (str).
        """
        try:
            if not fname or os.path.basename(fname) != fname:
                return {"success": False, "message": "Invalid filename."}

            cap = cv2.VideoCapture(self.video_path)
            try:
                if not cap.isOpened():
                    return {"success": False, "message": "Cannot open video."}
                fps = cap.get(cv2.CAP_PROP_FPS) or 25
                duration = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) / fps
                # Written as a range test so NaN is rejected as well.
                if not 0 <= ts <= duration:
                    return {"success": False, "message": f"Timestamp must be between 0 and {duration:.2f}s."}
                cap.set(cv2.CAP_PROP_POS_MSEC, float(ts) * 1000)
                ret, frame = cap.read()
            finally:
                cap.release()

            if not ret:
                return {"success": False, "message": "Invalid time"}

            frame_path = os.path.join(self.frames_dir, fname)
            cv2.imwrite(frame_path, frame)
            print(f"🎨 Applying enhancements to frame from timestamp: {fname}")
            self._enhance_all_images(single_image_path=frame_path)
            self._enhance_quality_colors(single_image_path=frame_path)

            # Record the new time only for frames already tracked in metadata.
            if os.path.exists(self.metadata_path):
                with open(self.metadata_path, 'r') as f:
                    meta = json.load(f)
                if fname in meta:
                    if isinstance(meta[fname], dict):
                        meta[fname]['time'] = float(ts)
                    else:
                        meta[fname] = float(ts)
                    with open(self.metadata_path, 'w') as f:
                        json.dump(meta, f, indent=2)

            message = f"Jumped to timestamp {ts:.3f}s"
            print(f"✅ {message}")
            return {"success": True, "message": message}
        except Exception as e:
            traceback.print_exc()
            return {"success": False, "message": str(e)}
# --- ROUTES ---
@app.route('/')
def index():
    """Serve the page text stored in ``INDEX_HTML``."""
    page = INDEX_HTML
    return page
@app.route('/uploader', methods=['POST'])
def upload():
    """Store the uploaded video for a session and start comic generation.

    Expects ``sid`` in the query string and the video in the ``file`` form
    field. Returns a JSON object with ``success`` and ``message``; invalid
    requests get HTTP 400.
    """
    sid = request.args.get('sid')
    if not sid:
        return jsonify({'success': False, 'message': 'Missing session ID'}), 400
    # sid becomes a directory name that is created, cleaned and written to, so
    # it must be a single path component.
    if sid in ('.', '..') or os.path.basename(sid) != sid:
        return jsonify({'success': False, 'message': 'Invalid session ID'}), 400

    uploaded = request.files.get('file')
    if uploaded is None or not uploaded.filename:
        return jsonify({'success': False, 'message': 'No file selected'}), 400

    gen = EnhancedComicGenerator(sid)
    gen.cleanup_previous_run()
    uploaded.save(gen.video_path)
    gen.update_status("Starting...", 5)
    # Run the (possibly GPU-decorated) pipeline in a background thread so the
    # request returns immediately; progress is polled separately.
    threading.Thread(target=gen.generate_comic).start()
    return jsonify({'success': True, 'message': 'Generation started.'})
@app.route('/status')
def get_status():
    """Return the session's status.json, or a placeholder progress record."""
    sid = request.args.get('sid')
    if sid:
        path = os.path.join(BASE_USER_DIR, sid, 'output', 'status.json')
        if os.path.exists(path):
            return send_file(path)
        # No status file has been written for this session yet.
        return jsonify({'progress': 0, 'message': "Waiting..."})
    return jsonify({'progress': 0, 'message': 'Missing session ID'})
@app.route('/output/<path:filename>')
def get_output(filename):
    """Serve a file from the session's output directory.

    The URL rule declares the ``filename`` variable that the view function
    requires; without it the view could not be called.
    """
    sid = request.args.get('sid')
    if not sid:
        return "Missing session ID", 400
    # sid selects the directory being served, so restrict it to one component.
    if sid in ('.', '..') or os.path.basename(sid) != sid:
        return "Invalid session ID", 400
    return send_from_directory(os.path.join(BASE_USER_DIR, sid, 'output'), filename)
@app.route('/frames/<path:filename>')
def get_frame(filename):
    """Serve a file from the session's frames directory.

    The URL rule declares the ``filename`` variable that the view function
    requires; without it the view could not be called.
    """
    sid = request.args.get('sid')
    if not sid:
        return "Missing session ID", 400
    # sid selects the directory being served, so restrict it to one component.
    if sid in ('.', '..') or os.path.basename(sid) != sid:
        return "Invalid session ID", 400
    return send_from_directory(os.path.join(BASE_USER_DIR, sid, 'frames'), filename)
@app.route('/regenerate_frame', methods=['POST'])
def regen():
    """Shift one frame by a single video frame forward or backward.

    Expects ``sid`` in the query string and a JSON body with ``filename`` and
    ``direction``. Returns the result dict of ``regenerate_frame`` as JSON.
    """
    sid = request.args.get('sid')
    if not sid:
        return jsonify({'success': False, 'message': 'Missing session ID'})
    # Constructing the generator creates directories named after sid.
    if sid in ('.', '..') or os.path.basename(sid) != sid:
        return jsonify({'success': False, 'message': 'Invalid session ID'}), 400

    # silent=True yields None instead of raising on a missing/invalid body.
    d = request.get_json(silent=True)
    if not isinstance(d, dict) or 'filename' not in d or 'direction' not in d:
        return jsonify({'success': False,
                        'message': 'JSON body with "filename" and "direction" is required'}), 400

    gen = EnhancedComicGenerator(sid)
    # This might use ZeroGPU if configured
    return jsonify(gen.regenerate_frame(d['filename'], d['direction']))
@app.route('/goto_timestamp', methods=['POST'])
def go_time():
    """Replace one frame with the video frame at a given timestamp.

    Expects ``sid`` in the query string and a JSON body with ``filename`` and a
    numeric ``timestamp`` (seconds). Returns the result dict of
    ``get_frame_at_timestamp`` as JSON.
    """
    sid = request.args.get('sid')
    if not sid:
        return jsonify({'success': False, 'message': 'Missing session ID'})
    # Constructing the generator creates directories named after sid.
    if sid in ('.', '..') or os.path.basename(sid) != sid:
        return jsonify({'success': False, 'message': 'Invalid session ID'}), 400

    # silent=True yields None instead of raising on a missing/invalid body.
    d = request.get_json(silent=True)
    if not isinstance(d, dict) or 'filename' not in d or 'timestamp' not in d:
        return jsonify({'success': False,
                        'message': 'JSON body with "filename" and "timestamp" is required'}), 400
    try:
        ts = float(d['timestamp'])
    except (TypeError, ValueError):
        return jsonify({'success': False, 'message': 'Timestamp must be a number'}), 400

    gen = EnhancedComicGenerator(sid)
    return jsonify(gen.get_frame_at_timestamp(d['filename'], ts))
@app.route('/replace_panel', methods=['POST'])
def rep_panel():
    """Store an uploaded image as a new panel file for the session.

    Expects ``sid`` in the query string and the image in the ``image`` form
    field. On success the JSON response carries the generated file name in
    ``new_filename``; errors are reported under the ``error`` key.
    """
    sid = request.args.get('sid')
    if not sid:
        return jsonify({'success': False, 'error': 'Missing session ID'})
    # The upload is written below a directory named after sid, so sid must be
    # a single path component.
    if sid in ('.', '..') or os.path.basename(sid) != sid:
        return jsonify({'success': False, 'error': 'Invalid session ID'})
    if 'image' not in request.files:
        return jsonify({'success': False, 'error': 'No image provided.'})

    f = request.files['image']
    frames_dir = os.path.join(BASE_USER_DIR, sid, 'frames')
    os.makedirs(frames_dir, exist_ok=True)
    # The name is generated server-side; the client's file name is never used.
    fname = f"replaced_{int(time.time() * 1000)}.png"
    f.save(os.path.join(frames_dir, fname))
    return jsonify({'success': True, 'new_filename': fname})
# --- SAVE COMIC ENDPOINT ---
@app.route('/save_comic', methods=['POST'])
def save_comic():
    """Snapshot the session's frames and page state under a new save code.

    Expects ``sid`` in the query string and a JSON object body that may contain
    ``pages`` and ``savedAt``. Returns ``{'success': True, 'code': ...}`` or
    ``{'success': False, 'message': ...}``.
    """
    sid = request.args.get('sid')
    if not sid:
        return jsonify({'success': False, 'message': 'Missing session ID'})
    # sid selects the frames directory that gets copied.
    if sid in ('.', '..') or os.path.basename(sid) != sid:
        return jsonify({'success': False, 'message': 'Invalid session ID'})
    try:
        # Validate the payload before anything is created on disk, so a bad
        # request cannot leave an orphaned save directory behind.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'message': 'Request body must be a JSON object'})

        # Generate unique save code
        save_code = generate_save_code()
        save_dir = os.path.join(SAVED_COMICS_DIR, save_code)
        os.makedirs(save_dir, exist_ok=True)

        # Copy frames from user directory to saved directory
        user_frames_dir = os.path.join(BASE_USER_DIR, sid, 'frames')
        saved_frames_dir = os.path.join(save_dir, 'frames')
        if os.path.exists(user_frames_dir):
            if os.path.exists(saved_frames_dir):
                shutil.rmtree(saved_frames_dir)
            shutil.copytree(user_frames_dir, saved_frames_dir)

        # Save the comic state
        save_data = {
            'code': save_code,
            'originalSid': sid,
            'pages': data.get('pages', []),
            'savedAt': data.get('savedAt', time.strftime('%Y-%m-%d %H:%M:%S'))
        }
        with open(os.path.join(save_dir, 'comic_state.json'), 'w') as f:
            json.dump(save_data, f, indent=2)

        print(f"✅ Comic saved with code: {save_code}")
        return jsonify({'success': True, 'code': save_code})
    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'message': str(e)})
# --- LOAD COMIC ENDPOINT ---
@app.route('/load_comic/<code>')
def load_comic(code):
    """Return a saved comic's page state and restore its frames.

    The URL rule declares the ``code`` variable that the view function
    requires. Frames saved with the comic are copied back into the original
    session's frames directory when they are missing there.
    """
    code = code.upper()
    # Save codes consist of letters and digits only; anything else (such as
    # '..') cannot name a saved comic.
    if not code.isalnum():
        return jsonify({'success': False, 'message': 'Save code not found'})

    save_dir = os.path.join(SAVED_COMICS_DIR, code)
    state_file = os.path.join(save_dir, 'comic_state.json')
    if not os.path.exists(state_file):
        return jsonify({'success': False, 'message': 'Save code not found'})
    try:
        with open(state_file, 'r') as f:
            save_data = json.load(f)
        original_sid = save_data.get('originalSid')

        # Copy frames to user directory if needed
        saved_frames_dir = os.path.join(save_dir, 'frames')
        if original_sid and os.path.exists(saved_frames_dir):
            user_frames_dir = os.path.join(BASE_USER_DIR, original_sid, 'frames')
            os.makedirs(user_frames_dir, exist_ok=True)
            # Copy files that don't exist
            for fname in os.listdir(saved_frames_dir):
                src = os.path.join(saved_frames_dir, fname)
                dst = os.path.join(user_frames_dir, fname)
                if not os.path.exists(dst):
                    shutil.copy2(src, dst)

        return jsonify({
            'success': True,
            'pages': save_data.get('pages', []),
            'originalSid': original_sid,
            'savedAt': save_data.get('savedAt')
        })
    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'message': str(e)})
# --- SERVE SAVED COMIC FRAMES ---
@app.route('/saved_frames/<code>/<path:filename>')
def get_saved_frame(code, filename):
    """Serve one frame file belonging to a saved comic.

    The URL rule declares the ``code`` and ``filename`` variables that the view
    function requires. Responds with 404 when the code or file does not exist.
    """
    code = code.upper()
    # Save codes consist of letters and digits only.
    if not code.isalnum():
        return "Frame not found", 404
    frames_dir = os.path.join(SAVED_COMICS_DIR, code, 'frames')
    if os.path.exists(os.path.join(frames_dir, filename)):
        return send_from_directory(frames_dir, filename)
    return "Frame not found", 404
if __name__ == '__main__':
    # The PORT environment variable overrides 7860, the Hugging Face Spaces
    # default port.
    port = int(os.environ.get("PORT", 7860))
    startup_lines = (
        f"🚀 Starting Enhanced Comic Generator on host 0.0.0.0, port {port}",
        f"📁 User data directory: {BASE_USER_DIR}",
        f"💾 Saved comics directory: {SAVED_COMICS_DIR}",
    )
    for line in startup_lines:
        print(line)
    app.run(host='0.0.0.0', port=port, debug=False)