"""Export product records to a CSV or Excel file."""

import csv
import logging
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import pandas as pd

logger = logging.getLogger("bot")

# Preferred column order for exports; columns not listed here are appended
# after these in their original order.
COLUMN_ORDER = [
    "studio_name",
    "title",
    "upc",
    "price",
    "sku",
    "release_date",
    "category",
    "product_url",
    "scraped_at",
    "error",
]

# Patterns used by _clean_title, compiled once at import time.
_URL_RE = re.compile(r"https?://\S+", re.IGNORECASE)
_WWW_RE = re.compile(r"www\.[^\s,]+", re.IGNORECASE)
# "- DVD -" and variants (DVDs, DVD's) used as separators.
_DVD_SEPARATOR_RE = re.compile(r"\s*-\s*DVD(?:s|'s)?\s*-\s*", re.IGNORECASE)
_DVD_TRAILING_RE = re.compile(r"\s*-\s*DVD(?:s|'s)?\s*$", re.IGNORECASE)
# Remaining standalone DVD tokens.
# NOTE(review): the original character class listed the straight apostrophe
# twice, presumably a mangled typographic apostrophe (U+2019); both are
# accepted here so "DVD’s" is removed whole — confirm against real titles.
_DVD_TOKEN_RE = re.compile(r"\bDVD(?:['\u2019]s|s)?\b", re.IGNORECASE)
_MULTISPACE_RE = re.compile(r"\s{2,}")


def _as_text(value: Any) -> str:
    """Return *value* as a stripped string, mapping missing values to ''.

    None, NaN, ``pd.NA`` and ``pd.NaT`` become ``""``. NaN is truthy, so a
    plain ``str(value or "")`` would turn it into the literal ``"nan"``.
    Other falsy values (``0``, ``""``) also become ``""``.
    """
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return ""
    return str(value or "").strip()


def _clean_title(value: Any, studio: str = "") -> str:
    """Strip URLs, DVD markers and trailing promo copy from a product title.

    Args:
        value: Raw title; missing values (None/NaN) yield an empty string.
        studio: Optional studio name. When it occurs in the cleaned title
            (case-insensitive), everything after its first occurrence is
            dropped.

    Returns:
        The cleaned title, or ``""`` when there is nothing to clean.
    """
    text = _as_text(value)
    if not text:
        return ""

    text = _URL_RE.sub("", text)
    text = _WWW_RE.sub("", text)
    text = _DVD_SEPARATOR_RE.sub(" - ", text)
    text = _DVD_TRAILING_RE.sub("", text)
    text = _DVD_TOKEN_RE.sub("", text)
    text = _MULTISPACE_RE.sub(" ", text).strip(" -,")

    # Strip everything after the studio name.
    studio_name = (studio or "").strip()
    if studio_name:
        match = re.search(re.escape(studio_name), text, flags=re.IGNORECASE)
        if match:
            text = text[: match.end()].strip(" -,")
    return text


def _truncate_to_two_segments(title: str) -> str:
    """Keep only 'Movie Title - Studio', dropping everything after the second segment."""
    parts = title.split(" - ")
    return " - ".join(parts[:2]).strip(" -,") if len(parts) > 2 else title


# Reuse the more robust sanitizer from app/services/exporter.py when it is
# importable; otherwise fall back to the local implementation. The broad
# catch is deliberate: any failure while importing that module should not
# prevent exporting.
try:
    from app.services.exporter import _clean_title as _sanitize_title
except Exception:
    _sanitize_title = _clean_title


def _drop_duplicate_upcs(df: pd.DataFrame) -> pd.DataFrame:
    """Drop rows whose UPC repeats an earlier row, keeping the first.

    UPCs are compared as stripped text. Rows with a blank or missing UPC are
    never treated as duplicates, and the original row order is preserved.
    """
    if "upc" not in df.columns:
        return df

    upc_text = df["upc"].map(_as_text)
    has_upc = upc_text != ""
    duplicate = has_upc & upc_text.duplicated(keep="first")
    removed = int(duplicate.sum())
    if removed:
        df = df[~duplicate].reset_index(drop=True)
        logger.info("Removed %s duplicate UPC(s)", removed)
    return df


