diff --git "a/app_enhanced.py" "b/app_enhanced.py" --- "a/app_enhanced.py" +++ "b/app_enhanced.py" @@ -1,249 +1,411 @@ +import spaces # <--- CRITICAL: MUST BE THE FIRST IMPORT import os import time import threading -import uuid -import shutil import json import traceback import logging import string import random -from concurrent.futures import ThreadPoolExecutor -from flask import Flask, render_template, request, jsonify, send_from_directory, send_file - -# --- 0. CONFIG & LOGGING --- +import shutil +import cv2 +import math +import numpy as np +import srt +from flask import Flask, jsonify, request, send_from_directory, send_file + +# ====================================================== +# ๐Ÿš€ ZEROGPU CONFIGURATION +# ====================================================== +@spaces.GPU +def gpu_warmup(): + import torch + print(f"โœ… ZeroGPU Warmup: CUDA Available: {torch.cuda.is_available()}") + return True + +# ====================================================== +# ๐Ÿงฑ DATA CLASSES +# ====================================================== +def bubble(dialog="", bubble_offset_x=50, bubble_offset_y=20, lip_x=-1, lip_y=-1, emotion='normal', type='speech'): + return { + 'dialog': dialog, + 'bubble_offset_x': int(bubble_offset_x), + 'bubble_offset_y': int(bubble_offset_y), + 'lip_x': int(lip_x), + 'lip_y': int(lip_y), + 'emotion': emotion, + 'type': type, + 'tail_pos': '50%', + 'classes': f'speech-bubble {type} tail-bottom' + } + +def panel(image=""): + return {'image': image} + +class Page: + def __init__(self, panels, bubbles): + self.panels = panels + self.bubbles = bubbles + +# ====================================================== +# ๐Ÿ”ง APP CONFIG +# ====================================================== logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -# --- 1. CORE DEPENDENCY CHECKS --- -try: - import cv2 - import numpy as np - from PIL import Image - import srt -except ImportError as e: - print(f"โŒ CRITICAL ERROR: Missing python library. {e}") - cv2 = None - np = None - Image = None - srt = None - -# --- 2. BACKEND IMPORTS WITH FALLBACKS --- -def dummy_func(*args, **kwargs): - return 0, 0, None, None - -try: - from backend.keyframes.keyframes import black_bar_crop - print("โœ… Black bar cropping module loaded.") -except Exception as e: - print(f"โš ๏ธ Could not load black_bar_crop: {e}. Cropping will be SKIPPED.") - black_bar_crop = dummy_func - -try: - from backend.simple_color_enhancer import SimpleColorEnhancer - print("โœ… SimpleColorEnhancer loaded.") -except Exception as e: - print(f"โš ๏ธ Could not load SimpleColorEnhancer: {e}.") - class SimpleColorEnhancer: - def enhance_batch(self, *args, **kwargs): pass - def enhance_single(self, *args, **kwargs): pass - -try: - from backend.quality_color_enhancer import QualityColorEnhancer - print("โœ… QualityColorEnhancer loaded.") -except Exception as e: - print(f"โš ๏ธ Could not load QualityColorEnhancer: {e}.") - class QualityColorEnhancer: - def batch_enhance(self, *args, **kwargs): pass - def enhance_single(self, *args, **kwargs): pass - -try: - from backend.class_def import bubble, panel, Page - print("โœ… Core class definitions loaded.") -except Exception as e: - print(f"โš ๏ธ Using fallback class definitions.") - def bubble(dialog="", bubble_offset_x=50, bubble_offset_y=20, lip_x=-1, lip_y=-1, emotion='normal'): - return { - 'dialog': dialog, - 'bubble_offset_x': bubble_offset_x, - 'bubble_offset_y': bubble_offset_y, - 'lip_x': lip_x, - 'lip_y': lip_y, - 'emotion': emotion - } - def panel(image=""): - return {'image': image} - class Page: - def __init__(self, panels, bubbles): - self.panels = panels - self.bubbles = bubbles - -try: - from backend.ai_enhanced_core import image_processor, comic_styler, face_detector, layout_optimizer - from backend.ai_bubble_placement import ai_bubble_placer - from backend.subtitles.subs_real import get_real_subtitles - from backend.keyframes.keyframes_simple import generate_keyframes_simple - print("โœ… Core utility modules loaded.") -except Exception as e: - print(f"โš ๏ธ Could not load utility modules: {e}") - def get_real_subtitles(v): pass - def generate_keyframes_simple(*args, **kwargs): pass - class DummyDetector: - def detect_faces(self, p): return [] - def get_lip_position(self, p, f): return -1, -1 - face_detector = DummyDetector() - class DummyPlacer: - def place_bubble_ai(self, p, l): return 50, 20 - ai_bubble_placer = DummyPlacer() - -# --- FLASK APP SETUP --- app = Flask(__name__) -BASE_USER_DIR = "userdata" +BASE_USER_DIR = "userdata" SAVED_COMICS_DIR = "saved_comics" -# Create directories os.makedirs(BASE_USER_DIR, exist_ok=True) os.makedirs(SAVED_COMICS_DIR, exist_ok=True) def generate_save_code(length=8): - """Generate a unique save code""" chars = string.ascii_uppercase + string.digits while True: code = ''.join(random.choices(chars, k=length)) if not os.path.exists(os.path.join(SAVED_COMICS_DIR, code)): return code -# --- FULL HTML INTERFACE --- +# ====================================================== +# ๐Ÿง  GLOBAL GPU FUNCTIONS +# ====================================================== + +@spaces.GPU(duration=300) +def generate_comic_gpu(video_path, user_dir, frames_dir, metadata_path, target_pages): + print(f"๐Ÿš€ GPU Task Started: {video_path} | Pages: {target_pages}") + + import cv2 + import srt + import numpy as np + from backend.keyframes.keyframes import black_bar_crop + from backend.simple_color_enhancer import SimpleColorEnhancer + from backend.quality_color_enhancer import QualityColorEnhancer + from backend.subtitles.subs_real import get_real_subtitles + from backend.ai_bubble_placement import ai_bubble_placer + from backend.ai_enhanced_core import face_detector + + # 1. Analyze Video + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): raise Exception("Cannot open video") + fps = cap.get(cv2.CAP_PROP_FPS) or 25 + total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + duration = total_frames / fps + cap.release() + + # 2. Subtitles Generation + user_srt = os.path.join(user_dir, 'subs.srt') + try: + get_real_subtitles(video_path) + if os.path.exists('test1.srt'): + shutil.move('test1.srt', user_srt) + except: + with open(user_srt, 'w') as f: f.write("1\n00:00:01,000 --> 00:00:04,000\n...\n") + + with open(user_srt, 'r', encoding='utf-8') as f: + all_subs = list(srt.parse(f.read())) + + # 3. Smart Keyframe Selection + valid_subs = [s for s in all_subs if s.content.strip()] + raw_moments = [{'text': s.content, 'start': s.start.total_seconds(), 'end': s.end.total_seconds()} for s in valid_subs] + + if target_pages <= 0: target_pages = 1 + panels_per_page = 4 + total_panels_needed = target_pages * panels_per_page + + selected_moments = [] + if not raw_moments: + times = np.linspace(1, duration-1, total_panels_needed) + for t in times: selected_moments.append({'text': '', 'start': t, 'end': t+1}) + elif len(raw_moments) <= total_panels_needed: + selected_moments = raw_moments + else: + indices = np.linspace(0, len(raw_moments) - 1, total_panels_needed, dtype=int) + selected_moments = [raw_moments[i] for i in indices] + + # 4. Extract Frames + frame_metadata = {} + cap = cv2.VideoCapture(video_path) + count = 0 + frame_files_ordered = [] + + for i, moment in enumerate(selected_moments): + mid = (moment['start'] + moment['end']) / 2 + if mid > duration: mid = duration - 1 + cap.set(cv2.CAP_PROP_POS_FRAMES, int(mid * fps)) + ret, frame = cap.read() + if ret: + fname = f"frame_{count:04d}.png" + p = os.path.join(frames_dir, fname) + cv2.imwrite(p, frame) + os.sync() + frame_metadata[fname] = {'dialogue': moment['text'], 'time': mid} + frame_files_ordered.append(fname) + count += 1 + cap.release() + + with open(metadata_path, 'w') as f: json.dump(frame_metadata, f, indent=2) + + # 5. Image Enhancement + try: black_bar_crop() + except: pass + + se = SimpleColorEnhancer() + qe = QualityColorEnhancer() + + for f in frame_files_ordered: + p = os.path.join(frames_dir, f) + try: se.enhance_single(p, p) + except: pass + try: qe.enhance_single(p, p) + except: pass + + # 6. Bubble Placement + bubbles_list = [] + for f in frame_files_ordered: + p = os.path.join(frames_dir, f) + dialogue = frame_metadata.get(f, {}).get('dialogue', '') + + b_type = 'speech' + if '(' in dialogue and ')' in dialogue: b_type = 'narration' + elif '!' in dialogue and dialogue.isupper(): b_type = 'reaction' + elif '?' in dialogue: b_type = 'speech' + + try: + faces = face_detector.detect_faces(p) + lip = face_detector.get_lip_position(p, faces[0]) if faces else (-1, -1) + bx, by = ai_bubble_placer.place_bubble_ai(p, lip) + b = bubble(dialog=dialogue, bubble_offset_x=bx, bubble_offset_y=by, lip_x=lip[0], lip_y=lip[1], type=b_type) + bubbles_list.append(b) + except: + bubbles_list.append(bubble(dialog=dialogue, type=b_type)) + + # 7. Final Layout + pages = [] + for i in range(target_pages): + start_idx = i * 4 + end_idx = start_idx + 4 + p_frames = frame_files_ordered[start_idx:end_idx] + p_bubbles = bubbles_list[start_idx:end_idx] + if p_frames: + pg_panels = [panel(image=f) for f in p_frames] + pages.append(Page(panels=pg_panels, bubbles=p_bubbles)) + + result = [] + for pg in pages: + p_data = [p if isinstance(p, dict) else p.__dict__ for p in pg.panels] + b_data = [b if isinstance(b, dict) else b.__dict__ for b in pg.bubbles] + result.append({'panels': p_data, 'bubbles': b_data}) + + return result + +@spaces.GPU +def regen_frame_gpu(video_path, frames_dir, metadata_path, fname, direction): + import cv2 + import json + from backend.simple_color_enhancer import SimpleColorEnhancer + + if not os.path.exists(metadata_path): return {"success": False, "message": "No metadata"} + with open(metadata_path, 'r') as f: meta = json.load(f) + if fname not in meta: return {"success": False, "message": "Frame not found"} + + t = meta[fname]['time'] if isinstance(meta[fname], dict) else meta[fname] + cap = cv2.VideoCapture(video_path) + fps = cap.get(cv2.CAP_PROP_FPS) or 25 + offset = (1.0/fps) * (1 if direction == 'forward' else -1) + new_t = max(0, t + offset) + + cap.set(cv2.CAP_PROP_POS_MSEC, new_t * 1000) + ret, frame = cap.read() + cap.release() + + if ret: + p = os.path.join(frames_dir, fname) + cv2.imwrite(p, frame) + os.sync() + try: SimpleColorEnhancer().enhance_single(p, p) + except: pass + + if isinstance(meta[fname], dict): meta[fname]['time'] = new_t + else: meta[fname] = new_t + with open(metadata_path, 'w') as f: json.dump(meta, f, indent=2) + return {"success": True, "message": f"Adjusted to {new_t:.2f}s"} + return {"success": False, "message": "End of video"} + +@spaces.GPU +def get_frame_at_ts_gpu(video_path, frames_dir, metadata_path, fname, ts): + import cv2 + import json + from backend.simple_color_enhancer import SimpleColorEnhancer + + cap = cv2.VideoCapture(video_path) + cap.set(cv2.CAP_PROP_POS_MSEC, float(ts) * 1000) + ret, frame = cap.read() + cap.release() + + if ret: + p = os.path.join(frames_dir, fname) + cv2.imwrite(p, frame) + os.sync() + try: SimpleColorEnhancer().enhance_single(p, p) + except: pass + + if os.path.exists(metadata_path): + with open(metadata_path, 'r') as f: meta = json.load(f) + if fname in meta: + if isinstance(meta[fname], dict): meta[fname]['time'] = float(ts) + else: meta[fname] = float(ts) + with open(metadata_path, 'w') as f: json.dump(meta, f, indent=2) + return {"success": True, "message": f"Jumped to {ts}s"} + return {"success": False, "message": "Invalid timestamp"} + +# ====================================================== +# ๐Ÿ’ป BACKEND CLASS +# ====================================================== +class EnhancedComicGenerator: + def __init__(self, sid): + self.sid = sid + self.user_dir = os.path.join(BASE_USER_DIR, sid) + self.video_path = os.path.join(self.user_dir, 'uploaded.mp4') + self.frames_dir = os.path.join(self.user_dir, 'frames') + self.output_dir = os.path.join(self.user_dir, 'output') + os.makedirs(self.frames_dir, exist_ok=True) + os.makedirs(self.output_dir, exist_ok=True) + self.metadata_path = os.path.join(self.frames_dir, 'frame_metadata.json') + + def cleanup(self): + if os.path.exists(self.frames_dir): shutil.rmtree(self.frames_dir) + if os.path.exists(self.output_dir): shutil.rmtree(self.output_dir) + os.makedirs(self.frames_dir, exist_ok=True) + os.makedirs(self.output_dir, exist_ok=True) + + def run(self, target_pages): + try: + self.write_status("Waiting for GPU...", 5) + data = generate_comic_gpu(self.video_path, self.user_dir, self.frames_dir, self.metadata_path, int(target_pages)) + with open(os.path.join(self.output_dir, 'pages.json'), 'w') as f: + json.dump(data, f, indent=2) + self.write_status("Complete!", 100) + except Exception as e: + traceback.print_exc() + self.write_status(f"Error: {str(e)}", -1) + + def write_status(self, msg, prog): + with open(os.path.join(self.output_dir, 'status.json'), 'w') as f: + json.dump({'message': msg, 'progress': prog}, f) + +# ====================================================== +# ๐ŸŒ ROUTES & FULL UI +# ====================================================== + INDEX_HTML = ''' - Movie to Comic Generator + ๐ŸŽฌ Enhanced Comic Generator -
-

