import asyncio
import gc
import hashlib
import json
import time
from collections import OrderedDict
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, Optional

import psutil
import redis


@dataclass
class CacheEntry:
    response: str
    timestamp: datetime
    hit_count: int = 0


class PerformanceOptimizer:
    def __init__(self,
                 cache_size: int = 100,
                 cache_ttl_hours: int = 24,
                 use_redis: bool = False):
        """Initialize performance optimizer with caching."""
        self.cache_size = cache_size
        self.cache_ttl = timedelta(hours=cache_ttl_hours)

        # In-memory LRU cache; also serves as the fallback when Redis is
        # requested but unreachable.
        self.cache = OrderedDict()

        self.use_redis = use_redis
        if use_redis:
            try:
                self.redis_client = redis.Redis(
                    host='localhost',
                    port=6379,
                    decode_responses=True
                )
                self.redis_client.ping()
            except redis.RedisError:
                print("Redis not available, using in-memory cache")
                self.use_redis = False

        self.metrics = {
            "cache_hits": 0,
            "cache_misses": 0,
            "total_requests": 0,
            "average_response_time": 0,
            "memory_usage_mb": 0
        }

    def _hash_prompt(self, prompt: str) -> str:
        """Create a cache key from the normalized prompt. MD5 is acceptable
        here because the hash is a lookup key, not a security boundary."""
        normalized = prompt.lower().strip()
        return hashlib.md5(normalized.encode()).hexdigest()

    def get_cached_response(self, prompt: str) -> Optional[str]:
        """Return a cached response, or None on a miss."""
        self.metrics["total_requests"] += 1
        prompt_hash = self._hash_prompt(prompt)

        if self.use_redis:
            cached = self.redis_client.get(f"chat:{prompt_hash}")
            if cached:
                self.metrics["cache_hits"] += 1
                # Track per-entry hit counts alongside the cached payload.
                self.redis_client.hincrby(f"chat:stats:{prompt_hash}", "hits", 1)
                return json.loads(cached)["response"]
        else:
            if prompt_hash in self.cache:
                entry = self.cache[prompt_hash]
                if datetime.now() - entry.timestamp < self.cache_ttl:
                    self.metrics["cache_hits"] += 1
                    entry.hit_count += 1
                    # Refresh the entry's LRU position on a hit.
                    self.cache.move_to_end(prompt_hash)
                    return entry.response
                else:
                    # Entry expired; drop it and fall through to a miss.
                    del self.cache[prompt_hash]

        self.metrics["cache_misses"] += 1
        return None

    def cache_response(self, prompt: str, response: str):
        """Cache a response under the prompt's hash."""
        prompt_hash = self._hash_prompt(prompt)

        if self.use_redis:
            cache_data = {
                "response": response,
                "timestamp": datetime.now().isoformat()
            }
            ttl_seconds = int(self.cache_ttl.total_seconds())
            self.redis_client.setex(
                f"chat:{prompt_hash}",
                ttl_seconds,
                json.dumps(cache_data)
            )
            self.redis_client.hset(
                f"chat:stats:{prompt_hash}",
                mapping={"hits": 0, "created": datetime.now().isoformat()}
            )
            # Expire the stats hash with the entry so it cannot outlive it.
            self.redis_client.expire(f"chat:stats:{prompt_hash}", ttl_seconds)
        else:
            # Evict the least recently used entry once the cache is full.
            if len(self.cache) >= self.cache_size:
                self.cache.popitem(last=False)

            self.cache[prompt_hash] = CacheEntry(
                response=response,
                timestamp=datetime.now()
            )

    def get_metrics(self) -> Dict[str, Any]:
        """Get performance metrics."""
        process = psutil.Process()
        self.metrics["memory_usage_mb"] = process.memory_info().rss / 1024 / 1024

        if self.metrics["total_requests"] > 0:
            self.metrics["cache_hit_rate"] = (
                self.metrics["cache_hits"] / self.metrics["total_requests"]
            )

        return self.metrics

    def clear_cache(self):
        """Clear all cached responses."""
        if self.use_redis:
            for key in self.redis_client.scan_iter("chat:*"):
                self.redis_client.delete(key)
        else:
            self.cache.clear()

        gc.collect()
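
# Usage sketch (illustrative only; `generate_reply` is a hypothetical
# model call, not part of this module):
#
#     optimizer = PerformanceOptimizer(cache_size=100, use_redis=False)
#     prompt = "hello"
#     reply = optimizer.get_cached_response(prompt)
#     if reply is None:
#         reply = generate_reply(prompt)
#         optimizer.cache_response(prompt, reply)
#     print(optimizer.get_metrics())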


class MemoryManager:
    def __init__(self, max_memory_mb: int = 8192):
        """Initialize memory manager."""
        self.max_memory_mb = max_memory_mb
        self.warning_threshold = 0.8
        self.critical_threshold = 0.9

    def check_memory(self) -> Dict[str, Any]:
        """Check current memory usage against the configured budget."""
        process = psutil.Process()
        memory_info = process.memory_info()

        current_mb = memory_info.rss / 1024 / 1024
        percentage = current_mb / self.max_memory_mb

        status = "normal"
        if percentage > self.critical_threshold:
            status = "critical"
        elif percentage > self.warning_threshold:
            status = "warning"

        return {
            "current_mb": round(current_mb, 2),
            "max_mb": self.max_memory_mb,
            "percentage": round(percentage * 100, 2),
            "status": status,
            "available_mb": round(self.max_memory_mb - current_mb, 2)
        }

    def optimize_if_needed(self) -> bool:
        """Run garbage collection if memory usage is high."""
        memory_status = self.check_memory()

        if memory_status["status"] in ["warning", "critical"]:
            # gc.collect() with no argument performs a full (generation 2)
            # collection, so a single call covers both the warning and the
            # critical case.
            gc.collect()
            return True

        return False
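
# Usage sketch (illustrative only; the 4096 MB budget is an arbitrary
# example value):
#
#     manager = MemoryManager(max_memory_mb=4096)
#     print(manager.check_memory())
#     if manager.optimize_if_needed():
#         print("memory pressure detected, garbage collection triggered")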


class RequestBatcher:
    def __init__(self, batch_size: int = 5, timeout_ms: int = 100):
        """Initialize request batcher for efficiency."""
        self.batch_size = batch_size
        self.timeout_ms = timeout_ms
        self.pending_requests = []

    async def add_request(self, request_id: str, prompt: str) -> str:
        """Add a request to the current batch and wait for its result.

        Each waiter gets its own future, so a request whose batch is
        flushed by another waiter still receives its result instead of
        timing out."""
        future = asyncio.get_running_loop().create_future()
        self.pending_requests.append({
            "id": request_id,
            "prompt": prompt,
            "timestamp": time.time(),
            "future": future
        })

        # Flush immediately when the batch is full; otherwise wait up to
        # timeout_ms for more requests to arrive before flushing.
        if len(self.pending_requests) >= self.batch_size:
            await self._process_batch()
        else:
            await asyncio.sleep(self.timeout_ms / 1000)
            if not future.done():
                await self._process_batch()

        return await future

    async def _process_batch(self):
        """Process pending requests as a batch."""
        if not self.pending_requests:
            return

        batch = self.pending_requests[:self.batch_size]
        self.pending_requests = self.pending_requests[self.batch_size:]

        # Run all requests in the batch concurrently, then resolve each
        # waiter's future with its result.
        results = await asyncio.gather(
            *(self._process_single(request) for request in batch)
        )
        for request, result in zip(batch, results):
            request["future"].set_result(result)

    async def _process_single(self, request: Dict[str, Any]) -> str:
        """Process a single request (placeholder for a real model call)."""
        await asyncio.sleep(0.1)
        return f"Response to: {request['prompt']}"
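
# Minimal demo sketch: the request IDs and prompts below are arbitrary
# examples, and _process_single above is still a placeholder.
if __name__ == "__main__":
    async def _demo():
        batcher = RequestBatcher(batch_size=3, timeout_ms=50)
        # Three concurrent requests fill one batch and are processed together.
        replies = await asyncio.gather(
            batcher.add_request("r1", "hello"),
            batcher.add_request("r2", "world"),
            batcher.add_request("r3", "batch me"),
        )
        print(replies)

    asyncio.run(_demo())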