#!/usr/bin/env python3
"""
Test Coverage Verification Script
This script verifies that critical user flows are adequately tested without requiring
high overall coverage percentages. It focuses on:
1. Document Upload & Processing
2. Report Generation (Overview & Strategic)
3. Checklist Matching
4. Q&A Functionality
5. Export Functionality
Usage:
    python scripts/verify_test_coverage.py [--verbose] [--fail-on-missing] [--json]

Options:
    --verbose: Show detailed test results
    --fail-on-missing: Exit with error if critical tests are missing
    --json: Output results as JSON
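
Example (paths assume this script lives in scripts/ at the repo root):
    python scripts/verify_test_coverage.py --verbose
    python scripts/verify_test_coverage.py --json > coverage_report.json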
"""
import sys
import subprocess
import json
import argparse
from pathlib import Path
from typing import Dict, List
from dataclasses import dataclass, asdict
# Colors for output
RED = '\033[0;31m'
GREEN = '\033[0;32m'
YELLOW = '\033[1;33m'
BLUE = '\033[0;34m'
NC = '\033[0m'  # No Color
# Add the project root to sys.path so "app" imports resolve
# (this script lives in scripts/, one level below the root)
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from app.core.logging import logger
@dataclass
class TestResult:
"""Result of a test verification"""
flow_name: str
tests_found: int
tests_passed: int
critical_functions_covered: int
total_critical_functions: int
status: str # 'PASS', 'FAIL', 'MISSING_TESTS'
details: List[str]
@dataclass
class CoverageReport:
"""Overall coverage report"""
total_flows: int
passed_flows: int
failed_flows: int
missing_test_flows: int
    overall_status: str  # 'PASS', 'FAIL', 'MISSING_TESTS'
flow_results: List[TestResult]
class CriticalFlowVerifier:
"""Verifies test coverage for critical user flows"""
def __init__(self, verbose: bool = False):
self.verbose = verbose
        self.project_root = Path(__file__).resolve().parent.parent  # repo root; this script lives in scripts/
# Define critical user flows and their key components
self.critical_flows = {
"document_processing": {
"description": "Document upload, processing, chunking, and indexing",
"key_modules": [
"app/handlers/document_handler.py",
"app/core/document_processor.py",
"app/core/content_ingestion.py"
],
"key_functions": [
"DocumentHandler.process_data_room_fast",
"DocumentProcessor.process_documents",
"DocumentProcessor.create_chunks",
"DocumentProcessor.build_vector_store"
],
"test_files": [
"tests/integration/test_critical_workflows.py::TestCriticalWorkflows::test_complete_document_processing_pipeline",
"tests/integration/test_core_services.py::TestCoreServices::test_document_processor_initialization",
"tests/integration/test_workflows.py::TestUserWorkflows::test_overview_workflow_end_to_end"
]
},
"report_generation": {
"description": "Overview and strategic report generation",
"key_modules": [
"app/handlers/ai_handler.py",
"app/services/ai_service.py",
"app/ui/tabs/overview_tab.py",
"app/ui/tabs/strategic_tab.py"
],
"key_functions": [
"AIHandler.generate_report",
"AIService.analyze_documents",
"OverviewTab.render",
"StrategicTab.render"
],
"test_files": [
"tests/integration/test_critical_workflows.py::TestCriticalWorkflows::test_end_to_end_report_generation",
"tests/integration/test_workflows.py::TestUserWorkflows::test_overview_workflow_end_to_end",
"tests/integration/test_workflows.py::TestUserWorkflows::test_strategic_workflow_end_to_end"
]
},
"checklist_matching": {
"description": "Due diligence checklist parsing and matching",
"key_modules": [
"app/core/services.py",
"app/ui/tabs/checklist_tab.py",
"app/ui/tabs/questions_tab.py"
],
"key_functions": [
"parse_checklist",
"search_and_analyze",
"ChecklistTab.render",
"QuestionsTab.render"
],
"test_files": [
"tests/integration/test_core_services.py::TestCoreServices::test_checklist_parsing",
"tests/integration/test_workflows.py::TestUserWorkflows::test_questions_workflow_end_to_end"
]
},
"qa_functionality": {
"description": "Document search and AI-powered Q&A",
"key_modules": [
"app/core/services.py",
"app/ui/tabs/qa_tab.py",
"app/handlers/ai_handler.py"
],
"key_functions": [
"search_documents",
"AIHandler.answer_question",
"QATab.render"
],
"test_files": [
"tests/integration/test_critical_workflows.py::TestCriticalWorkflows::test_full_qa_workflow",
"tests/integration/test_workflows.py::TestUserWorkflows::test_qa_workflow_end_to_end",
"tests/integration/test_core_services.py::TestCoreServices::test_search_documents_function"
]
},
"export_functionality": {
"description": "Report export capabilities",
"key_modules": [
"app/handlers/export_handler.py"
],
"key_functions": [
"ExportHandler.export_overview_report",
"ExportHandler.export_strategic_report"
],
"test_files": [
"tests/integration/test_critical_workflows.py::TestCriticalWorkflows::test_end_to_end_report_generation",
"tests/integration/test_workflows.py::TestUserWorkflows::test_export_functionality"
]
}
}
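
        # To cover a new flow, add an entry above with its description,
        # key modules, key functions, and the tests that exercise it.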
def run_tests_for_flow(self, flow_name: str, flow_config: Dict) -> TestResult:
"""Run tests for a specific flow and verify coverage"""
logger.info(f"πŸ” Verifying test coverage for {flow_name}...")
details = []
tests_passed = 0
tests_found = len(flow_config["test_files"])
# Run each test and collect results
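        # Node-id entries ("path::Class::method") are only verified to exist
        # (via pytest collection); bare file paths are executed in full.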
for test_file in flow_config["test_files"]:
try:
# Extract test details from the test file path
if "::" in test_file:
file_path, test_class, test_method = test_file.split("::")
else:
file_path = test_file
test_class = ""
test_method = ""
# For integration tests, run the specific test file
test_file_path = self.project_root / file_path
if test_file_path.exists():
                    # Build the pytest command; the target is appended below
                    cmd = [
                        sys.executable, "-m", "pytest",
                        "-v" if self.verbose else "-q",
                        "--tb=no",
                        "-x"  # Stop on first failure
                    ]
                    if "::" in test_file:
                        # Collect (rather than run) the fully qualified node id,
                        # which checks that the specific test method exists
                        cmd += [f"{test_file_path}::{test_class}::{test_method}", "--collect-only"]
                        result = subprocess.run(cmd, capture_output=True, text=True, cwd=self.project_root)
                        if result.returncode == 0:
                            tests_passed += 1
                            details.append(f"βœ… Test found: {test_method}")
                        else:
                            details.append(f"❌ Test not found: {test_method}")
                    else:
                        # Run the entire test file
                        cmd.append(str(test_file_path))
                        result = subprocess.run(cmd, capture_output=True, text=True, cwd=self.project_root)
                        if result.returncode == 0:
                            tests_passed += 1
                            details.append(f"βœ… Test file passed: {file_path}")
                        else:
                            details.append(f"❌ Test file failed: {file_path}")
                else:
                    details.append(f"❌ Test file not found: {file_path}")
            except Exception as e:
                details.append(f"❌ Error running test {test_file}: {e}")
# Check if key functions are tested (basic check)
critical_functions_covered = 0
total_critical_functions = len(flow_config["key_functions"])
for func in flow_config["key_functions"]:
# Simple check: look for function references in test files
func_name = func.split(".")[-1] # Get the function name
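            # Note: this is a plain substring match, so it can report a
            # function as covered if its name merely appears in a comment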
found_in_tests = False
for test_file in flow_config["test_files"]:
try:
test_file_path = self.project_root / test_file.split("::")[0]
if test_file_path.exists():
                        content = test_file_path.read_text()
                        if func_name in content:
                            found_in_tests = True
                            break
                except OSError:
                    continue
            if found_in_tests:
                critical_functions_covered += 1
                details.append(f"βœ… Function covered: {func}")
            else:
                details.append(f"❌ Function not found in tests: {func}")
# Determine status
if tests_passed == 0:
status = "MISSING_TESTS"
        elif tests_passed >= tests_found * 0.7:  # at least 70% of tests pass
status = "PASS"
else:
status = "FAIL"
result = TestResult(
flow_name=flow_name,
tests_found=tests_found,
tests_passed=tests_passed,
critical_functions_covered=critical_functions_covered,
total_critical_functions=total_critical_functions,
status=status,
details=details
)
return result
def run_critical_tests(self) -> CoverageReport:
"""Run all critical flow tests and generate report"""
logger.info("πŸš€ Starting critical user flows test verification...")
flow_results = []
for flow_name, flow_config in self.critical_flows.items():
result = self.run_tests_for_flow(flow_name, flow_config)
flow_results.append(result)
if self.verbose:
print(f"\nπŸ“Š {flow_name.upper()} FLOW RESULTS:")
print(f" Status: {result.status}")
print(f" Tests: {result.tests_passed}/{result.tests_found} passed")
print(f" Functions: {result.critical_functions_covered}/{result.total_critical_functions} covered")
for detail in result.details:
print(f" {detail}")
# Calculate overall results
total_flows = len(flow_results)
passed_flows = sum(1 for r in flow_results if r.status == "PASS")
failed_flows = sum(1 for r in flow_results if r.status == "FAIL")
missing_test_flows = sum(1 for r in flow_results if r.status == "MISSING_TESTS")
# Overall status: PASS if >= 80% of flows pass and no missing tests
if missing_test_flows == 0 and passed_flows >= total_flows * 0.8:
overall_status = "PASS"
elif missing_test_flows > 0:
overall_status = "MISSING_TESTS"
else:
overall_status = "FAIL"
report = CoverageReport(
total_flows=total_flows,
passed_flows=passed_flows,
failed_flows=failed_flows,
missing_test_flows=missing_test_flows,
overall_status=overall_status,
flow_results=flow_results
)
return report
def generate_report_summary(self, report: CoverageReport) -> str:
"""Generate a human-readable report summary"""
summary = []
summary.append("🎯 CRITICAL USER FLOWS TEST COVERAGE REPORT")
summary.append("=" * 50)
summary.append("")
summary.append(f"Overall Status: {'βœ… PASS' if report.overall_status == 'PASS' else '❌ ' + report.overall_status}")
summary.append("")
summary.append(f"Flows Tested: {report.total_flows}")
summary.append(f"Flows Passed: {report.passed_flows}")
summary.append(f"Flows Failed: {report.failed_flows}")
summary.append(f"Flows Missing Tests: {report.missing_test_flows}")
summary.append("")
for result in report.flow_results:
summary.append(f"πŸ” {result.flow_name.upper()}")
summary.append(f" Status: {result.status}")
summary.append(f" Tests: {result.tests_passed}/{result.tests_found}")
summary.append(f" Functions: {result.critical_functions_covered}/{result.total_critical_functions}")
if self.verbose:
summary.append(" Details:")
for detail in result.details:
summary.append(f" {detail}")
summary.append("")
return "\n".join(summary)
def main():
"""Main entry point"""
    parser = argparse.ArgumentParser(description="Verify critical user flows test coverage")
    parser.add_argument("--verbose", "-v", action="store_true", help="Show detailed test results")
    parser.add_argument("--fail-on-missing", action="store_true", help="Exit with error if critical tests are missing")
    parser.add_argument("--json", action="store_true", help="Output results as JSON")
    args = parser.parse_args()

    # Print the header only for human-readable output so --json stays parseable
    if not args.json:
        print(f"{BLUE}πŸ” Running Critical User Flows Test Coverage Verification{NC}")
        print("")
# Initialize verifier
verifier = CriticalFlowVerifier(verbose=args.verbose)
# Run verification
try:
report = verifier.run_critical_tests()
# Output results
if args.json:
# Convert dataclasses to dicts for JSON serialization
json_report = asdict(report)
print(json.dumps(json_report, indent=2))
else:
summary = verifier.generate_report_summary(report)
print(summary)
        # Determine exit code; status messages go to stderr so that --json
        # output on stdout stays machine-parseable
        if args.fail_on_missing and report.missing_test_flows > 0:
            print(f"{RED}❌ Critical tests are missing!{NC}", file=sys.stderr)
            sys.exit(1)
        elif report.overall_status != "PASS":
            print(f"{RED}❌ Test coverage verification failed!{NC}", file=sys.stderr)
            sys.exit(1)
        else:
            print(f"{GREEN}βœ… Test coverage verification completed successfully!{NC}", file=sys.stderr)
            sys.exit(0)
    except Exception as e:
        print(f"{RED}❌ Error during test verification: {e}{NC}", file=sys.stderr)
        sys.exit(1)
if __name__ == "__main__":
main()