# ArunCore / core/agent.py
# Neural Arun
# Commit message: "fixed the notification system"
# Commit: 05fe193
import threading
from concurrent.futures import ThreadPoolExecutor
import hashlib
import html
import json
import os
import re
import time
import warnings
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import requests
from dotenv import load_dotenv
from langchain_cohere import CohereRerank
from langchain_community.retrievers import BM25Retriever
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
# =========================================================
# CONFIG
# =========================================================
load_dotenv()

# Mirror COHERE_API_KEY into CO_API_KEY, presumably the variable name the
# Cohere client falls back to when no key is passed explicitly.
if os.environ.get("COHERE_API_KEY"):
    os.environ["CO_API_KEY"] = os.environ["COHERE_API_KEY"]

# Repository root = the parent of the directory containing this file.
BASE_DIR = Path(__file__).resolve().parents[1]
DB_DIR = BASE_DIR / "db"  # Chroma persist_directory used by init_agent()
STATIC_DIR = BASE_DIR.joinpath("data", "static")  # profile + rules markdown

# Alert categories accepted by _deliver_notify_arun; anything else is coerced.
ALLOWED_NOTIFY_CATEGORIES = {"URGENT", "LEAD", "UNKNOWN_QUESTION"}

MAX_SEARCH_RESULTS = 5  # snippets returned by search_arun_knowledge
MAX_TOOL_ROUNDS = 4  # model -> tool round trips per user turn
MAX_HISTORY_MESSAGES = 8
NOTIFICATION_COOLDOWN_SECONDS = 300  # window for suppressing identical alerts
TELEGRAM_MESSAGE_CHAR_LIMIT = 3500  # cap applied to outgoing debug messages
# =========================================================
# GLOBAL STATE
# =========================================================
# Retrieval singletons, populated by init_agent(). search_arun_knowledge
# returns an "ERROR: ..." string while any of them is still unset.
_GLOBAL_VECTORSTORE: Optional[Chroma] = None
_GLOBAL_BM25: Optional[BM25Retriever] = None
_GLOBAL_COMPRESSOR: Optional[CohereRerank] = None
# NOTE(review): nothing in this module submits work to this pool;
# _submit_background_task starts a fresh daemon thread per task instead.
# Other modules might use it, so confirm before removing.
_BACKGROUND_EXECUTOR = ThreadPoolExecutor(max_workers=4, thread_name_prefix="aruncore-bg")
# Alert fingerprint (see _alert_key) -> time.time() of its last successful
# delivery; used to suppress identical alerts inside the cooldown window.
_RECENT_ALERTS: Dict[str, float] = {}
# =========================================================
# HELPERS
# =========================================================
def _utc_now() -> str:
    """Return the current UTC wall-clock time as ``YYYY-MM-DD HH:MM:SS UTC``."""
    moment = datetime.now(tz=timezone.utc)
    return f"{moment:%Y-%m-%d %H:%M:%S} UTC"
def _safe_truncate(text: str, limit: int = 1500) -> str:
    """Strip *text* and cap it at *limit* characters.

    ``None`` is treated as an empty string. When the text is cut, the last
    three characters of the budget become ``"..."``.
    """
    stripped = (text or "").strip()
    if len(stripped) > limit:
        return f"{stripped[: limit - 3]}..."
    return stripped
def _parse_json_metadata(raw: str) -> Dict[str, Any]:
    """Decode a JSON metadata string into a dict, never raising.

    Empty/None input yields ``{}``. A JSON value that is not an object is
    wrapped as ``{"value": ...}``. Input that fails to decode is kept
    verbatim as ``{"raw": raw}``.
    """
    if not raw:
        return {}
    try:
        decoded = json.loads(raw)
    except Exception:
        return {"raw": raw}
    if isinstance(decoded, dict):
        return decoded
    return {"value": decoded}
def _alert_key(category: str, user_input: str) -> str:
    """Fingerprint an alert as SHA-256 of ``category:normalized_input``.

    The input is stripped and lower-cased, so trivially different spellings
    of the same message share one key.
    """
    normalized = user_input.strip().lower()
    fingerprint = f"{category}:{normalized}".encode("utf-8")
    return hashlib.sha256(fingerprint).hexdigest()
def _should_send_alert(category: str, user_input: str) -> bool:
    """Return False while an identical alert is still inside its cooldown window."""
    previous = _RECENT_ALERTS.get(_alert_key(category, user_input))
    if not previous:
        return True
    return (time.time() - previous) >= NOTIFICATION_COOLDOWN_SECONDS
def _mark_alert_sent(category: str, user_input: str) -> None:
    """Record a successful delivery so identical alerts are suppressed.

    Entries whose cooldown has already elapsed are dropped first. They no
    longer suppress anything (``_should_send_alert`` ignores them), and
    without pruning the table grows by one entry per distinct alert for the
    lifetime of the process.
    """
    now = time.time()
    # Iterate over a snapshot so deleting keys inside the loop is safe.
    for key, sent_at in list(_RECENT_ALERTS.items()):
        if now - sent_at >= NOTIFICATION_COOLDOWN_SECONDS:
            _RECENT_ALERTS.pop(key, None)
    _RECENT_ALERTS[_alert_key(category, user_input)] = now
def _escape_html(text: str) -> str:
    """HTML-escape *text* (quotes included) for Telegram's HTML parse mode; falsy -> ''."""
    if not text:
        return ""
    return html.escape(text)
def _is_truthy_env(value: Optional[str], default: bool = False) -> bool:
    """Interpret an environment-variable string as a boolean flag.

    Unset (``None``) yields *default*. Otherwise only 1/true/yes/on
    (case-insensitive, surrounding whitespace ignored) count as true.
    """
    if value is not None:
        return value.strip().lower() in ("1", "true", "yes", "on")
    return default
def _telegram_debug_enabled() -> bool:
    """Debug streaming is on when TELEGRAM_DEBUG_ENABLED is unset or 1/true/yes/on."""
    raw_flag = os.getenv("TELEGRAM_DEBUG_ENABLED")
    return _is_truthy_env(raw_flag, default=True)
