Spaces:
Paused
Paused
| """ | |
| Tabular Exporters (CSV, TSV, JSONL) | |
| Exports annotations to flat tabular formats suitable for analysis in | |
| spreadsheets, pandas, or streaming pipelines. | |
| """ | |
| import csv | |
| import json | |
| import os | |
| import logging | |
| from typing import Optional, Tuple, List | |
| from .base import BaseExporter, ExportContext, ExportResult | |
| logger = logging.getLogger(__name__) | |
def _to_cell(value):
    """Return *value* unchanged if scalar, or as a JSON string if it is a dict/list."""
    if isinstance(value, (dict, list)):
        return json.dumps(value)
    return value


def _flatten_annotation(ann: dict) -> dict:
    """Flatten a single annotation record into a flat dict for tabular output.

    Column layout of the returned row:

    * ``instance_id`` and ``user_id`` are copied over (``""`` when absent).
    * Each entry of ``ann["labels"]`` becomes ``<schema>.<label>`` when the
      schema maps to a dict of labels (an empty label name collapses to just
      ``<schema>``), or a single ``<schema>`` column otherwise.
    * Each entry of ``ann["spans"]`` becomes ``<schema>._spans`` holding the
      spans serialized as a JSON string.

    Nested values (dicts/lists) are stored as JSON strings so every cell is a
    scalar.

    Args:
        ann: Annotation record; ``labels`` and ``spans`` are optional and may
            be ``None``.

    Returns:
        A new flat dict; ``ann`` is not modified.
    """
    row = {
        "instance_id": ann.get("instance_id", ""),
        "user_id": ann.get("user_id", ""),
    }

    # `.get(key, {})` only covers a *missing* key; `or {}` also covers a key
    # that is present but null, which would otherwise fail on `.items()`.
    label_map = ann.get("labels") or {}
    span_map = ann.get("spans") or {}

    # Flatten labels: schema_name.label_name = value
    for schema_name, labels in label_map.items():
        if isinstance(labels, dict):
            for label_name, value in labels.items():
                col = f"{schema_name}.{label_name}" if label_name else schema_name
                row[col] = _to_cell(value)
        else:
            row[schema_name] = _to_cell(labels)

    # Flatten spans as JSON strings
    for schema_name, spans in span_map.items():
        row[f"{schema_name}._spans"] = json.dumps(spans)

    return row
class CSVExporter(BaseExporter):
    """Exporter that writes annotations as a comma-delimited file."""

    format_name = "csv"
    description = "Comma-separated values (one row per user-instance annotation)"
    file_extensions = [".csv"]

    def can_export(self, context: ExportContext) -> Tuple[bool, str]:
        """Return ``(True, "")`` when annotations exist, else ``(False, reason)``."""
        if context.annotations:
            return True, ""
        return False, "No annotations to export"

    def export(self, context: ExportContext, output_path: str,
               options: Optional[dict] = None) -> ExportResult:
        """Write the annotations under *output_path* using ``,`` as delimiter.

        ``options`` is accepted for interface compatibility but not consulted.
        """
        return _write_delimited(context, output_path, fmt_name="csv", delimiter=",")
class TSVExporter(BaseExporter):
    """Exporter that writes annotations as a tab-delimited file."""

    format_name = "tsv"
    description = "Tab-separated values (one row per user-instance annotation)"
    file_extensions = [".tsv"]

    def can_export(self, context: ExportContext) -> Tuple[bool, str]:
        """Return ``(True, "")`` when annotations exist, else ``(False, reason)``."""
        if context.annotations:
            return True, ""
        return False, "No annotations to export"

    def export(self, context: ExportContext, output_path: str,
               options: Optional[dict] = None) -> ExportResult:
        """Write the annotations under *output_path* using a tab as delimiter.

        ``options`` is accepted for interface compatibility but not consulted.
        """
        return _write_delimited(context, output_path, fmt_name="tsv", delimiter="\t")
class JSONLExporter(BaseExporter):
    """Exporter that writes one JSON object per annotation, one per line."""

    format_name = "jsonl"
    description = "JSON Lines (one JSON object per user-instance annotation)"
    file_extensions = [".jsonl"]

    def can_export(self, context: ExportContext) -> Tuple[bool, str]:
        """Return ``(True, "")`` when annotations exist, else ``(False, reason)``."""
        if context.annotations:
            return True, ""
        return False, "No annotations to export"

    def export(self, context: ExportContext, output_path: str,
               options: Optional[dict] = None) -> ExportResult:
        """Write ``annotations.jsonl`` (and optionally phase responses).

        *output_path* is treated as a directory and created if needed. Each
        record carries ``instance_id``, ``user_id``, ``labels``, ``spans`` and
        ``links``. ``options`` is accepted but not consulted.
        """
        os.makedirs(output_path, exist_ok=True)
        out_file = os.path.join(output_path, "annotations.jsonl")

        with open(out_file, "w", encoding="utf-8") as handle:
            for ann in context.annotations:
                record = {
                    "instance_id": ann.get("instance_id", ""),
                    "user_id": ann.get("user_id", ""),
                }
                # The structured sections default to an empty mapping.
                for section in ("labels", "spans", "links"):
                    record[section] = ann.get(section, {})
                handle.write(json.dumps(record, ensure_ascii=False) + "\n")

        # Phase responses go to their own file, only when enabled in config.
        phase_file = _write_phase_jsonl(context, output_path)
        files_written = [out_file, phase_file] if phase_file else [out_file]

        exclusion_note = _phase_exclusion_warning(context)
        warnings = [exclusion_note] if exclusion_note else []

        # Responses are counted as exported or excluded, never both.
        if phase_file:
            exported, excluded = len(context.phase_responses), 0
        else:
            exported, excluded = 0, len(context.phase_responses)

        return ExportResult(
            success=True,
            format_name=self.format_name,
            files_written=files_written,
            warnings=warnings,
            stats={
                "num_records": len(context.annotations),
                "num_phase_responses": exported,
                "num_phase_responses_excluded": excluded,
            },
        )
