"""Flask application that turns an uploaded video into an editable comic.

The module wires together optional backend helpers (subtitle extraction,
colour enhancement, face detection, bubble placement), exposes the
``EnhancedComicGenerator`` pipeline and serves the HTTP endpoints used by the
browser UI.  Every optional dependency has a no-op fallback so the app can
still start when a backend module is missing.
"""
import json
import logging
import math
import os
import random
import re
import secrets
import shutil
import string
import threading
import time
import traceback
import uuid
from concurrent.futures import ThreadPoolExecutor

from flask import Flask, render_template, request, jsonify, send_from_directory, send_file

# --- HUGGING FACE ZEROGPU SETUP ---
try:
    import spaces
    print("✅ Hugging Face Spaces 'spaces' module detected.")
    HAS_ZEROGPU = True
except ImportError:
    print("⚠️ 'spaces' module not found. Running in standard mode (CPU/GPU without ZeroGPU allocator).")
    HAS_ZEROGPU = False


# Conditional Decorator for ZeroGPU
def gpu_task(duration=120):
    """Return a decorator that wraps a function with ``spaces.GPU`` when available.

    Args:
        duration: Value forwarded as ``spaces.GPU(duration=...)``.

    Returns:
        A decorator.  When the ``spaces`` module was not importable the
        decorated function is returned unchanged.
    """
    def decorator(func):
        if HAS_ZEROGPU:
            return spaces.GPU(duration=duration)(func)
        return func
    return decorator


# --- 0. CONFIG & LOGGING ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- HUGGING FACE FILESYSTEM SETUP ---
# Spaces are read-only in root. We must use /tmp or /data (persistent storage)
if os.access('/data', os.W_OK):
    BASE_PATH = '/data'
    print("✅ Using Persistent Storage at /data")
else:
    BASE_PATH = '/tmp'
    print("✅ Using Temporary Storage at /tmp")

BASE_USER_DIR = os.path.join(BASE_PATH, "userdata")
SAVED_COMICS_DIR = os.path.join(BASE_PATH, "saved_comics")

# Create directories
os.makedirs(BASE_USER_DIR, exist_ok=True)
os.makedirs(SAVED_COMICS_DIR, exist_ok=True)

# Names received from clients (session ids, save codes, frame filenames) are
# joined into filesystem paths, so they must be a single path component.
_SAFE_NAME_RE = re.compile(r'^[A-Za-z0-9_.-]{1,128}$')


def _is_safe_name(name):
    """Return True when *name* can be used as a single path component.

    Only ASCII letters, digits, ``_``, ``.`` and ``-`` are accepted, and the
    special entries ``.`` and ``..`` are rejected, so the name can never
    escape the directory it is joined to.
    """
    return (
        isinstance(name, str)
        and name not in ('.', '..')
        and _SAFE_NAME_RE.fullmatch(name) is not None
    )


# --- 1. CORE DEPENDENCY CHECKS ---
try:
    import cv2
    import numpy as np
    from PIL import Image
    import srt
except ImportError as e:
    print(f"❌ CRITICAL ERROR: Missing python library. {e}")
    cv2 = None
    np = None
    Image = None
    srt = None


# --- 2. BACKEND IMPORTS WITH FALLBACKS ---
def dummy_func(*args, **kwargs):
    """Stand-in for ``black_bar_crop``; accepts anything, returns ``(0, 0, None, None)``."""
    return 0, 0, None, None


try:
    from backend.keyframes.keyframes import black_bar_crop
    print("✅ Black bar cropping module loaded.")
except Exception as e:
    print(f"⚠️ Could not load black_bar_crop: {e}. Cropping will be SKIPPED.")
    black_bar_crop = dummy_func

try:
    from backend.simple_color_enhancer import SimpleColorEnhancer
    print("✅ SimpleColorEnhancer loaded.")
except Exception as e:
    print(f"⚠️ Could not load SimpleColorEnhancer: {e}.")

    class SimpleColorEnhancer:
        """No-op replacement used when the real enhancer cannot be imported."""

        def enhance_batch(self, *args, **kwargs):
            pass

        def enhance_single(self, *args, **kwargs):
            pass

