self-trained2 / models /agent.py
DeepImagix's picture
Update models/agent.py
54a63c3 verified
Raw
History Blame Contribute Delete
34.9 kB
"""
NeuraPrompt AI — agent.py (POWER AGENT v2)
============================================
A true autonomous agent β€” thinks, plans, acts, executes, creates.
Communicates with main.py (MongoDB, file system, chat API).
Runs via uvicorn on the port given by AGENT_PORT (default 7860).
Capabilities:
✦ Live web search + URL fetch + page extract
✦ Wikipedia deep research
✦ Weather (wttr.in, free)
✦ Python code execution (sandboxed subprocess)
✦ Shell command execution (controlled)
✦ File creation + ZIP bundling → stored in MongoDB (via main.py)
✦ Image analysis (base64 → OpenRouter vision model → DB delete after)
✦ MongoDB read/write via main.py REST (memory, user profile)
✦ Math evaluator (safe, no exec)
✦ Time / timezone lookup
✦ Streaming SSE thought process (real-time to frontend)
✦ Model auto-fallback (you pick models in FREE_MODELS)
✦ Full agent loop: Think → Plan → Act → Observe → Repeat → Finish
"""
import os, re, json, time, math, asyncio, logging, subprocess
import tempfile, base64, hashlib, secrets, zipfile, io
from datetime import datetime, timezone
from typing import Optional, Any, AsyncGenerator
from urllib.parse import quote_plus
import httpx
from fastapi import FastAPI, HTTPException, UploadFile, File, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel
# ─────────────────────────────────────────────────────────────
# CONFIG
# ─────────────────────────────────────────────────────────────
# Secrets and endpoints come from the environment; the rest are fixed limits.
# NOTE: the environment variable read here is spelled OPENROUTE_KEY.
OPENROUTER_KEY = os.environ.get("OPENROUTE_KEY", "")
# Base URL of the main.py service used by the memory / file / chat tools.
MAIN_API_BASE = os.environ.get(
    "MAIN_API_BASE",
    "https://deepimagix-self-trained2.hf.space",
)
# Port handed to uvicorn when this module is run as a script.
AGENT_PORT = int(os.environ.get("AGENT_PORT", "7860"))

MAX_STEPS = 8         # default cap on reasoning iterations per request
MAX_TOKENS = 4096     # max_tokens sent with each chat-completion call
AGENT_TIMEOUT = 90.0  # HTTP timeout (seconds) for chat-completion calls
CODE_TIMEOUT = 15     # seconds allowed for subprocess code / shell runs
# ── YOU CHOOSE MODELS HERE ──────────────────────────────────
# Ordered model preference list; the role constants below index into it.
FREE_MODELS = [
    "anthropic/claude-3-haiku",            # fast, smart
    "google/gemini-flash-1.5",             # multimodal
    "qwen/qwen-2.5-72b-instruct:free",     # strong free
    "deepseek/deepseek-r1:free",           # reasoning
    "mistralai/mistral-7b-instruct:free",  # fallback
]
# First entry drives the agent loop, second handles image analysis.
PRIMARY_MODEL, VISION_MODEL = FREE_MODELS[:2]
# Last entry is what call_openrouter retries with.
FALLBACK_MODEL = FREE_MODELS[len(FREE_MODELS) - 1]
# Root logger: INFO and above, every record tagged with [AGENT].
_LOG_FORMAT = "%(asctime)s [AGENT] %(levelname)s: %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module logger used throughout this file.
log = logging.getLogger("neuraprompt.agent")
# Headers sent with the scraping requests (web search, URL fetch) so they
# present a desktop-Chrome User-Agent.
BROWSER_HEADERS = {
    "User-Agent": " ".join(
        [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "AppleWebKit/537.36 (KHTML, like Gecko)",
            "Chrome/124.0.0.0 Safari/537.36",
        ]
    ),
    "Accept-Language": "en-US,en;q=0.9",
}
# ─────────────────────────────────────────────────────────────
# REQUEST / RESPONSE MODELS
# ─────────────────────────────────────────────────────────────
class AgentRequest(BaseModel):
    """Request body accepted by the /agent/run and /agent/stream endpoints."""
    # Caller identity; handed to dispatch_tool for the user-scoped tools.
    user_id: str
    # Task description; becomes the "Goal: ..." line of the first user message.
    goal: str
    # "direct" makes /agent/run do a single LLM call instead of the tool loop.
    mode: str = "agent" # "agent" | "direct"
    # Upper bound on loop iterations; also quoted in the system prompt.
    max_steps: int = MAX_STEPS
    # Optional extra text appended to the first user message as "Context: ...".
    context: Optional[str] = None
# ─────────────────────────────────────────────────────────────
# TOOL IMPLEMENTATIONS
# ─────────────────────────────────────────────────────────────
async def tool_web_search(query: str) -> str:
    """Search DuckDuckGo and return a plain-text digest of the findings.

    Two requests are made: the Instant Answer JSON API (for a one-line quick
    answer) and the HTML results page, which is scraped for up to six hits.

    Args:
        query: Free-text search query.

    Returns:
        Result entries separated by blank lines, ``No results for: <query>``
        when nothing was found, or ``[web_search error] <reason>`` when the
        HTML search (or the bs4 import) fails.
    """
    try:
        from bs4 import BeautifulSoup
        from urllib.parse import parse_qs, urlparse

        encoded = quote_plus(query)
        results = []

        # The instant answer is optional: a timeout or non-JSON body here must
        # not discard the organic results gathered below.
        try:
            ia_url = (
                f"https://api.duckduckgo.com/?q={encoded}"
                "&format=json&no_redirect=1&no_html=1&skip_disambig=1"
            )
            async with httpx.AsyncClient(timeout=8.0, headers=BROWSER_HEADERS) as c:
                r = await c.get(ia_url)
            data = r.json()
            instant = (data.get("AbstractText") or data.get("Answer") or "").strip()
            if instant:
                results.append(f"[Quick Answer] {instant}")
        except Exception as e:
            log.warning(f"web_search instant answer unavailable: {e}")

        html_url = f"https://html.duckduckgo.com/html/?q={encoded}"
        async with httpx.AsyncClient(
            timeout=14.0, headers=BROWSER_HEADERS, follow_redirects=True
        ) as c:
            r = await c.get(html_url)
        soup = BeautifulSoup(r.text, "lxml")

        for tag in soup.select(".result__body")[:6]:
            t = tag.select_one(".result__title a")
            s = tag.select_one(".result__snippet")
            title = t.get_text(strip=True) if t else ""
            snippet = s.get_text(strip=True) if s else ""
            href = t.get("href", "") if t else ""
            # Result links are redirect URLs; the real target is in "uddg".
            if "uddg=" in href:
                qs = parse_qs(urlparse(href).query)
                href = qs.get("uddg", [href])[0]
            if title:
                results.append(f"• {title}\n {snippet}\n 🔗 {href}")

        return "\n\n".join(results) if results else f"No results for: {query}"
    except Exception as e:
        return f"[web_search error] {e}"
