# NOTE: the "Spaces:" / "Build error" lines that appeared here were build-log
# artifacts from the paste, not part of the application source.
# main.py

# Standard library
import asyncio
import io
import json
import os
import re
import shutil
import tempfile
import uuid
from typing import Dict, List, Optional

# Third-party
import google.generativeai as genai
import pdfplumber
from fastapi import BackgroundTasks, FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from gtts import gTTS
from pydantic import BaseModel
from pydub import AudioSegment
app = FastAPI(title="PDF to Audio Converter")

# Configure CORS so browser front-ends hosted on other origins can call the API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Specify your frontend domains in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Global storage for tracking job status.
# NOTE(review): in-memory dict — state is lost on restart and not shared
# between worker processes; confirm single-process deployment.
job_status = {}


class JobStatus(BaseModel):
    """Status record for one PDF-to-audio conversion job."""
    job_id: str
    # One of: "uploaded" | "processing" | "complete" | "error"
    status: str
    # 0-100 percentage.
    progress: int
    message: Optional[str] = None
    # Set to "/download/{job_id}" when status == "complete".
    result_url: Optional[str] = None
@app.on_event("startup")
async def startup_event():
    """Prepare the service at startup: working directory and Gemini client.

    NOTE(review): the pasted source had lost this decorator, so the hook
    never ran; name and body match FastAPI's startup-event convention.
    """
    # Create temp directory for storing uploads and generated audio.
    os.makedirs("temp", exist_ok=True)

    # Configure the Gemini client from the environment; degrade gracefully
    # (warn, don't crash) when the key is absent.
    api_key = os.environ.get("GOOGLE_API_KEY")
    if not api_key:
        print("Warning: GOOGLE_API_KEY not found. API functionality will be limited.")
    else:
        genai.configure(api_key=api_key)
def extract_text_from_pdf(file_path):
    """Return the concatenated text of every page in the PDF at *file_path*.

    Pages that yield no text (e.g. scanned images) are skipped; each
    extracted page is followed by a newline.
    """
    with pdfplumber.open(file_path) as pdf:
        page_texts = (page.extract_text() for page in pdf.pages)
        return "".join(content + "\n" for content in page_texts if content)
async def generate_conversation(pdf_text):
    """Generate an Emily/Bob teaching conversation from PDF text via Gemini.

    Returns the model output parsed as JSON — per the prompt, a list of
    single-key dicts like [{"Emily": "..."}, {"Bob": "..."}, ...].

    Raises:
        ValueError: when GOOGLE_API_KEY is unset or the model response
            cannot be parsed as JSON.
        Exception: any other error from the Gemini client is re-raised.
    """
    try:
        # Fail fast with a clear message instead of an opaque client error.
        api_key = os.environ.get("GOOGLE_API_KEY")
        if not api_key:
            raise ValueError("GOOGLE_API_KEY environment variable not set")

        model = genai.GenerativeModel('gemini-2.5-pro-exp-03-25')

        # Literal JSON shape the model is told to emit (embedded in the prompt).
        output_format = """
        [
            {"Emily": "..."},
            {"Bob": "..."},
            {"Emily": "..."},
            {"Bob": "..."}
        ]
        """

        query = f"""
        You are the expert conversation generator for the JEE student based on provided inputs. Your task is to
        generate the incentive conversation between Emily and her friend Bob explaining ALL the concepts to each others in *DETAILS*.

        The content to use to generate the conversations:
        {pdf_text}

        -----------------------------------------------------------------------
        **NOTE**:
        - Do not include ```json anywhere.
        - All points in the given content should be explained with details in output conversation.
        - **Some dialog should contain filler words only**. Do not limit the conversation.
        - The conversation should include filler words such as umm, yahh, etc. at proper places specially for Emily.
        - The conversation will be read by tts so make it very easy and accurate to read.
        - The formulas should be accurately read by tts.
        - It should include pauses, emphasizes, and similar emotions.
        - All the topics in the given content should be covered with better and detailed explanations in the output discussion.
        - Make conversation with significant length so that all the concepts should be covered without fail.
        - The listener should understand the concepts in the given content easily by listening to the conversation between Bob and Emily.
        - The conversation should be filled with pleasure, emotions, and all.
        - All contents given to you should be completely explained to listener by hearing the conversations.

        The output format should strictly follow this output format:
        {output_format}

        Strictly follow the provided output format and do *not* include extra intro or '''dot heading.

        Output Format Rules:
        Rules:
        1. **Ensure the JSON is syntactically correct** before responding.
        2. Do not include markdown (```json).
        3. Verify there are no extra commas, missing brackets, or incorrect types.
        4. Respond **only with the JSON** (no explanations)
        """

        response = model.generate_content(query)
        text_content = response.text

        # Clean up the response: strip any markdown fence characters and a
        # leading "json" language tag the model may emit despite instructions.
        cleaned_text = text_content.strip("```").strip()
        cleaned_text = re.sub(r"^json", "", cleaned_text, flags=re.IGNORECASE).strip()

        # Fix common JSON issues: drop trailing commas before ] or }.
        cleaned_text = re.sub(r",\s*([\]}])", r"\1", cleaned_text)

        try:
            parsed_json = json.loads(cleaned_text)
            return parsed_json
        except json.JSONDecodeError as e:
            # Log the offending text for debugging, then surface a ValueError.
            print(f"JSON Parse Error: {e}")
            print(f"Problem text: {cleaned_text}")
            raise ValueError(f"Failed to parse generated conversation: {str(e)}")
    except Exception as e:
        print(f"Error generating conversation: {str(e)}")
        raise