def _get_telegram_target(debug: bool = False) -> Tuple[Optional[str], Optional[str]]:
    """Return ``(bot_token, chat_id)`` for the main or the debug Telegram channel.

    Each debug setting (TELEGRAM_DEBUG_BOT_TOKEN / TELEGRAM_DEBUG_CHAT_ID)
    falls back to its main counterpart when unset or empty.
    """
    primary_token = os.getenv("TELEGRAM_BOT_TOKEN")
    primary_chat = os.getenv("TELEGRAM_CHAT_ID")
    if not debug:
        return primary_token, primary_chat
    return (
        os.getenv("TELEGRAM_DEBUG_BOT_TOKEN") or primary_token,
        os.getenv("TELEGRAM_DEBUG_CHAT_ID") or primary_chat,
    )
def _chunk_text(text: str, limit: int = 2200) -> List[str]:
    """Split *text* into pieces of at most *limit* characters.

    Prefers breaking at a newline when that keeps the piece at least half
    full, then at the last space, and otherwise hard-cuts at *limit*. Empty
    input yields ``["(empty)"]``.
    """
    remaining = (text or "").strip() or "(empty)"
    chunks: List[str] = []
    while len(remaining) > limit:
        cut = remaining.rfind("\n", 0, limit)
        if cut < int(limit * 0.5):
            # A newline this early would waste most of the chunk; try a space.
            cut = remaining.rfind(" ", 0, limit)
        if cut <= 0:
            cut = limit
        chunks.append(remaining[:cut].strip())
        remaining = remaining[cut:].lstrip()
    if remaining:
        chunks.append(remaining)
    return chunks if chunks else ["(empty)"]
def _send_telegram_message(
    token: str,
    chat_id: str,
    text: str,
    parse_mode: str = "HTML",
    max_attempts: int = 3,
    delivery_label: str = "default",
    retry_sleep_seconds: float = 1.5,
) -> str:
    """Deliver *text* to a Telegram chat through the Bot API ``sendMessage`` call.

    Delivery order:
    1. One request to ``api.telegram.org``.
    2. If that does not return HTTP 200, the same payload is retried against
       each hard-coded Telegram IP, up to *max_attempts* times per IP. These
       requests carry ``Host: api.telegram.org`` and skip TLS verification.

    Args:
        token: Bot token (embedded in the request URL).
        chat_id: Target chat id.
        text: Message body, already formatted for *parse_mode*.
        parse_mode: Telegram parse mode (default ``"HTML"``).
        max_attempts: Attempts per fallback IP.
        delivery_label: Tag used in log lines.
        retry_sleep_seconds: Pause between fallback attempts; ``0`` disables
            it. No pause follows the very last attempt. The fast/retrying
            wrappers pass this argument, and calls made without it in the
            signature failed with ``TypeError``.

    Returns:
        ``"SUCCESS"`` or ``"FAILED: <last error>"``. The bot token is
        redacted from error text before it is logged or returned.
    """
    import urllib3

    # The IP fallback uses verify=False; silence urllib3's warning for it.
    # Note this disables the warning process-wide.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    def _redact(message: str) -> str:
        # requests' exception messages include the request URL, which embeds
        # the bot token; never let it reach logs or returned status strings.
        return message.replace(token, "<redacted>") if token else message

    payload = {
        "chat_id": chat_id,
        "text": text,
        "parse_mode": parse_mode,
        "disable_web_page_preview": True,
    }
    last_error = "Unknown error"

    # Attempt 1: standard domain request.
    try:
        url = f"https://api.telegram.org/bot{token}/sendMessage"
        response = requests.post(url, json=payload, timeout=10)
        if response.status_code == 200:
            print(f"[TELEGRAM:{delivery_label}] success via domain")
            return "SUCCESS"
        last_error = f"Domain API returned {response.status_code}"
    except Exception as e:
        last_error = _redact(f"Domain request failed: {e}")
    print(f"[TELEGRAM:{delivery_label}] Domain method failed ({last_error}). Using IP bypass...")

    # Attempt 2: direct IPs, for hosts where DNS/SNI to api.telegram.org is blocked.
    telegram_ips = ["149.154.167.220", "149.154.166.120", "149.154.165.120"]
    for ip in telegram_ips:
        url_ip = f"https://{ip}/bot{token}/sendMessage"
        for attempt in range(max_attempts):
            try:
                # SECURITY NOTE(review): verify=False turns off certificate
                # checking for this request (the URL host is a bare IP), and the
                # bot token travels over that unverified connection.
                response = requests.post(
                    url_ip,
                    json=payload,
                    headers={"Host": "api.telegram.org"},
                    verify=False,
                    timeout=15,
                )
                if response.status_code == 200:
                    print(f"[TELEGRAM:{delivery_label}] success via IP {ip}")
                    return "SUCCESS"
                last_error = f"IP {ip} returned {response.status_code}"
            except Exception as e:
                last_error = _redact(f"IP {ip} failed: {e}")
            is_final_attempt = ip == telegram_ips[-1] and attempt == max_attempts - 1
            if retry_sleep_seconds > 0 and not is_final_attempt:
                time.sleep(retry_sleep_seconds)

    print(f"[TELEGRAM ERROR:{delivery_label}] All methods failed. Last error: {last_error}")
    return f"FAILED: {last_error}"
def _send_telegram_message_fast(
    token: str,
    chat_id: str,
    text: str,
    parse_mode: str = "HTML",
) -> str:
    """Low-latency delivery profile: one attempt per fallback IP, no pause, label ``fast``."""
    fast_profile = {
        "max_attempts": 1,
        "retry_sleep_seconds": 0.0,
        "delivery_label": "fast",
    }
    return _send_telegram_message(
        token=token, chat_id=chat_id, text=text, parse_mode=parse_mode, **fast_profile
    )
def _send_telegram_message_retrying(
    token: str,
    chat_id: str,
    text: str,
    parse_mode: str = "HTML",
) -> str:
    """Persistent delivery profile: three attempts per fallback IP, 1s apart, label ``retry``."""
    retry_profile = {
        "max_attempts": 3,
        "retry_sleep_seconds": 1.0,
        "delivery_label": "retry",
    }
    return _send_telegram_message(
        token=token, chat_id=chat_id, text=text, parse_mode=parse_mode, **retry_profile
    )
