| import logging |
| import re |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional |
|
|
| import pandas as pd |
| import csv |
|
|
# Module-level logger, registered under the fixed name "bot".
logger = logging.getLogger("bot")


# Preferred left-to-right order of known record fields in the exported table.
# Columns present in the data but not listed here are appended after these.
COLUMN_ORDER = [
    "studio_name", "title", "upc", "price", "sku",
    "release_date", "category", "product_url", "scraped_at", "error",
]
|
|
|
|
| def _clean_title(value: Any, studio: str = "") -> str: |
| text = str(value or "").strip() |
| if not text: |
| return "" |
| text = re.sub(r"https?://\S+", "", text, flags=re.IGNORECASE) |
| text = re.sub(r"www\.[^\s,]+", "", text, flags=re.IGNORECASE) |
| |
| text = re.sub(r"\s*-\s*DVD(?:s|'s)?\s*-\s*", " - ", text, flags=re.IGNORECASE) |
| text = re.sub(r"\s*-\s*DVD(?:s|'s)?\s*$", "", text, flags=re.IGNORECASE) |
| |
| text = re.sub(r"\bDVD(?:['']s|s)?\b", "", text, flags=re.IGNORECASE) |
| text = re.sub(r"\s{2,}", " ", text).strip(" -,") |
| |
| s = studio.strip() |
| if s: |
| m = re.search(re.escape(s), text, flags=re.IGNORECASE) |
| if m: |
| text = text[:m.end()].strip(" -,") |
| return text |
|
|
| def _truncate_to_two_segments(title: str) -> str: |
| """Keep only 'Movie Title - Studio', dropping everything after the second segment.""" |
| parts = title.split(" - ") |
| return " - ".join(parts[:2]).strip(" -,") if len(parts) > 2 else title |
|
|
|
|
| |
# Bind _sanitize_title to the _clean_title exported by app.services.exporter
# when that import succeeds; otherwise use the _clean_title defined in this file.
# NOTE(review): catching Exception (not just ImportError) also hides any error
# raised while that module is being imported — confirm this is intended.
try:
    from app.services.exporter import _clean_title as _sanitize_title
except Exception:
    _sanitize_title = _clean_title
