# File size: 5,335 Bytes
# 5b29309
import logging
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
import pandas as pd
import csv
# Module-wide logger. The name is the fixed string "bot", not this module's
# __name__, so records go to whatever handlers are configured for "bot".
logger = logging.getLogger("bot")

# Column names that ExportManager.save() moves to the front of the exported
# frame, in this order. Names absent from the data are skipped, and columns
# not listed here are appended after these in their existing order.
COLUMN_ORDER = [
    "studio_name", "title", "upc", "price", "sku",
    "release_date", "category", "product_url", "scraped_at", "error",
]
def _clean_title(value: Any, studio: str = "") -> str:
    """Return *value* as a display title with URLs and DVD markers removed.

    Args:
        value: Raw title. ``None``, float NaN (how pandas represents a
            missing cell) and any other falsy value produce ``""``.
        studio: Optional studio name. When it occurs in the cleaned title
            (case-insensitive), everything after its first occurrence is
            dropped. ``None`` or blank disables this step.

    Returns:
        The cleaned title, possibly empty.
    """
    # NaN is truthy, so `value or ""` alone would stringify it to "nan".
    if value is None or (isinstance(value, float) and value != value):
        return ""
    text = str(value or "").strip()
    if not text:
        return ""

    # Drop embedded links (with or without a scheme).
    text = re.sub(r"https?://\S+", "", text, flags=re.IGNORECASE)
    text = re.sub(r"www\.[^\s,]+", "", text, flags=re.IGNORECASE)

    # "- DVD -" (also DVDs / DVD's / DVD’s) used as a separator collapses to
    # a single " - "; a trailing "- DVD" is removed outright.
    text = re.sub(
        r"\s*-\s*DVD(?:s|['\u2019]s)?\s*-\s*", " - ", text, flags=re.IGNORECASE
    )
    text = re.sub(r"\s*-\s*DVD(?:s|['\u2019]s)?\s*$", "", text, flags=re.IGNORECASE)

    # Remove any remaining standalone DVD token. Both the ASCII and the
    # typographic apostrophe are accepted so "DVD’s" does not leave "’s".
    text = re.sub(r"\bDVD(?:['\u2019]s|s)?\b", "", text, flags=re.IGNORECASE)

    # Collapse the gaps left by the removals and trim separator characters.
    text = re.sub(r"\s{2,}", " ", text).strip(" -,")

    # Strip everything after the studio name.
    wanted = str(studio or "").strip()
    if wanted:
        found = re.search(re.escape(wanted), text, flags=re.IGNORECASE)
        if found:
            text = text[:found.end()].strip(" -,")
    return text
def _truncate_to_two_segments(title: str) -> str:
    """Reduce a ' - '-delimited title to its first two segments.

    Titles with one or two segments are returned exactly as given. Longer
    titles are cut after the second segment and the result is trimmed of
    surrounding spaces, dashes and commas.
    """
    # With maxsplit=2 a third element appears exactly when the title has
    # more than two segments; the first two elements are unaffected.
    pieces = title.split(" - ", 2)
    if len(pieces) <= 2:
        return title
    first, second = pieces[0], pieces[1]
    return f"{first} - {second}".strip(" -,")
# Prefer the sanitizer from app/services/exporter.py when that package can be
# imported; otherwise use the _clean_title defined in this module.
# NOTE(review): ExportManager.save() calls _sanitize_title(value, studio), so
# the imported function must accept a second positional argument — confirm.
try:
    from app.services.exporter import _clean_title as _sanitize_title
except Exception:
    # Deliberately broad: any import-time failure degrades to the local
    # implementation instead of breaking exports. The failure is recorded at
    # debug level so the sanitizer actually in use can be identified.
    logging.getLogger("bot").debug(
        "app.services.exporter._clean_title unavailable; using local _clean_title",
        exc_info=True,
    )
    _sanitize_title = _clean_title
