# gansta/utils/bunker_client.py
# Commit ed89628 — "Deploy CTM Codebase bypass FUSE 503"
"""
Bunker Client 2.0 - Fail-Safe Cloud Storage Bridge
==================================================
Connects local Hugging Face Spaces to Azure Managed Disk via Cloudflare Tunnel.
Features:
- Fire-and-Forget Uploads (Non-blocking)
- Local Buffering on Failure (The "Survival Mode")
- Background Synchronization
- Circuit Breaker & Backoff
Configuration:
- URL: direct Azure IP fallback http://20.125.88.188:9621 (the original
  Cloudflare tunnel https://exactly-heather-coaches-travelers.trycloudflare.com
  is bypassed; see BUNKER_URL below)
- Auth: HTTP Basic as azureuser (see AUTH below; credentials can be
  overridden via environment variables)
"""
import os
import time
import json
import logging
import shutil
import threading
import requests
from queue import Queue, Empty
from requests.auth import HTTPBasicAuth
from pathlib import Path
# Configure Logging
# NOTE(review): basicConfig at import time configures the process-wide root
# logger — confirm no other module in this app also calls basicConfig, as
# only the first call takes effect.
logging.basicConfig(level=logging.INFO, format='%(asctime)s | %(levelname)s | %(name)s | %(message)s')
# Module-level logger used by BunkerClient below.
logger = logging.getLogger("bunker_client")
class BunkerClient:
    """
    Client for the Bunker Bridge.

    Ensures data eventually reaches Azure, even if the tunnel is flaky:

    - Fast path: direct upload with an aggressive timeout so the caller's
      thread never blocks for long.
    - Fallback: copy the file into a local disk buffer and let a background
      worker retry with exponential backoff ("Survival Mode").
    - Circuit breaker: after repeated sync failures, uploads pause for
      CIRCUIT_BREAKER_WINDOW seconds instead of hammering a dead endpoint.
    """

    # Configuration (environment variables override the defaults).
    BUNKER_URL = os.environ.get("BUNKER_URL", "http://20.125.88.188:9621")  # Plan B: Direct Azure IP
    # NOTE(security): hard-coded fallback credentials remain for backward
    # compatibility — prefer setting BUNKER_USER / BUNKER_PASSWORD instead.
    AUTH = HTTPBasicAuth(
        os.environ.get("BUNKER_USER", "azureuser"),
        os.environ.get("BUNKER_PASSWORD", "bunker_password_2026"),
    )
    BUFFER_DIR = "_bunker_buffer"
    MAX_RETRIES = 5
    TIMEOUT = 2.0  # Aggressive timeout for main thread (fast path only)
    CIRCUIT_BREAKER_WINDOW = 300  # Seconds the circuit stays open (5 minutes)
    META_SUFFIX = ".meta"  # Sidecar files holding per-upload metadata

    def __init__(self, buffer_dir: str = None):
        """Start the client: create the buffer dir, spawn the sync worker, recover pending uploads."""
        self.buffer_dir = Path(buffer_dir or self.BUFFER_DIR)
        self.buffer_dir.mkdir(parents=True, exist_ok=True)
        self.upload_queue = Queue()
        self.shutdown_event = threading.Event()
        self.circuit_open_until = 0
        # Start Background Sync (daemon: must not keep the process alive).
        self.worker_thread = threading.Thread(target=self._sync_worker, daemon=True)
        self.worker_thread.start()
        # Hydrate queue from existing buffer
        self._hydrate_queue()
        logger.info(f"BunkerClient 2.0 initialized. Buffer: {self.buffer_dir}")

    def _hydrate_queue(self):
        """Recover pending uploads from the disk buffer on startup."""
        count = 0
        for file_path in self.buffer_dir.glob("*"):
            # Skip metadata sidecars: they ride along with their data file
            # and must never be uploaded on their own.
            if file_path.is_file() and file_path.suffix != self.META_SUFFIX:
                self.upload_queue.put(file_path)
                count += 1
        if count:
            logger.info(f"Recovered {count} pending uploads from buffer.")

    def save_file(self, local_path: str, remote_folder: str = "") -> bool:
        """
        Public API: request that *local_path* be stored on the bunker.

        Returns immediately: True if the file was uploaded directly or
        safely buffered for background sync; False if the file is missing
        or buffering itself failed.
        """
        local_path = Path(local_path)
        if not local_path.exists():
            logger.error(f"File not found: {local_path}")
            return False
        filename = local_path.name
        # 1. Try Direct Upload (Fast Path)
        if not self._is_circuit_open():
            try:
                if self._upload_single(local_path, remote_folder, timeout=self.TIMEOUT):
                    return True
            except Exception as e:
                logger.warning(f"Direct upload failed ({e}). Fallback to buffer.")
        else:
            logger.warning("Circuit breaker OPEN. Skipping direct upload.")
        # 2. Fallback: Copy to Buffer & Queue
        try:
            timestamp = int(time.time())
            # Timestamp prefix keeps repeated saves of the same file from
            # overwriting each other in the buffer.
            buffered_name = f"{timestamp}_{filename}"
            buffered_path = self.buffer_dir / buffered_name
            # Sidecar metadata records the destination and the real name.
            meta_path = buffered_path.with_suffix(self.META_SUFFIX)
            with open(meta_path, 'w') as f:
                json.dump({"remote_folder": remote_folder, "original_name": filename}, f)
            shutil.copy2(local_path, buffered_path)
            self.upload_queue.put(buffered_path)
            logger.info(f"Buffered file: {filename} -> {buffered_name}")
            return True  # Successfully buffered
        except Exception as e:
            logger.critical(f"CRITICAL: Failed to buffer file {filename}: {e}")
            return False

    def _upload_single(self, local_path: Path, remote_folder: str, timeout: float) -> bool:
        """
        Perform a single blocking upload of *local_path* under its own name.

        Returns True on HTTP 200/201, False on any other status; raises on
        timeouts and network errors so the caller can decide to buffer.
        """
        return self._post_file(local_path, local_path.name, remote_folder, timeout)

    def _post_file(self, content_path: Path, upload_name: str, remote_folder: str, timeout: float) -> bool:
        """POST the bytes of *content_path* to the bunker as *upload_name*. True on 200/201."""
        upload_url = f"{self.BUNKER_URL}/api/upload"
        try:
            with open(content_path, 'rb') as f:
                files = {'file': (upload_name, f)}
                # Server-side destination path (folder + final filename).
                data = {'path': str(Path(remote_folder) / upload_name)}
                response = requests.post(
                    upload_url,
                    files=files,
                    data=data,
                    auth=self.AUTH,
                    timeout=timeout
                )
            if response.status_code in (200, 201):
                return True
            logger.warning(f"Upload failed: {response.status_code} - {response.text}")
            return False
        except requests.exceptions.Timeout:
            raise  # Propagate: caller chooses to buffer or retry.
        except Exception as e:
            logger.error(f"Network error: {e}")
            raise

    def _sync_worker(self):
        """Background thread: drain the upload queue, tripping the breaker on repeated failures."""
        failures = 0
        while not self.shutdown_event.is_set():
            if self._is_circuit_open():
                time.sleep(10)
                continue
            try:
                buffered_path = Path(self.upload_queue.get(timeout=1))
                if not buffered_path.exists():
                    # Already uploaded/removed by a previous pass.
                    self.upload_queue.task_done()
                    continue
                if self._process_buffered_item(buffered_path):
                    failures = 0  # Reset circuit breaker counter
                else:
                    failures += 1
                self.upload_queue.task_done()
                # Check Circuit Breaker
                if failures >= 3:
                    self._trip_circuit()
            except Empty:
                pass
            except Exception as e:
                logger.error(f"Worker loop error: {e}")
                time.sleep(1)

    def _process_buffered_item(self, buffered_path: Path) -> bool:
        """Upload one buffered file (with retries) and clean it up. Returns True on success."""
        meta_path = buffered_path.with_suffix(self.META_SUFFIX)
        remote_folder, original_name = self._read_meta(meta_path, buffered_path.name)
        logger.info(f"Background Sync: Uploading {original_name}...")
        try:
            success = False
            for i in range(self.MAX_RETRIES):
                try:
                    # Generous, increasing timeouts: nobody waits on this thread.
                    if self._post_file(buffered_path, original_name, remote_folder,
                                       timeout=10 + (i * 5)):
                        success = True
                        break
                except Exception as e:
                    logger.warning(f"Retry {i+1}/{self.MAX_RETRIES} failed: {e}")
                    time.sleep(2 ** i)  # Exponential backoff
            if success:
                logger.info(f"Synced: {original_name}")
                # Cleanup: remove the buffered copy and its sidecar.
                os.remove(buffered_path)
                if meta_path.exists():
                    os.remove(meta_path)
                return True
            # Leave the file in the buffer (it is recovered on restart by
            # _hydrate_queue) but do not re-queue immediately, to avoid a
            # hot retry loop on a persistently failing item.
            logger.error(f"Failed to sync {original_name} after retries.")
            return False
        except Exception as e:
            logger.error(f"Sync worker exception: {e}")
            return False

    def _read_meta(self, meta_path: Path, default_name: str):
        """Read a metadata sidecar; fall back to defaults when missing or corrupt."""
        remote_folder = ""
        original_name = default_name
        if meta_path.exists():
            try:
                with open(meta_path, 'r') as f:
                    meta = json.load(f)
                remote_folder = meta.get("remote_folder", "")
                original_name = meta.get("original_name", default_name)
            except (OSError, ValueError) as e:
                # Corrupt sidecar: still upload under the buffered name
                # rather than dropping the data.
                logger.warning(f"Could not read metadata {meta_path}: {e}")
        return remote_folder, original_name

    def _is_circuit_open(self):
        """True while the circuit breaker is tripped (uploads paused)."""
        return time.time() < self.circuit_open_until

    def _trip_circuit(self):
        """Open the circuit: pause all upload attempts for the breaker window."""
        logger.warning(f"Circuit Breaker TRIPPED. Pausing background sync for {self.CIRCUIT_BREAKER_WINDOW}s")
        self.circuit_open_until = time.time() + self.CIRCUIT_BREAKER_WINDOW

    def save_thought(self, thought_data: dict, topic: str = "general") -> bool:
        """Helper for Brain/CTM: serialize *thought_data* as JSON and upload to thoughts/<topic>."""
        try:
            timestamp = int(time.time() * 1000)
            filename = f"thought_{topic}_{timestamp}.json"
            # Unique temp name per thought: a fixed name would make
            # concurrent thoughts clobber each other on disk.
            local_tmp = Path(self.buffer_dir) / f"tmp_{filename}"
            with open(local_tmp, 'w') as f:
                json.dump(thought_data, f, indent=2)
            # Queue for upload to /thoughts/{topic}
            result = self.save_file(str(local_tmp), remote_folder=f"thoughts/{topic}")
            if result:
                # Direct upload sent the bytes; the buffer fallback copied
                # them — either way the temp file is no longer needed and
                # would otherwise be re-uploaded as a duplicate on restart.
                local_tmp.unlink(missing_ok=True)
            return result
        except Exception as e:
            logger.error(f"Failed to save thought: {e}")
            return False