Spaces:
Running
Running
File size: 4,441 Bytes
ab88576 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
"""Neo4j Knowledge Graph Service for Drug Repurposing"""
from neo4j import GraphDatabase
from typing import List, Dict, Optional, Any
import os
from dotenv import load_dotenv
import logging
load_dotenv()
logger = logging.getLogger(__name__)
class Neo4jService:
def __init__(self):
self.uri = os.getenv("NEO4J_URI", "bolt://localhost:7687")
self.user = os.getenv("NEO4J_USER", "neo4j")
self.password = os.getenv("NEO4J_PASSWORD")
self.database = os.getenv("NEO4J_DATABASE", "neo4j")
if not self.password:
logger.warning("⚠️ NEO4J_PASSWORD not set")
self.driver = None
return
try:
self.driver = GraphDatabase.driver(self.uri, auth=(self.user, self.password))
self.driver.verify_connectivity()
logger.info(f"✅ Neo4j connected: {self.uri} (db: {self.database})")
except Exception as e:
logger.error(f"❌ Neo4j connection failed: {e}")
self.driver = None
def is_connected(self) -> bool:
return self.driver is not None
def close(self):
if self.driver:
self.driver.close()
def ingest_search_results(self, disease_name: str, papers: List[Dict[str, Any]],
drugs_mentioned: List[str] = None) -> Dict[str, int]:
if not self.driver:
return {"error": "Neo4j not connected"}
stats = {"papers": 0, "drugs": 0, "relationships": 0, "errors": 0}
try:
with self.driver.session(database=self.database) as session:
session.run("MERGE (d:Disease {name: $name})", name=disease_name)
for paper in papers:
try:
paper_id = paper.get('id') or paper.get('url', '')
if not paper_id:
continue
session.run("""
MERGE (p:Paper {paper_id: $id})
SET p.title = $title,
p.abstract = $abstract,
p.url = $url,
p.source = $source,
p.updated_at = datetime()
""",
id=paper_id,
title=str(paper.get('title', ''))[:500],
abstract=str(paper.get('abstract', ''))[:2000],
url=str(paper.get('url', ''))[:500],
source=str(paper.get('source', ''))[:100])
session.run("""
MATCH (p:Paper {paper_id: $id})
MATCH (d:Disease {name: $disease})
MERGE (p)-[r:ABOUT]->(d)
""", id=paper_id, disease=disease_name)
stats['papers'] += 1
stats['relationships'] += 1
except Exception as e:
stats['errors'] += 1
if drugs_mentioned:
for drug in drugs_mentioned:
try:
session.run("MERGE (d:Drug {name: $name})", name=drug)
session.run("""
MATCH (drug:Drug {name: $drug})
MATCH (disease:Disease {name: $disease})
MERGE (drug)-[r:POTENTIAL_TREATMENT]->(disease)
""", drug=drug, disease=disease_name)
stats['drugs'] += 1
stats['relationships'] += 1
except Exception as e:
stats['errors'] += 1
logger.info(f"�� Neo4j ingestion: {stats['papers']} papers, {stats['drugs']} drugs")
except Exception as e:
logger.error(f"Neo4j ingestion error: {e}")
stats['errors'] += 1
return stats
_neo4j_service = None
def get_neo4j_service() -> Optional[Neo4jService]:
global _neo4j_service
if _neo4j_service is None:
_neo4j_service = Neo4jService()
return _neo4j_service if _neo4j_service and _neo4j_service.is_connected() else None
|