Spaces:
Configuration error
Configuration error
| """ | |
| Property-based tests for Analytics Reports base engine. | |
| Uses Hypothesis to validate invariants across arbitrary inputs. | |
| Properties tested: | |
| 2. Projection Correctness | |
| 3. Pagination Invariant | |
| 4. Period Resolution Correctness | |
| 5. Aging Bucket Invariant | |
| 6. Export Row Count Consistency | |
| 7. Export Format Correctness (CSV and XLSX) | |
| """ | |
| from hypothesis import given, settings, HealthCheck | |
| from hypothesis import strategies as st | |
| from app.reports.base.engine import apply_projection, resolve_period, stream_export | |
# Register the "ci" Hypothesis profile (100 examples per property, with the
# ``too_slow`` health check suppressed) and make it the active profile.
settings.register_profile(
    "ci",
    max_examples=100,
    suppress_health_check=[HealthCheck.too_slow],
)
settings.load_profile("ci")
| # --------------------------------------------------------------------------- | |
| # Property 2: Projection Correctness | |
| # --------------------------------------------------------------------------- | |
| # Feature: analytics-reports, Property 2: Projection Correctness | |
def test_projection_correctness(rows, projection):
    """Check that projected rows expose only requested fields.

    Every row returned by ``apply_projection(rows, projection)`` must not
    contain ``"_id"`` and must only contain keys that are members of
    ``projection``.
    """
    projected_rows = apply_projection(rows, projection)
    for projected in projected_rows:
        assert "_id" not in projected
        # Collect any field that leaked through the projection.
        unexpected = [field for field in projected if field not in projection]
        assert not unexpected
| # Feature: analytics-reports, Property 2: Projection Correctness (None case) | |
def test_projection_absent_returns_all(rows):
    """Check that a ``None`` projection keeps every non-``_id`` field.

    For each input row, every key other than ``"_id"`` must be present in
    the corresponding output row with an equal value.
    """
    # Materialise the result so its length can be compared with the input.
    result = list(apply_projection(rows, None))

    # zip() stops at the shorter input, so without this check the loop below
    # would pass vacuously if apply_projection dropped rows.
    assert len(result) == len(rows)

    for original, projected in zip(rows, result):
        for key, value in original.items():
            if key == "_id":
                continue
            assert key in projected
            assert projected[key] == value
| # --------------------------------------------------------------------------- | |
| # Property 3: Pagination Invariant | |
| # --------------------------------------------------------------------------- | |
| # Feature: analytics-reports, Property 3: Pagination Invariant | |
| def test_pagination_invariant(limit, rows): | |
| skip = 0 | |
| paginated = rows[skip:skip + limit] | |
| assert len(paginated) <= limit | |
| envelope = {"success": True, "total": len(rows), "skip": skip, "limit": limit, "data": paginated} | |
| for key in ("success", "total", "skip", "limit", "data"): | |
| assert key in envelope | |
| assert envelope["total"] >= 0 | |
| assert envelope["skip"] == skip | |
| # --------------------------------------------------------------------------- | |
| # Property 4: Period Resolution Correctness | |
| # --------------------------------------------------------------------------- | |
| # Feature: analytics-reports, Property 4: Period Resolution Correctness | |
def test_period_resolution_correctness(period):
    """Check the date range returned by ``resolve_period`` for a named period.

    For every period the range must be ordered (``start <= end``) and must
    not extend past today. For the named periods ``"today"``,
    ``"last_7_days"``, ``"mtd"`` and ``"ytd"`` the start (and, for
    ``"today"``, the end) is compared with the expected date.

    NOTE(review): ``today`` is sampled after ``resolve_period`` returns, so
    a run that straddles midnight could compare against a different date.
    """
    from datetime import date, timedelta

    start, end = resolve_period(period, None, None)
    today = date.today()

    # Invariants that hold for every period.
    assert start <= end
    assert end <= today

    # Period-specific expectations; any other period has none.
    if period == "today":
        assert start == today
        assert end == today
        return
    if period == "last_7_days":
        week_ago = today - timedelta(days=7)
        assert start == week_ago
        return
    if period == "mtd":
        first_of_month = today.replace(day=1)
        assert start == first_of_month
        return
    if period == "ytd":
        first_of_year = today.replace(month=1, day=1)
        assert start == first_of_year
