# NovaAI / memory / local_memory.py
# veteroner's picture
# CRANE AI ZeroGPU Space yüklendi (CRANE AI ZeroGPU Space uploaded)
# 58c4fec verified
"""
CRANE AI - Local Memory Management
"""
from typing import Dict, Any, List, Optional
import asyncio
import time
import json
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import logging
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
@dataclass
class MemorySlot:
    """Data structure describing a single memory slot.

    ``tags`` and ``metadata`` may be omitted or passed as ``None``; in both
    cases ``__post_init__`` replaces them with a fresh empty list / dict, so
    instances never share a default container.
    """

    id: str  # slot identifier
    content: Any  # stored payload
    timestamp: float  # epoch seconds; refreshed by update_access()
    ttl: float  # time to live in seconds, measured from ``timestamp``
    access_count: int = 0  # incremented by update_access()
    priority: int = 1
    tags: Optional[List[str]] = None
    metadata: Optional[Dict[str, Any]] = None

    def __post_init__(self) -> None:
        """Normalize ``None`` tags/metadata to new empty containers."""
        if self.tags is None:
            self.tags = []
        if self.metadata is None:
            self.metadata = {}

    def is_expired(self) -> bool:
        """Return True once the current time is past ``timestamp + ttl``."""
        return time.time() > (self.timestamp + self.ttl)

    def update_access(self) -> None:
        """Record one access.

        Besides bumping ``access_count`` this resets ``timestamp`` to now,
        which restarts the TTL window for the slot.
        """
        self.access_count += 1
        self.timestamp = time.time()

    def to_dict(self) -> Dict[str, Any]:
        """Convert the slot to a plain dictionary via ``dataclasses.asdict``."""
        return asdict(self)