class ExportManager:
    """Write scraped product records to a CSV or Excel file.

    Excel exports contain every column. CSV exports contain two columns,
    ``Title`` and ``UPC``, and fall back to an all-column CSV if building or
    writing the two-column form fails.
    """

    def __init__(self, output_format: str = "csv", output_file: Optional[str] = None):
        """Store the export settings.

        Args:
            output_format: ``"excel"`` (case-insensitive) selects an .xlsx
                export; any other value is treated as CSV.
            output_file: Destination path. When omitted, a timestamped
                ``output_<YYYYmmdd_HHMMSS>.<ext>`` name is generated at save
                time.
        """
        self.output_format = output_format.lower()
        self.output_file = output_file

    def _output_path(self) -> Path:
        """Return the configured path, or a timestamped default name."""
        if self.output_file:
            return Path(self.output_file)
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        ext = "xlsx" if self.output_format == "excel" else "csv"
        return Path(f"output_{ts}.{ext}")

    @staticmethod
    def _is_missing(value: Any) -> bool:
        """Return True for None/NaN/NA scalars, i.e. an absent cell."""
        # is_scalar guards pd.isna, which returns an array for list-likes.
        return bool(pd.api.types.is_scalar(value) and pd.isna(value))

    @staticmethod
    def _cell_text(value: Any) -> str:
        """Return a cell as stripped text, with missing values as ''."""
        return "" if ExportManager._is_missing(value) else str(value).strip()

    @staticmethod
    def _first_value(record: Dict[str, Any], *keys: str) -> Any:
        """Return the first present, truthy value among *keys*, else ''."""
        for key in keys:
            value = record.get(key)
            # NaN is truthy, so it must be excluded explicitly.
            if not ExportManager._is_missing(value) and value:
                return value
        return ""

    @staticmethod
    def _dedupe_by_upc(df: pd.DataFrame) -> tuple:
        """Drop rows whose UPC repeats an earlier row's UPC.

        UPCs are compared as stripped text, so non-string values are accepted
        and surrounding whitespace does not defeat the comparison. Rows with
        a blank or missing UPC are always kept. Row order is preserved.

        Returns:
            ``(frame, removed)`` where *removed* is the number of rows
            dropped. The index is reset only when something was removed.
        """
        keys = df["upc"].map(ExportManager._cell_text)
        has_upc = keys != ""
        # Blank keys also register as duplicates of each other; mask them out.
        is_dupe = has_upc & keys.duplicated(keep="first")
        removed = int(is_dupe.sum())
        if removed:
            df = df[~is_dupe].reset_index(drop=True)
        return df, removed

    def _csv_rows(self, df: pd.DataFrame) -> List[Dict[str, str]]:
        """Build the ``Title``/``UPC`` rows for the two-column CSV."""
        # The studio name is only passed to the sanitizer when a lowercase
        # "title" column exists; frames that only carry "Title" are
        # sanitized without it.
        pass_studio = "title" in df.columns
        rows: List[Dict[str, str]] = []
        for record in df.to_dict("records"):
            raw_title = self._first_value(record, "title", "Title")
            if pass_studio:
                # A missing studio must become "", not the text "nan", or it
                # would match inside titles such as "Banana".
                studio = self._cell_text(record.get("studio_name"))
                title = _sanitize_title(raw_title, studio)
            else:
                title = _sanitize_title(raw_title)
            rows.append({
                "Title": _truncate_to_two_segments(title),
                "UPC": self._cell_text(self._first_value(record, "upc", "UPC")),
            })
        return rows

    def save(self, records: List[Dict[str, Any]]) -> str:
        """Export *records* and return the path written.

        Titles are sanitized, known columns are moved to the front in
        COLUMN_ORDER order, and rows repeating an earlier UPC are dropped.

        Args:
            records: One dict per product.

        Returns:
            The output path as a string, or ``""`` when *records* is empty
            (nothing is written in that case).
        """
        if not records:
            logger.warning("No records to save")
            return ""

        df = pd.DataFrame(records)
        if "title" in df.columns:
            # Records lacking a title surface as NaN; hand the sanitizer ""
            # so they do not become the literal title "nan".
            df["title"] = df["title"].apply(
                lambda v: _sanitize_title("" if self._is_missing(v) else v)
            )

        # Reorder columns: known ones first, everything else after.
        known = [c for c in COLUMN_ORDER if c in df.columns]
        extra = [c for c in df.columns if c not in COLUMN_ORDER]
        df = df[known + extra]

        # Deduplicate by UPC (keep first, ignore blanks).
        if "upc" in df.columns:
            df, removed = self._dedupe_by_upc(df)
            if removed:
                logger.info("Removed %d duplicate UPC(s)", removed)

        out = self._output_path()
        if self.output_format == "excel":
            df.to_excel(str(out), index=False, engine="openpyxl")
        else:
            # For CSV exports, produce a sanitized two-column CSV (Title, UPC).
            try:
                rows = self._csv_rows(df)
                with open(str(out), "w", newline="", encoding="utf-8-sig") as fh:
                    writer = csv.DictWriter(fh, fieldnames=["Title", "UPC"])
                    writer.writeheader()
                    writer.writerows(rows)
            except Exception:
                # Best effort: keep the export alive with the all-column
                # layout, but record why the two-column form was abandoned.
                logger.warning(
                    "Two-column CSV export to %s failed; writing all columns instead",
                    out,
                    exc_info=True,
                )
                df.to_csv(str(out), index=False, encoding="utf-8-sig")

        logger.info("Saved %d record(s) → %s", len(df), out)
        return str(out)

    def save_both(self, records: List[Dict[str, Any]]) -> Dict[str, str]:
        """Export *records* as both CSV and Excel under one shared timestamp.

        Each format is written by a fresh ExportManager using a generated
        ``output_<timestamp>.<ext>`` name; this instance's ``output_file`` is
        not used.

        Returns:
            A mapping of ``"csv"`` / ``"excel"`` to the path written. Formats
            for which save() returned ``""`` are omitted.
        """
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        paths: Dict[str, str] = {}
        for fmt, ext in [("csv", "csv"), ("excel", "xlsx")]:
            mgr = ExportManager(output_format=fmt, output_file=f"output_{ts}.{ext}")
            path = mgr.save(records)
            if path:
                paths[fmt] = path
        return paths
|