# Spaces: Sleeping
# File size: 4,527 Bytes
from fastapi import FastAPI, UploadFile, File
from fastapi.responses import JSONResponse
from moviepy.editor import VideoFileClip
import whisper
from transformers import pipeline
import os
import tempfile
import numpy as np # Explicitly import numpy
import logging
# Logging: INFO level for the whole process; this module logs through `logger`.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Cache locations: export each one as an environment variable, then make sure
# the directory exists.
# NOTE(review): these variables are assigned after `whisper` and `transformers`
# were imported above - confirm the libraries still honour them at this point.
_cache_locations = {
    "TRANSFORMERS_CACHE": "/tmp/transformers_cache",
    "HF_HOME": "/tmp/huggingface",
    "WHISPER_CACHE_DIR": "/tmp/whisper_cache",
}
os.environ.update(_cache_locations)
for _cache_dir in _cache_locations.values():
    os.makedirs(_cache_dir, exist_ok=True)

app = FastAPI()

# Import-time smoke test: build a tiny array and log whether NumPy is usable.
# A failure is only logged; it does not stop the module from loading.
try:
    np.array([1, 2, 3])
    logger.info("NumPy is working correctly")
except Exception as e:
    logger.error(f"NumPy test failed: {str(e)}")
# Load models at startup
@app.on_event("startup")
async def load_models():
    """Load the Whisper and summarization models when the application starts.

    The loaded objects are stored on ``app.state`` as ``whisper_model`` and
    ``summarizer`` so the request handlers can reuse them. Any failure while
    loading is logged and re-raised.
    """
    try:
        logger.info("Loading Whisper model...")
        # Whisper weights are downloaded to the /tmp cache and run on the CPU.
        whisper_options = {
            "download_root": "/tmp/whisper_cache",
            "device": "cpu",
        }
        app.state.whisper_model = whisper.load_model("base", **whisper_options)
        logger.info("Whisper model loaded successfully")

        logger.info("Loading summarization model...")
        # device=-1 keeps the summarization pipeline on the CPU as well.
        summarizer = pipeline(
            "summarization",
            model="facebook/bart-large-cnn",
            device=-1,
        )
        app.state.summarizer = summarizer
        logger.info("Summarization model loaded successfully")
    except Exception as e:
        logger.error(f"Error loading models: {str(e)}")
        raise
def extract_audio(video_path: str) -> str:
    """Extract the audio track of a video into a WAV file.

    Args:
        video_path: Path of the video file to read.

    Returns:
        Path of the written WAV file. It is always ``extracted_audio.wav``
        inside the system temp directory, so every call overwrites the
        result of the previous one.

    Raises:
        ValueError: If the video has no audio track.
        Exception: Any other error raised while opening the video or writing
            the audio; it is logged and re-raised unchanged.
    """
    # The location is fixed on purpose: the request handler removes this exact
    # path during cleanup.
    audio_path = os.path.join(tempfile.gettempdir(), "extracted_audio.wav")
    try:
        with VideoFileClip(video_path) as video:
            # Without this check a silent video fails with an opaque
            # "'NoneType' object has no attribute 'write_audiofile'".
            if video.audio is None:
                raise ValueError("Video file does not contain an audio track")
            video.audio.write_audiofile(audio_path)
        return audio_path
    except Exception as e:
        logger.error(f"Error extracting audio: {str(e)}")
        raise
def transcribe_audio(audio_path: str) -> str:
    """Return the text Whisper transcribes from the audio file at ``audio_path``.

    Uses the model stored on ``app.state.whisper_model``. Any failure is
    logged and re-raised.
    """
    try:
        model = app.state.whisper_model
        transcription = model.transcribe(audio_path)
        return transcription["text"]
    except Exception as e:
        logger.error(f"Error transcribing audio: {str(e)}")
        raise
def chunk_text(text: str, max_words: int = 400) -> list:
    """Break ``text`` into consecutive pieces of at most ``max_words`` words.

    Words are whitespace-separated; each piece rejoins its words with single
    spaces, so runs of whitespace in the input are collapsed.
    """
    tokens = text.split()
    pieces = []
    for start in range(0, len(tokens), max_words):
        window = tokens[start:start + max_words]
        pieces.append(" ".join(window))
    return pieces
def summarize_text(text: str) -> str:
    """Summarize ``text`` with the summarization pipeline on ``app.state``.

    The text is split with ``chunk_text``, every chunk is summarized on its
    own, and the chunk summaries are joined with single spaces.

    Args:
        text: The text to summarize.

    Returns:
        The combined summary. If summarization fails, the error is logged and
        the first 500 characters of ``text`` are returned instead, followed by
        "..." only when the text was actually cut off.
    """
    try:
        summaries = []
        for chunk in chunk_text(text):
            result = app.state.summarizer(
                chunk,
                max_length=150,
                min_length=50,
                do_sample=False,
                # Chunks are sized in words, not tokens; truncate any chunk
                # that is still too long for the model instead of failing and
                # discarding every summary produced so far.
                truncation=True,
            )
            summaries.append(result[0]["summary_text"])
        return " ".join(summaries)
    except Exception as e:
        logger.error(f"Error summarizing text: {str(e)}")
        # Best-effort fallback: hand back the start of the original text.
        if len(text) <= 500:
            return text
        return text[:500] + "..."
@app.post("/process/")
async def process_video(file: UploadFile = File(...)):
    """Transcribe and summarize an uploaded video.

    The upload is copied to a temporary ``.mp4`` file, its audio is extracted
    and transcribed, and the transcription is summarized.

    Args:
        file: The uploaded video.

    Returns:
        A dict with ``transcription`` and ``summary`` on success, or a
        ``JSONResponse`` with status 500 and an ``error`` message when any
        step fails. Temporary files are removed in both cases.
    """
    # NOTE(review): extraction, transcription and summarization run directly
    # inside this coroutine. Moving them to worker threads would need a
    # per-request audio path first, because the audio file location is shared.
    chunk_size = 1024 * 1024
    temp_video_path = None
    # extract_audio() writes to this fixed location; knowing it up front lets
    # the cleanup below remove a partially written file after a failure.
    audio_path = os.path.join(tempfile.gettempdir(), "extracted_audio.wav")
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_video:
            # Record the path before writing so that a failed upload does not
            # leave the (delete=False) temp file behind.
            temp_video_path = temp_video.name
            # Copy in chunks rather than holding the whole upload in memory.
            while True:
                chunk = await file.read(chunk_size)
                if not chunk:
                    break
                temp_video.write(chunk)

        # The temp file is closed (and flushed) before it is processed.
        audio_path = extract_audio(temp_video_path)
        transcription = transcribe_audio(audio_path)
        summary = summarize_text(transcription)
        return {
            "transcription": transcription,
            "summary": summary
        }
    except Exception as e:
        logger.error(f"Processing error: {str(e)}")
        return JSONResponse(
            status_code=500,
            content={"error": f"Processing error: {str(e)}"}
        )
    finally:
        # Best-effort cleanup: a file that cannot be removed is logged but
        # must not replace the response that was already produced.
        for path in (temp_video_path, audio_path):
            if path and os.path.exists(path):
                try:
                    os.unlink(path)
                except OSError as cleanup_error:
                    logger.warning(
                        f"Could not remove temporary file {path}: {cleanup_error}"
                    )