def _title_upc_rows(df: pd.DataFrame) -> List[Dict[str, str]]:
    """Build the sanitized two-column (Title, UPC) rows for the CSV export.

    Values are read from the ``title``/``upc`` columns, falling back to
    ``Title``/``UPC``. The studio name is passed to the sanitizer so promo
    copy following it is stripped.
    """
    rows: List[Dict[str, str]] = []
    for record in df.to_dict("records"):
        studio = _as_text(record.get("studio_name"))
        raw_title = _as_text(record.get("title")) or _as_text(record.get("Title"))
        title = _truncate_to_two_segments(_sanitize_title(raw_title, studio))
        upc = _as_text(record.get("upc")) or _as_text(record.get("UPC"))
        rows.append({"Title": title, "UPC": upc})
    return rows


class ExportManager:
    """Write a list of record dicts to a CSV or Excel file."""

    def __init__(self, output_format: str = "csv", output_file: Optional[str] = None):
        """Store the export settings.

        Args:
            output_format: ``"excel"`` selects an .xlsx export; any other
                value selects CSV. Compared case-insensitively.
            output_file: Destination path. When omitted, a timestamped
                ``output_<YYYYmmdd_HHMMSS>.<ext>`` name is generated.
        """
        self.output_format = output_format.lower()
        self.output_file = output_file

    def _output_path(self) -> Path:
        """Return the configured output path, or a timestamped default."""
        if self.output_file:
            return Path(self.output_file)
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        ext = "xlsx" if self.output_format == "excel" else "csv"
        return Path(f"output_{ts}.{ext}")

    def save(self, records: List[Dict[str, Any]]) -> str:
        """Sanitize, deduplicate and write *records*.

        Excel exports contain every column. CSV exports contain only the
        sanitized ``Title`` and ``UPC`` columns; if building that file fails,
        the full table is written as CSV instead.

        Args:
            records: One dict per row.

        Returns:
            The path written, or ``""`` when *records* is empty.
        """
        if not records:
            logger.warning("No records to save")
            return ""

        df = pd.DataFrame(records)

        if "title" in df.columns:
            # Normalize missing values first so NaN is not exported as "nan".
            df["title"] = df["title"].map(lambda v: _sanitize_title(_as_text(v)))

        # Known columns first, in COLUMN_ORDER; unknown columns after.
        ordered = [c for c in COLUMN_ORDER if c in df.columns]
        extra = [c for c in df.columns if c not in COLUMN_ORDER]
        df = df[ordered + extra]

        df = _drop_duplicate_upcs(df)

        out = self._output_path()
        if self.output_format == "excel":
            df.to_excel(str(out), index=False, engine="openpyxl")
        else:
            try:
                rows = _title_upc_rows(df)
                with open(str(out), "w", newline="", encoding="utf-8-sig") as fh:
                    writer = csv.DictWriter(fh, fieldnames=["Title", "UPC"])
                    writer.writeheader()
                    writer.writerows(rows)
            except Exception:
                # Best effort: keep the export alive, but record why the
                # two-column file could not be produced.
                logger.warning(
                    "Two-column CSV export failed; writing full CSV instead",
                    exc_info=True,
                )
                df.to_csv(str(out), index=False, encoding="utf-8-sig")

        logger.info("Saved %s record(s) → %s", len(df), out)
        return str(out)

    def save_both(self, records: List[Dict[str, Any]]) -> Dict[str, str]:
        """Write *records* as both CSV and Excel under one shared timestamp.

        The files are named ``output_<timestamp>.csv`` / ``.xlsx``; this
        instance's own ``output_format`` and ``output_file`` are not used.

        Returns:
            Mapping of ``"csv"`` / ``"excel"`` to the path written; a format
            is omitted when nothing was saved for it.
        """
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        paths: Dict[str, str] = {}
        for fmt, ext in [("csv", "csv"), ("excel", "xlsx")]:
            mgr = ExportManager(output_format=fmt, output_file=f"output_{ts}.{ext}")
            path = mgr.save(records)
            if path:
                paths[fmt] = path
        return paths