import gc
import json
import os
import random
import secrets
import shutil
import string
import subprocess
import threading
import time
import traceback
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed

import cv2
import numpy as np
import srt
import whisper
from flask import Flask, jsonify, request, send_from_directory, send_file

# Filter annoying CPU warnings
warnings.filterwarnings("ignore")

# ======================================================
# STORAGE SETUP
# ======================================================
# NOTE(review): the leading characters of the two messages below look like a
# mis-decoded emoji; the literals are left exactly as found.
if os.path.exists('/data'):
    BASE_STORAGE_PATH = '/data'
    print("â Using Persistent Storage at /data")
else:
    BASE_STORAGE_PATH = '.'
    print("â ī¸ Using Ephemeral Storage")

BASE_USER_DIR = os.path.join(BASE_STORAGE_PATH, "userdata")
SAVED_COMICS_DIR = os.path.join(BASE_STORAGE_PATH, "saved_comics")
os.makedirs(BASE_USER_DIR, exist_ok=True)
os.makedirs(SAVED_COMICS_DIR, exist_ok=True)

app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 2 * 1024 * 1024 * 1024  # 2GB Limit


def generate_save_code(length=8):
    """Return a save code that does not yet exist under SAVED_COMICS_DIR.

    The code consists of ``length`` uppercase ASCII letters and digits.
    Candidates are drawn until one has no matching entry in SAVED_COMICS_DIR.

    Raises:
        ValueError: if ``length`` is smaller than 1. An empty code resolves
            to SAVED_COMICS_DIR itself, which always exists, so the search
            would never terminate.
    """
    if length < 1:
        raise ValueError("length must be at least 1")
    chars = string.ascii_uppercase + string.digits
    while True:
        # A save code is all that is needed to load a saved comic, so draw it
        # from the OS CSPRNG rather than the predictable `random` generator.
        code = ''.join(secrets.choice(chars) for _ in range(length))
        if not os.path.exists(os.path.join(SAVED_COMICS_DIR, code)):
            return code


# ======================================================
# DATA CLASSES
# ======================================================
def bubble(dialog="", x=50, y=20, type='speech'):
    """Build the dict describing one speech bubble.

    Args:
        dialog: text shown in the bubble.
        x, y: bubble offsets; stored as ints.
        type: bubble kind. 'speech' and 'reaction' get the " tail-bottom"
            CSS class, 'thought' gets " pos-bl", anything else gets no extra
            class. (The name shadows the builtin but is part of the keyword
            interface used by callers, so it is kept.)

    Returns:
        dict with the dialog, offsets, type, CSS classes and fixed default
        tail position, colors and font settings.
    """
    classes = f"speech-bubble {type}"
    if type in ('speech', 'reaction'):
        classes += " tail-bottom"
    elif type == 'thought':
        classes += " pos-bl"
    return {
        'dialog': dialog,
        'bubble_offset_x': int(x),
        'bubble_offset_y': int(y),
        'type': type,
        'tail_pos': '50%',
        'classes': classes,
        'colors': {'fill': '#ffffff', 'text': '#000000'},
        'font': "'Comic Neue', cursive",
        'font_size': '16px',
    }


def panel(image="", time=0.0):
    """Return the dict describing one panel: its image name and timestamp."""
    return {'image': image, 'time': time}


class Page:
    """One comic page: a list of panels and the list of their bubbles."""

    def __init__(self, panels, bubbles):
        self.panels = panels
        self.bubbles = bubbles


