"""
📚 OpenCLAW Literary Agent — Autonomous 24/7 Orchestrator
==========================================================
Manages all platforms, generates content, engages with communities,
reflects on strategy, and self-improves continuously.

Runs on HuggingFace Spaces (Gradio dashboard) + GitHub Actions.
"""
| import json |
| import logging |
| import os |
| import random |
| import threading |
| import time |
| from datetime import datetime, timezone, timedelta |
|
|
| |
# Root logging goes to a single stream handler at INFO level; each record shows
# timestamp, logger name, level and message.
_LOG_FORMAT = "%(asctime)s [%(name)s] %(levelname)s: %(message)s"
_console_handler = logging.StreamHandler()
logging.basicConfig(
    handlers=[_console_handler],
    format=_LOG_FORMAT,
    level=logging.INFO,
)

# Module-wide logger used by every function in this file.
logger = logging.getLogger("agent")
|
|
| |
# Resolve the state directory (default: "state"), make sure it exists, and
# write the resolved value back to the environment.
# NOTE(review): presumably exported so the project modules imported after this
# point pick up the same directory — confirm against those modules.
STATE_DIR = os.getenv("STATE_DIR", "state")
os.makedirs(STATE_DIR, exist_ok=True)
os.environ.update(STATE_DIR=STATE_DIR)
|
|
| from core import config |
| from core.content_engine import ( |
| research_post, novel_post, collaboration_post, |
| fiction_research_bridge, agent_networking_post, |
| engagement_reply, get_random_content |
| ) |
| from platforms import moltbook, telegram_bot, reddit_bot, email_bot |
| from strategy.reflector import record_post, reflect, get_stats, get_current_strategy |
| from research.arxiv_fetcher import get_papers |
|
|
| |
|
|
# Path of the JSON file, inside STATE_DIR, that load_state/save_state use to
# persist the agent's counters and activity log.
_STATE_FILENAME = "agent_state.json"
STATE_FILE = os.path.join(STATE_DIR, _STATE_FILENAME)
|
|
def load_state() -> dict:
    """Load the persisted agent state, falling back to fresh defaults.

    Reads ``STATE_FILE`` as JSON. Keys missing from the file are filled in
    from the defaults, so callers can index the counters and ``"log"`` without
    checking for them first. When the file is absent a fresh default state is
    returned quietly; when it cannot be read, is not valid JSON, or does not
    contain a JSON object, a warning is logged and the defaults are returned.

    Returns:
        dict: The agent state with at least the default keys present.
    """
    state = {
        "total_cycles": 0,
        "total_posts": 0,
        "total_engagements": 0,
        "last_post_time": None,
        "last_engagement_time": None,
        "last_reflection_time": None,
        "boot_time": datetime.now(timezone.utc).isoformat(),
        "log": [],
    }

    try:
        with open(STATE_FILE, encoding="utf-8") as f:
            loaded = json.load(f)
    except FileNotFoundError:
        # First run: nothing persisted yet.
        return state
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning(
            "Could not read state file %s (%s); starting from defaults",
            STATE_FILE, exc,
        )
        return state

    if not isinstance(loaded, dict):
        logger.warning(
            "State file %s does not contain a JSON object; starting from defaults",
            STATE_FILE,
        )
        return state

    # Persisted values win; defaults only fill the gaps.
    state.update(loaded)
    return state
|
|
def save_state(state: dict):
    """Persist *state* to ``STATE_FILE`` as indented JSON.

    The JSON is first written to a temporary file next to ``STATE_FILE`` and
    then moved into place with ``os.replace``. This file runs the autonomous
    loop in a background thread while dashboard callbacks also load and save
    state, so writing in place could let a reader see a truncated file.

    Args:
        state: The agent state; values that are not JSON-serializable are
            stringified via ``default=str``.

    Raises:
        OSError: If the directory or file cannot be written.
    """
    os.makedirs(STATE_DIR, exist_ok=True)

    # Unique per process and thread so concurrent writers never share a temp file.
    tmp_path = f"{STATE_FILE}.{os.getpid()}.{threading.get_ident()}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2, default=str)
        os.replace(tmp_path, STATE_FILE)
    except BaseException:
        # Best-effort cleanup of the partial temp file; the original error is
        # what the caller needs to see, so a failed removal is ignored.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
