import os
import webbrowser
import time
import threading
from flask import Flask, render_template, request, jsonify, send_from_directory, send_file
from pathlib import Path
import cv2
import numpy as np
from PIL import Image
import srt
import json
import shutil
from typing import List
import traceback
from concurrent.futures import ThreadPoolExecutor
# --- ROBUST IMPORTS WITH FALLBACKS ---
try:
    from backend.keyframes.keyframes import black_bar_crop
    print("✅ Black bar cropping module loaded.")
except Exception as load_error:
    print(f"⚠️ Could not load black_bar_crop: {load_error}. Cropping will be SKIPPED.")

    def black_bar_crop():
        """Stand-in used when the cropping module cannot be imported.

        Returns the 4-tuple ``(0, 0, None, None)``.
        """
        no_crop = (0, 0, None, None)
        return no_crop
try:
    from backend.simple_color_enhancer import SimpleColorEnhancer
    print("✅ SimpleColorEnhancer loaded.")
except Exception as load_error:
    print(f"⚠️ Could not load SimpleColorEnhancer: {load_error}. This feature will be SKIPPED.")

    class SimpleColorEnhancer:
        """No-op replacement: each method only prints a skip notice."""

        def enhance_batch(self, *args, **kwargs):
            """Accept any arguments, print the skip notice, return None."""
            print("-> Skipping simple color enhancement (module not loaded).")

        def enhance_single(self, *args, **kwargs):
            """Accept any arguments, print the skip notice, return None."""
            print("-> Skipping simple color enhancement (module not loaded).")
try:
    from backend.quality_color_enhancer import QualityColorEnhancer
    print("✅ QualityColorEnhancer loaded.")
except Exception as load_error:
    print(f"⚠️ Could not load QualityColorEnhancer: {load_error}. This feature will be SKIPPED.")

    class QualityColorEnhancer:
        """No-op replacement: each method only prints a skip notice."""

        def batch_enhance(self, *args, **kwargs):
            """Accept any arguments, print the skip notice, return None."""
            print("-> Skipping quality color enhancement (module not loaded).")

        def enhance_single(self, *args, **kwargs):
            """Accept any arguments, print the skip notice, return None."""
            print("-> Skipping quality color enhancement (module not loaded).")
try:
    from backend.class_def import bubble, panel, Page
    print("✅ Core class definitions (bubble, panel, Page) loaded.")
except Exception as load_error:
    print(f"⚠️ CRITICAL: Could not load core class definitions: {load_error}. Using fallback definitions.")

    def bubble(**fields):
        """Fallback bubble: simply the keyword arguments as a dict."""
        return fields

    def panel(**fields):
        """Fallback panel: simply the keyword arguments as a dict."""
        return fields

    class Page:
        """Fallback page holding the panels and bubbles it was given."""

        def __init__(self, panels, bubbles):
            self.panels, self.bubbles = panels, bubbles
# Each utility module is imported in its own try block so that one broken
# dependency does not leave the names imported after it undefined.  Missing
# objects get an explicit fallback, matching the other import blocks in this file.
_core_utils_loaded = True
try:
    from backend.ai_enhanced_core import image_processor, comic_styler, face_detector, layout_optimizer
except Exception as e:
    _core_utils_loaded = False
    print(f"⚠️ Could not load a core utility module: {e}")
    # None makes later attribute access fail with an ordinary exception,
    # which the bubble placement code already catches per frame.
    image_processor = comic_styler = face_detector = layout_optimizer = None
try:
    from backend.ai_bubble_placement import ai_bubble_placer
except Exception as e:
    _core_utils_loaded = False
    print(f"⚠️ Could not load a core utility module: {e}")
    ai_bubble_placer = None
try:
    from backend.subtitles.subs_real import get_real_subtitles
except Exception as e:
    _core_utils_loaded = False
    print(f"⚠️ Could not load a core utility module: {e}")
    # 'e' is unbound once the except clause ends, so keep the text around.
    _subtitle_import_error = str(e)

    def get_real_subtitles(*args, **kwargs):
        """Fallback that reports why subtitle generation is unavailable."""
        raise RuntimeError(f"Subtitle generation is unavailable: {_subtitle_import_error}")
