"""Flask backend for an AI-powered video-to-comic generation service."""

import os
import time
import threading
import json
import traceback
import string
import random
import secrets
import shutil
import cv2
import numpy as np
import gc
import srt
import warnings
import subprocess
import whisper
from concurrent.futures import ThreadPoolExecutor, as_completed
from flask import Flask, jsonify, request, send_from_directory, send_file

# Filter annoying CPU warnings (e.g. Whisper's FP16-on-CPU warnings)
warnings.filterwarnings("ignore")

# ======================================================
# 💾 STORAGE SETUP
# ======================================================
# Prefer the persistent /data volume when present (container deployments);
# otherwise fall back to the working directory, which does not survive restarts.
if os.path.exists('/data'):
    BASE_STORAGE_PATH = '/data'
    print("✅ Using Persistent Storage at /data")
else:
    BASE_STORAGE_PATH = '.'
    print("⚠️ Using Ephemeral Storage")

BASE_USER_DIR = os.path.join(BASE_STORAGE_PATH, "userdata")
SAVED_COMICS_DIR = os.path.join(BASE_STORAGE_PATH, "saved_comics")
os.makedirs(BASE_USER_DIR, exist_ok=True)
os.makedirs(SAVED_COMICS_DIR, exist_ok=True)

app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 2 * 1024 * 1024 * 1024  # 2GB upload limit


def generate_save_code(length=8):
    """Return a fresh share code (A-Z, 0-9) not already present in SAVED_COMICS_DIR.

    Uses ``secrets.choice`` rather than ``random`` because the code doubles as
    the access token for a saved comic and must not be predictable.
    """
    chars = string.ascii_uppercase + string.digits
    while True:
        code = ''.join(secrets.choice(chars) for _ in range(length))
        if not os.path.exists(os.path.join(SAVED_COMICS_DIR, code)):
            return code


# ======================================================
# 🧱 DATA CLASSES
# ======================================================
def bubble(dialog="", x=50, y=20, type='speech'):
    """Build a speech-bubble dict for the front-end renderer.

    ``type`` is one of 'speech', 'thought', 'reaction', 'narration'.
    (The parameter name shadows the builtin but is kept for API compatibility.)
    """
    classes = f"speech-bubble {type}"
    if type == 'speech':
        classes += " tail-bottom"
    elif type == 'thought':
        classes += " pos-bl"
    elif type == 'reaction':
        classes += " tail-bottom"
    return {
        'dialog': dialog,
        'bubble_offset_x': int(x),
        'bubble_offset_y': int(y),
        'type': type,
        'tail_pos': '50%',
        'classes': classes,
        'colors': {'fill': '#ffffff', 'text': '#000000'},
        'font': "'Comic Neue', cursive",
        'font_size': '16px',
    }


def panel(image="", time=0.0):
    """Build a panel dict: frame image filename + source timestamp (seconds)."""
    return {'image': image, 'time': time}


class Page:
    """A comic page: parallel lists of panel dicts and bubble dicts."""

    def __init__(self, panels, bubbles):
        self.panels = panels
        self.bubbles = bubbles
