File size: 4,216 Bytes
019d08d
2c7b3a2
019d08d
 
2c7b3a2
019d08d
 
 
 
 
2c7b3a2
019d08d
d939d66
9058528
4332540
03da54f
 
d939d66
03da54f
9058528
978e57b
 
 
 
 
 
 
 
 
9058528
03da54f
2c7b3a2
 
 
019d08d
03da54f
2c7b3a2
 
 
 
 
03da54f
2c7b3a2
 
 
978e57b
 
2c7b3a2
 
03da54f
2c7b3a2
 
 
978e57b
 
2c7b3a2
 
03da54f
2c7b3a2
 
 
978e57b
 
03da54f
 
 
2c7b3a2
 
 
 
978e57b
 
03da54f
 
 
2c7b3a2
 
 
 
 
 
978e57b
 
 
 
2c7b3a2
 
03da54f
2c7b3a2
 
 
 
978e57b
 
 
 
 
 
 
 
 
 
 
2c7b3a2
 
03da54f
2c7b3a2
 
 
978e57b
 
 
2c7b3a2
978e57b
 
 
2c7b3a2
03da54f
4332540
 
03da54f
4332540
 
 
 
 
 
9058528
03da54f
019d08d
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import logging
import pandas as pd
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, JSONResponse
from pathlib import Path

# Configure the root logger at import time so INFO-and-above records are emitted.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Application object; the route handlers below register themselves on it.
app = FastAPI(title="E-Commerce Product Intelligence Platform")

# Location of the product dataset. The path is relative, so it is resolved
# against the working directory of the server process.
LOCAL_CSV_PATH = Path("data/ecommerce_products.csv")


def load_data():
    """Read the product CSV at ``LOCAL_CSV_PATH`` into a DataFrame.

    Returns:
        pandas.DataFrame: The parsed contents of the CSV file.

    Raises:
        FileNotFoundError: If the CSV file does not exist.
        ValueError: If the CSV file exists but is zero bytes long.
    """
    csv_path = LOCAL_CSV_PATH
    if not csv_path.exists():
        raise FileNotFoundError(f"CSV not found: {csv_path}")

    size_in_bytes = csv_path.stat().st_size
    logger.info(f"Loading CSV from: {csv_path} (size: {size_in_bytes} bytes)")

    # The size is logged before this check, so an empty file still leaves a
    # trace in the log before the error is raised.
    if size_in_bytes == 0:
        raise ValueError(f"CSV file is empty: {csv_path}")

    frame = pd.read_csv(csv_path)
    row_count, column_names = len(frame), list(frame.columns)
    logger.info(f"Loaded {row_count} rows, columns: {column_names}")
    return frame


@app.get("/")
def root():
    """Report that the API process is up."""
    status_payload = {"status": "E-Commerce Product Intelligence API is running"}
    return status_payload


@app.get("/data")
def get_data():
    """Return the first 200 rows of the dataset as JSON-safe records.

    Returns:
        list[dict]: One dict per row, keyed by column name. Missing values
        are emitted as ``None`` so they serialize as JSON ``null``.

    Raises:
        FileNotFoundError: Propagated from ``load_data`` when the CSV is absent.
        ValueError: Propagated from ``load_data`` when the CSV is empty.
    """
    df = load_data()
    records = df.head(200).to_dict("records")
    # Blank CSV cells are read by pandas as NaN. NaN is not valid JSON and the
    # default JSON response encoder refuses it, which turned any row with a
    # missing value into a 500 error. Map missing values to None instead.
    return [
        {column: (None if pd.isna(value) else value) for column, value in row.items()}
        for row in records
    ]


@app.get("/stats/categories")
def stats_categories():
    """Return row counts for the ten most frequent ``category`` values.

    Returns:
        dict: Category value mapped to the number of rows carrying it.

    Raises:
        ValueError: If the dataset has no ``category`` column.
    """
    frame = load_data()
    has_category = "category" in frame.columns
    if not has_category:
        raise ValueError("Missing 'category' column")
    counts = frame["category"].value_counts()
    top_ten = counts.head(10)
    return top_ten.to_dict()


