File size: 8,699 Bytes
c4ee290
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
"""
ShortSmith v2 - Trained Hype Scorer

Uses the MLP model trained on Mr. HiSum dataset to score segments.
Falls back to heuristic scoring if weights not available.
"""

import os
from pathlib import Path
from typing import Optional, List, Tuple
import numpy as np

import torch
import torch.nn as nn

from utils.logger import get_logger

logger = get_logger("scoring.trained_scorer")


class HypeScorerMLP(nn.Module):
    """
    2-layer MLP for hype scoring.

    The layer order (Linear -> BatchNorm1d -> ReLU -> Dropout, twice, then a
    final Linear) must match the architecture from the training notebook so
    saved state_dicts load without key mismatches.
    """

    def __init__(
        self,
        visual_dim: int = 512,
        audio_dim: int = 13,
        hidden_dim: int = 256,
        dropout: float = 0.3,
    ):
        super().__init__()

        self.visual_dim = visual_dim
        self.audio_dim = audio_dim

        half_hidden = hidden_dim // 2
        layers: List[nn.Module] = []
        # Two hidden blocks: (input -> hidden) and (hidden -> hidden // 2).
        for in_features, out_features in (
            (visual_dim + audio_dim, hidden_dim),
            (hidden_dim, half_hidden),
        ):
            layers.extend([
                nn.Linear(in_features, out_features),
                nn.BatchNorm1d(out_features),
                nn.ReLU(),
                nn.Dropout(dropout),
            ])
        # Single-logit output head.
        layers.append(nn.Linear(half_hidden, 1))

        self.network = nn.Sequential(*layers)

    def forward(self, features: torch.Tensor) -> torch.Tensor:
        """Forward pass with concatenated (visual + audio) features."""
        return self.network(features)


