# dev_caio / models / visual_analyzer.py
# Chaitanya-aitf - "Update models/visual_analyzer.py" (commit c9d5801, verified)
"""
ShortSmith v2 - Visual Analyzer Module
Visual analysis using Qwen2-VL-2B for:
- Scene understanding and description
- Action/event detection
- Emotion recognition
- Visual hype scoring
Uses quantization (INT4/INT8) for efficient inference on consumer GPUs.
"""
from pathlib import Path
from typing import List, Optional, Dict, Any, Union
from dataclasses import dataclass
import numpy as np
from utils.logger import get_logger, LogTimer
from utils.helpers import ModelLoadError, InferenceError, batch_list
from config import get_config, ModelConfig
# Module-level logger used by every class and function in this file.
logger = get_logger("models.visual_analyzer")
@dataclass
class VisualFeatures:
    """Visual features extracted from a frame or video segment."""

    timestamp: float  # Timestamp in seconds
    description: str  # Natural language description
    hype_score: float  # Visual excitement score (0-1)
    action_detected: str  # Detected action/event
    emotion: str  # Detected emotion/mood
    scene_type: str  # Scene classification
    confidence: float  # Model confidence (0-1)
    # Raw embedding if available
    embedding: Optional[np.ndarray] = None

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert to a plain dictionary.

        Note that ``action_detected`` is exported under the key ``"action"``
        and that ``embedding`` is not included in the result.

        Returns:
            Dictionary with timestamp, description, hype_score, action,
            emotion, scene_type and confidence.
        """
        return {
            "timestamp": self.timestamp,
            "description": self.description,
            "hype_score": self.hype_score,
            "action": self.action_detected,
            "emotion": self.emotion,
            "scene_type": self.scene_type,
            "confidence": self.confidence,
        }
class VisualAnalyzer:
    """
    Visual analysis using Qwen2-VL-2B model.

    Supports:
    - Single frame analysis
    - Batch processing
    - Video segment understanding
    - Custom prompt-based analysis
    """

    # Prompts for different analysis tasks
    HYPE_PROMPT = """Analyze this image and rate its excitement/hype level from 0 to 10.
Consider: action intensity, crowd energy, dramatic moments, emotional peaks.
Respond with just a number from 0-10."""
    DESCRIPTION_PROMPT = """Briefly describe what's happening in this image in one sentence.
Focus on the main action, people, and mood."""
    ACTION_PROMPT = """What action or event is happening in this image?
Choose from: celebration, performance, speech, reaction, action, calm, transition, other.
Respond with just the action type."""
    EMOTION_PROMPT = """What is the dominant emotion or mood in this image?