๐ŸŽฌ Comic Generator

- - +

๐ŸŽฌ Enhanced Comic Generator

No file selected - - - +
+ + + System calculates ~4 panels per page. +
+ + + -

๐Ÿ“ฅ Load Saved Comic

-

Enter your save code to continue editing

-
-
- -

โœ๏ธ Interactive Editor

-
- - -
- +
-
- - -
-
- - -
+
+
+
+
+ +
- -
-
-
- - + +
- +
-
@@ -438,161 +537,82 @@ INDEX_HTML = '''
-
- +
- ''' -# --- 3. ENHANCED COMIC GENERATOR CLASS --- -class EnhancedComicGenerator: - def __init__(self, sid): - self.sid = sid - self.user_dir = os.path.join(BASE_USER_DIR, sid) - self.video_path = os.path.join(self.user_dir, 'uploaded.mp4') - self.frames_dir = os.path.join(self.user_dir, 'frames') - self.output_dir = os.path.join(self.user_dir, 'output') - self.status_file = os.path.join(self.output_dir, 'status.json') - self.metadata_path = os.path.join(self.frames_dir, 'frame_metadata.json') - os.makedirs(self.frames_dir, exist_ok=True) - os.makedirs(self.output_dir, exist_ok=True) - self.video_fps = None - self.frame_metadata = {} - - def update_status(self, message, progress): - try: - with open(self.status_file, 'w') as f: - json.dump({'message': message, 'progress': progress}, f) - except: - pass - - def cleanup_previous_run(self): - print(f"๐Ÿงน Cleaning up for session {self.sid}...") - if os.path.exists(self.frames_dir): - for f in os.listdir(self.frames_dir): - try: - os.remove(os.path.join(self.frames_dir, f)) - except: - pass - if os.path.exists(self.output_dir): - for f in os.listdir(self.output_dir): - if f != 'status.json': - try: - os.remove(os.path.join(self.output_dir, f)) - except: - pass - user_srt = os.path.join(self.user_dir, 'subs.srt') - if os.path.exists(user_srt): - os.remove(user_srt) - print("โœ… Cleanup complete.") - - def generate_keyframes_from_moments(self, key_moments, max_frames=48): - try: - cap = cv2.VideoCapture(self.video_path) - if not cap.isOpened(): - raise Exception("Cannot open video for keyframe extraction") - - fps = self.video_fps - total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) - duration = total_frames / fps - - key_moments.sort(key=lambda x: x['start']) - frame_metadata = {} - frame_count = 0 - - for i, moment in enumerate(key_moments[:max_frames]): - self.update_status(f"Extracting frame {i+1}/{min(len(key_moments), max_frames)}...", - 25 + int(20 * (i / min(len(key_moments), max_frames)))) - - frame_time = (moment['start'] + moment['end']) / 2 - if frame_time > duration: - continue - - frame_number = int(frame_time * fps) - cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) - ret, frame = cap.read() - - if ret: - frame_filename = f"frame_{frame_count:04d}.png" - frame_path = os.path.join(self.frames_dir, frame_filename) - cv2.imwrite(frame_path, frame) - frame_metadata[frame_filename] = { - 'time': frame_time, - 'dialogue': moment['text'], - 'start': moment['start'], - 'end': moment['end'] - } - frame_count += 1 - - cap.release() - - with open(self.metadata_path, 'w') as f: - json.dump(frame_metadata, f, indent=2) - - print(f"โœ… Extracted {frame_count} keyframes from video") - return True - except Exception as e: - print(f"โŒ Error extracting keyframes: {e}") - traceback.print_exc() - return False - - def _enhance_all_images(self, single_image_path=None): - try: - enhancer = SimpleColorEnhancer() - if single_image_path: - enhancer.enhance_single(single_image_path) - else: - frame_paths = [os.path.join(self.frames_dir, f) - for f in os.listdir(self.frames_dir) if f.endswith('.png')] - with ThreadPoolExecutor() as executor: - list(executor.map(enhancer.enhance_single, frame_paths)) - print("โœ… Simple color enhancement complete") - except Exception as e: - print(f"โš ๏ธ Simple enhancement failed: {e}") - - def _enhance_quality_colors(self, single_image_path=None): - try: - enhancer = QualityColorEnhancer() - if single_image_path: - enhancer.enhance_single(single_image_path) - else: - frame_paths = [os.path.join(self.frames_dir, f) - for f in os.listdir(self.frames_dir) if f.endswith('.png')] - with ThreadPoolExecutor() as executor: - list(executor.map(enhancer.enhance_single, frame_paths)) - print("โœ… Quality color enhancement complete") - except Exception as e: - print(f"โš ๏ธ Quality enhancement failed: {e}") - - def _process_bubble_for_frame(self, frame_file): - frame_path = os.path.join(self.frames_dir, frame_file) - meta = self.frame_metadata.get(frame_file, {}) - dialogue = meta.get('dialogue', '') if isinstance(meta, dict) else '' - - try: - faces = face_detector.detect_faces(frame_path) - if faces: - lip_x, lip_y = face_detector.get_lip_position(frame_path, faces[0]) - else: - lip_x, lip_y = -1, -1 - bubble_x, bubble_y = ai_bubble_placer.place_bubble_ai(frame_path, (lip_x, lip_y)) - return bubble( - bubble_offset_x=bubble_x, - bubble_offset_y=bubble_y, - lip_x=lip_x, - lip_y=lip_y, - dialog=dialogue, - emotion='normal' - ) - except Exception as e: - print(f"-> Could not place bubble for {frame_file}: {e}. Using default.") - return bubble( - bubble_offset_x=50, - bubble_offset_y=20, - lip_x=-1, - lip_y=-1, - dialog=dialogue, - emotion='normal' - ) - - def _create_ai_bubbles_from_moments(self): - frame_files = sorted([f for f in os.listdir(self.frames_dir) if f.endswith('.png')]) - - if not os.path.exists(self.metadata_path): - return [bubble(dialog="") for _ in frame_files] - - with open(self.metadata_path, 'r') as f: - self.frame_metadata = json.load(f) - - with ThreadPoolExecutor() as executor: - bubbles = list(executor.map(self._process_bubble_for_frame, frame_files)) - - return bubbles - - def _generate_pages(self, bubbles_list): - try: - from backend.fixed_12_pages_800x1080 import generate_12_pages_800x1080 - frame_files = sorted([f for f in os.listdir(self.frames_dir) if f.endswith('.png')]) - return generate_12_pages_800x1080(frame_files, bubbles_list) - except ImportError: - pages = [] - frame_files = sorted([f for f in os.listdir(self.frames_dir) if f.endswith('.png')]) - num_pages = (len(frame_files) + 3) // 4 - - for i in range(num_pages): - start, end = i * 4, (i + 1) * 4 - page_panels = [panel(image=f) for f in frame_files[start:end]] - page_bubbles = bubbles_list[start:end] - if page_panels: - pages.append(Page(panels=page_panels, bubbles=page_bubbles)) - - return pages - - def generate_comic(self): - start_time = time.time() - try: - if cv2 is None: - raise Exception("OpenCV not installed") - - self.update_status("Cleaning up previous run...", 0) - self.cleanup_previous_run() - - self.update_status("Analyzing video...", 5) - cap = cv2.VideoCapture(self.video_path) - if not cap.isOpened(): - raise Exception("Cannot open video") - self.video_fps = cap.get(cv2.CAP_PROP_FPS) or 25 - cap.release() - print(f"โœ… Video FPS detected: {self.video_fps:.2f}") - - self.update_status("Generating subtitles (this may take a while)...", 10) - user_srt = os.path.join(self.user_dir, 'subs.srt') - try: - get_real_subtitles(self.video_path) - if os.path.exists('test1.srt'): - shutil.move('test1.srt', user_srt) - except Exception as e: - print(f"โš ๏ธ Subtitle generation failed: {e}. Creating fallback.") - with open(user_srt, 'w') as f: - f.write("1\n00:00:01,000 --> 00:00:04,000\nHello\n") - - self.update_status("Parsing subtitles...", 20) - with open(user_srt, 'r', encoding='utf-8') as f: - all_subs = list(srt.parse(f.read())) - - key_moments = [{ - 'index': s.index, - 'text': s.content, - 'start': s.start.total_seconds(), - 'end': s.end.total_seconds() - } for s in all_subs] - - self.update_status("Extracting keyframes...", 25) - if not self.generate_keyframes_from_moments(key_moments, max_frames=48): - raise Exception("Keyframe extraction failed") - - self.update_status("Cropping black bars...", 45) - try: - black_x, black_y, _, _ = black_bar_crop() - except: - black_x, black_y = 0, 0 - - self.update_status("Enhancing images...", 50) - self._enhance_all_images() - - self.update_status("Applying quality color enhancement...", 60) - self._enhance_quality_colors() - - self.update_status("Placing speech bubbles...", 75) - bubbles = self._create_ai_bubbles_from_moments() - - self.update_status("Assembling comic pages...", 90) - pages = self._generate_pages(bubbles) - - self.update_status("Saving results...", 95) - self._save_results(pages) - - execution_time = (time.time() - start_time) / 60 - print(f"โœ… Comic generation completed in {execution_time:.2f} minutes") - self.update_status("Complete!", 100) - return True - - except Exception as e: - print(f"โŒ Comic generation failed: {e}") - traceback.print_exc() - self.update_status(f"Error: {str(e)}", -1) - return False - - def _save_results(self, pages): - try: - pages_data = [] - for page in pages: - panels = [p.__dict__ if hasattr(p, '__dict__') else p for p in page.panels] - bubbles_data = [b.__dict__ if hasattr(b, '__dict__') else b for b in page.bubbles] - pages_data.append({'panels': panels, 'bubbles': bubbles_data}) - - with open(os.path.join(self.output_dir, 'pages.json'), 'w', encoding='utf-8') as f: - json.dump(pages_data, f, indent=2) - - print("โœ… Results saved successfully!") - except Exception as e: - print(f"โŒ Save results failed: {e}") - - def regenerate_frame(self, fname, direction): - try: - if not os.path.exists(self.metadata_path): - return {"success": False, "message": "Frame metadata missing."} - - with open(self.metadata_path, 'r') as f: - meta = json.load(f) - - if fname not in meta: - return {"success": False, "message": "Panel not linked to video."} - - current_data = meta[fname] - if isinstance(current_data, dict): - curr_time = current_data['time'] - else: - curr_time = current_data - - if not self.video_fps: - cap = cv2.VideoCapture(self.video_path) - self.video_fps = cap.get(cv2.CAP_PROP_FPS) or 25 - cap.release() - - offset = (1.0 / self.video_fps) * (1 if direction == 'forward' else -1) - new_time = max(0, curr_time + offset) - - cap = cv2.VideoCapture(self.video_path) - cap.set(cv2.CAP_PROP_POS_MSEC, new_time * 1000) - ret, frame = cap.read() - cap.release() - - if ret: - frame_path = os.path.join(self.frames_dir, fname) - cv2.imwrite(frame_path, frame) - - print(f"๐ŸŽจ Applying enhancements to new frame: {fname}") - self._enhance_all_images(single_image_path=frame_path) - self._enhance_quality_colors(single_image_path=frame_path) - - if isinstance(meta[fname], dict): - meta[fname]['time'] = new_time - else: - meta[fname] = new_time - with open(self.metadata_path, 'w') as f: - json.dump(meta, f, indent=2) - - message = f"Adjusted {direction} to {new_time:.3f}s" - print(f"โœ… {message}") - return {"success": True, "message": message} - - return {"success": False, "message": "End of video"} - - except Exception as e: - traceback.print_exc() - return {"success": False, "message": str(e)} - - def get_frame_at_timestamp(self, fname, ts): - try: - cap = cv2.VideoCapture(self.video_path) - if not cap.isOpened(): - return {"success": False, "message": "Cannot open video."} - - fps = cap.get(cv2.CAP_PROP_FPS) or 25 - duration = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) / fps - - if ts < 0 or ts > duration: - cap.release() - return {"success": False, "message": f"Timestamp must be between 0 and {duration:.2f}s."} - - cap.set(cv2.CAP_PROP_POS_MSEC, float(ts) * 1000) - ret, frame = cap.read() - cap.release() - - if ret: - frame_path = os.path.join(self.frames_dir, fname) - cv2.imwrite(frame_path, frame) - - print(f"๐ŸŽจ Applying enhancements to frame from timestamp: {fname}") - self._enhance_all_images(single_image_path=frame_path) - self._enhance_quality_colors(single_image_path=frame_path) - - if os.path.exists(self.metadata_path): - with open(self.metadata_path, 'r') as f: - meta = json.load(f) - if fname in meta: - if isinstance(meta[fname], dict): - meta[fname]['time'] = float(ts) - else: - meta[fname] = float(ts) - with open(self.metadata_path, 'w') as f: - json.dump(meta, f, indent=2) - - message = f"Jumped to timestamp {ts:.3f}s" - print(f"โœ… {message}") - return {"success": True, "message": message} - - return {"success": False, "message": "Invalid time"} - - except Exception as e: - traceback.print_exc() - return {"success": False, "message": str(e)} - - -# --- ROUTES --- @app.route('/') def index(): return INDEX_HTML @@ -1561,75 +856,60 @@ def index(): @app.route('/uploader', methods=['POST']) def upload(): sid = request.args.get('sid') - if not sid: - return jsonify({'success': False, 'message': 'Missing session ID'}), 400 - + if not sid: return jsonify({'success': False, 'message': 'Missing session ID'}), 400 if 'file' not in request.files or not request.files['file'].filename: return jsonify({'success': False, 'message': 'No file selected'}), 400 + # GET PAGE COUNT FROM FORM + target_pages = request.form.get('target_pages', 4) + f = request.files['file'] gen = EnhancedComicGenerator(sid) - gen.cleanup_previous_run() + gen.cleanup() f.save(gen.video_path) - gen.update_status("Starting...", 5) + gen.write_status("Starting...", 5) - threading.Thread(target=gen.generate_comic).start() + # Run in thread + threading.Thread(target=gen.run, args=(target_pages,)).start() return jsonify({'success': True, 'message': 'Generation started.'}) @app.route('/status') def get_status(): sid = request.args.get('sid') - if not sid: - return jsonify({'progress': 0, 'message': 'Missing session ID'}) - path = os.path.join(BASE_USER_DIR, sid, 'output', 'status.json') - if os.path.exists(path): - return send_file(path) + if os.path.exists(path): return send_file(path) return jsonify({'progress': 0, 'message': "Waiting..."}) @app.route('/output/') def get_output(filename): sid = request.args.get('sid') - if not sid: - return "Missing session ID", 400 return send_from_directory(os.path.join(BASE_USER_DIR, sid, 'output'), filename) @app.route('/frames/') def get_frame(filename): sid = request.args.get('sid') - if not sid: - return "Missing session ID", 400 return send_from_directory(os.path.join(BASE_USER_DIR, sid, 'frames'), filename) @app.route('/regenerate_frame', methods=['POST']) def regen(): sid = request.args.get('sid') - if not sid: - return jsonify({'success': False, 'message': 'Missing session ID'}) - d = request.get_json() gen = EnhancedComicGenerator(sid) - return jsonify(gen.regenerate_frame(d['filename'], d['direction'])) + # CALL GLOBAL GPU FUNCTION + return jsonify(regen_frame_gpu(gen.video_path, gen.frames_dir, gen.metadata_path, d['filename'], d['direction'])) @app.route('/goto_timestamp', methods=['POST']) def go_time(): sid = request.args.get('sid') - if not sid: - return jsonify({'success': False, 'message': 'Missing session ID'}) - d = request.get_json() gen = EnhancedComicGenerator(sid) - return jsonify(gen.get_frame_at_timestamp(d['filename'], float(d['timestamp']))) + # CALL GLOBAL GPU FUNCTION + return jsonify(get_frame_at_ts_gpu(gen.video_path, gen.frames_dir, gen.metadata_path, d['filename'], float(d['timestamp']))) @app.route('/replace_panel', methods=['POST']) def rep_panel(): sid = request.args.get('sid') - if not sid: - return jsonify({'success': False, 'error': 'Missing session ID'}) - - if 'image' not in request.files: - return jsonify({'success': False, 'error': 'No image provided.'}) - + if 'image' not in request.files: return jsonify({'success': False, 'error': 'No image'}) f = request.files['image'] frames_dir = os.path.join(BASE_USER_DIR, sid, 'frames') os.makedirs(frames_dir, exist_ok=True) @@ -1637,103 +917,59 @@ def rep_panel(): f.save(os.path.join(frames_dir, fname)) return jsonify({'success': True, 'new_filename': fname}) -# --- SAVE COMIC ENDPOINT --- @app.route('/save_comic', methods=['POST']) def save_comic(): sid = request.args.get('sid') - if not sid: - return jsonify({'success': False, 'message': 'Missing session ID'}) - try: data = request.get_json() - - # Generate unique save code save_code = generate_save_code() save_dir = os.path.join(SAVED_COMICS_DIR, save_code) os.makedirs(save_dir, exist_ok=True) - # Copy frames from user directory to saved directory user_frames_dir = os.path.join(BASE_USER_DIR, sid, 'frames') saved_frames_dir = os.path.join(save_dir, 'frames') if os.path.exists(user_frames_dir): - if os.path.exists(saved_frames_dir): - shutil.rmtree(saved_frames_dir) + if os.path.exists(saved_frames_dir): shutil.rmtree(saved_frames_dir) shutil.copytree(user_frames_dir, saved_frames_dir) - # Save the comic state save_data = { 'code': save_code, 'originalSid': sid, 'pages': data.get('pages', []), 'savedAt': data.get('savedAt', time.strftime('%Y-%m-%d %H:%M:%S')) } - - with open(os.path.join(save_dir, 'comic_state.json'), 'w') as f: - json.dump(save_data, f, indent=2) - - print(f"โœ… Comic saved with code: {save_code}") + with open(os.path.join(save_dir, 'comic_state.json'), 'w') as f: json.dump(save_data, f, indent=2) return jsonify({'success': True, 'code': save_code}) - except Exception as e: traceback.print_exc() return jsonify({'success': False, 'message': str(e)}) -# --- LOAD COMIC ENDPOINT --- @app.route('/load_comic/') def load_comic(code): code = code.upper() save_dir = os.path.join(SAVED_COMICS_DIR, code) state_file = os.path.join(save_dir, 'comic_state.json') - if not os.path.exists(state_file): - return jsonify({'success': False, 'message': 'Save code not found'}) + if not os.path.exists(state_file): return jsonify({'success': False, 'message': 'Save code not found'}) try: - with open(state_file, 'r') as f: - save_data = json.load(f) - + with open(state_file, 'r') as f: save_data = json.load(f) original_sid = save_data.get('originalSid') - - # Copy frames to user directory if needed saved_frames_dir = os.path.join(save_dir, 'frames') if original_sid and os.path.exists(saved_frames_dir): user_frames_dir = os.path.join(BASE_USER_DIR, original_sid, 'frames') os.makedirs(user_frames_dir, exist_ok=True) - - # Copy files that don't exist for fname in os.listdir(saved_frames_dir): src = os.path.join(saved_frames_dir, fname) dst = os.path.join(user_frames_dir, fname) - if not os.path.exists(dst): - shutil.copy2(src, dst) - - return jsonify({ - 'success': True, - 'pages': save_data.get('pages', []), - 'originalSid': original_sid, - 'savedAt': save_data.get('savedAt') - }) - + if not os.path.exists(dst): shutil.copy2(src, dst) + return jsonify({ 'success': True, 'pages': save_data.get('pages', []), 'originalSid': original_sid, 'savedAt': save_data.get('savedAt') }) except Exception as e: traceback.print_exc() return jsonify({'success': False, 'message': str(e)}) -# --- SERVE SAVED COMIC FRAMES --- -@app.route('/saved_frames//') -def get_saved_frame(code, filename): - code = code.upper() - frames_dir = os.path.join(SAVED_COMICS_DIR, code, 'frames') - if os.path.exists(os.path.join(frames_dir, filename)): - return send_from_directory(frames_dir, filename) - return "Frame not found", 404 - - if __name__ == '__main__': - os.makedirs(BASE_USER_DIR, exist_ok=True) - os.makedirs(SAVED_COMICS_DIR, exist_ok=True) - port = int(os.getenv("PORT", 7860)) - print(f"๐Ÿš€ Starting Enhanced Comic Generator on host 0.0.0.0, port {port}") - print(f"๐Ÿ“ User data directory: {BASE_USER_DIR}") - print(f"๐Ÿ’พ Saved comics directory: {SAVED_COMICS_DIR}") - app.run(host='0.0.0.0', port=port, debug=False) \ No newline at end of file + try: gpu_warmup() + except: pass + app.run(host='0.0.0.0', port=7860) \ No newline at end of file