matsuap's picture
Upload folder using huggingface_hub
951d5c6 verified
import re
import os
import json
import time
import struct
import asyncio
import logging
import mimetypes
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Tuple, Optional, Dict
import openai
from google import genai
from google.genai import types
from pydantic import BaseModel
from pydub import AudioSegment
from core.config import settings
from core.prompts import SYSTEM_PROMPT, ANALYSIS_PROMPT
from services.s3_service import s3_service
from core import constants
logger = logging.getLogger(__name__)
class AnalysisOutput(BaseModel):
    """One podcast proposal: a program outline plus the full script text."""

    # Structured description of the program's sections/flow.
    program_structure: str
    # The complete dialogue script.
    script: str
class MultiProposalOutput(BaseModel):
    """Container for several alternative podcast proposals returned by the model."""

    proposals: List[AnalysisOutput]
# Voice choices are derived from the project-wide constants list so the UI
# and the service never drift apart.
VOICE_CHOICES = {voice["value"]: voice["value"] for voice in constants.PODCAST_VOICES}

# Background-music options mapped to their bundled asset paths (None = no BGM).
BGM_CHOICES = {"No BGM": None}
BGM_CHOICES.update({f"BGM {n}": f"assets/bgm/BGM_{n}.mp3" for n in (1, 2, 3)})
class PodcastService:
    """Generates podcast scripts (OpenAI) and multi-speaker audio (Gemini TTS)."""

    def __init__(self):
        # API clients are created once per service instance and reused by
        # every request; credentials come from the application settings.
        self.openai_client = openai.OpenAI(api_key=settings.OPENAI_API_KEY)
        self.genai_client = genai.Client(api_key=settings.GEMINI_API_KEY)
def compute_script_targets(self, duration_minutes: int) -> int:
    """Return the target script word count for a desired duration in minutes.

    Thresholds: <=5 min -> 2000 words, <=10 -> 3000, <=15 -> 4000,
    anything longer -> 5000.
    """
    for limit, target_words in ((5, 2000), (10, 3000), (15, 4000)):
        if duration_minutes <= limit:
            return target_words
    return 5000
async def analyze_pdf(self, file_key: str, duration_minutes: int, model: str = "gpt-4o", progress_callback=None):
    """Download a PDF from S3, upload it to OpenAI, and request structured
    podcast proposals for it.

    Args:
        file_key: S3 object key of the PDF to analyze.
        duration_minutes: Desired podcast length; interpolated into the prompt.
        model: OpenAI chat model name.
        progress_callback: Accepted for interface parity; not used here.

    Returns:
        The raw message content of the model response (MultiProposalOutput schema).
    """
    import boto3
    import tempfile

    # mkstemp yields a collision-free path; the previous time.time()-based
    # name could collide when two requests arrived within the same second.
    fd, temp_path = tempfile.mkstemp(prefix="analyze_", suffix=".pdf")
    os.close(fd)
    try:
        # 1. Download from S3 in a worker thread so the event loop stays free.
        def download_from_s3():
            s3 = boto3.client('s3',
                              aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
                              aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
                              region_name=settings.AWS_REGION)
            s3.download_file(settings.AWS_S3_BUCKET, file_key, temp_path)
        await asyncio.to_thread(download_from_s3)

        # 2. Upload the PDF to OpenAI (blocking SDK call, run in a thread).
        def upload_to_openai():
            with open(temp_path, "rb") as f:
                return self.openai_client.files.create(file=f, purpose="assistants")
        file_response = await asyncio.to_thread(upload_to_openai)

        # 3. Ask the model for proposals, parsed against MultiProposalOutput.
        formatted_prompt = ANALYSIS_PROMPT.format(duration_minutes=duration_minutes)
        response = await asyncio.to_thread(
            self.openai_client.chat.completions.parse,
            model=model,
            messages=[
                {"role": "system", "content": formatted_prompt},
                {"role": "user", "content": [{"type": "file", "file": {"file_id": file_response.id}}]}
            ],
            temperature=1.0,
            response_format=MultiProposalOutput
        )
        return response.choices[0].message.content
    finally:
        # Always remove the temp file, even when a step above failed.
        if os.path.exists(temp_path):
            await asyncio.to_thread(os.remove, temp_path)
