Spaces:
Build error
Build error
| import os | |
| import logging | |
| from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Request | |
| from fastapi.responses import JSONResponse, HTMLResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.exceptions import RequestValidationError | |
| import openai | |
| from pydub import AudioSegment | |
| import tempfile | |
# Application-wide logging: reuse uvicorn's error logger so messages from this
# module appear alongside the server's own output.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("uvicorn.error")

app = FastAPI()

# Browsers send the Origin header as scheme://host[:port] with no trailing
# slash, and the CORS middleware compares origins as exact strings, so the
# allowed origin must be written without a trailing "/".
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://studyscribe.framer.ai"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Fail fast at import time if the OpenAI credentials are missing, rather than
# on the first request that needs them.
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
if not OPENAI_API_KEY:
    logger.error("OPENAI_API_KEY is not set.")
    raise RuntimeError("OPENAI_API_KEY must be set as an environment variable.")
openai.api_key = OPENAI_API_KEY
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc):
    """Log a request-validation failure and report it as a 422 JSON response.

    Registered on the application for ``RequestValidationError`` so that the
    handler actually runs; without the registration the function is dead code.

    Args:
        request: The incoming request (unused).
        exc: The validation error; its ``errors()`` and ``body`` are echoed
            back to the client.

    Returns:
        A ``JSONResponse`` with status 422 containing ``detail`` and ``body``.
    """
    logger.error(f"Validation error: {exc}")
    # NOTE(review): exc.body is passed to the JSON encoder as-is; confirm it is
    # always JSON-serializable for the form/multipart requests this app accepts.
    return JSONResponse(
        status_code=422,
        content={"detail": exc.errors(), "body": exc.body},
    )
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
    """Log any otherwise-unhandled exception and return a generic 500 response.

    Registered on the application for ``Exception`` so it acts as the
    catch-all handler. The client only ever sees a generic message; the
    details (including the traceback) go to the log.

    Args:
        request: The incoming request (unused).
        exc: The exception that escaped the route handler.

    Returns:
        A ``JSONResponse`` with status 500 and a fixed detail message.
    """
    # exc_info attaches the traceback, which the message alone does not carry.
    logger.error(f"Unhandled exception: {exc}", exc_info=exc)
    return JSONResponse(status_code=500, content={"detail": "Internal Server Error"})
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """HTTP middleware that logs the method and path of every request.

    Installed via ``app.middleware("http")``; without that registration the
    function is never invoked.

    Args:
        request: The incoming request.
        call_next: Callable that forwards the request down the stack and
            returns the response.

    Returns:
        The response produced by ``call_next``, unmodified.
    """
    logger.info(f"Incoming request: {request.method} {request.url.path}")
    return await call_next(request)
def transcribe_audio(audio_file_path):
    """Send the audio file at *audio_file_path* to the "whisper-1" model.

    The transcription is requested in ``verbose_json`` format and returned
    exactly as the OpenAI client hands it back.

    Raises:
        HTTPException: status 500 if opening the file or the API call fails;
            the underlying error is logged.
    """
    try:
        with open(audio_file_path, "rb") as audio_stream:
            return openai.Audio.transcribe(
                "whisper-1",
                audio_stream,
                response_format="verbose_json",
            )
    except Exception as err:
        logger.error(f"Error in transcribe_audio: {err}")
        raise HTTPException(status_code=500, detail="Error during audio transcription.")
def _exported_wav_size_mb(chunk):
    """Return the size in MiB of *chunk* once exported as a WAV file."""
    # Reserve a unique path and close our handle first, so the export is not
    # writing to a file this function still holds open.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as handle:
        probe_path = handle.name
    try:
        # export() hands back the open output file; close it so the handle is
        # not leaked and the file can be removed on every platform.
        chunk.export(probe_path, format="wav").close()
        return os.path.getsize(probe_path) / (1024 * 1024)
    finally:
        # Always remove the probe file, even when the export fails.
        os.unlink(probe_path)


