Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| SIRUS ML Module Real-Time End-to-End Testing Script | |
| This interactive script allows you to test the complete ML module workflow | |
| in real-time, with detailed logging and validation at each step. | |
| Usage: | |
| python run_e2e_test.py [--mode interactive|automated] [--base-url URL] [--user-id USER_ID] | |
| Features: | |
| - Interactive step-by-step testing | |
| - Automated full workflow testing | |
| - Real-time progress monitoring | |
| - Detailed logging and error reporting | |
| - Performance metrics collection | |
| - Test result validation | |
| """ | |
| import asyncio | |
| import argparse | |
| import json | |
| import time | |
| import logging | |
| import sys | |
| from pathlib import Path | |
| from typing import Dict, Any, List, Optional | |
| import requests | |
| import pandas as pd | |
| from datetime import datetime | |
| # Add the project root to Python path | |
| sys.path.append(str(Path(__file__).parent.parent)) | |
| sys.path.append(str(Path(__file__).parent)) | |
| from tests.test_fixtures import TestDataGenerator, TestFileManager, TestReporter, TestValidators | |
class MLModuleE2ETester:
    """Interactive end-to-end tester for the ML module HTTP API.

    Every ``test_*`` method performs one step against the API rooted at
    ``base_url``, logs the outcome, and returns ``True`` on success or
    ``False`` on failure. Exceptions raised while a step runs are logged
    and reported as a failure rather than propagated.
    """

    # Seconds to wait for the health endpoint before declaring the API down.
    HEALTH_CHECK_TIMEOUT = 10

    def __init__(self, base_url: str = "http://localhost:8001",
                 user_id: str = "realtime_test_user",
                 request_timeout: Optional[float] = None):
        """Create a tester bound to one API base URL and one user.

        Args:
            base_url: Root URL of the API; a trailing slash is stripped.
            user_id: User identifier sent with every request.
            request_timeout: Timeout in seconds applied to every request
                except the health check. ``None`` (the default) waits
                indefinitely, which long-running chat steps may need.
        """
        self.base_url = base_url.rstrip('/')
        self.user_id = user_id
        self.request_timeout = request_timeout
        self.session = requests.Session()
        self.logger = self._setup_logging()
        self.file_manager = TestFileManager()
        self.reporter = TestReporter()
        self.validators = TestValidators()

        # Test state
        self.current_project_id = None
        self.test_results = []

        # Test data
        self.data_generator = TestDataGenerator()

    def _setup_logging(self) -> logging.Logger:
        """Configure root logging (stdout + timestamped file) and return a logger."""
        log_format = '%(asctime)s - %(levelname)s - %(message)s'
        logging.basicConfig(
            level=logging.INFO,
            format=log_format,
            handlers=[
                logging.StreamHandler(sys.stdout),
                # Log messages contain non-ASCII symbols; pin the encoding so
                # the file handler does not depend on the platform default.
                logging.FileHandler(
                    f'ml_e2e_test_{int(time.time())}.log',
                    encoding='utf-8',
                ),
            ],
        )
        return logging.getLogger(__name__)

    def _project_url(self, suffix: str = "") -> str:
        """Return the URL of the active project, optionally with a sub-path."""
        return f"{self.base_url}/api/v1/projects/{self.current_project_id}{suffix}"

    @staticmethod
    def _prompt(text: str) -> Optional[str]:
        """Read one line from stdin; return ``None`` when stdin is closed."""
        try:
            return input(text)
        except EOFError:
            return None

    def print_banner(self):
        """Print the welcome banner and log the start of the session."""
        banner = "\n".join((
            "",
            "ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ",
            "β SIRUS ML MODULE E2E TESTER β",
            "β β",
            "β Real-time testing of complete ML workflow: β",
            "β Project Creation β Data Upload β Analysis β β",
            "β Preprocessing β Training β Evaluation β Cleanup β",
            "ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ",
            "",
        ))
        print(banner)
        self.logger.info("Starting SIRUS ML Module E2E Testing")

    def check_api_health(self) -> bool:
        """Return True when ``GET <base_url>/`` answers with HTTP 200.

        The request uses ``HEALTH_CHECK_TIMEOUT`` so an unresponsive server
        is reported as unhealthy instead of blocking the run.
        """
        try:
            response = self.session.get(
                f"{self.base_url}/",
                timeout=self.HEALTH_CHECK_TIMEOUT,
            )
        except Exception as e:
            self.logger.error(f"β Cannot connect to API: {e}")
            return False

        if response.status_code == 200:
            self.logger.info("β API is healthy and responding")
            return True

        self.logger.error(f"β API health check failed: {response.status_code}")
        return False

    def cleanup_existing_projects(self):
        """Delete every project the API lists for this user (best effort)."""
        try:
            response = self.session.get(
                f"{self.base_url}/api/v1/projects/",
                params={"user_id": self.user_id},
                timeout=self.request_timeout,
            )
            if response.status_code != 200:
                return

            for project in response.json():
                project_id = project['project_id']
                self.logger.info(f"π§Ή Cleaning up existing project: {project_id}")
                self.session.delete(
                    f"{self.base_url}/api/v1/projects/{project_id}",
                    params={"user_id": self.user_id},
                    timeout=self.request_timeout,
                )
        except Exception as e:
            # Deliberately non-fatal: leftover projects must not stop a run.
            self.logger.warning(f"Could not cleanup existing projects: {e}")

    def test_project_creation(self) -> bool:
        """Create a project and remember its id in ``current_project_id``.

        Returns True only when the API answers 201 and the payload passes
        ``validators.validate_project_structure``.
        """
        self.logger.info("π Testing project creation...")
        project_name = f"E2E Test Project {datetime.now().strftime('%H%M%S')}"

        try:
            response = self.session.post(
                f"{self.base_url}/api/v1/projects/",
                data={
                    "user_id": self.user_id,
                    "project_name": project_name,
                },
                timeout=self.request_timeout,
            )

            if response.status_code != 201:
                self.logger.error(f"β Project creation failed: {response.status_code} - {response.text}")
                return False

            project_data = response.json()
            # Stored before validation so a malformed project can still be
            # deleted by the cleanup step.
            self.current_project_id = project_data["project_id"]

            # Validate project structure
            if not self.validators.validate_project_structure(project_data):
                self.logger.error("β Project structure validation failed")
                return False

            self.logger.info(f"β Project created successfully: {self.current_project_id}")
            self.logger.info(f" - Name: {project_data['project_name']}")
            self.logger.info(f" - State: {project_data['current_step']}")
            return True
        except Exception as e:
            self.logger.error(f"β Exception during project creation: {e}")
            return False

    def test_data_upload(self, dataset_type: str = "messy") -> bool:
        """Generate a dataset, write it to CSV and upload it to the project.

        Args:
            dataset_type: ``"clean"``, ``"large"`` or ``"messy"``. Any other
                value falls back to ``"messy"`` and logs a warning.
        """
        self.logger.info(f"π€ Testing data upload ({dataset_type} dataset)...")

        if not self.current_project_id:
            self.logger.error("β No active project for data upload")
            return False

        try:
            # Generate test dataset
            if dataset_type == "clean":
                df = self.data_generator.create_clean_dataset(150)
            elif dataset_type == "large":
                df = self.data_generator.create_large_dataset(1000)
            else:
                if dataset_type != "messy":
                    # Make typos at the interactive prompt visible.
                    self.logger.warning(
                        f"Unknown dataset type '{dataset_type}', using 'messy'"
                    )
                df = self.data_generator.create_messy_dataset(200)

            # Create temporary CSV file
            csv_path = self.file_manager.create_csv_file(df, "dataset.csv")

            # Upload file
            with open(csv_path, 'rb') as f:
                response = self.session.post(
                    self._project_url("/upload"),
                    data={"user_id": self.user_id},
                    files={"file": ("dataset.csv", f, "text/csv")},
                    timeout=self.request_timeout,
                )

            if response.status_code != 200:
                self.logger.error(f"β Data upload failed: {response.status_code} - {response.text}")
                return False

            upload_result = response.json()
            self.logger.info("β Dataset uploaded successfully")
            self.logger.info(f" - Path: {upload_result.get('path', 'N/A')}")
            self.logger.info(f" - Rows: {len(df)}, Columns: {len(df.columns)}")
            self.logger.info(f" - Missing values: {df.isnull().sum().sum()}")
            return True
        except Exception as e:
            self.logger.error(f"β Exception during data upload: {e}")
            return False

    def test_chat_workflow(self, messages: List[str], delay: float = 2.0) -> bool:
        """Send chat messages in order, stopping at the first failure.

        Args:
            messages: Messages to post to the project's chat endpoint.
            delay: Seconds to sleep after each successful message. Values
                of zero or less disable the pause.

        Returns:
            True when every message was answered with HTTP 200.
        """
        self.logger.info("π¬ Testing ML chat workflow...")

        if not self.current_project_id:
            self.logger.error("β No active project for chat testing")
            return False

        for i, message in enumerate(messages, 1):
            self.logger.info(f"π¨ Step {i}: Sending message: '{message}'")

            try:
                response = self.session.post(
                    self._project_url("/chat/"),
                    data={
                        "user_id": self.user_id,
                        "message": message,
                    },
                    timeout=self.request_timeout,
                )

                if response.status_code != 200:
                    self.logger.error(f"β Chat message failed: {response.status_code} - {response.text}")
                    return False

                chat_result = response.json()
                bot_response = chat_result.get("response", "No response")
                if not isinstance(bot_response, str):
                    # A null or structured reply is still a successful call;
                    # stringify it so building the preview cannot raise.
                    bot_response = str(bot_response)

                self.logger.info("β Response received:")
                # Log first 200 chars of response
                if len(bot_response) > 200:
                    response_preview = bot_response[:200] + "..."
                else:
                    response_preview = bot_response
                self.logger.info(f" {response_preview}")

                # Add delay between messages to simulate real interaction
                if delay > 0:
                    time.sleep(delay)
            except Exception as e:
                self.logger.error(f"β Exception during chat: {e}")
                return False

        return True

    def test_project_state_tracking(self) -> bool:
        """Fetch the active project and log its current step and metadata keys."""
        self.logger.info("π Testing project state tracking...")

        if not self.current_project_id:
            self.logger.error("No active project for state tracking")
            return False

        try:
            response = self.session.get(
                self._project_url(),
                params={"user_id": self.user_id},
                timeout=self.request_timeout,
            )

            if response.status_code != 200:
                self.logger.error(f"β Could not get project state: {response.status_code}")
                return False

            project_data = response.json()
            current_state = project_data.get("current_step", "unknown")
            # "metadata" may be present but null; treat that like missing.
            metadata = project_data.get('metadata') or {}

            self.logger.info(f"β Current project state: {current_state}")
            self.logger.info(f" - Project ID: {self.current_project_id}")
            self.logger.info(f" - Metadata keys: {list(metadata.keys())}")
            return True
        except Exception as e:
            self.logger.error(f"β Exception getting project state: {e}")
            return False

    def test_file_operations(self) -> bool:
        """List the active project's files and log up to the first five."""
        self.logger.info("π Testing file operations...")

        if not self.current_project_id:
            self.logger.error("No active project for file operations")
            return False

        try:
            # List files
            response = self.session.get(
                self._project_url("/files"),
                params={"user_id": self.user_id},
                timeout=self.request_timeout,
            )

            if response.status_code != 200:
                self.logger.error(f"β File listing failed: {response.status_code}")
                return False

            files = response.json()
            self.logger.info(f"β Project files listed: {len(files)} files found")

            for file_path in files[:5]:  # Show first 5 files
                self.logger.info(f" - {file_path}")

            if len(files) > 5:
                self.logger.info(f" ... and {len(files) - 5} more files")
            return True
        except Exception as e:
            self.logger.error(f"β Exception during file operations: {e}")
            return False

    def test_project_cleanup(self) -> bool:
        """Delete the active project and verify that it is gone.

        Returns True when there is no active project, or when the DELETE
        answers 204 and a follow-up GET answers 404. ``current_project_id``
        is cleared only after the deletion has been verified.
        """
        self.logger.info("π§Ή Testing project cleanup...")

        if not self.current_project_id:
            self.logger.warning("β οΈ No active project to cleanup")
            return True

        try:
            response = self.session.delete(
                self._project_url(),
                params={"user_id": self.user_id},
                timeout=self.request_timeout,
            )

            if response.status_code != 204:
                self.logger.error(f"β Project deletion failed: {response.status_code}")
                return False

            self.logger.info(f"β Project deleted successfully: {self.current_project_id}")

            # Verify deletion
            check_response = self.session.get(
                self._project_url(),
                params={"user_id": self.user_id},
                timeout=self.request_timeout,
            )

            if check_response.status_code != 404:
                self.logger.error("β Project still exists after deletion")
                return False

            self.logger.info("β Deletion verified - project not found")
            self.current_project_id = None
            return True
        except Exception as e:
            self.logger.error(f"β Exception during cleanup: {e}")
            return False

    def _interactive_upload(self):
        """Ask for a dataset type and run the upload step with it."""
        answer = self._prompt("Dataset type (clean/messy/large) [messy]: ")
        if answer is None:
            return
        self.test_data_upload(answer.strip() or "messy")

    def run_interactive_mode(self):
        """Run the menu-driven loop until the user exits or stdin closes."""
        self.print_banner()

        if not self.check_api_health():
            return

        # Menu choice -> step to run; "9" (exit) is handled in the loop.
        actions = {
            '1': self.test_project_creation,
            '2': self._interactive_upload,
            '3': self.run_chat_workflow,
            '4': self.test_project_state_tracking,
            '5': self.test_file_operations,
            '6': self.test_project_cleanup,
            '7': self.run_automated_test,
            '8': self.show_test_summary,
        }
        menu_lines = (
            "1. π Create New Project",
            "2. π€ Upload Test Dataset",
            "3. π¬ Start ML Chat Workflow",
            "4. π Check Project State",
            "5. π List Project Files",
            "6. π§Ή Cleanup Project",
            "7. π€ Run Full Automated Test",
            "8. π View Test Summary",
            "9. β Exit",
        )

        while True:
            print("\n" + "="*60)
            print("INTERACTIVE TESTING MENU")
            print("="*60)
            for line in menu_lines:
                print(line)

            choice = self._prompt("\nEnter your choice (1-9): ")
            if choice is None:
                # stdin closed: leave the loop instead of raising EOFError.
                break
            choice = choice.strip()

            if choice == '9':
                print("π Goodbye!")
                break

            action = actions.get(choice)
            if action is None:
                print("β Invalid choice. Please try again.")
            else:
                action()

            if self._prompt("\nPress Enter to continue...") is None:
                break

    def run_chat_workflow(self):
        """Read chat messages from stdin and send them one at a time."""
        if not self.current_project_id:
            print("β No active project. Please create a project first.")
            return

        print("\n㪠Starting ML Chat Workflow")
        print("Available commands:")
        print(" - 'analyze the data' - Start data analysis")
        print(" - 'yes, proceed' - Approve analysis/plans")
        print(" - 'RandomForest' or 'LogisticRegression' - Select model")
        print(" - 'evaluate this model' - Start evaluation")
        print(" - 'quit' - Exit chat")

        while True:
            message = self._prompt("\nπ€ Your message: ")
            if message is None:
                break
            message = message.strip()

            if message.lower() == 'quit':
                break
            if not message:
                continue

            # The user types each message, so no artificial pause is needed.
            if not self.test_chat_workflow([message], delay=0):
                print("β Chat failed. Try again or type 'quit' to exit.")

    def run_automated_test(self) -> bool:
        """Run every step in order, record results and print a summary.

        A failing or crashing step is recorded as ``FAIL`` and the run
        continues with the next step.

        Returns:
            True when every step passed, False otherwise.
        """
        self.logger.info("π€ Starting automated full test suite...")

        self.reporter.start_testing()

        test_steps = [
            ("API Health Check", self.check_api_health),
            ("Project Creation", self.test_project_creation),
            ("Data Upload", lambda: self.test_data_upload("messy")),
            ("File Operations", self.test_file_operations),
            ("ML Workflow - Analysis", lambda: self.test_chat_workflow(["analyze the data"])),
            ("ML Workflow - Approve Analysis", lambda: self.test_chat_workflow(["yes, proceed"])),
            ("ML Workflow - Approve Preprocessing", lambda: self.test_chat_workflow(["yes, execute the plan"])),
            ("ML Workflow - Start Training", lambda: self.test_chat_workflow(["yes, let's train a model"])),
            ("ML Workflow - Select Model", lambda: self.test_chat_workflow(["RandomForest"])),
            ("ML Workflow - Evaluation", lambda: self.test_chat_workflow(["evaluate this model"])),
            ("Project State Tracking", self.test_project_state_tracking),
            ("Project Cleanup", self.test_project_cleanup),
        ]

        all_passed = True
        for step_name, test_func in test_steps:
            # perf_counter is monotonic, so durations survive clock changes.
            start_time = time.perf_counter()
            self.logger.info(f"π§ͺ Running: {step_name}")

            try:
                success = test_func()
            except Exception as e:
                duration = time.perf_counter() - start_time
                self.logger.error(f"π₯ {step_name} crashed: {e}")
                self.reporter.add_test_result(step_name, "FAIL", duration, {"error": str(e)})
                all_passed = False
                continue

            duration = time.perf_counter() - start_time
            if success:
                self.logger.info(f"β {step_name} completed in {duration:.2f}s")
                self.reporter.add_test_result(step_name, "PASS", duration)
            else:
                self.logger.error(f"β {step_name} failed after {duration:.2f}s")
                self.reporter.add_test_result(step_name, "FAIL", duration)
                all_passed = False

            # Add delay between steps
            time.sleep(1)

        self.reporter.end_testing()
        report = self.reporter.generate_report()
        summary = report['summary']

        print("\n" + "="*60)
        print("AUTOMATED TEST RESULTS")
        print("="*60)
        print(f"Total Tests: {summary['total_tests']}")
        print(f"Passed: {summary['passed']} β ")
        print(f"Failed: {summary['failed']} β")
        print(f"Success Rate: {summary['success_rate']:.1f}%")
        print(f"Total Duration: {summary['total_duration']:.2f}s")

        if report['recommendations']:
            print("\nπ RECOMMENDATIONS:")
            for rec in report['recommendations']:
                print(f" - {rec}")

        return all_passed

    def show_test_summary(self):
        """Print the recorded results and the active project, if any."""
        print("\n" + "="*60)
        print("TEST SESSION SUMMARY")
        print("="*60)

        if hasattr(self, 'reporter') and self.reporter.test_results:
            report = self.reporter.generate_report()
            summary = report['summary']

            print(f"π Tests Run: {summary['total_tests']}")
            print(f"β Passed: {summary['passed']}")
            print(f"β Failed: {summary['failed']}")
            print(f"βοΈ Skipped: {summary['skipped']}")
            print(f"π― Success Rate: {summary['success_rate']:.1f}%")

            if report['detailed_results']:
                print("\nπ DETAILED RESULTS:")
                # Built once instead of on every loop iteration.
                status_emoji = {"PASS": "β ", "FAIL": "β", "SKIP": "βοΈ"}
                for result in report['detailed_results']:
                    emoji = status_emoji.get(result['status'], "β")
                    print(f" {emoji} {result['test_name']} ({result['duration']:.2f}s)")
        else:
            print("No test results available.")

        if self.current_project_id:
            print(f"\nπ§ Active Project: {self.current_project_id}")
        else:
            print("\nπ§ No active project")

    def cleanup_and_exit(self):
        """Delete the active project, remove temp files and close the session."""
        self.logger.info("π§Ή Performing cleanup...")

        try:
            if self.current_project_id:
                self.test_project_cleanup()
        finally:
            # Temp files and the HTTP session are released even when the
            # project cleanup is interrupted.
            try:
                self.file_manager.cleanup()
            finally:
                self.session.close()

        self.logger.info("β Cleanup completed")
def main() -> int:
    """Parse the command line, run the selected mode and clean up.

    Returns:
        Process exit code: 0 on success, 1 when the automated run reports
        a failure, 130 when the user interrupts with Ctrl+C.
    """
    parser = argparse.ArgumentParser(description="SIRUS ML Module E2E Tester")
    parser.add_argument("--mode", choices=["interactive", "automated"],
                        default="interactive", help="Testing mode")
    parser.add_argument("--base-url", default="http://localhost:8001",
                        help="Base URL of ML module API")
    parser.add_argument("--user-id", default="e2e_test_user",
                        help="User ID for testing")

    args = parser.parse_args()

    tester = MLModuleE2ETester(args.base_url, args.user_id)

    exit_code = 0
    try:
        if args.mode == "automated":
            # Only an explicit False counts as failure; a run that reports
            # nothing (None) keeps the exit code at 0.
            if tester.run_automated_test() is False:
                exit_code = 1
        else:
            tester.run_interactive_mode()
    except KeyboardInterrupt:
        print("\n\nβ οΈ Testing interrupted by user")
        exit_code = 130  # conventional exit status for SIGINT
    finally:
        tester.cleanup_and_exit()

    return exit_code


if __name__ == "__main__":
    sys.exit(main())