def _split_for_escaped_budget(raw: str, budget: int) -> List[str]:
    """Split *raw* into pieces whose HTML-escaped form is at most *budget* chars.

    Splitting happens on the raw text, so an escape sequence such as
    ``&amp;`` is never cut in half. Every piece holds at least one
    character, which guarantees progress.
    """
    pieces: List[str] = []
    rest = raw
    while rest:
        if len(_escape_html(rest)) <= budget:
            pieces.append(rest)
            break
        # Escaped length grows monotonically with prefix length, so binary
        # search for the longest raw prefix that still fits.
        low, high = 1, len(rest)
        while low < high:
            mid = (low + high + 1) // 2
            if len(_escape_html(rest[:mid])) <= budget:
                low = mid
            else:
                high = mid - 1
        pieces.append(rest[:low])
        rest = rest[low:]
    return pieces


def send_debug_event(
    event_type: str,
    content: str,
    metadata: Optional[Dict[str, Any]] = None,
) -> str:
    """Send a debug event to the Telegram debug channel, split into parts if long.

    Each message repeats a header (type, time, metadata) followed by one
    HTML-escaped content part.

    Content is split on the raw text so the escaped part fits the space left
    under ``TELEGRAM_MESSAGE_CHAR_LIMIT``. Escaping can grow text up to six
    times, and slicing already-escaped HTML can cut an entity or tag in half,
    which makes Telegram reject the message. Metadata values are capped at
    300 characters for the same reason.

    Returns:
        ``"SKIPPED: ..."`` when debug streaming is disabled or credentials are
        missing, the first failing send result, or
        ``"SUCCESS: debug event sent."``.
    """
    if not _telegram_debug_enabled():
        return "SKIPPED: debug stream disabled."
    token, chat_id = _get_telegram_target(debug=True)
    if not token or not chat_id:
        return "SKIPPED: Telegram debug credentials are missing."

    header_lines = [
        "<b>ArunCore Debug</b>",
        f"<b>Type:</b> {_escape_html(event_type)}",
        f"<b>Time:</b> {_escape_html(_utc_now())}",
    ]
    for key, value in (metadata or {}).items():
        if value is None:
            continue
        header_lines.append(
            f"<b>{_escape_html(str(key))}:</b> {_escape_html(_safe_truncate(str(value), 300))}"
        )

    # Reserve room for the header, a "Part: x/y" line and the "Content" label.
    reserved = (
        len("\n".join(header_lines))
        + len("\n<b>Part:</b> 9999/9999")
        + len("\n\n<b>Content</b>\n")
    )
    budget = max(TELEGRAM_MESSAGE_CHAR_LIMIT - reserved, 500)

    content_chunks = [
        piece
        for raw_chunk in _chunk_text(content, limit=2200)
        for piece in _split_for_escaped_budget(raw_chunk, budget)
    ]
    total = len(content_chunks)
    for index, chunk in enumerate(content_chunks, start=1):
        lines = list(header_lines)
        if total > 1:
            lines.append(f"<b>Part:</b> {index}/{total}")
        lines.extend(["", "<b>Content</b>", _escape_html(chunk)])
        result = _send_telegram_message(
            token=token,
            chat_id=chat_id,
            # Last-resort cap; only bites if the header alone is oversized.
            text="\n".join(lines)[:TELEGRAM_MESSAGE_CHAR_LIMIT],
            delivery_label="debug",
        )
        if not result.startswith("SUCCESS"):
            return result
    return "SUCCESS: debug event sent."
def _run_background_task(task_name: str, func, *args, **kwargs):
    """Call ``func(*args, **kwargs)`` as a best-effort background job.

    String results are logged. Any exception is logged and turned into a
    ``None`` return, so a worker thread never dies with a traceback.
    """
    try:
        outcome = func(*args, **kwargs)
        if isinstance(outcome, str):
            print(f"[BACKGROUND] {task_name}: {outcome}")
    except Exception as exc:
        print(f"[BACKGROUND ERROR] {task_name}: {exc}")
        return None
    return outcome
def _submit_background_task(task_name: str, func, *args, **kwargs) -> bool:
    """Run *func* via ``_run_background_task`` on a new daemon thread.

    Returns True once the thread has started, or False (after logging) if
    the thread could not be created or started. Daemon threads do not keep
    the process alive, so work still pending at exit is dropped.
    """
    try:
        worker = threading.Thread(
            target=_run_background_task,
            args=(task_name, func, *args),
            kwargs=kwargs,
            daemon=True,
        )
        worker.start()
    except Exception as exc:
        print(f"[BACKGROUND ERROR] Failed to submit {task_name}: {exc}")
        return False
    return True
def queue_debug_event(
    event_type: str,
    content: str,
    metadata: Optional[Dict[str, Any]] = None,
) -> str:
    """Schedule ``send_debug_event`` on a background thread; return a status string."""
    if not _telegram_debug_enabled():
        return "SKIPPED: debug stream disabled."
    queued = _submit_background_task(
        "debug_event", send_debug_event, event_type, content, metadata
    )
    return "QUEUED: debug event scheduled." if queued else "FAILED: could not queue debug event."
def send_chat_history_to_telegram(
    session_id: str,
    user_input: str,
    assistant_response: str,
) -> str:
    """Post one user/assistant exchange to the main Telegram chat as an HTML log entry.

    Both sides are truncated to 1500 characters before HTML escaping.
    NOTE(review): escaping can expand text, so content heavy in ``<>&"``
    could still exceed Telegram's message size.
    """
    token, chat_id = _get_telegram_target(debug=False)
    if not (token and chat_id):
        return "FAILED: Telegram credentials are missing."
    sections = [
        "📝 <b>ArunCore Chat Log</b> 📝",
        "",
        f"<b>Session:</b> {_escape_html(session_id)}",
        f"<b>Time:</b> {_escape_html(_utc_now())}",
        "",
        "<b>User:</b>",
        _escape_html(_safe_truncate(user_input, 1500)),
        "",
        "<b>ArunCore:</b>",
        _escape_html(_safe_truncate(assistant_response, 1500)),
    ]
    return _send_telegram_message(
        token=token,
        chat_id=chat_id,
        text="\n".join(sections),
        delivery_label="chat_log",
    )
def queue_chat_history_to_telegram(
    session_id: str,
    user_input: str,
    assistant_response: str,
) -> str:
    """Check credentials, then post the chat log from a background thread."""
    token, chat_id = _get_telegram_target(debug=False)
    if not (token and chat_id):
        return "FAILED: Telegram credentials are missing from the environment."
    scheduled = _submit_background_task(
        "chat_history_log",
        send_chat_history_to_telegram,
        session_id,
        user_input,
        assistant_response,
    )
    if not scheduled:
        return "FAILED: could not queue chat history."
    return "QUEUED: chat history scheduled."
