import os import re import threading import shutil import sys from typing import Dict, List, Any class FalkorDBMock: """A robust in-memory graph mock for local development.""" def __init__(self): self.nodes = [] # List of dicts with 'labels', 'properties' self.vectors = {} # node_index -> vector self.lock = threading.Lock() def _resolve_params(self, cypher: str, params: Dict = None) -> str: if not params: return cypher # Sort keys by length descending to avoid partial replacement issues ($id vs $id_2) for k in sorted(params.keys(), key=len, reverse=True): v = params[k] placeholder = f"${k}" # If it's a vector, we don't replace it in the string (it's too large) if isinstance(v, list) and len(v) > 0 and isinstance(v[0], float): continue replacement = f"'{v}'" if isinstance(v, str) else str(v) cypher = cypher.replace(placeholder, replacement) return cypher def query(self, cypher: str, params: Dict = None): with self.lock: cypher = self._resolve_params(cypher, params) cypher_upper = cypher.upper() # 0. Handle Vector Index Creation if "CALL DB.IDX.VECTOR.CREATE" in cypher_upper: return [] # 1. Handle Vector Search if "CALL DB.IDX.VECTOR.QUERY" in cypher_upper: label_filter = "Rule" if "'RULES'" in cypher_upper or "'RULE'" in cypher_upper else "Memory" if "'MEMORIES'" in cypher_upper or "'MEMORY'" in cypher_upper else None results = [] for n in self.nodes: if not label_filter or label_filter in n['labels']: results.append([ n['properties'].get('name', n['properties'].get('content', 'Unknown')), n['properties'].get('description', n['properties'].get('summary', '')), 0.9 ]) return results # 2. Handle CREATE / MERGE if "CREATE" in cypher_upper or "MERGE" in cypher_upper: # Multi-clause support (crude) clauses = re.split(r"(CREATE|MERGE)", cypher, flags=re.IGNORECASE)[1:] for i in range(0, len(clauses), 2): op = clauses[i].upper() content = clauses[i+1] label_match = re.search(r":(\w+)", content) label = label_match.group(1) if label_match else "Unknown" props = self._parse_props(content) self.nodes.append({"labels": [label], "properties": props}) return [] # 3. Handle MATCH / SET / RETURN if "MATCH" in cypher_upper: # Find matching nodes matched_nodes = [] # Improved regex to handle (n), (n:Label), and (n {props}) match_clauses = re.findall(r"MATCH\s+\((\w+)(?::(\w+))?(?:\s*\{([^}]+)\})?\)", cypher, flags=re.IGNORECASE | re.DOTALL) for var, label, props_str in match_clauses: target_props = self._parse_props(f"{{{props_str}}}") if props_str else {} for node in self.nodes: if not label or label in node['labels']: match = True for k, v in target_props.items(): if node['properties'].get(k) != v: match = False break if match: # Basic WHERE label support: WHERE n:Player OR n:Location where_match = re.search(rf"WHERE\s+(.+?)(?=\s+SET|\s+MATCH|\s+MERGE|\s+CREATE|\s+RETURN|$)", cypher, flags=re.IGNORECASE | re.DOTALL) if where_match: where_clause = where_match.group(1) allowed_labels = re.findall(rf"{var}:(\w+)", where_clause) if allowed_labels: if not any(l in node['labels'] for l in allowed_labels): continue matched_nodes.append(node) # Handle SET if "SET" in cypher_upper: set_clauses = re.findall(r"SET\s+(\w+)\.(\w+)\s*=\s*(.+?)(?=\s+SET|\s+MATCH|\s+MERGE|\s+CREATE|\s+RETURN|$)", cypher, flags=re.IGNORECASE) for var, prop, val_expr in set_clauses: val_expr = val_expr.strip().strip("'\"") for node in matched_nodes: # Crude numeric evaluation if val_expr.replace('.','',1).isdigit() or (val_expr.startswith('-') and val_expr[1:].replace('.','',1).isdigit()): node['properties'][prop] = float(val_expr) if '.' in val_expr else int(val_expr) else: node['properties'][prop] = val_expr return [] # Handle DELETE if "DELETE" in cypher_upper: # Very crude: delete all matched nodes for node in matched_nodes: if node in self.nodes: self.nodes.remove(node) return [] # Handle RETURN return_match = re.search(r"RETURN\s+(.+)$", cypher, flags=re.IGNORECASE | re.DOTALL) if return_match: ret_expr = return_match.group(1).strip() results = [] for node in matched_nodes: row = [] for item in ret_expr.split(","): item = item.strip() # Handle labels(n)[0] as type if "LABELS(" in item.upper(): row.append(node['labels'][0]) elif "." in item: # Handle n.prop prop = item.split(".")[1] # Remove 'AS alias' if present prop = prop.split(" ")[0] row.append(node['properties'].get(prop)) else: row.append(node['properties']) results.append(row) return results return None def _parse_props(self, s: str) -> Dict: props = {} # String props matches = re.findall(r"(\w+):\s*['\"]([^'\"]+)['\"]", s) for k, v in matches: props[k] = v # Number props (handling negative numbers) num_matches = re.findall(r"(\w+):\s*(-?\d+\.?\d*)", s) for k, v in num_matches: props[k] = float(v) if '.' in v else int(v) return props def dump(self, path: str): import pickle with open(path, "wb") as f: pickle.dump({"nodes": self.nodes, "vectors": self.vectors}, f) def load(self, path: str): import pickle if os.path.exists(path): with open(path, "rb") as f: data = pickle.load(f) self.nodes = data.get("nodes", []) self.vectors = data.get("vectors", {}) class DBClient: def __init__(self): self.is_mock = os.getenv("USE_MOCK_DB", "False") == "True" if self.is_mock: self.client = FalkorDBMock() print("Using FalkorDBMock as requested.") else: host = os.getenv("DB_HOST", "localhost") port = int(os.getenv("DB_PORT", 6379)) # Retry loop for initial connection (helps in Docker startup races) retries = 5 while retries > 0: try: from falkordb import FalkorDB self.client = FalkorDB(host=host, port=port) # Verify connection and module availability try: self.client.select_graph("world").query("RETURN 1") except Exception as e: if "unknown command 'GRAPH.QUERY'" in str(e): raise RuntimeError("FalkorDB module (falkordb.so) is NOT loaded in the Redis server.") from e raise print(f"Connected to FalkorDB at {host}:{port}") return except Exception as e: print(f"Failed to connect to FalkorDB (Retries left: {retries}): {e}") retries -= 1 if retries > 0: import time time.sleep(2) print(f"CRITICAL: Failed to connect to live FalkorDB after all retries.") # We do NOT fall back to mock here anymore as per requirements raise RuntimeError("Could not connect to live FalkorDB.") def query(self, cypher: str, params: Dict = None): if self.is_mock: return self.client.query(cypher, params) res = self.client.select_graph("world").query(cypher, params) # return the result set as a list for easier len() checks and compatibility return res.result_set def player_exists(self) -> bool: """Robustly checks if a player exists in the database.""" try: res = self.query("MATCH (p:Player) RETURN p.name LIMIT 1") if not res: return False # FalkorDB result_set is a list of lists, mock is similar return len(list(res)) > 0 except: return False def save_db(self, path: str): if self.is_mock: self.client.dump(path) else: try: self.client.execute_command("SAVE") except Exception as e: print(f"Warning: SAVE command failed: {e}") rdb_source = "/data/dump.rdb" if os.path.exists(rdb_source): os.makedirs(os.path.dirname(path), exist_ok=True) shutil.copy(rdb_source, path) elif os.path.exists("dump.rdb"): os.makedirs(os.path.dirname(path), exist_ok=True) shutil.copy("dump.rdb", path) else: print("Error: Could not find dump.rdb to save.") def load_db(self, path: str): if self.is_mock: self.client.load(path) else: rdb_target = "/data/dump.rdb" # Ensure target directory exists os.makedirs(os.path.dirname(rdb_target), exist_ok=True) if os.path.exists(path): shutil.copy(path, rdb_target) print(f"Database loaded from {path} to {rdb_target}.") db_client = DBClient()