|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
import time |
|
|
import uuid |
|
|
from dataclasses import dataclass, field |
|
|
from typing import List, Dict, Any |
|
|
|
|
|
from datetime import datetime |
|
|
|
|
|
import numpy as np |
|
|
import torch |
|
|
from sentence_transformers import SentenceTransformer |
|
|
from fastapi import FastAPI |
|
|
from pydantic import BaseModel |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Numeric constant labeled φ⁴³ elsewhere in this file; used as a scoring
# regularizer (divided by 23.0 in _hyperedge_score) and as the scale in
# phi43_check. NOTE(review): its derivation is not shown here — confirm.
PHI_43 = 22.93606797749979

# Tag prepended to every log line emitted by log_line().
SYSTEM_ID = "QUANTARION-HYPERGRAPH-RAG-PROD"

# File logging lives under ./Logs relative to the current working directory;
# the directory is created eagerly at import time.
LOG_DIR = os.path.join(os.getcwd(), "Logs")
os.makedirs(LOG_DIR, exist_ok=True)
# NOTE(review): ".text" produces a hidden, extension-only file (Logs/.text).
# This looks like a mangled log filename — confirm the intended name.
LOG_PATH = os.path.join(LOG_DIR, ".text")
|
|
|
|
|
def log_line(msg: str) -> None:
    """Print *msg* tagged with a UTC timestamp and SYSTEM_ID, and append it to LOG_PATH.

    File logging is best-effort: any I/O failure is swallowed so that
    logging can never raise into callers.
    """
    # Naive UTC timestamp; kept as utcnow() to preserve the existing
    # log format (no "+00:00" suffix).
    ts = datetime.utcnow().isoformat()
    line = f"[{ts}] [{SYSTEM_ID}] {msg}"
    print(line)
    try:
        with open(LOG_PATH, "a", encoding="utf-8") as f:
            # BUG FIX: the original had a literal newline inside the string
            # (the "\n" escape was expanded), which is a SyntaxError.
            f.write(line + "\n")
    except Exception:
        # Deliberate best-effort: disk logging must never take down the service.
        pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class Hyperedge:
    """A single n-ary relation connecting two or more entity vertices."""

    id: str  # unique identifier (uuid4 string at build time)
    vertices: List[str]  # names of the entities this edge connects
    weight: float  # edge weight (always 1.0 when built by build_from_documents)
    meta: Dict[str, Any] = field(default_factory=dict)  # e.g. {"doc_id": ..., "text": ...}
|
|
|
|
|
|
|
|
@dataclass
class Hypergraph:
    """Container for the full index: entity vocabulary plus hyperedges."""

    vertices: List[str]  # sorted, de-duplicated entity names
    hyperedges: List[Hyperedge]  # one edge per multi-entity document
|
|
|
|
|
|
|
|
class QueryRequest(BaseModel):
    """Request body for POST /query."""

    query: str  # free-text user query
    top_k: int = 5  # number of hyperedges to retrieve