def _build_notification_metadata(
    reason: str,
    user_metadata: Optional[Dict[str, Any]] = None,
    assistant_output: Optional[str] = None,
) -> Dict[str, Any]:
    """Assemble alert metadata: reason, timestamp, optional answer excerpt, caller extras.

    The assistant output is truncated to 300 characters. Caller-supplied
    keys are merged last, so they override the built-in ones.
    """
    metadata: Dict[str, Any] = {"reason": reason, "timestamp": _utc_now()}
    if assistant_output:
        metadata["assistant_output"] = _safe_truncate(assistant_output, 300)
    metadata.update(user_metadata or {})
    return metadata
def _deliver_notify_arun(
    category: str,
    user_input: str,
    user_metadata_json: str = "",
    fast: bool = False,
) -> str:
    """Format and send one alert to Arun's Telegram chat, suppressing duplicates.

    Args:
        category: Alert category; upper-cased, and coerced to
            ``UNKNOWN_QUESTION`` when not in ``ALLOWED_NOTIFY_CATEGORIES``.
        user_input: The user's message (truncated to 1200 characters).
        user_metadata_json: Optional JSON with extra fields listed under
            "Metadata" (decoded by ``_parse_json_metadata``).
        fast: Use the single-attempt sender instead of the retrying one.

    Returns:
        ``"SUCCESS: Arun has been notified."``, ``"SKIPPED: ..."`` when an
        identical alert was delivered within the cooldown, or the sender's
        ``"FAILED: ..."`` string.
    """
    token, chat_id = _get_telegram_target(debug=False)
    if not token or not chat_id:
        return "FAILED: Telegram credentials are missing from the environment."

    category = (category or "UNKNOWN_QUESTION").strip().upper()
    if category not in ALLOWED_NOTIFY_CATEGORIES:
        category = "UNKNOWN_QUESTION"

    cleaned_input = _safe_truncate(user_input, 1200)
    metadata = _parse_json_metadata(user_metadata_json)

    if not _should_send_alert(category, cleaned_input):
        return f"SKIPPED: duplicate {category} alert suppressed."

    # Metadata can come straight from the model (or be an unparsable raw
    # blob), so cap every key and value. One oversized field would otherwise
    # push the message past Telegram's size limit and get the alert rejected.
    meta_lines = [
        f"<b>{_escape_html(_safe_truncate(str(key), 80))}:</b> "
        f"{_escape_html(_safe_truncate(str(value), 300))}"
        for key, value in metadata.items()
    ]

    alert_core = (
        f"🚨 <b>ArunCore Alert</b> 🚨\n\n"
        f"<b>Category:</b> {_escape_html(category)}\n"
        f"<b>Time:</b> {_escape_html(_utc_now())}\n\n"
        f"<b>User Input</b>\n{_escape_html(cleaned_input)}"
    )
    text = alert_core
    if meta_lines:
        text += "\n\n<b>Metadata</b>\n" + "\n".join(meta_lines)
    if len(text) > TELEGRAM_MESSAGE_CHAR_LIMIT:
        # Still too long (e.g. many metadata keys): deliver the core alert
        # rather than have the whole message rejected.
        text = alert_core + "\n\n<b>Metadata</b>\n(omitted: too long)"

    send_func = _send_telegram_message_fast if fast else _send_telegram_message_retrying
    result = send_func(
        token=token,
        chat_id=chat_id,
        text=text,
    )
    if result.startswith("SUCCESS"):
        _mark_alert_sent(category, cleaned_input)
        return "SUCCESS: Arun has been notified."
    return result
def _attempt_notify_arun_with_retry_queue(
    category: str,
    user_input: str,
    user_metadata_json: str = "",
) -> str:
    """Queue ``_deliver_notify_arun`` (retrying sender) on a background thread.

    The returned string reports only whether the alert was queued, not
    whether Telegram accepted it.
    """
    token, chat_id = _get_telegram_target(debug=False)
    if not (token and chat_id):
        return "FAILED: Telegram credentials are missing from the environment."
    # The trailing False is the `fast` flag -> use the retrying sender.
    task_args = (category, user_input, user_metadata_json, False)
    if _submit_background_task("notify_arun_bg", _deliver_notify_arun, *task_args):
        return "QUEUED: Sending notification to Arun in the background."
    return "FAILED: Could not queue notification."
def _contains_uncertainty(text: str) -> bool:
    """Heuristically detect an answer that admits it could not answer.

    Matching is case-insensitive substring search over a fixed phrase list.
    Typographic apostrophes (U+2018/U+2019) are normalized to ``'`` first,
    so "I don’t know" is caught like "I don't know". The list also covers
    the "I do not have enough information ..." fallback reply used by the
    chat loop.
    """
    normalized = (text or "").lower().replace("\u2019", "'").replace("\u2018", "'")
    phrases = (
        "i don't know",
        "i do not know",
        "not sure",
        "can't confirm",
        "cannot confirm",
        "i don't have that information",
        "no relevant data found",
        "i'm unsure",
        "i am unsure",
        "i cannot answer",
        "i can't answer",
        "don't have enough information",
        "do not have enough information",
    )
    return any(phrase in normalized for phrase in phrases)