class TrainedHypeScorer:
    """
    Trained neural network hype scorer.

    Uses MLP trained on Mr. HiSum "Most Replayed" data. When no weights
    file can be located (or loading fails), scoring transparently falls
    back to a lightweight heuristic so callers never need a trained model.
    """

    # Default weights path relative to project root
    DEFAULT_WEIGHTS_PATH = "weights/hype_scorer_weights.pt"

    def __init__(
        self,
        weights_path: Optional[str] = None,
        device: Optional[str] = None,
        visual_dim: int = 512,
        audio_dim: int = 13,
    ):
        """
        Initialize trained scorer.

        Args:
            weights_path: Path to trained weights (.pt file). If None,
                a few common locations are probed.
            device: Device to run on (cuda/cpu/mps). Auto-detected if None.
            visual_dim: Visual feature dimension (must match training).
            audio_dim: Audio feature dimension (must match training).
        """
        self.visual_dim = visual_dim
        self.audio_dim = audio_dim
        self.model: Optional[HypeScorerMLP] = None
        self.device = device or self._get_device()

        # No explicit path given: probe common locations, first hit wins.
        if weights_path is None:
            candidates = [
                self.DEFAULT_WEIGHTS_PATH,
                "hype_scorer_weights.pt",
                "weights/hype_scorer_weights.pt",
                os.path.join(os.path.dirname(__file__), "..", "weights", "hype_scorer_weights.pt"),
            ]
            for candidate in candidates:
                if os.path.exists(candidate):
                    weights_path = candidate
                    break

        if weights_path and os.path.exists(weights_path):
            self._load_model(weights_path)
        else:
            logger.warning(
                f"Trained weights not found. TrainedHypeScorer will use fallback scoring. "
                f"To use trained model, place weights at: {self.DEFAULT_WEIGHTS_PATH}"
            )

    def _get_device(self) -> str:
        """Detect best available device (cuda > mps > cpu)."""
        if torch.cuda.is_available():
            return "cuda"
        # hasattr guard: older torch builds have no torch.backends.mps.
        elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
            return "mps"
        return "cpu"

    def _load_model(self, weights_path: str) -> None:
        """Load trained model weights; on any failure leave self.model as None."""
        try:
            logger.info(f"Loading trained hype scorer from {weights_path}")

            # Initialize model (architecture must match the checkpoint).
            self.model = HypeScorerMLP(
                visual_dim=self.visual_dim,
                audio_dim=self.audio_dim,
            )

            # NOTE(security): torch.load deserializes via pickle — only load
            # weights files from trusted sources. Consider passing
            # weights_only=True (torch >= 1.13) once all checkpoints support it.
            state_dict = torch.load(weights_path, map_location=self.device)

            # Handle different save formats: full checkpoint dict vs bare state_dict.
            if isinstance(state_dict, dict) and "model_state_dict" in state_dict:
                state_dict = state_dict["model_state_dict"]

            self.model.load_state_dict(state_dict)
            self.model.to(self.device)
            self.model.eval()

            logger.info(f"✓ Trained hype scorer loaded successfully on {self.device}")

        except Exception as e:
            # Degrade gracefully: fall back to heuristic scoring.
            logger.error(f"Failed to load trained model: {e}")
            self.model = None

    @property
    def is_available(self) -> bool:
        """Check if trained model is loaded."""
        return self.model is not None

    @torch.no_grad()
    def score(
        self,
        visual_features: np.ndarray,
        audio_features: np.ndarray,
    ) -> float:
        """
        Score a single segment.

        Args:
            visual_features: Visual feature vector (visual_dim,)
            audio_features: Audio feature vector (audio_dim,)

        Returns:
            Hype score (0-1)
        """
        if not self.is_available:
            return self._fallback_score(visual_features, audio_features)

        # Prepare input: single row (1, visual_dim + audio_dim).
        features = np.concatenate([visual_features, audio_features])
        tensor = torch.tensor(features, dtype=torch.float32).unsqueeze(0).to(self.device)

        # Forward pass
        raw_score = self.model(tensor)

        # Normalize to 0-1 with sigmoid
        score = torch.sigmoid(raw_score).item()

        return score

    @torch.no_grad()
    def score_batch(
        self,
        visual_features: np.ndarray,
        audio_features: np.ndarray,
    ) -> np.ndarray:
        """
        Score multiple segments in batch.

        Args:
            visual_features: Visual features (N, visual_dim)
            audio_features: Audio features (N, audio_dim)

        Returns:
            Array of hype scores (N,)
        """
        if not self.is_available:
            return np.array([
                self._fallback_score(visual_features[i], audio_features[i])
                for i in range(len(visual_features))
            ])

        # Prepare batch input (N, visual_dim + audio_dim).
        features = np.concatenate([visual_features, audio_features], axis=1)
        tensor = torch.tensor(features, dtype=torch.float32).to(self.device)

        # Forward pass
        raw_scores = self.model(tensor)

        # Normalize to 0-1. squeeze(-1) (not squeeze()) so a batch of one
        # still yields shape (1,) rather than collapsing to a 0-d scalar.
        scores = torch.sigmoid(raw_scores).squeeze(-1).cpu().numpy()

        return scores

    def _fallback_score(
        self,
        visual_features: np.ndarray,
        audio_features: np.ndarray,
    ) -> float:
        """
        Fallback heuristic scoring when model not available.

        Uses similar logic to training data generation. Input features are
        assumed roughly centered (the *0.5 + 0.5 shift maps ~[-1, 1] onto
        [0, 1] before clipping) — TODO confirm against feature extractors.
        """
        # Visual contribution (mean of first 50 dims if available)
        visual_len = min(50, len(visual_features))
        visual_score = np.mean(visual_features[:visual_len]) * 0.5 + 0.5
        visual_score = np.clip(visual_score, 0, 1)

        # Audio contribution: weighted mix of selected coefficients when the
        # vector is long enough, otherwise a plain mean.
        if len(audio_features) >= 8:
            audio_score = (
                audio_features[0] * 0.4 +  # RMS energy
                audio_features[5] * 0.3 +  # Spectral flux (if available)
                audio_features[7] * 0.3    # Onset strength (if available)
            ) * 0.5 + 0.5
        else:
            audio_score = np.mean(audio_features) * 0.5 + 0.5
        audio_score = np.clip(audio_score, 0, 1)

        # Combined: equal weighting of modalities.
        return float(0.5 * visual_score + 0.5 * audio_score)

    def compare_segments(
        self,
        visual_a: np.ndarray,
        audio_a: np.ndarray,
        visual_b: np.ndarray,
        audio_b: np.ndarray,
    ) -> int:
        """
        Compare two segments.

        A 0.05 dead-band avoids flip-flopping on near-equal scores.

        Returns:
            1 if A is more engaging, -1 if B is more engaging, 0 if equal
        """
        score_a = self.score(visual_a, audio_a)
        score_b = self.score(visual_b, audio_b)

        if score_a > score_b + 0.05:
            return 1
        elif score_b > score_a + 0.05:
            return -1
        return 0


# Singleton instance for easy access.
# Lazily created by get_trained_scorer(); cached at module level so all
# callers share one loaded model instead of re-loading weights per call.
_trained_scorer: Optional[TrainedHypeScorer] = None


def get_trained_scorer(
    weights_path: Optional[str] = None,
    force_reload: bool = False,
) -> TrainedHypeScorer:
    """
    Get singleton trained scorer instance.

    Args:
        weights_path: Optional path to weights file (only consulted when
            the singleton is created or recreated)
        force_reload: Force reload even if already loaded

    Returns:
        TrainedHypeScorer instance
    """
    global _trained_scorer

    needs_instance = force_reload or _trained_scorer is None
    if needs_instance:
        _trained_scorer = TrainedHypeScorer(weights_path=weights_path)

    return _trained_scorer


# Explicit public API of this module.
__all__ = ["TrainedHypeScorer", "HypeScorerMLP", "get_trained_scorer"]