| import re |
| import os |
| import json |
| import time |
| import struct |
| import asyncio |
| import logging |
| import mimetypes |
| from datetime import datetime |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from typing import List, Tuple, Optional, Dict |
|
|
| import openai |
| from google import genai |
| from google.genai import types |
| from pydantic import BaseModel |
| from pydub import AudioSegment |
|
|
| from core.config import settings |
| from core.prompts import SYSTEM_PROMPT, ANALYSIS_PROMPT |
| from services.s3_service import s3_service |
| from core import constants |
|
|
| logger = logging.getLogger(__name__) |
|
|
class AnalysisOutput(BaseModel):
    # One podcast proposal produced by the LLM analysis step. Used (via
    # MultiProposalOutput) as the OpenAI structured-output schema, so the
    # field names and types are part of the model contract.
    # No docstring on purpose: pydantic surfaces docstrings as the schema
    # "description", which would change the payload sent to the API.
    program_structure: str  # outline/structure of the proposed episode
    script: str  # full dialog script text for the proposal
|
|
class MultiProposalOutput(BaseModel):
    # Top-level structured-output schema passed as `response_format` to
    # chat.completions.parse in PodcastService.analyze_pdf: the model returns
    # several alternative episode proposals in one response.
    proposals: List[AnalysisOutput]
|
|
| |
# Selectable TTS voices. Keys equal values because the UI's "value" field is
# used both as the display key and as the voice name sent to the TTS API
# (presumably a Gemini prebuilt voice id — confirm against constants.PODCAST_VOICES).
VOICE_CHOICES = {v["value"]: v["value"] for v in constants.PODCAST_VOICES}