Choose from: excitement, joy, tension, surprise, calm, sadness, anger, neutral.
Respond with just the emotion."""

    # Labels are matched by substring in this order, so "reaction" must stay
    # ahead of "action" (otherwise "reaction" would be reported as "action").
    ACTION_LABELS = (
        "celebration", "performance", "speech", "reaction",
        "action", "calm", "transition", "other",
    )
    EMOTION_LABELS = (
        "excitement", "joy", "tension", "surprise",
        "calm", "sadness", "anger", "neutral",
    )

    def __init__(
        self,
        config: Optional["ModelConfig"] = None,
        load_model: bool = True,
    ):
        """
        Initialize visual analyzer.

        Args:
            config: Model configuration (uses default if None)
            load_model: Whether to load model immediately

        Raises:
            ModelLoadError: If model loading fails
        """
        self.config = config or get_config().model
        self.model = None
        self.processor = None
        self._device = None
        if load_model:
            self._load_model()
        logger.info(f"VisualAnalyzer initialized (model={self.config.visual_model_id})")

    def _load_model(self) -> None:
        """
        Load the Qwen2-VL processor and model, with optional quantization.

        Raises:
            ModelLoadError: If any step of loading fails
        """
        with LogTimer(logger, "Loading Qwen2-VL model"):
            try:
                import os
                import torch
                from transformers import Qwen2VLForConditionalGeneration, AutoProcessor

                # Get HuggingFace token from environment (optional - model is open access)
                hf_token = os.environ.get("HF_TOKEN")

                # Fall back to CPU when CUDA is not requested or not available
                if self.config.device == "cuda" and torch.cuda.is_available():
                    self._device = "cuda"
                else:
                    self._device = "cpu"
                logger.info(f"Loading model on {self._device}")

                self.processor = AutoProcessor.from_pretrained(
                    self.config.visual_model_id,
                    trust_remote_code=True,
                    token=hf_token,
                )

                model_kwargs = {
                    "trust_remote_code": True,
                    "device_map": "auto" if self._device == "cuda" else None,
                }
                quantization_config = self._build_quantization_config()
                if quantization_config is not None:
                    model_kwargs["quantization_config"] = quantization_config

                self.model = Qwen2VLForConditionalGeneration.from_pretrained(
                    self.config.visual_model_id,
                    token=hf_token,
                    **model_kwargs,
                )
                if self._device == "cpu":
                    self.model = self.model.to(self._device)
                self.model.eval()
                logger.info("Qwen2-VL model loaded successfully")
            except Exception as e:
                logger.error(f"Failed to load Qwen2-VL model: {e}")
                raise ModelLoadError(f"Could not load visual model: {e}") from e

    def _build_quantization_config(self):
        """
        Build the bitsandbytes quantization config requested by the model config.

        Returns:
            A BitsAndBytesConfig for "int4" or "int8", or None when no
            quantization is requested or bitsandbytes is not installed.
        """
        mode = self.config.visual_model_quantization
        if mode not in ("int4", "int8"):
            return None

        # BitsAndBytesConfig ships with transformers, so importing it does not
        # prove bitsandbytes is installed; probe for the package explicitly so
        # the unquantized fallback can actually trigger.
        import importlib.util

        if importlib.util.find_spec("bitsandbytes") is None:
            logger.warning("bitsandbytes not available, loading without quantization")
            return None
        try:
            from transformers import BitsAndBytesConfig
        except ImportError:
            logger.warning("bitsandbytes not available, loading without quantization")
            return None

        if mode == "int4":
            import torch

            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.float16,
                bnb_4bit_use_double_quant=True,
                bnb_4bit_quant_type="nf4",
            )
            logger.info("Using INT4 quantization")
        else:
            quantization_config = BitsAndBytesConfig(load_in_8bit=True)
            logger.info("Using INT8 quantization")
        return quantization_config

    @staticmethod
    def _to_pil_image(image: Union[str, Path, np.ndarray, "PIL.Image.Image"]) -> "PIL.Image.Image":
        """
        Normalize the supported image inputs to a PIL image.

        Paths are opened and converted to RGB, numpy arrays are converted to
        RGB, and any other object is returned unchanged.
        """
        from PIL import Image as PILImage

        if isinstance(image, (str, Path)):
            # convert() returns a fully loaded copy, so the file handle can be
            # closed as soon as the conversion is done.
            with PILImage.open(image) as opened:
                return opened.convert("RGB")
        if isinstance(image, np.ndarray):
            return PILImage.fromarray(image).convert("RGB")
        return image

    def analyze_frame(
        self,
        image: Union[str, Path, np.ndarray, "PIL.Image.Image"],
        prompt: Optional[str] = None,
        timestamp: float = 0.0,
    ) -> "VisualFeatures":
        """
        Analyze a single frame/image.

        Runs four model queries (hype, description, action, emotion). A query
        that fails inside the model falls back to that field's default value
        rather than raising.

        Args:
            image: Image path, numpy array, or PIL Image
            prompt: Custom prompt used for the description query
                (uses DESCRIPTION_PROMPT if None)
            timestamp: Timestamp for this frame

        Returns:
            VisualFeatures with analysis results

        Raises:
            ModelLoadError: If the model has not been loaded
            InferenceError: If analysis fails
        """
        if self.model is None:
            raise ModelLoadError("Model not loaded. Call _load_model() first.")
        try:
            pil_image = self._to_pil_image(image)

            hype_score = self._get_hype_score(pil_image)
            description = self._get_description(pil_image, prompt)
            action = self._get_action(pil_image)
            emotion = self._get_emotion(pil_image)

            return VisualFeatures(
                timestamp=timestamp,
                description=description,
                hype_score=hype_score,
                action_detected=action,
                emotion=emotion,
                scene_type=self._classify_scene(action, emotion),
                confidence=0.8,  # Default confidence
            )
        except Exception as e:
            logger.error(f"Frame analysis failed: {e}")
            raise InferenceError(f"Visual analysis failed: {e}") from e

    def _query_model(
        self,
        image: "PIL.Image.Image",
        prompt: str,
        max_tokens: int = 50,
    ) -> str:
        """
        Send one image + text query to the model and return its reply.

        Any failure is logged as a warning and reported as an empty string,
        so callers must treat "" as "no answer".

        Args:
            image: PIL image to show the model
            prompt: Text instruction accompanying the image
            max_tokens: Maximum number of new tokens to generate

        Returns:
            Stripped model response, or "" if the query failed
        """
        import torch

        try:
            # Prepare messages in Qwen2-VL format
            messages = [
                {
                    "role": "user",
                    "content": [
                        {"type": "image", "image": image},
                        {"type": "text", "text": prompt},
                    ],
                }
            ]
            text = self.processor.apply_chat_template(
                messages, tokenize=False, add_generation_prompt=True
            )
            inputs = self.processor(
                text=[text],
                images=[image],
                padding=True,
                return_tensors="pt",
            )
            if self._device == "cuda":
                inputs = {
                    key: value.cuda() if hasattr(value, "cuda") else value
                    for key, value in inputs.items()
                }

            # Greedy decoding, no gradients needed for inference
            with torch.no_grad():
                output_ids = self.model.generate(
                    **inputs,
                    max_new_tokens=max_tokens,
                    do_sample=False,
                )

            # Drop the prompt tokens so only newly generated text is decoded
            prompt_length = inputs["input_ids"].shape[1]
            response = self.processor.batch_decode(
                output_ids[:, prompt_length:],
                skip_special_tokens=True,
            )[0]
            return response.strip()
        except Exception as e:
            logger.warning(f"Model query failed: {e}")
            return ""

    @staticmethod
    def _parse_hype_score(response: str) -> float:
        """
        Turn a 0-10 rating reply into a 0-1 score.

        Uses the first number found in the reply, divides it by 10 and caps
        the result at 1.0. Returns 0.5 when the reply contains no number.
        """
        import re

        match = re.search(r"\d+(?:\.\d+)?", response)
        if match is None:
            return 0.5  # Default middle score
        return min(1.0, float(match.group()) / 10.0)

    @staticmethod
    def _match_label(response: str, labels: tuple, default: str) -> str:
        """
        Return the first label (in ``labels`` order) contained in the reply.

        Matching is a case-insensitive substring test; ``default`` is returned
        when no label occurs in the reply.
        """
        text = response.lower()
        return next((label for label in labels if label in text), default)

    def _get_hype_score(self, image: "PIL.Image.Image") -> float:
        """Get hype score (0-1) from model; 0.5 if no number is returned."""
        response = self._query_model(image, self.HYPE_PROMPT, max_tokens=10)
        return self._parse_hype_score(response)

    def _get_description(
        self,
        image: "PIL.Image.Image",
        prompt: Optional[str] = None,
    ) -> str:
        """
        Get description from model.

        Args:
            image: PIL image to describe
            prompt: Prompt to use instead of DESCRIPTION_PROMPT (optional)

        Returns:
            Model response, or "Unable to describe" if the response is empty
        """
        effective_prompt = self.DESCRIPTION_PROMPT if prompt is None else prompt
        response = self._query_model(image, effective_prompt, max_tokens=100)
        return response if response else "Unable to describe"

    def _get_action(self, image: "PIL.Image.Image") -> str:
        """Get action type from model; "other" if no known label is found."""
        response = self._query_model(image, self.ACTION_PROMPT, max_tokens=20)
        return self._match_label(response, self.ACTION_LABELS, "other")

    def _get_emotion(self, image: "PIL.Image.Image") -> str:
        """Get emotion from model; "neutral" if no known label is found."""
        response = self._query_model(image, self.EMOTION_PROMPT, max_tokens=20)
        return self._match_label(response, self.EMOTION_LABELS, "neutral")

    def _classify_scene(self, action: str, emotion: str) -> str:
        """
        Classify scene type based on action and emotion.

        Returns:
            "highlight" when both action and emotion are high-intensity,
            "active" when only the action is, "emotional" when only the
            emotion is, and "neutral" otherwise.
        """
        high_energy = {"celebration", "performance", "action"}
        high_emotion = {"excitement", "joy", "surprise", "tension"}

        is_energetic = action in high_energy
        is_emotional = emotion in high_emotion
        if is_energetic and is_emotional:
            return "highlight"
        if is_energetic:
            return "active"
        if is_emotional:
            return "emotional"
        return "neutral"

    def analyze_frames_batch(
        self,
        images: List[Union[str, Path, np.ndarray]],
        timestamps: Optional[List[float]] = None,
        batch_size: int = 4,
    ) -> List["VisualFeatures"]:
        """
        Analyze multiple frames.

        Frames are analyzed one at a time. A frame whose analysis raises is
        replaced by a placeholder with confidence 0.0 so that the result stays
        aligned with the input.

        Args:
            images: List of images (paths or arrays)
            timestamps: Timestamps for each image (defaults to 0.0, 1.0, ...)
            batch_size: Number of images per batch (currently unused; kept
                for interface compatibility)

        Returns:
            List of VisualFeatures for each image

        Raises:
            ValueError: If timestamps is given with a different length
                than images
        """
        if timestamps is None:
            timestamps = [i * 1.0 for i in range(len(images))]
        elif len(timestamps) != len(images):
            # zip() would silently drop the unmatched frames otherwise.
            raise ValueError(
                f"Got {len(timestamps)} timestamps for {len(images)} images"
            )

        results = []
        with LogTimer(logger, f"Analyzing {len(images)} frames"):
            for i, (image, ts) in enumerate(zip(images, timestamps)):
                try:
                    features = self.analyze_frame(image, timestamp=ts)
                except Exception as e:
                    logger.warning(f"Failed to analyze frame {i}: {e}")
                    # Placeholder keeps results index-aligned with images
                    features = VisualFeatures(
                        timestamp=ts,
                        description="Analysis failed",
                        hype_score=0.5,
                        action_detected="unknown",
                        emotion="neutral",
                        scene_type="neutral",
                        confidence=0.0,
                    )
                results.append(features)
                if (i + 1) % 10 == 0:
                    logger.debug(f"Processed {i + 1}/{len(images)} frames")
        return results

    def analyze_with_custom_prompt(
        self,
        image: Union[str, Path, np.ndarray, "PIL.Image.Image"],
        prompt: str,
        timestamp: float = 0.0,
    ) -> Dict[str, Any]:
        """
        Analyze image with a custom prompt.

        Args:
            image: Image to analyze
            prompt: Custom analysis prompt
            timestamp: Timestamp for this frame

        Returns:
            Dictionary with prompt, response, and timestamp. The response is
            an empty string if the model query failed.

        Raises:
            ModelLoadError: If the model has not been loaded
        """
        if self.model is None:
            raise ModelLoadError("Model not loaded. Call _load_model() first.")

        pil_image = self._to_pil_image(image)
        response = self._query_model(pil_image, prompt, max_tokens=200)
        return {
            "timestamp": timestamp,
            "prompt": prompt,
            "response": response,
        }

    def get_frame_embedding(
        self,
        image: Union[str, Path, np.ndarray, "PIL.Image.Image"],
    ) -> Optional[np.ndarray]:
        """
        Get visual embedding for a frame.

        Not implemented: this always logs a warning and returns None.

        Args:
            image: Image to embed

        Returns:
            Embedding array or None if failed
        """
        # Note: Qwen2-VL doesn't directly expose embeddings
        # This would need a different approach or model
        logger.warning("Frame embedding not directly supported by Qwen2-VL")
        return None

    def unload_model(self) -> None:
        """Unload model and processor to free GPU memory."""
        self.model = None
        self.processor = None

        # Collect unreachable reference cycles so the objects they hold are
        # freed before the CUDA cache is cleared.
        import gc

        gc.collect()
        try:
            import torch

            if torch.cuda.is_available():
                torch.cuda.empty_cache()
        except ImportError:
            pass
        logger.info("Visual model unloaded")
# Export public interface: only these two names are picked up by
# `from <this module> import *`.
__all__ = ["VisualAnalyzer", "VisualFeatures"]