Spaces:
Sleeping
Sleeping
| """File processing service: handles CSV, JSON, Excel, ZIP with chunked uploads.""" | |
| from __future__ import annotations | |
| import io | |
| import json | |
| import uuid | |
| import zipfile | |
| from pathlib import Path | |
| from typing import List | |
| import pandas as pd | |
| from app.core.config import settings | |
| from app.core.logging import get_logger | |
| from app.models.schemas import FeedbackEntry | |
| logger = get_logger(__name__) | |
| SUPPORTED_EXTENSIONS = {".csv", ".json", ".xlsx", ".xls", ".zip"} | |
| TEXT_COLUMN_CANDIDATES = [ | |
| "text", "content", "message", "body", "feedback", "review", | |
| "comment", "description", "note", "summary", "title", | |
| "Text", "Content", "Message", "Body", "Feedback", "Review", | |
| ] | |
| TIMESTAMP_COLUMN_CANDIDATES = [ | |
| "timestamp", "date", "created_at", "created", "time", "datetime", | |
| "Timestamp", "Date", "Created", "CreatedAt", | |
| ] | |
| SOURCE_COLUMN_CANDIDATES = [ | |
| "source", "channel", "platform", "origin", "category", "type", | |
| "Source", "Channel", "Platform", | |
| ] | |
| def _find_column(df: pd.DataFrame, candidates: list[str]) -> str | None: | |
| for col in candidates: | |
| if col in df.columns: | |
| return col | |
| for col in df.columns: | |
| for candidate in candidates: | |
| if candidate.lower() in col.lower(): | |
| return col | |
| return None | |
| def _df_to_entries(df: pd.DataFrame, source: str | None = None) -> list[FeedbackEntry]: | |
| text_col = _find_column(df, TEXT_COLUMN_CANDIDATES) | |
| if not text_col: | |
| if len(df.columns) == 1: | |
| text_col = df.columns[0] | |
| else: | |
| raise ValueError( | |
| f"No text column found. Expected one of: {TEXT_COLUMN_CANDIDATES}. " | |
| f"Found columns: {list(df.columns)}" | |
| ) | |
| ts_col = _find_column(df, TIMESTAMP_COLUMN_CANDIDATES) | |
| src_col = _find_column(df, SOURCE_COLUMN_CANDIDATES) | |
| entries = [] | |
| other_cols = [c for c in df.columns if c not in {text_col, ts_col, src_col}] | |
| for _, row in df.iterrows(): | |
| text = str(row[text_col]).strip() | |
| if not text or text == "nan": | |
| continue | |
| ts = None | |
| if ts_col and pd.notna(row.get(ts_col)): | |
| try: | |
| ts = pd.to_datetime(row[ts_col]) | |
| except Exception: | |
| pass | |
| src = source | |
| if src_col and pd.notna(row.get(src_col)): | |
| src = str(row[src_col]) | |
| metadata = {} | |
| for col in other_cols: | |
| val = row.get(col) | |
| if pd.notna(val): | |
| metadata[col] = str(val) if not isinstance(val, (int, float, bool)) else val | |
| entries.append( | |
| FeedbackEntry( | |
| id=uuid.uuid4().hex[:12], | |
| text=text, | |
| source=src, | |
| timestamp=ts, | |
| metadata=metadata if metadata else None, | |
| ) | |
| ) | |
| return entries | |
| def parse_csv(content: bytes, source: str | None = None) -> list[FeedbackEntry]: | |
| for encoding in ("utf-8", "latin-1", "cp1252"): | |
| try: | |
| df = pd.read_csv(io.BytesIO(content), encoding=encoding) | |
| return _df_to_entries(df, source) | |
| except UnicodeDecodeError: | |
| continue | |
| raise ValueError("Unable to decode CSV file with supported encodings") | |
| def parse_json(content: bytes, source: str | None = None) -> list[FeedbackEntry]: | |
| data = json.loads(content.decode("utf-8")) | |
| if isinstance(data, list): | |
| if all(isinstance(item, str) for item in data): | |
| return [ | |
| FeedbackEntry(id=uuid.uuid4().hex[:12], text=item, source=source) | |
| for item in data | |
| if item.strip() | |
| ] | |
| df = pd.DataFrame(data) | |
| return _df_to_entries(df, source) | |
| elif isinstance(data, dict): | |
| if "data" in data: | |
| df = pd.DataFrame(data["data"]) | |
| elif "entries" in data: | |
| df = pd.DataFrame(data["entries"]) | |
| elif "results" in data: | |
| df = pd.DataFrame(data["results"]) | |
| else: | |
| df = pd.DataFrame([data]) | |
| return _df_to_entries(df, source) | |
| raise ValueError("Unsupported JSON structure") | |
| def parse_excel(content: bytes, source: str | None = None) -> list[FeedbackEntry]: | |
| df = pd.read_excel(io.BytesIO(content), engine="openpyxl") | |
| return _df_to_entries(df, source) | |
| def parse_zip(content: bytes, source: str | None = None) -> list[FeedbackEntry]: | |
| all_entries = [] | |
| with zipfile.ZipFile(io.BytesIO(content)) as zf: | |
| for name in zf.namelist(): | |
| if name.startswith("__MACOSX") or name.startswith("."): | |
| continue | |
| ext = Path(name).suffix.lower() | |
| inner = zf.read(name) | |
| file_source = source or Path(name).stem | |
| try: | |
| if ext == ".csv": | |
| all_entries.extend(parse_csv(inner, file_source)) | |
| elif ext == ".json": | |
| all_entries.extend(parse_json(inner, file_source)) | |
| elif ext in (".xlsx", ".xls"): | |
| all_entries.extend(parse_excel(inner, file_source)) | |
| else: | |
| logger.warning("skipping_unsupported_file_in_zip", filename=name) | |
| except Exception as exc: | |
| logger.error("error_processing_zip_entry", filename=name, error=str(exc)) | |
| return all_entries | |
| def parse_file(content: bytes, filename: str, source: str | None = None) -> list[FeedbackEntry]: | |
| ext = Path(filename).suffix.lower() | |
| if ext not in SUPPORTED_EXTENSIONS: | |
| raise ValueError(f"Unsupported file format: {ext}. Supported: {SUPPORTED_EXTENSIONS}") | |
| parsers = { | |
| ".csv": parse_csv, | |
| ".json": parse_json, | |
| ".xlsx": parse_excel, | |
| ".xls": parse_excel, | |
| ".zip": parse_zip, | |
| } | |
| return parsers[ext](content, source) | |
| async def save_upload(content: bytes, filename: str) -> Path: | |
| upload_dir = settings.upload_path | |
| safe_name = f"{uuid.uuid4().hex[:8]}_{Path(filename).name}" | |
| file_path = upload_dir / safe_name | |
| file_path.write_bytes(content) | |
| return file_path | |