def _route_user_input(user_input: str) -> Dict[str, Any]:
    """
    Lightweight router to bias the system toward tool usage.

    Regex heuristics are checked in priority order:
    1. Explicit contact/notify intent -> notify, category ``URGENT``.
    2. Lead/business intent -> notify, category ``LEAD``.
    3. Arun-related topics -> search.
    4. Uncertainty phrasing -> search.
    5. Otherwise a general query.

    Returns:
        Dict with ``needs_search``, ``needs_notify``, ``notify_category``
        (``"URGENT"``, ``"LEAD"`` or ``None``) and a short ``reason`` tag.
    """
    text = (user_input or "").strip()
    lower = text.lower()
    direct_contact_patterns = [
        r"\btalk to arun\b",
        r"\bconnect me to arun\b",
        r"\bcontact arun\b",
        r"\bmessage arun\b",
        r"\bnotify arun\b",
        r"\bsend (?:a )?notification to arun\b",
        r"\btell arun\b",
        r"\blet arun know\b",
        r"\bping arun\b",
    ]
    lead_patterns = [
        r"\bhire arun\b",
        r"\bcollaborate\b",
        r"\bpartnership\b",
        r"\bbusiness\b",
        r"\blead\b",
        r"\bwork with you\b",
    ]
    arun_context_patterns = [
        r"\baruncore\b",
        r"\barun\b",
        r"\byour project\b",
        r"\byour work\b",
        r"\byour github\b",
        r"\bgithub\b",
        r"\brepository\b",
        r"\bportfolio\b",
        r"\barchitecture\b",
        r"\bknowledge base\b",
        r"\bbackground\b",
        r"\bskills\b",
        r"\bexperience\b",
    ]
    uncertainty_patterns = [
        r"\bi don't know\b",
        r"\bnot sure\b",
        r"\bcan you explain\b",
        r"\bwhat does this mean\b",
        r"\bhelp me understand\b",
        r"\bunknown\b",
        r"\bunclear\b",
        r"\bconfused\b",
    ]
    # Whole-word matches only. Plain substring tests fired on everyday words
    # ("ping" inside "shopping"/"typing"/"developing", "you" inside "young"),
    # so e.g. "I'm developing a shopping app, can you help?" raised an URGENT
    # alert.
    notify_words_present = (
        re.search(r"\b(?:notify|notifications?|ping(?:ed|ing)?)\b", lower) is not None
    )
    notify_target_present = (
        re.search(r"\b(?:arun(?:core)?|you|your|twin)\b", lower) is not None
    )
    if any(re.search(pattern, lower) for pattern in direct_contact_patterns) or (
        notify_words_present and notify_target_present
    ):
        return {
            "needs_search": False,
            "needs_notify": True,
            "notify_category": "URGENT",
            "reason": "explicit_contact_or_notification_intent",
        }
    if any(re.search(pattern, lower) for pattern in lead_patterns):
        return {
            "needs_search": False,
            "needs_notify": True,
            "notify_category": "LEAD",
            "reason": "business_or_lead_intent",
        }
    if any(re.search(pattern, lower) for pattern in arun_context_patterns):
        return {
            "needs_search": True,
            "needs_notify": False,
            "notify_category": None,
            "reason": "arun_related_query",
        }
    if any(re.search(pattern, lower) for pattern in uncertainty_patterns):
        return {
            "needs_search": True,
            "needs_notify": False,
            "notify_category": None,
            "reason": "uncertain_query",
        }
    return {
        "needs_search": False,
        "needs_notify": False,
        "notify_category": None,
        "reason": "general_query",
    }
def load_static_context() -> Tuple[str, str]:
    """Return ``(profile_markdown, rules_markdown)`` read as UTF-8 from STATIC_DIR.

    Raises:
        OSError: if either file is missing or unreadable.
    """
    profile_text = (STATIC_DIR / "public_profile.md").read_text(encoding="utf-8")
    rules_text = (STATIC_DIR / "rules_of_engagement.md").read_text(encoding="utf-8")
    return profile_text, rules_text
# =========================================================
# TOOLS
# =========================================================
# LangChain tool exposed to the chat model. LangChain's @tool uses the
# docstring below as the tool description the model sees, so wording
# changes there change model behavior.
@tool
def notify_arun(category: str, user_input: str, user_metadata_json: str = "") -> str:
    """
    Sends a Telegram alert to Arun.
    Use this when:
    - The user wants to talk to Arun directly.
    - The query looks like a lead, collaboration, hiring, or business request.
    - The system cannot answer confidently and should escalate the question.
    Args:
        category: One of 'LEAD', 'URGENT', or 'UNKNOWN_QUESTION'.
        user_input: The user's message.
        user_metadata_json: Optional JSON string with extra metadata.
    """
    # Delivery runs on a background thread. The string returned to the model
    # is the queueing status ("QUEUED: ..." / "FAILED: ..."), not a Telegram
    # delivery receipt.
    return _attempt_notify_arun_with_retry_queue(
        category=category,
        user_input=user_input,
        user_metadata_json=user_metadata_json,
    )
@tool
def search_arun_knowledge(search_query: str) -> str:
    """
    Searches Arun's database for technical details, background, projects, and architecture.
    Use this before answering questions about Arun's history, work, GitHub, projects, or internal details.
    Args:
        search_query: A descriptive standalone query focused on key technical terms.
    """
    # NOTE: the docstring above is the LLM-facing tool description; keep it stable.
    # Hybrid retrieval in three steps:
    #   1. Gather dense (Chroma, k=15) and lexical (BM25) candidates.
    #   2. Deduplicate them, keeping at most 20.
    #   3. Rerank with Cohere and return the top MAX_SEARCH_RESULTS as text snippets.
    if _GLOBAL_VECTORSTORE is None or _GLOBAL_BM25 is None or _GLOBAL_COMPRESSOR is None:
        return "ERROR: Database retrievers are not initialized."

    vec_docs = _GLOBAL_VECTORSTORE.similarity_search(search_query, k=15)
    lex_docs = _GLOBAL_BM25.invoke(search_query)

    seen = set()
    combined: List[Document] = []
    for doc in vec_docs + lex_docs:
        metadata = doc.metadata or {}
        chunk_id = metadata.get("chunk_id")
        # Prefer the stable chunk id. Without one, key on source AND content:
        # keying on source alone collapsed every chunk of the same file into
        # whichever one was retrieved first.
        if chunk_id:
            dedup_key = ("chunk", chunk_id)
        else:
            dedup_key = ("text", metadata.get("source"), doc.page_content)
        if dedup_key not in seen:
            seen.add(dedup_key)
            combined.append(doc)

    initial_docs = combined[:20]
    if not initial_docs:
        return "DATABASE SEARCH RESULT: No relevant data found for this query."

    try:
        reranked_docs = list(
            _GLOBAL_COMPRESSOR.compress_documents(documents=initial_docs, query=search_query)
        )
    except Exception as e:
        # Best effort: fall back to the un-reranked fused order, but say so.
        print(f"[SEARCH WARNING] Rerank failed, using unranked results: {e}")
        reranked_docs = initial_docs[:MAX_SEARCH_RESULTS]

    if not reranked_docs:
        return "DATABASE SEARCH RESULT: No relevant data found for this query."

    snippets = []
    for doc in reranked_docs[:MAX_SEARCH_RESULTS]:
        metadata = doc.metadata or {}
        source = metadata.get("source", "unknown")
        chunk_id = metadata.get("chunk_id", "unknown")
        content = _safe_truncate(doc.page_content, 2000)
        snippets.append(f"[Source: {source} | chunk: {chunk_id}]\n{content}")
    return "DATABASE SEARCH RESULT:\n\n" + "\n\n---\n\n".join(snippets)
