"""
schema_tester.py - Official Schema Testing System

This script iterates over all schemas in schemas/, prompts the trained model
with each schema's test questions, validates the output with jsonschema, and
prints comprehensive pass/fail results. The results are also written to
test_results.json.

Matches the exact specification from the user's requirements.
"""
|
|
import json
import os
import random
from datetime import datetime, timezone
from pathlib import Path

import jsonschema
import torch
from jsonschema import validate, ValidationError
from peft import PeftModel
from transformers import AutoTokenizer, AutoModelForCausalLM
|
|
class SchemaValidator:
    """Validate model responses against a function-call schema."""

    @staticmethod
    def validate_function_call(response, schema):
        """Check that ``response`` is a valid call of the function in ``schema``.

        Args:
            response: JSON text that should decode to an object with a
                "name" key and an "arguments" key.
            schema: Mapping holding the expected function "name" and a JSON
                Schema for its arguments under "parameters".

        Returns:
            A ``(is_valid, message)`` tuple. ``message`` is
            "Valid function call" on success and a description of the first
            problem found otherwise.
        """
        # Only the decode can raise here, so keep the try body minimal.
        # TypeError covers a response that is not str/bytes (e.g. None).
        try:
            call_data = json.loads(response)
        except (json.JSONDecodeError, TypeError) as e:
            return False, f"Invalid JSON: {e}"

        if not isinstance(call_data, dict):
            return False, "Response is not a JSON object"

        if "name" not in call_data:
            return False, "Missing 'name' field"

        if "arguments" not in call_data:
            return False, "Missing 'arguments' field"

        # A malformed schema file is reported as a failed check instead of
        # raising KeyError and aborting the caller's whole run.
        for key in ("name", "parameters"):
            if key not in schema:
                return False, f"Schema is missing '{key}' field"

        if call_data["name"] != schema["name"]:
            return False, f"Function name mismatch: expected '{schema['name']}', got '{call_data['name']}'"

        try:
            validate(instance=call_data["arguments"], schema=schema["parameters"])
        except ValidationError as e:
            return False, f"Argument validation failed: {e.message}"
        except jsonschema.SchemaError as e:
            # Raised by jsonschema when "parameters" is itself not a valid
            # JSON Schema.
            return False, f"Invalid schema: {e.message}"

        return True, "Valid function call"
|
|
class ModelTester:
    """Load the fine-tuned SmolLM3-3B adapter and generate function calls.

    The base model "HuggingFaceTB/SmolLM3-3B" is loaded in float32 and the
    PEFT adapter stored at ``model_path`` is applied on top of it. The model
    is moved to the MPS backend when that is available; otherwise the device
    is recorded as "cpu".
    """

    def __init__(self, model_path="./smollm3_robust"):
        """Store ``model_path`` and immediately load tokenizer and model."""
        self.model_path = model_path
        self.model = None
        self.tokenizer = None
        self.device = None
        self._load_model()

    def _load_model(self):
        """Load the tokenizer, the base model and the PEFT adapter."""
        print("🚀 Loading trained SmolLM3-3B model...")

        base_model_name = "HuggingFaceTB/SmolLM3-3B"

        self.tokenizer = AutoTokenizer.from_pretrained(base_model_name)
        if self.tokenizer.pad_token is None:
            # Fall back to the EOS token when the tokenizer has no pad token.
            self.tokenizer.pad_token = self.tokenizer.eos_token

        base_model = AutoModelForCausalLM.from_pretrained(
            base_model_name,
            torch_dtype=torch.float32,
            trust_remote_code=True,
        )

        self.model = PeftModel.from_pretrained(base_model, self.model_path)

        if torch.backends.mps.is_available():
            self.model = self.model.to("mps")
            self.device = "mps"
        else:
            self.device = "cpu"

        print(f"✅ Model loaded on {self.device}")

    @staticmethod
    def _parses_as_json(text):
        """Return True when ``text`` decodes as JSON."""
        try:
            json.loads(text)
        except json.JSONDecodeError:
            return False
        return True

    @staticmethod
    def _clean_response(response):
        """Strip whitespace and trim known trailing artifacts from ``response``.

        The trailing '"}' / '}' trimming is applied only while the text does
        not parse as JSON. A well-formed call such as
        {"name": ..., "arguments": {...}} legitimately ends in "}}", and
        trimming it unconditionally would turn valid output into invalid JSON.
        NOTE(review): the trimming presumably compensates for the model
        emitting extra closing tokens - confirm against real outputs.
        """
        cleaned = response.strip()
        if ModelTester._parses_as_json(cleaned):
            return cleaned

        if cleaned.endswith('}"}'):
            cleaned = cleaned[:-2]
            if ModelTester._parses_as_json(cleaned):
                return cleaned

        if cleaned.endswith('}}'):
            cleaned = cleaned[:-1]

        return cleaned

    def test_schema(self, schema, question):
        """Ask the model to answer ``question`` with a call matching ``schema``.

        Args:
            schema: JSON-serialisable schema; it is dumped into the prompt
                between <schema> tags.
            question: User question placed in the user turn of the prompt.

        Returns:
            The text generated after the prompt, with special tokens
            removed and cleaned by ``_clean_response``. Generation samples
            (temperature 0.1, at most 150 new tokens), so the text can vary
            between runs.
        """
        prompt = f"""<|im_start|>system
You are a helpful assistant that calls functions by responding with valid JSON when given a schema. Always respond with JSON function calls only, never prose.<|im_end|>

<schema>
{json.dumps(schema, indent=2)}
</schema>

<|im_start|>user
{question}<|im_end|>
<|im_start|>assistant
"""

        inputs = self.tokenizer(prompt, return_tensors="pt")
        if self.device == "mps":
            inputs = {k: v.to(self.device) for k, v in inputs.items()}

        self.model.eval()
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=150,
                temperature=0.1,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id,
                eos_token_id=self.tokenizer.eos_token_id,
            )

        # Decode only the newly generated tokens, not the echoed prompt.
        input_length = inputs["input_ids"].shape[1]
        response = self.tokenizer.decode(outputs[0][input_length:], skip_special_tokens=True)

        return self._clean_response(response)
