Spaces:
Sleeping
Sleeping
| import logging | |
| import re | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional | |
| import pandas as pd | |
| import csv | |
# Module logger; the name "bot" is fixed here rather than derived from
# the module name.
logger = logging.getLogger("bot")

# Preferred left-to-right order of the known record fields in exported
# tables. ExportManager.save places these first (when present) and appends
# any other columns after them.
COLUMN_ORDER = [
    "studio_name",
    "title",
    "upc",
    "price",
    "sku",
    "release_date",
    "category",
    "product_url",
    "scraped_at",
    "error",
]
| def _clean_title(value: Any, studio: str = "") -> str: | |
| text = str(value or "").strip() | |
| if not text: | |
| return "" | |
| text = re.sub(r"https?://\S+", "", text, flags=re.IGNORECASE) | |
| text = re.sub(r"www\.[^\s,]+", "", text, flags=re.IGNORECASE) | |
| # Remove "- DVD -" and variants (DVDs, DVD's) used as separators | |
| text = re.sub(r"\s*-\s*DVD(?:s|'s)?\s*-\s*", " - ", text, flags=re.IGNORECASE) | |
| text = re.sub(r"\s*-\s*DVD(?:s|'s)?\s*$", "", text, flags=re.IGNORECASE) | |
| # Remove remaining standalone DVD tokens | |
| text = re.sub(r"\bDVD(?:['']s|s)?\b", "", text, flags=re.IGNORECASE) | |
| text = re.sub(r"\s{2,}", " ", text).strip(" -,") | |
| # Strip everything after the studio name | |
| s = studio.strip() | |
| if s: | |
| m = re.search(re.escape(s), text, flags=re.IGNORECASE) | |
| if m: | |
| text = text[:m.end()].strip(" -,") | |
| return text | |
| def _truncate_to_two_segments(title: str) -> str: | |
| """Keep only 'Movie Title - Studio', dropping everything after the second segment.""" | |
| parts = title.split(" - ") | |
| return " - ".join(parts[:2]).strip(" -,") if len(parts) > 2 else title | |
# Prefer the sanitizer from app/services/exporter.py (described by the
# original author as the more robust one). If that module cannot be
# imported for any reason, use the local _clean_title instead.
try:
    from app.services.exporter import _clean_title as _sanitize_title
except Exception:
    # Deliberately best-effort: the export must still work without the
    # optional module. Record the reason at debug level so it is possible
    # to tell which sanitizer is active.
    logger.debug(
        "app.services.exporter is unavailable; using the local _clean_title",
        exc_info=True,
    )
    _sanitize_title = _clean_title
