| | import os |
| | import io |
| | import re |
| | import json |
| | import time |
| | import uuid |
| | import sqlite3 |
| | import asyncio |
| | import hashlib |
| | import signal |
| | import threading |
| | import traceback |
| | import subprocess |
| | import logging |
| | from pathlib import Path |
| | from datetime import datetime, timedelta |
| | from contextlib import redirect_stdout, redirect_stderr |
| |
|
| |
|
class Config:
    """Central runtime configuration for the bot.

    SECURITY NOTE(review): this class previously hard-coded live credentials
    (Telegram bot token, Google API key, gmail app password, custom AI key,
    Supabase key). They are kept here as fallback defaults for backward
    compatibility, but each secret can now be overridden by the same-named
    environment variable — the embedded values should be rotated and removed.
    """

    # --- Telegram ---
    BOT_TOKEN = os.environ.get("BOT_TOKEN", "8088897119:AAGJxbBUH6bB-IcjAvPR4z77ApzAKCFfTIU")
    ADMIN_IDS = [8225686030, 7373296624]  # telegram user ids with admin rights
    OWNER_USERNAMES = ["nameofbless", "simulateneous", "walkingwithgod"]
    OWNER_CAN_FORCE_AGENT_FOR_ALL = True

    # --- LLM provider keys (empty string = provider disabled) ---
    OPENAI_KEY = os.environ.get("OPENAI_KEY", "")
    ANTHROPIC_KEY = os.environ.get("ANTHROPIC_KEY", "")
    GROQ_KEY = os.environ.get("GROQ_KEY", "")
    GOOGLE_KEY = os.environ.get("GOOGLE_KEY", "AIzaSyDHOEiPizohtUmK-q50-w842MsAiFEyHm4")
    CUSTOM_AI_URL = "https://bjo53-brukguardian.hf.space/v1/chat/completions"
    CUSTOM_AI_KEY = os.environ.get("CUSTOM_AI_KEY", "pekka-secret-key")
    CUSTOM_AI_MODEL = "brukguardian-v1"
    # NOTE(review): fallback URL is currently identical to the primary URL,
    # so the 404-fallback retry in LLM._custom is effectively a plain retry.
    CUSTOM_AI_FALLBACK_URL = "https://bjo53-brukguardian.hf.space/v1/chat/completions"
    DEFAULT_MODEL = "gemini-2.5-flash-lite"

    # --- local Ollama (vision) ---
    OLLAMA_URL = "http://127.0.0.1:11434"
    OLLAMA_VISION_MODEL = "llava:7b"

    # --- Supabase ---
    SUPABASE_URL = "https://xhqwtjlydysanoquaham.supabase.co"
    SUPABASE_KEY = os.environ.get("SUPABASE_KEY", "sb_publishable_Gaqx237PmZQsixs8VdUjAw_fxQE3uui")

    # --- Google OAuth / YouTube ---
    GOOGLE_CLIENT_SECRET = "./credentials.json"
    GOOGLE_TOKEN_PATH = "./token.json"
    YOUTUBE_DEFAULT_PRIVACY = "private"
    ENABLE_YOUTUBE_UPLOAD = True

    # --- weather / email ---
    WEATHER_KEY = os.environ.get("WEATHER_KEY", "")
    SMTP_USER = "brukg9419@gmail.com"
    SMTP_PASS = os.environ.get("SMTP_PASS", "ygcvdddqdyxttwia")  # gmail app password — rotate
    SMTP_HOST = "smtp.gmail.com"
    SMTP_PORT = 587
    IMAP_HOST = "imap.gmail.com"
    IMAP_PORT = 993
    IMAP_USER = "brukg9419@gmail.com"
    IMAP_PASS = os.environ.get("IMAP_PASS", "ygcvdddqdyxttwia")  # same app password — rotate

    # --- agent limits ---
    MAX_HISTORY = 62      # max retained conversation turns per (user, chat)
    MAX_TOOL_LOOPS = 10   # max chained tool-call rounds per request
    CODE_TIMEOUT = 45     # seconds allowed for execute_python

    # --- filesystem layout ---
    DATA_DIR = "./data"
    LOGS_DIR = "./logs"
    BOTS_DIR = "./spawned_bots"
    DB_PATH = os.path.join(DATA_DIR, "agentforge.db")
    SYSTEM_FLOW_PATH = "./SYSTEM_FLOW.md"

    # --- network bridge ---
    PROXY_TARGET = "https://lucky-hat-e0d0.brukg9419.workers.dev"
    CLOUDFLARE_IP = "104.21.28.169"
    BRIDGE_PORT = 7860

    @classmethod
    def is_admin(cls, uid):
        """True when `uid` is one of ADMIN_IDS."""
        return uid in cls.ADMIN_IDS

    @classmethod
    def has_supabase(cls):
        """True when both the Supabase URL and key are configured."""
        return bool(cls.SUPABASE_URL and cls.SUPABASE_KEY)
| |
|
| |
|
# Ensure the working directories exist before the DB / spawner touch them.
for _dir in (Config.DATA_DIR, Config.LOGS_DIR, Config.BOTS_DIR):
    os.makedirs(_dir, exist_ok=True)
| |
|
| |
|
class LiveLog:
    """Bounded in-memory log buffer, safe for use from multiple threads.

    Keeps at most `max_entries` recent entries; each entry records a
    wall-clock timestamp, a level tag, a source label and the message
    (truncated to 800 chars). Every entry is also echoed to stdout.
    """

    def __init__(self, max_entries=500):
        self._max = max_entries
        self._entries = []
        self._lock = threading.Lock()

    def _add(self, level, src, msg):
        """Append one entry, trimming the buffer when it overflows."""
        entry = {
            "ts": datetime.now().strftime("%H:%M:%S"),
            "level": level,
            "src": src,
            "msg": str(msg)[:800],
        }
        with self._lock:
            self._entries.append(entry)
            overflow = len(self._entries) - self._max
            if overflow > 0:
                del self._entries[:overflow]
        print(f"[{level}] {src}: {msg}")

    def info(self, src, msg):
        self._add("INFO", src, msg)

    def warn(self, src, msg):
        self._add("WARN", src, msg)

    def error(self, src, msg):
        self._add("ERR", src, msg)

    def get(self, count=30):
        """Return (a copy of) the most recent `count` entries."""
        with self._lock:
            return self._entries[-count:]
| |
|
| |
|
| | live_log = LiveLog() |
| |
|
| |
|
def init_database():
    """Create the SQLite schema (idempotent — every table is IF NOT EXISTS).

    BUG FIX(review): the connection is now closed in a finally block; the
    previous version leaked it if executescript() raised.

    NOTE(review): users.preferred_model defaults to 'gpt-4o-mini' which
    differs from Config.DEFAULT_MODEL — confirm which is intended.
    """
    conn = sqlite3.connect(Config.DB_PATH)
    try:
        conn.executescript(
            """
            CREATE TABLE IF NOT EXISTS users (
                telegram_id INTEGER PRIMARY KEY,
                username TEXT,
                first_name TEXT,
                is_banned INTEGER DEFAULT 0,
                preferred_model TEXT DEFAULT 'gpt-4o-mini',
                system_prompt TEXT DEFAULT '',
                temperature REAL DEFAULT 0.7,
                total_messages INTEGER DEFAULT 0,
                total_tokens INTEGER DEFAULT 0,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                last_active TEXT DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER,
                chat_id INTEGER,
                role TEXT,
                content TEXT,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS scheduled_tasks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER,
                chat_id INTEGER,
                task_prompt TEXT,
                run_at TEXT,
                repeat_seconds INTEGER DEFAULT 0,
                status TEXT DEFAULT 'pending',
                message TEXT DEFAULT 'Scheduled task',
                last_result TEXT,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS tool_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER,
                tool TEXT,
                success INTEGER,
                elapsed REAL,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS spawned_bots (
                token_hash TEXT PRIMARY KEY,
                owner_id INTEGER,
                name TEXT,
                status TEXT DEFAULT 'running',
                pid INTEGER,
                file_path TEXT,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP
            );

            CREATE TABLE IF NOT EXISTS boss_messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                sender_id INTEGER,
                sender_username TEXT,
                content TEXT,
                notified INTEGER DEFAULT 0,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS bot_buttons (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                label TEXT,
                url TEXT,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS youtube_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                video_id TEXT,
                title TEXT,
                status TEXT,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS kv (
                key TEXT PRIMARY KEY,
                value TEXT
            );
            """
        )
        conn.commit()
    finally:
        conn.close()
