import csv import io from datetime import date import openpyxl from fastapi import HTTPException from fastapi.responses import StreamingResponse def resolve_period( period: str, date_from: date | None, date_to: date | None, ) -> tuple[date, date]: """Convert a named period to a (start, end) date tuple. Raises HTTP 422 for ``custom`` when either date is missing. """ today = date.today() if period == "today": return today, today elif period == "last_7_days": from datetime import timedelta return today - timedelta(days=7), today elif period == "mtd": return today.replace(day=1), today elif period == "ytd": return today.replace(month=1, day=1), today elif period == "custom": if date_from is None or date_to is None: raise HTTPException( status_code=422, detail="date_from and date_to are required when period is 'custom'.", ) return date_from, date_to else: raise HTTPException( status_code=422, detail=f"Invalid period '{period}'.", ) def apply_projection( rows: list[dict], projection_list: list[str] | None, ) -> list[dict]: """Filter dict keys to those in *projection_list*. ``_id`` is always excluded. Returns *rows* unchanged when *projection_list* is ``None`` or empty (except ``_id`` is still stripped). """ if not projection_list: # Still strip _id even when no explicit projection is requested return [{k: v for k, v in row.items() if k != "_id"} for row in rows] allowed = set(projection_list) - {"_id"} return [{k: v for k, v in row.items() if k in allowed} for row in rows] async def stream_export( rows: list[dict], slug: str, export_format: str, ) -> StreamingResponse: """Build a CSV or XLSX file from *rows* and return a :class:`StreamingResponse`. Raises HTTP 400 when ``len(rows) > 100_000``. """ if len(rows) > 100_000: raise HTTPException( status_code=400, detail="Export limit exceeded. Apply filters to reduce the result set.", ) today_str = date.today().strftime("%Y%m%d") if export_format == "csv": buffer = io.StringIO() fieldnames = list(rows[0].keys()) if rows else [] writer = csv.DictWriter(buffer, fieldnames=fieldnames) writer.writeheader() writer.writerows(rows) content = buffer.getvalue().encode("utf-8") filename = f"{slug}_{today_str}.csv" return StreamingResponse( iter([content]), media_type="text/csv; charset=utf-8", headers={"Content-Disposition": f'attachment; filename="{filename}"'}, ) elif export_format == "xlsx": wb = openpyxl.Workbook() ws = wb.active ws.title = slug fieldnames = list(rows[0].keys()) if rows else [] ws.append(fieldnames) for row in rows: ws.append([row.get(f) for f in fieldnames]) buffer = io.BytesIO() wb.save(buffer) buffer.seek(0) content = buffer.read() filename = f"{slug}_{today_str}.xlsx" return StreamingResponse( iter([content]), media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", headers={"Content-Disposition": f'attachment; filename="{filename}"'}, ) else: raise HTTPException( status_code=400, detail="Invalid export format. Must be csv or xlsx.", )