| """ |
| Setup script to fetch data from GitHub repository or use mock data as fallback. |
| This runs before the app starts to ensure data is available. |
| Supports periodic refresh with caching. |
| """ |
| import os |
| import shutil |
| import subprocess |
| import sys |
| import threading |
| import time |
| import logging |
| from pathlib import Path |
| from datetime import datetime, timedelta |
| from config import DATA_DIR, EXTRACTED_DATA_DIR, CONFIG_NAME |
|
|
logger = logging.getLogger(__name__)


# Upstream repository that publishes the leaderboard results.
GITHUB_REPO = "https://github.com/OpenHands/openhands-index-results.git"

# Local clone location; the clone is kept after extraction so the repo's
# scripts/ (schema modules) stay importable.
REPO_CLONE_DIR = Path("/tmp/openhands-index-results")


# datetime of the last successful data fetch; None until the first one succeeds.
_last_fetch_time = None
# Serializes refreshes; acquired non-blocking in refresh_data_if_needed().
_fetch_lock = threading.Lock()
# Zero-argument callables invoked after each successful refresh (cache-clear hooks).
_refresh_callbacks = []


# Data freshness window in seconds (default 15 minutes), overridable via the
# CACHE_TTL_SECONDS environment variable.
CACHE_TTL_SECONDS = int(os.environ.get("CACHE_TTL_SECONDS", 900))
|
|
|
|
def get_repo_clone_dir() -> Path:
    """Return the local filesystem path where the openhands-index-results
    repository is (or will be) cloned."""
    return REPO_CLONE_DIR
|
|
|
|
def fetch_data_from_github():
    """
    Fetch data from the openhands-index-results GitHub repository.
    Keeps the full repo clone for access to pydantic schema models.

    Shallow-clones the repo into REPO_CLONE_DIR, copies its ``results/`` tree
    (and optional ``alternative_agents/`` tree) under
    ``EXTRACTED_DATA_DIR/CONFIG_NAME`` (replacing any prior extraction), and
    adds the repo's ``scripts/`` directory to ``sys.path``.

    Returns True if successful, False otherwise.
    """
    clone_dir = REPO_CLONE_DIR
    target_dir = Path(EXTRACTED_DATA_DIR) / CONFIG_NAME

    try:
        # Always start from a clean clone directory.
        if clone_dir.exists():
            shutil.rmtree(clone_dir)

        print(f"Attempting to clone data from {GITHUB_REPO}...")

        # Disable git's interactive credential prompt so an unreachable or
        # private repo fails fast instead of hanging on stdin.
        env = os.environ.copy()
        env['GIT_TERMINAL_PROMPT'] = '0'

        # Shallow clone (latest commit only) with a hard 30s timeout.
        result = subprocess.run(
            ["git", "clone", "--depth", "1", GITHUB_REPO, str(clone_dir)],
            capture_output=True,
            text=True,
            timeout=30,
            stdin=subprocess.DEVNULL,
            env=env
        )

        if result.returncode != 0:
            print(f"Git clone failed: {result.stderr}")
            return False

        results_source = clone_dir / "results"

        if not results_source.exists():
            print(f"Results directory not found in repository")
            return False

        # NOTE(review): every directory entry (files included) is counted as
        # an "agent result directory" — presumably results/ only contains
        # per-agent directories; confirm against the repo layout.
        result_dirs = list(results_source.iterdir())
        if not result_dirs:
            print(f"No agent results found in {results_source}")
            return False

        print(f"Found {len(result_dirs)} agent result directories")

        # Replace any previously extracted data wholesale.
        os.makedirs(target_dir.parent, exist_ok=True)
        if target_dir.exists():
            shutil.rmtree(target_dir)

        # Copy the results tree into the layout the app expects.
        target_results = target_dir / "results"
        shutil.copytree(results_source, target_results)

        # Optional alternative agent runs live next to the main results.
        alt_source = clone_dir / "alternative_agents"
        if alt_source.exists():
            alt_target = target_dir / "alternative_agents"
            shutil.copytree(alt_source, alt_target)
            agent_types = sorted(p.name for p in alt_source.iterdir() if p.is_dir())
            print(f"Found alternative agent types: {agent_types}")
        else:
            print("No alternative_agents/ directory in repository (skipping)")

        print(f"Successfully fetched data from GitHub. Files: {list(target_dir.glob('*'))}")

        # Debug aid: print the first record of one agent's scores.json.
        sample_agents = list(target_results.glob("*/scores.json"))
        if sample_agents:
            import json
            with open(sample_agents[0]) as f:
                sample_data = json.load(f)
            print(f"Sample data from {sample_agents[0].parent.name}: {sample_data[0] if sample_data else 'EMPTY'}")

        # Make the repo's schema modules importable by downstream code.
        scripts_dir = clone_dir / "scripts"
        if scripts_dir.exists() and str(scripts_dir) not in sys.path:
            sys.path.insert(0, str(scripts_dir))
            print(f"Added {scripts_dir} to Python path for schema imports")

        return True

    except subprocess.TimeoutExpired:
        print("Git clone timed out")
        return False
    except Exception as e:
        # Broad catch is deliberate: any failure here makes the caller fall
        # back to mock data instead of crashing startup.
        print(f"Error fetching data from GitHub: {e}")
        return False
