"""
Neo4j store — knowledge graph.
Nodes: Bookmark, Tag, Category, Source, Domain
Edges: TAGGED, IN_CATEGORY, FROM_SOURCE, FROM_DOMAIN, SIMILAR_TO, CO_OCCURS_WITH
"""
import re
from urllib.parse import urlparse
from neo4j import GraphDatabase
from openmark import config
def get_driver():
    """Open a Neo4j driver from the connection settings in ``config``.

    Returns:
        The object returned by ``GraphDatabase.driver`` for
        ``config.NEO4J_URI``, authenticated with ``config.NEO4J_USER`` and
        ``config.NEO4J_PASSWORD``. Callers in this module close it themselves.
    """
    uri = config.NEO4J_URI
    credentials = (config.NEO4J_USER, config.NEO4J_PASSWORD)
    return GraphDatabase.driver(uri, auth=credentials)
def setup_constraints(driver):
    """Ensure a uniqueness constraint exists for every node label.

    Every statement carries ``IF NOT EXISTS``. A statement that fails is
    reported on stdout and the remaining statements are still attempted.

    Args:
        driver: Neo4j driver used to open a session on
            ``config.NEO4J_DATABASE``.
    """
    # (constraint name, Cypher variable, node label, unique property)
    specs = (
        ("bookmark_url", "b", "Bookmark", "url"),
        ("tag_name", "t", "Tag", "name"),
        ("category_name", "c", "Category", "name"),
        ("source_name", "s", "Source", "name"),
        ("domain_name", "d", "Domain", "name"),
    )
    statements = [
        f"CREATE CONSTRAINT {name} IF NOT EXISTS "
        f"FOR ({var}:{label}) REQUIRE {var}.{prop} IS UNIQUE"
        for name, var, label, prop in specs
    ]
    with driver.session(database=config.NEO4J_DATABASE) as session:
        for statement in statements:
            try:
                session.run(statement)
            except Exception as err:
                # Best effort: keep going so one failure does not block the rest.
                print(f" Constraint (already exists or error): {err}")
    print("Constraints ready.")
def extract_domain(url: str) -> str:
    """Return the network location of *url* without a leading ``www.``.

    Only a ``www.`` *prefix* is stripped; a ``www.`` that occurs elsewhere in
    the host (for example ``awww.example.com``) is left untouched.

    Args:
        url: The URL to inspect.

    Returns:
        The ``netloc`` component minus a leading ``www.`` (an empty string
        when the URL has no network location), or ``"unknown"`` when the URL
        cannot be parsed.
    """
    try:
        return urlparse(url).netloc.removeprefix("www.")
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed URL (e.g. an unbalanced IPv6 bracket).
        # TypeError / AttributeError: *url* is not a string.
        return "unknown"
def ingest(items: list[dict], driver=None):
    """Write all nodes and relationships for *items* to Neo4j.

    Sets up the uniqueness constraints, writes the items in batches of 200
    (one write transaction per batch, with a progress line after each), and
    finally rebuilds the tag co-occurrence edges.

    Args:
        items: Bookmark records; each one is handed to ``_write_batch``.
        driver: An existing Neo4j driver to reuse. When ``None`` a driver is
            created via ``get_driver()`` and closed before returning, even if
            ingestion fails part-way.
    """
    own_driver = driver is None
    if own_driver:
        driver = get_driver()
    try:
        setup_constraints(driver)
        total = len(items)
        batch_size = 200
        print(f"Neo4j ingesting {total} items...")
        for start in range(0, total, batch_size):
            batch = items[start:start + batch_size]
            with driver.session(database=config.NEO4J_DATABASE) as session:
                session.execute_write(_write_batch, batch)
            print(f" Neo4j wrote {min(start + batch_size, total)}/{total}")
        print("Building tag co-occurrence edges...")
        _build_tag_cooccurrence(driver)
        print("Neo4j ingestion complete.")
    finally:
        # Only close a driver this function created; a caller-supplied driver
        # stays open for the caller to manage.
        if own_driver:
            driver.close()
