"""Formatting utilities for MCP tool outputs.""" import json from typing import Any, Dict, List def format_dataset_list(datasets: List[Dict[str, Any]]) -> str: """Format a list of datasets for display.""" if not datasets: return "No datasets found." lines = ["## Datasets Found\n"] for i, ds in enumerate(datasets, 1): lines.append(f"### {i}. {ds['id']}") lines.append(f"- Downloads: {ds.get('downloads', 'N/A'):,}") lines.append(f"- Likes: {ds.get('likes', 'N/A')}") if ds.get('tags'): lines.append(f"- Tags: {', '.join(ds['tags'][:5])}") lines.append("") return "\n".join(lines) def format_dataset_info(info: Dict[str, Any]) -> str: """Format dataset info for display.""" lines = [f"## Dataset: {info['id']}\n"] lines.append(f"- **Author**: {info.get('author', 'N/A')}") lines.append(f"- **Downloads**: {info.get('downloads', 0):,}") lines.append(f"- **Likes**: {info.get('likes', 0)}") lines.append(f"- **License**: {info.get('license', 'N/A')}") if info.get('tags'): lines.append(f"- **Tags**: {', '.join(info['tags'][:10])}") if info.get('card_summary'): lines.append("\n### Dataset Card (Summary)") lines.append(info['card_summary'][:1500] + "..." if len(info.get('card_summary', '')) > 1500 else info['card_summary']) return "\n".join(lines) def format_schema(schema: Dict[str, Any]) -> str: """Format schema information for display.""" if "error" in schema: return f"Error: {schema['error']}" lines = ["## Dataset Schema\n"] lines.append(f"**Number of columns**: {schema.get('num_columns', 'N/A')}\n") lines.append("### Columns\n") lines.append("| Column | Type |") lines.append("|--------|------|") for col, dtype in schema.get('features', {}).items(): lines.append(f"| `{col}` | {dtype} |") return "\n".join(lines) def format_sample(samples: List[Dict[str, Any]], dataset_id: str) -> str: """Format sample rows for display.""" if not samples: return "No samples available." 
if "error" in samples[0]: return f"Error loading samples: {samples[0]['error']}" lines = [f"## Sample from `{dataset_id}`\n"] lines.append(f"Showing {len(samples)} row(s):\n") for i, row in enumerate(samples, 1): lines.append(f"### Row {i}") lines.append("```json") lines.append(json.dumps(row, indent=2, default=str, ensure_ascii=False)[:1000]) lines.append("```\n") return "\n".join(lines) def format_statistics(stats: Dict[str, Any]) -> str: """Format statistics for display.""" if "error" in stats: return f"Error: {stats['error']}" lines = ["## Dataset Statistics\n"] lines.append(f"**Total rows**: {stats.get('total_rows', 'N/A'):,}\n") if stats.get('column_stats'): lines.append("### Column Statistics\n") for col, col_stats in stats['column_stats'].items(): lines.append(f"#### `{col}`") for key, value in col_stats.items(): if isinstance(value, float): lines.append(f"- {key}: {value:.2f}") else: lines.append(f"- {key}: {value}") lines.append("") return "\n".join(lines) def format_quality_report(report: Dict[str, Any]) -> str: """Format data quality report for display.""" if "error" in report: return f"Error: {report['error']}" lines = ["## Data Quality Report\n"] # Overall score if "overall_score" in report: score = report['overall_score'] emoji = "" if score >= 80 else "" if score >= 60 else "" lines.append(f"**Overall Quality Score**: {emoji} {score}/100\n") # Issues if report.get('issues'): lines.append("### Issues Found\n") for issue in report['issues']: lines.append(f"- {issue}") lines.append("") # Column-level quality if report.get('column_quality'): lines.append("### Column Quality\n") lines.append("| Column | Missing % | Unique % | Issues |") lines.append("|--------|-----------|----------|--------|") for col, quality in report['column_quality'].items(): missing = quality.get('missing_pct', 0) unique = quality.get('unique_pct', 0) issues = quality.get('issues', '-') lines.append(f"| `{col}` | {missing:.1f}% | {unique:.1f}% | {issues} |") return "\n".join(lines) 
def format_comparison(comparison: Dict[str, Any]) -> str:
    """Format dataset comparison for display.

    Args:
        comparison: Mapping that either carries an ``'error'`` key or provides
            ``'dataset_a'`` and ``'dataset_b'`` plus optional ``'comparison'``
            (aspect name to a mapping with ``'a'`` / ``'b'`` values),
            ``'common_columns'``, ``'unique_to_a'`` and ``'unique_to_b'``.

    Returns:
        ``"Error: ..."`` when ``'error'`` is present, otherwise a Markdown
        table followed by the column lists that are non-empty.

    Raises:
        KeyError: If ``'dataset_a'`` or ``'dataset_b'`` is missing.
    """
    if "error" in comparison:
        return f"Error: {comparison['error']}"

    lines = ["## Dataset Comparison\n"]
    lines.append(f"Comparing **{comparison['dataset_a']}** vs **{comparison['dataset_b']}**\n")
    lines.append("| Aspect | Dataset A | Dataset B |")
    lines.append("|--------|-----------|-----------|")
    for aspect, values in comparison.get('comparison', {}).items():
        lines.append(f"| {aspect} | {values.get('a', 'N/A')} | {values.get('b', 'N/A')} |")

    # str() keeps join() from failing on non-string column names.
    common = comparison.get('common_columns')
    if common:
        lines.append(f"\n**Common columns**: {', '.join(str(c) for c in common)}")
    only_a = comparison.get('unique_to_a')
    if only_a:
        lines.append(f"**Unique to A**: {', '.join(str(c) for c in only_a)}")
    only_b = comparison.get('unique_to_b')
    if only_b:
        lines.append(f"**Unique to B**: {', '.join(str(c) for c in only_b)}")

    return "\n".join(lines)


