Spaces:
Sleeping
Sleeping
"""
app/core/orchestrator.py
------------------------
Dynamic Agent Orchestrator for NotiFlow Autonomous (Phase 2).

Replaces the static pipeline list with a planner-driven execution loop.

Flow
----
    create_context()
        ↓
    build_plan(ctx)            ← Planner evaluates rules, writes ctx["plan"]
        ↓
    for step in ctx["plan"]:
        agent = AGENT_REGISTRY[step["agent"]]
        agent.run(ctx)         ← executes, writes audit entry to ctx["history"]
        if failed and step["critical"]: abort
        ↓
    build_autonomy_plan(ctx) → autonomy agents (Phase 3); replan if needed
        ↓
    _build_result(ctx)         ← flatten ctx → public API response shape

Key properties
--------------
- No hardcoded execution order anywhere in this file.
- LedgerAgent is NOT special-cased — it is just another plan step.
- Adding a new agent = one registry entry + one planner rule, zero changes here.
- Non-critical step failures are recorded and skipped rather than aborting
  the pipeline.

Return shape (backward compatible):
    {
        "message":       str,
        "intent":        str,
        "data":          dict,
        "event":         dict,
        "sheet_updated": bool,
        "context":       dict,   ← full context (debug/audit)
    }
plus additional keys (intents, invoice, payment, decision, autonomy outputs, …).
"""
| from __future__ import annotations | |
| import logging | |
| from typing import Any | |
| from app.core.context import create_context, update_context, log_step, add_error | |
| from app.core.event_bus import emit_event, push_live_log | |
| from app.core.planner import build_plan | |
| from app.core.autonomy_planner import build_autonomy_plan | |
| from app.core.priority import reset_priority_score | |
| from app.core.registry import get_agent | |
| logger = logging.getLogger(__name__) | |
| _MAX_REPLANS = 2 # hard cap on feedback-loop iterations | |
| _STEP_EVENT_MAP: dict[str, tuple[str, str]] = { | |
| "intent": ("intent_detected", "IntentAgent"), | |
| "extraction": ("extraction_done", "ExtractionAgent"), | |
| "validation": ("validation_done", "ValidationAgent"), | |
| "invoice_agent": ("invoice_generated", "InvoiceAgent"), | |
| "payment_agent": ("payment_requested", "PaymentAgent"), | |
| "ledger": ("execution_done", "LedgerAgent"), | |
| "recovery": ("recovery_triggered", "RecoveryAgent"), | |
| } | |
def process_message(message: str, source: str = "system") -> dict[str, Any]:
    """
    Run a raw business message through the full NotiFlow agent pipeline.

    Each cycle runs the Planner-built main plan (Phase 2) and then the
    AutonomyPlanner-built autonomy plan (Phase 3). When ``_should_replan``
    reports a problem, the context is reset and another cycle runs, up to
    ``_MAX_REPLANS`` replans in total.

    Args:
        message: Raw Hinglish or English business message.
        source:  Notification source (e.g. "whatsapp", "gpay").

    Returns:
        Flat result dict (see ``_build_result``) including the full context.

    Raises:
        ValueError: If ``message`` is empty or whitespace-only.
    """
    if not message or not message.strip():
        raise ValueError("Message cannot be empty.")

    ctx = create_context(message.strip(), source=source)
    logger.info("Orchestrator → %r (source=%s)", message, source)
    _emit_pipeline_event(
        ctx,
        "message_received",
        {
            "message": ctx["message"],
            "source": source,
            "state": ctx.get("state"),
        },
        agent="Orchestrator",
        step="message",
        message=f"Received message from {source}",
        log_text=f"[Orchestrator] Message received from {source}: {ctx['message']}",
    )

    # ── Main plan + autonomy + feedback loop ──────────────────────────────
    while True:
        retry_count = ctx["metadata"].get("retry_count", 0)

        # ── 1. Build and run main plan ────────────────────────────────────
        plan = build_plan(ctx)
        logger.info(
            "[cycle=%d] Main plan: [%s]",
            retry_count,
            ", ".join(s["agent"] for s in plan),
        )
        ctx = _run_plan(ctx, plan)

        # ── 2. Build and run autonomy plan ────────────────────────────────
        autonomy_plan = build_autonomy_plan(ctx)
        logger.info(
            "[cycle=%d] Autonomy plan: [%s]",
            retry_count,
            ", ".join(s["agent"] for s in autonomy_plan),
        )
        ctx = _run_autonomy(ctx, autonomy_plan)

        # ── 3. Check if replan is needed ──────────────────────────────────
        if retry_count >= _MAX_REPLANS:
            logger.info("Replan cap reached (%d) — stopping loop.", _MAX_REPLANS)
            break
        if not _should_replan(ctx):
            logger.info("[cycle=%d] No replan needed.", retry_count)
            break

        # ── 4. Prepare for replan ─────────────────────────────────────────
        _prepare_replan(ctx, retry_count)

    # ── Mark final state ──────────────────────────────────────────────────
    # Keep "failed" and "partial": the step runners set them deliberately, and
    # overwriting either with "completed" would hide a degraded run.
    if ctx.get("state") not in ("failed", "partial"):
        update_context(ctx, state="completed")

    logger.info(
        "Orchestrator done — state=%s intents=%s errors=%d risk=%s priority=%s score=%d",
        ctx["state"],
        ctx.get("intents", [ctx.get("intent")]),
        len(ctx["errors"]),
        (ctx.get("risk") or {}).get("level", "n/a"),
        ctx.get("priority", "n/a"),
        ctx.get("priority_score", 0),
    )
    return _build_result(ctx)