async def generate_script(self, user_prompt: str, model: str, duration_minutes: int,
                          podcast_format: str, pdf_suggestions: str, file_key: Optional[str] = None, progress_callback=None):
    """Generate a podcast script with OpenAI, optionally grounded on a PDF.

    Args:
        user_prompt: Free-form instructions from the user.
        model: OpenAI chat model name.
        duration_minutes: Desired length; mapped to a target word count.
        podcast_format: Format descriptor interpolated into the system prompt.
        pdf_suggestions: Prior analysis output interpolated into the system prompt.
        file_key: Optional S3 key of a PDF to attach to the request.
        progress_callback: Accepted for interface parity; not used here.

    Returns:
        The generated script text from the model response.
    """
    target_words = self.compute_script_targets(duration_minutes)
    formatted_system = SYSTEM_PROMPT.format(
        target_words=target_words,
        podcast_format=podcast_format,
        pdf_suggestions=pdf_suggestions
    )
    messages = [{"role": "system", "content": formatted_system}]
    temp_path = None
    if file_key:
        import boto3
        import tempfile

        # mkstemp avoids the filename collisions a time.time()-based path
        # allowed when two generations started within the same second.
        fd, temp_path = tempfile.mkstemp(prefix="gen_", suffix=".pdf")
        os.close(fd)

        # Download from S3 in a worker thread (blocking boto3 call).
        def download_from_s3():
            s3 = boto3.client('s3',
                              aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
                              aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
                              region_name=settings.AWS_REGION)
            s3.download_file(settings.AWS_S3_BUCKET, file_key, temp_path)
        await asyncio.to_thread(download_from_s3)

        # Upload the PDF to OpenAI (blocking SDK call, run in a thread).
        def upload_to_openai():
            with open(temp_path, "rb") as f:
                return self.openai_client.files.create(file=f, purpose="assistants")
        file_response = await asyncio.to_thread(upload_to_openai)
        messages.append({
            "role": "user",
            "content": [
                {"type": "file", "file": {"file_id": file_response.id}},
                {"type": "text", "text": user_prompt}
            ]
        })
    else:
        messages.append({"role": "user", "content": user_prompt})
    try:
        # Blocking OpenAI call in a thread so the event loop stays responsive.
        response = await asyncio.to_thread(
            self.openai_client.chat.completions.create,
            model=model,
            messages=messages,
            temperature=1.0,
            max_completion_tokens=100000
        )
        return response.choices[0].message.content
    finally:
        if temp_path and os.path.exists(temp_path):
            await asyncio.to_thread(os.remove, temp_path)
def parse_script(self, script: str) -> List[Tuple[str, str]]:
    """Parse a script into (speaker, line) pairs.

    Recognizes English lines ("Speaker 1: ...", optionally bolded as
    "**Speaker 1**:") and falls back to Japanese labels ("スピーカー1:"),
    normalizing the latter to "Speaker N" so downstream TTS voice mapping
    always sees the English labels.

    Returns:
        A list of (speaker, text) tuples; empty when no pattern matched.
    """
    # Uses the module logger instead of the original debug print() calls,
    # consistent with the rest of the application's logging setup.
    log = logging.getLogger(__name__)
    dialogs: List[Tuple[str, str]] = []
    # English format first; [::] accepts both ASCII and full-width colons.
    pattern_en = re.compile(r"^\*?\*?(Speaker [12])\*?\*?[::]\s*(.*)$", re.MULTILINE)
    matches = list(pattern_en.finditer(script))
    if matches:
        log.debug("Found %d English patterns", len(matches))
        for match in matches:
            speaker, text = match.groups()
            dialogs.append((speaker, text))
        return dialogs
    # Fall back to Japanese speaker labels.
    pattern_jp = re.compile(r"^\*?\*?(スピーカー[12])\*?\*?[::]\s*(.*)$", re.MULTILINE)
    matches = list(pattern_jp.finditer(script))
    if matches:
        log.debug("Found %d Japanese patterns", len(matches))
        for match in matches:
            speaker_jp, text = match.groups()
            # Normalize to the English label the TTS speaker config expects.
            speaker_num = "1" if "1" in speaker_jp else "2"
            dialogs.append((f"Speaker {speaker_num}", text))
        return dialogs
    log.error("No speaker patterns found in script; preview: %s", script[:300])
    return dialogs