|
|
def load_schemas(schemas_dir="schemas"):
    """Load every ``*.json`` file in ``schemas_dir`` into a dict.

    Files are read as UTF-8 and visited in sorted path order, so the
    returned dict has a reproducible key order regardless of how the
    filesystem lists the directory. A file that cannot be read or parsed is
    reported on stdout and skipped.

    Args:
        schemas_dir: Directory that is searched (non-recursively) for
            ``*.json`` files.

    Returns:
        Dict mapping each file's stem (name without ".json") to its parsed
        JSON content; empty when no file could be loaded.
    """
    schemas = {}

    for schema_file in sorted(Path(schemas_dir).glob("*.json")):
        try:
            with open(schema_file, encoding="utf-8") as f:
                schemas[schema_file.stem] = json.load(f)
        except (OSError, ValueError) as e:
            # OSError: unreadable file. ValueError: invalid JSON
            # (JSONDecodeError) or invalid UTF-8 (UnicodeDecodeError).
            print(f"⚠️ Error loading {schema_file}: {e}")

    return schemas
|
|
def _print_overall_report(results, total_passed, total_tests, overall_rate, target_rate):
    """Print the overall pass rate, the per-schema breakdown and follow-up advice."""
    print("\n" + "=" * 50)
    print("📊 OVERALL RESULTS")
    print("=" * 50)

    print(f"✅ Total passed: {total_passed}/{total_tests} ({overall_rate:.1f}%)")
    print(f"🎯 Target: ≥{target_rate}% valid calls")

    print("\n📋 Detailed Breakdown:")
    for schema_name, result in results.items():
        status = "✅ PASS" if result['rate'] >= target_rate else "❌ FAIL"
        print(f" {schema_name}: {result['passed']}/{result['total']} ({result['rate']:.1f}%) {status}")

    if overall_rate >= target_rate:
        print(f"\n🎉 SUCCESS! Model meets the ≥{target_rate}% target")
        print("🚀 Ready for enterprise deployment")
        return

    print("\n📈 IMPROVEMENT NEEDED")
    print(f"📊 Current: {overall_rate:.1f}% | Target: ≥{target_rate}%")
    print("💡 Suggestions:")

    failed_schemas = [name for name, result in results.items() if result['rate'] < target_rate]

    if failed_schemas:
        print(f" 1. Focus training on: {', '.join(failed_schemas)}")
        print(" 2. Add more examples for complex parameter schemas")
        print(" 3. Increase training epochs or learning rate")

    print(" 4. Consider using larger LoRA rank (r=16)")
    print(" 5. Generate more diverse training examples")