class LocalMemoryManager:
    """Bounded in-process slot store with TTL expiry and eviction.

    Slots are kept in a dict keyed by slot id. Expired slots are dropped
    lazily when they are touched and periodically by a background asyncio
    task once ``start()`` has been awaited. When the store is full, adding a
    new key first reclaims expired slots and only then evicts the live slot
    with the highest eviction score.
    """

    def __init__(self, config: Dict[str, Any]):
        """Read settings from *config* and set up empty state.

        Recognized keys (all optional): ``max_slots`` (default 10),
        ``default_ttl`` in seconds (default 3600) and ``cleanup_interval``
        in seconds (default 300).
        """
        self.config = config
        self.max_slots = config.get("max_slots", 10)
        self.default_ttl = config.get("default_ttl", 3600)  # one hour
        self.cleanup_interval = config.get("cleanup_interval", 300)  # five minutes

        # Memory slots, keyed by slot id.
        self.slots: Dict[str, MemorySlot] = {}

        # Usage counters.
        self.stats = {
            "total_writes": 0,
            "total_reads": 0,
            "cache_hits": 0,
            "cache_misses": 0,
            "evictions": 0,
            "cleanups": 0
        }

        # Background cleanup task, created by start().
        self.cleanup_task = None
        self.is_running = False

    async def start(self):
        """Start the background cleanup task (no-op if already running)."""
        if not self.is_running:
            self.is_running = True
            self.cleanup_task = asyncio.create_task(self._cleanup_worker())
            logger.info("Memory manager started")

    async def stop(self):
        """Stop the background cleanup task and wait for it to finish."""
        self.is_running = False
        if self.cleanup_task:
            self.cleanup_task.cancel()
            try:
                await self.cleanup_task
            except asyncio.CancelledError:
                pass
            # Forget the finished task so a repeated stop() does not re-await it.
            self.cleanup_task = None
        logger.info("Memory manager stopped")

    async def store(self, key: str, content: Any, ttl: Optional[float] = None,
                    priority: int = 1, tags: Optional[List[str]] = None,
                    metadata: Optional[Dict[str, Any]] = None) -> bool:
        """Write *content* into the slot named *key*, replacing any old slot.

        Args:
            key: Slot identifier.
            content: Payload to store.
            ttl: Time to live in seconds; ``None`` uses ``default_ttl``.
            priority: Eviction priority; higher values are kept longer.
            tags: Optional tags used by ``search_by_tags``.
            metadata: Optional free-form metadata.

        Returns:
            True on success, False if an error occurred (it is logged).
        """
        try:
            if ttl is None:
                ttl = self.default_ttl

            slot = MemorySlot(
                id=key,
                content=content,
                timestamp=time.time(),
                ttl=ttl,
                priority=priority,
                # Copy so later slot updates never mutate the caller's objects.
                tags=list(tags) if tags else [],
                metadata=dict(metadata) if metadata else {}
            )

            await self._make_room(key)

            self.slots[key] = slot
            self.stats["total_writes"] += 1
            logger.debug(f"Stored in memory slot: {key}")
            return True
        except Exception as e:
            logger.error(f"Memory store error: {str(e)}")
            return False

    async def retrieve(self, key: str) -> Optional[Any]:
        """Return the content stored under *key*, or None on a miss.

        A missing or expired slot counts as a cache miss; an expired slot is
        removed. A hit refreshes the slot's access count and timestamp.
        """
        try:
            if key not in self.slots:
                self.stats["cache_misses"] += 1
                return None

            slot = self.slots[key]

            if slot.is_expired():
                await self.remove(key)
                self.stats["cache_misses"] += 1
                return None

            slot.update_access()
            self.stats["total_reads"] += 1
            self.stats["cache_hits"] += 1
            logger.debug(f"Retrieved from memory slot: {key}")
            return slot.content
        except Exception as e:
            logger.error(f"Memory retrieve error: {str(e)}")
            return None

    async def remove(self, key: str) -> bool:
        """Delete the slot named *key*; return True if it existed."""
        try:
            if key in self.slots:
                del self.slots[key]
                logger.debug(f"Removed memory slot: {key}")
                return True
            return False
        except Exception as e:
            logger.error(f"Memory remove error: {str(e)}")
            return False

    async def search_by_tags(self, tags: List[str]) -> List[MemorySlot]:
        """Return non-expired slots carrying at least one of *tags*."""
        return [
            slot
            for slot in self.slots.values()
            if not slot.is_expired() and any(tag in slot.tags for tag in tags)
        ]

    async def get_active_slots(self) -> List[MemorySlot]:
        """Return all slots that have not expired."""
        return [slot for slot in self.slots.values() if not slot.is_expired()]

    async def update_slot(self, key: str, content: Any = None, ttl: Optional[float] = None,
                          priority: Optional[int] = None, tags: Optional[List[str]] = None,
                          metadata: Optional[Dict[str, Any]] = None) -> bool:
        """Update selected fields of an existing, non-expired slot.

        Arguments left as ``None`` are not changed. ``tags`` replaces the tag
        list, ``metadata`` is merged into the existing metadata. A successful
        update also resets the slot's timestamp.

        Returns:
            True if the slot was updated; False if it is missing, expired
            (in which case it is removed) or an error occurred.
        """
        try:
            if key not in self.slots:
                return False

            slot = self.slots[key]

            if slot.is_expired():
                await self.remove(key)
                return False

            if content is not None:
                slot.content = content
            if ttl is not None:
                slot.ttl = ttl
            if priority is not None:
                slot.priority = priority
            if tags is not None:
                # Copy so the slot does not alias the caller's list.
                slot.tags = list(tags)
            if metadata is not None:
                slot.metadata.update(metadata)

            slot.timestamp = time.time()
            logger.debug(f"Updated memory slot: {key}")
            return True
        except Exception as e:
            logger.error(f"Memory update error: {str(e)}")
            return False

    async def _make_room(self, key: str):
        """Free one slot if inserting the new key *key* would exceed capacity."""
        if key not in self.slots and len(self.slots) >= self.max_slots:
            await self._evict_slot()

    async def _evict_slot(self):
        """Free space: drop expired slots, else evict the worst live slot.

        Expired slots are reclaimed first so live data is never sacrificed
        while dead entries still occupy space. If nothing was expired, the
        slot with the highest eviction score is removed and counted in
        ``stats["evictions"]``.
        """
        if not self.slots:
            return

        slots_before = len(self.slots)
        await self._cleanup_expired()
        if len(self.slots) < slots_before:
            return

        now = time.time()

        def eviction_score(slot: MemorySlot) -> float:
            # Low priority + low access count + old timestamp => high score.
            # A non-positive priority cannot be inverted; treat it as the
            # most evictable instead of raising ZeroDivisionError.
            if slot.priority > 0:
                priority_term = 1 / slot.priority
            else:
                priority_term = float("inf")
            age = now - slot.timestamp
            return priority_term + (1 / max(slot.access_count, 1)) + (age / 3600)

        victim_key = max(self.slots, key=lambda k: eviction_score(self.slots[k]))
        await self.remove(victim_key)
        self.stats["evictions"] += 1
        logger.debug(f"Evicted memory slot: {victim_key}")

    async def _cleanup_worker(self):
        """Background loop: sleep ``cleanup_interval`` seconds, then purge."""
        while self.is_running:
            try:
                await asyncio.sleep(self.cleanup_interval)
                await self._cleanup_expired()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Cleanup worker error: {str(e)}")

    async def _cleanup_expired(self):
        """Remove every expired slot; count the pass if anything was removed."""
        # Collect keys first: the dict must not change size while iterating.
        expired_keys = [key for key, slot in self.slots.items() if slot.is_expired()]

        for key in expired_keys:
            await self.remove(key)

        if expired_keys:
            self.stats["cleanups"] += 1
            logger.debug(f"Cleaned up {len(expired_keys)} expired slots")

    async def export_memory(self, include_expired: bool = False) -> Dict[str, Any]:
        """Export slots and statistics as a plain dictionary.

        Args:
            include_expired: Also export slots whose TTL has elapsed.

        Returns:
            A dict with ``timestamp`` (ISO string), ``slots`` (list of slot
            dicts) and ``stats`` (a snapshot copy of the counters).
        """
        return {
            "timestamp": datetime.now().isoformat(),
            "slots": [
                slot.to_dict()
                for slot in self.slots.values()
                if include_expired or not slot.is_expired()
            ],
            # Snapshot, so later counter updates do not alter the export.
            "stats": dict(self.stats)
        }

    async def import_memory(self, data: Dict[str, Any]) -> bool:
        """Import slots from a dictionary shaped like ``export_memory`` output.

        All entries are parsed before any is inserted, so a malformed entry
        leaves the store untouched. Expired entries are skipped and the
        ``max_slots`` limit is enforced the same way as in ``store``.

        Returns:
            True on success, False if an error occurred (it is logged).
        """
        try:
            slots_data = data.get("slots") or []
            parsed = [MemorySlot(**slot_data) for slot_data in slots_data]

            imported = 0
            for slot in parsed:
                if slot.is_expired():
                    continue
                await self._make_room(slot.id)
                self.slots[slot.id] = slot
                imported += 1

            logger.info(f"Imported {imported} memory slots")
            return True
        except Exception as e:
            logger.error(f"Memory import error: {str(e)}")
            return False

    def get_stats(self) -> Dict[str, Any]:
        """Return a summary of slot counts and usage counters.

        ``hit_rate`` is hits divided by all lookups (hits plus misses), and
        0 when no lookup has happened yet.
        """
        total_requests = self.stats["cache_hits"] + self.stats["cache_misses"]
        hit_rate = self.stats["cache_hits"] / max(total_requests, 1)
        active_slots = sum(1 for slot in self.slots.values() if not slot.is_expired())

        return {
            "total_slots": len(self.slots),
            "active_slots": active_slots,
            "max_slots": self.max_slots,
            "hit_rate": hit_rate,
            "total_writes": self.stats["total_writes"],
            "total_reads": self.stats["total_reads"],
            "evictions": self.stats["evictions"],
            "cleanups": self.stats["cleanups"]
        }

    async def clear_all(self):
        """Remove every slot from memory."""
        self.slots.clear()
        logger.info("All memory slots cleared")