Spaces:
Sleeping
Sleeping
File size: 6,066 Bytes
"""File processing service: handles CSV, JSON, Excel, ZIP with chunked uploads."""
from __future__ import annotations
import io
import json
import uuid
import zipfile
from pathlib import Path
from typing import List
import pandas as pd
from app.core.config import settings
from app.core.logging import get_logger
from app.models.schemas import FeedbackEntry
# Module-level logger named after this module. Elsewhere in this file it is
# called with an event name plus keyword fields (e.g. filename=..., error=...).
logger = get_logger(__name__)
# File extensions (lower-case, with leading dot) that parse_file accepts.
SUPPORTED_EXTENSIONS = {".csv", ".json", ".xlsx", ".xls", ".zip"}

# Candidate column names for the feedback text, tried in this order.
TEXT_COLUMN_CANDIDATES = [
    # lower-case spellings
    "text",
    "content",
    "message",
    "body",
    "feedback",
    "review",
    "comment",
    "description",
    "note",
    "summary",
    "title",
    # capitalised spellings
    "Text",
    "Content",
    "Message",
    "Body",
    "Feedback",
    "Review",
]

# Candidate column names for the entry timestamp, tried in this order.
TIMESTAMP_COLUMN_CANDIDATES = [
    "timestamp",
    "date",
    "created_at",
    "created",
    "time",
    "datetime",
    "Timestamp",
    "Date",
    "Created",
    "CreatedAt",
]

# Candidate column names for the entry source/channel, tried in this order.
SOURCE_COLUMN_CANDIDATES = [
    "source",
    "channel",
    "platform",
    "origin",
    "category",
    "type",
    "Source",
    "Channel",
    "Platform",
]
def _find_column(df: pd.DataFrame, candidates: list[str]) -> str | None:
    """Return the label of the first column of *df* that matches *candidates*.

    Matching happens in two passes:

    1. Exact match, in candidate order: the first candidate that is itself a
       column label wins.
    2. Case-insensitive substring match, in column order: the first column
       whose label contains any candidate wins.

    Args:
        df: Frame whose column labels are searched.
        candidates: Column names to look for, most preferred first.

    Returns:
        The matching column label, or ``None`` when nothing matches.
    """
    for candidate in candidates:
        if candidate in df.columns:
            return candidate

    # Lower-case the candidates once instead of once per column.
    needles = [candidate.lower() for candidate in candidates]
    for col in df.columns:
        # Labels are not always strings (header-less data gets 0, 1, ...),
        # so coerce before calling str methods instead of crashing.
        label = str(col).lower()
        if any(needle in label for needle in needles):
            return col
    return None
def _cell_is_missing(value: object) -> bool:
    """Return True when a DataFrame cell holds a scalar null (None/NaN/NaT/NA)."""
    # Containers (lists/dicts from nested JSON) are real values. pd.isna would
    # answer element-wise for them, which has no single truth value.
    if not pd.api.types.is_scalar(value):
        return False
    return bool(pd.isna(value))


