Spaces:
Configuration error
Configuration error
| import csv | |
| import io | |
| from datetime import date | |
| import openpyxl | |
| from fastapi import HTTPException | |
| from fastapi.responses import StreamingResponse | |
| def resolve_period( | |
| period: str, | |
| date_from: date | None, | |
| date_to: date | None, | |
| ) -> tuple[date, date]: | |
| """Convert a named period to a (start, end) date tuple. | |
| Raises HTTP 422 for ``custom`` when either date is missing. | |
| """ | |
| today = date.today() | |
| if period == "today": | |
| return today, today | |
| elif period == "last_7_days": | |
| from datetime import timedelta | |
| return today - timedelta(days=7), today | |
| elif period == "mtd": | |
| return today.replace(day=1), today | |
| elif period == "ytd": | |
| return today.replace(month=1, day=1), today | |
| elif period == "custom": | |
| if date_from is None or date_to is None: | |
| raise HTTPException( | |
| status_code=422, | |
| detail="date_from and date_to are required when period is 'custom'.", | |
| ) | |
| return date_from, date_to | |
| else: | |
| raise HTTPException( | |
| status_code=422, | |
| detail=f"Invalid period '{period}'.", | |
| ) | |
| def apply_projection( | |
| rows: list[dict], | |
| projection_list: list[str] | None, | |
| ) -> list[dict]: | |
| """Filter dict keys to those in *projection_list*. | |
| ``_id`` is always excluded. Returns *rows* unchanged when | |
| *projection_list* is ``None`` or empty (except ``_id`` is still stripped). | |
| """ | |
| if not projection_list: | |
| # Still strip _id even when no explicit projection is requested | |
| return [{k: v for k, v in row.items() if k != "_id"} for row in rows] | |
| allowed = set(projection_list) - {"_id"} | |
| return [{k: v for k, v in row.items() if k in allowed} for row in rows] | |
| async def stream_export( | |
| rows: list[dict], | |
| slug: str, | |
| export_format: str, | |
| ) -> StreamingResponse: | |
| """Build a CSV or XLSX file from *rows* and return a :class:`StreamingResponse`. | |
| Raises HTTP 400 when ``len(rows) > 100_000``. | |
| """ | |
| if len(rows) > 100_000: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Export limit exceeded. Apply filters to reduce the result set.", | |
| ) | |
| today_str = date.today().strftime("%Y%m%d") | |
| if export_format == "csv": | |
| buffer = io.StringIO() | |
| fieldnames = list(rows[0].keys()) if rows else [] | |
| writer = csv.DictWriter(buffer, fieldnames=fieldnames) | |
| writer.writeheader() | |
| writer.writerows(rows) | |
| content = buffer.getvalue().encode("utf-8") | |
| filename = f"{slug}_{today_str}.csv" | |
| return StreamingResponse( | |
| iter([content]), | |
| media_type="text/csv; charset=utf-8", | |
| headers={"Content-Disposition": f'attachment; filename="{filename}"'}, | |
| ) | |
| elif export_format == "xlsx": | |
| wb = openpyxl.Workbook() | |
| ws = wb.active | |
| ws.title = slug | |
| fieldnames = list(rows[0].keys()) if rows else [] | |
| ws.append(fieldnames) | |
| for row in rows: | |
| ws.append([row.get(f) for f in fieldnames]) | |
| buffer = io.BytesIO() | |
| wb.save(buffer) | |
| buffer.seek(0) | |
| content = buffer.read() | |
| filename = f"{slug}_{today_str}.xlsx" | |
| return StreamingResponse( | |
| iter([content]), | |
| media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", | |
| headers={"Content-Disposition": f'attachment; filename="{filename}"'}, | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Invalid export format. Must be csv or xlsx.", | |
| ) | |