Rakshitjan's picture
Create main.py
f6a1998 verified
raw
history blame
11.7 kB
# main.py
from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import google.generativeai as genai
import pdfplumber
import json
import re
import os
import io
from gtts import gTTS
from pydub import AudioSegment
import uuid
import asyncio
from pydantic import BaseModel
from typing import Dict, List, Optional
import shutil
import tempfile
# Application object that every route decorator in this module attaches to.
app = FastAPI(title="PDF to Audio Converter")

# Configure CORS. Every origin, method and header is currently accepted;
# specify your frontend domains in production.
_CORS_OPTIONS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)

# Global storage for tracking job status: an in-process dict keyed by job id.
job_status = {}
class JobStatus(BaseModel):
    """Snapshot of one conversion job, stored in ``job_status`` and returned by the status endpoint."""

    # Identifier of the job this snapshot describes.
    job_id: str
    # Lifecycle stage; this module writes "uploaded", "processing", "complete" and "error".
    status: str
    # Progress indicator; this module writes values between 0 and 100.
    progress: int
    # Optional human-readable description of the current step or of the failure.
    message: Optional[str] = None
    # Download path of the finished audio; this module only sets it when the job completes.
    result_url: Optional[str] = None
@app.on_event("startup")
async def startup_event():
    """Prepare the working directory and the Gemini client when the app starts.

    Ensures the ``temp`` directory exists, then configures ``genai`` with the
    key from ``GOOGLE_API_KEY``. When the variable is missing, a warning is
    printed and the client is left unconfigured.
    """
    # Uploads and generated audio are written under this directory.
    os.makedirs("temp", exist_ok=True)

    gemini_key = os.environ.get("GOOGLE_API_KEY")
    if gemini_key:
        genai.configure(api_key=gemini_key)
        return

    print("Warning: GOOGLE_API_KEY not found. API functionality will be limited.")
def extract_text_from_pdf(file_path):
    """Return the text of the PDF at *file_path*, read page by page with pdfplumber.

    Each page that yields text contributes that text followed by a newline;
    pages that yield nothing are skipped. The result is an empty string when
    no page produces text.
    """
    with pdfplumber.open(file_path) as document:
        per_page = (page.extract_text() for page in document.pages)
        # Joined inside the ``with`` block so pages are read while the PDF is open.
        return "".join(chunk + "\n" for chunk in per_page if chunk)
async def generate_conversation(pdf_text, model_name='gemini-2.5-pro-exp-03-25'):
    """Generate an Emily/Bob conversation from PDF text using Gemini.

    Args:
        pdf_text: Text extracted from the PDF; embedded verbatim in the prompt.
        model_name: Gemini model identifier. Defaults to the model this
            service was originally written against.

    Returns:
        The parsed conversation: a list of dicts, each mapping a speaker name
        to that speaker's line.

    Raises:
        ValueError: If ``GOOGLE_API_KEY`` is not set, or if the model's reply
            cannot be parsed into a list of JSON objects.
        Exception: Any error raised by the Gemini client is logged and
            re-raised unchanged.
    """
    try:
        if not os.environ.get("GOOGLE_API_KEY"):
            raise ValueError("GOOGLE_API_KEY environment variable not set")
        model = genai.GenerativeModel(model_name)
        # NOTE: the prompt templates below are runtime strings; their text
        # deliberately starts at column 0 so no indentation leaks into them.
        output_format = """
[
{"Emily": "..."},
{"Bob": "..."},
{"Emily": "..."},
{"Bob": "..."}
]
"""
        query = f"""
You are the expert conversation generator for the JEE student based on provided inputs. Your task is to
generate the incentive conversation between Emily and her friend Bob explaining ALL the concepts to each others in *DETAILS*.
The content to use to generate the conversations:
{pdf_text}
-----------------------------------------------------------------------
**NOTE**:
- Do not include ```json anywhere.
- All points in the given content should be explained with details in output conversation.
- **Some dialog should contain filler words only**. Do not limit the conversation.
- The conversation should include filler words such as umm, yahh, etc. at proper places specially for Emily.
- The conversation will be read by tts so make it very easy and accurate to read.
- The formulas should be accurately read by tts.
- It should include pauses, emphasizes, and similar emotions.
- All the topics in the given content should be covered with better and detailed explanations in the output discussion.
- Make conversation with significant length so that all the concepts should be covered without fail.
- The listener should understand the concepts in the given content easily by listening to the conversation between Bob and Emily.
- The conversation should be filled with pleasure, emotions, and all.
- All contents given to you should be completely explained to listener by hearing the conversations.
The output format should strictly follow this output format:
{output_format}
Strictly follow the provided output format and do *not* include extra intro or '''dot heading.
Output Format Rules:
Rules:
1. **Ensure the JSON is syntactically correct** before responding.
2. Do not include markdown (```json).
3. Verify there are no extra commas, missing brackets, or incorrect types.
4. Respond **only with the JSON** (no explanations)
"""
        # generate_content is a blocking network call; run it in the default
        # executor so the event loop (and every other request) keeps running.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, model.generate_content, query)
        return _parse_conversation_json(response.text)
    except Exception as e:
        print(f"Error generating conversation: {str(e)}")
        raise


