OneServerToRuleThemAll / storage /neo4j_storage.py
etukurudinesh's picture
add files
feea636
import logging
from typing import Dict, List
from urllib.parse import urlparse

from neo4j import GraphDatabase

from config.settings import settings
class Neo4jStorage:
    """Store scraped-page metadata and relationships in a Neo4j graph.

    The graph written by this class uses the node labels ``Page``,
    ``Domain``, ``Heading``, ``ContentBlock`` and ``SemanticElement`` and the
    relationship types ``BELONGS_TO``, ``HAS_HEADING``, ``HAS_CONTENT``,
    ``HAS_SEMANTIC_ELEMENT``, ``LINKS_TO_INTERNAL`` and ``LINKS_TO_EXTERNAL``.

    Instances can be used as context managers; leaving the ``with`` block
    calls :meth:`close`.
    """

    _log = logging.getLogger(__name__)

    # Every statement carries IF NOT EXISTS, so re-running them is harmless.
    _SCHEMA_STATEMENTS = (
        "CREATE CONSTRAINT page_url IF NOT EXISTS FOR (p:Page) REQUIRE p.url IS UNIQUE",
        "CREATE CONSTRAINT domain_name IF NOT EXISTS FOR (d:Domain) REQUIRE d.name IS UNIQUE",
        "CREATE INDEX page_title IF NOT EXISTS FOR (p:Page) ON (p.title)",
    )

    # Title keywords checked in order; the first matching row wins.
    _CONTENT_TYPE_KEYWORDS = (
        ("tutorial", ("tutorial", "guide")),
        ("documentation", ("documentation", "docs")),
        ("article", ("blog", "article")),
    )

    def __init__(self):
        """Open a driver using the project settings and set up the schema."""
        self.driver = GraphDatabase.driver(
            settings.database.neo4j_uri,
            auth=(settings.database.neo4j_user, settings.database.neo4j_password),
        )
        self._create_constraints()

    def __enter__(self):
        """Return the storage itself so it can be bound in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the driver on exit; exceptions from the block propagate."""
        self.close()

    def _create_constraints(self):
        """Create constraints and indexes for better performance.

        Each schema statement is attempted independently so that one failure
        does not prevent the others from running. Failures are logged and
        not raised, which keeps construction best-effort.
        """
        with self.driver.session() as session:
            for statement in self._SCHEMA_STATEMENTS:
                try:
                    session.run(statement)
                except Exception:
                    self._log.warning(
                        "Neo4j schema statement failed: %s",
                        statement,
                        exc_info=True,
                    )

    def store_relationships(self, url: str, extracted_data: Dict, dom_structure: Dict):
        """Store page relationships and structure in Neo4j.

        Args:
            url: URL identifying the page node.
            extracted_data: Mapping read for ``metadata`` (``title``,
                ``description``, ``domain``, ``headings``), ``text_summary``,
                ``content`` and ``links``.
            dom_structure: Mapping read at
                ``["semantic_structure"]["semantic_elements"]``.

        Raises:
            KeyError: If one of the keys listed above is missing.
        """
        with self.driver.session() as session:
            # The page node goes first: every later step MATCHes it by url.
            self._create_page_node(session, url, extracted_data)
            self._create_domain_relationships(session, url, extracted_data)
            self._create_content_relationships(session, url, extracted_data)
            self._create_link_relationships(session, url, extracted_data["links"])
            self._create_dom_relationships(session, url, dom_structure)

    def _create_page_node(self, session, url: str, data: Dict):
        """Create or update the ``Page`` node for ``url``.

        ``word_count`` is the number of whitespace-separated tokens in
        ``data["text_summary"]``; ``reading_time`` is that count integer-
        divided by 250 (presumably words per minute), so short texts get 0.
        """
        metadata = data["metadata"]
        word_count = len(data["text_summary"].split())
        query = """
            MERGE (p:Page {url: $url})
            SET p.title = $title,
                p.description = $description,
                p.domain = $domain,
                p.content_type = $content_type,
                p.complexity_score = $complexity_score,
                p.reading_time = $reading_time,
                p.word_count = $word_count,
                p.last_scraped = datetime()
        """
        session.run(query, {
            "url": url,
            "title": metadata["title"],
            "description": metadata["description"],
            "domain": metadata["domain"],
            "content_type": self._identify_content_type(data),
            "complexity_score": self._calculate_complexity_score(data),
            "reading_time": word_count // 250,
            "word_count": word_count,
        })

    def _create_domain_relationships(self, session, url: str, data: Dict):
        """Create the ``Domain`` node and link the page to it via ``BELONGS_TO``."""
        domain = data["metadata"]["domain"]
        session.run("""
            MERGE (d:Domain {name: $domain})
            SET d.last_updated = datetime()
        """, {"domain": domain})
        session.run("""
            MATCH (p:Page {url: $url})
            MATCH (d:Domain {name: $domain})
            MERGE (p)-[:BELONGS_TO]->(d)
        """, {"url": url, "domain": domain})

    def _create_content_relationships(self, session, url: str, data: Dict):
        """Create ``Heading`` and ``ContentBlock`` nodes attached to the page.

        Every entry of ``data["metadata"]["headings"]`` is stored with its
        list index as ``position``. Only the first 10 entries of
        ``data["content"]`` are stored, each with its text cut to 500
        characters while ``length`` records the untruncated length.
        """
        for position, heading in enumerate(data["metadata"]["headings"]):
            session.run("""
                MATCH (p:Page {url: $url})
                MERGE (h:Heading {text: $text, level: $level, page_url: $url})
                SET h.position = $position
                MERGE (p)-[:HAS_HEADING]->(h)
            """, {
                "url": url,
                "text": heading["text"],
                "level": heading["level"],
                "position": position,
            })

        for position, block in enumerate(data["content"][:10]):  # Limit for performance
            block_text = block["text"]
            session.run("""
                MATCH (p:Page {url: $url})
                MERGE (c:ContentBlock {text: $text, page_url: $url, position: $position})
                SET c.tag = $tag,
                    c.length = $length
                MERGE (p)-[:HAS_CONTENT]->(c)
            """, {
                "url": url,
                "text": block_text[:500],  # Truncate for storage
                "tag": block["tag"],
                "length": len(block_text),
                "position": position,
            })

    def _create_link_relationships(self, session, url: str, links: List[Dict]):
        """Create link relationships for navigation understanding.

        Only the first 20 links are stored. Each link mapping is read for
        ``url``, ``text`` and ``internal``; a truthy ``internal`` selects the
        ``LINKS_TO_INTERNAL`` relationship type, otherwise
        ``LINKS_TO_EXTERNAL``.
        """
        for link in links[:20]:  # Limit for performance
            target_url = link["url"]
            is_internal = link["internal"]

            # ON CREATE keeps the first recorded source: a plain SET would
            # overwrite discovered_via on pages that already exist each time
            # another page links to them.
            session.run("""
                MERGE (target:Page {url: $target_url})
                ON CREATE SET target.discovered_via = $source_url
            """, {"target_url": target_url, "source_url": url})

            # The type is chosen from two fixed literals, never from input,
            # so interpolating it into the query text is safe.
            relationship_type = "LINKS_TO_INTERNAL" if is_internal else "LINKS_TO_EXTERNAL"
            session.run(f"""
                MATCH (source:Page {{url: $source_url}})
                MATCH (target:Page {{url: $target_url}})
                MERGE (source)-[r:{relationship_type}]->(target)
                SET r.link_text = $link_text,
                    r.is_internal = $is_internal
            """, {
                "source_url": url,
                "target_url": target_url,
                "link_text": link["text"],
                "is_internal": is_internal,
            })

    def _create_dom_relationships(self, session, url: str, dom_structure: Dict):
        """Create one ``SemanticElement`` node per tag with a positive count."""
        semantic_elements = dom_structure["semantic_structure"]["semantic_elements"]
        for tag, count in semantic_elements.items():
            if count <= 0:
                continue
            session.run("""
                MATCH (p:Page {url: $url})
                MERGE (s:SemanticElement {tag: $tag, page_url: $url})
                SET s.count = $count
                MERGE (p)-[:HAS_SEMANTIC_ELEMENT]->(s)
            """, {"url": url, "tag": tag, "count": count})

    def get_page_relationships(self, url: str) -> Dict:
        """Get all relationships for a page for LLM context.

        Returns:
            A dict with ``page`` (the node's properties), ``internal_links``,
            ``external_links`` and ``headings`` (``text``/``level`` maps), or
            an empty dict when no page with this URL exists.
        """
        with self.driver.session() as session:
            result = session.run("""
                MATCH (p:Page {url: $url})
                OPTIONAL MATCH (p)-[:LINKS_TO_INTERNAL]->(internal:Page)
                OPTIONAL MATCH (p)-[:LINKS_TO_EXTERNAL]->(external:Page)
                OPTIONAL MATCH (p)-[:HAS_HEADING]->(h:Heading)
                RETURN p, collect(DISTINCT internal.url) as internal_links,
                       collect(DISTINCT external.url) as external_links,
                       collect(DISTINCT {text: h.text, level: h.level}) as headings
            """, {"url": url})
            record = result.single()
            if not record:
                return {}

            # With no headings the OPTIONAL MATCH leaves h null, yet the map
            # literal {text: null, level: null} is itself non-null and so is
            # still collected. Drop that placeholder entry.
            headings = [
                heading
                for heading in record["headings"]
                if heading.get("text") is not None or heading.get("level") is not None
            ]
            return {
                "page": dict(record["p"]),
                "internal_links": record["internal_links"],
                "external_links": record["external_links"],
                "headings": headings,
            }

    def get_related_pages(self, url: str, limit: int = 5) -> List[Dict]:
        """Find other pages that belong to the same domain as ``url``.

        Results are ordered by ``complexity_score`` descending and capped at
        ``limit``. Each entry has ``url``, ``title``, ``content_type`` and
        ``complexity_score``.
        """
        with self.driver.session() as session:
            result = session.run("""
                MATCH (p:Page {url: $url})
                MATCH (p)-[:BELONGS_TO]->(d:Domain)
                MATCH (related:Page)-[:BELONGS_TO]->(d)
                WHERE related.url <> $url
                RETURN related.url as url, related.title as title,
                       related.content_type as content_type,
                       related.complexity_score as complexity_score
                ORDER BY related.complexity_score DESC
                LIMIT $limit
            """, {"url": url, "limit": limit})
            return [dict(record) for record in result]

    def _identify_content_type(self, data: Dict) -> str:
        """Classify the page from keywords in its lower-cased title.

        Returns ``"tutorial"``, ``"documentation"`` or ``"article"`` for the
        first keyword group that matches, else ``"general"``. A ``None``
        title is treated as empty and therefore yields ``"general"``.
        """
        title = (data["metadata"]["title"] or "").lower()
        for content_type, keywords in self._CONTENT_TYPE_KEYWORDS:
            if any(keyword in title for keyword in keywords):
                return content_type
        return "general"

    def _calculate_complexity_score(self, data: Dict) -> float:
        """Return ``len(text_summary) / 1000 + len(content) / 10``, capped at 10.0."""
        text_length = len(data["text_summary"])
        content_blocks = len(data["content"])
        return min(text_length / 1000 + content_blocks / 10, 10.0)

    def close(self):
        """Close database connection."""
        self.driver.close()