"""FastAPI service that transcribes an uploaded video and summarizes the text.

Pipeline for ``POST /process/``: save the upload to a temporary file, extract
its audio track with moviepy, transcribe it with Whisper, then summarize the
transcription chunk by chunk with a BART summarization pipeline.
"""

import logging
import os
import tempfile
from typing import List, Optional

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Cache locations (kept under /tmp, as in the original configuration).
TRANSFORMERS_CACHE_DIR = "/tmp/transformers_cache"
HF_HOME_DIR = "/tmp/huggingface"
WHISPER_CACHE_DIR = "/tmp/whisper_cache"

# Configure ALL cache directories.
# NOTE: this must happen BEFORE transformers is imported: the Hugging Face
# libraries resolve their cache locations when they are first imported, so
# setting these variables afterwards has no effect.
os.environ["TRANSFORMERS_CACHE"] = TRANSFORMERS_CACHE_DIR
os.environ["HF_HOME"] = HF_HOME_DIR
# The Whisper cache is also passed explicitly via ``download_root`` below.
os.environ["WHISPER_CACHE_DIR"] = WHISPER_CACHE_DIR

# Create cache directories
for _cache_dir in (TRANSFORMERS_CACHE_DIR, HF_HOME_DIR, WHISPER_CACHE_DIR):
    os.makedirs(_cache_dir, exist_ok=True)

# Third-party imports deliberately follow the environment setup above.
import numpy as np  # noqa: E402  # Explicitly import numpy
import whisper  # noqa: E402
from fastapi import FastAPI, File, UploadFile  # noqa: E402
from fastapi.responses import JSONResponse  # noqa: E402
from moviepy.editor import VideoFileClip  # noqa: E402
from transformers import pipeline  # noqa: E402

# Size of each piece read from the upload while saving it to disk.
UPLOAD_CHUNK_SIZE = 1024 * 1024
# Default number of words per summarization chunk.
DEFAULT_CHUNK_WORDS = 400
# Number of characters returned when summarization fails.
FALLBACK_SUMMARY_CHARS = 500

app = FastAPI()

# Verify numpy is working
try:
    np.array([1, 2, 3])  # Simple numpy operation test
    logger.info("NumPy is working correctly")
except Exception as e:
    logger.error("NumPy test failed: %s", e)


# Load models at startup
@app.on_event("startup")
async def load_models():
    """Load the Whisper and summarization models into ``app.state``.

    Raises:
        Exception: whatever the model loaders raise; it is logged and
            re-raised so the application does not start without models.
    """
    try:
        logger.info("Loading Whisper model...")
        app.state.whisper_model = whisper.load_model(
            "base",
            download_root=WHISPER_CACHE_DIR,
            device="cpu",  # Force CPU usage
        )
        logger.info("Whisper model loaded successfully")

        logger.info("Loading summarization model...")
        app.state.summarizer = pipeline(
            "summarization",
            model="facebook/bart-large-cnn",
            device=-1,  # Use CPU
        )
        logger.info("Summarization model loaded successfully")
    except Exception as e:
        logger.exception("Error loading models: %s", e)
        raise


def _remove_file(path: Optional[str]) -> None:
    """Delete ``path`` if it is set and exists; never raise on failure.

    Cleanup is best-effort: a file that cannot be removed is logged rather
    than allowed to turn an otherwise successful request into an error.
    """
    if not path:
        return
    try:
        os.unlink(path)
    except FileNotFoundError:
        pass
    except OSError as e:
        logger.warning("Could not remove temporary file %s: %s", path, e)


