Rob-Bot's picture
Deploy backend scripts
5ef3f54 verified
import os
import re
import threading
import shutil
import sys
from typing import Dict, List, Any
class FalkorDBMock:
"""A robust in-memory graph mock for local development."""
def __init__(self):
self.nodes = [] # List of dicts with 'labels', 'properties'
self.vectors = {} # node_index -> vector
self.lock = threading.Lock()
def _resolve_params(self, cypher: str, params: Dict = None) -> str:
if not params:
return cypher
# Sort keys by length descending to avoid partial replacement issues ($id vs $id_2)
for k in sorted(params.keys(), key=len, reverse=True):
v = params[k]
placeholder = f"${k}"
# If it's a vector, we don't replace it in the string (it's too large)
if isinstance(v, list) and len(v) > 0 and isinstance(v[0], float):
continue
replacement = f"'{v}'" if isinstance(v, str) else str(v)
cypher = cypher.replace(placeholder, replacement)
return cypher
def query(self, cypher: str, params: Dict = None):
with self.lock:
cypher = self._resolve_params(cypher, params)
cypher_upper = cypher.upper()
# 0. Handle Vector Index Creation
if "CALL DB.IDX.VECTOR.CREATE" in cypher_upper:
return []
# 1. Handle Vector Search
if "CALL DB.IDX.VECTOR.QUERY" in cypher_upper:
label_filter = "Rule" if "'RULES'" in cypher_upper or "'RULE'" in cypher_upper else "Memory" if "'MEMORIES'" in cypher_upper or "'MEMORY'" in cypher_upper else None
results = []
for n in self.nodes:
if not label_filter or label_filter in n['labels']:
results.append([
n['properties'].get('name', n['properties'].get('content', 'Unknown')),
n['properties'].get('description', n['properties'].get('summary', '')),
0.9
])
return results
# 2. Handle CREATE / MERGE
if "CREATE" in cypher_upper or "MERGE" in cypher_upper:
# Multi-clause support (crude)
clauses = re.split(r"(CREATE|MERGE)", cypher, flags=re.IGNORECASE)[1:]
for i in range(0, len(clauses), 2):
op = clauses[i].upper()
content = clauses[i+1]
label_match = re.search(r":(\w+)", content)
label = label_match.group(1) if label_match else "Unknown"
props = self._parse_props(content)
self.nodes.append({"labels": [label], "properties": props})
return []
# 3. Handle MATCH / SET / RETURN
if "MATCH" in cypher_upper:
# Find matching nodes
matched_nodes = []
# Improved regex to handle (n), (n:Label), and (n {props})
match_clauses = re.findall(r"MATCH\s+\((\w+)(?::(\w+))?(?:\s*\{([^}]+)\})?\)", cypher, flags=re.IGNORECASE | re.DOTALL)
for var, label, props_str in match_clauses:
target_props = self._parse_props(f"{{{props_str}}}") if props_str else {}
for node in self.nodes:
if not label or label in node['labels']:
match = True
for k, v in target_props.items():
if node['properties'].get(k) != v:
match = False
break
if match:
# Basic WHERE label support: WHERE n:Player OR n:Location
where_match = re.search(rf"WHERE\s+(.+?)(?=\s+SET|\s+MATCH|\s+MERGE|\s+CREATE|\s+RETURN|$)", cypher, flags=re.IGNORECASE | re.DOTALL)
if where_match:
where_clause = where_match.group(1)
allowed_labels = re.findall(rf"{var}:(\w+)", where_clause)
if allowed_labels:
if not any(l in node['labels'] for l in allowed_labels):
continue
matched_nodes.append(node)
# Handle SET
if "SET" in cypher_upper:
set_clauses = re.findall(r"SET\s+(\w+)\.(\w+)\s*=\s*(.+?)(?=\s+SET|\s+MATCH|\s+MERGE|\s+CREATE|\s+RETURN|$)", cypher, flags=re.IGNORECASE)
for var, prop, val_expr in set_clauses:
val_expr = val_expr.strip().strip("'\"")
for node in matched_nodes:
# Crude numeric evaluation
if val_expr.replace('.','',1).isdigit() or (val_expr.startswith('-') and val_expr[1:].replace('.','',1).isdigit()):
node['properties'][prop] = float(val_expr) if '.' in val_expr else int(val_expr)
else:
node['properties'][prop] = val_expr
return []
# Handle DELETE
if "DELETE" in cypher_upper:
# Very crude: delete all matched nodes
for node in matched_nodes:
if node in self.nodes:
self.nodes.remove(node)
return []
# Handle RETURN
return_match = re.search(r"RETURN\s+(.+)$", cypher, flags=re.IGNORECASE | re.DOTALL)
if return_match:
ret_expr = return_match.group(1).strip()
results = []
for node in matched_nodes:
row = []
for item in ret_expr.split(","):
item = item.strip()
# Handle labels(n)[0] as type
if "LABELS(" in item.upper():
row.append(node['labels'][0])
elif "." in item:
# Handle n.prop
prop = item.split(".")[1]
# Remove 'AS alias' if present
prop = prop.split(" ")[0]
row.append(node['properties'].get(prop))
else:
row.append(node['properties'])
results.append(row)
return results
return None
def _parse_props(self, s: str) -> Dict:
props = {}
# String props
matches = re.findall(r"(\w+):\s*['\"]([^'\"]+)['\"]", s)
for k, v in matches: props[k] = v
# Number props (handling negative numbers)
num_matches = re.findall(r"(\w+):\s*(-?\d+\.?\d*)", s)
for k, v in num_matches: props[k] = float(v) if '.' in v else int(v)
return props
def dump(self, path: str):
import pickle
with open(path, "wb") as f:
pickle.dump({"nodes": self.nodes, "vectors": self.vectors}, f)
def load(self, path: str):
import pickle
if os.path.exists(path):
with open(path, "rb") as f:
data = pickle.load(f)
self.nodes = data.get("nodes", [])
self.vectors = data.get("vectors", {})
class DBClient:
def __init__(self):
self.is_mock = os.getenv("USE_MOCK_DB", "False") == "True"
if self.is_mock:
self.client = FalkorDBMock()
print("Using FalkorDBMock as requested.")
else:
host = os.getenv("DB_HOST", "localhost")
port = int(os.getenv("DB_PORT", 6379))
# Retry loop for initial connection (helps in Docker startup races)
retries = 5
while retries > 0:
try:
from falkordb import FalkorDB
self.client = FalkorDB(host=host, port=port)
# Verify connection and module availability
try:
self.client.select_graph("world").query("RETURN 1")
except Exception as e:
if "unknown command 'GRAPH.QUERY'" in str(e):
raise RuntimeError("FalkorDB module (falkordb.so) is NOT loaded in the Redis server.") from e
raise
print(f"Connected to FalkorDB at {host}:{port}")
return
except Exception as e:
print(f"Failed to connect to FalkorDB (Retries left: {retries}): {e}")
retries -= 1
if retries > 0:
import time
time.sleep(2)
print(f"CRITICAL: Failed to connect to live FalkorDB after all retries.")
# We do NOT fall back to mock here anymore as per requirements
raise RuntimeError("Could not connect to live FalkorDB.")
def query(self, cypher: str, params: Dict = None):
if self.is_mock:
return self.client.query(cypher, params)
res = self.client.select_graph("world").query(cypher, params)
# return the result set as a list for easier len() checks and compatibility
return res.result_set
def player_exists(self) -> bool:
"""Robustly checks if a player exists in the database."""
try:
res = self.query("MATCH (p:Player) RETURN p.name LIMIT 1")
if not res: return False
# FalkorDB result_set is a list of lists, mock is similar
return len(list(res)) > 0
except:
return False
def save_db(self, path: str):
if self.is_mock:
self.client.dump(path)
else:
try:
self.client.execute_command("SAVE")
except Exception as e:
print(f"Warning: SAVE command failed: {e}")
rdb_source = "/data/dump.rdb"
if os.path.exists(rdb_source):
os.makedirs(os.path.dirname(path), exist_ok=True)
shutil.copy(rdb_source, path)
elif os.path.exists("dump.rdb"):
os.makedirs(os.path.dirname(path), exist_ok=True)
shutil.copy("dump.rdb", path)
else:
print("Error: Could not find dump.rdb to save.")
def load_db(self, path: str):
if self.is_mock:
self.client.load(path)
else:
rdb_target = "/data/dump.rdb"
# Ensure target directory exists
os.makedirs(os.path.dirname(rdb_target), exist_ok=True)
if os.path.exists(path):
shutil.copy(path, rdb_target)
print(f"Database loaded from {path} to {rdb_target}.")
db_client = DBClient()