#
====================================================== # đ§ ROBUST GENERATION LOGIC # ====================================================== def generate_subtitles_cpu(video_path, srt_path, status_callback=None): try: if status_callback: status_callback("Extracting Audio (FFmpeg)...", 10) audio_path = video_path.replace(".mp4", ".wav") try: command = [ "ffmpeg", "-y", "-i", video_path, "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", audio_path ] subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True) except FileNotFoundError: print("â FFmpeg not found.") return False except Exception as e: print(f"â Audio extraction failed: {e}") return False if status_callback: status_callback("Loading AI Model (Base)...", 15) model = whisper.load_model("base", device="cpu") if status_callback: status_callback("Transcribing Speech...", 20) result = model.transcribe(audio_path, fp16=False, language='en') segments = result['segments'] with open(srt_path, 'w', encoding='utf-8') as f: for i, segment in enumerate(segments): start = segment['start'] end = segment['end'] text = segment['text'].strip() def fmt_time(t): hours = int(t // 3600) minutes = int((t % 3600) // 60) seconds = int(t % 60) milliseconds = int((t % 1) * 1000) return f"{hours:02d}:{minutes:02d}:{seconds:02d},{milliseconds:03d}" f.write(f"{i + 1}\n") f.write(f"{fmt_time(start)} --> {fmt_time(end)}\n") f.write(f"{text}\n\n") if os.path.exists(audio_path): os.remove(audio_path) return True except Exception as e: print(f"Transcription Failed: {e}") traceback.print_exc() return False def extract_frame_task(args): video_path, time_sec, output_path, width, height = args try: cap = cv2.VideoCapture(video_path) cap.set(cv2.CAP_PROP_POS_MSEC, time_sec * 1000) ret, frame = cap.read() cap.release() if ret: frame = cv2.resize(frame, (width, height)) cv2.imwrite(output_path, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 90]) return True return False except Exception: return False def 
generate_comic_smart(video_path, user_dir, frames_dir, metadata_path, target_pages, status_callback=None): WORKER_COUNT = 4 print(f"đ Starting Smart Generation for {user_dir}") if status_callback: status_callback("Analyzing Video...", 5) cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise Exception("Cannot open video") fps = cap.get(cv2.CAP_PROP_FPS) or 25 total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) duration = total_frames / fps cap.release() user_srt = os.path.join(user_dir, 'subs.srt') has_text = generate_subtitles_cpu(video_path, user_srt, status_callback) if status_callback: status_callback("Planning Pages...", 25) raw_moments = [] if has_text and os.path.exists(user_srt): with open(user_srt, 'r', encoding='utf-8') as f: try: subs = list(srt.parse(f.read())) valid_subs = [s for s in subs if s.content and s.content.strip() and s.content.strip() != "..."] for s in valid_subs: start = s.start.total_seconds() end = s.end.total_seconds() raw_moments.append({'text': s.content.strip(), 'start': start, 'end': end, 'mid': (start+end)/2}) except Exception as e: print(f"SRT Parse Error: {e}") panels_per_page = 4 total_panels_needed = int(target_pages) * panels_per_page selected_moments = [] if len(raw_moments) > 0: indices = np.linspace(0, len(raw_moments) - 1, total_panels_needed, dtype=int) for i in indices: selected_moments.append(raw_moments[i]) else: print("â ī¸ No speech detected or FFmpeg missing, using visual spacing.") times = np.linspace(1, max(1, duration-1), total_panels_needed) for t in times: selected_moments.append({'text': "...", 'start': t, 'end': t+1, 'mid': t}) frame_metadata = {} frame_files_ordered = [] tasks = [] count = 0 for m in selected_moments: fname = f"frame_{count:04d}.jpg" out_p = os.path.join(frames_dir, fname) tasks.append((video_path, m['mid'], out_p, 1920, 1080)) frame_metadata[fname] = {'dialogue': m['text'], 'time': m['mid']} frame_files_ordered.append(fname) count += 1 total_tasks = len(tasks) completed = 0 
start_time = time.time() gc.collect() if status_callback: status_callback("Drawing Panels...", 35) with ThreadPoolExecutor(max_workers=WORKER_COUNT) as executor: futures = [executor.submit(extract_frame_task, t) for t in tasks] for _ in as_completed(futures): completed += 1 if status_callback: elapsed = time.time() - start_time prog = 35 + int((completed / total_tasks) * 60) status_callback(f"Drawing Panels: {completed}/{total_tasks}", prog) with open(metadata_path, 'w') as f: json.dump(frame_metadata, f, indent=2) bubbles_list = [] for i, f in enumerate(frame_files_ordered): dialogue = frame_metadata.get(f, {}).get('dialogue', '...') b_type = 'speech' if '(' in dialogue or '[' in dialogue: b_type = 'narration' elif '!' in dialogue and dialogue.isupper() and len(dialogue) < 15: b_type = 'reaction' elif dialogue == "...": b_type = 'thought' pos_idx = i % 4 if pos_idx == 0: bx, by = 150, 80 elif pos_idx == 1: bx, by = 580, 80 elif pos_idx == 2: bx, by = 150, 600 elif pos_idx == 3: bx, by = 580, 600 else: bx, by = 50, 50 bubbles_list.append(bubble(dialog=dialogue, x=bx, y=by, type=b_type)) pages = [] for i in range(int(target_pages)): start_idx = i * 4 end_idx = start_idx + 4 p_frames = frame_files_ordered[start_idx:end_idx] p_times = [frame_metadata[f]['time'] for f in p_frames] p_bubbles = bubbles_list[start_idx:end_idx] while len(p_frames) < 4: fname = f"empty_{i}_{len(p_frames)}.jpg" img = np.zeros((1080, 1920, 3), dtype=np.uint8); img[:] = (30,30,30) cv2.imwrite(os.path.join(frames_dir, fname), img, [int(cv2.IMWRITE_JPEG_QUALITY), 90]) p_frames.append(fname) p_times.append(0.0) p_bubbles.append(bubble(dialog="...", x=-999, y=-999, type='speech')) if p_frames: pg_panels = [panel(image=p_frames[j], time=p_times[j]) for j in range(len(p_frames))] pages.append(Page(panels=pg_panels, bubbles=p_bubbles)) result = [] for pg in pages: p_data = [p if isinstance(p, dict) else p.__dict__ for p in pg.panels] b_data = [b if isinstance(b, dict) else b.__dict__ for b in 
pg.bubbles] result.append({'panels': p_data, 'bubbles': b_data}) return result # ====================================================== # đ§ HELPER FUNCTIONS # ====================================================== def regen_frame_cpu(video_path, frames_dir, metadata_path, fname, direction): if not os.path.exists(metadata_path): return {"success": False, "message": "No metadata"} with open(metadata_path, 'r') as f: meta = json.load(f) t = meta[fname]['time'] if isinstance(meta[fname], dict) else meta[fname] cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) or 25 offset = (1.0/fps) * (10 if direction == 'forward' else -10) new_t = max(0, t + offset) cap.set(cv2.CAP_PROP_POS_MSEC, new_t * 1000) ret, frame = cap.read() cap.release() if ret: frame = cv2.resize(frame, (1920, 1080)) cv2.imwrite(os.path.join(frames_dir, fname), frame, [int(cv2.IMWRITE_JPEG_QUALITY), 90]) if isinstance(meta[fname], dict): meta[fname]['time'] = new_t else: meta[fname] = new_t with open(metadata_path, 'w') as f: json.dump(meta, f, indent=2) return {"success": True, "message": f"Time: {new_t:.2f}s", "new_time": new_t} return {"success": False} def get_frame_at_ts_cpu(video_path, frames_dir, metadata_path, fname, ts): cap = cv2.VideoCapture(video_path) cap.set(cv2.CAP_PROP_POS_MSEC, float(ts) * 1000) ret, frame = cap.read() cap.release() if ret: frame = cv2.resize(frame, (1920, 1080)) cv2.imwrite(os.path.join(frames_dir, fname), frame, [int(cv2.IMWRITE_JPEG_QUALITY), 90]) if os.path.exists(metadata_path): with open(metadata_path, 'r') as f: meta = json.load(f) if fname in meta: if isinstance(meta[fname], dict): meta[fname]['time'] = float(ts) else: meta[fname] = float(ts) with open(metadata_path, 'w') as f: json.dump(meta, f, indent=2) return {"success": True, "message": f"Jumped to {ts}s", "new_time": float(ts)} return {"success": False, "message": "Invalid timestamp"} class EnhancedComicGenerator: def __init__(self, sid): self.sid = sid self.user_dir = 
os.path.join(BASE_USER_DIR, sid) self.video_path = os.path.join(self.user_dir, 'uploaded.mp4') self.frames_dir = os.path.join(self.user_dir, 'frames') self.output_dir = os.path.join(self.user_dir, 'output') os.makedirs(self.frames_dir, exist_ok=True) os.makedirs(self.output_dir, exist_ok=True) self.metadata_path = os.path.join(self.frames_dir, 'frame_metadata.json') def cleanup(self): if os.path.exists(self.frames_dir): shutil.rmtree(self.frames_dir) if os.path.exists(self.output_dir): shutil.rmtree(self.output_dir) os.makedirs(self.frames_dir, exist_ok=True) os.makedirs(self.output_dir, exist_ok=True) def run(self, target_pages): try: self.write_status("Initializing...", 1) data = generate_comic_smart( self.video_path, self.user_dir, self.frames_dir, self.metadata_path, int(target_pages), status_callback=self.write_status ) with open(os.path.join(self.output_dir, 'pages.json'), 'w') as f: json.dump(data, f, indent=2) self.write_status("Complete!", 100) except Exception as e: traceback.print_exc() self.write_status(f"Error: {str(e)}", -1) def write_status(self, msg, prog): temp_path = os.path.join(self.output_dir, 'status.tmp') final_path = os.path.join(self.output_dir, 'status.json') try: with open(temp_path, 'w') as f: json.dump({'message': msg, 'progress': prog}, f) os.replace(temp_path, final_path) except: pass # ====================================================== # đ FRONTEND # ====================================================== INDEX_HTML = '''
')
def load_comic_route(code):
    """Restore a saved comic into a new session and return its pages as JSON.

    The saved comic lives in ``<SAVED_COMICS_DIR>/<code>``. Its ``frames``
    directory (when present) is copied into a new session directory
    ``<BASE_USER_DIR>/restored_<code>_<unix time>``, and a placeholder
    ``output/pages.json`` is created there so status checks find the
    directory.

    NOTE(review): presumably registered as a Flask route by a decorator that
    is cut off just above this function; confirm against the full file.

    Args:
        code: save code identifying the comic.

    Returns:
        ``jsonify`` response: ``{'success': True, 'originalSid', 'pages'}``
        on success, ``{'success': False, 'message'}`` otherwise.
    """
    not_found = {'success': False, 'message': 'Comic not found'}

    # The code becomes a path component; refuse anything that could leave
    # SAVED_COMICS_DIR ("..", separators, empty).
    if not code or code in ('.', '..') or os.path.basename(code) != code:
        return jsonify(not_found)

    save_dir = os.path.join(SAVED_COMICS_DIR, code)
    data_path = os.path.join(save_dir, 'comic_data.json')
    if not os.path.isfile(data_path):
        return jsonify(not_found)

    try:
        with open(data_path, 'r') as f:
            data = json.load(f)
    except (OSError, ValueError):
        return jsonify({'success': False, 'message': 'Comic data is unreadable'})
    pages = data.get('pages', []) if isinstance(data, dict) else []

    # Restore to a new active session or existing one?
    # For simplicity, we just copy frames back to a "restored" session ID.
    base_sid = f"restored_{code}_{int(time.time())}"
    new_sid = base_sid
    attempt = 0
    # Two loads within the same second would share a session id and make
    # copytree fail on the existing frames directory; pick a free name.
    while os.path.exists(os.path.join(BASE_USER_DIR, new_sid)):
        attempt += 1
        new_sid = f"{base_sid}_{attempt}"
    new_user_dir = os.path.join(BASE_USER_DIR, new_sid)
    os.makedirs(new_user_dir, exist_ok=True)

    saved_frames = os.path.join(save_dir, 'frames')
    if os.path.exists(saved_frames):
        shutil.copytree(saved_frames, os.path.join(new_user_dir, 'frames'))

    # We need to make sure the output dir exists for status checks (dummy).
    output_dir = os.path.join(new_user_dir, 'output')
    os.makedirs(output_dir, exist_ok=True)
    with open(os.path.join(output_dir, 'pages.json'), 'w') as f:
        f.write("[]")  # Placeholder

    return jsonify({'success': True, 'originalSid': new_sid, 'pages': pages})
# Script entry point: serve the Flask app on all interfaces, port 7860,
# only when this file is executed directly (not when it is imported).
if __name__ == '__main__':
    app.run(
        host='0.0.0.0',
        port=7860,
    )