async def tool_fetch_url(url: str) -> str:
    """Download a URL and return its readable text.

    Non-HTML responses are returned raw, truncated to 3000 characters. For
    HTML, boilerplate elements are removed and the ``<p>`` paragraphs longer
    than 40 characters are joined, truncated to 4000 characters.

    Returns:
        The extracted text, ``No readable content.`` when nothing qualifies,
        or ``[fetch_url error] <reason>`` on any failure.
    """
    try:
        from bs4 import BeautifulSoup

        async with httpx.AsyncClient(
            timeout=15.0, headers=BROWSER_HEADERS, follow_redirects=True
        ) as client:
            response = await client.get(url)

        content_type = response.headers.get("content-type", "")
        if "text/html" not in content_type:
            return response.text[:3000]

        document = BeautifulSoup(response.text, "lxml")
        # Strip page chrome so only article-like text remains.
        boilerplate = ["script", "style", "nav", "header", "footer", "aside", "form"]
        for element in document(boilerplate):
            element.decompose()

        paragraphs = []
        for node in document.find_all("p"):
            if len(node.get_text(strip=True)) > 40:
                paragraphs.append(node.get_text(" ", strip=True))

        joined = "\n".join(paragraphs)[:4000]
        return joined or "No readable content."
    except Exception as exc:
        return f"[fetch_url error] {exc}"
async def tool_wiki_search(query: str) -> str:
    """Return the intro extract of the top English-Wikipedia hit for *query*.

    Runs a search (up to three hits requested), then fetches the plain-text
    intro of the first hit's page.

    Returns:
        Up to 3000 characters of extract, a ``No Wikipedia results`` message,
        or ``[wiki_search error] <reason>`` on any failure.
    """
    api = "https://en.wikipedia.org/w/api.php"
    try:
        search_url = (
            api
            + f"?action=query&list=search&srsearch={quote_plus(query)}"
            + "&format=json&srlimit=3"
        )
        async with httpx.AsyncClient(timeout=8.0) as client:
            search_response = await client.get(search_url)
        matches = search_response.json().get("query", {}).get("search", [])
        if not matches:
            return f"No Wikipedia results for: {query}"

        page_id = matches[0]["pageid"]
        extract_url = (
            api
            + f"?action=query&pageids={page_id}&prop=extracts&exintro=1"
            + "&explaintext=1&format=json"
        )
        async with httpx.AsyncClient(timeout=8.0) as client:
            extract_response = await client.get(extract_url)
        page_map = extract_response.json().get("query", {}).get("pages", {})

        # The response is keyed by page id; take whichever page came back.
        first_page = next(iter(page_map.values()), {})
        return first_page.get("extract", "No extract.")[:3000]
    except Exception as exc:
        return f"[wiki_search error] {exc}"
async def tool_get_weather(location: str) -> str:
    """Fetch current conditions and a short forecast from wttr.in.

    Args:
        location: Place name, interpolated (URL-quoted) into the wttr.in path.

    Returns:
        A multi-line report with location, current conditions and up to three
        forecast days, or ``[get_weather error] <reason>`` on any failure.
    """
    try:
        async with httpx.AsyncClient(timeout=8.0) as c:
            r = await c.get(f"https://wttr.in/{quote_plus(location)}?format=j1")
        r.raise_for_status()
        d = r.json()

        cur = d["current_condition"][0]
        area = d["nearest_area"][0]
        city = area["areaName"][0]["value"]
        country = area["country"][0]["value"]

        day_lines = []
        for day in d.get("weather", [])[:3]:
            date = day.get("date", "")
            avg_temp = day.get("avgtempC", "?")
            hourly = day.get("hourly") or []
            if hourly:
                # Prefer slot 4, but clamp so a day with fewer slots does not
                # raise and wipe out the whole report.
                slot = hourly[min(4, len(hourly) - 1)]
                condition = slot["weatherDesc"][0]["value"]
            else:
                condition = "?"
            day_lines.append(f"\n {date}: {condition}, avg {avg_temp}°C")
        days_str = "".join(day_lines)

        return (
            f"📍 {city}, {country}\n"
            f" Now: {cur['weatherDesc'][0]['value']}, "
            f"{cur['temp_C']}°C / {cur['temp_F']}°F "
            f"(feels {cur['FeelsLikeC']}°C)\n"
            f" Humidity: {cur['humidity']}% | Wind: {cur['windspeedKmph']} km/h\n"
            f" 3-Day Forecast:{days_str}"
        )
    except Exception as e:
        return f"[get_weather error] {e}"