def split_audio_file(audio_file_path, max_chunk_size_mb=24, *,
                     initial_chunk_ms=5 * 60 * 1000, step_ms=60 * 1000):
    """Split an audio file into consecutive chunks that fit a size limit.

    Each chunk starts at ``initial_chunk_ms`` long (or whatever audio remains)
    and is shortened by ``step_ms`` at a time until its WAV export is no
    larger than ``max_chunk_size_mb``.

    Args:
        audio_file_path: Path of the audio file to load with pydub.
        max_chunk_size_mb: Largest allowed exported chunk size, in MiB.
        initial_chunk_ms: First duration tried for every chunk (default 5 min).
        step_ms: Amount the duration is reduced by on each retry; also the
            floor below which no further reduction is attempted (default 1 min).

    Returns:
        A list of ``AudioSegment`` chunks covering the audio in order; empty
        when the audio has zero duration.

    Raises:
        ValueError: if ``initial_chunk_ms`` or ``step_ms`` is not positive.
        RuntimeError: if a chunk at or below ``step_ms`` still exceeds the
            size limit.
    """
    if initial_chunk_ms <= 0 or step_ms <= 0:
        # A non-positive step would make the shrinking loop spin forever.
        raise ValueError("initial_chunk_ms and step_ms must be positive.")

    audio = AudioSegment.from_file(audio_file_path)
    duration_ms = len(audio)
    chunks = []
    start_ms = 0
    while start_ms < duration_ms:
        chunk_duration_ms = min(initial_chunk_ms, duration_ms - start_ms)
        chunk = audio[start_ms:start_ms + chunk_duration_ms]
        while _exported_wav_size_mb(chunk) > max_chunk_size_mb:
            if chunk_duration_ms <= step_ms:
                # Already at the minimum duration; cannot reduce further.
                raise RuntimeError("Cannot split audio into chunks small enough to meet the size limit.")
            chunk_duration_ms -= step_ms
            chunk = audio[start_ms:start_ms + chunk_duration_ms]
        chunks.append(chunk)
        start_ms += chunk_duration_ms
    return chunks
def summarize_text(text, lesson_plan):
    """Ask gpt-3.5-turbo for a concise summary of *text* guided by *lesson_plan*.

    Returns:
        The content of the first completion choice, stripped of surrounding
        whitespace.

    Raises:
        HTTPException: status 500 if building the request or the API call
            fails; the underlying error is logged.
    """
    try:
        # The prompt body is written flush-left so that no source indentation
        # ends up inside the text sent to the model.
        prompt = f"""
Text to summarize:
"{text}"
Based on the lesson plan below, summarize the key points discussed:
Lesson Plan:
{lesson_plan}
Provide a concise summary with key takeaways.
"""
        chat_messages = [
            {
                "role": "system",
                "content": "You are an assistant that summarizes text based on a lesson plan.",
            },
            {"role": "user", "content": prompt},
        ]
        completion = openai.ChatCompletion.create(
            model='gpt-3.5-turbo',
            messages=chat_messages,
            max_tokens=300,
            temperature=0.5,
        )
        first_choice = completion['choices'][0]
        return first_choice['message']['content'].strip()
    except Exception as err:
        logger.error(f"Error in summarize_text: {err}")
        raise HTTPException(status_code=500, detail="Error during summarization.")
def generate_lecture_notes(summaries, lesson_plan):
    """Turn per-segment summaries into structured lecture notes via gpt-3.5-turbo.

    Args:
        summaries: Iterable of mappings, each read for its ``'timestamp'`` and
            ``'summary'`` keys.
        lesson_plan: Lesson plan text included in the prompt.

    Returns:
        The content of the first completion choice, stripped of surrounding
        whitespace.

    Raises:
        HTTPException: status 500 if building the request or the API call
            fails; the underlying error is logged.
    """
    try:
        segment_lines = "\n".join(
            f"At {entry['timestamp']}: {entry['summary']}" for entry in summaries
        )
        # The prompt body is written flush-left so that no source indentation
        # ends up inside the text sent to the model.
        prompt = f"""
Using the summarized text segments below and the lesson plan, create detailed lecture notes.
Summarized Segments:
{segment_lines}
Lesson Plan:
{lesson_plan}
Provide comprehensive lecture notes in a structured format.
"""
        chat_messages = [
            {
                "role": "system",
                "content": "You are an assistant that generates detailed lecture notes based on summaries and a lesson plan.",
            },
            {"role": "user", "content": prompt},
        ]
        completion = openai.ChatCompletion.create(
            model='gpt-3.5-turbo',
            messages=chat_messages,
            max_tokens=2000,
            temperature=0.5,
        )
        first_choice = completion['choices'][0]
        return first_choice['message']['content'].strip()
    except Exception as err:
        logger.error(f"Error in generate_lecture_notes: {err}")
        raise HTTPException(status_code=500, detail="Error during lecture notes generation.")
