""" |
|
|
Test Coverage Verification Script |
|
|
|
|
|
This script verifies that critical user flows are adequately tested without requiring |
|
|
high overall coverage percentages. It focuses on: |
|
|
|
|
|
1. Document Upload & Processing |
|
|
2. Report Generation (Overview & Strategic) |
|
|
3. Checklist Matching |
|
|
4. Q&A Functionality |
|
|
5. Export Functionality |
|
|
|
|
|
Usage: |
|
|
python verify_test_coverage.py [--verbose] [--fail-on-missing] |
|
|
|
|
|
Options: |
|
|
--verbose: Show detailed test results |
|
|
--fail-on-missing: Exit with error if critical tests are missing |
|
|
""" |

import sys
import subprocess
import json
import argparse
from pathlib import Path
from typing import Dict, List
from dataclasses import dataclass, asdict
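
# ANSI color codes for colored terminal output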
RED = '\033[0;31m'
GREEN = '\033[0;32m'
YELLOW = '\033[1;33m'
BLUE = '\033[0;34m'
NC = '\033[0m'
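
# Ensure the project root is on sys.path so the `app` package imports resolve
# when this script is run directly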
sys.path.insert(0, str(Path(__file__).parent))

from app.core.logging import logger


@dataclass
class TestResult:
    """Result of a test verification"""
    flow_name: str
    tests_found: int
    tests_passed: int
    critical_functions_covered: int
    total_critical_functions: int
    status: str
    details: List[str]


@dataclass
class CoverageReport:
    """Overall coverage report"""
    total_flows: int
    passed_flows: int
    failed_flows: int
    missing_test_flows: int
    overall_status: str
    flow_results: List[TestResult]


class CriticalFlowVerifier:
    """Verifies test coverage for critical user flows"""

    def __init__(self, verbose: bool = False):
        self.verbose = verbose
        self.project_root = Path(__file__).parent
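
        # Map each critical user flow to the modules, functions, and tests
        # that must be present for the flow to count as covered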
        self.critical_flows = {
            "document_processing": {
                "description": "Document upload, processing, chunking, and indexing",
                "key_modules": [
                    "app/handlers/document_handler.py",
                    "app/core/document_processor.py",
                    "app/core/content_ingestion.py"
                ],
                "key_functions": [
                    "DocumentHandler.process_data_room_fast",
                    "DocumentProcessor.process_documents",
                    "DocumentProcessor.create_chunks",
                    "DocumentProcessor.build_vector_store"
                ],
                "test_files": [
                    "tests/integration/test_critical_workflows.py::TestCriticalWorkflows::test_complete_document_processing_pipeline",
                    "tests/integration/test_core_services.py::TestCoreServices::test_document_processor_initialization",
                    "tests/integration/test_workflows.py::TestUserWorkflows::test_overview_workflow_end_to_end"
                ]
            },
            "report_generation": {
                "description": "Overview and strategic report generation",
                "key_modules": [
                    "app/handlers/ai_handler.py",
                    "app/services/ai_service.py",
                    "app/ui/tabs/overview_tab.py",
                    "app/ui/tabs/strategic_tab.py"
                ],
                "key_functions": [
                    "AIHandler.generate_report",
                    "AIService.analyze_documents",
                    "OverviewTab.render",
                    "StrategicTab.render"
                ],
                "test_files": [
                    "tests/integration/test_critical_workflows.py::TestCriticalWorkflows::test_end_to_end_report_generation",
                    "tests/integration/test_workflows.py::TestUserWorkflows::test_overview_workflow_end_to_end",
                    "tests/integration/test_workflows.py::TestUserWorkflows::test_strategic_workflow_end_to_end"
                ]
            },
            "checklist_matching": {
                "description": "Due diligence checklist parsing and matching",
                "key_modules": [
                    "app/core/services.py",
                    "app/ui/tabs/checklist_tab.py",
                    "app/ui/tabs/questions_tab.py"
                ],
                "key_functions": [
                    "parse_checklist",
                    "search_and_analyze",
                    "ChecklistTab.render",
                    "QuestionsTab.render"
                ],
                "test_files": [
                    "tests/integration/test_core_services.py::TestCoreServices::test_checklist_parsing",
                    "tests/integration/test_workflows.py::TestUserWorkflows::test_questions_workflow_end_to_end"
                ]
            },
            "qa_functionality": {
                "description": "Document search and AI-powered Q&A",
                "key_modules": [
                    "app/core/services.py",
                    "app/ui/tabs/qa_tab.py",
                    "app/handlers/ai_handler.py"
                ],
                "key_functions": [
                    "search_documents",
                    "AIHandler.answer_question",
                    "QATab.render"
                ],
                "test_files": [
                    "tests/integration/test_critical_workflows.py::TestCriticalWorkflows::test_full_qa_workflow",
                    "tests/integration/test_workflows.py::TestUserWorkflows::test_qa_workflow_end_to_end",
                    "tests/integration/test_core_services.py::TestCoreServices::test_search_documents_function"
                ]
            },
            "export_functionality": {
                "description": "Report export capabilities",
                "key_modules": [
                    "app/handlers/export_handler.py"
                ],
                "key_functions": [
                    "ExportHandler.export_overview_report",
                    "ExportHandler.export_strategic_report"
                ],
                "test_files": [
                    "tests/integration/test_critical_workflows.py::TestCriticalWorkflows::test_end_to_end_report_generation",
                    "tests/integration/test_workflows.py::TestUserWorkflows::test_export_functionality"
                ]
            }
        }

    def run_tests_for_flow(self, flow_name: str, flow_config: Dict) -> TestResult:
        """Run tests for a specific flow and verify coverage"""
        logger.info(f"🔍 Verifying test coverage for {flow_name}...")

        details = []
        tests_passed = 0
        tests_found = len(flow_config["test_files"])
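
        # Verify each expected test: collect node IDs without executing them,
        # or run whole files when no node ID is given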
        for test_file in flow_config["test_files"]:
            try:
                # Split a pytest node ID ("path::Class::method") into its parts
                if "::" in test_file:
                    file_path, test_class, test_method = test_file.split("::")
                else:
                    file_path = test_file
                    test_class = ""
                    test_method = ""

                test_file_path = self.project_root / file_path

                if not test_file_path.exists():
                    details.append(f"❌ Test file not found: {file_path}")
                    continue

                if "::" in test_file:
                    # Collect only, using the full node ID, to confirm the test exists
                    cmd = [
                        sys.executable, "-m", "pytest",
                        test_file,
                        "-v" if self.verbose else "-q",
                        "--tb=no",
                        "--collect-only"
                    ]
                    result = subprocess.run(cmd, capture_output=True, text=True, cwd=self.project_root)
                    if result.returncode == 0:
                        tests_passed += 1
                        details.append(f"✅ Test found: {test_method}")
                    else:
                        details.append(f"❌ Test not found: {test_method}")
                else:
                    # No node ID given: run the whole test file
                    cmd = [
                        sys.executable, "-m", "pytest",
                        str(test_file_path),
                        "-v" if self.verbose else "-q",
                        "--tb=no",
                        "-x"
                    ]
                    result = subprocess.run(cmd, capture_output=True, text=True, cwd=self.project_root)
                    if result.returncode == 0:
                        tests_passed += 1
                        details.append(f"✅ Test file passed: {file_path}")
                    else:
                        details.append(f"❌ Test file failed: {file_path}")

            except Exception as e:
                details.append(f"❌ Error running test {test_file}: {str(e)}")
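
        # Heuristic function coverage: treat a key function as covered if its
        # bare name appears anywhere in one of the flow's test files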
        critical_functions_covered = 0
        total_critical_functions = len(flow_config["key_functions"])

        for func in flow_config["key_functions"]:
            func_name = func.split(".")[-1]
            found_in_tests = False

            for test_file in flow_config["test_files"]:
                try:
                    test_file_path = self.project_root / test_file.split("::")[0]
                    if test_file_path.exists():
                        with open(test_file_path, 'r') as f:
                            content = f.read()
                        if func_name in content:
                            found_in_tests = True
                            break
                except OSError:
                    continue

            if found_in_tests:
                critical_functions_covered += 1
                details.append(f"✅ Function covered: {func}")
            else:
                details.append(f"❌ Function not found in tests: {func}")
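
        # A flow passes when at least 70% of its expected tests are found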
        if tests_passed == 0:
            status = "MISSING_TESTS"
        elif tests_passed >= len(flow_config["test_files"]) * 0.7:
            status = "PASS"
        else:
            status = "FAIL"

        return TestResult(
            flow_name=flow_name,
            tests_found=tests_found,
            tests_passed=tests_passed,
            critical_functions_covered=critical_functions_covered,
            total_critical_functions=total_critical_functions,
            status=status,
            details=details
        )

    def run_critical_tests(self) -> CoverageReport:
        """Run all critical flow tests and generate report"""
        logger.info("🚀 Starting critical user flows test verification...")

        flow_results = []

        for flow_name, flow_config in self.critical_flows.items():
            result = self.run_tests_for_flow(flow_name, flow_config)
            flow_results.append(result)

            if self.verbose:
                print(f"\n📊 {flow_name.upper()} FLOW RESULTS:")
                print(f"   Status: {result.status}")
                print(f"   Tests: {result.tests_passed}/{result.tests_found} passed")
                print(f"   Functions: {result.critical_functions_covered}/{result.total_critical_functions} covered")
                for detail in result.details:
                    print(f"   {detail}")

        total_flows = len(flow_results)
        passed_flows = sum(1 for r in flow_results if r.status == "PASS")
        failed_flows = sum(1 for r in flow_results if r.status == "FAIL")
        missing_test_flows = sum(1 for r in flow_results if r.status == "MISSING_TESTS")
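
        # Overall pass requires no missing flows and at least 80% of flows passing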
        if missing_test_flows == 0 and passed_flows >= total_flows * 0.8:
            overall_status = "PASS"
        elif missing_test_flows > 0:
            overall_status = "MISSING_TESTS"
        else:
            overall_status = "FAIL"

        return CoverageReport(
            total_flows=total_flows,
            passed_flows=passed_flows,
            failed_flows=failed_flows,
            missing_test_flows=missing_test_flows,
            overall_status=overall_status,
            flow_results=flow_results
        )

    def generate_report_summary(self, report: CoverageReport) -> str:
        """Generate a human-readable report summary"""
        summary = []
        summary.append("🎯 CRITICAL USER FLOWS TEST COVERAGE REPORT")
        summary.append("=" * 50)
        summary.append("")

        summary.append(f"Overall Status: {'✅ PASS' if report.overall_status == 'PASS' else '❌ ' + report.overall_status}")
        summary.append("")
        summary.append(f"Flows Tested: {report.total_flows}")
        summary.append(f"Flows Passed: {report.passed_flows}")
        summary.append(f"Flows Failed: {report.failed_flows}")
        summary.append(f"Flows Missing Tests: {report.missing_test_flows}")
        summary.append("")

        for result in report.flow_results:
            summary.append(f"📋 {result.flow_name.upper()}")
            summary.append(f"   Status: {result.status}")
            summary.append(f"   Tests: {result.tests_passed}/{result.tests_found}")
            summary.append(f"   Functions: {result.critical_functions_covered}/{result.total_critical_functions}")

            if self.verbose:
                summary.append("   Details:")
                for detail in result.details:
                    summary.append(f"     {detail}")
            summary.append("")

        return "\n".join(summary)


def main():
    """Main entry point"""
    print(f"{BLUE}🚀 Running Critical User Flows Test Coverage Verification{NC}")
    print("")

    parser = argparse.ArgumentParser(description="Verify critical user flows test coverage")
    parser.add_argument("--verbose", "-v", action="store_true", help="Show detailed test results")
    parser.add_argument("--fail-on-missing", action="store_true", help="Exit with error if critical tests are missing")
    parser.add_argument("--json", action="store_true", help="Output results as JSON")

    args = parser.parse_args()

    verifier = CriticalFlowVerifier(verbose=args.verbose)

    try:
        report = verifier.run_critical_tests()

        if args.json:
            json_report = asdict(report)
            print(json.dumps(json_report, indent=2))
        else:
            summary = verifier.generate_report_summary(report)
            print(summary)

        if args.fail_on_missing and report.missing_test_flows > 0:
            print(f"{RED}❌ Critical tests are missing!{NC}")
            sys.exit(1)
        elif report.overall_status != "PASS":
            print(f"{RED}❌ Test coverage verification failed!{NC}")
            sys.exit(1)
        else:
            print(f"{GREEN}✅ Test coverage verification completed successfully!{NC}")
            sys.exit(0)

    except Exception as e:
        print(f"{RED}❌ Error during test verification: {str(e)}{NC}")
        sys.exit(1)


if __name__ == "__main__":
    main()