try:
    from backend.quality_color_enhancer import QualityColorEnhancer
    print("✅ QualityColorEnhancer loaded.")
except Exception as e:
    print(f"⚠️ Could not load QualityColorEnhancer: {e}.")

    class QualityColorEnhancer:
        """No-op replacement used when the real enhancer cannot be imported."""

        def batch_enhance(self, *args, **kwargs):
            pass

        def enhance_single(self, *args, **kwargs):
            pass

try:
    from backend.class_def import bubble, panel, Page
    print("✅ Core class definitions loaded.")
except Exception:
    print("⚠️ Using fallback class definitions.")

    def bubble(dialog="", bubble_offset_x=50, bubble_offset_y=20, lip_x=-1, lip_y=-1, emotion='normal'):
        """Fallback bubble factory: returns the bubble fields as a plain dict."""
        return {
            'dialog': dialog,
            'bubble_offset_x': bubble_offset_x,
            'bubble_offset_y': bubble_offset_y,
            'lip_x': lip_x,
            'lip_y': lip_y,
            'emotion': emotion
        }

    def panel(image=""):
        """Fallback panel factory: returns ``{'image': image}``."""
        return {'image': image}

    class Page:
        """Fallback page container holding a list of panels and a list of bubbles."""

        def __init__(self, panels, bubbles):
            self.panels = panels
            self.bubbles = bubbles

try:
    from backend.ai_enhanced_core import image_processor, comic_styler, face_detector, layout_optimizer
    from backend.ai_bubble_placement import ai_bubble_placer
    from backend.subtitles.subs_real import get_real_subtitles
    from backend.keyframes.keyframes_simple import generate_keyframes_simple
    print("✅ Core utility modules loaded.")
except Exception as e:
    print(f"⚠️ Could not load utility modules: {e}")

    def get_real_subtitles(v):
        pass

    def generate_keyframes_simple(*args, **kwargs):
        pass

    class DummyDetector:
        """Face detector stub: never finds a face."""

        def detect_faces(self, p):
            return []

        def get_lip_position(self, p, f):
            return -1, -1

    face_detector = DummyDetector()

    class DummyPlacer:
        """Bubble placer stub: always returns the default offset ``(50, 20)``."""

        def place_bubble_ai(self, p, l):
            return 50, 20

    ai_bubble_placer = DummyPlacer()

# --- FLASK APP SETUP ---
app = Flask(__name__)


def generate_save_code(length=8):
    """Generate a unique save code

    The code is built from uppercase letters and digits and is re-drawn until
    no directory of that name exists under ``SAVED_COMICS_DIR``.  Because the
    code is the only thing protecting a saved comic, it is drawn with
    ``secrets`` rather than the predictable ``random`` generator.
    """
    chars = string.ascii_uppercase + string.digits
    while True:
        code = ''.join(secrets.choice(chars) for _ in range(length))
        if not os.path.exists(os.path.join(SAVED_COMICS_DIR, code)):
            return code


# --- FULL HTML INTERFACE ---
# NOTE(review): this string contains only text fragments and no markup; the
# HTML/JS of the interface looks like it was stripped somewhere upstream.
# It is kept verbatim here -- restore the real template from its source.
INDEX_HTML = ''' Movie to Comic Generator

🎬 Comic Generator

No file selected

📥 Load Saved Comic

Enter your save code to continue editing

✏️ Interactive Editor

'''


