IntimateUser6969
Deploy CineMatch backend: Two-Tower + DeepFM + MMR + Upstash Redis
1359487
Raw
History Blame Contribute Delete
4.42 kB
"""
Data loading utilities for MovieLens 20M dataset.
Handles chunked loading for the large ratings file and provides a unified interface.
"""
import logging
from pathlib import Path
from typing import Optional
import numpy as np
import pandas as pd
logger = logging.getLogger(__name__)
# The 20 MovieLens genre labels handled by this module: 19 genre names plus
# the "(no genres listed)" placeholder, which is kept as the final entry.
# load_movies() emits one g_<label> indicator column per entry, in this order.
ALL_GENRES = [
"Action", "Adventure", "Animation", "Children", "Comedy",
"Crime", "Documentary", "Drama", "Fantasy", "Film-Noir",
"Horror", "IMAX", "Musical", "Mystery", "Romance",
"Sci-Fi", "Thriller", "War", "Western", "(no genres listed)",
]
def load_ratings(
    path: Path,
    sample_frac: Optional[float] = None,
    chunksize: int = 500_000,
) -> pd.DataFrame:
    """
    Load ratings.csv, optionally down-sampled for development.

    Args:
        path: Location of the ratings CSV. The file must provide the columns
            ``userId``, ``movieId``, ``rating`` and ``timestamp``.
        sample_frac: Fraction of rows to keep. Sampling only happens when the
            value is truthy and below 1.0; ``None``, ``0`` and values >= 1.0
            load the complete file.
        chunksize: Number of rows per chunk while sampling. Ignored when the
            complete file is loaded.

    Returns:
        The ratings sorted by ``timestamp`` with a fresh 0..n-1 index.
        ``userId`` and ``movieId`` are int32, ``rating`` is float32 and
        ``timestamp`` is converted with ``pd.to_datetime``.
    """
    path = Path(path)
    logger.info(f"Loading ratings from {path} …")

    if sample_frac and sample_frac < 1.0:
        # Sample chunk by chunk so the unsampled file is never held in memory
        # all at once.
        #
        # A single RandomState is shared by all chunks: the result is still
        # reproducible, but every chunk gets fresh draws. Passing the same
        # integer seed to each call would select the identical row positions
        # inside every equally sized chunk.
        rng = np.random.RandomState(42)
        # The context manager closes the file handle even if sampling fails.
        with pd.read_csv(path, chunksize=chunksize) as reader:
            chunks = [
                chunk.sample(frac=sample_frac, random_state=rng)
                for chunk in reader
            ]
        if chunks:
            df = pd.concat(chunks, ignore_index=True)
        else:
            # No chunks were produced (e.g. header-only file); pd.concat([])
            # would raise, so fall back to a plain read to get the columns.
            df = pd.read_csv(path)
        logger.info(f"Sampled {len(df):,} ratings (frac={sample_frac})")
    else:
        df = pd.read_csv(path)
        logger.info(f"Loaded {len(df):,} ratings")

    # Normalise column types
    df["userId"] = df["userId"].astype(np.int32)
    df["movieId"] = df["movieId"].astype(np.int32)
    df["rating"] = df["rating"].astype(np.float32)

    # Timestamps may be epoch seconds (numeric column) or already formatted
    # date strings. Test for "numeric" rather than "object" so that string
    # columns are parsed as dates regardless of which string dtype pandas
    # inferred for them.
    if pd.api.types.is_numeric_dtype(df["timestamp"]):
        df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")
    else:
        df["timestamp"] = pd.to_datetime(df["timestamp"])

    return df.sort_values("timestamp").reset_index(drop=True)
