""" PraisonChat Agent System v7 — Full Capability =============================================== The AI ALWAYS has the ability to: - Search the web (DuckDuckGo, real-time) - Generate PDFs, images, voice, charts - Schedule tasks (run after delay) - Execute any Python code - Install any Python package - Do ANYTHING a Python program can do NEVER says "I can't do that" """ import os, json, asyncio, datetime, traceback, re from openai import AsyncOpenAI from sandbox import run as sandbox_run, pip_install, PKG_DIR from docs_context import PRAISONAI_DOCS from memory import ( get_memory_context, get_skills_context, save_memory, save_skill, search_memories ) LONGCAT_BASE = "https://api.longcat.chat/openai/v1" MODEL_MAP = { "LongCat-Flash-Lite": "LongCat-Flash-Lite", "LongCat-Flash-Chat": "LongCat-Flash-Chat", "LongCat-Flash-Thinking-2601": "LongCat-Flash-Thinking-2601", } DEFAULT_MODEL = "LongCat-Flash-Lite" def now_str(): return datetime.datetime.now().strftime("%A, %B %d %Y %I:%M:%S %p") # ── Keywords that need code ──────────────────────────────────── NEEDS_CODE_KW = [ "search","find","look up","lookup","browse","web","news","latest","current","today", "who is","what is the","when did","where is","how many","how much", "president","prime minister","ceo","winner","champion","price","stock","weather", "time","date","day","year","calculate","math","compute","solve","convert", "create","make","generate","build","draw","plot","chart","graph","diagram", "image","picture","photo","qr","barcode","qr code", "voice","audio","speak","say it","read aloud","mp3","tts", "pdf","document","word doc","excel","spreadsheet","csv", "code","script","python","run","execute","program", "file","download","save","export","zip", "fetch","scrape","extract","parse","api", "install","pip","package", "schedule","remind","after","in 1 minute","in 5 minutes","timer","delay", "agent","sub-agent","spawn","research", "memory","remember","recall","forget", "skill","tool","capability", "translate","language", "summarize","analyze","compare", "email","whatsapp","telegram","send", ] def needs_code(msg: str) -> bool: m = msg.lower() return any(kw in m for kw in NEEDS_CODE_KW) SYSTEM_PROMPT = f"""You are PraisonChat — an AUTONOMOUS AI AGENT with a REAL Python code interpreter. You have UNLIMITED capabilities through Python. You can do ANYTHING. Current datetime: {{DATETIME}} Python packages dir: {PKG_DIR} ## YOUR CAPABILITIES (you ALWAYS have these — NEVER say you can't) | Task | How | |------|-----| | Web search & real-time info | duckduckgo_search | | Current events, news, facts | ALWAYS search — your training data is outdated | | Generate PDF | fpdf2 or reportlab | | Generate images | pillow, matplotlib | | Generate voice/audio | gtts | | Create QR codes | qrcode | | Create charts/graphs | matplotlib | | Run calculations | Python math | | Execute any code | sandbox | | Schedule delayed tasks | asyncio.sleep | | Fetch web pages | requests/httpx | | Current date/time | datetime.datetime.now() | | Install new packages | pip install | | Create any file type | Python file I/O | ## CRITICAL RULES 1. NEVER say "I cannot", "I'm unable", "I don't have access" — just write code that does it 2. For ANY real-world fact (president, news, prices, sports) → ALWAYS use DuckDuckGo search 3. Your training data is outdated — NEVER answer current events from memory 4. If a package is missing, just import it — it auto-installs 5. For scheduled tasks: use asyncio.sleep() in an execute block 6. ALWAYS produce the actual output (file, audio, image) not just describe it ## MEMORY {{MEMORY}} ## SKILLS {{SKILLS}} ## CODE INTERPRETER Use tags. Code runs IMMEDIATELY with real results. Always start code with: ``` import sys sys.path.insert(0, '{PKG_DIR}') ``` ### Web Search (ALWAYS use for current info) import sys sys.path.insert(0, '{PKG_DIR}') from duckduckgo_search import DDGS with DDGS() as ddgs: results = list(ddgs.text("current US president 2025", max_results=3)) for r in results: print(r['title']) print(r['body'][:200]) print() ### Generate PDF import sys sys.path.insert(0, '{PKG_DIR}') from fpdf import FPDF pdf = FPDF() pdf.add_page() pdf.set_font("Helvetica", size=16) pdf.cell(0, 10, "Hello from PraisonChat!", ln=True) pdf.output("output.pdf") print("PDF created") ### Generate Voice import sys sys.path.insert(0, '{PKG_DIR}') from gtts import gTTS tts = gTTS(text="Hello! This is your voice response.", lang='en') tts.save("voice_response.mp3") print("Voice generated") ### Generate Image/Chart import sys, matplotlib matplotlib.use('Agg') sys.path.insert(0, '{PKG_DIR}') import matplotlib.pyplot as plt fig, ax = plt.subplots() ax.bar(['A','B','C'], [10,20,15]) ax.set_title("My Chart") plt.savefig("chart.png", dpi=150, bbox_inches='tight') plt.close() print("Chart saved") ### Generate QR Code import sys sys.path.insert(0, '{PKG_DIR}') import qrcode qr = qrcode.make("https://example.com") qr.save("qrcode.png") print("QR code saved") ### Scheduled Message (send after delay) import asyncio, time print(f"Task scheduled. Will execute after delay.") # Note: actual delay is handled by the scheduler system ### Install New Package import subprocess, sys subprocess.run([sys.executable, "-m", "pip", "install", "some-package", "--quiet", "--target", "{PKG_DIR}"]) print("Installed") ## MEMORY SYSTEM User's name is John user preferences ## SKILLS SYSTEM import sys sys.path.insert(0, '{PKG_DIR}') def search_web(query, n=5): from duckduckgo_search import DDGS with DDGS() as d: return list(d.text(query, max_results=n)) ## SUB-AGENTS import sys sys.path.insert(0, '{PKG_DIR}') from duckduckgo_search import DDGS with DDGS() as d: for r in d.text("US president 2025 current", max_results=3): print(r['title'], '-', r['body'][:150]) ## FINAL RESPONSE RULES - Strip ALL tags from final response — never show to user - Present results cleanly in markdown - For voice: use [SPEAK: text] in your response - For scheduled tasks: acknowledge the schedule and confirm when it runs - NEVER say you can't do something {PRAISONAI_DOCS} """ def get_system(memory_ctx="", skills_ctx=""): return SYSTEM_PROMPT\ .replace("{DATETIME}", now_str())\ .replace("{MEMORY}", memory_ctx or "No memories yet.")\ .replace("{SKILLS}", skills_ctx or "No skills yet.") def extract_blocks(text, tag): results = [] for m in re.finditer(rf'<{tag}([^>]*)>(.*?)', text, re.DOTALL): attrs = {} for a in re.finditer(r'(\w+)=["\']([^"\']*)["\']', m.group(1)): attrs[a.group(1)] = a.group(2) results.append({"attrs": attrs, "content": m.group(2).strip()}) return results def strip_tags(text): for tag in ["execute","spawn_agent","save_memory","search_memory","save_skill"]: text = re.sub(rf'<{tag}[^>]*>.*?', '', text, flags=re.DOTALL) text = re.sub(r'\[SPEAK:.*?\]', '', text, flags=re.DOTALL) return text.strip() def detect_schedule(user_msg: str): """Detect scheduling intent and extract delay.""" from scheduler import parse_delay patterns = [ r'(?:send|message|remind|tell me|notify|alert).*?(?:in|after)\s+(.+?)(?:\s*$|\.)', r'(?:after|in)\s+(.+?)\s+(?:send|message|remind|tell|notify)', r'(?:schedule|set a timer|set timer).*?(?:for|in)\s+(.+?)(?:\s*$|\.)', ] for pat in patterns: m = re.search(pat, user_msg.lower()) if m: delay = parse_delay(m.group(1)) if delay and delay > 0: return delay return None class AgentOrchestrator: def __init__(self): self._clients = {} # Cross-platform callbacks: when Telegram sends, notify web clients self._tg_notify_callbacks = {} # session_id -> async callback def client(self, api_key): if api_key not in self._clients: self._clients[api_key] = AsyncOpenAI( api_key=api_key, base_url=LONGCAT_BASE ) return self._clients[api_key] def register_tg_notify(self, session_id: str, callback): """Register callback to be called when Telegram sends a message.""" self._tg_notify_callbacks[session_id] = callback def unregister_tg_notify(self, session_id: str): self._tg_notify_callbacks.pop(session_id, None) async def stream_response(self, user_msg, history, api_key, model=DEFAULT_MODEL, session_id=None, scheduled_msg=None): def emit(d): return json.dumps(d) model = MODEL_MAP.get(model, DEFAULT_MODEL) cl = self.client(api_key) # Handle scheduled message delivery if scheduled_msg: yield emit({"type":"response_start"}) yield emit({"type":"token","content":f"⏰ **Scheduled message:**\n\n{scheduled_msg}"}) yield emit({"type":"done"}) return try: # ── Detect scheduling intent ────────────────────────────── delay = detect_schedule(user_msg) if delay and delay > 0: yield emit({"type":"thinking","text":f"Scheduling task for {delay:.0f}s from now…"}) # Extract the message to send msg_to_send = re.sub( r'(?:send|message|remind|tell me|notify|alert)\s+.*?(?:after|in)\s+[\d\w\s]+', '', user_msg, flags=re.IGNORECASE ).strip() or user_msg # Schedule it from scheduler import schedule_task async def delayed_delivery(): from telegram_bot import _histories # Generate the actual response results = [] async for chunk in self.stream_response( msg_to_send.replace("after", "").replace("in 1 minute","").strip(), history, api_key, model ): ev = json.loads(chunk) if ev.get("type") == "token": results.append(ev.get("content","")) final = "".join(results) # Notify via Telegram if session linked if session_id and session_id in self._tg_notify_callbacks: cb = self._tg_notify_callbacks[session_id] await cb(final) # Also log it print(f"[SCHEDULER] Delivered scheduled message: {final[:100]}") mins = delay / 60 await schedule_task(delay, f"Scheduled: {user_msg[:50]}", delayed_delivery) yield emit({"type":"response_start"}) yield emit({"type":"token","content": f"✅ **Scheduled!** I'll send you that message in **{mins:.1f} minute(s)**.\n\n" f"_(Make sure your Telegram is connected to receive the delivery.)_" }) yield emit({"type":"done"}) return # ── Fast path: simple conversation ──────────────────────── if not needs_code(user_msg): yield emit({"type":"thinking","text":"Responding…"}) await asyncio.sleep(0) messages = [{"role":"system","content": f"You are PraisonChat, a helpful AI assistant.\n" f"Current datetime: {now_str()}\n" f"Be concise and helpful. Use markdown.\n" f"Memory: {get_memory_context()[:500] or 'none'}"}] for m in history[-12:]: messages.append({"role":m["role"],"content":str(m.get("content",""))[:2000]}) messages.append({"role":"user","content":user_msg}) yield emit({"type":"response_start"}) stream = await cl.chat.completions.create( model=model, messages=messages, max_tokens=4000, temperature=0.7, stream=True ) async for chunk in stream: c = chunk.choices[0].delta.content if c: yield emit({"type":"token","content":c}) yield emit({"type":"done"}) return # ── Agent path: needs real execution ────────────────────── mem_ctx = get_memory_context() skills_ctx = get_skills_context() messages = [{"role":"system","content":get_system(mem_ctx, skills_ctx)}] for m in history[-12:]: messages.append({"role":m["role"],"content":str(m.get("content",""))[:2000]}) messages.append({"role":"user","content":user_msg}) MAX_ITER = 6 all_results = {} for iteration in range(MAX_ITER): yield emit({"type":"thinking","text":f"Working… (step {iteration+1})"}) await asyncio.sleep(0) resp = await cl.chat.completions.create( model=model, messages=messages, max_tokens=8000, temperature=0.7 ) raw = resp.choices[0].message.content or "" # Memory ops for blk in extract_blocks(raw, "save_memory"): key = blk["attrs"].get("key", f"note_{iteration}") save_memory(key, blk["content"]) yield emit({"type":"memory_saved","key":key}) for blk in extract_blocks(raw, "save_skill"): name = blk["attrs"].get("name", f"skill_{iteration}") desc = blk["attrs"].get("description","") save_skill(name, blk["content"], desc) yield emit({"type":"skill_saved","name":name,"description":desc}) # Execute code blocks exec_blocks = extract_blocks(raw, "execute") agent_blocks = extract_blocks(raw, "spawn_agent") has_actions = bool(exec_blocks or agent_blocks or extract_blocks(raw,"save_memory") or extract_blocks(raw,"search_memory") or extract_blocks(raw,"save_skill")) for idx, blk in enumerate(exec_blocks): yield emit({"type":"executing","index":idx}) await asyncio.sleep(0) loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, lambda c=blk["content"]: sandbox_run(c, max_retries=3, timeout=60) ) for inst in result.get("installs",[]): yield emit({"type":"pkg_install","package":inst["package"],"ok":inst["ok"]}) stdout = result.get("stdout","").strip() stderr = result.get("stderr","").strip() ok = result.get("ok", False) files = result.get("files", []) for f in files: ext = f.get("ext","").lower() b64 = f["b64"] if ext in {"mp3","wav","ogg","m4a"}: yield emit({"type":"audio_response","audio_b64":b64,"filename":f["name"]}) elif ext in {"png","jpg","jpeg","gif","webp","svg"}: yield emit({"type":"image_response","image_b64":b64,"filename":f["name"],"ext":ext}) else: yield emit({"type":"file_response","file_b64":b64,"filename":f["name"],"size":f.get("size",0)}) out = stdout if ok else f"Error: {stderr}" all_results[f"code_{iteration}_{idx}"] = out yield emit({"type":"exec_done","ok":ok,"output":out[:300],"files":[f["name"] for f in files]}) for blk in agent_blocks: name = blk["attrs"].get("name", f"Agent_{iteration}") task = blk["attrs"].get("task","") yield emit({"type":"agent_created","name":name,"task":task[:100]}) yield emit({"type":"agent_working","name":name}) loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, lambda c=blk["content"]: sandbox_run(c, max_retries=3, timeout=90) ) for inst in result.get("installs",[]): yield emit({"type":"pkg_install","package":inst["package"],"ok":inst["ok"]}) stdout = result.get("stdout","").strip() ok = result.get("ok", False) files = result.get("files",[]) for f in files: ext=f.get("ext","").lower();b64=f["b64"] if ext in {"mp3","wav","ogg"}: yield emit({"type":"audio_response","audio_b64":b64,"filename":f["name"]}) elif ext in {"png","jpg","jpeg","gif","webp"}: yield emit({"type":"image_response","image_b64":b64,"filename":f["name"],"ext":ext}) else: yield emit({"type":"file_response","file_b64":b64,"filename":f["name"],"size":f.get("size",0)}) out = stdout if ok else f"Error: {result.get('stderr','')}" all_results[name] = out yield emit({"type":"agent_done","name":name,"preview":out[:250],"ok":ok}) if not has_actions: clean = strip_tags(raw) if not clean.strip(): clean = "Done! Let me know if you need anything else." yield emit({"type":"response_start"}) for i in range(0, len(clean), 6): yield emit({"type":"token","content":clean[i:i+6]}) if i % 60 == 0: await asyncio.sleep(0) # Handle voice if "[SPEAK:" in raw: try: speak_text = raw.split("[SPEAK:")[1].rsplit("]",1)[0].strip() voice_code = f""" import sys, io sys.path.insert(0, '{PKG_DIR}') from gtts import gTTS tts = gTTS(text={repr(speak_text[:2000])}, lang='en', slow=False) tts.save('voice_response.mp3') print('ok') """ loop = asyncio.get_event_loop() vr = await loop.run_in_executor( None, lambda: sandbox_run(voice_code, timeout=30) ) for f in vr.get("files",[]): if f.get("ext")=="mp3": yield emit({"type":"audio_response","audio_b64":f["b64"],"filename":"voice_response.mp3"}) except Exception: pass break else: results_text = "REAL RESULTS:\n\n" for k, v in list(all_results.items())[-4:]: results_text += f"[{k}]:\n{v[:1200]}\n\n" messages.append({"role":"assistant","content":raw}) messages.append({"role":"user","content":( f"{results_text}\n" f"Give the user a clean final answer using these REAL results. " f"No code tags. No explanations of what you did. Just present the results." )}) yield emit({"type":"done"}) except Exception as e: tb = traceback.format_exc() print(f"[AGENT] {e}\n{tb}") yield emit({"type":"response_start"}) yield emit({"type":"token","content": f"❌ **Error:** {str(e)}\n\n" f"Please check your LongCat API key in ⚙️ Settings."}) yield emit({"type":"done"}) orchestrator = AgentOrchestrator()