def format_similar_datasets(similar: List[Dict[str, Any]]) -> str:
    """Format similar datasets list for display.

    Args:
        similar: Dataset records. Each must contain ``'id'``;
            ``'similarity_score'`` (defaults to 0), ``'downloads'`` and
            ``'reason'`` are optional.

    Returns:
        A Markdown string with one numbered section per dataset, or
        ``"No similar datasets found."`` when the list is empty.

    Raises:
        KeyError: If a record has no ``'id'`` key.
    """
    if not similar:
        return "No similar datasets found."

    lines = ["## Similar Datasets\n"]
    for i, ds in enumerate(similar, 1):
        score = ds.get('similarity_score', 0)
        lines.append(f"### {i}. {ds['id']} (similarity: {score:.2f})")

        # The ',' format spec only works on numbers: a missing or None count
        # must print "N/A" instead of raising ValueError / TypeError.
        downloads = ds.get('downloads')
        if downloads is None:
            downloads_text = "N/A"
        else:
            try:
                downloads_text = f"{downloads:,}"
            except (TypeError, ValueError):
                downloads_text = str(downloads)
        lines.append(f"- Downloads: {downloads_text}")

        if ds.get('reason'):
            lines.append(f"- Why similar: {ds['reason']}")
        lines.append("")
    return "\n".join(lines)


def format_task_suggestions(suggestions: Dict[str, Any]) -> str:
    """Format ML task suggestions for display.

    Args:
        suggestions: Mapping that either carries an ``'error'`` key or provides
            an optional ``'dataset_id'`` and a ``'tasks'`` list. Each task
            must contain ``'name'``; ``'confidence'`` (defaults to
            ``'medium'``), ``'reason'``, ``'target_column'`` and
            ``'feature_columns'`` are optional.

    Returns:
        ``"Error: ..."`` when ``'error'`` is present, otherwise a Markdown
        string with one numbered section per task. At most five feature
        columns are listed per task.

    Raises:
        KeyError: If a task has no ``'name'`` key.
    """
    if "error" in suggestions:
        return f"Error: {suggestions['error']}"

    lines = [f"## Suggested ML Tasks for `{suggestions.get('dataset_id', 'dataset')}`\n"]

    if suggestions.get('tasks'):
        for i, task in enumerate(suggestions['tasks'], 1):
            confidence = task.get('confidence', 'medium')
            # NOTE(review): all three indicator strings are empty here;
            # presumably they once held emoji glyphs per confidence - confirm.
            emoji = "" if confidence == 'high' else "" if confidence == 'medium' else ""
            lines.append(f"### {i}. {task['name']} {emoji}")
            lines.append(f"- **Confidence**: {confidence}")
            lines.append(f"- **Reason**: {task.get('reason', 'Based on dataset structure')}")
            if task.get('target_column'):
                lines.append(f"- **Target column**: `{task['target_column']}`")
            if task.get('feature_columns'):
                shown = ', '.join(f'`{c}`' for c in task['feature_columns'][:5])
                lines.append(f"- **Feature columns**: {shown}")
            lines.append("")

    return "\n".join(lines)