Spaces:
Sleeping
Sleeping
File size: 11,122 Bytes
5ef3f54 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 | import os
import re
import threading
import shutil
import sys
from typing import Dict, List, Any
class FalkorDBMock:
    """In-memory stand-in for a FalkorDB graph, intended for local development.

    Only nodes are stored; relationship patterns are tolerated but never kept.
    A small, regex-driven subset of Cypher is understood:

    * ``CALL db.idx.vector.*`` and ``CREATE/DROP ... INDEX`` statements,
    * ``CREATE`` / ``MERGE`` of node patterns,
    * ``MATCH`` on the first node pattern of each clause, optionally narrowed
      by ``WHERE var:Label`` predicates, followed by ``SET``, ``DELETE`` or
      ``RETURN``.

    Known limits: property predicates in ``WHERE`` are ignored, ``ORDER BY`` is
    not applied, and ``SET``/``DELETE``/``RETURN`` act on every matched node
    regardless of the variable they name.
    """

    # Quoted string literals (either quote style, backslash escapes allowed).
    _SQ = r"'(?:[^'\\]|\\.)*'"
    _DQ = r'"(?:[^"\\]|\\.)*"'
    _STRING_RE = re.compile(_SQ + "|" + _DQ, re.DOTALL)
    # ``$name`` placeholders.
    _PARAM_RE = re.compile(r"\$(\w+)")
    # One ``key: 'string'`` or ``key: number`` pair. Strings are consumed as a
    # unit so that text such as 'hp: 5' inside a value is not read as a pair.
    _PROP_RE = re.compile(
        r"(\w+)\s*:\s*(?:(" + _SQ + "|" + _DQ + r")|(-?\d+\.?\d*))", re.DOTALL
    )
    # A ``{...}`` property map; braces inside string literals do not end it.
    _MAP = r"\{(?:[^}'\"]|" + _SQ + "|" + _DQ + r")*\}"
    # A node pattern: ``(var:Label:Other {props})`` with every part optional.
    _NODE_RE = re.compile(
        r"\(\s*(\w*)\s*((?::\s*\w+\s*)*)(" + _MAP + r")?\s*\)", re.DOTALL
    )
    # Clause keywords. The look-behind rejects ``:Label``, ``n.prop`` and
    # ``$param`` uses of the same word; the look-ahead rejects map keys.
    _CLAUSE_RE = re.compile(
        r"(?<![\w:.$])"
        r"(ON\s+(?:CREATE|MATCH)\s+SET|OPTIONAL\s+MATCH|DETACH\s+DELETE"
        r"|MATCH|CREATE|MERGE|SET|DELETE|RETURN|WHERE)\b(?!\s*:)",
        re.IGNORECASE,
    )
    # Index DDL, e.g. ``CREATE VECTOR INDEX FOR ...`` or ``DROP INDEX ...``.
    _INDEX_RE = re.compile(r"\b(?:CREATE|DROP)\s+(?:\w+\s+)?INDEX\b", re.IGNORECASE)
    # ``var.prop = value`` inside a SET clause, comma separated.
    _ASSIGN_RE = re.compile(
        r"(\w+)\.(\w+)\s*=\s*(" + _SQ + "|" + _DQ + r"|[^,]*?)\s*(?=,|$)", re.DOTALL
    )
    _NUMBER_RE = re.compile(r"-?(?:\d+\.?\d*|\.\d+)")
    # Start of the ORDER BY / SKIP / LIMIT tail of a RETURN clause.
    _TAIL_RE = re.compile(r"\s+(?:ORDER\s+BY|SKIP|LIMIT)\b", re.IGNORECASE)
    # A relationship attached to a node pattern: ``)-``, ``)<-``, ``-(``, ``->(``.
    _REL_RE = re.compile(r"\)\s*<?\s*-|->?\s*\(")
    _ESCAPES = {"n": "\n", "t": "\t", "r": "\r"}

    def __init__(self):
        self.nodes = []  # List of dicts with 'labels', 'properties'
        self.vectors = {}  # node_index -> vector
        self.lock = threading.Lock()

    # ------------------------------------------------------------------ #
    # Lexical helpers
    # ------------------------------------------------------------------ #
    def _mask_strings(self, text: str) -> str:
        """Blank the inside of every string literal, keeping the length.

        Offsets in the masked text therefore line up with the original, which
        lets keyword and pattern searches ignore whatever the literals contain.
        """
        return self._STRING_RE.sub(
            lambda m: m.group(0)[0] + " " * (len(m.group(0)) - 2) + m.group(0)[-1],
            text,
        )

    def _unescape(self, text: str) -> str:
        """Resolve backslash escapes inside a string literal's content."""
        return re.sub(
            r"\\(.)",
            lambda m: self._ESCAPES.get(m.group(1), m.group(1)),
            text,
            flags=re.DOTALL,
        )

    def _literal(self, raw: str) -> Any:
        """Convert the right-hand side of a SET assignment to a Python value."""
        if self._STRING_RE.fullmatch(raw):
            return self._unescape(raw[1:-1])
        if self._NUMBER_RE.fullmatch(raw):
            return float(raw) if "." in raw else int(raw)
        # Anything else (expressions, bare words) is kept as text.
        return raw.strip("'\"")

    def _split_clauses(self, cypher: str) -> List[tuple]:
        """Split a query into ``(KEYWORD, body)`` pairs in source order."""
        masked = self._mask_strings(cypher)
        marks = list(self._CLAUSE_RE.finditer(masked))
        clauses = []
        for pos, mark in enumerate(marks):
            stop = marks[pos + 1].start() if pos + 1 < len(marks) else len(cypher)
            keyword = " ".join(mark.group(1).upper().split())
            clauses.append((keyword, cypher[mark.end():stop].strip()))
        return clauses

    def _resolve_params(self, cypher: str, params: Dict = None) -> str:
        """Inline ``$name`` placeholders from ``params`` into the query text.

        Strings are single-quoted with quotes and backslashes escaped; other
        values use ``str()``. Non-empty lists whose first element is a float
        (embedding vectors) are left as placeholders because they are too
        large to inline. Placeholders without a matching key are left alone.
        """
        if not params:
            return cypher
        lookup = {str(key): value for key, value in params.items()}

        def render(found):
            name = found.group(1)
            if name not in lookup:
                return found.group(0)
            value = lookup[name]
            if isinstance(value, list) and len(value) > 0 and isinstance(value[0], float):
                return found.group(0)
            if isinstance(value, str):
                return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"
            return str(value)

        # One pass over the text: names are matched whole ($id never eats into
        # $id_2) and substituted values are never re-scanned.
        return self._PARAM_RE.sub(render, cypher)

    def _parse_props(self, s: str) -> Dict:
        """Extract ``key: 'text'`` and ``key: number`` pairs from ``s``.

        Values of any other kind are skipped. Numbers containing a ``.``
        become floats, the rest ints.
        """
        props = {}
        for pair in self._PROP_RE.finditer(s):
            key, quoted, number = pair.group(1), pair.group(2), pair.group(3)
            if quoted is not None:
                props[key] = self._unescape(quoted[1:-1])
            else:
                props[key] = float(number) if "." in number else int(number)
        return props

    # ------------------------------------------------------------------ #
    # Query execution
    # ------------------------------------------------------------------ #
    def query(self, cypher: str, params: Dict = None):
        """Run ``cypher`` against the in-memory nodes.

        Returns a list of rows for vector searches and ``MATCH ... RETURN``,
        an empty list for index statements and for writes
        (``CREATE``/``MERGE``/``SET``/``DELETE``), and ``None`` for anything
        that is not recognised.
        """
        with self.lock:
            cypher = self._resolve_params(cypher, params)
            cypher_upper = cypher.upper()

            # 0. Vector index creation is a no-op.
            if "CALL DB.IDX.VECTOR.CREATE" in cypher_upper:
                return []

            # 1. Vector search: every node of the requested label, fixed score.
            if "CALL DB.IDX.VECTOR.QUERY" in cypher_upper:
                return self._vector_rows(cypher_upper)

            # Index DDL must not fall through to CREATE and spawn a node.
            if self._INDEX_RE.search(self._mask_strings(cypher)):
                return []

            clauses = self._split_clauses(cypher)
            kinds = {keyword for keyword, _ in clauses}

            # 2. CREATE / MERGE
            if kinds & {"CREATE", "MERGE"}:
                for keyword, body in clauses:
                    if keyword in ("CREATE", "MERGE"):
                        self._write_clause(keyword, body)
                return []

            # 3. MATCH followed by SET, DELETE or RETURN
            if not kinds & {"MATCH", "OPTIONAL MATCH"}:
                return None
            matched = self._match_nodes(clauses)

            if "SET" in kinds:
                for keyword, body in clauses:
                    if keyword == "SET":
                        self._apply_set(body, matched)
                return []

            if kinds & {"DELETE", "DETACH DELETE"}:
                # Very crude: delete all matched nodes (in place, by identity).
                doomed = {id(node) for node in matched}
                self.nodes[:] = [n for n in self.nodes if id(n) not in doomed]
                return []

            for keyword, body in clauses:
                if keyword == "RETURN":
                    return self._project(body, matched)
            return None

    def _vector_rows(self, cypher_upper: str) -> List[list]:
        """Build ``[name, description, score]`` rows for a vector search."""
        if "'RULES'" in cypher_upper or "'RULE'" in cypher_upper:
            label_filter = "Rule"
        elif "'MEMORIES'" in cypher_upper or "'MEMORY'" in cypher_upper:
            label_filter = "Memory"
        else:
            label_filter = None
        rows = []
        for node in self.nodes:
            if label_filter and label_filter not in node['labels']:
                continue
            fields = node['properties']
            rows.append([
                fields.get('name', fields.get('content', 'Unknown')),
                fields.get('description', fields.get('summary', '')),
                0.9,
            ])
        return rows

    def _add_node(self, keyword: str, labels: List[str], props: Dict) -> None:
        """Append a node; for MERGE, skip it when an equivalent node exists."""
        if keyword == "MERGE":
            for node in self.nodes:
                same_labels = all(label in node['labels'] for label in labels)
                same_props = all(
                    node['properties'].get(key) == value for key, value in props.items()
                )
                if same_labels and same_props:
                    return
        self.nodes.append({"labels": labels, "properties": props})

    def _write_clause(self, keyword: str, body: str) -> None:
        """Create one node per labelled or propertied node pattern in ``body``."""
        masked = self._mask_strings(body)
        wrote = False
        for hit in self._NODE_RE.finditer(masked):
            labels = re.findall(r"\w+", hit.group(2))
            props = {}
            if hit.group(3) is not None:
                # Read properties from the unmasked text at the same offsets.
                props = self._parse_props(body[hit.start(3):hit.end(3)])
            if not labels and not props:
                continue  # bare ``(var)``: a reference, not a new node
            self._add_node(keyword, labels or ["Unknown"], props)
            wrote = True
        if wrote or self._REL_RE.search(masked):
            return
        # Nothing parseable and no relationship: fall back to the crude
        # "first label plus any properties in the clause" reading.
        label = re.search(r":(\w+)", masked)
        self._add_node(
            keyword,
            [label.group(1) if label else "Unknown"],
            self._parse_props(body),
        )

    def _match_nodes(self, clauses: List[tuple]) -> List[Dict]:
        """Collect nodes matching the first node pattern of each MATCH clause."""
        where_body = next((body for keyword, body in clauses if keyword == "WHERE"), "")
        where_body = self._mask_strings(where_body)
        pool = []
        for keyword, body in clauses:
            if keyword not in ("MATCH", "OPTIONAL MATCH"):
                continue
            head = self._NODE_RE.match(body)
            if head is None:
                continue
            var = head.group(1)
            labels = re.findall(r"\w+", head.group(2))
            wanted = self._parse_props(head.group(3)) if head.group(3) else {}
            # Basic WHERE label support: WHERE n:Player OR n:Location
            allowed = []
            if var:
                allowed = re.findall(rf"(?<!\w){re.escape(var)}:(\w+)", where_body)
            for node in self.nodes:
                if any(label not in node['labels'] for label in labels):
                    continue
                if any(node['properties'].get(k) != v for k, v in wanted.items()):
                    continue
                if allowed and not any(label in node['labels'] for label in allowed):
                    continue
                pool.append(node)
        return pool

    def _apply_set(self, body: str, nodes: List[Dict]) -> None:
        """Apply every ``var.prop = value`` assignment in ``body`` to ``nodes``."""
        for hit in self._ASSIGN_RE.finditer(body):
            value = self._literal(hit.group(3).strip())
            for node in nodes:
                node['properties'][hit.group(2)] = value

    def _project(self, body: str, nodes: List[Dict]) -> List[list]:
        """Build RETURN rows, one per node, honouring SKIP and LIMIT."""
        masked = self._mask_strings(body)
        tail = self._TAIL_RE.search(masked)
        expr = body[:tail.start()] if tail else body
        modifiers = masked[tail.start():] if tail else ""

        rows = []
        for node in nodes:
            row = []
            for item in expr.split(","):
                item = item.strip()
                if "LABELS(" in item.upper():
                    # labels(n)[0] is used as the node's type
                    row.append(node['labels'][0])
                elif "." in item:
                    # n.prop, with any trailing "AS alias" dropped
                    prop = item.split(".")[1].split(" ")[0]
                    row.append(node['properties'].get(prop))
                else:
                    row.append(node['properties'])
            rows.append(row)

        skip = re.search(r"\bSKIP\s+(\d+)", modifiers, flags=re.IGNORECASE)
        limit = re.search(r"\bLIMIT\s+(\d+)", modifiers, flags=re.IGNORECASE)
        if skip:
            rows = rows[int(skip.group(1)):]
        if limit:
            rows = rows[:int(limit.group(1))]
        return rows

    # ------------------------------------------------------------------ #
    # Persistence
    # ------------------------------------------------------------------ #
    def dump(self, path: str):
        """Pickle the node list and vector map to ``path``."""
        import pickle
        with open(path, "wb") as f:
            pickle.dump({"nodes": self.nodes, "vectors": self.vectors}, f)

    def load(self, path: str):
        """Restore state written by ``dump``; a missing file is ignored."""
        import pickle
        if os.path.exists(path):
            # SECURITY: unpickling executes arbitrary code embedded in the
            # file. Only load dumps this application wrote itself.
            with open(path, "rb") as f:
                data = pickle.load(f)
            self.nodes = data.get("nodes", [])
            self.vectors = data.get("vectors", {})
