File size: 6,066 Bytes
6242ddb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
"""File processing service: handles CSV, JSON, Excel, ZIP with chunked uploads."""

from __future__ import annotations

import io
import json
import uuid
import zipfile
from pathlib import Path
from typing import List

import pandas as pd

from app.core.config import settings
from app.core.logging import get_logger
from app.models.schemas import FeedbackEntry

# Module-level logger obtained from the project's logging helper.
logger = get_logger(__name__)

# File extensions accepted by parse_file; the suffix is lower-cased before the
# membership test, so entries here must be lower-case and include the dot.
SUPPORTED_EXTENSIONS = {".csv", ".json", ".xlsx", ".xls", ".zip"}

# Column names that may hold the feedback text. Order matters: _find_column
# returns the first exact match in list order before falling back to a
# case-insensitive substring match against the frame's columns.
TEXT_COLUMN_CANDIDATES = [
    "text", "content", "message", "body", "feedback", "review",
    "comment", "description", "note", "summary", "title",
    "Text", "Content", "Message", "Body", "Feedback", "Review",
]

# Column names that may hold the entry timestamp (same matching rules as above).
TIMESTAMP_COLUMN_CANDIDATES = [
    "timestamp", "date", "created_at", "created", "time", "datetime",
    "Timestamp", "Date", "Created", "CreatedAt",
]

# Column names that may hold a per-row source/channel label; when such a column
# is present and non-null it overrides the caller-supplied source.
SOURCE_COLUMN_CANDIDATES = [
    "source", "channel", "platform", "origin", "category", "type",
    "Source", "Channel", "Platform",
]


def _find_column(df: pd.DataFrame, candidates: list[str]) -> str | None:
    """Return the label of the first column in *df* matching *candidates*.

    Matching runs in two passes:

    1. Exact match, honouring the priority order of *candidates*.
    2. Case-insensitive substring match, honouring the column order of *df*
       (so a column named ``"Review Text"`` matches the candidate ``"text"``).

    Column labels are not guaranteed to be strings (header-less or
    JSON-derived frames can carry integer labels), so each label is passed
    through ``str()`` before the case-insensitive comparison instead of
    calling ``.lower()`` on it directly.

    Args:
        df: Frame whose column labels are searched.
        candidates: Candidate column names, most preferred first.

    Returns:
        The matching label exactly as it appears in ``df.columns``, or
        ``None`` when no column matches.
    """
    # Pass 1: exact label match, in candidate priority order.
    for candidate in candidates:
        if candidate in df.columns:
            return candidate

    # Pass 2: case-insensitive substring match, in column order.
    lowered_candidates = [candidate.lower() for candidate in candidates]
    for col in df.columns:
        col_lower = str(col).lower()
        if any(candidate in col_lower for candidate in lowered_candidates):
            return col
    return None


def _is_missing(value: object) -> bool:
    """Return True when *value* is a scalar null (None, NaN, NaT, pd.NA).

    Containers such as lists or dicts (common in JSON-derived frames) are
    never treated as missing: ``pd.isna`` on a list returns an array, whose
    truth value is ambiguous and would raise inside an ``if``.
    """
    return bool(pd.api.types.is_scalar(value) and pd.isna(value))


