video_mcp / modal_whisper_app.py
jomasego's picture
Update backend and dependencies
fb2e8d8
raw
history blame
39.8 kB
import modal
from fastapi import FastAPI, UploadFile, File, Body, Query
import os
import tempfile
import io # Used by Whisper for BytesIO
import hashlib # For generating cache keys
import httpx # For downloading video from URL if needed by endpoint
import gradio as gr
import gradio.routes
from typing import Dict, List, Any, Optional # For type hinting results and Optional in Pydantic
from fastapi.responses import JSONResponse # For FastAPI endpoint
from fastapi import File, Body, UploadFile, Query # For FastAPI file uploads, request body parts, and query parameters
from pydantic import BaseModel # For FastAPI request body validation
import re # For parsing search results
import asyncio # For concurrent video processing
# --- Constants for Model Names ---
WHISPER_MODEL_NAME = "openai/whisper-large-v3"
CAPTION_MODEL_NAME = "Neleac/SpaceTimeGPT"
CAPTION_PROCESSOR_NAME = "MCG-NJU/videomae-base" # For SpaceTimeGPT's video encoder
# CAPTION_TOKENIZER_NAME = "gpt2" # For SpaceTimeGPT's text decoder (usually part of processor)
ACTION_MODEL_NAME = "MCG-NJU/videomae-base-finetuned-kinetics"
ACTION_PROCESSOR_NAME = "MCG-NJU/videomae-base" # Or VideoMAEImageProcessor.from_pretrained(ACTION_MODEL_NAME)
OBJECT_DETECTION_MODEL_NAME = "facebook/detr-resnet-50"
OBJECT_DETECTION_PROCESSOR_NAME = "facebook/detr-resnet-50"
# --- Modal Image Definition ---
video_analysis_image = (
modal.Image.debian_slim(python_version="3.10")
.apt_install("ffmpeg")
.pip_install(
"gradio==3.50.2", # Pin Gradio version for stability
"transformers[torch]", # For all Hugging Face models and PyTorch
"soundfile", # For Whisper
"av", # For video frame extraction
"Pillow", # For image processing
"timm", # Often a dependency for vision models
"torchvision",
"torchaudio",
"fastapi[standard]", # For web endpoints
"pydantic", # For request body validation
"httpx" # For downloading video from URL
)
)
# --- Modal App Definition ---
app = modal.App(name="video-analysis-gradio-pipeline") # New app name, using App
fastapi_app = FastAPI() # Initialize FastAPI app
# --- Modal Distributed Dictionary for Caching ---
video_analysis_cache = modal.Dict.from_name("video_analysis_cache", create_if_missing=True)
# --- Hugging Face Token Secret ---
HF_TOKEN_SECRET = modal.Secret.from_name("my-huggingface-secret")
# --- Helper: Hugging Face Login ---
def _login_to_hf():
import os
from huggingface_hub import login
hf_token = os.environ.get("HF_TOKEN")
if hf_token:
try:
login(token=hf_token)
print("Successfully logged into Hugging Face Hub.")
return True
except Exception as e:
print(f"Hugging Face Hub login failed: {e}")
return False
else:
print("HF_TOKEN secret not found. Some models might fail to load.")
return False
# === 1. Transcription with Whisper ===
@app.function(
image=video_analysis_image,
secrets=[HF_TOKEN_SECRET],
gpu="any",
timeout=600
)
def transcribe_video_with_whisper(video_bytes: bytes) -> str:
_login_to_hf()
import torch
from transformers import pipeline
import soundfile as sf
import av # For robust audio extraction
import numpy as np
import io
print("[Whisper] Starting transcription.")
temp_audio_path = None
try:
# Robust audio extraction using PyAV
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file:
tmp_video_file.write(video_bytes)
video_path = tmp_video_file.name
container = av.open(video_path)
audio_stream = next((s for s in container.streams if s.type == 'audio'), None)
if audio_stream is None:
return "Whisper Error: No audio stream found in video."
# Decode and resample audio to 16kHz mono WAV
# Store resampled audio in a temporary WAV file
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_audio_file_for_sf:
temp_audio_path = tmp_audio_file_for_sf.name
output_container = av.open(temp_audio_path, mode='w')
output_stream = output_container.add_stream('pcm_s16le', rate=16000, layout='mono')
for frame in container.decode(audio_stream):
for packet in output_stream.encode(frame):
output_container.mux(packet)
# Flush stream
for packet in output_stream.encode():
output_container.mux(packet)
output_container.close()
container.close()
os.remove(video_path) # Clean up temp video file
pipe = pipeline(
"automatic-speech-recognition",
model=WHISPER_MODEL_NAME,
torch_dtype=torch.float16,
device="cuda:0" if torch.cuda.is_available() else "cpu",
)
print(f"[Whisper] Pipeline loaded. Transcribing {temp_audio_path}...")
outputs = pipe(temp_audio_path, chunk_length_s=30, batch_size=8, return_timestamps=False)
transcription = outputs["text"]
print(f"[Whisper] Transcription successful: {transcription[:100]}...")
return transcription
except Exception as e:
print(f"[Whisper] Error: {e}")
import traceback
traceback.print_exc()
return f"Whisper Error: {str(e)}"
finally:
if temp_audio_path and os.path.exists(temp_audio_path):
os.remove(temp_audio_path)
if 'video_path' in locals() and video_path and os.path.exists(video_path):
os.remove(video_path) # Ensure temp video is cleaned up if audio extraction failed early
# === 2. Captioning with SpaceTimeGPT ===
@app.function(
image=video_analysis_image,
secrets=[HF_TOKEN_SECRET],
gpu="any",
timeout=600
)
def generate_captions_with_spacetimegpt(video_bytes: bytes) -> str:
_login_to_hf()
import torch
from transformers import AutoProcessor, AutoModelForCausalLM
import av
import numpy as np
import tempfile
print("[SpaceTimeGPT] Starting captioning.")
video_path = None
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file:
tmp_video_file.write(video_bytes)
video_path = tmp_video_file.name
container = av.open(video_path)
video_stream = next((s for s in container.streams if s.type == 'video'), None)
if video_stream is None:
return "SpaceTimeGPT Error: No video stream found."
num_frames_to_sample = 16
total_frames = video_stream.frames
if total_frames == 0: return "SpaceTimeGPT Error: Video has no frames."
indices = np.linspace(0, total_frames - 1, num_frames_to_sample, dtype=int)
frames = []
for i in indices:
container.seek(i, stream=video_stream)
frame = next(container.decode(video_stream))
frames.append(frame.to_rgb().to_ndarray())
container.close()
video_frames_np = np.stack(frames)
processor = AutoProcessor.from_pretrained(CAPTION_PROCESSOR_NAME, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(CAPTION_MODEL_NAME, trust_remote_code=True)
device = "cuda:0" if torch.cuda.is_available() else "cpu"
model.to(device)
if hasattr(processor, 'tokenizer'): # Check if tokenizer exists
processor.tokenizer.padding_side = "right"
print("[SpaceTimeGPT] Model and processor loaded. Generating captions...")
inputs = processor(text=None, videos=list(video_frames_np), return_tensors="pt", padding=True).to(device)
generated_ids = model.generate(**inputs, max_new_tokens=128)
captions = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
print(f"[SpaceTimeGPT] Captioning successful: {captions}")
return captions
except Exception as e:
print(f"[SpaceTimeGPT] Error: {e}")
import traceback
traceback.print_exc()
return f"SpaceTimeGPT Error: {str(e)}"
finally:
if video_path and os.path.exists(video_path):
os.remove(video_path)
# === 3. Action Recognition with VideoMAE ===
@app.function(
image=video_analysis_image,
secrets=[HF_TOKEN_SECRET],
gpu="any",
timeout=600
)
def generate_action_labels(video_bytes: bytes) -> List[Dict[str, Any]]:
_login_to_hf()
import torch
from transformers import VideoMAEImageProcessor, VideoMAEForVideoClassification
import av
import numpy as np
import tempfile
print("[VideoMAE] Starting action recognition.")
video_path = None
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file:
tmp_video_file.write(video_bytes)
video_path = tmp_video_file.name
container = av.open(video_path)
video_stream = next((s for s in container.streams if s.type == 'video'), None)
if video_stream is None:
return [{"error": "VideoMAE Error: No video stream found."}]
num_frames_to_sample = 16
total_frames = video_stream.frames
if total_frames == 0: return [{"error": "VideoMAE Error: Video has no frames."}]
indices = np.linspace(0, total_frames - 1, num_frames_to_sample, dtype=int)
video_frames_list = []
for i in indices:
container.seek(i, stream=video_stream)
frame = next(container.decode(video_stream))
video_frames_list.append(frame.to_rgb().to_ndarray())
container.close()
processor = VideoMAEImageProcessor.from_pretrained(ACTION_PROCESSOR_NAME)
model = VideoMAEForVideoClassification.from_pretrained(ACTION_MODEL_NAME)
device = "cuda:0" if torch.cuda.is_available() else "cpu"
model.to(device)
print("[VideoMAE] Model and processor loaded. Classifying actions...")
inputs = processor(video_frames_list, return_tensors="pt").to(device)
with torch.no_grad():
outputs = model(**inputs)
logits = outputs.logits
top_k = 5
probabilities = torch.softmax(logits, dim=-1)
top_probs, top_indices = torch.topk(probabilities, top_k)
results = []
for i in range(top_k):
label = model.config.id2label[top_indices[0, i].item()]
score = top_probs[0, i].item()
results.append({"action": label, "confidence": round(score, 4)})
print(f"[VideoMAE] Action recognition successful: {results}")
return results
except Exception as e:
print(f"[VideoMAE] Error: {e}")
import traceback
traceback.print_exc()
return [{"error": f"VideoMAE Error: {str(e)}"}]
finally:
if video_path and os.path.exists(video_path):
os.remove(video_path)
# === 4. Object Detection with DETR ===
@app.function(
image=video_analysis_image,
secrets=[HF_TOKEN_SECRET],
gpu="any",
timeout=600
)
def generate_object_detection(video_bytes: bytes) -> List[Dict[str, Any]]:
_login_to_hf()
import torch
from transformers import DetrImageProcessor, DetrForObjectDetection
from PIL import Image # Imported but not directly used, av.frame.to_image() is used
import av
import numpy as np
import tempfile
print("[DETR] Starting object detection.")
video_path = None
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file:
tmp_video_file.write(video_bytes)
video_path = tmp_video_file.name
container = av.open(video_path)
video_stream = next((s for s in container.streams if s.type == 'video'), None)
if video_stream is None:
return [{"error": "DETR Error: No video stream found."}]
num_frames_to_extract = 3
total_frames = video_stream.frames
if total_frames == 0: return [{"error": "DETR Error: Video has no frames."}]
frame_indices = np.linspace(0, total_frames - 1, num_frames_to_extract, dtype=int)
processor = DetrImageProcessor.from_pretrained(OBJECT_DETECTION_PROCESSOR_NAME)
model = DetrForObjectDetection.from_pretrained(OBJECT_DETECTION_MODEL_NAME)
device = "cuda:0" if torch.cuda.is_available() else "cpu"
model.to(device)
print("[DETR] Model and processor loaded.")
all_frame_detections = []
for frame_num, target_frame_index in enumerate(frame_indices):
container.seek(target_frame_index, stream=video_stream)
frame = next(container.decode(video_stream))
pil_image = frame.to_image()
print(f"[DETR] Processing frame {frame_num + 1}/{num_frames_to_extract} (original index {target_frame_index})...")
inputs = processor(images=pil_image, return_tensors="pt").to(device)
outputs = model(**inputs)
target_sizes = torch.tensor([pil_image.size[::-1]], device=device)
results = processor.post_process_object_detection(outputs, target_sizes=target_sizes, threshold=0.7)[0]
frame_detections = []
for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
frame_detections.append({
"label": model.config.id2label[label.item()],
"confidence": round(score.item(), 3),
"box": [round(coord) for coord in box.tolist()]
})
if frame_detections: # Only add if detections are present for this frame
all_frame_detections.append({
"frame_number": frame_num + 1,
"original_frame_index": int(target_frame_index),
"detections": frame_detections
})
container.close()
print(f"[DETR] Object detection successful: {all_frame_detections if all_frame_detections else 'No objects detected with threshold.'}")
return all_frame_detections if all_frame_detections else [{"info": "No objects detected with current threshold."}]
except Exception as e:
print(f"[DETR] Error: {e}")
import traceback
traceback.print_exc()
return [{"error": f"DETR Error: {str(e)}"}]
finally:
if video_path and os.path.exists(video_path):
os.remove(video_path)
# === 5. Comprehensive Video Analysis (Orchestrator) ===
@app.function(
image=video_analysis_image,
secrets=[HF_TOKEN_SECRET],
gpu="any", # Request GPU as some sub-tasks will need it
timeout=1800, # Generous timeout for all models
# allow_concurrent_inputs=10, # Optional: if you expect many parallel requests
# keep_warm=1 # Optional: to keep one instance warm for faster cold starts
)
async def analyze_video_comprehensive(video_bytes: bytes) -> Dict[str, Any]:
print("[Orchestrator] Starting comprehensive video analysis.")
cache_key = hashlib.sha256(video_bytes).hexdigest()
try:
cached_result = await video_analysis_cache.get(cache_key)
if cached_result:
print(f"[Orchestrator] Cache hit for key: {cache_key}")
return cached_result
except Exception as e:
# Log error but proceed with analysis if cache get fails
print(f"[Orchestrator] Cache GET error: {e}. Proceeding with fresh analysis.")
print(f"[Orchestrator] Cache miss for key: {cache_key}. Performing full analysis.")
results = {}
print("[Orchestrator] Calling transcription...")
try:
# .call() is synchronous in the context of the Modal function execution
results["transcription"] = transcribe_video_with_whisper.call(video_bytes)
except Exception as e:
print(f"[Orchestrator] Error in transcription: {e}")
results["transcription"] = f"Transcription Error: {str(e)}"
print("[Orchestrator] Calling captioning...")
try:
results["caption"] = generate_captions_with_spacetimegpt.call(video_bytes)
except Exception as e:
print(f"[Orchestrator] Error in captioning: {e}")
results["caption"] = f"Captioning Error: {str(e)}"
print("[Orchestrator] Calling action recognition...")
try:
results["actions"] = generate_action_labels.call(video_bytes)
except Exception as e:
print(f"[Orchestrator] Error in action recognition: {e}")
results["actions"] = [{"error": f"Action Recognition Error: {str(e)}"}] # Ensure list type for error
print("[Orchestrator] Calling object detection...")
try:
results["objects"] = generate_object_detection.call(video_bytes)
except Exception as e:
print(f"[Orchestrator] Error in object detection: {e}")
results["objects"] = [{"error": f"Object Detection Error: {str(e)}"}] # Ensure list type for error
print("[Orchestrator] All analyses attempted. Storing results in cache.")
try:
await video_analysis_cache.put(cache_key, results)
print(f"[Orchestrator] Successfully cached results for key: {cache_key}")
except Exception as e:
print(f"[Orchestrator] Cache PUT error: {e}")
return results
# --- Pydantic model for FastAPI request ---
class VideoAnalysisRequestPayload(BaseModel):
video_url: Optional[str] = None
# === FastAPI Endpoint for Comprehensive Analysis ===
@fastapi_app.post("/analyze_video")
async def process_video_for_analysis(
payload: Optional[VideoAnalysisRequestPayload] = Body(None),
video_file: Optional[UploadFile] = File(None) # Use UploadFile for type hint and async read
):
print("[FastAPI Endpoint] Received request for comprehensive analysis.")
video_bytes_content: Optional[bytes] = None
video_source_description: str = "Unknown"
if video_file:
print(f"[FastAPI Endpoint] Processing uploaded video file: {video_file.filename}, size: {video_file.size} bytes.")
video_bytes_content = await video_file.read() # Use await for async read
video_source_description = f"direct file upload: {video_file.filename}"
elif payload and payload.video_url:
video_url = str(payload.video_url) # Ensure it's a string
print(f"[FastAPI Endpoint] Processing video_url: {video_url}")
video_source_description = f"URL: {video_url}"
try:
async with httpx.AsyncClient() as client:
response = await client.get(video_url, follow_redirects=True, timeout=60.0)
response.raise_for_status()
video_bytes_content = await response.aread()
if not video_bytes_content:
print(f"[FastAPI Endpoint] Download failed: content was empty for URL: {video_url}")
return JSONResponse(status_code=400, content={"error": f"Failed to download video from URL: {video_url}. Content was empty."})
print(f"[FastAPI Endpoint] Successfully downloaded {len(video_bytes_content)} bytes from {video_url}")
except httpx.RequestError as e:
print(f"[FastAPI Endpoint] httpx.RequestError downloading video: {e}")
return JSONResponse(status_code=400, content={"error": f"Error downloading video from URL: {video_url}. Details: {str(e)}"})
except Exception as e:
print(f"[FastAPI Endpoint] Unexpected Exception downloading video: {e}")
return JSONResponse(status_code=500, content={"error": f"Unexpected error downloading video. Details: {str(e)}"})
else:
print("[FastAPI Endpoint] No video_url in payload and no video_file uploaded.")
return JSONResponse(status_code=400, content={"error": "Either 'video_url' in JSON payload or a 'video_file' in form-data must be provided."})
if not video_bytes_content:
print("[FastAPI Endpoint] Critical error: video_bytes_content is not populated after input processing.")
return JSONResponse(status_code=500, content={"error": "Internal server error: video data could not be obtained."})
print(f"[FastAPI Endpoint] Calling analyze_video_comprehensive for video from {video_source_description} ({len(video_bytes_content)} bytes).")
try:
# Since process_video_for_analysis is an @app.function, it can .call() another @app.function
analysis_results = await analyze_video_comprehensive.call(video_bytes_content)
print("[FastAPI Endpoint] Comprehensive analysis finished.")
return JSONResponse(status_code=200, content=analysis_results)
except modal.exception.ModalError as e:
print(f"[FastAPI Endpoint] ModalError during comprehensive analysis: {e}")
return JSONResponse(status_code=500, content={"error": f"Modal processing error: {str(e)}"})
except Exception as e:
print(f"[FastAPI Endpoint] Unexpected Exception during comprehensive analysis: {e}")
# import traceback # Uncomment for detailed server-side stack trace
# traceback.print_exc() # Uncomment for detailed server-side stack trace
return JSONResponse(status_code=500, content={"error": f"Unexpected server error during analysis: {str(e)}"})
@fastapi_app.post("/analyze_topic")
async def analyze_topic_endpoint(topic: str = Query(..., min_length=3, description="The topic to search videos for."),
max_videos: Optional[int] = Query(3, ge=1, le=10, description="Maximum number of videos to find and analyze.")):
"""Endpoint to find videos for a topic, analyze them, and return aggregated results."""
print(f"[FastAPI /analyze_topic] Received request for topic: '{topic}', max_videos: {max_videos}")
# This endpoint is orchestrated by Cascade. Cascade will:
# 1. Call its `search_web` tool with the `topic`.
# 2. Call the local Python helper `extract_video_urls_from_search` with the search results.
# 3. Call the Modal function `analyze_videos_by_topic.remote()` with the extracted URLs and topic.
# The actual implementation of these steps happens in Cascade's execution flow, not directly in this FastAPI code.
# This FastAPI endpoint definition tells Modal to expect such a route and parameters.
# The body of this function in the Python file is a placeholder for Cascade's orchestration.
# Placeholder: The actual call to analyze_videos_by_topic.remote() will be made by Cascade
# after it performs the search and URL extraction.
# For standalone Modal testing, one might simulate this:
# if modal.is_local():
# # Simulate search and extraction
# simulated_search_results = [{"link": "https://www.youtube.com/watch?v=dQw4w9WgXcQ"}]
# video_urls = extract_video_urls_from_search(simulated_search_results, max_videos)
# if not video_urls:
# return JSONResponse(status_code=404, content={"error": "No relevant video URLs found for the topic after search."})
# try:
# result = await analyze_videos_by_topic.coro(video_urls=video_urls, topic=topic)
# return result
# except Exception as e:
# return JSONResponse(status_code=500, content={"error": f"Error during topic analysis: {str(e)}"})
# else:
# # In deployed Modal, Cascade handles the call chain.
# # This function body might not even be executed directly if Cascade calls .remote() on analyze_videos_by_topic directly.
# # However, having a defined endpoint is good practice for discoverability and potential direct calls.
pass
# The actual logic is: Cascade calls search_web -> Cascade calls extract_video_urls_from_search -> Cascade calls analyze_videos_by_topic.remote().
# This endpoint is the entry point for the USER's request to Cascade.
# Cascade will then perform the sequence of operations.
# So, this function body is more of a declaration for the endpoint.
# We expect Cascade to handle the full orchestration when this endpoint is invoked.
# For the purpose of defining the Modal app structure, this is sufficient.
# The `analyze_videos_by_topic` function is what Modal will ultimately run with the list of URLs provided by Cascade.
# Return a message indicating that the process is initiated by Cascade.
# This response won't typically be seen if Cascade directly calls the .remote() of the target function.
return JSONResponse(status_code=202, content={"message": "Topic analysis process initiated. Cascade will orchestrate the search and analysis."})
# === 6. Topic-Based Video Search ===
@app.function(
image=video_analysis_image,
secrets=[HF_TOKEN_SECRET],
timeout=300
)
def find_video_urls_for_topic(topic: str, max_results: int = 3) -> List[str]:
"""Finds video URLs (YouTube, direct links) for a given topic using web search."""
print(f"[TopicSearch] Finding video URLs for topic: '{topic}', max_results={max_results}")
# This import is inside because search_web is a tool available to Cascade, not directly to Modal runtime
# This function will be called via .remote() and its implementation will be provided by Cascade's tool execution
# For now, this is a placeholder for where the search_web tool would be invoked.
# In a real Modal execution, this function would need to use a library like 'requests' and 'beautifulsoup'
# or a dedicated search API (e.g., SerpApi, Google Search API) if called from within Modal directly.
# Since Cascade calls this, it will use its 'search_web' tool.
# Simulate search results for now, as direct tool call from Modal code isn't standard.
# When Cascade calls this, it should intercept and use its search_web tool.
# For local testing or direct Modal runs, this would need a real search implementation.
# Placeholder: In a real scenario, this function would use a search tool/API.
# For the purpose of this exercise, we'll assume Cascade's `search_web` tool will be used
# when this function is invoked through Cascade's orchestration.
# If running this Modal app standalone, this part needs a concrete implementation.
# Example of what the logic would look like if we had search results:
# query = f"{topic} video youtube OR .mp4 OR .mov"
# search_results = [] # This would be populated by a search_web call
# For demonstration, let's return some dummy URLs. Replace with actual search logic.
# print(f"[TopicSearch] This is a placeholder. Actual search via Cascade's 'search_web' tool is expected.")
# print(f"[TopicSearch] If running standalone, implement search logic here.")
# The actual implementation will be handled by Cascade's search_web tool call
# when this function is called via .remote() by another function that Cascade is orchestrating.
# This function definition serves as a Modal-compatible stub for Cascade's tool.
# This function is more of a declaration for Cascade to use its tool.
# The actual search logic will be implicitly handled by Cascade's tool call mechanism
# when `find_video_urls_for_topic.remote()` is used in a subsequent step orchestrated by Cascade.
# If this function were to be *truly* self-contained within Modal and callable independently
# *without* Cascade's direct tool invocation, it would need its own HTTP client and parsing logic here.
# However, given the context of Cascade's operation, this stub is appropriate for Cascade to inject its tool usage.
# The `search_web` tool will be called by Cascade when it orchestrates the call to this function.
# So, this Python function in `modal_whisper_app.py` mostly defines the signature and intent.
# We will rely on Cascade to make the actual search_web call and provide the results back to the orchestrator.
# This function, when called by Cascade, will trigger a `search_web` tool call.
# The tool call will be made by Cascade, not by the Modal runtime directly.
# For now, let's assume this function's body is a placeholder for that interaction.
# The key is that the *calling* function (e.g., analyze_videos_by_topic) will use .remote(),
# and Cascade will manage the search_web tool call.
# To make this runnable standalone (for testing Modal part without Cascade), one might add:
# if modal.is_local():
# # basic requests/bs4 search or return dummy data
# pass
# For the flow with Cascade, this function primarily serves as a named Modal function
# that Cascade understands it needs to provide search results for.
# The actual search logic is deferred to Cascade's tool execution.
# We will return an empty list here, expecting Cascade to populate it via its mechanisms when called.
print(f"[TopicSearch] Function '{find_video_urls_for_topic.__name__}' called. Expecting Cascade to perform web search.")
# This is a conceptual placeholder. The actual search will be done by Cascade's tool.
# When `analyze_videos_by_topic` calls `find_video_urls_for_topic.remote()`,
# Cascade will execute its `search_web` tool and the result will be used.
return [] # Placeholder: Cascade will provide actual URLs via its search_web tool.
# Helper function (not a Modal function) to extract video URLs from search results
def extract_video_urls_from_search(search_results: List[Dict[str, str]], max_urls: int = 3) -> List[str]:
"""Extracts video URLs from a list of search result dictionaries."""
video_urls = []
seen_urls = set()
# Regex for YouTube, Vimeo, and common video file extensions
# Simplified YouTube regex to catch most common video and shorts links
youtube_regex = r"(?:https?://)?(?:www\.)?(?:youtube\.com/(?:watch\?v=|embed/|shorts/)|youtu\.be/)([a-zA-Z0-9_-]{11})"
vimeo_regex = r"(?:https?://)?(?:www\.)?vimeo\.com/(\d+)"
direct_video_regex = r"https?://[^\s]+\.(mp4|mov|avi|webm|mkv)(\?[^\s]*)?"
patterns = [
re.compile(youtube_regex),
re.compile(vimeo_regex),
re.compile(direct_video_regex)
]
for item in search_results:
url = item.get("link") or item.get("url") # Common keys for URL in search results
if not url:
continue
for pattern in patterns:
match = pattern.search(url)
if match:
# Reconstruct canonical YouTube URL if it's a short link or embed
if pattern.pattern == youtube_regex and match.group(1):
normalized_url = f"https://www.youtube.com/watch?v={match.group(1)}"
else:
normalized_url = url
if normalized_url not in seen_urls:
video_urls.append(normalized_url)
seen_urls.add(normalized_url)
if len(video_urls) >= max_urls:
break
if len(video_urls) >= max_urls:
break
print(f"[URL Extraction] Extracted {len(video_urls)} video URLs: {video_urls}")
return video_urls
# === 7. Topic-Based Video Analysis Orchestrator ===
@app.function(
image=video_analysis_image,
secrets=[HF_TOKEN_SECRET],
gpu="any", # Child functions use GPU
timeout=3600 # Allow up to 1 hour for multiple video analyses
)
async def _download_and_analyze_one_video(client: httpx.AsyncClient, video_url: str, topic: str) -> Dict[str, Any]:
"""Helper to download and analyze a single video. Returns result or error dict."""
print(f"[TopicAnalysisWorker] Processing video URL for topic '{topic}': {video_url}")
try:
# 1. Download video
print(f"[TopicAnalysisWorker] Downloading video from: {video_url}")
response = await client.get(video_url)
response.raise_for_status() # Raise HTTPError for bad responses (4XX or 5XX)
video_bytes = await response.aread()
print(f"[TopicAnalysisWorker] Downloaded {len(video_bytes)} bytes from {video_url}")
if not video_bytes:
raise ValueError("Downloaded video content is empty.")
# 2. Analyze video
analysis_result = await analyze_video_comprehensive.coro(video_bytes)
# Check if the analysis itself returned an error structure
if isinstance(analysis_result, dict) and any(key + "_error" in analysis_result for key in ["transcription", "caption", "actions", "objects"]):
print(f"[TopicAnalysisWorker] Comprehensive analysis for {video_url} reported errors: {analysis_result}")
return {"url": video_url, "error_type": "analysis_error", "error_details": analysis_result}
else:
return {"url": video_url, "analysis": analysis_result}
except httpx.HTTPStatusError as e:
print(f"[TopicAnalysisWorker] HTTP error downloading {video_url}: {e}")
return {"url": video_url, "error_type": "download_error", "error_details": f"HTTP {e.response.status_code}: {e.response.text[:200]}"}
except httpx.RequestError as e:
print(f"[TopicAnalysisWorker] Request error downloading {video_url}: {e}")
return {"url": video_url, "error_type": "download_error", "error_details": f"Failed to download: {str(e)}"}
except Exception as e:
print(f"[TopicAnalysisWorker] Error processing video {video_url}: {e}")
import traceback
# Consider logging traceback.format_exc() instead of printing if running in a less verbose environment
# traceback.print_exc() # This might be too verbose for regular Modal logs
return {"url": video_url, "error_type": "processing_error", "error_details": str(e), "traceback": traceback.format_exc()[:1000]}
async def analyze_videos_by_topic(video_urls: List[str], topic: str) -> Dict[str, Any]:
"""Analyzes a list of videos (by URL) concurrently and aggregates results for a topic."""
print(f"[TopicAnalysis] Starting concurrent analysis for topic: '{topic}' with {len(video_urls)} video(s).")
results_aggregator = {
"topic": topic,
"analyzed_videos": [],
"errors": []
}
if not video_urls:
results_aggregator["errors"].append({"topic_error": "No video URLs provided or found for the topic."})
return results_aggregator
async with httpx.AsyncClient(timeout=300.0) as client: # 5 min timeout for individual downloads
tasks = [_download_and_analyze_one_video(client, url, topic) for url in video_urls]
# return_exceptions=True allows us to get results for successful tasks even if others fail
individual_results = await asyncio.gather(*tasks, return_exceptions=True)
for res_or_exc in individual_results:
if isinstance(res_or_exc, Exception):
# This handles exceptions not caught within _download_and_analyze_one_video itself (should be rare)
# Or if return_exceptions=True was used and _download_and_analyze_one_video raised an unhandled one.
print(f"[TopicAnalysis] An unexpected exception occurred during asyncio.gather: {res_or_exc}")
results_aggregator["errors"].append({"url": "unknown_url_due_to_gather_exception", "processing_error": str(res_or_exc)})
elif isinstance(res_or_exc, dict):
if "error_type" in res_or_exc:
results_aggregator["errors"].append(res_or_exc) # Append the error dict directly
elif "analysis" in res_or_exc:
results_aggregator["analyzed_videos"].append(res_or_exc)
else:
print(f"[TopicAnalysis] Received an unexpected dictionary structure: {res_or_exc}")
results_aggregator["errors"].append({"url": res_or_exc.get("url", "unknown"), "processing_error": "Unknown result structure"})
else:
print(f"[TopicAnalysis] Received an unexpected result type from asyncio.gather: {type(res_or_exc)}")
results_aggregator["errors"].append({"url": "unknown", "processing_error": f"Unexpected result type: {type(res_or_exc)}"})
print(f"[TopicAnalysis] Finished concurrent analysis for topic '{topic}'.")
return results_aggregator
# === Gradio Interface ===
def video_analyzer_gradio_ui():
print("[Gradio] UI function called to define interface.")
def analyze_video_all_models(video_filepath):
print(f"[Gradio] Received video filepath for analysis: {video_filepath}")
if not video_filepath or not os.path.exists(video_filepath):
return "Error: Video file path is invalid or does not exist.", "", "[]", "[]"
with open(video_filepath, "rb") as f:
video_bytes_content = f.read()
print(f"[Gradio] Read {len(video_bytes_content)} bytes from video path: {video_filepath}")
if not video_bytes_content:
return "Error: Could not read video bytes.", "", "[]", "[]"
print("[Gradio] Calling Whisper...")
transcription = transcribe_video_with_whisper.call(video_bytes_content)
print(f"[Gradio] Whisper result length: {len(transcription)}")
print("[Gradio] Calling SpaceTimeGPT...")
captions = generate_captions_with_spacetimegpt.call(video_bytes_content)
print(f"[Gradio] SpaceTimeGPT result: {captions}")
print("[Gradio] Calling VideoMAE...")
action_labels = generate_action_labels.call(video_bytes_content)
print(f"[Gradio] VideoMAE result: {action_labels}")
print("[Gradio] Calling DETR...")
object_detections = generate_object_detection.call(video_bytes_content)
print(f"[Gradio] DETR result: {object_detections}")
return transcription, captions, str(action_labels), str(object_detections)
with gr.Blocks(title="Comprehensive Video Analyzer", theme=gr.themes.Soft()) as demo:
gr.Markdown("# Comprehensive Video Analyzer")
gr.Markdown("Upload a video to get transcription, captions, action labels, and object detections.")
with gr.Row():
video_input = gr.Video(label="Upload Video", sources=["upload"], type="filepath")
submit_button = gr.Button("Analyze Video", variant="primary")
with gr.Tabs():
with gr.TabItem("Transcription (Whisper)"):
transcription_output = gr.Textbox(label="Transcription", lines=10, interactive=False)
with gr.TabItem("Dense Captions (SpaceTimeGPT)"):
caption_output = gr.Textbox(label="Captions", lines=10, interactive=False)
with gr.TabItem("Action Recognition (VideoMAE)"):
action_output = gr.Textbox(label="Predicted Actions (JSON format)", lines=10, interactive=False)
with gr.TabItem("Object Detection (DETR)"):
object_output = gr.Textbox(label="Detected Objects (JSON format)", lines=10, interactive=False)
submit_button.click(
fn=analyze_video_all_models,
inputs=[video_input],
outputs=[transcription_output, caption_output, action_output, object_output]
)
gr.Markdown("### Example Video")
gr.Markdown("You can test with a short video. Processing may take a few minutes depending on video length and model inference times.")
print("[Gradio] UI definition complete.")
return gr.routes.App.create_app(demo)
# === Main ASGI App (FastAPI + Gradio) ===
@modal.asgi_app()
def main_asgi():
# fastapi_app is defined globally
# video_analyzer_gradio_ui returns an ASGI-compatible Gradio app
gradio_asgi_app = video_analyzer_gradio_ui()
fastapi_app.mount("/gradio", gradio_asgi_app, name="gradio_ui")
print("FastAPI app with Gradio UI mounted on /gradio is ready.")
return fastapi_app