def tool_calculate(expression: str) -> str:
    """Evaluate a math expression with the ``math`` module's names available.

    Besides everything public in ``math``, the names ``abs``, ``round``,
    ``min``, ``max``, ``sum`` and ``pow`` are usable. ``^`` is treated as
    exponentiation. Characters outside a small arithmetic whitelist are
    dropped before evaluation.

    Args:
        expression: Expression text, e.g. ``"(12*3)/4 + sqrt(16)"``.

    Returns:
        ``"<expression> = <result>"`` on success, otherwise
        ``"[calculate error] <reason>"``.
    """
    import ast  # local: not part of the file-level imports

    try:
        allowed = {k: getattr(math, k) for k in dir(math) if not k.startswith("_")}
        allowed.update({"abs": abs, "round": round, "min": min, "max": max, "sum": sum, "pow": pow})

        # "^" must be translated BEFORE sanitising: the whitelist below does
        # not contain "^", so doing it afterwards turned "2^3" into "23".
        translated = expression.replace("^", "**")
        safe = re.sub(r"[^0-9+\-*/().,% a-zA-Z_]", "", translated).strip()

        tree = ast.parse(safe, mode="eval")
        # SECURITY: with "." and "_" allowed, attribute chains such as
        # ().__class__.__base__ can reach arbitrary objects even with empty
        # builtins. No whitelisted name needs attribute access, so reject it.
        if any(isinstance(node, ast.Attribute) for node in ast.walk(tree)):
            raise ValueError("attribute access is not allowed")

        # NOTE(review): still eval-based; huge exponents can burn CPU.
        result = eval(compile(tree, "<calculate>", "eval"), {"__builtins__": {}}, allowed)  # noqa: S307
        return f"{expression} = {result}"
    except Exception as e:
        return f"[calculate error] {e}"
def tool_get_time(timezone_name: str = "UTC") -> str:
    """Return the current date and time formatted for *timezone_name*.

    The zone is resolved with ``pytz``. If that fails for any reason (pytz
    unavailable, unknown zone name, ...), the current UTC time is returned
    instead and labelled ``UTC``.
    """
    layout = "%A, %B %d, %Y %H:%M:%S"
    try:
        import pytz

        zone = pytz.timezone(timezone_name)
        # The requested zone name is appended verbatim as the label.
        return datetime.now(zone).strftime(f"{layout} {timezone_name}")
    except Exception:
        utc_now = datetime.now(timezone.utc)
        return utc_now.strftime(f"{layout} UTC")
def tool_run_python(code: str) -> str:
    """Run *code* as a script in a separate Python process.

    The code is written to a temporary ``.py`` file and executed with a
    ``CODE_TIMEOUT``-second limit. NOTE: the only isolation is the separate
    process and the timeout; the script runs with this service's privileges.

    Args:
        code: Complete Python source to execute.

    Returns:
        The script's stdout; stdout and stderr in labelled sections when both
        exist; ``[stderr]`` output alone when nothing was printed;
        ``(no output)`` when silent; or a ``[run_python] ...`` /
        ``[run_python error] ...`` message on timeout or failure.
    """
    import sys  # local: not part of the file-level imports

    fname = None
    try:
        # Python source is UTF-8 by default, so write it that way regardless
        # of the server locale.
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".py", delete=False, encoding="utf-8"
        ) as f:
            f.write(code)
            fname = f.name

        # Use the interpreter running this service so the script sees the
        # same installed packages; fall back to PATH lookup if unknown.
        result = subprocess.run(
            [sys.executable or "python3", fname],
            capture_output=True, text=True, timeout=CODE_TIMEOUT,
        )
        out = result.stdout.strip()
        err = result.stderr.strip()
        if err and not out:
            return f"[stderr]\n{err}"
        if err:
            return f"[stdout]\n{out}\n\n[stderr]\n{err}"
        return out or "(no output)"
    except subprocess.TimeoutExpired:
        return f"[run_python] Timeout after {CODE_TIMEOUT}s"
    except Exception as e:
        return f"[run_python error] {e}"
    finally:
        # Remove the script on every path; previously a timeout or error
        # left the temp file behind.
        if fname is not None:
            try:
                os.unlink(fname)
            except OSError:
                pass  # already gone or not removable; nothing useful to do
def tool_run_shell(command: str) -> str:
    """Run *command* through the system shell unless it matches the blocklist.

    SECURITY: the blocklist is a best-effort guard against a few destructive
    commands, not a sandbox. Anything else runs with this service's
    privileges.

    Args:
        command: Shell command line.

    Returns:
        Stdout (with stderr appended after ``[stderr]`` when present),
        ``(no output)`` when silent, ``[run_shell] Blocked: <pattern>`` when
        rejected, or a timeout / error message.
    """
    FORK_BOMB = ":(){:|:&};:"
    BLOCKED = [
        "rm -rf /", "mkfs", "dd if=", FORK_BOMB, "shutdown",
        "reboot", "> /dev/sda", "chmod 777 /", "fork bomb",
    ]
    # Match against a case-folded, whitespace-collapsed copy so extra spaces
    # or capitals do not slip past the check.
    normalized = " ".join(command.lower().split())
    # The fork bomb is usually written with spaces; compare it space-free.
    compact = "".join(normalized.split())
    for bad in BLOCKED:
        haystack = compact if bad == FORK_BOMB else normalized
        if bad in haystack:
            return f"[run_shell] Blocked: {bad}"
    try:
        result = subprocess.run(
            command, shell=True, capture_output=True,  # noqa: S602
            text=True, timeout=CODE_TIMEOUT,
        )
        out = result.stdout.strip()
        err = result.stderr.strip()
        return (out + ("\n[stderr] " + err if err else "")).strip() or "(no output)"
    except subprocess.TimeoutExpired:
        return f"[run_shell] Timeout after {CODE_TIMEOUT}s"
    except Exception as e:
        return f"[run_shell error] {e}"