def generate_female_voice(text, filename):
    """Synthesize *text* to *filename* with gTTS and return it as an AudioSegment.

    gTTS's default English voice serves as the "female" speaker (Emily).
    """
    speech = gTTS(text=text, lang='en')
    speech.save(filename)
    return AudioSegment.from_file(filename)
def generate_male_voice(text, filename):
    """Synthesize *text* as a "male" voice into *filename* (mp3).

    gTTS offers only one English voice, so a male timbre is approximated by
    reinterpreting the samples at 85% frame rate (pitch drop) and resampling
    back to the original rate. Returns the pitched-down AudioSegment.
    """
    # Bug fix: the temp path was a constant f-string with no placeholder
    # ("(unknown)_temp.mp3"), so concurrent jobs clobbered each other's
    # intermediate file. Derive it from the per-job, per-line output
    # filename instead, which is unique.
    temp_file = f"{filename}_temp.mp3"
    tts = gTTS(text=text, lang='en')
    tts.save(temp_file)

    sound = AudioSegment.from_file(temp_file)
    # Lower pitch: spawn a copy tagged with a slower frame rate, then
    # restore the standard rate so playback speed metadata stays normal.
    lower_pitch = sound._spawn(sound.raw_data, overrides={
        "frame_rate": int(sound.frame_rate * 0.85)
    }).set_frame_rate(sound.frame_rate)

    lower_pitch.export(filename, format="mp3")
    os.remove(temp_file)  # discard the un-pitched intermediate
    return lower_pitch
async def process_pdf_to_audio(job_id: str, file_path: str):
    """Background pipeline: PDF -> Gemini conversation -> stitched mp3 podcast.

    Publishes progress into the module-level ``job_status`` dict at each
    stage. On success stores result_url "/download/{job_id}"; on any failure
    records an "error" status instead of raising (this runs as a FastAPI
    BackgroundTask, so exceptions would otherwise vanish).
    """
    try:
        # Stage 1 (10%): extract raw text from the uploaded PDF.
        job_status[job_id] = JobStatus(job_id=job_id, status="processing", progress=10,
                                       message="Extracting text from PDF...")
        pdf_text = extract_text_from_pdf(file_path)

        if not pdf_text.strip():
            # Image-only/scanned PDFs yield no text; fail the job early.
            job_status[job_id] = JobStatus(job_id=job_id, status="error", progress=0,
                                           message="No text extracted from PDF")
            return

        # Stage 2 (30%): ask Gemini for the Emily/Bob dialogue.
        job_status[job_id] = JobStatus(job_id=job_id, status="processing", progress=30,
                                       message="Generating conversation...")
        conversation = await generate_conversation(pdf_text)

        # Per-job working directory for the intermediate per-line mp3 files.
        output_dir = f"temp/{job_id}"
        os.makedirs(output_dir, exist_ok=True)

        # Stage 3 (50% -> 90%): synthesize each dialogue line.
        job_status[job_id] = JobStatus(job_id=job_id, status="processing", progress=50,
                                       message="Generating voices...")

        # Speakers not listed here fall back to the female voice below.
        speaker_voice_map = {
            "Emily": "female",
            "Bob": "male"
        }

        final_podcast = AudioSegment.silent(duration=1000)  # 1 sec silence at start
        total_lines = len(conversation)

        # Each conversation entry is expected to be a single-key dict
        # {speaker: line} per generate_conversation's prompt format.
        for i, line_dict in enumerate(conversation):
            for speaker, line in line_dict.items():
                voice_type = speaker_voice_map.get(speaker, "female")
                filename = f"{output_dir}/{i}_{speaker}.mp3"

                if voice_type == "female":
                    voice = generate_female_voice(line, filename)
                else:
                    voice = generate_male_voice(line, filename)

                # Append the line plus a half-second pause between lines.
                final_podcast += voice + AudioSegment.silent(duration=500)

            # Update progress (50% to 90%)
            progress = 50 + int(40 * (i+1) / total_lines)
            job_status[job_id] = JobStatus(job_id=job_id, status="processing", progress=progress,
                                           message=f"Processing dialogue {i+1}/{total_lines}")

        # Stage 4 (95%): export the stitched podcast.
        output_filename = f"temp/{job_id}/final_podcast.mp3"
        job_status[job_id] = JobStatus(job_id=job_id, status="processing", progress=95,
                                       message="Exporting final audio...")
        final_podcast.export(output_filename, format="mp3")

        # Complete job; the download endpoint serves the exported file.
        job_status[job_id] = JobStatus(
            job_id=job_id,
            status="complete",
            progress=100,
            message="Processing complete",
            result_url=f"/download/{job_id}"
        )
    except Exception as e:
        # Surface the failure through job status rather than crashing the task.
        print(f"Error processing job {job_id}: {str(e)}")
        job_status[job_id] = JobStatus(job_id=job_id, status="error", progress=0,
                                       message=f"Error: {str(e)}")
