Spaces:
Running
Running
File size: 5,876 Bytes
021e065 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 | import os
import sys
import json
import traceback
# Root of the senti checkout. The default is the original developer's local path;
# set the SENTI_DIR environment variable to run this script from another machine.
SENTI_DIR = os.environ.get("SENTI_DIR", r"c:\Users\LENOVO\Desktop\senti_ai\senti")
# Put the checkout first on sys.path so the `backend` and `core` imports below resolve from it.
sys.path.insert(0, SENTI_DIR)
from backend.database.postgres.db import SessionLocal
from core.pipeline.router import SentiRouter
# Instrumented stand-in for SentiRouter.process: prints a [DEBUG] marker before and
# after every single step so the last line printed pinpoints where a crash or hang occurs.
def debug_process(
    self,
    query,
    parsed_intent,
    parsed_data,
    flavor="BUSINESS",
    country_code="KE",
    conversation_id=None,
):
    """Run the router pipeline step by step, printing progress around each step.

    Args:
        query: The user query text handed to every pipeline stage.
        parsed_intent: Intent supplied by the caller. When it equals
            "GENERAL_QUERY" the classifier's intent is used instead.
        parsed_data: Data supplied by the caller. When falsy, the classifier's
            "data" entry (default ``{}``) is used instead.
        flavor: Not used by this debug version; kept so the signature stays
            call-compatible once patched onto ``SentiRouter.process``.
        country_code: Not used by this debug version (see ``flavor``).
        conversation_id: Not used by this debug version (see ``flavor``).

    Returns:
        Whatever ``self._safety_response(...)`` returns when the safety check
        reports the query as intercepted; otherwise ``{"status": "ok"}``.
        Response formatting (step 11) is deliberately skipped.
    """
    import time

    start_time = time.time()
    self._trace = []

    print("[DEBUG] Step 1: Safety Check...")
    from core.brain.safety.empathy_interceptor import empathy_interceptor
    safety = empathy_interceptor.check(query)
    print(f"[DEBUG] Step 1 done. Safety: {safety}")
    if safety.get("intercepted"):
        return self._safety_response(safety, start_time)

    print("[DEBUG] Step 2: Intent Classification...")
    from core.brain.routing.intent_classifier import intent_classifier
    classified = intent_classifier.classify(query)
    # A caller-supplied intent wins unless it is the generic placeholder.
    intent = parsed_intent if parsed_intent != "GENERAL_QUERY" else classified["intent"]
    data = parsed_data or classified.get("data", {})
    print(f"[DEBUG] Step 2 done. Intent: {intent}")

    print("[DEBUG] Step 3: Tier Assignment...")
    from core.brain.moe.router import moe_router, Expert
    from core.brain.intent.ba_detector import ba_detector
    is_ba = ba_detector.is_ba_query(query, intent)
    from core.brain.routing.tier_router import tier_router
    tier = tier_router.assign_tier(intent=intent, query=query, is_ba_mode=is_ba)
    tier_config = tier_router.get_tier_config(tier)
    print(f"[DEBUG] Step 3 done. Tier: {tier.value}")

    print("[DEBUG] Step 4: CAG Packing...")
    cag_context = ""
    if tier_config["use_cag"]:
        from core.brain.memory.cag import cag
        cag_context = cag.pack(user_hash=self.user_hash, db=self.db)
    print(f"[DEBUG] Step 4 done. CAG Context length: {len(cag_context)}")

    print("[DEBUG] Step 5: Memory Injection...")
    # The second element of the returned pair is not needed for this trace.
    enriched_data, _ = self.memory_agent.inject_missing_fields(data)
    print(f"[DEBUG] Step 5 done. Enriched: {enriched_data}")

    print("[DEBUG] Step 6: MoE Routing...")
    experts = moe_router.route(intent, query)
    # Result is unused here; the call is kept only so this step is still exercised.
    moe_router.get_system_prompt_suffix(experts)
    print(f"[DEBUG] Step 6 done. Experts: {[e.value for e in experts]}")

    print("[DEBUG] Step 7: Compute Engine...")
    from core.pipeline.compute import compute_engine
    compute_result = compute_engine.compute(intent, enriched_data, self.user_hash)
    print(f"[DEBUG] Step 7 done. Computed: {compute_result.get('computed')}")

    print("[DEBUG] Step 8: KAG reasoning...")
    kag_context = ""
    if tier_config["use_kag"]:
        from core.brain.knowledge_graph.reasoner import kag_reasoner
        kag_result = kag_reasoner.reason(query, {"intent": intent})
        # Value is not consumed below; the lookup is kept so a malformed
        # reasoner result still fails at this step.
        kag_context = kag_result.get("kg_context", "")
    print("[DEBUG] Step 8 done.")

    print("[DEBUG] Step 9: RAG retrieval...")
    rag_results = []
    if tier_config["use_rag"]:
        from core.brain.retrieval.rag import rag_retriever
        if rag_retriever.should_retrieve(intent, query):
            rag_results = rag_retriever.retrieve(query=query, intent=intent, limit=3)
    print(f"[DEBUG] Step 9 done. RAG count: {len(rag_results)}")

    print("[DEBUG] Step 10: Inference level determination...")
    from core.brain.inference.engine import inference_engine
    inference_level = inference_engine.determine_level(intent, query, compute_result)
    print(f"[DEBUG] Step 10 done. Level: {inference_level}")

    print("[DEBUG] Step 10.5: Markets context...")
    if Expert.MARKETS in experts:
        print("[DEBUG] Markets expert active.")
    print("[DEBUG] Step 10.5 done.")

    print("[DEBUG] Step 10.6: Superpacks call...")
    superpack_data = {}
    import asyncio
    import nest_asyncio
    # Patch asyncio so run_until_complete may be called even if a loop is already running.
    nest_asyncio.apply()

    compliance_terms = ("compliance", "law", "tax", "kra")

    async def get_superpack_data():
        """Call the law-compliance superpack when the query looks compliance-related."""
        print("[DEBUG] Inside get_superpack_data async function...")
        res = {}
        try:
            from core.engines.superpacks import superpacks_client
            wants_compliance = Expert.COMPLIANCE in experts
            if not wants_compliance:
                # Lower-case once and scan for any compliance keyword.
                lowered = query.lower()
                wants_compliance = any(term in lowered for term in compliance_terms)
            if wants_compliance:
                print("[DEBUG] Calling Law Compliance superpack...")
                res["compliance_model"] = await superpacks_client.get_law_compliance(query)
                print("[DEBUG] Law Compliance superpack call done.")
        except Exception as ex:
            print(f"[DEBUG] get_superpack_data Exception: {ex}")
            # The message alone rarely locates the fault; emit the full traceback too.
            traceback.print_exc()
            res["error"] = str(ex)
        return res

    try:
        loop = asyncio.get_event_loop()
        print(f"[DEBUG] Got event loop: {loop}. Running until complete...")
        superpack_data = loop.run_until_complete(get_superpack_data())
        print(f"[DEBUG] run_until_complete done. Data: {superpack_data}")
    except Exception as e:
        print(f"[DEBUG] Loop run Exception: {e}")
        # Still best-effort (execution continues), but keep the traceback for diagnosis.
        traceback.print_exc()
    print("[DEBUG] Step 10.6 done.")

    print("[DEBUG] Step 11: Formatting response...")
    # Skip actual formatting to check if it's safe
    print("[DEBUG] Step 11 done.")
    return {"status": "ok"}
# Monkeypatch: every SentiRouter.process call now runs the instrumented debug_process.
SentiRouter.process = debug_process

if __name__ == "__main__":
    db = SessionLocal()
    try:
        router = SentiRouter(db=db, user_hash="test_user_hash_123")
        try:
            router.process("I want to avoid KRA tax penalty rules.", "TAX_COMPLIANCE", {})
            print("SUCCESS: process completed successfully")
        except BaseException as e:
            # BaseException (not Exception) so SystemExit / KeyboardInterrupt raised
            # inside the pipeline are also reported as a crash instead of ending silently.
            print(f"CRASH: {type(e)}")
            traceback.print_exc()
    finally:
        # Always release the session, including when the router constructor itself raises.
        db.close()
|