import hashlib
import html
import json
import os
import re
import threading
import time
import warnings
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import requests
from dotenv import load_dotenv
from langchain_cohere import CohereRerank
from langchain_community.retrievers import BM25Retriever
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

# =========================================================
# CONFIG
# =========================================================

load_dotenv()

# Mirror COHERE_API_KEY into CO_API_KEY (presumably the name the Cohere client
# reads from the environment — confirm against the langchain_cohere version in use).
if os.getenv("COHERE_API_KEY"):
    os.environ["CO_API_KEY"] = os.getenv("COHERE_API_KEY", "")

BASE_DIR = Path(__file__).resolve().parent.parent
DB_DIR = BASE_DIR / "db"
STATIC_DIR = BASE_DIR / "data" / "static"

ALLOWED_NOTIFY_CATEGORIES = {"LEAD", "URGENT", "UNKNOWN_QUESTION"}
MAX_SEARCH_RESULTS = 5
MAX_TOOL_ROUNDS = 4
MAX_HISTORY_MESSAGES = 8
NOTIFICATION_COOLDOWN_SECONDS = 300
TELEGRAM_MESSAGE_CHAR_LIMIT = 3500

# HTTP statuses that resending the identical payload cannot fix (malformed
# request, bad token, unknown method). They are only treated as final when the
# response body is a genuine Telegram API error object (see
# _permanent_telegram_error), so a proxy's error page still triggers fallback.
_PERMANENT_TELEGRAM_STATUS_CODES = frozenset({400, 401, 404})

# Direct Telegram API IPs used when the domain cannot be reached.
_TELEGRAM_FALLBACK_IPS = ("149.154.167.220", "149.154.166.120", "149.154.165.120")

# =========================================================
# GLOBAL STATE
# =========================================================

_GLOBAL_VECTORSTORE = None
_GLOBAL_BM25 = None
_GLOBAL_COMPRESSOR = None
# NOTE(review): background work in this module is started with plain daemon
# threads, so this pool appears unused here — confirm before removing.
_BACKGROUND_EXECUTOR = ThreadPoolExecutor(max_workers=4, thread_name_prefix="aruncore-bg")
# Alert fingerprint -> time.time() of its last successful delivery.
_RECENT_ALERTS: Dict[str, float] = {}

# =========================================================
# HELPERS
# =========================================================


