Spaces:
Paused
Paused
| """ | |
| ShortSmith v2 - Trained Hype Scorer | |
| Uses the MLP model trained on Mr. HiSum dataset to score segments. | |
| Falls back to heuristic scoring if weights not available. | |
| """ | |
| import os | |
| from pathlib import Path | |
| from typing import Optional, List, Tuple | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| from utils.logger import get_logger | |
| logger = get_logger("scoring.trained_scorer") | |
class HypeScorerMLP(nn.Module):
    """Two-hidden-layer MLP that emits a raw (unnormalized) hype logit.

    The layer layout must stay in lockstep with the training notebook,
    otherwise saved state dicts will not load.

    Args:
        visual_dim: Size of the visual feature vector.
        audio_dim: Size of the audio feature vector.
        hidden_dim: Width of the first hidden layer (second layer is half).
        dropout: Dropout probability applied after each hidden layer.
    """

    def __init__(
        self,
        visual_dim: int = 512,
        audio_dim: int = 13,
        hidden_dim: int = 256,
        dropout: float = 0.3,
    ):
        super().__init__()
        self.visual_dim = visual_dim
        self.audio_dim = audio_dim

        half_hidden = hidden_dim // 2
        layers = [
            # First hidden block
            nn.Linear(visual_dim + audio_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            # Second hidden block
            nn.Linear(hidden_dim, half_hidden),
            nn.BatchNorm1d(half_hidden),
            nn.ReLU(),
            nn.Dropout(dropout),
            # Single-logit output head
            nn.Linear(half_hidden, 1),
        ]
        self.network = nn.Sequential(*layers)

    def forward(self, features: torch.Tensor) -> torch.Tensor:
        """Run the network on pre-concatenated visual+audio features."""
        return self.network(features)
class TrainedHypeScorer:
    """
    Trained neural network hype scorer.

    Uses an MLP trained on Mr. HiSum "Most Replayed" data; falls back to a
    heuristic score when no trained weights are available.
    """

    # Default weights path relative to project root
    DEFAULT_WEIGHTS_PATH = "weights/hype_scorer_weights.pt"

    def __init__(
        self,
        weights_path: Optional[str] = None,
        device: Optional[str] = None,
        visual_dim: int = 512,
        audio_dim: int = 13,
    ):
        """
        Initialize trained scorer.

        Args:
            weights_path: Path to trained weights (.pt file). When omitted,
                a few common locations are probed.
            device: Device to run on (cuda/cpu/mps); auto-detected if None.
            visual_dim: Visual feature dimension.
            audio_dim: Audio feature dimension.
        """
        self.visual_dim = visual_dim
        self.audio_dim = audio_dim
        self.model = None
        self.device = device or self._get_device()

        # Probe common locations when no explicit path was given.
        if weights_path is None:
            candidates = [
                self.DEFAULT_WEIGHTS_PATH,
                "hype_scorer_weights.pt",
                "weights/hype_scorer_weights.pt",
                os.path.join(os.path.dirname(__file__), "..", "weights", "hype_scorer_weights.pt"),
            ]
            for candidate in candidates:
                if os.path.exists(candidate):
                    weights_path = candidate
                    break

        if weights_path and os.path.exists(weights_path):
            self._load_model(weights_path)
        else:
            logger.warning(
                f"Trained weights not found. TrainedHypeScorer will use fallback scoring. "
                f"To use trained model, place weights at: {self.DEFAULT_WEIGHTS_PATH}"
            )

    def _get_device(self) -> str:
        """Detect best available device (cuda > mps > cpu)."""
        if torch.cuda.is_available():
            return "cuda"
        elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
            return "mps"
        return "cpu"

    def _load_model(self, weights_path: str) -> None:
        """Load trained model weights; on failure, leave self.model = None."""
        try:
            logger.info(f"Loading trained hype scorer from {weights_path}")
            # Architecture must match the checkpoint (see HypeScorerMLP).
            self.model = HypeScorerMLP(
                visual_dim=self.visual_dim,
                audio_dim=self.audio_dim,
            )
            state_dict = torch.load(weights_path, map_location=self.device)
            # Handle checkpoints saved as {"model_state_dict": ...} wrappers.
            if isinstance(state_dict, dict) and "model_state_dict" in state_dict:
                state_dict = state_dict["model_state_dict"]
            self.model.load_state_dict(state_dict)
            self.model.to(self.device)
            self.model.eval()
            logger.info(f"✓ Trained hype scorer loaded successfully on {self.device}")
        except Exception as e:
            # Best-effort: a bad checkpoint degrades to heuristic scoring.
            logger.error(f"Failed to load trained model: {e}")
            self.model = None

    def is_available(self) -> bool:
        """Check if trained model is loaded."""
        return self.model is not None

    def score(
        self,
        visual_features: np.ndarray,
        audio_features: np.ndarray,
    ) -> float:
        """
        Score a single segment.

        Args:
            visual_features: Visual feature vector (visual_dim,)
            audio_features: Audio feature vector (audio_dim,)

        Returns:
            Hype score (0-1)
        """
        # BUG FIX: is_available is a method; the old check `not self.is_available`
        # was always False (bound methods are truthy), so the fallback never ran
        # and scoring crashed with model=None. It must be CALLED.
        if not self.is_available():
            return self._fallback_score(visual_features, audio_features)

        features = np.concatenate([visual_features, audio_features])
        tensor = torch.tensor(features, dtype=torch.float32).unsqueeze(0).to(self.device)
        # Inference only: no_grad avoids building an autograd graph.
        with torch.no_grad():
            raw_score = self.model(tensor)
        # Normalize the raw logit to 0-1 with sigmoid.
        return torch.sigmoid(raw_score).item()

    def score_batch(
        self,
        visual_features: np.ndarray,
        audio_features: np.ndarray,
    ) -> np.ndarray:
        """
        Score multiple segments in batch.

        Args:
            visual_features: Visual features (N, visual_dim)
            audio_features: Audio features (N, audio_dim)

        Returns:
            Array of hype scores (N,)
        """
        # Same is_available() call fix as in score().
        if not self.is_available():
            return np.array([
                self._fallback_score(visual_features[i], audio_features[i])
                for i in range(len(visual_features))
            ])

        features = np.concatenate([visual_features, audio_features], axis=1)
        tensor = torch.tensor(features, dtype=torch.float32).to(self.device)
        with torch.no_grad():
            raw_scores = self.model(tensor)
        # reshape(-1), not squeeze(): squeeze() would collapse a batch of
        # one to a 0-d array instead of shape (1,).
        return torch.sigmoid(raw_scores).reshape(-1).cpu().numpy()

    def _fallback_score(
        self,
        visual_features: np.ndarray,
        audio_features: np.ndarray,
    ) -> float:
        """
        Fallback heuristic scoring when model not available.
        Uses similar logic to training data generation.
        """
        # Visual contribution (mean of first 50 dims if available).
        # Guard empty vectors: np.mean([]) would yield NaN.
        visual_len = min(50, len(visual_features))
        if visual_len > 0:
            visual_score = np.mean(visual_features[:visual_len]) * 0.5 + 0.5
        else:
            visual_score = 0.5
        visual_score = np.clip(visual_score, 0, 1)

        # Audio contribution: weighted mix of selected coefficients when the
        # vector is long enough, otherwise a plain mean.
        if len(audio_features) >= 8:
            audio_score = (
                audio_features[0] * 0.4 +  # RMS energy
                audio_features[5] * 0.3 +  # Spectral flux (if available)
                audio_features[7] * 0.3    # Onset strength (if available)
            ) * 0.5 + 0.5
        elif len(audio_features) > 0:
            audio_score = np.mean(audio_features) * 0.5 + 0.5
        else:
            audio_score = 0.5
        audio_score = np.clip(audio_score, 0, 1)

        # Equal-weight combination of the two modalities.
        return float(0.5 * visual_score + 0.5 * audio_score)

    def compare_segments(
        self,
        visual_a: np.ndarray,
        audio_a: np.ndarray,
        visual_b: np.ndarray,
        audio_b: np.ndarray,
    ) -> int:
        """
        Compare two segments.

        Returns:
            1 if A is more engaging, -1 if B is more engaging, 0 if equal
            (scores within a 0.05 margin count as equal).
        """
        score_a = self.score(visual_a, audio_a)
        score_b = self.score(visual_b, audio_b)
        if score_a > score_b + 0.05:
            return 1
        elif score_b > score_a + 0.05:
            return -1
        return 0
# Module-level cache so all callers share a single loaded model.
_trained_scorer: Optional[TrainedHypeScorer] = None


def get_trained_scorer(
    weights_path: Optional[str] = None,
    force_reload: bool = False,
) -> TrainedHypeScorer:
    """
    Return the process-wide TrainedHypeScorer, creating it on first use.

    Args:
        weights_path: Optional path to weights file
        force_reload: Force reload even if already loaded

    Returns:
        TrainedHypeScorer instance
    """
    global _trained_scorer
    if force_reload or _trained_scorer is None:
        _trained_scorer = TrainedHypeScorer(weights_path=weights_path)
    return _trained_scorer


__all__ = ["TrainedHypeScorer", "HypeScorerMLP", "get_trained_scorer"]