import os import time import threading import json import traceback import logging import string import random import shutil import cv2 import math import numpy as np import srt from flask import Flask, jsonify, request, send_from_directory, send_file # ====================================================== # 💾 STORAGE SETUP # ====================================================== if os.path.exists('/data'): BASE_STORAGE_PATH = '/data' print("✅ Using Persistent Storage at /data") else: BASE_STORAGE_PATH = '.' print("âš ī¸ Using Ephemeral Storage") BASE_USER_DIR = os.path.join(BASE_STORAGE_PATH, "userdata") SAVED_COMICS_DIR = os.path.join(BASE_STORAGE_PATH, "saved_comics") os.makedirs(BASE_USER_DIR, exist_ok=True) os.makedirs(SAVED_COMICS_DIR, exist_ok=True) app = Flask(__name__) app.config['MAX_CONTENT_LENGTH'] = 500 * 1024 * 1024 def generate_save_code(length=8): chars = string.ascii_uppercase + string.digits while True: code = ''.join(random.choices(chars, k=length)) if not os.path.exists(os.path.join(SAVED_COMICS_DIR, code)): return code # ====================================================== # 🧱 DATA CLASSES # ====================================================== def bubble(dialog="", x=50, y=20, type='speech'): classes = f"speech-bubble {type}" if type == 'speech': classes += " tail-bottom" elif type == 'thought': classes += " pos-bl" elif type == 'reaction': classes += " tail-bottom" return { 'dialog': dialog, 'bubble_offset_x': int(x), 'bubble_offset_y': int(y), 'type': type, 'tail_pos': '50%', 'classes': classes, 'colors': {'fill': '#ffffff', 'text': '#000000'}, 'font': "'Comic Neue', cursive" } def panel(image="", time=0.0): return {'image': image, 'time': time} class Page: def __init__(self, panels, bubbles): self.panels = panels self.bubbles = bubbles # ====================================================== # 🧠 CPU GENERATION (Formerly GPU) # ====================================================== def generate_comic_cpu(video_path, user_dir, frames_dir, metadata_path, target_pages): print(f"🚀 Generating 864x1080 Comic (CPU Mode): {video_path}") import cv2 import srt import numpy as np # Assuming backend.subtitles.subs_real exists in your file structure. # If not, the try/except block below handles the fallback. try: from backend.subtitles.subs_real import get_real_subtitles except ImportError: get_real_subtitles = None cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise Exception("Cannot open video") fps = cap.get(cv2.CAP_PROP_FPS) or 25 total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) duration = total_frames / fps cap.release() # Subtitles user_srt = os.path.join(user_dir, 'subs.srt') try: if get_real_subtitles: get_real_subtitles(video_path) if os.path.exists('test1.srt'): shutil.move('test1.srt', user_srt) elif not os.path.exists(user_srt): with open(user_srt, 'w') as f: f.write("1\n00:00:01,000 --> 00:00:04,000\n...\n") else: raise Exception("Subtitles module missing") except: with open(user_srt, 'w') as f: f.write("1\n00:00:01,000 --> 00:00:04,000\n...\n") with open(user_srt, 'r', encoding='utf-8') as f: try: all_subs = list(srt.parse(f.read())) except: all_subs = [] valid_subs = [s for s in all_subs if s.content.strip()] if valid_subs: raw_moments = [{'text': s.content.strip(), 'start': s.start.total_seconds(), 'end': s.end.total_seconds()} for s in valid_subs] else: raw_moments = [] panels_per_page = 4 total_panels_needed = int(target_pages) * panels_per_page selected_moments = [] if not raw_moments: times = np.linspace(1, max(1, duration-1), total_panels_needed) for t in times: selected_moments.append({'text': '', 'start': t, 'end': t+1}) elif len(raw_moments) <= total_panels_needed: selected_moments = raw_moments else: indices = np.linspace(0, len(raw_moments) - 1, total_panels_needed, dtype=int) selected_moments = [raw_moments[i] for i in indices] frame_metadata = {} cap = cv2.VideoCapture(video_path) count = 0 frame_files_ordered = [] frame_times = [] for i, moment in enumerate(selected_moments): mid = (moment['start'] + moment['end']) / 2 cap.set(cv2.CAP_PROP_POS_MSEC, mid * 1000) ret, frame = cap.read() if ret: frame = cv2.resize(frame, (1920, 1080)) fname = f"frame_{count:04d}.png" p = os.path.join(frames_dir, fname) cv2.imwrite(p, frame) frame_metadata[fname] = {'dialogue': moment['text'], 'time': mid} frame_files_ordered.append(fname) frame_times.append(mid) count += 1 cap.release() with open(metadata_path, 'w') as f: json.dump(frame_metadata, f, indent=2) bubbles_list = [] for i, f in enumerate(frame_files_ordered): dialogue = frame_metadata.get(f, {}).get('dialogue', '') # đŸŽ¯ STRICT BUBBLE TYPE LOGIC (Prefer Speech) b_type = 'speech' if '(' in dialogue: b_type = 'narration' elif '!' in dialogue and dialogue.isupper() and len(dialogue) < 10: # Only use reaction if VERY short and yelling b_type = 'reaction' # Smart Positioning for 864x1080 pos_idx = i % 4 if pos_idx == 0: bx, by = 150, 80 elif pos_idx == 1: bx, by = 580, 80 elif pos_idx == 2: bx, by = 150, 600 elif pos_idx == 3: bx, by = 580, 600 else: bx, by = 50, 50 bubbles_list.append(bubble(dialog=dialogue, x=bx, y=by, type=b_type)) pages = [] for i in range(int(target_pages)): start_idx = i * 4 end_idx = start_idx + 4 p_frames = frame_files_ordered[start_idx:end_idx] p_times = frame_times[start_idx:end_idx] p_bubbles = bubbles_list[start_idx:end_idx] while len(p_frames) < 4: fname = f"empty_{i}_{len(p_frames)}.png" img = np.zeros((1080, 1920, 3), dtype=np.uint8); img[:] = (30,30,30) cv2.imwrite(os.path.join(frames_dir, fname), img) p_frames.append(fname) p_times.append(0.0) p_bubbles.append(bubble(dialog="", x=-999, y=-999, type='speech')) if p_frames: pg_panels = [panel(image=p_frames[j], time=p_times[j]) for j in range(len(p_frames))] pages.append(Page(panels=pg_panels, bubbles=p_bubbles)) result = [] for pg in pages: p_data = [p if isinstance(p, dict) else p.__dict__ for p in pg.panels] b_data = [b if isinstance(b, dict) else b.__dict__ for b in pg.bubbles] result.append({'panels': p_data, 'bubbles': b_data}) return result def regen_frame_cpu(video_path, frames_dir, metadata_path, fname, direction): import cv2 import json if not os.path.exists(metadata_path): return {"success": False, "message": "No metadata"} with open(metadata_path, 'r') as f: meta = json.load(f) t = meta[fname]['time'] if isinstance(meta[fname], dict) else meta[fname] cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) or 25 offset = (1.0/fps) * (1 if direction == 'forward' else -1) new_t = max(0, t + offset) cap.set(cv2.CAP_PROP_POS_MSEC, new_t * 1000) ret, frame = cap.read() cap.release() if ret: frame = cv2.resize(frame, (1920, 1080)) cv2.imwrite(os.path.join(frames_dir, fname), frame) if isinstance(meta[fname], dict): meta[fname]['time'] = new_t else: meta[fname] = new_t with open(metadata_path, 'w') as f: json.dump(meta, f, indent=2) return {"success": True, "message": f"Time: {new_t:.2f}s", "new_time": new_t} return {"success": False} def get_frame_at_ts_cpu(video_path, frames_dir, metadata_path, fname, ts): import cv2 import json cap = cv2.VideoCapture(video_path) cap.set(cv2.CAP_PROP_POS_MSEC, float(ts) * 1000) ret, frame = cap.read() cap.release() if ret: frame = cv2.resize(frame, (1920, 1080)) cv2.imwrite(os.path.join(frames_dir, fname), frame) if os.path.exists(metadata_path): with open(metadata_path, 'r') as f: meta = json.load(f) if fname in meta: if isinstance(meta[fname], dict): meta[fname]['time'] = float(ts) else: meta[fname] = float(ts) with open(metadata_path, 'w') as f: json.dump(meta, f, indent=2) return {"success": True, "message": f"Jumped to {ts}s", "new_time": float(ts)} return {"success": False, "message": "Invalid timestamp"} class EnhancedComicGenerator: def __init__(self, sid): self.sid = sid self.user_dir = os.path.join(BASE_USER_DIR, sid) self.video_path = os.path.join(self.user_dir, 'uploaded.mp4') self.frames_dir = os.path.join(self.user_dir, 'frames') self.output_dir = os.path.join(self.user_dir, 'output') os.makedirs(self.frames_dir, exist_ok=True) os.makedirs(self.output_dir, exist_ok=True) self.metadata_path = os.path.join(self.frames_dir, 'frame_metadata.json') def cleanup(self): if os.path.exists(self.frames_dir): shutil.rmtree(self.frames_dir) if os.path.exists(self.output_dir): shutil.rmtree(self.output_dir) os.makedirs(self.frames_dir, exist_ok=True) os.makedirs(self.output_dir, exist_ok=True) def run(self, target_pages): try: self.write_status("Generating...", 5) # Call CPU function instead of GPU data = generate_comic_cpu(self.video_path, self.user_dir, self.frames_dir, self.metadata_path, int(target_pages)) with open(os.path.join(self.output_dir, 'pages.json'), 'w') as f: json.dump(data, f, indent=2) self.write_status("Complete!", 100) except Exception as e: traceback.print_exc() self.write_status(f"Error: {str(e)}", -1) def write_status(self, msg, prog): with open(os.path.join(self.output_dir, 'status.json'), 'w') as f: json.dump({'message': msg, 'progress': prog}, f) # ====================================================== # 🌐 ROUTES & FRONTEND # ====================================================== INDEX_HTML = ''' 864x1080 Robust Comic

