"""Flask web app that stores and searches "memories" in a Memvid (.mv2) database,
optionally persisting the database file to a Hugging Face dataset repository."""

import json
import os

from flask import (
    Flask,
    Response,
    jsonify,
    render_template,
    request,
    stream_with_context,
)
from huggingface_hub import HfApi, hf_hub_download, upload_file
from memvid_sdk import create, open as open_memvid

app = Flask(__name__)

# CONFIGURATION
FILENAME = "knowledge.mv2"             # local DB file name; also its path inside the HF repo
HF_TOKEN = os.environ.get("HF_TOKEN")  # HF access token; cloud sync is skipped when unset
DATASET_NAME = "memvid-storage"        # dataset repo name, resolved to "<username>/<DATASET_NAME>"
MIN_SCORE = 0.65                       # search hits scoring below this are dropped

# Global state
db = None                              # open Memvid handle (None until init_db() succeeds)
DB_PATH = os.path.abspath(FILENAME)
DATASET_REPO_ID = None                 # cached "<username>/<DATASET_NAME>", resolved lazily


def _ndjson(status, message):
    """Return one NDJSON progress line for the /add stream.

    Using json.dumps guarantees proper escaping, so exception text containing
    quotes or backslashes cannot corrupt the stream (the original f-string
    interpolation produced invalid JSON in that case).
    """
    return json.dumps({"status": status, "message": message}) + "\n"


def get_repo_id():
    """Resolve and cache the dataset repo id 'username/DATASET_NAME'.

    Returns:
        The repo id string, or None when HF_TOKEN is unset or the
        username lookup fails (failure is logged, not raised).
    """
    global DATASET_REPO_ID
    if DATASET_REPO_ID:
        return DATASET_REPO_ID
    if not HF_TOKEN:
        return None
    try:
        api = HfApi(token=HF_TOKEN)
        username = api.whoami()['name']
        DATASET_REPO_ID = f"{username}/{DATASET_NAME}"
        return DATASET_REPO_ID
    except Exception as e:
        print(f"⚠️ Error getting username: {e}")
        return None


def init_db():
    """Initialize the global Memvid handle.

    1. Ensure the HF dataset repo exists (when credentials are available).
    2. Download the existing database file from the repo, if present.
    3. Open (or create) the local database.

    Cloud failures are best-effort: they log a warning and the app falls
    back to a purely local database.
    """
    global db
    repo_id = get_repo_id()

    # 1. Sync / set up cloud storage.
    if HF_TOKEN and repo_id:
        print(f"🔄 Checking cloud storage at {repo_id}...")
        api = HfApi(token=HF_TOKEN)
        try:
            # Create the repo if it doesn't exist yet.
            api.create_repo(repo_id=repo_id, repo_type="dataset", exist_ok=True)
            files = api.list_repo_files(repo_id=repo_id, repo_type="dataset")
            if FILENAME in files:
                downloaded_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=FILENAME,
                    repo_type="dataset",
                    token=HF_TOKEN,
                    local_dir=".",
                    # NOTE(review): deprecated and ignored by recent
                    # huggingface_hub releases; kept for older versions.
                    local_dir_use_symlinks=False,
                )
                print(f"✅ Downloaded database to {downloaded_path}")
            else:
                print("⚠️ Database file not found in repo. A new one will be created and synced.")
        except Exception as e:
            print(f"⚠️ Cloud sync warning: {e}")

    # 2. Open the local database. The ImportError fallback targets SDK
    #    builds that expose only the class-based Memvid API instead of the
    #    module-level create/open helpers.
    try:
        if os.path.exists(DB_PATH):
            db = open_memvid(DB_PATH, read_only=False)
        else:
            db = create(DB_PATH)
    except ImportError:
        from memvid_sdk import Memvid
        db = Memvid()
        if os.path.exists(DB_PATH):
            db.open(DB_PATH)
        else:
            db.create(DB_PATH)


def sync_to_hub():
    """Upload the local .mv2 file back to the Hugging Face dataset repo.

    No-ops (with a log line) when credentials or the repo id are missing;
    upload errors are logged rather than raised.
    """
    repo_id = get_repo_id()
    if not HF_TOKEN or not repo_id:
        print("⚠️ No HF_TOKEN or Repo ID found. Skipping sync.")
        return
    try:
        print("☁️ Syncing to Hub...")
        upload_file(
            path_or_fileobj=DB_PATH,
            path_in_repo=FILENAME,
            repo_id=repo_id,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message="Memvid: Auto-save memory update",
        )
        print("✅ Sync complete.")
    except Exception as e:
        print(f"❌ Sync failed: {e}")


# Initialize on startup
init_db()


@app.route('/')
def index():
    """Serve the single-page UI."""
    return render_template('index.html')


@app.route('/add', methods=['POST'])
def add_memory():
    """Ingest a memory, flush it to disk, sync to the Hub, and reload.

    Progress is streamed back to the client as NDJSON status lines
    (Content-Type: application/x-ndjson).
    """
    # 1. Input validation
    content = request.form.get('content')
    if not content:
        return jsonify({"error": "No content provided"}), 400

    # 2. Define the stream generator
    def generate():
        global db
        try:
            # Step A: re-initialize lazily in case startup init failed.
            if not db:
                init_db()
                if not db:
                    yield _ndjson("error", "Database init failed")
                    return

            # Step B: database put
            yield _ndjson("processing", "Ingesting content...")
            db.put({
                "text": content,
                "labels": ["web-entry"],
                "title": "User Memory",
            })

            # Step C: drop the handle so the SDK flushes its buffers to disk
            # before the file is uploaded (the original `del db; db = None`
            # was an equivalent, redundant form of this reference drop).
            yield _ndjson("processing", "Flushing to disk...")
            db = None

            # Step D: sync the file to the Hub
            yield _ndjson("processing", "Syncing to cloud (this may take a moment)...")
            sync_to_hub()

            # Step E: reopen the database
            yield _ndjson("processing", "Reloading index...")
            init_db()

            yield _ndjson("success", "Memory added and synced.")
        except Exception as e:
            # _ndjson escapes the exception text, keeping the stream valid
            # even when the message contains quotes.
            yield _ndjson("error", str(e))

    # 3. Return the stream
    return Response(stream_with_context(generate()), mimetype='application/x-ndjson')


@app.route('/search', methods=['POST'])
def search_memory():
    """Search the database and return cleaned, score-filtered hits as JSON."""
    if not db:
        return jsonify({"error": "Database not initialized"}), 500
    query = request.form.get('query')
    if not query:
        return jsonify({"error": "No query provided"}), 400

    try:
        # 1. Search
        response = db.find(query)

        # 2. Parse & clean each hit
        clean_results = []
        for hit in response.get('hits', []):
            score = hit.get('score', 0.0)
            if score < MIN_SCORE:
                continue

            # Strip the metadata header lines (title:, tags:, ...) that the
            # SDK embeds at the top of each snippet.
            raw_snippet = hit.get('snippet', '')
            content_lines = [
                line for line in raw_snippet.split('\n')
                if not line.strip().startswith(
                    ('title:', 'tags:', 'labels:', 'extractous_metadata:'))
            ]
            clean_text = "\n".join(content_lines).strip()

            clean_results.append({
                "title": hit.get('title') or "Untitled Memory",
                "text": clean_text,
                "tags": hit.get('tags', []),
                "labels": hit.get('labels', []),
                "date": hit.get('created_at', ''),
                "score": f"{score:.2f}",
            })

        return jsonify({"success": True, "results": clean_results})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7860)