def split_script(self, dialogs: List[Tuple[str, str]], chunk_size=20) -> List[str]:
    """Re-serialize parsed dialogs into text chunks of at most chunk_size lines.

    Each chunk is a newline-joined "Speaker: text" block, ready for one TTS call.
    """
    return [
        "\n".join(f"{speaker}: {text}" for speaker, text in dialogs[start:start + chunk_size])
        for start in range(0, len(dialogs), chunk_size)
    ]
async def generate_audio_chunk(self, chunk_script: str, tts_model: str, spk1_voice: str,
                               spk2_voice: str, temperature: float, index: int) -> Optional[str]:
    """Synthesize one script chunk to a WAV file via Gemini multi-speaker TTS.

    Returns the path of the written WAV file, or None on any failure;
    errors are logged rather than raised so callers can gather results.
    """
    try:
        print(f"[DEBUG] Chunk {index}: Starting generation")
        print(f"[DEBUG] Chunk {index}: Script length: {len(chunk_script)} chars")
        request_contents = [types.Content(role="user", parts=[types.Part.from_text(text=chunk_script)])]
        # Map the two script speaker labels onto the requested prebuilt voices.
        voice_configs = [
            types.SpeakerVoiceConfig(
                speaker="Speaker 1",
                voice_config=types.VoiceConfig(
                    prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name=spk1_voice))),
            types.SpeakerVoiceConfig(
                speaker="Speaker 2",
                voice_config=types.VoiceConfig(
                    prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name=spk2_voice))),
        ]
        gen_config = types.GenerateContentConfig(
            temperature=temperature,
            response_modalities=["audio"],
            speech_config=types.SpeechConfig(
                multi_speaker_voice_config=types.MultiSpeakerVoiceConfig(
                    speaker_voice_configs=voice_configs
                )
            )
        )
        print(f"[DEBUG] Chunk {index}: Calling Gemini API (Async)...")
        audio_data = None
        mime_type = "audio/wav"
        # client.aio streams asynchronously so the event loop is never blocked;
        # take the first inline audio payload and stop reading.
        stream = await self.genai_client.aio.models.generate_content_stream(
            model=tts_model, contents=request_contents, config=gen_config)
        async for chunk in stream:
            if not (chunk.candidates and chunk.candidates[0].content.parts):
                continue
            part = chunk.candidates[0].content.parts[0]
            if part.inline_data:
                audio_data = part.inline_data.data
                mime_type = part.inline_data.mime_type
                print(f"[DEBUG] Chunk {index}: Received audio data, mime: {mime_type}")
                break
        if not audio_data:
            print(f"[ERROR] Chunk {index}: No audio data received from Gemini")
            return None
        if "wav" not in mime_type.lower():
            # The API usually returns raw PCM; prepend a WAV header.
            print(f"[DEBUG] Chunk {index}: Converting to WAV")
            audio_data = self._convert_to_wav(audio_data, mime_type)
        path = f"chunk_{index}_{int(time.time())}.wav"
        with open(path, "wb") as f:
            f.write(audio_data)
        print(f"[DEBUG] Chunk {index}: Saved to {path}")
        return path
    except Exception as e:
        print(f"[ERROR] Chunk {index}: Exception: {e}")
        logger.error(f"Error generating chunk {index}: {e}")
        return None
