Spaces:
Paused
Paused
| """ | |
| Bunker Client 2.0 - Fail-Safe Cloud Storage Bridge | |
| ================================================== | |
| Connects local Hugging Face Spaces to Azure Managed Disk via Cloudflare Tunnel. | |
| Features: | |
| - Fire-and-Forget Uploads (Non-blocking) | |
| - Local Buffering on Failure (The "Survival Mode") | |
| - Background Synchronization | |
| - Circuit Breaker & Backoff | |
| Configuration: | |
| - URL: https://exactly-heather-coaches-travelers.trycloudflare.com | |
| - Auth: azureuser / bunker_password_2026 | |
| """ | |
| import os | |
| import time | |
| import json | |
| import logging | |
| import shutil | |
| import threading | |
| import requests | |
| from queue import Queue, Empty | |
| from requests.auth import HTTPBasicAuth | |
| from pathlib import Path | |
| # Configure Logging | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s | %(levelname)s | %(name)s | %(message)s') | |
| logger = logging.getLogger("bunker_client") | |
class BunkerClient:
    """
    Client for the Bunker Bridge.

    Ensures data eventually reaches Azure, even if the tunnel is flaky:
    each save is first attempted synchronously with a short timeout
    (fast path); on failure the file is copied into a local buffer
    directory and drained by a daemon worker thread with retries,
    exponential backoff and a circuit breaker.

    Buffer layout: each payload is stored as ``<timestamp>_<name>`` next
    to a ``*.meta`` JSON sidecar recording the destination folder and the
    original filename.
    """

    # --- Configuration -------------------------------------------------
    BUNKER_URL = "http://20.125.88.188:9621"  # Plan B: Direct Azure IP
    AUTH = HTTPBasicAuth('azureuser', 'bunker_password_2026')
    BUFFER_DIR = "_bunker_buffer"
    MAX_RETRIES = 5
    TIMEOUT = 2.0  # Aggressive timeout so the caller's thread never stalls long
    CIRCUIT_BREAKER_WINDOW = 300  # Seconds the circuit stays open (5 minutes)

    def __init__(self, buffer_dir: str = None):
        """Create the client, start the background sync worker and re-queue
        any files left in the buffer directory by a previous run.

        :param buffer_dir: override for the local buffer directory
                           (defaults to ``BUFFER_DIR``).
        """
        self.buffer_dir = Path(buffer_dir or self.BUFFER_DIR)
        self.buffer_dir.mkdir(parents=True, exist_ok=True)
        self.upload_queue = Queue()
        self.shutdown_event = threading.Event()
        self.circuit_open_until = 0  # Epoch seconds; 0 means circuit closed

        # Start background sync (daemon: must never block interpreter exit).
        self.worker_thread = threading.Thread(target=self._sync_worker, daemon=True)
        self.worker_thread.start()

        # Hydrate queue from any buffered payloads that survived a restart.
        self._hydrate_queue()
        logger.info(f"BunkerClient 2.0 initialized. Buffer: {self.buffer_dir}")

    def _hydrate_queue(self):
        """Recover pending uploads from the disk buffer on startup.

        ``*.meta`` sidecar files are skipped: they describe a buffered
        payload and must not be queued as uploads themselves.
        """
        count = 0
        for file_path in self.buffer_dir.glob("*"):
            if file_path.is_file() and file_path.suffix != ".meta":
                self.upload_queue.put(file_path)
                count += 1
        if count:
            logger.info(f"Recovered {count} pending uploads from buffer.")

    def save_file(self, local_path: str, remote_folder: str = "") -> bool:
        """
        Public API: request that *local_path* be stored remotely.

        Returns quickly: True means the file was either uploaded directly
        or safely buffered for background sync; False means both the
        direct upload and the local buffering failed (data is at risk).
        """
        local_path = Path(local_path)
        if not local_path.exists():
            logger.error(f"File not found: {local_path}")
            return False
        filename = local_path.name

        # 1. Try direct upload (fast path) unless the circuit is open.
        if not self._is_circuit_open():
            try:
                if self._upload_single(local_path, remote_folder, timeout=self.TIMEOUT):
                    return True
            except Exception as e:
                logger.warning(f"Direct upload failed ({e}). Fallback to buffer.")
        else:
            logger.warning("Circuit breaker OPEN. Skipping direct upload.")

        # 2. Fallback: copy to buffer & queue for the background worker.
        try:
            timestamp = int(time.time())
            # Timestamp prefix keeps concurrently-buffered files distinct.
            buffered_name = f"{timestamp}_{filename}"
            buffered_path = self.buffer_dir / buffered_name
            # Sidecar metadata so the worker knows the destination folder
            # and the original (un-prefixed) filename.
            meta_path = self._meta_path(buffered_path)
            with open(meta_path, 'w') as f:
                json.dump({"remote_folder": remote_folder, "original_name": filename}, f)
            shutil.copy2(local_path, buffered_path)
            self.upload_queue.put(buffered_path)
            logger.info(f"Buffered file: {filename} -> {buffered_name}")
            return True  # Successfully buffered
        except Exception as e:
            logger.critical(f"CRITICAL: Failed to buffer file {filename}: {e}")
            return False

    @staticmethod
    def _meta_path(buffered_path: Path) -> Path:
        """Sidecar metadata path for a buffered payload.

        NOTE(review): ``with_suffix`` replaces the payload's own extension,
        so two same-second payloads differing only by extension would share
        a sidecar — kept for compatibility with existing buffer contents.
        """
        return buffered_path.with_suffix('.meta')

    def _post_file(self, content_path: Path, remote_folder: str, filename: str, timeout: float) -> bool:
        """POST one file to the bunker's ``/api/upload`` endpoint.

        :param content_path: local file whose bytes are sent.
        :param filename: name reported to the server (may differ from
                         ``content_path.name`` for buffered payloads).
        :returns: True on HTTP 200/201, False on any other status.
        :raises: propagates network errors / timeouts (caller decides policy).
        """
        upload_url = f"{self.BUNKER_URL}/api/upload"
        with open(content_path, 'rb') as f:
            files = {'file': (filename, f)}
            data = {'path': str(Path(remote_folder) / filename)}
            response = requests.post(
                upload_url,
                files=files,
                data=data,
                auth=self.AUTH,
                timeout=timeout
            )
        if response.status_code in (200, 201):
            return True
        logger.warning(f"Upload failed: {response.status_code} - {response.text}")
        return False

    def _upload_single(self, local_path: Path, remote_folder: str, timeout: float) -> bool:
        """Perform a single blocking upload of *local_path* under its own name."""
        try:
            return self._post_file(local_path, remote_folder, local_path.name, timeout)
        except requests.exceptions.Timeout:
            raise  # Let the caller treat a timeout distinctly (no extra log noise)
        except Exception as e:
            logger.error(f"Network error: {e}")
            raise

    def _sync_worker(self):
        """Background thread: drain the upload queue.

        Each dequeued payload is uploaded with retries/backoff; after 3
        consecutive failed payloads the circuit breaker trips and the
        worker idles for ``CIRCUIT_BREAKER_WINDOW`` seconds.
        """
        failures = 0
        while not self.shutdown_event.is_set():
            if self._is_circuit_open():
                time.sleep(10)
                continue
            try:
                buffered_path = Path(self.upload_queue.get(timeout=1))
                if not buffered_path.exists():
                    self.upload_queue.task_done()
                    continue

                remote_folder, original_name = self._read_meta(buffered_path)
                logger.info(f"Background Sync: Uploading {original_name}...")
                try:
                    if self._upload_with_retries(buffered_path, remote_folder, original_name):
                        logger.info(f"Synced: {original_name}")
                        self._cleanup_buffered(buffered_path)
                        failures = 0  # Reset circuit breaker counter
                    else:
                        logger.error(f"Failed to sync {original_name} after retries.")
                        # Left in the buffer dir but NOT re-queued, to avoid a
                        # hot retry loop; it is recovered on the next restart.
                        failures += 1
                except Exception as e:
                    logger.error(f"Sync worker exception: {e}")
                    failures += 1
                self.upload_queue.task_done()

                if failures >= 3:
                    self._trip_circuit()
            except Empty:
                pass  # Queue idle; loop back and re-check shutdown/circuit
            except Exception as e:
                logger.error(f"Worker loop error: {e}")
                time.sleep(1)

    def _read_meta(self, buffered_path: Path):
        """Return ``(remote_folder, original_name)`` from the sidecar file.

        Falls back to ``("", buffered name)`` when the sidecar is missing
        or unreadable — best-effort so the payload still gets uploaded.
        """
        remote_folder = ""
        original_name = buffered_path.name
        meta_path = self._meta_path(buffered_path)
        if meta_path.exists():
            try:
                with open(meta_path, 'r') as f:
                    meta = json.load(f)
                remote_folder = meta.get("remote_folder", "")
                original_name = meta.get("original_name", buffered_path.name)
            except Exception:
                pass  # Corrupt sidecar: upload under the buffered name
        return remote_folder, original_name

    def _upload_with_retries(self, buffered_path: Path, remote_folder: str, original_name: str) -> bool:
        """Upload one buffered payload with exponential backoff.

        Backs off after *any* failed attempt (exception or non-2xx status);
        no sleep after the final attempt.
        """
        for i in range(self.MAX_RETRIES):
            try:
                # Generous, growing timeout: the background thread may block.
                if self._post_file(buffered_path, remote_folder, original_name, timeout=10 + (i * 5)):
                    return True
            except Exception as e:
                logger.warning(f"Retry {i+1}/{self.MAX_RETRIES} failed: {e}")
            if i < self.MAX_RETRIES - 1:
                time.sleep(2 ** i)  # Exponential backoff
        return False

    def _cleanup_buffered(self, buffered_path: Path):
        """Remove a synced payload and its sidecar metadata from the buffer."""
        os.remove(buffered_path)
        meta_path = self._meta_path(buffered_path)
        if meta_path.exists():
            os.remove(meta_path)

    def _is_circuit_open(self) -> bool:
        """True while the circuit breaker is tripped (all uploads paused)."""
        return time.time() < self.circuit_open_until

    def _trip_circuit(self):
        """Open the circuit breaker for ``CIRCUIT_BREAKER_WINDOW`` seconds."""
        logger.warning(f"Circuit Breaker TRIPPED. Pausing background sync for {self.CIRCUIT_BREAKER_WINDOW}s")
        self.circuit_open_until = time.time() + self.CIRCUIT_BREAKER_WINDOW

    def save_thought(self, thought_data: dict, topic: str = "general") -> bool:
        """Helper for Brain/CTM: serialize *thought_data* as JSON and save it
        under ``thoughts/<topic>`` via :meth:`save_file`."""
        try:
            timestamp = int(time.time() * 1000)
            filename = f"thought_{topic}_{timestamp}.json"
            local_tmp = Path(self.buffer_dir) / f"tmp_{filename}"
            with open(local_tmp, 'w') as f:
                json.dump(thought_data, f, indent=2)
            result = self.save_file(str(local_tmp), remote_folder=f"thoughts/{topic}")
            # save_file either uploaded directly or copied the bytes into the
            # buffer, so the tmp file is redundant; removing it prevents a
            # duplicate upload when the buffer is hydrated after a restart.
            if result and local_tmp.exists():
                local_tmp.unlink()
            return result
        except Exception as e:
            logger.error(f"Failed to save thought: {e}")
            return False