import asyncio
import json
import os
import shutil
import tempfile
import time
import traceback
import wave
from typing import List, Dict, Optional, Any

from google import genai
from google.genai import types
from moviepy import ImageClip, AudioFileClip, VideoFileClip, concatenate_videoclips
from openai import AsyncOpenAI
from pdf2image import convert_from_path
from PIL import Image

from core.config import settings
from core.prompts import get_video_script_prompt
from services.s3_service import s3_service


class VideoGeneratorService:
    """Turn a PDF stored in S3 into a narrated slide-show video.

    The pipeline is: download PDF -> render pages to PNG -> ask OpenAI for a
    per-page narration script -> synthesize speech with Gemini TTS ->
    assemble the pages and audio with MoviePy -> upload the MP4 to S3.
    """

    # Rendering / encoding parameters used by the pipeline.
    PDF_RENDER_DPI = 200
    TARGET_SIZE = (1920, 1080)
    SILENT_PAGE_SECONDS = 3.0
    VIDEO_FPS = 24

    # Parameters of the WAV container written around the TTS payload.
    # NOTE(review): assumes the TTS response is raw 16-bit mono PCM at
    # 24 kHz -- confirm against the Gemini TTS documentation.
    TTS_CHANNELS = 1
    TTS_SAMPLE_WIDTH = 2
    TTS_FRAME_RATE = 24000

    TTS_PRIMARY_MODEL = "gemini-2.5-flash-preview-tts"
    # NOTE(review): confirm this fallback model actually supports the AUDIO
    # response modality; if it does not, the fallback can never succeed.
    TTS_FALLBACK_MODEL = "gemini-1.5-flash"

    def __init__(self):
        self.openai_client = AsyncOpenAI(api_key=settings.OPENAI_API_KEY)
        # Match Temp project: Use API Key for Gemini TTS
        print("[VideoGenerator] Initializing Gemini Client with API Key")
        self.gemini_client = genai.Client(api_key=settings.GEMINI_API_KEY)

    async def generate_video_summary(
        self,
        file_key: str,
        language: str = "Japanese",
        voice_name: str = "Kore"
    ) -> Dict[str, Any]:
        """
        Complete pipeline: PDF -> Script -> Audio -> Images -> Video -> S3

        Args:
            file_key: S3 object key of the source PDF inside
                ``settings.AWS_S3_BUCKET``.
            language: Narration language passed to the script prompt.
            voice_name: Name of the prebuilt Gemini TTS voice.

        Returns:
            A dict with the keys ``title``, ``s3_key`` and ``s3_url``
            describing the uploaded video.

        Raises:
            Exception: Any error raised by a pipeline stage is logged and
                re-raised unchanged. The temporary working directory is
                removed in every case.
        """
        temp_dir = tempfile.mkdtemp(prefix="video_gen_")
        try:
            # 1. Download PDF from S3
            print(f"[VideoGenerator] Starting generation for: {file_key}")
            pdf_path = os.path.join(temp_dir, "input.pdf")
            await asyncio.to_thread(
                s3_service.s3_client.download_file,
                settings.AWS_S3_BUCKET,
                file_key,
                pdf_path,
            )

            # 2. Convert PDF to Images (rendering and PNG encoding are both
            # CPU/disk bound, so the whole step runs off the event loop).
            image_paths = await asyncio.to_thread(
                self._render_pdf_pages,
                pdf_path,
                os.path.join(temp_dir, "images"),
            )
            total_pages = len(image_paths)

            # 3. Generate Narration Script (Native Async OpenAI)
            scripts = await self._generate_scripts(pdf_path, language, total_pages)

            # 4. Generate Audio (Native Async Gemini)
            audio_by_page = await self._synthesize_audio(
                scripts,
                os.path.join(temp_dir, "audio"),
                voice_name,
            )

            # 5. Assembly (MoviePy in Thread)
            print("[VideoGenerator] Assembled audio/images. Now rendering final video with MoviePy (background thread)...")
            final_video_path = os.path.join(temp_dir, "final.mp4")
            await asyncio.to_thread(
                self._render_video,
                image_paths,
                audio_by_page,
                temp_dir,
                final_video_path,
            )

            # 6. Upload to S3
            # NOTE(review): the key only contains a second-resolution
            # timestamp, so two runs finishing in the same second would
            # overwrite each other -- consider adding a unique suffix.
            timestamp = int(time.time())
            s3_key = f"users/video_summaries/{timestamp}_summary.mp4"
            print(f"[VideoGenerator] Uploading final video to S3: {s3_key}")
            await asyncio.to_thread(
                s3_service.s3_client.upload_file,
                final_video_path,
                settings.AWS_S3_BUCKET,
                s3_key,
            )

            s3_url = f"https://{settings.AWS_S3_BUCKET}.s3.{settings.AWS_REGION}.amazonaws.com/{s3_key}"
            print(f"[VideoGenerator] Success! Video ready at: {s3_url}")

            return {
                "title": f"Video Summary - {os.path.basename(file_key)}",
                "s3_key": s3_key,
                "s3_url": s3_url
            }
        except Exception as e:
            print(f"[VideoGenerator] ERROR: {str(e)}")
            traceback.print_exc()
            raise
        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)

    def _render_pdf_pages(self, pdf_path: str, image_dir: str) -> List[str]:
        """Render every PDF page to ``page_NN.png`` and return the paths in page order."""
        os.makedirs(image_dir, exist_ok=True)

        print("[VideoGenerator] Converting PDF to images...")
        convert_kwargs: Dict[str, Any] = {"dpi": self.PDF_RENDER_DPI}
        # Only pass poppler_path when it is configured so that pdf2image
        # otherwise falls back to its own default lookup.
        poppler_path = os.environ.get("POPPLER_PATH")
        if poppler_path:
            convert_kwargs["poppler_path"] = poppler_path
        images = convert_from_path(pdf_path, **convert_kwargs)

        image_paths = []
        for page_num, img in enumerate(images, start=1):
            img_path = os.path.join(image_dir, f"page_{page_num:02d}.png")
            img.save(img_path, "PNG")
            image_paths.append(img_path)
        return image_paths

    async def _generate_scripts(
        self, pdf_path: str, language: str, total_pages: int
    ) -> List[Dict[str, Any]]:
        """Upload the PDF to OpenAI and return the ``scripts`` list of the JSON reply."""
        print(f"[VideoGenerator] Generating script with OpenAI for {total_pages} pages...")
        with open(pdf_path, "rb") as f:
            content = f.read()

        openai_file = await self.openai_client.files.create(
            file=("source.pdf", content),
            purpose="assistants"
        )
        try:
            prompt = get_video_script_prompt(language, total_pages)
            response = await self.openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": prompt},
                            {"type": "file", "file": {"file_id": openai_file.id}}
                        ]
                    }
                ],
                response_format={"type": "json_object"},
                temperature=0.3
            )
            script_data = json.loads(response.choices[0].message.content)
            return script_data.get("scripts", [])
        finally:
            # Always remove the uploaded file, including when the completion
            # or the JSON parsing failed. Cleanup is best-effort so that it
            # can neither mask the original error nor discard a good script.
            try:
                await self.openai_client.files.delete(openai_file.id)
            except Exception as cleanup_err:
                print(f"[VideoGenerator] WARNING: could not delete OpenAI file {openai_file.id}: {cleanup_err}")

    async def _synthesize_audio(
        self, scripts: List[Dict[str, Any]], audio_dir: str, voice_name: str
    ) -> Dict[int, str]:
        """Generate one WAV per narrated page and return ``{page_number: wav_path}``.

        The result is keyed by page number rather than collected in a list:
        pages without text (and the final slide) are skipped, and a
        positional list would pair later pages with the wrong narration.
        """
        os.makedirs(audio_dir, exist_ok=True)
        audio_by_page: Dict[int, str] = {}
        last_index = len(scripts) - 1

        for i, script in enumerate(scripts):
            if i == last_index:
                print("[VideoGenerator] Skipping audio for last page (logo slide)")
                continue

            page_num = script.get("page_number", i + 1)
            text = script.get("script_text", "")
            if not text:
                continue

            audio_path = os.path.join(audio_dir, f"audio_{page_num:02d}.wav")
            print(f"[VideoGenerator] Generating TTS for page {page_num}...")

            tts_resp = await self._request_tts(text, voice_name)
            audio_bytes = tts_resp.candidates[0].content.parts[0].inline_data.data
            await asyncio.to_thread(self._write_wav, audio_path, audio_bytes)
            audio_by_page[page_num] = audio_path

            # Non-blocking pause between requests (presumably to stay under
            # the TTS rate limit -- TODO confirm).
            await asyncio.sleep(2)

        return audio_by_page

    async def _request_tts(self, text: str, voice_name: str) -> Any:
        """Request speech for ``text``, retrying once with the fallback model on any error."""
        tts_config = types.GenerateContentConfig(
            response_modalities=["AUDIO"],
            speech_config=types.SpeechConfig(
                voice_config=types.VoiceConfig(
                    prebuilt_voice_config=types.PrebuiltVoiceConfig(
                        voice_name=voice_name
                    )
                )
            )
        )
        try:
            # Use Native Async Gemini
            return await self.gemini_client.aio.models.generate_content(
                model=self.TTS_PRIMARY_MODEL,
                contents=text,
                config=tts_config,
            )
        except Exception as tts_err:
            print(f"[VideoGenerator] TTS Primary failed, using fallback: {tts_err}")
            return await self.gemini_client.aio.models.generate_content(
                model=self.TTS_FALLBACK_MODEL,
                contents=text,
                config=tts_config,
            )

    def _write_wav(self, audio_path: str, audio_bytes: bytes) -> None:
        """Write the TTS payload to ``audio_path`` inside a WAV container."""
        with wave.open(audio_path, "wb") as wf:
            wf.setnchannels(self.TTS_CHANNELS)
            wf.setsampwidth(self.TTS_SAMPLE_WIDTH)
            wf.setframerate(self.TTS_FRAME_RATE)
            wf.writeframes(audio_bytes)

    def _render_video(
        self,
        image_paths: List[str],
        audio_by_page: Dict[int, str],
        temp_dir: str,
        output_path: str,
    ) -> None:
        """Build one clip per page and encode them into ``output_path``.

        Blocking; intended to be run in a worker thread. Pages that have an
        entry in ``audio_by_page`` last as long as their narration, all
        other pages are shown for ``SILENT_PAGE_SECONDS``.
        """
        page_clips = []
        audio_clips = []
        final_clip = None
        try:
            for i, img_path in enumerate(image_paths):
                # The padded frame is a new image, so the source file handle
                # can be released right away.
                with Image.open(img_path) as img:
                    frame = self._resize_and_pad(img, self.TARGET_SIZE)
                temp_img_res = os.path.join(temp_dir, f"res_{i}.png")
                frame.save(temp_img_res)

                # Page numbers are 1-based while ``i`` is 0-based.
                audio_path = audio_by_page.get(i + 1)
                if audio_path is not None:
                    aud_clip = AudioFileClip(audio_path)
                    audio_clips.append(aud_clip)
                    img_clip = ImageClip(temp_img_res, duration=aud_clip.duration)
                    page_clips.append(img_clip.with_audio(aud_clip))
                else:
                    page_clips.append(
                        ImageClip(temp_img_res, duration=self.SILENT_PAGE_SECONDS)
                    )

            final_clip = concatenate_videoclips(page_clips, method="compose")
            final_clip.write_videofile(
                output_path,
                fps=self.VIDEO_FPS,
                codec="libx264",
                audio_codec="aac",
                logger=None
            )
        finally:
            # Release clip resources on failure as well. Audio clips are
            # closed explicitly instead of relying on the image clip that
            # wraps them to do so.
            for clip in page_clips:
                clip.close()
            for aud_clip in audio_clips:
                aud_clip.close()
            if final_clip is not None:
                final_clip.close()

    def _resize_and_pad(self, img: Image.Image, size: tuple) -> Image.Image:
        """Fit ``img`` inside ``size`` keeping its aspect ratio, centered on black padding.

        The image is only ever scaled down (``thumbnail`` semantics); a
        smaller image is centered at its original size. The input image is
        left untouched.
        """
        # thumbnail() works in place, so operate on a copy to avoid
        # modifying the caller's image.
        fitted = img.copy()
        fitted.thumbnail(size, Image.Resampling.LANCZOS)

        canvas = Image.new("RGB", size, (0, 0, 0))
        offset = (
            (size[0] - fitted.size[0]) // 2,
            (size[1] - fitted.size[1]) // 2,
        )
        canvas.paste(fitted, offset)
        return canvas


video_generator_service = VideoGeneratorService()