import os import webbrowser import time import threading from flask import Flask, render_template, request, jsonify, send_from_directory, send_file from pathlib import Path import cv2 import numpy as np from PIL import Image import srt import json import shutil from typing import List import traceback from concurrent.futures import ThreadPoolExecutor # --- ROBUST IMPORTS WITH FALLBACKS --- try: from backend.keyframes.keyframes import black_bar_crop print("✅ Black bar cropping module loaded.") except Exception as e: print(f"⚠️ Could not load black_bar_crop: {e}. Cropping will be SKIPPED.") def black_bar_crop(): return 0, 0, None, None try: from backend.simple_color_enhancer import SimpleColorEnhancer print("✅ SimpleColorEnhancer loaded.") except Exception as e: print(f"⚠️ Could not load SimpleColorEnhancer: {e}. This feature will be SKIPPED.") class SimpleColorEnhancer: def enhance_batch(self, *args, **kwargs): print("-> Skipping simple color enhancement (module not loaded).") def enhance_single(self, *args, **kwargs): print("-> Skipping simple color enhancement (module not loaded).") try: from backend.quality_color_enhancer import QualityColorEnhancer print("✅ QualityColorEnhancer loaded.") except Exception as e: print(f"⚠️ Could not load QualityColorEnhancer: {e}. This feature will be SKIPPED.") class QualityColorEnhancer: def batch_enhance(self, *args, **kwargs): print("-> Skipping quality color enhancement (module not loaded).") def enhance_single(self, *args, **kwargs): print("-> Skipping quality color enhancement (module not loaded).") try: from backend.class_def import bubble, panel, Page print("✅ Core class definitions (bubble, panel, Page) loaded.") except Exception as e: print(f"⚠️ CRITICAL: Could not load core class definitions: {e}. Using fallback definitions.") def bubble(**kwargs): return kwargs def panel(**kwargs): return kwargs class Page: def __init__(self, panels, bubbles): self.panels = panels self.bubbles = bubbles try: from backend.ai_enhanced_core import image_processor, comic_styler, face_detector, layout_optimizer from backend.ai_bubble_placement import ai_bubble_placer from backend.subtitles.subs_real import get_real_subtitles from backend.keyframes.keyframes_simple import generate_keyframes_simple print("✅ Core utility modules loaded.") except Exception as e: print(f"⚠️ Could not load a core utility module: {e}") app = Flask(__name__) INDEX_HTML = ''' Movie to Comic Generator

🎬 Movie to Comic Generator

No file selected

Starting...

✅ Generation Complete! Opening your comic...

