runner-ai-intelligence / src /pipeline_steps /visualization_step.py
avfranco's picture
HF Space deploy snapshot (minimal allow-list)
557ee65
from core.pipeline.step import PipelineStep
class VisualizationStep(PipelineStep):
    """Pipeline step that turns agent-produced chart specs into rendered figures.

    The step asks ``agent`` for chart specifications, renders each one through
    ``viz_executor`` and stores the resulting figures on the pipeline context.
    """

    name = "visualization_agent"

    def __init__(self, agent, viz_executor):
        """Keep references to the collaborators used by :meth:`run`.

        Args:
            agent: Object whose awaitable ``run(features)`` yields chart specs.
            viz_executor: Object exposing ``render_chart(spec, features)``.
        """
        self.agent = agent
        self.viz_executor = viz_executor

    @staticmethod
    def _chart_key(spec, position):
        """Return the ``context.charts`` key under which ``spec`` is stored.

        ``"pace"`` and ``"heart_rate"`` chart types map to fixed keys; any
        other type uses the spec title, or ``chart_<position>`` when the
        title is falsy.
        """
        if spec.chart_type == "pace":
            return "pace_chart"
        if spec.chart_type == "heart_rate":
            return "hr_chart"
        return spec.title or f"chart_{position}"

    async def run(self, context):
        """Render every chart requested by the agent and attach them to ``context``.

        Each item of ``context.runs`` is converted with
        ``model_dump(mode="json")`` when it offers that method and passed
        through unchanged otherwise. The resulting list is handed both to the
        agent and to the renderer.

        Args:
            context: Pipeline context; ``runs`` is read and ``charts`` is set
                to a dict mapping chart key to rendered figure.
        """
        serialised_runs = []
        for run_record in context.runs:
            if hasattr(run_record, "model_dump"):
                serialised_runs.append(run_record.model_dump(mode="json"))
            else:
                serialised_runs.append(run_record)

        requested_specs = await self.agent.run(serialised_runs)

        rendered = {}
        for position, spec in enumerate(requested_specs):
            # Render before resolving the key, and let later specs replace
            # earlier ones that resolve to the same key.
            figure = self.viz_executor.render_chart(spec, serialised_runs)
            rendered[self._chart_key(spec, position)] = figure

        context.charts = rendered