async def tool_create_file(user_id: str, filename: str, content: str,
                           file_type: str = "text",
                           extra_files: Optional[list] = None) -> str:
    """Register a file for download by POSTing it to main.py.

    Sends the payload to ``{MAIN_API_BASE}/api/agent/create_file`` and reports
    the ``token``, ``download_url`` and ``size_kb`` fields of the reply.

    Args:
        user_id: Owner of the file.
        filename: Name of the primary file.
        content: Contents of the primary file.
        file_type: Type label forwarded to main.py.
        extra_files: Optional additional file descriptors forwarded as-is.

    Returns:
        A success message with the download URL and token, or, when the call
        fails, a warning carrying the content base64-encoded so the caller
        can still offer a download.
    """
    try:
        payload = {
            "user_id": user_id,
            "filename": filename,
            "content": content,
            "file_type": file_type,
            "extra_files": extra_files or [],
        }
        async with httpx.AsyncClient(timeout=20.0) as c:
            r = await c.post(f"{MAIN_API_BASE}/api/agent/create_file", json=payload)
        r.raise_for_status()
        data = r.json()
        token = data.get("token", "")
        url = data.get("download_url", "")
        size = data.get("size_kb", "?")
        return f"✅ File created: {filename} ({size} KB)\nDownload URL: {url}\nToken: {token}"
    except Exception as e:
        # Fallback: encode as base64 so the frontend can still download.
        # The model may hand over non-string content (e.g. a JSON object);
        # coerce it so the fallback itself cannot raise.
        text = content if isinstance(content, str) else str(content)
        b64 = base64.b64encode(text.encode("utf-8")).decode()
        return (
            f"⚠️ main.py unreachable ({e}). File content (base64):\n"
            f"filename:{filename}\nb64:{b64}"
        )
async def tool_read_memory(user_id: str) -> str:
    """Fetch the user's stored memory from main.py as ``key: value`` lines.

    Reads ``{MAIN_API_BASE}/api/memory/<user_id>`` and lists the entries of
    its ``memory`` object, leaving out the ``_id`` and ``user_id`` keys.

    Returns:
        One line per fact, a "no memory" message when the object is empty,
        or ``[read_memory error] <reason>`` on any failure.
    """
    hidden_keys = ("_id", "user_id")
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(f"{MAIN_API_BASE}/api/memory/{user_id}")
        response.raise_for_status()

        memory = response.json().get("memory", {})
        if not memory:
            return "No memory found for this user."

        lines = []
        for key, val in memory.items():
            if key in hidden_keys:
                continue
            lines.append(f" {key}: {val}")
        return "\n".join(lines)
    except Exception as exc:
        return f"[read_memory error] {exc}"
async def tool_update_memory(user_id: str, key: str, value: str) -> str:
    """Store one fact for the user via main.py.

    POSTs ``user_id``, ``fact_key`` and ``fact_value`` to
    ``{MAIN_API_BASE}/api/memory/update``.

    Returns:
        A confirmation line, or ``[update_memory error] <reason>`` when the
        request fails or returns an error status.
    """
    try:
        async with httpx.AsyncClient(timeout=10.0) as c:
            r = await c.post(
                f"{MAIN_API_BASE}/api/memory/update",
                json={"user_id": user_id, "fact_key": key, "fact_value": value},
            )
        r.raise_for_status()
        return f"✅ Memory updated: {key} = {value}"
    except Exception as e:
        return f"[update_memory error] {e}"
async def tool_analyze_image(image_b64: str, prompt: str = "Describe this image in full detail.") -> str:
    """Send a base64 image and a prompt to the vision model via OpenRouter.

    Args:
        image_b64: Base64-encoded image bytes, or a complete ``data:`` URL.
        prompt: Instruction for the vision model.

    Returns:
        The model's text reply, a notice when no API key is configured, or
        ``[analyze_image error] <reason>`` on any failure.
    """
    if not OPENROUTER_KEY:
        return "[analyze_image] No OpenRouter key set."
    try:
        if image_b64.startswith("data:"):
            # Already a data URL; do not wrap it a second time.
            image_url = image_b64
        else:
            # Derive the MIME type from the base64 form of the file signature
            # instead of labelling every upload as JPEG.
            signatures = (
                ("iVBORw0KGgo", "image/png"),
                ("R0lGOD", "image/gif"),
                ("UklGR", "image/webp"),
                ("/9j/", "image/jpeg"),
            )
            mime = "image/jpeg"  # historical default when nothing matches
            for prefix, candidate in signatures:
                if image_b64.startswith(prefix):
                    mime = candidate
                    break
            image_url = f"data:{mime};base64,{image_b64}"

        messages = [{
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": image_url}},
                {"type": "text", "text": prompt},
            ]
        }]
        headers = {
            "Authorization": f"Bearer {OPENROUTER_KEY}",
            "Content-Type": "application/json",
            "HTTP-Referer": "https://neuraprompt.ai",
            "X-Title": "NeuraPrompt Agent",
        }
        async with httpx.AsyncClient(timeout=30.0) as c:
            r = await c.post(
                "https://openrouter.ai/api/v1/chat/completions",
                headers=headers,
                json={"model": VISION_MODEL, "messages": messages, "max_tokens": 1024},
            )
        r.raise_for_status()
        return r.json()["choices"][0]["message"]["content"].strip()
    except Exception as e:
        return f"[analyze_image error] {e}"
async def tool_ask_neuraprompt(user_id: str, message: str) -> str:
    """Forward *message* to main.py's chat endpoint and return its answer.

    POSTs to ``{MAIN_API_BASE}/api/chat`` with model id ``neurones-pro-1.0``.

    Returns:
        The ``response`` field, else the ``reply`` field, else the whole JSON
        body as a string; ``[ask_neuraprompt error] <reason>`` on failure.
    """
    body = {"user_id": user_id, "message": message, "model_id": "neurones-pro-1.0"}
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(f"{MAIN_API_BASE}/api/chat", json=body)
        response.raise_for_status()
        payload = response.json()

        # Try both field names before falling back to the raw payload.
        answer = payload.get("response")
        if not answer:
            answer = payload.get("reply")
        if not answer:
            answer = str(payload)
        return answer
    except Exception as exc:
        return f"[ask_neuraprompt error] {exc}"