''' os.makedirs('video', exist_ok=True) os.makedirs('frames/final', exist_ok=True) os.makedirs('output', exist_ok=True) def update_status(message, progress): status_file = os.path.join('output', 'status.json') with open(status_file, 'w') as f: json.dump({'message': message, 'progress': progress}, f) class EnhancedComicGenerator: def __init__(self): self.video_path = 'video/uploaded.mp4' self.frames_dir = 'frames/final' self.output_dir = 'output' self.apply_comic_style = False self.video_fps = None def cleanup_generated(self): print("🧹 Performing full cleanup...") if os.path.isdir(self.frames_dir): shutil.rmtree(self.frames_dir) if os.path.isdir(self.output_dir): shutil.rmtree(self.output_dir) if os.path.isdir('temp'): shutil.rmtree('temp') if os.path.exists('test1.srt'): os.remove('test1.srt') os.makedirs(self.frames_dir, exist_ok=True) os.makedirs(self.output_dir, exist_ok=True) print("✅ Cleanup complete.") def regenerate_frame(self, frame_filename, direction): try: if not self.video_fps: return {"success": False, "message": "Video FPS not found."} metadata_path = 'frames/frame_metadata.json' if not os.path.exists(metadata_path): return {"success": False, "message": "Frame metadata missing."} with open(metadata_path, 'r') as f: frame_to_time = json.load(f) if frame_filename not in frame_to_time: return {"success": False, "message": "Panel not linked to video."} current_time = frame_to_time[frame_filename]['time'] if isinstance(frame_to_time[frame_filename], dict) else frame_to_time[frame_filename] frame_duration = 1.0 / self.video_fps target_time = current_time + frame_duration if direction == 'forward' else current_time - frame_duration target_time = max(0, target_time) cap = cv2.VideoCapture(self.video_path) if not cap.isOpened(): return {"success": False, "message": "Cannot open video."} cap.set(cv2.CAP_PROP_POS_MSEC, target_time * 1000) ret, frame = cap.read() cap.release() if not ret or frame is None: return {"success": False, "message": f"No frame at {target_time:.2f}s."} new_path = os.path.join(self.frames_dir, frame_filename) cv2.imwrite(new_path, frame) print(f"🎨 Applying enhancements to the new frame: {frame_filename}") self._enhance_all_images(single_image_path=new_path) self._enhance_quality_colors(single_image_path=new_path) if isinstance(frame_to_time[frame_filename], dict): frame_to_time[frame_filename]['time'] = target_time else: frame_to_time[frame_filename] = target_time with open(metadata_path, 'w') as f: json.dump(frame_to_time, f, indent=2) message = f"Adjusted {direction} to {target_time:.3f}s" print(f"✅ {message}") return {"success": True, "message": message, "new_filename": frame_filename} except Exception as e: traceback.print_exc() return {"success": False, "message": str(e)} def get_frame_at_timestamp(self, frame_filename, timestamp_seconds): try: metadata_path = 'frames/frame_metadata.json' if not os.path.exists(metadata_path): return {"success": False, "message": "Frame metadata missing."} cap = cv2.VideoCapture(self.video_path) if not cap.isOpened(): return {"success": False, "message": "Cannot open video."} fps = cap.get(cv2.CAP_PROP_FPS) if fps == 0: fps = 25 duration = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) / fps if timestamp_seconds < 0 or timestamp_seconds > duration: cap.release() return {"success": False, "message": f"Timestamp must be between 0 and {duration:.2f}s."} cap.set(cv2.CAP_PROP_POS_MSEC, timestamp_seconds * 1000) ret, frame = cap.read() cap.release() if not ret or frame is None: return {"success": False, "message": f"Could not retrieve frame at {timestamp_seconds:.2f}s."} new_path = os.path.join(self.frames_dir, frame_filename) cv2.imwrite(new_path, frame) print(f"🎨 Applying enhancements to the new frame from timestamp: {frame_filename}") self._enhance_all_images(single_image_path=new_path) self._enhance_quality_colors(single_image_path=new_path) with open(metadata_path, 'r') as f: frame_to_time = json.load(f) if frame_filename in frame_to_time: if isinstance(frame_to_time[frame_filename], dict): frame_to_time[frame_filename]['time'] = timestamp_seconds else: frame_to_time[frame_filename] = timestamp_seconds with open(metadata_path, 'w') as f: json.dump(frame_to_time, f, indent=2) message = f"Jumped to timestamp {timestamp_seconds:.3f}s" print(f"✅ {message}") return { "success": True, "message": message } except Exception as e: traceback.print_exc() return {"success": False, "message": str(e)} def generate_keyframes_from_moments(self, video_path, key_moments, max_frames=32): try: cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise Exception("Cannot open video for keyframe extraction") fps, total_frames = self.video_fps, int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) duration = total_frames / fps key_moments.sort(key=lambda x: x['start']) if len(key_moments) > max_frames: pass # Simplified sampling frame_metadata, frame_count = {}, 0 for i, moment in enumerate(key_moments): update_status(f"Extracting frame {i+1}/{len(key_moments)}...", 25 + int(20 * (i / len(key_moments)))) frame_time = (moment['start'] + moment['end']) / 2 if frame_time > duration: continue frame_number = int(frame_time * fps) cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) ret, frame = cap.read() if ret: frame_filename = f"frame_{frame_count:04d}.png" frame_path = os.path.join(self.frames_dir, frame_filename) cv2.imwrite(frame_path, frame) frame_metadata[frame_filename] = { 'time': frame_time, 'dialogue': moment['text'], 'start': moment['start'], 'end': moment['end'] } frame_count += 1 cap.release() with open(os.path.join('frames', 'frame_metadata.json'), 'w') as f: json.dump(frame_metadata, f, indent=2) print(f"✅ Extracted {frame_count} keyframes from video") return True except Exception as e: print(f"❌ Error extracting keyframes: {e}") return False def generate_comic(self): start_time = time.time() try: update_status("Cleaning up...", 0) self.cleanup_generated() update_status("Analyzing video...", 5) cap = cv2.VideoCapture(self.video_path) if not cap.isOpened(): raise Exception("Cannot open video to get FPS.") self.video_fps = cap.get(cv2.CAP_PROP_FPS) if self.video_fps == 0: self.video_fps = 25 cap.release() print(f"✅ Video FPS detected: {self.video_fps:.2f}") update_status("Generating subtitles (this may take a while)...", 10) get_real_subtitles(self.video_path) with open('test1.srt', 'r', encoding='utf-8') as f: all_subs = list(srt.parse(f.read())) key_moments = [{'index': s.index, 'text': s.content, 'start': s.start.total_seconds(), 'end': s.end.total_seconds()} for s in all_subs] if not self.generate_keyframes_from_moments(self.video_path, key_moments, max_frames=32): raise Exception("Keyframe extraction failed.") update_status("Cropping black bars...", 45) black_x, black_y, _, _ = black_bar_crop() update_status("Enhancing images (in parallel)...", 50) self._enhance_all_images() self._enhance_quality_colors() update_status("Placing speech bubbles (in parallel)...", 75) bubbles = self._create_ai_bubbles_from_moments(black_x, black_y) update_status("Assembling comic pages...", 90) pages = self._generate_pages(bubbles) update_status("Saving final comic...", 95) self._save_results(pages) execution_time = (time.time() - start_time) / 60 print(f"✅ Comic generation completed in {execution_time:.2f} minutes") update_status("Complete!", 100) return True except Exception as e: print(f"❌ Comic generation failed: {e}") traceback.print_exc() update_status(f"Error: {e}", -1) return False def _enhance_all_images(self, single_image_path=None): try: enhancer = SimpleColorEnhancer() if single_image_path: enhancer.enhance_single(single_image_path) else: frame_paths = [os.path.join(self.frames_dir, f) for f in os.listdir(self.frames_dir) if f.endswith('.png')] with ThreadPoolExecutor() as executor: list(executor.map(enhancer.enhance_single, frame_paths)) except Exception as e: print(f"❌ Simple enhancement failed: {e}") def _enhance_quality_colors(self, single_image_path=None): try: enhancer = QualityColorEnhancer() if single_image_path: enhancer.enhance_single(single_image_path) else: frame_paths = [os.path.join(self.frames_dir, f) for f in os.listdir(self.frames_dir) if f.endswith('.png')] with ThreadPoolExecutor() as executor: list(executor.map(enhancer.enhance_single, frame_paths)) except Exception as e: print(f"⚠️ Quality enhancement failed: {e}") def _process_bubble_for_frame(self, frame_file): frame_path = os.path.join(self.frames_dir, frame_file) dialogue = self.frame_metadata.get(frame_file, {}).get('dialogue', "") try: faces = face_detector.detect_faces(frame_path) lip_x, lip_y = face_detector.get_lip_position(frame_path, faces[0]) if faces else (-1, -1) bubble_x, bubble_y = ai_bubble_placer.place_bubble_ai(frame_path, (lip_x, lip_y)) return bubble(bubble_offset_x=bubble_x, bubble_offset_y=bubble_y, lip_x=lip_x, lip_y=lip_y, dialog=dialogue, emotion='normal') except Exception as e: print(f"-> Could not place bubble for {frame_file}: {e}. Using default.") return bubble(bubble_offset_x=50, bubble_offset_y=20, lip_x=-1, lip_y=-1, dialog=dialogue, emotion='normal') def _create_ai_bubbles_from_moments(self, black_x, black_y): frame_files = sorted([f for f in os.listdir(self.frames_dir) if f.endswith('.png')]) metadata_path = 'frames/frame_metadata.json' if not os.path.exists(metadata_path): return [bubble(dialog="") for _ in frame_files] with open(metadata_path, 'r') as f: self.frame_metadata = json.load(f) with ThreadPoolExecutor() as executor: bubbles = list(executor.map(self._process_bubble_for_frame, frame_files)) return bubbles def _generate_pages(self, bubbles): try: from backend.fixed_12_pages_800x1080 import generate_12_pages_800x1080 return generate_12_pages_800x1080(sorted([f for f in os.listdir(self.frames_dir) if f.endswith('.png')]), bubbles) except ImportError: pages, frame_files = [], sorted([f for f in os.listdir(self.frames_dir) if f.endswith('.png')]) num_pages = (len(frame_files) + 3) // 4 for i in range(num_pages): start, end = i*4, (i+1)*4 page_panels = [panel(image=f) for f in frame_files[start:end]] page_bubbles = bubbles[start:end] if page_panels: pages.append(Page(panels=page_panels, bubbles=page_bubbles)) return pages def _save_results(self, pages): try: os.makedirs(self.output_dir, exist_ok=True) pages_data = [] for page in pages: panels = [p.__dict__ if hasattr(p, '__dict__') else p for p in page.panels] bubbles_data = [b.__dict__ if hasattr(b, '__dict__') else b for b in page.bubbles] pages_data.append({'panels': panels, 'bubbles': bubbles_data}) with open(os.path.join(self.output_dir, 'pages.json'), 'w', encoding='utf-8') as f: json.dump(pages_data, f, indent=2) self._copy_template_files() print("✅ Results saved successfully!") except Exception as e: print(f"Save results failed: {e}") def _copy_template_files(self): try: template_html = ''' Comic Editor