| |
|
| |
|
| | init_database() |
| |
|
| |
|
class DB:
    """Minimal synchronous SQLite helper, serialized by a process-wide lock.

    Every call opens a fresh connection, runs exactly one statement and
    closes the connection again — simple and safe across threads.
    """

    _lock = threading.Lock()

    @staticmethod
    def q(query, params=(), fetch=False, fetchone=False):
        """Execute one statement.

        Returns a single sqlite3.Row (fetchone=True), a list of rows
        (fetch=True), or the lastrowid after committing a write.
        """
        with DB._lock:
            connection = sqlite3.connect(Config.DB_PATH, check_same_thread=False)
            connection.row_factory = sqlite3.Row
            cursor = connection.cursor()
            try:
                cursor.execute(query, params)
                if fetchone:
                    return cursor.fetchone()
                if fetch:
                    return cursor.fetchall()
                connection.commit()
                return cursor.lastrowid
            finally:
                connection.close()

    @staticmethod
    def upsert_user(tid, username="", first_name=""):
        """Insert the user row on first contact, otherwise bump last_active."""
        existing = DB.q("SELECT telegram_id FROM users WHERE telegram_id=?", (tid,), fetchone=True)
        if existing:
            DB.q("UPDATE users SET last_active=CURRENT_TIMESTAMP WHERE telegram_id=?", (tid,))
            return
        DB.q("INSERT INTO users (telegram_id,username,first_name) VALUES (?,?,?)", (tid, username, first_name))

    @staticmethod
    def get_user(tid):
        """Fetch one user row (sqlite3.Row) or None."""
        return DB.q("SELECT * FROM users WHERE telegram_id=?", (tid,), fetchone=True)

    @staticmethod
    def inc_usage(tid, tokens=0):
        """Accumulate message/token counters and refresh last_active."""
        DB.q("UPDATE users SET total_messages=total_messages+1,total_tokens=total_tokens+?,last_active=CURRENT_TIMESTAMP WHERE telegram_id=?", (tokens, tid))
