import os import time import threading import json import traceback import logging import string import random import shutil import cv2 import math import numpy as np import srt from flask import Flask, jsonify, request, send_from_directory, send_file # ====================================================== # đž STORAGE SETUP # ====================================================== if os.path.exists('/data'): BASE_STORAGE_PATH = '/data' print("â Using Persistent Storage at /data") else: BASE_STORAGE_PATH = '.' print("â ī¸ Using Ephemeral Storage") BASE_USER_DIR = os.path.join(BASE_STORAGE_PATH, "userdata") SAVED_COMICS_DIR = os.path.join(BASE_STORAGE_PATH, "saved_comics") os.makedirs(BASE_USER_DIR, exist_ok=True) os.makedirs(SAVED_COMICS_DIR, exist_ok=True) app = Flask(__name__) app.config['MAX_CONTENT_LENGTH'] = 500 * 1024 * 1024 def generate_save_code(length=8): chars = string.ascii_uppercase + string.digits while True: code = ''.join(random.choices(chars, k=length)) if not os.path.exists(os.path.join(SAVED_COMICS_DIR, code)): return code # ====================================================== # đ§ą DATA CLASSES # ====================================================== def bubble(dialog="", x=50, y=20, type='speech'): classes = f"speech-bubble {type}" if type == 'speech': classes += " tail-bottom" elif type == 'thought': classes += " pos-bl" elif type == 'reaction': classes += " tail-bottom" return { 'dialog': dialog, 'bubble_offset_x': int(x), 'bubble_offset_y': int(y), 'type': type, 'tail_pos': '50%', 'classes': classes, 'colors': {'fill': '#ffffff', 'text': '#000000'}, 'font': "'Comic Neue', cursive" } def panel(image="", time=0.0): return {'image': image, 'time': time} class Page: def __init__(self, panels, bubbles): self.panels = panels self.bubbles = bubbles # ====================================================== # đ§ CPU GENERATION (Formerly GPU) # ====================================================== def generate_comic_cpu(video_path, user_dir, frames_dir, metadata_path, target_pages): print(f"đ Generating 864x1080 Comic (CPU Mode): {video_path}") import cv2 import srt import numpy as np # Assuming backend.subtitles.subs_real exists in your file structure. # If not, the try/except block below handles the fallback. try: from backend.subtitles.subs_real import get_real_subtitles except ImportError: get_real_subtitles = None cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise Exception("Cannot open video") fps = cap.get(cv2.CAP_PROP_FPS) or 25 total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) duration = total_frames / fps cap.release() # Subtitles user_srt = os.path.join(user_dir, 'subs.srt') try: if get_real_subtitles: get_real_subtitles(video_path) if os.path.exists('test1.srt'): shutil.move('test1.srt', user_srt) elif not os.path.exists(user_srt): with open(user_srt, 'w') as f: f.write("1\n00:00:01,000 --> 00:00:04,000\n...\n") else: raise Exception("Subtitles module missing") except: with open(user_srt, 'w') as f: f.write("1\n00:00:01,000 --> 00:00:04,000\n...\n") with open(user_srt, 'r', encoding='utf-8') as f: try: all_subs = list(srt.parse(f.read())) except: all_subs = [] valid_subs = [s for s in all_subs if s.content.strip()] if valid_subs: raw_moments = [{'text': s.content.strip(), 'start': s.start.total_seconds(), 'end': s.end.total_seconds()} for s in valid_subs] else: raw_moments = [] panels_per_page = 4 total_panels_needed = int(target_pages) * panels_per_page selected_moments = [] if not raw_moments: times = np.linspace(1, max(1, duration-1), total_panels_needed) for t in times: selected_moments.append({'text': '', 'start': t, 'end': t+1}) elif len(raw_moments) <= total_panels_needed: selected_moments = raw_moments else: indices = np.linspace(0, len(raw_moments) - 1, total_panels_needed, dtype=int) selected_moments = [raw_moments[i] for i in indices] frame_metadata = {} cap = cv2.VideoCapture(video_path) count = 0 frame_files_ordered = [] frame_times = [] for i, moment in enumerate(selected_moments): mid = (moment['start'] + moment['end']) / 2 cap.set(cv2.CAP_PROP_POS_MSEC, mid * 1000) ret, frame = cap.read() if ret: frame = cv2.resize(frame, (1920, 1080)) fname = f"frame_{count:04d}.png" p = os.path.join(frames_dir, fname) cv2.imwrite(p, frame) frame_metadata[fname] = {'dialogue': moment['text'], 'time': mid} frame_files_ordered.append(fname) frame_times.append(mid) count += 1 cap.release() with open(metadata_path, 'w') as f: json.dump(frame_metadata, f, indent=2) bubbles_list = [] for i, f in enumerate(frame_files_ordered): dialogue = frame_metadata.get(f, {}).get('dialogue', '') # đ¯ STRICT BUBBLE TYPE LOGIC (Prefer Speech) b_type = 'speech' if '(' in dialogue: b_type = 'narration' elif '!' in dialogue and dialogue.isupper() and len(dialogue) < 10: # Only use reaction if VERY short and yelling b_type = 'reaction' # Smart Positioning for 864x1080 pos_idx = i % 4 if pos_idx == 0: bx, by = 150, 80 elif pos_idx == 1: bx, by = 580, 80 elif pos_idx == 2: bx, by = 150, 600 elif pos_idx == 3: bx, by = 580, 600 else: bx, by = 50, 50 bubbles_list.append(bubble(dialog=dialogue, x=bx, y=by, type=b_type)) pages = [] for i in range(int(target_pages)): start_idx = i * 4 end_idx = start_idx + 4 p_frames = frame_files_ordered[start_idx:end_idx] p_times = frame_times[start_idx:end_idx] p_bubbles = bubbles_list[start_idx:end_idx] while len(p_frames) < 4: fname = f"empty_{i}_{len(p_frames)}.png" img = np.zeros((1080, 1920, 3), dtype=np.uint8); img[:] = (30,30,30) cv2.imwrite(os.path.join(frames_dir, fname), img) p_frames.append(fname) p_times.append(0.0) p_bubbles.append(bubble(dialog="", x=-999, y=-999, type='speech')) if p_frames: pg_panels = [panel(image=p_frames[j], time=p_times[j]) for j in range(len(p_frames))] pages.append(Page(panels=pg_panels, bubbles=p_bubbles)) result = [] for pg in pages: p_data = [p if isinstance(p, dict) else p.__dict__ for p in pg.panels] b_data = [b if isinstance(b, dict) else b.__dict__ for b in pg.bubbles] result.append({'panels': p_data, 'bubbles': b_data}) return result def regen_frame_cpu(video_path, frames_dir, metadata_path, fname, direction): import cv2 import json if not os.path.exists(metadata_path): return {"success": False, "message": "No metadata"} with open(metadata_path, 'r') as f: meta = json.load(f) t = meta[fname]['time'] if isinstance(meta[fname], dict) else meta[fname] cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) or 25 offset = (1.0/fps) * (1 if direction == 'forward' else -1) new_t = max(0, t + offset) cap.set(cv2.CAP_PROP_POS_MSEC, new_t * 1000) ret, frame = cap.read() cap.release() if ret: frame = cv2.resize(frame, (1920, 1080)) cv2.imwrite(os.path.join(frames_dir, fname), frame) if isinstance(meta[fname], dict): meta[fname]['time'] = new_t else: meta[fname] = new_t with open(metadata_path, 'w') as f: json.dump(meta, f, indent=2) return {"success": True, "message": f"Time: {new_t:.2f}s", "new_time": new_t} return {"success": False} def get_frame_at_ts_cpu(video_path, frames_dir, metadata_path, fname, ts): import cv2 import json cap = cv2.VideoCapture(video_path) cap.set(cv2.CAP_PROP_POS_MSEC, float(ts) * 1000) ret, frame = cap.read() cap.release() if ret: frame = cv2.resize(frame, (1920, 1080)) cv2.imwrite(os.path.join(frames_dir, fname), frame) if os.path.exists(metadata_path): with open(metadata_path, 'r') as f: meta = json.load(f) if fname in meta: if isinstance(meta[fname], dict): meta[fname]['time'] = float(ts) else: meta[fname] = float(ts) with open(metadata_path, 'w') as f: json.dump(meta, f, indent=2) return {"success": True, "message": f"Jumped to {ts}s", "new_time": float(ts)} return {"success": False, "message": "Invalid timestamp"} class EnhancedComicGenerator: def __init__(self, sid): self.sid = sid self.user_dir = os.path.join(BASE_USER_DIR, sid) self.video_path = os.path.join(self.user_dir, 'uploaded.mp4') self.frames_dir = os.path.join(self.user_dir, 'frames') self.output_dir = os.path.join(self.user_dir, 'output') os.makedirs(self.frames_dir, exist_ok=True) os.makedirs(self.output_dir, exist_ok=True) self.metadata_path = os.path.join(self.frames_dir, 'frame_metadata.json') def cleanup(self): if os.path.exists(self.frames_dir): shutil.rmtree(self.frames_dir) if os.path.exists(self.output_dir): shutil.rmtree(self.output_dir) os.makedirs(self.frames_dir, exist_ok=True) os.makedirs(self.output_dir, exist_ok=True) def run(self, target_pages): try: self.write_status("Generating...", 5) # Call CPU function instead of GPU data = generate_comic_cpu(self.video_path, self.user_dir, self.frames_dir, self.metadata_path, int(target_pages)) with open(os.path.join(self.output_dir, 'pages.json'), 'w') as f: json.dump(data, f, indent=2) self.write_status("Complete!", 100) except Exception as e: traceback.print_exc() self.write_status(f"Error: {str(e)}", -1) def write_status(self, msg, prog): with open(os.path.join(self.output_dir, 'status.json'), 'w') as f: json.dump({'message': msg, 'progress': prog}, f) # ====================================================== # đ ROUTES & FRONTEND # ====================================================== INDEX_HTML = '''
')
def load_comic(code):
code = code.upper()
save_dir = os.path.join(SAVED_COMICS_DIR, code)
if not os.path.exists(save_dir): return jsonify({'success': False, 'message': 'Code not found'})
try:
with open(os.path.join(save_dir, 'comic_state.json'), 'r') as f: data = json.load(f)
orig_sid = data['originalSid']
saved_frames = os.path.join(save_dir, 'frames')
user_frames = os.path.join(BASE_USER_DIR, orig_sid, 'frames')
os.makedirs(user_frames, exist_ok=True)
for fn in os.listdir(saved_frames):
shutil.copy2(os.path.join(saved_frames, fn), os.path.join(user_frames, fn))
return jsonify({'success': True, 'originalSid': orig_sid, 'pages': data['pages']})
except Exception as e: return jsonify({'success': False, 'message': str(e)})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=7860)