def load_movies(path: Path) -> pd.DataFrame:
    """
    Load movies.csv and derive release-year and genre feature columns.

    Args:
        path: Location of the movies CSV. The file must provide the columns
            ``movieId``, ``title`` and ``genres`` (pipe-delimited).

    Returns:
        The CSV contents with ``movieId`` cast to int32 plus:

        * ``year`` (int16): the four digits of a trailing ``(YYYY)`` in the
          title, or 0 when the title does not end with one.
        * ``genre_list``: the genres split on ``|``. It is an empty list when
          the cell is exactly "(no genres listed)" or is missing.
        * one uint8 0/1 column per entry of ``ALL_GENRES``, named ``g_<label>``
          with ``-`` and spaces turned into ``_`` and parentheses removed.

    NOTE(review): rows whose genres cell is exactly the "(no genres listed)"
    placeholder get an empty ``genre_list``, so their ``g_no_genres_listed``
    flag is 0 just like every other flag. Confirm that is intended before
    relying on that column.
    """
    path = Path(path)
    df = pd.read_csv(path)
    df["movieId"] = df["movieId"].astype(np.int32)

    # Extract year from title "(YYYY)"
    df["year"] = (
        df["title"]
        .str.extract(r"\((\d{4})\)\s*$", expand=False)
        .fillna("0")
        .astype(np.int16)
    )

    def _split_genres(raw) -> list:
        # A missing cell is read as NaN (a float), which has no .split();
        # treat it the same as the explicit placeholder.
        if not isinstance(raw, str) or raw == "(no genres listed)":
            return []
        return raw.split("|")

    # Split pipe-delimited genres into list
    df["genre_list"] = df["genres"].apply(_split_genres)

    # One pass per label instead of four chained .replace() calls.
    to_column_name = str.maketrans({"-": "_", " ": "_", "(": "", ")": ""})

    # Multi-hot genre encoding (one column per genre)
    for genre in ALL_GENRES:
        safe = genre.translate(to_column_name)
        df[f"g_{safe}"] = (
            df["genre_list"]
            .apply(lambda lst, wanted=genre: int(wanted in lst))
            .astype(np.uint8)
        )

    logger.info(f"Loaded {len(df):,} movies")
    return df
def load_genome(scores_path: Path, tags_path: Path) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Read the genome relevance scores together with the tag lookup table.

    Args:
        scores_path: CSV providing the columns movieId, tagId and relevance.
        tags_path: CSV with the tag names; returned exactly as read.

    Returns:
        A ``(scores_df, tags_df)`` pair. In ``scores_df`` the ``movieId`` and
        ``tagId`` columns are int32 and ``relevance`` is float32.
    """
    # The tag table is read first, then the (much longer) score table.
    tags = pd.read_csv(Path(tags_path))
    scores = pd.read_csv(Path(scores_path))

    # Narrow the score columns to compact dtypes, one column at a time.
    compact_dtypes = (
        ("movieId", np.int32),
        ("tagId", np.int32),
        ("relevance", np.float32),
    )
    for column, dtype in compact_dtypes:
        scores[column] = scores[column].astype(dtype)

    logger.info(
        f"Loaded genome: {len(tags):,} tags, {len(scores):,} tag-movie scores"
    )
    return scores, tags
def load_links(path: Path) -> pd.DataFrame:
    """Read link.csv (movieId → imdbId / tmdbId) with movieId cast to int32."""
    links = pd.read_csv(Path(path))
    # Only the key column is narrowed; the other columns keep the dtypes
    # pandas inferred while reading.
    return links.assign(movieId=links["movieId"].astype(np.int32))
def load_all(
    data_dir: str | Path,
    sample_frac: Optional[float] = None,
) -> dict:
    """
    Load the full MovieLens 20M dataset from data_dir.

    Args:
        data_dir: Directory holding rating.csv, movie.csv, genome_scores.csv,
            genome_tags.csv and link.csv.
        sample_frac: Forwarded to ``load_ratings``; the other tables are
            always loaded in full.

    Returns:
        A dict with keys: ratings, movies, genome_scores, genome_tags, links.
    """
    data_dir = Path(data_dir)

    ratings = load_ratings(data_dir / "rating.csv", sample_frac=sample_frac)
    movies = load_movies(data_dir / "movie.csv")
    # Parse the genome files once and unpack both frames; calling
    # load_genome() separately for each key would read both CSVs twice.
    genome_scores, genome_tags = load_genome(
        data_dir / "genome_scores.csv",
        data_dir / "genome_tags.csv",
    )
    links = load_links(data_dir / "link.csv")

    return {
        "ratings": ratings,
        "movies": movies,
        "genome_scores": genome_scores,
        "genome_tags": genome_tags,
        "links": links,
    }