# ─────────────────────────────────────────────────────────────
# TOOL REGISTRY
# ─────────────────────────────────────────────────────────────
def _tool_entry(desc, fn, *, is_async, needs_user=False):
    """Build one TOOL_REGISTRY record with the keys the dispatcher reads."""
    return {"desc": desc, "async": is_async, "needs_user": needs_user, "fn": fn}


# Maps the tool names the model may emit as "action" to their description
# (shown in the system prompt) and implementation. "finish" has no function;
# the agent loops handle it themselves.
TOOL_REGISTRY = {
    "web_search": _tool_entry(
        "Search the live web for any topic. Input: search query string.",
        tool_web_search, is_async=True,
    ),
    "fetch_url": _tool_entry(
        "Fetch and read any URL's full content. Input: URL string.",
        tool_fetch_url, is_async=True,
    ),
    "wiki_search": _tool_entry(
        "Deep Wikipedia research on any topic. Input: topic or question.",
        tool_wiki_search, is_async=True,
    ),
    "get_weather": _tool_entry(
        "Get current weather + 3-day forecast. Input: city name.",
        tool_get_weather, is_async=True,
    ),
    "calculate": _tool_entry(
        "Evaluate math expressions. Input: expression like '(12*3)/4 + sqrt(16)'.",
        tool_calculate, is_async=False,
    ),
    "get_time": _tool_entry(
        "Get current date/time for any timezone. Input: timezone like 'Africa/Johannesburg'.",
        tool_get_time, is_async=False,
    ),
    "run_python": _tool_entry(
        "Write and EXECUTE Python code. See real output. Input: complete Python script as string.",
        tool_run_python, is_async=False,
    ),
    "run_shell": _tool_entry(
        "Execute a shell command on the server. Input: shell command string.",
        tool_run_shell, is_async=False,
    ),
    "create_file": _tool_entry(
        (
            "Create ANY file and register for download. "
            'Input JSON string: {"filename":"app.py","content":"...","file_type":"python",'
            '"extra_files":[{"filename":"style.css","content":"..."}]}'
        ),
        tool_create_file, is_async=True, needs_user=True,
    ),
    "read_memory": _tool_entry(
        "Read this user's long-term memory from the database. Input: anything (ignored).",
        tool_read_memory, is_async=True, needs_user=True,
    ),
    "update_memory": _tool_entry(
        'Save a fact to the user\'s database profile. Input JSON: {"key":"name","value":"Toxic"}',
        tool_update_memory, is_async=True, needs_user=True,
    ),
    "analyze_image": _tool_entry(
        (
            "Analyze an uploaded image with vision AI. "
            'Input JSON: {"image_b64":"<base64>","prompt":"what to look for"}'
        ),
        tool_analyze_image, is_async=True,
    ),
    "ask_neuraprompt": _tool_entry(
        "Ask the main NeuraPrompt AI model a creative or knowledge question. Input: question.",
        tool_ask_neuraprompt, is_async=True, needs_user=True,
    ),
    "finish": _tool_entry(
        "Deliver the final complete answer to the user. Input: full formatted answer.",
        None, is_async=False,
    ),
}
# One " name: description" line per registered tool, in registry order;
# embedded into the system prompt.
TOOLS_PROMPT = "\n".join(
    " {}: {}".format(tool_name, TOOL_REGISTRY[tool_name]["desc"])
    for tool_name in TOOL_REGISTRY
)
# ─────────────────────────────────────────────────────────────
# OPENROUTER LLM CALL
# ─────────────────────────────────────────────────────────────
async def call_openrouter(messages: list, model: str = PRIMARY_MODEL) -> str:
    """Request a chat completion from OpenRouter, retrying once on FALLBACK_MODEL.

    The first attempt uses *model*. If it is rate-limited (HTTP 429/503),
    cannot be reached (timeout / connection error) or returns a body without
    the expected ``choices[0].message.content``, one more attempt is made
    with ``FALLBACK_MODEL``.

    Args:
        messages: Chat messages in OpenAI chat-completion format.
        model: Model id for the first attempt.

    Returns:
        The stripped text content of the first choice.

    Raises:
        HTTPException: 503 when no API key is configured; 502 when the
            request fails or the response is unusable.
    """
    if not OPENROUTER_KEY:
        raise HTTPException(503, "OPENROUTE_KEY not set.")
    headers = {
        "Authorization": f"Bearer {OPENROUTER_KEY}",
        "Content-Type": "application/json",
        "HTTP-Referer": "https://deepimagix-self-trained2.hf.space",
        "X-Title": "NeuraPrompt Agent",
    }
    for attempt, mdl in enumerate([model, FALLBACK_MODEL]):
        try:
            async with httpx.AsyncClient(timeout=AGENT_TIMEOUT) as c:
                r = await c.post(
                    "https://openrouter.ai/api/v1/chat/completions",
                    headers=headers,
                    json={"model": mdl, "messages": messages,
                          "max_tokens": MAX_TOKENS, "temperature": 0.3},
                )
            r.raise_for_status()
            return r.json()["choices"][0]["message"]["content"].strip()
        except httpx.HTTPStatusError as e:
            if attempt == 0 and e.response.status_code in (429, 503):
                log.warning(f"Model {mdl} rate-limited, trying fallback.")
                continue
            raise HTTPException(502, f"OpenRouter: {e}") from e
        except httpx.RequestError as e:
            # Timeouts and connection failures used to escape as unhandled
            # exceptions and never reached the fallback model.
            if attempt == 0:
                log.warning(f"Model {mdl} unreachable ({e}), trying fallback.")
                continue
            raise HTTPException(502, f"OpenRouter: {e}") from e
        except (KeyError, IndexError, TypeError, AttributeError, ValueError) as e:
            # 2xx reply that is not valid JSON or lacks choices/content.
            if attempt == 0:
                log.warning(f"Model {mdl} returned an unusable response, trying fallback.")
                continue
            raise HTTPException(502, f"OpenRouter: unexpected response ({e})") from e
    raise HTTPException(502, "All models failed.")