@app.get("/stats/brands")
def stats_brands():
    """Return row counts for the ten most frequent ``brand`` values.

    Returns:
        dict: Brand value mapped to the number of rows carrying it.

    Raises:
        ValueError: If the dataset has no ``brand`` column.
    """
    frame = load_data()
    has_brand = "brand" in frame.columns
    if not has_brand:
        raise ValueError("Missing 'brand' column")
    counts = frame["brand"].value_counts()
    top_ten = counts.head(10)
    return top_ten.to_dict()


@app.get("/stats/price")
def stats_price():
    """Summarize the ``price`` column for each category.

    Returns:
        list[dict]: One record per category with the keys ``category``,
        ``mean``, ``median``, ``min``, ``max`` and ``count``. A statistic that
        cannot be computed (for example when every price in the category is
        missing) is ``None`` rather than NaN.

    Raises:
        ValueError: If the dataset lacks a ``category`` or ``price`` column.
    """
    df = load_data()
    if "category" not in df.columns or "price" not in df.columns:
        raise ValueError("Missing 'category' or 'price' column")

    summary = (
        df.groupby("category")["price"]
        .agg(["mean", "median", "min", "max", "count"])
        .reset_index()
    )
    # A category with no usable prices aggregates to NaN, which the JSON
    # response encoder rejects; emit None (JSON null) for those cells.
    return [
        {key: (None if pd.isna(value) else value) for key, value in row.items()}
        for row in summary.to_dict("records")
    ]


@app.get("/stats/rating")
def stats_rating():
    """Summarize the ``rating`` column for each category.

    Returns:
        list[dict]: One record per category with the keys ``category``,
        ``mean``, ``median``, ``min``, ``max`` and ``count``. A statistic that
        cannot be computed (for example when every rating in the category is
        missing) is ``None`` rather than NaN.

    Raises:
        ValueError: If the dataset lacks a ``category`` or ``rating`` column.
    """
    df = load_data()
    if "category" not in df.columns or "rating" not in df.columns:
        raise ValueError("Missing 'category' or 'rating' column")

    summary = (
        df.groupby("category")["rating"]
        .agg(["mean", "median", "min", "max", "count"])
        .reset_index()
    )
    # A category with no usable ratings aggregates to NaN, which the JSON
    # response encoder rejects; emit None (JSON null) for those cells.
    return [
        {key: (None if pd.isna(value) else value) for key, value in row.items()}
        for row in summary.to_dict("records")
    ]


@app.get("/insights")
def insights():
    """Return headline figures for the whole dataset.

    Returns:
        JSONResponse: An object with the keys ``total_products``,
        ``categories``, ``brands``, ``avg_price`` and ``avg_rating``. A figure
        whose source column is absent is reported as ``0``; an average that is
        undefined (the column has no non-missing values) is reported as
        ``null``.

    Raises:
        FileNotFoundError: Propagated from ``load_data`` when the CSV is absent.
        ValueError: Propagated from ``load_data`` when the CSV is empty.
    """
    df = load_data()

    def distinct_count(column):
        """Number of distinct non-missing values, or 0 if the column is absent."""
        return int(df[column].nunique()) if column in df.columns else 0

    def column_mean(column):
        """Mean of the column as a float; 0 if absent, None if undefined."""
        if column not in df.columns:
            return 0
        value = df[column].mean()
        # The mean of an empty or all-missing column is NaN, and JSONResponse
        # raises on NaN. Report it as null instead of failing the request.
        return None if pd.isna(value) else float(value)

    return JSONResponse(content={
        "total_products": len(df),
        "categories": distinct_count("category"),
        "brands": distinct_count("brand"),
        "avg_price": column_mean("price"),
        "avg_rating": column_mean("rating"),
    })