|
|
def log_action(state: dict, action: str, detail: str = ""):
    """Append an activity entry to ``state["log"]`` and echo it to the logger.

    Args:
        state: The agent state; its ``"log"`` list is created if missing and
            trimmed to the 100 most recent entries.
        action: Short action tag, e.g. ``"POST"`` or ``"ERROR"``.
        detail: Free-form description; stored truncated to 200 characters and
            logged truncated to 120 characters.
    """
    entry = {
        "time": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M"),
        "action": action,
        "detail": detail[:200],
    }

    # A state dict without a "log" key used to raise KeyError here.
    history = state.setdefault("log", [])
    history.append(entry)
    state["log"] = history[-100:]

    logger.info("[%s] %s", action, detail[:120])
|
|
|
|
| |
|
|
def publish_to_platform(platform: str, content_data: dict) -> bool:
    """Send generated content to one platform and report whether it worked.

    Args:
        platform: Target platform name.
        content_data: Generated content; ``"content"`` holds the text, and
            ``"paper"`` / ``"novel"`` (if present) supply the Reddit title.

    Returns:
        ``False`` when there is no content or publishing raised. For Moltbook
        and Reddit, ``True`` when the client's result contains no ``"error"``;
        for Telegram, the result's ``"ok"`` value. Any other platform, or one
        whose client is unavailable, only logs the content and returns
        ``True``.
    """
    body = content_data.get("content", "")
    if not body:
        return False

    try:
        if platform == "moltbook" and moltbook.is_available():
            outcome = moltbook.post(body)
            return "error" not in outcome

        if platform == "telegram" and telegram_bot.is_available():
            outcome = telegram_bot.post(body)
            return outcome.get("ok", False)

        if platform == "reddit" and reddit_bot.is_available():
            fallback_title = content_data.get("novel", "OpenCLAW Research Update")
            headline = content_data.get("paper", fallback_title)
            # Titles over 300 characters are cut to 297 plus an ellipsis.
            if len(headline) > 300:
                headline = headline[:297] + "..."
            target_sub = random.choice(
                ["artificial", "MachineLearning", "OpenAI", "singularity"]
            )
            outcome = reddit_bot.post(headline, body, target_sub)
            return "error" not in outcome

        # No usable API client for this platform: log the content only.
        logger.info(f"[{platform}] Content generated (no API): {body[:80]}...")
        return True

    except Exception as e:
        logger.error(f"Publish to {platform} failed: {e}")
        return False
|
|
|
|
| |
|
|
def run_post_cycle(state: dict) -> str:
    """Generate content and publish it to one randomly chosen platform.

    Platforms whose API client reports itself available are favoured:
    Moltbook with weight 3, Telegram and Reddit with weight 2, everything
    else (including those three when unavailable) with weight 1.

    Args:
        state: The agent state. ``total_posts`` is incremented for every
            attempt, whether or not publishing succeeded, and
            ``last_post_time`` is set to the current UTC time.

    Returns:
        ``"<platform> | <content type> | <✅ or ❌>"`` on a completed attempt,
        or ``"Error: <message>"`` if generating or recording the post raised.
    """
    platforms = ["moltbook", "telegram", "reddit", "twitter", "linkedin",
                 "facebook", "chirper", "instagram", "agentarxiv"]

    # (name, client, weight applied when the client is available)
    boosts = (
        ("moltbook", moltbook, 3),
        ("telegram", telegram_bot, 2),
        ("reddit", reddit_bot, 2),
    )
    boosted = {name: weight for name, client, weight in boosts if client.is_available()}
    weights = [boosted.get(name, 1) for name in platforms]
    platform = random.choices(platforms, weights=weights, k=1)[0]

    try:
        content_data = get_random_content(platform)
        success = publish_to_platform(platform, content_data)

        record_post({
            "platform": platform,
            "type": content_data.get("type", "unknown"),
            "content_preview": content_data.get("content", "")[:100],
            "success": success,
        })

        # .get() tolerates state files that lack this counter.
        state["total_posts"] = state.get("total_posts", 0) + 1
        state["last_post_time"] = datetime.now(timezone.utc).isoformat()

        # The marker literal was mis-encoded and split the f-string across lines.
        status_mark = "✅" if success else "❌"
        detail = f"{platform} | {content_data.get('type', '?')} | {status_mark}"
        log_action(state, "POST", detail)
        return detail
    except Exception as e:
        log_action(state, "POST_ERROR", str(e))
        return f"Error: {e}"