🎬 Generated Comic

Loading comic...

✏️ Interactive Editor

''' # --- CORRECTED INDENTATION --- with open(os.path.join(self.output_dir, 'page.html'), 'w', encoding='utf-8') as f: f.write(template_html) print("📄 Template files copied successfully!") except Exception as e: print(f"Template copy failed: {e}") # --- Flask Routes --- comic_generator = EnhancedComicGenerator() @app.route('/') def index(): return INDEX_HTML @app.route('/uploader', methods=['POST']) def upload_file(): try: if 'file' not in request.files or not request.files['file'].filename: return jsonify({'success': False, 'message': 'No file selected'}), 400 f = request.files['file'] if os.path.exists(comic_generator.video_path): os.remove(comic_generator.video_path) f.save(comic_generator.video_path) threading.Thread(target=comic_generator.generate_comic).start() return jsonify({'success': True, 'message': 'Generation started.'}) except Exception as e: traceback.print_exc() return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/status') def status(): status_file = os.path.join('output', 'status.json') if os.path.exists(status_file): return send_from_directory('output', 'status.json') return jsonify({'message': 'Initializing...', 'progress': 0}) @app.route('/handle_link', methods=['POST']) def handle_link(): # This route is disabled in the UI but remains functional pass @app.route('/replace_panel', methods=['POST']) def replace_panel(): try: if 'image' not in request.files: return jsonify({'success': False, 'error': 'No image provided.'}) file = request.files['image'] filename = f"replaced_panel_{int(time.time() * 1000)}.png" file.save(os.path.join(comic_generator.frames_dir, filename)) return jsonify({'success': True, 'new_filename': filename}) except Exception as e: return jsonify({'success': False, 'error': str(e)}) @app.route('/regenerate_frame', methods=['POST']) def regenerate_frame_route(): try: data = request.get_json() result = comic_generator.regenerate_frame(data['filename'], data['direction']) return jsonify(result) except Exception as e: return jsonify({'success': False, 'message': str(e)}) @app.route('/goto_timestamp', methods=['POST']) def goto_timestamp_route(): try: data = request.get_json() result = comic_generator.get_frame_at_timestamp(data['filename'], float(data['timestamp'])) return jsonify(result) except Exception as e: return jsonify({'success': False, 'message': str(e)}) @app.route('/comic') def view_comic(): return send_from_directory('output', 'page.html') @app.route('/output/') def output_file(filename): return send_from_directory('output', filename) @app.route('/frames/final/') def frame_file(filename): return send_from_directory('frames/final', filename) if __name__ == '__main__': port = int(os.getenv("PORT", 7860)) print(f"🚀 Starting Enhanced Comic Generator on host 0.0.0.0, port {port}") app.run(debug=False, host='0.0.0.0', port=port)