# Aoun-Ai / scripts/locustfile.py
# feat: Hardened routing layer with Redis-backed rate limiting, distributed
# circuit breakers, bulkheads, and full OTel/Prom observability with Grafana
# alerts (commit 78e0e85, MuhammadMahmoud)
import random
import uuid
import json
from locust import HttpUser, task, between, events
class AwnAIServiceUser(HttpUser):
    """Simulated client that exercises the Awn AI chat, streaming, and metrics endpoints."""

    # Short pauses between tasks to simulate intense traffic.
    wait_time = between(0.1, 1.0)

    def on_start(self):
        """Initialize per-user state before the first task fires."""
        # Every simulated user carries its own unique session identifier.
        # We hit the server directly — no IP-spoofing / proxy rotation here.
        self.session_id = str(uuid.uuid4())

    @task(3)
    def chat_request(self):
        """Simulates a standard synchronous chat request."""
        sample_messages = (
            "How are you?",
            "Tell me about Aoun project.",
            "What is the capital of Saudi Arabia?",
            "ูƒูŠู ุญุงู„ูƒ ุงู„ูŠูˆู…ุŸ",
            "ู…ุง ู‡ูˆ ู…ุดุฑูˆุน ุนูˆู†ุŸ",
        )
        payload = {
            "message": random.choice(sample_messages),
            "session_id": self.session_id,
            "mode": "agent",
        }
        with self.client.post("/api/chat/", json=payload, catch_response=True) as response:
            status = response.status_code
            if status == 200:
                response.success()
            elif status == 429:
                # Rate limit hit (this is expected under load) — still counted
                # as a failure so it shows up in the Locust stats.
                response.failure(f"Rate limited: {response.text}")
            elif status == 503:
                # Service under heavy load (global rate limiter hit).
                response.failure(f"Global load protection triggered: {response.text}")
            else:
                response.failure(f"Unexpected error {response.status_code}: {response.text}")

    @task(1)
    def chat_stream(self):
        """Simulates a streaming chat request."""
        payload = {
            "message": "Write a long story about a robot named Awn.",
            "session_id": self.session_id,
            "stream": True,
        }
        # Locust doesn't natively handle SSE without extra code, so only the
        # initial connection status is validated here.
        with self.client.post("/api/chat/stream", json=payload, stream=True, catch_response=True) as response:
            if response.status_code != 200:
                response.failure(f"Stream failure {response.status_code}")
            else:
                response.success()

    @task(1)
    def metrics_check(self):
        """Checks the observability endpoint."""
        self.client.get("/api/metrics")
@events.init_command_line_parser.connect
def _(parser):
    """Register extra CLI options on Locust's argument parser at init time."""
    # Opt-in flag only; actual IP rotation would need proxy support elsewhere.
    parser.add_argument("--test-ip-limits", action="store_true", help="Rotate IPs if proxy available")