# creatorstudio-ai-backend-develop/services/video_generator_service.py
import json
import asyncio
import os
import tempfile
import time
import shutil
from typing import List, Dict, Optional, Any
import wave
from openai import AsyncOpenAI
from google.genai import types
from PIL import Image
from pdf2image import convert_from_path
from moviepy import ImageClip, AudioFileClip, VideoFileClip, concatenate_videoclips
from core.config import settings
from core.prompts import get_video_script_prompt
from services.s3_service import s3_service
from google import genai
class VideoGeneratorService:
    """Builds narrated video summaries from PDFs: script (OpenAI) -> TTS (Gemini) -> render (MoviePy) -> S3."""

    def __init__(self):
        # Async OpenAI client, used for narration-script generation.
        self.openai_client = AsyncOpenAI(api_key=settings.OPENAI_API_KEY)
        # Match Temp project: Use API Key for Gemini TTS
        print("[VideoGenerator] Initializing Gemini Client with API Key")
        self.gemini_client = genai.Client(api_key=settings.GEMINI_API_KEY)
async def generate_video_summary(
self,
file_key: str,
language: str = "Japanese",
voice_name: str = "Kore"
) -> Dict[str, Any]:
"""
Complete pipeline: PDF -> Script -> Audio -> Images -> Video -> S3
"""
temp_dir = tempfile.mkdtemp(prefix="video_gen_")
try:
# 1. Download PDF from S3
print(f"[VideoGenerator] Starting generation for: {file_key}")
pdf_path = os.path.join(temp_dir, "input.pdf")
await asyncio.to_thread(s3_service.s3_client.download_file, settings.AWS_S3_BUCKET, file_key, pdf_path)
# 2. Convert PDF to Images
image_dir = os.path.join(temp_dir, "images")
os.makedirs(image_dir, exist_ok=True)
poppler_path = os.environ.get("POPPLER_PATH")
print("[VideoGenerator] Converting PDF to images...")
if poppler_path:
images = await asyncio.to_thread(convert_from_path, pdf_path, dpi=200, poppler_path=poppler_path)
else:
images = await asyncio.to_thread(convert_from_path, pdf_path, dpi=200)
total_pages = len(images)
image_paths = []
for i, img in enumerate(images, start=1):
img_path = os.path.join(image_dir, f"page_{i:02d}.png")
img.save(img_path, "PNG")
image_paths.append(img_path)
# 3. Generate Narration Script (Native Async OpenAI)
print(f"[VideoGenerator] Generating script with OpenAI for {total_pages} pages...")
with open(pdf_path, "rb") as f:
content = f.read()
openai_file = await self.openai_client.files.create(
file=("source.pdf", content),
purpose="assistants"
)
prompt = get_video_script_prompt(language, total_pages)
response = await self.openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{"type": "file", "file": {"file_id": openai_file.id}}
]
}
],
response_format={"type": "json_object"},
temperature=0.3
)
script_data = json.loads(response.choices[0].message.content)
scripts = script_data.get("scripts", [])
await self.openai_client.files.delete(openai_file.id)
# 4. Generate Audio (Native Async Gemini)
audio_dir = os.path.join(temp_dir, "audio")
os.makedirs(audio_dir, exist_ok=True)
audio_paths = []
for i, script in enumerate(scripts):
if i == len(scripts) - 1:
print("[VideoGenerator] Skipping audio for last page (logo slide)")
continue
page_num = script.get("page_number", i+1)
text = script.get("script_text", "")
if not text: continue
audio_path = os.path.join(audio_dir, f"audio_{page_num:02d}.wav")
print(f"[VideoGenerator] Generating TTS for page {page_num}...")
try:
# Use Native Async Gemini
model_name = "gemini-2.5-flash-preview-tts"
tts_resp = await self.gemini_client.aio.models.generate_content(
model=model_name,
contents=text,
config=types.GenerateContentConfig(
response_modalities=["AUDIO"],
speech_config=types.SpeechConfig(
voice_config=types.VoiceConfig(
prebuilt_voice_config=types.PrebuiltVoiceConfig(
voice_name=voice_name
)
)
)
)
)
except Exception as tts_err:
print(f"[VideoGenerator] TTS Primary failed, using fallback: {tts_err}")
tts_resp = await self.gemini_client.aio.models.generate_content(
model="gemini-1.5-flash",
contents=text,
config=types.GenerateContentConfig(
response_modalities=["AUDIO"],
speech_config=types.SpeechConfig(
voice_config=types.VoiceConfig(
prebuilt_voice_config=types.PrebuiltVoiceConfig(
voice_name=voice_name
)
)
)
)
)
audio_bytes = tts_resp.candidates[0].content.parts[0].inline_data.data
with wave.open(audio_path, "wb") as wf:
wf.setnchannels(1); wf.setsampwidth(2); wf.setframerate(24000); wf.writeframes(audio_bytes)
audio_paths.append(audio_path)
await asyncio.sleep(2) # Non-blocking sleep
# 5. Assembly (MoviePy in Thread)
print("[VideoGenerator] Assembled audio/images. Now rendering final video with MoviePy (background thread)...")
page_clips = []
target_size = (1920, 1080)
for i, img_path in enumerate(image_paths):
img = Image.open(img_path)
img = self._resize_and_pad(img, target_size)
temp_img_res = os.path.join(temp_dir, f"res_{i}.png")
img.save(temp_img_res)
if i < len(audio_paths):
aud_clip = AudioFileClip(audio_paths[i])
img_clip = ImageClip(temp_img_res, duration=aud_clip.duration)
page_clips.append(img_clip.with_audio(aud_clip))
else:
page_clips.append(ImageClip(temp_img_res, duration=3.0))
final_video_path = os.path.join(temp_dir, "final.mp4")
final_clip = concatenate_videoclips(page_clips, method="compose")
await asyncio.to_thread(
final_clip.write_videofile,
final_video_path,
fps=24,
codec="libx264",
audio_codec="aac",
logger=None
)
for clip in page_clips: clip.close()
final_clip.close()
# 6. Upload to S3
timestamp = int(time.time())
s3_key = f"users/video_summaries/{timestamp}_summary.mp4"
print(f"[VideoGenerator] Uploading final video to S3: {s3_key}")
await asyncio.to_thread(s3_service.s3_client.upload_file, final_video_path, settings.AWS_S3_BUCKET, s3_key)
s3_url = f"https://{settings.AWS_S3_BUCKET}.s3.{settings.AWS_REGION}.amazonaws.com/{s3_key}"
print(f"[VideoGenerator] Success! Video ready at: {s3_url}")
return {
"title": f"Video Summary - {os.path.basename(file_key)}",
"s3_key": s3_key,
"s3_url": s3_url
}
except Exception as e:
print(f"[VideoGenerator] ERROR: {str(e)}")
import traceback
traceback.print_exc()
raise
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
def _resize_and_pad(self, img: Image.Image, size: tuple) -> Image.Image:
"""Resizes image to fit in size while maintaining aspect ratio, adding black padding."""
img.thumbnail(size, Image.Resampling.LANCZOS)
new_img = Image.new("RGB", size, (0, 0, 0))
new_img.paste(img, ((size[0] - img.size[0]) // 2, (size[1] - img.size[1]) // 2))
return new_img
# Module-level singleton shared by the rest of the application.
video_generator_service = VideoGeneratorService()