def _should_include_phase_data(context: ExportContext) -> bool:
    """Check if phase response export is enabled.

    Returns ``True`` only when the context holds at least one phase response
    and the ``export_include_phase_data`` config entry is truthy (it defaults
    to off when absent).
    """
    if not context.phase_responses:
        return False
    # Coerce to bool: the raw config value may be any object (string, None, ...)
    # and this function is annotated as returning a bool.
    return bool(context.config.get("export_include_phase_data", False))
def _phase_exclusion_warning(context: ExportContext) -> Optional[str]:
    """Build the warning shown when phase/survey responses are left out.

    Exporting phase responses is opt-in (``export_include_phase_data``). When
    responses exist but the option is off, the export would otherwise drop
    them silently and report ``num_phase_responses: 0`` (F-047), as if no
    survey data had been collected. Returns ``None`` when there is nothing to
    warn about.
    """
    responses = context.phase_responses
    # Nothing to warn about: no responses, or they are being exported.
    if not responses or context.config.get("export_include_phase_data", False):
        return None

    count = len(responses)
    return (
        f"{count} phase/survey responses were found but "
        "NOT exported. Set 'export_include_phase_data: true' in your config to "
        "write them to a phase_responses file."
    )
def _write_phase_delimited(context: ExportContext, output_path: str,
                           fmt_name: str, delimiter: str) -> Optional[str]:
    """Write phase responses to ``phase_responses.<fmt_name>`` in *output_path*.

    Returns the path of the written file, or ``None`` when phase export is not
    enabled. The directory is not created here. Only the fixed columns below
    are written; other keys in a response are ignored.
    """
    if not _should_include_phase_data(context):
        return None

    fieldnames = ["user_id", "phase", "page", "schema", "label_name", "value"]
    target = os.path.join(output_path, f"phase_responses.{fmt_name}")

    with open(target, "w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(
            handle,
            fieldnames=fieldnames,
            delimiter=delimiter,
            extrasaction="ignore",
        )
        writer.writeheader()
        writer.writerows(context.phase_responses)

    return target
def _write_phase_jsonl(context: ExportContext, output_path: str) -> Optional[str]:
    """Write phase responses to ``phase_responses.jsonl`` in *output_path*.

    Each response is serialized as one JSON object per line. Returns the path
    of the written file, or ``None`` when phase export is not enabled.
    """
    if not _should_include_phase_data(context):
        return None

    target = os.path.join(output_path, "phase_responses.jsonl")
    with open(target, "w", encoding="utf-8") as handle:
        handle.writelines(
            json.dumps(response, ensure_ascii=False) + "\n"
            for response in context.phase_responses
        )
    return target
def _write_delimited(context: ExportContext, output_path: str,
                     fmt_name: str, delimiter: str) -> ExportResult:
    """Write annotations as a delimited file (CSV or TSV).

    *output_path* is treated as a directory (created if needed); the output
    goes to ``annotations.<fmt_name>`` inside it. The header is the union of
    all flattened columns, with ``instance_id`` and ``user_id`` first and the
    rest in first-seen order. Rows lacking a column get an empty cell.

    With no annotations a header-only file is still written, so every path
    listed in ``files_written`` exists on disk, and phase responses and the
    exclusion warning are handled exactly as in the non-empty case.

    Args:
        context: Export context providing ``annotations``, ``phase_responses``
            and ``config``.
        output_path: Destination directory.
        fmt_name: File extension without the dot; also reported as
            ``format_name`` in the result.
        delimiter: Field delimiter passed to ``csv.DictWriter``.

    Returns:
        An ``ExportResult`` listing the files written, any warnings, and
        record/column/phase-response counts.
    """
    os.makedirs(output_path, exist_ok=True)
    out_file = os.path.join(output_path, f"annotations.{fmt_name}")

    # Flatten everything first: the header needs the full set of columns.
    rows = [_flatten_annotation(ann) for ann in (context.annotations or [])]

    # An insertion-ordered dict deduplicates the column names while keeping
    # the two identifier columns first.
    ordered = dict.fromkeys(("instance_id", "user_id"))
    for row in rows:
        for key in row:
            ordered.setdefault(key)
    columns = list(ordered)

    with open(out_file, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=columns, delimiter=delimiter,
                                extrasaction="ignore")
        writer.writeheader()
        writer.writerows(rows)

    files_written = [out_file]
    phase_file = _write_phase_delimited(context, output_path, fmt_name, delimiter)
    if phase_file:
        files_written.append(phase_file)

    warnings = []
    excl = _phase_exclusion_warning(context)
    if excl:
        warnings.append(excl)

    # Guard against a missing/None response list when counting.
    num_phase = len(context.phase_responses or [])
    return ExportResult(
        success=True,
        format_name=fmt_name,
        files_written=files_written,
        warnings=warnings,
        stats={
            "num_records": len(rows),
            "num_columns": len(columns),
            "num_phase_responses": num_phase if phase_file else 0,
            "num_phase_responses_excluded": 0 if phase_file else num_phase,
        },
    )