| """ | |
| OMEGA Migration: JSON graphs + JSONL sidecar → SQLite + sqlite-vec. | |
| Reads the existing graph JSON files (semantic.json, temporal.json, causal.json, | |
| entity.json) and JSONL store, generates embeddings, and inserts everything into | |
| the new SQLite database. | |
| Usage: | |
| omega migrate-db # Interactive migration | |
| omega migrate-db --force # Overwrite existing database | |
| """ | |
import hashlib
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Set

from omega import json_compat as json

logger = logging.getLogger("omega.migrate")

OMEGA_DIR = Path(os.environ.get("OMEGA_HOME", str(Path.home() / ".omega")))
GRAPHS_DIR = OMEGA_DIR / "graphs"
DB_PATH = OMEGA_DIR / "omega.db"


def _read_json(path: Path) -> Dict:
    """Read a JSON file with orjson."""
    return json.loads(path.read_bytes())


def _load_graph_nodes(graph_name: str) -> List[Dict[str, Any]]:
    """Load nodes from a graph JSON file."""
    path = GRAPHS_DIR / f"{graph_name}.json"
    if not path.exists():
        return []
    try:
        data = _read_json(path)
        nodes = data.get("nodes", [])
        logger.info(f" {graph_name}.json: {len(nodes)} nodes")
        return nodes
    except Exception as e:
        logger.warning(f" Failed to read {graph_name}.json: {e}")
        return []


def _load_graph_edges(graph_name: str) -> List[Dict[str, Any]]:
    """Load edges from a graph JSON file."""
    path = GRAPHS_DIR / f"{graph_name}.json"
    if not path.exists():
        return []
    try:
        data = _read_json(path)
        edges = data.get("edges", [])
        logger.info(f" {graph_name}.json: {len(edges)} edges")
        return edges
    except Exception as e:
        logger.warning(f" Failed to read {graph_name}.json edges: {e}")
        return []


def _load_jsonl_entries(store_path: Path) -> List[Dict[str, Any]]:
    """Load entries from the JSONL sidecar store."""
    if not store_path.exists():
        return []
    entries = []
    errors = 0
    for line in store_path.read_text().splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            # splitlines() always yields str; encode for the bytes-based loader
            entry = json.loads(line.encode())
            entries.append(entry)
        except Exception:
            errors += 1
    if errors:
        logger.warning(f" {errors} malformed JSONL lines skipped")
    return entries


def _normalize_metadata(node_data: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize metadata from graph node format."""
    meta = dict(node_data.get("metadata", {}) or {})
    # Unify "type" -> "event_type"; the original "type" key is kept alongside it,
    # e.g. {"type": "decision"} becomes {"type": "decision", "event_type": "decision"}
    if "type" in meta and "event_type" not in meta:
        meta["event_type"] = meta["type"]
    return meta


def migrate(force: bool = False, batch_size: int = 50) -> Dict[str, Any]:
    """Run the full migration from JSON graphs to SQLite.

    Returns a report dict with counts. Note: ``batch_size`` is accepted but
    currently unused; embeddings are generated one entry at a time.
    """
    report = {
        "nodes_found": 0,
        "nodes_migrated": 0,
        "edges_migrated": 0,
        "duplicates_skipped": 0,
        "jsonl_only": 0,
        "errors": 0,
        "warnings": [],
    }
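    # Illustrative shape of a completed report (the values here are made up):
    #   {"nodes_found": 120, "nodes_migrated": 115, "edges_migrated": 40,
    #    "duplicates_skipped": 5, "jsonl_only": 8, "errors": 0, "warnings": []}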
    # Check for existing database
    if DB_PATH.exists() and not force:
        # Check if it has data
        import sqlite3

        conn = sqlite3.connect(str(DB_PATH))
        try:
            count = conn.execute("SELECT COUNT(*) FROM memories").fetchone()[0]
            if count > 0:
                report["warnings"].append(f"Database already exists with {count} memories. Use --force to overwrite.")
                return report
        except Exception:
            pass  # Table doesn't exist yet, safe to proceed
        finally:
            conn.close()

    if DB_PATH.exists() and force:
        # Back up before overwriting
        import shutil

        backup_path = DB_PATH.with_suffix(".db.bak")
        shutil.copy2(DB_PATH, backup_path)
        DB_PATH.unlink()
        logger.info(f" Backed up existing database to {backup_path}")
    # ---- Phase 1: Load all nodes from JSON graphs ----
    print("\n[1/4] Loading graph data...")
    semantic_nodes = _load_graph_nodes("semantic")
    temporal_nodes = _load_graph_nodes("temporal")
    causal_nodes = _load_graph_nodes("causal")

    entity_data = {}
    entity_path = GRAPHS_DIR / "entity.json"
    if entity_path.exists():
        try:
            entity_data = _read_json(entity_path)
            print(" entity.json: loaded")
        except Exception as e:
            logger.warning(f" Failed to read entity.json: {e}")

    # Deduplicate nodes across graphs (semantic is the canonical source)
    all_nodes: Dict[str, Dict[str, Any]] = {}
    for node in semantic_nodes:
        nid = node.get("id")
        if nid:
            all_nodes[nid] = node
    # Add any nodes from temporal/causal that aren't in semantic
    for nodes in [temporal_nodes, causal_nodes]:
        for node in nodes:
            nid = node.get("id")
            if nid and nid not in all_nodes:
                all_nodes[nid] = node

    report["nodes_found"] = len(all_nodes)
    print(f" Total unique nodes: {len(all_nodes)}")

    # ---- Phase 2: Load JSONL entries not in graphs ----
    store_path = OMEGA_DIR / "store.jsonl"
    jsonl_entries = _load_jsonl_entries(store_path)
    print(f" JSONL entries: {len(jsonl_entries)}")

    # Find entries in JSONL but not in graphs
    graph_content_hashes: Set[str] = set()
    for node in all_nodes.values():
        content = node.get("content", "")
        if content:
            graph_content_hashes.add(hashlib.sha256(content.encode()).hexdigest())
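    # Identity for cross-store dedup is the SHA-256 of the raw content string:
    # a JSONL entry whose text matches a graph node byte-for-byte is considered
    # already migrated, even if its metadata differs.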
    jsonl_only_entries = []
    for entry in jsonl_entries:
        content = entry.get("content", "")
        if content:
            h = hashlib.sha256(content.encode()).hexdigest()
            if h not in graph_content_hashes:
                jsonl_only_entries.append(entry)
                graph_content_hashes.add(h)  # Prevent JSONL-internal dupes

    if jsonl_only_entries:
        print(f" JSONL-only entries (not in graphs): {len(jsonl_only_entries)}")
    report["jsonl_only"] = len(jsonl_only_entries)

    # ---- Phase 3: Generate embeddings and insert into SQLite ----
    print(f"\n[2/4] Generating embeddings for {len(all_nodes) + len(jsonl_only_entries)} entries...")
    from omega.sqlite_store import SQLiteStore

    store = SQLiteStore(db_path=DB_PATH)
    try:
        return _migrate_into_store(store, all_nodes, jsonl_only_entries, entity_data, report)
    finally:
        store.close()


def _migrate_into_store(
    store,
    all_nodes,
    jsonl_only_entries,
    entity_data,
    report,
) -> Dict[str, Any]:
    """Insert nodes, edges, and entities into the store. Called by migrate()."""
    # Prepare all items for batch insertion
    items_to_store: List[Dict[str, Any]] = []
    for node_data in all_nodes.values():
        meta = _normalize_metadata(node_data)
        content = node_data.get("content", "")
        if not content.strip():
            continue
        items_to_store.append(
            {
                "node_id": node_data.get("id"),
                "content": content,
                "metadata": meta,
                "event_type": meta.get("event_type", ""),
                "session_id": meta.get("session_id", ""),
                "project": meta.get("project", ""),
                "created_at": node_data.get("created_at"),
                "access_count": node_data.get("access_count", 0),
                "last_accessed": node_data.get("last_accessed"),
                "ttl_seconds": node_data.get("ttl_seconds"),
                "embedding": node_data.get("embedding"),  # Usually None (not in JSON)
            }
        )
    for entry in jsonl_only_entries:
        content = entry.get("content", "")
        if not content.strip():
            continue
        meta = {}
        for key in ["event_type", "session_id", "project", "type"]:
            if key in entry:
                meta[key] = entry[key]
        if "type" in meta and "event_type" not in meta:
            meta["event_type"] = meta.pop("type")
        items_to_store.append(
            {
                "node_id": entry.get("id"),
                "content": content,
                "metadata": meta,
                "event_type": meta.get("event_type", ""),
                "session_id": meta.get("session_id", entry.get("session_id", "")),
                "project": meta.get("project", entry.get("project", "")),
                "created_at": entry.get("timestamp") or entry.get("created_at"),
                "access_count": 0,
                "last_accessed": None,
                "ttl_seconds": None,
                "embedding": None,
            }
        )
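    # Both sources now share one uniform item shape, so the embedding and
    # insertion phases below don't need to know where an item came from.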
    # Generate embeddings for items that lack one (one at a time, not batched)
    items_needing_embedding = [i for i, item in enumerate(items_to_store) if item.get("embedding") is None]
    if items_needing_embedding:
        print(f" Generating embeddings for {len(items_needing_embedding)} nodes...")
        try:
            from omega.graphs import generate_embedding

            done = 0
            for idx in items_needing_embedding:
                content = items_to_store[idx]["content"]
                try:
                    emb = generate_embedding(content)
                    items_to_store[idx]["embedding"] = emb
                except Exception:
                    pass  # Leave the embedding as None; the node is still stored
                done += 1
                if done % 100 == 0:
                    print(f" {done}/{len(items_needing_embedding)} embeddings processed...")
            print(f" {done}/{len(items_needing_embedding)} embeddings processed.")
        except ImportError:
            print(" WARNING: Could not import embedding model. Nodes will be stored without embeddings.")
            report["warnings"].append("Embedding model not available; stored without vectors")
    # Insert into SQLite
    print(f"\n[3/4] Inserting {len(items_to_store)} memories into SQLite...")
    migrated = 0
    dupes = 0
    errors = 0
    for i, item in enumerate(items_to_store):
        try:
            # Build metadata for store()
            meta = dict(item.get("metadata", {}))
            if item.get("event_type"):
                meta["event_type"] = item["event_type"]
            if item.get("session_id"):
                meta["session_id"] = item["session_id"]
            if item.get("project"):
                meta["project"] = item["project"]
            if item.get("created_at"):
                meta["_original_created_at"] = item["created_at"]
            if item.get("access_count"):
                meta["access_count"] = item["access_count"]
            if item.get("last_accessed"):
                meta["last_accessed"] = item["last_accessed"]
            node_id = store.store(
                content=item["content"],
                session_id=item.get("session_id"),
                metadata=meta,
                embedding=item.get("embedding"),
                ttl_seconds=item.get("ttl_seconds"),
                skip_inference=True,  # We already generated embeddings
            )
            if node_id:
                migrated += 1
            else:
                dupes += 1  # A falsy node_id means the store saw this content already
        except Exception as e:
            errors += 1
            if errors <= 5:  # Log only the first few errors to keep output readable
                logger.warning(f" Error migrating node: {e}")
        if (i + 1) % 200 == 0:
            print(f" {i + 1}/{len(items_to_store)} inserted...")

    report["nodes_migrated"] = migrated
    report["duplicates_skipped"] = dupes
    report["errors"] = errors
    # ---- Phase 4: Migrate causal edges ----
    # Note: _conn direct access is intentional; migration is single-threaded
    # and offline, so the store's _lock is not needed.
    print("\n[4/4] Migrating edges...")
    causal_edges = _load_graph_edges("causal")
    edges_migrated = 0
    edge_errors = 0
    for edge_data in causal_edges:
        try:
            source = edge_data.get("source")
            target = edge_data.get("target")
            if source and target:
                store._conn.execute(
                    "INSERT OR IGNORE INTO edges (source_id, target_id, edge_type, weight, metadata) VALUES (?, ?, ?, ?, ?)",
                    (
                        source,
                        target,
                        edge_data.get("edge_type", "causal"),
                        edge_data.get("weight", 1.0),
                        json.dumps(edge_data.get("metadata", {})),
                    ),
                )
                edges_migrated += 1
        except Exception as e:
            edge_errors += 1
            if edge_errors <= 5:  # Log only the first few edge errors
                logger.warning(f" Edge migration error: {e}")
    store._conn.commit()
    report["edges_migrated"] = edges_migrated
    # Migrate entity index
    entity_index = entity_data.get("entity_index", {})
    if entity_index:
        for entity_id, node_ids in entity_index.items():
            for nid in node_ids:
                try:
                    store._conn.execute(
                        "INSERT OR IGNORE INTO entity_index (entity_id, node_id) VALUES (?, ?)",
                        (entity_id, nid),
                    )
                except Exception:
                    pass
        store._conn.commit()
        print(f" Entity index: {len(entity_index)} entities migrated")
    # ---- Rename old files ----
    store_path = OMEGA_DIR / "store.jsonl"
    backed_up = []
    for name in ["semantic.json", "temporal.json", "causal.json", "entity.json"]:
        old = GRAPHS_DIR / name
        if old.exists():
            bak = old.with_suffix(".json.bak")
            old.rename(bak)
            backed_up.append(name)
    # Also back up the JSONL sidecar
    if store_path.exists():
        store_path.rename(store_path.with_suffix(".jsonl.pre-sqlite"))
    # Back up auxiliary index files
    for name in ["stats.json", "type_index.json", "session_index.json", "project_index.json", "feedback_index.json"]:
        old = GRAPHS_DIR / name
        if old.exists():
            old.rename(old.with_suffix(".json.bak"))
    # Clean up lock files and WAL
    for lock in GRAPHS_DIR.glob("*.lock"):
        lock.unlink()
    wal_path = GRAPHS_DIR / "wal.jsonl"
    if wal_path.exists():
        wal_path.rename(wal_path.with_suffix(".jsonl.bak"))

    if backed_up:
        print(f" Backed up: {', '.join(backed_up)}")

    print(f"\n{'=' * 50}")
    print("Migration complete!")
    print(f" Migrated: {migrated} memories")
    print(f" Duplicates: {dupes} skipped")
    print(f" Edges: {edges_migrated}")
    print(f" Errors: {errors}")
    print(f" Database: {DB_PATH} ({DB_PATH.stat().st_size / 1024:.0f} KB)")
    if backed_up:
        print(f" Old files renamed to .bak in {GRAPHS_DIR}")

    return report


def auto_migrate_if_needed() -> bool:
    """Auto-migrate on first run if JSON graphs exist but SQLite doesn't.

    Called by bridge._get_store() to handle transparent migration.
    Returns True if migration was performed.
    """
    # Already have a database with data: no migration needed
    if DB_PATH.exists():
        import sqlite3

        conn = sqlite3.connect(str(DB_PATH))
        try:
            count = conn.execute("SELECT COUNT(*) FROM memories").fetchone()[0]
            if count > 0:
                return False
        except Exception:
            pass  # Table doesn't exist yet, proceed with migration
        finally:
            conn.close()

    # Check if there are JSON graphs to migrate
    has_graphs = any((GRAPHS_DIR / f"{name}.json").exists() for name in ["semantic", "temporal", "causal"])
    has_jsonl = (OMEGA_DIR / "store.jsonl").exists()
    if not has_graphs and not has_jsonl:
        return False  # Fresh install, no migration needed

    print("OMEGA: Auto-migrating to SQLite backend...")
    try:
        report = migrate(force=False)
        if report.get("nodes_migrated", 0) > 0:
            print(f"OMEGA: Migrated {report['nodes_migrated']} memories to SQLite.")
            return True
        return False
    except Exception as e:
        logger.error(f"Auto-migration failed: {e}")
        print(f"OMEGA: Auto-migration failed: {e}")
        print(" Run 'omega migrate-db --force' to retry manually.")
        return False
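

# A minimal direct-execution hook, sketched here as an assumption for ad-hoc
# runs; the supported entry point is the `omega migrate-db` CLI shown in the
# module docstring.
if __name__ == "__main__":
    import sys

    logging.basicConfig(level=logging.INFO)
    migrate(force="--force" in sys.argv)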