File size: 3,029 Bytes
73b0303
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
"""Load and summarize IMDb reviews from local files."""

from __future__ import annotations

import random
from collections import Counter
from pathlib import Path
from statistics import mean, median
from typing import Literal

from feedback_intelligence.types import ReviewRecord

# Dataset split selector accepted by the loader; the value is also used
# verbatim as the sub-directory name under the dataset base path.
Split = Literal["train", "test"]


def load_local_imdb_reviews(
    base_path: Path,
    split: Split,
    sample_size: int | None = None,
    seed: int = 42,
) -> list[ReviewRecord]:
    """Read IMDb reviews for one split from disk, optionally as a balanced sample.

    Review files are expected at ``base_path / split / "pos" / *.txt`` and
    ``base_path / split / "neg" / *.txt``. Files from ``pos`` are labelled
    ``"positive"`` and files from ``neg`` are labelled ``"negative"``.

    Args:
        base_path: Root directory that contains the split sub-directories.
        split: Which split directory to read.
        sample_size: Total number of reviews wanted. ``sample_size // 2`` files
            are drawn from each sentiment folder, so an odd value yields one
            record fewer than requested. ``None`` loads every file.
        seed: Seed for the random generator that drives both the per-folder
            sampling and the final shuffle, making the result reproducible.

    Returns:
        The loaded records, positive and negative mixed in a seeded shuffle.

    Raises:
        FileNotFoundError: A sentiment folder is missing or holds no ``*.txt``.
        ValueError: ``sample_size`` is below 2, or asks for more files per
            folder than that folder contains.
    """
    sentiment_by_folder = {"pos": "positive", "neg": "negative"}
    sampler = random.Random(seed)
    loaded: dict[str, list[ReviewRecord]] = {}

    # "pos" is processed before "neg"; the order matters because both folders
    # draw from the same seeded generator.
    for folder, sentiment in sentiment_by_folder.items():
        folder_path = base_path / split / folder
        if not folder_path.exists():
            raise FileNotFoundError(f"IMDb directory not found: {folder_path}")

        # Sort the listing so the sampling input does not depend on the
        # order in which the filesystem happens to enumerate entries.
        candidates = sorted(folder_path.glob("*.txt"))
        if len(candidates) == 0:
            raise FileNotFoundError(f"No review files found in: {folder_path}")

        if sample_size is not None:
            quota = sample_size // 2
            if quota < 1:
                raise ValueError("sample_size must be at least 2 for balanced sampling.")
            if len(candidates) < quota:
                raise ValueError(
                    f"Requested {quota} files from {folder_path}, "
                    f"but only {len(candidates)} are available."
                )
            chosen = sorted(sampler.sample(candidates, quota))
        else:
            # The full listing is already in sorted order.
            chosen = candidates

        bucket: list[ReviewRecord] = []
        for review_file in chosen:
            bucket.append(
                _record_from_path(path=review_file, split=split, label=sentiment)
            )
        loaded[folder] = bucket

    combined = [*loaded["pos"], *loaded["neg"]]
    sampler.shuffle(combined)
    return combined


def summarize_reviews(records: list[ReviewRecord]) -> dict[str, object]:
    """Build a small summary of a batch of reviews for CLI inspection.

    Args:
        records: Reviews to summarize; each must expose ``word_count`` and
            ``label`` attributes.

    Returns:
        A dict with three keys: ``"rows"`` (number of records),
        ``"label_distribution"`` (label -> occurrence count) and
        ``"word_count"`` (``min``, ``median``, ``mean`` and ``max`` of the
        per-record word counts). The median is truncated to an ``int`` and the
        mean is rounded to two decimals. Empty input yields zeros and an empty
        distribution.
    """
    if not records:
        empty_stats = {"min": 0, "median": 0, "mean": 0, "max": 0}
        return {"rows": 0, "label_distribution": {}, "word_count": empty_stats}

    lengths = []
    for entry in records:
        lengths.append(entry.word_count)

    label_tally = Counter()
    for entry in records:
        label_tally[entry.label] += 1

    length_stats = {
        "min": min(lengths),
        # int() truncates, so a median of x.5 is reported as x.
        "median": int(median(lengths)),
        "mean": round(mean(lengths), 2),
        "max": max(lengths),
    }
    summary: dict[str, object] = {
        "rows": len(records),
        "label_distribution": dict(label_tally),
        "word_count": length_stats,
    }
    return summary


def _record_from_path(path: Path, split: str, label: str) -> ReviewRecord:
    """Build a ``ReviewRecord`` from a single review file.

    The file stem is split on ``"_"``. The first piece becomes the review id.
    When the stem has exactly two pieces, the second is parsed as the integer
    score; any other shape leaves the score as ``None``. The file content is
    read as UTF-8 and stored as the review text.
    """
    pieces = path.stem.split("_")
    identifier = pieces[0]

    # The score is parsed before the file is read, so a malformed two-part
    # name fails without touching the file content.
    if len(pieces) == 2:
        rating = int(pieces[1])
    else:
        rating = None

    body = path.read_text(encoding="utf-8")
    return ReviewRecord(
        review_id=identifier,
        text=body,
        label=label,
        split=split,
        source="local-imdb",
        score=rating,
    )