Emma / utils /memory_utils.py
Keyvan1986's picture
Update utils/memory_utils.py
0c253b4 verified
Raw
History Blame Contribute Delete
11.3 kB
import os
import sys
import json
import time
import shutil
import gradio as gr
from huggingface_hub import HfApi, snapshot_download
# LlamaIndex Imports
try:
from llama_index.core import StorageContext, load_index_from_storage
except ImportError:
pass
# Local Imports setup
bank_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '../memory_bank')
sys.path.append(bank_path)
try:
from build_memory_index import build_memory_index
from summarize_memory import summarize_memory
except ImportError:
def build_memory_index(*args, **kwargs): pass
def summarize_memory(*args, **kwargs): return {}
# ==========================================
# 1. تنظیمات مسیر و هاب (Cloud Config)
# ==========================================
REPO_ID = "Keyvan1986/Emma-memory-storage"
REPO_TYPE = "dataset"
HF_TOKEN = os.environ.get("HF_TOKEN")
# مسیرهای اصلی (Absolute Paths)
BASE_DIR = os.path.abspath(os.getcwd()) # معمولاً /app
MEMORIES_DIR = os.path.join(BASE_DIR, "memories")
MEMORY_INDEX_DIR_NAME = "memory_index"
MEMORY_INDEX_PATH = os.path.join(MEMORIES_DIR, MEMORY_INDEX_DIR_NAME)
MEMORY_FILE_NAME = "update_memory_0512_eng.json"
MEMORY_FILE_PATH = os.path.join(MEMORIES_DIR, MEMORY_FILE_NAME)
# --- مسیر "اشتباه" قدیمی که فایل‌ها آنجا ساخته می‌شوند ---
# لاگ شما نشان داد فایل‌ها اینجا می‌روند: ../memories
LEGACY_OUTSIDE_DIR = os.path.abspath(os.path.join(BASE_DIR, "..", "memories"))
LEGACY_INDEX_PATH = os.path.join(LEGACY_OUTSIDE_DIR, MEMORY_INDEX_DIR_NAME)
os.makedirs(MEMORIES_DIR, exist_ok=True)
os.makedirs(MEMORY_INDEX_PATH, exist_ok=True)
print(f"📂 Path Configuration:")
print(f" - App Dir (Target): {MEMORY_INDEX_PATH}")
print(f" - Builder Dir (Source): {LEGACY_INDEX_PATH}")
# ==========================================
# 2. تابع انتقال فایل (حیاتی برای حل مشکل شما)
# ==========================================
def sync_legacy_files_to_app():
"""
این تابع بررسی می‌کند که آیا ایندکس‌ها در مسیر ../memories ساخته شده‌اند یا نه.
اگر آنجا باشند، آن‌ها را به داخل /app/memories کپی می‌کند تا آپلود شوند.
"""
if os.path.exists(LEGACY_INDEX_PATH):
print(f"🕵️ Detected files in outside path: {LEGACY_INDEX_PATH}")
try:
# کپی کردن تمام محتویات از بیرون به داخل پوشه قابل آپلود
shutil.copytree(LEGACY_INDEX_PATH, MEMORY_INDEX_PATH, dirs_exist_ok=True)
print(f"✅ MOVED files from '{LEGACY_INDEX_PATH}' to '{MEMORY_INDEX_PATH}' for upload.")
except Exception as e:
print(f"⚠️ Error moving legacy files: {e}")
else:
# شاید فایل‌ها همین الان در جای درست باشند
pass
# ==========================================
# 3. توابع همگام‌سازی ابری
# ==========================================
def push_to_hub():
if not HF_TOKEN:
return
try:
# قبل از آپلود، مطمئن می‌شویم فایل‌ها در جای درست هستند
sync_legacy_files_to_app()
api = HfApi(token=HF_TOKEN)
# 1. آپلود JSON
if os.path.exists(MEMORY_FILE_PATH):
api.upload_file(
path_or_fileobj=MEMORY_FILE_PATH,
path_in_repo=MEMORY_FILE_NAME,
repo_id=REPO_ID,
repo_type=REPO_TYPE,
commit_message=f"Auto-save JSON: {time.strftime('%Y-%m-%d %H:%M:%S')}"
)
# 2. آپلود پوشه ایندکس
if os.path.exists(MEMORY_INDEX_PATH):
# بررسی تعداد فایل‌های واقعی
files_count = sum([len(files) for r, d, files in os.walk(MEMORY_INDEX_PATH)])
print(f"📂 Preparing upload. Found {files_count} files in {MEMORY_INDEX_PATH}")
# ترفند Force Update
with open(os.path.join(MEMORY_INDEX_PATH, "last_sync_log.txt"), "w") as f:
f.write(f"Sync triggered at: {time.time()}")
print("☁️ Uploading Index folder...")
api.upload_folder(
folder_path=MEMORY_INDEX_PATH,
path_in_repo=MEMORY_INDEX_DIR_NAME,
repo_id=REPO_ID,
repo_type=REPO_TYPE,
commit_message=f"Auto-save Indices: {time.strftime('%Y-%m-%d %H:%M:%S')}",
ignore_patterns=[".DS_Store", "*.git*"]
)
print(f"✅ Sync Complete.")
except Exception as e:
print(f"❌ Cloud Sync Error: {e}")
def pull_from_hub():
if not HF_TOKEN: return
try:
print(f"📥 Pulling data from Hub...")
# دانلود JSON
try:
snapshot_download(
repo_id=REPO_ID,
repo_type=REPO_TYPE,
local_dir=MEMORIES_DIR,
allow_patterns=[MEMORY_FILE_NAME],
force_download=True
)
except: pass
# دانلود پوشه ایندکس
snapshot_download(
repo_id=REPO_ID,
repo_type=REPO_TYPE,
local_dir=MEMORIES_DIR,
allow_patterns=f"{MEMORY_INDEX_DIR_NAME}/**",
force_download=True
)
print("✅ Data Downloaded.")
# برعکس: اگر برنامه در ../memories دنبال فایل می‌گردد، باید فایل‌های دانلود شده را آنجا هم کپی کنیم
# تا LlamaIndex بتواند آن‌ها را بخواند (اگر مسیرش نسبی است)
if os.path.exists(MEMORY_INDEX_PATH):
try:
os.makedirs(LEGACY_INDEX_PATH, exist_ok=True)
shutil.copytree(MEMORY_INDEX_PATH, LEGACY_INDEX_PATH, dirs_exist_ok=True)
print(f"✅ Mirroring downloaded files to legacy path: {LEGACY_INDEX_PATH}")
except Exception as e:
print(f"⚠️ Mirroring error: {e}")
except Exception as e:
print(f"⚠️ Cloud Download Error: {e}")
# ==========================================
# 4. توابع اصلی I/O
# ==========================================
def save_memory(data):
try:
os.makedirs(MEMORIES_DIR, exist_ok=True)
temp_file = MEMORY_FILE_PATH + ".tmp"
with open(temp_file, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
os.replace(temp_file, MEMORY_FILE_PATH)
push_to_hub()
except Exception as e:
print(f"❌ Save Error: {e}")
def load_memory():
pull_from_hub()
if not os.path.exists(MEMORY_FILE_PATH):
return {}
try:
with open(MEMORY_FILE_PATH, 'r', encoding='utf-8') as f:
return json.loads(f.read())
except:
return {}
# ==========================================
# 5. منطق حافظه (Memory Logic)
# ==========================================
def enter_name_llamaindex(name, memory_data, data_args, update_memory_index=True):
is_new_user = False
if name not in memory_data:
memory_data[name] = {"name": name, "sessions": [], "episodic_memory": [], "semantic_memory": {}, "profile": {}}
is_new_user = True
save_memory(memory_data)
user_memory = memory_data[name]
# سعی می‌کنیم فایل‌ها را در هر دو مسیر (جدید و قدیم) چک کنیم
# مسیر صحیح (که دانلود شده)
correct_base_dir = os.path.join(MEMORY_INDEX_PATH, "llamaindex", name)
# مسیر قدیمی (که ممکن است بیلدر آنجا بسازد)
legacy_base_dir = os.path.join(LEGACY_INDEX_PATH, "llamaindex", name)
# بررسی وجود فایل در مسیر صحیح
indices_exist = (
os.path.exists(os.path.join(correct_base_dir, "sessions_index.json")) and
os.path.exists(os.path.join(correct_base_dir, "episodic_memory_index.json"))
)
if update_memory_index or not indices_exist:
try:
print(f"⚙️ Building indices for {name}...")
# این تابع احتمالا در ../memories می‌سازد
build_memory_index(memory_data, data_args, name=name)
# بلافاصله فایل‌ها را به مسیر درست منتقل می‌کنیم و آپلود می‌کنیم
sync_legacy_files_to_app()
push_to_hub()
except Exception as e:
print(f"Warning building index: {e}")
# بارگذاری (لودینگ)
# اولویت با مسیری است که فایل دارد. اگر دانلود شده باشد، در correct_base_dir است.
# اگر همین الان ساخته شده باشد و کپی شده باشد، باز هم در correct_base_dir است.
load_dir = correct_base_dir if os.path.exists(correct_base_dir) else legacy_base_dir
sessions_memory = None
episodic_memory = None
semantic_memory = None
def load_idx(path):
try:
return load_index_from_storage(StorageContext.from_defaults(persist_dir=path))
except: return None
if os.path.exists(load_dir):
sessions_memory = load_idx(os.path.join(load_dir, "sessions_index")) # ممکن است نام پوشه فرق کند، کد اصلی چک شود
# معمولا json ذخیره نمیشود، بلکه پوشه است. اگر فایل json است:
if not sessions_memory:
# LlamaIndex معمولا پوشه میسازد. اگر فایل json است لاجیک فرق میکند
# اما طبق لاگ شما: Saved ... to .../episodic_memory (بدون پسوند، یعنی پوشه)
pass
# تلاش استاندارد برای لود:
# توجه: در لاگ شما مسیرها .../episodic_memory بود (پوشه).
s_path = os.path.join(load_dir, "sessions_index")
e_path = os.path.join(load_dir, "episodic_memory") # طبق لاگ شما
m_path = os.path.join(load_dir, "semantic_memory") # طبق لاگ شما
# چک کردن فایل json اگر پوشه نبود (fallback)
if not os.path.exists(e_path): e_path += "_index.json"
if not os.path.exists(m_path): m_path += "_index.json"
if os.path.exists(s_path): sessions_memory = load_idx(s_path)
if os.path.exists(e_path): episodic_memory = load_idx(e_path)
if os.path.exists(m_path): semantic_memory = load_idx(m_path)
return f"Welcome {name}!", user_memory, sessions_memory, episodic_memory, semantic_memory
def summarize_memory_event_personality(data_args, memory_data, user_name):
save_memory(memory_data)
try:
updated_memory = summarize_memory(MEMORY_FILE_PATH, user_name, language='en')
# بعد از خلاصه سازی هم ممکن است ایندکس تغییر کند
sync_legacy_files_to_app()
push_to_hub()
return updated_memory.get(user_name, {})
except Exception as e:
print(f"Error summarize: {e}")
return memory_data.get(user_name, {})