File size: 5,335 Bytes
5b29309
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import logging
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import pandas as pd
import csv

# Module logger; every message emitted by this file goes through the "bot" logger.
logger = logging.getLogger("bot")

# Preferred left-to-right order of known columns in exported tables.
# Columns missing from the data are skipped, and columns not listed here are
# appended after these (see ExportManager.save).
COLUMN_ORDER = [
    "studio_name", "title", "upc", "price", "sku",
    "release_date", "category", "product_url", "scraped_at", "error",
]


def _clean_title(value: Any, studio: str = "") -> str:
    """Return a product title with URLs, "DVD" markers and trailing promo copy removed.

    Args:
        value: Raw title. ``None``, other falsy values and float NaN (how pandas
            represents a missing cell) all produce an empty string.
        studio: Optional studio name. When it occurs in the cleaned title
            (case-insensitively), everything after its first occurrence is
            dropped. ``None``/NaN are treated as "no studio".

    Returns:
        The cleaned title, or ``""`` when there is nothing to clean.
    """
    # NaN is truthy, so without this guard ``str(value or "")`` would yield the
    # literal title "nan" for every missing cell.
    if isinstance(value, float) and value != value:
        return ""
    text = str(value or "").strip()
    if not text:
        return ""

    # Drop embedded links, with or without a scheme.
    text = re.sub(r"https?://\S+", "", text, flags=re.IGNORECASE)
    text = re.sub(r"www\.[^\s,]+", "", text, flags=re.IGNORECASE)

    # "DVD", "DVDs", "DVD's" and the typographic "DVD\u2019s" are all the same marker.
    # As a separator between segments ("A - DVD - B") it collapses to " - "...
    text = re.sub(r"\s*-\s*DVD(?:['\u2019]?s)?\s*-\s*", " - ", text, flags=re.IGNORECASE)
    # ...as a trailing segment ("A - DVD") it is removed entirely...
    text = re.sub(r"\s*-\s*DVD(?:['\u2019]?s)?\s*$", "", text, flags=re.IGNORECASE)
    # ...and any remaining standalone token is deleted.
    text = re.sub(r"\bDVD(?:['\u2019]?s)?\b", "", text, flags=re.IGNORECASE)

    # Tidy the whitespace and dangling punctuation left behind by the removals.
    text = re.sub(r"\s{2,}", " ", text).strip(" -,")

    # Strip everything after the studio name (marketing copy usually follows it).
    if isinstance(studio, float) and studio != studio:
        studio = ""
    needle = str(studio or "").strip()
    if needle:
        found = re.search(re.escape(needle), text, flags=re.IGNORECASE)
        if found:
            text = text[:found.end()].strip(" -,")
    return text

def _truncate_to_two_segments(title: str) -> str:
    """Reduce a ' - '-separated title to its first two segments ('Movie Title - Studio').

    Titles that already have two segments or fewer are returned untouched.
    """
    # Two splits are enough to learn whether a third segment exists.
    segments = title.split(" - ", 2)
    if len(segments) < 3:
        return title
    leading_two = segments[0] + " - " + segments[1]
    return leading_two.strip(" -,")


# Prefer the sanitizer from app/services/exporter.py when it can be imported;
# otherwise bind _sanitize_title to the local _clean_title defined in this module.
# NOTE(review): ExportManager.save calls _sanitize_title(title, studio) with two
# positional arguments — confirm the imported function accepts that signature.
try:
    from app.services.exporter import _clean_title as _sanitize_title
except Exception:
    # Any error raised while importing (not only ImportError) selects the fallback.
    _sanitize_title = _clean_title


