Spaces:
Running
Running
| """ | |
| Precompute example evaluation results for the default demo. | |
| This script runs the evaluation on the example text and saves the results | |
| so they can be loaded instantly when users visit the page. | |
| """ | |
| import json | |
| import os | |
| import re | |
| import sys | |
| from pathlib import Path | |
| # Add parent directory to path | |
| sys.path.insert(0, str(Path(__file__).parent)) | |
| import torch | |
| # Get the directory where this script is located | |
| SCRIPT_DIR = Path(__file__).parent.absolute() | |
| MODELS_DIR = SCRIPT_DIR / "models" | |
| SUPPORT_DIR = SCRIPT_DIR / "support" | |
| PRECOMPUTED_DIR = SCRIPT_DIR / "precomputed" | |
| HF_REPO_ID = "BlinkDL/rwkv7-g1" | |
| SMALL_SIZE_KEY = "0.4b" | |
| LARGE_SIZE_KEY = "1.5b" | |
| PREFERRED = { | |
| "0.4b": "rwkv7-g1d-0.4b-20260210-ctx8192.pth", | |
| "1.5b": "rwkv7-g1d-1.5b-20260212-ctx8192.pth", | |
| } | |
| SMALL_MODEL_NAME = "RWKV7-G1D-0.4B" | |
| LARGE_MODEL_NAME = "RWKV7-G1D-1.5B" | |
| # Detect device | |
| # DEVICE = "cuda" if torch.cuda.is_available() else "cpu" | |
| DEVICE = "cpu" | |
| IS_CPU = DEVICE == "cpu" | |
| def _extract_date_token(filename: str): | |
| m = re.search(r"-(\d{8})-", filename) | |
| return m.group(1) if m else "00000000" | |
| def _pick_best_filename(filenames): | |
| if not filenames: | |
| return None | |
| return sorted(filenames, key=lambda x: (_extract_date_token(x), x))[-1] | |
| def _list_repo_files(): | |
| from huggingface_hub import HfApi | |
| return HfApi().list_repo_files(repo_id=HF_REPO_ID, repo_type="model") | |
| def resolve_rwkv_model_path(size_key: str) -> str: | |
| """Resolve RWKV model path from project models dir; auto-download when missing.""" | |
| MODELS_DIR.mkdir(parents=True, exist_ok=True) | |
| preferred = PREFERRED.get(size_key) | |
| if preferred and (MODELS_DIR / preferred).exists(): | |
| return str((MODELS_DIR / preferred).resolve()) | |
| pattern = f"rwkv7-g1d-{size_key}-*.pth" | |
| local_matches = [p.name for p in MODELS_DIR.glob(pattern)] | |
| local_best = _pick_best_filename(local_matches) | |
| if local_best: | |
| return str((MODELS_DIR / local_best).resolve()) | |
| repo_files = _list_repo_files() | |
| remote_candidates = [f for f in repo_files if re.match(rf"^rwkv7-g1d-{re.escape(size_key)}-.*\.pth$", f, re.IGNORECASE)] | |
| remote_file = preferred if preferred in remote_candidates else _pick_best_filename(remote_candidates) | |
| if not remote_file: | |
| raise FileNotFoundError(f"No remote model found for size {size_key} in {HF_REPO_ID}") | |
| from huggingface_hub import hf_hub_download | |
| print(f"Downloading missing model {remote_file} from {HF_REPO_ID} ...") | |
| local_path = hf_hub_download( | |
| repo_id=HF_REPO_ID, | |
| filename=remote_file, | |
| local_dir=str(MODELS_DIR), | |
| local_dir_use_symlinks=False, | |
| ) | |
| return str(Path(local_path).resolve()) | |
| def load_rwkv7_model(model_path: str): | |
| """Load RWKV7 model.""" | |
| os.environ["RWKV_JIT_ON"] = "1" | |
| os.environ["RWKV_V7_ON"] = "1" | |
| if IS_CPU: | |
| os.environ["RWKV_CUDA_ON"] = "0" | |
| else: | |
| os.environ["RWKV_CUDA_ON"] = "1" | |
| from rwkv.model import RWKV | |
| from rwkv.rwkv_tokenizer import TRIE_TOKENIZER | |
| strategy = "cpu fp32" if IS_CPU else "cuda fp16" | |
| if model_path.endswith(".pth"): | |
| model_path = model_path[:-4] | |
| model = RWKV(model=model_path, strategy=strategy) | |
| vocab_path = str(SUPPORT_DIR / "rwkv_vocab_v20230424.txt") | |
| tokenizer = TRIE_TOKENIZER(vocab_path) | |
| return model, tokenizer | |
| def precompute_example(): | |
| """Precompute the example and save results.""" | |
| from core.evaluator import evaluate_rwkv7_single_sample | |
| from visualization.html_generator import generate_comparison_html | |
| example_file = SCRIPT_DIR / "the_bitter_lesson.txt" | |
| with open(example_file, "r", encoding="utf-8") as f: | |
| example_text = f.read() | |
| print(f"Example text length: {len(example_text)} characters") | |
| print(f"Resolving {SMALL_MODEL_NAME} model path...") | |
| small_model_path = resolve_rwkv_model_path(SMALL_SIZE_KEY) | |
| print(f"Resolving {LARGE_MODEL_NAME} model path...") | |
| large_model_path = resolve_rwkv_model_path(LARGE_SIZE_KEY) | |
| print(f"Loading {SMALL_MODEL_NAME}...") | |
| small_model, small_tokenizer = load_rwkv7_model(small_model_path) | |
| print(f"Loading {LARGE_MODEL_NAME}...") | |
| large_model, large_tokenizer = load_rwkv7_model(large_model_path) | |
| print(f"Evaluating with {SMALL_MODEL_NAME}...") | |
| result_small = evaluate_rwkv7_single_sample(small_model, small_tokenizer, example_text) | |
| print(f"{SMALL_MODEL_NAME} completed in {result_small['inference_time']:.2f}s") | |
| print(f"Evaluating with {LARGE_MODEL_NAME}...") | |
| result_large = evaluate_rwkv7_single_sample(large_model, large_tokenizer, example_text) | |
| print(f"{LARGE_MODEL_NAME} completed in {result_large['inference_time']:.2f}s") | |
| print("Generating visualization...") | |
| html = generate_comparison_html( | |
| text=example_text, | |
| byte_losses_a=result_large["byte_wise_losses"], | |
| byte_losses_b=result_small["byte_wise_losses"], | |
| model_a_name=LARGE_MODEL_NAME, | |
| model_b_name=SMALL_MODEL_NAME, | |
| topk_predictions_a=result_large["top5_predictions"], | |
| topk_predictions_b=result_small["top5_predictions"], | |
| tokenizer_a=result_large["tokenizer"], | |
| tokenizer_b=result_small["tokenizer"], | |
| model_type_a="rwkv7", | |
| model_type_b="rwkv7", | |
| default_delta_mode="absolute", | |
| ) | |
| PRECOMPUTED_DIR.mkdir(parents=True, exist_ok=True) | |
| html_path = PRECOMPUTED_DIR / "example_visualization.html" | |
| with open(html_path, "w", encoding="utf-8") as f: | |
| f.write(html) | |
| print(f"Saved HTML to {html_path}") | |
| metadata = { | |
| "example_text": example_text, | |
| "small_model_inference_time": result_small["inference_time"], | |
| "large_model_inference_time": result_large["inference_time"], | |
| "small_model_compression_rate": result_small["compression_rate"], | |
| "large_model_compression_rate": result_large["compression_rate"], | |
| "small_model_file": Path(small_model_path).name, | |
| "large_model_file": Path(large_model_path).name, | |
| } | |
| metadata_path = PRECOMPUTED_DIR / "example_metadata.json" | |
| with open(metadata_path, "w", encoding="utf-8") as f: | |
| json.dump(metadata, f, ensure_ascii=False, indent=2) | |
| print(f"Saved metadata to {metadata_path}") | |
| print("Done! Precomputed example is ready.") | |
| if __name__ == "__main__": | |
| precompute_example() | |