import asyncio import json import os import re from dotenv import load_dotenv from langchain_groq import ChatGroq from telegram import Update from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes # Import the core ArunCore engine from core.agent import init_agent, RollingMemory, queue_debug_event, queue_maybe_notify_arun, run_pre_escalation load_dotenv() # === In-Memory Session Store (telegram chat_id -> RollingMemory) === sessions: dict[int, RollingMemory] = {} # Initialize the engine once at startup print("Initializing ArunCore Telegram Bot...") main_llm, prompt, _, tools = init_agent() tool_map = {t.name: t for t in tools} print("Bot engine ready.") def get_or_create_memory(chat_id: int) -> RollingMemory: """Returns existing memory for this user, or creates a new one.""" if chat_id not in sessions: summary_llm = ChatGroq( temperature=0.0, model_name="llama-3.1-8b-instant", api_key=os.getenv("GROQ_API_KEY") ) sessions[chat_id] = RollingMemory(summary_llm=summary_llm) return sessions[chat_id] def run_agent(chat_id: int, user_message: str) -> str: """Runs the full stateful agent loop for a given user message.""" memory = get_or_create_memory(chat_id) scratchpad = [] try: queue_debug_event( "user_message", user_message, {"channel": "telegram", "chat_id": chat_id}, ) pre_escalation = run_pre_escalation( user_message, tool_map, {"channel": "telegram", "chat_id": chat_id}, False, ) if pre_escalation: queue_debug_event( "pre_escalation", pre_escalation.get("result", ""), { "channel": "telegram", "chat_id": chat_id, "category": pre_escalation.get("category"), "reason": pre_escalation.get("reason"), }, ) final_response = None max_iterations = 3 for _ in range(max_iterations): messages = prompt.format_messages( running_summary=memory.running_summary, chat_history=memory.get_messages(), input=user_message, agent_scratchpad=scratchpad, ) ai_msg = main_llm.invoke(messages) if ai_msg.tool_calls: scratchpad.append(ai_msg) for tc in ai_msg.tool_calls: tool_name = tc["name"] tool_args = tc.get("args", {}) queue_debug_event( "tool_call", json.dumps(tool_args, ensure_ascii=False, indent=2, default=str), { "channel": "telegram", "chat_id": chat_id, "tool_name": tool_name, }, ) tool_func = tool_map.get(tool_name) try: result = tool_func.invoke(tool_args) except Exception as e: result = f"Tool error: {e}" scratchpad.append({ "role": "tool", "name": tool_name, "tool_call_id": tc["id"], "content": str(result)[:2000], }) queue_debug_event( "tool_result", str(result), { "channel": "telegram", "chat_id": chat_id, "tool_name": tool_name, }, ) else: final_response = ai_msg.content break if not final_response: final_response = "I ran into an issue internally. Please try again." queue_debug_event( "assistant_reply", final_response, {"channel": "telegram", "chat_id": chat_id}, ) queue_maybe_notify_arun( user_input=user_message, final_response=final_response, scratchpad=scratchpad, tool_map=tool_map, user_metadata={"channel": "telegram", "chat_id": chat_id}, pre_notified=bool(pre_escalation and pre_escalation.get("handled")), ) memory.add_interaction(user_message, final_response) return final_response except Exception as e: queue_debug_event( "error", str(e), {"channel": "telegram", "chat_id": chat_id}, ) raise # === Telegram Handlers === async def start_handler(update: Update, context: ContextTypes.DEFAULT_TYPE): welcome = ( "Hi! I'm *ArunCore*, the AI digital twin of *Arun Yadav*.\n\n" "Ask me anything about his projects, skills, or background in AI engineering. " "I'm here to give you the real picture." ) await update.message.reply_text(welcome, parse_mode="Markdown") def format_for_telegram(text: str) -> str: """Converts LLM Markdown into Telegram-safe HTML.""" text = text.replace("&", "&").replace("<", "<").replace(">", ">") text = re.sub(r'\*\*(.*?)\*\*', r'\1', text) text = re.sub(r'^###?\s+(.+)$', r'\n\1', text, flags=re.MULTILINE) text = re.sub(r'```(?:[a-zA-Z]+)?\n?(.*?)\n?```', r'
\1', text, flags=re.DOTALL) text = re.sub(r'`([^`]+)`', r'
\1', text)
text = re.sub(r'^[*-]\s+', '• ', text, flags=re.MULTILINE)
def link_repl(match):
label, url = match.groups()
return f'{label}'
text = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', link_repl, text)
return text.strip()
async def message_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_text = update.message.text
chat_id = update.effective_chat.id
await update.message.chat.send_action("typing")
reply = await asyncio.to_thread(run_agent, chat_id, user_text)
html_reply = format_for_telegram(reply)
try:
await update.message.reply_text(html_reply, parse_mode="HTML")
except Exception:
await update.message.reply_text(reply)
if __name__ == "__main__":
token = os.getenv("TELEGRAM_PUBLIC_BOT_TOKEN")
if not token:
raise ValueError("TELEGRAM_PUBLIC_BOT_TOKEN not set in .env")
application = Application.builder().token(token).build()
application.add_handler(CommandHandler("start", start_handler))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, message_handler))
print("ArunCore Telegram Bot is running...")
application.run_polling()