def _df_to_entries(df: pd.DataFrame, source: str | None = None) -> list[FeedbackEntry]:
    """Convert each usable row of *df* into a ``FeedbackEntry``.

    The text, timestamp and source columns are located with ``_find_column``;
    every remaining column is copied into the entry's ``metadata``.

    Args:
        df: Parsed tabular data.
        source: Fallback source label used when a row has no source value.

    Returns:
        One entry per row whose text cell is present and non-blank.

    Raises:
        ValueError: If no text column can be identified and the frame does
            not consist of exactly one column.
    """
    text_col = _find_column(df, TEXT_COLUMN_CANDIDATES)
    if text_col is None:
        if len(df.columns) == 1:
            # A single unnamed/unknown column can only be the text.
            text_col = df.columns[0]
        else:
            raise ValueError(
                f"No text column found. Expected one of: {TEXT_COLUMN_CANDIDATES}. "
                f"Found columns: {list(df.columns)}"
            )

    ts_col = _find_column(df, TIMESTAMP_COLUMN_CANDIDATES)
    src_col = _find_column(df, SOURCE_COLUMN_CANDIDATES)
    reserved = {text_col, ts_col, src_col}
    other_cols = [c for c in df.columns if c not in reserved]

    entries = []
    for _, row in df.iterrows():
        raw_text = row[text_col]
        # Check the raw cell first so None/NaT/NA are not turned into the
        # literal texts "None"/"NaT"/"<NA>".
        if _cell_is_missing(raw_text):
            continue
        text = str(raw_text).strip()
        # "nan" is kept as a skip marker for inputs that already stringified NaN.
        if not text or text == "nan":
            continue

        ts = None
        if ts_col is not None:
            raw_ts = row.get(ts_col)
            if pd.api.types.is_scalar(raw_ts) and not pd.isna(raw_ts):
                try:
                    ts = pd.to_datetime(raw_ts)
                except Exception:
                    # Deliberately best-effort: an unparseable timestamp must
                    # not discard the feedback text itself.
                    ts = None
                else:
                    # Inputs such as "" parse to NaT; treat that as "no timestamp".
                    if pd.isna(ts):
                        ts = None

        src = source
        if src_col is not None:
            raw_src = row.get(src_col)
            if not _cell_is_missing(raw_src):
                src = str(raw_src)

        metadata = {}
        for col in other_cols:
            val = row.get(col)
            if _cell_is_missing(val):
                continue
            # Keep plain numbers/booleans as-is; stringify everything else.
            metadata[col] = val if isinstance(val, (int, float, bool)) else str(val)

        entries.append(
            FeedbackEntry(
                id=uuid.uuid4().hex[:12],
                text=text,
                source=src,
                timestamp=ts,
                metadata=metadata if metadata else None,
            )
        )
    return entries
def parse_csv(content: bytes, source: str | None = None) -> list[FeedbackEntry]:
    """Parse CSV bytes into feedback entries, trying several text encodings.

    Args:
        content: Raw bytes of the CSV file.
        source: Fallback source label for the produced entries.

    Returns:
        The entries built from the CSV rows by ``_df_to_entries``.

    Raises:
        ValueError: If no encoding can decode the file, or if the parsed
            frame is rejected by ``_df_to_entries``.
    """
    # cp1252 must come before latin-1: latin-1 maps every byte value and so
    # never fails, which would make any encoding listed after it unreachable
    # and decode Windows quotes/euro signs as C1 control characters.
    for encoding in ("utf-8", "cp1252", "latin-1"):
        try:
            df = pd.read_csv(io.BytesIO(content), encoding=encoding)
        except UnicodeDecodeError:
            continue
        return _df_to_entries(df, source)
    # latin-1 is a catch-all, so this is only a safeguard.
    raise ValueError("Unable to decode CSV file with supported encodings")
def parse_json(content: bytes, source: str | None = None) -> list[FeedbackEntry]:
    """Parse JSON bytes into feedback entries.

    Accepted shapes:

    * a list of strings - each non-blank string becomes one entry;
    * a list of records - converted through a DataFrame;
    * an object wrapping either of the above under ``"data"``, ``"entries"``
      or ``"results"`` (checked in that order);
    * any other object - treated as a single record.

    Args:
        content: Raw bytes of the JSON document.
        source: Fallback source label for the produced entries.

    Returns:
        The entries extracted from the document.

    Raises:
        ValueError: If the document is not valid JSON, its top level is
            neither a list nor an object, or no text column can be found.
    """
    # json.loads detects the encoding of bytes input itself, including a
    # UTF-8 BOM that a plain .decode("utf-8") would leave in place.
    data = json.loads(content)

    if isinstance(data, dict):
        for key in ("data", "entries", "results"):
            if key in data:
                payload = data[key]
                break
        else:
            payload = [data]
    elif isinstance(data, list):
        payload = data
    else:
        raise ValueError("Unsupported JSON structure")

    # Plain string lists carry no columns, so build entries directly. This
    # also covers wrapped lists such as {"data": ["a", "b"]}.
    if isinstance(payload, list) and all(isinstance(item, str) for item in payload):
        return [
            # Stripped for consistency with the DataFrame path.
            FeedbackEntry(id=uuid.uuid4().hex[:12], text=item.strip(), source=source)
            for item in payload
            if item.strip()
        ]

    return _df_to_entries(pd.DataFrame(payload), source)