|
|
|
|
class ExportManager:
    """Write scraped product records to a CSV or Excel file.

    The Excel export contains every column of the records (known columns
    first, in ``COLUMN_ORDER``). The CSV export is a two-column
    ``Title`` / ``UPC`` file; only if building or writing it fails does the
    CSV fall back to a dump of all columns.

    Args:
        output_format: ``"excel"`` for an ``.xlsx`` file; any other value
            produces CSV. Compared case-insensitively.
        output_file: Explicit output path. When omitted, a timestamped
            ``output_<YYYYmmdd_HHMMSS>.<ext>`` name is generated per save.
    """

    def __init__(self, output_format: str = "csv", output_file: Optional[str] = None):
        self.output_format = output_format.lower()
        self.output_file = output_file

    def _output_path(self) -> Path:
        """Return the configured output path, or a fresh timestamped one."""
        if self.output_file:
            return Path(self.output_file)
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        ext = "xlsx" if self.output_format == "excel" else "csv"
        return Path(f"output_{ts}.{ext}")

    @staticmethod
    def _is_missing(value: Any) -> bool:
        """Return True for the placeholders pandas uses for absent cells."""
        if value is None or value is pd.NA:
            return True
        # NaN is the only float that is not equal to itself.
        return isinstance(value, float) and value != value

    @classmethod
    def _pick(cls, row: Dict[str, Any], *keys: str) -> Any:
        """Return the first present, truthy value among *keys* in *row*, else ''.

        Missing cells (None / NaN / NA) are skipped; NaN is truthy, so a plain
        ``a or b`` chain would otherwise pick it and later render it as "nan".
        """
        for key in keys:
            value = row.get(key)
            if not cls._is_missing(value) and value:
                return value
        return ""

    @classmethod
    def _drop_duplicate_upcs(cls, df: pd.DataFrame) -> pd.DataFrame:
        """Drop rows whose UPC repeats an earlier row's UPC.

        UPCs are compared as stripped text, matching how they are written to
        the CSV. Rows with a missing or blank UPC are never treated as
        duplicates of each other, and the surviving rows keep their original
        relative order. The frame is returned unchanged (same index) when
        there is no ``upc`` column or nothing to drop; otherwise the index is
        reset.
        """
        if "upc" not in df.columns:
            return df
        upc_key = df["upc"].map(
            lambda v: "" if cls._is_missing(v) else str(v).strip()
        )
        repeated = upc_key.ne("") & upc_key.duplicated(keep="first")
        if not repeated.any():
            return df
        return df[~repeated].reset_index(drop=True)

    def _title_upc_rows(self, df: pd.DataFrame) -> List[Dict[str, str]]:
        """Build the ``Title`` / ``UPC`` rows written to the CSV export.

        Each title is sanitized (cut at the row's studio name when one is
        present) and truncated to its first two ' - ' segments. Lower-case
        column names are preferred; capitalised ``Title`` / ``UPC`` are the
        fallback.
        """
        rows: List[Dict[str, str]] = []
        for record in df.to_dict("records"):
            studio = str(self._pick(record, "studio_name")).strip()
            title = _sanitize_title(self._pick(record, "title", "Title"), studio)
            rows.append({
                "Title": _truncate_to_two_segments(title),
                "UPC": str(self._pick(record, "upc", "UPC")).strip(),
            })
        return rows

    def save(self, records: List[Dict[str, Any]]) -> str:
        """Clean, de-duplicate and write *records*; return the output path.

        Args:
            records: One dict per scraped product.

        Returns:
            The path written, as a string, or ``""`` when *records* is empty
            (a warning is logged and nothing is written).
        """
        if not records:
            logger.warning("No records to save")
            return ""

        df = pd.DataFrame(records)

        if "title" in df.columns:
            # Records without a title show up as NaN; clean them as empty text
            # instead of letting them become the string "nan".
            df["title"] = df["title"].apply(
                lambda v: _sanitize_title("" if self._is_missing(v) else v)
            )

        # Known columns first, in the canonical order; unknown ones after.
        cols = [c for c in COLUMN_ORDER if c in df.columns]
        extra = [c for c in df.columns if c not in COLUMN_ORDER]
        df = df[cols + extra]

        rows_before = len(df)
        df = self._drop_duplicate_upcs(df)
        removed = rows_before - len(df)
        if removed:
            logger.info(f"Removed {removed} duplicate UPC(s)")

        out = self._output_path()
        if self.output_format == "excel":
            df.to_excel(str(out), index=False, engine="openpyxl")
        else:
            try:
                rows = self._title_upc_rows(df)
                with open(str(out), "w", newline="", encoding="utf-8-sig") as fh:
                    writer = csv.DictWriter(fh, fieldnames=["Title", "UPC"])
                    writer.writeheader()
                    writer.writerows(rows)
            except Exception:
                # Best-effort fallback: still produce a file, but leave a
                # trace of why the Title/UPC layout could not be written.
                logger.exception(
                    "Title/UPC CSV export failed; writing all columns instead"
                )
                df.to_csv(str(out), index=False, encoding="utf-8-sig")

        logger.info(f"Saved {len(df)} record(s) → {out}")
        return str(out)

    def save_both(self, records: List[Dict[str, Any]]) -> Dict[str, str]:
        """Write *records* as both CSV and Excel under one shared timestamp.

        The files are always named ``output_<timestamp>.csv`` / ``.xlsx``;
        this instance's own ``output_file`` and ``output_format`` are not used.

        Returns:
            A mapping of ``"csv"`` / ``"excel"`` to the path written. A format
            whose save returned no path is omitted.
        """
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        paths: Dict[str, str] = {}
        for fmt, ext in [("csv", "csv"), ("excel", "xlsx")]:
            mgr = ExportManager(output_format=fmt, output_file=f"output_{ts}.{ext}")
            path = mgr.save(records)
            if path:
                paths[fmt] = path
        return paths
|
|