Spaces:
Running
Running
| import time | |
| from pathlib import Path | |
| from typing import Any | |
| import pytest | |
| pytestmark = pytest.mark.benchmark | |
class TestDatabaseBenchmarks:
    """Wall-clock budgets for direct database access through the ``db`` fixture."""

    def test_bulk_insert(self, db):
        """Insert 100 inventory lots one at a time; the loop must take <2s."""
        from shopstack.schemas.models import InventoryLot

        total = 100
        began = time.perf_counter()
        # Lot construction stays inside the timed loop, so the measurement
        # covers model creation as well as the insert call itself.
        for idx in range(total):
            lot = InventoryLot(
                canonical_name=f"item-{idx}",
                display_name=f"Item {idx}",
                quantity=1.0,
                unit="unit",
            )
            db.add_inventory_lot(lot)
        took = time.perf_counter() - began

        assert took < 2.0, f"Bulk insert too slow: {took:.3f}s for {total} items"

    def test_bulk_query(self, db):
        """Fetch the whole inventory in <0.5s and check the row count matches."""
        # Count rows up front (outside the timed region) so the result size
        # can be verified against the table afterwards.
        count_row = db.conn.execute("SELECT COUNT(*) FROM inventory_lots").fetchone()
        expected = count_row[0]

        began = time.perf_counter()
        rows = db.get_inventory()
        took = time.perf_counter() - began

        assert took < 0.5, f"Query too slow: {took:.3f}s for {expected} items"
        assert len(rows) == expected
class TestToolBenchmarks:
    """Wall-clock budgets for tool calls made through the ``tool_registry`` fixture."""

    def test_add_item_throughput(self, tool_registry):
        """Execute ``add_inventory_item`` 50 times; the loop must take <3s."""
        total = 50
        began = time.perf_counter()
        for idx in range(total):
            tool_registry.execute(
                "add_inventory_item",
                canonical_name=f"bench-item-{idx}",
                quantity=1.0,
                unit="unit",
            )
        took = time.perf_counter() - began

        assert took < 3.0, f"Tool throughput too slow: {took:.3f}s for {total} items"

    def test_find_item_latency(self, tool_registry):
        """A single ``find_item`` call for the query "bench" must take <0.5s."""
        began = time.perf_counter()
        tool_registry.execute("find_item", query="bench")
        took = time.perf_counter() - began

        assert took < 0.5, f"Search too slow: {took:.3f}s"