try:
    from backend.keyframes.keyframes_simple import generate_keyframes_simple
except Exception as e:
    _core_utils_loaded = False
    print(f"⚠️ Could not load a core utility module: {e}")
    generate_keyframes_simple = None
if _core_utils_loaded:
    print("✅ Core utility modules loaded.")
# Flask application object that all route decorators below attach to.
app = Flask(__name__)

# Landing page returned by the '/' route.
# NOTE(review): the content looks like HTML whose tags were stripped — confirm
# against the original template before relying on it.
INDEX_HTML = '''
Movie to Comic Generator
🎬 Movie to Comic Generator
Starting...
✅ Generation Complete! Opening your comic...
'''

# Working directories are created up front so that they exist on startup.
for _required_dir in ('video', 'frames/final', 'output'):
    os.makedirs(_required_dir, exist_ok=True)
def update_status(message, progress):
    """Publish the current progress to ``output/status.json``.

    The JSON object has the keys ``message`` and ``progress``.  The file is
    written to a temporary name and then moved into place, so a concurrent
    reader (the ``/status`` route) never sees a truncated, half-written file.

    Args:
        message: Human-readable description of the current step.
        progress: Progress value stored as-is (callers use 0-100, -1 on error).
    """
    status_dir = 'output'
    # The directory can be missing (e.g. it is removed during cleanup).
    os.makedirs(status_dir, exist_ok=True)
    status_file = os.path.join(status_dir, 'status.json')
    payload = {'message': message, 'progress': progress}

    tmp_file = status_file + '.tmp'
    with open(tmp_file, 'w', encoding='utf-8') as f:
        json.dump(payload, f)
    try:
        os.replace(tmp_file, status_file)
    except PermissionError:
        # Presumably Windows refusing to replace a file that is currently open
        # for reading; fall back to the plain in-place write rather than
        # failing the whole generation run.
        with open(status_file, 'w', encoding='utf-8') as f:
            json.dump(payload, f)
        if os.path.exists(tmp_file):
            os.remove(tmp_file)
