File size: 6,588 Bytes
bd91918 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 | import numpy as np
from typing import List, Dict, Optional
from rdflib import Graph, Namespace, RDF, Literal, XSD, URIRef
from scripts.text_processor import TextProcessor
from scripts.vector_store import VectorStore
from openvino_genai import LLMPipeline
class RAGEngine:
    """
    Retrieval-augmented generation over a chunked document.

    Retrieval is two-stage: a vector similarity search selects the top-k
    chunks, then an RDF graph linking adjacent chunks is used to pull in
    their immediate neighbours. The selected chunks are passed as context
    to an OpenVINO GenAI LLM pipeline, which produces the answer.
    """

    def __init__(self, model_path: str = "/app/model/phi-3-openvino", device: str = "CPU", max_tokens: int = 512):
        """
        Load the LLM pipeline and set up the helpers used for retrieval.

        Args:
            model_path: Path handed to ``LLMPipeline`` as the model location.
            device: Device string handed to ``LLMPipeline``.
            max_tokens: Value passed as ``max_new_tokens`` on every
                generation call.
        """
        self.llm = LLMPipeline(model_path, device=device)
        self.max_tokens = max_tokens
        self.text_processor = TextProcessor()
        # Namespace under which all chunk nodes and predicates are minted.
        self.EX = Namespace("http://example.org/")

    def build_knowledge_graph(self, chunks: List[str]) -> Graph:
        """
        Build a knowledge graph from chunks.

        Each chunk becomes a node ``ex:Chunk_<idx>`` carrying its index, its
        full text and a 100-character preview. Adjacent chunks are linked in
        both directions (``previousChunk`` / ``nextChunk``) so that graph
        traversal can pull in surrounding context.

        Args:
            chunks: Document chunks in document order.

        Returns:
            The populated rdflib ``Graph``.
        """
        graph = Graph()
        graph.bind("ex", self.EX)

        for idx, chunk in enumerate(chunks):
            chunk_node = self.EX[f"Chunk_{idx}"]

            # Node type and index
            graph.add((chunk_node, RDF.type, self.EX.DocumentChunk))
            graph.add((chunk_node, self.EX.chunkIndex, Literal(idx, datatype=XSD.integer)))

            # Store full text as literal so we can retrieve it from graph
            graph.add((chunk_node, self.EX.hasText, Literal(chunk)))

            # Store a short single-line preview
            preview = chunk[:100].replace("\n", " ")
            graph.add((chunk_node, self.EX.hasPreview, Literal(preview)))

            # Link to previous chunk (context window neighbor), both ways
            if idx > 0:
                prev_node = self.EX[f"Chunk_{idx - 1}"]
                graph.add((chunk_node, self.EX.previousChunk, prev_node))
                graph.add((prev_node, self.EX.nextChunk, chunk_node))

        print(f"[KnowledgeGraph] Built graph with {len(chunks)} nodes, {len(graph)} triples.")
        return graph

    def get_graph_neighbors(self, graph: Graph, chunk_indices: List[int], chunks: List[str]) -> List[int]:
        """
        Expand retrieved chunk indices with their immediate graph neighbors.

        For every valid index in ``chunk_indices`` the ``nextChunk`` and
        ``previousChunk`` edges of the corresponding node are followed and
        the neighbours' indices are added.

        Args:
            graph: Graph whose nodes expose ``chunkIndex``, ``nextChunk``
                and ``previousChunk`` predicates.
            chunk_indices: Seed indices, typically from the vector search.
            chunks: The chunk list; only its length is used, to bound
                the indices.

        Returns:
            Sorted list of unique indices, each within
            ``0 <= i < len(chunks)``. Seeds outside that range are dropped
            so callers can index ``chunks`` with the result safely.
        """
        total = len(chunks)
        expanded = set()

        for idx in chunk_indices:
            # A negative seed would silently wrap around when used to index
            # ``chunks``; an oversized one would raise. Skip both.
            if not 0 <= idx < total:
                continue
            expanded.add(idx)

            chunk_node = self.EX[f"Chunk_{idx}"]
            # Same bounds check for both directions.
            for predicate in (self.EX.nextChunk, self.EX.previousChunk):
                for neighbor in graph.objects(chunk_node, predicate):
                    neighbor_idx = self._node_to_index(graph, neighbor)
                    if neighbor_idx is not None and 0 <= neighbor_idx < total:
                        expanded.add(neighbor_idx)

        # Return sorted so context is in document order
        return sorted(expanded)

    def _node_to_index(self, graph: Graph, node: URIRef) -> Optional[int]:
        """
        Extract the chunk index integer stored on a graph node.

        Returns:
            The first ``chunkIndex`` value of ``node`` that converts to
            ``int``, or ``None`` if the node has no such value.
        """
        for idx_val in graph.objects(node, self.EX.chunkIndex):
            try:
                return int(idx_val)
            except (ValueError, TypeError):
                # Malformed value: try any remaining chunkIndex triples.
                continue
        return None

    def retrieve_relevant_docs(
        self,
        question: str,
        chunks: List[str],
        vector_store: VectorStore,
        graph: Graph,
        k: int = 3,
    ) -> List[str]:
        """
        Two-stage retrieval:
        1. Vector similarity search → top-k chunk indices
        2. Knowledge graph expansion → pull in neighboring chunks for richer context

        Args:
            question: The user question; embedded with the text processor.
            chunks: All document chunks, in document order.
            vector_store: Store whose ``search(embedding, k)`` returns
                ``(distances, indices)`` with one row per query.
            graph: Graph built from the same ``chunks``.
            k: Number of nearest chunks to request. Clamped to
                ``len(chunks)``.

        Returns:
            The selected chunks in document order; empty when there are no
            chunks or ``k`` is not positive.
        """
        if not chunks or k <= 0:
            return []

        total = len(chunks)
        # Never ask for more hits than there are chunks.
        # NOTE(review): presumably the store is FAISS-backed and pads short
        # result rows with -1 — confirm against VectorStore.
        k = min(k, total)

        # Stage 1: vector search
        query_embedding = self.text_processor.get_embeddings([question])
        distances, indices = vector_store.search(query_embedding, k)

        # Normalise the first result row to plain ints, drop anything that
        # is not a valid position in ``chunks`` and de-duplicate while
        # keeping rank order.
        top_k_indices = []
        for raw_idx in indices[0]:
            hit = int(raw_idx)
            if 0 <= hit < total and hit not in top_k_indices:
                top_k_indices.append(hit)
        print(f"[VectorSearch] Top-k indices: {top_k_indices}, distances: {distances}")

        # Stage 2: graph neighbor expansion
        expanded_indices = self.get_graph_neighbors(graph, top_k_indices, chunks)
        print(f"[GraphExpansion] Expanded indices: {expanded_indices}")

        return [chunks[idx] for idx in expanded_indices]

    def generate_answer(self, question: str, context: List[str]) -> str:
        """
        Ask the LLM to answer ``question`` using only ``context``.

        Args:
            question: The user question.
            context: Context passages; each is labelled ``[Context N]`` in
                the prompt, numbered from 1.

        Returns:
            The generated text. The pipeline result is unwrapped from
            whichever shape it comes back in: a plain string, an object
            with a ``texts`` list, or a list of strings or
            ``{'generated_text': ...}`` dicts. An empty ``texts`` list
            yields an empty string.
        """
        context_text = "\n\n".join(f"[Context {i+1}]:\n{ctx}" for i, ctx in enumerate(context))

        prompt = f"""You are a helpful document question-answering assistant.
Instructions:
1. Answer the question based ONLY on the provided context
2. Provide clear, well-structured answers
3. If listing items (contacts, requirements, dates), include ALL relevant items from the context
4. Do not include partial lists - if you start a list, complete it
5. Use proper formatting: bullet points for lists, clear sections for different topics
6. If the answer cannot be found in the context, say: "I cannot find the answer in the provided documents."
7. Do not add disclaimers about "limited information" if you can answer from the context
8. Be concise but complete - include all relevant details
Context:
{context_text}
Question: {question}
Answer:"""

        result = self.llm.generate(prompt, max_new_tokens=self.max_tokens)

        if isinstance(result, str):
            return result

        if hasattr(result, 'texts'):
            texts = result.texts
            # Guard against a result object that carries no generations.
            return texts[0] if texts else ""

        if isinstance(result, list) and result:
            first = result[0]
            if isinstance(first, dict) and 'generated_text' in first:
                return first['generated_text']
            return str(first)

        return str(result)

    def query(
        self,
        question: str,
        chunks: List[str],
        vector_store: VectorStore,
        graph: Graph,
        k: int = 3,
    ) -> Dict:
        """
        Full RAG pipeline:
        vector search → graph expansion → LLM answer generation

        Args:
            question: The user question.
            chunks: All document chunks, in document order.
            vector_store: Vector index over ``chunks``.
            graph: Knowledge graph built from ``chunks``.
            k: Number of nearest chunks to retrieve before graph expansion.

        Returns:
            Dict with keys ``"answer"`` (generated text), ``"sources"``
            (the chunks used as context) and ``"num_sources"``.
        """
        relevant_docs = self.retrieve_relevant_docs(
            question=question,
            chunks=chunks,
            vector_store=vector_store,
            graph=graph,
            k=k,
        )
        answer = self.generate_answer(question, relevant_docs)
        return {
            "answer": answer,
            "sources": relevant_docs,
            "num_sources": len(relevant_docs),
        }