Spaces:
Sleeping
Sleeping
| """ | |
| Neo4j store — knowledge graph. | |
| Nodes: Bookmark, Tag, Category, Source, Domain | |
| Edges: TAGGED, IN_CATEGORY, FROM_SOURCE, FROM_DOMAIN, SIMILAR_TO, CO_OCCURS_WITH | |
| """ | |
| import re | |
| from urllib.parse import urlparse | |
| from neo4j import GraphDatabase | |
| from openmark import config | |
| def get_driver(): | |
| return GraphDatabase.driver( | |
| config.NEO4J_URI, | |
| auth=(config.NEO4J_USER, config.NEO4J_PASSWORD), | |
| ) | |
| def setup_constraints(driver): | |
| """Create uniqueness constraints once.""" | |
| constraints = [ | |
| "CREATE CONSTRAINT bookmark_url IF NOT EXISTS FOR (b:Bookmark) REQUIRE b.url IS UNIQUE", | |
| "CREATE CONSTRAINT tag_name IF NOT EXISTS FOR (t:Tag) REQUIRE t.name IS UNIQUE", | |
| "CREATE CONSTRAINT category_name IF NOT EXISTS FOR (c:Category) REQUIRE c.name IS UNIQUE", | |
| "CREATE CONSTRAINT source_name IF NOT EXISTS FOR (s:Source) REQUIRE s.name IS UNIQUE", | |
| "CREATE CONSTRAINT domain_name IF NOT EXISTS FOR (d:Domain) REQUIRE d.name IS UNIQUE", | |
| ] | |
| with driver.session(database=config.NEO4J_DATABASE) as session: | |
| for cypher in constraints: | |
| try: | |
| session.run(cypher) | |
| except Exception as e: | |
| print(f" Constraint (already exists or error): {e}") | |
| print("Constraints ready.") | |
| def extract_domain(url: str) -> str: | |
| try: | |
| return urlparse(url).netloc.replace("www.", "") | |
| except Exception: | |
| return "unknown" | |
| def ingest(items: list[dict], driver=None): | |
| """Write all nodes and relationships to Neo4j.""" | |
| own_driver = driver is None | |
| if own_driver: | |
| driver = get_driver() | |
| setup_constraints(driver) | |
| total = len(items) | |
| batch_size = 200 | |
| print(f"Neo4j ingesting {total} items...") | |
| for start in range(0, total, batch_size): | |
| batch = items[start:start + batch_size] | |
| with driver.session(database=config.NEO4J_DATABASE) as session: | |
| session.execute_write(_write_batch, batch) | |
| print(f" Neo4j wrote {min(start + batch_size, total)}/{total}") | |
| print("Building tag co-occurrence edges...") | |
| _build_tag_cooccurrence(driver) | |
| print("Neo4j ingestion complete.") | |
| if own_driver: | |
| driver.close() | |
| def _write_batch(tx, batch: list[dict]): | |
| for item in batch: | |
| url = item["url"] | |
| title = item["title"][:500] | |
| category = item["category"] | |
| tags = item["tags"] | |
| score = float(item["score"]) | |
| source = item["source"] | |
| domain = extract_domain(url) | |
| # Bookmark node | |
| tx.run(""" | |
| MERGE (b:Bookmark {url: $url}) | |
| SET b.title = $title, b.score = $score | |
| """, url=url, title=title, score=score) | |
| # Category node + relationship | |
| tx.run(""" | |
| MERGE (c:Category {name: $cat}) | |
| WITH c | |
| MATCH (b:Bookmark {url: $url}) | |
| MERGE (b)-[:IN_CATEGORY]->(c) | |
| """, cat=category, url=url) | |
| # Source node + relationship | |
| tx.run(""" | |
| MERGE (s:Source {name: $src}) | |
| WITH s | |
| MATCH (b:Bookmark {url: $url}) | |
| MERGE (b)-[:FROM_SOURCE]->(s) | |
| """, src=source, url=url) | |
| # Domain node + relationship | |
| if domain and domain != "unknown": | |
| tx.run(""" | |
| MERGE (d:Domain {name: $domain}) | |
| WITH d | |
| MATCH (b:Bookmark {url: $url}) | |
| MERGE (b)-[:FROM_DOMAIN]->(d) | |
| """, domain=domain, url=url) | |
| # Tag nodes + relationships | |
| for tag in tags: | |
| if not tag: | |
| continue | |
| tx.run(""" | |
| MERGE (t:Tag {name: $tag}) | |
| WITH t | |
| MATCH (b:Bookmark {url: $url}) | |
| MERGE (b)-[:TAGGED]->(t) | |
| """, tag=tag, url=url) | |
| def _build_tag_cooccurrence(driver): | |
| """ | |
| For each bookmark with multiple tags, create CO_OCCURS_WITH edges between tags. | |
| Weight = number of bookmarks where both tags appear together. | |
| """ | |
| with driver.session(database=config.NEO4J_DATABASE) as session: | |
| session.run(""" | |
| MATCH (b:Bookmark)-[:TAGGED]->(t1:Tag) | |
| MATCH (b)-[:TAGGED]->(t2:Tag) | |
| WHERE t1.name < t2.name | |
| MERGE (t1)-[r:CO_OCCURS_WITH]-(t2) | |
| ON CREATE SET r.count = 1 | |
| ON MATCH SET r.count = r.count + 1 | |
| """) | |
| print(" Tag co-occurrence edges built.") | |
| def add_similar_to_edges(similar_pairs: list[tuple[str, str, float]], driver=None): | |
| """ | |
| Write SIMILAR_TO edges derived from ChromaDB nearest-neighbor search. | |
| similar_pairs = [(url_a, url_b, similarity_score), ...] | |
| """ | |
| own_driver = driver is None | |
| if own_driver: | |
| driver = get_driver() | |
| with driver.session(database=config.NEO4J_DATABASE) as session: | |
| for url_a, url_b, score in similar_pairs: | |
| session.run(""" | |
| MATCH (a:Bookmark {url: $url_a}) | |
| MATCH (b:Bookmark {url: $url_b}) | |
| MERGE (a)-[r:SIMILAR_TO]-(b) | |
| SET r.score = $score | |
| """, url_a=url_a, url_b=url_b, score=score) | |
| print(f" SIMILAR_TO: {len(similar_pairs)} edges written.") | |
| if own_driver: | |
| driver.close() | |
| def query(cypher: str, params: dict | None = None) -> list[dict]: | |
| """Run arbitrary Cypher and return results as list of dicts.""" | |
| driver = get_driver() | |
| with driver.session(database=config.NEO4J_DATABASE) as session: | |
| result = session.run(cypher, params or {}) | |
| rows = [dict(r) for r in result] | |
| driver.close() | |
| return rows | |
| def get_stats() -> dict: | |
| rows = query(""" | |
| MATCH (b:Bookmark) WITH count(b) AS bookmarks | |
| MATCH (t:Tag) WITH bookmarks, count(t) AS tags | |
| MATCH (c:Category) WITH bookmarks, tags, count(c) AS categories | |
| RETURN bookmarks, tags, categories | |
| """) | |
| return rows[0] if rows else {} | |
| def find_similar(url: str, limit: int = 10) -> list[dict]: | |
| return query(""" | |
| MATCH (b:Bookmark {url: $url})-[r:SIMILAR_TO]-(other:Bookmark) | |
| RETURN other.url AS url, other.title AS title, r.score AS similarity | |
| ORDER BY r.score DESC LIMIT $limit | |
| """, {"url": url, "limit": limit}) | |
| def find_by_tag(tag: str, limit: int = 20) -> list[dict]: | |
| return query(""" | |
| MATCH (b:Bookmark)-[:TAGGED]->(t:Tag {name: $tag}) | |
| RETURN b.url AS url, b.title AS title, b.score AS score | |
| ORDER BY b.score DESC LIMIT $limit | |
| """, {"tag": tag.lower(), "limit": limit}) | |
| def find_tag_cluster(tag: str, hops: int = 2, limit: int = 30) -> list[dict]: | |
| """Follow CO_OCCURS_WITH edges to find related tags and their bookmarks.""" | |
| return query(f""" | |
| MATCH (t:Tag {{name: $tag}})-[:CO_OCCURS_WITH*1..{hops}]-(related:Tag) | |
| MATCH (b:Bookmark)-[:TAGGED]->(related) | |
| RETURN DISTINCT b.url AS url, b.title AS title, b.score AS score, related.name AS via_tag | |
| ORDER BY b.score DESC LIMIT $limit | |
| """, {"tag": tag.lower(), "limit": limit}) | |