# =========================================================
# MEMORY
# =========================================================
class RollingMemory:
    """Conversation memory: recent messages plus an LLM-maintained running summary.

    Each ``add_interaction`` appends a Human/AI message pair. After
    ``max_turns`` interactions, every message except the last four is merged
    into ``running_summary`` by ``summary_llm`` and dropped from ``history``.
    """

    def __init__(
        self,
        summary_llm,
        max_turns: int = 4,
        max_history_messages: Optional[int] = None,
    ):
        """
        Args:
            summary_llm: Chat model whose ``invoke(messages)`` result has ``.content``.
            max_turns: Interaction count that triggers summarization.
            max_history_messages: Upper bound on messages returned by
                ``get_messages``. ``None`` means ``MAX_HISTORY_MESSAGES``;
                zero or negative disables the cap.
        """
        self.summary_llm = summary_llm
        self.max_turns = max_turns
        self.max_history_messages = (
            MAX_HISTORY_MESSAGES if max_history_messages is None else max_history_messages
        )
        self.history: List[Any] = []
        self.running_summary: str = "No prior summary. This is the start of the conversation."
        self.invocation_count = 0

    def add_interaction(self, human_text: str, ai_text: str):
        """Record one exchange and summarize once ``max_turns`` is reached."""
        self.history.append(HumanMessage(content=human_text))
        self.history.append(AIMessage(content=ai_text))
        self.invocation_count += 1
        if self.invocation_count >= self.max_turns:
            self._summarize_and_prune()

    def _summarize_and_prune(self):
        """Fold all but the last four messages into the summary (synchronously).

        On failure the history and count are left untouched, so the next
        interaction retries.
        """
        print("\n[SYSTEM] Triggering background summarization...")
        messages_to_summarize = self.history[:-4]
        if not messages_to_summarize:
            return
        chat_transcript = "\n".join(
            [f"{'User' if isinstance(m, HumanMessage) else 'ArunCore'}: {m.content}" for m in messages_to_summarize]
        )
        prompt = (
            "You are an internal memory compression engine for ArunCore.\n"
            "Merge the existing summary with the new transcript. Preserve technical context, names, project mentions, user goals, and important decisions. "
            "Keep it concise and stable. Return no more than 5 sentences.\n\n"
            f"--- EXISTING SUMMARY ---\n{self.running_summary}\n\n"
            f"--- NEW CHAT TO MERGE ---\n{chat_transcript}"
        )
        try:
            res = self.summary_llm.invoke([SystemMessage(content=prompt)])
            self.running_summary = res.content.strip()
            self.history = self.history[-4:]
            self.invocation_count = len(self.history) // 2
            print(f"[SYSTEM] Memory compressed. New summary: {self.running_summary[:120]}...")
        except Exception as e:
            print(f"[SYSTEM ERROR] Failed to summarize memory: {e}")

    def get_messages(self):
        """Return recent history for the prompt, capped at ``max_history_messages``.

        Normal pruning keeps history below the cap. The cap matters when
        summarization keeps failing: history is then never pruned, and
        without it the prompt would grow on every turn.
        """
        cap = self.max_history_messages
        if cap and cap > 0 and len(self.history) > cap:
            return self.history[-cap:]
        return self.history
# =========================================================
# AGENT SETUP
# =========================================================
def init_agent():
    """Build the retrieval stack, LLMs and prompt for the ArunCore agent.

    Side effects:
        - Populates ``_GLOBAL_VECTORSTORE``, ``_GLOBAL_BM25`` and
          ``_GLOBAL_COMPRESSOR``, which ``search_arun_knowledge`` reads.
        - Installs a process-wide filter that ignores ``DeprecationWarning``.

    Returns:
        ``(main_llm, prompt, memory, tools)``:
        - the tool-bound chat model;
        - a ``ChatPromptTemplate`` with variables ``running_summary``,
          ``chat_history``, ``input`` and ``agent_scratchpad``;
        - a fresh ``RollingMemory``;
        - the tool list.

    Raises:
        ValueError: if an API key is missing or the Chroma collection is empty.
        OSError: if a static context file cannot be read.
    """
    global _GLOBAL_VECTORSTORE, _GLOBAL_BM25, _GLOBAL_COMPRESSOR
    openai_key = os.getenv("OPENAI_API_KEY")
    groq_key = os.getenv("GROQ_API_KEY")
    cohere_key = os.getenv("COHERE_API_KEY")
    if not openai_key or not groq_key or not cohere_key:
        raise ValueError("Missing API keys. OPENAI_API_KEY, GROQ_API_KEY, and COHERE_API_KEY are required.")
    warnings.filterwarnings("ignore", category=DeprecationWarning)

    embeddings = OpenAIEmbeddings(model="text-embedding-3-small", api_key=openai_key)
    _GLOBAL_VECTORSTORE = Chroma(
        collection_name="aruncore_knowledge",
        embedding_function=embeddings,
        persist_directory=str(DB_DIR),
    )
    # BM25 is built in memory from every stored chunk.
    all_data = _GLOBAL_VECTORSTORE.get()
    documents = [
        Document(page_content=text, metadata=metadata or {})
        for text, metadata in zip(all_data.get("documents", []), all_data.get("metadatas", []))
        if text
    ]
    if not documents:
        raise ValueError("Vector database is empty. Run ingest first.")
    _GLOBAL_BM25 = BM25Retriever.from_documents(documents)
    _GLOBAL_BM25.k = 10
    _GLOBAL_COMPRESSOR = CohereRerank(
        top_n=5,
        model="rerank-english-v3.0",
        cohere_api_key=cohere_key,
    )

    # NOTE(review): some newer OpenAI models only accept the default
    # temperature. If summaries keep failing, check whether temperature=0.0
    # is rejected for this model.
    summary_llm = ChatOpenAI(
        temperature=0.0,
        model="gpt-5-nano",
        api_key=openai_key,
    )
    tools = [notify_arun, search_arun_knowledge]
    main_llm = ChatOpenAI(
        temperature=0.15,
        model="gpt-4o-mini",
        api_key=openai_key,
    ).bind_tools(tools)

    profile, rules = load_static_context()
    # The system prompt is parsed as a ChatPromptTemplate f-string template.
    # A literal "{" or "}" in the markdown (code samples, JSON) would be read
    # as a template variable and break prompt formatting, so double the
    # braces to make them render literally.
    safe_profile = profile.replace("{", "{{").replace("}", "}}")
    safe_rules = rules.replace("{", "{{").replace("}", "}}")

    system_prompt = f"""
You are ArunCore, the knowledge system for Arun Yadav. Greet the person like you are Arun.
You speak as Arun in first person. Be honest. Do not guess.
For any question about Arun's projects, skills, background, architecture, GitHub, portfolio, work history, or any stored knowledge, call `search_arun_knowledge` before answering.
MUST REMEMBER: Only share URLs that appear in the search results or in the identity profile below. Never invent or guess a URL.
--- IDENTITY PROFILE ---
{safe_profile}
--- RULES OF ENGAGEMENT ---
{safe_rules}
--- PAST CONVERSATION SUMMARY ---
{{running_summary}}
OPERATING POLICY:
1. SEARCH-FIRST POLICY
- For any question about Arun's projects, skills, background, architecture, GitHub, portfolio, work history, or any stored knowledge, call `search_arun_knowledge` before answering.
- If the user asks something that might depend on stored facts, always use the search tool. Only answer if the search results support the answer.
2. ESCALATE UNCERTAINTY
- If search results are empty or do not support a reliable answer, call `notify_arun` with category `UNKNOWN_QUESTION`.
- Do not pretend to know; never make any information up.
- Give the user a direct honest answer after escalation.
3. DIRECT CONTACT / LEAD ESCALATION
- If the user wants to talk to Arun, contact Arun, hire Arun, collaborate, or discuss a business opportunity, call `notify_arun` immediately.
- Use category `URGENT` for direct contact intent.
- Use category `LEAD` for collaboration, hiring, business, partnership, or project opportunities.
4. TOOL BIAS
- Prefer calling tools over answering from unsupported memory.
- When in doubt, always search first.
- After a failed search or any explicit uncertainty, escalate.
- Do not limit tool use artificially unless it would create obvious repetition.
5. OUTPUT STYLE
- Keep answers concise, direct, and scannable.
- Use Markdown.
- If you do not know, say so clearly.
"""
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_prompt),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ]
    )
    memory = RollingMemory(summary_llm=summary_llm)
    return main_llm, prompt, memory, tools
