"""ArunCore API: FastAPI backend exposing a streaming agent chat endpoint.

Endpoints:
  * POST /chat          -- runs the tool-calling agent loop and streams NDJSON events.
  * GET  /health        -- reports service status and the number of in-memory sessions.
  * GET  /test-telegram -- sends a single Telegram test message (diagnostics).
"""
import os
import json
import asyncio
import ssl
import traceback
import urllib.request
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from typing import Dict
from dotenv import load_dotenv

# Import the core engine components
from core.agent import (
    init_agent,
    RollingMemory,
    queue_debug_event,
    queue_maybe_notify_arun,
    run_pre_escalation,
    queue_chat_history_to_telegram,
)
from langchain_openai import ChatOpenAI

load_dotenv()

# Initialize the ArunCore Engine once at import time; any failure aborts startup.
try:
    print("Initializing ArunCore API Backend...")
    main_llm, prompt, default_memory, tools = init_agent()
    # We create a tool map to easily execute tools by name
    global_tool_map = {t.name: t for t in tools}
    print("API Backend Initialized Successfully.")
except Exception as e:
    print(f"Failed to initialize backend: {e}")
    raise

app = FastAPI(title="ArunCore API", description="Stateful Agentic Backend for Arun Yadav's Digital Twin.")

# Enable CORS for external frontends.
# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# site make credentialed requests; consider an explicit origin allow-list.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# === SESSION MANAGEMENT ===
# In-process map of session_id -> conversation memory.
# NOTE(review): entries are never evicted here, so this grows with every new
# session_id for the lifetime of the process.
active_sessions: Dict[str, RollingMemory] = {}

# User-facing status lines shown when the agent calls a known tool.
_TOOL_STATUS_MESSAGES = {
    "search_arun_knowledge": "Searching Arun's knowledge...",
    "notify_arun": "Sending notification to Arun...",
}


class ChatRequest(BaseModel):
    """Body of a POST /chat request."""

    session_id: str  # key into active_sessions; a new id starts a fresh memory
    message: str  # the user's message; must not be blank


def _ndjson(event: dict) -> str:
    """Encode one streamed event as a single NDJSON line."""
    return json.dumps(event) + "\n"


def _api_meta(session_id: str, **extra) -> dict:
    """Build the metadata dict attached to API-channel events (channel, session, extras)."""
    return {"channel": "api", "session_id": session_id, **extra}


def _pre_escalation_status(result: str) -> str:
    """Translate a pre-escalation result string into a user-facing status line."""
    if result.startswith("SUCCESS"):
        return "Notification sent to Arun."
    if result.startswith("SKIPPED"):
        return "Notification was already sent recently."
    if "Retry queued in background" in result:
        return "Notification was not confirmed immediately. Retrying in background."
    if "QUEUED" in result:
        return "Sending notification to Arun in the background."
    if "credentials are missing" in result:
        return "Error: Please set TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID in HuggingFace Spaces Settings!"
    return "Notification could not be confirmed."


def _tool_status_message(tool_name: str) -> str:
    """Return the progress line streamed to the client before running ``tool_name``."""
    return _TOOL_STATUS_MESSAGES.get(tool_name, f"Running {tool_name}...")


async def _run_tool(tool_name: str, tool_args: dict):
    """Invoke a registered tool in a worker thread.

    Unknown tools and tool exceptions are returned as ``ERROR: ...`` text rather
    than raised. The model therefore still receives a result for every tool
    call, and one bad tool call does not abort the whole streamed turn.
    """
    tool_func = global_tool_map.get(tool_name)
    if tool_func is None:
        return f"ERROR: Unknown tool '{tool_name}'."
    try:
        return await asyncio.to_thread(tool_func.invoke, tool_args)
    except Exception as exc:
        return f"ERROR: Tool '{tool_name}' failed: {exc}"