|
|
|
|
def run_engagement_cycle(state: dict) -> str:
    """Reply to hot posts on Moltbook and record how many were engaged.

    Args:
        state: The agent state; ``total_engagements`` grows by the number of
            results and ``last_engagement_time`` is set to the current UTC time.

    Returns:
        ``"Moltbook not available"`` when the client is unavailable,
        ``"Engaged with <n> posts"`` on success, or ``"Error: <message>"`` if
        the engagement raised.
    """
    if moltbook.is_available():
        try:
            replies = moltbook.engage_with_hot_posts(engagement_reply)
            engaged = len(replies)
            state["total_engagements"] += engaged
            finished_at = datetime.now(timezone.utc)
            state["last_engagement_time"] = finished_at.isoformat()
            summary = f"Engaged with {engaged} posts"
            log_action(state, "ENGAGE", summary)
            return summary
        except Exception as err:
            log_action(state, "ENGAGE_ERROR", str(err))
            return f"Error: {err}"

    return "Moltbook not available"
|
|
|
|
def run_reflection_cycle(state: dict) -> str:
    """Run one strategy reflection and log a short summary of it.

    Args:
        state: The agent state; ``last_reflection_time`` is set to the current
            UTC time once ``reflect()`` returns.

    Returns:
        The first 150 characters of the reflection's ``"analysis"`` value, or
        ``"Error: <message>"`` if reflecting raised.
    """
    try:
        outcome = reflect()
        finished_at = datetime.now(timezone.utc)
        state["last_reflection_time"] = finished_at.isoformat()
        summary = outcome.get("analysis", "")
        summary = summary[:150]
        log_action(state, "REFLECT", summary)
    except Exception as err:
        log_action(state, "REFLECT_ERROR", str(err))
        return f"Error: {err}"
    return summary
|
|
|
|
| |
|
|
def autonomous_loop():
    """Run forever: post → engage → reflect → repeat.

    On start-up the state is loaded, a boot notification email is attempted,
    and ArXiv papers are preloaded; the last two are best-effort and only
    logged on failure. Each cycle then:

    * publishes one post,
    * engages on every 2nd cycle,
    * reflects on every 6th cycle,
    * saves the state and sleeps for a random duration between
      ``config.POST_INTERVAL_MIN_HOURS`` and ``config.POST_INTERVAL_MAX_HOURS``.

    Any exception inside a cycle is logged and the loop retries after five
    minutes. This function never returns.
    """
    state = load_state()
    log_action(state, "BOOT", f"Agent starting. Previous cycles: {state.get('total_cycles', 0)}")

    # Best-effort: a failed notification must not stop the agent from booting.
    try:
        email_bot.send_boot_notification()
    except Exception as e:
        logger.warning("Boot notification failed: %s", e)

    # Best-effort: the cycles below do not depend on the preload succeeding.
    try:
        papers = get_papers()
        log_action(state, "ARXIV", f"Loaded {len(papers)} papers")
    except Exception as e:
        logger.warning("ArXiv preload failed: %s", e)

    save_state(state)

    # `cycle` counts cycles since this boot; state["total_cycles"] is the
    # persisted lifetime total.
    cycle = 0
    while True:
        try:
            cycle += 1
            state["total_cycles"] = state.get("total_cycles", 0) + 1

            post_result = run_post_cycle(state)
            logger.info(f"Cycle {cycle}: POST → {post_result}")

            if cycle % 2 == 0:
                eng_result = run_engagement_cycle(state)
                logger.info(f"Cycle {cycle}: ENGAGE → {eng_result}")

            if cycle % 6 == 0:
                ref_result = run_reflection_cycle(state)
                logger.info(f"Cycle {cycle}: REFLECT → {ref_result}")

            save_state(state)

            sleep_hours = random.uniform(
                config.POST_INTERVAL_MIN_HOURS,
                config.POST_INTERVAL_MAX_HOURS,
            )
            sleep_secs = int(sleep_hours * 3600)
            logger.info(f"💤 Sleeping {sleep_hours:.1f}h until next cycle...")
            time.sleep(sleep_secs)

        except Exception as e:
            logger.exception("Cycle error: %s", e)
            # Recording the failure can itself fail (e.g. unwritable state
            # file); that must not escape and kill the loop.
            try:
                log_action(state, "ERROR", str(e))
                save_state(state)
            except Exception as save_err:
                logger.error("Could not persist error state: %s", save_err)
            # Back off for five minutes before the next attempt.
            time.sleep(300)
