trading-tools / utils /markdown_validator.py
Deploy Bot
Deploy Trading Analysis Platform to HuggingFace Spaces
a1bf219
"""Markdown validation utility for agent response quality assurance.
This module validates that agent responses follow the structured markdown format
with proper sections, tables, bullet points, and numbered summaries.
Feature 004 - User Story 3: Enhanced Agent Dialog Content Quality
"""
import logging
import re
from dataclasses import dataclass
from typing import List, Optional
logger = logging.getLogger(__name__)
@dataclass
class ValidationResult:
"""Result of markdown validation."""
is_valid: bool
score: float # 0-100 percentage score
issues: list[str]
warnings: list[str]
sections_found: list[str]
has_tables: bool
has_bullets: bool
has_numbered_list: bool
has_conclusion: bool
class MarkdownValidator:
"""
Validates agent response markdown structure and content quality.
Checks for:
- Structured sections with ## headings
- Data tables with markdown table syntax (|)
- Bullet-pointed insights (- or *)
- Numbered summary (1., 2., 3.)
- Conclusion section with recommendation
"""
def __init__(self, strict_mode: bool = False):
"""
Initialize the markdown validator.
Args:
strict_mode: If True, all checks must pass for validation to succeed.
If False, validation passes with warnings for minor issues.
"""
self.strict_mode = strict_mode
def validate(
self, content: str, agent_type: str | None = None
) -> ValidationResult:
"""
Validate markdown content structure and format.
Args:
content: Markdown content to validate
agent_type: Optional agent type for specialized validation
(fundamental, technical, manager, research)
Returns:
ValidationResult with validation status and details
"""
if not content or not content.strip():
return ValidationResult(
is_valid=False,
score=0.0,
issues=["Empty or whitespace-only content"],
warnings=[],
sections_found=[],
has_tables=False,
has_bullets=False,
has_numbered_list=False,
has_conclusion=False,
)
issues = []
warnings = []
score_components = []
# Check structured sections
sections_result = self._check_structured_sections(content)
sections_found = sections_result["sections"]
if sections_result["has_sections"]:
score_components.append(25.0)
else:
issues.append("Missing structured sections with ## headings")
if len(sections_found) < 3:
warnings.append(
f"Only {len(sections_found)} sections found. Expected at least 3-4 major sections."
)
# Check data tables
tables_result = self._check_data_tables(content)
has_tables = tables_result["has_tables"]
if has_tables:
score_components.append(25.0)
if tables_result["table_count"] < 2:
warnings.append(
f"Only {tables_result['table_count']} table(s) found. Multiple tables recommended for comprehensive analysis."
)
else:
if agent_type in ["fundamental", "technical", "manager"]:
issues.append(
"No markdown tables found. Tables required for data presentation."
)
else:
warnings.append(
"No markdown tables found. Consider using tables for structured data."
)
# Check bullet insights
bullets_result = self._check_bullet_insights(content)
has_bullets = bullets_result["has_bullets"]
if has_bullets:
score_components.append(20.0)
if bullets_result["bullet_count"] < 3:
warnings.append(
f"Only {bullets_result['bullet_count']} bullet point(s) found. More insights recommended."
)
else:
warnings.append(
"No bullet-pointed insights found. Bullet points improve readability."
)
# Check numbered summary
numbered_result = self._check_numbered_summary(content)
has_numbered_list = numbered_result["has_numbered_list"]
if has_numbered_list:
score_components.append(15.0)
if numbered_result["item_count"] < 3:
warnings.append(
f"Only {numbered_result['item_count']} numbered item(s) in summary. 3-5 items recommended."
)
else:
warnings.append(
"No numbered summary list found. Numbered summaries aid comprehension."
)
# Check conclusion
conclusion_result = self._check_conclusion(content)
has_conclusion = conclusion_result["has_conclusion"]
if has_conclusion:
score_components.append(15.0)
else:
issues.append("Missing conclusion section with clear recommendation.")
# Calculate overall score
score = sum(score_components)
# Determine if valid
is_valid = True
if self.strict_mode:
is_valid = len(issues) == 0
else:
# Non-strict mode: valid if score >= 60% and no critical issues
is_valid = score >= 60.0 and len(issues) <= 2
logger.info(
f"Markdown validation complete: score={score:.1f}%, "
f"sections={len(sections_found)}, tables={has_tables}, "
f"bullets={has_bullets}, numbered={has_numbered_list}, "
f"conclusion={has_conclusion}, issues={len(issues)}, warnings={len(warnings)}"
)
return ValidationResult(
is_valid=is_valid,
score=score,
issues=issues,
warnings=warnings,
sections_found=sections_found,
has_tables=has_tables,
has_bullets=has_bullets,
has_numbered_list=has_numbered_list,
has_conclusion=has_conclusion,
)
def _check_structured_sections(self, content: str) -> dict:
"""
Check for structured sections with ## markdown headings.
Args:
content: Markdown content
Returns:
Dict with has_sections bool and list of section titles
"""
# Match ## headings (level 2)
heading_pattern = r"^##\s+(.+)$"
matches = re.findall(heading_pattern, content, re.MULTILINE)
sections = [match.strip() for match in matches]
return {
"has_sections": len(sections) >= 2,
"sections": sections,
"section_count": len(sections),
}
def _check_data_tables(self, content: str) -> dict:
"""
Check for markdown tables with pipes (|).
Args:
content: Markdown content
Returns:
Dict with has_tables bool and table count
"""
# Match markdown table rows (must have at least 2 pipes per line)
# Table header: | Col1 | Col2 | Col3 |
# Table divider: |------|------|------|
# Table row: | Val1 | Val2 | Val3 |
# Find table dividers (|---|---|)
divider_pattern = r"^\|[\s\-:]+\|[\s\-:|]+$"
divider_matches = re.findall(divider_pattern, content, re.MULTILINE)
# Find table rows with actual data (not just dashes)
row_pattern = r"^\|[^\-\n][^\n]*\|[^\n]*$"
row_matches = re.findall(row_pattern, content, re.MULTILINE)
# Consider it a valid table if we have both dividers and data rows
has_tables = len(divider_matches) >= 1 and len(row_matches) >= 2
return {
"has_tables": has_tables,
"table_count": len(divider_matches),
"row_count": len(row_matches),
}
def _check_bullet_insights(self, content: str) -> dict:
"""
Check for bullet-pointed insights (- or *).
Args:
content: Markdown content
Returns:
Dict with has_bullets bool and bullet count
"""
# Match bullet points (- or * at start of line, followed by content)
bullet_pattern = r"^[\-\*]\s+(.+)$"
matches = re.findall(bullet_pattern, content, re.MULTILINE)
return {
"has_bullets": len(matches) >= 2,
"bullet_count": len(matches),
}
def _check_numbered_summary(self, content: str) -> dict:
"""
Check for numbered summary list (1., 2., 3.).
Args:
content: Markdown content
Returns:
Dict with has_numbered_list bool and item count
"""
# Match numbered list items (1., 2., 3., etc.)
numbered_pattern = r"^\d+\.\s+(.+)$"
matches = re.findall(numbered_pattern, content, re.MULTILINE)
# Check for sequential numbering (1, 2, 3, ...)
has_sequence = False
if len(matches) >= 3:
# Extract numbers from the full content to verify sequence
number_pattern = r"^(\d+)\.\s+"
numbers = [
int(m.group(1))
for m in re.finditer(number_pattern, content, re.MULTILINE)
]
# Check if we have at least 3 consecutive numbers starting from 1
has_sequence = len(numbers) >= 3 and numbers[0] == 1 and numbers[1] == 2
return {
"has_numbered_list": has_sequence,
"item_count": len(matches),
}
def _check_conclusion(self, content: str) -> dict:
"""
Check for conclusion section with recommendation.
Args:
content: Markdown content
Returns:
Dict with has_conclusion bool and details
"""
# Check for conclusion-related section headings
conclusion_keywords = [
"conclusion",
"recommendation",
"final decision",
"summary",
"investment decision",
"trading implication",
]
content_lower = content.lower()
has_conclusion_heading = any(
keyword in content_lower for keyword in conclusion_keywords
)
# Check for decision-related terms in the content
decision_keywords = [
"buy",
"sell",
"hold",
"bullish",
"bearish",
"neutral",
"recommend",
"advise",
"suggest",
]
has_decision_language = any(
keyword in content_lower for keyword in decision_keywords
)
return {
"has_conclusion": has_conclusion_heading and has_decision_language,
"has_conclusion_heading": has_conclusion_heading,
"has_decision_language": has_decision_language,
}
def validate_agent_response(
content: str, agent_name: str, strict: bool = False
) -> ValidationResult:
"""
Convenience function to validate agent response.
Args:
content: Agent response markdown content
agent_name: Name of the agent (for specialized validation)
strict: Whether to use strict validation mode
Returns:
ValidationResult with validation details
"""
# Determine agent type from name
agent_type = None
if "fundamental" in agent_name.lower():
agent_type = "fundamental"
elif any(
keyword in agent_name.lower()
for keyword in ["indicator", "pattern", "trend", "technical"]
):
agent_type = "technical"
elif any(
keyword in agent_name.lower() for keyword in ["portfolio", "risk", "manager"]
):
agent_type = "manager"
elif "research" in agent_name.lower():
agent_type = "research"
validator = MarkdownValidator(strict_mode=strict)
result = validator.validate(content, agent_type=agent_type)
# Log results
if result.is_valid:
logger.info(
f"✓ {agent_name} response validated successfully (score: {result.score:.1f}%)"
)
else:
logger.warning(
f"✗ {agent_name} response validation failed (score: {result.score:.1f}%)"
)
for issue in result.issues:
logger.warning(f" - Issue: {issue}")
for warning in result.warnings:
logger.debug(f" - Warning: {warning}")
return result