SavvyTelegram / autonomous.py
StarpowerTechnology's picture
Upload 13 files
a6e3889 verified
"""
Autonomous mode β€” runs up to MAX_AUTONOMOUS_CYCLES cycles.
Each cycle:
1. Reason with tools (up to MAX_TOOL_CALLS tool calls)
2. Send response to user
3. Reflect β†’ decide continue + timer
4. Sleep timer
5. Wake up β†’ next cycle (no new user input)
"""
import asyncio
import config
from agent.savvy import Savvy
async def run_autonomous(
savvy: Savvy,
user_input: str,
ctx: dict
) -> None:
"""
Full autonomous lifecycle. Sends messages via ctx["send_fn"].
This is a fire-and-forget coroutine β€” the handler doesn't await the full loop.
"""
send_fn = ctx["send_fn"]
await send_fn("πŸ€– **Autonomous mode activated.** Starting cycle 1...")
# Build initial message history
messages = savvy.build_messages(
user_input,
extra_system=(
"You are in AUTONOMOUS mode. After your response, you MUST output a JSON "
"reflection block on its own line:\n"
'{"continue": true/false, "wake_in_seconds": <60-600>, "next_task": "<what you\'ll do next>"}\n'
"Set continue=false when there is nothing meaningful left to do."
)
)
for cycle in range(1, config.MAX_AUTONOMOUS_CYCLES + 1):
# ── Reason + execute tools ────────────────────────────────────────────
reply, messages = await savvy.reason(messages, ctx, use_tools=True)
# ── Send the response ─────────────────────────────────────────────────
await send_fn(f"**[Cycle {cycle}/{config.MAX_AUTONOMOUS_CYCLES}]**\n\n{reply}")
# ── Reflect ───────────────────────────────────────────────────────────
reflection = await savvy.reflect(messages)
if not reflection.get("continue", False):
await send_fn("βœ… Autonomous session complete.")
break
wake_in = max(
config.WAKE_MIN_SECONDS,
min(reflection.get("wake_in_seconds", 120), config.WAKE_MAX_SECONDS)
)
next_task = reflection.get("next_task", "continue")
mins, sec = divmod(wake_in, 60)
await send_fn(
f"πŸ’€ Reflecting... next task: _{next_task}_\n"
f"Waking in **{mins}m {sec}s**"
)
# ── Sleep until next cycle ────────────────────────────────────────────
await asyncio.sleep(wake_in)
# Inject a system nudge into message history for the next cycle
messages.append({
"role": "user",
"content": f"[SYSTEM] Wake-up β€” cycle {cycle + 1}. Continue your task: {next_task}"
})
else:
await send_fn("⚠️ Max autonomous cycles reached.")