Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import tempfile | |
| from pathlib import Path | |
| import sys | |
| import httpx | |
| from httpx import ASGITransport | |
# Directory one level above the folder that contains this script.
# NOTE(review): presumably this is the backend root that holds the `app`
# module imported just below -- confirm against the repository layout.
BACKEND_DIR = Path(__file__).resolve().parents[1]
if str(BACKEND_DIR) not in sys.path:
    # Prepend (rather than append) so this directory is searched first.
    sys.path.insert(0, str(BACKEND_DIR))
| from app import app | |
def parse_sse(text: str) -> list[tuple[str, dict]]:
    """Parse a Server-Sent Events body into ``(event_name, payload)`` pairs.

    Events are separated by a blank line. Inside an event, an ``event:`` line
    sets the event name (``"message"`` when absent) and the ``data:`` line(s)
    carry a JSON document. Any other line (for example ``:`` comments) is
    ignored.

    Args:
        text: The complete SSE response body. LF, CRLF and bare CR line
            endings are all accepted.

    Returns:
        One ``(event_name, payload)`` tuple per non-empty block, in stream
        order. ``payload`` is ``{}`` for a block without any ``data:`` line.

    Raises:
        json.JSONDecodeError: If the data of an event is not valid JSON.
    """
    # SSE allows LF, CRLF or CR line terminators. Normalise to LF first;
    # otherwise a CRLF stream contains no "\n\n" and every event would be
    # merged into one block.
    normalized = text.replace("\r\n", "\n").replace("\r", "\n")

    events: list[tuple[str, dict]] = []
    for block in normalized.strip().split("\n\n"):
        if not block.strip():
            continue
        event_name = "message"
        data_lines: list[str] = []
        # Split on "\n" only: str.splitlines() would also break on characters
        # such as U+2028, which may legally appear inside a JSON string.
        for line in block.split("\n"):
            field, sep, value = line.partition(":")
            if not sep:
                continue
            if field == "event":
                event_name = value.strip()
            elif field == "data":
                data_lines.append(value.strip())
        # Multiple data lines of one event form a single payload joined by
        # newlines (this is how the SSE format carries multi-line data).
        payload: dict = json.loads("\n".join(data_lines)) if data_lines else {}
        events.append((event_name, payload))
    return events
def _first_event_payload(events: list[tuple[str, dict]], name: str) -> dict:
    """Return the payload of the first event called *name*, or ``{}`` if none exists."""
    return next((payload for event, payload in events if event == name), {})


async def main() -> None:
    """Run the smoke checks against the ASGI app in-process and write a report.

    The checks are executed through ``httpx.AsyncClient`` on an
    ``ASGITransport`` wrapping ``app``:

    * ``GET /health/live`` and ``GET /health/ready`` must return 200.
    * ``POST /api/chat/stream`` with an informational question must return
      200 and a ``done`` event with non-empty ``content``, ``language`` equal
      to ``"en"``, at least one citation, no ``<doc-ref`` left in the content
      and ``ttft_ms`` of at least 1.
    * ``POST /api/chat/stream`` with an execution-style question must return
      200, emit at least one ``ceo_email_sent`` event and a ``done`` event
      whose ``handoff`` is ``True``.

    The report (all checks plus a pass/total summary) is written as JSON to
    ``brand_chatbot_smoke_report.json`` in the system temp directory; the
    path and the report are then printed. A failed check is recorded in the
    report only -- this function neither raises nor changes the exit status
    because of it.
    """
    checks: list[dict[str, object]] = []
    report: dict[str, object] = {"checks": checks}

    transport = ASGITransport(app=app)
    async with httpx.AsyncClient(
        transport=transport,
        base_url="http://testserver",
        follow_redirects=True,
    ) as client:
        live = await client.get("/health/live")
        ready = await client.get("/health/ready")
        checks.append({"name": "health_live", "status_code": live.status_code, "ok": live.status_code == 200})
        checks.append({"name": "health_ready", "status_code": ready.status_code, "ok": ready.status_code == 200})

        informational = await client.post(
            "/api/chat/stream",
            json={"message": "According to Blink's blog, what is changing in SEO and AI-driven search?", "history": []},
        )
        informational_events = parse_sse(informational.text)
        # Falls back to {} so that a stream without a "done" event fails this
        # check instead of aborting the whole run.
        informational_done = _first_event_payload(informational_events, "done")
        informational_tokens = "".join(
            payload.get("delta") or "" for event, payload in informational_events if event == "token"
        ).strip()
        informational_content = informational_done.get("content") or ""
        informational_citations = informational_done.get("citations") or []
        checks.append(
            {
                "name": "chat_stream_grounded",
                "status_code": informational.status_code,
                # bool(...) keeps the report field a real boolean; a bare
                # `and` chain would leak e.g. an empty content string.
                "ok": bool(
                    informational.status_code == 200
                    and informational_content
                    and informational_done.get("language") == "en"
                    and len(informational_citations) >= 1
                    and "<doc-ref" not in informational_content
                    and (informational_done.get("ttft_ms") or 0) >= 1
                ),
                "answer_preview": informational_tokens[:280],
                "citations": informational_citations,
            }
        )

        execution = await client.post(
            "/api/chat/stream",
            json={"message": "Can Blink help us create a brandbook and what would be the next step?", "history": []},
        )
        execution_events = parse_sse(execution.text)
        execution_done = _first_event_payload(execution_events, "done")
        email_events = [payload for event, payload in execution_events if event == "ceo_email_sent"]
        checks.append(
            {
                "name": "chat_stream_execution_handoff",
                "status_code": execution.status_code,
                "ok": execution.status_code == 200
                and bool(email_events)
                and execution_done.get("handoff") is True,
                "contact": execution_done.get("contact"),
                "email_notification": execution_done.get("email_notification"),
            }
        )

    report["summary"] = {
        "passed_checks": sum(1 for item in checks if item["ok"]),
        "total_checks": len(checks),
    }
    report_path = Path(tempfile.gettempdir()) / "brand_chatbot_smoke_report.json"
    report_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
    print(report_path)
    print(json.dumps(report, ensure_ascii=False, indent=2))
if __name__ == "__main__":
    # Executed as a script: drive the async smoke checks to completion on a
    # fresh event loop.
    asyncio.run(main())