Spaces:
Running
Running
| import os | |
| import sys | |
| import json | |
| import traceback | |
| SENTI_DIR = r"c:\Users\LENOVO\Desktop\senti_ai\senti" | |
| sys.path.insert(0, SENTI_DIR) | |
| from backend.database.postgres.db import SessionLocal | |
| from core.pipeline.router import SentiRouter | |
| # We will write a custom version of SentiRouter.process that prints before every single step | |
| def debug_process(self, query, parsed_intent, parsed_data, flavor="BUSINESS", country_code="KE", conversation_id=None): | |
| import uuid | |
| import time | |
| from datetime import datetime | |
| pipeline_id = str(uuid.uuid4()) | |
| start_time = time.time() | |
| self._trace = [] | |
| print("[DEBUG] Step 1: Safety Check...") | |
| from core.brain.safety.empathy_interceptor import empathy_interceptor | |
| safety = empathy_interceptor.check(query) | |
| print(f"[DEBUG] Step 1 done. Safety: {safety}") | |
| if safety.get("intercepted"): | |
| return self._safety_response(safety, start_time) | |
| print("[DEBUG] Step 2: Intent Classification...") | |
| from core.brain.routing.intent_classifier import intent_classifier | |
| classified = intent_classifier.classify(query) | |
| intent = parsed_intent if parsed_intent != "GENERAL_QUERY" else classified["intent"] | |
| data = parsed_data or classified.get("data", {}) | |
| print(f"[DEBUG] Step 2 done. Intent: {intent}") | |
| print("[DEBUG] Step 3: Tier Assignment...") | |
| from core.brain.moe.router import moe_router, Expert | |
| from core.brain.intent.ba_detector import ba_detector | |
| is_ba = ba_detector.is_ba_query(query, intent) | |
| from core.brain.routing.tier_router import tier_router, Tier | |
| tier = tier_router.assign_tier(intent=intent, query=query, is_ba_mode=is_ba) | |
| tier_config = tier_router.get_tier_config(tier) | |
| print(f"[DEBUG] Step 3 done. Tier: {tier.value}") | |
| print("[DEBUG] Step 4: CAG Packing...") | |
| cag_context = "" | |
| if tier_config["use_cag"]: | |
| from core.brain.memory.cag import cag | |
| cag_context = cag.pack(user_hash=self.user_hash, db=self.db) | |
| print(f"[DEBUG] Step 4 done. CAG Context length: {len(cag_context)}") | |
| print("[DEBUG] Step 5: Memory Injection...") | |
| enriched_data, injected_fields = self.memory_agent.inject_missing_fields(data) | |
| print(f"[DEBUG] Step 5 done. Enriched: {enriched_data}") | |
| print("[DEBUG] Step 6: MoE Routing...") | |
| experts = moe_router.route(intent, query) | |
| expert_suffix = moe_router.get_system_prompt_suffix(experts) | |
| print(f"[DEBUG] Step 6 done. Experts: {[e.value for e in experts]}") | |
| print("[DEBUG] Step 7: Compute Engine...") | |
| from core.pipeline.compute import compute_engine | |
| compute_result = compute_engine.compute(intent, enriched_data, self.user_hash) | |
| print(f"[DEBUG] Step 7 done. Computed: {compute_result.get('computed')}") | |
| print("[DEBUG] Step 8: KAG reasoning...") | |
| kag_context = "" | |
| if tier_config["use_kag"]: | |
| from core.brain.knowledge_graph.reasoner import kag_reasoner | |
| kag_result = kag_reasoner.reason(query, {"intent": intent}) | |
| kag_context = kag_result.get("kg_context", "") | |
| print(f"[DEBUG] Step 8 done.") | |
| print("[DEBUG] Step 9: RAG retrieval...") | |
| rag_results = [] | |
| if tier_config["use_rag"]: | |
| from core.brain.retrieval.rag import rag_retriever | |
| if rag_retriever.should_retrieve(intent, query): | |
| rag_results = rag_retriever.retrieve(query=query, intent=intent, limit=3) | |
| print(f"[DEBUG] Step 9 done. RAG count: {len(rag_results)}") | |
| print("[DEBUG] Step 10: Inference level determination...") | |
| from core.brain.inference.engine import inference_engine | |
| inference_level = inference_engine.determine_level(intent, query, compute_result) | |
| print(f"[DEBUG] Step 10 done. Level: {inference_level}") | |
| print("[DEBUG] Step 10.5: Markets context...") | |
| markets_context = "" | |
| if Expert.MARKETS in experts: | |
| print("[DEBUG] Markets expert active.") | |
| print("[DEBUG] Step 10.5 done.") | |
| print("[DEBUG] Step 10.6: Superpacks call...") | |
| superpacks_context = "" | |
| superpack_data = {} | |
| import asyncio | |
| import nest_asyncio | |
| nest_asyncio.apply() | |
| async def get_superpack_data(): | |
| print("[DEBUG] Inside get_superpack_data async function...") | |
| res = {} | |
| try: | |
| from core.engines.superpacks import superpacks_client | |
| if Expert.COMPLIANCE in experts or "compliance" in query.lower() or "law" in query.lower() or "tax" in query.lower() or "kra" in query.lower(): | |
| print("[DEBUG] Calling Law Compliance superpack...") | |
| res["compliance_model"] = await superpacks_client.get_law_compliance(query) | |
| print("[DEBUG] Law Compliance superpack call done.") | |
| except Exception as ex: | |
| print(f"[DEBUG] get_superpack_data Exception: {ex}") | |
| res["error"] = str(ex) | |
| return res | |
| try: | |
| loop = asyncio.get_event_loop() | |
| print(f"[DEBUG] Got event loop: {loop}. Running until complete...") | |
| superpack_data = loop.run_until_complete(get_superpack_data()) | |
| print(f"[DEBUG] run_until_complete done. Data: {superpack_data}") | |
| except Exception as e: | |
| print(f"[DEBUG] Loop run Exception: {e}") | |
| print("[DEBUG] Step 10.6 done.") | |
| print("[DEBUG] Step 11: Formatting response...") | |
| # Skip actual formatting to check if it's safe | |
| print("[DEBUG] Step 11 done.") | |
| return {"status": "ok"} | |
| # Monkeypatch | |
| SentiRouter.process = debug_process | |
| if __name__ == "__main__": | |
| db = SessionLocal() | |
| router = SentiRouter(db=db, user_hash="test_user_hash_123") | |
| try: | |
| router.process("I want to avoid KRA tax penalty rules.", "TAX_COMPLIANCE", {}) | |
| print("SUCCESS: process completed successfully") | |
| except BaseException as e: | |
| print(f"CRASH: {type(e)}") | |
| traceback.print_exc() | |
| db.close() | |