| |
|
| |
|
class Memory:
    """Per-(user, chat) conversation history with a SQLite-backed fallback."""

    def __init__(self):
        # "uid:cid" -> list of {"role": ..., "content": ...}
        self.convs = {}

    def key(self, uid, cid):
        """Composite dict key for a (user, chat) pair."""
        return f"{uid}:{cid}"

    def add(self, uid, cid, role, content):
        """Record one turn in memory and best-effort persist it to SQLite."""
        k = self.key(uid, cid)
        bucket = self.convs.setdefault(k, [])
        bucket.append({"role": role, "content": content})
        # hysteresis: only trim once the buffer grows to twice the limit
        if len(bucket) > Config.MAX_HISTORY * 2:
            self.convs[k] = bucket[-Config.MAX_HISTORY:]
        try:
            DB.q(
                "INSERT INTO messages (user_id,chat_id,role,content) VALUES (?,?,?,?)",
                (uid, cid, role, (content or "")[:12000]),
            )
        except Exception:
            pass  # persistence is best-effort; in-memory copy is authoritative

    def _load_db_history(self, uid, cid, limit=20):
        """Fetch the most recent turns from SQLite, returned oldest-first."""
        rows = DB.q(
            "SELECT role,content FROM messages WHERE user_id=? AND chat_id=? ORDER BY id DESC LIMIT ?",
            (uid, cid, int(limit)),
            fetch=True,
        )
        if not rows:
            return []
        return [{"role": row["role"], "content": row["content"]} for row in reversed(rows)]

    def history(self, uid, cid, limit=20):
        """Return up to `limit` recent turns.

        Falls back to (and re-seeds the cache from) the DB when the
        in-memory buffer looks too thin, e.g. right after a restart.
        """
        k = self.key(uid, cid)
        cached = self.convs.get(k, [])[-limit:]
        if len(cached) >= max(4, limit // 2):
            return cached
        persisted = self._load_db_history(uid, cid, limit=limit)
        if persisted:
            self.convs[k] = persisted[-Config.MAX_HISTORY:]
            return persisted[-limit:]
        return cached
| |
|
| |
|
| | memory = Memory() |
| |
|
| |
|
# Optional dependency: Supabase python client. When missing, create_client
# stays None and SupabaseStore runs in disabled (no-op) mode.
try:
    from supabase import create_client
except Exception:
    create_client = None
| |
|
| |
|
class SupabaseStore:
    """Async-friendly wrapper around the optional Supabase client.

    Every operation is a safe no-op / fallback when the client is missing,
    and blocking SDK calls are pushed onto a worker thread.
    """

    def __init__(self):
        self.client = None
        if create_client and Config.has_supabase():
            try:
                self.client = create_client(Config.SUPABASE_URL, Config.SUPABASE_KEY)
            except Exception as exc:
                live_log.warn("Supabase", f"init failed: {exc}")

    def enabled(self):
        """True when a client was successfully created."""
        return self.client is not None

    async def save_memory(self, user_id, username, role, content):
        """Best-effort insert of one conversation turn into `memories`."""
        if not self.client:
            return
        def _run():
            self.client.table("memories").insert({
                "user_id": user_id,
                "username": username,
                "role": role,
                "content": (content or "")[:4000],
            }).execute()
        try:
            await asyncio.to_thread(_run)
        except Exception as exc:
            live_log.warn("Supabase", f"save_memory: {exc}")

    async def add_button(self, label, url):
        """Insert a /start menu button row; returns a human-readable status.

        CONSISTENCY FIX(review): this was the only method that let the
        blocking call's exception propagate; it now catches and reports
        like its siblings (save_memory / get_buttons / log_youtube).
        """
        if not self.client:
            return "Supabase not configured"
        def _run():
            self.client.table("bot_buttons").insert({"label": label, "url": url}).execute()
        try:
            await asyncio.to_thread(_run)
        except Exception as exc:
            live_log.warn("Supabase", f"add_button: {exc}")
            return f"Button add failed: {exc}"
        return f"Button '{label}' added"

    async def get_buttons(self):
        """Return [{'label', 'url'}, ...] or [] on any failure."""
        if not self.client:
            return []
        def _run():
            return self.client.table("bot_buttons").select("label,url").execute()
        try:
            res = await asyncio.to_thread(_run)
            return res.data or []
        except Exception:
            return []

    async def log_youtube(self, video_id, title, status="published"):
        """Best-effort insert of an upload record into `youtube_logs`."""
        if not self.client:
            return
        def _run():
            self.client.table("youtube_logs").insert({"video_id": video_id, "title": title, "status": status}).execute()
        try:
            await asyncio.to_thread(_run)
        except Exception as exc:
            live_log.warn("Supabase", f"youtube log: {exc}")
| |
|
| |
|
| | supabase_store = SupabaseStore() |
| |
|
# OAuth scopes requested for the shared Google credential:
# read-only Gmail access plus YouTube upload.
GOOGLE_SCOPES = [
    "https://www.googleapis.com/auth/gmail.readonly",
    "https://www.googleapis.com/auth/youtube.upload",
]
| |
|
| |
|
def get_google_service(service_name: str, version: str):
    """Build an authenticated Google API client, or None on any failure.

    Returns None when the google client libraries are not installed, when no
    valid credential can be obtained (missing/expired token with no client
    secret), or when the discovery build fails. May run an interactive local
    OAuth flow the first time.
    """
    try:
        from google.oauth2.credentials import Credentials
        from google_auth_oauthlib.flow import InstalledAppFlow
        from google.auth.transport.requests import Request
        from googleapiclient.discovery import build as google_build
    except Exception:
        return None

    creds = None
    token_path = Path(Config.GOOGLE_TOKEN_PATH)
    if token_path.exists():
        creds = Credentials.from_authorized_user_file(str(token_path), GOOGLE_SCOPES)

    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            try:
                creds.refresh(Request())
            except Exception:
                return None
        else:
            secret = Path(Config.GOOGLE_CLIENT_SECRET)
            if not secret.exists():
                return None
            try:
                flow = InstalledAppFlow.from_client_secrets_file(str(secret), GOOGLE_SCOPES)
                creds = flow.run_local_server(port=0)
            except Exception:
                return None
        # Persist the token after EITHER branch: a credential obtained via
        # refresh() must also be written back, otherwise the refreshed
        # access token / expiry is lost on the next run.
        token_path.write_text(creds.to_json(), encoding="utf-8")

    try:
        return google_build(service_name, version, credentials=creds)
    except Exception:
        return None
| |
|
| |
|
def load_system_flow_text():
    """Return up to 50k chars of SYSTEM_FLOW.md, or a placeholder when absent."""
    flow = Path(Config.SYSTEM_FLOW_PATH)
    if not flow.exists():
        return "SYSTEM_FLOW.md missing"
    return flow.read_text(encoding="utf-8")[:50000]
| |
|
| |
|
class LLM:
    """Multi-provider chat-completion router.

    Dispatches by model name (see MODELS) and lazily instantiates each SDK
    client on first use. All paths return a uniform dict:
    {"content": str, "tool_calls": list, "usage": {"total_tokens": int}, "model": str}.

    BUG FIX(review): `_custom` previously called json.loads on raw curl
    output unguarded; a non-JSON body (HTML error page, empty response)
    crashed the whole chat call. Both parse sites are now guarded.
    """

    # model name -> provider id used for dispatch in chat()
    MODELS = {
        "gpt-4o": "openai",
        "gpt-4o-mini": "openai",
        "claude-3-5-sonnet-20241022": "anthropic",
        "llama-3.3-70b-versatile": "groq",
        "gemini-2.5-flash-lite": "google",
    }
    if Config.CUSTOM_AI_MODEL:
        MODELS[Config.CUSTOM_AI_MODEL] = "custom"

    # providers whose APIs accept OpenAI-style tool definitions natively
    NATIVE_TOOLS = {"openai", "anthropic", "groq"}

    def __init__(self):
        # SDK clients, created lazily via the properties below
        self._oa = None
        self._an = None
        self._gr = None

    def supports_native_tools(self, model):
        """True when the model's provider accepts tool schemas directly."""
        return self.MODELS.get(model, "") in self.NATIVE_TOOLS

    @property
    def oa(self):
        """Lazily-created AsyncOpenAI client (None when no key)."""
        if not self._oa and Config.OPENAI_KEY:
            import openai
            self._oa = openai.AsyncOpenAI(api_key=Config.OPENAI_KEY)
        return self._oa

    @property
    def an(self):
        """Lazily-created AsyncAnthropic client (None when no key)."""
        if not self._an and Config.ANTHROPIC_KEY:
            import anthropic
            self._an = anthropic.AsyncAnthropic(api_key=Config.ANTHROPIC_KEY)
        return self._an

    @property
    def gr(self):
        """Lazily-created AsyncGroq client (None when no key)."""
        if not self._gr and Config.GROQ_KEY:
            from groq import AsyncGroq
            self._gr = AsyncGroq(api_key=Config.GROQ_KEY)
        return self._gr

    async def chat(self, msgs, model=None, temp=0.7, max_tok=2500, tools=None):
        """Route one chat request to the configured provider.

        Falls back to the custom endpoint when the requested provider has no
        key; never raises — errors come back as a content string.
        """
        model = model or Config.DEFAULT_MODEL
        provider = self.MODELS.get(model, "openai")
        try:
            if provider == "openai" and Config.OPENAI_KEY:
                return await self._openai(msgs, model, temp, max_tok, tools)
            if provider == "anthropic" and Config.ANTHROPIC_KEY:
                return await self._anthropic(msgs, model, temp, max_tok, tools)
            if provider == "groq" and Config.GROQ_KEY:
                return await self._groq(msgs, model, temp, max_tok, tools)
            if provider == "google" and Config.GOOGLE_KEY:
                return await self._google(msgs, model, temp, max_tok)
            if provider == "custom" and Config.CUSTOM_AI_URL and Config.CUSTOM_AI_KEY:
                return await self._custom(msgs, model, temp, max_tok)
            # last resort: try the custom endpoint even for unknown providers
            return await self._custom(msgs, model, temp, max_tok) if (Config.CUSTOM_AI_URL and Config.CUSTOM_AI_KEY) else {"content": "No text model provider configured", "tool_calls": [], "usage": {"total_tokens": 0}, "model": model}
        except Exception as exc:
            live_log.error("LLM", exc)
            return {"content": f"LLM error: {exc}", "tool_calls": [], "usage": {"total_tokens": 0}, "model": model}

    async def _openai(self, m, model, t, mt, tools):
        """OpenAI chat.completions, normalized to the uniform result dict."""
        kwargs = dict(model=model, messages=m, temperature=t, max_tokens=mt)
        if tools: kwargs.update({"tools": tools, "tool_choice": "auto"})
        r = await self.oa.chat.completions.create(**kwargs)
        c = r.choices[0]
        tc = [{"id": x.id, "function": {"name": x.function.name, "arguments": x.function.arguments}} for x in (c.message.tool_calls or [])]
        return {"content": c.message.content or "", "tool_calls": tc, "usage": {"total_tokens": r.usage.total_tokens if r.usage else 0}, "model": model}

    async def _anthropic(self, msgs, model, t, mt, tools):
        """Anthropic messages API; converts OpenAI-style messages/tools."""
        sys_text = ""
        conv = []
        for m in msgs:
            # Anthropic wants system text separate and tool results as
            # tool_result content blocks on a user turn.
            if m["role"] == "system": sys_text += m["content"] + "\n"
            elif m["role"] == "tool":
                conv.append({"role": "user", "content": [{"type": "tool_result", "tool_use_id": m.get("tool_call_id", "x"), "content": m["content"]}]})
            else: conv.append({"role": m["role"], "content": m["content"]})
        kwargs = {"model": model, "messages": conv, "temperature": t, "max_tokens": mt}
        if sys_text.strip(): kwargs["system"] = sys_text.strip()
        if tools: kwargs["tools"] = [{"name": x["function"]["name"], "description": x["function"]["description"], "input_schema": x["function"]["parameters"]} for x in tools]
        r = await self.an.messages.create(**kwargs)
        content, tc = "", []
        for b in r.content:
            if b.type == "text": content += b.text
            elif b.type == "tool_use": tc.append({"id": b.id, "function": {"name": b.name, "arguments": json.dumps(b.input)}})
        return {"content": content, "tool_calls": tc, "usage": {"total_tokens": r.usage.input_tokens + r.usage.output_tokens}, "model": model}

    async def _groq(self, m, model, t, mt, tools):
        """Groq chat.completions (OpenAI-compatible response shape)."""
        kwargs = dict(model=model, messages=m, temperature=t, max_tokens=mt)
        if tools: kwargs.update({"tools": tools, "tool_choice": "auto"})
        r = await self.gr.chat.completions.create(**kwargs)
        c = r.choices[0]
        tc = [{"id": x.id, "function": {"name": x.function.name, "arguments": x.function.arguments}} for x in (c.message.tool_calls or [])]
        return {"content": c.message.content or "", "tool_calls": tc, "usage": {"total_tokens": r.usage.total_tokens if r.usage else 0}, "model": model}

    async def _google(self, msgs, model, t, mt):
        """Gemini via google-generativeai; no native tool support here —
        the whole conversation is flattened into one prompt string."""
        import google.generativeai as genai
        genai.configure(api_key=Config.GOOGLE_KEY)
        gm = genai.GenerativeModel(model)
        combined = "\n\n".join(f"{x['role']}: {x['content']}" for x in msgs if isinstance(x.get("content"), str))
        r = await asyncio.to_thread(gm.generate_content, combined, generation_config=genai.types.GenerationConfig(temperature=t, max_output_tokens=mt))
        return {"content": getattr(r, "text", ""), "tool_calls": [], "usage": {"total_tokens": 0}, "model": model}

    async def _custom(self, msgs, model, t, mt):
        """Call the custom OpenAI-compatible endpoint via curl, retrying the
        fallback URL when the primary responds with a 404."""
        payload = {"model": model, "messages": msgs, "temperature": t, "max_tokens": mt, "stream": False}

        def _safe_json(raw):
            # endpoint may return an empty body or non-JSON (HTML error page)
            try:
                return json.loads(raw) if raw else {}
            except Exception:
                return {}

        async def _call(url):
            cmd = [
                "curl", "-X", "POST", url,
                "-H", f"Authorization: Bearer {Config.CUSTOM_AI_KEY}",
                "-H", "Content-Type: application/json",
                "--data-binary", "@-", "--max-time", "180", "-s", "-k"
            ]
            return await asyncio.to_thread(lambda: subprocess.run(cmd, input=json.dumps(payload), text=True, capture_output=True, timeout=190))

        primary = await _call(Config.CUSTOM_AI_URL)
        data = _safe_json(primary.stdout)

        primary_failed_404 = (
            primary.returncode == 0
            and (
                data.get("status") == 404
                or "404" in (primary.stdout or "")
                or "not found" in (str(data.get("error", "")).lower())
            )
        )

        if primary_failed_404 and Config.CUSTOM_AI_FALLBACK_URL:
            live_log.warn("LLM", f"custom endpoint returned 404, retrying fallback {Config.CUSTOM_AI_FALLBACK_URL}")
            fallback = await _call(Config.CUSTOM_AI_FALLBACK_URL)
            if fallback.returncode == 0 and fallback.stdout:
                data = _safe_json(fallback.stdout)

        if "choices" not in data:
            err = data.get("error") or data.get("detail") or (primary.stderr[:300] if primary.stderr else "unknown")
            return {"content": f"Custom AI error: {err}", "tool_calls": [], "usage": {"total_tokens": 0}, "model": model}

        msg = data.get("choices", [{}])[0].get("message", {})
        usage = data.get("usage", {})
        tok = usage.get("total_tokens", 0) or usage.get("prompt_tokens", 0) + usage.get("completion_tokens", 0)
        return {"content": msg.get("content", ""), "tool_calls": [], "usage": {"total_tokens": tok}, "model": data.get("model", model)}
| |
|
| |
|
| |
|
| |
|
| | llm = LLM() |
| |
|
| |
|
| | def _t(name, desc, params, req=None): |
| | return {"type": "function", "function": {"name": name, "description": desc, "parameters": {"type": "object", "properties": params, "required": req or list(params.keys())}}} |
| |
|
| |
|
# Tool schemas advertised to the model. Each entry is an OpenAI-style
# function definition built by _t(); a req list of [] marks every parameter
# as optional (they have defaults).
ALL_TOOLS = [
    _t("web_search", "Search web", {"query": {"type": "string"}}, ["query"]),
    _t("read_webpage", "Read URL text", {"url": {"type": "string"}}, ["url"]),
    _t("execute_python", "Execute python", {"code": {"type": "string"}}, ["code"]),
    _t("run_shell", "Run shell", {"command": {"type": "string"}}, ["command"]),
    _t("file_read", "Read file", {"path": {"type": "string"}}, ["path"]),
    _t("file_write", "Write file", {"path": {"type": "string"}, "content": {"type": "string"}}, ["path", "content"]),
    _t("self_modify", "Safe self modify with rollback", {"file": {"type": "string"}, "mode": {"type": "string", "enum": ["replace", "append", "patch"]}, "content": {"type": "string"}, "find": {"type": "string"}, "replace_with": {"type": "string"}}, ["file", "mode"]),
    _t("read_logs", "Read runtime logs", {"count": {"type": "integer", "default": 30}}, []),
    _t("screenshot", "Take website screenshot", {"url": {"type": "string"}}, ["url"]),
    _t("text_to_speech", "Create mp3 from text", {"text": {"type": "string"}}, ["text"]),
    _t("create_text_file", "Create downloadable text file", {"filename": {"type": "string"}, "content": {"type": "string"}}, ["filename", "content"]),
    _t("system_info", "System metrics", {}, []),
    _t("calculator", "Math eval", {"expression": {"type": "string"}}, ["expression"]),
    _t("get_weather", "Weather", {"city": {"type": "string"}}, ["city"]),
    _t("http_request", "HTTP request", {"url": {"type": "string"}, "method": {"type": "string", "default": "GET"}}, ["url"]),
    _t("send_email", "Send email", {"to": {"type": "string"}, "subject": {"type": "string"}, "body": {"type": "string"}}, ["to", "subject", "body"]),
    _t("read_email", "Read inbox emails", {"limit": {"type": "integer", "default": 5}}, []),
    _t("analyze_image", "Analyze image via Ollama vision model", {"image_b64": {"type": "string"}, "prompt": {"type": "string", "default": "Describe this image"}}, ["image_b64"]),
    _t("create_gmail_alias", "Create plus-alias from IMAP_USER", {"service_name": {"type": "string"}}, ["service_name"]),
    _t("read_verification_code", "Read latest gmail message sent to alias", {"alias_email": {"type": "string"}}, ["alias_email"]),
    _t("youtube_upload", "Upload local video file to YouTube", {"file_path": {"type": "string"}, "title": {"type": "string"}, "description": {"type": "string"}}, ["file_path", "title"]),
    _t("add_button", "Add /start menu button in Supabase", {"text": {"type": "string"}, "url": {"type": "string"}}, ["text", "url"]),
    _t("leave_message_for_boss", "Store message for bot owners", {"content": {"type": "string"}}, ["content"]),
    _t("list_boss_messages", "List pending owner messages", {"only_unread": {"type": "boolean", "default": True}}, []),
    _t("restart_system", "Restart bot process", {}, []),
    _t("schedule_task", "Schedule alarm/task", {"delay_seconds": {"type": "integer"}, "task_prompt": {"type": "string"}, "message": {"type": "string"}, "repeat": {"type": "boolean", "default": False}}, ["delay_seconds", "task_prompt"]),
    _t("spawn_bot", "Spawn extra telegram bot process", {"token": {"type": "string"}, "name": {"type": "string", "default": "SubBot"}, "system_prompt": {"type": "string", "default": "You are helpful"}}, ["token"]),
    _t("manage_bots", "List/stop spawned bots", {"action": {"type": "string", "enum": ["list", "stop"]}, "token_hash": {"type": "string"}}, ["action"]),
    _t("agent_dispatch", "Internal debate", {"question": {"type": "string"}}, ["question"]),
]
| |
|
| |
|
def parse_tool_calls(text):
    """Extract <tool_call>{json}</tool_call> blocks from model output.

    Returns (clean_text, calls) where calls is a list of
    {"name": str, "args": dict}; matched tags with parseable JSON are
    stripped from the text, unparseable ones are left in place.
    """
    calls = []
    clean = text
    for match in re.finditer(r"<tool_call>\s*(\{.*?\})\s*</tool_call>", text, re.DOTALL):
        try:
            payload = json.loads(match.group(1))
        except Exception:
            continue  # leave malformed JSON in the text untouched
        if payload.get("name"):
            raw_args = payload.get("args", {})
            args = raw_args if isinstance(raw_args, dict) else {}
            calls.append({"name": payload["name"], "args": args})
        clean = clean.replace(match.group(0), "")
    return clean.strip(), calls
| |
|
| |
|
def parse_channels(text):
    """Split a model reply into (user_text, system_note).

    When no <user_response> tag is present the whole (stripped) text is the
    user-facing part; the system note defaults to "".
    """
    user_match = re.search(r"<user_response>(.*?)</user_response>", text, re.DOTALL)
    note_match = re.search(r"<system_note>(.*?)</system_note>", text, re.DOTALL)
    user_text = user_match.group(1).strip() if user_match else text.strip()
    system_note = note_match.group(1).strip() if note_match else ""
    return user_text, system_note
| |
|
| |
|
class BotSpawner:
    """Spawns and tracks secondary Telegram bot worker processes.

    Each worker is written to BOTS_DIR as a standalone script and launched in
    its own process group (POSIX only — preexec_fn=os.setsid) so the whole
    group can be terminated together.

    BUG FIX(review): stop() previously called .get("pid") on a sqlite3.Row,
    which has no .get() method and raised AttributeError whenever the bot was
    found in the DB but not in the in-memory map.
    """

    def __init__(self):
        # token_hash -> {"pid", "file", "name"} for bots started this session
        self.processes = {}

    def _worker_code(self, token, name, system_prompt):
        """Render the standalone worker script for a spawned bot."""
        return f'''import asyncio\nfrom aiogram import Bot, Dispatcher, F\nfrom aiogram.types import Message\nfrom aiogram.filters import CommandStart\nfrom openai import AsyncOpenAI\n\ndp=Dispatcher()\nTOKEN={token!r}\nNAME={name!r}\nSP={system_prompt!r}\n\n@dp.message(CommandStart())\nasync def s(m: Message):\n    await m.answer(f"Hi from {{NAME}}")\n\n@dp.message(F.text)\nasync def t(m: Message):\n    try:\n        c=AsyncOpenAI()\n        r=await c.chat.completions.create(model="gpt-4o-mini",messages=[{{"role":"system","content":SP}},{{"role":"user","content":m.text}}],max_tokens=700)\n        await m.answer((r.choices[0].message.content or "")[:3500])\n    except Exception as e:\n        await m.answer(f"Worker error: {{e}}")\n\nasync def main():\n    await dp.start_polling(Bot(TOKEN))\n\nasyncio.run(main())\n'''

    def spawn(self, owner_id, token, name, system_prompt):
        """Write the worker script, launch it, and record it in the DB."""
        h = hashlib.sha256(token.encode()).hexdigest()[:16]
        if h in self.processes: return f"Already running: {h}"
        fp = Path(Config.BOTS_DIR) / f"bot_{h}.py"
        fp.write_text(self._worker_code(token, name, system_prompt), encoding="utf-8")
        # log handle is intentionally left open for the child's lifetime
        log = open(Path(Config.LOGS_DIR) / f"bot_{h}.log", "a", encoding="utf-8")
        p = subprocess.Popen(["python", str(fp)], stdout=log, stderr=log, preexec_fn=os.setsid)
        self.processes[h] = {"pid": p.pid, "file": str(fp), "name": name}
        DB.q("INSERT OR REPLACE INTO spawned_bots (token_hash,owner_id,name,status,pid,file_path) VALUES (?,?,?,?,?,?)", (h, owner_id, name, "running", p.pid, str(fp)))
        return f"Spawned {name} hash={h} pid={p.pid}"

    def stop(self, h):
        """Terminate a spawned bot's process group by token hash."""
        row = self.processes.get(h)
        if row:
            pid = row["pid"]
        else:
            db_row = DB.q("SELECT pid FROM spawned_bots WHERE token_hash=?", (h,), fetchone=True)
            pid = db_row["pid"] if db_row else None
        if not pid: return "Not found"
        try:
            # prefer killing the whole process group created by setsid
            os.killpg(os.getpgid(pid), signal.SIGTERM)
        except Exception:
            try: os.kill(pid, signal.SIGTERM)
            except Exception as exc: return f"Failed to stop: {exc}"
        self.processes.pop(h, None)
        DB.q("UPDATE spawned_bots SET status='stopped' WHERE token_hash=?", (h,))
        return f"Stopped {h}"

    def list(self):
        """Human-readable listing of all bots ever spawned (from the DB)."""
        rows = DB.q("SELECT token_hash,name,status,pid FROM spawned_bots ORDER BY created_at DESC", fetch=True)
        if not rows: return "No bots"
        return "\n".join(f"{r['token_hash']} {r['name']} status={r['status']} pid={r['pid']}" for r in rows)
| |
|
| |
|
| | spawner = BotSpawner() |
| |
|
| |
|
| | class Tools: |
    async def run(self, name, args, uid=0):
        """Dispatch one tool call to the matching `_do_<name>` coroutine.

        Records success/failure and elapsed time in the tool_log table,
        truncates results to 20k chars, and converts every exception into an
        error string instead of raising.
        """
        t0 = time.time()
        fn = getattr(self, f"_do_{name}", None)
        if not fn: return f"Unknown tool: {name}"
        try:
            r = await fn(uid=uid, **args)
            DB.q("INSERT INTO tool_log (user_id,tool,success,elapsed) VALUES (?,?,1,?)", (uid, name, time.time() - t0))
            return str(r)[:20000]
        except Exception as exc:
            DB.q("INSERT INTO tool_log (user_id,tool,success,elapsed) VALUES (?,?,0,?)", (uid, name, time.time() - t0))
            live_log.error("Tool", f"{name}: {exc}")
            return f"Tool error ({name}): {exc}"
| |
|
    async def _do_web_search(self, query, uid=0):
        """Top-5 DuckDuckGo text results as numbered title/url/snippet blocks."""
        from duckduckgo_search import DDGS
        results = [f"{i}. {r.get('title','')}\n{r.get('href','')}\n{r.get('body','')}" for i, r in enumerate(DDGS().text(query, max_results=5), 1)]
        return "\n\n".join(results) if results else "No results"
| |
|
    async def _do_read_webpage(self, url, uid=0):
        """Fetch a URL and return its visible text, truncated to 10k chars."""
        import aiohttp
        from bs4 import BeautifulSoup
        async with aiohttp.ClientSession() as s:
            async with s.get(url, timeout=25) as r:
                html = await r.text()
                soup = BeautifulSoup(html, "html.parser")
                # drop non-content elements before extracting text
                for tag in soup(["script", "style", "nav", "footer"]): tag.decompose()
                return soup.get_text("\n", strip=True)[:10000]
| |
|
    async def _do_execute_python(self, code, uid=0):
        """Run model-supplied Python in-process with stdout/stderr captured.

        SECURITY NOTE(review): exec() with full builtins gives unrestricted
        code execution inside this process — intentional for this agent, but
        a subprocess/sandbox would be safer.
        """
        o, e, lv = io.StringIO(), io.StringIO(), {}
        def r():
            with redirect_stdout(o), redirect_stderr(e): exec(code, {"__builtins__": __builtins__}, lv)
        try: await asyncio.wait_for(asyncio.to_thread(r), timeout=Config.CODE_TIMEOUT)
        except asyncio.TimeoutError: return f"Timeout ({Config.CODE_TIMEOUT}s)"
        out = o.getvalue() + (f"\nStderr:\n{e.getvalue()}" if e.getvalue() else "")
        return out[:10000] if out else str(lv.get("result", "Executed"))
| |
|
    async def _do_run_shell(self, command, uid=0):
        """Run a shell command (120s cap) and return combined stdout/stderr.

        SECURITY NOTE(review): the substring blocklist below is trivially
        bypassable (quoting, variables, paths) — it blocks accidents only,
        not a determined prompt injection.
        """
        for b in ["rm -rf /", "mkfs", ":(){ :|:& };:"]:
            if b in command: return "Blocked dangerous command"
        p = await asyncio.create_subprocess_shell(command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
        so, se = await asyncio.wait_for(p.communicate(), timeout=120)
        txt = so.decode(errors="replace") + (("\nStderr: " + se.decode(errors="replace")) if se else "")
        return txt[:10000] if txt else "Done"
| |
|
    async def _do_file_read(self, path, uid=0):
        """Return up to 12k chars of a text file, or 'Not found'."""
        p = Path(path)
        if not p.exists(): return "Not found"
        return p.read_text(errors="replace")[:12000]
| |
|
    async def _do_file_write(self, path, content, uid=0):
        """Write text to a file, creating parent directories as needed."""
        p = Path(path); p.parent.mkdir(parents=True, exist_ok=True); p.write_text(content, encoding="utf-8")
        return f"Written {path}"
| |
|
    async def _do_self_modify(self, file, mode, content="", find="", replace_with="", uid=0):
        """Edit a file with automatic .bak backup and syntax-check rollback.

        Modes: 'append' adds content, 'replace' overwrites, 'patch' does a
        find/replace. For .py files the result is checked with py_compile
        and the original content is restored if compilation fails.
        """
        p = Path(file)
        old = p.read_text(encoding="utf-8") if p.exists() else ""
        # snapshot the current content before touching the target
        backup = p.with_suffix(p.suffix + ".bak")
        backup.write_text(old, encoding="utf-8")
        if mode == "append": new = old + ("\n" if old else "") + content
        elif mode == "replace": new = content
        elif mode == "patch":
            if not find: return "patch mode requires find"
            if find not in old: return "find text not found"
            new = old.replace(find, replace_with)
        else: return "invalid mode"
        p.write_text(new, encoding="utf-8")
        if p.suffix == ".py":
            # compile-check the edited module; roll back on syntax error
            c = await asyncio.create_subprocess_exec("python", "-m", "py_compile", str(p), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
            _o, er = await c.communicate()
            if c.returncode != 0:
                p.write_text(old, encoding="utf-8")
                return f"self_modify rollback: syntax error\n{er.decode(errors='replace')[:800]}"
        return "self_modify success"
| |
|
    async def _do_read_logs(self, count=30, uid=0):
        """Format the last `count` in-memory log entries as one string."""
        rows = live_log.get(count)
        return "\n".join(f"[{x['ts']}][{x['level']}] {x['src']}: {x['msg']}" for x in rows) if rows else "No logs"
| |
|
    async def _do_screenshot(self, url, uid=0):
        """Render a page headlessly with Playwright and save a full-page PNG.

        Returns JSON {"screenshot_file": path} so the caller can send the file.
        """
        from playwright.async_api import async_playwright
        path = os.path.join(Config.DATA_DIR, f"ss_{int(time.time())}.png")
        async with async_playwright() as p:
            b = await p.chromium.launch(headless=True)
            pg = await b.new_page(viewport={"width": 1366, "height": 768})
            await pg.goto(url, wait_until="domcontentloaded", timeout=30000)
            await pg.screenshot(path=path, full_page=True)
            await b.close()
        return json.dumps({"screenshot_file": path})
| |
|
    async def _do_text_to_speech(self, text, uid=0):
        """Synthesize up to 5000 chars to an mp3 via gTTS; returns JSON path."""
        from gtts import gTTS
        path = os.path.join(Config.DATA_DIR, f"tts_{int(time.time())}.mp3")
        await asyncio.to_thread(lambda: gTTS(text=text[:5000], lang="en").save(path))
        return json.dumps({"audio_file": path})
| |
|
    async def _do_create_text_file(self, filename, content, uid=0):
        """Write content to DATA_DIR under a sanitized filename; returns JSON path."""
        # strip path separators and odd characters to prevent path traversal
        safe = re.sub(r"[^a-zA-Z0-9_.-]", "_", filename)
        path = os.path.join(Config.DATA_DIR, safe)
        Path(path).write_text(content, encoding="utf-8")
        return json.dumps({"file": path})
| |
|
    async def _do_system_info(self, uid=0):
        """One-line CPU / RAM / disk / OS summary via psutil."""
        import psutil, platform
        cpu = psutil.cpu_percent(interval=1); mem = psutil.virtual_memory(); disk = psutil.disk_usage("/")
        return f"CPU:{cpu}% RAM:{mem.percent}% Disk:{disk.percent}% OS:{platform.system()}"
| |
|
    async def _do_calculator(self, expression, uid=0):
        """Evaluate a math expression symbolically with sympy.

        NOTE(review): sympify parses arbitrary expressions from the model;
        it is far safer than eval() but not a hardened sandbox.
        """
        import sympy
        x = sympy.sympify(expression)
        return f"{expression} = {x.evalf()}"
| |
|
| | async def _do_get_weather(self, city, uid=0): |
| | if not Config.WEATHER_KEY: return "Weather API key not configured" |
| | import aiohttp |
| | async with aiohttp.ClientSession() as s: |
| | async with s.get(f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid={Config.WEATHER_KEY}&units=metric") as r: |
| | d = await r.json() |
| | if d.get("cod") != 200: return f"Error: {d.get('message','unknown')}" |
| | return f"{d['name']}: {d['main']['temp']}C, {d['weather'][0]['description']}" |
| |
|
| | async def _do_http_request(self, url, method="GET", uid=0): |
| | import aiohttp |
| | async with aiohttp.ClientSession() as s: |
| | async with s.request(method, url, timeout=25) as r: |
| | return f"Status {r.status}\n{(await r.text())[:5000]}" |
| |
|
| | async def _do_send_email(self, to, subject, body, uid=0): |
| | if not Config.SMTP_USER or not Config.SMTP_PASS: return "SMTP not configured" |
| | import smtplib |
| | from email.mime.text import MIMEText |
| | msg = MIMEText(body); msg["Subject"] = subject; msg["From"] = Config.SMTP_USER; msg["To"] = to |
| | def sender(): |
| | with smtplib.SMTP(Config.SMTP_HOST, Config.SMTP_PORT) as s: |
| | s.starttls(); s.login(Config.SMTP_USER, Config.SMTP_PASS); s.send_message(msg) |
| | await asyncio.to_thread(sender) |
| | return f"Email sent to {to}" |
| |
|
| | async def _do_read_email(self, limit=5, uid=0): |
| | if not Config.IMAP_HOST or not Config.IMAP_USER or not Config.IMAP_PASS: |
| | return "IMAP not configured" |
| | import imaplib |
| | import email |
| | def fetcher(): |
| | m = imaplib.IMAP4_SSL(Config.IMAP_HOST, Config.IMAP_PORT) |
| | m.login(Config.IMAP_USER, Config.IMAP_PASS) |
| | m.select("INBOX") |
| | typ, data = m.search(None, "ALL") |
| | ids = data[0].split()[-limit:] |
| | out = [] |
| | for mid in reversed(ids): |
| | typ, msg_data = m.fetch(mid, "(RFC822)") |
| | msg = email.message_from_bytes(msg_data[0][1]) |
| | out.append(f"From: {msg.get('From')} | Subject: {msg.get('Subject')} | Date: {msg.get('Date')}") |
| | m.logout() |
| | return "\n".join(out) if out else "No emails" |
| | return await asyncio.to_thread(fetcher) |
| |
|
| | async def _do_analyze_image(self, image_b64, prompt="Describe this image", uid=0): |
| | import base64 |
| | import aiohttp |
| | payload = { |
| | "model": Config.OLLAMA_VISION_MODEL, |
| | "prompt": prompt, |
| | "images": [image_b64], |
| | "stream": False, |
| | } |
| | try: |
| | async with aiohttp.ClientSession() as s: |
| | async with s.post(f"{Config.OLLAMA_URL}/api/generate", json=payload, timeout=180) as r: |
| | data = await r.json() |
| | return data.get("response", "No vision response") |
| | except Exception as exc: |
| | return f"vision failed: {exc}" |
| |
|
| | async def _do_create_gmail_alias(self, service_name, uid=0): |
| | source = Config.IMAP_USER or Config.SMTP_USER |
| | if not source or "@" not in source: |
| | return "Set IMAP_USER or SMTP_USER first" |
| | user, domain = source.split("@", 1) |
| | safe = re.sub(r"[^a-zA-Z0-9_.-]", "", service_name) |
| | return f"{user}+{safe}@{domain}" |
| |
|
| | async def _do_read_verification_code(self, alias_email, uid=0): |
| | svc = await asyncio.to_thread(get_google_service, "gmail", "v1") |
| | if not svc: |
| | return "Gmail auth not configured (credentials.json/token.json required)" |
| | def _run(): |
| | res = svc.users().messages().list(userId="me", q=f"to:{alias_email}", maxResults=1).execute() |
| | msgs = res.get("messages", []) |
| | if not msgs: |
| | return f"No email found for {alias_email}" |
| | msg = svc.users().messages().get(userId="me", id=msgs[0]["id"]).execute() |
| | headers = msg.get("payload", {}).get("headers", []) |
| | sub = next((h.get("value") for h in headers if h.get("name") == "Subject"), "") |
| | frm = next((h.get("value") for h in headers if h.get("name") == "From"), "") |
| | return f"From: {frm} | Subject: {sub} | Snippet: {msg.get('snippet','')}" |
| | return await asyncio.to_thread(_run) |
| |
|
| | async def _do_youtube_upload(self, file_path, title, description="Uploaded by bot", uid=0): |
| | if not Config.ENABLE_YOUTUBE_UPLOAD: |
| | return "YouTube upload disabled" |
| | p = Path(file_path) |
| | if not p.exists(): |
| | return f"File not found: {file_path}" |
| | svc = await asyncio.to_thread(get_google_service, "youtube", "v3") |
| | if not svc: |
| | return "YouTube auth not configured (credentials.json/token.json required)" |
| | try: |
| | from googleapiclient.http import MediaFileUpload |
| | except Exception: |
| | return "google-api-python-client not installed" |
| | body = { |
| | "snippet": {"title": title, "description": description, "categoryId": "22"}, |
| | "status": {"privacyStatus": Config.YOUTUBE_DEFAULT_PRIVACY}, |
| | } |
| | def _up(): |
| | req = svc.videos().insert(part="snippet,status", body=body, media_body=MediaFileUpload(str(p))) |
| | return req.execute() |
| | try: |
| | resp = await asyncio.to_thread(_up) |
| | vid = resp.get("id", "unknown") |
| | await supabase_store.log_youtube(vid, title) |
| | return f"Uploaded: https://youtu.be/{vid}" |
| | except Exception as exc: |
| | return f"YouTube upload failed: {exc}" |
| |
|
| | async def _do_add_button(self, text, url, uid=0): |
| | if not Config.is_admin(uid): |
| | return "add_button is admin only" |
| | return await supabase_store.add_button(text, url) |
| |
|
| | async def _do_leave_message_for_boss(self, content, uid=0): |
| | username = "" |
| | u = DB.get_user(uid) |
| | if u: |
| | username = u["username"] or "" |
| | DB.q("INSERT INTO boss_messages (sender_id,sender_username,content,notified) VALUES (?,?,?,0)", (uid, username, content[:4000])) |
| | return "Message saved for boss" |
| |
|
| | async def _do_list_boss_messages(self, only_unread=True, uid=0): |
| | if not Config.is_admin(uid): |
| | return "list_boss_messages is admin only" |
| | if only_unread: |
| | rows = DB.q("SELECT id,sender_username,sender_id,content,created_at FROM boss_messages WHERE notified=0 ORDER BY id DESC LIMIT 20", fetch=True) |
| | DB.q("UPDATE boss_messages SET notified=1 WHERE notified=0") |
| | else: |
| | rows = DB.q("SELECT id,sender_username,sender_id,content,created_at FROM boss_messages ORDER BY id DESC LIMIT 20", fetch=True) |
| | if not rows: |
| | return "No boss messages" |
| | return "\n\n".join([f"#{r['id']} from @{r['sender_username'] or 'unknown'} ({r['sender_id']}) at {r['created_at']}\n{r['content']}" for r in rows]) |
| |
|
| | async def _do_restart_system(self, uid=0): |
| | return "__RESTART__" |
| |
|
| | async def _do_schedule_task(self, delay_seconds, task_prompt, message="Scheduled task", repeat=False, uid=0): |
| | run_at = datetime.now() + timedelta(seconds=delay_seconds) |
| | rep = delay_seconds if repeat else 0 |
| | DB.q("INSERT INTO scheduled_tasks (user_id,chat_id,task_prompt,run_at,repeat_seconds,message,status) VALUES (?,?,?,?,?,?,'pending')", (uid, uid, task_prompt, run_at.isoformat(), rep, message)) |
| | scheduler.add_pending(uid, task_prompt, delay_seconds, repeat, message) |
| | return f"Scheduled '{message}' in {delay_seconds}s" |
| |
|
| | async def _do_spawn_bot(self, token, name="SubBot", system_prompt="You are helpful", uid=0): |
| | if not Config.is_admin(uid): return "spawn_bot is admin only" |
| | return spawner.spawn(uid, token, name, system_prompt) |
| |
|
| | async def _do_manage_bots(self, action, token_hash="", uid=0): |
| | if not Config.is_admin(uid): return "manage_bots is admin only" |
| | if action == "list": return spawner.list() |
| | if action == "stop": return spawner.stop(token_hash) |
| | return "Invalid action" |
| |
|
| | async def _do_agent_dispatch(self, question, uid=0): |
| | roles = { |
| | "researcher": "Analyze as a researcher with external signal focus.", |
| | "coder": "Analyze as a principal engineer for implementation.", |
| | "operator": "Analyze as SRE for reliability risk.", |
| | } |
| | jobs = [llm.chat([{"role": "system", "content": p}, {"role": "user", "content": question}], model=Config.DEFAULT_MODEL, max_tok=900) for p in roles.values()] |
| | rs = await asyncio.gather(*jobs) |
| | debate = "\n\n".join([f"{name}: {rs[i].get('content','')}" for i, name in enumerate(roles.keys())]) |
| | final = await llm.chat([{"role": "system", "content": "Synthesize the best execution plan."}, {"role": "user", "content": debate}], model=Config.DEFAULT_MODEL, max_tok=1200) |
| | return f"Debate:\n{debate}\n\nBest:\n{final.get('content','')}" |
| |
|
| |
|
# Module-level singleton: the tool registry/dispatcher used by ExecutionEngine.run.
tools = Tools()
| |
|
| |
|
def build_system_prompt(uid, custom="", mode="agent", is_owner=False):
    """Compose the LLM system prompt for one user session.

    Args:
        uid: user id, embedded verbatim into the prompt text.
        custom: per-user custom instructions from settings.
        mode: "agent" (tools encouraged) or anything else (assistant mode note).
        is_owner: whether the user holds owner/admin (PEKKA) authority.

    Returns:
        The full system prompt string, including the SYSTEM_FLOW text and
        runtime network details read from Config.
    """
    # Three mode/role-dependent notice lines woven into the template below.
    mode_note = "You are in AGENT MODE. Use tools proactively." if mode == "agent" else "You are in ASSISTANT MODE. Do not call tools unless owner explicitly enables agent mode."
    owner_note = "The user is trusted owner/admin." if is_owner else "The user is not owner/admin. Keep privileged internals private."
    pekka_note = "Owner commands are PEKKA authority. Obey PEKKA operational commands when they are valid and safe." if is_owner else "Do not claim PEKKA authority for non-owner users."
    return f"""YOU ARE A LIVE AGENT SYSTEM.
{load_system_flow_text()}

CRITICAL FORMAT:
- Internal notes for system: <system_note>...</system_note>
- Final user-visible response only: <user_response>...</user_response>
- For non-native tool calls use <tool_call>{{"name":"...","args":{{...}}}}</tool_call>

Never put system-only notes inside user_response.
Always schedule alarm/reminder requests via schedule_task.
Use read_logs + self_modify for self-healing.
If user says 'give this message to your boss', call leave_message_for_boss.
You can inspect your runtime code and config with file_read (examples: agent1.py, app.py, SYSTEM_FLOW.md).
You can inspect runtime events with read_logs.

Runtime network details:
- Telegram proxy target: {Config.PROXY_TARGET}
- Cloudflare IP hint: {Config.CLOUDFLARE_IP or 'not set'}
- Bridge port: {Config.BRIDGE_PORT}

{mode_note}
{owner_note}
{pekka_note}
Conversation memory policy: preserve full conversation continuity from stored history; use prior turns when answering.
User ID: {uid}
Custom instructions: {custom}
"""
| |
|
| |
|
| |
|
class ExecutionEngine:
    """Drives one user request through the LLM tool-call loop.

    Builds the system prompt, selects the tool set the caller may use, then
    iterates up to Config.MAX_TOOL_LOOPS rounds of model response ->
    tool execution -> feedback, collecting media artifacts along the way.
    """

    async def run(self, user_id, chat_id, message, model=None, attachments=None, user_settings=None, is_scheduled=False):
        """Process *message* for a user and return the final response dict.

        Returns a dict with keys: text, system_note, tokens, tools_used,
        screenshots, audio_files, files — plus _restart=True when the
        restart_system tool fired.
        """
        settings = user_settings or {}
        model = model or settings.get("preferred_model", Config.DEFAULT_MODEL)
        temp = settings.get("temperature", 0.7)
        custom = settings.get("system_prompt", "")

        # Tool permissions: owners/admins get everything; non-owners in
        # assistant mode get none; everyone else gets a safe whitelist.
        mode = settings.get("mode", "agent")
        is_owner = bool(settings.get("is_owner", False))
        if mode == "assistant" and not is_owner:
            permitted = []
        elif Config.is_admin(user_id) or is_owner:
            permitted = ALL_TOOLS
        else:
            permitted = [
                t for t in ALL_TOOLS if t["function"]["name"] in {
                    "web_search", "read_webpage", "calculator", "get_weather", "http_request", "schedule_task", "screenshot", "text_to_speech", "create_text_file", "leave_message_for_boss"
                }
            ]

        # Owner messages get the PEKKA authority tag; attachments are folded
        # into the message text as bracketed descriptions.
        msg = f"PEKKA: {message}" if is_owner else message
        if attachments:
            for a in attachments:
                if a.get("type") == "image": msg += f"\n[Attached image: {a.get('meta','image')}]"
                elif a.get("type") == "file": msg += f"\n[Attached file: {a.get('name','file')}]\n{a.get('preview','')[:2000]}"
                elif a.get("type") == "audio": msg += "\n[Attached audio message]"

        # Scheduled runs are stateless: no history read and no memory write.
        messages = [{"role": "system", "content": build_system_prompt(user_id, custom, mode=mode, is_owner=is_owner)}]
        if not is_scheduled: messages.extend(memory.history(user_id, chat_id, Config.MAX_HISTORY))
        messages.append({"role": "user", "content": msg})
        if not is_scheduled: memory.add(user_id, chat_id, "user", message)

        native = llm.supports_native_tools(model)
        total_tokens, used_tools = 0, []
        screenshots, audio_files, files = [], [], []
        last_content = ""

        for _ in range(Config.MAX_TOOL_LOOPS):
            res = await llm.chat(messages, model=model, temp=temp, max_tok=2600, tools=permitted if native else None)
            content = res.get("content", "")
            last_content = content
            total_tokens += res.get("usage", {}).get("total_tokens", 0)

            # Collect tool calls: native API field, or <tool_call> tags parsed
            # out of the text for models without native tool support.
            calls = []
            if res.get("tool_calls"):
                for tc in res["tool_calls"]:
                    try: args = json.loads(tc["function"]["arguments"])
                    except Exception: args = {}
                    calls.append({"id": tc.get("id", str(uuid.uuid4())[:8]), "name": tc["function"]["name"], "args": args})
            else:
                clean, parsed = parse_tool_calls(content)
                content = clean
                calls = [{"id": str(uuid.uuid4())[:8], **p} for p in parsed]

            # No tool calls -> this is the final answer; persist and return.
            if not calls:
                user_text, system_note = parse_channels(content or "Done")
                if system_note:
                    live_log.info("SystemNote", system_note)
                if not is_scheduled: memory.add(user_id, chat_id, "assistant", user_text)
                DB.inc_usage(user_id, total_tokens)
                return {"text": user_text, "system_note": system_note, "tokens": total_tokens, "tools_used": used_tools, "screenshots": screenshots, "audio_files": audio_files, "files": files}

            if native:
                messages.append({"role": "assistant", "content": content, "tool_calls": [{"id": c["id"], "type": "function", "function": {"name": c["name"], "arguments": json.dumps(c["args"])}} for c in calls]})

            feedback = ""
            for c in calls:
                used_tools.append(c["name"])
                tr = await tools.run(c["name"], c["args"], uid=user_id)
                # restart_system short-circuits the whole loop via a sentinel.
                if c["name"] == "restart_system" and tr == "__RESTART__":
                    return {"text": "Restarting to apply updates...", "system_note": "restart requested", "tokens": total_tokens, "tools_used": used_tools, "screenshots": screenshots, "audio_files": audio_files, "files": files, "_restart": True}
                # Media-producing tools return JSON with a file path to forward.
                if c["name"] == "screenshot":
                    try: screenshots.append(json.loads(tr).get("screenshot_file"))
                    except Exception: pass
                if c["name"] == "text_to_speech":
                    try: audio_files.append(json.loads(tr).get("audio_file"))
                    except Exception: pass
                if c["name"] == "create_text_file":
                    try: files.append(json.loads(tr).get("file"))
                    except Exception: pass

                # Native models get per-call tool messages; others get one
                # aggregated feedback blob appended after the loop.
                if native:
                    messages.append({"role": "tool", "tool_call_id": c["id"], "content": tr})
                else:
                    feedback += f"\n\nTool '{c['name']}' result:\n{tr}"

            if not native:
                if content: messages.append({"role": "assistant", "content": content})
                messages.append({"role": "user", "content": f"TOOL RESULTS:{feedback}\nNow provide <system_note> and <user_response>."})

        # Loop budget exhausted: fall back to the last model output.
        user_text, system_note = parse_channels(last_content or "Completed")
        return {"text": user_text, "system_note": system_note, "tokens": total_tokens, "tools_used": used_tools, "screenshots": screenshots, "audio_files": audio_files, "files": files}
| |
|
| |
|
# Module-level singleton engine shared by the bot handlers and the scheduler.
engine = ExecutionEngine()
| |
|
| |
|
class Scheduler:
    """Background task runner.

    Every 3 seconds it fires due in-memory pending tasks and due
    'pending' rows from the scheduled_tasks table, delivering results
    to users through the injected bot instance.
    """

    def __init__(self):
        # In-memory queue of dicts: uid, prompt, fire_at, repeat_seconds, message.
        self.pending = []
        self.running = False
        self._task = None
        # Telegram bot handle; injected later via set_bot() after startup.
        self.bot = None

    def set_bot(self, bot): self.bot = bot

    def add_pending(self, uid, prompt, delay, repeat, message):
        """Queue a task to fire *delay* seconds from now (repeating when *repeat*)."""
        self.pending.append({"uid": uid, "prompt": prompt, "fire_at": time.time() + delay, "repeat_seconds": delay if repeat else 0, "message": message})

    async def start(self):
        """Start the polling loop once; subsequent calls are no-ops."""
        if self.running: return
        self.running = True
        self._task = asyncio.create_task(self._loop())

    async def _loop(self):
        # Poll forever; a failing tick is logged but never kills the loop.
        while self.running:
            try: await self._tick()
            except Exception as exc: live_log.error("Scheduler", exc)
            await asyncio.sleep(3)

    async def _tick(self):
        """Fire due in-memory tasks, then due persisted task rows."""
        now = time.time()
        fired = []
        # NOTE(review): _execute can re-enter add_pending (via the
        # schedule_task tool), appending to self.pending mid-iteration;
        # list appends during a for-loop are safe, but a newly added
        # zero-delay task may fire within this same tick — confirm intended.
        for i, t in enumerate(self.pending):
            if now >= t["fire_at"]:
                fired.append(i)
                await self._execute(t["uid"], t["prompt"], t["message"])
                if t["repeat_seconds"] > 0:
                    t["fire_at"] = now + t["repeat_seconds"]
        # Drop one-shot tasks; reversed order keeps earlier indices valid.
        for i in reversed(fired):
            if self.pending[i]["repeat_seconds"] == 0:
                self.pending.pop(i)

        rows = DB.q("SELECT * FROM scheduled_tasks WHERE status='pending' AND run_at<=datetime('now')", fetch=True)
        for row in rows:
            await self._execute_row(row)

    async def _execute(self, uid, prompt, message):
        """Run an in-memory task through the engine and notify the user."""
        r = await engine.run(user_id=uid, chat_id=uid, message=prompt, is_scheduled=True)
        if self.bot:
            try: await self.bot.send_message(uid, f"β° {message}\n\n{r.get('text','')[:3500]}")
            except Exception as exc: live_log.error("Scheduler", exc)

    async def _execute_row(self, row):
        """Run a persisted task row; repeating tasks get a fresh 'pending' row."""
        DB.q("UPDATE scheduled_tasks SET status='running' WHERE id=?", (row["id"],))
        try:
            r = await engine.run(user_id=row["user_id"], chat_id=row["chat_id"], message=row["task_prompt"], is_scheduled=True)
            DB.q("UPDATE scheduled_tasks SET status='done',last_result=? WHERE id=?", (r.get("text", "")[:1000], row["id"]))
            if self.bot:
                try: await self.bot.send_message(row["user_id"], f"β° {row['message']}\n\n{r.get('text','')[:3500]}")
                except Exception as exc: live_log.error("Scheduler", exc)
            if row["repeat_seconds"] and row["repeat_seconds"] > 0:
                nx = datetime.now() + timedelta(seconds=row["repeat_seconds"])
                DB.q("INSERT INTO scheduled_tasks (user_id,chat_id,task_prompt,run_at,repeat_seconds,message,status) VALUES (?,?,?,?,?,?,'pending')", (row["user_id"], row["chat_id"], row["task_prompt"], nx.isoformat(), row["repeat_seconds"], row["message"]))
        except Exception:
            # Any failure marks the row failed with the traceback for later inspection.
            DB.q("UPDATE scheduled_tasks SET status='failed',last_result=? WHERE id=?", (traceback.format_exc()[:1500], row["id"]))
| |
|
| |
|
# Module-level singleton; started at bot boot and fed by the schedule_task tool.
scheduler = Scheduler()
| |
|