def _convert_to_wav(self, audio_data: bytes, mime_type: str) -> bytes:
# Simplified conversion
rate = 24000
if "rate=" in mime_type:
try: rate = int(mime_type.split("rate=")[1].split(";")[0])
except: pass
bits = 16
num_channels = 1
data_size = len(audio_data)
header = struct.pack("<4sI4s4sIHHIIHH4sI", b"RIFF", 36 + data_size, b"WAVE", b"fmt ", 16, 1, num_channels, rate, rate * num_channels * (bits // 8), num_channels * (bits // 8), bits, b"data", data_size)
return header + audio_data
async def generate_full_audio(self, script: str, tts_model: str, spk1_voice: str,
                              spk2_voice: str, temperature: float, bgm_choice: str, progress_callback=None):
    """Render the whole script to a single WAV file.

    Parses the script, splits it into chunks, synthesizes all chunks
    concurrently, then concatenates them — optionally mixing looped BGM
    with a 5-second intro/outro — in a worker thread.

    Returns:
        The path of the final WAV file, or None when no chunk succeeded.
    """
    print(f"[DEBUG] Starting generate_full_audio")
    dialogs = self.parse_script(script)
    print(f"[DEBUG] Parsed {len(dialogs)} dialogs")
    chunks = self.split_script(dialogs)
    print(f"[DEBUG] Split into {len(chunks)} chunks")
    print(f"[DEBUG] Starting parallel chunk generation...")
    # Fan out concurrently; generate_audio_chunk returns None on failure,
    # so failed chunks are simply dropped below.
    tasks = [
        self.generate_audio_chunk(chunk_script, tts_model, spk1_voice, spk2_voice, temperature, i)
        for i, chunk_script in enumerate(chunks)
    ]
    chunk_paths = await asyncio.gather(*tasks)
    valid_paths = [p for p in chunk_paths if p]
    print(f"[DEBUG] Valid chunks: {len(valid_paths)} out of {len(chunk_paths)}")
    if not valid_paths:
        print(f"[ERROR] No valid audio chunks generated!")
        return None

    def combine_audio():
        # Heavy pydub/disk work; executed via asyncio.to_thread below.
        print(f"[DEBUG] Starting audio combination in thread")
        combined = AudioSegment.empty()
        for p in valid_paths:
            combined += AudioSegment.from_file(p)
            combined += AudioSegment.silent(duration=500)  # pause between chunks
            try:
                os.remove(p)  # chunk files are temporary; best-effort cleanup
            except OSError:
                # Narrowed from a bare except: only file-system errors expected.
                pass
        final_path = f"final_podcast_{int(time.time())}.wav"
        bgm_path = BGM_CHOICES.get(bgm_choice)
        if bgm_path and os.path.exists(bgm_path):
            print(f"[DEBUG] Adding BGM: {bgm_choice}")
            bgm = AudioSegment.from_file(bgm_path)
            # Loop the BGM until it covers the speech plus 5 s intro + 5 s outro.
            if len(bgm) < len(combined) + 10000:
                bgm = bgm * ((len(combined) + 10000) // len(bgm) + 1)
            bgm = bgm[:len(combined) + 10000]
            bgm_main = bgm[5000:5000 + len(combined)] - 16  # duck 16 dB under speech
            bgm_intro = bgm[:5000]  # full-volume intro
            bgm_outro = bgm[5000 + len(combined):].fade_out(5000) - 16
            bgm_processed = bgm_intro + bgm_main + bgm_outro
            combined_with_intro = AudioSegment.silent(duration=5000) + combined + AudioSegment.silent(duration=5000)
            final_audio = combined_with_intro.overlay(bgm_processed)
            final_audio.export(final_path, format="wav")
        else:
            combined.export(final_path, format="wav")
        return final_path

    final_path = await asyncio.to_thread(combine_audio)
    print(f"[DEBUG] Audio generation complete: {final_path}")
    return final_path
# Module-level singleton instance shared by the rest of the application.
podcast_service = PodcastService()