|
|
|
|
|
|
|
|
class QueryResponse(BaseModel):
    """Response body for POST /query."""

    query_id: str  # server-generated uuid4 for this request
    query: str  # echo of the input query
    selected_hyperedges: List[Dict[str, Any]]  # serialized Hyperedge records
    answer: str  # generated (deterministic, template-based) answer text
    phi43_check: float  # stability metric in [0, 1]; see phi43_check()
    latency_ms: float  # end-to-end request latency in milliseconds
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class HypergraphRAGEngine:
    """
    Production-grade Hypergraph RAG:
    - Embeddings via SentenceTransformer
    - Hyperedges = n-ary concept relations
    - Retrieval = minimal hyperedge cover approximation
    - φ⁴³ used as a numeric regularizer for scoring/stability
    """

    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        """Load the embedding model and start with an empty index.

        NOTE: SentenceTransformer may download model weights on first use.
        """
        log_line("Initializing HypergraphRAGEngine…")
        self.model_name = model_name
        self.embedder = SentenceTransformer(model_name)
        self.hypergraph: Hypergraph = Hypergraph(vertices=[], hyperedges=[])
        self.vertex_embeddings: Dict[str, np.ndarray] = {}
        self.ready = False  # flipped by build_from_documents

    def build_from_documents(self, docs: List[Dict[str, Any]]) -> None:
        """
        Build the hypergraph index from annotated documents.

        docs: list of {"id": str, "text": str, "entities": [str,...]}
        entities = extracted or annotated concept ids/names.

        Documents with fewer than two distinct entities cannot form an
        n-ary relation and are skipped.
        """
        log_line(f"Building hypergraph from {len(docs)} documents…")

        vertices_set = set()
        hyperedges: List[Hyperedge] = []

        # Collect the global entity vocabulary across all documents.
        for d in docs:
            for ent in d.get("entities", []):
                vertices_set.add(ent)

        # Sorted order makes vertex indexing deterministic across runs.
        vertices = sorted(vertices_set)

        if vertices:
            log_line(f"Embedding {len(vertices)} vertices…")
            # normalize_embeddings=True lets dot products act as cosine sims.
            embs = self.embedder.encode(vertices, normalize_embeddings=True)
            self.vertex_embeddings = dict(zip(vertices, embs))

        # One hyperedge per document, connecting all of its distinct entities.
        for d in docs:
            ents = list(set(d.get("entities", [])))
            if len(ents) < 2:
                continue  # singletons/empties carry no relational signal

            hyperedges.append(
                Hyperedge(
                    id=str(uuid.uuid4()),
                    vertices=ents,
                    weight=1.0,
                    meta={
                        # .get() keeps indexing robust to partially
                        # annotated documents (was d["id"] / d["text"],
                        # which raised KeyError on malformed input).
                        "doc_id": d.get("id", ""),
                        "text": d.get("text", ""),
                    },
                )
            )

        self.hypergraph = Hypergraph(vertices=vertices, hyperedges=hyperedges)
        self.ready = True
        log_line(
            f"Hypergraph built: |V|={len(self.hypergraph.vertices)}, |E|={len(self.hypergraph.hyperedges)}"
        )

    def _query_embedding(self, query: str) -> np.ndarray:
        """Return the normalized embedding vector for a single query string."""
        return self.embedder.encode([query], normalize_embeddings=True)[0]

    def _hyperedge_score(self, query_emb: np.ndarray, he: Hyperedge) -> float:
        """Score a hyperedge against the query embedding.

        Base score is the mean cosine similarity (dot product of normalized
        vectors) between the query and each known vertex; a small
        φ⁴³-scaled regularizer derived from the same score is added on top.
        """
        sims = []
        for v in he.vertices:
            ve = self.vertex_embeddings.get(v)
            if ve is not None:
                sims.append(float(np.dot(query_emb, ve)))

        # No embedded vertices at all -> neutral base of 0.0.
        base = float(np.mean(sims)) if sims else 0.0

        # Map base from [-1, 1] into [0, 1] for the regularizer term.
        reg = (base + 1.0) / 2.0
        return float(base + 0.01 * (PHI_43 / 23.0) * reg)

    def retrieve_hyperedges(self, query: str, top_k: int = 5) -> List[Hyperedge]:
        """Return the top_k highest-scoring hyperedges for *query*.

        Returns [] when the index has not been built or is empty.
        """
        if not self.ready or not self.hypergraph.hyperedges:
            return []

        q_emb = self._query_embedding(query)
        scored = [
            (self._hyperedge_score(q_emb, he), he)
            for he in self.hypergraph.hyperedges
        ]

        # key= is required: Hyperedge instances are not orderable, so plain
        # tuple comparison would raise on score ties.
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [he for _, he in scored[:top_k]]

    def generate_answer(self, query: str, hyperedges: List[Hyperedge]) -> str:
        """
        In production, this would call QVNN/LLM with retrieved context.
        Here we produce a concise, deterministic executive-style answer.
        """
        if not hyperedges:
            return (
                "No sufficient hypergraph context was found for this query in the "
                "current Quantarion Hypergraph-RAG index."
            )

        docs = [he.meta.get("text", "") for he in hyperedges]
        docs = [d for d in docs if d.strip()]
        # Cap the condensed context at 800 characters.
        snippet = " ".join(docs)[:800]

        # BUG FIX: the original literals contained raw newlines where "\n"
        # escapes had been expanded, which is a SyntaxError in Python.
        return (
            "Executive hypergraph-grounded summary:\n"
            f"- Query: {query}\n"
            f"- Top hyperedges: {len(hyperedges)}\n"
            f"- Condensed context: {snippet}\n"
            "This answer is generated by selecting a minimal set of "
            "multi-entity hyperedges that best align with the query, "
            "using φ⁴³-regularized similarity scoring."
        )

    def phi43_check(self, hyperedges: List[Hyperedge]) -> float:
        """
        Simple φ-check: scale count of hyperedges into [0,1] vs PHI_43.
        """
        if not hyperedges:
            return 0.0
        val = len(hyperedges) / PHI_43
        return float(max(0.0, min(1.0, val)))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