def _parse_conversation_json(raw_text):
    """Clean the model's raw reply and parse it into a list of speaker dicts."""
    # Trim whitespace BEFORE removing the markdown fence: a reply such as
    # "```json\n[...]\n```\n" otherwise keeps its closing fence, because the
    # trailing newline shields the backticks from strip("`").
    cleaned_text = raw_text.strip().strip("`").strip()
    cleaned_text = re.sub(r"^json", "", cleaned_text, flags=re.IGNORECASE).strip()
    # Fix common JSON issues: drop a trailing comma before a closing ] or }.
    cleaned_text = re.sub(r",\s*([\]}])", r"\1", cleaned_text)
    try:
        parsed_json = json.loads(cleaned_text)
    except json.JSONDecodeError as e:
        print(f"JSON Parse Error: {e}")
        print(f"Problem text: {cleaned_text}")
        raise ValueError(f"Failed to parse generated conversation: {str(e)}") from e
    # Fail here with a clear message rather than later, mid-synthesis, when
    # the caller iterates the turns and calls .items() on each one.
    if not isinstance(parsed_json, list) or not all(
        isinstance(turn, dict) for turn in parsed_json
    ):
        raise ValueError(
            "Failed to parse generated conversation: expected a JSON list of objects"
        )
    return parsed_json
def generate_female_voice(text, filename):
    """Synthesize *text* with gTTS (English), save it to *filename*, and return the audio.

    The returned value is the AudioSegment loaded back from the saved file.
    """
    gTTS(text=text, lang='en').save(filename)
    spoken = AudioSegment.from_file(filename)
    return spoken
def generate_male_voice(text, filename):
    """Generate a male-sounding voice by lowering the pitch of gTTS output.

    Args:
        text: Text to synthesize (English).
        filename: Destination path of the pitch-shifted MP3.

    Returns:
        The pitch-shifted AudioSegment (the in-memory segment, not a re-read
        of *filename*).

    The unshifted speech is written to ``<filename>_temp.mp3`` first; that
    scratch file is removed even when loading, shifting or exporting fails.
    """
    temp_file = f"{filename}_temp.mp3"
    try:
        gTTS(text=text, lang='en').save(temp_file)
        sound = AudioSegment.from_file(temp_file)
        # Reinterpret the same samples at 85% of the frame rate, then convert
        # back to the original frame rate for the exported segment.
        lower_pitch = sound._spawn(sound.raw_data, overrides={
            "frame_rate": int(sound.frame_rate * 0.85)
        }).set_frame_rate(sound.frame_rate)
        lower_pitch.export(filename, format="mp3")
    finally:
        # The file may not exist if synthesis itself failed; removing it
        # unconditionally would mask the original error.
        if os.path.exists(temp_file):
            os.remove(temp_file)
    return lower_pitch
