File size: 4,418 Bytes
1359487
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""
Data loading utilities for MovieLens 20M dataset.
Handles chunked loading for the large ratings file and provides a unified interface.
"""

import logging
from pathlib import Path
from typing import Optional

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)

# The 20 labels used for genre encoding: 19 genre names plus the
# "(no genres listed)" placeholder. load_movies iterates this list to create
# one g_<name> column per entry, in this order.
ALL_GENRES = [
    "Action", "Adventure", "Animation", "Children", "Comedy",
    "Crime", "Documentary", "Drama", "Fantasy", "Film-Noir",
    "Horror", "IMAX", "Musical", "Mystery", "Romance",
    "Sci-Fi", "Thriller", "War", "Western", "(no genres listed)",
]


def load_ratings(
    path: Path,
    sample_frac: Optional[float] = None,
    chunksize: int = 500_000,
) -> pd.DataFrame:
    """
    Load the ratings CSV, optionally down-sampling it for development.

    Args:
        path: Location of the ratings CSV. The file must provide the columns
            ``userId``, ``movieId``, ``rating`` and ``timestamp``.
        sample_frac: Fraction of rows to keep, strictly between 0 and 1.
            ``None``, ``0`` or any value >= 1 loads the whole file.
        chunksize: Rows per chunk while sampling. Only used when
            ``sample_frac`` selects a subset; the full load reads the file
            in one call.

    Returns:
        A DataFrame sorted by ``timestamp`` with a fresh 0..n-1 index.
        ``userId`` and ``movieId`` are int32, ``rating`` is float32 and
        ``timestamp`` is converted with ``pd.to_datetime``.

    Raises:
        ValueError: If ``sample_frac`` is negative.
    """
    path = Path(path)
    logger.info(f"Loading ratings from {path} …")

    if sample_frac is not None and sample_frac < 0:
        raise ValueError(f"sample_frac must be non-negative, got {sample_frac}")

    if sample_frac is not None and 0 < sample_frac < 1.0:
        # Sample chunk by chunk so the whole file never sits in memory.
        # One generator is shared across chunks: re-seeding every chunk with
        # the same integer would keep the same row positions in each equally
        # sized chunk, which is not a random sample of the file.
        rng = np.random.RandomState(42)
        sampled = [
            chunk.sample(frac=sample_frac, random_state=rng)
            for chunk in pd.read_csv(path, chunksize=chunksize)
        ]
        # A file without data rows may yield no chunks at all, and pd.concat
        # refuses an empty list; fall back to a plain read in that case.
        df = pd.concat(sampled, ignore_index=True) if sampled else pd.read_csv(path)
        logger.info(f"Sampled {len(df):,} ratings (frac={sample_frac})")
    else:
        df = pd.read_csv(path)
        logger.info(f"Loaded {len(df):,} ratings")

    # Normalise column types
    df["userId"] = df["userId"].astype(np.int32)
    df["movieId"] = df["movieId"].astype(np.int32)
    df["rating"] = df["rating"].astype(np.float32)

    # Numeric timestamps are interpreted as epoch seconds; anything else is
    # parsed as date/time text. Testing for "numeric" instead of "object"
    # keeps the text branch working when pandas infers a dedicated string
    # dtype for the column.
    if pd.api.types.is_numeric_dtype(df["timestamp"]):
        df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")
    else:
        df["timestamp"] = pd.to_datetime(df["timestamp"])

    return df.sort_values("timestamp").reset_index(drop=True)


def load_movies(path: Path) -> pd.DataFrame:
    """
    Load the movies CSV and derive release-year and genre features.

    Args:
        path: Location of the movies CSV. The file must provide the columns
            ``movieId``, ``title`` and ``genres`` (pipe-delimited).

    Returns:
        The loaded DataFrame with ``movieId`` cast to int32 plus:

        * ``year`` - int16 taken from a trailing ``(YYYY)`` in the title,
          ``0`` when the title has none.
        * ``genre_list`` - list of genre names; empty for rows marked
          ``(no genres listed)`` or with a missing ``genres`` value.
        * ``g_<name>`` - one uint8 0/1 column per entry of ``ALL_GENRES``,
          including ``g_no_genres_listed``.
    """
    path = Path(path)
    df = pd.read_csv(path)
    df["movieId"] = df["movieId"].astype(np.int32)

    # Extract year from title "(YYYY)"
    df["year"] = (
        df["title"]
        .str.extract(r"\((\d{4})\)\s*$", expand=False)
        .fillna("0")
        .astype(np.int16)
    )

    # A missing genres cell is treated like the explicit placeholder so the
    # string operations below never see NaN.
    no_genres = "(no genres listed)"
    genres = df["genres"].fillna(no_genres)

    # Split pipe-delimited genres into list
    df["genre_list"] = genres.map(
        lambda g: [] if g == no_genres else g.split("|")
    )

    # Multi-hot genre encoding (one column per genre). The flags come from
    # the raw genres string rather than genre_list: genre_list is empty for
    # placeholder rows, so a membership test on it could never set the
    # "(no genres listed)" flag.
    flags = (
        genres.str.get_dummies(sep="|")
        .reindex(columns=ALL_GENRES, fill_value=0)  # fixed column set and order
        .astype(np.uint8)
    )
    for genre in ALL_GENRES:
        safe = genre.replace("-", "_").replace("(", "").replace(")", "").replace(" ", "_")
        df[f"g_{safe}"] = flags[genre]

    logger.info(f"Loaded {len(df):,} movies")
    return df


def load_genome(scores_path: Path, tags_path: Path) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Read the genome tag table and the genome relevance-score table.

    Returns the pair ``(scores_df, tags_df)``. In ``scores_df`` the columns
    ``movieId`` and ``tagId`` are cast to int32 and ``relevance`` to float32;
    ``tags_df`` is returned exactly as read from ``tags_path``.
    """
    tag_names = pd.read_csv(Path(tags_path))
    relevance_scores = pd.read_csv(Path(scores_path))

    # Narrow the score columns one after another, in a fixed order.
    target_dtypes = (
        ("movieId", np.int32),
        ("tagId", np.int32),
        ("relevance", np.float32),
    )
    for column, dtype in target_dtypes:
        relevance_scores[column] = relevance_scores[column].astype(dtype)

    n_tags = len(tag_names)
    n_scores = len(relevance_scores)
    logger.info(
        f"Loaded genome: {n_tags:,} tags, {n_scores:,} tag-movie scores"
    )
    return relevance_scores, tag_names


