"""Markdown validation utility for agent response quality assurance. This module validates that agent responses follow the structured markdown format with proper sections, tables, bullet points, and numbered summaries. Feature 004 - User Story 3: Enhanced Agent Dialog Content Quality """ import logging import re from dataclasses import dataclass from typing import List, Optional logger = logging.getLogger(__name__) @dataclass class ValidationResult: """Result of markdown validation.""" is_valid: bool score: float # 0-100 percentage score issues: list[str] warnings: list[str] sections_found: list[str] has_tables: bool has_bullets: bool has_numbered_list: bool has_conclusion: bool class MarkdownValidator: """ Validates agent response markdown structure and content quality. Checks for: - Structured sections with ## headings - Data tables with markdown table syntax (|) - Bullet-pointed insights (- or *) - Numbered summary (1., 2., 3.) - Conclusion section with recommendation """ def __init__(self, strict_mode: bool = False): """ Initialize the markdown validator. Args: strict_mode: If True, all checks must pass for validation to succeed. If False, validation passes with warnings for minor issues. """ self.strict_mode = strict_mode def validate( self, content: str, agent_type: str | None = None ) -> ValidationResult: """ Validate markdown content structure and format. Args: content: Markdown content to validate agent_type: Optional agent type for specialized validation (fundamental, technical, manager, research) Returns: ValidationResult with validation status and details """ if not content or not content.strip(): return ValidationResult( is_valid=False, score=0.0, issues=["Empty or whitespace-only content"], warnings=[], sections_found=[], has_tables=False, has_bullets=False, has_numbered_list=False, has_conclusion=False, ) issues = [] warnings = [] score_components = [] # Check structured sections sections_result = self._check_structured_sections(content) sections_found = sections_result["sections"] if sections_result["has_sections"]: score_components.append(25.0) else: issues.append("Missing structured sections with ## headings") if len(sections_found) < 3: warnings.append( f"Only {len(sections_found)} sections found. Expected at least 3-4 major sections." ) # Check data tables tables_result = self._check_data_tables(content) has_tables = tables_result["has_tables"] if has_tables: score_components.append(25.0) if tables_result["table_count"] < 2: warnings.append( f"Only {tables_result['table_count']} table(s) found. Multiple tables recommended for comprehensive analysis." ) else: if agent_type in ["fundamental", "technical", "manager"]: issues.append( "No markdown tables found. Tables required for data presentation." ) else: warnings.append( "No markdown tables found. Consider using tables for structured data." ) # Check bullet insights bullets_result = self._check_bullet_insights(content) has_bullets = bullets_result["has_bullets"] if has_bullets: score_components.append(20.0) if bullets_result["bullet_count"] < 3: warnings.append( f"Only {bullets_result['bullet_count']} bullet point(s) found. More insights recommended." ) else: warnings.append( "No bullet-pointed insights found. Bullet points improve readability." ) # Check numbered summary numbered_result = self._check_numbered_summary(content) has_numbered_list = numbered_result["has_numbered_list"] if has_numbered_list: score_components.append(15.0) if numbered_result["item_count"] < 3: warnings.append( f"Only {numbered_result['item_count']} numbered item(s) in summary. 3-5 items recommended." ) else: warnings.append( "No numbered summary list found. Numbered summaries aid comprehension." ) # Check conclusion conclusion_result = self._check_conclusion(content) has_conclusion = conclusion_result["has_conclusion"] if has_conclusion: score_components.append(15.0) else: issues.append("Missing conclusion section with clear recommendation.") # Calculate overall score score = sum(score_components) # Determine if valid is_valid = True if self.strict_mode: is_valid = len(issues) == 0 else: # Non-strict mode: valid if score >= 60% and no critical issues is_valid = score >= 60.0 and len(issues) <= 2 logger.info( f"Markdown validation complete: score={score:.1f}%, " f"sections={len(sections_found)}, tables={has_tables}, " f"bullets={has_bullets}, numbered={has_numbered_list}, " f"conclusion={has_conclusion}, issues={len(issues)}, warnings={len(warnings)}" ) return ValidationResult( is_valid=is_valid, score=score, issues=issues, warnings=warnings, sections_found=sections_found, has_tables=has_tables, has_bullets=has_bullets, has_numbered_list=has_numbered_list, has_conclusion=has_conclusion, ) def _check_structured_sections(self, content: str) -> dict: """ Check for structured sections with ## markdown headings. Args: content: Markdown content Returns: Dict with has_sections bool and list of section titles """ # Match ## headings (level 2) heading_pattern = r"^##\s+(.+)$" matches = re.findall(heading_pattern, content, re.MULTILINE) sections = [match.strip() for match in matches] return { "has_sections": len(sections) >= 2, "sections": sections, "section_count": len(sections), } def _check_data_tables(self, content: str) -> dict: """ Check for markdown tables with pipes (|). Args: content: Markdown content Returns: Dict with has_tables bool and table count """ # Match markdown table rows (must have at least 2 pipes per line) # Table header: | Col1 | Col2 | Col3 | # Table divider: |------|------|------| # Table row: | Val1 | Val2 | Val3 | # Find table dividers (|---|---|) divider_pattern = r"^\|[\s\-:]+\|[\s\-:|]+$" divider_matches = re.findall(divider_pattern, content, re.MULTILINE) # Find table rows with actual data (not just dashes) row_pattern = r"^\|[^\-\n][^\n]*\|[^\n]*$" row_matches = re.findall(row_pattern, content, re.MULTILINE) # Consider it a valid table if we have both dividers and data rows has_tables = len(divider_matches) >= 1 and len(row_matches) >= 2 return { "has_tables": has_tables, "table_count": len(divider_matches), "row_count": len(row_matches), } def _check_bullet_insights(self, content: str) -> dict: """ Check for bullet-pointed insights (- or *). Args: content: Markdown content Returns: Dict with has_bullets bool and bullet count """ # Match bullet points (- or * at start of line, followed by content) bullet_pattern = r"^[\-\*]\s+(.+)$" matches = re.findall(bullet_pattern, content, re.MULTILINE) return { "has_bullets": len(matches) >= 2, "bullet_count": len(matches), } def _check_numbered_summary(self, content: str) -> dict: """ Check for numbered summary list (1., 2., 3.). Args: content: Markdown content Returns: Dict with has_numbered_list bool and item count """ # Match numbered list items (1., 2., 3., etc.) numbered_pattern = r"^\d+\.\s+(.+)$" matches = re.findall(numbered_pattern, content, re.MULTILINE) # Check for sequential numbering (1, 2, 3, ...) has_sequence = False if len(matches) >= 3: # Extract numbers from the full content to verify sequence number_pattern = r"^(\d+)\.\s+" numbers = [ int(m.group(1)) for m in re.finditer(number_pattern, content, re.MULTILINE) ] # Check if we have at least 3 consecutive numbers starting from 1 has_sequence = len(numbers) >= 3 and numbers[0] == 1 and numbers[1] == 2 return { "has_numbered_list": has_sequence, "item_count": len(matches), } def _check_conclusion(self, content: str) -> dict: """ Check for conclusion section with recommendation. Args: content: Markdown content Returns: Dict with has_conclusion bool and details """ # Check for conclusion-related section headings conclusion_keywords = [ "conclusion", "recommendation", "final decision", "summary", "investment decision", "trading implication", ] content_lower = content.lower() has_conclusion_heading = any( keyword in content_lower for keyword in conclusion_keywords ) # Check for decision-related terms in the content decision_keywords = [ "buy", "sell", "hold", "bullish", "bearish", "neutral", "recommend", "advise", "suggest", ] has_decision_language = any( keyword in content_lower for keyword in decision_keywords ) return { "has_conclusion": has_conclusion_heading and has_decision_language, "has_conclusion_heading": has_conclusion_heading, "has_decision_language": has_decision_language, } def validate_agent_response( content: str, agent_name: str, strict: bool = False ) -> ValidationResult: """ Convenience function to validate agent response. Args: content: Agent response markdown content agent_name: Name of the agent (for specialized validation) strict: Whether to use strict validation mode Returns: ValidationResult with validation details """ # Determine agent type from name agent_type = None if "fundamental" in agent_name.lower(): agent_type = "fundamental" elif any( keyword in agent_name.lower() for keyword in ["indicator", "pattern", "trend", "technical"] ): agent_type = "technical" elif any( keyword in agent_name.lower() for keyword in ["portfolio", "risk", "manager"] ): agent_type = "manager" elif "research" in agent_name.lower(): agent_type = "research" validator = MarkdownValidator(strict_mode=strict) result = validator.validate(content, agent_type=agent_type) # Log results if result.is_valid: logger.info( f"✓ {agent_name} response validated successfully (score: {result.score:.1f}%)" ) else: logger.warning( f"✗ {agent_name} response validation failed (score: {result.score:.1f}%)" ) for issue in result.issues: logger.warning(f" - Issue: {issue}") for warning in result.warnings: logger.debug(f" - Warning: {warning}") return result