# --- 3. ENHANCED COMIC GENERATOR CLASS ---
class EnhancedComicGenerator:
    """Per-session pipeline that converts ``uploaded.mp4`` into comic pages.

    All files of a session live under ``BASE_USER_DIR/<sid>``: the uploaded
    video, a ``frames`` directory (PNG keyframes plus ``frame_metadata.json``)
    and an ``output`` directory (``status.json`` and ``pages.json``).
    """

    def __init__(self, sid):
        """Prepare the directory layout for session *sid*.

        Raises:
            ValueError: if *sid* is not a safe single path component.
        """
        if not _is_safe_name(sid):
            raise ValueError("Invalid session ID")
        self.sid = sid
        self.user_dir = os.path.join(BASE_USER_DIR, sid)
        self.video_path = os.path.join(self.user_dir, 'uploaded.mp4')
        self.frames_dir = os.path.join(self.user_dir, 'frames')
        self.output_dir = os.path.join(self.user_dir, 'output')
        self.status_file = os.path.join(self.output_dir, 'status.json')
        self.metadata_path = os.path.join(self.frames_dir, 'frame_metadata.json')
        os.makedirs(self.frames_dir, exist_ok=True)
        os.makedirs(self.output_dir, exist_ok=True)
        self.video_fps = None
        self.frame_metadata = {}

    def update_status(self, message, progress):
        """Persist ``{'message', 'progress'}`` to ``status.json`` (best effort).

        The file is written to a temporary path and moved into place so the
        ``/status`` endpoint, which serves the file while generation runs in
        another thread, never sees a half-written document.
        """
        tmp_path = self.status_file + '.tmp'
        try:
            with open(tmp_path, 'w') as f:
                json.dump({'message': message, 'progress': progress}, f)
            os.replace(tmp_path, self.status_file)
        except (OSError, TypeError, ValueError) as e:
            # Status reporting must never abort the pipeline.
            logger.warning("Could not write status file %s: %s", self.status_file, e)

    def _frame_files(self):
        """Return the sorted names of the PNG files in the frames directory."""
        return sorted(f for f in os.listdir(self.frames_dir) if f.endswith('.png'))

    def cleanup_previous_run(self):
        """Delete frames, outputs (except ``status.json``) and ``subs.srt``."""
        print(f"🧹 Cleaning up for session {self.sid}...")
        if os.path.exists(self.frames_dir):
            for f in os.listdir(self.frames_dir):
                try:
                    os.remove(os.path.join(self.frames_dir, f))
                except OSError:
                    pass
        if os.path.exists(self.output_dir):
            for f in os.listdir(self.output_dir):
                if f != 'status.json':
                    try:
                        os.remove(os.path.join(self.output_dir, f))
                    except OSError:
                        pass
        user_srt = os.path.join(self.user_dir, 'subs.srt')
        try:
            os.remove(user_srt)
        except FileNotFoundError:
            pass
        except OSError as e:
            logger.warning("Could not remove %s: %s", user_srt, e)
        print("✅ Cleanup complete.")

    def generate_keyframes_from_moments(self, key_moments, max_frames=48):
        """Save one frame per subtitle moment and write the frame metadata.

        Args:
            key_moments: dicts with ``start``, ``end`` (seconds) and ``text``.
            max_frames: upper bound on the number of moments processed
                (earliest ``start`` first).

        Returns:
            True on success, False if anything failed (the error is printed).
        """
        cap = None
        try:
            cap = cv2.VideoCapture(self.video_path)
            if not cap.isOpened():
                raise Exception("Cannot open video for keyframe extraction")
            # video_fps is only set by generate_comic(); fall back to the
            # container value so a direct call does not divide by None.
            fps = self.video_fps or cap.get(cv2.CAP_PROP_FPS) or 25
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            duration = total_frames / fps

            selected = sorted(key_moments, key=lambda x: x['start'])[:max_frames]
            total = len(selected)
            frame_metadata = {}
            frame_count = 0
            for i, moment in enumerate(selected):
                self.update_status(f"Extracting frame {i+1}/{total}...", 25 + int(20 * (i / total)))
                # Sample the middle of the subtitle's time span.
                frame_time = (moment['start'] + moment['end']) / 2
                if frame_time > duration:
                    continue
                frame_number = int(frame_time * fps)
                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
                ret, frame = cap.read()
                if ret:
                    frame_filename = f"frame_{frame_count:04d}.png"
                    frame_path = os.path.join(self.frames_dir, frame_filename)
                    cv2.imwrite(frame_path, frame)
                    frame_metadata[frame_filename] = {
                        'time': frame_time,
                        'dialogue': moment['text'],
                        'start': moment['start'],
                        'end': moment['end']
                    }
                    frame_count += 1

            with open(self.metadata_path, 'w') as f:
                json.dump(frame_metadata, f, indent=2)
            print(f"✅ Extracted {frame_count} keyframes from video")
            return True
        except Exception as e:
            print(f"❌ Error extracting keyframes: {e}")
            traceback.print_exc()
            return False
        finally:
            if cap is not None:
                cap.release()

    def _run_enhancer(self, enhancer, single_image_path=None):
        """Apply ``enhancer.enhance_single`` to one image or to every PNG frame."""
        if single_image_path:
            enhancer.enhance_single(single_image_path)
            return
        frame_paths = [os.path.join(self.frames_dir, f) for f in self._frame_files()]
        with ThreadPoolExecutor() as executor:
            # list() drains the iterator so worker exceptions surface here.
            list(executor.map(enhancer.enhance_single, frame_paths))

    def _enhance_all_images(self, single_image_path=None):
        """Run ``SimpleColorEnhancer`` on one image or all frames; failures are only printed."""
        try:
            self._run_enhancer(SimpleColorEnhancer(), single_image_path)
            if not single_image_path:
                print("✅ Simple color enhancement complete")
        except Exception as e:
            print(f"⚠️ Simple enhancement failed: {e}")

    def _enhance_quality_colors(self, single_image_path=None):
        """Run ``QualityColorEnhancer`` on one image or all frames; failures are only printed."""
        try:
            self._run_enhancer(QualityColorEnhancer(), single_image_path)
            if not single_image_path:
                print("✅ Quality color enhancement complete")
        except Exception as e:
            print(f"⚠️ Quality enhancement failed: {e}")

    def _process_bubble_for_frame(self, frame_file):
        """Build the speech bubble for *frame_file*.

        Uses the first detected face's lip position (or ``(-1, -1)`` when no
        face is found) as the anchor for the bubble placer.  On any error the
        default bubble at offset ``(50, 20)`` is returned instead.
        """
        frame_path = os.path.join(self.frames_dir, frame_file)
        meta = self.frame_metadata.get(frame_file, {})
        dialogue = meta.get('dialogue', '') if isinstance(meta, dict) else ''
        try:
            faces = face_detector.detect_faces(frame_path)
            if faces:
                lip_x, lip_y = face_detector.get_lip_position(frame_path, faces[0])
            else:
                lip_x, lip_y = -1, -1
            bubble_x, bubble_y = ai_bubble_placer.place_bubble_ai(frame_path, (lip_x, lip_y))
            return bubble(
                bubble_offset_x=bubble_x,
                bubble_offset_y=bubble_y,
                lip_x=lip_x,
                lip_y=lip_y,
                dialog=dialogue,
                emotion='normal'
            )
        except Exception as e:
            print(f"-> Could not place bubble for {frame_file}: {e}. Using default.")
            return bubble(
                bubble_offset_x=50,
                bubble_offset_y=20,
                lip_x=-1,
                lip_y=-1,
                dialog=dialogue,
                emotion='normal'
            )

    def _create_ai_bubbles_from_moments(self):
        """Return one bubble per PNG frame, in sorted filename order.

        Without a metadata file every frame gets an empty-dialogue bubble.
        """
        frame_files = self._frame_files()
        if not os.path.exists(self.metadata_path):
            return [bubble(dialog="") for _ in frame_files]
        with open(self.metadata_path, 'r') as f:
            self.frame_metadata = json.load(f)
        with ThreadPoolExecutor() as executor:
            bubbles = list(executor.map(self._process_bubble_for_frame, frame_files))
        return bubbles

    def _generate_pages(self, bubbles_list):
        """Lay frames and bubbles out into pages.

        Delegates to ``generate_12_pages_800x1080`` when that backend module
        is importable; otherwise groups frames four per page.
        """
        try:
            from backend.fixed_12_pages_800x1080 import generate_12_pages_800x1080
            frame_files = self._frame_files()
            return generate_12_pages_800x1080(frame_files, bubbles_list)
        except ImportError:
            pages = []
            frame_files = self._frame_files()
            num_pages = (len(frame_files) + 3) // 4
            for i in range(num_pages):
                start, end = i * 4, (i + 1) * 4
                page_panels = [panel(image=f) for f in frame_files[start:end]]
                page_bubbles = bubbles_list[start:end]
                if page_panels:
                    pages.append(Page(panels=page_panels, bubbles=page_bubbles))
            return pages

    # --- APPLY ZEROGPU DECORATOR HERE ---
    @gpu_task(duration=180)  # Allocate GPU for 3 minutes for this task
    def generate_comic(self):
        """Run the full pipeline for this session.

        Progress is reported through ``update_status``; on failure the status
        is set to ``Error: ...`` with progress ``-1``.

        Returns:
            True when the comic was generated and saved, False otherwise.
        """
        start_time = time.time()
        try:
            if cv2 is None:
                raise Exception("OpenCV not installed")
            if srt is None:
                raise Exception("srt library not installed")

            self.update_status("Cleaning up previous run...", 0)
            self.cleanup_previous_run()

            self.update_status("Analyzing video...", 5)
            cap = cv2.VideoCapture(self.video_path)
            try:
                if not cap.isOpened():
                    raise Exception("Cannot open video")
                self.video_fps = cap.get(cv2.CAP_PROP_FPS) or 25
            finally:
                cap.release()
            print(f"✅ Video FPS detected: {self.video_fps:.2f}")

            self.update_status("Generating subtitles (this may take a while)...", 10)
            user_srt = os.path.join(self.user_dir, 'subs.srt')
            try:
                get_real_subtitles(self.video_path)
                # NOTE(review): the backend appears to write 'test1.srt' into
                # the working directory, which is shared by all sessions.
                if os.path.exists('test1.srt'):
                    shutil.move('test1.srt', user_srt)
            except Exception as e:
                print(f"⚠️ Subtitle generation failed: {e}. Creating fallback.")
            if not os.path.exists(user_srt):
                # Covers both a failed run and a run that produced no file
                # (e.g. the no-op get_real_subtitles fallback).
                with open(user_srt, 'w', encoding='utf-8') as f:
                    f.write("1\n00:00:01,000 --> 00:00:04,000\nHello\n")

            self.update_status("Parsing subtitles...", 20)
            with open(user_srt, 'r', encoding='utf-8') as f:
                all_subs = list(srt.parse(f.read()))
            key_moments = [{
                'index': s.index,
                'text': s.content,
                'start': s.start.total_seconds(),
                'end': s.end.total_seconds()
            } for s in all_subs]

            self.update_status("Extracting keyframes...", 25)
            if not self.generate_keyframes_from_moments(key_moments, max_frames=48):
                raise Exception("Keyframe extraction failed")

            self.update_status("Cropping black bars...", 45)
            try:
                # The returned offsets are not used; the call is kept for
                # whatever the backend does on its own.
                black_bar_crop()
            except Exception as e:
                logger.warning("Black bar cropping failed: %s", e)

            self.update_status("Enhancing images...", 50)
            self._enhance_all_images()

            self.update_status("Applying quality color enhancement...", 60)
            self._enhance_quality_colors()

            self.update_status("Placing speech bubbles...", 75)
            bubbles = self._create_ai_bubbles_from_moments()

            self.update_status("Assembling comic pages...", 90)
            pages = self._generate_pages(bubbles)

            self.update_status("Saving results...", 95)
            self._save_results(pages)

            execution_time = (time.time() - start_time) / 60
            print(f"✅ Comic generation completed in {execution_time:.2f} minutes")
            self.update_status("Complete!", 100)
            return True
        except Exception as e:
            print(f"❌ Comic generation failed: {e}")
            traceback.print_exc()
            self.update_status(f"Error: {str(e)}", -1)
            return False

    def _save_results(self, pages):
        """Serialise *pages* to ``output/pages.json``.

        Panels and bubbles may be objects (their ``__dict__`` is stored) or
        plain dicts.

        Raises:
            Exception: re-raised after logging, so the caller does not report
                success when no ``pages.json`` was written.
        """
        try:
            pages_data = []
            for page in pages:
                panels = [p.__dict__ if hasattr(p, '__dict__') else p for p in page.panels]
                bubbles_data = [b.__dict__ if hasattr(b, '__dict__') else b for b in page.bubbles]
                pages_data.append({'panels': panels, 'bubbles': bubbles_data})
            with open(os.path.join(self.output_dir, 'pages.json'), 'w', encoding='utf-8') as f:
                json.dump(pages_data, f, indent=2)
            print("✅ Results saved successfully!")
        except Exception as e:
            print(f"❌ Save results failed: {e}")
            raise

    @gpu_task(duration=60)
    def regenerate_frame(self, fname, direction):
        """Re-capture panel *fname* one video frame forward or backward.

        Args:
            fname: key of the panel in ``frame_metadata.json``.
            direction: ``'forward'`` steps ahead; any other value steps back.

        Returns:
            ``{"success": bool, "message": str}``.
        """
        try:
            if not os.path.exists(self.metadata_path):
                return {"success": False, "message": "Frame metadata missing."}
            with open(self.metadata_path, 'r') as f:
                meta = json.load(f)
            # Only names recorded in the metadata are accepted, which also
            # keeps client-supplied paths out of cv2.imwrite below.
            if fname not in meta:
                return {"success": False, "message": "Panel not linked to video."}

            current_data = meta[fname]
            # Metadata entries are either a dict with 'time' or a bare number.
            curr_time = current_data['time'] if isinstance(current_data, dict) else current_data

            cap = cv2.VideoCapture(self.video_path)
            try:
                if not cap.isOpened():
                    return {"success": False, "message": "Cannot open video."}
                if not self.video_fps:
                    self.video_fps = cap.get(cv2.CAP_PROP_FPS) or 25
                offset = (1.0 / self.video_fps) * (1 if direction == 'forward' else -1)
                new_time = max(0, curr_time + offset)
                cap.set(cv2.CAP_PROP_POS_MSEC, new_time * 1000)
                ret, frame = cap.read()
            finally:
                cap.release()

            if not ret:
                return {"success": False, "message": "End of video"}

            frame_path = os.path.join(self.frames_dir, fname)
            cv2.imwrite(frame_path, frame)
            print(f"🎨 Applying enhancements to new frame: {fname}")
            self._enhance_all_images(single_image_path=frame_path)
            self._enhance_quality_colors(single_image_path=frame_path)

            if isinstance(meta[fname], dict):
                meta[fname]['time'] = new_time
            else:
                meta[fname] = new_time
            with open(self.metadata_path, 'w') as f:
                json.dump(meta, f, indent=2)

            message = f"Adjusted {direction} to {new_time:.3f}s"
            print(f"✅ {message}")
            return {"success": True, "message": message}
        except Exception as e:
            traceback.print_exc()
            return {"success": False, "message": str(e)}

    @gpu_task(duration=60)
    def get_frame_at_timestamp(self, fname, ts):
        """Replace panel *fname* with the video frame at *ts* seconds.

        The metadata entry for *fname* is updated when one exists.

        Returns:
            ``{"success": bool, "message": str}``.
        """
        try:
            # fname comes from the client and is written to disk below.
            if not _is_safe_name(fname):
                return {"success": False, "message": "Invalid filename."}

            cap = cv2.VideoCapture(self.video_path)
            try:
                if not cap.isOpened():
                    return {"success": False, "message": "Cannot open video."}
                fps = cap.get(cv2.CAP_PROP_FPS) or 25
                duration = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) / fps
                if ts < 0 or ts > duration:
                    return {"success": False, "message": f"Timestamp must be between 0 and {duration:.2f}s."}
                cap.set(cv2.CAP_PROP_POS_MSEC, float(ts) * 1000)
                ret, frame = cap.read()
            finally:
                cap.release()

            if not ret:
                return {"success": False, "message": "Invalid time"}

            frame_path = os.path.join(self.frames_dir, fname)
            cv2.imwrite(frame_path, frame)
            print(f"🎨 Applying enhancements to frame from timestamp: {fname}")
            self._enhance_all_images(single_image_path=frame_path)
            self._enhance_quality_colors(single_image_path=frame_path)

            if os.path.exists(self.metadata_path):
                with open(self.metadata_path, 'r') as f:
                    meta = json.load(f)
                if fname in meta:
                    if isinstance(meta[fname], dict):
                        meta[fname]['time'] = float(ts)
                    else:
                        meta[fname] = float(ts)
                    with open(self.metadata_path, 'w') as f:
                        json.dump(meta, f, indent=2)

            message = f"Jumped to timestamp {ts:.3f}s"
            print(f"✅ {message}")
            return {"success": True, "message": message}
        except Exception as e:
            traceback.print_exc()
            return {"success": False, "message": str(e)}