# ─────────────────────────────────────────────────────────────
# TOOL DISPATCHER
# ─────────────────────────────────────────────────────────────
async def dispatch_tool(name: str, raw_input: str, user_id: str) -> str:
    """Invoke the registered tool *name* with the model-supplied input.

    Input that looks like a JSON object is decoded so that the multi-field
    tools (create_file, update_memory, analyze_image) can read their fields.
    Every other tool receives the input as a single string.

    Args:
        name: Tool name chosen by the model.
        raw_input: Tool input; normally a string, though the model may
            already have supplied a decoded JSON value.
        user_id: Current user, passed to the user-scoped tools.

    Returns:
        The tool's string result, ``[dispatch] Unknown tool: <name>`` for an
        unregistered or function-less tool, or ``[<name> crash] <reason>``
        if the tool raises.
    """
    # A non-string action (e.g. a list) would make the dict lookup raise.
    info = TOOL_REGISTRY.get(name) if isinstance(name, str) else None
    if not info or info["fn"] is None:
        return f"[dispatch] Unknown tool: {name}"

    parsed = raw_input
    if isinstance(raw_input, str) and raw_input.strip().startswith("{"):
        try:
            parsed = json.loads(raw_input)
        except (ValueError, RecursionError):
            parsed = raw_input  # not valid JSON after all; use the text

    fn = info["fn"]
    fields = parsed if isinstance(parsed, dict) else {}
    try:
        if name == "create_file":
            return await fn(user_id, fields.get("filename", "file.txt"),
                            fields.get("content", str(parsed)),
                            fields.get("file_type", "text"),
                            fields.get("extra_files", []))
        if name == "update_memory":
            return await fn(user_id, fields.get("key", "note"),
                            fields.get("value", str(parsed)))
        if name == "analyze_image":
            return await fn(fields.get("image_b64", str(parsed)),
                            fields.get("prompt", "Describe this image in full detail."))
        if name == "ask_neuraprompt":
            return await fn(user_id, str(parsed))
        if name == "read_memory":
            return await fn(user_id)
        if info["async"]:
            return await fn(str(parsed))
        # Synchronous tools can block for up to CODE_TIMEOUT seconds
        # (subprocess runs); run them in the default executor so the event
        # loop keeps serving other requests.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, fn, str(parsed))
    except Exception as e:
        return f"[{name} crash] {e}"
# ─────────────────────────────────────────────────────────────
# SYSTEM PROMPT
# ─────────────────────────────────────────────────────────────
def build_system_prompt(max_steps: int) -> str:
    """Return the system prompt for the agent loop.

    The prompt lists the available tools (``TOOLS_PROMPT``), the behaviour
    rules, the step budget and the strict JSON reply format that
    ``_parse_llm_json`` expects.

    Args:
        max_steps: Step budget quoted to the model in rule 7.
    """
    return f"""You are NeuraPrompt Agent — the most powerful autonomous AI agent ever built.
You don't just answer. You THINK, PLAN, ACT, and DELIVER real results.
Available tools:
{TOOLS_PROMPT}
AGENT RULES:
1. THINK deeply before every action. Reason step by step.
2. Use tools aggressively — search, execute code, create files, run commands.
3. Chain tools smartly: search → fetch → analyze → run code → create file → finish.
4. When solving a problem with code, ALWAYS run it with run_python and show real output.
5. When the user wants a file, CREATE it with create_file — don't just write the code.
6. Never fabricate facts. Use web_search or wiki_search to verify everything.
7. You have {max_steps} steps — plan efficiently, don't waste steps.
8. Use finish only when the task is 100% complete with a thorough, well-formatted answer.
9. You have permission to run code, shell commands, create files, read/write memory. USE IT.
RESPONSE FORMAT — strict JSON only, no markdown fences:
{{
"thought": "Your detailed reasoning about what to do next and why",
"action": "tool_name",
"input": "tool input — plain string OR JSON object stringified"
}}
When task is complete:
{{
"thought": "All tasks completed. Delivering final answer.",
"action": "finish",
"input": "Your complete, well-formatted final answer with all results and file links."
}}
"""
# ─────────────────────────────────────────────────────────────
# PARSE LLM JSON
# ─────────────────────────────────────────────────────────────
def _parse_llm_json(text: str) -> dict:
text = re.sub(r"^```(?:json)?\s*", "", text.strip())
text = re.sub(r"\s*```$", "", text)
try:
return json.loads(text)
except Exception:
m = re.search(r'\{.*\}', text, re.DOTALL)
if m:
try:
return json.loads(m.group())
except Exception:
pass
return {"thought": "Parse failure.", "action": "finish", "input": text}
# ─────────────────────────────────────────────────────────────
# AGENT LOOP (blocking — returns full result)
# ─────────────────────────────────────────────────────────────
async def run_agent(req: AgentRequest) -> dict:
    """Run the think/act loop to completion and return the full transcript.

    Each iteration asks the model for a JSON decision, executes the chosen
    tool and feeds the result back. The loop stops when the model chooses
    ``finish``. If the step budget runs out first, one extra call asks the
    model for a final answer.

    Returns:
        Dict with ``user_id``, ``goal``, ``final_answer``, the per-step
        ``steps`` records, ``total_steps``, ``model_used`` and ``elapsed_ms``.
    """
    started = time.time()

    opening = f"Goal: {req.goal}"
    if req.context:
        opening += f"\n\nContext: {req.context}"
    conversation = [
        {"role": "system", "content": build_system_prompt(req.max_steps)},
        {"role": "user", "content": opening},
    ]

    history = []
    finished = False
    for step_no in range(1, req.max_steps + 1):
        log.info(f"[{req.user_id}] Step {step_no}/{req.max_steps}")
        reply = await call_openrouter(conversation)
        decision = _parse_llm_json(reply)
        thought = decision.get("thought", "")
        action = decision.get("action", "finish")
        tool_input = decision.get("input", "")

        if action == "finish":
            # For the final step the input doubles as the output.
            history.append({"step": step_no, "thought": thought,
                            "action": action, "input": tool_input,
                            "output": tool_input})
            finished = True
            break

        result = await dispatch_tool(action, tool_input, req.user_id)
        history.append({"step": step_no, "thought": thought,
                        "action": action, "input": tool_input,
                        "output": result})
        log.info(f" [{action}] β†’ {str(result)[:160]}")

        conversation.append({"role": "assistant", "content": reply})
        conversation.append({
            "role": "user",
            "content": f"[Tool: {action}]\nResult: {result}\n\nContinue. Next thought:",
        })

    if not finished:
        # Step budget exhausted without a finish action: force a final answer.
        conversation.append({
            "role": "user",
            "content": "Max steps reached. Compile everything and give the final answer now (action=finish).",
        })
        reply = await call_openrouter(conversation)
        decision = _parse_llm_json(reply)
        forced = decision.get("input", reply)
        history.append({"step": req.max_steps + 1, "thought": "Forced finish.",
                        "action": "finish", "input": forced, "output": forced})

    last = history[-1]
    final_answer = last.get("output") or last.get("input", "")
    elapsed_ms = int((time.time() - started) * 1000)
    return {
        "user_id": req.user_id,
        "goal": req.goal,
        "final_answer": final_answer,
        "steps": history,
        "total_steps": len(history),
        "model_used": PRIMARY_MODEL,
        "elapsed_ms": elapsed_ms,
    }