def extract_audio(video_path: str) -> str:
    """Extract the audio track of a video into a temporary WAV file.

    Args:
        video_path: Path of the video file to read.

    Returns:
        Path of a newly created, uniquely named ``.wav`` file. The caller is
        responsible for deleting it.

    Raises:
        ValueError: if the video has no audio track.
        Exception: any error raised by moviepy while reading or writing; it
            is logged and re-raised.
    """
    audio_path = None
    try:
        with VideoFileClip(video_path) as video:
            if video.audio is None:
                raise ValueError("Video file contains no audio track")
            # A unique file per call: a fixed shared name would let parallel
            # workers overwrite or delete each other's audio.
            fd, audio_path = tempfile.mkstemp(suffix=".wav")
            os.close(fd)
            video.audio.write_audiofile(audio_path)
            return audio_path
    except Exception as e:
        logger.error("Error extracting audio: %s", e)
        _remove_file(audio_path)  # do not leave a partial file behind
        raise


def transcribe_audio(audio_path: str) -> str:
    """Transcribe an audio file with the Whisper model held in ``app.state``.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        The ``"text"`` entry of the Whisper transcription result.

    Raises:
        Exception: any error raised during transcription; it is logged and
            re-raised.
    """
    try:
        result = app.state.whisper_model.transcribe(audio_path)
        return result["text"]
    except Exception as e:
        logger.error("Error transcribing audio: %s", e)
        raise


def chunk_text(text: str, max_words: int = DEFAULT_CHUNK_WORDS) -> List[str]:
    """Split text into chunks of at most ``max_words`` whitespace-separated words.

    Args:
        text: Text to split.
        max_words: Maximum number of words per chunk; must be at least 1.

    Returns:
        The chunks in order, each with its words joined by single spaces.
        Empty or whitespace-only text yields an empty list.

    Raises:
        ValueError: if ``max_words`` is smaller than 1.
    """
    if max_words < 1:
        raise ValueError("max_words must be at least 1")
    words = text.split()
    return [
        " ".join(words[start:start + max_words])
        for start in range(0, len(words), max_words)
    ]


def summarize_text(text: str) -> str:
    """Summarize text with the summarization pipeline held in ``app.state``.

    The text is summarized chunk by chunk and the partial summaries are
    joined with spaces.

    Args:
        text: Text to summarize.

    Returns:
        The combined summary. If summarization fails, the first
        ``FALLBACK_SUMMARY_CHARS`` characters of ``text`` are returned
        instead, followed by ``"..."`` when the text was actually cut.
    """
    try:
        summaries = []
        for chunk in chunk_text(text):
            summary = app.state.summarizer(
                chunk,
                max_length=150,
                min_length=50,
                do_sample=False,
                # Keep token-dense chunks within the model's input limit.
                truncation=True,
            )
            summaries.append(summary[0]["summary_text"])
        return " ".join(summaries)
    except Exception as e:
        logger.error("Error summarizing text: %s", e)
        # Return partial text if summarization fails
        if len(text) <= FALLBACK_SUMMARY_CHARS:
            return text
        return text[:FALLBACK_SUMMARY_CHARS] + "..."


@app.post("/process/")
async def process_video(file: UploadFile = File(...)):
    """Process an uploaded video and return its transcription and summary.

    Args:
        file: The uploaded video.

    Returns:
        A dict with ``"transcription"`` and ``"summary"`` on success, or a
        ``JSONResponse`` with status 500 and an ``"error"`` message on failure.
    """
    temp_video_path = None
    audio_path = None
    try:
        # Save uploaded file. The path is recorded before writing so the file
        # is still cleaned up if the write fails part-way.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_video:
            temp_video_path = temp_video.name
            # Stream in pieces instead of holding the whole video in memory.
            while True:
                chunk = await file.read(UPLOAD_CHUNK_SIZE)
                if not chunk:
                    break
                temp_video.write(chunk)

        # Process video
        audio_path = extract_audio(temp_video_path)
        transcription = transcribe_audio(audio_path)
        summary = summarize_text(transcription)

        return {
            "transcription": transcription,
            "summary": summary,
        }
    except Exception as e:
        logger.exception("Processing error: %s", e)
        return JSONResponse(
            status_code=500,
            content={"error": f"Processing error: {str(e)}"},
        )
    finally:
        # Cleanup only the files created for this request.
        _remove_file(temp_video_path)
        _remove_file(audio_path)