class DBClient:
    """Facade over either a live FalkorDB server or the in-memory mock.

    Environment variables read by the constructor:

    * ``USE_MOCK_DB`` - ``1``/``true``/``yes``/``on`` (any case) selects the mock.
    * ``DB_HOST`` - server host, default ``localhost``.
    * ``DB_PORT`` - server port, default ``6379``.
    """

    _TRUTHY = frozenset({"1", "true", "yes", "on"})

    @staticmethod
    def _env_flag(name: str, default: str = "False") -> bool:
        """Return True when environment variable ``name`` holds a truthy word."""
        return os.getenv(name, default).strip().lower() in DBClient._TRUTHY

    def __init__(self):
        """Build the mock, or connect to FalkorDB with a short retry loop.

        Raises:
            RuntimeError: if the ``falkordb`` package is missing or the server
                cannot be reached after all retries. There is deliberately no
                fallback to the mock.
        """
        self.is_mock = self._env_flag("USE_MOCK_DB")
        if self.is_mock:
            self.client = FalkorDBMock()
            print("Using FalkorDBMock as requested.")
            return

        import time

        host = os.getenv("DB_HOST", "localhost")
        port = int(os.getenv("DB_PORT", "6379"))

        # A missing package will not fix itself, so fail before the retry loop.
        try:
            from falkordb import FalkorDB
        except ImportError as e:
            raise RuntimeError(
                "Could not connect to live FalkorDB: the 'falkordb' package is not installed."
            ) from e

        # Retry loop for initial connection (helps in Docker startup races)
        for retries in range(5, 0, -1):
            try:
                self.client = FalkorDB(host=host, port=port)
                # Verify connection and module availability
                try:
                    self.client.select_graph("world").query("RETURN 1")
                except Exception as e:
                    if "unknown command 'GRAPH.QUERY'" in str(e):
                        raise RuntimeError("FalkorDB module (falkordb.so) is NOT loaded in the Redis server.") from e
                    raise
            except Exception as e:
                print(f"Failed to connect to FalkorDB (Retries left: {retries}): {e}")
                if retries > 1:
                    time.sleep(2)
            else:
                print(f"Connected to FalkorDB at {host}:{port}")
                return

        print("CRITICAL: Failed to connect to live FalkorDB after all retries.")
        # We do NOT fall back to mock here anymore as per requirements
        raise RuntimeError("Could not connect to live FalkorDB.")

    def query(self, cypher: str, params: Dict = None):
        """Run ``cypher`` and return its rows.

        In mock mode the mock's return value is passed through unchanged; in
        live mode the ``result_set`` of the "world" graph query is returned.
        """
        if self.is_mock:
            return self.client.query(cypher, params)
        res = self.client.select_graph("world").query(cypher, params)
        # return the result set as a list for easier len() checks and compatibility
        return res.result_set

    def player_exists(self) -> bool:
        """Return True when at least one ``Player`` node exists.

        Any error raised by the query is treated as "no player".
        """
        try:
            res = self.query("MATCH (p:Player) RETURN p.name LIMIT 1")
            if not res:
                return False
            # FalkorDB result_set is a list of lists, mock is similar
            return len(list(res)) > 0
        except Exception:
            # Best effort by design, but let KeyboardInterrupt/SystemExit through.
            return False

    def save_db(self, path: str):
        """Persist the database to ``path``, creating its parent directory.

        Mock mode dumps the in-memory state. Live mode issues ``SAVE`` and then
        copies ``/data/dump.rdb`` (or ``dump.rdb`` in the working directory) to
        ``path``; a message is printed when neither file exists.
        """
        parent = os.path.dirname(path)
        if self.is_mock:
            if parent:
                os.makedirs(parent, exist_ok=True)
            self.client.dump(path)
            return

        try:
            self.client.execute_command("SAVE")
        except Exception as e:
            print(f"Warning: SAVE command failed: {e}")

        for rdb_source in ("/data/dump.rdb", "dump.rdb"):
            if os.path.exists(rdb_source):
                # dirname is "" for a bare file name; makedirs("") would raise.
                if parent:
                    os.makedirs(parent, exist_ok=True)
                shutil.copy(rdb_source, path)
                return
        print("Error: Could not find dump.rdb to save.")

    def load_db(self, path: str):
        """Restore the database from ``path``.

        Mock mode reloads the in-memory state. Live mode only copies the file
        to ``/data/dump.rdb``; nothing in this method makes a running server
        re-read it.
        """
        if self.is_mock:
            self.client.load(path)
            return

        rdb_target = "/data/dump.rdb"
        # Ensure target directory exists
        os.makedirs(os.path.dirname(rdb_target), exist_ok=True)
        if os.path.exists(path):
            shutil.copy(path, rdb_target)
            print(f"Database loaded from {path} to {rdb_target}.")
        else:
            print(f"Warning: no database file at {path}; nothing loaded.")
# Shared module-level client. It is constructed at import time, so importing
# this module either builds the in-memory mock or connects to FalkorDB (with
# retries) and raises RuntimeError if the connection cannot be established.
db_client = DBClient()
|