"""Load and summarize IMDb reviews from local files.""" from __future__ import annotations import random from collections import Counter from pathlib import Path from statistics import mean, median from typing import Literal from feedback_intelligence.types import ReviewRecord Split = Literal["train", "test"] def load_local_imdb_reviews( base_path: Path, split: Split, sample_size: int | None = None, seed: int = 42, ) -> list[ReviewRecord]: """Load deterministic review samples from a local IMDb directory.""" categories = ("pos", "neg") label_map = {"pos": "positive", "neg": "negative"} records_by_label: dict[str, list[ReviewRecord]] = {} rng = random.Random(seed) for category in categories: category_path = base_path / split / category if not category_path.exists(): raise FileNotFoundError(f"IMDb directory not found: {category_path}") files = sorted(category_path.glob("*.txt")) if not files: raise FileNotFoundError(f"No review files found in: {category_path}") if sample_size is None: selected = files else: per_label = sample_size // 2 if per_label <= 0: raise ValueError("sample_size must be at least 2 for balanced sampling.") if per_label > len(files): raise ValueError( f"Requested {per_label} files from {category_path}, " f"but only {len(files)} are available." ) selected = rng.sample(files, per_label) records_by_label[category] = [ _record_from_path(path=file_path, split=split, label=label_map[category]) for file_path in sorted(selected) ] records = records_by_label["pos"] + records_by_label["neg"] rng.shuffle(records) return records def summarize_reviews(records: list[ReviewRecord]) -> dict[str, object]: """Compute a compact dataset summary for CLI inspection.""" if not records: return { "rows": 0, "label_distribution": {}, "word_count": {"min": 0, "median": 0, "mean": 0, "max": 0}, } word_counts = [record.word_count for record in records] labels = Counter(record.label for record in records) return { "rows": len(records), "label_distribution": dict(labels), "word_count": { "min": min(word_counts), "median": int(median(word_counts)), "mean": round(mean(word_counts), 2), "max": max(word_counts), }, } def _record_from_path(path: Path, split: str, label: str) -> ReviewRecord: stem_parts = path.stem.split("_") review_id = stem_parts[0] score = int(stem_parts[1]) if len(stem_parts) == 2 else None return ReviewRecord( review_id=review_id, text=path.read_text(encoding="utf-8"), label=label, split=split, source="local-imdb", score=score, )