# --- ROUTES ---
def _session_id():
    """Read ``sid`` from the query string.

    Returns:
        ``(sid, None)`` when present and safe, otherwise ``(None, message)``.
    """
    sid = request.args.get('sid')
    if not sid:
        return None, 'Missing session ID'
    if not _is_safe_name(sid):
        return None, 'Invalid session ID'
    return sid, None


@app.route('/')
def index():
    """Serve the single-page interface."""
    return INDEX_HTML


@app.route('/uploader', methods=['POST'])
def upload():
    """Store the uploaded video and start comic generation in a background thread."""
    sid, err = _session_id()
    if err:
        return jsonify({'success': False, 'message': err}), 400
    if 'file' not in request.files or not request.files['file'].filename:
        return jsonify({'success': False, 'message': 'No file selected'}), 400

    f = request.files['file']
    gen = EnhancedComicGenerator(sid)
    gen.cleanup_previous_run()
    f.save(gen.video_path)
    gen.update_status("Starting...", 5)

    # We use a Thread to call the generated GPU function
    threading.Thread(target=gen.generate_comic).start()
    return jsonify({'success': True, 'message': 'Generation started.'})


@app.route('/status')
def get_status():
    """Return the session's ``status.json``, or a placeholder while none exists."""
    sid, err = _session_id()
    if err:
        return jsonify({'progress': 0, 'message': err})
    path = os.path.join(BASE_USER_DIR, sid, 'output', 'status.json')
    if os.path.exists(path):
        return send_file(path)
    return jsonify({'progress': 0, 'message': "Waiting..."})


