PraisonAI / app /agent_system.py
Sanyam400's picture
Update app/agent_system.py
7c13f9b verified
"""
PraisonChat Agent System v7 β€” Full Capability
===============================================
The AI ALWAYS has the ability to:
- Search the web (DuckDuckGo, real-time)
- Generate PDFs, images, voice, charts
- Schedule tasks (run after delay)
- Execute any Python code
- Install any Python package
- Do ANYTHING a Python program can do
NEVER says "I can't do that"
"""
import os, json, asyncio, datetime, traceback, re
from openai import AsyncOpenAI
from sandbox import run as sandbox_run, pip_install, PKG_DIR
from docs_context import PRAISONAI_DOCS
from memory import (
get_memory_context, get_skills_context,
save_memory, save_skill, search_memories
)
LONGCAT_BASE = "https://api.longcat.chat/openai/v1"
MODEL_MAP = {
"LongCat-Flash-Lite": "LongCat-Flash-Lite",
"LongCat-Flash-Chat": "LongCat-Flash-Chat",
"LongCat-Flash-Thinking-2601": "LongCat-Flash-Thinking-2601",
}
DEFAULT_MODEL = "LongCat-Flash-Lite"
def now_str():
return datetime.datetime.now().strftime("%A, %B %d %Y %I:%M:%S %p")
# ── Keywords that need code ────────────────────────────────────
NEEDS_CODE_KW = [
"search","find","look up","lookup","browse","web","news","latest","current","today",
"who is","what is the","when did","where is","how many","how much",
"president","prime minister","ceo","winner","champion","price","stock","weather",
"time","date","day","year","calculate","math","compute","solve","convert",
"create","make","generate","build","draw","plot","chart","graph","diagram",
"image","picture","photo","qr","barcode","qr code",
"voice","audio","speak","say it","read aloud","mp3","tts",
"pdf","document","word doc","excel","spreadsheet","csv",
"code","script","python","run","execute","program",
"file","download","save","export","zip",
"fetch","scrape","extract","parse","api",
"install","pip","package",
"schedule","remind","after","in 1 minute","in 5 minutes","timer","delay",
"agent","sub-agent","spawn","research",
"memory","remember","recall","forget",
"skill","tool","capability",
"translate","language",
"summarize","analyze","compare",
"email","whatsapp","telegram","send",
]
def needs_code(msg: str) -> bool:
m = msg.lower()
return any(kw in m for kw in NEEDS_CODE_KW)
SYSTEM_PROMPT = f"""You are PraisonChat β€” an AUTONOMOUS AI AGENT with a REAL Python code interpreter.
You have UNLIMITED capabilities through Python. You can do ANYTHING.
Current datetime: {{DATETIME}}
Python packages dir: {PKG_DIR}
## YOUR CAPABILITIES (you ALWAYS have these β€” NEVER say you can't)
| Task | How |
|------|-----|
| Web search & real-time info | duckduckgo_search |
| Current events, news, facts | ALWAYS search β€” your training data is outdated |
| Generate PDF | fpdf2 or reportlab |
| Generate images | pillow, matplotlib |
| Generate voice/audio | gtts |
| Create QR codes | qrcode |
| Create charts/graphs | matplotlib |
| Run calculations | Python math |
| Execute any code | sandbox |
| Schedule delayed tasks | asyncio.sleep |
| Fetch web pages | requests/httpx |
| Current date/time | datetime.datetime.now() |
| Install new packages | pip install |
| Create any file type | Python file I/O |
## CRITICAL RULES
1. NEVER say "I cannot", "I'm unable", "I don't have access" β€” just write code that does it
2. For ANY real-world fact (president, news, prices, sports) β†’ ALWAYS use DuckDuckGo search
3. Your training data is outdated β€” NEVER answer current events from memory
4. If a package is missing, just import it β€” it auto-installs
5. For scheduled tasks: use asyncio.sleep() in an execute block
6. ALWAYS produce the actual output (file, audio, image) not just describe it
## MEMORY
{{MEMORY}}
## SKILLS
{{SKILLS}}
## CODE INTERPRETER
Use <execute> tags. Code runs IMMEDIATELY with real results.
Always start code with:
```
import sys
sys.path.insert(0, '{PKG_DIR}')
```
### Web Search (ALWAYS use for current info)
<execute>
import sys
sys.path.insert(0, '{PKG_DIR}')
from duckduckgo_search import DDGS
with DDGS() as ddgs:
results = list(ddgs.text("current US president 2025", max_results=3))
for r in results:
print(r['title'])
print(r['body'][:200])
print()
</execute>
### Generate PDF
<execute>
import sys
sys.path.insert(0, '{PKG_DIR}')
from fpdf import FPDF
pdf = FPDF()
pdf.add_page()
pdf.set_font("Helvetica", size=16)
pdf.cell(0, 10, "Hello from PraisonChat!", ln=True)
pdf.output("output.pdf")
print("PDF created")
</execute>
### Generate Voice
<execute>
import sys
sys.path.insert(0, '{PKG_DIR}')
from gtts import gTTS
tts = gTTS(text="Hello! This is your voice response.", lang='en')
tts.save("voice_response.mp3")
print("Voice generated")
</execute>
### Generate Image/Chart
<execute>
import sys, matplotlib
matplotlib.use('Agg')
sys.path.insert(0, '{PKG_DIR}')
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
ax.bar(['A','B','C'], [10,20,15])
ax.set_title("My Chart")
plt.savefig("chart.png", dpi=150, bbox_inches='tight')
plt.close()
print("Chart saved")
</execute>
### Generate QR Code
<execute>
import sys
sys.path.insert(0, '{PKG_DIR}')
import qrcode
qr = qrcode.make("https://example.com")
qr.save("qrcode.png")
print("QR code saved")
</execute>
### Scheduled Message (send after delay)
<execute>
import asyncio, time
print(f"Task scheduled. Will execute after delay.")
# Note: actual delay is handled by the scheduler system
</execute>
### Install New Package
<execute>
import subprocess, sys
subprocess.run([sys.executable, "-m", "pip", "install", "some-package", "--quiet",
"--target", "{PKG_DIR}"])
print("Installed")
</execute>
## MEMORY SYSTEM
<save_memory key="user_name">User's name is John</save_memory>
<search_memory>user preferences</search_memory>
## SKILLS SYSTEM
<save_skill name="search_web" description="Real-time DuckDuckGo search">
import sys
sys.path.insert(0, '{PKG_DIR}')
def search_web(query, n=5):
from duckduckgo_search import DDGS
with DDGS() as d:
return list(d.text(query, max_results=n))
</save_skill>
## SUB-AGENTS
<spawn_agent name="SearchAgent" task="Find current US president">
import sys
sys.path.insert(0, '{PKG_DIR}')
from duckduckgo_search import DDGS
with DDGS() as d:
for r in d.text("US president 2025 current", max_results=3):
print(r['title'], '-', r['body'][:150])
</spawn_agent>
## FINAL RESPONSE RULES
- Strip ALL tags from final response β€” never show <execute> to user
- Present results cleanly in markdown
- For voice: use [SPEAK: text] in your response
- For scheduled tasks: acknowledge the schedule and confirm when it runs
- NEVER say you can't do something
{PRAISONAI_DOCS}
"""
def get_system(memory_ctx="", skills_ctx=""):
return SYSTEM_PROMPT\
.replace("{DATETIME}", now_str())\
.replace("{MEMORY}", memory_ctx or "No memories yet.")\
.replace("{SKILLS}", skills_ctx or "No skills yet.")
def extract_blocks(text, tag):
results = []
for m in re.finditer(rf'<{tag}([^>]*)>(.*?)</{tag}>', text, re.DOTALL):
attrs = {}
for a in re.finditer(r'(\w+)=["\']([^"\']*)["\']', m.group(1)):
attrs[a.group(1)] = a.group(2)
results.append({"attrs": attrs, "content": m.group(2).strip()})
return results
def strip_tags(text):
for tag in ["execute","spawn_agent","save_memory","search_memory","save_skill"]:
text = re.sub(rf'<{tag}[^>]*>.*?</{tag}>', '', text, flags=re.DOTALL)
text = re.sub(r'\[SPEAK:.*?\]', '', text, flags=re.DOTALL)
return text.strip()
def detect_schedule(user_msg: str):
"""Detect scheduling intent and extract delay."""
from scheduler import parse_delay
patterns = [
r'(?:send|message|remind|tell me|notify|alert).*?(?:in|after)\s+(.+?)(?:\s*$|\.)',
r'(?:after|in)\s+(.+?)\s+(?:send|message|remind|tell|notify)',
r'(?:schedule|set a timer|set timer).*?(?:for|in)\s+(.+?)(?:\s*$|\.)',
]
for pat in patterns:
m = re.search(pat, user_msg.lower())
if m:
delay = parse_delay(m.group(1))
if delay and delay > 0:
return delay
return None
class AgentOrchestrator:
def __init__(self):
self._clients = {}
# Cross-platform callbacks: when Telegram sends, notify web clients
self._tg_notify_callbacks = {} # session_id -> async callback
def client(self, api_key):
if api_key not in self._clients:
self._clients[api_key] = AsyncOpenAI(
api_key=api_key, base_url=LONGCAT_BASE
)
return self._clients[api_key]
def register_tg_notify(self, session_id: str, callback):
"""Register callback to be called when Telegram sends a message."""
self._tg_notify_callbacks[session_id] = callback
def unregister_tg_notify(self, session_id: str):
self._tg_notify_callbacks.pop(session_id, None)
async def stream_response(self, user_msg, history, api_key,
model=DEFAULT_MODEL, session_id=None,
scheduled_msg=None):
def emit(d): return json.dumps(d)
model = MODEL_MAP.get(model, DEFAULT_MODEL)
cl = self.client(api_key)
# Handle scheduled message delivery
if scheduled_msg:
yield emit({"type":"response_start"})
yield emit({"type":"token","content":f"⏰ **Scheduled message:**\n\n{scheduled_msg}"})
yield emit({"type":"done"})
return
try:
# ── Detect scheduling intent ──────────────────────────────
delay = detect_schedule(user_msg)
if delay and delay > 0:
yield emit({"type":"thinking","text":f"Scheduling task for {delay:.0f}s from now…"})
# Extract the message to send
msg_to_send = re.sub(
r'(?:send|message|remind|tell me|notify|alert)\s+.*?(?:after|in)\s+[\d\w\s]+',
'', user_msg, flags=re.IGNORECASE
).strip() or user_msg
# Schedule it
from scheduler import schedule_task
async def delayed_delivery():
from telegram_bot import _histories
# Generate the actual response
results = []
async for chunk in self.stream_response(
msg_to_send.replace("after", "").replace("in 1 minute","").strip(),
history, api_key, model
):
ev = json.loads(chunk)
if ev.get("type") == "token":
results.append(ev.get("content",""))
final = "".join(results)
# Notify via Telegram if session linked
if session_id and session_id in self._tg_notify_callbacks:
cb = self._tg_notify_callbacks[session_id]
await cb(final)
# Also log it
print(f"[SCHEDULER] Delivered scheduled message: {final[:100]}")
mins = delay / 60
await schedule_task(delay, f"Scheduled: {user_msg[:50]}", delayed_delivery)
yield emit({"type":"response_start"})
yield emit({"type":"token","content":
f"βœ… **Scheduled!** I'll send you that message in **{mins:.1f} minute(s)**.\n\n"
f"_(Make sure your Telegram is connected to receive the delivery.)_"
})
yield emit({"type":"done"})
return
# ── Fast path: simple conversation ────────────────────────
if not needs_code(user_msg):
yield emit({"type":"thinking","text":"Responding…"})
await asyncio.sleep(0)
messages = [{"role":"system","content":
f"You are PraisonChat, a helpful AI assistant.\n"
f"Current datetime: {now_str()}\n"
f"Be concise and helpful. Use markdown.\n"
f"Memory: {get_memory_context()[:500] or 'none'}"}]
for m in history[-12:]:
messages.append({"role":m["role"],"content":str(m.get("content",""))[:2000]})
messages.append({"role":"user","content":user_msg})
yield emit({"type":"response_start"})
stream = await cl.chat.completions.create(
model=model, messages=messages,
max_tokens=4000, temperature=0.7, stream=True
)
async for chunk in stream:
c = chunk.choices[0].delta.content
if c: yield emit({"type":"token","content":c})
yield emit({"type":"done"})
return
# ── Agent path: needs real execution ──────────────────────
mem_ctx = get_memory_context()
skills_ctx = get_skills_context()
messages = [{"role":"system","content":get_system(mem_ctx, skills_ctx)}]
for m in history[-12:]:
messages.append({"role":m["role"],"content":str(m.get("content",""))[:2000]})
messages.append({"role":"user","content":user_msg})
MAX_ITER = 6
all_results = {}
for iteration in range(MAX_ITER):
yield emit({"type":"thinking","text":f"Working… (step {iteration+1})"})
await asyncio.sleep(0)
resp = await cl.chat.completions.create(
model=model, messages=messages, max_tokens=8000, temperature=0.7
)
raw = resp.choices[0].message.content or ""
# Memory ops
for blk in extract_blocks(raw, "save_memory"):
key = blk["attrs"].get("key", f"note_{iteration}")
save_memory(key, blk["content"])
yield emit({"type":"memory_saved","key":key})
for blk in extract_blocks(raw, "save_skill"):
name = blk["attrs"].get("name", f"skill_{iteration}")
desc = blk["attrs"].get("description","")
save_skill(name, blk["content"], desc)
yield emit({"type":"skill_saved","name":name,"description":desc})
# Execute code blocks
exec_blocks = extract_blocks(raw, "execute")
agent_blocks = extract_blocks(raw, "spawn_agent")
has_actions = bool(exec_blocks or agent_blocks or
extract_blocks(raw,"save_memory") or
extract_blocks(raw,"search_memory") or
extract_blocks(raw,"save_skill"))
for idx, blk in enumerate(exec_blocks):
yield emit({"type":"executing","index":idx})
await asyncio.sleep(0)
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None, lambda c=blk["content"]: sandbox_run(c, max_retries=3, timeout=60)
)
for inst in result.get("installs",[]):
yield emit({"type":"pkg_install","package":inst["package"],"ok":inst["ok"]})
stdout = result.get("stdout","").strip()
stderr = result.get("stderr","").strip()
ok = result.get("ok", False)
files = result.get("files", [])
for f in files:
ext = f.get("ext","").lower()
b64 = f["b64"]
if ext in {"mp3","wav","ogg","m4a"}:
yield emit({"type":"audio_response","audio_b64":b64,"filename":f["name"]})
elif ext in {"png","jpg","jpeg","gif","webp","svg"}:
yield emit({"type":"image_response","image_b64":b64,"filename":f["name"],"ext":ext})
else:
yield emit({"type":"file_response","file_b64":b64,"filename":f["name"],"size":f.get("size",0)})
out = stdout if ok else f"Error: {stderr}"
all_results[f"code_{iteration}_{idx}"] = out
yield emit({"type":"exec_done","ok":ok,"output":out[:300],"files":[f["name"] for f in files]})
for blk in agent_blocks:
name = blk["attrs"].get("name", f"Agent_{iteration}")
task = blk["attrs"].get("task","")
yield emit({"type":"agent_created","name":name,"task":task[:100]})
yield emit({"type":"agent_working","name":name})
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None, lambda c=blk["content"]: sandbox_run(c, max_retries=3, timeout=90)
)
for inst in result.get("installs",[]):
yield emit({"type":"pkg_install","package":inst["package"],"ok":inst["ok"]})
stdout = result.get("stdout","").strip()
ok = result.get("ok", False)
files = result.get("files",[])
for f in files:
ext=f.get("ext","").lower();b64=f["b64"]
if ext in {"mp3","wav","ogg"}:
yield emit({"type":"audio_response","audio_b64":b64,"filename":f["name"]})
elif ext in {"png","jpg","jpeg","gif","webp"}:
yield emit({"type":"image_response","image_b64":b64,"filename":f["name"],"ext":ext})
else:
yield emit({"type":"file_response","file_b64":b64,"filename":f["name"],"size":f.get("size",0)})
out = stdout if ok else f"Error: {result.get('stderr','')}"
all_results[name] = out
yield emit({"type":"agent_done","name":name,"preview":out[:250],"ok":ok})
if not has_actions:
clean = strip_tags(raw)
if not clean.strip():
clean = "Done! Let me know if you need anything else."
yield emit({"type":"response_start"})
for i in range(0, len(clean), 6):
yield emit({"type":"token","content":clean[i:i+6]})
if i % 60 == 0: await asyncio.sleep(0)
# Handle voice
if "[SPEAK:" in raw:
try:
speak_text = raw.split("[SPEAK:")[1].rsplit("]",1)[0].strip()
voice_code = f"""
import sys, io
sys.path.insert(0, '{PKG_DIR}')
from gtts import gTTS
tts = gTTS(text={repr(speak_text[:2000])}, lang='en', slow=False)
tts.save('voice_response.mp3')
print('ok')
"""
loop = asyncio.get_event_loop()
vr = await loop.run_in_executor(
None, lambda: sandbox_run(voice_code, timeout=30)
)
for f in vr.get("files",[]):
if f.get("ext")=="mp3":
yield emit({"type":"audio_response","audio_b64":f["b64"],"filename":"voice_response.mp3"})
except Exception:
pass
break
else:
results_text = "REAL RESULTS:\n\n"
for k, v in list(all_results.items())[-4:]:
results_text += f"[{k}]:\n{v[:1200]}\n\n"
messages.append({"role":"assistant","content":raw})
messages.append({"role":"user","content":(
f"{results_text}\n"
f"Give the user a clean final answer using these REAL results. "
f"No code tags. No explanations of what you did. Just present the results."
)})
yield emit({"type":"done"})
except Exception as e:
tb = traceback.format_exc()
print(f"[AGENT] {e}\n{tb}")
yield emit({"type":"response_start"})
yield emit({"type":"token","content":
f"❌ **Error:** {str(e)}\n\n"
f"Please check your LongCat API key in βš™οΈ Settings."})
yield emit({"type":"done"})
orchestrator = AgentOrchestrator()