Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, UploadFile, File | |
| from fastapi.responses import JSONResponse | |
| from moviepy.editor import VideoFileClip | |
| import whisper | |
| from transformers import pipeline | |
| import os | |
| import tempfile | |
| import numpy as np # Explicitly import numpy | |
| import logging | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Configure ALL cache directories | |
| os.environ["TRANSFORMERS_CACHE"] = "/tmp/transformers_cache" | |
| os.environ["HF_HOME"] = "/tmp/huggingface" | |
| os.environ["WHISPER_CACHE_DIR"] = "/tmp/whisper_cache" | |
| # Create cache directories | |
| os.makedirs("/tmp/transformers_cache", exist_ok=True) | |
| os.makedirs("/tmp/huggingface", exist_ok=True) | |
| os.makedirs("/tmp/whisper_cache", exist_ok=True) | |
| app = FastAPI() | |
| # Verify numpy is working | |
| try: | |
| np.array([1, 2, 3]) # Simple numpy operation test | |
| logger.info("NumPy is working correctly") | |
| except Exception as e: | |
| logger.error(f"NumPy test failed: {str(e)}") | |
| # Load models at startup | |
| async def load_models(): | |
| try: | |
| logger.info("Loading Whisper model...") | |
| app.state.whisper_model = whisper.load_model( | |
| "base", | |
| download_root="/tmp/whisper_cache", | |
| device="cpu" # Force CPU usage | |
| ) | |
| logger.info("Whisper model loaded successfully") | |
| logger.info("Loading summarization model...") | |
| app.state.summarizer = pipeline( | |
| "summarization", | |
| model="facebook/bart-large-cnn", | |
| device=-1 # Use CPU | |
| ) | |
| logger.info("Summarization model loaded successfully") | |
| except Exception as e: | |
| logger.error(f"Error loading models: {str(e)}") | |
| raise | |
| def extract_audio(video_path: str) -> str: | |
| """Extract audio from video file""" | |
| try: | |
| with VideoFileClip(video_path) as video: | |
| audio_path = os.path.join(tempfile.gettempdir(), "extracted_audio.wav") | |
| video.audio.write_audiofile(audio_path) | |
| return audio_path | |
| except Exception as e: | |
| logger.error(f"Error extracting audio: {str(e)}") | |
| raise | |
| def transcribe_audio(audio_path: str) -> str: | |
| """Transcribe audio using Whisper""" | |
| try: | |
| result = app.state.whisper_model.transcribe(audio_path) | |
| return result["text"] | |
| except Exception as e: | |
| logger.error(f"Error transcribing audio: {str(e)}") | |
| raise | |
| def chunk_text(text: str, max_words: int = 400) -> list: | |
| """Split text into chunks""" | |
| words = text.split() | |
| return [" ".join(words[i:i + max_words]) for i in range(0, len(words), max_words)] | |
| def summarize_text(text: str) -> str: | |
| """Summarize text using BART""" | |
| try: | |
| chunks = chunk_text(text) | |
| summaries = [] | |
| for chunk in chunks: | |
| summary = app.state.summarizer( | |
| chunk, | |
| max_length=150, | |
| min_length=50, | |
| do_sample=False | |
| ) | |
| summaries.append(summary[0]['summary_text']) | |
| return " ".join(summaries) | |
| except Exception as e: | |
| logger.error(f"Error summarizing text: {str(e)}") | |
| return text[:500] + "..." # Return partial text if summarization fails | |
| async def process_video(file: UploadFile = File(...)): | |
| """Process video and return transcription and summary""" | |
| try: | |
| # Save uploaded file | |
| temp_video_path = None | |
| try: | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_video: | |
| temp_video.write(await file.read()) | |
| temp_video_path = temp_video.name | |
| # Process video | |
| audio_path = extract_audio(temp_video_path) | |
| transcription = transcribe_audio(audio_path) | |
| summary = summarize_text(transcription) | |
| return { | |
| "transcription": transcription, | |
| "summary": summary | |
| } | |
| finally: | |
| # Cleanup files | |
| if temp_video_path and os.path.exists(temp_video_path): | |
| os.unlink(temp_video_path) | |
| audio_path = os.path.join(tempfile.gettempdir(), "extracted_audio.wav") | |
| if os.path.exists(audio_path): | |
| os.unlink(audio_path) | |
| except Exception as e: | |
| logger.error(f"Processing error: {str(e)}") | |
| return JSONResponse( | |
| status_code=500, | |
| content={"error": f"Processing error: {str(e)}"} | |
| ) |