====================================================== # 🧠 HIGH QUALITY GENERATION LOGIC # ====================================================== def resize_contain_hq(image, target_w=1080, target_h=1080): """ 1:1 Square Ratio Resize (Instagram Style) """ h, w = image.shape[:2] scale = min(target_w / w, target_h / h) new_w = int(w * scale) new_h = int(h * scale) # Lanczos4 for sharpness resized = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_LANCZOS4) canvas = np.zeros((target_h, target_w, 3), dtype=np.uint8) # Center x_offset = (target_w - new_w) // 2 y_offset = (target_h - new_h) // 2 canvas[y_offset:y_offset+new_h, x_offset:x_offset+new_w] = resized return canvas def generate_subtitles_cpu(video_path, srt_path, status_callback=None): try: if status_callback: status_callback("Extracting Audio...", 5) audio_path = video_path.replace(".mp4", ".wav") try: # -preset ultrafast for speed command = ["ffmpeg", "-y", "-i", video_path, "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", "-threads", "4", "-preset", "ultrafast", audio_path] subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True) except: return False if status_callback: status_callback("Loading AI...", 15) model = whisper.load_model("tiny", device="cpu") if status_callback: status_callback("Transcribing...", 25) result = model.transcribe(audio_path, fp16=False, language='en') with open(srt_path, 'w', encoding='utf-8') as f: for i, segment in enumerate(result['segments']): start, end, text = segment['start'], segment['end'], segment['text'].strip() def fmt(t): return f"{int(t//3600):02d}:{int((t%3600)//60):02d}:{int(t%60):02d},{int((t%1)*1000):03d}" f.write(f"{i+1}\n{fmt(start)} --> {fmt(end)}\n{text}\n\n") if os.path.exists(audio_path): os.remove(audio_path) return True except: return False def extract_frame_task(args): video_path, time_sec, output_path, width, height = args try: cap = cv2.VideoCapture(video_path) cap.set(cv2.CAP_PROP_POS_MSEC, time_sec * 
1000) ret, frame = cap.read() cap.release() if ret: final = resize_contain_hq(frame, width, height) cv2.imwrite(output_path, final, [cv2.IMWRITE_PNG_COMPRESSION, 3]) return True return False except: return False def generate_comic_smart(video_path, user_dir, frames_dir, metadata_path, target_pages, status_callback=None): WORKER_COUNT = 4 if status_callback: status_callback("Analyzing Video...", 5) cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) or 25 total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) duration = total_frames / fps cap.release() user_srt = os.path.join(user_dir, 'subs.srt') has_text = generate_subtitles_cpu(video_path, user_srt, status_callback) if status_callback: status_callback("Planning Pages...", 40) raw_moments = [] if has_text and os.path.exists(user_srt): with open(user_srt, 'r', encoding='utf-8') as f: try: for s in list(srt.parse(f.read())): if s.content.strip(): raw_moments.append({'text': s.content.strip(), 'mid': (s.start.total_seconds() + s.end.total_seconds())/2}) except: pass total_panels = int(target_pages) * 4 selected_moments = [] if len(raw_moments) > 0: indices = np.linspace(0, len(raw_moments) - 1, total_panels, dtype=int) for i in indices: selected_moments.append(raw_moments[i]) else: times = np.linspace(1, max(1, duration-1), total_panels) for t in times: selected_moments.append({'text': "...", 'mid': t}) frame_metadata = {} frame_files = [] tasks = [] EXTRACT_W, EXTRACT_H = 1080, 1080 count = 0 for m in selected_moments: fname = f"frame_{count:04d}.png" tasks.append((video_path, m['mid'], os.path.join(frames_dir, fname), EXTRACT_W, EXTRACT_H)) frame_metadata[fname] = {'dialogue': m['text'], 'time': m['mid']} frame_files.append(fname) count += 1 completed = 0 gc.collect() if status_callback: status_callback("Drawing Panels...", 50) with ThreadPoolExecutor(max_workers=WORKER_COUNT) as executor: futures = [executor.submit(extract_frame_task, t) for t in tasks] for _ in as_completed(futures): completed += 1 
if status_callback: prog = 50 + int((completed / len(tasks)) * 45) status_callback(f"Drawing Panels: {completed}/{len(tasks)}", prog) with open(metadata_path, 'w') as f: json.dump(frame_metadata, f, indent=2) bubbles_list = [] for i, f in enumerate(frame_files): dialogue = frame_metadata.get(f, {}).get('dialogue', '...') b_type = 'speech' if '(' in dialogue or '[' in dialogue: b_type = 'narration' elif '!' in dialogue and dialogue.isupper() and len(dialogue) < 15: b_type = 'reaction' elif dialogue == "...": b_type = 'thought' pos_idx = i % 4 bx, by = (150, 80) if pos_idx == 0 else (580, 80) if pos_idx == 1 else (150, 600) if pos_idx == 2 else (580, 600) bubbles_list.append(bubble(dialog=dialogue, x=bx, y=by, type=b_type)) pages = [] for i in range(int(target_pages)): start, end = i * 4, (i + 1) * 4 p_frames = frame_files[start:end] p_bubbles = bubbles_list[start:end] while len(p_frames) < 4: fname = f"empty_{i}_{len(p_frames)}.png" cv2.imwrite(os.path.join(frames_dir, fname), np.zeros((EXTRACT_H, EXTRACT_W, 3), dtype=np.uint8)) p_frames.append(fname) p_bubbles.append(bubble(dialog="...", x=-999, y=-999, type='speech')) if p_frames: pg_panels = [panel(image=p_frames[j], time=frame_metadata.get(p_frames[j], {}).get('time', 0)) for j in range(len(p_frames))] pages.append(Page(panels=pg_panels, bubbles=p_bubbles)) result = [] for pg in pages: p_data = [p if isinstance(p, dict) else p.__dict__ for p in pg.panels] b_data = [b if isinstance(b, dict) else b.__dict__ for b in pg.bubbles] result.append({'panels': p_data, 'bubbles': b_data}) return result # ====================================================== # 🔧 HELPER FUNCTIONS # ====================================================== def regen_frame_cpu(video_path, frames_dir, metadata_path, fname, direction): if not os.path.exists(metadata_path): return {"success": False, "message": "No metadata"} with open(metadata_path, 'r') as f: meta = json.load(f) if fname not in meta: return {"success": False, "message": "Frame not 
linked"} t = meta[fname]['time'] if isinstance(meta[fname], dict) else meta[fname] cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) or 25 new_t = max(0, t + (1.0/fps) * (10 if direction == 'forward' else -10)) cap.set(cv2.CAP_PROP_POS_MSEC, new_t * 1000) ret, frame = cap.read() cap.release() if ret: cv2.imwrite(os.path.join(frames_dir, fname), resize_contain_hq(frame, 1080, 1080)) if isinstance(meta[fname], dict): meta[fname]['time'] = new_t else: meta[fname] = new_t with open(metadata_path, 'w') as f: json.dump(meta, f, indent=2) return {"success": True, "message": f"Time: {new_t:.2f}s", "new_time": new_t} return {"success": False} def get_frame_at_ts_cpu(video_path, frames_dir, metadata_path, fname, ts): if not os.path.exists(metadata_path): return {"success": False, "message": "No metadata"} with open(metadata_path, 'r') as f: meta = json.load(f) if fname not in meta: return {"success": False, "message": "Frame not linked"} cap = cv2.VideoCapture(video_path) cap.set(cv2.CAP_PROP_POS_MSEC, float(ts) * 1000) ret, frame = cap.read() cap.release() if ret: cv2.imwrite(os.path.join(frames_dir, fname), resize_contain_hq(frame, 1080, 1080)) if isinstance(meta[fname], dict): meta[fname]['time'] = float(ts) else: meta[fname] = float(ts) with open(metadata_path, 'w') as f: json.dump(meta, f, indent=2) return {"success": True, "message": f"Jumped to {ts}s", "new_time": float(ts)} return {"success": False} class EnhancedComicGenerator: def __init__(self, sid): self.sid = sid self.user_dir = os.path.join(BASE_USER_DIR, sid) self.video_path = os.path.join(self.user_dir, 'uploaded.mp4') self.frames_dir = os.path.join(self.user_dir, 'frames') self.output_dir = os.path.join(self.user_dir, 'output') os.makedirs(self.frames_dir, exist_ok=True) os.makedirs(self.output_dir, exist_ok=True) self.metadata_path = os.path.join(self.frames_dir, 'frame_metadata.json') def cleanup(self): if os.path.exists(self.frames_dir): shutil.rmtree(self.frames_dir) if 
os.path.exists(self.output_dir): shutil.rmtree(self.output_dir) os.makedirs(self.frames_dir, exist_ok=True) os.makedirs(self.output_dir, exist_ok=True) def run(self, target_pages): try: self.write_status("Initializing...", 1) data = generate_comic_smart(self.video_path, self.user_dir, self.frames_dir, self.metadata_path, int(target_pages), status_callback=self.write_status) with open(os.path.join(self.output_dir, 'pages.json'), 'w') as f: json.dump(data, f, indent=2) self.write_status("Complete!", 100) except Exception as e: traceback.print_exc() self.write_status(f"Error: {str(e)}", -1) def write_status(self, msg, prog): try: with open(os.path.join(self.output_dir, 'status.tmp'), 'w') as f: json.dump({'message': msg, 'progress': prog}, f) os.replace(os.path.join(self.output_dir, 'status.tmp'), os.path.join(self.output_dir, 'status.json')) except: pass # ====================================================== # 🌐 FRONTEND # ====================================================== INDEX_HTML = r'''
AI-Powered Comic Generation Suite
EXISTING PROJECT?
')
def load_comic(code):
    """Restore a saved comic by share code.

    Copies the saved frames back into the original session's working
    directory and returns the stored page layout as JSON.
    """
    code = code.upper()
    # Codes are generated from [A-Z0-9] only; rejecting anything else also
    # blocks path traversal via this untrusted URL parameter.
    if not code.isalnum():
        return jsonify({'success': False, 'message': 'Code not found'})
    save_dir = os.path.join(SAVED_COMICS_DIR, code)
    if not os.path.exists(save_dir):
        return jsonify({'success': False, 'message': 'Code not found'})
    try:
        with open(os.path.join(save_dir, 'comic_state.json'), 'r') as f:
            data = json.load(f)
        orig_sid = data['originalSid']
        saved_frames = os.path.join(save_dir, 'frames')
        user_frames = os.path.join(BASE_USER_DIR, orig_sid, 'frames')
        os.makedirs(user_frames, exist_ok=True)
        for fn in os.listdir(saved_frames):
            shutil.copy2(os.path.join(saved_frames, fn),
                         os.path.join(user_frames, fn))
        return jsonify({'success': True, 'originalSid': orig_sid,
                        'pages': data['pages']})
    except Exception as e:
        # Boundary handler: surface the error to the client instead of a 500.
        return jsonify({'success': False, 'message': str(e)})
if __name__ == '__main__':
    # Bind on all interfaces; port 7860 is the conventional container/Spaces port.
    app.run(host='0.0.0.0', port=7860)