Spaces:
Paused
Paused
| """ | |
| ShortSmith v2 - Hype Scorer Module | |
| Multi-modal hype scoring that combines: | |
| - Visual excitement scores | |
| - Audio energy scores | |
| - Motion intensity scores | |
| - Person visibility scores (optional) | |
| Supports both: | |
| 1. Trained MLP model (from Mr. HiSum dataset) | |
| 2. Heuristic weighted combination (fallback) | |
| Uses contrastive ranking: hype is relative to each video. | |
| """ | |
| from typing import List, Optional, Dict, Tuple | |
| from dataclasses import dataclass | |
| import numpy as np | |
| from utils.logger import get_logger, LogTimer | |
| from utils.helpers import normalize_scores, clamp | |
| from scoring.domain_presets import DomainPreset, get_domain_preset, Domain | |
| from config import get_config | |
| logger = get_logger("scoring.hype_scorer") | |
| # Try to import trained scorer (optional) | |
| try: | |
| from scoring.trained_scorer import get_trained_scorer, TrainedHypeScorer | |
| TRAINED_SCORER_AVAILABLE = True | |
| except ImportError: | |
| TRAINED_SCORER_AVAILABLE = False | |
| logger.debug("Trained scorer not available, using heuristic scoring") | |
@dataclass
class SegmentScore:
    """Hype score for a video segment.

    Component scores are normalized to [0, 1] relative to the other
    segments of the same video (contrastive scoring).
    """

    start_time: float
    end_time: float
    # Individual scores (0-1 normalized)
    visual_score: float
    audio_score: float
    motion_score: float
    person_score: float
    # Combined (weighted) score
    combined_score: float
    # Metadata: rank is assigned after sorting (1 = highest combined score)
    rank: Optional[int] = None
    scene_id: Optional[int] = None

    # NOTE(fix): the original class lacked the @dataclass decorator even
    # though `dataclass` is imported at the top of the file; without it,
    # keyword construction (SegmentScore(start_time=..., ...)) raises
    # TypeError because no __init__ is generated.

    def duration(self) -> float:
        """Return the segment length in seconds."""
        return self.end_time - self.start_time

    def to_dict(self) -> Dict:
        """Serialize to a JSON-friendly dict with scores rounded to 4 dp."""
        return {
            "start_time": self.start_time,
            "end_time": self.end_time,
            # BUG FIX: call the method; the original stored the bound
            # method object itself, which is not serializable.
            "duration": self.duration(),
            "visual_score": round(self.visual_score, 4),
            "audio_score": round(self.audio_score, 4),
            "motion_score": round(self.motion_score, 4),
            "person_score": round(self.person_score, 4),
            "combined_score": round(self.combined_score, 4),
            "rank": self.rank,
        }