def run_comprehensive_test():
    """Run every schema's test questions through the model and report results.

    Schemas are loaded with ``load_schemas()``. Each entry of a schema's
    "test_questions" list is sent to a ``ModelTester`` and the response is
    checked with ``SchemaValidator.validate_function_call``. Progress and a
    final report are printed to stdout.

    Returns:
        A ``(results, overall_rate)`` tuple. ``results`` maps each tested
        schema name to a dict with "passed", "total", "rate" (percent) and
        "results" (list of booleans, one per question). ``overall_rate`` is
        the percentage of passed tests across all schemas. When no schemas
        or no test questions are found the tuple is ``(results, 0.0)`` with
        ``results`` empty, so callers can always unpack the return value.
    """
    target_rate = 80  # minimum percentage of valid calls counted as a pass

    print("🧪 Official Schema Testing System")
    print("=" * 50)

    print("📂 Loading evaluation schemas...")
    schemas = load_schemas()

    if not schemas:
        print("❌ No schemas found in schemas/ directory")
        return {}, 0.0

    print(f"✅ Loaded {len(schemas)} schemas: {', '.join(schemas)}")

    tester = ModelTester()
    validator = SchemaValidator()

    results = {}
    total_tests = 0
    total_passed = 0

    print("\n🎯 Running tests on all schemas...")
    print("-" * 50)

    for schema_name, schema_data in schemas.items():
        print(f"\n📋 Testing Schema: {schema_name}")
        print(f"🔧 Function: {schema_data['name']}")

        test_questions = schema_data.get('test_questions', [])
        if not test_questions:
            print("⚠️ No test questions found, skipping")
            continue

        schema_results = []

        for i, question in enumerate(test_questions, 1):
            print(f"\n❓ Test {i}: {question}")

            response = tester.test_schema(schema_data, question)
            print(f"🤖 Response: {response}")

            is_valid, message = validator.validate_function_call(response, schema_data)

            if is_valid:
                print(f"✅ PASS - {message}")
            else:
                print(f"❌ FAIL - {message}")
            schema_results.append(bool(is_valid))

        schema_passed = sum(schema_results)
        schema_total = len(schema_results)
        schema_rate = schema_passed / schema_total * 100

        total_passed += schema_passed
        total_tests += schema_total

        results[schema_name] = {
            'passed': schema_passed,
            'total': schema_total,
            'rate': schema_rate,
            'results': schema_results,
        }

        print(f"📊 Schema Summary: {schema_passed}/{schema_total} ({schema_rate:.1f}%)")

    if total_tests == 0:
        # Every schema was skipped; dividing by total_tests would raise
        # ZeroDivisionError, so report the situation and stop here.
        print("\n⚠️ No test questions found in any schema, nothing to evaluate")
        return results, 0.0

    overall_rate = total_passed / total_tests * 100
    _print_overall_report(results, total_passed, total_tests, overall_rate, target_rate)

    return results, overall_rate
|
|
def main():
    """Run the test suite and save the outcome to test_results.json.

    The JSON file holds the overall pass rate, the per-schema results, an
    ISO-8601 UTC timestamp of when the file was written, and a "device"
    entry.

    Raises:
        Exception: Any error raised while testing or saving is printed and
            then re-raised unchanged.
    """
    try:
        outcome = run_comprehensive_test()
        if outcome is None:
            # Nothing was returned, so there is nothing to unpack or save.
            print("⚠️ No results to save")
            return
        results, rate = outcome

        # Build the report before opening the file so a failure here cannot
        # leave a truncated test_results.json behind.
        report = {
            "overall_rate": rate,
            "results": results,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            # CUDA device index when CUDA is available, otherwise "cpu";
            # this is the value that used to be stored under "timestamp".
            "device": str(torch.cuda.current_device() if torch.cuda.is_available() else "cpu"),
        }

        with open("test_results.json", "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2)

        print("\n💾 Results saved to test_results.json")

    except Exception as e:
        print(f"❌ Testing failed: {e}")
        raise


if __name__ == "__main__":
    main()