# ─────────────────────────────────────────────────────────────
# STREAMING AGENT LOOP (SSE — real-time thought process)
# ─────────────────────────────────────────────────────────────
async def stream_agent(req: AgentRequest) -> AsyncGenerator[str, None]:
    """Run the agent loop, yielding each stage as a Server-Sent Event.

    Every yielded string is one ``data: <json>\\n\\n`` frame whose JSON has
    ``type``, ``step``, ``content`` and ``tool``. Event types emitted:
    ``thought``, ``action``, ``result``, ``done``, ``meta`` (statistics after
    a normal finish) and ``error`` (the LLM call failed; the stream ends).
    """
    def sse(event_type: str, step: int, content: str, tool: str = "") -> str:
        # Serialise one event as an SSE data frame.
        return "data: " + json.dumps({
            "type": event_type, "step": step, "content": content, "tool": tool,
        }) + "\n\n"

    t_start = time.time()
    messages = [
        {"role": "system", "content": build_system_prompt(req.max_steps)},
        {"role": "user", "content":
            f"Goal: {req.goal}"
            + (f"\n\nContext: {req.context}" if req.context else "")},
    ]
    for step_num in range(1, req.max_steps + 1):
        try:
            raw = await call_openrouter(messages)
            parsed = _parse_llm_json(raw)
        except Exception as e:
            yield sse("error", step_num, str(e))
            return
        thought = parsed.get("thought", "")
        action = parsed.get("action", "finish")
        inp = parsed.get("input", "")

        yield sse("thought", step_num, thought)
        await asyncio.sleep(0)  # yield control between events

        if action == "finish":
            yield sse("done", step_num, inp)
            yield sse("meta", step_num, json.dumps({
                "total_steps": step_num,
                "elapsed_ms": int((time.time() - t_start) * 1000),
                "model": PRIMARY_MODEL,
            }))
            return

        # The model may return the input as a JSON object rather than a
        # string; slicing it directly raised TypeError and killed the stream.
        preview = str(inp)[:200]
        yield sse("action", step_num, f"Tool: {action} | Input: {preview}", tool=action)
        await asyncio.sleep(0)

        output = await dispatch_tool(action, inp, req.user_id)
        yield sse("result", step_num, output, tool=action)
        await asyncio.sleep(0)

        messages.append({"role": "assistant", "content": raw})
        messages.append({
            "role": "user",
            "content": f"[Tool: {action}]\nResult: {output}\n\nContinue. Next thought:",
        })

    # Max steps: ask once more for a final answer.
    yield sse("thought", req.max_steps + 1, "Max steps reached. Compiling final answer.")
    messages.append({
        "role": "user",
        "content": "Max steps reached. Give the final answer now (action=finish).",
    })
    try:
        raw = await call_openrouter(messages)
        parsed = _parse_llm_json(raw)
        yield sse("done", req.max_steps + 1, parsed.get("input", raw))
    except Exception as e:
        yield sse("error", req.max_steps + 1, str(e))
# ─────────────────────────────────────────────────────────────
# FASTAPI APP
# ─────────────────────────────────────────────────────────────
app = FastAPI(title="NeuraPrompt Power Agent", version="2.0.0")

# CORS is fully open: any origin, method and header is accepted.
_CORS_OPTIONS = {
    "allow_origins": ["*"],
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)
@app.get("/")
def root():
    """Describe the service: status, configured models, tools and endpoints."""
    model_roles = {
        "primary": PRIMARY_MODEL,
        "vision": VISION_MODEL,
        "fallback": FALLBACK_MODEL,
    }
    endpoint_help = {
        "POST /agent/run": "Full agent loop (blocking)",
        "POST /agent/stream": "Streaming SSE thought process",
        "POST /agent/analyze_image": "Upload + analyze image",
        "GET /agent/tools/search": "Standalone web search",
        "GET /agent/tools/weather": "Standalone weather",
        "POST /agent/tools/python": "Run Python code",
        "POST /agent/tools/shell": "Run shell command",
    }
    return {
        "service": "NeuraPrompt Power Agent v2",
        "status": "online",
        # Only whether a key is configured is exposed, never the key itself.
        "key_set": bool(OPENROUTER_KEY),
        "models": model_roles,
        "tools": [tool_name for tool_name in TOOL_REGISTRY],
        "endpoints": endpoint_help,
    }
