PDF_SlideShow / app.py
tomo2chin2's picture
Upload app.py
b4bfd35 verified
"""
PDF to Video Converter API
PDFをダウンロードして各ページを画像化し、スライドショー動画を生成してHugging Faceにアップロードする
"""
import gradio as gr
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, HttpUrl
from typing import Optional, Union
import requests
import tempfile
import os
import logging
import numpy as np
from datetime import datetime
import uuid
from pathlib import Path
# 画像・動画処理ライブラリ
from pdf2image import convert_from_path
from PIL import Image
import cv2
from moviepy.editor import ImageClip, AudioFileClip, concatenate_videoclips
# Hugging Face Hub
from huggingface_hub import HfApi, login
# ロギング設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ==============================
# リクエスト/レスポンスモデル
# ==============================
class PdfToVideoRequest(BaseModel):
"""PDF→動画変換リクエストモデル"""
pdf_url: str # HttpUrlからstrに変更(バックスラッシュ対応のため)
duration_per_page: int = 5 # デフォルト5秒
dpi: int = 150 # デフォルトDPI
class VideoResponse(BaseModel):
"""動画生成レスポンスモデル"""
status: str
video_url: Optional[str] = None
page2_image_url: Optional[str] = None # 2ページ目の画像URL
message: str
total_pages: Optional[int] = None
video_duration: Optional[float] = None # 秒
# V2.2: スライドデータ→音声付き動画変換モデル(history: String型対応)
class SlideDataToVideoRequest(BaseModel):
"""スライドデータ→音声付き動画変換リクエスト(V2.3 - Dify list型対応)"""
slide_data: Union[str, list] # JSON文字列 or list(Difyからの直接配列も受付)
pdf_url: str # GASが生成したPDF URL
history: Union[str, list] # JSON文字列 or list(Difyからの直接配列も受付) # 6件の元イベント(年号・語呂合わせ・サマリー)- JSON文字列
class AudioInfo(BaseModel):
"""音声情報"""
slide_index: int
slide_type: str
audio_url: str
duration: float
text: str
class AudioVideoResponse(BaseModel):
"""音声付き動画生成レスポンス"""
status: str
video_url: Optional[str] = None
page2_image_url: Optional[str] = None
audio_urls: list = [] # list[AudioInfo]として使用
message: str
total_slides: Optional[int] = None
video_duration: Optional[float] = None
# ==============================
# URL前処理ユーティリティ
# ==============================
def sanitize_url(url: str) -> str:
"""
URLからバックスラッシュやエスケープシーケンスを除去
Args:
url: 元のURL文字列
Returns:
str: クリーニングされたURL
"""
import urllib.parse
# URLデコード(%22等のエンコードされた文字を元に戻す)
cleaned_url = urllib.parse.unquote(url)
# バックスラッシュを除去
cleaned_url = cleaned_url.replace('\\', '')
# 前後の空白を削除
cleaned_url = cleaned_url.strip()
# ダブルクォートを除去(JSON文字列から来た場合)
cleaned_url = cleaned_url.strip('"').strip("'")
logger.info(f"URL sanitized: {url}{cleaned_url}")
return cleaned_url
# ==============================
# V2.0: スライドデータ処理関数
# ==============================
def clean_mnemonic(text: str) -> str:
"""
語呂合わせから(数字)または(数字)パターンを除去
Args:
text: 語呂合わせテキスト(例: "いい国つくろう鎌倉幕府(1192)" または "兄(2)さん(3)ク(9)イーン")
Returns:
str: 数字を除去したテキスト(例: "いい国つくろう鎌倉幕府" または "兄さんクイーン")
"""
import re
# 全角・半角両対応
cleaned = re.sub(r'[((]\d+[))]', '', text)
return cleaned
def determine_slide_type(slide: dict) -> str:
"""
スライド種別を判定
Args:
slide: スライドデータ辞書
Returns:
str: "title" | "imageText_image_only" | "imageText_with_text" | "closing"
"""
slide_type = slide.get("type", "")
if slide_type == "title":
return "title"
elif slide_type == "closing":
return "closing"
elif slide_type == "imageText":
points = slide.get("points", [])
if not points or len(points) == 0:
return "imageText_image_only"
else:
return "imageText_with_text"
else:
return "unknown"
def extract_audio_text(slide: dict) -> str:
"""
スライドから音声テキストを抽出
Args:
slide: スライドデータ辞書
Returns:
str: 読み上げるテキスト
"""
slide_type = determine_slide_type(slide)
if slide_type == "title":
# タイトルスライド: titleフィールドをそのまま
return slide.get("title", "")
elif slide_type == "imageText_image_only":
# 画像のみスライド: 年号 + 語呂合わせ(数字除去)× 2回
subhead = slide.get("subhead", "")
if ":" in subhead:
year = subhead.split(":")[0]
mnemonic = subhead.split(":")[1]
else:
year = ""
mnemonic = subhead
# (数字)パターンを除去
mnemonic_clean = clean_mnemonic(mnemonic)
# 2回繰り返し
audio_text = f"{year}年、{mnemonic_clean}{year}年、{mnemonic_clean}。"
return audio_text
elif slide_type == "imageText_with_text":
# 画像+テキストスライド: pointsを結合(1回のみ)
points = slide.get("points", [])
# 各pointの末尾の句点を除去してから結合
cleaned_points = [p.rstrip("。") for p in points]
summary = "。".join(cleaned_points)
# 最後に句点を追加
if summary and not summary.endswith("。"):
summary += "。"
return summary
elif slide_type == "closing":
# クロージングスライド: notesフィールドまたはデフォルトメッセージ
return slide.get("notes", "本日の学習は以上です。復習を忘れずに。")
else:
return ""
def extract_audio_text_v2(slide: dict, slide_index: int, history: list) -> str:
"""
スライドインデックスとhistoryから音声テキストを抽出
Args:
slide: スライドデータ辞書
slide_index: 0-13のスライドインデックス
history: 6件の元イベントデータ(year/mnemonic/summary)
Returns:
str: 読み上げるテキスト
"""
slide_type = determine_slide_type(slide)
if slide_type == "title":
title_text = slide.get("title", "")
notes_text = slide.get("notes", "")
# 「語呂羽丸五郎」を読み仮名に変換(正しく読まれないため)
title_text = title_text.replace("語呂羽丸五郎", "ごろーまるごろう")
notes_text = notes_text.replace("語呂羽丸五郎", "ごろーまるごろう")
# titleとnotesを結合
if notes_text:
return f"{title_text}{notes_text}"
else:
return title_text
elif slide_type == "closing":
closing_text = slide.get("notes", "本日の学習は以上です。復習を忘れずに。")
# 「語呂羽丸五郎」を読み仮名に変換(正しく読まれないため)
closing_text = closing_text.replace("語呂羽丸五郎", "ごろーまるごろう")
return closing_text
elif slide_type == "imageText_image_only":
# slide_index 1, 3, 5, 7, 9, 11 → history[0, 1, 2, 3, 4, 5]
history_index = (slide_index - 1) // 2
event = history[history_index]
year = str(event.get("year", ""))
# 既に「年」が含まれている場合は除去(重複防止)
year_clean = year.replace("年", "")
mnemonic = clean_mnemonic(str(event.get("mnemonic", "")))
return f"{year_clean}年、{mnemonic}{year_clean}年、{mnemonic}。"
elif slide_type == "imageText_with_text":
# slide_index 2, 4, 6, 8, 10, 12 → history[0, 1, 2, 3, 4, 5]
history_index = (slide_index - 2) // 2
event = history[history_index]
summary = str(event.get("summary", ""))
return summary
else:
return ""
def convert_pil_to_array(pil_image: Image.Image, target_size: tuple) -> np.ndarray:
"""
PIL ImageをNumPy配列に変換し、指定サイズにリサイズ
Args:
pil_image: PIL Image
target_size: (width, height) - 例: (1280, 720)
Returns:
numpy array (RGB)
"""
# アスペクト比を保ってリサイズ
pil_image = pil_image.resize(target_size, Image.Resampling.LANCZOS)
# RGB変換
if pil_image.mode != 'RGB':
pil_image = pil_image.convert('RGB')
# numpy array変換
img_array = np.array(pil_image)
return img_array
# ==============================
# V2.0: Gemini TTS音声生成
# ==============================
def generate_audio_with_gemini(audio_text: str, gemini_token: str) -> bytes:
"""
Gemini REST APIでテキストから音声を生成
Args:
audio_text: 読み上げるテキスト
gemini_token: GEMINI_TOKEN環境変数
Returns:
WAVバイナリデータ(24kHz PCM16)
"""
import base64
url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-pro-preview-tts:generateContent?key={gemini_token}"
headers = {
"Content-Type": "application/json"
}
# プロンプトベース制御(ドラマチック表現と正確な読み上げを重視)
prompt_text = f"Read this Japanese historical text with dramatic emotional expression and accurate kanji pronunciation, bringing the story to life with vivid intonation: {audio_text}"
payload = {
"contents": [
{
"role": "user",
"parts": [
{
"text": prompt_text
}
]
}
],
"generationConfig": {
"responseModalities": ["audio"],
"speech_config": {
"voice_config": {
"prebuilt_voice_config": {
"voice_name": "Charon"
}
}
}
}
}
logger.info(f"Gemini TTS API呼び出し: {len(audio_text)}文字")
logger.info(f"Payload: {payload}")
response = requests.post(url, json=payload, headers=headers, timeout=60)
# エラーレスポンスの詳細をログ出力
if response.status_code != 200:
logger.error(f"Gemini API Error: {response.status_code}")
logger.error(f"Response: {response.text}")
response.raise_for_status()
# レスポンスからaudioデータを取得(base64デコード)
response_data = response.json()
audio_data_b64 = response_data["candidates"][0]["content"]["parts"][0]["inlineData"]["data"]
pcm_bytes = base64.b64decode(audio_data_b64)
logger.info(f"音声データ取得完了: {len(pcm_bytes)} bytes (PCM)")
# PCM16をWAVファイルに変換
wav_bytes = convert_pcm_to_wav(pcm_bytes, sample_rate=24000, channels=1, sample_width=2)
logger.info(f"WAV変換完了: {len(wav_bytes)} bytes")
return wav_bytes
def convert_pcm_to_wav(pcm_bytes: bytes, sample_rate: int, channels: int, sample_width: int) -> bytes:
"""
PCMバイナリをWAV形式に変換
Args:
pcm_bytes: PCMバイナリデータ
sample_rate: サンプルレート(Hz)
channels: チャンネル数(1=モノラル、2=ステレオ)
sample_width: サンプル幅(バイト、2=16bit)
Returns:
WAVバイナリデータ
"""
import wave
import io
wav_buffer = io.BytesIO()
with wave.open(wav_buffer, 'wb') as wav_file:
wav_file.setnchannels(channels)
wav_file.setsampwidth(sample_width)
wav_file.setframerate(sample_rate)
wav_file.writeframes(pcm_bytes)
wav_buffer.seek(0)
return wav_buffer.read()
def get_audio_duration(wav_bytes: bytes) -> float:
"""
WAVバイナリから音声の長さ(秒)を取得
Args:
wav_bytes: WAVバイナリデータ
Returns:
float: 音声の長さ(秒)
"""
import wave
import io
wav_buffer = io.BytesIO(wav_bytes)
with wave.open(wav_buffer, 'rb') as wav_file:
frames = wav_file.getnframes()
rate = wav_file.getframerate()
duration = frames / float(rate)
return duration
def speed_up_audio(wav_bytes: bytes, speed_factor: float = 1.5) -> bytes:
"""
WAVファイルを指定倍速に変換
Args:
wav_bytes: 元のWAVバイナリデータ
speed_factor: 倍速係数(1.5 = 1.5倍速)
Returns:
倍速変換後のWAVバイナリデータ
"""
import wave
import io
# WAV読み込み
wav_buffer = io.BytesIO(wav_bytes)
with wave.open(wav_buffer, 'rb') as wav_file:
params = wav_file.getparams()
frames = wav_file.readframes(wav_file.getnframes())
# PCMデータをnumpy配列に変換
audio_data = np.frombuffer(frames, dtype=np.int16)
# リサンプリング(間引き処理)
target_length = int(len(audio_data) / speed_factor)
indices = np.linspace(0, len(audio_data) - 1, target_length).astype(int)
resampled_data = audio_data[indices]
# WAV形式で書き出し
output_buffer = io.BytesIO()
with wave.open(output_buffer, 'wb') as output_wav:
output_wav.setparams(params)
output_wav.writeframes(resampled_data.tobytes())
output_buffer.seek(0)
return output_buffer.read()
def save_audio_to_hf(wav_bytes: bytes, prefix: str = "slide_audio") -> str:
"""
音声WAVファイルをHugging Faceデータセットにアップロード
Args:
wav_bytes: WAVバイナリデータ
prefix: ファイル名のプレフィックス
Returns:
str: アップロードされた音声ファイルのURL
"""
# 一時ファイルに保存
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
tmp_file.write(wav_bytes)
tmp_path = tmp_file.name
try:
# HFアップロード
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
unique_id = str(uuid.uuid4())[:8]
filename = f"{prefix}_{timestamp}_{unique_id}.wav"
path_in_repo = f"audios/{filename}"
logger.info(f"音声アップロード開始: {path_in_repo}")
video_uploader.api.upload_file(
path_or_fileobj=tmp_path,
path_in_repo=path_in_repo,
repo_id=video_uploader.repo_id,
repo_type="dataset"
)
audio_url = f"https://huggingface.co/datasets/{video_uploader.repo_id}/resolve/main/{path_in_repo}"
logger.info(f"音声アップロード完了: {audio_url}")
return audio_url
finally:
# 一時ファイル削除
if os.path.exists(tmp_path):
os.remove(tmp_path)
# ==============================
# V2.0: 音声付き動画生成
# ==============================
def create_video_with_audio_from_slides(
slide_data: list,
gemini_token: str,
progress_callback=None
) -> tuple:
"""
スライドデータから音声付き動画を生成
Args:
slide_data: スライドデータJSON配列
gemini_token: GEMINI_TOKEN環境変数
progress_callback: 進捗コールバック関数(Gradio用)
Returns:
tuple: (video_url, page2_image_url, audio_info_list)
"""
audio_files = [] # 一時ファイル管理
clips = []
audio_info_list = []
video_path = None
try:
total_slides = len(slide_data)
# 各スライドの音声生成と動画クリップ作成
for idx, slide in enumerate(slide_data):
if progress_callback:
progress_callback((idx / total_slides) * 0.6, desc=f"音声生成中 ({idx+1}/{total_slides})")
logger.info(f"スライド {idx+1}/{total_slides} 処理中...")
# 音声テキスト抽出
audio_text = extract_audio_text(slide)
if not audio_text:
logger.warning(f"スライド {idx+1}: 音声テキストが空です")
continue
# 音声生成
wav_bytes = generate_audio_with_gemini(audio_text, gemini_token)
# 1.5倍速処理(50%アップ)
wav_bytes = speed_up_audio(wav_bytes, speed_factor=1.5)
# 音声長さ測定(倍速処理後)
audio_duration = get_audio_duration(wav_bytes)
# スライド再生時間計算(音声 + 0.6秒余白、前後0.3秒ずつ)
slide_duration = audio_duration + 0.6
# 音声を一時ファイルに保存
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_audio:
tmp_audio.write(wav_bytes)
audio_path = tmp_audio.name
audio_files.append(audio_path)
# HFアップロード
slide_type = determine_slide_type(slide)
audio_url = save_audio_to_hf(wav_bytes, prefix=f"slide_{idx:02d}_{slide_type}")
# 音声情報記録
audio_info_list.append({
"slide_index": idx,
"slide_type": slide_type,
"audio_url": audio_url,
"duration": audio_duration,
"text": audio_text
})
# 画像クリップ作成(仮の黒画像、実際はスライド画像を使用)
# TODO: スライド画像生成または取得
img_array = np.zeros((720, 1280, 3), dtype=np.uint8) # 仮の黒画像(720p)
# moviepyクリップ作成
img_clip = ImageClip(img_array, duration=slide_duration)
audio_clip = AudioFileClip(audio_path)
# 音声を動画に設定
video_clip = img_clip.set_audio(audio_clip)
clips.append(video_clip)
logger.info(f"スライド {idx+1}: 音声{audio_duration:.2f}秒, 再生時間{slide_duration:.2f}秒")
if not clips:
raise Exception("動画クリップが生成されませんでした")
if progress_callback:
progress_callback(0.7, desc="動画を結合中...")
# 全クリップを連結
final_video = concatenate_videoclips(clips, method="compose")
# 一時動画ファイルに出力
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_video:
video_path = tmp_video.name
if progress_callback:
progress_callback(0.8, desc="動画をエンコード中...")
# 動画エンコード
final_video.write_videofile(
video_path,
fps=30,
codec='libx264',
audio_codec='aac',
logger=None # moviepyのログを抑制
)
# クリップをクローズ
final_video.close()
for clip in clips:
clip.close()
if progress_callback:
progress_callback(0.9, desc="動画をアップロード中...")
# HFアップロード
video_url = video_uploader.upload_video(video_path, prefix="slidedata_video")
# 2ページ目画像抽出・アップロード(TODO: 実装)
page2_image_url = None
if progress_callback:
progress_callback(1.0, desc="完了!")
logger.info(f"動画生成完了: {video_url}")
return (video_url, page2_image_url, audio_info_list)
finally:
# 一時ファイルクリーンアップ
for audio_file in audio_files:
if os.path.exists(audio_file):
try:
os.remove(audio_file)
except Exception as e:
logger.warning(f"音声ファイル削除エラー: {e}")
if video_path and os.path.exists(video_path):
try:
os.remove(video_path)
except Exception as e:
logger.warning(f"動画ファイル削除エラー: {e}")
def create_video_with_audio_from_slides_v2(
slide_data: Union[str, list],
pdf_url: str,
history: Union[str, list],
gemini_token: str,
progress_callback=None
) -> tuple:
"""
PDF画像とslideData/historyから音声付き動画を生成(V2.2版 - 全パラメータJSON文字列対応)
Args:
slide_data: 14枚のスライドデータ - JSON文字列
pdf_url: GASが生成したPDF URL
history: 6件の元イベントデータ(year/mnemonic/summary)- JSON文字列
gemini_token: GEMINI_TOKEN環境変数
progress_callback: 進捗コールバック関数(Gradio用)
Returns:
tuple: (video_url, page2_image_url, audio_info_list)
"""
import json
pdf_path = None
audio_files = []
clips = []
audio_info_list = []
video_path = None
try:
# slide_dataをJSON文字列からlistにパース
if isinstance(slide_data, str):
slide_data_list = json.loads(slide_data)
else:
slide_data_list = slide_data # 既にlistの場合(後方互換性)
# historyをJSON文字列からlistにパース
if isinstance(history, str):
history_list = json.loads(history)
else:
history_list = history # 既にlistの場合(後方互換性)
logger.info(f"history parsed: {len(history_list)}件のイベント")
# 1. PDFダウンロード
if progress_callback:
progress_callback(0.05, desc="PDFダウンロード中...")
pdf_path = download_pdf_from_url(sanitize_url(pdf_url))
# 2. PDF → 画像変換(14ページ → 14枚)
if progress_callback:
progress_callback(0.1, desc="PDF→画像変換中...")
images = convert_pdf_to_images(pdf_path, dpi=150)
# 画像枚数とスライドデータの整合性チェック
if len(images) != len(slide_data_list):
raise Exception(f"画像枚数とスライドデータが不一致: {len(images)}枚 vs {len(slide_data_list)}枚")
total_slides = len(slide_data_list)
logger.info(f"PDF変換完了: {total_slides}枚の画像を取得")
# 3. 各スライドの音声生成と動画クリップ作成
for idx, (slide, pil_image) in enumerate(zip(slide_data_list, images)):
if progress_callback:
progress_callback(0.1 + (idx / total_slides) * 0.5, desc=f"音声生成中 ({idx+1}/{total_slides})")
logger.info(f"スライド {idx+1}/{total_slides} 処理中...")
# 音声テキスト抽出(history_listを使用)
audio_text = extract_audio_text_v2(slide, idx, history_list)
if not audio_text:
logger.warning(f"スライド {idx+1}: 音声テキストが空です")
continue
# 音声生成
wav_bytes = generate_audio_with_gemini(audio_text, gemini_token)
# 1.5倍速処理(50%アップ)
wav_bytes = speed_up_audio(wav_bytes, speed_factor=1.5)
# 音声長さ測定(倍速処理後)
audio_duration = get_audio_duration(wav_bytes)
# スライド再生時間計算(音声 + 0.6秒余白)
slide_duration = audio_duration + 0.6
# 音声を一時ファイルに保存
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_audio:
tmp_audio.write(wav_bytes)
audio_path = tmp_audio.name
audio_files.append(audio_path)
# HFアップロード
slide_type = determine_slide_type(slide)
audio_url = save_audio_to_hf(wav_bytes, prefix=f"slide_{idx:02d}_{slide_type}")
# 音声情報記録
audio_info_list.append({
"slide_index": idx,
"slide_type": slide_type,
"audio_url": audio_url,
"duration": audio_duration,
"text": audio_text
})
# PIL Image → NumPy配列(720p)
img_array = convert_pil_to_array(pil_image, target_size=(1280, 720))
# moviepyクリップ作成
img_clip = ImageClip(img_array, duration=slide_duration)
audio_clip = AudioFileClip(audio_path)
# 音声を動画に設定
video_clip = img_clip.set_audio(audio_clip)
clips.append(video_clip)
logger.info(f"✅ スライド {idx+1}/{total_slides}: クリップ作成完了(音声{audio_duration:.2f}秒 + 余白0.6秒 = {slide_duration:.2f}秒)")
if not clips:
raise Exception("動画クリップが生成されませんでした")
# 総動画時間計算
total_video_duration = sum([clip.duration for clip in clips])
logger.info(f"📹 全{len(clips)}クリップ作成完了(総再生時間: {total_video_duration:.2f}秒)")
if progress_callback:
progress_callback(0.7, desc="動画を結合中...")
# 4. 全クリップを連結
logger.info(f"🔗 動画結合開始: {len(clips)}個のクリップを連結中...")
final_video = concatenate_videoclips(clips, method="compose")
logger.info(f"✅ 動画結合完了")
# 一時動画ファイルに出力
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_video:
video_path = tmp_video.name
if progress_callback:
progress_callback(0.8, desc="動画をエンコード中...")
# 5. 動画エンコード
logger.info(f"🎬 動画エンコード開始: {total_video_duration:.2f}秒の動画をH.264/AACでエンコード中...")
final_video.write_videofile(
video_path,
fps=30,
codec='libx264',
audio_codec='aac',
logger=None # moviepyのログを抑制
)
logger.info(f"✅ 動画エンコード完了: {video_path}")
# クリップをクローズ
final_video.close()
for clip in clips:
clip.close()
logger.info(f"🧹 メモリ解放完了({len(clips)}クリップクローズ)")
if progress_callback:
progress_callback(0.9, desc="動画をアップロード中...")
# 6. HFアップロード
video_file_size = os.path.getsize(video_path) / (1024 * 1024) # MB
logger.info(f"☁️ 動画アップロード開始: {video_file_size:.2f}MB → Hugging Face Dataset...")
video_url = video_uploader.upload_video(video_path, prefix="slidedata_video_v2")
logger.info(f"✅ 動画アップロード完了: {video_url}")
# 7. 2ページ目画像アップロード
page2_image_url = None
if len(images) >= 2:
logger.info(f"🖼️ 2ページ目画像アップロード開始...")
page2_image = images[1]
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp_img:
page2_image_path = tmp_img.name
page2_image.save(page2_image_path, format='JPEG', quality=90)
page2_image_url = video_uploader.upload_image(page2_image_path, prefix="slidedata_page2")
logger.info(f"✅ 2ページ目画像アップロード完了: {page2_image_url}")
# 一時ファイル削除
if os.path.exists(page2_image_path):
os.remove(page2_image_path)
if progress_callback:
progress_callback(1.0, desc="完了!")
logger.info(f"動画生成完了: {video_url}")
return (video_url, page2_image_url, audio_info_list)
finally:
# 一時ファイルクリーンアップ
if pdf_path and os.path.exists(pdf_path):
try:
os.remove(pdf_path)
except Exception as e:
logger.warning(f"PDFファイル削除エラー: {e}")
for audio_file in audio_files:
if os.path.exists(audio_file):
try:
os.remove(audio_file)
except Exception as e:
logger.warning(f"音声ファイル削除エラー: {e}")
if video_path and os.path.exists(video_path):
try:
os.remove(video_path)
except Exception as e:
logger.warning(f"動画ファイル削除エラー: {e}")
# ==============================
# コア機能実装
# ==============================
def download_pdf_from_url(pdf_url: str) -> str:
"""
指定されたURLからPDFをダウンロードして一時ファイルとして保存
Args:
pdf_url: PDFファイルのURL
Returns:
str: ダウンロードされたPDFファイルのパス
Raises:
Exception: ダウンロード失敗時
"""
try:
# URLをサニタイズ(バックスラッシュ等を除去)
pdf_url = sanitize_url(pdf_url)
logger.info(f"PDFダウンロード開始: {pdf_url}")
# HTTPリクエスト
response = requests.get(pdf_url, timeout=30, stream=True)
response.raise_for_status()
# Content-Typeの検証
content_type = response.headers.get('Content-Type', '')
if 'pdf' not in content_type.lower():
logger.warning(f"Content-Type が PDF ではありません: {content_type}")
# 一時ファイルに保存
with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp_file:
tmp_path = tmp_file.name
for chunk in response.iter_content(chunk_size=8192):
tmp_file.write(chunk)
logger.info(f"PDFダウンロード完了: {tmp_path} ({os.path.getsize(tmp_path)} bytes)")
return tmp_path
except requests.exceptions.RequestException as e:
logger.error(f"PDFダウンロードエラー: {e}")
raise Exception(f"PDFのダウンロードに失敗しました: {e}")
def convert_pdf_to_images(pdf_path: str, dpi: int = 150) -> list:
"""
PDFファイルを画像リストに変換
Args:
pdf_path: PDFファイルのパス
dpi: 解像度(デフォルト150)
Returns:
list: PIL.Imageオブジェクトのリスト
"""
try:
logger.info(f"PDF→画像変換開始: {pdf_path}, DPI={dpi}")
# PDFを画像に変換
images = convert_from_path(
pdf_path,
dpi=dpi,
fmt='jpeg', # JPEG形式で出力
thread_count=2 # 並列処理スレッド数
)
logger.info(f"PDF変換完了: {len(images)}ページ")
return images
except Exception as e:
logger.error(f"PDF変換エラー: {e}")
raise Exception(f"PDFの画像変換に失敗しました: {e}")
def create_video_from_images(
images: list,
duration_per_page: int = 5,
fps: int = 30
) -> str:
"""
画像リストからスライドショー動画を生成
Args:
images: PIL.Imageオブジェクトのリスト
duration_per_page: 1ページあたりの表示秒数(デフォルト5秒)
fps: フレームレート(デフォルト30fps)
Returns:
str: 生成された動画ファイルのパス
"""
try:
if not images:
raise ValueError("画像リストが空です")
logger.info(f"動画生成開始: {len(images)}ページ, {duration_per_page}秒/ページ, {fps}fps")
# 全画像を同じサイズにリサイズ(最初の画像のサイズに統一)
first_img = images[0]
width, height = first_img.size
logger.info(f"動画サイズ: {width}x{height}")
# 一時ファイルパス
tmp_video = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
video_path = tmp_video.name
tmp_video.close()
# 動画ライター初期化
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
video_writer = cv2.VideoWriter(
video_path,
fourcc,
fps,
(width, height)
)
# 各画像を指定秒数分のフレームとして追加
frames_per_page = duration_per_page * fps
for idx, img in enumerate(images):
logger.info(f"ページ {idx+1}/{len(images)} を処理中...")
# 画像をリサイズ(必要な場合)
if img.size != (width, height):
img = img.resize((width, height), Image.Resampling.LANCZOS)
# PIL Image → OpenCV形式に変換(RGB→BGR)
img_array = np.array(img)
if len(img_array.shape) == 3 and img_array.shape[2] == 3:
img_bgr = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
else:
img_bgr = img_array
# 同じフレームを複数回書き込み(静止画として表示)
for _ in range(frames_per_page):
video_writer.write(img_bgr)
video_writer.release()
logger.info(f"動画生成完了: {video_path} ({os.path.getsize(video_path)} bytes)")
return video_path
except Exception as e:
logger.error(f"動画生成エラー: {e}")
raise Exception(f"動画の生成に失敗しました: {e}")
# ==============================
# Hugging Face アップロード
# ==============================
class VideoUploader:
"""Hugging Face Datasetへの動画アップロード機能"""
def __init__(self):
self.repo_id = os.environ.get("HF_REPO_ID", "tomo2chin2/video-storage")
self.token = os.environ.get("HF_TOKEN")
if not self.token:
logger.warning("HF_TOKEN 環境変数が設定されていません")
self.api = None
return
try:
# ログイン
login(token=self.token)
self.api = HfApi()
logger.info(f"HuggingFace にログイン完了: {self.repo_id}")
except Exception as e:
logger.error(f"HuggingFace ログインエラー: {e}")
self.api = None
def upload_video(self, video_path: str, prefix: str = "video") -> str:
"""
動画をHugging Faceデータセットにアップロード
Args:
video_path: アップロードする動画ファイルのパス
prefix: ファイル名のプレフィックス
Returns:
str: アップロードされた動画のURL
"""
if not self.api:
raise Exception("HuggingFace API が初期化されていません。HF_TOKEN を確認してください。")
try:
# ユニークなファイル名を生成
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
unique_id = str(uuid.uuid4())[:8]
filename = f"{prefix}_{timestamp}_{unique_id}.mp4"
path_in_repo = f"videos/{filename}"
logger.info(f"動画アップロード開始: {path_in_repo}")
# アップロード実行
upload_info = self.api.upload_file(
path_or_fileobj=video_path,
path_in_repo=path_in_repo,
repo_id=self.repo_id,
repo_type="dataset"
)
# URLを構築
video_url = f"https://huggingface.co/datasets/{self.repo_id}/resolve/main/{path_in_repo}"
logger.info(f"動画アップロード完了: {video_url}")
return video_url
except Exception as e:
logger.error(f"動画アップロードエラー: {e}")
raise Exception(f"動画のアップロードに失敗しました: {e}")
def upload_image(self, image_path: str, prefix: str = "image") -> str:
"""
画像をHugging Faceデータセットにアップロード
Args:
image_path: アップロードする画像ファイルのパス
prefix: ファイル名のプレフィックス
Returns:
str: アップロードされた画像のURL
"""
if not self.api:
raise Exception("HuggingFace API が初期化されていません。HF_TOKEN を確認してください。")
try:
# ユニークなファイル名を生成
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
unique_id = str(uuid.uuid4())[:8]
# ファイルの拡張子を取得
ext = Path(image_path).suffix
filename = f"{prefix}_{timestamp}_{unique_id}{ext}"
path_in_repo = f"images/{filename}"
logger.info(f"画像アップロード開始: {path_in_repo}")
# アップロード実行
upload_info = self.api.upload_file(
path_or_fileobj=image_path,
path_in_repo=path_in_repo,
repo_id=self.repo_id,
repo_type="dataset"
)
# URLを構築
image_url = f"https://huggingface.co/datasets/{self.repo_id}/resolve/main/{path_in_repo}"
logger.info(f"画像アップロード完了: {image_url}")
return image_url
except Exception as e:
logger.error(f"画像アップロードエラー: {e}")
raise Exception(f"画像のアップロードに失敗しました: {e}")
# グローバルなアップローダーインスタンスを作成
video_uploader = VideoUploader()
# ==============================
# FastAPI アプリケーション
# ==============================
app = FastAPI(
title="PDF to Video API",
description="PDFをスライドショー動画に変換するAPI",
version="1.0.0"
)
# CORS設定
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.post(
"/api/pdf-to-video",
response_model=VideoResponse,
tags=["Video Generation"],
summary="PDFをスライドショー動画に変換",
description="指定されたURLからPDFをダウンロードし、各ページを画像化して動画を生成します。"
)
async def pdf_to_video(request: PdfToVideoRequest):
"""PDF→動画変換APIエンドポイント"""
pdf_path = None
video_path = None
page2_image_path = None
try:
logger.info(f"API リクエスト受信: {request.pdf_url}")
# 1. PDFダウンロード
pdf_path = download_pdf_from_url(str(request.pdf_url))
# 2. PDF→画像変換
images = convert_pdf_to_images(pdf_path, dpi=request.dpi)
total_pages = len(images)
# 3. 動画生成
video_path = create_video_from_images(
images,
duration_per_page=request.duration_per_page
)
# 4. Hugging Faceにアップロード
video_url = video_uploader.upload_video(video_path, prefix="pdf_video")
# 5. 2ページ目の画像を保存してアップロード
page2_image_url = None
if total_pages >= 2:
# 2ページ目の画像(インデックス1)を一時ファイルとして保存
page2_image = images[1]
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp_img:
page2_image_path = tmp_img.name
page2_image.save(page2_image_path, format='JPEG', quality=90)
logger.info(f"2ページ目の画像を保存: {page2_image_path}")
# Hugging Faceにアップロード
page2_image_url = video_uploader.upload_image(page2_image_path, prefix="pdf_page2")
logger.info(f"2ページ目の画像アップロード完了: {page2_image_url}")
# 動画の総再生時間を計算
video_duration = total_pages * request.duration_per_page
logger.info(f"処理完了: 動画={video_url}, 2ページ目画像={page2_image_url}")
return VideoResponse(
status="success",
video_url=video_url,
page2_image_url=page2_image_url,
message="動画の生成とアップロードに成功しました",
total_pages=total_pages,
video_duration=video_duration
)
except Exception as e:
logger.error(f"エラー発生: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"動画生成に失敗しました: {str(e)}"
)
finally:
# 一時ファイルのクリーンアップ
if pdf_path and os.path.exists(pdf_path):
try:
os.remove(pdf_path)
logger.info(f"一時PDFファイル削除: {pdf_path}")
except Exception as e:
logger.warning(f"PDFファイル削除エラー: {e}")
if video_path and os.path.exists(video_path):
try:
os.remove(video_path)
logger.info(f"一時動画ファイル削除: {video_path}")
except Exception as e:
logger.warning(f"動画ファイル削除エラー: {e}")
if page2_image_path and os.path.exists(page2_image_path):
try:
os.remove(page2_image_path)
logger.info(f"一時画像ファイル削除: {page2_image_path}")
except Exception as e:
logger.warning(f"画像ファイル削除エラー: {e}")
@app.post(
"/api/slidedata-to-video",
response_model=AudioVideoResponse,
tags=["Video Generation"],
summary="スライドデータから音声付き動画を生成",
description="スライドデータJSONから音声を生成し、音声付き動画を作成します。"
)
async def slidedata_to_video(request: SlideDataToVideoRequest):
"""スライドデータ→音声付き動画変換APIエンドポイント"""
# GEMINI_TOKEN取得
gemini_token = os.environ.get("GEMINI_TOKEN")
if not gemini_token:
raise HTTPException(
status_code=500,
detail="GEMINI_TOKEN環境変数が設定されていません"
)
try:
logger.info(f"API リクエスト受信(V2.3 - Dify list型対応)")
logger.info(f" slide_data type: {type(request.slide_data).__name__}")
logger.info(f" history type: {type(request.history).__name__}")
logger.info(f" pdf_url: {request.pdf_url[:50]}...")
# 動画生成(V2.3完全版)
video_url, page2_image_url, audio_info_list = create_video_with_audio_from_slides_v2(
slide_data=request.slide_data,
pdf_url=request.pdf_url,
history=request.history,
gemini_token=gemini_token
)
# 総再生時間計算
total_duration = sum([info["duration"] + 0.6 for info in audio_info_list])
logger.info(f"処理完了: 動画={video_url}")
return AudioVideoResponse(
status="success",
video_url=video_url,
page2_image_url=page2_image_url,
audio_urls=audio_info_list,
message="音声付き動画の生成とアップロードに成功しました",
total_slides=len(audio_info_list),
video_duration=total_duration
)
except Exception as e:
logger.error(f"エラー発生: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"動画生成に失敗しました: {str(e)}"
)
@app.get("/health")
async def health_check():
"""ヘルスチェックエンドポイント"""
return {
"status": "healthy",
"service": "PDF to Video API",
"hf_configured": video_uploader.api is not None
}
# ==============================
# Gradio UI
# ==============================
def process_pdf_url(pdf_url, duration_per_page, dpi, progress=gr.Progress()):
"""Gradio UIからの処理関数"""
try:
if not pdf_url:
return None, None, "❌ PDF URLを入力してください", None, None
# URLをサニタイズ
pdf_url = sanitize_url(pdf_url)
progress(0, desc="PDFダウンロード中...")
# PDFダウンロード
pdf_path = download_pdf_from_url(pdf_url)
progress(0.3, desc="PDF→画像変換中...")
# PDF→画像変換
images = convert_pdf_to_images(pdf_path, dpi=dpi)
total_pages = len(images)
progress(0.6, desc=f"動画生成中({total_pages}ページ)...")
# 動画生成
video_path = create_video_from_images(
images,
duration_per_page=duration_per_page
)
progress(0.8, desc="Hugging Faceにアップロード中...")
# 動画アップロード
video_url = video_uploader.upload_video(video_path, prefix="pdf_video")
# 2ページ目の画像を保存してアップロード
page2_image_url = None
page2_image_pil = None
if total_pages >= 2:
progress(0.9, desc="2ページ目の画像をアップロード中...")
page2_image = images[1]
# 一時ファイルに保存してアップロード
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp_img:
page2_image_path = tmp_img.name
page2_image.save(page2_image_path, format='JPEG', quality=90)
page2_image_url = video_uploader.upload_image(page2_image_path, prefix="pdf_page2")
# PIL.Imageオブジェクトを直接Gradioに渡す
page2_image_pil = page2_image
# 一時ファイルを即座に削除(PIL経由で表示するため不要)
if os.path.exists(page2_image_path):
os.remove(page2_image_path)
# クリーンアップ
if pdf_path and os.path.exists(pdf_path):
os.remove(pdf_path)
if video_path and os.path.exists(video_path):
os.remove(video_path)
video_duration = total_pages * duration_per_page
progress(1.0, desc="完了!")
message = f"✅ 成功: {total_pages}ページ、{video_duration}秒の動画を生成しました"
if page2_image_url:
message += "\n📸 2ページ目の画像も保存しました"
return (
video_url, # ビデオURL
page2_image_url, # 2ページ目の画像URL
message, # ステータスメッセージ
page2_image_pil # 画像プレビュー(PIL.Image)
)
except Exception as e:
logger.error(f"Gradio処理エラー: {e}", exc_info=True)
return None, None, f"❌ エラー: {str(e)}", None
# Gradio UI定義
with gr.Blocks(title="PDF to Video Converter", theme=gr.themes.Soft()) as demo:
gr.Markdown("# 📄 PDF → 🎬 動画変換")
gr.Markdown("PDFのURLを指定すると、各ページをスライドショー動画に変換します。")
with gr.Row():
with gr.Column(scale=2):
pdf_url_input = gr.Textbox(
label="PDF URL",
placeholder="https://example.com/sample.pdf",
info="変換したいPDFファイルのURLを入力してください"
)
with gr.Row():
duration_slider = gr.Slider(
minimum=1,
maximum=10,
step=1,
value=5,
label="1ページあたりの表示秒数"
)
dpi_slider = gr.Slider(
minimum=72,
maximum=300,
step=1,
value=150,
label="画像解像度(DPI)",
info="高いほど高画質ですが処理時間が増加します"
)
convert_btn = gr.Button("🎬 動画生成", variant="primary", size="lg")
with gr.Column(scale=1):
status_output = gr.Textbox(
label="ステータス",
interactive=False
)
video_url_output = gr.Textbox(
label="動画URL",
interactive=False,
info="生成された動画のURL"
)
page2_image_url_output = gr.Textbox(
label="2ページ目画像URL",
interactive=False,
info="2ページ目の画像URL"
)
with gr.Row():
page2_image_preview = gr.Image(
label="2ページ目プレビュー",
interactive=False,
type="pil"
)
# イベント設定
convert_btn.click(
fn=process_pdf_url,
inputs=[pdf_url_input, duration_slider, dpi_slider],
outputs=[video_url_output, page2_image_url_output, status_output, page2_image_preview]
)
# 使用例とサンプルURL
gr.Markdown("""
## 📖 使用方法
1. PDFのURLを入力
2. 1ページあたりの表示秒数を調整(デフォルト5秒)
3. 解像度(DPI)を調整(デフォルト150)
4. 「動画生成」ボタンをクリック
5. 生成された動画URLをコピーして利用
## 🔗 サンプルPDF URL(テスト用)
- W3C サンプル: `https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf`
## ⚙️ 環境変数
- `HF_TOKEN`: Hugging Face認証トークン(必須)
- `HF_REPO_ID`: データセットリポジトリID(デフォルト: tomo2chin2/video-storage)
## 🔗 APIエンドポイント
- `POST /api/pdf-to-video`: PDF→動画変換API
- `GET /health`: ヘルスチェック
""")
# 環境変数情報表示
hf_repo = os.environ.get("HF_REPO_ID", "tomo2chin2/video-storage")
hf_configured = "✅ 設定済み" if video_uploader.api else "❌ 未設定"
gr.Markdown(f"""
## 📊 現在の設定
- HuggingFace リポジトリ: `{hf_repo}`
- HF_TOKEN: {hf_configured}
""")
# FastAPIにGradioをマウント
app = gr.mount_gradio_app(app, demo, path="/")
# ==============================
# アプリケーション起動
# ==============================
if __name__ == "__main__":
import uvicorn
logger.info("Starting PDF to Video Converter API...")
uvicorn.run(app, host="0.0.0.0", port=7860)