class EnhancedComicGenerator:
    """Pipeline that turns the uploaded video into comic pages.

    ``generate_comic`` runs the whole flow: subtitles, keyframe extraction,
    black-bar cropping, colour enhancement, speech-bubble placement, page
    assembly and writing of the output files.  ``regenerate_frame`` and
    ``get_frame_at_timestamp`` re-extract a single panel afterwards.
    """

    # JSON file mapping each frame file name to its video time; entries are
    # either a dict with a 'time' key or a bare number.
    METADATA_PATH = 'frames/frame_metadata.json'
    FRAME_SUFFIX = '.png'
    # Used when the video container reports an FPS of 0.
    DEFAULT_FPS = 25

    def __init__(self):
        self.video_path = 'video/uploaded.mp4'
        self.frames_dir = 'frames/final'
        self.output_dir = 'output'
        self.apply_comic_style = False
        self.video_fps = None
        # Filled by _create_ai_bubbles_from_moments; initialised here so that
        # _process_bubble_for_frame never hits a missing attribute.
        self.frame_metadata = {}

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _is_safe_frame_name(frame_filename):
        """Return True when the name is a plain file name without any path part.

        Frame names arrive from HTTP requests and are joined onto
        ``frames_dir`` before writing, so separators and dot-directories are
        rejected to keep writes inside that directory.
        """
        if not isinstance(frame_filename, str):
            return False
        if frame_filename in ('', '.', '..'):
            return False
        return '/' not in frame_filename and '\\' not in frame_filename

    @staticmethod
    def _set_frame_time(frame_to_time, frame_filename, seconds):
        """Store ``seconds`` for a frame, keeping the entry's existing shape."""
        if isinstance(frame_to_time[frame_filename], dict):
            frame_to_time[frame_filename]['time'] = seconds
        else:
            frame_to_time[frame_filename] = seconds

    @staticmethod
    def _sample_moments(key_moments, max_frames):
        """Return the moments sorted by start time, thinned to ``max_frames``.

        When there are more moments than ``max_frames`` an evenly spaced
        subset is kept.  A ``max_frames`` of None or <= 0 means "no limit".
        The caller's list is not modified.
        """
        ordered = sorted(key_moments, key=lambda m: m['start'])
        if max_frames is None or max_frames <= 0 or len(ordered) <= max_frames:
            return ordered
        stride = len(ordered) / max_frames  # > 1, so the indices are distinct
        return [ordered[int(i * stride)] for i in range(max_frames)]

    def _list_frame_files(self):
        """Return the sorted names of the ``.png`` files in ``frames_dir``."""
        return sorted(f for f in os.listdir(self.frames_dir) if f.endswith(self.FRAME_SUFFIX))

    def _run_enhancer(self, enhancer, single_image_path=None):
        """Apply ``enhancer.enhance_single`` to one image or to every frame."""
        if single_image_path:
            enhancer.enhance_single(single_image_path)
            return
        frame_paths = [os.path.join(self.frames_dir, name) for name in self._list_frame_files()]
        with ThreadPoolExecutor() as executor:
            # list() drains the iterator so worker exceptions are raised here.
            list(executor.map(enhancer.enhance_single, frame_paths))

    def _write_and_enhance(self, frame_filename, frame):
        """Write a frame into ``frames_dir`` and run both enhancers on it.

        Returns True on success, False when the image could not be written.
        """
        new_path = os.path.join(self.frames_dir, frame_filename)
        if not cv2.imwrite(new_path, frame):
            return False
        self._enhance_all_images(single_image_path=new_path)
        self._enhance_quality_colors(single_image_path=new_path)
        return True

    # --------------------------------------------------------------- public API

    def cleanup_generated(self):
        """Delete previous frames, output, ``temp`` and ``test1.srt``, then recreate the frame/output dirs."""
        print("🧹 Performing full cleanup...")
        for directory in (self.frames_dir, self.output_dir, 'temp'):
            if os.path.isdir(directory):
                shutil.rmtree(directory)
        if os.path.exists('test1.srt'):
            os.remove('test1.srt')
        os.makedirs(self.frames_dir, exist_ok=True)
        os.makedirs(self.output_dir, exist_ok=True)
        print("✅ Cleanup complete.")

    def regenerate_frame(self, frame_filename, direction):
        """Replace a panel with the video frame one frame earlier or later.

        Args:
            frame_filename: Name of an existing frame listed in the metadata file.
            direction: ``'forward'`` steps ahead; any other value steps back.

        Returns:
            A dict with ``success`` and ``message``; on success also
            ``new_filename`` (the same file name, overwritten in place).
        """
        try:
            if not self._is_safe_frame_name(frame_filename):
                return {"success": False, "message": "Invalid frame filename."}
            if not self.video_fps:
                return {"success": False, "message": "Video FPS not found."}
            if not os.path.exists(self.METADATA_PATH):
                return {"success": False, "message": "Frame metadata missing."}
            with open(self.METADATA_PATH, 'r') as f:
                frame_to_time = json.load(f)
            if frame_filename not in frame_to_time:
                return {"success": False, "message": "Panel not linked to video."}

            entry = frame_to_time[frame_filename]
            current_time = entry['time'] if isinstance(entry, dict) else entry
            frame_duration = 1.0 / self.video_fps
            step = frame_duration if direction == 'forward' else -frame_duration
            target_time = max(0, current_time + step)  # never seek before the start

            cap = cv2.VideoCapture(self.video_path)
            try:
                if not cap.isOpened():
                    return {"success": False, "message": "Cannot open video."}
                cap.set(cv2.CAP_PROP_POS_MSEC, target_time * 1000)
                ret, frame = cap.read()
            finally:
                cap.release()
            if not ret or frame is None:
                return {"success": False, "message": f"No frame at {target_time:.2f}s."}

            print(f"🎨 Applying enhancements to the new frame: {frame_filename}")
            if not self._write_and_enhance(frame_filename, frame):
                return {"success": False, "message": f"Could not write {frame_filename}."}

            self._set_frame_time(frame_to_time, frame_filename, target_time)
            with open(self.METADATA_PATH, 'w') as f:
                json.dump(frame_to_time, f, indent=2)
            message = f"Adjusted {direction} to {target_time:.3f}s"
            print(f"✅ {message}")
            return {"success": True, "message": message, "new_filename": frame_filename}
        except Exception as e:
            traceback.print_exc()
            return {"success": False, "message": str(e)}

    def get_frame_at_timestamp(self, frame_filename, timestamp_seconds):
        """Replace a panel with the video frame at an absolute timestamp.

        Args:
            frame_filename: Plain file name of the panel inside ``frames_dir``.
            timestamp_seconds: Position in the video, in seconds.

        Returns:
            A dict with ``success`` and ``message``.
        """
        try:
            if not self._is_safe_frame_name(frame_filename):
                return {"success": False, "message": "Invalid frame filename."}
            if not os.path.exists(self.METADATA_PATH):
                return {"success": False, "message": "Frame metadata missing."}

            cap = cv2.VideoCapture(self.video_path)
            try:
                if not cap.isOpened():
                    return {"success": False, "message": "Cannot open video."}
                fps = cap.get(cv2.CAP_PROP_FPS)
                if fps == 0:
                    fps = self.DEFAULT_FPS
                duration = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) / fps
                if timestamp_seconds < 0 or timestamp_seconds > duration:
                    return {"success": False, "message": f"Timestamp must be between 0 and {duration:.2f}s."}
                cap.set(cv2.CAP_PROP_POS_MSEC, timestamp_seconds * 1000)
                ret, frame = cap.read()
            finally:
                cap.release()
            if not ret or frame is None:
                return {"success": False, "message": f"Could not retrieve frame at {timestamp_seconds:.2f}s."}

            print(f"🎨 Applying enhancements to the new frame from timestamp: {frame_filename}")
            if not self._write_and_enhance(frame_filename, frame):
                return {"success": False, "message": f"Could not write {frame_filename}."}

            with open(self.METADATA_PATH, 'r') as f:
                frame_to_time = json.load(f)
            # Panels absent from the metadata are still replaced; only known
            # entries get their stored time updated.
            if frame_filename in frame_to_time:
                self._set_frame_time(frame_to_time, frame_filename, timestamp_seconds)
                with open(self.METADATA_PATH, 'w') as f:
                    json.dump(frame_to_time, f, indent=2)
            message = f"Jumped to timestamp {timestamp_seconds:.3f}s"
            print(f"✅ {message}")
            return {"success": True, "message": message}
        except Exception as e:
            traceback.print_exc()
            return {"success": False, "message": str(e)}

    def generate_keyframes_from_moments(self, video_path, key_moments, max_frames=32):
        """Extract one frame per subtitle moment and record the frame metadata.

        The frame is taken at the midpoint of each moment's start/end.  At
        most ``max_frames`` moments are used (evenly spaced when there are more).

        Args:
            video_path: Path of the video to read.
            key_moments: Dicts with ``start``, ``end`` (seconds) and ``text``.
            max_frames: Upper bound on the number of moments processed.

        Returns:
            True on success, False if anything failed (the error is printed).
        """
        cap = None
        try:
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                raise Exception("Cannot open video for keyframe extraction")
            # video_fps is only set by generate_comic; fall back to the
            # container's value so this method also works on its own.
            fps = self.video_fps or cap.get(cv2.CAP_PROP_FPS) or self.DEFAULT_FPS
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            duration = total_frames / fps

            moments = self._sample_moments(key_moments, max_frames)
            frame_metadata, frame_count = {}, 0
            for i, moment in enumerate(moments):
                update_status(f"Extracting frame {i+1}/{len(moments)}...", 25 + int(20 * (i / len(moments))))
                frame_time = (moment['start'] + moment['end']) / 2
                if frame_time > duration:
                    continue
                cap.set(cv2.CAP_PROP_POS_FRAMES, int(frame_time * fps))
                ret, frame = cap.read()
                if not ret or frame is None:
                    continue
                frame_filename = f"frame_{frame_count:04d}.png"
                # Only record frames that actually reached the disk.
                if not cv2.imwrite(os.path.join(self.frames_dir, frame_filename), frame):
                    continue
                frame_metadata[frame_filename] = {
                    'time': frame_time,
                    'dialogue': moment['text'],
                    'start': moment['start'],
                    'end': moment['end'],
                }
                frame_count += 1

            with open(self.METADATA_PATH, 'w') as f:
                json.dump(frame_metadata, f, indent=2)
            print(f"✅ Extracted {frame_count} keyframes from video")
            return True
        except Exception as e:
            print(f"❌ Error extracting keyframes: {e}")
            traceback.print_exc()
            return False
        finally:
            if cap is not None:
                cap.release()

    def generate_comic(self):
        """Run the full video-to-comic pipeline, reporting progress via ``update_status``.

        Returns:
            True when every step finished, False on any failure (the error is
            printed and published with progress ``-1``).
        """
        start_time = time.time()
        try:
            update_status("Cleaning up...", 0)
            self.cleanup_generated()

            update_status("Analyzing video...", 5)
            cap = cv2.VideoCapture(self.video_path)
            try:
                if not cap.isOpened():
                    raise Exception("Cannot open video to get FPS.")
                self.video_fps = cap.get(cv2.CAP_PROP_FPS)
                if self.video_fps == 0:
                    self.video_fps = self.DEFAULT_FPS
            finally:
                cap.release()
            print(f"✅ Video FPS detected: {self.video_fps:.2f}")

            update_status("Generating subtitles (this may take a while)...", 10)
            get_real_subtitles(self.video_path)
            # The subtitles are read back from 'test1.srt' right after the call.
            with open('test1.srt', 'r', encoding='utf-8') as f:
                all_subs = list(srt.parse(f.read()))
            key_moments = [
                {'index': s.index, 'text': s.content, 'start': s.start.total_seconds(), 'end': s.end.total_seconds()}
                for s in all_subs
            ]
            if not self.generate_keyframes_from_moments(self.video_path, key_moments, max_frames=32):
                raise Exception("Keyframe extraction failed.")

            update_status("Cropping black bars...", 45)
            black_x, black_y, _, _ = black_bar_crop()

            update_status("Enhancing images (in parallel)...", 50)
            self._enhance_all_images()
            self._enhance_quality_colors()

            update_status("Placing speech bubbles (in parallel)...", 75)
            bubbles = self._create_ai_bubbles_from_moments(black_x, black_y)

            update_status("Assembling comic pages...", 90)
            pages = self._generate_pages(bubbles)

            update_status("Saving final comic...", 95)
            # Do not announce completion when the comic could not be written.
            if not self._save_results(pages):
                raise Exception("Saving results failed.")

            execution_time = (time.time() - start_time) / 60
            print(f"✅ Comic generation completed in {execution_time:.2f} minutes")
            update_status("Complete!", 100)
            return True
        except Exception as e:
            print(f"❌ Comic generation failed: {e}")
            traceback.print_exc()
            update_status(f"Error: {e}", -1)
            return False

    # ---------------------------------------------------------- pipeline steps

    def _enhance_all_images(self, single_image_path=None):
        """Run ``SimpleColorEnhancer`` on one image or on all frames; failures are only printed."""
        try:
            self._run_enhancer(SimpleColorEnhancer(), single_image_path)
        except Exception as e:
            print(f"❌ Simple enhancement failed: {e}")

    def _enhance_quality_colors(self, single_image_path=None):
        """Run ``QualityColorEnhancer`` on one image or on all frames; failures are only printed."""
        try:
            self._run_enhancer(QualityColorEnhancer(), single_image_path)
        except Exception as e:
            print(f"⚠️ Quality enhancement failed: {e}")

    def _process_bubble_for_frame(self, frame_file):
        """Build the speech bubble for one frame.

        Uses the first detected face's lip position when a face is found;
        falls back to a fixed default bubble when placement fails.
        """
        frame_path = os.path.join(self.frames_dir, frame_file)
        entry = self.frame_metadata.get(frame_file, {})
        # Metadata entries may be bare numbers, which carry no dialogue.
        dialogue = entry.get('dialogue', "") if isinstance(entry, dict) else ""
        try:
            faces = face_detector.detect_faces(frame_path)
            lip_x, lip_y = face_detector.get_lip_position(frame_path, faces[0]) if faces else (-1, -1)
            bubble_x, bubble_y = ai_bubble_placer.place_bubble_ai(frame_path, (lip_x, lip_y))
            return bubble(bubble_offset_x=bubble_x, bubble_offset_y=bubble_y, lip_x=lip_x, lip_y=lip_y, dialog=dialogue, emotion='normal')
        except Exception as e:
            print(f"-> Could not place bubble for {frame_file}: {e}. Using default.")
            return bubble(bubble_offset_x=50, bubble_offset_y=20, lip_x=-1, lip_y=-1, dialog=dialogue, emotion='normal')

    def _create_ai_bubbles_from_moments(self, black_x, black_y):
        """Return one bubble per frame file, in sorted file-name order.

        ``black_x`` and ``black_y`` are accepted for interface compatibility
        but are not used here.
        """
        frame_files = self._list_frame_files()
        if not os.path.exists(self.METADATA_PATH):
            return [bubble(dialog="") for _ in frame_files]
        with open(self.METADATA_PATH, 'r') as f:
            self.frame_metadata = json.load(f)
        with ThreadPoolExecutor() as executor:
            return list(executor.map(self._process_bubble_for_frame, frame_files))

    def _generate_pages(self, bubbles):
        """Lay the frames out into pages.

        Prefers ``generate_12_pages_800x1080``; if that module cannot be
        imported, groups frames and bubbles four per page.
        """
        frame_files = self._list_frame_files()
        try:
            from backend.fixed_12_pages_800x1080 import generate_12_pages_800x1080
        except ImportError:
            pages = []
            for start in range(0, len(frame_files), 4):
                page_panels = [panel(image=f) for f in frame_files[start:start + 4]]
                pages.append(Page(panels=page_panels, bubbles=bubbles[start:start + 4]))
            return pages
        return generate_12_pages_800x1080(frame_files, bubbles)

    def _save_results(self, pages):
        """Write ``pages.json`` and the viewer template into ``output_dir``.

        Panels and bubbles are serialised through their ``__dict__`` when they
        have one and used as-is otherwise.

        Returns:
            True when ``pages.json`` was written, False otherwise.
        """
        try:
            os.makedirs(self.output_dir, exist_ok=True)
            pages_data = [
                {
                    'panels': [p.__dict__ if hasattr(p, '__dict__') else p for p in page.panels],
                    'bubbles': [b.__dict__ if hasattr(b, '__dict__') else b for b in page.bubbles],
                }
                for page in pages
            ]
            with open(os.path.join(self.output_dir, 'pages.json'), 'w', encoding='utf-8') as f:
                json.dump(pages_data, f, indent=2)
            self._copy_template_files()
            print("✅ Results saved successfully!")
            return True
        except Exception as e:
            print(f"Save results failed: {e}")
            traceback.print_exc()
            return False

    def _copy_template_files(self):
        """Write the viewer page to ``output_dir/page.html``; failures are only printed."""
        try:
            # NOTE(review): this looks like HTML whose tags were stripped —
            # confirm against the original template.
            template_html = '''
Comic Editor
🎬 Generated Comic
Loading comic...
✏️ Interactive Editor
'''
            with open(os.path.join(self.output_dir, 'page.html'), 'w', encoding='utf-8') as f:
                f.write(template_html)
            print("📄 Template files copied successfully!")
        except Exception as e:
            print(f"Template copy failed: {e}")
