# dev_caio/scoring/hype_scorer.py
"""
ShortSmith v2 - Hype Scorer Module
Multi-modal hype scoring that combines:
- Visual excitement scores
- Audio energy scores
- Motion intensity scores
- Person visibility scores (optional)
Supports both:
1. Trained MLP model (from Mr. HiSum dataset)
2. Heuristic weighted combination (fallback)
Uses contrastive ranking: hype is relative to each video.
"""
from typing import List, Optional, Dict, Tuple
from dataclasses import dataclass
import numpy as np
from utils.logger import get_logger, LogTimer
from utils.helpers import normalize_scores, clamp
from scoring.domain_presets import DomainPreset, get_domain_preset, Domain
from config import get_config
# Module-level logger for this scoring component.
logger = get_logger("scoring.hype_scorer")

# Try to import trained scorer (optional). The trained MLP model is an
# optional dependency; when the import fails we fall back to the purely
# heuristic weighted combination implemented in HypeScorer.
try:
    from scoring.trained_scorer import get_trained_scorer, TrainedHypeScorer
    TRAINED_SCORER_AVAILABLE = True
except ImportError:
    TRAINED_SCORER_AVAILABLE = False
    logger.debug("Trained scorer not available, using heuristic scoring")
@dataclass
class SegmentScore:
    """Hype score for one video segment.

    All component scores are normalized to the 0-1 range; ``combined_score``
    is the weighted blend of the four components.
    """

    start_time: float
    end_time: float
    # Individual scores (0-1 normalized)
    visual_score: float
    audio_score: float
    motion_score: float
    person_score: float
    # Combined score
    combined_score: float
    # Metadata
    rank: Optional[int] = None
    scene_id: Optional[int] = None

    @property
    def duration(self) -> float:
        """Segment length in seconds."""
        return self.end_time - self.start_time

    def to_dict(self) -> Dict:
        """Serialize to a JSON-friendly dict; scores rounded to 4 decimals."""
        score_fields = (
            "visual_score",
            "audio_score",
            "motion_score",
            "person_score",
            "combined_score",
        )
        rounded = {name: round(getattr(self, name), 4) for name in score_fields}
        return {
            "start_time": self.start_time,
            "end_time": self.end_time,
            "duration": self.duration,
            **rounded,
            "rank": self.rank,
        }
