Spaces:
Paused
Paused
| """ | |
| PDF to Video Converter API | |
| PDFをダウンロードして各ページを画像化し、スライドショー動画を生成してHugging Faceにアップロードする | |
| """ | |
| import gradio as gr | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, HttpUrl | |
| from typing import Optional, Union | |
| import requests | |
| import tempfile | |
| import os | |
| import logging | |
| import numpy as np | |
| from datetime import datetime | |
| import uuid | |
| from pathlib import Path | |
| # 画像・動画処理ライブラリ | |
| from pdf2image import convert_from_path | |
| from PIL import Image | |
| import cv2 | |
| from moviepy.editor import ImageClip, AudioFileClip, concatenate_videoclips | |
| # Hugging Face Hub | |
| from huggingface_hub import HfApi, login | |
| # ロギング設定 | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # ============================== | |
| # リクエスト/レスポンスモデル | |
| # ============================== | |
| class PdfToVideoRequest(BaseModel): | |
| """PDF→動画変換リクエストモデル""" | |
| pdf_url: str # HttpUrlからstrに変更(バックスラッシュ対応のため) | |
| duration_per_page: int = 5 # デフォルト5秒 | |
| dpi: int = 150 # デフォルトDPI | |
| class VideoResponse(BaseModel): | |
| """動画生成レスポンスモデル""" | |
| status: str | |
| video_url: Optional[str] = None | |
| page2_image_url: Optional[str] = None # 2ページ目の画像URL | |
| message: str | |
| total_pages: Optional[int] = None | |
| video_duration: Optional[float] = None # 秒 | |
| # V2.2: スライドデータ→音声付き動画変換モデル(history: String型対応) | |
| class SlideDataToVideoRequest(BaseModel): | |
| """スライドデータ→音声付き動画変換リクエスト(V2.3 - Dify list型対応)""" | |
| slide_data: Union[str, list] # JSON文字列 or list(Difyからの直接配列も受付) | |
| pdf_url: str # GASが生成したPDF URL | |
| history: Union[str, list] # JSON文字列 or list(Difyからの直接配列も受付) # 6件の元イベント(年号・語呂合わせ・サマリー)- JSON文字列 | |
| class AudioInfo(BaseModel): | |
| """音声情報""" | |
| slide_index: int | |
| slide_type: str | |
| audio_url: str | |
| duration: float | |
| text: str | |
| class AudioVideoResponse(BaseModel): | |
| """音声付き動画生成レスポンス""" | |
| status: str | |
| video_url: Optional[str] = None | |
| page2_image_url: Optional[str] = None | |
| audio_urls: list = [] # list[AudioInfo]として使用 | |
| message: str | |
| total_slides: Optional[int] = None | |
| video_duration: Optional[float] = None | |
| # ============================== | |
| # URL前処理ユーティリティ | |
| # ============================== | |
| def sanitize_url(url: str) -> str: | |
| """ | |
| URLからバックスラッシュやエスケープシーケンスを除去 | |
| Args: | |
| url: 元のURL文字列 | |
| Returns: | |
| str: クリーニングされたURL | |
| """ | |
| import urllib.parse | |
| # URLデコード(%22等のエンコードされた文字を元に戻す) | |
| cleaned_url = urllib.parse.unquote(url) | |
| # バックスラッシュを除去 | |
| cleaned_url = cleaned_url.replace('\\', '') | |
| # 前後の空白を削除 | |
| cleaned_url = cleaned_url.strip() | |
| # ダブルクォートを除去(JSON文字列から来た場合) | |
| cleaned_url = cleaned_url.strip('"').strip("'") | |
| logger.info(f"URL sanitized: {url} → {cleaned_url}") | |
| return cleaned_url | |
| # ============================== | |
| # V2.0: スライドデータ処理関数 | |
| # ============================== | |
| def clean_mnemonic(text: str) -> str: | |
| """ | |
| 語呂合わせから(数字)または(数字)パターンを除去 | |
| Args: | |
| text: 語呂合わせテキスト(例: "いい国つくろう鎌倉幕府(1192)" または "兄(2)さん(3)ク(9)イーン") | |
| Returns: | |
| str: 数字を除去したテキスト(例: "いい国つくろう鎌倉幕府" または "兄さんクイーン") | |
| """ | |
| import re | |
| # 全角・半角両対応 | |
| cleaned = re.sub(r'[((]\d+[))]', '', text) | |
| return cleaned | |
| def determine_slide_type(slide: dict) -> str: | |
| """ | |
| スライド種別を判定 | |
| Args: | |
| slide: スライドデータ辞書 | |
| Returns: | |
| str: "title" | "imageText_image_only" | "imageText_with_text" | "closing" | |
| """ | |
| slide_type = slide.get("type", "") | |
| if slide_type == "title": | |
| return "title" | |
| elif slide_type == "closing": | |
| return "closing" | |
| elif slide_type == "imageText": | |
| points = slide.get("points", []) | |
| if not points or len(points) == 0: | |
| return "imageText_image_only" | |
| else: | |
| return "imageText_with_text" | |
| else: | |
| return "unknown" | |
| def extract_audio_text(slide: dict) -> str: | |
| """ | |
| スライドから音声テキストを抽出 | |
| Args: | |
| slide: スライドデータ辞書 | |
| Returns: | |
| str: 読み上げるテキスト | |
| """ | |
| slide_type = determine_slide_type(slide) | |
| if slide_type == "title": | |
| # タイトルスライド: titleフィールドをそのまま | |
| return slide.get("title", "") | |
| elif slide_type == "imageText_image_only": | |
| # 画像のみスライド: 年号 + 語呂合わせ(数字除去)× 2回 | |
| subhead = slide.get("subhead", "") | |
| if ":" in subhead: | |
| year = subhead.split(":")[0] | |
| mnemonic = subhead.split(":")[1] | |
| else: | |
| year = "" | |
| mnemonic = subhead | |
| # (数字)パターンを除去 | |
| mnemonic_clean = clean_mnemonic(mnemonic) | |
| # 2回繰り返し | |
| audio_text = f"{year}年、{mnemonic_clean}。{year}年、{mnemonic_clean}。" | |
| return audio_text | |
| elif slide_type == "imageText_with_text": | |
| # 画像+テキストスライド: pointsを結合(1回のみ) | |
| points = slide.get("points", []) | |
| # 各pointの末尾の句点を除去してから結合 | |
| cleaned_points = [p.rstrip("。") for p in points] | |
| summary = "。".join(cleaned_points) | |
| # 最後に句点を追加 | |
| if summary and not summary.endswith("。"): | |
| summary += "。" | |
| return summary | |
| elif slide_type == "closing": | |
| # クロージングスライド: notesフィールドまたはデフォルトメッセージ | |
| return slide.get("notes", "本日の学習は以上です。復習を忘れずに。") | |
| else: | |
| return "" | |
| def extract_audio_text_v2(slide: dict, slide_index: int, history: list) -> str: | |
| """ | |
| スライドインデックスとhistoryから音声テキストを抽出 | |
| Args: | |
| slide: スライドデータ辞書 | |
| slide_index: 0-13のスライドインデックス | |
| history: 6件の元イベントデータ(year/mnemonic/summary) | |
| Returns: | |
| str: 読み上げるテキスト | |
| """ | |
| slide_type = determine_slide_type(slide) | |
| if slide_type == "title": | |
| title_text = slide.get("title", "") | |
| notes_text = slide.get("notes", "") | |
| # 「語呂羽丸五郎」を読み仮名に変換(正しく読まれないため) | |
| title_text = title_text.replace("語呂羽丸五郎", "ごろーまるごろう") | |
| notes_text = notes_text.replace("語呂羽丸五郎", "ごろーまるごろう") | |
| # titleとnotesを結合 | |
| if notes_text: | |
| return f"{title_text}。{notes_text}" | |
| else: | |
| return title_text | |
| elif slide_type == "closing": | |
| closing_text = slide.get("notes", "本日の学習は以上です。復習を忘れずに。") | |
| # 「語呂羽丸五郎」を読み仮名に変換(正しく読まれないため) | |
| closing_text = closing_text.replace("語呂羽丸五郎", "ごろーまるごろう") | |
| return closing_text | |
| elif slide_type == "imageText_image_only": | |
| # slide_index 1, 3, 5, 7, 9, 11 → history[0, 1, 2, 3, 4, 5] | |
| history_index = (slide_index - 1) // 2 | |
| event = history[history_index] | |
| year = str(event.get("year", "")) | |
| # 既に「年」が含まれている場合は除去(重複防止) | |
| year_clean = year.replace("年", "") | |
| mnemonic = clean_mnemonic(str(event.get("mnemonic", ""))) | |
| return f"{year_clean}年、{mnemonic}。{year_clean}年、{mnemonic}。" | |
| elif slide_type == "imageText_with_text": | |
| # slide_index 2, 4, 6, 8, 10, 12 → history[0, 1, 2, 3, 4, 5] | |
| history_index = (slide_index - 2) // 2 | |
| event = history[history_index] | |
| summary = str(event.get("summary", "")) | |
| return summary | |
| else: | |
| return "" | |
| def convert_pil_to_array(pil_image: Image.Image, target_size: tuple) -> np.ndarray: | |
| """ | |
| PIL ImageをNumPy配列に変換し、指定サイズにリサイズ | |
| Args: | |
| pil_image: PIL Image | |
| target_size: (width, height) - 例: (1280, 720) | |
| Returns: | |
| numpy array (RGB) | |
| """ | |
| # アスペクト比を保ってリサイズ | |
| pil_image = pil_image.resize(target_size, Image.Resampling.LANCZOS) | |
| # RGB変換 | |
| if pil_image.mode != 'RGB': | |
| pil_image = pil_image.convert('RGB') | |
| # numpy array変換 | |
| img_array = np.array(pil_image) | |
| return img_array | |
| # ============================== | |
| # V2.0: Gemini TTS音声生成 | |
| # ============================== | |
| def generate_audio_with_gemini(audio_text: str, gemini_token: str) -> bytes: | |
| """ | |
| Gemini REST APIでテキストから音声を生成 | |
| Args: | |
| audio_text: 読み上げるテキスト | |
| gemini_token: GEMINI_TOKEN環境変数 | |
| Returns: | |
| WAVバイナリデータ(24kHz PCM16) | |
| """ | |
| import base64 | |
| url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-pro-preview-tts:generateContent?key={gemini_token}" | |
| headers = { | |
| "Content-Type": "application/json" | |
| } | |
| # プロンプトベース制御(ドラマチック表現と正確な読み上げを重視) | |
| prompt_text = f"Read this Japanese historical text with dramatic emotional expression and accurate kanji pronunciation, bringing the story to life with vivid intonation: {audio_text}" | |
| payload = { | |
| "contents": [ | |
| { | |
| "role": "user", | |
| "parts": [ | |
| { | |
| "text": prompt_text | |
| } | |
| ] | |
| } | |
| ], | |
| "generationConfig": { | |
| "responseModalities": ["audio"], | |
| "speech_config": { | |
| "voice_config": { | |
| "prebuilt_voice_config": { | |
| "voice_name": "Charon" | |
| } | |
| } | |
| } | |
| } | |
| } | |
| logger.info(f"Gemini TTS API呼び出し: {len(audio_text)}文字") | |
| logger.info(f"Payload: {payload}") | |
| response = requests.post(url, json=payload, headers=headers, timeout=60) | |
| # エラーレスポンスの詳細をログ出力 | |
| if response.status_code != 200: | |
| logger.error(f"Gemini API Error: {response.status_code}") | |
| logger.error(f"Response: {response.text}") | |
| response.raise_for_status() | |
| # レスポンスからaudioデータを取得(base64デコード) | |
| response_data = response.json() | |
| audio_data_b64 = response_data["candidates"][0]["content"]["parts"][0]["inlineData"]["data"] | |
| pcm_bytes = base64.b64decode(audio_data_b64) | |
| logger.info(f"音声データ取得完了: {len(pcm_bytes)} bytes (PCM)") | |
| # PCM16をWAVファイルに変換 | |
| wav_bytes = convert_pcm_to_wav(pcm_bytes, sample_rate=24000, channels=1, sample_width=2) | |
| logger.info(f"WAV変換完了: {len(wav_bytes)} bytes") | |
| return wav_bytes | |
| def convert_pcm_to_wav(pcm_bytes: bytes, sample_rate: int, channels: int, sample_width: int) -> bytes: | |
| """ | |
| PCMバイナリをWAV形式に変換 | |
| Args: | |
| pcm_bytes: PCMバイナリデータ | |
| sample_rate: サンプルレート(Hz) | |
| channels: チャンネル数(1=モノラル、2=ステレオ) | |
| sample_width: サンプル幅(バイト、2=16bit) | |
| Returns: | |
| WAVバイナリデータ | |
| """ | |
| import wave | |
| import io | |
| wav_buffer = io.BytesIO() | |
| with wave.open(wav_buffer, 'wb') as wav_file: | |
| wav_file.setnchannels(channels) | |
| wav_file.setsampwidth(sample_width) | |
| wav_file.setframerate(sample_rate) | |
| wav_file.writeframes(pcm_bytes) | |
| wav_buffer.seek(0) | |
| return wav_buffer.read() | |
| def get_audio_duration(wav_bytes: bytes) -> float: | |
| """ | |
| WAVバイナリから音声の長さ(秒)を取得 | |
| Args: | |
| wav_bytes: WAVバイナリデータ | |
| Returns: | |
| float: 音声の長さ(秒) | |
| """ | |
| import wave | |
| import io | |
| wav_buffer = io.BytesIO(wav_bytes) | |
| with wave.open(wav_buffer, 'rb') as wav_file: | |
| frames = wav_file.getnframes() | |
| rate = wav_file.getframerate() | |
| duration = frames / float(rate) | |
| return duration | |
| def speed_up_audio(wav_bytes: bytes, speed_factor: float = 1.5) -> bytes: | |
| """ | |
| WAVファイルを指定倍速に変換 | |
| Args: | |
| wav_bytes: 元のWAVバイナリデータ | |
| speed_factor: 倍速係数(1.5 = 1.5倍速) | |
| Returns: | |
| 倍速変換後のWAVバイナリデータ | |
| """ | |
| import wave | |
| import io | |
| # WAV読み込み | |
| wav_buffer = io.BytesIO(wav_bytes) | |
| with wave.open(wav_buffer, 'rb') as wav_file: | |
| params = wav_file.getparams() | |
| frames = wav_file.readframes(wav_file.getnframes()) | |
| # PCMデータをnumpy配列に変換 | |
| audio_data = np.frombuffer(frames, dtype=np.int16) | |
| # リサンプリング(間引き処理) | |
| target_length = int(len(audio_data) / speed_factor) | |
| indices = np.linspace(0, len(audio_data) - 1, target_length).astype(int) | |
| resampled_data = audio_data[indices] | |
| # WAV形式で書き出し | |
| output_buffer = io.BytesIO() | |
| with wave.open(output_buffer, 'wb') as output_wav: | |
| output_wav.setparams(params) | |
| output_wav.writeframes(resampled_data.tobytes()) | |
| output_buffer.seek(0) | |
| return output_buffer.read() | |
| def save_audio_to_hf(wav_bytes: bytes, prefix: str = "slide_audio") -> str: | |
| """ | |
| 音声WAVファイルをHugging Faceデータセットにアップロード | |
| Args: | |
| wav_bytes: WAVバイナリデータ | |
| prefix: ファイル名のプレフィックス | |
| Returns: | |
| str: アップロードされた音声ファイルのURL | |
| """ | |
| # 一時ファイルに保存 | |
| with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file: | |
| tmp_file.write(wav_bytes) | |
| tmp_path = tmp_file.name | |
| try: | |
| # HFアップロード | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| unique_id = str(uuid.uuid4())[:8] | |
| filename = f"{prefix}_{timestamp}_{unique_id}.wav" | |
| path_in_repo = f"audios/{filename}" | |
| logger.info(f"音声アップロード開始: {path_in_repo}") | |
| video_uploader.api.upload_file( | |
| path_or_fileobj=tmp_path, | |
| path_in_repo=path_in_repo, | |
| repo_id=video_uploader.repo_id, | |
| repo_type="dataset" | |
| ) | |
| audio_url = f"https://huggingface.co/datasets/{video_uploader.repo_id}/resolve/main/{path_in_repo}" | |
| logger.info(f"音声アップロード完了: {audio_url}") | |
| return audio_url | |
| finally: | |
| # 一時ファイル削除 | |
| if os.path.exists(tmp_path): | |
| os.remove(tmp_path) | |
| # ============================== | |
| # V2.0: 音声付き動画生成 | |
| # ============================== | |
| def create_video_with_audio_from_slides( | |
| slide_data: list, | |
| gemini_token: str, | |
| progress_callback=None | |
| ) -> tuple: | |
| """ | |
| スライドデータから音声付き動画を生成 | |
| Args: | |
| slide_data: スライドデータJSON配列 | |
| gemini_token: GEMINI_TOKEN環境変数 | |
| progress_callback: 進捗コールバック関数(Gradio用) | |
| Returns: | |
| tuple: (video_url, page2_image_url, audio_info_list) | |
| """ | |
| audio_files = [] # 一時ファイル管理 | |
| clips = [] | |
| audio_info_list = [] | |
| video_path = None | |
| try: | |
| total_slides = len(slide_data) | |
| # 各スライドの音声生成と動画クリップ作成 | |
| for idx, slide in enumerate(slide_data): | |
| if progress_callback: | |
| progress_callback((idx / total_slides) * 0.6, desc=f"音声生成中 ({idx+1}/{total_slides})") | |
| logger.info(f"スライド {idx+1}/{total_slides} 処理中...") | |
| # 音声テキスト抽出 | |
| audio_text = extract_audio_text(slide) | |
| if not audio_text: | |
| logger.warning(f"スライド {idx+1}: 音声テキストが空です") | |
| continue | |
| # 音声生成 | |
| wav_bytes = generate_audio_with_gemini(audio_text, gemini_token) | |
| # 1.5倍速処理(50%アップ) | |
| wav_bytes = speed_up_audio(wav_bytes, speed_factor=1.5) | |
| # 音声長さ測定(倍速処理後) | |
| audio_duration = get_audio_duration(wav_bytes) | |
| # スライド再生時間計算(音声 + 0.6秒余白、前後0.3秒ずつ) | |
| slide_duration = audio_duration + 0.6 | |
| # 音声を一時ファイルに保存 | |
| with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_audio: | |
| tmp_audio.write(wav_bytes) | |
| audio_path = tmp_audio.name | |
| audio_files.append(audio_path) | |
| # HFアップロード | |
| slide_type = determine_slide_type(slide) | |
| audio_url = save_audio_to_hf(wav_bytes, prefix=f"slide_{idx:02d}_{slide_type}") | |
| # 音声情報記録 | |
| audio_info_list.append({ | |
| "slide_index": idx, | |
| "slide_type": slide_type, | |
| "audio_url": audio_url, | |
| "duration": audio_duration, | |
| "text": audio_text | |
| }) | |
| # 画像クリップ作成(仮の黒画像、実際はスライド画像を使用) | |
| # TODO: スライド画像生成または取得 | |
| img_array = np.zeros((720, 1280, 3), dtype=np.uint8) # 仮の黒画像(720p) | |
| # moviepyクリップ作成 | |
| img_clip = ImageClip(img_array, duration=slide_duration) | |
| audio_clip = AudioFileClip(audio_path) | |
| # 音声を動画に設定 | |
| video_clip = img_clip.set_audio(audio_clip) | |
| clips.append(video_clip) | |
| logger.info(f"スライド {idx+1}: 音声{audio_duration:.2f}秒, 再生時間{slide_duration:.2f}秒") | |
| if not clips: | |
| raise Exception("動画クリップが生成されませんでした") | |
| if progress_callback: | |
| progress_callback(0.7, desc="動画を結合中...") | |
| # 全クリップを連結 | |
| final_video = concatenate_videoclips(clips, method="compose") | |
| # 一時動画ファイルに出力 | |
| with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_video: | |
| video_path = tmp_video.name | |
| if progress_callback: | |
| progress_callback(0.8, desc="動画をエンコード中...") | |
| # 動画エンコード | |
| final_video.write_videofile( | |
| video_path, | |
| fps=30, | |
| codec='libx264', | |
| audio_codec='aac', | |
| logger=None # moviepyのログを抑制 | |
| ) | |
| # クリップをクローズ | |
| final_video.close() | |
| for clip in clips: | |
| clip.close() | |
| if progress_callback: | |
| progress_callback(0.9, desc="動画をアップロード中...") | |
| # HFアップロード | |
| video_url = video_uploader.upload_video(video_path, prefix="slidedata_video") | |
| # 2ページ目画像抽出・アップロード(TODO: 実装) | |
| page2_image_url = None | |
| if progress_callback: | |
| progress_callback(1.0, desc="完了!") | |
| logger.info(f"動画生成完了: {video_url}") | |
| return (video_url, page2_image_url, audio_info_list) | |
| finally: | |
| # 一時ファイルクリーンアップ | |
| for audio_file in audio_files: | |
| if os.path.exists(audio_file): | |
| try: | |
| os.remove(audio_file) | |
| except Exception as e: | |
| logger.warning(f"音声ファイル削除エラー: {e}") | |
| if video_path and os.path.exists(video_path): | |
| try: | |
| os.remove(video_path) | |
| except Exception as e: | |
| logger.warning(f"動画ファイル削除エラー: {e}") | |
| def create_video_with_audio_from_slides_v2( | |
| slide_data: Union[str, list], | |
| pdf_url: str, | |
| history: Union[str, list], | |
| gemini_token: str, | |
| progress_callback=None | |
| ) -> tuple: | |
| """ | |
| PDF画像とslideData/historyから音声付き動画を生成(V2.2版 - 全パラメータJSON文字列対応) | |
| Args: | |
| slide_data: 14枚のスライドデータ - JSON文字列 | |
| pdf_url: GASが生成したPDF URL | |
| history: 6件の元イベントデータ(year/mnemonic/summary)- JSON文字列 | |
| gemini_token: GEMINI_TOKEN環境変数 | |
| progress_callback: 進捗コールバック関数(Gradio用) | |
| Returns: | |
| tuple: (video_url, page2_image_url, audio_info_list) | |
| """ | |
| import json | |
| pdf_path = None | |
| audio_files = [] | |
| clips = [] | |
| audio_info_list = [] | |
| video_path = None | |
| try: | |
| # slide_dataをJSON文字列からlistにパース | |
| if isinstance(slide_data, str): | |
| slide_data_list = json.loads(slide_data) | |
| else: | |
| slide_data_list = slide_data # 既にlistの場合(後方互換性) | |
| # historyをJSON文字列からlistにパース | |
| if isinstance(history, str): | |
| history_list = json.loads(history) | |
| else: | |
| history_list = history # 既にlistの場合(後方互換性) | |
| logger.info(f"history parsed: {len(history_list)}件のイベント") | |
| # 1. PDFダウンロード | |
| if progress_callback: | |
| progress_callback(0.05, desc="PDFダウンロード中...") | |
| pdf_path = download_pdf_from_url(sanitize_url(pdf_url)) | |
| # 2. PDF → 画像変換(14ページ → 14枚) | |
| if progress_callback: | |
| progress_callback(0.1, desc="PDF→画像変換中...") | |
| images = convert_pdf_to_images(pdf_path, dpi=150) | |
| # 画像枚数とスライドデータの整合性チェック | |
| if len(images) != len(slide_data_list): | |
| raise Exception(f"画像枚数とスライドデータが不一致: {len(images)}枚 vs {len(slide_data_list)}枚") | |
| total_slides = len(slide_data_list) | |
| logger.info(f"PDF変換完了: {total_slides}枚の画像を取得") | |
| # 3. 各スライドの音声生成と動画クリップ作成 | |
| for idx, (slide, pil_image) in enumerate(zip(slide_data_list, images)): | |
| if progress_callback: | |
| progress_callback(0.1 + (idx / total_slides) * 0.5, desc=f"音声生成中 ({idx+1}/{total_slides})") | |
| logger.info(f"スライド {idx+1}/{total_slides} 処理中...") | |
| # 音声テキスト抽出(history_listを使用) | |
| audio_text = extract_audio_text_v2(slide, idx, history_list) | |
| if not audio_text: | |
| logger.warning(f"スライド {idx+1}: 音声テキストが空です") | |
| continue | |
| # 音声生成 | |
| wav_bytes = generate_audio_with_gemini(audio_text, gemini_token) | |
| # 1.5倍速処理(50%アップ) | |
| wav_bytes = speed_up_audio(wav_bytes, speed_factor=1.5) | |
| # 音声長さ測定(倍速処理後) | |
| audio_duration = get_audio_duration(wav_bytes) | |
| # スライド再生時間計算(音声 + 0.6秒余白) | |
| slide_duration = audio_duration + 0.6 | |
| # 音声を一時ファイルに保存 | |
| with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_audio: | |
| tmp_audio.write(wav_bytes) | |
| audio_path = tmp_audio.name | |
| audio_files.append(audio_path) | |
| # HFアップロード | |
| slide_type = determine_slide_type(slide) | |
| audio_url = save_audio_to_hf(wav_bytes, prefix=f"slide_{idx:02d}_{slide_type}") | |
| # 音声情報記録 | |
| audio_info_list.append({ | |
| "slide_index": idx, | |
| "slide_type": slide_type, | |
| "audio_url": audio_url, | |
| "duration": audio_duration, | |
| "text": audio_text | |
| }) | |
| # PIL Image → NumPy配列(720p) | |
| img_array = convert_pil_to_array(pil_image, target_size=(1280, 720)) | |
| # moviepyクリップ作成 | |
| img_clip = ImageClip(img_array, duration=slide_duration) | |
| audio_clip = AudioFileClip(audio_path) | |
| # 音声を動画に設定 | |
| video_clip = img_clip.set_audio(audio_clip) | |
| clips.append(video_clip) | |
| logger.info(f"✅ スライド {idx+1}/{total_slides}: クリップ作成完了(音声{audio_duration:.2f}秒 + 余白0.6秒 = {slide_duration:.2f}秒)") | |
| if not clips: | |
| raise Exception("動画クリップが生成されませんでした") | |
| # 総動画時間計算 | |
| total_video_duration = sum([clip.duration for clip in clips]) | |
| logger.info(f"📹 全{len(clips)}クリップ作成完了(総再生時間: {total_video_duration:.2f}秒)") | |
| if progress_callback: | |
| progress_callback(0.7, desc="動画を結合中...") | |
| # 4. 全クリップを連結 | |
| logger.info(f"🔗 動画結合開始: {len(clips)}個のクリップを連結中...") | |
| final_video = concatenate_videoclips(clips, method="compose") | |
| logger.info(f"✅ 動画結合完了") | |
| # 一時動画ファイルに出力 | |
| with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_video: | |
| video_path = tmp_video.name | |
| if progress_callback: | |
| progress_callback(0.8, desc="動画をエンコード中...") | |
| # 5. 動画エンコード | |
| logger.info(f"🎬 動画エンコード開始: {total_video_duration:.2f}秒の動画をH.264/AACでエンコード中...") | |
| final_video.write_videofile( | |
| video_path, | |
| fps=30, | |
| codec='libx264', | |
| audio_codec='aac', | |
| logger=None # moviepyのログを抑制 | |
| ) | |
| logger.info(f"✅ 動画エンコード完了: {video_path}") | |
| # クリップをクローズ | |
| final_video.close() | |
| for clip in clips: | |
| clip.close() | |
| logger.info(f"🧹 メモリ解放完了({len(clips)}クリップクローズ)") | |
| if progress_callback: | |
| progress_callback(0.9, desc="動画をアップロード中...") | |
| # 6. HFアップロード | |
| video_file_size = os.path.getsize(video_path) / (1024 * 1024) # MB | |
| logger.info(f"☁️ 動画アップロード開始: {video_file_size:.2f}MB → Hugging Face Dataset...") | |
| video_url = video_uploader.upload_video(video_path, prefix="slidedata_video_v2") | |
| logger.info(f"✅ 動画アップロード完了: {video_url}") | |
| # 7. 2ページ目画像アップロード | |
| page2_image_url = None | |
| if len(images) >= 2: | |
| logger.info(f"🖼️ 2ページ目画像アップロード開始...") | |
| page2_image = images[1] | |
| with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp_img: | |
| page2_image_path = tmp_img.name | |
| page2_image.save(page2_image_path, format='JPEG', quality=90) | |
| page2_image_url = video_uploader.upload_image(page2_image_path, prefix="slidedata_page2") | |
| logger.info(f"✅ 2ページ目画像アップロード完了: {page2_image_url}") | |
| # 一時ファイル削除 | |
| if os.path.exists(page2_image_path): | |
| os.remove(page2_image_path) | |
| if progress_callback: | |
| progress_callback(1.0, desc="完了!") | |
| logger.info(f"動画生成完了: {video_url}") | |
| return (video_url, page2_image_url, audio_info_list) | |
| finally: | |
| # 一時ファイルクリーンアップ | |
| if pdf_path and os.path.exists(pdf_path): | |
| try: | |
| os.remove(pdf_path) | |
| except Exception as e: | |
| logger.warning(f"PDFファイル削除エラー: {e}") | |
| for audio_file in audio_files: | |
| if os.path.exists(audio_file): | |
| try: | |
| os.remove(audio_file) | |
| except Exception as e: | |
| logger.warning(f"音声ファイル削除エラー: {e}") | |
| if video_path and os.path.exists(video_path): | |
| try: | |
| os.remove(video_path) | |
| except Exception as e: | |
| logger.warning(f"動画ファイル削除エラー: {e}") | |
| # ============================== | |
| # コア機能実装 | |
| # ============================== | |
| def download_pdf_from_url(pdf_url: str) -> str: | |
| """ | |
| 指定されたURLからPDFをダウンロードして一時ファイルとして保存 | |
| Args: | |
| pdf_url: PDFファイルのURL | |
| Returns: | |
| str: ダウンロードされたPDFファイルのパス | |
| Raises: | |
| Exception: ダウンロード失敗時 | |
| """ | |
| try: | |
| # URLをサニタイズ(バックスラッシュ等を除去) | |
| pdf_url = sanitize_url(pdf_url) | |
| logger.info(f"PDFダウンロード開始: {pdf_url}") | |
| # HTTPリクエスト | |
| response = requests.get(pdf_url, timeout=30, stream=True) | |
| response.raise_for_status() | |
| # Content-Typeの検証 | |
| content_type = response.headers.get('Content-Type', '') | |
| if 'pdf' not in content_type.lower(): | |
| logger.warning(f"Content-Type が PDF ではありません: {content_type}") | |
| # 一時ファイルに保存 | |
| with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp_file: | |
| tmp_path = tmp_file.name | |
| for chunk in response.iter_content(chunk_size=8192): | |
| tmp_file.write(chunk) | |
| logger.info(f"PDFダウンロード完了: {tmp_path} ({os.path.getsize(tmp_path)} bytes)") | |
| return tmp_path | |
| except requests.exceptions.RequestException as e: | |
| logger.error(f"PDFダウンロードエラー: {e}") | |
| raise Exception(f"PDFのダウンロードに失敗しました: {e}") | |
| def convert_pdf_to_images(pdf_path: str, dpi: int = 150) -> list: | |
| """ | |
| PDFファイルを画像リストに変換 | |
| Args: | |
| pdf_path: PDFファイルのパス | |
| dpi: 解像度(デフォルト150) | |
| Returns: | |
| list: PIL.Imageオブジェクトのリスト | |
| """ | |
| try: | |
| logger.info(f"PDF→画像変換開始: {pdf_path}, DPI={dpi}") | |
| # PDFを画像に変換 | |
| images = convert_from_path( | |
| pdf_path, | |
| dpi=dpi, | |
| fmt='jpeg', # JPEG形式で出力 | |
| thread_count=2 # 並列処理スレッド数 | |
| ) | |
| logger.info(f"PDF変換完了: {len(images)}ページ") | |
| return images | |
| except Exception as e: | |
| logger.error(f"PDF変換エラー: {e}") | |
| raise Exception(f"PDFの画像変換に失敗しました: {e}") | |
| def create_video_from_images( | |
| images: list, | |
| duration_per_page: int = 5, | |
| fps: int = 30 | |
| ) -> str: | |
| """ | |
| 画像リストからスライドショー動画を生成 | |
| Args: | |
| images: PIL.Imageオブジェクトのリスト | |
| duration_per_page: 1ページあたりの表示秒数(デフォルト5秒) | |
| fps: フレームレート(デフォルト30fps) | |
| Returns: | |
| str: 生成された動画ファイルのパス | |
| """ | |
| try: | |
| if not images: | |
| raise ValueError("画像リストが空です") | |
| logger.info(f"動画生成開始: {len(images)}ページ, {duration_per_page}秒/ページ, {fps}fps") | |
| # 全画像を同じサイズにリサイズ(最初の画像のサイズに統一) | |
| first_img = images[0] | |
| width, height = first_img.size | |
| logger.info(f"動画サイズ: {width}x{height}") | |
| # 一時ファイルパス | |
| tmp_video = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) | |
| video_path = tmp_video.name | |
| tmp_video.close() | |
| # 動画ライター初期化 | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| video_writer = cv2.VideoWriter( | |
| video_path, | |
| fourcc, | |
| fps, | |
| (width, height) | |
| ) | |
| # 各画像を指定秒数分のフレームとして追加 | |
| frames_per_page = duration_per_page * fps | |
| for idx, img in enumerate(images): | |
| logger.info(f"ページ {idx+1}/{len(images)} を処理中...") | |
| # 画像をリサイズ(必要な場合) | |
| if img.size != (width, height): | |
| img = img.resize((width, height), Image.Resampling.LANCZOS) | |
| # PIL Image → OpenCV形式に変換(RGB→BGR) | |
| img_array = np.array(img) | |
| if len(img_array.shape) == 3 and img_array.shape[2] == 3: | |
| img_bgr = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR) | |
| else: | |
| img_bgr = img_array | |
| # 同じフレームを複数回書き込み(静止画として表示) | |
| for _ in range(frames_per_page): | |
| video_writer.write(img_bgr) | |
| video_writer.release() | |
| logger.info(f"動画生成完了: {video_path} ({os.path.getsize(video_path)} bytes)") | |
| return video_path | |
| except Exception as e: | |
| logger.error(f"動画生成エラー: {e}") | |
| raise Exception(f"動画の生成に失敗しました: {e}") | |
| # ============================== | |
| # Hugging Face アップロード | |
| # ============================== | |
| class VideoUploader: | |
| """Hugging Face Datasetへの動画アップロード機能""" | |
| def __init__(self): | |
| self.repo_id = os.environ.get("HF_REPO_ID", "tomo2chin2/video-storage") | |
| self.token = os.environ.get("HF_TOKEN") | |
| if not self.token: | |
| logger.warning("HF_TOKEN 環境変数が設定されていません") | |
| self.api = None | |
| return | |
| try: | |
| # ログイン | |
| login(token=self.token) | |
| self.api = HfApi() | |
| logger.info(f"HuggingFace にログイン完了: {self.repo_id}") | |
| except Exception as e: | |
| logger.error(f"HuggingFace ログインエラー: {e}") | |
| self.api = None | |
| def upload_video(self, video_path: str, prefix: str = "video") -> str: | |
| """ | |
| 動画をHugging Faceデータセットにアップロード | |
| Args: | |
| video_path: アップロードする動画ファイルのパス | |
| prefix: ファイル名のプレフィックス | |
| Returns: | |
| str: アップロードされた動画のURL | |
| """ | |
| if not self.api: | |
| raise Exception("HuggingFace API が初期化されていません。HF_TOKEN を確認してください。") | |
| try: | |
| # ユニークなファイル名を生成 | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| unique_id = str(uuid.uuid4())[:8] | |
| filename = f"{prefix}_{timestamp}_{unique_id}.mp4" | |
| path_in_repo = f"videos/{filename}" | |
| logger.info(f"動画アップロード開始: {path_in_repo}") | |
| # アップロード実行 | |
| upload_info = self.api.upload_file( | |
| path_or_fileobj=video_path, | |
| path_in_repo=path_in_repo, | |
| repo_id=self.repo_id, | |
| repo_type="dataset" | |
| ) | |
| # URLを構築 | |
| video_url = f"https://huggingface.co/datasets/{self.repo_id}/resolve/main/{path_in_repo}" | |
| logger.info(f"動画アップロード完了: {video_url}") | |
| return video_url | |
| except Exception as e: | |
| logger.error(f"動画アップロードエラー: {e}") | |
| raise Exception(f"動画のアップロードに失敗しました: {e}") | |
| def upload_image(self, image_path: str, prefix: str = "image") -> str: | |
| """ | |
| 画像をHugging Faceデータセットにアップロード | |
| Args: | |
| image_path: アップロードする画像ファイルのパス | |
| prefix: ファイル名のプレフィックス | |
| Returns: | |
| str: アップロードされた画像のURL | |
| """ | |
| if not self.api: | |
| raise Exception("HuggingFace API が初期化されていません。HF_TOKEN を確認してください。") | |
| try: | |
| # ユニークなファイル名を生成 | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| unique_id = str(uuid.uuid4())[:8] | |
| # ファイルの拡張子を取得 | |
| ext = Path(image_path).suffix | |
| filename = f"{prefix}_{timestamp}_{unique_id}{ext}" | |
| path_in_repo = f"images/{filename}" | |
| logger.info(f"画像アップロード開始: {path_in_repo}") | |
| # アップロード実行 | |
| upload_info = self.api.upload_file( | |
| path_or_fileobj=image_path, | |
| path_in_repo=path_in_repo, | |
| repo_id=self.repo_id, | |
| repo_type="dataset" | |
| ) | |
| # URLを構築 | |
| image_url = f"https://huggingface.co/datasets/{self.repo_id}/resolve/main/{path_in_repo}" | |
| logger.info(f"画像アップロード完了: {image_url}") | |
| return image_url | |
| except Exception as e: | |
| logger.error(f"画像アップロードエラー: {e}") | |
| raise Exception(f"画像のアップロードに失敗しました: {e}") | |
| # グローバルなアップローダーインスタンスを作成 | |
| video_uploader = VideoUploader() | |
| # ============================== | |
| # FastAPI アプリケーション | |
| # ============================== | |
| app = FastAPI( | |
| title="PDF to Video API", | |
| description="PDFをスライドショー動画に変換するAPI", | |
| version="1.0.0" | |
| ) | |
| # CORS設定 | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| async def pdf_to_video(request: PdfToVideoRequest): | |
| """PDF→動画変換APIエンドポイント""" | |
| pdf_path = None | |
| video_path = None | |
| page2_image_path = None | |
| try: | |
| logger.info(f"API リクエスト受信: {request.pdf_url}") | |
| # 1. PDFダウンロード | |
| pdf_path = download_pdf_from_url(str(request.pdf_url)) | |
| # 2. PDF→画像変換 | |
| images = convert_pdf_to_images(pdf_path, dpi=request.dpi) | |
| total_pages = len(images) | |
| # 3. 動画生成 | |
| video_path = create_video_from_images( | |
| images, | |
| duration_per_page=request.duration_per_page | |
| ) | |
| # 4. Hugging Faceにアップロード | |
| video_url = video_uploader.upload_video(video_path, prefix="pdf_video") | |
| # 5. 2ページ目の画像を保存してアップロード | |
| page2_image_url = None | |
| if total_pages >= 2: | |
| # 2ページ目の画像(インデックス1)を一時ファイルとして保存 | |
| page2_image = images[1] | |
| with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp_img: | |
| page2_image_path = tmp_img.name | |
| page2_image.save(page2_image_path, format='JPEG', quality=90) | |
| logger.info(f"2ページ目の画像を保存: {page2_image_path}") | |
| # Hugging Faceにアップロード | |
| page2_image_url = video_uploader.upload_image(page2_image_path, prefix="pdf_page2") | |
| logger.info(f"2ページ目の画像アップロード完了: {page2_image_url}") | |
| # 動画の総再生時間を計算 | |
| video_duration = total_pages * request.duration_per_page | |
| logger.info(f"処理完了: 動画={video_url}, 2ページ目画像={page2_image_url}") | |
| return VideoResponse( | |
| status="success", | |
| video_url=video_url, | |
| page2_image_url=page2_image_url, | |
| message="動画の生成とアップロードに成功しました", | |
| total_pages=total_pages, | |
| video_duration=video_duration | |
| ) | |
| except Exception as e: | |
| logger.error(f"エラー発生: {e}", exc_info=True) | |
| raise HTTPException( | |
| status_code=500, | |
| detail=f"動画生成に失敗しました: {str(e)}" | |
| ) | |
| finally: | |
| # 一時ファイルのクリーンアップ | |
| if pdf_path and os.path.exists(pdf_path): | |
| try: | |
| os.remove(pdf_path) | |
| logger.info(f"一時PDFファイル削除: {pdf_path}") | |
| except Exception as e: | |
| logger.warning(f"PDFファイル削除エラー: {e}") | |
| if video_path and os.path.exists(video_path): | |
| try: | |
| os.remove(video_path) | |
| logger.info(f"一時動画ファイル削除: {video_path}") | |
| except Exception as e: | |
| logger.warning(f"動画ファイル削除エラー: {e}") | |
| if page2_image_path and os.path.exists(page2_image_path): | |
| try: | |
| os.remove(page2_image_path) | |
| logger.info(f"一時画像ファイル削除: {page2_image_path}") | |
| except Exception as e: | |
| logger.warning(f"画像ファイル削除エラー: {e}") | |
| async def slidedata_to_video(request: SlideDataToVideoRequest): | |
| """スライドデータ→音声付き動画変換APIエンドポイント""" | |
| # GEMINI_TOKEN取得 | |
| gemini_token = os.environ.get("GEMINI_TOKEN") | |
| if not gemini_token: | |
| raise HTTPException( | |
| status_code=500, | |
| detail="GEMINI_TOKEN環境変数が設定されていません" | |
| ) | |
| try: | |
| logger.info(f"API リクエスト受信(V2.3 - Dify list型対応)") | |
| logger.info(f" slide_data type: {type(request.slide_data).__name__}") | |
| logger.info(f" history type: {type(request.history).__name__}") | |
| logger.info(f" pdf_url: {request.pdf_url[:50]}...") | |
| # 動画生成(V2.3完全版) | |
| video_url, page2_image_url, audio_info_list = create_video_with_audio_from_slides_v2( | |
| slide_data=request.slide_data, | |
| pdf_url=request.pdf_url, | |
| history=request.history, | |
| gemini_token=gemini_token | |
| ) | |
| # 総再生時間計算 | |
| total_duration = sum([info["duration"] + 0.6 for info in audio_info_list]) | |
| logger.info(f"処理完了: 動画={video_url}") | |
| return AudioVideoResponse( | |
| status="success", | |
| video_url=video_url, | |
| page2_image_url=page2_image_url, | |
| audio_urls=audio_info_list, | |
| message="音声付き動画の生成とアップロードに成功しました", | |
| total_slides=len(audio_info_list), | |
| video_duration=total_duration | |
| ) | |
| except Exception as e: | |
| logger.error(f"エラー発生: {e}", exc_info=True) | |
| raise HTTPException( | |
| status_code=500, | |
| detail=f"動画生成に失敗しました: {str(e)}" | |
| ) | |
| async def health_check(): | |
| """ヘルスチェックエンドポイント""" | |
| return { | |
| "status": "healthy", | |
| "service": "PDF to Video API", | |
| "hf_configured": video_uploader.api is not None | |
| } | |
| # ============================== | |
| # Gradio UI | |
| # ============================== | |
| def process_pdf_url(pdf_url, duration_per_page, dpi, progress=gr.Progress()): | |
| """Gradio UIからの処理関数""" | |
| try: | |
| if not pdf_url: | |
| return None, None, "❌ PDF URLを入力してください", None, None | |
| # URLをサニタイズ | |
| pdf_url = sanitize_url(pdf_url) | |
| progress(0, desc="PDFダウンロード中...") | |
| # PDFダウンロード | |
| pdf_path = download_pdf_from_url(pdf_url) | |
| progress(0.3, desc="PDF→画像変換中...") | |
| # PDF→画像変換 | |
| images = convert_pdf_to_images(pdf_path, dpi=dpi) | |
| total_pages = len(images) | |
| progress(0.6, desc=f"動画生成中({total_pages}ページ)...") | |
| # 動画生成 | |
| video_path = create_video_from_images( | |
| images, | |
| duration_per_page=duration_per_page | |
| ) | |
| progress(0.8, desc="Hugging Faceにアップロード中...") | |
| # 動画アップロード | |
| video_url = video_uploader.upload_video(video_path, prefix="pdf_video") | |
| # 2ページ目の画像を保存してアップロード | |
| page2_image_url = None | |
| page2_image_pil = None | |
| if total_pages >= 2: | |
| progress(0.9, desc="2ページ目の画像をアップロード中...") | |
| page2_image = images[1] | |
| # 一時ファイルに保存してアップロード | |
| with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp_img: | |
| page2_image_path = tmp_img.name | |
| page2_image.save(page2_image_path, format='JPEG', quality=90) | |
| page2_image_url = video_uploader.upload_image(page2_image_path, prefix="pdf_page2") | |
| # PIL.Imageオブジェクトを直接Gradioに渡す | |
| page2_image_pil = page2_image | |
| # 一時ファイルを即座に削除(PIL経由で表示するため不要) | |
| if os.path.exists(page2_image_path): | |
| os.remove(page2_image_path) | |
| # クリーンアップ | |
| if pdf_path and os.path.exists(pdf_path): | |
| os.remove(pdf_path) | |
| if video_path and os.path.exists(video_path): | |
| os.remove(video_path) | |
| video_duration = total_pages * duration_per_page | |
| progress(1.0, desc="完了!") | |
| message = f"✅ 成功: {total_pages}ページ、{video_duration}秒の動画を生成しました" | |
| if page2_image_url: | |
| message += "\n📸 2ページ目の画像も保存しました" | |
| return ( | |
| video_url, # ビデオURL | |
| page2_image_url, # 2ページ目の画像URL | |
| message, # ステータスメッセージ | |
| page2_image_pil # 画像プレビュー(PIL.Image) | |
| ) | |
| except Exception as e: | |
| logger.error(f"Gradio処理エラー: {e}", exc_info=True) | |
| return None, None, f"❌ エラー: {str(e)}", None | |
| # Gradio UI定義 | |
| with gr.Blocks(title="PDF to Video Converter", theme=gr.themes.Soft()) as demo: | |
| gr.Markdown("# 📄 PDF → 🎬 動画変換") | |
| gr.Markdown("PDFのURLを指定すると、各ページをスライドショー動画に変換します。") | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| pdf_url_input = gr.Textbox( | |
| label="PDF URL", | |
| placeholder="https://example.com/sample.pdf", | |
| info="変換したいPDFファイルのURLを入力してください" | |
| ) | |
| with gr.Row(): | |
| duration_slider = gr.Slider( | |
| minimum=1, | |
| maximum=10, | |
| step=1, | |
| value=5, | |
| label="1ページあたりの表示秒数" | |
| ) | |
| dpi_slider = gr.Slider( | |
| minimum=72, | |
| maximum=300, | |
| step=1, | |
| value=150, | |
| label="画像解像度(DPI)", | |
| info="高いほど高画質ですが処理時間が増加します" | |
| ) | |
| convert_btn = gr.Button("🎬 動画生成", variant="primary", size="lg") | |
| with gr.Column(scale=1): | |
| status_output = gr.Textbox( | |
| label="ステータス", | |
| interactive=False | |
| ) | |
| video_url_output = gr.Textbox( | |
| label="動画URL", | |
| interactive=False, | |
| info="生成された動画のURL" | |
| ) | |
| page2_image_url_output = gr.Textbox( | |
| label="2ページ目画像URL", | |
| interactive=False, | |
| info="2ページ目の画像URL" | |
| ) | |
| with gr.Row(): | |
| page2_image_preview = gr.Image( | |
| label="2ページ目プレビュー", | |
| interactive=False, | |
| type="pil" | |
| ) | |
| # イベント設定 | |
| convert_btn.click( | |
| fn=process_pdf_url, | |
| inputs=[pdf_url_input, duration_slider, dpi_slider], | |
| outputs=[video_url_output, page2_image_url_output, status_output, page2_image_preview] | |
| ) | |
| # 使用例とサンプルURL | |
| gr.Markdown(""" | |
| ## 📖 使用方法 | |
| 1. PDFのURLを入力 | |
| 2. 1ページあたりの表示秒数を調整(デフォルト5秒) | |
| 3. 解像度(DPI)を調整(デフォルト150) | |
| 4. 「動画生成」ボタンをクリック | |
| 5. 生成された動画URLをコピーして利用 | |
| ## 🔗 サンプルPDF URL(テスト用) | |
| - W3C サンプル: `https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf` | |
| ## ⚙️ 環境変数 | |
| - `HF_TOKEN`: Hugging Face認証トークン(必須) | |
| - `HF_REPO_ID`: データセットリポジトリID(デフォルト: tomo2chin2/video-storage) | |
| ## 🔗 APIエンドポイント | |
| - `POST /api/pdf-to-video`: PDF→動画変換API | |
| - `GET /health`: ヘルスチェック | |
| """) | |
| # 環境変数情報表示 | |
| hf_repo = os.environ.get("HF_REPO_ID", "tomo2chin2/video-storage") | |
| hf_configured = "✅ 設定済み" if video_uploader.api else "❌ 未設定" | |
| gr.Markdown(f""" | |
| ## 📊 現在の設定 | |
| - HuggingFace リポジトリ: `{hf_repo}` | |
| - HF_TOKEN: {hf_configured} | |
| """) | |
| # FastAPIにGradioをマウント | |
| app = gr.mount_gradio_app(app, demo, path="/") | |
| # ============================== | |
| # アプリケーション起動 | |
| # ============================== | |
| if __name__ == "__main__": | |
| import uvicorn | |
| logger.info("Starting PDF to Video Converter API...") | |
| uvicorn.run(app, host="0.0.0.0", port=7860) | |