def load_links(path: Path) -> pd.DataFrame:
    """Read the links CSV and return it with ``movieId`` stored as int32."""
    links = pd.read_csv(Path(path))
    # assign() replaces the existing column in place, keeping column order.
    movie_ids = links["movieId"].astype(np.int32)
    return links.assign(movieId=movie_ids)


def load_all(
    data_dir: str | Path,
    sample_frac: Optional[float] = None,
) -> dict:
    """
    Load every MovieLens 20M table found in ``data_dir``.

    Args:
        data_dir: Directory holding ``rating.csv``, ``movie.csv``,
            ``genome_scores.csv``, ``genome_tags.csv`` and ``link.csv``.
        sample_frac: Forwarded to ``load_ratings``; the other tables are
            always loaded in full.

    Returns:
        A dict with keys: ratings, movies, genome_scores, genome_tags, links.
    """
    data_dir = Path(data_dir)

    ratings = load_ratings(data_dir / "rating.csv", sample_frac=sample_frac)
    movies = load_movies(data_dir / "movie.csv")
    # Read the genome files once and unpack both frames; calling load_genome
    # separately per key would parse both CSVs twice.
    genome_scores, genome_tags = load_genome(
        data_dir / "genome_scores.csv",
        data_dir / "genome_tags.csv",
    )
    links = load_links(data_dir / "link.csv")

    return {
        "ratings": ratings,
        "movies": movies,
        "genome_scores": genome_scores,
        "genome_tags": genome_tags,
        "links": links,
    }