# --- Flask Routes ---
# Module-level generator instance shared by the route handlers that follow.
comic_generator = EnhancedComicGenerator()
@app.route('/')
def index():
    """Serve the landing page stored in ``INDEX_HTML``."""
    landing_page = INDEX_HTML
    return landing_page
@app.route('/uploader', methods=['POST'])
def upload_file():
    """Store the uploaded video and start comic generation in a background thread.

    Responds 400 when no file was selected and 500 on any unexpected error.
    """
    try:
        uploaded = request.files.get('file')
        if uploaded is None or not uploaded.filename:
            return jsonify({'success': False, 'message': 'No file selected'}), 400

        # Remove any previous upload before saving the new one.
        if os.path.exists(comic_generator.video_path):
            os.remove(comic_generator.video_path)
        uploaded.save(comic_generator.video_path)

        worker = threading.Thread(target=comic_generator.generate_comic)
        worker.start()
        return jsonify({'success': True, 'message': 'Generation started.'})
    except Exception as err:
        traceback.print_exc()
        return jsonify({'success': False, 'message': str(err)}), 500
@app.route('/status')
def status():
    """Return ``output/status.json`` if present, otherwise an 'Initializing...' placeholder."""
    if not os.path.exists(os.path.join('output', 'status.json')):
        return jsonify({'message': 'Initializing...', 'progress': 0})
    return send_from_directory('output', 'status.json')