| # --------------------------------------------------------------------------- | |
| # Property 5: Aging Bucket Invariant | |
| # --------------------------------------------------------------------------- | |
| # Feature: analytics-reports, Property 5: Aging Bucket Invariant | |
| def test_aging_bucket_invariant(b0, b1, b2, b3): | |
| total = b0 + b1 + b2 + b3 | |
| assert abs((b0 + b1 + b2 + b3) - total) < 1e-9 | |
| # --------------------------------------------------------------------------- | |
| # Async helper — consume a StreamingResponse body synchronously | |
| # --------------------------------------------------------------------------- | |
| def _collect_response_body(response) -> bytes: | |
| """Drain a StreamingResponse body_iterator into bytes, handling both | |
| sync iterables (``iter([...])``) and async generators.""" | |
| import asyncio, inspect | |
| iterator = response.body_iterator | |
| if inspect.isasyncgen(iterator) or hasattr(iterator, "__aiter__"): | |
| async def _drain(): | |
| chunks = [] | |
| async for chunk in iterator: | |
| chunks.append(chunk if isinstance(chunk, bytes) else chunk.encode()) | |
| return b"".join(chunks) | |
| return asyncio.run(_drain()) | |
| # sync iterable | |
| chunks = [] | |
| for chunk in iterator: | |
| chunks.append(chunk if isinstance(chunk, bytes) else chunk.encode()) | |
| return b"".join(chunks) | |
| # --------------------------------------------------------------------------- | |
| # Property 6: Export Row Count Consistency | |
| # --------------------------------------------------------------------------- | |
| # Feature: analytics-reports, Property 6: Export Row Count Consistency | |
def test_export_row_count_consistency(rows):
    """Check that a CSV export contains one data record per input row.

    Exports ``rows`` as CSV under the slug ``"test-slug"``, decodes the body
    as UTF-8, parses it with ``csv.DictReader`` and compares the number of
    parsed records with ``len(rows)``.
    """
    import asyncio
    import csv
    import io

    response = asyncio.run(stream_export(rows, "test-slug", "csv"))
    csv_text = _collect_response_body(response).decode("utf-8")

    # DictReader consumes the first line as the header, so only data
    # records are counted.
    record_count = sum(1 for _ in csv.DictReader(io.StringIO(csv_text)))
    assert record_count == len(rows)
| # --------------------------------------------------------------------------- | |
| # Property 7: Export Format Correctness (CSV) | |
| # --------------------------------------------------------------------------- | |
| # Feature: analytics-reports, Property 7: Export Format Correctness (CSV) | |
def test_export_csv_has_header(rows):
    """Check that a CSV export starts with a header naming the columns.

    Exports ``rows`` as CSV under the slug ``"test-slug"`` and asserts that
    the first record contains ``col_a`` and ``col_b``.

    NOTE(review): presumably ``col_a``/``col_b`` are the keys produced by
    the ``rows`` strategy; the strategy is not visible here, so confirm.
    """
    import asyncio
    import csv
    import io

    response = asyncio.run(stream_export(rows, "test-slug", "csv"))
    csv_text = _collect_response_body(response).decode("utf-8")

    # Parse the header with the csv module rather than str.split(",") so
    # quoted header cells are handled the same way a real consumer would.
    header = next(csv.reader(io.StringIO(csv_text)), None)

    # An empty export should fail with a clear message, not an IndexError.
    assert header is not None, "CSV export produced no header row"
    assert "col_a" in header
    assert "col_b" in header
| # --------------------------------------------------------------------------- | |
| # Property 7: Export Format Correctness (XLSX) | |
| # --------------------------------------------------------------------------- | |
| # Feature: analytics-reports, Property 7: Export Format Correctness (XLSX) | |
def test_export_xlsx_valid_workbook(rows):
    """Check that an XLSX export is a loadable workbook with the right sheet.

    Exports ``rows`` as XLSX under the slug ``"my-report"``, loads the bytes
    with ``openpyxl`` and asserts that a sheet named ``"my-report"`` exists.
    """
    import asyncio
    import io

    import openpyxl

    response = asyncio.run(stream_export(rows, "my-report", "xlsx"))
    payload = io.BytesIO(_collect_response_body(response))

    # load_workbook raises if the payload is not a valid XLSX archive.
    workbook = openpyxl.load_workbook(payload)
    assert "my-report" in workbook.sheetnames