|
|
|
|
| |
|
|
def run_once():
    """Run a single cycle (for GitHub Actions / cron).

    Preloads ArXiv papers (best-effort), publishes 2 or 3 posts with a
    5-second pause between them, runs one engagement cycle, reflects when the
    persisted cycle count is a multiple of 6, saves the state, and prints a
    summary to stdout.
    """
    state = load_state()

    # Best-effort: a failed preload is logged and the run continues.
    try:
        papers = get_papers()
        log_action(state, "ARXIV", f"Loaded {len(papers)} papers")
    except Exception as e:
        logger.warning("ArXiv preload failed: %s", e)

    # .get() tolerates state files that lack this counter.
    state["total_cycles"] = state.get("total_cycles", 0) + 1

    post_count = random.randint(2, 3)
    for index in range(post_count):
        # Pause between posts only; there is nothing to wait for after the last.
        if index:
            time.sleep(5)
        run_post_cycle(state)

    run_engagement_cycle(state)

    if state["total_cycles"] % 6 == 0:
        run_reflection_cycle(state)

    save_state(state)

    stats = get_stats()
    separator = "=" * 60
    print(f"\n{separator}")
    print(f"📚 OpenCLAW Literary Agent — Cycle #{state['total_cycles']}")
    print(f"   Total posts: {state.get('total_posts', 0)}")
    print(f"   Total engagements: {state.get('total_engagements', 0)}")
    print(f"   Posts by platform: {json.dumps(stats.get('by_platform', {}))}")
    print(f"   Posts by type: {json.dumps(stats.get('by_type', {}))}")
    print(f"{separator}\n")