def _df_to_entries(df: pd.DataFrame, source: str | None = None) -> list[FeedbackEntry]:
    """Convert each usable row of *df* into a ``FeedbackEntry``.

    The text, timestamp and source columns are located with ``_find_column``.
    If no text column is recognised and the frame has exactly one column, that
    column is used as the text. All remaining columns are collected into the
    entry's ``metadata``.

    Rows whose text cell is null or blank after stripping are skipped. Null
    detection is done on the raw cell, so ``None`` is not turned into the
    literal text ``"None"``.

    Args:
        df: Parsed tabular data.
        source: Default source label, used when the row has no source column
            or its source cell is null.

    Returns:
        One entry per row that has non-empty text, in row order.

    Raises:
        ValueError: If no text column can be identified.
    """
    text_col = _find_column(df, TEXT_COLUMN_CANDIDATES)
    if text_col is None:
        if len(df.columns) == 1:
            text_col = df.columns[0]
        else:
            raise ValueError(
                f"No text column found. Expected one of: {TEXT_COLUMN_CANDIDATES}. "
                f"Found columns: {list(df.columns)}"
            )

    ts_col = _find_column(df, TIMESTAMP_COLUMN_CANDIDATES)
    src_col = _find_column(df, SOURCE_COLUMN_CANDIDATES)

    role_cols = {text_col, ts_col, src_col}
    other_cols = [c for c in df.columns if c not in role_cols]

    entries = []
    for _, row in df.iterrows():
        raw_text = row[text_col]
        if _is_missing(raw_text):
            continue
        text = str(raw_text).strip()
        if not text:
            continue

        ts = None
        if ts_col is not None:
            raw_ts = row[ts_col]
            # Only scalar cells are parsed; a list would yield an index, not a timestamp.
            if pd.api.types.is_scalar(raw_ts) and not _is_missing(raw_ts):
                try:
                    parsed = pd.to_datetime(raw_ts)
                    # Some inputs (e.g. "") parse to NaT; treat that as "no timestamp".
                    ts = None if pd.isna(parsed) else parsed
                except Exception:
                    # Deliberately best-effort: an unparseable timestamp must
                    # not cause the whole row to be dropped.
                    ts = None

        src = source
        if src_col is not None:
            raw_src = row[src_col]
            if not _is_missing(raw_src):
                src = str(raw_src)

        metadata = {}
        for col in other_cols:
            val = row[col]
            if _is_missing(val):
                continue
            # Plain numbers/booleans are kept as-is; everything else is stringified.
            metadata[col] = val if isinstance(val, (int, float, bool)) else str(val)

        entries.append(
            FeedbackEntry(
                id=uuid.uuid4().hex[:12],
                text=text,
                source=src,
                timestamp=ts,
                metadata=metadata if metadata else None,
            )
        )

    return entries


def parse_csv(content: bytes, source: str | None = None) -> list[FeedbackEntry]:
    """Parse CSV bytes into feedback entries, trying several text encodings.

    Encodings are tried from strictest to most permissive. ``latin-1`` maps
    every possible byte, so it can never fail and must come last; placing it
    before ``cp1252`` would make the ``cp1252`` attempt unreachable and turn
    Windows "smart" punctuation into control characters.

    Args:
        content: Raw CSV file bytes.
        source: Default source label forwarded to the row converter.

    Returns:
        Entries extracted from the CSV rows.

    Raises:
        ValueError: If none of the encodings can decode the file, or if no
            text column can be identified.
    """
    for encoding in ("utf-8", "cp1252", "latin-1"):
        try:
            # Only the decode/parse step is guarded, so errors raised while
            # converting rows are not mistaken for encoding problems.
            df = pd.read_csv(io.BytesIO(content), encoding=encoding)
        except UnicodeDecodeError:
            continue
        break
    else:
        # Safety net; not expected to trigger while latin-1 is in the list.
        raise ValueError("Unable to decode CSV file with supported encodings")

    return _df_to_entries(df, source)


def parse_json(content: bytes, source: str | None = None) -> list[FeedbackEntry]:
    """Parse JSON bytes into feedback entries.

    Accepted shapes:

    * a list of strings, where each non-blank string becomes one entry;
    * a list of objects, converted via a DataFrame;
    * an object wrapping the records under ``"data"``, ``"entries"`` or
      ``"results"`` (checked in that order); the wrapped value is handled
      like a top-level value, so a wrapped list of strings also works;
    * any other object, treated as a single record.

    The payload is decoded as ``utf-8-sig`` so a leading UTF-8 byte-order mark
    does not make ``json.loads`` fail. Text from string lists is stripped, to
    match the DataFrame path.

    Args:
        content: Raw JSON file bytes (UTF-8, with or without BOM).
        source: Default source label for the produced entries.

    Returns:
        Entries extracted from the JSON payload.

    Raises:
        ValueError: If the JSON is invalid, the top-level value is neither a
            list nor an object, or no text column can be identified.
    """
    data = json.loads(content.decode("utf-8-sig"))

    if isinstance(data, dict):
        envelope_key = next(
            (key for key in ("data", "entries", "results") if key in data),
            None,
        )
        # Without a known envelope key the object itself is the single record.
        payload = data[envelope_key] if envelope_key is not None else [data]
    elif isinstance(data, list):
        payload = data
    else:
        raise ValueError("Unsupported JSON structure")

    if isinstance(payload, list) and all(isinstance(item, str) for item in payload):
        stripped = (item.strip() for item in payload)
        return [
            FeedbackEntry(id=uuid.uuid4().hex[:12], text=text, source=source)
            for text in stripped
            if text
        ]

    return _df_to_entries(pd.DataFrame(payload), source)