@app.route('/handle_link', methods=['POST'])
def handle_link():
    """Placeholder for link-based input, which is not implemented.

    A Flask view must return a response; returning ``None`` makes Flask raise
    and answer with a 500.  Reply with an explicit "not implemented" instead.
    """
    return jsonify({'success': False, 'message': 'Link handling is not available.'}), 501
@app.route('/replace_panel', methods=['POST'])
def replace_panel():
    """Save an uploaded image as a new panel file and return its generated name.

    The file name is ``replaced_panel_<milliseconds since epoch>.png`` inside
    the generator's frames directory.
    """
    try:
        uploaded = request.files.get('image')
        if uploaded is None:
            return jsonify({'success': False, 'error': 'No image provided.'})

        stamp = int(time.time() * 1000)
        filename = f"replaced_panel_{stamp}.png"
        destination = os.path.join(comic_generator.frames_dir, filename)
        uploaded.save(destination)
        return jsonify({'success': True, 'new_filename': filename})
    except Exception as err:
        return jsonify({'success': False, 'error': str(err)})
@app.route('/regenerate_frame', methods=['POST'])
def regenerate_frame_route():
    """Step a panel one video frame forward or backward.

    Expects a JSON body with ``filename`` and ``direction``; any error is
    reported as ``{'success': False, 'message': ...}``.
    """
    try:
        payload = request.get_json()
        filename, direction = payload['filename'], payload['direction']
        outcome = comic_generator.regenerate_frame(filename, direction)
        return jsonify(outcome)
    except Exception as err:
        return jsonify({'success': False, 'message': str(err)})