def _prepare_replan(ctx: dict[str, Any], retry_count: int) -> None:
    """Announce a replan and reset per-cycle context state before the next cycle runs."""
    logger.warning(
        "[cycle=%d] Replan triggered — retry_count → %d",
        retry_count, retry_count + 1,
    )
    _emit_pipeline_event(
        ctx,
        "recovery_triggered",
        {
            "retry_count": retry_count + 1,
            "reason": "replan_required",
            "errors": ctx.get("errors", []),
        },
        agent="RecoveryAgent",
        step="recovery",
        message="Recovery triggered after pipeline replan request",
        log_text=f"[RecoveryAgent] Recovery triggered after cycle {retry_count}",
    )
    ctx["metadata"]["retry_count"] = retry_count + 1
    # Reset priority score so contributors don't double-count
    reset_priority_score(ctx)
    # Clear autonomy outputs so fresh evaluation happens
    for key in ("verification", "monitor", "risk", "alerts", "recovery"):
        ctx.pop(key, None)
    # Drop "[Monitor]" errors from the previous cycle; other errors are kept.
    ctx["errors"] = [e for e in ctx["errors"] if not e.startswith("[Monitor]")]
def _run_plan(ctx: dict[str, Any], plan: list[dict]) -> dict[str, Any]:
    """
    Execute the planner-generated main pipeline steps in order.

    Each step dict carries an ``"agent"`` registry key and an optional
    ``"critical"`` flag (default True).

    - A critical step that is missing from the registry, or whose ``run``
      raises, records an error, sets ``state="failed"`` and stops the loop.
    - A non-critical failure is reported and skipped. If the context is left
      ``"failed"``, it is downgraded to ``"partial"``.

    Args:
        ctx:  Pipeline context; each agent returns the context to continue with.
        plan: Ordered step dicts produced by ``build_plan``.

    Returns:
        The context after the last executed step.
    """
    for step in plan:
        agent_key = step["agent"]
        is_critical = step.get("critical", True)
        _emit_pipeline_step_event(ctx, agent_key, "started")

        try:
            agent = get_agent(agent_key)
        except KeyError as exc:
            msg = f"Orchestrator: {exc}"
            logger.error(msg)
            add_error(ctx, msg)
            log_step(ctx, agent_key, "skipped",
                     detail=f"Agent not found in registry: {agent_key}")
            _emit_pipeline_event(
                ctx,
                "error_occurred",
                {
                    "step": agent_key,
                    "message": msg,
                    "critical": is_critical,
                },
                agent="Orchestrator",
                step=agent_key,
                message=msg,
                log_text=f"[Orchestrator] {msg}",
                status="error",
            )
            if is_critical:
                update_context(ctx, state="failed")
                break
            continue

        agent_name = getattr(agent, "name", agent_key)
        try:
            ctx = agent.run(ctx)
            _emit_pipeline_step_event(ctx, agent_key, "completed")
            _emit_step_success(ctx, agent_key)
        except Exception as exc:
            # logger.exception keeps the traceback for post-mortem debugging.
            logger.exception("Orchestrator: %s raised %s", agent_key, exc)
            _emit_pipeline_step_event(ctx, agent_key, "failed", str(exc))
            _emit_pipeline_event(
                ctx,
                "error_occurred",
                {
                    "step": agent_key,
                    "message": str(exc),
                    "critical": is_critical,
                },
                agent=agent_name,
                step=agent_key,
                message=str(exc),
                log_text=f"[{agent_name}] {exc}",
                status="error",
            )
            if is_critical:
                logger.error("Critical agent '%s' failed — aborting pipeline.", agent_key)
                # Mirror the missing-agent path. Without this, the abort leaves
                # no error and no "failed" state, and the run is finalised as
                # "completed".
                add_error(ctx, f"Orchestrator: critical agent '{agent_key}' failed: {exc}")
                update_context(ctx, state="failed")
                break
            logger.warning("Non-critical agent '%s' failed — continuing.", agent_key)
            if ctx.get("state") == "failed":
                update_context(ctx, state="partial")
            continue

        # The agent itself may have flagged failure without raising.
        if ctx.get("state") == "failed" and is_critical:
            logger.error("Pipeline in failed state after critical agent '%s'.", agent_key)
            break
    return ctx