app = FastAPI(title="Quantarion Hypergraph-RAG Production API")

# NOTE(review): instantiating the engine at import time loads (and may
# download) the SentenceTransformer model — all of that cost is paid on
# module import, before the ASGI server starts.
engine = HypergraphRAGEngine()
|
|
|
|
|
|
|
|
@app.on_event("startup")
def _startup():
    """Populate the engine with a small built-in demo corpus at startup.

    NOTE: on_event is deprecated in recent FastAPI in favor of lifespan
    handlers; kept here so app construction stays unchanged.
    """
    log_line("Startup: building demo hypergraph index…")

    seed_corpus = [
        dict(
            id="doc1",
            text="Neuromorphic SNNs provide event-driven, low-power computation.",
            entities=["neuromorphic", "SNN", "event-driven"],
        ),
        dict(
            id="doc2",
            text="Hypergraph RAG uses hyperedges to capture multi-entity relations.",
            entities=["hypergraph", "RAG", "multi-entity"],
        ),
        dict(
            id="doc3",
            text="Hybrid retrieval combines dense, sparse, and graph-based signals.",
            entities=["hybrid retrieval", "dense", "sparse", "graph"],
        ),
    ]

    engine.build_from_documents(seed_corpus)
    log_line("Startup: Hypergraph-RAG demo index ready.")
|
|
|
|
|
|
|
|
@app.post("/query", response_model=QueryResponse)
def query_hypergraph_rag(req: QueryRequest):
    """Handle POST /query: retrieve top-k hyperedges and synthesize an answer.

    Returns the serialized hyperedges, the generated answer, the φ⁴³
    stability value, and end-to-end latency in milliseconds.
    """
    # perf_counter() is monotonic, so latency can never go negative if the
    # wall clock is adjusted mid-request (time.time() could).
    t0 = time.perf_counter()
    qid = str(uuid.uuid4())
    log_line(f"QUERY {qid} | {req.query}")

    selected = engine.retrieve_hyperedges(req.query, top_k=req.top_k)
    answer = engine.generate_answer(req.query, selected)
    phi_val = engine.phi43_check(selected)
    latency = (time.perf_counter() - t0) * 1000.0

    log_line(
        f"QUERY {qid} | hyperedges={len(selected)} | phi43_check={phi_val:.3f} | latency_ms={latency:.1f}"
    )

    return QueryResponse(
        query_id=qid,
        query=req.query,
        selected_hyperedges=[
            {
                "id": he.id,
                "vertices": he.vertices,
                "weight": he.weight,
                "meta": he.meta,
            }
            for he in selected
        ],
        answer=answer,
        phi43_check=phi_val,
        latency_ms=latency,
    )
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the API with uvicorn when executed directly as a script.
    import uvicorn

    log_line("Starting Quantarion Hypergraph-RAG Production server on 0.0.0.0:8000…")
    uvicorn.run(app, host="0.0.0.0", port=8000)