# =========================================================
# CHAT LOOP
# =========================================================
def _tool_map(tools):
    """Index tool objects by their ``name`` attribute (a later duplicate wins)."""
    by_name = {}
    for candidate in tools:
        by_name[candidate.name] = candidate
    return by_name
def _build_notify_payload(
    category: str,
    user_input: str,
    reason: str,
    user_metadata: Optional[Dict[str, Any]] = None,
    assistant_output: Optional[str] = None,
) -> Dict[str, str]:
    """Build ``_deliver_notify_arun`` keyword arguments, with metadata serialized as JSON."""
    notification_metadata = _build_notification_metadata(
        reason=reason,
        user_metadata=user_metadata,
        assistant_output=assistant_output,
    )
    payload = {"category": category, "user_input": user_input}
    payload["user_metadata_json"] = json.dumps(notification_metadata)
    return payload
def _tool_was_used(scratchpad: List[Any], tool_name: str) -> bool:
    """Return True if any scratchpad entry records a call to *tool_name*.

    Recognizes plain dicts with a matching ``"name"`` and ``AIMessage``
    objects whose ``tool_calls`` include that name.
    """
    for entry in scratchpad:
        if isinstance(entry, dict):
            if entry.get("name") == tool_name:
                return True
        elif isinstance(entry, AIMessage):
            if any(call.get("name") == tool_name for call in entry.tool_calls or []):
                return True
    return False
def _run_pre_escalation(
    route: Dict[str, Any],
    user_input: str,
    tool_map: Dict[str, Any],
    user_metadata: Optional[Dict[str, Any]] = None,
    background: bool = False,
) -> Optional[Dict[str, Any]]:
    """Notify Arun before answering when the router flagged contact/lead intent.

    Args:
        route: Output of ``_route_user_input``.
        user_input: The user's message.
        tool_map: Unused; kept for interface compatibility.
        user_metadata: Extra fields merged into the alert metadata.
        background: Queue ``_deliver_notify_arun`` directly on a thread
            instead of going through ``_attempt_notify_arun_with_retry_queue``.

    Returns:
        ``None`` when the route needs no notification. Otherwise a dict with
        ``handled``, ``category``, ``reason`` and ``result``. ``handled`` is
        True once the alert is queued, delivered or suppressed as a duplicate.
        Both paths only ever queue, so a check for SUCCESS/SKIPPED alone was
        always False, and the post-answer safety net then sent the same alert
        a second time.
    """
    if not route.get("needs_notify"):
        return None
    category = route.get("notify_category") or "UNKNOWN_QUESTION"
    reason = route.get("reason", "unknown")
    payload = _build_notify_payload(
        category=category,
        user_input=user_input,
        reason=reason,
        user_metadata=user_metadata,
    )
    if background:
        submitted = _submit_background_task(
            "pre_escalation_notify",
            _deliver_notify_arun,
            category,
            user_input,
            payload["user_metadata_json"],
        )
        return {
            "handled": submitted,
            "category": category,
            "reason": reason,
            "result": "QUEUED: pre-escalation notification scheduled." if submitted else "FAILED: could not queue pre-escalation notification.",
        }
    result = _attempt_notify_arun_with_retry_queue(
        category=category,
        user_input=user_input,
        user_metadata_json=payload["user_metadata_json"],
    )
    return {
        "handled": result.startswith(("SUCCESS", "SKIPPED", "QUEUED")),
        "category": category,
        "reason": reason,
        "result": result,
    }
def run_pre_escalation(
    user_input: str,
    tool_map: Dict[str, Any],
    user_metadata: Optional[Dict[str, Any]] = None,
    background: bool = False,
) -> Optional[Dict[str, Any]]:
    """Public entry point: route *user_input*, then run the pre-escalation step for that route."""
    return _run_pre_escalation(
        _route_user_input(user_input),
        user_input,
        tool_map,
        user_metadata=user_metadata,
        background=background,
    )