async def process_pdf_to_audio(job_id: str, file_path: str):
    """Process a PDF into a two-voice podcast, publishing status updates.

    Pipeline: extract text (10%), generate the conversation (30%), synthesize
    each line (50-90%), export the final MP3 (95%), complete (100%). Progress
    is published by overwriting ``job_status[job_id]``.

    Args:
        job_id: Identifier of the job; also names the output directory
            ``temp/<job_id>``.
        file_path: Path of the uploaded PDF.

    Returns:
        None. Failures are not raised: any exception is logged and recorded
        as an ``"error"`` status for the job.
    """
    def report(status, progress, message, result_url=None):
        # Single place that publishes a fresh status snapshot for this job.
        job_status[job_id] = JobStatus(job_id=job_id, status=status, progress=progress,
                                       message=message, result_url=result_url)

    # This coroutine runs on the server's event loop. PDF parsing, TTS and
    # audio export are blocking, so they are pushed to the default executor;
    # otherwise every other request (including /status polling) would stall.
    loop = asyncio.get_running_loop()
    try:
        # Extract text from PDF
        report("processing", 10, "Extracting text from PDF...")
        pdf_text = await loop.run_in_executor(None, extract_text_from_pdf, file_path)
        if not pdf_text.strip():
            report("error", 0, "No text extracted from PDF")
            return

        # Generate conversation
        report("processing", 30, "Generating conversation...")
        conversation = await generate_conversation(pdf_text)
        if not conversation:
            # Without this guard an empty reply would "succeed" with a
            # podcast consisting only of the one-second lead-in silence.
            report("error", 0, "Generated conversation is empty")
            return

        # Create temp directory for audio files
        output_dir = f"temp/{job_id}"
        os.makedirs(output_dir, exist_ok=True)

        # Generate audio for each line
        report("processing", 50, "Generating voices...")
        speaker_voice_map = {
            "Emily": "female",
            "Bob": "male"
        }
        final_podcast = AudioSegment.silent(duration=1000)  # 1 sec silence at start
        pause = AudioSegment.silent(duration=500)  # gap appended after every line
        total_lines = len(conversation)
        for i, line_dict in enumerate(conversation):
            for speaker, line in line_dict.items():
                spoken_text = str(line)
                if not spoken_text.strip():
                    # Nothing to synthesize; skip rather than fail the job.
                    continue
                voice_type = speaker_voice_map.get(speaker, "female")
                # Speaker names come from the model's output, so keep only
                # characters that are safe inside a file name.
                safe_speaker = re.sub(r"[^A-Za-z0-9_-]", "_", str(speaker))
                filename = f"{output_dir}/{i}_{safe_speaker}.mp3"
                if voice_type == "female":
                    synthesize = generate_female_voice
                else:
                    synthesize = generate_male_voice
                voice = await loop.run_in_executor(None, synthesize, spoken_text, filename)
                final_podcast += voice + pause
            # Update progress (50% to 90%)
            progress = 50 + int(40 * (i + 1) / total_lines)
            report("processing", progress, f"Processing dialogue {i+1}/{total_lines}")

        # Export final audio
        output_filename = f"temp/{job_id}/final_podcast.mp3"
        report("processing", 95, "Exporting final audio...")
        await loop.run_in_executor(
            None, lambda: final_podcast.export(output_filename, format="mp3")
        )

        # Complete job
        report("complete", 100, "Processing complete", result_url=f"/download/{job_id}")
    except Exception as e:
        # Top-level boundary of the background task: record the failure so
        # the client sees it when polling, instead of losing it silently.
        print(f"Error processing job {job_id}: {str(e)}")
        report("error", 0, f"Error: {str(e)}")
