""" CRANE AI - Local Memory Management """ from typing import Dict, Any, List, Optional import asyncio import time import json from dataclasses import dataclass, asdict from datetime import datetime, timedelta import logging logger = logging.getLogger(__name__) @dataclass class MemorySlot: """Bellek slot'u veri yapısı""" id: str content: Any timestamp: float ttl: float # Time to live (seconds) access_count: int = 0 priority: int = 1 tags: List[str] = None metadata: Dict[str, Any] = None def __post_init__(self): if self.tags is None: self.tags = [] if self.metadata is None: self.metadata = {} def is_expired(self) -> bool: """Slot'un süresinin dolup dolmadığını kontrol eder""" return time.time() > (self.timestamp + self.ttl) def update_access(self): """Erişim sayısını günceller""" self.access_count += 1 self.timestamp = time.time() def to_dict(self) -> Dict[str, Any]: """Dictionary'e çevirir""" return asdict(self) class LocalMemoryManager: """Yerel bellek yönetimi sistemi""" def __init__(self, config: Dict[str, Any]): self.config = config self.max_slots = config.get("max_slots", 10) self.default_ttl = config.get("default_ttl", 3600) # 1 saat self.cleanup_interval = config.get("cleanup_interval", 300) # 5 dakika # Bellek slot'ları self.slots: Dict[str, MemorySlot] = {} # İstatistikler self.stats = { "total_writes": 0, "total_reads": 0, "cache_hits": 0, "cache_misses": 0, "evictions": 0, "cleanups": 0 } # Background cleanup task self.cleanup_task = None self.is_running = False async def start(self): """Bellek yöneticisini başlatır""" if not self.is_running: self.is_running = True self.cleanup_task = asyncio.create_task(self._cleanup_worker()) logger.info("Memory manager started") async def stop(self): """Bellek yöneticisini durdurur""" self.is_running = False if self.cleanup_task: self.cleanup_task.cancel() try: await self.cleanup_task except asyncio.CancelledError: pass logger.info("Memory manager stopped") async def store(self, key: str, content: Any, ttl: Optional[float] = None, priority: int = 1, tags: List[str] = None, metadata: Dict[str, Any] = None) -> bool: """Bellek slot'una veri yazar""" try: # TTL ayarı if ttl is None: ttl = self.default_ttl # Slot oluştur slot = MemorySlot( id=key, content=content, timestamp=time.time(), ttl=ttl, priority=priority, tags=tags or [], metadata=metadata or {} ) # Yer kontrolü if len(self.slots) >= self.max_slots and key not in self.slots: await self._evict_slot() # Slot'u kaydet self.slots[key] = slot self.stats["total_writes"] += 1 logger.debug(f"Stored in memory slot: {key}") return True except Exception as e: logger.error(f"Memory store error: {str(e)}") return False async def retrieve(self, key: str) -> Optional[Any]: """Bellek slot'undan veri okur""" try: if key not in self.slots: self.stats["cache_misses"] += 1 return None slot = self.slots[key] # Expire kontrolü if slot.is_expired(): await self.remove(key) self.stats["cache_misses"] += 1 return None # Erişim güncelle slot.update_access() self.stats["total_reads"] += 1 self.stats["cache_hits"] += 1 logger.debug(f"Retrieved from memory slot: {key}") return slot.content except Exception as e: logger.error(f"Memory retrieve error: {str(e)}") return None async def remove(self, key: str) -> bool: """Bellek slot'unu siler""" try: if key in self.slots: del self.slots[key] logger.debug(f"Removed memory slot: {key}") return True return False except Exception as e: logger.error(f"Memory remove error: {str(e)}") return False async def search_by_tags(self, tags: List[str]) -> List[MemorySlot]: """Tag'lere göre slot'ları arar""" matching_slots = [] for slot in self.slots.values(): if not slot.is_expired(): if any(tag in slot.tags for tag in tags): matching_slots.append(slot) return matching_slots async def get_active_slots(self) -> List[MemorySlot]: """Aktif slot'ları döndürür""" active_slots = [] for slot in self.slots.values(): if not slot.is_expired(): active_slots.append(slot) return active_slots async def update_slot(self, key: str, content: Any = None, ttl: Optional[float] = None, priority: Optional[int] = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None) -> bool: """Slot'u günceller""" try: if key not in self.slots: return False slot = self.slots[key] # Expire kontrolü if slot.is_expired(): await self.remove(key) return False # Güncellemeleri uygula if content is not None: slot.content = content if ttl is not None: slot.ttl = ttl if priority is not None: slot.priority = priority if tags is not None: slot.tags = tags if metadata is not None: slot.metadata.update(metadata) slot.timestamp = time.time() logger.debug(f"Updated memory slot: {key}") return True except Exception as e: logger.error(f"Memory update error: {str(e)}") return False async def _evict_slot(self): """En az öncelikli slot'u çıkarır""" if not self.slots: return # En az kullanılan ve en düşük öncelikli slot'u bul candidates = [] for slot in self.slots.values(): if not slot.is_expired(): candidates.append(slot) if not candidates: return # Skorlama: düşük priority + düşük access_count + eski timestamp def eviction_score(slot): age = time.time() - slot.timestamp return (1 / slot.priority) + (1 / max(slot.access_count, 1)) + (age / 3600) victim = max(candidates, key=eviction_score) await self.remove(victim.id) self.stats["evictions"] += 1 logger.debug(f"Evicted memory slot: {victim.id}") async def _cleanup_worker(self): """Periyodik temizlik işlevi""" while self.is_running: try: await asyncio.sleep(self.cleanup_interval) await self._cleanup_expired() except asyncio.CancelledError: break except Exception as e: logger.error(f"Cleanup worker error: {str(e)}") async def _cleanup_expired(self): """Süresi dolmuş slot'ları temizler""" expired_keys = [] for key, slot in self.slots.items(): if slot.is_expired(): expired_keys.append(key) for key in expired_keys: await self.remove(key) if expired_keys: self.stats["cleanups"] += 1 logger.debug(f"Cleaned up {len(expired_keys)} expired slots") async def export_memory(self, include_expired: bool = False) -> Dict[str, Any]: """Belleği dışa aktarır""" export_data = { "timestamp": datetime.now().isoformat(), "slots": [], "stats": self.stats } for slot in self.slots.values(): if include_expired or not slot.is_expired(): export_data["slots"].append(slot.to_dict()) return export_data async def import_memory(self, data: Dict[str, Any]) -> bool: """Belleği içe aktarır""" try: slots_data = data.get("slots", []) for slot_data in slots_data: slot = MemorySlot(**slot_data) if not slot.is_expired(): self.slots[slot.id] = slot logger.info(f"Imported {len(slots_data)} memory slots") return True except Exception as e: logger.error(f"Memory import error: {str(e)}") return False def get_stats(self) -> Dict[str, Any]: """Bellek istatistikleri""" total_requests = self.stats["total_reads"] hit_rate = self.stats["cache_hits"] / max(total_requests, 1) active_slots = sum(1 for slot in self.slots.values() if not slot.is_expired()) return { "total_slots": len(self.slots), "active_slots": active_slots, "max_slots": self.max_slots, "hit_rate": hit_rate, "total_writes": self.stats["total_writes"], "total_reads": self.stats["total_reads"], "evictions": self.stats["evictions"], "cleanups": self.stats["cleanups"] } async def clear_all(self): """Tüm belleği temizler""" self.slots.clear() logger.info("All memory slots cleared")