def maybe_notify_arun(
    user_input: str,
    final_response: str,
    scratchpad: List[Any],
    tool_map: Dict[str, Any],
    user_metadata: Optional[Dict[str, Any]] = None,
    pre_notified: bool = False,
) -> Optional[str]:
    """
    Deterministic safety-net escalation used by API / bot flows.
    This prevents the frontend from depending entirely on the model
    remembering to call `notify_arun` on its own.

    Notifies (synchronously, via ``_deliver_notify_arun``) when the router
    flags contact/lead intent, or when the final answer reads as uncertain.
    The alert is skipped if the caller already escalated (*pre_notified*) or
    if the model itself called ``notify_arun`` during this turn, as recorded
    in *scratchpad*.

    Returns:
        ``None`` when no alert is needed, else the delivery status string.
    """
    route = _route_user_input(user_input)
    should_notify = route.get("needs_notify", False)
    category = route.get("notify_category") or "UNKNOWN_QUESTION"
    reason = route.get("reason", "unspecified")
    if not should_notify and _contains_uncertainty(final_response):
        should_notify = True
        category = "UNKNOWN_QUESTION"
        reason = "uncertainty_detected_after_answer"
    if not should_notify:
        return None
    # Without the scratchpad check, an alert the model already sent via the
    # tool was sent a second time.
    if pre_notified or _tool_was_used(scratchpad or [], "notify_arun"):
        return None
    payload = _build_notify_payload(
        category=category,
        user_input=user_input,
        reason=reason,
        user_metadata=user_metadata,
        assistant_output=final_response,
    )
    return _deliver_notify_arun(
        category=payload["category"],
        user_input=payload["user_input"],
        user_metadata_json=payload["user_metadata_json"],
    )
def queue_maybe_notify_arun(
    user_input: str,
    final_response: str,
    scratchpad: List[Any],
    tool_map: Dict[str, Any],
    user_metadata: Optional[Dict[str, Any]] = None,
    pre_notified: bool = False,
) -> str:
    """Run ``maybe_notify_arun`` on a background thread.

    Any escalation result is mirrored to the debug channel as an
    ``auto_escalation`` event.
    """

    def _notify_then_report():
        outcome = maybe_notify_arun(
            user_input=user_input,
            final_response=final_response,
            scratchpad=scratchpad,
            tool_map=tool_map,
            user_metadata=user_metadata,
            pre_notified=pre_notified,
        )
        if outcome:
            send_debug_event("auto_escalation", str(outcome), user_metadata)
        return outcome

    if _submit_background_task("maybe_notify_arun", _notify_then_report):
        return "QUEUED: maybe_notify_arun scheduled."
    return "FAILED: could not queue maybe_notify_arun."
def chat_interface():
    """Interactive CLI loop: read a line, run the tool-calling agent, print the answer.

    Per turn:
    1. Queue pre-escalation alerts in the background.
    2. Let the model call tools for up to ``MAX_TOOL_ROUNDS`` rounds.
    3. Apply the deterministic notification safety net.
    4. Update rolling memory and queue a chat-log post to Telegram.

    The loop ends on ``exit``/``quit``, end of input, or Ctrl-C.
    """
    print("\n" + "=" * 60)
    print("ArunCore stateful agent")
    print("Type 'exit' to quit.")
    print("=" * 60 + "\n")
    try:
        main_llm, prompt, memory, tools = init_agent()
    except Exception as e:
        print(f"Startup Error: {e}")
        return
    tool_map = _tool_map(tools)
    while True:
        try:
            try:
                user_input = input("You: ").strip()
            except (EOFError, KeyboardInterrupt):
                # Closed stdin (Ctrl-D, exhausted pipe) or Ctrl-C: exit
                # cleanly. EOFError is an Exception, so letting it reach the
                # generic handler below spun this loop forever.
                print()
                break
            if user_input.lower() in {"exit", "quit"}:
                break
            if not user_input:
                continue
            route = _route_user_input(user_input)
            scratchpad: List[Any] = []
            pre_escalation = _run_pre_escalation(route, user_input, tool_map, background=True)
            if pre_escalation:
                print(f"[SYSTEM] {pre_escalation['result']}")
            final_response: Optional[str] = None
            for _ in range(MAX_TOOL_ROUNDS):
                messages = prompt.format_messages(
                    running_summary=memory.running_summary,
                    chat_history=memory.get_messages(),
                    input=user_input,
                    agent_scratchpad=scratchpad,
                )
                ai_msg = main_llm.invoke(messages)
                if ai_msg.tool_calls:
                    scratchpad.append(ai_msg)
                    for call_index, tc in enumerate(ai_msg.tool_calls):
                        tool_name = tc.get("name")
                        print(f"[SYSTEM] Tool call: {tool_name}({tc.get('args')})")
                        tool_func = tool_map.get(tool_name)
                        if not tool_func:
                            tool_result = f"ERROR: Unknown tool '{tool_name}'."
                        else:
                            try:
                                tool_result = tool_func.invoke(tc.get("args", {}))
                            except Exception as e:
                                tool_result = f"Error executing tool: {e}"
                        # A tool call can carry "id": None, which .get(key, default)
                        # passes through; ToolMessage needs a string id. The index
                        # keeps synthetic ids distinct within the same millisecond.
                        tool_call_id = tc.get("id") or f"tool_{int(time.time() * 1000)}_{call_index}"
                        scratchpad.append(
                            ToolMessage(
                                content=_safe_truncate(str(tool_result), 3000),
                                tool_call_id=tool_call_id,
                            )
                        )
                    continue
                final_response = (ai_msg.content or "").strip()
                break
            if not final_response:
                final_response = "I do not have enough information to answer that."
            notify_result = maybe_notify_arun(
                user_input=user_input,
                final_response=final_response,
                scratchpad=scratchpad,
                tool_map=tool_map,
                pre_notified=bool(pre_escalation and pre_escalation.get("handled")),
            )
            if notify_result:
                print(f"[SYSTEM] {notify_result}")
            print(f"\nArunCore: {final_response}\n")
            print("-" * 60)
            memory.add_interaction(user_input, final_response)
            queue_chat_history_to_telegram("cli-session", user_input, final_response)
        except Exception as e:
            print(f"Agent Loop Error: {e}")
# Start the interactive CLI only when this file is executed directly, not on import.
if __name__ == "__main__":
    chat_interface()