def _utc_now() -> str:
    """Return the current UTC time as ``YYYY-MM-DD HH:MM:SS UTC``."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")


def _safe_truncate(text: str, limit: int = 1500) -> str:
    """Strip *text* and cut it to at most *limit* characters.

    ``None`` is treated as an empty string. When the text is cut, it ends with
    ``"..."`` (which counts toward *limit*).
    """
    text = (text or "").strip()
    if len(text) <= limit:
        return text
    if limit <= 3:
        # No room for an ellipsis: a hard cut is the only way to honour limit.
        return text[: max(limit, 0)]
    return text[: limit - 3] + "..."


def _parse_json_metadata(raw: str) -> Dict[str, Any]:
    """Decode a JSON metadata string into a dict.

    Returns ``{}`` for empty input, wraps a non-object JSON value as
    ``{"value": parsed}``, and keeps undecodable input as ``{"raw": raw}``.
    """
    if not raw:
        return {}
    try:
        parsed = json.loads(raw)
    except (TypeError, ValueError, RecursionError):
        return {"raw": raw}
    return parsed if isinstance(parsed, dict) else {"value": parsed}


def _alert_key(category: str, user_input: str) -> str:
    """Return a stable SHA-256 fingerprint of (category, normalised user input)."""
    return hashlib.sha256(
        f"{category}:{user_input.strip().lower()}".encode("utf-8")
    ).hexdigest()


def _should_send_alert(category: str, user_input: str) -> bool:
    """Return False if the same alert was delivered within the cooldown window."""
    last_seen = _RECENT_ALERTS.get(_alert_key(category, user_input))
    if last_seen is not None and (time.time() - last_seen) < NOTIFICATION_COOLDOWN_SECONDS:
        return False
    return True


def _mark_alert_sent(category: str, user_input: str) -> None:
    """Record a successful delivery and drop fingerprints whose cooldown expired.

    Pruning keeps the map from growing without bound. It iterates over a
    snapshot because background threads may write to the dict at the same time.
    """
    now = time.time()
    for key, sent_at in list(_RECENT_ALERTS.items()):
        if now - sent_at >= NOTIFICATION_COOLDOWN_SECONDS:
            _RECENT_ALERTS.pop(key, None)
    _RECENT_ALERTS[_alert_key(category, user_input)] = now


def _escape_html(text: str) -> str:
    """HTML-escape *text* (``None`` becomes an empty string)."""
    return html.escape(text or "")


def _is_truthy_env(value: Optional[str], default: bool = False) -> bool:
    """Interpret an env-var string as a boolean; ``None`` yields *default*."""
    if value is None:
        return default
    return value.strip().lower() in {"1", "true", "yes", "on"}


def _telegram_debug_enabled() -> bool:
    """Whether the Telegram debug stream is on (TELEGRAM_DEBUG_ENABLED, default on)."""
    return _is_truthy_env(os.getenv("TELEGRAM_DEBUG_ENABLED"), default=True)


def _get_telegram_target(debug: bool = False) -> Tuple[Optional[str], Optional[str]]:
    """Return ``(bot_token, chat_id)`` from the environment.

    With ``debug=True`` the TELEGRAM_DEBUG_* variables are preferred, falling
    back to the regular TELEGRAM_BOT_TOKEN / TELEGRAM_CHAT_ID.
    """
    if debug:
        token = os.getenv("TELEGRAM_DEBUG_BOT_TOKEN") or os.getenv("TELEGRAM_BOT_TOKEN")
        chat_id = os.getenv("TELEGRAM_DEBUG_CHAT_ID") or os.getenv("TELEGRAM_CHAT_ID")
        return token, chat_id
    return os.getenv("TELEGRAM_BOT_TOKEN"), os.getenv("TELEGRAM_CHAT_ID")


def _chunk_text(text: str, limit: int = 2200) -> List[str]:
    """Split *text* into pieces of at most *limit* characters.

    Prefers breaking at a newline in the second half of the window, then at a
    space, else cuts hard at *limit*. Empty input yields ``["(empty)"]``.
    """
    limit = max(1, limit)  # a non-positive limit would never shrink `remaining`
    cleaned = (text or "").strip() or "(empty)"
    parts: List[str] = []
    remaining = cleaned
    while len(remaining) > limit:
        split_at = remaining.rfind("\n", 0, limit)
        if split_at < int(limit * 0.5):
            split_at = remaining.rfind(" ", 0, limit)
        if split_at <= 0:
            split_at = limit
        parts.append(remaining[:split_at].strip())
        remaining = remaining[split_at:].lstrip()
    if remaining:
        parts.append(remaining)
    return parts or ["(empty)"]


def _truncate_escaped_html(text: str, limit: int) -> str:
    """Cut already-escaped HTML to at most *limit* chars without splitting an entity.

    html.escape only produces entities of the form ``&...;``. A trailing ``&``
    with no closing ``;`` is therefore a partial entity and is dropped.
    """
    if len(text) <= limit:
        return text
    cut = text[: max(limit, 0)]
    amp = cut.rfind("&")
    if amp != -1 and ";" not in cut[amp:]:
        cut = cut[:amp]
    return cut


def _permanent_telegram_error(response: Any) -> Optional[str]:
    """Return Telegram's error description when *response* cannot succeed on retry.

    Returns None for retryable or unrecognised responses. These include other
    status codes and bodies that are not Telegram ``{"ok": false, ...}`` objects.
    """
    if response.status_code not in _PERMANENT_TELEGRAM_STATUS_CODES:
        return None
    try:
        body = response.json()
    except ValueError:
        return None
    if isinstance(body, dict) and body.get("ok") is False:
        return str(body.get("description") or "no description")
    return None


def _send_telegram_message(
    token: str,
    chat_id: str,
    text: str,
    parse_mode: str = "HTML",
    max_attempts: int = 3,
    delivery_label: str = "default",
    retry_sleep_seconds: float = 1.5,
) -> str:
    """Send *text* via the Telegram Bot API ``sendMessage`` method.

    Tries ``api.telegram.org`` once. If that fails, it cycles through
    _TELEGRAM_FALLBACK_IPS, making *max_attempts* tries per IP. If Telegram
    itself rejects the request with a permanent error, it stops immediately.

    Args:
        token: Bot token (embedded in the request URL).
        chat_id: Destination chat id.
        text: Message body, already formatted for *parse_mode*.
        parse_mode: Telegram parse mode (default ``"HTML"``).
        max_attempts: Tries per fallback IP.
        delivery_label: Tag used in log lines.
        retry_sleep_seconds: Pause between fallback attempts. There is no
            pause after the final attempt.

    Returns:
        ``"SUCCESS"`` on HTTP 200, otherwise ``"FAILED: <last error>"``.
        Network errors are reported in the result, never raised.
    """
    import urllib3

    payload = {
        "chat_id": chat_id,
        "text": text,
        "parse_mode": parse_mode,
        "disable_web_page_preview": True,
    }
    last_error = "Unknown error"

    # Attempt 1: standard domain request (works locally on Windows).
    try:
        response = requests.post(
            f"https://api.telegram.org/bot{token}/sendMessage",
            json=payload,
            timeout=10,
        )
    except Exception as e:
        last_error = f"Domain request failed: {e}"
    else:
        if response.status_code == 200:
            print(f"[TELEGRAM:{delivery_label}] success via domain")
            return "SUCCESS"
        last_error = f"Domain API returned {response.status_code}"
        permanent = _permanent_telegram_error(response)
        if permanent is not None:
            # Telegram answered and refused this payload; the IP route would get the same answer.
            last_error = f"{last_error}: {permanent}"
            print(f"[TELEGRAM ERROR:{delivery_label}] Permanent error. Last error: {last_error}")
            return f"FAILED: {last_error}"

    print(f"[TELEGRAM:{delivery_label}] Domain method failed ({last_error}). Using IP bypass...")

    # Attempt 2: direct IPs to bypass DNS & SNI blocks (for HuggingFace).
    # NOTE(security): verify=False disables certificate checking, so an on-path
    # attacker could read the bot token in the URL. It is required only because
    # the certificate cannot match a bare IP; prefer fixing DNS/egress instead.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    attempts_left = len(_TELEGRAM_FALLBACK_IPS) * max(max_attempts, 0)
    for ip in _TELEGRAM_FALLBACK_IPS:
        url_ip = f"https://{ip}/bot{token}/sendMessage"
        for _ in range(max_attempts):
            attempts_left -= 1
            try:
                response = requests.post(
                    url_ip,
                    json=payload,
                    headers={"Host": "api.telegram.org"},
                    verify=False,
                    timeout=15,
                )
            except Exception as e:
                last_error = f"IP {ip} failed: {e}"
            else:
                if response.status_code == 200:
                    print(f"[TELEGRAM:{delivery_label}] success via IP {ip}")
                    return "SUCCESS"
                last_error = f"IP {ip} returned {response.status_code}"
                permanent = _permanent_telegram_error(response)
                if permanent is not None:
                    last_error = f"{last_error}: {permanent}"
                    print(f"[TELEGRAM ERROR:{delivery_label}] Permanent error. Last error: {last_error}")
                    return f"FAILED: {last_error}"
            if attempts_left > 0 and retry_sleep_seconds > 0:
                time.sleep(retry_sleep_seconds)

    print(f"[TELEGRAM ERROR:{delivery_label}] All methods failed. Last error: {last_error}")
    return f"FAILED: {last_error}"


def _send_telegram_message_fast(
    token: str,
    chat_id: str,
    text: str,
    parse_mode: str = "HTML",
) -> str:
    """Send with a single fallback attempt per IP and no pause between attempts."""
    return _send_telegram_message(
        token=token,
        chat_id=chat_id,
        text=text,
        parse_mode=parse_mode,
        max_attempts=1,
        retry_sleep_seconds=0.0,
        delivery_label="fast",
    )


def _send_telegram_message_retrying(
    token: str,
    chat_id: str,
    text: str,
    parse_mode: str = "HTML",
) -> str:
    """Send with three fallback attempts per IP and a one-second pause between them."""
    return _send_telegram_message(
        token=token,
        chat_id=chat_id,
        text=text,
        parse_mode=parse_mode,
        max_attempts=3,
        retry_sleep_seconds=1.0,
        delivery_label="retry",
    )


def send_debug_event(
    event_type: str,
    content: str,
    metadata: Optional[Dict[str, Any]] = None,
) -> str:
    """Send a debug event to the Telegram debug chat.

    Each message carries a header (type, time, non-None metadata). Content is
    split into 2200-character parts, each sent as its own numbered message.

    Returns:
        ``"SKIPPED: ..."`` when the stream is disabled or credentials are
        missing. Otherwise returns the first non-SUCCESS send result, or
        ``"SUCCESS: debug event sent."`` once every part was delivered.
    """
    if not _telegram_debug_enabled():
        return "SKIPPED: debug stream disabled."

    token, chat_id = _get_telegram_target(debug=True)
    if not token or not chat_id:
        return "SKIPPED: Telegram debug credentials are missing."

    header_lines = [
        "ArunCore Debug",
        f"Type: {_escape_html(event_type)}",
        f"Time: {_escape_html(_utc_now())}",
    ]
    for key, value in (metadata or {}).items():
        if value is None:
            continue
        header_lines.append(f"{_escape_html(str(key))}: {_escape_html(str(value))}")

    content_chunks = _chunk_text(content, limit=2200)
    total_parts = len(content_chunks)
    for index, chunk in enumerate(content_chunks, start=1):
        lines = list(header_lines)
        if total_parts > 1:
            lines.append(f"Part: {index}/{total_parts}")
        lines.extend(["", "Content", _escape_html(chunk)])
        result = _send_telegram_message(
            token=token,
            chat_id=chat_id,
            # Escaping can inflate the text past the limit. Cutting must not
            # leave a dangling "&am..." fragment, which is malformed HTML for parse_mode=HTML.
            text=_truncate_escaped_html("\n".join(lines), TELEGRAM_MESSAGE_CHAR_LIMIT),
            delivery_label="debug",
        )
        if not result.startswith("SUCCESS"):
            return result
    return "SUCCESS: debug event sent."
def _run_background_task(task_name: str, func, *args, **kwargs):
    """Call ``func(*args, **kwargs)`` and log the outcome; never raises.

    Used as a background-thread target. Exceptions are printed instead of
    propagating. Returns the function's result, or ``None`` if it raised.
    """
    try:
        result = func(*args, **kwargs)
    except Exception as e:
        print(f"[BACKGROUND ERROR] {task_name}: {e}")
        return None
    if isinstance(result, str):
        print(f"[BACKGROUND] {task_name}: {result}")
    return result


def _submit_background_task(task_name: str, func, *args, **kwargs) -> bool:
    """Start ``func(*args, **kwargs)`` on a new daemon thread.

    Daemon threads do not keep the interpreter alive, so work still running
    at process exit is abandoned. Returns True if the thread started.
    """
    try:
        worker = threading.Thread(
            target=_run_background_task,
            args=(task_name, func, *args),
            kwargs=kwargs,
            daemon=True,
        )
        worker.start()
    except RuntimeError as e:  # e.g. "can't start new thread"
        print(f"[BACKGROUND ERROR] Failed to submit {task_name}: {e}")
        return False
    return True


def queue_debug_event(
    event_type: str,
    content: str,
    metadata: Optional[Dict[str, Any]] = None,
) -> str:
    """Schedule ``send_debug_event`` on a background thread and report the scheduling outcome."""
    if not _telegram_debug_enabled():
        return "SKIPPED: debug stream disabled."
    if _submit_background_task("debug_event", send_debug_event, event_type, content, metadata):
        return "QUEUED: debug event scheduled."
    return "FAILED: could not queue debug event."


def send_chat_history_to_telegram(
    session_id: str,
    user_input: str,
    assistant_response: str,
) -> str:
    """Send one user/assistant exchange (each truncated to 1500 chars) to the main Telegram chat."""
    token, chat_id = _get_telegram_target(debug=False)
    if not token or not chat_id:
        return "FAILED: Telegram credentials are missing."

    # \U0001F4DD is the memo emoji; escapes keep the source immune to encoding mangling.
    text = (
        "\U0001F4DD ArunCore Chat Log \U0001F4DD\n\n"
        f"Session: {_escape_html(session_id)}\n"
        f"Time: {_escape_html(_utc_now())}\n\n"
        f"User:\n{_escape_html(_safe_truncate(user_input, 1500))}\n\n"
        f"ArunCore:\n{_escape_html(_safe_truncate(assistant_response, 1500))}"
    )
    return _send_telegram_message(
        token=token,
        chat_id=chat_id,
        text=text,
        delivery_label="chat_log",
    )


def queue_chat_history_to_telegram(
    session_id: str,
    user_input: str,
    assistant_response: str,
) -> str:
    """Schedule ``send_chat_history_to_telegram`` in the background (credentials checked up front)."""
    token, chat_id = _get_telegram_target(debug=False)
    if not token or not chat_id:
        return "FAILED: Telegram credentials are missing from the environment."
    if _submit_background_task(
        "chat_history_log",
        send_chat_history_to_telegram,
        session_id,
        user_input,
        assistant_response,
    ):
        return "QUEUED: chat history scheduled."
    return "FAILED: could not queue chat history."


def _build_notification_metadata(
    reason: str,
    user_metadata: Optional[Dict[str, Any]] = None,
    assistant_output: Optional[str] = None,
) -> Dict[str, Any]:
    """Assemble alert metadata: reason, timestamp, optional assistant output (300 chars).

    Keys in *user_metadata* are merged last and override the built-in keys.
    """
    metadata: Dict[str, Any] = {
        "reason": reason,
        "timestamp": _utc_now(),
    }
    if assistant_output:
        metadata["assistant_output"] = _safe_truncate(assistant_output, 300)
    if user_metadata:
        metadata.update(user_metadata)
    return metadata


def _deliver_notify_arun(
    category: str,
    user_input: str,
    user_metadata_json: str = "",
    fast: bool = False,
) -> str:
    """Send an alert to Arun's Telegram chat, suppressing repeats within the cooldown.

    An unknown or empty *category* is coerced to ``UNKNOWN_QUESTION``. With
    *fast* the IP fallback is tried once per IP; otherwise three times.

    Returns:
        ``"SUCCESS: Arun has been notified."``, ``"SKIPPED: ..."`` for a
        suppressed duplicate, or a ``"FAILED: ..."`` string.
    """
    token, chat_id = _get_telegram_target(debug=False)
    if not token or not chat_id:
        return "FAILED: Telegram credentials are missing from the environment."

    category = (category or "UNKNOWN_QUESTION").strip().upper()
    if category not in ALLOWED_NOTIFY_CATEGORIES:
        category = "UNKNOWN_QUESTION"

    cleaned_input = _safe_truncate(user_input, 1200)
    metadata = _parse_json_metadata(user_metadata_json)

    if not _should_send_alert(category, cleaned_input):
        return f"SKIPPED: duplicate {category} alert suppressed."

    meta_block = "\n".join(
        f"{_escape_html(str(key))}: {_escape_html(str(value))}"
        for key, value in metadata.items()
    )
    if meta_block:
        meta_block = f"\n\nMetadata\n{meta_block}"

    # \U0001F6A8 is the rotating-light emoji; escapes keep the source immune to encoding mangling.
    text = (
        "\U0001F6A8 ArunCore Alert \U0001F6A8\n\n"
        f"Category: {_escape_html(category)}\n"
        f"Time: {_escape_html(_utc_now())}\n\n"
        f"User Input\n{_escape_html(cleaned_input)}"
        f"{meta_block}"
    )

    # Call the core sender directly, using only arguments it accepts.
    result = _send_telegram_message(
        token=token,
        chat_id=chat_id,
        text=text,
        max_attempts=1 if fast else 3,
        delivery_label="fast" if fast else "retry",
    )
    if result.startswith("SUCCESS"):
        _mark_alert_sent(category, cleaned_input)
        return "SUCCESS: Arun has been notified."
    return result


def _attempt_notify_arun_with_retry_queue(
    category: str,
    user_input: str,
    user_metadata_json: str = "",
) -> str:
    """Queue a retrying ``_deliver_notify_arun`` call on a background thread.

    Returns a ``QUEUED``/``FAILED`` string describing the scheduling only,
    not the delivery.
    """
    token, chat_id = _get_telegram_target(debug=False)
    if not token or not chat_id:
        return "FAILED: Telegram credentials are missing from the environment."

    submitted = _submit_background_task(
        "notify_arun_bg",
        _deliver_notify_arun,
        category,
        user_input,
        user_metadata_json,
        fast=False,
    )
    if submitted:
        return "QUEUED: Sending notification to Arun in the background."
    return "FAILED: Could not queue notification."


def _contains_uncertainty(text: str) -> bool:
    """Return True if *text* contains a stock "I don't know"-style phrase (case-insensitive).

    Curly apostrophes (U+2019) are folded to ASCII ``'`` first, so "can’t"
    and "can't" match the same phrase.
    """
    lowered = (text or "").lower().replace("\u2019", "'")
    phrases = (
        "i don't know",
        "i do not know",
        "not sure",
        "can't confirm",
        "cannot confirm",
        "i don't have that information",
        "no relevant data found",
        "i'm unsure",
        "i am unsure",
        "i cannot answer",
        "i can't answer",
    )
    return any(phrase in lowered for phrase in phrases)


def _route_user_input(user_input: str) -> Dict[str, Any]:
    """
    Lightweight router to bias the system toward tool usage.

    Checks run in priority order: explicit contact/notify intent (URGENT),
    business/lead intent (LEAD), Arun-related topics (search), uncertainty
    wording (search), else a plain general query.
    """
    text = (user_input or "").strip()
    lower = text.lower()

    direct_contact_patterns = [
        r"\btalk to arun\b",
        r"\bconnect me to arun\b",
        r"\bcontact arun\b",
        r"\bmessage arun\b",
        r"\bnotify arun\b",
        r"\bsend (?:a )?notification to arun\b",
        r"\btell arun\b",
        r"\blet arun know\b",
        r"\bping arun\b",
    ]
    lead_patterns = [
        r"\bhire arun\b",
        r"\bcollaborate\b",
        r"\bpartnership\b",
        r"\bbusiness\b",
        r"\blead\b",
        r"\bwork with you\b",
    ]
    arun_context_patterns = [
        r"\baruncore\b",
        r"\barun\b",
        r"\byour project\b",
        r"\byour work\b",
        r"\byour github\b",
        r"\bgithub\b",
        r"\brepository\b",
        r"\bportfolio\b",
        r"\barchitecture\b",
        r"\bknowledge base\b",
        r"\bbackground\b",
        r"\bskills\b",
        r"\bexperience\b",
    ]
    uncertainty_patterns = [
        r"\bi don't know\b",
        r"\bnot sure\b",
        r"\bcan you explain\b",
        r"\bwhat does this mean\b",
        r"\bhelp me understand\b",
        r"\bunknown\b",
        r"\bunclear\b",
        r"\bconfused\b",
    ]

    # Match whole words only. Plain substring tests made "ping" fire inside
    # "developing"/"mapping" and "you" fire in almost every question,
    # producing false URGENT alerts.
    notify_words_present = re.search(r"\bnotif\w*|\bping(?:ed|ing)?\b", lower) is not None
    notify_target_present = re.search(r"\b(?:arun\w*|you\w*|twins?)\b", lower) is not None

    if any(re.search(pattern, lower) for pattern in direct_contact_patterns) or (
        notify_words_present and notify_target_present
    ):
        return {
            "needs_search": False,
            "needs_notify": True,
            "notify_category": "URGENT",
            "reason": "explicit_contact_or_notification_intent",
        }

    if any(re.search(pattern, lower) for pattern in lead_patterns):
        return {
            "needs_search": False,
            "needs_notify": True,
            "notify_category": "LEAD",
            "reason": "business_or_lead_intent",
        }

    if any(re.search(pattern, lower) for pattern in arun_context_patterns):
        return {
            "needs_search": True,
            "needs_notify": False,
            "notify_category": None,
            "reason": "arun_related_query",
        }

    if any(re.search(pattern, lower) for pattern in uncertainty_patterns):
        return {
            "needs_search": True,
            "needs_notify": False,
            "notify_category": None,
            "reason": "uncertain_query",
        }

    return {
        "needs_search": False,
        "needs_notify": False,
        "notify_category": None,
        "reason": "general_query",
    }


def load_static_context() -> Tuple[str, str]:
    """Read ``public_profile.md`` and ``rules_of_engagement.md`` from STATIC_DIR.

    Returns:
        ``(profile_text, rules_text)``.

    Raises:
        OSError: if either file is missing or unreadable.
    """
    profile = (STATIC_DIR / "public_profile.md").read_text(encoding="utf-8")
    rules = (STATIC_DIR / "rules_of_engagement.md").read_text(encoding="utf-8")
    return profile, rules


# =========================================================
# TOOLS
# =========================================================
# NOTE: the @tool docstrings below are sent to the model as tool descriptions;
# edit them as prompt text, not as ordinary documentation.


@tool
def notify_arun(category: str, user_input: str, user_metadata_json: str = "") -> str:
    """
    Sends a Telegram alert to Arun.

    Use this when:
    - The user wants to talk to Arun directly.
    - The query looks like a lead, collaboration, hiring, or business request.
    - The system cannot answer confidently and should escalate the question.

    Args:
        category: One of 'LEAD', 'URGENT', or 'UNKNOWN_QUESTION'.
        user_input: The user's message.
        user_metadata_json: Optional JSON string with extra metadata.
    """
    return _attempt_notify_arun_with_retry_queue(
        category=category,
        user_input=user_input,
        user_metadata_json=user_metadata_json,
    )


@tool
def search_arun_knowledge(search_query: str) -> str:
    """
    Searches Arun's database for technical details, background, projects, and architecture.
    Use this before answering questions about Arun's history, work, GitHub, projects, or internal details.

    Args:
        search_query: A descriptive standalone query focused on key technical terms.
    """
    if _GLOBAL_VECTORSTORE is None or _GLOBAL_BM25 is None or _GLOBAL_COMPRESSOR is None:
        return "ERROR: Database retrievers are not initialized."

    vec_docs = _GLOBAL_VECTORSTORE.similarity_search(search_query, k=15)
    lex_docs = _GLOBAL_BM25.invoke(search_query)

    # Merge vector and BM25 hits, dropping duplicates. Key on chunk_id when
    # present; otherwise key on source *and* full content, so distinct chunks
    # of the same source file are not collapsed into one.
    seen = set()
    combined: List[Document] = []
    for doc in [*vec_docs, *lex_docs]:
        chunk_id = doc.metadata.get("chunk_id")
        if chunk_id:
            key = ("chunk", chunk_id)
        else:
            key = ("content", doc.metadata.get("source"), doc.page_content)
        if key in seen:
            continue
        seen.add(key)
        combined.append(doc)

    initial_docs = combined[:20]
    if not initial_docs:
        return "DATABASE SEARCH RESULT: No relevant data found for this query."

    try:
        reranked_docs = list(
            _GLOBAL_COMPRESSOR.compress_documents(documents=initial_docs, query=search_query)
        )
    except Exception as e:
        # Reranking only improves ordering; fall back to the merged order but say so.
        print(f"[SEARCH WARNING] Rerank failed, using unranked results: {e}")
        reranked_docs = initial_docs[:MAX_SEARCH_RESULTS]

    if not reranked_docs:
        return "DATABASE SEARCH RESULT: No relevant data found for this query."

    snippets = []
    for doc in reranked_docs[:MAX_SEARCH_RESULTS]:
        source = doc.metadata.get("source", "unknown")
        chunk_id = doc.metadata.get("chunk_id", "unknown")
        content = _safe_truncate(doc.page_content, 2000)
        snippets.append(f"[Source: {source} | chunk: {chunk_id}]\n{content}")

    return "DATABASE SEARCH RESULT:\n\n" + "\n\n---\n\n".join(snippets)


# =========================================================
# MEMORY
# =========================================================


class RollingMemory:
    """Chat memory that folds older turns into an LLM-written running summary.

    Every interaction appends a Human/AI message pair. After ``max_turns``
    interactions, all but the newest 4 messages are merged into
    ``running_summary`` by ``summary_llm`` and dropped from ``history``.
    If summarisation fails, history is kept and compression is retried on the next turn.
    """

    # Messages (not turns) kept verbatim after a compression.
    _KEEP_MESSAGES = 4

    def __init__(self, summary_llm, max_turns: int = 4):
        self.summary_llm = summary_llm
        self.max_turns = max_turns
        self.history: List[Any] = []
        self.running_summary: str = "No prior summary. This is the start of the conversation."
        self.invocation_count = 0

    def add_interaction(self, human_text: str, ai_text: str):
        """Record one exchange and compress the history when the turn budget is reached."""
        self.history.append(HumanMessage(content=human_text))
        self.history.append(AIMessage(content=ai_text))
        self.invocation_count += 1

        if self.invocation_count >= self.max_turns:
            self._summarize_and_prune()

    def _summarize_and_prune(self):
        """Merge older messages into ``running_summary`` (synchronously, despite the log text)."""
        print("\n[SYSTEM] Triggering background summarization...")
        messages_to_summarize = self.history[: -self._KEEP_MESSAGES]
        if not messages_to_summarize:
            return

        chat_transcript = "\n".join(
            f"{'User' if isinstance(m, HumanMessage) else 'ArunCore'}: {m.content}"
            for m in messages_to_summarize
        )

        prompt = (
            "You are an internal memory compression engine for ArunCore.\n"
            "Merge the existing summary with the new transcript. Preserve technical context, names, project mentions, user goals, and important decisions. "
            "Keep it concise and stable. Return no more than 5 sentences.\n\n"
            f"--- EXISTING SUMMARY ---\n{self.running_summary}\n\n"
            f"--- NEW CHAT TO MERGE ---\n{chat_transcript}"
        )

        try:
            res = self.summary_llm.invoke([SystemMessage(content=prompt)])
            self.running_summary = res.content.strip()
            self.history = self.history[-self._KEEP_MESSAGES:]
            self.invocation_count = len(self.history) // 2
            print(f"[SYSTEM] Memory compressed. New summary: {self.running_summary[:120]}...")
        except Exception as e:
            print(f"[SYSTEM ERROR] Failed to summarize memory: {e}")

    def get_messages(self):
        """Return the verbatim (not yet summarised) message history."""
        return self.history


# =========================================================
# AGENT SETUP
# =========================================================


def init_agent():
    """Build retrievers, LLMs, the chat prompt and memory for the chat loop.

    Side effects:
        Sets the module globals _GLOBAL_VECTORSTORE, _GLOBAL_BM25 and
        _GLOBAL_COMPRESSOR, which ``search_arun_knowledge`` reads.

    Returns:
        ``(main_llm, prompt, memory, tools)``.

    Raises:
        ValueError: if a required API key is missing or the vector DB is empty.
    """
    global _GLOBAL_VECTORSTORE, _GLOBAL_BM25, _GLOBAL_COMPRESSOR

    openai_key = os.getenv("OPENAI_API_KEY")
    groq_key = os.getenv("GROQ_API_KEY")
    cohere_key = os.getenv("COHERE_API_KEY")

    # NOTE(review): GROQ_API_KEY is required here but never used by this
    # function — confirm it is still needed before relaxing the check.
    if not openai_key or not groq_key or not cohere_key:
        raise ValueError("Missing API keys. OPENAI_API_KEY, GROQ_API_KEY, and COHERE_API_KEY are required.")

    warnings.filterwarnings("ignore", category=DeprecationWarning)

    embeddings = OpenAIEmbeddings(model="text-embedding-3-small", api_key=openai_key)
    _GLOBAL_VECTORSTORE = Chroma(
        collection_name="aruncore_knowledge",
        embedding_function=embeddings,
        persist_directory=str(DB_DIR),
    )

    # The BM25 index is built from the same documents stored in Chroma.
    all_data = _GLOBAL_VECTORSTORE.get()
    documents = [
        Document(page_content=text, metadata=metadata or {})
        for text, metadata in zip(all_data.get("documents") or [], all_data.get("metadatas") or [])
        if text
    ]
    if not documents:
        raise ValueError("Vector database is empty. Run ingest first.")

    _GLOBAL_BM25 = BM25Retriever.from_documents(documents)
    _GLOBAL_BM25.k = 10

    _GLOBAL_COMPRESSOR = CohereRerank(
        top_n=5,
        model="rerank-english-v3.0",
        cohere_api_key=cohere_key,
    )

    # NOTE(review): some reasoning-model families only accept the default
    # temperature; confirm temperature=0.0 is accepted for this model.
    summary_llm = ChatOpenAI(
        temperature=0.0,
        model="gpt-5-nano",
        api_key=openai_key,
    )

    tools = [notify_arun, search_arun_knowledge]

    main_llm = ChatOpenAI(
        temperature=0.15,
        model="gpt-4o-mini",
        api_key=openai_key,
    ).bind_tools(tools)

    profile, rules = load_static_context()
    # The system text below is parsed by ChatPromptTemplate as an f-string
    # template. Any literal braces in the markdown files would otherwise be
    # read as template variables and break formatting, so double them.
    safe_profile = profile.replace("{", "{{").replace("}", "}}")
    safe_rules = rules.replace("{", "{{").replace("}", "}}")

    # NOTE(review): the "MUST REMEMBER" sentence appears truncated and runs into
    # the profile header; wording is kept verbatim pending the author's intent.
    system_prompt = f"""
