import ast
import logging
import os
import sys
# Add the project root to the Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../..")))
import pandas as pd
from src.book_recommender.core.exceptions import DataNotFoundError, FileProcessingError
from src.book_recommender.utils import ensure_dir_exists
logger = logging.getLogger(__name__)
def process_dataframe(df: pd.DataFrame) -> pd.DataFrame:
"""
Cleans, deduplicates, and engineers features on a raw book DataFrame.
This function performs the following main operations:
1. Handles missing columns by initializing them as empty strings.
2. Fills any NaN values in text columns with empty strings.
3. Converts all text columns to string type for safety.
4. Creates lowercase versions of 'title' and 'authors' for matching,
while preserving the original casing for display.
5. Deduplicates the DataFrame based on the lowercased title.
6. Removes rows with empty titles.
7. Engineers the 'combined_text' feature for embeddings, applying a
weighting strategy to give more importance to the title and author.
Args:
df (pd.DataFrame): The raw book data, typically from a CSV.
Returns:
pd.DataFrame: The processed DataFrame ready for embedding.
Raises:
ValueError: If the DataFrame is empty after processing.
"""
logger.info("Starting data cleaning and preparation...")
logger.info(f"Found columns in CSV: {df.columns.tolist()}")
expected_cols = ["title", "authors", "genres", "description", "tags"]
for col in expected_cols:
if col not in df.columns:
df[col] = ""
logger.warning(f"Column '{col}' not found in CSV. Initializing as empty.")
df[expected_cols] = df[expected_cols].fillna("")
for col in expected_cols:
df[col] = df[col].astype(str)
if "book_id" in df.columns:
df["id"] = df["book_id"].astype(str)
logger.info("Using 'book_id' as the unique identifier.")
else:
df["id"] = df.index.astype(str)
logger.warning("No 'book_id' column found. Generated 'id' from DataFrame index.")
for col in ["genres", "tags"]:
if col in df.columns:
df[col] = df[col].apply(
lambda x: ", ".join(ast.literal_eval(x)) if (x.startswith("[") and x.endswith("]")) else x
)
df["title_lower"] = df["title"].str.strip().str.lower()
df["authors_lower"] = df["authors"].str.strip().str.lower()
# Clean description text (remove HTML tags and artifacts)
# This regex removes
, , tags and their closing counterparts
import re
def clean_html_text(text):
if not isinstance(text, str):
return text
text = re.sub(r'
', '\n', text, flags=re.IGNORECASE) # Replace
with newline
text = re.sub(r'<[^>]+>', '', text) # Remove other HTML tags
return text.strip()
df["description"] = df["description"].apply(clean_html_text)
# Lowercase genres/tags for normalization, but keep description/title case for display
for col in ["genres", "tags"]:
df[col] = df[col].str.strip().str.lower()
original_rows = len(df)
df.drop_duplicates(subset=["title_lower"], keep="first", inplace=True)
new_rows = len(df)
if new_rows < original_rows:
logger.info(f"Removed {original_rows - new_rows} duplicate books based on title.")
original_rows = len(df)
df.dropna(subset=["title_lower"], inplace=True)
df = df[df["title_lower"] != ""].copy()
if len(df) < original_rows:
logger.warning(f"Dropped {original_rows - len(df)} rows with missing titles.")
if df.empty:
logger.error("DataFrame is empty after cleaning. No valid book data to process.")
raise ValueError(
"No valid books found after processing. The dataset might be empty or contain only invalid entries."
)
logger.info("Creating 'combined_text' for embeddings with weighted fields...")
# Use lowercased description ONLY for embedding generation, not for storage
df["combined_text"] = (
(df["title_lower"] + " ") * 3
+ "by "
+ df["authors_lower"]
+ ". "
+ "genres: "
+ df["genres"]
+ ". "
+ "description: "
+ df["description"].str.lower()
+ ". "
+ "tags: "
+ df["tags"]
)
return df
def clean_and_prepare_data(raw_path: str, processed_path: str) -> pd.DataFrame:
"""
Orchestrator function that loads raw data, processes it, and saves the result.
This function chains the data processing steps:
1. Loads the raw CSV data from `raw_path`.
2. Calls `process_dataframe` to perform all cleaning and feature engineering.
3. Saves the cleaned DataFrame to a Parquet file at `processed_path`.
Args:
raw_path (str): The file path for the raw CSV data.
processed_path (str): The file path to save the processed Parquet file.
Returns:
pd.DataFrame: The fully processed DataFrame.
Raises:
DataNotFoundError: If the file at `raw_path` is not found.
FileProcessingError: If the CSV file cannot be parsed.
"""
if not os.path.exists(raw_path):
logger.error(f"Raw data file not found at: {raw_path}")
raise DataNotFoundError(f"Raw data file not found at: {raw_path}")
try:
logger.info(f"Loading raw data from {raw_path}...")
raw_df = pd.read_csv(raw_path)
logger.info(f"Loaded {len(raw_df)} rows.")
except (pd.errors.ParserError, UnicodeDecodeError) as e:
logger.error(f"Failed to parse CSV from {raw_path}: {e}")
raise FileProcessingError(f"Failed to parse CSV from {raw_path}: {e}")
except Exception as e:
logger.error(f"An unexpected error occurred while loading CSV from {raw_path}: {e}")
raise
processed_df = process_dataframe(raw_df)
try:
ensure_dir_exists(processed_path)
logger.info(f"Saving processed data to {processed_path}...")
processed_df.to_parquet(processed_path, index=False)
logger.info(f"Successfully saved {len(processed_df)} processed rows.")
except Exception as e:
logger.error(f"Failed to save processed data to {processed_path}: {e}")
raise
return processed_df
if __name__ == "__main__":
import argparse
import book_recommender.core.config as config
if os.getenv("LOG_LEVEL"):
logging.basicConfig(level=os.getenv("LOG_LEVEL"))
else:
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
parser = argparse.ArgumentParser(description="Clean and prepare book data.")
parser.add_argument("--raw-path", type=str, default=config.RAW_DATA_PATH, help="Path to the raw CSV data file.")
parser.add_argument(
"--processed-path",
type=str,
default=config.PROCESSED_DATA_PATH,
help="Path to save the processed Parquet file.",
)
args = parser.parse_args()
logger.info("--- Starting Data Processing Standalone Script ---")
clean_and_prepare_data(raw_path=args.raw_path, processed_path=args.processed_path)
logger.info("--- Data Processing Finished ---")