⚡ 864x1080 Robust Comic (CPU)

No file selected
👉 Drag Right-Side Dots to reveal 4 panels! | 📜 Scroll to Zoom/Pan

âœī¸ Editor

''' @app.route('/') def index(): return INDEX_HTML @app.route('/uploader', methods=['POST']) def upload(): sid = request.args.get('sid') or request.form.get('sid') if not sid: return jsonify({'success': False, 'message': 'Missing session ID'}), 400 file = request.files.get('file') if not file or file.filename == '': return jsonify({'success': False, 'message': 'No file uploaded'}), 400 target_pages = request.form.get('target_pages', 4) gen = EnhancedComicGenerator(sid) gen.cleanup() file.save(gen.video_path) gen.write_status("Starting...", 5) threading.Thread(target=gen.run, args=(target_pages,)).start() return jsonify({'success': True}) @app.route('/status') def get_status(): sid = request.args.get('sid') path = os.path.join(BASE_USER_DIR, sid, 'output', 'status.json') if os.path.exists(path): return send_file(path) return jsonify({'progress': 0, 'message': "Waiting..."}) @app.route('/output/') def get_output(filename): sid = request.args.get('sid') return send_from_directory(os.path.join(BASE_USER_DIR, sid, 'output'), filename) @app.route('/frames/') def get_frame(filename): sid = request.args.get('sid') return send_from_directory(os.path.join(BASE_USER_DIR, sid, 'frames'), filename) @app.route('/regenerate_frame', methods=['POST']) def regen(): sid = request.args.get('sid') d = request.get_json() gen = EnhancedComicGenerator(sid) return jsonify(regen_frame_cpu(gen.video_path, gen.frames_dir, gen.metadata_path, d['filename'], d['direction'])) @app.route('/goto_timestamp', methods=['POST']) def go_time(): sid = request.args.get('sid') d = request.get_json() gen = EnhancedComicGenerator(sid) return jsonify(get_frame_at_ts_cpu(gen.video_path, gen.frames_dir, gen.metadata_path, d['filename'], float(d['timestamp']))) @app.route('/replace_panel', methods=['POST']) def rep_panel(): sid = request.args.get('sid') f = request.files['image'] frames_dir = os.path.join(BASE_USER_DIR, sid, 'frames') os.makedirs(frames_dir, exist_ok=True) fname = f"replaced_{int(time.time() * 1000)}.png" f.save(os.path.join(frames_dir, fname)) return jsonify({'success': True, 'new_filename': fname}) @app.route('/save_comic', methods=['POST']) def save_comic(): sid = request.args.get('sid') try: data = request.get_json() save_code = generate_save_code() save_dir = os.path.join(SAVED_COMICS_DIR, save_code) os.makedirs(save_dir, exist_ok=True) user_frames_dir = os.path.join(BASE_USER_DIR, sid, 'frames') saved_frames_dir = os.path.join(save_dir, 'frames') if os.path.exists(user_frames_dir): if os.path.exists(saved_frames_dir): shutil.rmtree(saved_frames_dir) shutil.copytree(user_frames_dir, saved_frames_dir) with open(os.path.join(save_dir, 'comic_state.json'), 'w') as f: json.dump({'originalSid': sid, 'pages': data['pages'], 'savedAt': time.time()}, f) return jsonify({'success': True, 'code': save_code}) except Exception as e: return jsonify({'success': False, 'message': str(e)}) @app.route('/load_comic/') def load_comic(code): code = code.upper() save_dir = os.path.join(SAVED_COMICS_DIR, code) if not os.path.exists(save_dir): return jsonify({'success': False, 'message': 'Code not found'}) try: with open(os.path.join(save_dir, 'comic_state.json'), 'r') as f: data = json.load(f) orig_sid = data['originalSid'] saved_frames = os.path.join(save_dir, 'frames') user_frames = os.path.join(BASE_USER_DIR, orig_sid, 'frames') os.makedirs(user_frames, exist_ok=True) for fn in os.listdir(saved_frames): shutil.copy2(os.path.join(saved_frames, fn), os.path.join(user_frames, fn)) return jsonify({'success': True, 'originalSid': orig_sid, 'pages': data['pages']}) except Exception as e: return jsonify({'success': False, 'message': str(e)}) if __name__ == '__main__': app.run(host='0.0.0.0', port=7860)