# The view takes ``filename``, so the rule must declare it; without the URL
# variable Flask calls the view with no arguments and every request fails.
@app.route('/output/<path:filename>')
def get_output(filename):
    """Serve a file from the session's ``output`` directory."""
    sid, err = _session_id()
    if err:
        return err, 400
    return send_from_directory(os.path.join(BASE_USER_DIR, sid, 'output'), filename)


@app.route('/frames/<path:filename>')
def get_frame(filename):
    """Serve a file from the session's ``frames`` directory."""
    sid, err = _session_id()
    if err:
        return err, 400
    return send_from_directory(os.path.join(BASE_USER_DIR, sid, 'frames'), filename)


@app.route('/regenerate_frame', methods=['POST'])
def regen():
    """Step a panel one frame forward/backward (JSON body: ``filename``, ``direction``)."""
    sid, err = _session_id()
    if err:
        return jsonify({'success': False, 'message': err})
    d = request.get_json(silent=True)
    if not isinstance(d, dict) or 'filename' not in d or 'direction' not in d:
        return jsonify({'success': False, 'message': 'Missing filename or direction'}), 400
    gen = EnhancedComicGenerator(sid)
    # This might use ZeroGPU if configured
    return jsonify(gen.regenerate_frame(d['filename'], d['direction']))