@app.post("/upload")
async def upload_file(background_tasks: BackgroundTasks, file: UploadFile = File(...)):
    """Upload a PDF and start background conversion to audio.

    Returns the job_id to poll for status. NOTE(review): the route decorator
    was missing in the pasted source, so the endpoint was never registered;
    the "/upload" path is reconstructed — confirm against the frontend.
    """
    try:
        # Validate file is a PDF (case-insensitive, so ".PDF" is accepted too).
        if not file.filename.lower().endswith('.pdf'):
            raise HTTPException(status_code=400, detail="File must be a PDF")

        # Generate a job ID
        job_id = str(uuid.uuid4())

        # Save uploaded file into the temp working area.
        temp_file_path = f"temp/{job_id}_upload.pdf"
        with open(temp_file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # Initialize job status before scheduling so status polls never 404.
        job_status[job_id] = JobStatus(job_id=job_id, status="uploaded", progress=5,
                                       message="File uploaded, starting processing")

        # Process in background
        background_tasks.add_task(process_pdf_to_audio, job_id, temp_file_path)

        return {"job_id": job_id, "message": "File uploaded successfully. Processing started."}
    except HTTPException:
        # Bug fix: the blanket handler below used to swallow the 400 raised
        # above and re-raise it as a 500; propagate HTTP errors unchanged.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/status/{job_id}")
async def get_job_status(job_id: str):
    """Return the JobStatus record for *job_id*; 404 when unknown.

    NOTE(review): the route decorator was missing in the pasted source, so
    the endpoint was never registered; the "/status/{job_id}" path is
    reconstructed — confirm against the frontend.
    """
    if job_id not in job_status:
        raise HTTPException(status_code=404, detail="Job not found")
    return job_status[job_id]
@app.get("/download/{job_id}")
async def download_audio(job_id: str):
    """Stream the finished podcast mp3 for a completed job as an attachment.

    The route decorator was missing in the pasted source; the path here is
    grounded by the result_url ("/download/{job_id}") that
    process_pdf_to_audio publishes on completion. Responds 404 unless the
    job exists, is complete, and its exported file is still on disk.
    """
    if job_id not in job_status or job_status[job_id].status != "complete":
        raise HTTPException(status_code=404, detail="Audio not ready or job not found")

    file_path = f"temp/{job_id}/final_podcast.mp3"
    if not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail="File not found")

    def iterfile():
        # Generator keeps memory flat for long podcasts: the response is
        # streamed from disk instead of loaded wholesale.
        with open(file_path, mode="rb") as file_like:
            yield from file_like

    return StreamingResponse(
        iterfile(),
        media_type="audio/mpeg",
        headers={"Content-Disposition": f"attachment; filename=podcast_{job_id}.mp3"}
    )
@app.delete("/job/{job_id}")
async def delete_job(job_id: str):
    """Delete a job's files and its status record; 404 when unknown.

    NOTE(review): the route decorator was missing in the pasted source, so
    the endpoint was never registered; the "/job/{job_id}" path is
    reconstructed — confirm against the frontend.
    """
    if job_id not in job_status:
        raise HTTPException(status_code=404, detail="Job not found")

    # Remove job files: the per-job audio directory and the original upload.
    # Local renamed from "upload_file" — it shadowed the module-level upload
    # endpoint function of the same name.
    job_dir = f"temp/{job_id}"
    upload_path = f"temp/{job_id}_upload.pdf"

    if os.path.exists(job_dir):
        shutil.rmtree(job_dir)
    if os.path.exists(upload_path):
        os.remove(upload_path)

    # Remove from status tracking
    del job_status[job_id]

    return {"message": "Job deleted successfully"}
@app.get("/health")
async def health_check():
    """Liveness probe: reports healthy whenever the process is serving.

    NOTE(review): the route decorator was missing in the pasted source, so
    the endpoint was never registered; "/health" is the conventional path.
    """
    return {"status": "healthy"}
if __name__ == "__main__":
    # Run a development server directly; reload=True (auto-restart on code
    # change) is for development only.
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=True)