class HypeScorer:
    """
    Multi-modal hype scorer using weighted combination.

    Implements contrastive scoring where segments are compared
    relative to each other within the same video, so "hype" is
    relative to the current video rather than an absolute scale.
    """

    def __init__(
        self,
        preset: Optional["DomainPreset"] = None,
        domain: str = "general",
        use_trained_model: bool = True,
    ):
        """
        Initialize hype scorer.

        Args:
            preset: Domain preset (takes precedence if provided)
            domain: Domain name (used if preset not provided)
            use_trained_model: Whether to use trained MLP model if available
        """
        self.preset = preset if preset else get_domain_preset(domain)
        self.config = get_config().processing

        # Optional trained MLP model; heuristic weights are the fallback.
        self.trained_scorer = None
        if use_trained_model and TRAINED_SCORER_AVAILABLE:
            try:
                self.trained_scorer = get_trained_scorer()
                if self.trained_scorer.is_available:
                    logger.info("Using trained MLP model for hype scoring")
                else:
                    self.trained_scorer = None
            except Exception as e:
                logger.warning(f"Could not load trained scorer: {e}")
                # BUG FIX: discard a partially-constructed scorer so later
                # code never sees a broken model object as "available".
                self.trained_scorer = None

        logger.info(
            f"HypeScorer initialized with {self.preset.name} preset "
            f"(visual={self.preset.visual_weight:.2f}, "
            f"audio={self.preset.audio_weight:.2f}, "
            f"motion={self.preset.motion_weight:.2f})"
            f"{' + trained MLP' if self.trained_scorer else ''}"
        )

    def score_segments(
        self,
        segments: List[Tuple[float, float]],  # (start, end) pairs
        visual_scores: Optional[List[float]] = None,
        audio_scores: Optional[List[float]] = None,
        motion_scores: Optional[List[float]] = None,
        person_scores: Optional[List[float]] = None,
    ) -> List["SegmentScore"]:
        """
        Score a list of segments using available signals.

        Args:
            segments: List of (start_time, end_time) tuples
            visual_scores: Visual hype scores per segment
            audio_scores: Audio hype scores per segment
            motion_scores: Motion intensity scores per segment
            person_scores: Target person visibility per segment

        Returns:
            List of SegmentScore objects, ranked best-first
        """
        n = len(segments)
        if n == 0:
            return []

        with LogTimer(logger, f"Scoring {n} segments"):
            # Missing signals become all-zero arrays of the right length.
            visual = self._prepare_scores(visual_scores, n)
            audio = self._prepare_scores(audio_scores, n)
            motion = self._prepare_scores(motion_scores, n)
            person = self._prepare_scores(person_scores, n)

            # Normalize each signal independently; an all-zero signal is
            # left untouched so it contributes nothing to the combination.
            visual_norm = normalize_scores(visual) if any(v > 0 for v in visual) else visual
            audio_norm = normalize_scores(audio) if any(a > 0 for a in audio) else audio
            motion_norm = normalize_scores(motion) if any(m > 0 for m in motion) else motion
            person_norm = person  # Already 0-1

            # Weighted combination using the domain preset's weights.
            weights = self.preset.get_weights()
            combined = [
                visual_norm[i] * weights["visual"] +
                audio_norm[i] * weights["audio"] +
                motion_norm[i] * weights["motion"] +
                person_norm[i] * weights["person"]
                for i in range(n)
            ]

            # Contrastive step: normalize combined scores within this video.
            combined_norm = normalize_scores(combined)

            results = [
                SegmentScore(
                    start_time=start,
                    end_time=end,
                    visual_score=visual_norm[i],
                    audio_score=audio_norm[i],
                    motion_score=motion_norm[i],
                    person_score=person_norm[i],
                    combined_score=combined_norm[i],
                )
                for i, (start, end) in enumerate(segments)
            ]

            # Rank by combined score (rank 1 = best); n > 0 here, so
            # results[0] is always safe.
            results = self._rank_segments(results)

            logger.info(f"Scored {n} segments, top score: {results[0].combined_score:.3f}")
            return results

    def _prepare_scores(
        self,
        scores: Optional[List[float]],
        length: int,
    ) -> List[float]:
        """Return a list of exactly ``length`` scores.

        ``None`` yields all zeros; a length mismatch is logged and repaired
        by zero-padding or truncation so downstream indexing never fails.
        """
        if scores is None:
            return [0.0] * length
        if len(scores) != length:
            logger.warning(f"Score length mismatch: {len(scores)} vs {length}")
            if len(scores) < length:
                return list(scores) + [0.0] * (length - len(scores))
            return list(scores[:length])
        return list(scores)

    def _rank_segments(
        self,
        segments: List["SegmentScore"],
    ) -> List["SegmentScore"]:
        """Sort by combined score (descending) and assign 1-based ranks."""
        sorted_segments = sorted(
            segments,
            key=lambda s: s.combined_score,
            reverse=True,
        )
        for i, segment in enumerate(sorted_segments):
            segment.rank = i + 1
        return sorted_segments

    def select_top_segments(
        self,
        segments: List["SegmentScore"],
        num_clips: int,
        min_gap: Optional[float] = None,
        threshold: Optional[float] = None,
    ) -> List["SegmentScore"]:
        """
        Select top segments with diversity constraint.

        Args:
            segments: Ranked segments (best first)
            num_clips: Number of segments to select
            min_gap: Minimum gap (seconds) between selected segment starts
            threshold: Minimum score threshold

        Returns:
            Selected top segments, re-ranked 1..len(selected)
        """
        # BUG FIX: explicit None checks so callers may legitimately pass
        # 0 / 0.0 without it being replaced by the configured default
        # (the old ``x or default`` treated zero as "unset").
        if min_gap is None:
            min_gap = self.config.min_gap_between_clips
        if threshold is None:
            threshold = self.preset.hype_threshold

        # Filter by threshold; fall back to the top-ranked segments when
        # nothing clears it.
        candidates = [s for s in segments if s.combined_score >= threshold]
        if not candidates:
            logger.warning(f"No segments above threshold {threshold}, using top {num_clips}")
            candidates = segments[:num_clips]

        # Greedy pass honoring the minimum start-time gap.
        selected = []
        for segment in candidates:
            if len(selected) >= num_clips:
                break
            is_valid = True
            for existing in selected:
                gap = abs(segment.start_time - existing.start_time)
                if gap < min_gap:
                    is_valid = False
                    break
            if is_valid:
                selected.append(segment)

        # If the gap constraint left us short, relax it and fill up.
        if len(selected) < num_clips:
            for segment in candidates:
                if segment not in selected:
                    selected.append(segment)
                if len(selected) >= num_clips:
                    break

        # Re-rank the final selection.
        for i, segment in enumerate(selected):
            segment.rank = i + 1

        return selected

    def score_from_timeseries(
        self,
        timestamps: List[float],
        visual_series: Optional[List[float]] = None,
        audio_series: Optional[List[float]] = None,
        motion_series: Optional[List[float]] = None,
        person_series: Optional[List[float]] = None,
        segment_duration: float = 15.0,
        hop_duration: float = 5.0,
    ) -> List["SegmentScore"]:
        """
        Create segment scores from time-series data.

        Aggregates per-frame/per-second scores into segment-level scores
        over a sliding window of ``segment_duration`` seconds advanced by
        ``hop_duration`` seconds.

        Args:
            timestamps: Timestamps for each data point
            visual_series: Visual scores at each timestamp
            audio_series: Audio scores at each timestamp
            motion_series: Motion scores at each timestamp
            person_series: Person visibility at each timestamp
            segment_duration: Duration of each segment
            hop_duration: Hop between segments

        Returns:
            List of SegmentScore objects (empty when no timestamps or the
            video is shorter than one segment)
        """
        if not timestamps:
            return []

        max_time = max(timestamps)

        # Sliding-window segmentation over [0, max_time].
        segments = []
        current = 0.0
        while current + segment_duration <= max_time:
            segments.append((current, current + segment_duration))
            current += hop_duration

        # Aggregate each signal into one score per segment.
        visual_agg = self._aggregate_series(timestamps, visual_series, segments)
        audio_agg = self._aggregate_series(timestamps, audio_series, segments)
        motion_agg = self._aggregate_series(timestamps, motion_series, segments)
        person_agg = self._aggregate_series(timestamps, person_series, segments)

        return self.score_segments(
            segments,
            visual_scores=visual_agg,
            audio_scores=audio_agg,
            motion_scores=motion_agg,
            person_scores=person_agg,
        )

    def _aggregate_series(
        self,
        timestamps: List[float],
        series: Optional[List[float]],
        segments: List[Tuple[float, float]],
    ) -> List[float]:
        """Aggregate time-series values into one score per segment.

        Uses the 90th percentile inside each [start, end) window so brief
        peaks are captured rather than averaged away; an empty window or a
        missing series scores 0.
        """
        if series is None:
            return [0.0] * len(segments)

        ts = np.array(timestamps)
        values = np.array(series)

        aggregated = []
        for start, end in segments:
            mask = (ts >= start) & (ts < end)
            if np.any(mask):
                # 90th percentile captures peaks without being a pure max.
                score = np.percentile(values[mask], 90)
            else:
                score = 0.0
            aggregated.append(float(score))
        return aggregated

    def apply_diversity_penalty(
        self,
        segments: List["SegmentScore"],
        penalty_weight: float = 0.2,
        proximity_window: float = 30.0,
    ) -> List["SegmentScore"]:
        """
        Apply temporal diversity penalty to discourage clustering.

        Reduces scores of segments whose start is close to a higher-ranked
        segment's start.

        Args:
            segments: Segments sorted by score (best first)
            penalty_weight: Weight of diversity penalty
            proximity_window: Start-time distance (seconds) within which a
                penalty applies (generalizes the previous hard-coded 30s)

        Returns:
            Segments with adjusted scores, re-ranked
        """
        if len(segments) <= 1:
            return segments

        # Work with a copy; penalized entries are replaced, not mutated.
        adjusted = list(segments)
        for i in range(1, len(adjusted)):
            current = adjusted[i]
            penalty = 0.0
            # Take the strongest penalty over all higher-ranked segments.
            for higher in adjusted[:i]:
                distance = abs(current.start_time - higher.start_time)
                if distance < proximity_window:
                    proximity_penalty = (proximity_window - distance) / proximity_window
                    penalty = max(penalty, proximity_penalty)
            if penalty > 0:
                adjusted[i] = SegmentScore(
                    start_time=current.start_time,
                    end_time=current.end_time,
                    visual_score=current.visual_score,
                    audio_score=current.audio_score,
                    motion_score=current.motion_score,
                    person_score=current.person_score,
                    combined_score=current.combined_score * (1 - penalty * penalty_weight),
                    rank=current.rank,
                )

        # Re-rank after adjustment.
        return self._rank_segments(adjusted)

    def detect_peaks(
        self,
        segments: List["SegmentScore"],
        threshold: Optional[float] = None,
    ) -> List["SegmentScore"]:
        """
        Identify peak segments at or above the threshold.

        Args:
            segments: List of scored segments
            threshold: Score threshold for peaks (defaults to the preset's
                peak_threshold)

        Returns:
            List of peak segments
        """
        # BUG FIX: explicit None check so an explicit 0.0 threshold is honored.
        if threshold is None:
            threshold = self.preset.peak_threshold
        peaks = [s for s in segments if s.combined_score >= threshold]
        logger.info(f"Found {len(peaks)} peak segments above {threshold}")
        return peaks

    def compute_statistics(
        self,
        segments: List["SegmentScore"],
    ) -> Dict:
        """
        Compute summary statistics of the combined scores.

        Args:
            segments: List of scored segments

        Returns:
            Dictionary of statistics ({"count": 0} when empty)
        """
        if not segments:
            return {"count": 0}

        scores = [s.combined_score for s in segments]
        return {
            "count": len(segments),
            "mean": float(np.mean(scores)),
            "std": float(np.std(scores)),
            "min": float(np.min(scores)),
            "max": float(np.max(scores)),
            "median": float(np.median(scores)),
            "q75": float(np.percentile(scores, 75)),
            "q90": float(np.percentile(scores, 90)),
        }
| # Export public interface | |
| __all__ = ["HypeScorer", "SegmentScore"] | |