import os import time import threading import json import traceback import string import random import shutil import cv2 import numpy as np import gc import srt import warnings import subprocess import whisper from concurrent.futures import ThreadPoolExecutor, as_completed from flask import Flask, jsonify, request, send_from_directory, send_file # Filter annoying CPU warnings warnings.filterwarnings("ignore") # ====================================================== # 💾 STORAGE SETUP # ====================================================== if os.path.exists('/data'): BASE_STORAGE_PATH = '/data' print("✅ Using Persistent Storage at /data") else: BASE_STORAGE_PATH = '.' print("âš ī¸ Using Ephemeral Storage") BASE_USER_DIR = os.path.join(BASE_STORAGE_PATH, "userdata") SAVED_COMICS_DIR = os.path.join(BASE_STORAGE_PATH, "saved_comics") os.makedirs(BASE_USER_DIR, exist_ok=True) os.makedirs(SAVED_COMICS_DIR, exist_ok=True) app = Flask(__name__) app.config['MAX_CONTENT_LENGTH'] = 2 * 1024 * 1024 * 1024 # 2GB Limit def generate_save_code(length=8): chars = string.ascii_uppercase + string.digits while True: code = ''.join(random.choices(chars, k=length)) if not os.path.exists(os.path.join(SAVED_COMICS_DIR, code)): return code # ====================================================== # 🧱 DATA CLASSES # ====================================================== def bubble(dialog="", x=50, y=20, type='speech'): classes = f"speech-bubble {type}" if type == 'speech': classes += " tail-bottom" elif type == 'thought': classes += " pos-bl" elif type == 'reaction': classes += " tail-bottom" return { 'dialog': dialog, 'bubble_offset_x': int(x), 'bubble_offset_y': int(y), 'type': type, 'tail_pos': '50%', 'classes': classes, 'colors': {'fill': '#ffffff', 'text': '#000000'}, 'font': "'Comic Neue', cursive", 'font_size': '16px' } def panel(image="", time=0.0): return {'image': image, 'time': time} class Page: def __init__(self, panels, bubbles): self.panels = panels self.bubbles = bubbles # ====================================================== # 🧠 ROBUST GENERATION LOGIC # ====================================================== def generate_subtitles_cpu(video_path, srt_path, status_callback=None): try: if status_callback: status_callback("Extracting Audio (FFmpeg)...", 10) audio_path = video_path.replace(".mp4", ".wav") try: command = [ "ffmpeg", "-y", "-i", video_path, "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", audio_path ] subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True) except FileNotFoundError: print("❌ FFmpeg not found.") return False except Exception as e: print(f"❌ Audio extraction failed: {e}") return False if status_callback: status_callback("Loading AI Model (Base)...", 15) model = whisper.load_model("base", device="cpu") if status_callback: status_callback("Transcribing Speech...", 20) result = model.transcribe(audio_path, fp16=False, language='en') segments = result['segments'] with open(srt_path, 'w', encoding='utf-8') as f: for i, segment in enumerate(segments): start = segment['start'] end = segment['end'] text = segment['text'].strip() def fmt_time(t): hours = int(t // 3600) minutes = int((t % 3600) // 60) seconds = int(t % 60) milliseconds = int((t % 1) * 1000) return f"{hours:02d}:{minutes:02d}:{seconds:02d},{milliseconds:03d}" f.write(f"{i + 1}\n") f.write(f"{fmt_time(start)} --> {fmt_time(end)}\n") f.write(f"{text}\n\n") if os.path.exists(audio_path): os.remove(audio_path) return True except Exception as e: print(f"Transcription Failed: {e}") traceback.print_exc() return False def extract_frame_task(args): video_path, time_sec, output_path, width, height = args try: cap = cv2.VideoCapture(video_path) cap.set(cv2.CAP_PROP_POS_MSEC, time_sec * 1000) ret, frame = cap.read() cap.release() if ret: frame = cv2.resize(frame, (width, height)) cv2.imwrite(output_path, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 90]) return True return False except Exception: return False def generate_comic_smart(video_path, user_dir, frames_dir, metadata_path, target_pages, status_callback=None): WORKER_COUNT = 4 print(f"🚀 Starting Smart Generation for {user_dir}") if status_callback: status_callback("Analyzing Video...", 5) cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise Exception("Cannot open video") fps = cap.get(cv2.CAP_PROP_FPS) or 25 total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) duration = total_frames / fps cap.release() user_srt = os.path.join(user_dir, 'subs.srt') has_text = generate_subtitles_cpu(video_path, user_srt, status_callback) if status_callback: status_callback("Planning Pages...", 25) raw_moments = [] if has_text and os.path.exists(user_srt): with open(user_srt, 'r', encoding='utf-8') as f: try: subs = list(srt.parse(f.read())) valid_subs = [s for s in subs if s.content and s.content.strip() and s.content.strip() != "..."] for s in valid_subs: start = s.start.total_seconds() end = s.end.total_seconds() raw_moments.append({'text': s.content.strip(), 'start': start, 'end': end, 'mid': (start+end)/2}) except Exception as e: print(f"SRT Parse Error: {e}") panels_per_page = 4 total_panels_needed = int(target_pages) * panels_per_page selected_moments = [] if len(raw_moments) > 0: indices = np.linspace(0, len(raw_moments) - 1, total_panels_needed, dtype=int) for i in indices: selected_moments.append(raw_moments[i]) else: print("âš ī¸ No speech detected or FFmpeg missing, using visual spacing.") times = np.linspace(1, max(1, duration-1), total_panels_needed) for t in times: selected_moments.append({'text': "...", 'start': t, 'end': t+1, 'mid': t}) frame_metadata = {} frame_files_ordered = [] tasks = [] count = 0 for m in selected_moments: fname = f"frame_{count:04d}.jpg" out_p = os.path.join(frames_dir, fname) tasks.append((video_path, m['mid'], out_p, 1920, 1080)) frame_metadata[fname] = {'dialogue': m['text'], 'time': m['mid']} frame_files_ordered.append(fname) count += 1 total_tasks = len(tasks) completed = 0 start_time = time.time() gc.collect() if status_callback: status_callback("Drawing Panels...", 35) with ThreadPoolExecutor(max_workers=WORKER_COUNT) as executor: futures = [executor.submit(extract_frame_task, t) for t in tasks] for _ in as_completed(futures): completed += 1 if status_callback: elapsed = time.time() - start_time prog = 35 + int((completed / total_tasks) * 60) status_callback(f"Drawing Panels: {completed}/{total_tasks}", prog) with open(metadata_path, 'w') as f: json.dump(frame_metadata, f, indent=2) bubbles_list = [] for i, f in enumerate(frame_files_ordered): dialogue = frame_metadata.get(f, {}).get('dialogue', '...') b_type = 'speech' if '(' in dialogue or '[' in dialogue: b_type = 'narration' elif '!' in dialogue and dialogue.isupper() and len(dialogue) < 15: b_type = 'reaction' elif dialogue == "...": b_type = 'thought' pos_idx = i % 4 if pos_idx == 0: bx, by = 150, 80 elif pos_idx == 1: bx, by = 580, 80 elif pos_idx == 2: bx, by = 150, 600 elif pos_idx == 3: bx, by = 580, 600 else: bx, by = 50, 50 bubbles_list.append(bubble(dialog=dialogue, x=bx, y=by, type=b_type)) pages = [] for i in range(int(target_pages)): start_idx = i * 4 end_idx = start_idx + 4 p_frames = frame_files_ordered[start_idx:end_idx] p_times = [frame_metadata[f]['time'] for f in p_frames] p_bubbles = bubbles_list[start_idx:end_idx] while len(p_frames) < 4: fname = f"empty_{i}_{len(p_frames)}.jpg" img = np.zeros((1080, 1920, 3), dtype=np.uint8); img[:] = (30,30,30) cv2.imwrite(os.path.join(frames_dir, fname), img, [int(cv2.IMWRITE_JPEG_QUALITY), 90]) p_frames.append(fname) p_times.append(0.0) p_bubbles.append(bubble(dialog="...", x=-999, y=-999, type='speech')) if p_frames: pg_panels = [panel(image=p_frames[j], time=p_times[j]) for j in range(len(p_frames))] pages.append(Page(panels=pg_panels, bubbles=p_bubbles)) result = [] for pg in pages: p_data = [p if isinstance(p, dict) else p.__dict__ for p in pg.panels] b_data = [b if isinstance(b, dict) else b.__dict__ for b in pg.bubbles] result.append({'panels': p_data, 'bubbles': b_data}) return result # ====================================================== # 🔧 HELPER FUNCTIONS # ====================================================== def regen_frame_cpu(video_path, frames_dir, metadata_path, fname, direction): if not os.path.exists(metadata_path): return {"success": False, "message": "No metadata"} with open(metadata_path, 'r') as f: meta = json.load(f) t = meta[fname]['time'] if isinstance(meta[fname], dict) else meta[fname] cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) or 25 offset = (1.0/fps) * (10 if direction == 'forward' else -10) new_t = max(0, t + offset) cap.set(cv2.CAP_PROP_POS_MSEC, new_t * 1000) ret, frame = cap.read() cap.release() if ret: frame = cv2.resize(frame, (1920, 1080)) cv2.imwrite(os.path.join(frames_dir, fname), frame, [int(cv2.IMWRITE_JPEG_QUALITY), 90]) if isinstance(meta[fname], dict): meta[fname]['time'] = new_t else: meta[fname] = new_t with open(metadata_path, 'w') as f: json.dump(meta, f, indent=2) return {"success": True, "message": f"Time: {new_t:.2f}s", "new_time": new_t} return {"success": False} def get_frame_at_ts_cpu(video_path, frames_dir, metadata_path, fname, ts): cap = cv2.VideoCapture(video_path) cap.set(cv2.CAP_PROP_POS_MSEC, float(ts) * 1000) ret, frame = cap.read() cap.release() if ret: frame = cv2.resize(frame, (1920, 1080)) cv2.imwrite(os.path.join(frames_dir, fname), frame, [int(cv2.IMWRITE_JPEG_QUALITY), 90]) if os.path.exists(metadata_path): with open(metadata_path, 'r') as f: meta = json.load(f) if fname in meta: if isinstance(meta[fname], dict): meta[fname]['time'] = float(ts) else: meta[fname] = float(ts) with open(metadata_path, 'w') as f: json.dump(meta, f, indent=2) return {"success": True, "message": f"Jumped to {ts}s", "new_time": float(ts)} return {"success": False, "message": "Invalid timestamp"} class EnhancedComicGenerator: def __init__(self, sid): self.sid = sid self.user_dir = os.path.join(BASE_USER_DIR, sid) self.video_path = os.path.join(self.user_dir, 'uploaded.mp4') self.frames_dir = os.path.join(self.user_dir, 'frames') self.output_dir = os.path.join(self.user_dir, 'output') os.makedirs(self.frames_dir, exist_ok=True) os.makedirs(self.output_dir, exist_ok=True) self.metadata_path = os.path.join(self.frames_dir, 'frame_metadata.json') def cleanup(self): if os.path.exists(self.frames_dir): shutil.rmtree(self.frames_dir) if os.path.exists(self.output_dir): shutil.rmtree(self.output_dir) os.makedirs(self.frames_dir, exist_ok=True) os.makedirs(self.output_dir, exist_ok=True) def run(self, target_pages): try: self.write_status("Initializing...", 1) data = generate_comic_smart( self.video_path, self.user_dir, self.frames_dir, self.metadata_path, int(target_pages), status_callback=self.write_status ) with open(os.path.join(self.output_dir, 'pages.json'), 'w') as f: json.dump(data, f, indent=2) self.write_status("Complete!", 100) except Exception as e: traceback.print_exc() self.write_status(f"Error: {str(e)}", -1) def write_status(self, msg, prog): temp_path = os.path.join(self.output_dir, 'status.tmp') final_path = os.path.join(self.output_dir, 'status.json') try: with open(temp_path, 'w') as f: json.dump({'message': msg, 'progress': prog}, f) os.replace(temp_path, final_path) except: pass # ====================================================== # 🌐 FRONTEND # ====================================================== INDEX_HTML = ''' 864x1080 Robust Comic

