vn6295337 Claude Opus 4.5 commited on
Commit
f286eb6
·
1 Parent(s): 34989f6

Add temporal data streaming to A2A partial_metrics

Browse files

- Add end_date, fiscal_year, form fields to MetricEntry
- Update emit_metric() to accept temporal parameters
- Extract temporal data from financials MCP response
- Stream fiscal period info (FY 2023, Q3 2024) via A2A

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

Files changed (2) hide show
  1. app.py +12 -3
  2. mcp_client.py +30 -8
app.py CHANGED
@@ -68,6 +68,10 @@ class MetricEntry(BaseModel):
68
  metric: str
69
  value: Any
70
  timestamp: str
 
 
 
 
71
 
72
 
73
  class Task(BaseModel):
@@ -235,20 +239,25 @@ async def handle_message_send(params: dict, request_id: Any) -> dict:
235
 
236
  def create_progress_callback(task_id: str):
237
  """Create a progress callback that adds partial metrics to the task."""
238
- def callback(source: str, metric: str, value: Any):
 
239
  task = TASK_STORE.get(task_id)
240
  if task and task.status == TaskStatus.WORKING:
241
  entry = MetricEntry(
242
  source=source,
243
  metric=metric,
244
  value=value,
245
- timestamp=datetime.now().isoformat()
 
 
 
246
  )
247
  if task.partial_metrics is None:
248
  task.partial_metrics = []
249
  task.partial_metrics.append(entry)
250
  task.updated_at = datetime.now().isoformat()
251
- logger.info(f"Task {task_id}: [{source}] {metric} = {value}")
 
252
  return callback
253
 
254
 
 
68
  metric: str
69
  value: Any
70
  timestamp: str
71
+ # Temporal fields for financial data
72
+ end_date: Optional[str] = None # "2023-09-30"
73
+ fiscal_year: Optional[int] = None # 2023
74
+ form: Optional[str] = None # "10-K" or "10-Q"
75
 
76
 
77
  class Task(BaseModel):
 
239
 
240
  def create_progress_callback(task_id: str):
241
  """Create a progress callback that adds partial metrics to the task."""
242
+ def callback(source: str, metric: str, value: Any,
243
+ end_date: str = None, fiscal_year: int = None, form: str = None):
244
  task = TASK_STORE.get(task_id)
245
  if task and task.status == TaskStatus.WORKING:
246
  entry = MetricEntry(
247
  source=source,
248
  metric=metric,
249
  value=value,
250
+ timestamp=datetime.now().isoformat(),
251
+ end_date=end_date,
252
+ fiscal_year=fiscal_year,
253
+ form=form
254
  )
255
  if task.partial_metrics is None:
256
  task.partial_metrics = []
257
  task.partial_metrics.append(entry)
258
  task.updated_at = datetime.now().isoformat()
259
+ temporal_info = f" ({form} {fiscal_year})" if fiscal_year else ""
260
+ logger.info(f"Task {task_id}: [{source}] {metric} = {value}{temporal_info}")
261
  return callback
262
 
263
 
mcp_client.py CHANGED
@@ -26,11 +26,20 @@ MCP_SERVERS_PATH = Path(__file__).parent / "mcp-servers"
26
  METRIC_DELAY_MS = int(os.getenv("METRIC_DELAY_MS", "300"))
27
 
28
 
29
- async def emit_metric(progress_callback: Optional[Callable], source: str, metric: str, value: Any):
30
- """Emit a metric event with configurable delay."""
 
 
 
 
 
 
 
 
31
  if progress_callback:
32
- logger.debug(f"emit_metric: {source}/{metric}={value}")
33
- progress_callback(source, metric, value)
 
34
  await asyncio.sleep(METRIC_DELAY_MS / 1000)
35
 
36
 
@@ -256,12 +265,25 @@ async def _extract_and_emit_metrics(
256
  if source == "financials":
257
  financials = result.get("financials", {})
258
  debt = result.get("debt", {})
259
- if financials.get("revenue", {}).get("value"):
260
- await emit_metric(progress_callback, source, "revenue", financials["revenue"]["value"])
 
 
 
 
 
 
 
261
  if financials.get("net_margin"):
262
  await emit_metric(progress_callback, source, "net_margin", financials["net_margin"])
263
- if financials.get("eps", {}).get("value"):
264
- await emit_metric(progress_callback, source, "EPS", financials["eps"]["value"])
 
 
 
 
 
 
265
  if debt.get("debt_to_equity"):
266
  await emit_metric(progress_callback, source, "debt_to_equity", debt["debt_to_equity"])
267
 
 
26
  METRIC_DELAY_MS = int(os.getenv("METRIC_DELAY_MS", "300"))
27
 
28
 
29
+ async def emit_metric(
30
+ progress_callback: Optional[Callable],
31
+ source: str,
32
+ metric: str,
33
+ value: Any,
34
+ end_date: str = None,
35
+ fiscal_year: int = None,
36
+ form: str = None
37
+ ):
38
+ """Emit a metric event with optional temporal data and configurable delay."""
39
  if progress_callback:
40
+ temporal_info = f" ({form} {fiscal_year})" if fiscal_year else ""
41
+ logger.debug(f"emit_metric: {source}/{metric}={value}{temporal_info}")
42
+ progress_callback(source, metric, value, end_date, fiscal_year, form)
43
  await asyncio.sleep(METRIC_DELAY_MS / 1000)
44
 
45
 
 
265
  if source == "financials":
266
  financials = result.get("financials", {})
267
  debt = result.get("debt", {})
268
+ # Extract temporal data with metrics
269
+ revenue = financials.get("revenue", {})
270
+ if revenue.get("value"):
271
+ await emit_metric(
272
+ progress_callback, source, "revenue", revenue["value"],
273
+ end_date=revenue.get("end_date"),
274
+ fiscal_year=revenue.get("fiscal_year"),
275
+ form=revenue.get("form")
276
+ )
277
  if financials.get("net_margin"):
278
  await emit_metric(progress_callback, source, "net_margin", financials["net_margin"])
279
+ eps = financials.get("eps", {})
280
+ if eps.get("value"):
281
+ await emit_metric(
282
+ progress_callback, source, "EPS", eps["value"],
283
+ end_date=eps.get("end_date"),
284
+ fiscal_year=eps.get("fiscal_year"),
285
+ form=eps.get("form")
286
+ )
287
  if debt.get("debt_to_equity"):
288
  await emit_metric(progress_callback, source, "debt_to_equity", debt["debt_to_equity"])
289