|
|
def copy_mock_data():
    """Copy mock data to the expected extraction directory.

    Looks for mock data relative to the working directory first, then at the
    container path /app/mock_results.  Returns True on success, False when no
    mock data directory can be found.
    """
    # Prefer a mock_results/ dir relative to CWD; fall back to the image path.
    source = Path("mock_results") / CONFIG_NAME
    if not source.exists():
        source = Path("/app/mock_results") / CONFIG_NAME

    destination = Path(EXTRACTED_DATA_DIR) / CONFIG_NAME

    print(f"Current working directory: {os.getcwd()}")
    print(f"Looking for mock data at: {source.absolute()}")

    if not source.exists():
        print(f"ERROR: Mock data directory {source} not found!")
        print(f"Directory contents: {list(Path('.').glob('*'))}")
        return False

    # Replace any existing extraction output wholesale.
    os.makedirs(destination.parent, exist_ok=True)

    print(f"Using mock data from {source}")
    if destination.exists():
        shutil.rmtree(destination)
    shutil.copytree(source, destination)

    print(f"Mock data copied successfully. Files: {list(destination.glob('*'))}")
    print(f"Target directory: {destination.absolute()}")
    return True
|
|
def register_refresh_callback(callback):
    """
    Register a callback to be called after data is refreshed.

    The callback should clear any caches that depend on the data.
    Registering the same callable twice is a no-op.

    Args:
        callback: Zero-argument callable invoked after each successful refresh.
    """
    # No `global` needed: the list is mutated in place, never rebound.
    if callback not in _refresh_callbacks:
        _refresh_callbacks.append(callback)
|
|
|
|
def _notify_refresh_callbacks():
    """Notify all registered callbacks that data has been refreshed.

    Callbacks run in registration order; a failing callback is logged and
    skipped so one bad cache-invalidation hook cannot block the rest.
    """
    # No `global` needed: the list is only iterated, never rebound.
    for callback in _refresh_callbacks:
        try:
            callback()
        except Exception as e:
            # Lazy %-formatting: the message is only built if the log is emitted.
            logger.warning("Error in refresh callback: %s", e)
|
|
|
|
def get_last_fetch_time():
    """Return the datetime of the last successful data fetch, or None if no
    fetch has succeeded yet."""
    # Reading a module global requires no `global` declaration.
    return _last_fetch_time
|
|
|
|
def is_cache_stale():
    """Check if the cache has expired (older than CACHE_TTL_SECONDS).

    Returns:
        True when no fetch has ever succeeded, or the last successful fetch
        is older than CACHE_TTL_SECONDS; False otherwise.
    """
    # Read-only access to the module global — no `global` declaration needed.
    if _last_fetch_time is None:
        return True
    return datetime.now() - _last_fetch_time > timedelta(seconds=CACHE_TTL_SECONDS)
|
|
|
|
def refresh_data_if_needed(force: bool = False) -> bool:
    """
    Refresh data from GitHub if the cache is stale or if forced.

    At most one refresh runs at a time: if another thread holds the fetch
    lock, this call returns False immediately instead of blocking.

    Args:
        force: If True, refresh regardless of cache age

    Returns:
        True if data was refreshed, False otherwise
    """
    global _last_fetch_time

    if not force and not is_cache_stale():
        return False

    # Non-blocking acquire so overlapping refresh triggers don't pile up.
    if not _fetch_lock.acquire(blocking=False):
        logger.info("Another refresh is in progress, skipping...")
        return False

    try:
        logger.info("Refreshing data from GitHub...")

        # NOTE: we deliberately do NOT delete the existing extracted data
        # here.  fetch_data_from_github() and copy_mock_data() each replace
        # the target directory themselves on success; deleting it up front
        # would leave the app with no data at all if every source failed.
        if fetch_data_from_github():
            _last_fetch_time = datetime.now()
            logger.info(f"✓ Data refreshed successfully at {_last_fetch_time}")
            _notify_refresh_callbacks()
            return True

        # GitHub unavailable — fall back to the bundled mock data.
        logger.warning("GitHub fetch failed, trying mock data...")
        if copy_mock_data():
            _last_fetch_time = datetime.now()
            logger.info(f"✓ Using mock data (refreshed at {_last_fetch_time})")
            _notify_refresh_callbacks()
            return True

        logger.error("Failed to refresh data from any source")
        return False
    finally:
        _fetch_lock.release()
