Spaces:
Running
Running
feat: Hardened routing layer with Redis-backed rate limiting, distributed circuit breakers, bulkheads, and full OTel/Prom observability with Grafana alerts
78e0e85 | import random | |
| import uuid | |
| import json | |
| from locust import HttpUser, task, between, events | |
| class AwnAIServiceUser(HttpUser): | |
| wait_time = between(0.1, 1.0) # Intense traffic simulation | |
| def on_start(self): | |
| # Generate a unique session ID for this user session | |
| self.session_id = str(uuid.uuid4()) | |
| # Use a fixed IP proxy simulation if we were doing IP spoofing, | |
| # but here we'll just hit the server directly. | |
| def chat_request(self): | |
| """Simulates a standard synchronous chat request.""" | |
| payload = { | |
| "message": random.choice([ | |
| "How are you?", | |
| "Tell me about Aoun project.", | |
| "What is the capital of Saudi Arabia?", | |
| "ููู ุญุงูู ุงูููู ุ", | |
| "ู ุง ูู ู ุดุฑูุน ุนููุ" | |
| ]), | |
| "session_id": self.session_id, | |
| "mode": "agent" | |
| } | |
| with self.client.post("/api/chat/", json=payload, catch_response=True) as response: | |
| if response.status_code == 200: | |
| response.success() | |
| elif response.status_code == 429: | |
| # Rate limit hit (this is expected under load) | |
| response.failure(f"Rate limited: {response.text}") | |
| elif response.status_code == 503: | |
| # Service under heavy load (Global rate limiter hit) | |
| response.failure(f"Global load protection triggered: {response.text}") | |
| else: | |
| response.failure(f"Unexpected error {response.status_code}: {response.text}") | |
| def chat_stream(self): | |
| """Simulates a streaming chat request.""" | |
| payload = { | |
| "message": "Write a long story about a robot named Awn.", | |
| "session_id": self.session_id, | |
| "stream": True | |
| } | |
| # Note: Locust doesn't natively handle SSE perfectly without extra code, | |
| # so we'll just check if the initial connection is successful. | |
| with self.client.post("/api/chat/stream", json=payload, stream=True, catch_response=True) as response: | |
| if response.status_code == 200: | |
| response.success() | |
| else: | |
| response.failure(f"Stream failure {response.status_code}") | |
| def metrics_check(self): | |
| """Checks the observability endpoint.""" | |
| self.client.get("/api/metrics") | |
| def _(parser): | |
| parser.add_argument("--test-ip-limits", action="store_true", help="Rotate IPs if proxy available") | |