@app.get("/", response_class=HTMLResponse)
def read_root():
    """Serve a small HTML landing page that points callers at ``/process``.

    Registered as the ``GET /`` route; without the registration the
    application exposes no root page.

    Returns:
        An ``HTMLResponse`` containing the static landing page.
    """
    html_content = """
<html>
<head>
<title>Lecture Notes Generator</title>
</head>
<body>
<h1>Lecture Notes Generator</h1>
<p>This is the backend API for the Lecture Notes Generator. Please use the /process endpoint to submit data.</p>
</body>
</html>
"""
    return HTMLResponse(content=html_content)
def _remove_temp_file(path):
    """Best-effort removal of a temporary file; failures are logged, not raised."""
    if not path:
        return
    try:
        os.unlink(path)
    except OSError as err:
        logger.warning(f"Could not remove temporary file {path}: {err}")


@app.post("/process")
async def process_files(
    audio_file: UploadFile = File(None),
    lecture_link: str = Form(None),
    lesson_plan: str = Form(...),
):
    """Transcribe an uploaded lecture, summarize it, and build lecture notes.

    Registered as ``POST /process``. The upload is written to a temporary
    file, split with ``split_audio_file``, and each chunk is transcribed with
    ``transcribe_audio``. Every transcript segment is summarized with
    ``summarize_text``, and the summaries are combined by
    ``generate_lecture_notes``.

    Args:
        audio_file: Uploaded audio; takes precedence over ``lecture_link``.
        lecture_link: Link to a lecture; not implemented yet.
        lesson_plan: Lesson plan text that guides summaries and notes.

    Returns:
        A ``JSONResponse`` with ``summarized_texts`` (a list of
        ``{'timestamp', 'summary'}`` items) and ``lecture_notes``.

    Raises:
        HTTPException: 400 when neither input or no lesson plan is given,
            501 when only a lecture link is given, 500 on any other failure.
    """
    tmp_file_path = None
    try:
        if not audio_file and not lecture_link:
            raise HTTPException(status_code=400, detail="Please provide an audio file or a lecture link.")
        if not lesson_plan:
            raise HTTPException(status_code=400, detail="Lesson plan is required.")
        if not audio_file:
            # Only a link was supplied (the check above guarantees one input).
            logger.error("Processing lecture links is not implemented yet.")
            raise HTTPException(status_code=501, detail="Processing lecture links is not implemented yet.")

        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
            # Record the path before writing so it is cleaned up even if the
            # write fails part-way.
            tmp_file_path = tmp_file.name
            tmp_file.write(await audio_file.read())

        audio_chunks = split_audio_file(tmp_file_path, max_chunk_size_mb=24)
        summarized_texts = []
        current_chunk_start_time = 0.0
        for chunk in audio_chunks:
            chunk_file_path = None
            try:
                with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as chunk_file:
                    chunk_file_path = chunk_file.name
                # export() returns the open output file; close it so the
                # handle is not leaked and the file can be removed afterwards.
                chunk.export(chunk_file_path, format="wav").close()
                transcript = transcribe_audio(chunk_file_path)
            finally:
                # The chunk file is only needed for the transcription call.
                _remove_temp_file(chunk_file_path)

            for segment in transcript.get('segments', []):
                # Shift segment times from chunk-relative to positions in the
                # full recording.
                segment_start = segment['start'] + current_chunk_start_time
                segment_end = segment['end'] + current_chunk_start_time
                summary = summarize_text(segment['text'], lesson_plan)
                summarized_texts.append({
                    'timestamp': f"{segment_start:.2f} - {segment_end:.2f}",
                    'summary': summary
                })

            # len(chunk) is in milliseconds; timestamps are in seconds.
            current_chunk_start_time += len(chunk) / 1000.0

        lecture_notes = generate_lecture_notes(summarized_texts, lesson_plan)
        return JSONResponse(content={
            'summarized_texts': summarized_texts,
            'lecture_notes': lecture_notes
        })
    except HTTPException as e:
        logger.error(f"HTTPException in /process endpoint: {e.detail}")
        raise
    except Exception as e:
        logger.exception(f"Unhandled exception in /process endpoint: {e}")
        raise HTTPException(status_code=500, detail="Internal Server Error") from e
    finally:
        # Runs on success and on every error path, so the upload never leaks.
        _remove_temp_file(tmp_file_path)