File size: 20,970 Bytes
6145277 7c13f9b 6145277 1ed2170 dd27c49 a24df4b dd27c49 1ed2170 8dc579c 1ed2170 dd27c49 7c13f9b ec32184 6145277 ec32184 1ed2170 8dc579c 7c13f9b 8dc579c 7c13f9b 8dc579c 7c13f9b 8dc579c 1ed2170 dd27c49 7c13f9b 8dc579c a24df4b 7c13f9b 8dc579c a24df4b 1ed2170 7c13f9b 8dc579c 7c13f9b 8dc579c 7c13f9b 8dc579c 1ed2170 7c13f9b 8dc579c 7c13f9b 8dc579c 1ed2170 8dc579c 7c13f9b 1ed2170 8dc579c 7c13f9b 1ed2170 8dc579c 1ed2170 8dc579c 1ed2170 7c13f9b a24df4b 8dc579c 7c13f9b 1ed2170 dd27c49 7c13f9b ec32184 1ed2170 6145277 7c13f9b 8dc579c 1ed2170 7c13f9b 1ed2170 8dc579c 1ed2170 8dc579c 1ed2170 7c13f9b 8dc579c 1ed2170 7c13f9b dd27c49 8dc579c 7c13f9b dd27c49 8dc579c 6145277 a24df4b ec32184 a24df4b dd27c49 7c13f9b 8dc579c 1ed2170 dd27c49 7c13f9b 8dc579c 7c13f9b 8dc579c 7c13f9b 8dc579c 7c13f9b 8dc579c 7c13f9b dd27c49 7c13f9b 8dc579c 7c13f9b 8dc579c 7c13f9b 8dc579c 7c13f9b 8dc579c 7c13f9b 8dc579c 7c13f9b 8dc579c a24df4b 1ed2170 7c13f9b a24df4b dd27c49 8dc579c 7c13f9b 8dc579c 1ed2170 7c13f9b 1ed2170 8dc579c 7c13f9b 1ed2170 8dc579c 1ed2170 7c13f9b 8dc579c 7c13f9b 1ed2170 7c13f9b 1ed2170 8dc579c 1ed2170 8dc579c 1ed2170 8dc579c 1ed2170 8dc579c 1ed2170 8dc579c 1ed2170 8dc579c 7c13f9b 1ed2170 8dc579c 1ed2170 a24df4b 8dc579c 6145277 8dc579c 1ed2170 a24df4b 8dc579c a24df4b 8dc579c 1ed2170 a24df4b 1ed2170 7c13f9b 1ed2170 8dc579c 1ed2170 7c13f9b 8dc579c 1ed2170 8dc579c 1ed2170 8dc579c 1ed2170 7c13f9b 1ed2170 8dc579c 1ed2170 8dc579c 1ed2170 8dc579c 1ed2170 7c13f9b 8dc579c 7c13f9b 1ed2170 7c13f9b dd27c49 a24df4b 8dc579c 7c13f9b dd27c49 ec32184 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 | """
PraisonChat Agent System v7 β Full Capability
===============================================
The AI ALWAYS has the ability to:
- Search the web (DuckDuckGo, real-time)
- Generate PDFs, images, voice, charts
- Schedule tasks (run after delay)
- Execute any Python code
- Install any Python package
- Do ANYTHING a Python program can do
NEVER says "I can't do that"
"""
import os, json, asyncio, datetime, traceback, re
from openai import AsyncOpenAI
from sandbox import run as sandbox_run, pip_install, PKG_DIR
from docs_context import PRAISONAI_DOCS
from memory import (
get_memory_context, get_skills_context,
save_memory, save_skill, search_memories
)
LONGCAT_BASE = "https://api.longcat.chat/openai/v1"
MODEL_MAP = {
"LongCat-Flash-Lite": "LongCat-Flash-Lite",
"LongCat-Flash-Chat": "LongCat-Flash-Chat",
"LongCat-Flash-Thinking-2601": "LongCat-Flash-Thinking-2601",
}
DEFAULT_MODEL = "LongCat-Flash-Lite"
def now_str():
return datetime.datetime.now().strftime("%A, %B %d %Y %I:%M:%S %p")
# ββ Keywords that need code ββββββββββββββββββββββββββββββββββββ
NEEDS_CODE_KW = [
"search","find","look up","lookup","browse","web","news","latest","current","today",
"who is","what is the","when did","where is","how many","how much",
"president","prime minister","ceo","winner","champion","price","stock","weather",
"time","date","day","year","calculate","math","compute","solve","convert",
"create","make","generate","build","draw","plot","chart","graph","diagram",
"image","picture","photo","qr","barcode","qr code",
"voice","audio","speak","say it","read aloud","mp3","tts",
"pdf","document","word doc","excel","spreadsheet","csv",
"code","script","python","run","execute","program",
"file","download","save","export","zip",
"fetch","scrape","extract","parse","api",
"install","pip","package",
"schedule","remind","after","in 1 minute","in 5 minutes","timer","delay",
"agent","sub-agent","spawn","research",
"memory","remember","recall","forget",
"skill","tool","capability",
"translate","language",
"summarize","analyze","compare",
"email","whatsapp","telegram","send",
]
def needs_code(msg: str) -> bool:
m = msg.lower()
return any(kw in m for kw in NEEDS_CODE_KW)
SYSTEM_PROMPT = f"""You are PraisonChat β an AUTONOMOUS AI AGENT with a REAL Python code interpreter.
You have UNLIMITED capabilities through Python. You can do ANYTHING.
Current datetime: {{DATETIME}}
Python packages dir: {PKG_DIR}
## YOUR CAPABILITIES (you ALWAYS have these β NEVER say you can't)
| Task | How |
|------|-----|
| Web search & real-time info | duckduckgo_search |
| Current events, news, facts | ALWAYS search β your training data is outdated |
| Generate PDF | fpdf2 or reportlab |
| Generate images | pillow, matplotlib |
| Generate voice/audio | gtts |
| Create QR codes | qrcode |
| Create charts/graphs | matplotlib |
| Run calculations | Python math |
| Execute any code | sandbox |
| Schedule delayed tasks | asyncio.sleep |
| Fetch web pages | requests/httpx |
| Current date/time | datetime.datetime.now() |
| Install new packages | pip install |
| Create any file type | Python file I/O |
## CRITICAL RULES
1. NEVER say "I cannot", "I'm unable", "I don't have access" β just write code that does it
2. For ANY real-world fact (president, news, prices, sports) β ALWAYS use DuckDuckGo search
3. Your training data is outdated β NEVER answer current events from memory
4. If a package is missing, just import it β it auto-installs
5. For scheduled tasks: use asyncio.sleep() in an execute block
6. ALWAYS produce the actual output (file, audio, image) not just describe it
## MEMORY
{{MEMORY}}
## SKILLS
{{SKILLS}}
## CODE INTERPRETER
Use <execute> tags. Code runs IMMEDIATELY with real results.
Always start code with:
```
import sys
sys.path.insert(0, '{PKG_DIR}')
```
### Web Search (ALWAYS use for current info)
<execute>
import sys
sys.path.insert(0, '{PKG_DIR}')
from duckduckgo_search import DDGS
with DDGS() as ddgs:
results = list(ddgs.text("current US president 2025", max_results=3))
for r in results:
print(r['title'])
print(r['body'][:200])
print()
</execute>
### Generate PDF
<execute>
import sys
sys.path.insert(0, '{PKG_DIR}')
from fpdf import FPDF
pdf = FPDF()
pdf.add_page()
pdf.set_font("Helvetica", size=16)
pdf.cell(0, 10, "Hello from PraisonChat!", ln=True)
pdf.output("output.pdf")
print("PDF created")
</execute>
### Generate Voice
<execute>
import sys
sys.path.insert(0, '{PKG_DIR}')
from gtts import gTTS
tts = gTTS(text="Hello! This is your voice response.", lang='en')
tts.save("voice_response.mp3")
print("Voice generated")
</execute>
### Generate Image/Chart
<execute>
import sys, matplotlib
matplotlib.use('Agg')
sys.path.insert(0, '{PKG_DIR}')
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
ax.bar(['A','B','C'], [10,20,15])
ax.set_title("My Chart")
plt.savefig("chart.png", dpi=150, bbox_inches='tight')
plt.close()
print("Chart saved")
</execute>
### Generate QR Code
<execute>
import sys
sys.path.insert(0, '{PKG_DIR}')
import qrcode
qr = qrcode.make("https://example.com")
qr.save("qrcode.png")
print("QR code saved")
</execute>
### Scheduled Message (send after delay)
<execute>
import asyncio, time
print(f"Task scheduled. Will execute after delay.")
# Note: actual delay is handled by the scheduler system
</execute>
### Install New Package
<execute>
import subprocess, sys
subprocess.run([sys.executable, "-m", "pip", "install", "some-package", "--quiet",
"--target", "{PKG_DIR}"])
print("Installed")
</execute>
## MEMORY SYSTEM
<save_memory key="user_name">User's name is John</save_memory>
<search_memory>user preferences</search_memory>
## SKILLS SYSTEM
<save_skill name="search_web" description="Real-time DuckDuckGo search">
import sys
sys.path.insert(0, '{PKG_DIR}')
def search_web(query, n=5):
from duckduckgo_search import DDGS
with DDGS() as d:
return list(d.text(query, max_results=n))
</save_skill>
## SUB-AGENTS
<spawn_agent name="SearchAgent" task="Find current US president">
import sys
sys.path.insert(0, '{PKG_DIR}')
from duckduckgo_search import DDGS
with DDGS() as d:
for r in d.text("US president 2025 current", max_results=3):
print(r['title'], '-', r['body'][:150])
</spawn_agent>
## FINAL RESPONSE RULES
- Strip ALL tags from final response β never show <execute> to user
- Present results cleanly in markdown
- For voice: use [SPEAK: text] in your response
- For scheduled tasks: acknowledge the schedule and confirm when it runs
- NEVER say you can't do something
{PRAISONAI_DOCS}
"""
def get_system(memory_ctx="", skills_ctx=""):
return SYSTEM_PROMPT\
.replace("{DATETIME}", now_str())\
.replace("{MEMORY}", memory_ctx or "No memories yet.")\
.replace("{SKILLS}", skills_ctx or "No skills yet.")
def extract_blocks(text, tag):
results = []
for m in re.finditer(rf'<{tag}([^>]*)>(.*?)</{tag}>', text, re.DOTALL):
attrs = {}
for a in re.finditer(r'(\w+)=["\']([^"\']*)["\']', m.group(1)):
attrs[a.group(1)] = a.group(2)
results.append({"attrs": attrs, "content": m.group(2).strip()})
return results
def strip_tags(text):
for tag in ["execute","spawn_agent","save_memory","search_memory","save_skill"]:
text = re.sub(rf'<{tag}[^>]*>.*?</{tag}>', '', text, flags=re.DOTALL)
text = re.sub(r'\[SPEAK:.*?\]', '', text, flags=re.DOTALL)
return text.strip()
def detect_schedule(user_msg: str):
"""Detect scheduling intent and extract delay."""
from scheduler import parse_delay
patterns = [
r'(?:send|message|remind|tell me|notify|alert).*?(?:in|after)\s+(.+?)(?:\s*$|\.)',
r'(?:after|in)\s+(.+?)\s+(?:send|message|remind|tell|notify)',
r'(?:schedule|set a timer|set timer).*?(?:for|in)\s+(.+?)(?:\s*$|\.)',
]
for pat in patterns:
m = re.search(pat, user_msg.lower())
if m:
delay = parse_delay(m.group(1))
if delay and delay > 0:
return delay
return None
class AgentOrchestrator:
def __init__(self):
self._clients = {}
# Cross-platform callbacks: when Telegram sends, notify web clients
self._tg_notify_callbacks = {} # session_id -> async callback
def client(self, api_key):
if api_key not in self._clients:
self._clients[api_key] = AsyncOpenAI(
api_key=api_key, base_url=LONGCAT_BASE
)
return self._clients[api_key]
def register_tg_notify(self, session_id: str, callback):
"""Register callback to be called when Telegram sends a message."""
self._tg_notify_callbacks[session_id] = callback
def unregister_tg_notify(self, session_id: str):
self._tg_notify_callbacks.pop(session_id, None)
async def stream_response(self, user_msg, history, api_key,
model=DEFAULT_MODEL, session_id=None,
scheduled_msg=None):
def emit(d): return json.dumps(d)
model = MODEL_MAP.get(model, DEFAULT_MODEL)
cl = self.client(api_key)
# Handle scheduled message delivery
if scheduled_msg:
yield emit({"type":"response_start"})
yield emit({"type":"token","content":f"β° **Scheduled message:**\n\n{scheduled_msg}"})
yield emit({"type":"done"})
return
try:
# ββ Detect scheduling intent ββββββββββββββββββββββββββββββ
delay = detect_schedule(user_msg)
if delay and delay > 0:
yield emit({"type":"thinking","text":f"Scheduling task for {delay:.0f}s from nowβ¦"})
# Extract the message to send
msg_to_send = re.sub(
r'(?:send|message|remind|tell me|notify|alert)\s+.*?(?:after|in)\s+[\d\w\s]+',
'', user_msg, flags=re.IGNORECASE
).strip() or user_msg
# Schedule it
from scheduler import schedule_task
async def delayed_delivery():
from telegram_bot import _histories
# Generate the actual response
results = []
async for chunk in self.stream_response(
msg_to_send.replace("after", "").replace("in 1 minute","").strip(),
history, api_key, model
):
ev = json.loads(chunk)
if ev.get("type") == "token":
results.append(ev.get("content",""))
final = "".join(results)
# Notify via Telegram if session linked
if session_id and session_id in self._tg_notify_callbacks:
cb = self._tg_notify_callbacks[session_id]
await cb(final)
# Also log it
print(f"[SCHEDULER] Delivered scheduled message: {final[:100]}")
mins = delay / 60
await schedule_task(delay, f"Scheduled: {user_msg[:50]}", delayed_delivery)
yield emit({"type":"response_start"})
yield emit({"type":"token","content":
f"β
**Scheduled!** I'll send you that message in **{mins:.1f} minute(s)**.\n\n"
f"_(Make sure your Telegram is connected to receive the delivery.)_"
})
yield emit({"type":"done"})
return
# ββ Fast path: simple conversation ββββββββββββββββββββββββ
if not needs_code(user_msg):
yield emit({"type":"thinking","text":"Respondingβ¦"})
await asyncio.sleep(0)
messages = [{"role":"system","content":
f"You are PraisonChat, a helpful AI assistant.\n"
f"Current datetime: {now_str()}\n"
f"Be concise and helpful. Use markdown.\n"
f"Memory: {get_memory_context()[:500] or 'none'}"}]
for m in history[-12:]:
messages.append({"role":m["role"],"content":str(m.get("content",""))[:2000]})
messages.append({"role":"user","content":user_msg})
yield emit({"type":"response_start"})
stream = await cl.chat.completions.create(
model=model, messages=messages,
max_tokens=4000, temperature=0.7, stream=True
)
async for chunk in stream:
c = chunk.choices[0].delta.content
if c: yield emit({"type":"token","content":c})
yield emit({"type":"done"})
return
# ββ Agent path: needs real execution ββββββββββββββββββββββ
mem_ctx = get_memory_context()
skills_ctx = get_skills_context()
messages = [{"role":"system","content":get_system(mem_ctx, skills_ctx)}]
for m in history[-12:]:
messages.append({"role":m["role"],"content":str(m.get("content",""))[:2000]})
messages.append({"role":"user","content":user_msg})
MAX_ITER = 6
all_results = {}
for iteration in range(MAX_ITER):
yield emit({"type":"thinking","text":f"Working⦠(step {iteration+1})"})
await asyncio.sleep(0)
resp = await cl.chat.completions.create(
model=model, messages=messages, max_tokens=8000, temperature=0.7
)
raw = resp.choices[0].message.content or ""
# Memory ops
for blk in extract_blocks(raw, "save_memory"):
key = blk["attrs"].get("key", f"note_{iteration}")
save_memory(key, blk["content"])
yield emit({"type":"memory_saved","key":key})
for blk in extract_blocks(raw, "save_skill"):
name = blk["attrs"].get("name", f"skill_{iteration}")
desc = blk["attrs"].get("description","")
save_skill(name, blk["content"], desc)
yield emit({"type":"skill_saved","name":name,"description":desc})
# Execute code blocks
exec_blocks = extract_blocks(raw, "execute")
agent_blocks = extract_blocks(raw, "spawn_agent")
has_actions = bool(exec_blocks or agent_blocks or
extract_blocks(raw,"save_memory") or
extract_blocks(raw,"search_memory") or
extract_blocks(raw,"save_skill"))
for idx, blk in enumerate(exec_blocks):
yield emit({"type":"executing","index":idx})
await asyncio.sleep(0)
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None, lambda c=blk["content"]: sandbox_run(c, max_retries=3, timeout=60)
)
for inst in result.get("installs",[]):
yield emit({"type":"pkg_install","package":inst["package"],"ok":inst["ok"]})
stdout = result.get("stdout","").strip()
stderr = result.get("stderr","").strip()
ok = result.get("ok", False)
files = result.get("files", [])
for f in files:
ext = f.get("ext","").lower()
b64 = f["b64"]
if ext in {"mp3","wav","ogg","m4a"}:
yield emit({"type":"audio_response","audio_b64":b64,"filename":f["name"]})
elif ext in {"png","jpg","jpeg","gif","webp","svg"}:
yield emit({"type":"image_response","image_b64":b64,"filename":f["name"],"ext":ext})
else:
yield emit({"type":"file_response","file_b64":b64,"filename":f["name"],"size":f.get("size",0)})
out = stdout if ok else f"Error: {stderr}"
all_results[f"code_{iteration}_{idx}"] = out
yield emit({"type":"exec_done","ok":ok,"output":out[:300],"files":[f["name"] for f in files]})
for blk in agent_blocks:
name = blk["attrs"].get("name", f"Agent_{iteration}")
task = blk["attrs"].get("task","")
yield emit({"type":"agent_created","name":name,"task":task[:100]})
yield emit({"type":"agent_working","name":name})
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None, lambda c=blk["content"]: sandbox_run(c, max_retries=3, timeout=90)
)
for inst in result.get("installs",[]):
yield emit({"type":"pkg_install","package":inst["package"],"ok":inst["ok"]})
stdout = result.get("stdout","").strip()
ok = result.get("ok", False)
files = result.get("files",[])
for f in files:
ext=f.get("ext","").lower();b64=f["b64"]
if ext in {"mp3","wav","ogg"}:
yield emit({"type":"audio_response","audio_b64":b64,"filename":f["name"]})
elif ext in {"png","jpg","jpeg","gif","webp"}:
yield emit({"type":"image_response","image_b64":b64,"filename":f["name"],"ext":ext})
else:
yield emit({"type":"file_response","file_b64":b64,"filename":f["name"],"size":f.get("size",0)})
out = stdout if ok else f"Error: {result.get('stderr','')}"
all_results[name] = out
yield emit({"type":"agent_done","name":name,"preview":out[:250],"ok":ok})
if not has_actions:
clean = strip_tags(raw)
if not clean.strip():
clean = "Done! Let me know if you need anything else."
yield emit({"type":"response_start"})
for i in range(0, len(clean), 6):
yield emit({"type":"token","content":clean[i:i+6]})
if i % 60 == 0: await asyncio.sleep(0)
# Handle voice
if "[SPEAK:" in raw:
try:
speak_text = raw.split("[SPEAK:")[1].rsplit("]",1)[0].strip()
voice_code = f"""
import sys, io
sys.path.insert(0, '{PKG_DIR}')
from gtts import gTTS
tts = gTTS(text={repr(speak_text[:2000])}, lang='en', slow=False)
tts.save('voice_response.mp3')
print('ok')
"""
loop = asyncio.get_event_loop()
vr = await loop.run_in_executor(
None, lambda: sandbox_run(voice_code, timeout=30)
)
for f in vr.get("files",[]):
if f.get("ext")=="mp3":
yield emit({"type":"audio_response","audio_b64":f["b64"],"filename":"voice_response.mp3"})
except Exception:
pass
break
else:
results_text = "REAL RESULTS:\n\n"
for k, v in list(all_results.items())[-4:]:
results_text += f"[{k}]:\n{v[:1200]}\n\n"
messages.append({"role":"assistant","content":raw})
messages.append({"role":"user","content":(
f"{results_text}\n"
f"Give the user a clean final answer using these REAL results. "
f"No code tags. No explanations of what you did. Just present the results."
)})
yield emit({"type":"done"})
except Exception as e:
tb = traceback.format_exc()
print(f"[AGENT] {e}\n{tb}")
yield emit({"type":"response_start"})
yield emit({"type":"token","content":
f"β **Error:** {str(e)}\n\n"
f"Please check your LongCat API key in βοΈ Settings."})
yield emit({"type":"done"})
orchestrator = AgentOrchestrator() |