def parse_excel(content: bytes, source: str | None = None) -> list[FeedbackEntry]:
    """Parse the first sheet of an Excel workbook into feedback entries.

    The reader engine is intentionally left unspecified so pandas selects it
    from the file content. Forcing ``openpyxl`` made every legacy ``.xls``
    upload fail, even though ``.xls`` is routed to this parser.

    Args:
        content: Raw workbook bytes (``.xlsx`` or ``.xls``).
        source: Default source label forwarded to the row converter.

    Returns:
        Entries extracted from the sheet rows.

    Raises:
        ValueError: If the workbook format cannot be determined or no text
            column can be identified.
        ImportError: If the engine pandas selects for the detected format is
            not installed.
    """
    df = pd.read_excel(io.BytesIO(content))
    return _df_to_entries(df, source)


def parse_zip(content: bytes, source: str | None = None) -> list[FeedbackEntry]:
    """Parse every supported file inside a ZIP archive into feedback entries.

    Directory entries, macOS resource-fork folders (``__MACOSX``) and hidden
    files at any depth are skipped. Members with an unsupported extension are
    logged and skipped without being decompressed. A failure while reading or
    parsing one member is logged and does not abort the rest of the archive.

    Args:
        content: Raw ZIP archive bytes.
        source: Source label applied to all entries; when falsy, each
            member's file stem is used instead.

    Returns:
        Entries from all successfully parsed members, in archive order.

    Raises:
        zipfile.BadZipFile: If *content* is not a valid ZIP archive.
    """
    parseable = {".csv", ".json", ".xlsx", ".xls"}
    all_entries = []

    # NOTE(review): members are decompressed fully into memory with no size
    # cap; if archives come from untrusted users, consider checking
    # ZipInfo.file_size against a limit before reading.
    with zipfile.ZipFile(io.BytesIO(content)) as zf:
        for info in zf.infolist():
            name = info.filename
            if info.is_dir():
                continue
            if name.startswith(("__MACOSX", ".")) or Path(name).name.startswith("."):
                continue

            ext = Path(name).suffix.lower()
            if ext not in parseable:
                logger.warning("skipping_unsupported_file_in_zip", filename=name)
                continue

            file_source = source or Path(name).stem
            try:
                # Reading is inside the guard so a corrupt member is logged
                # like any other per-entry failure.
                inner = zf.read(info)
                if ext == ".csv":
                    all_entries.extend(parse_csv(inner, file_source))
                elif ext == ".json":
                    all_entries.extend(parse_json(inner, file_source))
                else:
                    all_entries.extend(parse_excel(inner, file_source))
            except Exception as exc:
                logger.error("error_processing_zip_entry", filename=name, error=str(exc))
    return all_entries


def parse_file(content: bytes, filename: str, source: str | None = None) -> list[FeedbackEntry]:
    """Route *content* to the parser that matches the extension of *filename*.

    The extension is compared case-insensitively against
    ``SUPPORTED_EXTENSIONS``.

    Args:
        content: Raw file bytes.
        filename: Original file name; only its suffix is inspected.
        source: Optional source label forwarded to the selected parser.

    Returns:
        The entries produced by the selected parser.

    Raises:
        ValueError: If the extension is not in ``SUPPORTED_EXTENSIONS``.
    """
    ext = Path(filename).suffix.lower()

    if ext in SUPPORTED_EXTENSIONS:
        # Extension -> parser dispatch table; both Excel suffixes share a parser.
        dispatch = {
            ".zip": parse_zip,
            ".xls": parse_excel,
            ".xlsx": parse_excel,
            ".json": parse_json,
            ".csv": parse_csv,
        }
        handler = dispatch[ext]
        return handler(content, source)

    raise ValueError(f"Unsupported file format: {ext}. Supported: {SUPPORTED_EXTENSIONS}")


async def save_upload(content: bytes, filename: str) -> Path:
    """Write uploaded bytes into the configured upload directory.

    The stored name is an 8-character random hex prefix followed by the base
    name of *filename*. Any directory components in the client-supplied name
    are discarded, and the prefix avoids collisions between uploads that share
    a name.

    The upload directory is created if it does not exist yet, so the first
    upload on a fresh deployment does not fail with ``FileNotFoundError``.

    Args:
        content: Raw file bytes to persist.
        filename: Client-supplied file name.

    Returns:
        Path of the file that was written.
    """
    upload_dir = settings.upload_path
    upload_dir.mkdir(parents=True, exist_ok=True)

    safe_name = f"{uuid.uuid4().hex[:8]}_{Path(filename).name}"
    file_path = upload_dir / safe_name
    # NOTE(review): this is a synchronous write inside a coroutine; for large
    # uploads consider offloading it to a worker thread.
    file_path.write_bytes(content)
    return file_path