videototextup / app.py
ayloll's picture
Update app.py
5e71f1f verified
import logging
import os
import tempfile

# Configure ALL cache directories.
# This has to run BEFORE transformers is imported: the Hugging Face libraries
# resolve their cache locations from these environment variables at import
# time, so assigning them after the import leaves the default (possibly
# read-only) cache directory in effect.
os.environ["TRANSFORMERS_CACHE"] = "/tmp/transformers_cache"
os.environ["HF_HOME"] = "/tmp/huggingface"
os.environ["WHISPER_CACHE_DIR"] = "/tmp/whisper_cache"

# Create cache directories
os.makedirs("/tmp/transformers_cache", exist_ok=True)
os.makedirs("/tmp/huggingface", exist_ok=True)
os.makedirs("/tmp/whisper_cache", exist_ok=True)

# Third-party imports are deliberately placed after the cache configuration
# above (hence the E402 suppressions).
import numpy as np  # noqa: E402  (explicitly import numpy)
import whisper  # noqa: E402
from fastapi import FastAPI, UploadFile, File  # noqa: E402
from fastapi.responses import JSONResponse  # noqa: E402
from moviepy.editor import VideoFileClip  # noqa: E402
from transformers import pipeline  # noqa: E402

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

# Verify numpy is working
try:
    np.array([1, 2, 3])  # Simple numpy operation test
    logger.info("NumPy is working correctly")
except Exception as e:
    logger.error(f"NumPy test failed: {str(e)}")
# Load models at startup
@app.on_event("startup")
async def load_models():
    """Load the Whisper and summarization models once and keep them on ``app.state``.

    Both models are pinned to the CPU. Any failure is logged and re-raised
    to the caller.
    """
    try:
        logger.info("Loading Whisper model...")
        whisper_model = whisper.load_model(
            "base",
            download_root="/tmp/whisper_cache",
            device="cpu",  # Force CPU usage
        )
        app.state.whisper_model = whisper_model
        logger.info("Whisper model loaded successfully")

        logger.info("Loading summarization model...")
        summarizer = pipeline(
            "summarization",
            model="facebook/bart-large-cnn",
            device=-1,  # Use CPU
        )
        app.state.summarizer = summarizer
        logger.info("Summarization model loaded successfully")
    except Exception as load_error:
        logger.error(f"Error loading models: {str(load_error)}")
        raise
def extract_audio(video_path: str) -> str:
    """Extract the audio track of a video file into a WAV file.

    Args:
        video_path: Path of the video file to read.

    Returns:
        Path of the written audio file, ``extracted_audio.wav`` inside the
        system temporary directory.

    Raises:
        ValueError: If the video has no audio track.
        Exception: Any other extraction error is logged and re-raised.
    """
    # NOTE: the output name is fixed, so overlapping calls write to the same
    # file; callers are expected to delete it once they are done with it.
    audio_path = os.path.join(tempfile.gettempdir(), "extracted_audio.wav")
    try:
        with VideoFileClip(video_path) as video:
            # A clip without an audio track exposes ``audio`` as None; fail
            # with a clear message instead of an AttributeError on None.
            if video.audio is None:
                raise ValueError("Video file does not contain an audio track")
            video.audio.write_audiofile(audio_path)
        return audio_path
    except Exception as e:
        logger.error(f"Error extracting audio: {str(e)}")
        raise
def transcribe_audio(audio_path: str) -> str:
    """Transcribe an audio file with the Whisper model stored on ``app.state``.

    Returns the ``"text"`` entry of the model's result. Errors are logged
    and re-raised.
    """
    try:
        model = app.state.whisper_model
        transcription = model.transcribe(audio_path)
        return transcription["text"]
    except Exception as transcribe_error:
        logger.error(f"Error transcribing audio: {str(transcribe_error)}")
        raise
def chunk_text(text: str, max_words: int = 400) -> list:
    """Split text into whitespace-normalized chunks of at most ``max_words`` words.

    Words are separated on any whitespace and re-joined with single spaces.
    An empty or whitespace-only text yields an empty list.
    """
    tokens = text.split()
    pieces = []
    for start in range(0, len(tokens), max_words):
        window = tokens[start:start + max_words]
        pieces.append(" ".join(window))
    return pieces
def summarize_text(text: str) -> str:
    """Summarize text with the summarization pipeline stored on ``app.state``.

    The text is split with ``chunk_text``; each chunk is summarized
    separately and the partial summaries are joined with single spaces.

    Args:
        text: The text to summarize.

    Returns:
        The combined summary. If summarization fails, the error is logged
        and the original text is returned instead, cut to its first 500
        characters followed by "..." only when it was actually longer.
    """
    try:
        summaries = []
        for chunk in chunk_text(text):
            result = app.state.summarizer(
                chunk,
                max_length=150,
                min_length=50,
                do_sample=False,
                # Chunks are sized in words, not tokens; let the tokenizer
                # clip a token-dense chunk instead of overflowing the
                # model's maximum input length.
                truncation=True,
            )
            summaries.append(result[0]['summary_text'])
        return " ".join(summaries)
    except Exception as e:
        logger.error(f"Error summarizing text: {str(e)}")
        # Return partial text if summarization fails; only mark it as
        # truncated when something was really cut off.
        if len(text) <= 500:
            return text
        return text[:500] + "..."
@app.post("/process/")
async def process_video(file: UploadFile = File(...)):
    """Process an uploaded video and return its transcription and summary.

    The upload is written to a temporary ``.mp4`` file, its audio is
    extracted, transcribed and summarized, and all temporary files are
    removed afterwards.

    Args:
        file: The uploaded video.

    Returns:
        A dict with ``"transcription"`` and ``"summary"`` on success, or a
        ``JSONResponse`` with status 500 and an ``"error"`` message when any
        processing step raises.
    """
    # Location extract_audio has historically written to; always swept so a
    # partially written file is removed even if extraction raised.
    default_audio_path = os.path.join(tempfile.gettempdir(), "extracted_audio.wav")
    temp_video_path = None
    audio_path = None
    try:
        # Save uploaded file
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_video:
            # Record the name before writing so a failed read/write does not
            # leave an orphaned temp file behind.
            temp_video_path = temp_video.name
            temp_video.write(await file.read())

        # Process video.
        # NOTE(review): these calls are synchronous, so the event loop is
        # blocked while a request is being processed.
        audio_path = extract_audio(temp_video_path)
        transcription = transcribe_audio(audio_path)
        summary = summarize_text(transcription)
        return {
            "transcription": transcription,
            "summary": summary
        }
    except Exception as e:
        logger.error(f"Processing error: {str(e)}")
        return JSONResponse(
            status_code=500,
            content={"error": f"Processing error: {str(e)}"}
        )
    finally:
        # Cleanup files. This is best-effort: a failed delete is logged but
        # must not replace an otherwise successful response with an error.
        for leftover in {temp_video_path, audio_path, default_audio_path}:
            if not leftover:
                continue
            try:
                os.unlink(leftover)
            except FileNotFoundError:
                pass
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {leftover}: {cleanup_error}")