| class TestAnnotateImageBenchmarks: | |
| """Performance regression benchmarks for annotate_image with many detections. | |
| Verifies that bbox normalization (format detection + coordinate conversion + | |
| Pillow rendering) scales linearly and does not bottleneck with 50+ detections. | |
| """ | |
| def test_annotate_50_detections_latency(self): | |
| """annotate_image with 50 mixed-format detections should complete in <2s.""" | |
| import time | |
| from pathlib import Path | |
| import tempfile | |
| from PIL import Image | |
| from shopstack.providers.image_gen_provider import FluxImageProvider | |
| provider = FluxImageProvider() | |
| # Create a test image | |
| test_img = Path(tempfile.mkdtemp()) / "bench_annotate.png" | |
| Image.new("RGB", (400, 300), color="white").save(test_img) | |
| try: | |
| # Generate 50 detections with mixed bbox formats to stress all paths | |
| detections = self._generate_bench_detections(50) | |
| start = time.perf_counter() | |
| result = provider.annotate_image(str(test_img), detections) | |
| elapsed = time.perf_counter() - start | |
| # Verify output | |
| result_path = Path(result) | |
| assert result_path.is_file(), "Annotated output should exist" | |
| assert result.endswith(".png"), "Should produce PNG with Pillow available" | |
| size_bytes = result_path.stat().st_size | |
| # Timing assertion — 2s budget for 50 detections | |
| assert elapsed < 2.0, ( | |
| f"annotate_image with 50 detections took {elapsed:.3f}s — " | |
| f"expected <2.0s (bottleneck in bbox normalization?)" | |
| ) | |
| # Log for trend tracking | |
| print(f"\n[ANNOTATE BENCH] 50 detections: {elapsed:.3f}s, output size: {size_bytes}b") | |
| finally: | |
| test_img.unlink(missing_ok=True) | |
| def test_annotate_100_detections_latency(self): | |
| """annotate_image with 100 mixed-format detections should complete in <4s.""" | |
| import time | |
| from pathlib import Path | |
| import tempfile | |
| from PIL import Image | |
| from shopstack.providers.image_gen_provider import FluxImageProvider | |
| provider = FluxImageProvider() | |
| test_img = Path(tempfile.mkdtemp()) / "bench_annotate_100.png" | |
| Image.new("RGB", (400, 300), color="white").save(test_img) | |
| try: | |
| detections = self._generate_bench_detections(100) | |
| start = time.perf_counter() | |
| result = provider.annotate_image(str(test_img), detections) | |
| elapsed = time.perf_counter() - start | |
| assert Path(result).is_file() | |
| # Budget scales roughly linearly — 4s for 100 detections | |
| assert elapsed < 4.0, ( | |
| f"annotate_image with 100 detections took {elapsed:.3f}s — " | |
| f"expected <4.0s" | |
| ) | |
| print(f"\n[ANNOTATE BENCH] 100 detections: {elapsed:.3f}s") | |
| finally: | |
| test_img.unlink(missing_ok=True) | |
| def test_annotate_200_detections_with_all_formats(self): | |
| """200 detections cycling through all 5 bbox formats — stress test.""" | |
| import time | |
| from pathlib import Path | |
| import tempfile | |
| from PIL import Image | |
| from shopstack.providers.image_gen_provider import FluxImageProvider | |
| provider = FluxImageProvider() | |
| test_img = Path(tempfile.mkdtemp()) / "bench_annotate_200.png" | |
| Image.new("RGB", (400, 300), color="white").save(test_img) | |
| try: | |
| # 200 detections: 40 of each of the 5 formats | |
| detections = [] | |
| for _ in range(40): | |
| # normalized_xyxy (auto-detect, small values) | |
| detections.append({"bbox": [0.05, 0.05, 0.2, 0.15], "label": "obj_a", "score": 0.9}) | |
| # absolute_xyxy with explicit format | |
| detections.append({"bbox": [50, 30, 160, 130], "label": "obj_b", "score": 0.8, "bbox_format": "absolute_xyxy"}) | |
| # absolute_cxcywh with explicit format | |
| detections.append({"bbox": [200, 150, 60, 40], "label": "obj_c", "score": 0.7, "bbox_format": "absolute_cxcywh"}) | |
| # normalized_cxcywh with explicit format | |
| detections.append({"bbox": [0.5, 0.5, 0.3, 0.2], "label": "obj_d", "score": 0.6, "bbox_format": "normalized_cxcywh"}) | |
| # absolute_xywh with explicit format | |
| detections.append({"bbox": [250, 200, 80, 50], "label": "obj_e", "score": 0.5, "bbox_format": "absolute_xywh"}) | |
| start = time.perf_counter() | |
| result = provider.annotate_image(str(test_img), detections) | |
| elapsed = time.perf_counter() - start | |
| assert Path(result).is_file() | |
| # 200 detections at ~0.02-0.04s each → ~4-8s | |
| assert elapsed < 8.0, ( | |
| f"annotate_image with 200 mixed-format detections took {elapsed:.3f}s — " | |
| f"expected <8.0s" | |
| ) | |
| print(f"\n[ANNOTATE BENCH] 200 detections (5 formats): {elapsed:.3f}s") | |
| finally: | |
| test_img.unlink(missing_ok=True) | |
| def _generate_bench_detections(count: int) -> list[dict]: | |
| """Generate ``count`` detections cycling through mixed bbox formats. | |
| Distributes detections across: | |
| - normalized_xyxy (auto-detect) | |
| - absolute_xyxy (explicit) | |
| - absolute_cxcywh (explicit) | |
| - normalized_cxcywh (explicit) | |
| - absolute_xywh (explicit) | |
| This stresses all format detection + normalization code paths. | |
| """ | |
| detections: list[dict] = [] | |
| for i in range(count): | |
| base = (i * 17) % 200 # spread out positions to avoid overlap | |
| fmt_idx = i % 5 | |
| if fmt_idx == 0: | |
| # normalized_xyxy — auto-detect via small values | |
| x1, y1 = (base % 80) / 100.0 + 0.02, ((base + 13) % 60) / 100.0 + 0.02 | |
| x2, y2 = x1 + 0.12, y1 + 0.08 | |
| detections.append({ | |
| "bbox": [x1, y1, x2, y2], | |
| "label": f"norm_{i}", | |
| "score": 0.85, | |
| }) | |
| elif fmt_idx == 1: | |
| # absolute_xyxy | |
| x1, y1 = base + 10, (base + 7) % 200 + 10 | |
| detections.append({ | |
| "bbox": [x1, y1, x1 + 50, y1 + 40], | |
| "label": f"abs_{i}", | |
| "score": 0.80, | |
| "bbox_format": "absolute_xyxy", | |
| }) | |
| elif fmt_idx == 2: | |
| # absolute_cxcywh | |
| cx, cy = base + 30, (base + 11) % 150 + 20 | |
| detections.append({ | |
| "bbox": [cx, cy, 40, 30], | |
| "label": f"cxcy_{i}", | |
| "score": 0.75, | |
| "bbox_format": "absolute_cxcywh", | |
| }) | |
| elif fmt_idx == 3: | |
| # normalized_cxcywh | |
| cx, cy = 0.3 + (base % 40) / 100.0, 0.3 + ((base + 5) % 30) / 100.0 | |
| detections.append({ | |
| "bbox": [cx, cy, 0.15, 0.10], | |
| "label": f"ncxcy_{i}", | |
| "score": 0.70, | |
| "bbox_format": "normalized_cxcywh", | |
| }) | |
| else: | |
| # absolute_xywh | |
| x, y = base + 20, (base + 3) % 150 + 10 | |
| detections.append({ | |
| "bbox": [x, y, 35, 25], | |
| "label": f"xywh_{i}", | |
| "score": 0.65, | |
| "bbox_format": "absolute_xywh", | |
| }) | |
| return detections | |
| class TestBboxFormatDetectionOverheadBenchmarks: | |
| """Compare annotate_image latency with auto-detected vs explicit bbox_format. | |
| ``resolve_detection_bbox()`` has two code paths: | |
| 1. **Auto-detect**: calls ``_detect_bbox_format()`` (heuristic checks for | |
| all 5 formats) then ``_format_to_normalized_xyxy()``. | |
| 2. **Explicit**: passes ``bbox_format`` directly to ``_format_to_normalized_xyxy()``, | |
| skipping ``_detect_bbox_format()`` entirely. | |
| These benchmarks amplify the difference by using 1000+ detections so the | |
| overhead of the heuristic (comparisons on each of 4 bbox values) is | |
| measurable. Each test verifies both paths produce identical coordinates. | |
| """ | |
| IMG_W, IMG_H = 400, 300 | |
| N = 1000 # enough to amplify sub-millisecond per-detection overhead | |
| def test_auto_vs_explicit_normalized_xyxy(self): | |
| """Normalized xyxy bboxes — auto-detect vs explicit. | |
| Auto-detect: values ≤ 1.5 → falls through to ``_detect_bbox_format`` | |
| which checks cx/cy near 0.5 first, then returns normalized_xyxy. | |
| This is the simplest heuristic path. | |
| """ | |
| import time | |
| from shopstack.providers.image_gen_provider import resolve_detection_bbox | |
| count = self.N | |
| bboxes = [[0.05, 0.05, 0.25, 0.20], [0.10, 0.08, 0.35, 0.28], | |
| [0.02, 0.12, 0.18, 0.30], [0.30, 0.05, 0.55, 0.22], | |
| [0.08, 0.20, 0.28, 0.40]] | |
| detections_no_fmt = [{"bbox": b, "label": f"obj_{i%5}", "score": 0.9} | |
| for i, b in enumerate(bboxes * (count // 5))] | |
| detections_fmt = [{"bbox": d["bbox"], "bbox_format": "normalized_xyxy", | |
| "label": d["label"], "score": d["score"]} | |
| for d in detections_no_fmt] | |
| # Auto-detect path | |
| start = time.perf_counter() | |
| auto_results = [resolve_detection_bbox(d, self.IMG_W, self.IMG_H) | |
| for d in detections_no_fmt] | |
| auto_elapsed = time.perf_counter() - start | |
| # Explicit path | |
| start = time.perf_counter() | |
| explicit_results = [resolve_detection_bbox(d, self.IMG_W, self.IMG_H) | |
| for d in detections_fmt] | |
| explicit_elapsed = time.perf_counter() - start | |
| # Coord correctness: both paths must produce identical results | |
| for a, e in zip(auto_results, explicit_results): | |
| assert a == pytest.approx(e, abs=1e-6), ( | |
| f"Auto vs explicit coord mismatch: {a} vs {e}" | |
| ) | |
| ratio = auto_elapsed / max(explicit_elapsed, 1e-9) | |
| overhead_us = (auto_elapsed - explicit_elapsed) / count * 1e6 | |
| print(f"\n[BBOX FMT OVERHEAD] normalized_xyxy ({count}x):") | |
| print(f" Auto-detect: {auto_elapsed:.4f}s") | |
| print(f" Explicit: {explicit_elapsed:.4f}s") | |
| print(f" Ratio: {ratio:.2f}x") | |
| print(f" Overhead/det: {overhead_us:.2f}us") | |
| # Auto-detect should be slower, but not dramatically so | |
| assert ratio < 5.0, ( | |
| f"Auto-detect is {ratio:.1f}x slower than explicit! " | |
| f"Auto={auto_elapsed:.4f}s, Explicit={explicit_elapsed:.4f}s" | |
| ) | |
| def test_auto_vs_explicit_absolute_xyxy(self): | |
| """Absolute xyxy bboxes — auto-detect must disambiguate via heuristic. | |
| Auto-detect path: values > 1.5 → absolute branch → width/height | |
| comparisons vs x/y to disambiguate xyxy vs xywh vs cxcywh. | |
| This is the most expensive heuristic path. | |
| """ | |
| import time | |
| from shopstack.providers.image_gen_provider import resolve_detection_bbox | |
| count = self.N | |
| bboxes = [[30, 20, 160, 130], [50, 40, 200, 170], | |
| [10, 60, 100, 200], [120, 30, 280, 150], | |
| [60, 90, 200, 250]] | |
| detections_no_fmt = [{"bbox": b, "label": f"obj_{i%5}", "score": 0.9} | |
| for i, b in enumerate(bboxes * (count // 5))] | |
| detections_fmt = [{"bbox": d["bbox"], "bbox_format": "absolute_xyxy", | |
| "label": d["label"], "score": d["score"]} | |
| for d in detections_no_fmt] | |
| start = time.perf_counter() | |
| auto_results = [resolve_detection_bbox(d, self.IMG_W, self.IMG_H) | |
| for d in detections_no_fmt] | |
| auto_elapsed = time.perf_counter() - start | |
| start = time.perf_counter() | |
| explicit_results = [resolve_detection_bbox(d, self.IMG_W, self.IMG_H) | |
| for d in detections_fmt] | |
| explicit_elapsed = time.perf_counter() - start | |
| for a, e in zip(auto_results, explicit_results): | |
| assert a == pytest.approx(e, abs=1e-6) | |
| ratio = auto_elapsed / max(explicit_elapsed, 1e-9) | |
| overhead_us = (auto_elapsed - explicit_elapsed) / count * 1e6 | |
| print(f"\n[BBOX FMT OVERHEAD] absolute_xyxy ({count}x):") | |
| print(f" Auto-detect: {auto_elapsed:.4f}s") | |
| print(f" Explicit: {explicit_elapsed:.4f}s") | |
| print(f" Ratio: {ratio:.2f}x") | |
| print(f" Overhead/det: {overhead_us:.2f}us") | |
| assert ratio < 5.0, ( | |
| f"Auto-detect is {ratio:.1f}x slower! " | |
| f"Auto={auto_elapsed:.4f}s, Explicit={explicit_elapsed:.4f}s" | |
| ) | |
| def test_auto_vs_explicit_absolute_cxcywh(self): | |
| """Absolute cxcywh bboxes — auto-detect must distinguish from xyxy. | |
| This is the trickiest auto-detect case: cxcywh has values like | |
| [150, 100, 60, 40] where width/height could look like x2/y2 vs | |
| w/h vs (x,y). The heuristic checks if w/h are comparable to | |
| x/y magnitudes. | |
| Note: bboxes are chosen so the heuristic correctly identifies them | |
| as cxcywh (width > x*0.5 to avoid confusion with xywh). | |
| """ | |
| import time | |
| from shopstack.providers.image_gen_provider import resolve_detection_bbox | |
| count = self.N | |
| # Each bbox: cx, cy, w, h where w > cx*0.5 so the heuristic | |
| # doesn't misclassify as xywh. E.g. [150, 80, 100, 60]: | |
| # w=100 > 150*0.5=75 → not xywh. h=60 < 80*1.5=120 → cxcywh. ✓ | |
| bboxes = [[150, 80, 100, 60], [200, 120, 140, 70], | |
| [100, 150, 90, 50], [250, 60, 160, 50], | |
| [180, 200, 120, 70]] | |
| detections_no_fmt = [{"bbox": b, "label": f"obj_{i%5}", "score": 0.9} | |
| for i, b in enumerate(bboxes * (count // 5))] | |
| detections_fmt = [{"bbox": d["bbox"], "bbox_format": "absolute_cxcywh", | |
| "label": d["label"], "score": d["score"]} | |
| for d in detections_no_fmt] | |
| start = time.perf_counter() | |
| auto_results = [resolve_detection_bbox(d, self.IMG_W, self.IMG_H) | |
| for d in detections_no_fmt] | |
| auto_elapsed = time.perf_counter() - start | |
| start = time.perf_counter() | |
| explicit_results = [resolve_detection_bbox(d, self.IMG_W, self.IMG_H) | |
| for d in detections_fmt] | |
| explicit_elapsed = time.perf_counter() - start | |
| for a, e in zip(auto_results, explicit_results): | |
| assert a == pytest.approx(e, abs=1e-6) | |
| ratio = auto_elapsed / max(explicit_elapsed, 1e-9) | |
| overhead_us = (auto_elapsed - explicit_elapsed) / count * 1e6 | |
| print(f"\n[BBOX FMT OVERHEAD] absolute_cxcywh ({count}x):") | |
| print(f" Auto-detect: {auto_elapsed:.4f}s") | |
| print(f" Explicit: {explicit_elapsed:.4f}s") | |
| print(f" Ratio: {ratio:.2f}x") | |
| print(f" Overhead/det: {overhead_us:.2f}us") | |
| assert ratio < 5.0 | |
| def test_auto_vs_explicit_mixed_formats(self): | |
| """All 5 formats mixed — most realistic scenario. | |
| Uses carefully chosen bboxes so the auto-detect heuristic correctly | |
| identifies each format. This avoids heuristic edge cases that are | |
| known limitations of format auto-detection. | |
| Covers format-to-format transitions within a single call — important | |
| because the heuristic's code path branches differently per detection. | |
| """ | |
| import time | |
| from shopstack.providers.image_gen_provider import resolve_detection_bbox | |
| count = 200 | |
| # Hand-picked bboxes for each format that the heuristic correctly detects. | |
| # For each format, 5 bboxes are defined and cycled. | |
| fmt_bboxes = { | |
| # normalized_xyxy: small values, not near center (avoids cxcywh heuristic) | |
| 0: [[0.05, 0.05, 0.20, 0.18], [0.30, 0.08, 0.55, 0.30], | |
| [0.02, 0.40, 0.15, 0.60], [0.60, 0.10, 0.85, 0.35], | |
| [0.10, 0.50, 0.30, 0.75]], | |
| # absolute_xyxy: values > 1.5, w/x and h/y both not <= 0.5 and not < 1.5 | |
| 1: [[50, 30, 200, 160], [120, 40, 300, 180], | |
| [30, 80, 130, 240], [160, 50, 350, 200], | |
| [80, 100, 220, 260]], | |
| # absolute_cxcywh: values > 1.5, w > x*0.5 (not xywh), w < x*1.5 and h < y*1.5 | |
| 2: [[150, 80, 100, 60], [200, 120, 140, 70], | |
| [100, 150, 90, 50], [250, 60, 160, 50], | |
| [180, 200, 120, 70]], | |
| # normalized_cxcywh: values near 0.5, small w/h | |
| 3: [[0.40, 0.40, 0.15, 0.10], [0.55, 0.45, 0.20, 0.12], | |
| [0.35, 0.60, 0.12, 0.08], [0.65, 0.40, 0.18, 0.14], | |
| [0.45, 0.55, 0.10, 0.12]], | |
| # absolute_xywh: values > 1.5, w <= x*0.5 AND h <= y*0.5 | |
| 4: [[180, 160, 40, 30], [240, 100, 60, 25], | |
| [150, 200, 30, 40], [300, 80, 50, 20], | |
| [200, 140, 45, 35]], | |
| } | |
| # Build auto-detect detections (no bbox_format) and explicit copies | |
| auto_dets = [] | |
| explicit_dets = [] | |
| fmt_labels = {0: "normalized_xyxy", 1: "absolute_xyxy", | |
| 2: "absolute_cxcywh", 3: "normalized_cxcywh", 4: "absolute_xywh"} | |
| for i in range(count): | |
| fmt_idx = i % 5 | |
| bbox = fmt_bboxes[fmt_idx][i % 5] | |
| label = f"obj_{i}" | |
| auto_dets.append({"bbox": list(bbox), "label": label, "score": 0.9}) | |
| ed = dict(auto_dets[-1]) | |
| ed["bbox_format"] = fmt_labels[fmt_idx] | |
| explicit_dets.append(ed) | |
| start = time.perf_counter() | |
| auto_results = [resolve_detection_bbox(d, self.IMG_W, self.IMG_H) | |
| for d in auto_dets] | |
| auto_elapsed = time.perf_counter() - start | |
| start = time.perf_counter() | |
| explicit_results = [resolve_detection_bbox(d, self.IMG_W, self.IMG_H) | |
| for d in explicit_dets] | |
| explicit_elapsed = time.perf_counter() - start | |
| for idx, (a, e) in enumerate(zip(auto_results, explicit_results)): | |
| assert a == pytest.approx(e, abs=1e-6), ( | |
| f"Mismatch at idx {idx}: auto={a}, explicit={e}" | |
| ) | |
| ratio = auto_elapsed / max(explicit_elapsed, 1e-9) | |
| overhead_us = (auto_elapsed - explicit_elapsed) / count * 1e6 | |
| print(f"\n[BBOX FMT OVERHEAD] mixed 5 formats ({count}x):") | |
| print(f" Auto-detect: {auto_elapsed:.4f}s") | |
| print(f" Explicit: {explicit_elapsed:.4f}s") | |
| print(f" Ratio: {ratio:.2f}x") | |
| print(f" Overhead/det: {overhead_us:.2f}us") | |
| assert ratio < 5.0, ( | |
| f"Auto-detect is {ratio:.1f}x slower for mixed formats! " | |
| f"Auto={auto_elapsed:.4f}s, Explicit={explicit_elapsed:.4f}s" | |
| ) | |
| class TestAnnotateImageSizeScalingBenchmarks: | |
| """Benchmark annotate_image latency across different image sizes. | |
| Tests scaling behavior from thumbnail (100x100) through high-res | |
| (4000x3000) using the same set of 50 mixed-format detections. | |
| This isolates Pillow rendering scaling from bbox normalization cost. | |
| Expected scaling: | |
| - Bbox normalization: O(1) per detection, independent of image size | |
| - Pillow ImageDraw rectangle/text: primarily O(detections), small | |
| constant factor for larger images (wider pixel spans for outlines) | |
| - PNG compression: varies with image size | |
| """ | |
| # Image sizes to test: (name, w, h, max_seconds) | |
| SIZES = [ | |
| ("thumbnail", 100, 100, 2.0), | |
| ("standard", 400, 300, 2.0), | |
| ("high_res", 4000, 3000, 8.0), | |
| ] | |
| _DETECTIONS = 50 # same count for all sizes | |
| def test_annotate_thumbnail_image(self): | |
| """100x100 — thumbnail-size image with 50 detections. | |
| Bbox values must be small enough to fit in 100x100 pixels. | |
| Verifies output matches input dimensions. | |
| """ | |
| self._run_size_test("thumbnail", 100, 100, 2.0) | |
| def test_annotate_standard_image(self): | |
| """400x300 — typical receipt/market scan size with 50 detections. | |
| This is the standard image size used in existing benchmarks. | |
| Provides a baseline for scaling comparison. | |
| """ | |
| self._run_size_test("standard", 400, 300, 2.0) | |
| def test_annotate_high_res_image(self): | |
| """4000x3000 — high-resolution photo with 50 detections. | |
| ~40x more pixels than thumbnail, ~100x more than standard. | |
| Verifies that Pillow textbbox and rectangle drawing scale | |
| reasonably rather than exploding with image dimensions. | |
| """ | |
| self._run_size_test("high_res", 4000, 3000, 8.0) | |
| def test_annotate_scale_ratios(self): | |
| """Compare latencies across all sizes and compute scale factors.""" | |
| import time | |
| import tempfile | |
| from pathlib import Path | |
| from PIL import Image | |
| from shopstack.providers.image_gen_provider import FluxImageProvider | |
| provider = FluxImageProvider() | |
| detections = TestAnnotateImageBenchmarks._generate_bench_detections(self._DETECTIONS) | |
| results: list[dict] = [] | |
| tmpdirs: list[Path] = [] | |
| for name, w, h, _max_s in self.SIZES: | |
| tmp = Path(tempfile.mkdtemp()) | |
| tmpdirs.append(tmp) | |
| img_path = tmp / f"bench_{name}.png" | |
| Image.new("RGB", (w, h), color="white").save(img_path) | |
| start = time.perf_counter() | |
| result = provider.annotate_image(str(img_path), detections) | |
| elapsed = time.perf_counter() - start | |
| result_path = Path(result) | |
| assert result_path.is_file(), f"Output missing for {name}" | |
| assert result.endswith(".png"), f"Should produce PNG for {name}" | |
| # Verify output dimensions match input | |
| with Image.open(result_path) as out_img: | |
| assert out_img.size == (w, h), ( | |
| f"Output dimensions {out_img.size} != input ({w}x{h}) for {name}" | |
| ) | |
| size_kb = result_path.stat().st_size / 1024 | |
| results.append({ | |
| "name": name, | |
| "w": w, | |
| "h": h, | |
| "megapixels": round(w * h / 1e6, 2), | |
| "elapsed_s": round(elapsed, 4), | |
| "size_kb": round(size_kb, 1), | |
| }) | |
| # Cleanup temp dirs | |
| for tmp in tmpdirs: | |
| for f in tmp.iterdir(): | |
| f.unlink(missing_ok=True) | |
| tmp.rmdir() | |
| # Print scaling table | |
| print(f"\n[IMAGE SIZE SCALING] {self._DETECTIONS} detections per size:") | |
| print(f" {'Name':<12} {'Dim':<12} {'MP':<8} {'Latency':<10} {'File':<10} {'Scale':<8}") | |
| print(f" {'-'*58}") | |
| baseline = results[0]["elapsed_s"] | |
| for r in results: | |
| scale = r["elapsed_s"] / max(baseline, 1e-9) | |
| print(f" {r['name']:<12} {r['w']}x{r['h']:<8} " | |
| f"{r['megapixels']:<8.2f} {r['elapsed_s']:<10.4f}s " | |
| f"{r['size_kb']:<10.1f}kb {scale:<8.2f}x") | |
| # ── Performance regression thresholds ──────────────────────── | |
| thumb_r = next(r for r in results if r["name"] == "thumbnail") | |
| std_r = next(r for r in results if r["name"] == "standard") | |
| hr_r = next(r for r in results if r["name"] == "high_res") | |
| # High-res (4000x3000, 12MP) should not be >20x slower than | |
| # thumbnail (100x100, 0.01MP). Pillow rendering scales primarily | |
| # with detection count, not image dimensions — so even at 1200x | |
| # more pixels, latency should stay within 20x. | |
| hr_vs_thumb = hr_r["elapsed_s"] / max(thumb_r["elapsed_s"], 1e-9) | |
| assert hr_vs_thumb < 20.0, ( | |
| f"High-res ({hr_r['megapixels']}MP, {hr_r['w']}x{hr_r['h']}) is " | |
| f"{hr_vs_thumb:.1f}x slower than thumbnail " | |
| f"({thumb_r['megapixels']}MP, {thumb_r['w']}x{thumb_r['h']}) — " | |
| f"expected <20x. " | |
| f"Thumbnail: {thumb_r['elapsed_s']:.4f}s, " | |
| f"High-res: {hr_r['elapsed_s']:.4f}s" | |
| ) | |
| # ~100x more pixels (standard → high-res) should not cause >10x | |
| # latency increase. This catches regressions in the rendering loop | |
| # (e.g., per-pixel operations accidentally introduced). | |
| pixel_ratio = (hr_r["megapixels"] / max(std_r["megapixels"], 1e-9)) | |
| latency_ratio = hr_r["elapsed_s"] / max(std_r["elapsed_s"], 1e-9) | |
| assert latency_ratio < pixel_ratio * 0.2 + 2.0, ( | |
| f"High-res scaling is super-linear: {pixel_ratio:.0f}x pixels " | |
| f"caused {latency_ratio:.1f}x latency increase. " | |
| f"Standard: {std_r['elapsed_s']:.4f}s, " | |
| f"High-res: {hr_r['elapsed_s']:.4f}s" | |
| ) | |
| def _run_size_test(self, name: str, w: int, h: int, max_seconds: float) -> None: | |
| """Run a single size benchmark with shared detections.""" | |
| import time | |
| import tempfile | |
| from pathlib import Path | |
| from PIL import Image | |
| from shopstack.providers.image_gen_provider import FluxImageProvider | |
| provider = FluxImageProvider() | |
| detections = TestAnnotateImageBenchmarks._generate_bench_detections(self._DETECTIONS) | |
| tmp = Path(tempfile.mkdtemp()) | |
| img_path = tmp / f"bench_{name}.png" | |
| try: | |
| Image.new("RGB", (w, h), color="white").save(img_path) | |
| start = time.perf_counter() | |
| result = provider.annotate_image(str(img_path), detections) | |
| elapsed = time.perf_counter() - start | |
| result_path = Path(result) | |
| assert result_path.is_file(), f"Output missing for {name}" | |
| assert result.endswith(".png"), f"Should produce PNG for {name}" | |
| # Verify output dimensions match input | |
| with Image.open(result_path) as out_img: | |
| assert out_img.size == (w, h), ( | |
| f"Output dimensions {out_img.size} != input ({w}x{h})" | |
| ) | |
| assert elapsed < max_seconds, ( | |
| f"annotate_image on {name} ({w}x{h}) took {elapsed:.3f}s — " | |
| f"expected <{max_seconds}s ({self._DETECTIONS} detections)" | |
| ) | |
| size_kb = result_path.stat().st_size / 1024 | |
| print(f"\n[SIZE SCALE {name}] {w}x{h} ({w*h/1e6:.1f}MP): " | |
| f"{elapsed:.4f}s, {size_kb:.0f}kb output") | |
| finally: | |
| for f in tmp.iterdir(): | |
| f.unlink(missing_ok=True) | |
| tmp.rmdir() | |
| class TestAnnotateImageContentBenchmarks: | |
| """Benchmark annotate_image latency across different image content types. | |
| Tests whether pixel content (uniform white, solid color, gradient, random | |
| noise) affects rendering latency. Pillow's ``rectangle()`` and ``text()`` | |
| operations write pixels regardless of existing content, so rendering time | |
| should be independent of image content. However, PNG compression and | |
| file I/O may vary with pixel entropy. | |
| Image types tested: | |
| - **white**: Uniform RGB(255,255,255) — maximum PNG compression (baseline) | |
| - **solid_red**: Uniform RGB(200,40,40) — uniform but non-white | |
| - **gradient**: Horizontal color gradient — varied pixel values | |
| - **noise**: Random RGB noise — maximum entropy, minimal PNG compression | |
| Expected result: All content types should have nearly identical latency | |
| since Pillow operations are pixel-content-independent. | |
| """ | |
| SIZE = (400, 300) | |
| DETECTIONS = 50 | |
| def test_annotate_white_image(self): | |
| """Uniform white image — baseline for comparison.""" | |
| self._run_content_test("white", lambda img: None) | |
| def test_annotate_solid_red_image(self): | |
| """Solid red image — uniform but non-white content.""" | |
| self._run_content_test("solid_red", lambda img: img.paste((200, 40, 40), [0, 0, *self.SIZE])) | |
| def test_annotate_gradient_image(self): | |
| """Horizontal gradient — varied pixel values across width.""" | |
| def draw_gradient(img): | |
| from PIL import ImageDraw | |
| draw = ImageDraw.Draw(img) | |
| w, h = img.size | |
| for x in range(w): | |
| ratio = x / w | |
| color = int(255 * (1 - ratio)) | |
| draw.line([(x, 0), (x, h)], fill=(color, color, int(255 * ratio))) | |
| self._run_content_test("gradient", draw_gradient) | |
| def test_annotate_noise_image(self): | |
| """Random noise — maximum pixel entropy.""" | |
| def draw_noise(img): | |
| import random | |
| from PIL import ImageDraw | |
| draw = ImageDraw.Draw(img) | |
| w, h = img.size | |
| for y in range(0, h, 2): | |
| for x in range(0, w, 2): | |
| draw.point((x, y), fill=(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))) | |
| self._run_content_test("noise", draw_noise) | |
| def test_annotate_content_comparison(self): | |
| """Run all content types and compare latency/ output size.""" | |
| import time | |
| import tempfile | |
| from pathlib import Path | |
| import random | |
| from PIL import Image, ImageDraw | |
| from shopstack.providers.image_gen_provider import FluxImageProvider | |
| provider = FluxImageProvider() | |
| detections = TestAnnotateImageBenchmarks._generate_bench_detections(self.DETECTIONS) | |
| w, h = self.SIZE | |
| content_generators = { | |
| "white": lambda img: None, | |
| "solid_red": lambda img: img.paste((200, 40, 40), [0, 0, w, h]), | |
| "gradient": lambda img: None, # handled below | |
| "noise": lambda img: None, # handled below | |
| } | |
| # Build gradient and noise manually | |
| gradient_img = Image.new("RGB", (w, h), color="white") | |
| g_draw = ImageDraw.Draw(gradient_img) | |
| for x in range(w): | |
| ratio = x / w | |
| g_draw.line([(x, 0), (x, h)], fill=(int(255 * (1 - ratio)), int(255 * (1 - ratio)), int(255 * ratio))) | |
| noise_img = Image.new("RGB", (w, h), color="white") | |
| n_draw = ImageDraw.Draw(noise_img) | |
| for y in range(0, h, 2): | |
| for x in range(0, w, 2): | |
| n_draw.point((x, y), fill=(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))) | |
| prebuilt = { | |
| "gradient": gradient_img, | |
| "noise": noise_img, | |
| } | |
| results: list[dict] = [] | |
| names = ["white", "solid_red", "gradient", "noise"] | |
| tmpdirs: list[Path] = [] | |
| for name in names: | |
| tmp = Path(tempfile.mkdtemp()) | |
| tmpdirs.append(tmp) | |
| img_path = tmp / f"content_{name}.png" | |
| if name in prebuilt: | |
| prebuilt[name].save(img_path) | |
| else: | |
| img = Image.new("RGB", (w, h), color="white") | |
| content_generators[name](img) | |
| img.save(img_path) | |
| start = time.perf_counter() | |
| result = provider.annotate_image(str(img_path), detections) | |
| elapsed = time.perf_counter() - start | |
| result_path = Path(result) | |
| assert result_path.is_file() | |
| assert result.endswith(".png") | |
| with Image.open(result_path) as out_img: | |
| assert out_img.size == (w, h) | |
| size_kb = result_path.stat().st_size / 1024 | |
| results.append({ | |
| "name": name, | |
| "elapsed_s": round(elapsed, 4), | |
| "size_kb": round(size_kb, 1), | |
| }) | |
| # Cleanup | |
| for tmp in tmpdirs: | |
| for f in tmp.iterdir(): | |
| f.unlink(missing_ok=True) | |
| tmp.rmdir() | |
| # Print comparison table | |
| print(f"\n[IMAGE CONTENT COMPARISON] {self.DETECTIONS} detections on {w}x{h}:") | |
| print(f" {'Content':<12} {'Latency':<10} {'Output':<10} {'Ratio':<8}") | |
| print(f" {'-'*38}") | |
| baseline = results[0]["elapsed_s"] | |
| for r in results: | |
| ratio = r["elapsed_s"] / max(baseline, 1e-9) | |
| print(f" {r['name']:<12} {r['elapsed_s']:<10.4f}s {r['size_kb']:<10.1f}kb {ratio:<8.2f}x") | |
| # Verify no content type causes >2x latency vs white | |
| for r in results: | |
| ratio = r["elapsed_s"] / max(baseline, 1e-9) | |
| assert ratio < 2.0, ( | |
| f"Content '{r['name']}' is {ratio:.2f}x slower than white " | |
| f"({r['elapsed_s']:.4f}s vs white {baseline:.4f}s)" | |
| ) | |
def _run_content_test(self, name: str, draw_fn) -> None:
    """Run a single content-type benchmark.

    Builds a white image of ``self.SIZE``, lets ``draw_fn`` paint on it,
    saves it as a PNG and times one ``annotate_image`` call using
    ``self.DETECTIONS`` generated detections.

    Args:
        name: Content label; used in the input file name, the assertion
            messages and the printed summary line.
        draw_fn: Callable invoked as ``draw_fn(img)`` on the Pillow image
            before it is saved. Its return value is ignored.

    Raises:
        AssertionError: If the output is missing, is not a ``.png``, has a
            different size than the input, or took 2.0s or longer.
    """
    import tempfile
    import time
    from pathlib import Path

    from PIL import Image
    from shopstack.providers.image_gen_provider import FluxImageProvider

    provider = FluxImageProvider()
    detections = TestAnnotateImageBenchmarks._generate_bench_detections(self.DETECTIONS)
    w, h = self.SIZE

    # TemporaryDirectory removes the whole tree on exit, so cleanup can no
    # longer fail on unexpected directory contents and hide the real
    # assertion error raised inside the block.
    with tempfile.TemporaryDirectory() as tmp_dir:
        img_path = Path(tmp_dir) / f"content_{name}.png"
        img = Image.new("RGB", (w, h), color="white")
        draw_fn(img)
        img.save(img_path)

        # Only the annotate_image call itself is timed.
        start = time.perf_counter()
        result = provider.annotate_image(str(img_path), detections)
        elapsed = time.perf_counter() - start

        result_path = Path(result)
        assert result_path.is_file(), f"Output missing for {name}"
        assert result.endswith(".png"), f"Should produce PNG for {name}"
        with Image.open(result_path) as out_img:
            assert out_img.size == (w, h)
        assert elapsed < 2.0, (
            f"annotate_image on {name} image took {elapsed:.3f}s — "
            f"expected <2.0s"
        )
        size_kb = result_path.stat().st_size / 1024
        print(f"\n[CONTENT {name}] {w}x{h}: {elapsed:.4f}s, {size_kb:.0f}kb output")
# ── JSONL trend-tracking log for memory benchmarks ────────────────
_TREND_FILE = Path(__file__).parent / "trends" / "memory-trends.jsonl"


def _append_memory_benchmark_trend(
    test_name: str,
    params: dict,
    results: dict,
) -> None:
    """Append a memory benchmark result to the JSONL trend-tracking file.

    Each line is a self-describing JSON object with timestamp, commit SHA,
    platform, test metadata, and measured results. The file is tracked in
    git so trends can be monitored across CI runs.

    The commit SHA is best-effort: when ``git`` is missing, times out, exits
    with a non-zero status (e.g. not inside a repository) or prints nothing,
    the record carries ``"unknown"`` instead of an empty string.

    Args:
        test_name: e.g. "test_annotate_memory_high_res_single"
        params: dict of input parameters (image size, detections, content type, etc.)
        results: dict of measured values (RSS deltas, latency, output size, etc.)
    """
    import json
    import subprocess
    import sys
    from datetime import datetime, timezone

    commit = "unknown"
    try:
        git = subprocess.run(
            ["git", "rev-parse", "HEAD"],
            capture_output=True,
            text=True,
            timeout=5,
            cwd=Path(__file__).parent,
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        # git not installed / not executable, timed out, or produced
        # undecodable output: keep the "unknown" placeholder.
        pass
    else:
        sha = git.stdout.strip()
        # A failing git call leaves stdout empty; never record "" as a SHA.
        if git.returncode == 0 and sha:
            commit = sha

    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "commit": commit,
        "platform": sys.platform,
        "python_version": sys.version.split()[0],
        "test_name": test_name,
        "params": params,
        "results": results,
    }

    _TREND_FILE.parent.mkdir(parents=True, exist_ok=True)
    # Append mode keeps earlier runs; one JSON object per line (JSONL).
    with open(_TREND_FILE, "a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")
class TestAnnotateImageMemoryBenchmarks:
    """Memory-usage benchmarks for annotate_image on large images.

    Measures process RSS before/after annotation to detect memory
    regressions. Uses ``psutil`` (optional — test skips gracefully if
    unavailable). The FluxImageProvider itself consumes minimal memory
    (no neural model loaded), so these benchmarks primarily catch:

    - Memory from large Pillow images (especially 12MP high-res)
    - Leaked temporary files or accumulated detection state
    - Regressions from per-pixel operations that cache data

    Every test works inside a ``tempfile.TemporaryDirectory`` so the input
    image is removed even when an assertion fails.
    """

    # Uses psutil for RSS measurement (optional dependency)
    _SKIP_REASON = "psutil not installed — install with: pip install psutil"

    def _process_or_skip(self):
        """Return a ``psutil.Process`` for this interpreter, or skip the test.

        Skips (via ``pytest.skip``) with ``_SKIP_REASON`` when psutil cannot
        be imported.
        """
        import os

        try:
            import psutil
        except ImportError:
            pytest.skip(self._SKIP_REASON)
        return psutil.Process(os.getpid())

    @staticmethod
    def _rss_mb(proc) -> float:
        """Run a garbage-collection pass, then return ``proc``'s RSS in MiB.

        Collecting first keeps garbage that is merely awaiting collection
        from being counted in the measurement.
        """
        import gc

        gc.collect()
        return proc.memory_info().rss / (1024 * 1024)

    def test_annotate_memory_high_res_single(self):
        """Measure RSS increase for a single 12MP annotation with 50 detections.

        A single annotation should increase RSS by <200MB (the high-res PNG
        itself is ~12MP × 3 bytes ≈ 36MB uncompressed; the annotated output
        is another similar buffer). If this grows beyond 200MB, something
        is caching per-pixel data across calls.
        """
        proc = self._process_or_skip()
        import tempfile

        from PIL import Image
        from shopstack.providers.image_gen_provider import FluxImageProvider

        provider = FluxImageProvider()
        detections = TestAnnotateImageBenchmarks._generate_bench_detections(50)
        w, h = 4000, 3000

        with tempfile.TemporaryDirectory() as tmp_dir:
            img_path = Path(tmp_dir) / "bench_mem_high_res.png"
            Image.new("RGB", (w, h), color="white").save(img_path)

            rss_before = self._rss_mb(proc)
            start = time.perf_counter()
            result = provider.annotate_image(str(img_path), detections)
            elapsed = time.perf_counter() - start
            rss_after = self._rss_mb(proc)
            delta = rss_after - rss_before

            size_mb = Path(result).stat().st_size / (1024 * 1024)
            print("\n[MEM HIGH-RES SINGLE] 4000x3000, 50 detections:")
            print(f" RSS before: {rss_before:.1f}MB")
            print(f" RSS after: {rss_after:.1f}MB")
            print(f" Delta: {delta:+.1f}MB")
            print(f" Output: {size_mb:.1f}MB PNG")
            print(f" Latency: {elapsed:.3f}s")

            assert elapsed < 8.0, f"High-res annotation too slow: {elapsed:.3f}s"
            assert delta < 200.0, (
                f"Memory increase {delta:.1f}MB exceeds 200MB — "
                f"potential memory regression. "
                f"Before: {rss_before:.1f}MB, After: {rss_after:.1f}MB"
            )

            _append_memory_benchmark_trend(
                test_name="test_annotate_memory_high_res_single",
                params={
                    "image_size": f"{w}x{h}",
                    "megapixels": round(w * h / 1e6, 2),
                    "detections": 50,
                    "content_type": "white",
                },
                results={
                    "rss_before_mb": round(rss_before, 1),
                    "rss_after_mb": round(rss_after, 1),
                    "delta_mb": round(delta, 1),
                    "latency_s": round(elapsed, 4),
                    "output_mb": round(size_mb, 2),
                },
            )

    def test_annotate_memory_stress_detections(self):
        """Measure RSS increase for 200 detections on a standard image.

        Stress test with 4x the detection count. Memory should stay
        roughly constant since bboxes are processed one at a time
        (no batched allocation). Each rectangle/text operation allocates
        and frees within the same call.
        """
        proc = self._process_or_skip()
        import tempfile

        from PIL import Image
        from shopstack.providers.image_gen_provider import FluxImageProvider

        provider = FluxImageProvider()
        detections = TestAnnotateImageBenchmarks._generate_bench_detections(200)
        w, h = 400, 300

        with tempfile.TemporaryDirectory() as tmp_dir:
            img_path = Path(tmp_dir) / "bench_mem_stress.png"
            Image.new("RGB", (w, h), color="white").save(img_path)

            rss_before = self._rss_mb(proc)
            start = time.perf_counter()
            result = provider.annotate_image(str(img_path), detections)
            elapsed = time.perf_counter() - start
            rss_after = self._rss_mb(proc)
            delta = rss_after - rss_before

            size_kb = Path(result).stat().st_size / 1024
            print("\n[MEM STRESS 200 DETS] 400x300, 200 detections:")
            print(f" RSS before: {rss_before:.1f}MB")
            print(f" RSS after: {rss_after:.1f}MB")
            print(f" Delta: {delta:+.1f}MB")
            print(f" Output: {size_kb:.0f}KB PNG")
            print(f" Latency: {elapsed:.3f}s")

            assert elapsed < 8.0, f"Stress annotation too slow: {elapsed:.3f}s"
            assert delta < 100.0, (
                f"Memory increase {delta:.1f}MB for stress test exceeds 100MB — "
                f"potential memory regression from batching. "
                f"Before: {rss_before:.1f}MB, After: {rss_after:.1f}MB"
            )

            _append_memory_benchmark_trend(
                test_name="test_annotate_memory_stress_detections",
                params={
                    "image_size": f"{w}x{h}",
                    "detections": 200,
                    "content_type": "white",
                },
                results={
                    "rss_before_mb": round(rss_before, 1),
                    "rss_after_mb": round(rss_after, 1),
                    "delta_mb": round(delta, 1),
                    "latency_s": round(elapsed, 4),
                    "output_kb": round(size_kb, 1),
                },
            )

    def test_annotate_memory_multiple_calls(self):
        """Measure RSS after 5 sequential annotations — leak detection.

        Each call to ``annotate_image`` creates a new Pillow Image,
        draws rectangles, and saves. If any per-call state leaks,
        RSS will grow with each iteration. This test runs 5 calls
        and measures cumulative increase.

        Uses a standard (400x300) image and 50 detections per call.
        """
        proc = self._process_or_skip()
        import tempfile

        from PIL import Image
        from shopstack.providers.image_gen_provider import FluxImageProvider

        provider = FluxImageProvider()
        detections = TestAnnotateImageBenchmarks._generate_bench_detections(50)
        w, h = 400, 300

        with tempfile.TemporaryDirectory() as tmp_dir:
            img_path = Path(tmp_dir) / "bench_mem_multiple.png"
            Image.new("RGB", (w, h), color="white").save(img_path)

            rss_before = self._rss_mb(proc)
            n = 5
            start = time.perf_counter()
            for i in range(n):
                result = provider.annotate_image(str(img_path), detections)
                assert Path(result).is_file(), f"Output missing for call {i}"
            elapsed = time.perf_counter() - start
            rss_after = self._rss_mb(proc)
            delta = rss_after - rss_before
            avg_s = elapsed / n

            print(f"\n[MEM MULTIPLE CALLS] {n}x annotations (400x300, 50 detections):")
            print(f" RSS before: {rss_before:.1f}MB")
            print(f" RSS after: {rss_after:.1f}MB")
            print(f" Delta: {delta:+.1f}MB")
            print(f" Avg call: {avg_s:.3f}s")
            print(f" Total: {elapsed:.3f}s")

            assert elapsed < 10.0, (
                f"{n} sequential annotations took {elapsed:.3f}s — "
                f"expected <10s total"
            )
            # Cumulative increase across 5 calls should be <200MB.
            # If memory grows linearly per call, this catches leaks.
            assert delta < 200.0, (
                f"Memory increase {delta:.1f}MB after {n} calls exceeds 200MB — "
                f"potential memory leak. "
                f"Before: {rss_before:.1f}MB, After: {rss_after:.1f}MB"
            )

            _append_memory_benchmark_trend(
                test_name="test_annotate_memory_multiple_calls",
                params={
                    "image_size": f"{w}x{h}",
                    "detections": 50,
                    "num_calls": n,
                    "content_type": "white",
                },
                results={
                    "rss_before_mb": round(rss_before, 1),
                    "rss_after_mb": round(rss_after, 1),
                    "cumulative_delta_mb": round(delta, 1),
                    "avg_latency_s": round(avg_s, 4),
                    "total_latency_s": round(elapsed, 4),
                },
            )

    def test_annotate_memory_vs_baseline(self):
        """Compare RSS with annotation vs loading the image alone.

        Isolates the annotation overhead (rectangle drawing, textbbox
        calculation, PNG save) from the image-in-memory cost by measuring
        RSS in three states:

        1. **Baseline**: Baseline RSS (gc.collect() first)
        2. **Image loaded**: After creating the Pillow Image in memory
        3. **After annotation**: After ``annotate_image()`` completes

        The delta ``annotated - image_loaded`` is the pure annotation
        overhead — it excludes the cost of keeping the image in memory.

        Uses a 4000x3000 high-res image with 50 detections.
        """
        proc = self._process_or_skip()
        import tempfile

        from PIL import Image
        from shopstack.providers.image_gen_provider import FluxImageProvider

        provider = FluxImageProvider()
        detections = TestAnnotateImageBenchmarks._generate_bench_detections(50)
        w, h = 4000, 3000

        with tempfile.TemporaryDirectory() as tmp_dir:
            img_path = Path(tmp_dir) / "bench_mem_vs_baseline.png"
            Image.new("RGB", (w, h), color="white").save(img_path)

            # ── Phase 1: Baseline (no image in memory) ────────────────
            rss_baseline = self._rss_mb(proc)

            # The image stays open (pixels resident) for phases 2 and 3 and
            # is closed by the context manager even if annotation raises.
            with Image.open(img_path) as img:
                # ── Phase 2: Load image into PIL (but don't annotate) ─────
                img.load()  # force pixel data into memory
                rss_with_image = self._rss_mb(proc)

                # ── Phase 3: Run annotation ───────────────────────────────
                start = time.perf_counter()
                result = provider.annotate_image(str(img_path), detections)
                elapsed = time.perf_counter() - start
                rss_annotated = self._rss_mb(proc)

            image_cost = rss_with_image - rss_baseline
            annotation_overhead = rss_annotated - rss_with_image
            total_delta = rss_annotated - rss_baseline
            output_mb = Path(result).stat().st_size / (1024 * 1024)

            # Note: PIL's Image.open + load on a uniform white image may
            # not increase RSS measurably (PIL uses a shared/cached pixel
            # representation for uniform images). The annotation overhead
            # measurement is still valid — it measures the delta between
            # the image-in-memory state and the annotated state.

            # Log three-phase breakdown
            print("\n[MEM VS BASELINE] 4000x3000, 50 detections:")
            print(f" {'Phase':<20} {'RSS':<10} {'Delta':<10}")
            print(f" {'-'*40}")
            print(f" {'Baseline':<20} {rss_baseline:<10.1f}MB {'—':<10}")
            print(f" {'Image loaded':<20} {rss_with_image:<10.1f}MB {image_cost:+.1f}MB")
            print(f" {'After annotation':<20} {rss_annotated:<10.1f}MB {annotation_overhead:+.1f}MB")
            print(f" {'Total delta':<20} {'':<10} {total_delta:+.1f}MB")
            print(f" Output PNG: {output_mb:.2f}MB")
            print(f" Latency: {elapsed:.3f}s")

            # Assertions
            assert elapsed < 8.0, f"Annotation too slow: {elapsed:.3f}s"
            # Annotation overhead (Pillow drawing + text + PNG save).
            # For uniform images, image_cost can be ~0 (PIL optimization),
            # so use an absolute threshold: annotation overhead should be
            # <100MB (the annotated output PNG is reused from the original
            # image buffer — no full-image copy is made).
            # A typical run shows ~45MB delta (the PNG save buffer + temp
            # objects during annotation), well under 100MB.
            assert annotation_overhead < 100.0, (
                f"Annotation overhead {annotation_overhead:.1f}MB exceeds "
                f"100MB — annotation should not duplicate the full image "
                f"buffer. Image loaded: {rss_with_image:.1f}MB, "
                f"Annotated: {rss_annotated:.1f}MB"
            )
            # Sanity: total delta should be bounded
            assert total_delta < 300.0, (
                f"Total RSS increase {total_delta:.1f}MB exceeds 300MB — "
                f"Baseline: {rss_baseline:.1f}MB, "
                f"Image: {rss_with_image:.1f}MB, "
                f"Annotated: {rss_annotated:.1f}MB"
            )

            _append_memory_benchmark_trend(
                test_name="test_annotate_memory_vs_baseline",
                params={
                    "image_size": f"{w}x{h}",
                    "megapixels": round(w * h / 1e6, 2),
                    "detections": 50,
                    "content_type": "white",
                },
                results={
                    "rss_baseline_mb": round(rss_baseline, 1),
                    "rss_with_image_mb": round(rss_with_image, 1),
                    "image_cost_mb": round(image_cost, 1),
                    "rss_annotated_mb": round(rss_annotated, 1),
                    "annotation_overhead_mb": round(annotation_overhead, 1),
                    "total_delta_mb": round(total_delta, 1),
                    "output_mb": round(output_mb, 2),
                    "latency_s": round(elapsed, 4),
                },
            )

    def test_annotate_memory_content_comparison(self):
        """Compare annotation overhead (RSS delta) across image content types.

        Measures RSS in three phases for each content type:

        1. Baseline (no image)
        2. Image loaded (PIL has pixel data in memory)
        3. After annotation (annotate_image complete)

        The annotation overhead ``rss_annotated - rss_with_image`` isolates
        Pillow drawing + text + PNG save cost from the image-in-memory cost.
        Compares white (baseline) vs gradient vs noise to determine whether
        non-uniform pixel content affects the annotation's memory footprint.

        Expected: annotation overhead is content-independent — Pillow draws on
        the existing image buffer rather than creating a new one, so memory
        should be the same regardless of pixel content.

        The noise image is generated from a fixed seed so that runs recorded
        in the trend file are comparable with each other.
        """
        proc = self._process_or_skip()
        import random
        import tempfile

        from PIL import Image, ImageDraw
        from shopstack.providers.image_gen_provider import FluxImageProvider

        provider = FluxImageProvider()
        detections = TestAnnotateImageBenchmarks._generate_bench_detections(50)
        w, h = 400, 300
        rng = random.Random(0)  # fixed seed: identical noise image every run

        # ── Build content images ─────────────────────────────────────
        def _make_white() -> Image.Image:
            return Image.new("RGB", (w, h), color="white")

        def _make_gradient() -> Image.Image:
            img = Image.new("RGB", (w, h), color="white")
            draw = ImageDraw.Draw(img)
            for x in range(w):
                ratio = x / w
                draw.line([(x, 0), (x, h)], fill=(
                    int(255 * (1 - ratio)),
                    int(255 * (1 - ratio)),
                    int(255 * ratio),
                ))
            return img

        def _make_noise() -> Image.Image:
            img = Image.new("RGB", (w, h), color="white")
            draw = ImageDraw.Draw(img)
            # Every second pixel in both directions gets a random colour.
            for y in range(0, h, 2):
                for x in range(0, w, 2):
                    draw.point((x, y), fill=(
                        rng.randint(0, 255),
                        rng.randint(0, 255),
                        rng.randint(0, 255),
                    ))
            return img

        content_names = ["white", "gradient", "noise"]
        builders = {
            "white": _make_white,
            "gradient": _make_gradient,
            "noise": _make_noise,
        }

        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp = Path(tmp_dir)
            img_paths: dict = {}
            for name in content_names:
                path = tmp / f"content_mem_{name}.png"
                builders[name]().save(path)
                img_paths[name] = path

            results: list = []
            for name in content_names:
                img_path = img_paths[name]

                # Phase 1: Baseline
                rss_baseline = self._rss_mb(proc)

                # The context manager closes the image even when the
                # annotation call raises part-way through the loop.
                with Image.open(img_path) as img:
                    # Phase 2: Load image
                    img.load()
                    rss_with_image = self._rss_mb(proc)

                    # Phase 3: Annotate
                    start = time.perf_counter()
                    result = provider.annotate_image(str(img_path), detections)
                    elapsed = time.perf_counter() - start
                    rss_annotated = self._rss_mb(proc)

                output_kb = Path(result).stat().st_size / 1024
                results.append({
                    "name": name,
                    "rss_baseline": rss_baseline,
                    "image_cost": rss_with_image - rss_baseline,
                    "annotation_overhead": rss_annotated - rss_with_image,
                    "total_delta": rss_annotated - rss_baseline,
                    "elapsed_s": round(elapsed, 4),
                    "output_kb": round(output_kb, 1),
                })

            # ── Print comparison table ───────────────────────────────
            print(f"\n[MEM CONTENT COMPARISON] {w}x{h}, 50 detections:")
            header = " " + "".join(
                f"{c:<20}"
                for c in ["Content", "ImageCost", "AnnotOverhead", "TotalDelta", "Latency"]
            )
            print(header)
            print(f" {'-'*100}")
            white_overhead = results[0]["annotation_overhead"]
            for r in results:
                # A ratio against a zero or negative baseline is meaningless
                # (white overhead is often ~0MB), so show "n/a" instead.
                if white_overhead > 0:
                    ratio_txt = f"{r['annotation_overhead'] / white_overhead:.2f}x"
                else:
                    ratio_txt = "n/a"
                print(f" {r['name']:<20} "
                      f"{r['image_cost']:+.1f}MB{'':<16} "
                      f"{r['annotation_overhead']:+.1f}MB ({ratio_txt}){'':<6} "
                      f"{r['total_delta']:+.1f}MB{'':<10} "
                      f"{r['elapsed_s']:.4f}s")

            # ── Assertions ───────────────────────────────────────────
            for r in results:
                # Absolute bound: annotation overhead for all content types
                # should be <100MB (standard 400x300 image with 50 detections).
                assert r["annotation_overhead"] < 100.0, (
                    f"Content '{r['name']}' annotation overhead "
                    f"{r['annotation_overhead']:+.1f}MB exceeds 100MB"
                )
                # Latency bound
                assert r["elapsed_s"] < 2.0, (
                    f"Content '{r['name']}' too slow: {r['elapsed_s']:.3f}s"
                )

            # Compare non-white content to white baseline using absolute
            # difference (not ratio — PIL may show white_overhead ~0MB for
            # uniform images, making ratio comparisons unstable).
            # No content type should have >50MB more overhead than white.
            for r in results[1:]:
                extra = r["annotation_overhead"] - white_overhead
                assert extra < 50.0, (
                    f"Content '{r['name']}' annotation overhead "
                    f"({r['annotation_overhead']:+.1f}MB) is "
                    f"{extra:+.1f}MB above white's ({white_overhead:+.1f}MB) — "
                    f"expected <50MB difference."
                )

            # Log trend — record all 3 content types in one line
            _append_memory_benchmark_trend(
                test_name="test_annotate_memory_content_comparison",
                params={
                    "image_size": f"{w}x{h}",
                    "detections": 50,
                    "content_types": content_names,
                },
                results={
                    "per_content": [
                        {
                            "name": r["name"],
                            "image_cost_mb": round(r["image_cost"], 1),
                            "annotation_overhead_mb": round(r["annotation_overhead"], 1),
                            "total_delta_mb": round(r["total_delta"], 1),
                            "latency_s": round(r["elapsed_s"], 4),
                            "output_kb": r["output_kb"],
                        }
                        for r in results
                    ],
                },
            )
| # ============================================================ | |
| # Tesseract real-model benchmarks (always available if CLI | |
| # is installed — Tesseract is the default OCR backend) | |
| # ============================================================ | |
| class TestTesseractBenchmarks: | |
| """Latency/throughput/quality benchmarks for Tesseract OCR. | |
| Tesseract is a local CLI tool (not a neural model) that runs on CPU | |
| with no GPU requirement. It is the default OCR backend in ShopStack | |
| because GLM-OCR fails on real-world receipt photos. | |
| These benchmarks use a generated thermal-printer receipt image | |
| (same fixture as GLM-OCR benchmarks) and extract text via pytesseract. | |
| Expected performance: | |
| - Extraction latency: ~0.1-0.5s per image (CPU) | |
| - Extraction quality: readable, key items/found, spacing noise common | |
| """ | |
| _KEY_ITEMS = ["ONION", "TOMATO", "POTATO", "MILK", "BREAD", "EGG", "SURF", "837"] | |
| _KEY_STORE = "SHARMA" | |
def test_tesseract_available(self, tesseract_model):
    """Sanity check on a fresh provider from the ``tesseract_model`` fixture.

    The provider must report itself available, be named ``"tesseract"`` and
    have no recorded latency.
    """
    ocr, _unused_path = tesseract_model
    assert ocr.available, "TesseractOCRProvider should report available"
    assert ocr.name == "tesseract"
    assert ocr.last_latency_ms is None, "No extraction calls made yet"
def test_tesseract_extraction_latency(self, tesseract_model):
    """Time one receipt extraction and check the provider's own latency record.

    Tesseract typically completes in <0.5s on Apple Silicon; the bounds
    asserted here are 2s wall-clock and 2000ms provider-reported.
    """
    import time

    ocr, receipt_path = tesseract_model

    t0 = time.perf_counter()
    outcome = ocr.extract(receipt_path)
    wall_s = time.perf_counter() - t0

    assert "error" not in outcome, f"Extraction failed: {outcome.get('error')}"
    extracted = outcome.get("text", "")
    assert wall_s < 2.0, f"Tesseract too slow: {wall_s:.3f}s"
    assert len(extracted) > 50, f"Extracted text too short: {len(extracted)} chars"
    # The provider is expected to record its own timing for the call above.
    assert ocr.last_latency_ms is not None, "Latency should be recorded"
    assert ocr.last_latency_ms < 2000, f"Latency {ocr.last_latency_ms}ms exceeds 2s"
def test_tesseract_extraction_quality(self, tesseract_model):
    """Verify extracted text contains expected receipt content.

    Tesseract preserves receipt structure well but may add spacing
    noise (extra dots, line-break artifacts). Key items, store name,
    and totals should still be identifiable.

    Checks, in order: no ``"error"`` key in the result, at least 4 of
    ``_KEY_ITEMS`` present (case-insensitive), ``_KEY_STORE`` present, and
    at least 5 numeric tokens.
    """
    import re

    provider, image_path = tesseract_model
    result = provider.extract(image_path)

    # Check for failure before touching the text, so a failed extraction
    # reports the provider's error rather than a secondary exception.
    assert "error" not in result, f"Extraction failed: {result.get('error')}"
    text = result.get("text", "").upper()

    # Check key items are present in extracted text
    found_items = [item for item in self._KEY_ITEMS if item in text]
    assert len(found_items) >= 4, (
        f"Only {len(found_items)}/{len(self._KEY_ITEMS)} key items found. "
        f"Found: {found_items}. Text preview: {text[:300]}"
    )

    # Check store name appears (Tesseract may split it across lines)
    assert self._KEY_STORE in text, (
        f"Store name '{self._KEY_STORE}' not found in extracted text"
    )

    # Tesseract should extract at least some numeric values
    numbers = re.findall(r"\d+\.?\d*", text)
    assert len(numbers) >= 5, (
        f"Only {len(numbers)} numbers found in extracted text — "
        f"expected at least 5 (prices, quantities, total)"
    )
def test_tesseract_extraction_throughput(self, tesseract_model):
    """Time five back-to-back extractions of the same receipt image.

    Since Tesseract has no model loading overhead, sequential calls
    should be quick: the whole batch must finish in under 3s and the
    derived rate must exceed 60 images per minute.
    """
    import time

    ocr, receipt_path = tesseract_model
    runs = 5

    t0 = time.perf_counter()
    for _ in range(runs):
        outcome = ocr.extract(receipt_path)
        assert "error" not in outcome, f"Extraction failed: {outcome.get('error')}"
    total_s = time.perf_counter() - t0

    per_call_s = total_s / runs
    # Guard the division; a zero average yields a rate of 0.
    if per_call_s > 0:
        rate_per_min = 60.0 / per_call_s
    else:
        rate_per_min = 0

    # Tesseract should handle 5 extractions in under 3s
    assert total_s < 3.0, (
        f"{runs} extractions took {total_s:.2f}s (avg {per_call_s:.3f}s) — "
        f"too slow for sequential throughput"
    )
    assert rate_per_min > 60.0, (
        f"Throughput {rate_per_min:.0f} images/min too low "
        f"(expected >60 for Tesseract on CPU)"
    )
def test_tesseract_hindi_devanagari_receipt(self):
    """Benchmark Tesseract on a Devanagari-font bilingual Hindi receipt.

    Uses the ``_create_hindi_receipt_image()`` helper (Devanagari MT font,
    Hinglish-transliterated terms like PYAAZ, TAMATAR, DOODH) and Tesseract
    with ``lang='eng+hin'`` to test actual Devanagari script support.

    **Current status — NOT VERIFIED, PENDING.**

    Tesseract requires the ``tesseract-lang`` package (``brew install
    tesseract-lang``) to access the ``hin`` language data. On macOS without
    this package, the test skips gracefully with a clear message.

    Once ``hin`` is available, this test will measure:

    - Extraction latency with bilingual lang pack
    - Accuracy on Latin-script terms rendered in Devanagari MT font
    - Accuracy on actual Devanagari text (if present)

    Only latency (<10s) and a minimum output length (>20 chars) are
    asserted; term and word-overlap accuracy are printed for tracking.

    See also:

    - ``Docs/models/tesseract/claims.yaml`` claim
      ``tesseract_hindi_devanagari_support`` (status: pending)
    - ``Docs/exploration/MODEL_EXPLORATION_2026.md`` section
      "Multilingual OCR Research — Hindi/Devanagari Support" for
      the full exploration map of Devanagari OCR candidates
    """
    # ``import importlib`` alone does not guarantee the ``util`` submodule
    # is loaded; import it explicitly before calling find_spec.
    import importlib.util
    import os
    import time

    if importlib.util.find_spec("pytesseract") is None:
        pytest.skip("pytesseract not installed")
    if importlib.util.find_spec("PIL") is None:
        pytest.skip("Pillow not installed")

    # Check if 'hin' language data is available. Only the import and the
    # language query are guarded; the skip decision is made afterwards.
    try:
        import pytesseract

        langs = pytesseract.get_languages()
    except Exception as e:
        pytest.skip(f"Could not check Tesseract languages: {e}")
    if "hin" not in langs:
        pytest.skip(
            "Tesseract Hindi Devanagari benchmark requires 'hin' language pack. "
            "Install with: brew install tesseract-lang. "
            "See Docs/exploration/MODEL_EXPLORATION_2026.md "
            "section 'Multilingual OCR Research — Hindi/Devanagari Support' "
            "for how to enable and the full research context."
        )

    from shopstack.providers.tesseract_provider import TesseractOCRProvider
    from benchmarks.conftest import _create_hindi_receipt_image

    provider = TesseractOCRProvider(lang="eng+hin", psm=6)
    assert provider.available, "TesseractOCRProvider should be available"

    # Use the same Devanagari MT font receipt as GLM-OCR's Hindi test
    devanagari_path, gt_path = _create_hindi_receipt_image()
    try:
        with open(gt_path, encoding="utf-8") as f:
            ground_truth = f.read()

        start = time.perf_counter()
        result = provider.extract(devanagari_path)
        elapsed = time.perf_counter() - start

        assert "error" not in result, f"Extraction failed: {result.get('error')}"
        ext = result.get("text", "")
        ext_lower = ext.lower()

        # Ground truth terms (Hindi-transliterated Latin script)
        hindi_terms = ["pyaaz", "tamatar", "aaloo", "doodh", "anday",
                       "makkhan", "cheeni", "sarson", "aata", "chawal",
                       "dhanyavaad", "kuul", "aadhaa", "rupiyah", "vatra"]
        # Devanagari MT font renders Latin characters differently than
        # standard fonts — Tesseract may struggle with character shapes.
        # Log the results even if accuracy is low.
        found = [t for t in hindi_terms if t in ext_lower]

        # Simple word-level overlap
        gt_words = set(ground_truth.lower().split())
        ext_words = set(ext_lower.split())
        overlap = len(gt_words & ext_words)
        accuracy = overlap / len(gt_words) if gt_words else 0.0

        # Log for tracking — not a hard pass/fail since this is
        # an exploratory benchmark for a pending claim
        print(
            f"\n[DEVANAGARI BENCHMARK] Tesseract lang='eng+hin': "
            f"{elapsed:.2f}s, "
            f"{len(found)}/{len(hindi_terms)} Hindi terms found, "
            f"Word overlap: {accuracy:.1%} ({overlap}/{len(gt_words)}). "
            f"Found: {found}"
        )

        # Expect at least some output (the test should not crash)
        assert elapsed < 10.0, f"Extraction too slow: {elapsed:.1f}s"
        assert len(ext) > 20, f"Extracted text too short: {len(ext)} chars"
    finally:
        # Best-effort removal of the generated image and ground-truth file;
        # a file that is already gone must not fail the test.
        for leftover in (devanagari_path, gt_path):
            try:
                os.unlink(leftover)
            except OSError:
                pass
def test_tesseract_no_model_load(self, tesseract_model):
    """Check that ``load()`` returns almost immediately for Tesseract.

    Unlike neural OCR models, Tesseract is a CLI tool and requires no
    weight loading or GPU initialization. The call must finish in under
    0.1s and the provider must report itself available afterwards.
    """
    import time

    ocr, _unused_path = tesseract_model

    t0 = time.perf_counter()
    ocr.load()
    load_s = time.perf_counter() - t0

    assert load_s < 0.1, f"Tesseract load() should be instant, took {load_s:.4f}s"
    assert ocr.available, "Tesseract should be available without loading"
def test_tesseract_hindi_receipt(self):
    """Measure Tesseract accuracy on a receipt with Indian grocery terms.

    A receipt is rendered with Pillow using Hindi-transliterated item names in
    Latin script (PYAAZ, TAMATAR, AALOO, ...), saved to a temporary PNG and
    passed to ``TesseractOCRProvider.extract``. Devanagari font rendering is a
    separate concern and is not exercised here.

    The test is skipped when the provider reports it is unavailable. It
    passes when at least 8 of the 11 transliterated terms, the store name and
    the "TOTAL" label are found in the extracted text and the extraction
    itself takes less than 3 seconds.
    """
    import os as _os
    import tempfile
    import time

    from PIL import Image, ImageDraw, ImageFont
    from shopstack.providers.tesseract_provider import TesseractOCRProvider

    provider = TesseractOCRProvider(lang="eng", psm=6)
    if not provider.available:
        pytest.skip("Tesseract not available")

    # Receipt content. Each line is stripped before drawing, so leading and
    # trailing spaces inside these literals do not affect the rendered image.
    lines = [
        " SHARMA KIRANA STORE ",
        " 12th Main, Koramangala",
        " Date: 15/06/2026",
        "========================================",
        " ITEM QTY AMOUNT",
        "----------------------------------------",
        "1. PYAAZ (Onion) 2 KG 40",
        "2. TAMATAR (Tomato) 1 KG 35",
        "3. AALOO (Potato) 2 KG 50",
        "4. DOODH (Milk) 1 L 64",
        "5. ANDAY (Eggs) 12 PC 85",
        "6. MAKKHAN (Butter) 500 G 60",
        "7. CHEENI (Sugar) 1 KG 45",
        "8. SARSON KA TEL 1 L 185",
        "9. AATA (Wheat Flour) 1 KG 42",
        "10. CHAWAL (Rice) 1 KG 75",
        "----------------------------------------",
        " TOTAL 681",
        " GST 0",
        "========================================",
        " DHANYAVAAD! THANK YOU!",
    ]
    padding = 16
    font_size = 15
    line_height = font_size + 7
    width = 440
    height = len(lines) * line_height + padding * 2

    img = Image.new("RGB", (width, height), (248, 244, 240))
    draw = ImageDraw.Draw(img)
    try:
        # Menlo lives at this path on macOS only; elsewhere fall back to
        # Pillow's built-in font.
        font = ImageFont.truetype("/System/Library/Fonts/Menlo.ttc", font_size)
    except (OSError, ImportError):
        font = ImageFont.load_default()

    # Lines starting with one of these prefixes are drawn right-aligned.
    right_align_prefixes = ("total", "gst")
    for i, line in enumerate(lines):
        stripped = line.strip()
        if not stripped:
            continue
        y = padding + i * line_height
        if stripped.lower().startswith(right_align_prefixes):
            x = width - padding - draw.textlength(stripped, font=font)
        else:
            x = padding
        draw.text((x, y), stripped, fill="black", font=font)

    fd, path = tempfile.mkstemp(suffix=".png", prefix="tesseract_hindi_")
    try:
        # Closing the descriptor and saving happen inside the ``try`` so the
        # temporary file is removed even when writing the image fails.
        _os.close(fd)
        img.save(path)

        start = time.perf_counter()
        result = provider.extract(path)
        elapsed = time.perf_counter() - start
        assert "error" not in result, f"Extraction failed: {result.get('error')}"
        ext = result.get("text", "")

        # All terms are Latin script in a monospace font, so most of them
        # are expected to survive OCR.
        hindi_terms = ["pyaaz", "tamatar", "aaloo", "doodh", "anday",
                       "makkhan", "cheeni", "sarson", "aata", "chawal",
                       "dhanyavaad"]
        ext_lower = ext.lower()
        found = [t for t in hindi_terms if t in ext_lower]
        # Threshold at 8 to allow for spacing noise
        # (e.g., "MAKKHAN" -> "MAK KHAN").
        assert len(found) >= 8, (
            f"Only {len(found)}/{len(hindi_terms)} Indian terms found. "
            f"Expected at least 8. Found: {found}. "
            f"Extracted text preview: {ext[:400]}"
        )

        # Also verify key structural fields appear.
        ext_upper = ext.upper()
        assert "SHARMA" in ext_upper, "Store name not found"
        assert "TOTAL" in ext_upper, "Total not found"

        # Extraction should be fast for a small image.
        assert elapsed < 3.0, f"Extraction too slow: {elapsed:.1f}s"
    finally:
        # Best-effort cleanup: a missing or locked temp file must not mask
        # the real test outcome.
        try:
            _os.unlink(path)
        except OSError:
            pass
| # ============================================================ | |
| # GLM-OCR real-model benchmarks (requires cached weights) | |
| # ============================================================ | |
| class TestGlmOCRRealModelBenchmarks: | |
| """Real-model latency/throughput/accuracy benchmarks for GLM-OCR. | |
| These benchmarks load the actual GLM-OCR model via ``GlmOCRProvider`` | |
| and exercise the full ``extract()`` pipeline on generated receipt images. | |
| They are skipped in CI or when the model is not cached locally. | |
| Measured values are validated against ``claims.yaml`` targets: | |
| - Load time: ~2.6s (warm, after cache) | |
| - Extraction latency: ~5-10s per receipt | |
| - Extraction quality: text should contain key items from the receipt | |
| """ | |
| _KEY_ITEMS = ["ONION", "TOMATO", "POTATO", "MILK", "BREAD", "EGG", "SURF", "Total"] | |
| _KEY_STORE = "SHARMA" | |
| _KEY_DATE = "08/06/2026" | |
| def test_glm_ocr_model_available(self, glm_ocr_model): | |
| """Sanity check: GlmOCRProvider detects and can access the GLM-OCR model.""" | |
| provider, _image_path, _warm = glm_ocr_model | |
| assert provider.available, "GlmOCRProvider should report available" | |
| assert provider._model is not None, "Model should be loaded" | |
| assert provider._processor is not None, "Processor should be loaded" | |
| assert provider.last_latency_ms is None, "No extraction calls made yet" | |
| def test_glm_ocr_warmup_time(self, glm_ocr_model): | |
| """Measure the time to load the model into memory (cold start). | |
| This includes transformers weight loading and processor init. | |
| Expected: <15s on Apple Silicon with cached weights. | |
| """ | |
| _provider, _image_path, warm_elapsed = glm_ocr_model | |
| assert warm_elapsed < 15.0, ( | |
| f"Model load took {warm_elapsed:.2f}s — expected <15s " | |
| "with cached weights on Apple Silicon" | |
| ) | |
| def test_glm_ocr_extraction_latency(self, glm_ocr_model): | |
| """Measure single receipt extraction latency. | |
| Targets (from claims.yaml): ~5-8s warm inference. | |
| """ | |
| import time | |
| provider, image_path, _warm = glm_ocr_model | |
| start = time.perf_counter() | |
| result = provider.extract(image_path) | |
| elapsed = time.perf_counter() - start | |
| assert "error" not in result, f"Extraction failed: {result.get('error')}" | |
| text = result.get("text", "") | |
| latency_ms = result.get("latency_ms", elapsed * 1000) | |
| assert elapsed < 20.0, f"Extraction too slow: {elapsed:.3f}s" | |
| assert len(text) > 50, f"Extracted text too short: {len(text)} chars" | |
| assert provider.last_latency_ms is not None, "Latency should be recorded" | |
| def test_glm_ocr_extraction_quality(self, glm_ocr_model): | |
| """Verify extracted text contains expected receipt content. | |
| The generated receipt has specific items, store name, date, and total. | |
| This test checks that the OCR output preserves the key fields. | |
| """ | |
| provider, image_path, _warm = glm_ocr_model | |
| result = provider.extract(image_path) | |
| text = result.get("text", "").upper() | |
| assert "error" not in result, f"Extraction failed: {result.get('error')}" | |
| # Check key items are present in extracted text | |
| found_items = [item for item in self._KEY_ITEMS if item in text] | |
| assert len(found_items) >= 5, ( | |
| f"Only {len(found_items)}/{len(self._KEY_ITEMS)} key items found in extracted text. " | |
| f"Found: {found_items}. Text preview: {text[:300]}" | |
| ) | |
| # Check store name appears | |
| assert self._KEY_STORE in text, ( | |
| f"Store name '{self._KEY_STORE}' not found in extracted text" | |
| ) | |
| # Check date appears (at least the date pattern) | |
| import re | |
| assert re.search(r"08\s*[-/]\s*06\s*[-/]\s*2026", text), ( | |
| f"Date '08/06/2026' not found in extracted text" | |
| ) | |
| # Check total appears | |
| assert "837" in text, ( | |
| f"Total '837.00' not found in extracted text" | |
| ) | |
| def test_glm_ocr_extraction_throughput(self, glm_ocr_model): | |
| """Measure sequential extraction throughput. | |
| Run 3 extractions on the same receipt to measure | |
| average throughput (images per minute). | |
| """ | |
| import time | |
| provider, image_path, _warm = glm_ocr_model | |
| n = 3 | |
| start = time.perf_counter() | |
| for _ in range(n): | |
| result = provider.extract(image_path) | |
| assert "error" not in result, f"Extraction failed: {result.get('error')}" | |
| elapsed = time.perf_counter() - start | |
| avg_s = elapsed / n | |
| images_per_min = 60.0 / avg_s if avg_s > 0 else 0 | |
| # Should handle at least 3 sequential extractions in under 45s | |
| assert elapsed < 150.0, ( | |
| f"{n} extractions took {elapsed:.1f}s (avg {avg_s:.1f}s) — " | |
| f"too slow for sequential throughput" | |
| ) | |
| assert images_per_min > 1.5, ( | |
| f"Throughput {images_per_min:.1f} images/min too low " | |
| f"(avg {avg_s:.1f}s per extraction)" | |
| ) | |
| def test_glm_ocr_claims_validation(self, glm_ocr_model): | |
| """Validate measured latency against claims.yaml targets. | |
| Claims targets (from Docs/models/glm-ocr/claims.yaml): | |
| - 'glm_ocr_receipt_extraction': verified with manual benchmark | |
| - 'glm_ocr_measured_latency': ~5.3s warm inference | |
| """ | |
| import time | |
| provider, image_path, _warm = glm_ocr_model | |
| # Run extraction and measure | |
| start = time.perf_counter() | |
| result = provider.extract(image_path) | |
| elapsed = time.perf_counter() - start | |
| assert "error" not in result, f"Extraction failed: {result.get('error')}" | |
| text = result.get("text", "") | |
| latency_ms = round(elapsed * 1000, 1) | |
| token_estimate = max(1, len(text.split())) | |
| # Validate against claims targets | |
| # claims.yaml reports 5.3s warm inference — allow 3x margin | |
| assert latency_ms < 60000.0, ( | |
| f"Latency {latency_ms}ms exceeds 60s threshold " | |
| f"(claims: ~5300ms for warm inference)" | |
| ) | |
| # Extraction should return reasonable amount of text | |
| # Generated receipt has ~200 words | |
| assert token_estimate > 50, ( | |
| f"Only ~{token_estimate} tokens extracted — " | |
| f"expected >50 for a 13-item receipt" | |
| ) | |
| assert token_estimate < 1000, ( | |
| f"~{token_estimate} tokens seems too many for a receipt" | |
| ) | |
| def test_glm_ocr_model_parameter_count(self, glm_ocr_model): | |
| """Verify model metadata matches expected parameter count.""" | |
| provider, _image_path, _warm = glm_ocr_model | |
| assert provider.parameter_count == 0.9, ( | |
| f"Expected 0.9B params, got {provider.parameter_count}B" | |
| ) | |
| assert provider.name == "glm_ocr" | |
| assert provider.runtime_type == "transformers" | |
| assert provider.supports_off_grid is True | |
| def test_glm_ocr_hindi_receipt(self, glm_ocr_model): | |
| """Measure GLM-OCR accuracy on a bilingual Hindi-English receipt. | |
| This test documents the current limitation: GLM-OCR does not support | |
| Devanagari/Hindi text. The model hallucinates repetitive patterns | |
| (e.g. 'prabhaav') instead of extracting the actual Hindi-transliterated | |
| item names. This test verifies the model runs without crashing and | |
| records metrics for tracking. If a future model version improves | |
| Hindi support, this test will flag the change. | |
| Expected: poor accuracy (Word WER > 50%, 0/15 Hindi terms found) | |
| """ | |
| import time | |
| provider, _image_path, _warm = glm_ocr_model | |
| # Create Hindi receipt image | |
| from benchmarks.conftest import _create_hindi_receipt_image | |
| hindi_path, gt_path = _create_hindi_receipt_image() | |
| try: | |
| with open(gt_path, encoding="utf-8") as f: | |
| ground_truth = f.read() | |
| start = time.perf_counter() | |
| result = provider.extract(hindi_path) | |
| elapsed = time.perf_counter() - start | |
| assert "error" not in result, f"Extraction failed: {result.get('error')}" | |
| ext = result.get("text", "") | |
| # Simple word-level WER | |
| gt_words = set(ground_truth.lower().split()) | |
| ext_words = set(ext.lower().split()) | |
| if gt_words: | |
| overlap = len(gt_words & ext_words) | |
| accuracy = overlap / len(gt_words) | |
| else: | |
| accuracy = 0.0 | |
| # Check for Hindi-transliterated terms | |
| hindi_terms = ["pyaaz", "tamatar", "aaloo", "doodh", "anday", | |
| "makkhan", "cheeni", "sarson", "aata", "chawal", | |
| "dhanyavaad", "kuul", "aadhaa", "rupiyah", "vatra"] | |
| found = [t for t in hindi_terms if t in ext.lower()] | |
| # Log metrics to stdout for trend tracking | |
| print( | |
| f"\n[GLM-OCR HINDI] {elapsed:.2f}s, " | |
| f"{len(found)}/15 Hindi terms, " | |
| f"Word overlap: {accuracy:.1%} " | |
| f"({len(gt_words & ext_words)}/{len(gt_words)}). " | |
| f"Found: {found}" | |
| ) | |
| # Current model fails on Hindi — document the limitation | |
| # If a future version improves, this assertion will flag it | |
| assert accuracy < 0.5, ( | |
| f"Hindi accuracy improved! Word overlap accuracy {accuracy:.1%} " | |
| f"({len(gt_words & ext_words)}/{len(gt_words)}). " | |
| f"Expected <50% based on pre-benchmark testing. " | |
| f"Found {len(found)}/15 Hindi terms. " | |
| f"If this is a real improvement, update claims.yaml " | |
| f"and lower the threshold. Extracted: {ext[:200]}" | |
| ) | |
| # Log metrics for tracking | |
| assert elapsed < 90.0, f"Extraction too slow: {elapsed:.1f}s" | |
| finally: | |
| import os | |
| try: | |
| os.unlink(hindi_path) | |
| os.unlink(gt_path) | |
| except Exception: | |
| pass | |
| def test_glm_ocr_thermal_throttling_profile(self, glm_ocr_model): | |
| """Detect thermal throttling by measuring latency trend across 3 consecutive Hindi extractions. | |
| Runs 3 Hindi receipt extractions back-to-back (same image) to measure | |
| progressive slowdown. On a cool system, latencies should be relatively | |
| stable. On a thermally-constrained system, each call gets slower as | |
| the CPU/GPU heats up and firmware-level frequency scaling kicks in. | |
| Metric: ``slowing_factor = latency_of_extraction_3 / latency_of_extraction_1``. | |
| A factor > 2.5 suggests significant thermal throttling. | |
| Logs full breakdown to stdout for trend tracking. This is a profiling | |
| benchmark — the "failure" is informative, not blocking, since thermal | |
| characteristics vary by machine. The threshold catches severe regressions | |
| (e.g. 4x+ slowdown from a model implementation change). | |
| """ | |
| import time | |
| provider, _image_path, _warm = glm_ocr_model | |
| from benchmarks.conftest import _create_hindi_receipt_image | |
| hindi_path, gt_path = _create_hindi_receipt_image() | |
| try: | |
| latencies: list[float] = [] | |
| for i in range(3): | |
| start = time.perf_counter() | |
| result = provider.extract(hindi_path) | |
| elapsed = time.perf_counter() - start | |
| latencies.append(elapsed) | |
| assert "error" not in result, f"Extraction {i+1} failed: {result.get('error')}" | |
| s1, s2, s3 = latencies | |
| ratio_2_to_1 = s2 / max(s1, 1e-9) | |
| ratio_3_to_1 = s3 / max(s1, 1e-9) | |
| peak_slowdown = max(ratio_2_to_1, ratio_3_to_1) | |
| monotonic_increase = s1 < s2 < s3 | |
| print( | |
| f"\n[GLM-OCR THERMAL PROFILE] 3 consecutive Hindi extractions:\n" | |
| f" Extraction 1 (cold): {s1:.1f}s\n" | |
| f" Extraction 2: {s2:.1f}s ({ratio_2_to_1:.2f}x vs #1)\n" | |
| f" Extraction 3: {s3:.1f}s ({ratio_3_to_1:.2f}x vs #1)\n" | |
| f" Peak slowdown: {peak_slowdown:.2f}x\n" | |
| f" Monotonic increase: {monotonic_increase}\n" | |
| f" Thermal score: {self._thermal_score(s1, s2, s3)}" | |
| ) | |
| # Flag severe throttling: >2.5x slowdown from first to worst extraction. | |
| # This threshold is generous enough to pass on a warm system (observed | |
| # range: 1.0x-1.5x on steady state) but catches pathological cases | |
| # where a model change dramatically increases sustained power draw. | |
| assert peak_slowdown < 2.5, ( | |
| f"Thermal throttling detected: extraction latency grew {peak_slowdown:.2f}x " | |
| f"from call 1 ({s1:.1f}s) to worst call ({max(s1, s2, s3):.1f}s). " | |
| f"Expected <2.5x for 3 consecutive Hindi extractions." | |
| ) | |
| finally: | |
| import os | |
| try: | |
| os.unlink(hindi_path) | |
| os.unlink(gt_path) | |
| except Exception: | |
| pass | |
| def _thermal_score(s1: float, s2: float, s3: float) -> str: | |
| """Classify thermal state based on latency progression.""" | |
| import statistics | |
| cv = statistics.stdev([s1, s2, s3]) / max(statistics.mean([s1, s2, s3]), 1e-9) | |
| increase = (s3 - s1) / max(s1, 1e-9) | |
| if cv < 0.15 and increase < 0.1: | |
| return "COOL — stable latencies, no throttling" | |
| elif cv < 0.25 and increase < 0.2: | |
| return "WARM — mild variance, possible light throttling" | |
| elif cv < 0.40 and increase < 0.5: | |
| return "HOT — significant variance, throttling likely" | |
| else: | |
| return "THROTTLED — severe performance degradation" | |
| def test_glm_ocr_thermal_inflection_point(self, glm_ocr_model): | |
| """Run 5 Hindi extractions to detect the thermal throttling inflection point. | |
| Unlike the 3-extraction profile (which detects throttling severity), this | |
| test pinpoints *when* throttling begins by running 5 sequential extractions | |
| and identifying the first call where latency deviates significantly from | |
| the initial baseline. | |
| Metrics: | |
| - Per-call latency with rolling 2-extraction average to smooth noise | |
| - Inflection point: the extraction index where a call is >1.5x slower | |
| than the minimum observed latency | |
| - Plateau latency: average of the last 2 extractions (the "settled" state) | |
| """ | |
| import time | |
| provider, _image_path, _warm = glm_ocr_model | |
| from benchmarks.conftest import _create_hindi_receipt_image | |
| hindi_path, gt_path = _create_hindi_receipt_image() | |
| try: | |
| n = 5 | |
| latencies: list[float] = [] | |
| for i in range(n): | |
| start = time.perf_counter() | |
| result = provider.extract(hindi_path) | |
| elapsed = time.perf_counter() - start | |
| latencies.append(elapsed) | |
| assert "error" not in result, f"Extraction {i+1} failed: {result.get('error')}" | |
| # Compute rolling 2-extraction average | |
| rolling_avg: list[float] = [] | |
| for i in range(n): | |
| window = latencies[max(0, i - 1):i + 1] | |
| rolling_avg.append(sum(window) / len(window)) | |
| min_latency = min(latencies) | |
| min_idx = latencies.index(min_latency) | |
| baseline = latencies[0] | |
| # Find inflection point: first extraction >1.5x the minimum | |
| inflection_idx: int | None = None | |
| for i in range(1, n): | |
| if latencies[i] > min_latency * 1.5: | |
| inflection_idx = i | |
| break | |
| plateau_latency = sum(latencies[-2:]) / 2.0 | |
| peak_vs_baseline = max(latencies) / max(baseline, 1e-9) | |
| peak_vs_min = max(latencies) / max(min_latency, 1e-9) | |
| # Print detailed table | |
| header = ( | |
| f"\n[GLM-OCR THERMAL INFLECTION] 5 Hindi extractions:\n" | |
| f" {'#':<3} {'Latency':>9} {'Ratio_v1':>9} {'Rolling':>9} {'Delta':>9}\n" | |
| f" {'---':<3} {'--------':>9} {'--------':>9} {'--------':>9} {'--------':>9}" | |
| ) | |
| print(header) | |
| for i in range(n): | |
| ratio = latencies[i] / max(baseline, 1e-9) | |
| delta_prev = ( | |
| latencies[i] - latencies[i - 1] | |
| if i > 0 else 0.0 | |
| ) | |
| marker = " <-- INFLECTION" if inflection_idx is not None and i == inflection_idx else "" | |
| print( | |
| f" {i + 1:<3} {latencies[i]:>8.1f}s {ratio:>8.2f}x " | |
| f"{rolling_avg[i]:>8.1f}s {delta_prev:>+8.1f}s{marker}" | |
| ) | |
| print( | |
| f"\n Minimum latency: {min_latency:.1f}s (extraction {min_idx + 1})\n" | |
| f" Baseline (call 1): {baseline:.1f}s\n" | |
| f" Plateau (avg last 2): {plateau_latency:.1f}s\n" | |
| f" Peak vs baseline: {peak_vs_baseline:.2f}x\n" | |
| f" Peak vs minimum: {peak_vs_min:.2f}x\n" | |
| f" Inflection at: " | |
| f"{'extraction ' + str(inflection_idx + 1) if inflection_idx is not None else 'none (stable)'}\n" | |
| f" Thermal score: {self._thermal_score(latencies[0], latencies[1], latencies[-1])}" | |
| ) | |
| # Assert: peak slowdown from baseline should be <3.5x for 5 calls | |
| # (more generous than 3-call 2.5x because 5 calls accumulate more heat) | |
| assert peak_vs_baseline < 3.5, ( | |
| f"Peak slowdown {peak_vs_baseline:.2f}x exceeds 3.5x threshold. " | |
| f"Baseline: {baseline:.1f}s, " | |
| f"Peak: {max(latencies):.1f}s, " | |
| f"Inflection at extraction {inflection_idx + 1 if inflection_idx is not None else 'N/A'}." | |
| ) | |
| finally: | |
| import os | |
| try: | |
| os.unlink(hindi_path) | |
| os.unlink(gt_path) | |
| except Exception: | |
| pass | |
| # ============================================================ | |
| # llama-3.2-3b real-model benchmarks (Apple Silicon only) | |
| # ============================================================ | |
| class TestLlama3BRealModelBenchmarks: | |
| """Real-model latency/throughput/memory benchmarks for Llama-3.2-3B. | |
| These benchmarks load the actual MLX-cached GGUF variant via | |
| ``LocalProvider`` and exercise the full ``complete()`` pipeline. | |
| They are skipped in CI or when the model is not cached locally. | |
| Measured values are validated against ``claims.yaml`` targets: | |
| - Latency: ~493ms for 49 tokens (10.06 tok/s) | |
| - Memory: <2GB RAM with Q4_K_M quantization | |
| """ | |
| _SAMPLE_PROMPTS = [ | |
| ( | |
| "What should I cook for dinner tonight with rice, tomatoes, and onions?", | |
| 32, | |
| ), | |
| ( | |
| "List 5 essential items I need to buy for a week of Indian cooking. " | |
| "Consider that I already have rice, dal, and spices at home.", | |
| 64, | |
| ), | |
| ( | |
| "How long does chopped coriander last in the fridge, and how can I " | |
| "tell if it's gone bad? Give me storage tips too.", | |
| 48, | |
| ), | |
| ] | |
| def test_llama3b_model_available(self, llama3b_model): | |
| """Sanity check: LocalProvider detects and can access the MLX model.""" | |
| provider, _warm = llama3b_model | |
| assert provider.available, "LocalProvider should report available" | |
| assert provider.backend == "mlx", f"Expected MLX backend, got {provider.backend}" | |
| assert provider.last_latency_ms is None, "No calls made yet" | |
| def test_llama3b_warmup_time(self, llama3b_model): | |
| """Measure the time to load the model into memory (cold start). | |
| This includes MLX weight loading and graph compilation. | |
| Expected: <10s on Apple Silicon with cached weights. | |
| """ | |
| _provider, warm_elapsed = llama3b_model | |
| assert warm_elapsed < 10.0, ( | |
| f"Model load took {warm_elapsed:.2f}s — expected <10s " | |
| "with cached weights on Apple Silicon" | |
| ) | |
| def test_llama3b_latency(self, llama3b_model): | |
| """Measure single-completion latency. | |
| Targets (from claims.yaml): <500ms for ~32 tokens. | |
| """ | |
| provider, _warm = llama3b_model | |
| prompt, _ = self._SAMPLE_PROMPTS[0] | |
| import time | |
| start = time.perf_counter() | |
| result = provider.complete(prompt, max_tokens=32, temperature=0.0) | |
| elapsed = time.perf_counter() - start | |
| assert "error" not in result, f"Completion failed: {result.get('error')}" | |
| text = result.get("text", "") | |
| token_count = result.get("usage", {}).get("total_tokens", 0) | |
| latency_ms = result.get("cost", {}).get("latency_ms", elapsed * 1000) | |
| # Allow ~3x margin for first call after warm (graph compilation) | |
| assert elapsed < 1.5, f"Latency too high: {elapsed:.3f}s" | |
| assert len(text) > 0, "Empty response" | |
| def test_llama3b_throughput(self, llama3b_model): | |
| """Measure tokens-per-second throughput. | |
| Targets (from claims.yaml): ~10.06 tok/s for short prompts. | |
| Real throughput is measured as ``output_tokens / elapsed_seconds`` | |
| over several prompt lengths to capture scaling behavior. | |
| """ | |
| import time | |
| provider, _warm = llama3b_model | |
| results: list[dict[str, Any]] = [] | |
| for prompt, expected_tokens in self._SAMPLE_PROMPTS: | |
| start = time.perf_counter() | |
| result = provider.complete(prompt, max_tokens=expected_tokens, temperature=0.0) | |
| elapsed = time.perf_counter() - start | |
| assert "error" not in result, f"Completion failed: {result.get('error')}" | |
| text = result.get("text", "") | |
| token_count = result.get("usage", {}).get("total_tokens", 0) | |
| # Estimate tokens from output text if usage not populated | |
| if token_count == 0: | |
| token_count = max(1, len(text.split())) | |
| tok_s = token_count / elapsed if elapsed > 0 else 0.0 | |
| results.append({ | |
| "prompt_len": len(prompt), | |
| "elapsed_s": round(elapsed, 4), | |
| "tokens": token_count, | |
| "tok_s": round(tok_s, 2), | |
| }) | |
| # Average throughput across all prompts | |
| avg_tok_s = sum(r["tok_s"] for r in results) / len(results) | |
| min_tok_s = min(r["tok_s"] for r in results) | |
| # claims.yaml target: 10.06 tok/s — allow 5x margin for int4 | |
| assert avg_tok_s > 2.0, ( | |
| f"Throughput too low: avg {avg_tok_s:.2f} tok/s " | |
| f"(min {min_tok_s:.2f})" | |
| ) | |
| def test_llama3b_claims_validation(self, llama3b_model): | |
| """Validate measured latency/throughput against claims.yaml targets. | |
| Claims targets (from Docs/models/llama-3.2-3b-gguf/claims.yaml): | |
| - 'llama_gguf_measured_latency': 493ms for 49 tokens | |
| - 'llama_gguf_memory_budget': <2GB RAM (pending verification) | |
| """ | |
| import time | |
| provider, _warm = llama3b_model | |
| # Run a benchmark call that mimics the original measurement | |
| # (short prompt, ~49 expected output tokens) | |
| prompt = ( | |
| "List the ingredients I need to restock this week " | |
| "based on having: rice, dal, spices, onions, tomatoes. " | |
| "Suggest 5-7 items with brief reasons." | |
| ) | |
| max_tokens = 64 | |
| # Warm-up iteration (ensures consistent timing) | |
| provider.complete("Say hello briefly.", max_tokens=8, temperature=0.0) | |
| start = time.perf_counter() | |
| result = provider.complete(prompt, max_tokens=max_tokens, temperature=0.0) | |
| elapsed = time.perf_counter() - start | |
| assert "error" not in result, f"Completion failed: {result.get('error')}" | |
| text = result.get("text", "") | |
| token_count = result.get("usage", {}).get("total_tokens", 0) | |
| latency_ms = round(elapsed * 1000, 1) | |
| # Estimate tokens if usage not populated | |
| if token_count == 0: | |
| token_count = max(1, len(text.split())) | |
| tok_s = round(token_count / elapsed, 2) if elapsed > 0 else 0.0 | |
| # Validate against claims (allow margin for MLX int4 vs GGUF Q4_K_M) | |
| assert latency_ms < 5000.0, ( | |
| f"Latency {latency_ms}ms exceeds 5s threshold " | |
| f"(claims: 493ms for 49 tokens)" | |
| ) | |
| assert tok_s > 2.0, ( | |
| f"Throughput {tok_s} tok/s too low " | |
| f"(claims: 10.06 tok/s)" | |
| ) | |
| # Memory: estimate from model metadata (3B params × ~0.5 bytes/param for int4) | |
| estimated_mb = 3.0 * 0.5 * 1024 # ~1.5GB for model weights | |
| assert estimated_mb < 3000, f"Memory estimate {estimated_mb}MB exceeds 3GB" | |
| def test_llama3b_memory_estimate(self, llama3b_model): | |
| """Approximate memory usage based on model metadata. | |
| claims.yaml target: <2GB RAM with Q4_K_M quantization. | |
| This test validates a model-level estimate rather than measuring | |
| actual RSS, since process-level RSS tracking requires psutil. | |
| """ | |
| provider, _warm = llama3b_model | |
| # 3B params × 4.5 bits/param for Q4_K_M ≈ 1.7GB | |
| # Plus ~200MB for KV cache at 2048 context | |
| bits_per_param = 4.5 | |
| model_weight_mb = 3.0 * bits_per_param / 8 * 1024 # MB | |
| kv_cache_mb = 200 | |
| estimated_mb = model_weight_mb + kv_cache_mb | |
| # Track from latency tracking if available | |
| token_count = provider.last_token_count | |
| latency_ms = provider.last_latency_ms | |
| assert estimated_mb < 3000, ( | |
| f"Estimated memory {estimated_mb:.0f}MB exceeds 3GB" | |
| ) | |
| assert model_weight_mb < 2000, ( | |
| f"Model weight estimate {model_weight_mb:.0f}MB exceeds 2GB" | |
| ) | |
| # Quick RSS check if psutil is available | |
| try: | |
| import psutil | |
| import os | |
| rss_mb = psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024) | |
| assert rss_mb < 4000, f"Process RSS {rss_mb:.0f}MB exceeds 4GB" | |
| except ImportError: | |
| pass # psutil is optional | |
| def test_llama3b_thermal_throttling_profile(self, llama3b_model): | |
| """Detect thermal throttling via 3 consecutive completions. | |
| Runs 3 sequential completions with the same prompt to measure | |
| progressive slowdown from SoC heating. A peak slowdown > 2.5x | |
| between the first and worst completion suggests thermal throttling. | |
| """ | |
| import time | |
| provider, _warm = llama3b_model | |
| prompt = ( | |
| "What should I cook for dinner with rice, tomatoes, and onions? " | |
| "Say one dish only." | |
| ) | |
| latencies: list[float] = [] | |
| for i in range(3): | |
| start = time.perf_counter() | |
| result = provider.complete(prompt, max_tokens=32, temperature=0.0) | |
| elapsed = time.perf_counter() - start | |
| latencies.append(elapsed) | |
| assert "error" not in result, f"Completion {i+1} failed: {result.get('error')}" | |
| s1, s2, s3 = latencies | |
| peak_slowdown = max(s2 / max(s1, 1e-9), s3 / max(s1, 1e-9)) | |
| print( | |
| f"\n[LLAMA3B THERMAL PROFILE] 3 consecutive completions:\n" | |
| f" Completion 1: {s1:.3f}s\n" | |
| f" Completion 2: {s2:.3f}s ({s2 / max(s1, 1e-9):.2f}x vs #1)\n" | |
| f" Completion 3: {s3:.3f}s ({s3 / max(s1, 1e-9):.2f}x vs #1)\n" | |
| f" Peak slowdown: {peak_slowdown:.2f}x" | |
| ) | |
| assert peak_slowdown < 2.5, ( | |
| f"Thermal throttling detected: completion latency grew {peak_slowdown:.2f}x " | |
| f"from call 1 ({s1:.3f}s) to worst ({max(latencies):.3f}s). " | |
| f"Expected <2.5x for 3 consecutive completions." | |
| ) | |
| # ============================================================ | |
| # Real-model benchmarks for STT, TTS, Vision, Planner | |
| # (each skips gracefully if the model isn't cached) | |
| # ============================================================ | |
| class TestRealSTTBenchmarks: | |
| """Latency benchmarks for real STT providers (LocalWhisper / SenseVoice). | |
| These benchmarks load the actual STT model and transcribe a generated | |
| 1-second sine-tone WAV file. They are skipped in CI or when the model | |
| is not cached locally. | |
| Expected latency: | |
| - LocalWhisper (mlx-whisper): <5s for 1s audio | |
| - SenseVoice: <3s for 1s audio | |
| """ | |
| def test_real_stt_available(self, real_stt_model): | |
| """Sanity check: real STT provider initializes and reports available.""" | |
| provider, _audio_path = real_stt_model | |
| assert getattr(provider, "available", True), "Provider should report available" | |
| assert hasattr(provider, "transcribe"), "Provider must have transcribe method" | |
| def test_real_stt_transcription_latency(self, real_stt_model): | |
| """Measure single transcription latency on a 1s sine-tone WAV.""" | |
| import time | |
| provider, audio_path = real_stt_model | |
| start = time.perf_counter() | |
| result = provider.transcribe(audio_path) | |
| elapsed = time.perf_counter() - start | |
| assert isinstance(result, (dict, str)), ( | |
| f"Expected dict or str, got {type(result).__name__}" | |
| ) | |
| # Allow generous 15s for first-call model loading | |
| assert elapsed < 15.0, f"STT too slow: {elapsed:.3f}s" | |
| print(f"\n[REAL STT] Transcription: {elapsed:.3f}s, result: {str(result)[:100]}") | |
| def test_real_stt_throughput(self, real_stt_model): | |
| """Measure sequential transcription throughput (3 calls).""" | |
| import time | |
| provider, audio_path = real_stt_model | |
| n = 3 | |
| start = time.perf_counter() | |
| for _ in range(n): | |
| result = provider.transcribe(audio_path) | |
| assert isinstance(result, (dict, str)) | |
| elapsed = time.perf_counter() - start | |
| avg_s = elapsed / n | |
| print(f"\n[REAL STT] {n}x transcriptions: total {elapsed:.2f}s, avg {avg_s:.3f}s") | |
| # Allow generous total time (model may get faster after first call) | |
| assert elapsed < 45.0, ( | |
| f"{n} STT transcriptions took {elapsed:.1f}s (avg {avg_s:.2f}s)" | |
| ) | |
| def test_real_stt_thermal_throttling_profile(self, real_stt_model): | |
| """Detect thermal throttling via 3 consecutive transcriptions.""" | |
| import time | |
| provider, audio_path = real_stt_model | |
| latencies: list[float] = [] | |
| for i in range(3): | |
| start = time.perf_counter() | |
| result = provider.transcribe(audio_path) | |
| elapsed = time.perf_counter() - start | |
| latencies.append(elapsed) | |
| assert isinstance(result, (dict, str)), f"Transcription {i+1} failed" | |
| s1, s2, s3 = latencies | |
| peak_slowdown = max(s2 / max(s1, 1e-9), s3 / max(s1, 1e-9)) | |
| print( | |
| f"\n[STT THERMAL PROFILE] 3 consecutive transcriptions:\n" | |
| f" Transcription 1: {s1:.3f}s\n" | |
| f" Transcription 2: {s2:.3f}s ({s2 / max(s1, 1e-9):.2f}x vs #1)\n" | |
| f" Transcription 3: {s3:.3f}s ({s3 / max(s1, 1e-9):.2f}x vs #1)\n" | |
| f" Peak slowdown: {peak_slowdown:.2f}x" | |
| ) | |
| assert peak_slowdown < 2.5, ( | |
| f"Thermal throttling detected: transcription latency grew {peak_slowdown:.2f}x " | |
| f"from call 1 ({s1:.3f}s) to worst ({max(latencies):.3f}s)" | |
| ) | |
| class TestRealTTSBenchmarks: | |
| """Latency benchmarks for real TTS providers (Kokoro / gTTS). | |
| These benchmarks synthesize a short Hindi-English phrase and measure | |
| latency. They are skipped when no TTS backend is available. | |
| Expected latency: | |
| - Kokoro: <3s for short phrase | |
| - gTTS: <5s (network request to Google's API) | |
| """ | |
| _TEST_TEXT = "Namaste! Aaj hum kya pakayenge? Chicken curry aur rice." | |
| def test_real_tts_available(self, real_tts_model): | |
| """Sanity check: real TTS provider initializes and reports available.""" | |
| provider = real_tts_model | |
| assert getattr(provider, "available", True), "Provider should report available" | |
| assert hasattr(provider, "synthesize") or hasattr(provider, "speak"), ( | |
| "Provider must have synthesize or speak method" | |
| ) | |
| def test_real_tts_synthesis_latency(self, real_tts_model): | |
| """Measure single synthesis latency for a short phrase.""" | |
| import time | |
| provider = real_tts_model | |
| synth = getattr(provider, "synthesize", None) or getattr(provider, "speak", None) | |
| assert synth is not None, "No synthesis method found" | |
| start = time.perf_counter() | |
| result = synth(self._TEST_TEXT) | |
| elapsed = time.perf_counter() - start | |
| assert result is not None, "Synthesis returned None" | |
| assert elapsed < 10.0, f"TTS too slow: {elapsed:.3f}s" | |
| print(f"\n[REAL TTS] Synthesis: {elapsed:.3f}s, result type: {type(result).__name__}") | |
| def test_real_tts_throughput(self, real_tts_model): | |
| """Measure sequential synthesis throughput (3 calls).""" | |
| import time | |
| provider = real_tts_model | |
| synth = getattr(provider, "synthesize", None) or getattr(provider, "speak", None) | |
| n = 3 | |
| start = time.perf_counter() | |
| for _ in range(n): | |
| result = synth(self._TEST_TEXT) | |
| assert result is not None | |
| elapsed = time.perf_counter() - start | |
| avg_s = elapsed / n | |
| print(f"\n[REAL TTS] {n}x syntheses: total {elapsed:.2f}s, avg {avg_s:.3f}s") | |
| assert elapsed < 30.0, ( | |
| f"{n} TTS syntheses took {elapsed:.1f}s (avg {avg_s:.2f}s)" | |
| ) | |
| def test_real_tts_thermal_throttling_profile(self, real_tts_model): | |
| """Detect thermal throttling via 3 consecutive syntheses.""" | |
| import time | |
| provider = real_tts_model | |
| synth = getattr(provider, "synthesize", None) or getattr(provider, "speak", None) | |
| text = "The quick brown fox jumps over the lazy dog." | |
| latencies: list[float] = [] | |
| for i in range(3): | |
| start = time.perf_counter() | |
| result = synth(text) | |
| elapsed = time.perf_counter() - start | |
| latencies.append(elapsed) | |
| assert result is not None, f"Synthesis {i+1} returned None" | |
| s1, s2, s3 = latencies | |
| peak_slowdown = max(s2 / max(s1, 1e-9), s3 / max(s1, 1e-9)) | |
| print( | |
| f"\n[TTS THERMAL PROFILE] 3 consecutive syntheses:\n" | |
| f" Synthesis 1: {s1:.3f}s\n" | |
| f" Synthesis 2: {s2:.3f}s ({s2 / max(s1, 1e-9):.2f}x vs #1)\n" | |
| f" Synthesis 3: {s3:.3f}s ({s3 / max(s1, 1e-9):.2f}x vs #1)\n" | |
| f" Peak slowdown: {peak_slowdown:.2f}x" | |
| ) | |
| assert peak_slowdown < 2.5, ( | |
| f"Thermal throttling detected: synthesis latency grew {peak_slowdown:.2f}x " | |
| f"from call 1 ({s1:.3f}s) to worst ({max(latencies):.3f}s)" | |
| ) | |
| class TestRealVisionBenchmarks: | |
| """Latency benchmarks for real Vision providers (MiniCPM-V). | |
| These benchmarks load the actual vision model and analyze a generated | |
| 400x300 test image. They are skipped in CI or when the model is not | |
| cached locally. | |
| Expected latency: | |
| - MiniCPM-V (transformers): <15s for first call (model init) | |
| """ | |
| def test_real_vision_available(self, real_vision_model): | |
| """Sanity check: real Vision provider initializes and reports available.""" | |
| provider, _img_path, _tmp = real_vision_model | |
| assert getattr(provider, "available", True), "Provider should report available" | |
| assert hasattr(provider, "understand") or hasattr(provider, "describe"), ( | |
| "Provider must have understand or describe method" | |
| ) | |
| def test_real_vision_analysis_latency(self, real_vision_model): | |
| """Measure single image analysis latency.""" | |
| import time | |
| provider, img_path, _tmp = real_vision_model | |
| understand = getattr(provider, "understand", None) or getattr(provider, "describe", None) | |
| assert understand is not None, "No understanding method found" | |
| start = time.perf_counter() | |
| result = understand(img_path, "What is in this image? Describe briefly.") | |
| elapsed = time.perf_counter() - start | |
| assert result is not None, "Vision analysis returned None" | |
| # Allow 30s for first-call model loading on Apple Silicon | |
| assert elapsed < 30.0, f"Vision too slow: {elapsed:.3f}s" | |
| print(f"\n[REAL VISION] Analysis: {elapsed:.3f}s, result: {str(result)[:100]}") | |
| def test_real_vision_simple_object_detection(self, real_vision_model): | |
| """Verify the vision provider can detect objects (or reports gracefully). | |
| Uses a white image — the model should describe it as empty/blank or | |
| similar. This primarily tests that the provider runs without error. | |
| """ | |
| import time | |
| provider, img_path, _tmp = real_vision_model | |
| understand = getattr(provider, "understand", None) or getattr(provider, "describe", None) | |
| if understand is None: | |
| pytest.skip("No understanding method") | |
| start = time.perf_counter() | |
| result = understand(img_path, "What objects do you see?") | |
| elapsed = time.perf_counter() - start | |
| assert result is not None, "Vision analysis returned None" | |
| text = str(result).lower() | |
| # The white image should produce some description | |
| assert len(text) > 5, f"Response too short: {text}" | |
| print(f"\n[REAL VISION OBJ] {elapsed:.3f}s, desc: {text[:120]}") | |
| def test_real_vision_thermal_throttling_profile(self, real_vision_model): | |
| """Detect thermal throttling via 3 consecutive image analyses.""" | |
| import time | |
| provider, image_path, _tmpdir = real_vision_model | |
| understand_fn = ( | |
| getattr(provider, "understand", None) | |
| or getattr(provider, "describe", None) | |
| or getattr(provider, "analyze", None) | |
| ) | |
| latencies: list[float] = [] | |
| for i in range(3): | |
| start = time.perf_counter() | |
| result = understand_fn(image_path) | |
| elapsed = time.perf_counter() - start | |
| latencies.append(elapsed) | |
| assert result is not None, f"Analysis {i+1} returned None" | |
| s1, s2, s3 = latencies | |
| peak_slowdown = max(s2 / max(s1, 1e-9), s3 / max(s1, 1e-9)) | |
| print( | |
| f"\n[VISION THERMAL PROFILE] 3 consecutive image analyses:\n" | |
| f" Analysis 1: {s1:.3f}s\n" | |
| f" Analysis 2: {s2:.3f}s ({s2 / max(s1, 1e-9):.2f}x vs #1)\n" | |
| f" Analysis 3: {s3:.3f}s ({s3 / max(s1, 1e-9):.2f}x vs #1)\n" | |
| f" Peak slowdown: {peak_slowdown:.2f}x" | |
| ) | |
| assert peak_slowdown < 2.5, ( | |
| f"Thermal throttling detected: vision analysis latency grew {peak_slowdown:.2f}x " | |
| f"from call 1 ({s1:.3f}s) to worst ({max(latencies):.3f}s)" | |
| ) | |
| class TestRealPlannerBenchmarks: | |
| """Latency/throughput benchmarks for real Planner providers (LocalProvider via MLX). | |
| These benchmarks load the actual MLX model and run planning queries. | |
| They are skipped in CI or when the model is not cached locally. | |
| Expected latency: | |
| - Llama-3.2-3B (MLX, 4bit): <1.5s for short prompts | |
| """ | |
| _TEST_PROMPTS = [ | |
| "What should I cook for dinner with rice, tomatoes, and onions?", | |
| "List 5 essential items for Indian cooking this week.", | |
| "How long does coriander last in the fridge?", | |
| ] | |
| def test_real_planner_available(self, real_planner_model): | |
| """Sanity check: real Planner provider initializes and reports available.""" | |
| provider, _warm = real_planner_model | |
| assert provider.available, "Provider should report available" | |
| assert hasattr(provider, "complete") or hasattr(provider, "plan"), ( | |
| "Provider must have complete or plan method" | |
| ) | |
| def test_real_planner_completion_latency(self, real_planner_model): | |
| """Measure single completion latency for a short prompt.""" | |
| import time | |
| provider, _warm = real_planner_model | |
| complete = getattr(provider, "complete", None) or getattr(provider, "plan", None) | |
| assert complete is not None, "No completion method found" | |
| start = time.perf_counter() | |
| result = complete(self._TEST_PROMPTS[0], max_tokens=32, temperature=0.0) | |
| elapsed = time.perf_counter() - start | |
| assert result is not None, "Completion returned None" | |
| assert elapsed < 5.0, f"Planner too slow: {elapsed:.3f}s" | |
| print(f"\n[REAL PLANNER] Completion: {elapsed:.3f}s, result: {str(result)[:100]}") | |
| def test_real_planner_throughput(self, real_planner_model): | |
| """Measure sequential completion throughput across different prompts.""" | |
| import time | |
| provider, _warm = real_planner_model | |
| complete = getattr(provider, "complete", None) or getattr(provider, "plan", None) | |
| results: list[dict] = [] | |
| for prompt in self._TEST_PROMPTS: | |
| start = time.perf_counter() | |
| result = complete(prompt, max_tokens=48, temperature=0.0) | |
| elapsed = time.perf_counter() - start | |
| results.append({ | |
| "prompt_len": len(prompt), | |
| "elapsed_s": round(elapsed, 4), | |
| }) | |
| total_s = sum(r["elapsed_s"] for r in results) | |
| avg_s = total_s / len(results) | |
| print(f"\n[REAL PLANNER] {len(results)} completions: total {total_s:.2f}s, avg {avg_s:.3f}s") | |
| assert total_s < 15.0, ( | |
| f"3 planner completions took {total_s:.1f}s (avg {avg_s:.2f}s)" | |
| ) | |
| def test_real_planner_temperature_zero_determinism(self, real_planner_model): | |
| """Verify the planner produces similar output with temperature=0.0.""" | |
| provider, _warm = real_planner_model | |
| complete = getattr(provider, "complete", None) or getattr(provider, "plan", None) | |
| prompt = "Say 'Hello World' and nothing else." | |
| results_set = set() | |
| for _ in range(3): | |
| result = complete(prompt, max_tokens=16, temperature=0.0) | |
| text = str(result)[:50] | |
| results_set.add(text) | |
| # With temperature=0.0, all responses should be identical or very similar | |
| # Allow some variation due to floating point / batching differences | |
| assert len(results_set) <= 2, ( | |
| f"temperature=0.0 produced {len(results_set)} different outputs: {results_set}" | |
| ) | |
| print(f"\n[REAL PLANNER] Determinism: {len(results_set)} unique outputs from 3 runs") | |
| def test_real_planner_short_vs_long_prompt(self, real_planner_model): | |
| """Compare latency for short vs long prompts. | |
| Short prompt (<50 chars) should complete faster than long | |
| prompt (>500 chars). Ratio should be less than 3x. | |
| """ | |
| import time | |
| import gc | |
| provider, _warm = real_planner_model | |
| complete = getattr(provider, "complete", None) or getattr(provider, "plan", None) | |
| short_prompt = "Say hello." | |
| long_prompt = ( | |
| "I have the following ingredients in my kitchen: rice, wheat flour, toor dal, " | |
| "moong dal, chana dal, mustard oil, sunflower oil, salt, turmeric powder, red " | |
| "chilli powder, cumin seeds, coriander powder, garam masala, milk, curd, paneer, " | |
| "butter, onions, tomatoes, potatoes, green chillies, ginger, garlic, capsicum, " | |
| "coriander leaves, spinach, bananas, apples, lemons, sugar, tea, coffee, " | |
| "biscuits, bread, eggs, chicken, frozen parathas, frozen peas, honey, soy sauce, " | |
| "vinegar, baking soda, cornflour, and various spices.\n\n" | |
| "What can I cook for a week of healthy Indian meals? Please suggest 7 dinner " | |
| "ideas, one for each day, with brief notes on which ingredients to use. " | |
| "Consider that I want to use up perishable items first before they spoil." | |
| ) | |
| # Short prompt | |
| gc.collect() | |
| start = time.perf_counter() | |
| complete(short_prompt, max_tokens=16, temperature=0.0) | |
| short_elapsed = time.perf_counter() - start | |
| # Long prompt | |
| gc.collect() | |
| start = time.perf_counter() | |
| complete(long_prompt, max_tokens=64, temperature=0.0) | |
| long_elapsed = time.perf_counter() - start | |
| ratio = long_elapsed / max(short_elapsed, 1e-9) | |
| print(f"REAL PLANNER SHORT VS LONG: Short: {short_elapsed:.3f}s, " | |
| f"Long: {long_elapsed:.3f}s, Ratio: {ratio:.2f}x") | |
| assert ratio < 4.0, ( | |
| f"Long prompt took {ratio:.1f}x longer than short prompt! " | |
| f"Short: {short_elapsed:.3f}s, Long: {long_elapsed:.3f}s" | |
| ) | |
| assert long_elapsed < 8.0, f"Long prompt too slow: {long_elapsed:.3f}s" | |