|
|
|
|
def setup_mock_data():
    """
    Setup data for the leaderboard.
    First tries to fetch from GitHub, falls back to mock data if unavailable.

    Skips all work when extracted data already exists and the cache is still
    fresh.  On success, records the fetch time used by is_cache_stale().
    """
    global _last_fetch_time

    print("=" * 60)
    print("STARTING DATA SETUP")
    print("=" * 60)

    target_dir = Path(EXTRACTED_DATA_DIR) / CONFIG_NAME

    # Reuse existing data when present (a non-empty results/ tree or any
    # *.jsonl files) and still within the cache TTL.
    if target_dir.exists():
        results_dir = target_dir / "results"
        has_results = results_dir.exists() and any(results_dir.iterdir())
        has_jsonl = any(target_dir.glob("*.jsonl"))

        if has_results or has_jsonl:
            if not is_cache_stale():
                print(f"Data already exists at {target_dir} and cache is fresh")
                return
            print(f"Data exists but cache is stale, will refresh...")

    # Primary source: the GitHub results repository.
    print("\n--- Attempting to fetch from GitHub ---")
    try:
        if fetch_data_from_github():
            _last_fetch_time = datetime.now()
            print("✓ Successfully using data from GitHub repository")
            return
    except Exception as e:
        # fetch_data_from_github() normally returns False on failure; this
        # guards against unexpected exceptions so setup can still fall back.
        print(f"GitHub fetch failed with error: {e}")

    # Fallback: bundled mock data.
    print("\n--- GitHub data not available, falling back to mock data ---")
    try:
        if copy_mock_data():
            _last_fetch_time = datetime.now()
            print("✓ Successfully using mock data")
            return
    except Exception as e:
        print(f"Mock data copy failed with error: {e}")

    # Both sources failed — make the failure loud in the startup logs.
    print("\n" + "!" * 60)
    print("ERROR: No data available! Neither GitHub nor mock data could be loaded.")
    print("!" * 60)
|
|
|
|
| |
# Daemon thread running _background_refresh_loop(), or None when not started.
_scheduler_thread = None
# Set to ask the background loop to exit; its wait() also serves as the timer.
_scheduler_stop_event = threading.Event()
|
|
|
|
def _background_refresh_loop():
    """Background thread that periodically refreshes data.

    Sleeps CACHE_TTL_SECONDS between forced refreshes and exits promptly
    once _scheduler_stop_event is set.
    """
    # No `global` needed: the event is only read/waited on, never rebound.
    logger.info(f"Background refresh scheduler started (interval: {CACHE_TTL_SECONDS}s)")

    while not _scheduler_stop_event.is_set():
        # wait() doubles as an interruptible sleep: it returns True (and we
        # exit) as soon as the stop event is set.
        if _scheduler_stop_event.wait(timeout=CACHE_TTL_SECONDS):
            break

        try:
            logger.info("Background refresh triggered")
            refresh_data_if_needed(force=True)
        except Exception as e:
            # Keep the scheduler alive even if a single refresh fails.
            logger.error(f"Error in background refresh: {e}")

    logger.info("Background refresh scheduler stopped")
|
|
|
|
def start_background_refresh():
    """Start the background refresh scheduler.

    Idempotent: does nothing if the scheduler thread is already alive.
    The thread is a daemon, so it will not block interpreter shutdown.
    """
    # Only _scheduler_thread is rebound; the stop event is merely mutated
    # (.clear()) and needs no `global` declaration.
    global _scheduler_thread

    if _scheduler_thread is not None and _scheduler_thread.is_alive():
        logger.info("Background refresh scheduler already running")
        return

    _scheduler_stop_event.clear()
    _scheduler_thread = threading.Thread(target=_background_refresh_loop, daemon=True)
    _scheduler_thread.start()
    logger.info("Background refresh scheduler started")
|
|
|
|
def stop_background_refresh():
    """Stop the background refresh scheduler.

    Signals the loop to exit and waits up to 5 seconds for the thread to
    finish.  Safe to call when the scheduler was never started.
    """
    # Only _scheduler_thread is rebound (to None); the stop event is merely
    # mutated (.set()) and needs no `global` declaration.
    global _scheduler_thread

    if _scheduler_thread is None:
        return

    _scheduler_stop_event.set()
    _scheduler_thread.join(timeout=5)
    _scheduler_thread = None
    logger.info("Background refresh scheduler stopped")
|
|
|
|
| if __name__ == "__main__": |
| setup_mock_data() |
|
|