import random
import uuid
import json
from locust import HttpUser, task, between, events


class AwnAIServiceUser(HttpUser):
    """Load-test user for the Awn AI service chat API.

    Mixes synchronous chat, streaming chat and metrics polling with a
    3:1:1 task weighting and aggressive wait times (0.1-1.0s) to
    simulate intense traffic against the service.
    """

    wait_time = between(0.1, 1.0)  # Intense traffic simulation

    def on_start(self):
        """Prepare per-user state when the simulated user spawns."""
        # Generate a unique session ID for this user session
        self.session_id = str(uuid.uuid4())
        # Use a fixed IP proxy simulation if we were doing IP spoofing,
        # but here we'll just hit the server directly.

    @task(3)
    def chat_request(self):
        """Simulates a standard synchronous chat request."""
        payload = {
            "message": random.choice([
                "How are you?",
                "Tell me about Aoun project.",
                "What is the capital of Saudi Arabia?",
                "كيف حالك اليوم؟",
                "ما هو مشروع عون؟",
            ]),
            "session_id": self.session_id,
            "mode": "agent",
        }
        with self.client.post("/api/chat/", json=payload, catch_response=True) as response:
            if response.status_code == 200:
                response.success()
            elif response.status_code == 429:
                # Rate limit hit (this is expected under load); still recorded
                # as a failure so rate-limiting shows up in the Locust stats.
                response.failure(f"Rate limited: {response.text}")
            elif response.status_code == 503:
                # Service under heavy load (Global rate limiter hit)
                response.failure(f"Global load protection triggered: {response.text}")
            else:
                response.failure(f"Unexpected error {response.status_code}: {response.text}")

    @task(1)
    def chat_stream(self):
        """Simulates a streaming chat request."""
        payload = {
            "message": "Write a long story about a robot named Awn.",
            "session_id": self.session_id,
            "stream": True,
        }
        # Note: Locust doesn't natively handle SSE perfectly without extra code,
        # so we'll just check if the initial connection is successful.
        with self.client.post(
            "/api/chat/stream", json=payload, stream=True, catch_response=True
        ) as response:
            if response.status_code == 200:
                response.success()
            else:
                response.failure(f"Stream failure {response.status_code}")
            # Fix: with stream=True the body is never consumed, so the
            # underlying connection would stay checked out of the pool and
            # leak under sustained load. Close explicitly to release it.
            response.close()

    @task(1)
    def metrics_check(self):
        """Checks the observability endpoint."""
        self.client.get("/api/metrics")


@events.init_command_line_parser.connect
def _(parser):
    """Register custom command-line flags for this locustfile."""
    parser.add_argument(
        "--test-ip-limits",
        action="store_true",
        help="Rotate IPs if proxy available",
    )