@app.route('/goto_timestamp', methods=['POST'])
def go_time():
    """Replace a panel with the frame at a timestamp (JSON body: ``filename``, ``timestamp``)."""
    sid, err = _session_id()
    if err:
        return jsonify({'success': False, 'message': err})
    d = request.get_json(silent=True)
    if not isinstance(d, dict) or 'filename' not in d:
        return jsonify({'success': False, 'message': 'Missing filename'}), 400
    try:
        ts = float(d['timestamp'])
    except (KeyError, TypeError, ValueError):
        return jsonify({'success': False, 'message': 'Invalid timestamp'}), 400
    if not math.isfinite(ts):
        # NaN would slip past the range comparison in the generator.
        return jsonify({'success': False, 'message': 'Invalid timestamp'}), 400
    gen = EnhancedComicGenerator(sid)
    return jsonify(gen.get_frame_at_timestamp(d['filename'], ts))


@app.route('/replace_panel', methods=['POST'])
def rep_panel():
    """Store an uploaded image as a new frame and return its generated filename."""
    sid, err = _session_id()
    if err:
        return jsonify({'success': False, 'error': err})
    if 'image' not in request.files:
        return jsonify({'success': False, 'error': 'No image provided.'})

    f = request.files['image']
    frames_dir = os.path.join(BASE_USER_DIR, sid, 'frames')
    os.makedirs(frames_dir, exist_ok=True)
    # The server picks the name; the client-supplied filename is never used.
    fname = f"replaced_{int(time.time() * 1000)}.png"
    f.save(os.path.join(frames_dir, fname))
    return jsonify({'success': True, 'new_filename': fname})