def _write_batch(tx, batch: list[dict]):
    """Upsert every item of *batch* inside the transaction *tx*.

    For each item a ``Bookmark`` node is merged on its URL (title truncated to
    500 characters, score stored as a float) and linked to its ``Category``,
    its ``Source``, its ``Domain`` (skipped when the domain is empty or
    ``"unknown"``) and one ``Tag`` per non-empty tag.

    Args:
        tx: Transaction handle; only ``tx.run(cypher, **params)`` is used.
        batch: Items providing the keys ``url``, ``title``, ``category``,
            ``tags``, ``score`` and ``source``.
    """
    upsert_bookmark = """
        MERGE (b:Bookmark {url: $url})
        SET b.title = $title, b.score = $score
    """
    link_category = """
        MERGE (c:Category {name: $cat})
        WITH c
        MATCH (b:Bookmark {url: $url})
        MERGE (b)-[:IN_CATEGORY]->(c)
    """
    link_source = """
        MERGE (s:Source {name: $src})
        WITH s
        MATCH (b:Bookmark {url: $url})
        MERGE (b)-[:FROM_SOURCE]->(s)
    """
    link_domain = """
        MERGE (d:Domain {name: $domain})
        WITH d
        MATCH (b:Bookmark {url: $url})
        MERGE (b)-[:FROM_DOMAIN]->(d)
    """
    link_tag = """
        MERGE (t:Tag {name: $tag})
        WITH t
        MATCH (b:Bookmark {url: $url})
        MERGE (b)-[:TAGGED]->(t)
    """

    for entry in batch:
        # Read every field up front so a malformed item fails before any
        # statement for it has been issued.
        link = entry["url"]
        heading = entry["title"][:500]
        cat_name = entry["category"]
        labels = entry["tags"]
        rating = float(entry["score"])
        origin = entry["source"]
        host = extract_domain(link)

        tx.run(upsert_bookmark, url=link, title=heading, score=rating)
        tx.run(link_category, cat=cat_name, url=link)
        tx.run(link_source, src=origin, url=link)

        if host and host != "unknown":
            tx.run(link_domain, domain=host, url=link)

        for label in (candidate for candidate in labels if candidate):
            tx.run(link_tag, tag=label, url=link)
def _build_tag_cooccurrence(driver):
    """Create or refresh ``CO_OCCURS_WITH`` edges between tags.

    For every pair of tags attached to the same bookmark an undirected
    ``CO_OCCURS_WITH`` edge is merged, and its ``count`` property is set to
    the number of bookmarks carrying both tags.

    The count is recomputed from the graph and overwritten rather than
    incremented, so calling this repeatedly (for example on every ingest)
    leaves the weights unchanged instead of inflating them on each run.

    Args:
        driver: Neo4j driver used to open a session on
            ``config.NEO4J_DATABASE``.
    """
    with driver.session(database=config.NEO4J_DATABASE) as session:
        # `t1.name < t2.name` visits each unordered tag pair exactly once.
        # consume() drains the result so the statement has fully executed
        # (and any error has surfaced) before the session is closed.
        session.run("""
            MATCH (b:Bookmark)-[:TAGGED]->(t1:Tag)
            MATCH (b)-[:TAGGED]->(t2:Tag)
            WHERE t1.name < t2.name
            WITH t1, t2, count(DISTINCT b) AS shared
            MERGE (t1)-[r:CO_OCCURS_WITH]-(t2)
            SET r.count = shared
        """).consume()
    print(" Tag co-occurrence edges built.")
def add_similar_to_edges(similar_pairs: list[tuple[str, str, float]], driver=None):
    """Write ``SIMILAR_TO`` edges derived from ChromaDB nearest-neighbor search.

    Each pair merges an undirected ``SIMILAR_TO`` edge between two existing
    ``Bookmark`` nodes and sets its ``score``. Pairs whose URLs match no
    bookmark create nothing, because the endpoints are looked up with
    ``MATCH``.

    Args:
        similar_pairs: ``[(url_a, url_b, similarity_score), ...]``.
        driver: An existing Neo4j driver to reuse. When ``None`` a driver is
            created via ``get_driver()`` and closed before returning, even if
            a write fails.
    """
    if not similar_pairs:
        # Nothing to write: skip opening a connection altogether.
        print(f" SIMILAR_TO: {len(similar_pairs)} edges written.")
        return

    own_driver = driver is None
    if own_driver:
        driver = get_driver()
    try:
        with driver.session(database=config.NEO4J_DATABASE) as session:
            for url_a, url_b, score in similar_pairs:
                session.run("""
                    MATCH (a:Bookmark {url: $url_a})
                    MATCH (b:Bookmark {url: $url_b})
                    MERGE (a)-[r:SIMILAR_TO]-(b)
                    SET r.score = $score
                """, url_a=url_a, url_b=url_b, score=score)
        print(f" SIMILAR_TO: {len(similar_pairs)} edges written.")
    finally:
        # Only close a driver this function created.
        if own_driver:
            driver.close()