@app.route('/goto_timestamp', methods=['POST'])
def goto_timestamp_route():
    """Replace a panel with the video frame at a given timestamp.

    Expects a JSON body with ``filename`` and ``timestamp`` (converted with
    ``float``); any error is reported as ``{'success': False, 'message': ...}``.
    """
    try:
        payload = request.get_json()
        filename = payload['filename']
        seconds = float(payload['timestamp'])
        outcome = comic_generator.get_frame_at_timestamp(filename, seconds)
        return jsonify(outcome)
    except Exception as err:
        return jsonify({'success': False, 'message': str(err)})
@app.route('/comic')
def view_comic():
    """Serve the generated viewer page, ``output/page.html``."""
    viewer_dir, viewer_page = 'output', 'page.html'
    return send_from_directory(viewer_dir, viewer_page)
# The rule must declare the ``filename`` variable: without it Flask calls the
# view with no arguments and every request fails with a TypeError.
@app.route('/output/<path:filename>')
def output_file(filename):
    """Serve a file from the ``output`` directory.

    Args:
        filename: Path of the requested file, relative to ``output``.
    """
    return send_from_directory('output', filename)
# The rule must declare the ``filename`` variable: without it Flask calls the
# view with no arguments and every request fails with a TypeError.
@app.route('/frames/final/<path:filename>')
def frame_file(filename):
    """Serve a frame image from the ``frames/final`` directory.

    Args:
        filename: Path of the requested file, relative to ``frames/final``.
    """
    return send_from_directory('frames/final', filename)
if __name__ == '__main__':
    # The port comes from the PORT environment variable, defaulting to 7860.
    listen_port = int(os.environ.get("PORT", 7860))
    print(f"🚀 Starting Enhanced Comic Generator on host 0.0.0.0, port {listen_port}")
    app.run(host='0.0.0.0', port=listen_port, debug=False)