# --- SAVE COMIC ENDPOINT ---
@app.route('/save_comic', methods=['POST'])
def save_comic():
    """Snapshot the session's frames and the posted page state under a new save code."""
    sid, err = _session_id()
    if err:
        return jsonify({'success': False, 'message': err})
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        # Generate unique save code
        save_code = generate_save_code()
        save_dir = os.path.join(SAVED_COMICS_DIR, save_code)
        os.makedirs(save_dir, exist_ok=True)

        # Copy frames from user directory to saved directory
        user_frames_dir = os.path.join(BASE_USER_DIR, sid, 'frames')
        saved_frames_dir = os.path.join(save_dir, 'frames')
        if os.path.exists(user_frames_dir):
            if os.path.exists(saved_frames_dir):
                shutil.rmtree(saved_frames_dir)
            shutil.copytree(user_frames_dir, saved_frames_dir)

        # Save the comic state
        save_data = {
            'code': save_code,
            'originalSid': sid,
            'pages': data.get('pages', []),
            'savedAt': data.get('savedAt', time.strftime('%Y-%m-%d %H:%M:%S'))
        }
        with open(os.path.join(save_dir, 'comic_state.json'), 'w') as f:
            json.dump(save_data, f, indent=2)

        print(f"✅ Comic saved with code: {save_code}")
        return jsonify({'success': True, 'code': save_code})
    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'message': str(e)})


