"""Load and summarize IMDb reviews from local files."""
from __future__ import annotations
import random
from collections import Counter
from pathlib import Path
from statistics import mean, median
from typing import Literal
from feedback_intelligence.types import ReviewRecord
# Names of the dataset splits a caller may request; the chosen value is used
# as a sub-directory name under the dataset root when reviews are loaded.
Split = Literal["train", "test"]
def load_local_imdb_reviews(
    base_path: Path,
    split: Split,
    sample_size: int | None = None,
    seed: int = 42,
) -> list[ReviewRecord]:
    """Load deterministic review samples from a local IMDb directory.

    Reviews are read from ``base_path / split / "pos"`` and
    ``base_path / split / "neg"``; every ``*.txt`` file found there becomes
    one record labelled ``"positive"`` or ``"negative"`` respectively.

    Args:
        base_path: Root directory that contains the split sub-directories.
        split: Split sub-directory to read (``"train"`` or ``"test"``).
        sample_size: Total number of reviews wanted. Half of it (integer
            division) is drawn from each category, so an odd value yields
            ``sample_size - 1`` records. ``None`` loads every file.
        seed: Seed of the private random generator used both for sampling
            and for the final shuffle, so equal inputs give equal output.

    Returns:
        The positive and negative records combined into one list and
        shuffled with the seeded generator.

    Raises:
        ValueError: If ``sample_size`` is smaller than 2, or a category
            holds fewer files than the per-category share requested.
        FileNotFoundError: If a category directory is missing or contains
            no ``*.txt`` files.
    """
    # Validate the request once, before any filesystem work: the check does
    # not depend on the category, and a bad argument should be reported as
    # such even when the dataset itself is missing.
    per_label: int | None = None
    if sample_size is not None:
        per_label = sample_size // 2
        if per_label <= 0:
            raise ValueError("sample_size must be at least 2 for balanced sampling.")

    categories = ("pos", "neg")
    label_map = {"pos": "positive", "neg": "negative"}
    records_by_label: dict[str, list[ReviewRecord]] = {}
    rng = random.Random(seed)

    for category in categories:
        category_path = base_path / split / category
        # is_dir() also rejects a regular file sitting at this path.
        if not category_path.is_dir():
            raise FileNotFoundError(f"IMDb directory not found: {category_path}")

        # Sort first so the sample drawn below does not depend on the order
        # in which the filesystem happens to list the files.
        files = sorted(category_path.glob("*.txt"))
        if not files:
            raise FileNotFoundError(f"No review files found in: {category_path}")

        if per_label is None:
            selected = files
        else:
            if per_label > len(files):
                raise ValueError(
                    f"Requested {per_label} files from {category_path}, "
                    f"but only {len(files)} are available."
                )
            selected = rng.sample(files, per_label)

        records_by_label[category] = [
            _record_from_path(path=file_path, split=split, label=label_map[category])
            for file_path in sorted(selected)
        ]

    records = records_by_label["pos"] + records_by_label["neg"]
    # Interleave the two labels; the seeded generator keeps this repeatable.
    rng.shuffle(records)
    return records
def summarize_reviews(records: list[ReviewRecord]) -> dict[str, object]:
    """Build a small statistics dictionary describing ``records``.

    The result has three keys: ``rows`` (number of records),
    ``label_distribution`` (label -> number of occurrences) and
    ``word_count`` (min / median / mean / max over each record's
    ``word_count``). The median is truncated to an int and the mean is
    rounded to two decimals. Empty input yields zeroed statistics and an
    empty distribution.
    """
    if records:
        lengths = [item.word_count for item in records]
        distribution = dict(Counter(item.label for item in records))
        row_total = len(records)
        length_stats = {
            "min": min(lengths),
            "median": int(median(lengths)),
            "mean": round(mean(lengths), 2),
            "max": max(lengths),
        }
    else:
        # Nothing to aggregate: report an all-zero summary instead of failing.
        distribution = {}
        row_total = 0
        length_stats = {"min": 0, "median": 0, "mean": 0, "max": 0}

    return {
        "rows": row_total,
        "label_distribution": distribution,
        "word_count": length_stats,
    }
def _record_from_path(path: Path, split: str, label: str) -> ReviewRecord:
    """Build a ``ReviewRecord`` from a single review file.

    The file stem is split on ``_``. The first part becomes the review id;
    when the stem has exactly two parts, the second is parsed as the integer
    score. Any other stem shape, or a second part that is not an integer,
    gives ``score=None``.

    Args:
        path: Review file to read; its content is decoded as UTF-8.
        split: Split name stored on the record.
        label: Label stored on the record.

    Returns:
        A record with ``source`` set to ``"local-imdb"``.
    """
    stem_parts = path.stem.split("_")
    review_id = stem_parts[0]

    score: int | None = None
    if len(stem_parts) == 2:
        try:
            score = int(stem_parts[1])
        except ValueError:
            # Stems without exactly two parts already fall back to None, so a
            # non-numeric suffix is treated the same way rather than aborting
            # the whole load on one oddly named file.
            score = None

    return ReviewRecord(
        review_id=review_id,
        text=path.read_text(encoding="utf-8"),
        label=label,
        split=split,
        source="local-imdb",
        score=score,
    )
|