File size: 3,032 Bytes
dbb04e4 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 | """
Locust load testing file for MnemoCore API.
Usage:
locust -f tests/load/locustfile.py --host http://localhost:8100
Then open http://localhost:8089 to configure and run the load test.
"""
import random
import string
from locust import HttpUser, task, between
def random_content(length: int = 100) -> str:
"""Generate random content string."""
return ''.join(random.choices(string.ascii_letters + string.digits + ' ', k=length))
class StoreMemoryUser(HttpUser):
"""User that stores memories to MnemoCore."""
wait_time = between(0.1, 0.5)
def on_start(self):
"""Initialize user with API key."""
self.api_key = "test-api-key"
self.headers = {"X-API-Key": self.api_key}
@task(10)
def store_memory(self):
"""Store a random memory."""
payload = {
"content": random_content(200),
"metadata": {
"source": "load_test",
"timestamp": random.randint(1000000000, 2000000000)
}
}
self.client.post(
"/memories",
json=payload,
headers=self.headers,
name="/memories [STORE]"
)
class QueryMemoryUser(HttpUser):
"""User that queries memories from MnemoCore."""
wait_time = between(0.05, 0.2)
def on_start(self):
"""Initialize user with API key and some queries."""
self.api_key = "test-api-key"
self.headers = {"X-API-Key": self.api_key}
self.queries = [
"test query",
"memory search",
"find similar",
"cognitive recall",
"semantic search"
]
@task(20)
def query_memory(self):
"""Query for similar memories."""
query = random.choice(self.queries)
self.client.get(
f"/query?q={query}&limit=10",
headers=self.headers,
name="/query [SEARCH]"
)
class MixedUser(HttpUser):
"""User that performs both store and query operations."""
wait_time = between(0.1, 0.3)
def on_start(self):
"""Initialize user."""
self.api_key = "test-api-key"
self.headers = {"X-API-Key": self.api_key}
@task(3)
def store_memory(self):
"""Store a random memory."""
payload = {
"content": random_content(150),
"metadata": {"source": "mixed_user"}
}
self.client.post(
"/memories",
json=payload,
headers=self.headers,
name="/memories [STORE]"
)
@task(7)
def query_memory(self):
"""Query for memories."""
self.client.get(
"/query?q=test&limit=5",
headers=self.headers,
name="/query [SEARCH]"
)
@task(1)
def health_check(self):
"""Check API health."""
self.client.get("/health", name="/health [CHECK]")
|