# Background-music options: display label -> local asset path (None = no BGM).
# Paths are relative, so they resolve against the process's working directory.
BGM_CHOICES = {
    "No BGM": None,
    "BGM 1": "assets/bgm/BGM_1.mp3",
    "BGM 2": "assets/bgm/BGM_2.mp3",
    "BGM 3": "assets/bgm/BGM_3.mp3"
}
|
|
class PodcastService:
    """End-to-end podcast generation pipeline.

    Responsibilities:
      * ``analyze_pdf``         -- PDF (from S3) -> structured episode proposals (OpenAI).
      * ``generate_script``     -- prompt (+ optional PDF) -> dialog script (OpenAI).
      * ``parse_script`` / ``split_script`` -- script text -> (speaker, line) pairs -> TTS chunks.
      * ``generate_full_audio`` -- chunks -> Gemini multi-speaker TTS -> combined WAV (+ optional BGM).
    """

    def __init__(self):
        # Two separate clients: OpenAI for script analysis/generation,
        # Gemini for text-to-speech.
        self.openai_client = openai.OpenAI(api_key=settings.OPENAI_API_KEY)
        self.genai_client = genai.Client(api_key=settings.GEMINI_API_KEY)

    def compute_script_targets(self, duration_minutes: int) -> int:
        """Return the target script word count for a desired duration in minutes."""
        if duration_minutes <= 5:
            return 2000
        if duration_minutes <= 10:
            return 3000
        if duration_minutes <= 15:
            return 4000
        return 5000

    # ---- blocking helpers (run via asyncio.to_thread) -----------------------

    def _download_pdf_from_s3(self, file_key: str, temp_path: str) -> None:
        """Download `file_key` from the configured S3 bucket to `temp_path` (blocking)."""
        import boto3
        s3 = boto3.client(
            's3',
            aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
            region_name=settings.AWS_REGION)
        s3.download_file(settings.AWS_S3_BUCKET, file_key, temp_path)

    def _upload_pdf_to_openai(self, temp_path: str):
        """Upload a local PDF to OpenAI's file store (blocking); returns the file object."""
        with open(temp_path, "rb") as f:
            return self.openai_client.files.create(file=f, purpose="assistants")

    # ---- public async API ---------------------------------------------------

    async def analyze_pdf(self, file_key: str, duration_minutes: int, model: str = "gpt-4o", progress_callback=None):
        """Analyze a PDF stored in S3 and return structured episode proposals.

        The PDF is downloaded to a temp file, uploaded to OpenAI, and analyzed
        with ANALYSIS_PROMPT using structured output (MultiProposalOutput).
        Returns the raw JSON message content string from the model. The temp
        file is always removed. `progress_callback` is currently unused.
        """
        temp_path = f"temp_{int(time.time())}.pdf"
        try:
            await asyncio.to_thread(self._download_pdf_from_s3, file_key, temp_path)
            file_response = await asyncio.to_thread(self._upload_pdf_to_openai, temp_path)

            formatted_prompt = ANALYSIS_PROMPT.format(duration_minutes=duration_minutes)

            response = await asyncio.to_thread(
                self.openai_client.chat.completions.parse,
                model=model,
                messages=[
                    {"role": "system", "content": formatted_prompt},
                    {"role": "user", "content": [{"type": "file", "file": {"file_id": file_response.id}}]}
                ],
                temperature=1.0,
                response_format=MultiProposalOutput
            )
            return response.choices[0].message.content
        finally:
            # Clean up the temp file even when download/upload/analysis failed.
            if os.path.exists(temp_path):
                await asyncio.to_thread(os.remove, temp_path)

    async def generate_script(self, user_prompt: str, model: str, duration_minutes: int,
                              podcast_format: str, pdf_suggestions: str, file_key: Optional[str] = None, progress_callback=None):
        """Generate a podcast dialog script from a prompt (plus optional S3 PDF).

        Builds SYSTEM_PROMPT with the word-count target; when `file_key` is
        given, the PDF is downloaded from S3, uploaded to OpenAI, and attached
        to the user message. Returns the model's message content. The temp PDF
        (if any) is always removed. `progress_callback` is currently unused.
        """
        target_words = self.compute_script_targets(duration_minutes)
        formatted_system = SYSTEM_PROMPT.format(
            target_words=target_words,
            podcast_format=podcast_format,
            pdf_suggestions=pdf_suggestions
        )

        messages = [{"role": "system", "content": formatted_system}]

        temp_path = None
        if file_key:
            temp_path = f"temp_gen_{int(time.time())}.pdf"
            await asyncio.to_thread(self._download_pdf_from_s3, file_key, temp_path)
            file_response = await asyncio.to_thread(self._upload_pdf_to_openai, temp_path)
            messages.append({
                "role": "user",
                "content": [
                    {"type": "file", "file": {"file_id": file_response.id}},
                    {"type": "text", "text": user_prompt}
                ]
            })
        else:
            messages.append({"role": "user", "content": user_prompt})

        try:
            response = await asyncio.to_thread(
                self.openai_client.chat.completions.create,
                model=model,
                messages=messages,
                temperature=1.0,
                max_completion_tokens=100000
            )
            return response.choices[0].message.content
        finally:
            if temp_path and os.path.exists(temp_path):
                await asyncio.to_thread(os.remove, temp_path)

    # ---- script parsing -----------------------------------------------------

    def parse_script(self, script: str) -> List[Tuple[str, str]]:
        """Parse a script into (speaker, text) pairs.

        Accepts lines like "Speaker 1: ..." (optionally bolded as
        "**Speaker 1**:") with an ASCII or fullwidth colon; falls back to
        Japanese labels ("スピーカー1"), which are normalized to "Speaker N".
        Returns an empty list when nothing matches.
        """
        dialogs: List[Tuple[str, str]] = []

        # [:：] accepts both the ASCII and the fullwidth (Japanese) colon.
        pattern_en = re.compile(r"^\*?\*?(Speaker [12])\*?\*?[:：]\s*(.*)$", re.MULTILINE)
        matches = list(pattern_en.finditer(script))

        if matches:
            print(f"[DEBUG] Found {len(matches)} English patterns")
            for match in matches:
                speaker, text = match.groups()
                dialogs.append((speaker, text))
            return dialogs

        pattern_jp = re.compile(r"^\*?\*?(スピーカー[12])\*?\*?[:：]\s*(.*)$", re.MULTILINE)
        matches = list(pattern_jp.finditer(script))

        if matches:
            print(f"[DEBUG] Found {len(matches)} Japanese patterns")
            for match in matches:
                speaker_jp, text = match.groups()
                # Normalize to the English label the TTS speaker config expects.
                speaker_num = "1" if "1" in speaker_jp else "2"
                dialogs.append((f"Speaker {speaker_num}", text))
        else:
            print(f"[ERROR] No patterns found!")
            print(f"[DEBUG] Preview: {script[:300]}")

        return dialogs

    def split_script(self, dialogs: List[Tuple[str, str]], chunk_size=20) -> List[str]:
        """Re-join dialogs into "Speaker N: text" lines, `chunk_size` lines per chunk."""
        return [
            "\n".join(f"{s}: {t}" for s, t in dialogs[i:i + chunk_size])
            for i in range(0, len(dialogs), chunk_size)
        ]

    # ---- audio generation ---------------------------------------------------

    async def generate_audio_chunk(self, chunk_script: str, tts_model: str, spk1_voice: str,
                                   spk2_voice: str, temperature: float, index: int) -> Optional[str]:
        """Synthesize one script chunk with Gemini multi-speaker TTS.

        Writes the audio to a local WAV file and returns its path, or None on
        failure. Errors are logged, never raised, so one failed chunk does not
        abort the whole batch (see generate_full_audio).
        """
        try:
            print(f"[DEBUG] Chunk {index}: Starting generation")
            print(f"[DEBUG] Chunk {index}: Script length: {len(chunk_script)} chars")

            contents = [types.Content(role="user", parts=[types.Part.from_text(text=chunk_script)])]
            config = types.GenerateContentConfig(
                temperature=temperature,
                response_modalities=["audio"],
                speech_config=types.SpeechConfig(
                    multi_speaker_voice_config=types.MultiSpeakerVoiceConfig(
                        speaker_voice_configs=[
                            types.SpeakerVoiceConfig(speaker="Speaker 1", voice_config=types.VoiceConfig(
                                prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name=spk1_voice))),
                            types.SpeakerVoiceConfig(speaker="Speaker 2", voice_config=types.VoiceConfig(
                                prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name=spk2_voice)))
                        ]
                    )
                )
            )

            print(f"[DEBUG] Chunk {index}: Calling Gemini API (Async)...")
            audio_data = None
            mime_type = "audio/wav"

            # NOTE(review): only the first audio part of the stream is kept; if
            # the API ever splits one chunk's audio across stream events this
            # would truncate the output — confirm against the streaming contract.
            async for chunk in await self.genai_client.aio.models.generate_content_stream(model=tts_model, contents=contents, config=config):
                if chunk.candidates and chunk.candidates[0].content.parts:
                    part = chunk.candidates[0].content.parts[0]
                    if part.inline_data:
                        audio_data = part.inline_data.data
                        mime_type = part.inline_data.mime_type
                        print(f"[DEBUG] Chunk {index}: Received audio data, mime: {mime_type}")
                        break

            if audio_data:
                if "wav" not in mime_type.lower():
                    print(f"[DEBUG] Chunk {index}: Converting to WAV")
                    # Assumes the payload is raw PCM (e.g. audio/L16) that only
                    # needs a RIFF header — TODO confirm for every mime Gemini emits.
                    audio_data = self._convert_to_wav(audio_data, mime_type)

                path = f"chunk_{index}_{int(time.time())}.wav"
                with open(path, "wb") as f:
                    f.write(audio_data)
                print(f"[DEBUG] Chunk {index}: Saved to {path}")
                return path

            print(f"[ERROR] Chunk {index}: No audio data received from Gemini")
        except Exception as e:
            # Broad catch is deliberate: a single failed chunk must not kill the batch.
            print(f"[ERROR] Chunk {index}: Exception: {e}")
            logger.error(f"Error generating chunk {index}: {e}")
        return None

    def _convert_to_wav(self, audio_data: bytes, mime_type: str) -> bytes:
        """Wrap raw PCM bytes (assumed 16-bit mono) in a minimal RIFF/WAV header.

        The sample rate is parsed from the mime type's ``rate=`` parameter
        (e.g. ``audio/L16;codec=pcm;rate=24000``), defaulting to 24 kHz.
        """
        rate = 24000
        if "rate=" in mime_type:
            try:
                rate = int(mime_type.split("rate=")[1].split(";")[0])
            except (ValueError, IndexError):
                pass  # malformed rate parameter -> keep the 24 kHz default

        bits = 16
        num_channels = 1
        data_size = len(audio_data)
        byte_rate = rate * num_channels * (bits // 8)
        block_align = num_channels * (bits // 8)
        # Standard 44-byte canonical WAV header (PCM format tag = 1).
        header = struct.pack(
            "<4sI4s4sIHHIIHH4sI",
            b"RIFF", 36 + data_size, b"WAVE",
            b"fmt ", 16, 1, num_channels, rate, byte_rate, block_align, bits,
            b"data", data_size)
        return header + audio_data

    async def generate_full_audio(self, script: str, tts_model: str, spk1_voice: str,
                                  spk2_voice: str, temperature: float, bgm_choice: str, progress_callback=None):
        """Render the full podcast: parse -> chunk -> parallel TTS -> combine (+BGM).

        Returns the path of the final WAV file, or None if no chunk produced
        audio. `bgm_choice` is looked up in BGM_CHOICES; unknown/None choices
        skip BGM mixing. `progress_callback` is currently unused.
        """
        print(f"[DEBUG] Starting generate_full_audio")

        dialogs = self.parse_script(script)
        print(f"[DEBUG] Parsed {len(dialogs)} dialogs")

        chunks = self.split_script(dialogs)
        print(f"[DEBUG] Split into {len(chunks)} chunks")

        # Generate every chunk concurrently; failed chunks come back as None.
        print(f"[DEBUG] Starting parallel chunk generation...")
        tasks = [
            self.generate_audio_chunk(chunk_script, tts_model, spk1_voice, spk2_voice, temperature, i)
            for i, chunk_script in enumerate(chunks)
        ]
        chunk_paths = await asyncio.gather(*tasks)

        valid_paths = [p for p in chunk_paths if p]
        print(f"[DEBUG] Valid chunks: {len(valid_paths)} out of {len(chunk_paths)}")

        if not valid_paths:
            print(f"[ERROR] No valid audio chunks generated!")
            return None

        def combine_audio():
            # pydub concatenation/export is CPU- and disk-bound, so it runs in
            # a worker thread (via asyncio.to_thread below).
            print(f"[DEBUG] Starting audio combination in thread")
            combined = AudioSegment.empty()
            for p in valid_paths:
                combined += AudioSegment.from_file(p)
                combined += AudioSegment.silent(duration=500)  # short pause between chunks
                try:
                    os.remove(p)
                except OSError:
                    pass  # best-effort cleanup of intermediate chunk files

            final_path = f"final_podcast_{int(time.time())}.wav"

            bgm_path = BGM_CHOICES.get(bgm_choice)
            if bgm_path and os.path.exists(bgm_path):
                print(f"[DEBUG] Adding BGM: {bgm_choice}")
                bgm = AudioSegment.from_file(bgm_path)
                # Loop the BGM until it covers speech + 5 s intro + 5 s outro.
                if len(bgm) < len(combined) + 10000:
                    bgm = bgm * ((len(combined) + 10000) // len(bgm) + 1)

                bgm = bgm[:len(combined) + 10000]
                bgm_main = bgm[5000:5000 + len(combined)] - 16  # duck BGM 16 dB under speech
                bgm_intro = bgm[:5000]  # full-volume intro before speech starts
                bgm_outro = bgm[5000 + len(combined):].fade_out(5000) - 16

                bgm_processed = bgm_intro + bgm_main + bgm_outro
                combined_with_intro = AudioSegment.silent(duration=5000) + combined + AudioSegment.silent(duration=5000)
                final_audio = combined_with_intro.overlay(bgm_processed)
                final_audio.export(final_path, format="wav")
            else:
                combined.export(final_path, format="wav")

            return final_path

        final_path = await asyncio.to_thread(combine_audio)
        print(f"[DEBUG] Audio generation complete: {final_path}")
        return final_path
|
|
|
|
|
|
# Module-level singleton shared by importers. Note: constructing it here
# creates the OpenAI and Gemini clients at import time, so importing this
# module requires valid OPENAI_API_KEY / GEMINI_API_KEY settings.
podcast_service = PodcastService()