def _run_autonomy(ctx: dict[str, Any], plan: list[dict]) -> dict[str, Any]:
    """
    Run the dynamically planned autonomy sequence.

    All autonomy agents are non-critical. A failing agent is:

    - logged,
    - recorded via ``add_error`` with an ``"[Autonomy]"`` prefix,
    - reported as an ``error_occurred`` event,

    and the sequence then continues. Agents missing from the registry are
    skipped with a warning and emit no step events.

    A ``"failed"`` state that already existed before an autonomy agent ran
    (e.g. from the main pipeline) is preserved. Only a ``"failed"`` state that
    appears while that agent is failing is downgraded to ``"partial"``.

    Args:
        ctx:  Pipeline context after the main plan.
        plan: Ordered step dicts produced by ``build_autonomy_plan``.

    Returns:
        The context after the last autonomy step.
    """
    for step in plan:
        agent_key = step["agent"]
        try:
            agent = get_agent(agent_key)
        except KeyError:
            logger.warning("[Autonomy] agent '%s' not in registry — skipping", agent_key)
            continue

        # Announce the step only once it is known to run, so a missing agent
        # never leaves a "started" step without a terminal status.
        _emit_pipeline_step_event(ctx, agent_key, "started")
        agent_name = getattr(agent, "name", agent_key)
        state_before = ctx.get("state")
        try:
            ctx = agent.run(ctx)
            _emit_pipeline_step_event(ctx, agent_key, "completed")
            if agent_key == "recovery":
                recovery = ctx.get("recovery", {}) or {}
                recovery_event = "recovery_success" if recovery.get("success") else "recovery_triggered"
                # One fallback for both texts so the log line never shows "None".
                details = recovery.get("details") or "Recovery step completed"
                _emit_pipeline_event(
                    ctx,
                    recovery_event,
                    recovery or {"action": "none"},
                    agent=agent_name,
                    step=agent_key,
                    message=details,
                    log_text=f"[{agent_name}] {details}",
                )
        except Exception as exc:
            logger.exception("[Autonomy] '%s' raised %s — continuing", agent_key, exc)
            add_error(ctx, f"[Autonomy] {agent_key} failed: {exc}")
            _emit_pipeline_step_event(ctx, agent_key, "failed", str(exc))
            _emit_pipeline_event(
                ctx,
                "error_occurred",
                {
                    "step": agent_key,
                    "message": str(exc),
                    "critical": False,
                },
                agent=agent_name,
                step=agent_key,
                message=str(exc),
                log_text=f"[{agent_name}] {exc}",
                status="error",
            )
            # Non-critical: soften a failure this agent introduced, but never
            # mask a "failed" state that existed before it ran.
            if ctx.get("state") == "failed" and state_before != "failed":
                update_context(ctx, state="partial")
    return ctx