class ExportManager:
    """Write scraped product records to a CSV or Excel file.

    CSV exports are reduced to two columns (``Title``, ``UPC``); Excel exports
    keep every column, ordered according to ``COLUMN_ORDER``.
    """

    def __init__(self, output_format: str = "csv", output_file: Optional[str] = None):
        """Store the export settings.

        Args:
            output_format: ``"excel"`` selects an .xlsx export; any other value
                is treated as CSV. Compared case-insensitively.
            output_file: Explicit destination path. When omitted, a timestamped
                ``output_<YYYYmmdd_HHMMSS>.<ext>`` name is generated per save.
        """
        self.output_format = output_format.lower()
        self.output_file = output_file

    def _output_path(self) -> Path:
        """Return the configured destination, or a timestamped default name."""
        if self.output_file:
            return Path(self.output_file)
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        ext = "xlsx" if self.output_format == "excel" else "csv"
        return Path(f"output_{ts}.{ext}")

    @staticmethod
    def _as_text(value: Any) -> str:
        """Convert a cell value to stripped text, mapping None/NaN/NA to ''.

        Missing cells must not be passed through ``str()``: NaN is truthy and
        would otherwise surface in the export as the literal text "nan".
        """
        if value is None:
            return ""
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return ""
        return str(value).strip()

    @staticmethod
    def _dedupe_by_upc(df: pd.DataFrame) -> pd.DataFrame:
        """Drop rows whose UPC repeats an earlier row's UPC.

        UPCs are compared as stripped text. Rows with a blank or missing UPC
        are never treated as duplicates of each other, and the original row
        order is preserved. The input frame is returned unchanged when there
        is nothing to remove.
        """
        keys = df["upc"].map(ExportManager._as_text)
        has_upc = keys != ""
        # duplicated() also flags repeated blanks; the has_upc mask keeps those rows.
        repeated = has_upc & keys.duplicated(keep="first")
        if not repeated.any():
            return df
        return df[~repeated].reset_index(drop=True)

    def _title_upc_rows(self, df: pd.DataFrame) -> List[Dict[str, str]]:
        """Build the ``{"Title", "UPC"}`` rows written by the CSV export.

        Lower-case column names are preferred, with ``Title``/``UPC`` as
        fallbacks. The studio name, when present, is handed to the sanitizer
        so that promo copy following it is stripped from the title.
        """
        rows: List[Dict[str, str]] = []
        for _, rec in df.iterrows():
            studio = self._as_text(rec.get("studio_name"))
            raw_title = self._as_text(rec.get("title")) or self._as_text(rec.get("Title"))
            title = _truncate_to_two_segments(_sanitize_title(raw_title, studio))
            upc = self._as_text(rec.get("upc")) or self._as_text(rec.get("UPC"))
            rows.append({"Title": title, "UPC": upc})
        return rows

    def save(self, records: List[Dict[str, Any]]) -> str:
        """Clean, de-duplicate and write ``records`` to the output file.

        Args:
            records: One dict per scraped product.

        Returns:
            The path written, as a string, or ``""`` when ``records`` is empty
            (nothing is written in that case).

        Raises:
            Whatever pandas raises if the Excel export or the CSV fallback
            cannot write the file.
        """
        if not records:
            logger.warning("No records to save")
            return ""

        df = pd.DataFrame(records)

        if "title" in df.columns:
            # Normalise missing titles to "" first so they are not turned into "nan".
            df["title"] = df["title"].map(
                lambda raw: _sanitize_title(self._as_text(raw))
            )

        # Known columns first, in COLUMN_ORDER; anything else keeps its relative order.
        known = [c for c in COLUMN_ORDER if c in df.columns]
        extra = [c for c in df.columns if c not in COLUMN_ORDER]
        df = df[known + extra]

        # Deduplicate by UPC (keep first, ignore blanks).
        if "upc" in df.columns:
            before = len(df)
            df = self._dedupe_by_upc(df)
            removed = before - len(df)
            if removed:
                logger.info("Removed %d duplicate UPC(s)", removed)

        out = self._output_path()
        if self.output_format == "excel":
            df.to_excel(str(out), index=False, engine="openpyxl")
        else:
            # CSV exports are a sanitized two-column file (Title, UPC).
            try:
                rows = self._title_upc_rows(df)
                with open(str(out), "w", newline="", encoding="utf-8-sig") as fh:
                    writer = csv.DictWriter(fh, fieldnames=["Title", "UPC"])
                    writer.writeheader()
                    writer.writerows(rows)
            except Exception:
                # Best effort: still produce a file, but record why the
                # two-column export could not be written.
                logger.exception(
                    "Two-column CSV export failed; writing the full table to %s instead",
                    out,
                )
                df.to_csv(str(out), index=False, encoding="utf-8-sig")

        logger.info("Saved %d record(s) → %s", len(df), out)
        return str(out)

    def save_both(self, records: List[Dict[str, Any]]) -> Dict[str, str]:
        """Export ``records`` as both CSV and Excel under one shared timestamp.

        The files are always named ``output_<timestamp>.csv`` / ``.xlsx``; this
        instance's own ``output_format`` and ``output_file`` are not used.

        Returns:
            A mapping of ``"csv"`` / ``"excel"`` to the path written. Formats for
            which nothing was written (empty ``records``) are omitted.
        """
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        paths: Dict[str, str] = {}
        for fmt, ext in [("csv", "csv"), ("excel", "xlsx")]:
            mgr = ExportManager(output_format=fmt, output_file=f"output_{ts}.{ext}")
            path = mgr.save(records)
            if path:
                paths[fmt] = path
        return paths