⚡ 864x1080 Robust Comic (Smart Mode)

No file selected
👉 Drag Right-Side Dots to reveal 4 panels! | 📜 Scroll to Zoom/Pan

âœī¸ Editor

''' # ====================================================== # 🚀 FLASK ROUTES # ====================================================== @app.route('/') def index(): return INDEX_HTML @app.route('/uploader', methods=['POST']) def upload_file(): try: sid = request.args.get('sid') if not sid: return jsonify({'message': 'No SID'}), 400 target_pages = request.form.get('target_pages', 4) file = request.files['file'] user_dir = os.path.join(BASE_USER_DIR, sid) os.makedirs(user_dir, exist_ok=True) video_path = os.path.join(user_dir, 'uploaded.mp4') file.save(video_path) generator = EnhancedComicGenerator(sid) generator.cleanup() thread = threading.Thread(target=generator.run, args=(target_pages,)) thread.start() return jsonify({'message': 'Upload successful, processing started.'}) except Exception as e: traceback.print_exc() return jsonify({'message': str(e)}), 500 @app.route('/status') def status(): sid = request.args.get('sid') status_path = os.path.join(BASE_USER_DIR, sid, 'output', 'status.json') if os.path.exists(status_path): try: with open(status_path, 'r') as f: return jsonify(json.load(f)) except: return jsonify({'message': 'Reading status...', 'progress': 0}) return jsonify({'message': 'Waiting...', 'progress': 0}) @app.route('/frames/') def serve_frame(filename): sid = request.args.get('sid') if not sid: return "Missing SID", 400 return send_from_directory(os.path.join(BASE_USER_DIR, sid, 'frames'), filename) @app.route('/output/') def serve_output(filename): sid = request.args.get('sid') if not sid: return "Missing SID", 400 return send_from_directory(os.path.join(BASE_USER_DIR, sid, 'output'), filename) @app.route('/regenerate_frame', methods=['POST']) def regenerate_frame(): data = request.json sid = request.args.get('sid') return jsonify(regen_frame_cpu( os.path.join(BASE_USER_DIR, sid, 'uploaded.mp4'), os.path.join(BASE_USER_DIR, sid, 'frames'), os.path.join(BASE_USER_DIR, sid, 'frames', 'frame_metadata.json'), data['filename'], data['direction'] )) @app.route('/goto_timestamp', methods=['POST']) def goto_timestamp(): data = request.json sid = request.args.get('sid') return jsonify(get_frame_at_ts_cpu( os.path.join(BASE_USER_DIR, sid, 'uploaded.mp4'), os.path.join(BASE_USER_DIR, sid, 'frames'), os.path.join(BASE_USER_DIR, sid, 'frames', 'frame_metadata.json'), data['filename'], data['timestamp'] )) @app.route('/replace_panel', methods=['POST']) def replace_panel(): sid = request.args.get('sid') f = request.files['image'] # Changed to .jpg fname = f"custom_{int(time.time())}.jpg" save_path = os.path.join(BASE_USER_DIR, sid, 'frames', fname) f.save(save_path) # Resize and save as JPG img = cv2.imread(save_path) if img is not None: img = cv2.resize(img, (1920, 1080)) cv2.imwrite(save_path, img, [int(cv2.IMWRITE_JPEG_QUALITY), 90]) return jsonify({'success': True, 'new_filename': fname}) @app.route('/save_comic', methods=['POST']) def save_comic(): try: sid = request.args.get('sid') data = request.json code = generate_save_code() save_dir = os.path.join(SAVED_COMICS_DIR, code) os.makedirs(save_dir, exist_ok=True) # Save JSON State with open(os.path.join(save_dir, 'comic_data.json'), 'w') as f: json.dump({'originalSid': sid, 'pages': data['pages']}, f) # Copy Frames src_frames = os.path.join(BASE_USER_DIR, sid, 'frames') dest_frames = os.path.join(save_dir, 'frames') if os.path.exists(src_frames): shutil.copytree(src_frames, dest_frames) return jsonify({'success': True, 'code': code}) except Exception as e: traceback.print_exc() return jsonify({'success': False, 'message': str(e)}) @app.route('/load_comic/') def load_comic_route(code): save_dir = os.path.join(SAVED_COMICS_DIR, code) if not os.path.exists(save_dir): return jsonify({'success': False, 'message': 'Comic not found'}) with open(os.path.join(save_dir, 'comic_data.json'), 'r') as f: data = json.load(f) # Restore to a new active session or existing one? # For simplicity, we just copy frames back to a "restored" session ID new_sid = f"restored_{code}_{int(time.time())}" new_user_dir = os.path.join(BASE_USER_DIR, new_sid) os.makedirs(new_user_dir, exist_ok=True) if os.path.exists(os.path.join(save_dir, 'frames')): shutil.copytree(os.path.join(save_dir, 'frames'), os.path.join(new_user_dir, 'frames')) # We need to make sure the output dir exists for status checks (dummy) os.makedirs(os.path.join(new_user_dir, 'output'), exist_ok=True) with open(os.path.join(new_user_dir, 'output', 'pages.json'), 'w') as f: f.write("[]") # Placeholder return jsonify({'success': True, 'originalSid': new_sid, 'pages': data['pages']}) if __name__ == '__main__': app.run(host='0.0.0.0', port=7860)