| def _should_replan(ctx: dict[str, Any]) -> bool: | |
| """ | |
| Return True if the feedback loop should trigger a replan. | |
| Conditions (any one is sufficient): | |
| 1. verification.status == "fail" | |
| 2. risk.level == "high" | |
| 3. errors list is non-empty (excluding autonomy-internal noise) | |
| """ | |
| v_status = ctx.get("verification", {}).get("status", "ok") | |
| if v_status == "fail": | |
| logger.info("[Feedback] replan trigger: verification=fail") | |
| return True | |
| risk_level = ctx.get("risk", {}).get("level", "low") | |
| if risk_level == "high": | |
| logger.info("[Feedback] replan trigger: risk=high") | |
| return True | |
| # Only count errors that are not pure autonomy-internal noise | |
| meaningful_errors = [ | |
| e for e in ctx.get("errors", []) | |
| if not e.startswith("[Autonomy]") | |
| ] | |
| if meaningful_errors: | |
| logger.info( | |
| "[Feedback] replan trigger: %d meaningful error(s)", len(meaningful_errors) | |
| ) | |
| return True | |
| return False | |
| def _build_result(ctx: dict[str, Any]) -> dict[str, Any]: | |
| """Flatten context into the public API response shape.""" | |
| event = ctx.get("event", {}) or {} | |
| data = ctx.get("data", {}) or {} | |
| event_order = event.get("order", {}) if isinstance(event.get("order"), dict) else {} | |
| risk = ctx.get("risk", {}) or {} | |
| source = ctx.get("source", "system") | |
| payment_state = ctx.get("payment") or {} | |
| amount = payment_state.get("amount", event.get("amount")) | |
| if amount is None: | |
| amount = data.get("amount", 0) | |
| try: | |
| numeric_amount = float(amount or 0) | |
| except (TypeError, ValueError): | |
| numeric_amount = 0 | |
| return { | |
| # ββ Core (backward compatible) ββββββββββββββββββββββββββββββββββββ | |
| "message": ctx["message"], | |
| "intent": ctx.get("intent") or "other", | |
| "intents": ctx.get("intents") or [ctx.get("intent") or "other"], | |
| "data": ctx.get("data", {}), | |
| "multi_data": ctx.get("multi_data", {}), | |
| "event": ctx.get("event", {}), | |
| "invoice": ctx.get("invoice"), | |
| "events": ctx.get("events", []), | |
| "live_logs": ctx.get("live_logs", []), | |
| "history": ctx.get("history", []), | |
| "sheet_updated": ctx.get("metadata", {}).get("sheet_updated", False), | |
| "customer": {"name": event.get("customer") or data.get("customer") or "Walk-in customer"}, | |
| "order": { | |
| "item": event.get("item") or event_order.get("item") or data.get("item"), | |
| "quantity": event.get("quantity") or event_order.get("quantity") or data.get("quantity"), | |
| "status": event.get("status") or "received", | |
| "source": source, | |
| }, | |
| "payment": { | |
| "invoice_id": payment_state.get("invoice_id") or (ctx.get("invoice") or {}).get("invoice_id") or event.get("invoice_id") or data.get("invoice_id"), | |
| "amount": payment_state.get("amount") or (ctx.get("invoice") or {}).get("total") or numeric_amount, | |
| "status": payment_state.get("status") or (ctx.get("invoice") or {}).get("status") or ("paid" if numeric_amount > 0 else "pending"), | |
| }, | |
| "decision": { | |
| "intent": ctx.get("intent") or "other", | |
| "priority": ctx.get("priority", "low"), | |
| "priority_score": ctx.get("priority_score", 0), | |
| "risk": risk.get("level"), | |
| }, | |
| # ββ Autonomy fields βββββββββββββββββββββββββββββββββββββββββββββββ | |
| "verification": ctx.get("verification", {}), | |
| "risk": ctx.get("risk", {}), | |
| "priority": ctx.get("priority", "low"), | |
| "priority_score": ctx.get("priority_score", 0), | |
| "alerts": ctx.get("alerts", []), | |
| "recovery": ctx.get("recovery", {}), | |
| "monitor": ctx.get("monitor", {}), | |
| # ββ Debug βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| "context": ctx, | |
| } | |
def _emit_step_success(ctx: dict[str, Any], agent_key: str) -> None:
    """
    Publish the success event (plus its live-log line) for a completed main-pipeline step.

    The event type and agent display name come from ``_STEP_EVENT_MAP``.
    Unmapped steps fall back to ``"execution_done"``, attributed to the step
    key itself.
    """
    # NOTE(review): invoice/payment steps publish nothing here — presumably
    # those agents emit their own events; confirm before changing.
    if agent_key not in {"invoice_agent", "payment_agent"}:
        event_type, agent_name = _STEP_EVENT_MAP.get(agent_key, ("execution_done", agent_key))
        step_payload = _build_step_payload(ctx, agent_key)
        summary, log_line = _build_step_messages(ctx, agent_key, agent_name)
        _emit_pipeline_event(
            ctx, event_type, step_payload,
            agent=agent_name, step=agent_key, message=summary, log_text=log_line,
        )