class ExportManager:
    """Write a list of record dicts to a CSV or Excel file.

    ``output_format`` is stored lower-cased; the value ``"excel"`` selects
    the Excel writer and every other value is treated as CSV.
    ``output_file`` is an explicit destination path; when it is empty a
    timestamped ``output_<YYYYmmdd_HHMMSS>.<ext>`` name is generated.
    """

    def __init__(self, output_format: str = "csv", output_file: Optional[str] = None):
        self.output_format = output_format.lower()
        self.output_file = output_file

    def _output_path(self) -> Path:
        """Return the destination path, generating a timestamped name if needed."""
        if self.output_file:
            return Path(self.output_file)
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        ext = "xlsx" if self.output_format == "excel" else "csv"
        return Path(f"output_{ts}.{ext}")

    @staticmethod
    def _cell_text(value: Any) -> str:
        """Return a cell value as stripped text; missing values become ''.

        pandas represents absent cells as NaN/None/NA. NaN is truthy, so a
        plain ``str(value or "")`` would emit the literal text "nan".
        """
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return ""
        return str(value).strip()

    @staticmethod
    def _dedupe_by_upc(df: pd.DataFrame) -> pd.DataFrame:
        """Drop rows whose UPC repeats an earlier row, keeping the first.

        UPCs are compared as stripped text. Rows with a blank or missing
        UPC are never treated as duplicates and are always kept. The
        surviving rows keep their original relative order; the index is
        reset only when at least one row was removed.
        """
        upc = df["upc"].map(ExportManager._cell_text)
        has_upc = upc != ""
        # duplicated() also flags repeated blanks, so mask with has_upc.
        repeated = has_upc & upc.duplicated(keep="first")
        if repeated.any():
            df = df[~repeated].reset_index(drop=True)
        return df

    def _title_upc_rows(self, df: pd.DataFrame) -> List[Dict[str, str]]:
        """Build the ``{"Title", "UPC"}`` rows for the two-column CSV.

        When a ``title`` column exists the studio name is passed to the
        sanitizer so trailing promo copy is stripped; otherwise the
        ``Title``/``UPC`` columns are used directly.
        """
        text = self._cell_text
        has_title = "title" in df.columns
        rows: List[Dict[str, str]] = []
        # to_dict("records") keeps each column's own dtype; iterrows() would
        # upcast mixed numeric rows (e.g. an integer UPC becoming a float).
        for rec in df.to_dict("records"):
            if has_title:
                raw_title = text(rec.get("title")) or text(rec.get("Title"))
                title = _sanitize_title(raw_title, text(rec.get("studio_name")))
                upc = text(rec.get("upc")) or text(rec.get("UPC"))
            else:
                title = _sanitize_title(text(rec.get("Title")))
                upc = text(rec.get("UPC"))
            rows.append({"Title": _truncate_to_two_segments(title), "UPC": upc})
        return rows

    def save(self, records: List[Dict[str, Any]]) -> str:
        """Sanitize, de-duplicate and write ``records``.

        Excel output contains every column (known columns first, in
        ``COLUMN_ORDER``). CSV output is a two-column ``Title``/``UPC``
        file; if building or writing it fails, the full table is written
        as CSV instead.

        Args:
            records: One dict per scraped product.

        Returns:
            The path written, as a string, or ``""`` when ``records`` is
            empty (nothing is written in that case).
        """
        if not records:
            logger.warning("No records to save")
            return ""

        df = pd.DataFrame(records)
        if "title" in df.columns:
            # Normalise missing cells first so they become "" rather than "nan".
            df["title"] = df["title"].map(
                lambda cell: _sanitize_title(self._cell_text(cell))
            )

        # Known columns first, in the canonical order; unknown ones after.
        known = [c for c in COLUMN_ORDER if c in df.columns]
        extra = [c for c in df.columns if c not in COLUMN_ORDER]
        df = df[known + extra]

        if "upc" in df.columns:
            before = len(df)
            df = self._dedupe_by_upc(df)
            removed = before - len(df)
            if removed:
                logger.info("Removed %d duplicate UPC(s)", removed)

        out = self._output_path()
        if self.output_format == "excel":
            df.to_excel(str(out), index=False, engine="openpyxl")
        else:
            try:
                rows = self._title_upc_rows(df)
                with open(str(out), "w", newline="", encoding="utf-8-sig") as fh:
                    writer = csv.DictWriter(fh, fieldnames=["Title", "UPC"])
                    writer.writeheader()
                    writer.writerows(rows)
            except Exception:
                # Best-effort fallback, but leave a trace of why it happened.
                logger.warning(
                    "Two-column CSV export failed; writing the full table to %s",
                    out,
                    exc_info=True,
                )
                df.to_csv(str(out), index=False, encoding="utf-8-sig")

        logger.info("Saved %d record(s) → %s", len(df), out)
        return str(out)

    def save_both(self, records: List[Dict[str, Any]]) -> Dict[str, str]:
        """Write ``records`` as both CSV and Excel, sharing one timestamp.

        This instance's own ``output_format`` and ``output_file`` are not
        used; the files are always named ``output_<timestamp>.csv`` and
        ``output_<timestamp>.xlsx``.

        Returns:
            A mapping from ``"csv"`` / ``"excel"`` to the written path.
            A format is omitted when its save returned an empty path.
        """
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        paths: Dict[str, str] = {}
        for fmt, ext in [("csv", "csv"), ("excel", "xlsx")]:
            mgr = ExportManager(output_format=fmt, output_file=f"output_{ts}.{ext}")
            path = mgr.save(records)
            if path:
                paths[fmt] = path
        return paths