@app.post("/upload/")
async def upload_file(background_tasks: BackgroundTasks, file: UploadFile = File(...)):
    """Upload a PDF file and start converting it in the background.

    Args:
        background_tasks: FastAPI background-task queue used to schedule the
            conversion after the response is sent.
        file: The uploaded file; its name must end in ``.pdf`` (any case).

    Returns:
        A dict with the new ``job_id`` and a confirmation ``message``.

    Raises:
        HTTPException: 400 when the file name does not end in ``.pdf``;
            500 when saving the upload or scheduling the job fails.
    """
    # Validate file is a PDF. This check sits outside the try block on
    # purpose: inside it, the 400 was caught by ``except Exception`` and
    # returned to the client as a 500. The name may also be missing, and the
    # extension is compared case-insensitively so "REPORT.PDF" is accepted.
    if not (file.filename or "").lower().endswith('.pdf'):
        raise HTTPException(status_code=400, detail="File must be a PDF")
    try:
        # Generate a job ID
        job_id = str(uuid.uuid4())
        # Save uploaded file
        temp_file_path = f"temp/{job_id}_upload.pdf"
        with open(temp_file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        # Initialize job status
        job_status[job_id] = JobStatus(job_id=job_id, status="uploaded", progress=5,
                                       message="File uploaded, starting processing")
        # Process in background
        background_tasks.add_task(process_pdf_to_audio, job_id, temp_file_path)
        return {"job_id": job_id, "message": "File uploaded successfully. Processing started."}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/status/{job_id}")
async def get_job_status(job_id: str):
    """Return the latest status snapshot recorded for *job_id*.

    Raises:
        HTTPException: 404 when no job with that id is being tracked.
    """
    try:
        return job_status[job_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Job not found") from None
@app.get("/download/{job_id}")
async def download_audio(job_id: str):
    """Download the processed audio file for a completed job.

    Args:
        job_id: Identifier of a job whose status is ``"complete"``.

    Returns:
        A streaming ``audio/mpeg`` response, sent as an attachment named
        ``podcast_<job_id>.mp3``.

    Raises:
        HTTPException: 404 when the job is unknown or not complete, or when
            the audio file is missing on disk.
    """
    if job_id not in job_status or job_status[job_id].status != "complete":
        raise HTTPException(status_code=404, detail="Audio not ready or job not found")
    file_path = f"temp/{job_id}/final_podcast.mp3"
    if not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail="File not found")

    def iterfile(chunk_size=64 * 1024):
        # Read fixed-size chunks. Iterating a binary file object directly
        # splits on newline bytes, which for MP3 data yields fragments of
        # arbitrary (mostly tiny) size.
        with open(file_path, mode="rb") as file_like:
            for chunk in iter(lambda: file_like.read(chunk_size), b""):
                yield chunk

    return StreamingResponse(
        iterfile(),
        media_type="audio/mpeg",
        headers={"Content-Disposition": f"attachment; filename=podcast_{job_id}.mp3"}
    )
@app.delete("/job/{job_id}")
async def delete_job(job_id: str):
    """Remove a job's files from disk and stop tracking its status.

    Raises:
        HTTPException: 404 when no job with that id is being tracked.
    """
    if job_id not in job_status:
        raise HTTPException(status_code=404, detail="Job not found")

    # Each artifact is paired with the call that deletes it: the per-job
    # output directory first, then the uploaded PDF.
    artifacts = (
        (f"temp/{job_id}", shutil.rmtree),
        (f"temp/{job_id}_upload.pdf", os.remove),
    )
    for path, discard in artifacts:
        if os.path.exists(path):
            discard(path)

    # Remove from status tracking
    del job_status[job_id]
    return {"message": "Job deleted successfully"}
@app.get("/health")
async def health_check():
    """Liveness probe: always reports the service as healthy."""
    payload = {"status": "healthy"}
    return payload
if __name__ == "__main__":
    # Script entry point: serve this module's app with uvicorn on all
    # interfaces, port 7860, with auto-reload enabled.
    from uvicorn import run as serve

    serve("main:app", host="0.0.0.0", port=7860, reload=True)