import os import time import threading import json import traceback import string import random import shutil import cv2 import numpy as np import gc import srt import warnings import subprocess import whisper from concurrent.futures import ThreadPoolExecutor, as_completed from flask import Flask, jsonify, request, send_from_directory, send_file # Filter annoying CPU warnings warnings.filterwarnings("ignore") # ====================================================== # 💾 STORAGE SETUP # ====================================================== if os.path.exists('/data'): BASE_STORAGE_PATH = '/data' print("✅ Using Persistent Storage at /data") else: BASE_STORAGE_PATH = '.' print("⚠️ Using Ephemeral Storage") BASE_USER_DIR = os.path.join(BASE_STORAGE_PATH, "userdata") SAVED_COMICS_DIR = os.path.join(BASE_STORAGE_PATH, "saved_comics") os.makedirs(BASE_USER_DIR, exist_ok=True) os.makedirs(SAVED_COMICS_DIR, exist_ok=True) app = Flask(__name__) app.config['MAX_CONTENT_LENGTH'] = 2 * 1024 * 1024 * 1024 # 2GB Limit def generate_save_code(length=8): chars = string.ascii_uppercase + string.digits while True: code = ''.join(random.choices(chars, k=length)) if not os.path.exists(os.path.join(SAVED_COMICS_DIR, code)): return code # ====================================================== # 🧱 DATA CLASSES # ====================================================== def bubble(dialog="", x=50, y=20, type='speech'): classes = f"speech-bubble {type}" if type == 'speech': classes += " tail-bottom" elif type == 'thought': classes += " pos-bl" elif type == 'reaction': classes += " tail-bottom" return { 'dialog': dialog, 'bubble_offset_x': int(x), 'bubble_offset_y': int(y), 'type': type, 'tail_pos': '50%', 'classes': classes, 'colors': {'fill': '#ffffff', 'text': '#000000'}, 'font': "'Comic Neue', cursive", 'font_size': '16px' } def panel(image="", time=0.0): return {'image': image, 'time': time} class Page: def __init__(self, panels, bubbles): self.panels = panels self.bubbles = bubbles # ====================================================== # 🧠 HIGH QUALITY GENERATION LOGIC # ====================================================== def resize_contain_hq(image, target_w=1080, target_h=1080): """ 1:1 Square Ratio Resize (Instagram Style) """ h, w = image.shape[:2] scale = min(target_w / w, target_h / h) new_w = int(w * scale) new_h = int(h * scale) # Lanczos4 for sharpness resized = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_LANCZOS4) canvas = np.zeros((target_h, target_w, 3), dtype=np.uint8) # Center x_offset = (target_w - new_w) // 2 y_offset = (target_h - new_h) // 2 canvas[y_offset:y_offset+new_h, x_offset:x_offset+new_w] = resized return canvas def generate_subtitles_cpu(video_path, srt_path, status_callback=None): try: if status_callback: status_callback("Extracting Audio...", 5) audio_path = video_path.replace(".mp4", ".wav") try: # -preset ultrafast for speed command = ["ffmpeg", "-y", "-i", video_path, "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", "-threads", "4", "-preset", "ultrafast", audio_path] subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True) except: return False if status_callback: status_callback("Loading AI...", 15) model = whisper.load_model("tiny", device="cpu") if status_callback: status_callback("Transcribing...", 25) result = model.transcribe(audio_path, fp16=False, language='en') with open(srt_path, 'w', encoding='utf-8') as f: for i, segment in enumerate(result['segments']): start, end, text = segment['start'], segment['end'], segment['text'].strip() def fmt(t): return f"{int(t//3600):02d}:{int((t%3600)//60):02d}:{int(t%60):02d},{int((t%1)*1000):03d}" f.write(f"{i+1}\n{fmt(start)} --> {fmt(end)}\n{text}\n\n") if os.path.exists(audio_path): os.remove(audio_path) return True except: return False def extract_frame_task(args): video_path, time_sec, output_path, width, height = args try: cap = cv2.VideoCapture(video_path) cap.set(cv2.CAP_PROP_POS_MSEC, time_sec * 1000) ret, frame = cap.read() cap.release() if ret: final = resize_contain_hq(frame, width, height) cv2.imwrite(output_path, final, [cv2.IMWRITE_PNG_COMPRESSION, 3]) return True return False except: return False def generate_comic_smart(video_path, user_dir, frames_dir, metadata_path, target_pages, status_callback=None): WORKER_COUNT = 4 if status_callback: status_callback("Analyzing Video...", 5) cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) or 25 total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) duration = total_frames / fps cap.release() user_srt = os.path.join(user_dir, 'subs.srt') has_text = generate_subtitles_cpu(video_path, user_srt, status_callback) if status_callback: status_callback("Planning Pages...", 40) raw_moments = [] if has_text and os.path.exists(user_srt): with open(user_srt, 'r', encoding='utf-8') as f: try: for s in list(srt.parse(f.read())): if s.content.strip(): raw_moments.append({'text': s.content.strip(), 'mid': (s.start.total_seconds() + s.end.total_seconds())/2}) except: pass total_panels = int(target_pages) * 4 selected_moments = [] if len(raw_moments) > 0: indices = np.linspace(0, len(raw_moments) - 1, total_panels, dtype=int) for i in indices: selected_moments.append(raw_moments[i]) else: times = np.linspace(1, max(1, duration-1), total_panels) for t in times: selected_moments.append({'text': "...", 'mid': t}) frame_metadata = {} frame_files = [] tasks = [] EXTRACT_W, EXTRACT_H = 1080, 1080 count = 0 for m in selected_moments: fname = f"frame_{count:04d}.png" tasks.append((video_path, m['mid'], os.path.join(frames_dir, fname), EXTRACT_W, EXTRACT_H)) frame_metadata[fname] = {'dialogue': m['text'], 'time': m['mid']} frame_files.append(fname) count += 1 completed = 0 gc.collect() if status_callback: status_callback("Drawing Panels...", 50) with ThreadPoolExecutor(max_workers=WORKER_COUNT) as executor: futures = [executor.submit(extract_frame_task, t) for t in tasks] for _ in as_completed(futures): completed += 1 if status_callback: prog = 50 + int((completed / len(tasks)) * 45) status_callback(f"Drawing Panels: {completed}/{len(tasks)}", prog) with open(metadata_path, 'w') as f: json.dump(frame_metadata, f, indent=2) bubbles_list = [] for i, f in enumerate(frame_files): dialogue = frame_metadata.get(f, {}).get('dialogue', '...') b_type = 'speech' if '(' in dialogue or '[' in dialogue: b_type = 'narration' elif '!' in dialogue and dialogue.isupper() and len(dialogue) < 15: b_type = 'reaction' elif dialogue == "...": b_type = 'thought' pos_idx = i % 4 bx, by = (150, 80) if pos_idx == 0 else (580, 80) if pos_idx == 1 else (150, 600) if pos_idx == 2 else (580, 600) bubbles_list.append(bubble(dialog=dialogue, x=bx, y=by, type=b_type)) pages = [] for i in range(int(target_pages)): start, end = i * 4, (i + 1) * 4 p_frames = frame_files[start:end] p_bubbles = bubbles_list[start:end] while len(p_frames) < 4: fname = f"empty_{i}_{len(p_frames)}.png" cv2.imwrite(os.path.join(frames_dir, fname), np.zeros((EXTRACT_H, EXTRACT_W, 3), dtype=np.uint8)) p_frames.append(fname) p_bubbles.append(bubble(dialog="...", x=-999, y=-999, type='speech')) if p_frames: pg_panels = [panel(image=p_frames[j], time=frame_metadata.get(p_frames[j], {}).get('time', 0)) for j in range(len(p_frames))] pages.append(Page(panels=pg_panels, bubbles=p_bubbles)) result = [] for pg in pages: p_data = [p if isinstance(p, dict) else p.__dict__ for p in pg.panels] b_data = [b if isinstance(b, dict) else b.__dict__ for b in pg.bubbles] result.append({'panels': p_data, 'bubbles': b_data}) return result # ====================================================== # 🔧 HELPER FUNCTIONS # ====================================================== def regen_frame_cpu(video_path, frames_dir, metadata_path, fname, direction): if not os.path.exists(metadata_path): return {"success": False, "message": "No metadata"} with open(metadata_path, 'r') as f: meta = json.load(f) if fname not in meta: return {"success": False, "message": "Frame not linked"} t = meta[fname]['time'] if isinstance(meta[fname], dict) else meta[fname] cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) or 25 new_t = max(0, t + (1.0/fps) * (10 if direction == 'forward' else -10)) cap.set(cv2.CAP_PROP_POS_MSEC, new_t * 1000) ret, frame = cap.read() cap.release() if ret: cv2.imwrite(os.path.join(frames_dir, fname), resize_contain_hq(frame, 1080, 1080)) if isinstance(meta[fname], dict): meta[fname]['time'] = new_t else: meta[fname] = new_t with open(metadata_path, 'w') as f: json.dump(meta, f, indent=2) return {"success": True, "message": f"Time: {new_t:.2f}s", "new_time": new_t} return {"success": False} def get_frame_at_ts_cpu(video_path, frames_dir, metadata_path, fname, ts): if not os.path.exists(metadata_path): return {"success": False, "message": "No metadata"} with open(metadata_path, 'r') as f: meta = json.load(f) if fname not in meta: return {"success": False, "message": "Frame not linked"} cap = cv2.VideoCapture(video_path) cap.set(cv2.CAP_PROP_POS_MSEC, float(ts) * 1000) ret, frame = cap.read() cap.release() if ret: cv2.imwrite(os.path.join(frames_dir, fname), resize_contain_hq(frame, 1080, 1080)) if isinstance(meta[fname], dict): meta[fname]['time'] = float(ts) else: meta[fname] = float(ts) with open(metadata_path, 'w') as f: json.dump(meta, f, indent=2) return {"success": True, "message": f"Jumped to {ts}s", "new_time": float(ts)} return {"success": False} class EnhancedComicGenerator: def __init__(self, sid): self.sid = sid self.user_dir = os.path.join(BASE_USER_DIR, sid) self.video_path = os.path.join(self.user_dir, 'uploaded.mp4') self.frames_dir = os.path.join(self.user_dir, 'frames') self.output_dir = os.path.join(self.user_dir, 'output') os.makedirs(self.frames_dir, exist_ok=True) os.makedirs(self.output_dir, exist_ok=True) self.metadata_path = os.path.join(self.frames_dir, 'frame_metadata.json') def cleanup(self): if os.path.exists(self.frames_dir): shutil.rmtree(self.frames_dir) if os.path.exists(self.output_dir): shutil.rmtree(self.output_dir) os.makedirs(self.frames_dir, exist_ok=True) os.makedirs(self.output_dir, exist_ok=True) def run(self, target_pages): try: self.write_status("Initializing...", 1) data = generate_comic_smart(self.video_path, self.user_dir, self.frames_dir, self.metadata_path, int(target_pages), status_callback=self.write_status) with open(os.path.join(self.output_dir, 'pages.json'), 'w') as f: json.dump(data, f, indent=2) self.write_status("Complete!", 100) except Exception as e: traceback.print_exc() self.write_status(f"Error: {str(e)}", -1) def write_status(self, msg, prog): try: with open(os.path.join(self.output_dir, 'status.tmp'), 'w') as f: json.dump({'message': msg, 'progress': prog}, f) os.replace(os.path.join(self.output_dir, 'status.tmp'), os.path.join(self.output_dir, 'status.json')) except: pass # ====================================================== # 🌐 FRONTEND # ====================================================== INDEX_HTML = r''' Comic Studio Pro (HQ)