@app.post("/chat")
async def chat_endpoint(req: ChatRequest):
    """Run one agent turn for ``req`` and stream progress as NDJSON.

    Each streamed line is a JSON object whose ``type`` is one of:
      * ``"status"`` -- human-readable progress text in ``content``;
      * ``"final"``  -- ``reply``, the collected status lines (``thoughts``) and ``session_id``;
      * ``"error"``  -- the exception text in ``content`` if the turn failed.

    Raises:
        HTTPException: 400 if ``req.message`` is empty or whitespace-only.
    """
    if not req.message.strip():
        raise HTTPException(status_code=400, detail="Message cannot be empty.")

    if req.session_id not in active_sessions:
        summary_llm = ChatOpenAI(temperature=0.0, model="gpt-4o-mini", api_key=os.getenv("OPENAI_API_KEY"))
        active_sessions[req.session_id] = RollingMemory(summary_llm=summary_llm)
    memory = active_sessions[req.session_id]
    session_id = req.session_id

    async def event_generator():
        scratchpad = []  # AI messages with tool calls plus their tool results
        thoughts = []  # status lines echoed back in the final event
        max_iterations = 8
        max_search_limit = 3
        search_count = 0
        final_response = None
        pre_escalation = None

        try:
            yield _ndjson({"type": "status", "content": "Analyzing your request..."})
            queue_debug_event("user_message", req.message, _api_meta(session_id))

            # NOTE(review): the meaning of the trailing positional True is defined by
            # core.agent.run_pre_escalation -- confirm there.
            pre_escalation = await asyncio.to_thread(
                run_pre_escalation,
                req.message,
                global_tool_map,
                _api_meta(session_id),
                True,
            )
            if pre_escalation:
                # Coerce to str so a missing/None result cannot break the prefix checks.
                pre_escalation_result = str(pre_escalation.get("result") or "")
                pre_escalation_status = _pre_escalation_status(pre_escalation_result)
                yield _ndjson({"type": "status", "content": pre_escalation_status})
                thoughts.append(pre_escalation_status)
                queue_debug_event(
                    "pre_escalation",
                    pre_escalation_result,
                    _api_meta(
                        session_id,
                        category=pre_escalation.get("category"),
                        reason=pre_escalation.get("reason"),
                    ),
                )

            # Agent loop: stop at the first reply without tool calls, or after
            # max_iterations tool-calling rounds.
            for _ in range(max_iterations):
                messages = prompt.format_messages(
                    running_summary=memory.running_summary,
                    chat_history=memory.get_messages(),
                    input=req.message,
                    agent_scratchpad=scratchpad,
                )
                ai_msg = await asyncio.to_thread(main_llm.invoke, messages)
                if not ai_msg.tool_calls:
                    final_response = ai_msg.content
                    break

                scratchpad.append(ai_msg)
                for tc in ai_msg.tool_calls:
                    tool_name = tc["name"]
                    tool_args = tc.get("args", {})
                    status_msg = _tool_status_message(tool_name)
                    yield _ndjson({"type": "status", "content": status_msg})
                    thoughts.append(status_msg)
                    queue_debug_event(
                        "tool_call",
                        json.dumps(tool_args, ensure_ascii=False, indent=2, default=str),
                        _api_meta(session_id, tool_name=tool_name),
                    )

                    # The search cap applies only to the knowledge-search tool;
                    # every other tool is always executed.
                    is_search = tool_name == "search_arun_knowledge"
                    if is_search:
                        search_count += 1
                    if is_search and search_count > max_search_limit:
                        tool_result = f"Search limit reached ({max_search_limit}). Finalizing based on existing context."
                    else:
                        tool_result = await _run_tool(tool_name, tool_args)

                    # The scratchpad copy is truncated to 2000 chars; the debug
                    # event below receives the full text.
                    scratchpad.append({
                        "role": "tool",
                        "name": tool_name,
                        "tool_call_id": tc["id"],
                        "content": str(tool_result)[:2000],
                    })
                    queue_debug_event(
                        "tool_result",
                        str(tool_result),
                        _api_meta(session_id, tool_name=tool_name),
                    )

            if not final_response:
                final_response = "I encountered a processing limit. How else can I help?"

            queue_debug_event("assistant_reply", final_response, _api_meta(session_id))
            queue_maybe_notify_arun(
                user_input=req.message,
                final_response=final_response,
                scratchpad=scratchpad,
                tool_map=global_tool_map,
                user_metadata=_api_meta(session_id),
                pre_notified=bool(pre_escalation and pre_escalation.get("handled")),
            )
            memory.add_interaction(req.message, final_response)
            queue_chat_history_to_telegram(session_id, req.message, final_response)

            yield _ndjson({
                "type": "final",
                "reply": final_response,
                "thoughts": thoughts,
                "session_id": session_id,
            })
        except Exception as e:
            queue_debug_event("error", str(e), _api_meta(session_id))
            yield _ndjson({"type": "error", "content": str(e)})

    return StreamingResponse(event_generator(), media_type="application/x-ndjson")


@app.get("/health")
async def health_check():
    """Report that the service is up and how many sessions are held in memory."""
    return {"status": "online", "active_sessions": len(active_sessions)}


@app.get("/test-telegram")
def test_telegram():
    """Send one Telegram test message and report the raw outcome (diagnostics only).

    Reads TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID from the environment. TLS
    certificates are verified by default. Set TELEGRAM_INSECURE_SSL to
    1/true/yes only if the host's CA bundle is broken. This is not recommended,
    because the bot token travels in the request URL.
    """
    token = os.getenv("TELEGRAM_BOT_TOKEN")
    chat_id = os.getenv("TELEGRAM_CHAT_ID")
    if not token or not chat_id:
        return {"status": "error", "message": "Missing credentials", "has_token": bool(token), "has_chat_id": bool(chat_id)}

    # Strip stray whitespace/quotes that often end up in pasted secret values.
    token = token.strip(' "\'')
    chat_id = chat_id.strip(' "\'')

    url = f"https://api.telegram.org/bot{token}/sendMessage"
    payload = {"chat_id": chat_id, "text": "Test message from HuggingFace backend using urllib!"}
    data = json.dumps(payload).encode("utf-8")
    headers = {"Content-Type": "application/json", "User-Agent": "ArunCore/1.0"}

    ctx = ssl.create_default_context()
    if os.getenv("TELEGRAM_INSECURE_SSL", "").strip().lower() in ("1", "true", "yes"):
        # Explicit opt-out only: disables certificate and hostname checks.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE

    try:
        request = urllib.request.Request(url, data=data, headers=headers, method="POST")
        with urllib.request.urlopen(request, timeout=10, context=ctx) as response:
            resp_text = response.read().decode("utf-8")
            return {"status": "finished", "status_code": response.status, "response": resp_text}
    except Exception as e:
        return {"status": "exception", "error": str(e), "traceback": traceback.format_exc()}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run("core.api:app", host="0.0.0.0", port=8000, reload=True)