OpenMark / openmark /stores /neo4j_store.py
codingwithadi's picture
Upload folder using huggingface_hub
81598c5 verified
"""
Neo4j store — knowledge graph.
Nodes: Bookmark, Tag, Category, Source, Domain
Edges: TAGGED, IN_CATEGORY, FROM_SOURCE, FROM_DOMAIN, SIMILAR_TO, CO_OCCURS_WITH
"""
import re
from urllib.parse import urlparse
from neo4j import GraphDatabase
from openmark import config
def get_driver():
return GraphDatabase.driver(
config.NEO4J_URI,
auth=(config.NEO4J_USER, config.NEO4J_PASSWORD),
)
def setup_constraints(driver):
"""Create uniqueness constraints once."""
constraints = [
"CREATE CONSTRAINT bookmark_url IF NOT EXISTS FOR (b:Bookmark) REQUIRE b.url IS UNIQUE",
"CREATE CONSTRAINT tag_name IF NOT EXISTS FOR (t:Tag) REQUIRE t.name IS UNIQUE",
"CREATE CONSTRAINT category_name IF NOT EXISTS FOR (c:Category) REQUIRE c.name IS UNIQUE",
"CREATE CONSTRAINT source_name IF NOT EXISTS FOR (s:Source) REQUIRE s.name IS UNIQUE",
"CREATE CONSTRAINT domain_name IF NOT EXISTS FOR (d:Domain) REQUIRE d.name IS UNIQUE",
]
with driver.session(database=config.NEO4J_DATABASE) as session:
for cypher in constraints:
try:
session.run(cypher)
except Exception as e:
print(f" Constraint (already exists or error): {e}")
print("Constraints ready.")
def extract_domain(url: str) -> str:
try:
return urlparse(url).netloc.replace("www.", "")
except Exception:
return "unknown"
def ingest(items: list[dict], driver=None):
"""Write all nodes and relationships to Neo4j."""
own_driver = driver is None
if own_driver:
driver = get_driver()
setup_constraints(driver)
total = len(items)
batch_size = 200
print(f"Neo4j ingesting {total} items...")
for start in range(0, total, batch_size):
batch = items[start:start + batch_size]
with driver.session(database=config.NEO4J_DATABASE) as session:
session.execute_write(_write_batch, batch)
print(f" Neo4j wrote {min(start + batch_size, total)}/{total}")
print("Building tag co-occurrence edges...")
_build_tag_cooccurrence(driver)
print("Neo4j ingestion complete.")
if own_driver:
driver.close()
def _write_batch(tx, batch: list[dict]):
for item in batch:
url = item["url"]
title = item["title"][:500]
category = item["category"]
tags = item["tags"]
score = float(item["score"])
source = item["source"]
domain = extract_domain(url)
# Bookmark node
tx.run("""
MERGE (b:Bookmark {url: $url})
SET b.title = $title, b.score = $score
""", url=url, title=title, score=score)
# Category node + relationship
tx.run("""
MERGE (c:Category {name: $cat})
WITH c
MATCH (b:Bookmark {url: $url})
MERGE (b)-[:IN_CATEGORY]->(c)
""", cat=category, url=url)
# Source node + relationship
tx.run("""
MERGE (s:Source {name: $src})
WITH s
MATCH (b:Bookmark {url: $url})
MERGE (b)-[:FROM_SOURCE]->(s)
""", src=source, url=url)
# Domain node + relationship
if domain and domain != "unknown":
tx.run("""
MERGE (d:Domain {name: $domain})
WITH d
MATCH (b:Bookmark {url: $url})
MERGE (b)-[:FROM_DOMAIN]->(d)
""", domain=domain, url=url)
# Tag nodes + relationships
for tag in tags:
if not tag:
continue
tx.run("""
MERGE (t:Tag {name: $tag})
WITH t
MATCH (b:Bookmark {url: $url})
MERGE (b)-[:TAGGED]->(t)
""", tag=tag, url=url)
def _build_tag_cooccurrence(driver):
"""
For each bookmark with multiple tags, create CO_OCCURS_WITH edges between tags.
Weight = number of bookmarks where both tags appear together.
"""
with driver.session(database=config.NEO4J_DATABASE) as session:
session.run("""
MATCH (b:Bookmark)-[:TAGGED]->(t1:Tag)
MATCH (b)-[:TAGGED]->(t2:Tag)
WHERE t1.name < t2.name
MERGE (t1)-[r:CO_OCCURS_WITH]-(t2)
ON CREATE SET r.count = 1
ON MATCH SET r.count = r.count + 1
""")
print(" Tag co-occurrence edges built.")
def add_similar_to_edges(similar_pairs: list[tuple[str, str, float]], driver=None):
"""
Write SIMILAR_TO edges derived from ChromaDB nearest-neighbor search.
similar_pairs = [(url_a, url_b, similarity_score), ...]
"""
own_driver = driver is None
if own_driver:
driver = get_driver()
with driver.session(database=config.NEO4J_DATABASE) as session:
for url_a, url_b, score in similar_pairs:
session.run("""
MATCH (a:Bookmark {url: $url_a})
MATCH (b:Bookmark {url: $url_b})
MERGE (a)-[r:SIMILAR_TO]-(b)
SET r.score = $score
""", url_a=url_a, url_b=url_b, score=score)
print(f" SIMILAR_TO: {len(similar_pairs)} edges written.")
if own_driver:
driver.close()
def query(cypher: str, params: dict | None = None) -> list[dict]:
"""Run arbitrary Cypher and return results as list of dicts."""
driver = get_driver()
with driver.session(database=config.NEO4J_DATABASE) as session:
result = session.run(cypher, params or {})
rows = [dict(r) for r in result]
driver.close()
return rows
def get_stats() -> dict:
rows = query("""
MATCH (b:Bookmark) WITH count(b) AS bookmarks
MATCH (t:Tag) WITH bookmarks, count(t) AS tags
MATCH (c:Category) WITH bookmarks, tags, count(c) AS categories
RETURN bookmarks, tags, categories
""")
return rows[0] if rows else {}
def find_similar(url: str, limit: int = 10) -> list[dict]:
return query("""
MATCH (b:Bookmark {url: $url})-[r:SIMILAR_TO]-(other:Bookmark)
RETURN other.url AS url, other.title AS title, r.score AS similarity
ORDER BY r.score DESC LIMIT $limit
""", {"url": url, "limit": limit})
def find_by_tag(tag: str, limit: int = 20) -> list[dict]:
return query("""
MATCH (b:Bookmark)-[:TAGGED]->(t:Tag {name: $tag})
RETURN b.url AS url, b.title AS title, b.score AS score
ORDER BY b.score DESC LIMIT $limit
""", {"tag": tag.lower(), "limit": limit})
def find_tag_cluster(tag: str, hops: int = 2, limit: int = 30) -> list[dict]:
"""Follow CO_OCCURS_WITH edges to find related tags and their bookmarks."""
return query(f"""
MATCH (t:Tag {{name: $tag}})-[:CO_OCCURS_WITH*1..{hops}]-(related:Tag)
MATCH (b:Bookmark)-[:TAGGED]->(related)
RETURN DISTINCT b.url AS url, b.title AS title, b.score AS score, related.name AS via_tag
ORDER BY b.score DESC LIMIT $limit
""", {"tag": tag.lower(), "limit": limit})