AI-Powered Comic Generation Suite

GENERATING COMIC...
INITIALIZING...
COMIC STUDIO
💬
Properties
Select an item to edit
''' # ====================================================== # 🚀 FLASK ROUTES # ====================================================== @app.route('/') def index(): return INDEX_HTML @app.route('/uploader', methods=['POST']) def upload(): sid = request.args.get('sid') or request.form.get('sid') if not sid: return jsonify({'success': False, 'message': 'Missing session ID'}), 400 file = request.files.get('file') if not file or file.filename == '': return jsonify({'success': False, 'message': 'No file uploaded'}), 400 target_pages = request.form.get('target_pages', 4) gen = EnhancedComicGenerator(sid) gen.cleanup() file.save(gen.video_path) gen.write_status("Starting...", 5) threading.Thread(target=gen.run, args=(target_pages,)).start() return jsonify({'success': True}) @app.route('/status') def get_status(): sid = request.args.get('sid') path = os.path.join(BASE_USER_DIR, sid, 'output', 'status.json') if os.path.exists(path): return send_file(path) return jsonify({'progress': 0, 'message': "Waiting..."}) @app.route('/output/') def get_output(filename): sid = request.args.get('sid') return send_from_directory(os.path.join(BASE_USER_DIR, sid, 'output'), filename) @app.route('/frames/') def get_frame(filename): sid = request.args.get('sid') return send_from_directory(os.path.join(BASE_USER_DIR, sid, 'frames'), filename) @app.route('/regenerate_frame', methods=['POST']) def regen(): sid = request.args.get('sid') d = request.get_json() gen = EnhancedComicGenerator(sid) return jsonify(regen_frame_cpu(gen.video_path, gen.frames_dir, gen.metadata_path, d['filename'], d['direction'])) @app.route('/goto_timestamp', methods=['POST']) def go_time(): sid = request.args.get('sid') d = request.get_json() gen = EnhancedComicGenerator(sid) return jsonify(get_frame_at_ts_cpu(gen.video_path, gen.frames_dir, gen.metadata_path, d['filename'], float(d['timestamp']))) @app.route('/replace_panel', methods=['POST']) def rep_panel(): sid = request.args.get('sid') f = request.files['image'] frames_dir = os.path.join(BASE_USER_DIR, sid, 'frames') os.makedirs(frames_dir, exist_ok=True) fname = f"replaced_{int(time.time() * 1000)}.png" f.save(os.path.join(frames_dir, fname)) return jsonify({'success': True, 'new_filename': fname}) @app.route('/save_comic', methods=['POST']) def save_comic(): sid = request.args.get('sid') try: data = request.get_json() save_code = generate_save_code() save_dir = os.path.join(SAVED_COMICS_DIR, save_code) os.makedirs(save_dir, exist_ok=True) user_frames_dir = os.path.join(BASE_USER_DIR, sid, 'frames') saved_frames_dir = os.path.join(save_dir, 'frames') if os.path.exists(user_frames_dir): if os.path.exists(saved_frames_dir): shutil.rmtree(saved_frames_dir) shutil.copytree(user_frames_dir, saved_frames_dir) with open(os.path.join(save_dir, 'comic_state.json'), 'w') as f: json.dump({'originalSid': sid, 'pages': data['pages'], 'savedAt': time.time()}, f) return jsonify({'success': True, 'code': save_code}) except Exception as e: return jsonify({'success': False, 'message': str(e)}) @app.route('/load_comic/') def load_comic(code): code = code.upper() save_dir = os.path.join(SAVED_COMICS_DIR, code) if not os.path.exists(save_dir): return jsonify({'success': False, 'message': 'Code not found'}) try: with open(os.path.join(save_dir, 'comic_state.json'), 'r') as f: data = json.load(f) orig_sid = data['originalSid'] saved_frames = os.path.join(save_dir, 'frames') user_frames = os.path.join(BASE_USER_DIR, orig_sid, 'frames') os.makedirs(user_frames, exist_ok=True) for fn in os.listdir(saved_frames): shutil.copy2(os.path.join(saved_frames, fn), os.path.join(user_frames, fn)) return jsonify({'success': True, 'originalSid': orig_sid, 'pages': data['pages']}) except Exception as e: return jsonify({'success': False, 'message': str(e)}) if __name__ == '__main__': app.run(host='0.0.0.0', port=7860)