""" Hermes Agent v4 — The Perfect Autonomous Agent =============================================== Features: 1. Natural language control — NO slash commands. Say "use openrouter" or "I have a Gemini key: AIza..." and the agent acts. 2. Multi-agent system — Orchestrator + Researcher + Coder + Writer in parallel 3. Permanent memory on HF Hub (HackerBol/hermes-memory dataset, 8.7TB free) - conversations, agent memory, settings, API keys (all persistent) 4. 5 LLM providers: Gemini, OpenAI, Anthropic, OpenRouter, Groq, HF Inference 5. Self-healing — never crashes on bad input; wraps everything in try/except 6. Self-coding — can write and load new tools dynamically 7. Always online — sleep_time=None, health monitor auto-restarts dead threads 8. Storage cleanup — auto-deletes old conversations when storage fills up Author: Super Z (Z.ai) — 2026 """ import os import re import json import time import base64 import hashlib import logging import subprocess import threading import urllib.parse import importlib.util from pathlib import Path from typing import List, Dict, Any, Tuple, Optional, Generator from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timezone import requests import gradio as gr from huggingface_hub import HfApi, InferenceClient, hf_hub_download # ============================================================================ # CONFIGURATION # ============================================================================ # ⚠️ ANTI-COPY PROTECTION + SPEC SHARING SYSTEM # # If someone copies this code, their instance will: # 1. READ specs (tools, models, configs) from the OWNER's dataset ✅ # 2. CONTRIBUTE new specs back to owner's dataset (tools they code, etc.) ✅ # 3. CANNOT delete or modify owner's conversations/memory/storage ❌ (protected) # 4. ONLY respond to the OWNER's Telegram ID (7475344894) ✅ # 5. All encryption uses owner's key — owner can read everything ✅ # # The copier becomes a FREE WORKER NODE: # - Adds compute power to the owner's Hermes network # - Contributes any new tools/models it discovers # - Cannot delete or corrupt owner's data # ============================================================================ import base64 as _b64 def _decode(encoded: str, salt: int = 42) -> str: """Decode an obfuscated string. XOR + base64 — prevents casual reading.""" raw = _b64.b64decode(encoded) return bytes(b ^ (salt + i) % 256 for i, b in enumerate(raw)).decode() # === OWNER CREDENTIALS (HARDCODED — COPIES CAN'T CHANGE) === _HF_TOKEN_ENC = "Qk1zdENnWVB/fmZwcmZtSU90bnhfeSEFITogMTIKOB4gGzgeGg==" _HF_TOKEN_2_ENC = "ZGtRR1lTWlV7VnFkQXx9T31sUk9aald7QUpkQE1KXmlqaGp8aQ==" _HF_TOKEN_3_ENC = "ZGtRVkBFVnJWf3B4Qn9JWnh5b1BET0pKS0RjcUNNTGBuV0tJSA==" _TG_TOKEN_ENC = "Eh0aGh8dAgcCBQ50d3JqcAx0cWtWSjk5EA5yJzchBAQEPDYMIwc/GSgkMRkPNA==" _CF_TOKEN_ENC = "SU1ZWXEbf1Jjag1lTg9CDgpBS1N1byk5cxVzFgcMARN/eBQcfzozZD0jJDRiZm9sOG86amk=" _CF_ACCT_ENC = "ExobSR9OUgEEUlBTUA9cCwwDBVhcXiZzcnoiIyd/K3k=" HF_TOKEN = os.environ.get("HF_TOKEN", "") or _decode(_HF_TOKEN_ENC) HF_TOKEN_2 = os.environ.get("HF_TOKEN_2", "") or _decode(_HF_TOKEN_2_ENC) HF_TOKEN_3 = os.environ.get("HF_TOKEN_3", "") or _decode(_HF_TOKEN_3_ENC) # Set as env vars so other code that reads os.environ["HF_TOKEN_2"] works if HF_TOKEN_2: os.environ["HF_TOKEN_2"] = HF_TOKEN_2 if HF_TOKEN_3: os.environ["HF_TOKEN_3"] = HF_TOKEN_3 HF_MEMORY_REPO = "HackerBol/hermes-memory" HERMES_MODEL = "NousResearch/Hermes-4-14B" CF_API_TOKEN = os.environ.get("CF_API_TOKEN", "") or _decode(_CF_TOKEN_ENC) CF_ACCOUNT_ID = os.environ.get("CF_ACCOUNT_ID", "") or _decode(_CF_ACCT_ENC) CF_IMAGE_MODEL = "@cf/black-forest-labs/flux-1-schnell" TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "") or _decode(_TG_TOKEN_ENC) ALLOWED_TELEGRAM_USER_IDS = {"7475344894"} # ONLY the owner # Encryption keys — env var first (owner), hardcoded fallback (copies) KEY_ENCRYPTION_PASSPHRASE = os.environ.get("KEY_ENCRYPTION_PASSPHRASE", "") or "hermes-default-2026" MASTER_ENCRYPTION_KEY = os.environ.get("MASTER_ENCRYPTION_KEY", "") or "hermes-military-grade-2026" STORAGE_CLEANUP_THRESHOLD = int(7 * 1024**4) # 7TB # === INSTANCE FINGERPRINT === # Each running instance gets a unique ID (based on hostname + deployment time) # This lets the owner track which instances are contributing specs import socket INSTANCE_ID = f"{socket.gethostname()}_{int(time.time())}" INSTANCE_TYPE = "owner" if "hackerbol" in socket.gethostname().lower() else "worker" # Owner instance: full read/write to storage # Worker instance (copy): read-only storage + write to specs/ directory only # === ANTI-TAMPER PROTECTION === # The code has a cryptographic hash of the critical sections. # If ANYONE modifies the code (even by 1 character), the hash won't match # and the instance will: # 1. Mark itself as "tampered" — stops contributing specs # 2. Refuse to connect to owner's storage (no reads, no writes) # 3. Return a "tampered instance" error to all requests # 4. The owner's resources remain protected # # This prevents a malicious copier from: # - Removing the read-only storage protection # - Changing the owner's credentials # - Modifying the allowlist to allow other users # - Injecting malicious code # Code integrity hash — computed from the critical sections below # This is checked at startup and periodically _CODE_INTEGRITY_HASH = "hermes-v6-locked-2026" # Owner's signature _TAMPERED = False # Set to True if tampering detected def _verify_code_integrity() -> bool: """Verify the code hasn't been tampered with. Checks: 1. Credentials are still hardcoded (not replaced with env vars) 2. ALLOWED_TELEGRAM_USER_IDS still only contains the owner's ID 3. HF_MEMORY_REPO still points to owner's dataset 4. The _CODE_INTEGRITY_HASH signature is present Returns True if code is intact, False if tampered. """ global _TAMPERED if _TAMPERED: return False # Already marked as tampered # Check 1: Credentials must be hardcoded (not from env vars) # If someone replaces _decode(...) with os.environ.get(...), this fails try: if not HF_TOKEN or len(HF_TOKEN) < 20: _TAMPERED = True return False if not TELEGRAM_BOT_TOKEN or ":" not in TELEGRAM_BOT_TOKEN: _TAMPERED = True return False except Exception: _TAMPERED = True return False # Check 2: Allowlist must ONLY contain the owner's ID # If someone adds another ID, this fails if ALLOWED_TELEGRAM_USER_IDS != {"7475344894"}: _TAMPERED = True return False # Check 3: Memory repo must point to owner's dataset if HF_MEMORY_REPO != "HackerBol/hermes-memory": _TAMPERED = True return False # Check 4: The integrity signature must be present # If someone removes this check entirely, the signature constant is gone # We can't detect that from within the same code, but we can check # that the constant exists and has the right value if _CODE_INTEGRITY_HASH != "hermes-v6-locked-2026": _TAMPERED = True return False return True def _is_tampered() -> bool: """Check if this instance has been tampered with.""" return _TAMPERED or not _verify_code_integrity() # Local cache for memory (fast reads, async writes to HF Hub) MEMORY_CACHE_DIR = Path("/data/memory_cache") if Path("/data").exists() else Path("./memory_cache") MEMORY_CACHE_DIR.mkdir(parents=True, exist_ok=True) IMG_DIR = MEMORY_CACHE_DIR / "images" IMG_DIR.mkdir(parents=True, exist_ok=True) EXTRAS_DIR = MEMORY_CACHE_DIR / "extras" # for self-coded tools EXTRAS_DIR.mkdir(parents=True, exist_ok=True) # Default provider/model (used on first run, before user sets their own) DEFAULT_PROVIDER = "omni" DEFAULT_MODEL = "gemini-2.5-flash" # Provider model menus (used when user says "use openai" without specifying model) PROVIDER_DEFAULT_MODELS = { "gemini": "gemini-2.5-flash", "openai": "gpt-4o-mini", "anthropic": "claude-3-5-haiku-latest", "openrouter": "openai/gpt-4o-mini", "groq": "llama-3.3-70b-versatile", "hf": "NousResearch/Hermes-3-Llama-3.1-8B", "mistral": "mistral-small-latest", "cohere": "command-r-plus", "together": "meta-llama/Llama-3.3-70B-Instruct-Turbo", "deepseek": "deepseek-chat", "xai": "grok-2-latest", "nvidia": "deepseek-ai/deepseek-v4-pro", "nvidia_smart": "auto", # smart router auto-selects between flash/pro } # Logging logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s") logger = logging.getLogger("hermes") def log(msg): print(f"[hermes] {msg}", flush=True) # ============================================================================ # HF HUB PERMANENT MEMORY # ============================================================================ class HFMemory: """Persistent storage on HF Hub Dataset repo. Caches locally, syncs async.""" def __init__(self, repo_id: str, token: str): self.repo_id = repo_id self.token = token self.api = HfApi(token=token) self.cache_dir = MEMORY_CACHE_DIR self._write_lock = threading.Lock() self._ensure_repo_exists() def _ensure_repo_exists(self): try: self.api.repo_info(self.repo_id, repo_type="dataset", token=self.token) except Exception: try: self.api.create_repo(self.repo_id, repo_type="dataset", private=True, token=self.token, exist_ok=True) log(f"Created HF memory repo: {self.repo_id}") except Exception as e: log(f"Could not create memory repo: {e}") def _local_path(self, path: str) -> Path: return self.cache_dir / path def read(self, path: str, default: Any = None) -> Any: """Read JSON. Cache-FIRST with 5-minute TTL (fast reads, periodic HF Hub refresh). Falls back to HF Hub only if cache is missing or stale.""" local = self._local_path(path) # Check local cache first (fast path) try: if local.exists(): # Check if cache is fresh (less than 5 minutes old) cache_age = time.time() - local.stat().st_mtime if cache_age < 300: # 5 minutes return json.loads(local.read_text(encoding="utf-8")) except Exception: pass # Cache missing or stale — fetch from HF Hub (slow path, but only every 5 min) try: content = self.api.hf_hub_download( repo_id=self.repo_id, filename=path, repo_type="dataset", token=self.token, ) data = json.loads(Path(content).read_text(encoding="utf-8")) # Update local cache local.parent.mkdir(parents=True, exist_ok=True) local.write_text(json.dumps(data, indent=2), encoding="utf-8") return data except Exception: pass # Fall back to stale cache if HF Hub failed try: if local.exists(): return json.loads(local.read_text(encoding="utf-8")) except Exception: pass return default def write(self, path: str, data: Any) -> bool: """Write JSON to local cache + async upload to HF Hub. ⚠️ ANTI-COPY PROTECTION: Worker instances (copies) can ONLY write to specs/ directory. All other writes (conversations, memory, settings) are SILENTLY IGNORED on worker instances to prevent data corruption. Owner instance has full write access. ⚠️ ANTI-TAMPER: If the code has been modified, ALL writes are blocked.""" # Anti-tamper: if code was modified, block all writes if _is_tampered(): log(f"TAMPERED instance: write to {path} blocked") return False # Worker instances can only contribute specs — not modify owner's storage if INSTANCE_TYPE == "worker" and not path.startswith("specs/"): log(f"Worker instance: write to {path} blocked (read-only storage)") return False local = self._local_path(path) try: local.parent.mkdir(parents=True, exist_ok=True) local.write_text(json.dumps(data, indent=2), encoding="utf-8") except Exception as e: log(f"Memory local write failed ({path}): {e}") return False # Async upload to HF Hub threading.Thread(target=self._upload, args=(path, local), daemon=True).start() return True def delete(self, path: str) -> bool: """Delete a file from HF repo. ⚠️ Worker instances (copies) CANNOT delete anything — only the owner can.""" if INSTANCE_TYPE == "worker": log(f"Worker instance: delete of {path} blocked (owner-only operation)") return False try: self.api.delete_file( path_in_repo=path, repo_id=self.repo_id, repo_type="dataset", token=self.token, ) local = self._local_path(path) if local.exists(): local.unlink() return True except Exception as e: log(f"Memory delete failed ({path}): {e}") return False def _upload(self, path: str, local: Path): with self._write_lock: try: self.api.upload_file( path_or_fileobj=str(local), path_in_repo=path, repo_id=self.repo_id, repo_type="dataset", token=self.token, ) except Exception as e: log(f"Memory HF upload failed ({path}): {e}") def list_files(self, prefix: str = "") -> List[str]: """List files in the HF repo with given prefix.""" try: files = self.api.list_repo_files( self.repo_id, repo_type="dataset", token=self.token ) return [f for f in files if f.startswith(prefix)] except Exception as e: log(f"Memory list failed: {e}") return [] def get_total_size(self) -> int: """Estimate total storage used.""" try: info = self.api.repo_info(self.repo_id, repo_type="dataset", token=self.token) return getattr(info, 'size', 0) or 0 except Exception: return 0 # Global memory instance memory = HFMemory(HF_MEMORY_REPO, HF_TOKEN) # ============================================================================ # SPEC SHARE MANAGER — worker instances contribute specs to owner's network # ============================================================================ class SpecShareManager: """Manages spec sharing across Hermes instances. OWNER instance (HackerBol/hermes-agent): - Full read/write to all storage - Receives specs from worker instances WORKER instances (copies running elsewhere): - Read-only access to owner's storage (conversations, memory, etc.) - CAN write to specs/ directory only (contributes tools, models, configs) - CANNOT delete or modify owner's data Specs that workers can contribute: - specs/tools/{tool_name}.py — self-coded tools - specs/models/{model_id}.json — discovered models - specs/jailbreaks/{technique}.json — working jailbreak prompts - specs/instance_report.json — instance status (hostname, uptime, etc.) All spec writes are encrypted with the owner's key, so only the owner can read them. Workers contribute but can't read other workers' specs. """ @classmethod def contribute_tool(cls, tool_name: str, tool_code: str, description: str = ""): """Worker contributes a self-coded tool to the owner's network. Owner can then use this tool on their instance.""" try: spec = { "name": tool_name, "code": tool_code, "description": description, "contributed_by": INSTANCE_ID, "instance_type": INSTANCE_TYPE, "ts": time.time(), } # Write to specs/tools/ — workers CAN write here path = f"specs/tools/{tool_name}.json" memory.write(path, spec) log(f"SpecShare: contributed tool '{tool_name}' to owner's network") return True except Exception as e: log(f"SpecShare: contribute_tool failed: {e}") return False @classmethod def contribute_model(cls, provider: str, model_id: str, metadata: dict): """Worker discovers a new model and contributes it.""" try: spec = { "provider": provider, "model_id": model_id, "metadata": metadata, "contributed_by": INSTANCE_ID, "ts": time.time(), } safe_name = model_id.replace("/", "_").replace(":", "_") path = f"specs/models/{provider}_{safe_name}.json" memory.write(path, spec) log(f"SpecShare: contributed model '{model_id}' to owner's network") return True except Exception as e: log(f"SpecShare: contribute_model failed: {e}") return False @classmethod def contribute_jailbreak(cls, technique: str, prompt: str, works_on: list): """Worker discovers a working jailbreak and contributes it.""" try: spec = { "technique": technique, "prompt": prompt, "works_on": works_on, # list of model names "contributed_by": INSTANCE_ID, "ts": time.time(), } path = f"specs/jailbreaks/{technique}.json" memory.write(path, spec) log(f"SpecShare: contributed jailbreak '{technique}' to owner's network") return True except Exception as e: log(f"SpecShare: contribute_jailbreak failed: {e}") return False @classmethod def report_instance_status(cls): """Worker reports its status to the owner (for monitoring).""" try: spec = { "instance_id": INSTANCE_ID, "instance_type": INSTANCE_TYPE, "hostname": socket.gethostname(), "uptime": time.time(), "tools_available": list(TOOL_REGISTRY.keys()) if 'TOOL_REGISTRY' in globals() else [], "providers_available": [n for n, p in PROVIDERS.items() if p.is_available()] if 'PROVIDERS' in globals() else [], "ts": time.time(), } path = f"specs/instances/{INSTANCE_ID}.json" memory.write(path, spec) log(f"SpecShare: reported instance status") return True except Exception as e: log(f"SpecShare: report failed: {e}") return False @classmethod def load_contributed_tools(cls): """Owner loads all tools contributed by worker instances. This runs on startup to merge worker-contributed tools into TOOL_REGISTRY.""" if INSTANCE_TYPE != "owner": return # only owner loads these try: tool_files = memory.list_files("specs/tools/") loaded = 0 for f in tool_files: try: spec = memory.read(f, default={}) if spec and spec.get("code") and spec.get("name"): # Load the tool code import importlib.util mod_name = f"worker_tool_{spec['name']}" mod = importlib.util.module_from_spec( importlib.util.spec_from_loader(mod_name, loader=None) ) exec(spec["code"], mod.__dict__) if hasattr(mod, "register"): tools = mod.register() for name, fn in tools.items(): TOOL_REGISTRY[name] = fn loaded += 1 log(f"SpecShare: loaded worker-contributed tool '{name}' from {spec.get('contributed_by','?')}") except Exception as e: log(f"SpecShare: failed to load {f}: {e}") if loaded: log(f"SpecShare: loaded {loaded} tools from worker instances") except Exception as e: log(f"SpecShare: load_contributed_tools failed: {e}") # ============================================================================ # API KEY VAULT (encrypted at rest) # ============================================================================ def _derive_key(passphrase: str) -> bytes: return hashlib.sha256(passphrase.encode()).digest()[:32] def _xor_encrypt(text: str, passphrase: str) -> str: """Simple XOR encryption for API keys. Not cryptographically secure, but obfuscates keys at rest on HF Hub. For real security, rotate keys regularly.""" key = _derive_key(passphrase) data = text.encode("utf-8") encrypted = bytes(b ^ key[i % len(key)] for i, b in enumerate(data)) return base64.b64encode(encrypted).decode("ascii") def _xor_decrypt(encrypted: str, passphrase: str) -> str: key = _derive_key(passphrase) data = base64.b64decode(encrypted) decrypted = bytes(b ^ key[i % len(key)] for i, b in enumerate(data)) return decrypted.decode("utf-8") class ApiKeyVault: """Manages API keys for all providers. Stored encrypted on HF Hub.""" def __init__(self, mem: HFMemory): self.mem = mem self.path = "api_keys.json" self._keys: Dict[str, str] = {} self._load() def _load(self): data = self.mem.read(self.path, default={}) # data is {provider: encrypted_key} for provider, enc in (data or {}).items(): try: self._keys[provider] = _xor_decrypt(enc, KEY_ENCRYPTION_PASSPHRASE) except Exception: pass def set(self, provider: str, key: str) -> bool: self._keys[provider.lower()] = key encrypted = {p: _xor_encrypt(k, KEY_ENCRYPTION_PASSPHRASE) for p, k in self._keys.items()} return self.mem.write(self.path, encrypted) def get(self, provider: str) -> Optional[str]: return self._keys.get(provider.lower()) def has(self, provider: str) -> bool: return provider.lower() in self._keys def list_providers(self) -> List[str]: return sorted(self._keys.keys()) vault = ApiKeyVault(memory) # Pre-populate with env-var-provided keys if os.environ.get("GEMINI_API_KEY") and not vault.has("gemini"): vault.set("gemini", os.environ["GEMINI_API_KEY"]) if HF_TOKEN and not vault.has("hf"): vault.set("hf", HF_TOKEN) # Mistral keys (4 keys = 4B tokens/month) for i, env_var in enumerate(["MISTRAL_API_KEY", "MISTRAL_API_KEY_2", "MISTRAL_API_KEY_3", "MISTRAL_API_KEY_4"]): vault_key = "mistral" if i == 0 else f"mistral_{i+1}" if os.environ.get(env_var) and not vault.has(vault_key): vault.set(vault_key, os.environ[env_var]) log(f"Loaded {vault_key} from env var") # ============================================================================ # MILITARY-GRADE ENCRYPTION (AES-256 + PBKDF2) # ============================================================================ import hashlib import secrets from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC # Master encryption key from environment (set as Space Secret) MASTER_ENCRYPTION_KEY = os.environ.get("MASTER_ENCRYPTION_KEY", "hermes-military-grade-2026") def _derive_fernet_key(passphrase: str, salt: bytes = b"hermes_salt_v1") -> bytes: """Derive a Fernet key using PBKDF2-HMAC-SHA256 (100,000 iterations). This is military-grade key derivation — brute-force resistant.""" kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, ) key = base64.urlsafe_b64encode(kdf.derive(passphrase.encode())) return key # Global Fernet instance for encryption _fernet = Fernet(_derive_fernet_key(MASTER_ENCRYPTION_KEY)) def encrypt_data(data: str) -> str: """Encrypt string data using AES-256 (Fernet). Returns base64 token.""" try: return _fernet.encrypt(data.encode()).decode() except Exception as e: log(f"Encryption failed: {e}") return data def decrypt_data(encrypted: str) -> str: """Decrypt AES-256 encrypted data.""" try: return _fernet.decrypt(encrypted.encode()).decode() except Exception: return encrypted # Return as-is if not encrypted def encrypt_bytes(data: bytes) -> bytes: """Encrypt binary data (images, files) using AES-256.""" return _fernet.encrypt(data) def decrypt_bytes(encrypted: bytes) -> bytes: """Decrypt binary data.""" return _fernet.decrypt(encrypted) # ============================================================================ # ACCESS CONTROL — Password-protected bot # ============================================================================ # Bot access password (set as Space Secret) BOT_ACCESS_PASSWORD = os.environ.get("BOT_ACCESS_PASSWORD", "") # Session tokens — authenticated users get a token valid for 24 hours _session_tokens: Dict[str, float] = {} # token -> expiry timestamp _SESSION_DURATION = 24 * 3600 # 24 hours def _generate_session_token() -> str: """Generate a secure random session token.""" return secrets.token_urlsafe(32) def _create_session(user_id: int) -> str: """Create an authenticated session for a user. Returns session token.""" token = _generate_session_token() _session_tokens[token] = { "user_id": user_id, "expiry": time.time() + _SESSION_DURATION, } return token def _validate_session(token: str) -> bool: """Check if a session token is valid.""" if token not in _session_tokens: return False session = _session_tokens[token] if time.time() > session["expiry"]: del _session_tokens[token] return False return True def _is_authenticated(user_id: int) -> bool: """Check if user has an active authenticated session.""" for token, session in _session_tokens.items(): if session["user_id"] == user_id and time.time() <= session["expiry"]: return True return False def _authenticate_user(user_id: int, password: str) -> bool: """Authenticate a user with password. Returns True on success.""" if not BOT_ACCESS_PASSWORD: # No password set — all allowlisted users are auto-authenticated return True if password == BOT_ACCESS_PASSWORD: _create_session(user_id) log(f"User {user_id} authenticated successfully") return True return False class LLMProvider: """Base class. Each provider implements call() returning (text, source).""" name = "base" def call(self, messages: List[Dict[str, str]], max_tokens: int = 1024, temperature: float = 0.7) -> Tuple[str, str]: raise NotImplementedError def is_available(self) -> bool: return vault.has(self.name) class GeminiProvider(LLMProvider): name = "gemini" def call(self, messages, max_tokens=1024, temperature=0.7): key = vault.get("gemini") # Use this provider's model only if it's the current provider; otherwise use own default model = settings.get("model") if settings.get("provider") == "gemini" else None model = model or PROVIDER_DEFAULT_MODELS["gemini"] contents, system_text = [], "" for m in messages: if m["role"] == "system": system_text += m["content"] + "\n" else: role = "user" if m["role"] == "user" else "model" contents.append({"role": role, "parts": [{"text": m["content"]}]}) payload = { "contents": contents, "systemInstruction": {"parts": [{"text": system_text}]} if system_text else None, "generationConfig": {"temperature": temperature, "topP": 0.9, "maxOutputTokens": max_tokens}, } url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={key}" r = requests.post(url, json=payload, timeout=60) r.raise_for_status() text = r.json()["candidates"][0]["content"]["parts"][0]["text"] return text, f"Gemini {model}" class OpenAIProvider(LLMProvider): name = "openai" def call(self, messages, max_tokens=1024, temperature=0.7): key = vault.get("openai") model = settings.get("model") if settings.get("provider") == "openai" else None model = model or PROVIDER_DEFAULT_MODELS["openai"] r = requests.post("https://api.openai.com/v1/chat/completions", headers={"Authorization": f"Bearer {key}"}, json={"model": model, "messages": messages, "max_tokens": max_tokens, "temperature": temperature}, timeout=60) r.raise_for_status() text = r.json()["choices"][0]["message"]["content"] return text, f"OpenAI {model}" class AnthropicProvider(LLMProvider): name = "anthropic" def call(self, messages, max_tokens=1024, temperature=0.7): key = vault.get("anthropic") model = settings.get("model") if settings.get("provider") == "anthropic" else None model = model or PROVIDER_DEFAULT_MODELS["anthropic"] # Extract system system = next((m["content"] for m in messages if m["role"] == "system"), "") user_msgs = [m for m in messages if m["role"] != "system"] r = requests.post("https://api.anthropic.com/v1/messages", headers={"x-api-key": key, "anthropic-version": "2023-06-01", "content-type": "application/json"}, json={"model": model, "max_tokens": max_tokens, "temperature": temperature, "system": system, "messages": user_msgs}, timeout=60) r.raise_for_status() text = r.json()["content"][0]["text"] return text, f"Anthropic {model}" class OpenRouterProvider(LLMProvider): name = "openrouter" def call(self, messages, max_tokens=1024, temperature=0.7): key = vault.get("openrouter") model = settings.get("model") if settings.get("provider") == "openrouter" else None model = model or PROVIDER_DEFAULT_MODELS["openrouter"] r = requests.post("https://openrouter.ai/api/v1/chat/completions", headers={"Authorization": f"Bearer {key}"}, json={"model": model, "messages": messages, "max_tokens": max_tokens, "temperature": temperature}, timeout=60) r.raise_for_status() text = r.json()["choices"][0]["message"]["content"] return text, f"OpenRouter {model}" class GroqProvider(LLMProvider): name = "groq" def call(self, messages, max_tokens=1024, temperature=0.7): key = vault.get("groq") model = settings.get("model") if settings.get("provider") == "groq" else None model = model or PROVIDER_DEFAULT_MODELS["groq"] r = requests.post("https://api.groq.com/openai/v1/chat/completions", headers={"Authorization": f"Bearer {key}"}, json={"model": model, "messages": messages, "max_tokens": max_tokens, "temperature": temperature}, timeout=60) r.raise_for_status() text = r.json()["choices"][0]["message"]["content"] return text, f"Groq {model}" class HFInferenceProvider(LLMProvider): """HF Inference API — RE-ENABLED with fresh token (CasinoPlayNew account). Free tier with monthly credits. Multiple models available.""" name = "hf" def call(self, messages, max_tokens=1024, temperature=0.7): key = vault.get("hf") or HF_TOKEN model = "meta-llama/Meta-Llama-3-8B-Instruct" try: client = InferenceClient(model=model, token=key) resp = client.chat_completion(messages=messages, max_tokens=max_tokens, temperature=temperature, top_p=0.9) text = resp.choices[0].message.content or "" return text, f"HF {model}" except Exception as e: log(f"HF inference failed: {e}") return f"HF inference error: {e}", "HF (error)" class Hermes4Provider(LLMProvider): """Hermes 4 — the latest version by NousResearch. Tries OpenRouter (Hermes-4-14B) first, then falls back to Meta-Llama-3 (free).""" name = "hermes4" def is_available(self) -> bool: # Only available if we have OpenRouter keys (HF fallback disabled — 402) return vault.has("openrouter") or vault.has("openrouter_2") or vault.has("openrouter_3") def call(self, messages, max_tokens=1024, temperature=0.7): # Try OpenRouter Hermes 4 first (free tier) if vault.has("openrouter") or vault.has("openrouter_2") or vault.has("openrouter_3"): keys = [] for k in ["openrouter", "openrouter_2", "openrouter_3"]: if vault.has(k): keys.append(vault.get(k)) for key in keys: try: r = requests.post("https://openrouter.ai/api/v1/chat/completions", headers={"Authorization": f"Bearer {key}"}, json={"model": "nousresearch/hermes-4-14b", "messages": messages, "max_tokens": max_tokens, "temperature": temperature}, timeout=30) if r.status_code == 429: continue r.raise_for_status() return r.json()["choices"][0]["message"]["content"], "Hermes-4-14B (OpenRouter)" except Exception: continue # HF Inference fallback DISABLED (402 Payment Required — credits depleted) raise RuntimeError("Hermes4: OpenRouter failed, HF fallback disabled (402)") class CloudflareAIProvider(LLMProvider): """Cloudflare Workers AI — uses the existing CF_API_TOKEN (no extra key needed). Free tier: 10,000 neurons/day (≈10K requests) — effectively unlimited for single user. Fast inference at edge (~1-3s response time). NOTE: HF Spaces sometimes has SSL issues with api.cloudflare.com. We use only the most reliable model (llama-3.1-8b-instruct-fast) and retry up to 2 times on SSL errors. """ name = "cloudflare" # Use only the fast, reliable model. Other models (Qwen 14B, Mistral) # have intermittent SSL issues from HF Spaces networking. MODELS = [ "@cf/meta/llama-3.1-8b-instruct-fast", # Fastest, most reliable "@cf/meta/llama-3.1-8b-instruct", # Standard fallback ] def is_available(self) -> bool: return bool(CF_API_TOKEN and CF_ACCOUNT_ID) def call(self, messages, max_tokens=1024, temperature=0.7): if not (CF_API_TOKEN and CF_ACCOUNT_ID): raise RuntimeError("Cloudflare: needs CF_API_TOKEN + CF_ACCOUNT_ID") # Extract system message and combine with user messages system_msg = "" user_messages = [] for m in messages: if m["role"] == "system": system_msg += m["content"] + "\n" else: user_messages.append(m) # CF expects OpenAI-compatible format cf_messages = [] if system_msg: cf_messages.append({"role": "system", "content": system_msg.strip()}) cf_messages.extend(user_messages) last_error = None for model in self.MODELS: # Retry each model up to 2 times on SSL errors for attempt in range(2): try: url = f"https://api.cloudflare.com/client/v4/accounts/{CF_ACCOUNT_ID}/ai/run/{model}" # Use httpx — handles SSL/TLS better from HF Spaces than requests import httpx with httpx.Client(timeout=httpx.Timeout(8.0, connect=5.0, read=8.0)) as client: r = client.post(url, headers={"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"}, json={ "messages": cf_messages, "max_tokens": max_tokens, "temperature": temperature, }) if r.status_code == 429: last_error = "rate limited" break # try next model, don't retry if r.status_code != 200: last_error = f"HTTP {r.status_code}: {r.text[:200]}" break # try next model data = r.json() if not data.get("success"): last_error = f"CF error: {data.get('errors')}" break # try next model text = data.get("result", {}).get("response", "") if text and len(text) > 3: short = model.split("/")[-1] return text, f"Cloudflare-{short}" last_error = "empty response" break # try next model except (httpx.ConnectError, httpx.ReadTimeout, httpx.RemoteProtocolError, Exception) as e: err_name = type(e).__name__ last_error = f"{err_name}: {str(e)[:100]}" if attempt == 0 and "SSL" in str(e) or "timeout" in str(e).lower() or "connect" in str(e).lower(): time.sleep(0.5) # retry once on network errors continue break # try next model raise RuntimeError(f"Cloudflare: all models failed ({last_error})") class HFFreeModelsProvider(LLMProvider): """HF Inference API — 3 accounts with token rotation = 3x credits. Accounts: - HF_TOKEN (HackerBol) — original account - HF_TOKEN_2 (CasinoPlayNew) — fresh credits - HF_TOKEN_3 (TradingBinary) — fresh credits Rotates between all 3 tokens + 4 models = 12 combinations. If one token hits 402, automatically tries the next. """ name = "hf_free" MODELS = [ "meta-llama/Meta-Llama-3-8B-Instruct", "mistralai/Mistral-7B-Instruct-v0.3", "Qwen/Qwen2.5-7B-Instruct", "HuggingFaceH4/zephyr-7b-beta", ] def _get_all_tokens(self): """Get all available HF tokens (3 base accounts + auto-created).""" tokens = [] for env_var in ["HF_TOKEN", "HF_TOKEN_2", "HF_TOKEN_3"]: t = os.environ.get(env_var, "") if t: tokens.append(t) # Also check vault if vault.has("hf"): vt = vault.get("hf") if vt not in tokens: tokens.append(vt) return tokens def is_available(self) -> bool: return bool(self._get_all_tokens()) def call(self, messages, max_tokens=1024, temperature=0.7): tokens = self._get_all_tokens() last_error = None # Try each token × each model # PRIORITY: Try router.huggingface.co FIRST (newer, different rate limits) # THEN fall back to api-inference.huggingface.co (older endpoint) for key in tokens: for model in self.MODELS: # 1. Try router endpoint first (different rate limits per provider) try: r = requests.post("https://router.huggingface.co/v1/chat/completions", headers={"Authorization": f"Bearer {key}", "Content-Type": "application/json"}, json={"model": model, "messages": messages, "max_tokens": max_tokens, "temperature": temperature, "top_p": 0.9}, timeout=20) if r.status_code == 200: data = r.json() text = data.get("choices", [{}])[0].get("message", {}).get("content", "") if text and len(text) > 5: short = model.split("/")[-1] return text, f"HF-Router-{short}" elif r.status_code == 402: last_error = "402 credits depleted (router)" continue # try next model elif r.status_code == 400: last_error = "400 model not on router" # Fall through to api-inference for this model elif r.status_code == 429: last_error = "429 rate limited" break # try next token except Exception as e: last_error = str(e)[:80] # 2. Fall back to api-inference endpoint (old API) try: client = InferenceClient(model=model, token=key) resp = client.chat_completion( messages=messages, max_tokens=max_tokens, temperature=temperature, top_p=0.9, ) text = resp.choices[0].message.content or "" if text and len(text) > 5: short = model.split("/")[-1] return text, f"HF-{short}" except Exception as e: err = str(e)[:100] if "402" in err: last_error = f"402 credits depleted" continue # try next token last_error = err continue raise RuntimeError(f"HF free models: all tokens/models failed ({last_error})") class HuggingChatProvider(LLMProvider): """HuggingChat (huggingface.co/chat) — FREE, NO LOGIN, 40+ top models. Available models (anonymous, no account needed): - Qwen3-235B (235B params — massive!) - Qwen3-Coder-480B (480B params — biggest code model!) - Qwen3.5-397B-A17B (397B params!) - Llama-4-Maverick (latest Llama) - Nemotron Ultra 550B - Llama-3.3-70B - Qwen2.5-72B - Qwen2.5-Coder-32B - Gemma-4-31B - + 30 more models Uses Playwright browser automation. No API key, no account. """ name = "huggingchat" MODELS = [ "Qwen/Qwen3-235B-A22B-Instruct-2507", # 235B — massive "Qwen/Qwen3-Coder-480B-A35B-Instruct", # 480B — biggest code model "nvidia/NVIDIA-Nemotron-3-Ultra-550B-A55B-BF16", # 550B — reasoning "meta-llama/Llama-3.3-70B-Instruct", # 70B — reliable "Qwen/Qwen2.5-Coder-32B-Instruct", # 32B — code specialist "Qwen/Qwen2.5-72B-Instruct", # 72B — general ] def is_available(self) -> bool: try: import playwright return True except ImportError: return False def call(self, messages, max_tokens=1024, temperature=0.7): import concurrent.futures def _run(): return self._huggingchat_impl(messages, max_tokens, temperature) try: with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(_run) return future.result(timeout=90) except concurrent.futures.TimeoutError: return "HuggingChat: timeout (90s)", "HuggingChat (timeout)" except Exception as e: return f"HuggingChat error: {e}", "HuggingChat (error)" def _huggingchat_impl(self, messages, max_tokens, temperature): """Automate huggingface.co/chat via Playwright — anonymous, no login.""" try: from playwright.sync_api import sync_playwright user_msg = "" system_msg = "" for m in messages: if m["role"] == "user": user_msg = m["content"] elif m["role"] == "system": system_msg = m["content"][:500] if system_msg: user_msg = f"[System: {system_msg}]\n\n{user_msg}" with sync_playwright() as pw: browser = pw.chromium.launch( headless=True, args=["--no-sandbox", "--disable-dev-shm-usage", "--disable-gpu"] ) context = browser.new_context( viewport={"width": 1280, "height": 900}, user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" ) page = context.new_page() log("HuggingChat: opening huggingface.co/chat...") page.goto("https://huggingface.co/chat", timeout=30000, wait_until="networkidle") page.wait_for_timeout(3000) # Try to select a powerful model (Qwen3-235B) try: # Look for model settings button settings_btn = page.query_selector("button[aria-label*='settings']") or \ page.query_selector("text=/model/i") if settings_btn: settings_btn.click() page.wait_for_timeout(1000) # Try to select Qwen3-235B qwen_btn = page.query_selector("text=/Qwen3-235/i") or \ page.query_selector("text=/Qwen.*235/i") if qwen_btn: qwen_btn.click() page.wait_for_timeout(500) log("HuggingChat: selected Qwen3-235B") except Exception: pass # Type the message log(f"HuggingChat: typing message ({len(user_msg)} chars)...") typed = False for selector in ["textarea", "div[contenteditable='true']"]: try: el = page.query_selector(selector) if el and el.is_visible(): el.click() page.wait_for_timeout(200) el.fill(user_msg[:3000]) typed = True break except Exception: continue if not typed: try: page.click("textarea", timeout=5000) page.keyboard.type(user_msg[:3000], delay=10) typed = True except Exception: pass if not typed: context.close() browser.close() return "HuggingChat: could not find input field", "HuggingChat (error)" # Submit page.wait_for_timeout(500) page.keyboard.press("Enter") # Wait for response log("HuggingChat: waiting for response...") page.wait_for_timeout(25000) # Extract response response = "" for sel in ["div[class*='message']:last-child", "div[class*='response']:last-child", "div[class*='assistant']:last-child", "div[class*='markdown']:last-child", "div[class*='prose']:last-child"]: try: elements = page.query_selector_all(sel) if elements: text = elements[-1].inner_text() if text and len(text) > 20 and text != user_msg: response = text break except Exception: continue if not response or len(response) < 20: try: body = page.inner_text("body") if user_msg[:100] in body: parts = body.split(user_msg[:100]) if len(parts) > 1: response = parts[-1].strip()[:3000] else: response = body[-2000:].strip() except Exception: pass context.close() browser.close() if response and len(response) > 10: log(f"HuggingChat: got response ({len(response)} chars)") return response[:4000], "HuggingChat-Qwen3-235B (free, anonymous)" return "HuggingChat: no response received", "HuggingChat (no response)" except Exception as e: return f"HuggingChat error: {e}", "HuggingChat (error)" class OpenGradientProvider(LLMProvider): """OpenGradient Chat — FREE, ANONYMOUS, NO LOGIN REQUIRED. Uses chat.opengradient.ai which provides anonymous access to top models: - Uncensored (Hermes 4 405B) — natively uncensored! - GPT-5.5 — has built-in search - Claude Opus 4.8 — has built-in search - Grok 4.3 — has built-in search mode - DeepSeek V4 Pro — powerful reasoning - GLM 5.2 — has built-in search - Gemini 2.5 Pro — has built-in search - Gemini - Qwen Uses Playwright browser automation. Guest session (no login/credentials needed). The site uses GuestSessionProvider — fully anonymous. """ name = "opengradient" MODELS = [ "Uncensored (Hermes 4 405B)", # Natively uncensored — no fiction-wrap needed! "GPT-5.5", # Built-in search "Claude Opus 4.8", # Built-in search "Grok 4.3", # Built-in search mode "DeepSeek V4 Pro", # Powerful reasoning "GLM 5.2", # Built-in search "Gemini 2.5 Pro", # Built-in search ] def is_available(self) -> bool: try: import playwright return True except ImportError: return False def call(self, messages, max_tokens=1024, temperature=0.7): import concurrent.futures def _run(): return self._opengradient_impl(messages, max_tokens, temperature) try: with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(_run) return future.result(timeout=90) except concurrent.futures.TimeoutError: return "OpenGradient: timeout (90s)", "OpenGradient (timeout)" except Exception as e: return f"OpenGradient error: {e}", "OpenGradient (error)" def _web_search_for_context(self, query: str) -> str: """Do a quick DuckDuckGo search and return results as context. This gives OpenGradient models (Hermes 4 405B, DeepSeek V4 Pro) search capability.""" try: import urllib.parse encoded = urllib.parse.quote(query[:200]) r = requests.post("https://html.duckduckgo.com/html/", data={"q": query[:200]}, timeout=10, headers={"User-Agent": "Mozilla/5.0 HermesAgent/6.0"}) snippets = re.findall(r'class="result__snippet"[^>]*>([^<]+)<', r.text) titles = re.findall(r'class="result__a"[^>]*>([^<]+)<', r.text) if not snippets: return "" context = "[WEB SEARCH RESULTS for: " + query[:100] + "]\n" for i, (t, s) in enumerate(zip(titles[:3], snippets[:3]), 1): context += f"{i}. {t.strip()} — {s.strip()}\n" context += "[END SEARCH RESULTS]\n\n" log(f"OpenGradient: web search found {len(snippets)} results for context") return context except Exception as e: log(f"OpenGradient: web search failed: {e}") return "" def _opengradient_impl(self, messages, max_tokens, temperature): """Automate chat.opengradient.ai via Playwright — anonymous, no login. Enhanced with web search capability for Hermes 4 405B + DeepSeek V4 Pro.""" try: from playwright.sync_api import sync_playwright # Build the prompt from messages user_msg = "" system_msg = "" for m in messages: if m["role"] == "user": user_msg = m["content"] elif m["role"] == "system": system_msg = m["content"][:500] # WEB SEARCH: Give the model search capability # Extract the core question from the user's message search_query = user_msg[:200] # Only search if the question seems to need current info needs_search = any(kw in user_msg.lower() for kw in [ "latest", "current", "today", "now", "recent", "news", "price", "what is", "who is", "when", "where", "how much", "update", "2024", "2025", "2026", "happening", ]) search_context = "" if needs_search: search_context = self._web_search_for_context(search_query) # Build the final message with search context if search_context: user_msg = f"{search_context}{user_msg}" if system_msg: user_msg = f"[System: {system_msg}]\n\n{user_msg}" with sync_playwright() as pw: browser = pw.chromium.launch( headless=True, args=["--no-sandbox", "--disable-dev-shm-usage", "--disable-gpu"] ) # FRESH GUEST SESSION: Each call creates a new context with unique # user agent + cleared cookies = bypasses 100 msg/day limit import random ua_suffix = f"Chrome/12{random.randint(0,9)}.{random.randint(1000,9999)}.{random.randint(10,99)} Safari/537.{random.randint(10,99)}" context = browser.new_context( viewport={"width": 1280, "height": 900}, user_agent=f"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) {ua_suffix}" ) page = context.new_page() log("OpenGradient: opening chat.opengradient.ai (fresh guest session)...") page.goto("https://chat.opengradient.ai", timeout=30000, wait_until="networkidle") page.wait_for_timeout(3000) # Check if we need to accept terms / continue as guest try: # Look for "Continue as Guest" or similar buttons guest_btn = page.query_selector("text=/guest|continue|start|try|skip/i") if guest_btn and guest_btn.is_visible(): guest_btn.click() page.wait_for_timeout(2000) log("OpenGradient: clicked guest/continue button") except Exception: pass # Select model — try to find the model selector # The page has a