def query(cypher: str, params: dict | None = None) -> list[dict]:
    """Run arbitrary Cypher and return the results as a list of dicts.

    A fresh driver is opened for the call and always closed afterwards,
    including when the query raises.

    Args:
        cypher: The Cypher statement to execute.
        params: Query parameters; ``None`` is treated as no parameters.

    Returns:
        One dict per result record.
    """
    driver = get_driver()
    try:
        with driver.session(database=config.NEO4J_DATABASE) as session:
            result = session.run(cypher, params or {})
            # Materialize inside the session: records are read while it is open.
            return [dict(record) for record in result]
    finally:
        driver.close()
def get_stats() -> dict:
    """Return node counts for bookmarks, tags and categories.

    ``OPTIONAL MATCH`` is used for every label so that a label with no nodes
    contributes a count of 0. With a plain ``MATCH`` an empty label produces
    no rows, which made the whole query return nothing.

    Returns:
        A dict with the keys ``bookmarks``, ``tags`` and ``categories``, or an
        empty dict if the query yields no row.
    """
    rows = query("""
        OPTIONAL MATCH (b:Bookmark) WITH count(b) AS bookmarks
        OPTIONAL MATCH (t:Tag) WITH bookmarks, count(t) AS tags
        OPTIONAL MATCH (c:Category) WITH bookmarks, tags, count(c) AS categories
        RETURN bookmarks, tags, categories
    """)
    return rows[0] if rows else {}
def find_similar(url: str, limit: int = 10) -> list[dict]:
    """Return bookmarks joined to *url* by a ``SIMILAR_TO`` edge.

    Args:
        url: URL of the bookmark to start from.
        limit: Maximum number of rows to return.

    Returns:
        Dicts with the keys ``url``, ``title`` and ``similarity``, ordered by
        descending edge score.
    """
    cypher = """
        MATCH (b:Bookmark {url: $url})-[r:SIMILAR_TO]-(other:Bookmark)
        RETURN other.url AS url, other.title AS title, r.score AS similarity
        ORDER BY r.score DESC LIMIT $limit
    """
    bindings = {"url": url, "limit": limit}
    return query(cypher, bindings)
def find_by_tag(tag: str, limit: int = 20) -> list[dict]:
    """Return bookmarks carrying *tag*, best score first.

    The tag is lower-cased before it is matched against ``Tag.name``.
    NOTE(review): this presumes tag names are stored lower-case at ingest
    time — confirm against the producer of the ingested items.

    Args:
        tag: Tag name to look up.
        limit: Maximum number of rows to return.

    Returns:
        Dicts with the keys ``url``, ``title`` and ``score``, ordered by
        descending bookmark score.
    """
    cypher = """
        MATCH (b:Bookmark)-[:TAGGED]->(t:Tag {name: $tag})
        RETURN b.url AS url, b.title AS title, b.score AS score
        ORDER BY b.score DESC LIMIT $limit
    """
    bindings = {"tag": tag.lower(), "limit": limit}
    return query(cypher, bindings)
def find_tag_cluster(tag: str, hops: int = 2, limit: int = 30) -> list[dict]:
    """Follow CO_OCCURS_WITH edges to find related tags and their bookmarks.

    Args:
        tag: Starting tag name; lower-cased before matching.
        hops: Maximum number of ``CO_OCCURS_WITH`` edges to traverse.
        limit: Maximum number of rows to return.

    Returns:
        Distinct dicts with the keys ``url``, ``title``, ``score`` and
        ``via_tag``, ordered by descending bookmark score. Empty when
        *hops* is less than 1, since a ``1..hops`` range can then match no
        path.

    Raises:
        TypeError: If *hops* is not an ``int``.
    """
    # The bounds of a variable-length pattern cannot be passed as Cypher
    # parameters, so *hops* is interpolated into the query text. Requiring a
    # real integer guarantees that only digits ever reach the statement.
    if isinstance(hops, bool) or not isinstance(hops, int):
        raise TypeError(f"hops must be an int, got {type(hops).__name__}")
    if hops < 1:
        return []

    cypher = f"""
        MATCH (t:Tag {{name: $tag}})-[:CO_OCCURS_WITH*1..{hops}]-(related:Tag)
        MATCH (b:Bookmark)-[:TAGGED]->(related)
        RETURN DISTINCT b.url AS url, b.title AS title, b.score AS score, related.name AS via_tag
        ORDER BY b.score DESC LIMIT $limit
    """
    return query(cypher, {"tag": tag.lower(), "limit": limit})
|