@app.get("/search")
def search(query: str):
    """Return up to 100 rows whose text columns contain *query*.

    The match is a case-insensitive, literal substring test applied to the
    first five object-dtype columns of the dataset; a row is returned when any
    of those columns matches.

    Args:
        query: Text to look for. It is matched literally, so characters such
            as ``(`` or ``*`` have no special meaning.

    Returns:
        list[dict]: Matching rows keyed by column name, with missing values
        emitted as ``None``.
    """
    df = load_data()

    text_cols = df.select_dtypes(include=["object"]).columns.tolist()

    mask = pd.Series(False, index=df.index, dtype=bool)
    for col in text_cols[:5]:  # Only the first five text columns are searched.
        try:
            # regex=False: the query is user input, not a pattern. Treating it
            # as a regex made queries like "(" raise re.error, which the old
            # bare ``except`` swallowed, so the search silently found nothing.
            mask |= df[col].str.contains(query, case=False, na=False, regex=False)
        except (AttributeError, TypeError):
            # Object columns that hold no strings do not support ``.str``;
            # skip them rather than failing the whole search.
            continue

    matches = df[mask].head(100).to_dict("records")
    # NaN is not valid JSON; emit None (JSON null) for missing cells.
    return [
        {column: (None if pd.isna(value) else value) for column, value in row.items()}
        for row in matches
    ]


@app.get("/recommend")
def recommend(category: str):
    """Return up to ten products from *category*, best rated first.

    Args:
        category: Value that the ``category`` column must equal exactly.

    Returns:
        list[dict]: Matching rows keyed by column name, sorted by ``rating``
        in descending order when that column exists, otherwise in file order.
        Missing values are emitted as ``None``.

    Raises:
        ValueError: If the dataset has no ``category`` column.
    """
    df = load_data()
    if "category" not in df.columns:
        raise ValueError("Missing 'category' column")

    subset = df[df["category"] == category]
    if "rating" in df.columns:
        subset = subset.sort_values("rating", ascending=False)

    top_rows = subset.head(10).to_dict("records")
    # Blank cells are NaN after parsing and NaN is not valid JSON; emit None
    # (JSON null) so rows with missing fields do not fail the request.
    return [
        {column: (None if pd.isna(value) else value) for column, value in row.items()}
        for row in top_rows
    ]


@app.post("/run-scraper")
def trigger_scraper():
    """Run ``backend/scraper.py`` in a child process and report the outcome.

    NOTE(review): the script is presumably what downloads the Kaggle dataset
    and writes the CSV; confirm against ``backend/scraper.py``.

    Returns:
        dict: ``{"status": ..., "output": <stdout>}`` when the script exits
        with code 0, otherwise ``{"status": ..., "error": <stderr>}``.
    """
    import subprocess
    import sys

    # Use the interpreter that is running this server so the scraper sees the
    # same virtualenv and installed packages; a bare "python" is resolved via
    # PATH and may be a different interpreter or missing altogether.
    interpreter = sys.executable or "python"
    result = subprocess.run(
        [interpreter, "backend/scraper.py"],
        capture_output=True,
        text=True,
        check=False,  # A non-zero exit is reported in the response, not raised.
    )
    if result.returncode == 0:
        return {"status": "Scraper completed successfully", "output": result.stdout}
    return {"status": "Scraper failed", "error": result.stderr}


# Serve the static frontend when its directory exists; otherwise register a
# placeholder page.
frontend_dir = Path("frontend")
if frontend_dir.exists():
    # The directory argument previously referenced an undefined name
    # (``frontend``), which raised NameError at import time whenever the
    # directory was present.
    app.mount("/", StaticFiles(directory=str(frontend_dir), html=True), name="frontend")
else:
    # NOTE(review): root() is also registered on GET "/" earlier in this file;
    # if routes are matched in registration order this placeholder is never
    # reached. Confirm which handler should own "/".
    @app.get("/")
    def frontend_placeholder():
        """Return a minimal HTML page standing in for the missing frontend."""
        return HTMLResponse(
            content="<h1>E-Commerce Product Intelligence Dashboard</h1><p>Frontend placeholder.</p>"
        )