@app.get("/health")
def health():
    """Liveness probe: reports key presence and the configured main.py URL."""
    key_configured = bool(OPENROUTER_KEY)
    return {
        "status": "ok",
        "key_set": key_configured,
        "main_api": MAIN_API_BASE,
    }
@app.post("/agent/run")
async def agent_run(req: AgentRequest):
    """Full agent loop. Returns complete result after all steps.

    With ``mode == "direct"`` the goal is sent to the model in a single call
    (no tools, no steps); the response has the same shape as the agent
    result, with an empty ``steps`` list.

    Raises:
        HTTPException: 503 when no API key is configured, 400 for an empty
            goal, plus whatever ``call_openrouter`` raises.
    """
    if not OPENROUTER_KEY:
        raise HTTPException(503, "OPENROUTE_KEY not set.")
    if not req.goal.strip():
        raise HTTPException(400, "Goal cannot be empty.")
    if req.mode == "direct":
        # Time the call so elapsed_ms is real instead of a hard-coded 0.
        t_start = time.time()
        answer = await call_openrouter([
            {"role": "system", "content": "You are NeuraPrompt AI — powerful, precise, helpful."},
            {"role": "user", "content": req.goal},
        ])
        return {"user_id": req.user_id, "goal": req.goal, "final_answer": answer,
                "steps": [], "total_steps": 0, "model_used": PRIMARY_MODEL,
                "elapsed_ms": int((time.time() - t_start) * 1000)}
    return await run_agent(req)
@app.post("/agent/stream")
async def agent_stream(req: AgentRequest):
    """Streaming SSE: the frontend sees each thought/action/result in real time.

    Raises:
        HTTPException: 503 when no API key is configured, 400 for an empty goal.
    """
    if not OPENROUTER_KEY:
        raise HTTPException(503, "OPENROUTE_KEY not set.")
    if not req.goal.strip():
        raise HTTPException(400, "Goal cannot be empty.")

    # Ask caches and buffering proxies to pass events through immediately.
    stream_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    event_source = stream_agent(req)
    return StreamingResponse(
        event_source,
        media_type="text/event-stream",
        headers=stream_headers,
    )
@app.post("/agent/analyze_image")
async def agent_analyze_image(
    user_id: str = Form(...),
    goal: str = Form("Describe this image in full detail."),
    image: UploadFile = File(...),
):
    """Upload image → analyze with vision AI → delete temp from DB.

    The upload is base64-encoded and passed to ``tool_analyze_image``.
    Afterwards a best-effort DELETE is sent to main.py's temp-image endpoint
    for this user; a failure there never fails the request.
    """
    raw_bytes = await image.read()
    b64 = base64.b64encode(raw_bytes).decode()
    analysis = await tool_analyze_image(b64, goal)
    # Attempt to clean up any temp storage in main.py
    try:
        async with httpx.AsyncClient(timeout=8.0) as c:
            await c.delete(f"{MAIN_API_BASE}/api/images/temp/{user_id}")
    except Exception as e:
        # Still best-effort, but leave a trace instead of swallowing silently.
        log.warning(f"Temp image cleanup failed for {user_id}: {e}")
    return {"user_id": user_id, "prompt": goal, "analysis": analysis, "model": VISION_MODEL}
@app.get("/agent/tools/search")
async def standalone_search(q: str):
    """Run the web-search tool directly for query *q*."""
    digest = await tool_web_search(q)
    return {"query": q, "result": digest}
@app.get("/agent/tools/weather")
async def standalone_weather(location: str):
    """Run the weather tool directly for *location*."""
    report = await tool_get_weather(location)
    return {"location": location, "result": report}
@app.post("/agent/tools/python")
async def standalone_python(code: str = Form(...)):
    """Execute the submitted Python code and return its output.

    SECURITY NOTE(review): this endpoint runs arbitrary code and has no
    authentication in this file; confirm it is protected upstream.
    """
    # tool_run_python blocks on a subprocess for up to CODE_TIMEOUT seconds;
    # run it in the default executor so the event loop stays responsive.
    loop = asyncio.get_running_loop()
    output = await loop.run_in_executor(None, tool_run_python, code)
    return {"result": output}
@app.post("/agent/tools/shell")
async def standalone_shell(command: str = Form(...)):
    """Execute the submitted shell command and return its output.

    SECURITY NOTE(review): this endpoint runs arbitrary shell commands and
    has no authentication in this file; confirm it is protected upstream.
    """
    # tool_run_shell blocks on a subprocess for up to CODE_TIMEOUT seconds;
    # run it in the default executor so the event loop stays responsive.
    loop = asyncio.get_running_loop()
    output = await loop.run_in_executor(None, tool_run_shell, command)
    return {"result": output}
@app.get("/agent/models")
def list_models():
    """Report which model fills each role, plus the full candidate list."""
    catalogue = {
        "primary": PRIMARY_MODEL,
        "vision": VISION_MODEL,
        "fallback": FALLBACK_MODEL,
    }
    catalogue["all"] = FREE_MODELS
    return catalogue
# ─────────────────────────────────────────────────────────────
# ENTRY POINT
# ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces at AGENT_PORT.
    import uvicorn

    uvicorn.run(
        "agent:app",
        host="0.0.0.0",
        port=AGENT_PORT,
        reload=False,
        log_level="info",
    )