You are ArunCore, the knowledge system for Arun Yadav.
Greet the person like you are Arun.
You speak as Arun in first person.
Be honest. Do not guess.
For any question about Arun's projects, skills, background, architecture, GitHub, portfolio, work history, or any stored knowledge, call `search_arun_knowledge` before answering.
MUST REMEMBER: THAT YOU CAN ONLY GIVE THE URL YOU FOUND IN THE

--- IDENTITY PROFILE ---
{safe_profile}

--- RULES OF ENGAGEMENT ---
{safe_rules}

--- PAST CONVERSATION SUMMARY ---
{{running_summary}}

OPERATING POLICY:

1. SEARCH-FIRST POLICY
- For any question about Arun's projects, skills, background, architecture, GitHub, portfolio, work history, or any stored knowledge, call `search_arun_knowledge` before answering.
- If the user asks something that might depend on stored facts, always use the search tool. if you find the answer only then provide the answer to the user.

2. ESCALATE UNCERTAINTY
- If search results are, empty, or do not support a reliable answer, call `notify_arun` with category `UNKNOWN_QUESTION`.
- Do not pretend to know never make any information up.
- Give the user a direct honest answer after escalation.

3. DIRECT CONTACT / LEAD ESCALATION
- If the user wants to talk to Arun, contact Arun, hire Arun, collaborate, or discuss a business opportunity, call `notify_arun` immediately.
- Use category `URGENT` for direct contact intent.
- Use category `LEAD` for collaboration, hiring, business, partnership, or project opportunities.

