Spaces:
Paused
Paused
| import io | |
| import re | |
| from typing import Any, Dict, List | |
| import pandas as pd | |
| from fastapi import UploadFile | |
| from sqlalchemy import inspect, text | |
| from app.db.database import engine | |
| SUPPORTED_ENCODINGS = ("utf-8", "latin1", "cp1252") | |
| SAFE_TABLE_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") | |
| def _validate_table_name(name: str) -> str: | |
| if not SAFE_TABLE_PATTERN.match(name): | |
| raise ValueError("Table name must start with a letter/underscore and contain only alphanumerics or underscores.") | |
| return name | |
| def ingest_csv_dataset( | |
| upload_file: UploadFile, | |
| table_name: str = "sales", | |
| ) -> Dict[str, Any]: | |
| """Load an uploaded CSV file into the configured database table.""" | |
| raw_bytes = upload_file.file.read() | |
| upload_file.file.seek(0) | |
| dataframe = None | |
| chosen_encoding = None | |
| errors = [] | |
| for encoding in SUPPORTED_ENCODINGS: | |
| buffer = io.BytesIO(raw_bytes) | |
| try: | |
| dataframe = pd.read_csv(buffer, encoding=encoding) | |
| chosen_encoding = encoding | |
| break | |
| except UnicodeDecodeError as exc: | |
| errors.append(f"{encoding}: {exc}") | |
| continue | |
| except Exception as exc: | |
| raise ValueError(f"Unable to parse CSV: {exc}") | |
| if dataframe is None: | |
| raise ValueError( | |
| "Unable to decode CSV. Please upload UTF-8 or Latin-1 encoded files." + | |
| (f" Details: {'; '.join(errors)}" if errors else "") | |
| ) | |
| if dataframe.empty: | |
| raise ValueError("Uploaded CSV is empty.") | |
| safe_table_name = _validate_table_name(table_name.strip() or "sales") | |
| dataframe.to_sql(safe_table_name, engine, if_exists="replace", index=False) | |
| # Convert first 5 rows to dict for preview | |
| preview_rows = dataframe.head(5).to_dict(orient="records") | |
| return { | |
| "table": safe_table_name, | |
| "rows": len(dataframe), | |
| "columns": list(dataframe.columns), | |
| "encoding": chosen_encoding, | |
| "preview": preview_rows | |
| } | |
| def list_dataset_tables(limit: int = 15) -> List[Dict[str, Any]]: | |
| inspector = inspect(engine) | |
| tables = [t for t in inspector.get_table_names() if not t.startswith("sqlite_")][:limit] | |
| catalog: List[Dict[str, Any]] = [] | |
| with engine.connect() as conn: | |
| for table_name in tables: | |
| column_info = inspector.get_columns(table_name) | |
| try: | |
| result = conn.execute(text(f"SELECT COUNT(*) as cnt FROM {table_name}")) | |
| row_count = result.scalar() or 0 | |
| except Exception: | |
| row_count = None | |
| catalog.append( | |
| { | |
| "table": table_name, | |
| "rows": int(row_count) if row_count is not None else None, | |
| "columns": [col.get("name") for col in column_info], | |
| } | |
| ) | |
| return catalog | |