# --- LOAD COMIC ENDPOINT ---
@app.route('/load_comic/<code>')
def load_comic(code):
    """Return a saved comic's state and restore its frames into the original session."""
    code = code.upper()
    if not _is_safe_name(code):
        return jsonify({'success': False, 'message': 'Save code not found'})
    save_dir = os.path.join(SAVED_COMICS_DIR, code)
    state_file = os.path.join(save_dir, 'comic_state.json')
    if not os.path.exists(state_file):
        return jsonify({'success': False, 'message': 'Save code not found'})

    try:
        with open(state_file, 'r') as f:
            save_data = json.load(f)
        original_sid = save_data.get('originalSid')

        # Copy frames to user directory if needed
        saved_frames_dir = os.path.join(save_dir, 'frames')
        if original_sid and _is_safe_name(original_sid) and os.path.exists(saved_frames_dir):
            user_frames_dir = os.path.join(BASE_USER_DIR, original_sid, 'frames')
            os.makedirs(user_frames_dir, exist_ok=True)
            # Copy files that don't exist
            for fname in os.listdir(saved_frames_dir):
                src = os.path.join(saved_frames_dir, fname)
                dst = os.path.join(user_frames_dir, fname)
                if not os.path.exists(dst):
                    shutil.copy2(src, dst)

        return jsonify({
            'success': True,
            'pages': save_data.get('pages', []),
            'originalSid': original_sid,
            'savedAt': save_data.get('savedAt')
        })
    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'message': str(e)})


# --- SERVE SAVED COMIC FRAMES ---
@app.route('/saved_frames/<code>/<path:filename>')
def get_saved_frame(code, filename):
    """Serve a frame image belonging to the saved comic *code*."""
    code = code.upper()
    if not _is_safe_name(code):
        return "Frame not found", 404
    frames_dir = os.path.join(SAVED_COMICS_DIR, code, 'frames')
    if os.path.exists(os.path.join(frames_dir, filename)):
        # send_from_directory itself refuses paths that leave frames_dir.
        return send_from_directory(frames_dir, filename)
    return "Frame not found", 404


if __name__ == '__main__':
    # HF Spaces use port 7860 by default
    port = int(os.getenv("PORT", 7860))
    print(f"🚀 Starting Enhanced Comic Generator on host 0.0.0.0, port {port}")
    print(f"📁 User data directory: {BASE_USER_DIR}")
    print(f"💾 Saved comics directory: {SAVED_COMICS_DIR}")
    app.run(host='0.0.0.0', port=port, debug=False)