| def _build_step_payload(ctx: dict[str, Any], agent_key: str) -> dict[str, Any]: | |
| if agent_key == "intent": | |
| return { | |
| "intent": ctx.get("intent") or "other", | |
| "intents": ctx.get("intents", []), | |
| "model_used": ctx.get("model_used"), | |
| } | |
| if agent_key == "extraction": | |
| return { | |
| "intent": ctx.get("intent") or "other", | |
| "data": ctx.get("data", {}), | |
| "multi_data": ctx.get("multi_data", {}), | |
| } | |
| if agent_key == "validation": | |
| return { | |
| "intent": ctx.get("intent") or "other", | |
| "data": ctx.get("data", {}), | |
| } | |
| if agent_key == "invoice_agent": | |
| return { | |
| "intent": ctx.get("intent") or "other", | |
| "invoice": ctx.get("invoice"), | |
| } | |
| if agent_key == "payment_agent": | |
| return { | |
| "intent": ctx.get("intent") or "other", | |
| "payment": ctx.get("payment"), | |
| "invoice": ctx.get("invoice"), | |
| } | |
| if agent_key == "ledger": | |
| return { | |
| "sheet_updated": ctx.get("metadata", {}).get("sheet_updated", False), | |
| "event": ctx.get("event", {}), | |
| } | |
| if agent_key == "recovery": | |
| return ctx.get("recovery", {}) | |
| return { | |
| "state": ctx.get("state"), | |
| "event": ctx.get("event", {}), | |
| } | |
| def _build_step_messages(ctx: dict[str, Any], agent_key: str, agent_name: str) -> tuple[str, str]: | |
| if agent_key == "intent": | |
| intent = ctx.get("intent") or "other" | |
| return ( | |
| f"Intent detected: {intent}", | |
| f"[{agent_name}] Intent detected: {intent}", | |
| ) | |
| if agent_key == "extraction": | |
| data = ctx.get("data", {}) or {} | |
| detail_parts = [f"{key}={value}" for key, value in data.items() if value not in (None, "", [], {})] | |
| detail = ", ".join(detail_parts) or "no structured fields extracted" | |
| return ( | |
| f"Extraction completed: {detail}", | |
| f"[{agent_name}] Extracted: {detail}", | |
| ) | |
| if agent_key == "validation": | |
| data = ctx.get("data", {}) or {} | |
| detail_parts = [f"{key}={value}" for key, value in data.items() if value not in (None, "", [], {})] | |
| detail = ", ".join(detail_parts) or "validation passed with empty payload" | |
| return ( | |
| f"Validation completed: {detail}", | |
| f"[{agent_name}] Validated: {detail}", | |
| ) | |
| if agent_key == "invoice_agent": | |
| invoice_id = (ctx.get("invoice") or {}).get("invoice_id") or "unknown" | |
| return ( | |
| f"Invoice generated: {invoice_id}", | |
| f"[{agent_name}] Invoice generated: {invoice_id}", | |
| ) | |
| if agent_key == "payment_agent": | |
| invoice_id = (ctx.get("payment") or {}).get("invoice_id") or (ctx.get("invoice") or {}).get("invoice_id") or "unknown" | |
| amount = (ctx.get("payment") or {}).get("amount", 0) | |
| return ( | |
| f"Payment requested: {invoice_id}", | |
| f"[{agent_name}] Payment requested: {invoice_id} amount={amount}", | |
| ) | |
| if agent_key == "ledger": | |
| updated = ctx.get("metadata", {}).get("sheet_updated", False) | |
| return ( | |
| f"Execution completed: sheet_updated={updated}", | |
| f"[{agent_name}] Ledger update status: {updated}", | |
| ) | |
| recovery = (ctx.get("recovery", {}) or {}).get("details") or "Recovery step completed" | |
| return (recovery, f"[{agent_name}] {recovery}") | |
def _emit_pipeline_event(
    ctx: dict[str, Any],
    event_type: str,
    payload: dict[str, Any],
    *,
    agent: str,
    step: str,
    message: str,
    log_text: str,
    status: str = "success",
) -> None:
    """
    Record one live-log entry, then emit a ``"log"`` event followed by the typed event.

    Both emitted events are linked to the same live-log entry returned by
    ``push_live_log``. The ``"log"`` event carries *log_text*; the typed
    event carries *payload* and the short *message*.
    """
    live_entry = push_live_log(
        ctx,
        {"agent": agent, "status": status, "action": message, "detail": log_text},
    )
    emissions = (
        ("log", {"step": step, "message": log_text}, log_text),
        (event_type, payload, message),
    )
    for kind, body, text in emissions:
        emit_event(
            ctx,
            kind,
            body,
            agent=agent,
            step=step,
            message=text,
            log_entry=live_entry,
        )
def _emit_pipeline_step_event(
    ctx: dict[str, Any],
    step: str,
    status: str,
    detail: str = "",
) -> None:
    """Emit a ``"pipeline_step"`` status event for *step*, attributed to the Orchestrator."""
    step_payload = {"step": step, "status": status, "detail": detail}
    emit_event(
        ctx,
        "pipeline_step",
        step_payload,
        agent="Orchestrator",
        step=step,
        message=f"{step} {status}",
    )