class HypeScorer:
    """
    Multi-modal hype scorer using weighted combination.

    Implements contrastive scoring where segments are compared
    relative to each other within the same video.
    """

    def __init__(
        self,
        preset: Optional[DomainPreset] = None,
        domain: str = "general",
        use_trained_model: bool = True,
    ):
        """
        Initialize hype scorer.

        Args:
            preset: Domain preset (takes precedence if provided)
            domain: Domain name (used if preset not provided)
            use_trained_model: Whether to use trained MLP model if available
        """
        self.preset = preset if preset else get_domain_preset(domain)
        self.config = get_config().processing

        # Initialize trained model if available and requested.
        # NOTE(review): self.trained_scorer is loaded here but never consulted
        # by score_segments() below -- scoring is purely heuristic in this
        # module. Confirm whether the MLP path is wired up elsewhere.
        self.trained_scorer = None
        if use_trained_model and TRAINED_SCORER_AVAILABLE:
            try:
                self.trained_scorer = get_trained_scorer()
                if self.trained_scorer.is_available:
                    logger.info("Using trained MLP model for hype scoring")
                else:
                    self.trained_scorer = None
            except Exception as e:
                # Best-effort: a broken trained model must not block heuristic scoring.
                logger.warning(f"Could not load trained scorer: {e}")

        logger.info(
            f"HypeScorer initialized with {self.preset.name} preset "
            f"(visual={self.preset.visual_weight:.2f}, "
            f"audio={self.preset.audio_weight:.2f}, "
            f"motion={self.preset.motion_weight:.2f})"
            f"{' + trained MLP' if self.trained_scorer else ''}"
        )

    def score_segments(
        self,
        segments: List[Tuple[float, float]],  # (start, end) pairs
        visual_scores: Optional[List[float]] = None,
        audio_scores: Optional[List[float]] = None,
        motion_scores: Optional[List[float]] = None,
        person_scores: Optional[List[float]] = None,
    ) -> List[SegmentScore]:
        """
        Score a list of segments using available signals.

        Args:
            segments: List of (start_time, end_time) tuples
            visual_scores: Visual hype scores per segment
            audio_scores: Audio hype scores per segment
            motion_scores: Motion intensity scores per segment
            person_scores: Target person visibility per segment

        Returns:
            List of SegmentScore objects, ranked by combined score (descending)
        """
        n = len(segments)
        if n == 0:
            return []

        with LogTimer(logger, f"Scoring {n} segments"):
            # Missing signals default to all-zero arrays of the right length.
            visual = self._prepare_scores(visual_scores, n)
            audio = self._prepare_scores(audio_scores, n)
            motion = self._prepare_scores(motion_scores, n)
            person = self._prepare_scores(person_scores, n)

            # Normalize each signal independently (contrastive: relative to
            # this video only). All-zero signals are left untouched to avoid
            # normalizing pure noise.
            visual_norm = normalize_scores(visual) if any(v > 0 for v in visual) else visual
            audio_norm = normalize_scores(audio) if any(a > 0 for a in audio) else audio
            motion_norm = normalize_scores(motion) if any(m > 0 for m in motion) else motion
            person_norm = person  # Already 0-1

            # Weighted combination of the normalized signals.
            weights = self.preset.get_weights()
            combined = [
                visual_norm[i] * weights["visual"]
                + audio_norm[i] * weights["audio"]
                + motion_norm[i] * weights["motion"]
                + person_norm[i] * weights["person"]
                for i in range(n)
            ]

            # Normalize combined scores so the best segment approaches 1.0.
            combined_norm = normalize_scores(combined)

            results = [
                SegmentScore(
                    start_time=start,
                    end_time=end,
                    visual_score=visual_norm[i],
                    audio_score=audio_norm[i],
                    motion_score=motion_norm[i],
                    person_score=person_norm[i],
                    combined_score=combined_norm[i],
                )
                for i, (start, end) in enumerate(segments)
            ]

            # Rank by combined score (results is non-empty here since n > 0).
            results = self._rank_segments(results)
            logger.info(f"Scored {n} segments, top score: {results[0].combined_score:.3f}")
            return results

    def _prepare_scores(
        self,
        scores: Optional[List[float]],
        length: int,
    ) -> List[float]:
        """Return a copy of `scores` coerced to exactly `length` entries.

        Missing input becomes all zeros; a mismatched length is zero-padded
        or truncated (with a warning) rather than raising.
        """
        if scores is None:
            return [0.0] * length
        if len(scores) != length:
            logger.warning(f"Score length mismatch: {len(scores)} vs {length}")
            if len(scores) < length:
                return list(scores) + [0.0] * (length - len(scores))
            return list(scores[:length])
        return list(scores)

    def _rank_segments(
        self,
        segments: List[SegmentScore],
    ) -> List[SegmentScore]:
        """Sort segments by combined score (descending) and assign 1-based ranks."""
        sorted_segments = sorted(
            segments,
            key=lambda s: s.combined_score,
            reverse=True,
        )
        for i, segment in enumerate(sorted_segments):
            segment.rank = i + 1
        return sorted_segments

    def select_top_segments(
        self,
        segments: List[SegmentScore],
        num_clips: int,
        min_gap: Optional[float] = None,
        threshold: Optional[float] = None,
    ) -> List[SegmentScore]:
        """
        Select top segments with diversity constraint.

        Args:
            segments: Ranked segments (best first)
            num_clips: Number of segments to select
            min_gap: Minimum gap (seconds) between selected segment starts
            threshold: Minimum score threshold

        Returns:
            Selected top segments, re-ranked 1..len(selected)
        """
        # Fix: `min_gap or default` / `threshold or default` would silently
        # discard an explicit 0.0 argument; test for None instead.
        if min_gap is None:
            min_gap = self.config.min_gap_between_clips
        if threshold is None:
            threshold = self.preset.hype_threshold

        # Filter by threshold; fall back to the top-ranked segments when
        # nothing clears it so we never return an empty selection needlessly.
        candidates = [s for s in segments if s.combined_score >= threshold]
        if not candidates:
            logger.warning(f"No segments above threshold {threshold}, using top {num_clips}")
            candidates = segments[:num_clips]

        # Greedy selection in rank order, skipping segments whose start is
        # within min_gap of an already-selected one.
        selected = []
        for segment in candidates:
            if len(selected) >= num_clips:
                break
            is_valid = all(
                abs(segment.start_time - existing.start_time) >= min_gap
                for existing in selected
            )
            if is_valid:
                selected.append(segment)

        # If the gap constraint left us short, relax it and backfill by rank.
        if len(selected) < num_clips:
            for segment in candidates:
                if segment not in selected:
                    selected.append(segment)
                    if len(selected) >= num_clips:
                        break

        # Re-rank the final selection.
        for i, segment in enumerate(selected):
            segment.rank = i + 1
        return selected

    def score_from_timeseries(
        self,
        timestamps: List[float],
        visual_series: Optional[List[float]] = None,
        audio_series: Optional[List[float]] = None,
        motion_series: Optional[List[float]] = None,
        person_series: Optional[List[float]] = None,
        segment_duration: float = 15.0,
        hop_duration: float = 5.0,
    ) -> List[SegmentScore]:
        """
        Create segment scores from time-series data.

        Aggregates per-frame/per-second scores into segment-level scores
        using overlapping sliding windows.

        Args:
            timestamps: Timestamps for each data point
            visual_series: Visual scores at each timestamp
            audio_series: Audio scores at each timestamp
            motion_series: Motion scores at each timestamp
            person_series: Person visibility at each timestamp
            segment_duration: Duration of each segment window
            hop_duration: Hop between consecutive window starts

        Returns:
            List of SegmentScore objects
        """
        if not timestamps:
            return []

        max_time = max(timestamps)

        # Sliding windows: [0, d), [hop, hop+d), ... while they fit.
        segments = []
        current = 0.0
        while current + segment_duration <= max_time:
            segments.append((current, current + segment_duration))
            current += hop_duration

        # Robustness fix: a video shorter than one window previously produced
        # no segments at all; fall back to a single whole-span segment.
        if not segments and max_time > 0:
            segments.append((0.0, max_time))

        # Aggregate each signal over the windows.
        visual_agg = self._aggregate_series(timestamps, visual_series, segments)
        audio_agg = self._aggregate_series(timestamps, audio_series, segments)
        motion_agg = self._aggregate_series(timestamps, motion_series, segments)
        person_agg = self._aggregate_series(timestamps, person_series, segments)

        return self.score_segments(
            segments,
            visual_scores=visual_agg,
            audio_scores=audio_agg,
            motion_scores=motion_agg,
            person_scores=person_agg,
        )

    def _aggregate_series(
        self,
        timestamps: List[float],
        series: Optional[List[float]],
        segments: List[Tuple[float, float]],
    ) -> List[float]:
        """Aggregate time-series data into one score per segment.

        Uses the 90th percentile of the values falling in [start, end) so a
        brief peak inside a window still dominates its score; windows with no
        data points score 0.0.
        """
        if series is None:
            return [0.0] * len(segments)

        ts = np.array(timestamps)
        values = np.array(series)

        aggregated = []
        for start, end in segments:
            mask = (ts >= start) & (ts < end)
            if np.any(mask):
                score = np.percentile(values[mask], 90)
            else:
                score = 0.0
            aggregated.append(float(score))
        return aggregated

    def apply_diversity_penalty(
        self,
        segments: List[SegmentScore],
        penalty_weight: float = 0.2,
        proximity_window: float = 30.0,
    ) -> List[SegmentScore]:
        """
        Apply temporal diversity penalty to discourage clustering.

        Reduces scores of segments that are close to higher-ranked ones.

        Args:
            segments: Segments sorted by score (best first)
            penalty_weight: Weight of diversity penalty
            proximity_window: Seconds within which a higher-ranked segment
                incurs a penalty (default 30, preserving prior behavior)

        Returns:
            Segments with adjusted scores, re-ranked
        """
        if len(segments) <= 1:
            return segments

        # Work with a copy so the caller's list order is untouched.
        adjusted = list(segments)

        for i in range(1, len(adjusted)):
            current = adjusted[i]
            penalty = 0.0

            # Penalty is the worst (largest) proximity to any higher-ranked segment.
            for j in range(i):
                higher = adjusted[j]
                distance = abs(current.start_time - higher.start_time)
                if distance < proximity_window:
                    proximity_penalty = (proximity_window - distance) / proximity_window
                    penalty = max(penalty, proximity_penalty)

            if penalty > 0:
                # Rebuild with the discounted score.
                # Fix: scene_id was previously dropped here; carry it over.
                adjusted[i] = SegmentScore(
                    start_time=current.start_time,
                    end_time=current.end_time,
                    visual_score=current.visual_score,
                    audio_score=current.audio_score,
                    motion_score=current.motion_score,
                    person_score=current.person_score,
                    combined_score=current.combined_score * (1 - penalty * penalty_weight),
                    rank=current.rank,
                    scene_id=current.scene_id,
                )

        # Re-rank after adjustment.
        return self._rank_segments(adjusted)

    def detect_peaks(
        self,
        segments: List[SegmentScore],
        threshold: Optional[float] = None,
    ) -> List[SegmentScore]:
        """
        Identify peak segments above threshold.

        Args:
            segments: List of scored segments
            threshold: Score threshold for peaks (defaults to the preset's)

        Returns:
            List of peak segments
        """
        # Fix: `threshold or default` would discard an explicit 0.0 threshold.
        if threshold is None:
            threshold = self.preset.peak_threshold
        peaks = [s for s in segments if s.combined_score >= threshold]
        logger.info(f"Found {len(peaks)} peak segments above {threshold}")
        return peaks

    def compute_statistics(
        self,
        segments: List[SegmentScore],
    ) -> Dict:
        """
        Compute summary statistics of the combined segment scores.

        Args:
            segments: List of scored segments

        Returns:
            Dictionary of statistics ({"count": 0} for empty input)
        """
        if not segments:
            return {"count": 0}

        scores = [s.combined_score for s in segments]
        return {
            "count": len(segments),
            "mean": float(np.mean(scores)),
            "std": float(np.std(scores)),
            "min": float(np.min(scores)),
            "max": float(np.max(scores)),
            "median": float(np.median(scores)),
            "q75": float(np.percentile(scores, 75)),
            "q90": float(np.percentile(scores, 90)),
        }
# Export public interface: the scorer and its per-segment result type.
__all__ = ["HypeScorer", "SegmentScore"]