| | import json |
| | import asyncio |
| | import os |
| | import tempfile |
| | import time |
| | import shutil |
| | from typing import List, Dict, Optional, Any |
| | import wave |
| |
|
| | from openai import AsyncOpenAI |
| | from google.genai import types |
| | from PIL import Image |
| | from pdf2image import convert_from_path |
| | from moviepy import ImageClip, AudioFileClip, VideoFileClip, concatenate_videoclips |
| |
|
| | from core.config import settings |
| | from core.prompts import get_video_script_prompt |
| | from services.s3_service import s3_service |
| | from google import genai |
| |
|
class VideoGeneratorService:
    """Builds a narrated video summary from a PDF stored in S3.

    Pipeline: download PDF -> rasterize pages (pdf2image/poppler) ->
    generate a per-page narration script (OpenAI) -> synthesize speech per
    page (Gemini TTS) -> assemble a slideshow video (MoviePy) -> upload the
    final MP4 back to S3.
    """

    def __init__(self):
        # OpenAI drives script generation; Gemini drives text-to-speech.
        self.openai_client = AsyncOpenAI(api_key=settings.OPENAI_API_KEY)
        print("[VideoGenerator] Initializing Gemini Client with API Key")
        self.gemini_client = genai.Client(api_key=settings.GEMINI_API_KEY)

    async def generate_video_summary(
        self,
        file_key: str,
        language: str = "Japanese",
        voice_name: str = "Kore"
    ) -> Dict[str, Any]:
        """
        Complete pipeline: PDF -> Script -> Audio -> Images -> Video -> S3

        Args:
            file_key: S3 key of the source PDF in settings.AWS_S3_BUCKET.
            language: Language the narration script should be written in.
            voice_name: Gemini prebuilt TTS voice name.

        Returns:
            Dict with "title", "s3_key" and "s3_url" of the uploaded MP4.

        Raises:
            Re-raises any pipeline failure after logging a traceback; the
            temp working directory is always removed.
        """
        temp_dir = tempfile.mkdtemp(prefix="video_gen_")
        try:
            print(f"[VideoGenerator] Starting generation for: {file_key}")
            pdf_path = os.path.join(temp_dir, "input.pdf")
            # boto3 is blocking; run the download off the event loop.
            await asyncio.to_thread(
                s3_service.s3_client.download_file,
                settings.AWS_S3_BUCKET, file_key, pdf_path
            )

            image_paths = await self._pdf_to_images(
                pdf_path, os.path.join(temp_dir, "images")
            )
            scripts = await self._generate_scripts(
                pdf_path, language, len(image_paths)
            )
            audio_by_page = await self._generate_audio(
                scripts, os.path.join(temp_dir, "audio"), voice_name
            )
            final_video_path = await self._render_video(
                image_paths, audio_by_page, temp_dir
            )
            s3_key, s3_url = await self._upload_video(final_video_path)

            print(f"[VideoGenerator] Success! Video ready at: {s3_url}")
            return {
                "title": f"Video Summary - {os.path.basename(file_key)}",
                "s3_key": s3_key,
                "s3_url": s3_url
            }
        except Exception as e:
            print(f"[VideoGenerator] ERROR: {str(e)}")
            import traceback
            traceback.print_exc()
            raise
        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)

    async def _pdf_to_images(self, pdf_path: str, image_dir: str) -> List[str]:
        """Rasterize each PDF page to a numbered PNG; return paths in page order."""
        os.makedirs(image_dir, exist_ok=True)
        print("[VideoGenerator] Converting PDF to images...")
        # Optional explicit poppler binary location (needed on e.g. Windows).
        kwargs: Dict[str, Any] = {"dpi": 200}
        poppler_path = os.environ.get("POPPLER_PATH")
        if poppler_path:
            kwargs["poppler_path"] = poppler_path
        images = await asyncio.to_thread(convert_from_path, pdf_path, **kwargs)

        image_paths = []
        for i, img in enumerate(images, start=1):
            img_path = os.path.join(image_dir, f"page_{i:02d}.png")
            img.save(img_path, "PNG")
            image_paths.append(img_path)
        return image_paths

    async def _generate_scripts(
        self, pdf_path: str, language: str, total_pages: int
    ) -> List[Dict[str, Any]]:
        """Ask OpenAI for a per-page narration script (JSON: {"scripts": [...]})."""
        print(f"[VideoGenerator] Generating script with OpenAI for {total_pages} pages...")
        with open(pdf_path, "rb") as f:
            content = f.read()

        openai_file = await self.openai_client.files.create(
            file=("source.pdf", content),
            purpose="assistants"
        )
        try:
            prompt = get_video_script_prompt(language, total_pages)
            response = await self.openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": prompt},
                            {"type": "file", "file": {"file_id": openai_file.id}}
                        ]
                    }
                ],
                response_format={"type": "json_object"},
                temperature=0.3
            )
            script_data = json.loads(response.choices[0].message.content)
            return script_data.get("scripts", [])
        finally:
            # BUGFIX: the original deleted the uploaded file only on success,
            # leaking it whenever the chat call raised.
            await self.openai_client.files.delete(openai_file.id)

    async def _generate_audio(
        self, scripts: List[Dict[str, Any]], audio_dir: str, voice_name: str
    ) -> Dict[int, str]:
        """Synthesize narration WAVs; return {zero-based page index: wav path}.

        The last script entry (logo slide) and pages with empty script text
        are intentionally left silent.
        """
        os.makedirs(audio_dir, exist_ok=True)
        audio_by_page: Dict[int, str] = {}

        for i, script in enumerate(scripts):
            if i == len(scripts) - 1:
                print("[VideoGenerator] Skipping audio for last page (logo slide)")
                continue

            page_num = script.get("page_number", i + 1)
            text = script.get("script_text", "")
            if not text:
                continue

            audio_path = os.path.join(audio_dir, f"audio_{page_num:02d}.wav")
            print(f"[VideoGenerator] Generating TTS for page {page_num}...")
            audio_bytes = await self._synthesize_speech(text, voice_name)

            # Gemini TTS returns raw 16-bit mono PCM at 24 kHz; wrap as WAV.
            with wave.open(audio_path, "wb") as wf:
                wf.setnchannels(1)
                wf.setsampwidth(2)
                wf.setframerate(24000)
                wf.writeframes(audio_bytes)

            # BUGFIX: key by page index instead of appending to a positional
            # list — a skipped (empty-script) middle page no longer shifts
            # every later narration onto the wrong slide.
            audio_by_page[page_num - 1] = audio_path
            await asyncio.sleep(2)  # crude rate limiting between TTS calls

        return audio_by_page

    async def _synthesize_speech(self, text: str, voice_name: str) -> bytes:
        """Return raw PCM audio for *text*, falling back to a secondary model."""
        # Shared config (previously duplicated verbatim in the fallback path).
        config = types.GenerateContentConfig(
            response_modalities=["AUDIO"],
            speech_config=types.SpeechConfig(
                voice_config=types.VoiceConfig(
                    prebuilt_voice_config=types.PrebuiltVoiceConfig(
                        voice_name=voice_name
                    )
                )
            )
        )
        try:
            tts_resp = await self.gemini_client.aio.models.generate_content(
                model="gemini-2.5-flash-preview-tts",
                contents=text,
                config=config
            )
        except Exception as tts_err:
            # NOTE(review): fallback model "gemini-1.5-flash" may not support
            # AUDIO response modality — confirm against Gemini API docs.
            print(f"[VideoGenerator] TTS Primary failed, using fallback: {tts_err}")
            tts_resp = await self.gemini_client.aio.models.generate_content(
                model="gemini-1.5-flash",
                contents=text,
                config=config
            )
        return tts_resp.candidates[0].content.parts[0].inline_data.data

    async def _render_video(
        self,
        image_paths: List[str],
        audio_by_page: Dict[int, str],
        temp_dir: str
    ) -> str:
        """Compose one clip per page (narrated or 3s silent) into final.mp4."""
        print("[VideoGenerator] Assembled audio/images. Now rendering final video with MoviePy (background thread)...")
        page_clips = []
        target_size = (1920, 1080)

        for i, img_path in enumerate(image_paths):
            padded = self._resize_and_pad(Image.open(img_path), target_size)
            resized_path = os.path.join(temp_dir, f"res_{i}.png")
            padded.save(resized_path)

            audio_path = audio_by_page.get(i)
            if audio_path:
                aud_clip = AudioFileClip(audio_path)
                img_clip = ImageClip(resized_path, duration=aud_clip.duration)
                page_clips.append(img_clip.with_audio(aud_clip))
            else:
                # Silent pages (e.g. trailing logo slide) get a fixed 3s hold.
                page_clips.append(ImageClip(resized_path, duration=3.0))

        final_video_path = os.path.join(temp_dir, "final.mp4")
        final_clip = concatenate_videoclips(page_clips, method="compose")
        # Encoding is CPU-bound and blocking; keep it off the event loop.
        await asyncio.to_thread(
            final_clip.write_videofile,
            final_video_path,
            fps=24,
            codec="libx264",
            audio_codec="aac",
            logger=None
        )
        for clip in page_clips:
            clip.close()
        final_clip.close()
        return final_video_path

    async def _upload_video(self, final_video_path: str) -> tuple:
        """Upload the rendered MP4 to S3; return (s3_key, public URL)."""
        timestamp = int(time.time())
        s3_key = f"users/video_summaries/{timestamp}_summary.mp4"
        print(f"[VideoGenerator] Uploading final video to S3: {s3_key}")
        await asyncio.to_thread(
            s3_service.s3_client.upload_file,
            final_video_path, settings.AWS_S3_BUCKET, s3_key
        )
        s3_url = f"https://{settings.AWS_S3_BUCKET}.s3.{settings.AWS_REGION}.amazonaws.com/{s3_key}"
        return s3_key, s3_url

    def _resize_and_pad(self, img: Image.Image, size: tuple) -> Image.Image:
        """Resizes image to fit in size while maintaining aspect ratio, adding black padding."""
        # thumbnail() shrinks in place preserving aspect ratio; the result is
        # then centered on a black canvas of the exact target size.
        img.thumbnail(size, Image.Resampling.LANCZOS)
        new_img = Image.new("RGB", size, (0, 0, 0))
        new_img.paste(img, ((size[0] - img.size[0]) // 2, (size[1] - img.size[1]) // 2))
        return new_img
| |
|
# Module-level singleton shared by importers of this service; constructing it
# instantiates the OpenAI and Gemini API clients at import time.
video_generator_service = VideoGeneratorService()
| |
|