def parse_excel(content: bytes, source: str | None = None) -> list[FeedbackEntry]:
    """Parse the first sheet of an Excel workbook into feedback entries.

    Args:
        content: Raw bytes of an ``.xlsx`` or ``.xls`` workbook.
        source: Fallback source label for the produced entries.

    Returns:
        The entries built from the sheet rows by ``_df_to_entries``.
    """
    # .xlsx workbooks are ZIP containers (magic "PK") and keep using openpyxl.
    # Anything else - notably legacy .xls, which openpyxl cannot read - is
    # left to pandas to pick an engine from the file contents.
    engine = "openpyxl" if content.startswith(b"PK") else None
    df = pd.read_excel(io.BytesIO(content), engine=engine)
    return _df_to_entries(df, source)
def parse_zip(content: bytes, source: str | None = None) -> list[FeedbackEntry]:
    """Parse every supported file inside a ZIP archive.

    Members are handled best-effort: a member that fails to decompress or
    parse is logged and skipped, so the remaining members still load.

    Args:
        content: Raw bytes of the ZIP archive.
        source: Source label for all entries. When omitted, each member's
            file stem is used instead.

    Returns:
        The concatenated entries of all members that parsed successfully.
    """
    # NOTE(review): decompressed member sizes are not capped here; consider
    # checking ZipInfo.file_size against a configured limit for untrusted uploads.
    all_entries = []
    with zipfile.ZipFile(io.BytesIO(content)) as zf:
        for info in zf.infolist():
            if info.is_dir():
                continue
            name = info.filename
            # Skip macOS metadata and hidden files at any depth, not only at
            # the archive root (e.g. "exports/._data.csv", "a/.DS_Store").
            if any(part.startswith(("__MACOSX", ".")) for part in name.split("/")):
                continue

            ext = Path(name).suffix.lower()
            if ext not in (".csv", ".json", ".xlsx", ".xls"):
                # Decide before decompressing, so unsupported members cost nothing.
                logger.warning("skipping_unsupported_file_in_zip", filename=name)
                continue

            file_source = source or Path(name).stem
            try:
                # Reading inside the try keeps a corrupt member from
                # aborting the whole archive.
                inner = zf.read(info)
                if ext == ".csv":
                    all_entries.extend(parse_csv(inner, file_source))
                elif ext == ".json":
                    all_entries.extend(parse_json(inner, file_source))
                else:
                    all_entries.extend(parse_excel(inner, file_source))
            except Exception as exc:
                logger.error("error_processing_zip_entry", filename=name, error=str(exc))
    return all_entries
def parse_file(content: bytes, filename: str, source: str | None = None) -> list[FeedbackEntry]:
    """Dispatch *content* to the parser that matches the extension of *filename*.

    Args:
        content: Raw bytes of the uploaded file.
        filename: Original file name; only its (case-insensitive) suffix is used.
        source: Optional source label forwarded to the chosen parser.

    Returns:
        Whatever the selected parser returns for *content*.

    Raises:
        ValueError: If the suffix is not in ``SUPPORTED_EXTENSIONS``.
    """
    suffix = Path(filename).suffix.lower()
    if suffix in SUPPORTED_EXTENSIONS:
        dispatch = {
            ".csv": parse_csv,
            ".json": parse_json,
            ".xlsx": parse_excel,
            ".xls": parse_excel,
            ".zip": parse_zip,
        }
        handler = dispatch[suffix]
        return handler(content, source)
    raise ValueError(f"Unsupported file format: {suffix}. Supported: {SUPPORTED_EXTENSIONS}")
async def save_upload(content: bytes, filename: str) -> Path:
    """Persist uploaded bytes under ``settings.upload_path`` and return the path.

    The stored name is an 8-character random hex prefix, an underscore, and
    the final path component of *filename*, so directory parts supplied by
    the client are dropped.

    Args:
        content: Raw bytes to write.
        filename: Client-supplied file name.

    Returns:
        The path of the file that was written.
    """
    # Local import: asyncio is not among this module's top-level imports.
    import asyncio

    upload_dir = settings.upload_path
    safe_name = f"{uuid.uuid4().hex[:8]}_{Path(filename).name}"
    file_path = upload_dir / safe_name

    def _write() -> None:
        # Create the target directory on first use instead of failing.
        upload_dir.mkdir(parents=True, exist_ok=True)
        file_path.write_bytes(content)

    # Disk writes block; run them in the default executor so a large upload
    # does not stall the event loop.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _write)
    return file_path
|