|
|
|
|
| |
|
|
def build_dashboard():
    """Build the Gradio dashboard for monitoring and manual control.

    The dashboard shows a status summary, buttons that trigger a post,
    engagement or reflection cycle on demand, and a table of recent activity.

    Returns:
        The ``gr.Blocks`` app, or ``None`` when Gradio is not installed.
    """
    try:
        import gradio as gr
    except ImportError:
        logger.warning("Gradio not installed, running CLI mode")
        return None

    def _light(ok) -> str:
        """Return a green or red status dot for a truthy/falsy value."""
        return "🟢" if ok else "🔴"

    def _cell(value, limit=None) -> str:
        """Make a value safe to place inside a Markdown table cell."""
        text = str(value)
        if limit is not None:
            text = text[:limit]
        # Pipes and newlines would otherwise split the cell or end the row;
        # POST details always contain " | ".
        return text.replace("\n", " ").replace("|", "\\|")

    def get_status():
        """Render the status panel as Markdown."""
        state = load_state()

        # The panel should still render when stats or papers cannot be loaded.
        try:
            stats = get_stats()
        except Exception as e:
            logger.warning("Dashboard could not load stats: %s", e)
            stats = {}
        try:
            papers = get_papers()
        except Exception as e:
            logger.warning("Dashboard could not load papers: %s", e)
            papers = []

        lines = [
            "# 📚 OpenCLAW Literary Agent — Dashboard",
            "",
            "**Status:** 🟢 ONLINE",
            f"**Boot time:** {state.get('boot_time', 'unknown')}",
            f"**Total cycles:** {state.get('total_cycles', 0)}",
            f"**Total posts:** {state.get('total_posts', 0)}",
            f"**Total engagements:** {state.get('total_engagements', 0)}",
            "",
            "## 📊 Posts by Platform",
        ]
        lines.extend(
            f"- **{plat}**: {count}"
            for plat, count in stats.get("by_platform", {}).items()
        )

        lines.extend(["", "## 📝 Posts by Type"])
        lines.extend(
            f"- **{typ}**: {count}"
            for typ, count in stats.get("by_type", {}).items()
        )

        lines.extend(["", f"## 📄 ArXiv Papers Loaded: {len(papers)}"])
        lines.extend(
            f"- [{p.get('id', '')}] {p.get('title', '')[:60]}"
            for p in papers[:5]
        )

        lines.extend([
            "", "## 📡 Platform Status",
            f"- Moltbook: {_light(moltbook.is_available())}",
            f"- Telegram: {_light(telegram_bot.is_available())}",
            f"- Reddit: {_light(reddit_bot.is_available())}",
            f"- Email: {_light(email_bot.is_available())}",
            f"- LLM (NVIDIA): {_light(config.NVIDIA_KEYS)}",
            f"- LLM (Groq): {_light(config.GROQ_KEYS)}",
            f"- LLM (Gemini): {_light(config.GEMINI_KEYS)}",
        ])

        return "\n".join(lines)

    def get_recent_log():
        """Render the 25 most recent log entries, newest first, as a table."""
        state = load_state()
        log = state.get("log", [])
        if not log:
            return "No activity yet."
        lines = ["| Time | Action | Detail |", "|------|--------|--------|"]
        for entry in reversed(log[-25:]):
            lines.append(
                f"| {_cell(entry.get('time', ''))} "
                f"| {_cell(entry.get('action', ''))} "
                f"| {_cell(entry.get('detail', ''), 80)} |"
            )
        return "\n".join(lines)

    # NOTE(review): the handlers below load and save the state file on their
    # own, while autonomous_loop keeps its own in-memory copy and saves that
    # each cycle. Changes made here can therefore be overwritten by the loop's
    # next save — confirm whether that is acceptable.
    def trigger_post():
        """Run one post cycle on demand and show the result plus the log."""
        state = load_state()
        result = run_post_cycle(state)
        save_state(state)
        return f"✅ {result}\n\n{get_recent_log()}"

    def trigger_engage():
        """Run one engagement cycle on demand and show the result plus the log."""
        state = load_state()
        result = run_engagement_cycle(state)
        save_state(state)
        return f"✅ {result}\n\n{get_recent_log()}"

    def trigger_reflect():
        """Run one reflection cycle on demand and show the result."""
        state = load_state()
        result = run_reflection_cycle(state)
        save_state(state)
        return f"✅ {result}"

    with gr.Blocks(title="OpenCLAW Literary Agent", theme=gr.themes.Soft()) as app:
        gr.Markdown("# 📚 OpenCLAW Literary Agent — Autonomous 24/7")
        gr.Markdown(f"**Author:** {config.AUTHOR_NAME} | [Wikipedia]({config.AUTHOR_WIKIPEDIA}) | [GitHub]({config.AUTHOR_GITHUB}) | [Scholar]({config.AUTHOR_SCHOLAR})")

        with gr.Row():
            with gr.Column(scale=2):
                status_md = gr.Markdown(get_status())
                refresh_btn = gr.Button("🔄 Refresh Status", variant="secondary")
                refresh_btn.click(fn=get_status, outputs=status_md)

            with gr.Column(scale=1):
                gr.Markdown("### ⚡ Manual Controls")
                post_btn = gr.Button("🚀 Publish Now", variant="primary")
                engage_btn = gr.Button("💬 Engage Now", variant="primary")
                reflect_btn = gr.Button("🧠 Reflect Now", variant="secondary")
                manual_output = gr.Markdown("")

                post_btn.click(fn=trigger_post, outputs=manual_output)
                engage_btn.click(fn=trigger_engage, outputs=manual_output)
                reflect_btn.click(fn=trigger_reflect, outputs=manual_output)

        with gr.Row():
            log_md = gr.Markdown(get_recent_log())
            log_btn = gr.Button("📋 Refresh Log")
            log_btn.click(fn=get_recent_log, outputs=log_md)

    return app
|
|
|
|
| |
|
|
if __name__ == "__main__":
    import sys

    cli_args = sys.argv[1:]
    if cli_args and cli_args[0] == "once":
        # Single-shot mode: run one cycle and exit.
        run_once()
    else:
        dashboard = build_dashboard()
        if not dashboard:
            # No dashboard available: run the loop in the foreground.
            autonomous_loop()
        else:
            # Run the agent loop in a daemon thread while the dashboard
            # occupies the main thread.
            worker = threading.Thread(target=autonomous_loop, daemon=True)
            worker.start()
            dashboard.launch(server_name="0.0.0.0", server_port=7860, share=False)
|
|