4. TOOL BIAS
- Prefer calling tools over answering from unsupported memory.
- When in doubt, always search first.
- After a failed search or any explicit uncertainty, escalate.
- Do not limit tool use artificially unless it would create obvious repetition.

5. OUTPUT STYLE
- Keep answers concise, direct, and scannable.
- Use Markdown.
- If you do not know, say so clearly.
.
"""

    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_prompt),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ]
    )

    memory = RollingMemory(summary_llm=summary_llm)
    return main_llm, prompt, memory, tools


# =========================================================
# CHAT LOOP
# =========================================================


def _tool_map(tools):
    """Index tool objects by their ``name`` attribute."""
    return {tool_obj.name: tool_obj for tool_obj in tools}


def _build_notify_payload(
    category: str,
    user_input: str,
    reason: str,
    user_metadata: Optional[Dict[str, Any]] = None,
    assistant_output: Optional[str] = None,
) -> Dict[str, str]:
    """Build the ``notify_arun`` argument dict with metadata serialised to JSON."""
    metadata = _build_notification_metadata(
        reason=reason,
        user_metadata=user_metadata,
        assistant_output=assistant_output,
    )
    return {
        "category": category,
        "user_input": user_input,
        "user_metadata_json": json.dumps(metadata),
    }


def _tool_was_used(scratchpad: List[Any], tool_name: str) -> bool:
    """Return True if *scratchpad* records a call to *tool_name*.

    Recognises plain dicts with a ``name`` key and AIMessage tool calls.
    """
    for item in scratchpad:
        if isinstance(item, dict) and item.get("name") == tool_name:
            return True
        if isinstance(item, AIMessage):
            for tool_call in item.tool_calls or []:
                if tool_call.get("name") == tool_name:
                    return True
    return False


def _run_pre_escalation(
    route: Dict[str, Any],
    user_input: str,
    tool_map: Dict[str, Any],
    user_metadata: Optional[Dict[str, Any]] = None,
    background: bool = False,
) -> Optional[Dict[str, Any]]:
    """Notify Arun before the model runs when *route* says so.

    Returns ``None`` when no notification is needed. Otherwise returns a dict
    with ``handled``, ``category``, ``reason`` and ``result``. ``handled`` is
    True for SUCCESS, SKIPPED and QUEUED results. A queued alert counts as
    handled because the duplicate filter only records an alert after it is
    delivered; treating it as unhandled would make the post-answer safety net
    send a second copy. *tool_map* is accepted for interface compatibility.
    """
    if not route.get("needs_notify"):
        return None

    category = route.get("notify_category") or "UNKNOWN_QUESTION"
    reason = route.get("reason", "unknown")
    payload = _build_notify_payload(
        category=category,
        user_input=user_input,
        reason=reason,
        user_metadata=user_metadata,
    )

    if background:
        submitted = _submit_background_task(
            "pre_escalation_notify",
            _deliver_notify_arun,
            category,
            user_input,
            payload["user_metadata_json"],
        )
        result = (
            "QUEUED: pre-escalation notification scheduled."
            if submitted
            else "FAILED: could not queue pre-escalation notification."
        )
    else:
        result = _attempt_notify_arun_with_retry_queue(
            category=category,
            user_input=user_input,
            user_metadata_json=payload["user_metadata_json"],
        )

    return {
        "handled": result.startswith(("SUCCESS", "SKIPPED", "QUEUED")),
        "category": category,
        "reason": reason,
        "result": result,
    }


def run_pre_escalation(
    user_input: str,
    tool_map: Dict[str, Any],
    user_metadata: Optional[Dict[str, Any]] = None,
    background: bool = False,
) -> Optional[Dict[str, Any]]:
    """Route *user_input* and run ``_run_pre_escalation`` on the result."""
    route = _route_user_input(user_input)
    return _run_pre_escalation(
        route,
        user_input,
        tool_map,
        user_metadata=user_metadata,
        background=background,
    )


def maybe_notify_arun(
    user_input: str,
    final_response: str,
    scratchpad: List[Any],
    tool_map: Dict[str, Any],
    user_metadata: Optional[Dict[str, Any]] = None,
    pre_notified: bool = False,
) -> Optional[str]:
    """
    Deterministic safety-net escalation used by API / bot flows.

    This prevents the frontend from depending entirely on the model
    remembering to call `notify_arun` on its own.

    Escalates when the router flags the input, or when *final_response*
    sounds uncertain. It is skipped when a notification already went out,
    either via *pre_notified* or because the model called ``notify_arun``
    (found in *scratchpad*). Returns the delivery result, or ``None`` if
    nothing was sent.
    """
    route = _route_user_input(user_input)
    already_notified = pre_notified or _tool_was_used(scratchpad or [], "notify_arun")

    should_notify = route.get("needs_notify", False)
    category = route.get("notify_category") or "UNKNOWN_QUESTION"
    reason = route.get("reason", "unspecified")

    if not should_notify and _contains_uncertainty(final_response):
        should_notify = True
        category = "UNKNOWN_QUESTION"
        reason = "uncertainty_detected_after_answer"

    if not should_notify or already_notified:
        return None

    payload = _build_notify_payload(
        category=category,
        user_input=user_input,
        reason=reason,
        user_metadata=user_metadata,
        assistant_output=final_response,
    )
    return _deliver_notify_arun(
        category=payload["category"],
        user_input=payload["user_input"],
        user_metadata_json=payload["user_metadata_json"],
    )


def queue_maybe_notify_arun(
    user_input: str,
    final_response: str,
    scratchpad: List[Any],
    tool_map: Dict[str, Any],
    user_metadata: Optional[Dict[str, Any]] = None,
    pre_notified: bool = False,
) -> str:
    """Run ``maybe_notify_arun`` in the background.

    If an escalation is sent, its result is also mirrored to the debug stream.
    """

    def _background_notify():
        result = maybe_notify_arun(
            user_input=user_input,
            final_response=final_response,
            scratchpad=scratchpad,
            tool_map=tool_map,
            user_metadata=user_metadata,
            pre_notified=pre_notified,
        )
        if result:
            send_debug_event("auto_escalation", str(result), user_metadata)
        return result

    if _submit_background_task("maybe_notify_arun", _background_notify):
        return "QUEUED: maybe_notify_arun scheduled."
    return "FAILED: could not queue maybe_notify_arun."
def _execute_tool_call(tool_call: Dict[str, Any], tool_map: Dict[str, Any]) -> ToolMessage:
    """Run one model-requested tool call and wrap its output (max 3000 chars) in a ToolMessage."""
    tool_name = tool_call.get("name")
    print(f"[SYSTEM] Tool call: {tool_name}({tool_call.get('args')})")

    tool_func = tool_map.get(tool_name)
    if not tool_func:
        tool_result = f"ERROR: Unknown tool '{tool_name}'."
    else:
        try:
            tool_result = tool_func.invoke(tool_call.get("args") or {})
        except Exception as e:
            tool_result = f"Error executing tool: {e}"

    return ToolMessage(
        content=_safe_truncate(str(tool_result), 3000),
        # `or` also covers an explicit None id, which `.get(key, default)` would pass through.
        tool_call_id=tool_call.get("id") or f"tool_{int(time.time() * 1000)}",
    )


def _generate_response(
    main_llm,
    prompt,
    memory,
    user_input: str,
    tool_map: Dict[str, Any],
    scratchpad: List[Any],
) -> Optional[str]:
    """Call the model for up to MAX_TOOL_ROUNDS rounds, executing any tool calls.

    Tool requests and results are appended to *scratchpad*. Returns the first
    tool-free reply (stripped), or ``None`` if every round requested tools.
    """
    for _ in range(MAX_TOOL_ROUNDS):
        messages = prompt.format_messages(
            running_summary=memory.running_summary,
            chat_history=memory.get_messages(),
            input=user_input,
            agent_scratchpad=scratchpad,
        )
        ai_msg = main_llm.invoke(messages)
        if not ai_msg.tool_calls:
            return (ai_msg.content or "").strip()
        scratchpad.append(ai_msg)
        scratchpad.extend(_execute_tool_call(tc, tool_map) for tc in ai_msg.tool_calls)
    return None


def chat_interface():
    """Run the interactive CLI chat loop until 'exit'/'quit', EOF, or Ctrl-C.

    Each turn does the following:
    - pre-escalates routed contact/lead intents in the background;
    - runs the tool-calling model loop;
    - applies the deterministic notify safety net;
    - prints the answer and updates memory;
    - queues the exchange to the Telegram chat log.
    """
    print("\n" + "=" * 60)
    print("ArunCore stateful agent")
    print("Type 'exit' to quit.")
    print("=" * 60 + "\n")

    try:
        main_llm, prompt, memory, tools = init_agent()
    except Exception as e:
        print(f"Startup Error: {e}")
        return

    tool_map = _tool_map(tools)

    while True:
        try:
            user_input = input("You: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin (Ctrl-D / end of piped input) or Ctrl-C. Without this,
            # EOFError would reach the broad handler below and the loop would spin forever.
            print()
            break

        if user_input.lower() in {"exit", "quit"}:
            break
        if not user_input:
            continue

        try:
            route = _route_user_input(user_input)
            scratchpad: List[Any] = []

            pre_escalation = _run_pre_escalation(route, user_input, tool_map, background=True)
            if pre_escalation:
                print(f"[SYSTEM] {pre_escalation['result']}")

            final_response = _generate_response(
                main_llm, prompt, memory, user_input, tool_map, scratchpad
            ) or "I do not have enough information to answer that."

            # A queued pre-escalation is in flight. Counting it as notified stops
            # the safety net from sending a duplicate alert for the same input.
            pre_notified = bool(
                pre_escalation
                and (
                    pre_escalation.get("handled")
                    or str(pre_escalation.get("result", "")).startswith("QUEUED")
                )
            )
            notify_result = maybe_notify_arun(
                user_input=user_input,
                final_response=final_response,
                scratchpad=scratchpad,
                tool_map=tool_map,
                pre_notified=pre_notified,
            )
            if notify_result:
                print(f"[SYSTEM] {notify_result}")

            print(f"\nArunCore: {final_response}\n")
            print("-" * 60)

            memory.add_interaction(user_input, final_response)
            queue_chat_history_to_telegram("cli-session", user_input, final_response)

        except Exception as e:
            print(f"Agent Loop Error: {e}")


if __name__ == "__main__":
    chat_interface()