VyaparFlow / app /core /base_agent.py
Dipan04's picture
Add application file
32d42b3
"""
app/core/base_agent.py
----------------------
BaseAgent β€” the standard interface for all NotiFlow agents.
Every agent in the system inherits from BaseAgent and implements `execute()`.
The public `run()` method provides:
- unified error handling
- structured logging into context["history"]
- state transition on success / failure
Usage
-----
class MyAgent(BaseAgent):
name = "MyAgent"
def execute(self, context: dict) -> dict:
# do work, mutate context, return it
context["data"]["my_field"] = "value"
return context
result_ctx = MyAgent().run(context)
"""
from __future__ import annotations
import logging
from typing import Any
from app.core.context import log_step, add_error, update_context
logger = logging.getLogger(__name__)
class BaseAgent:
"""
Abstract base for all NotiFlow agents.
Subclasses must:
- Set a class-level ``name`` attribute
- Implement ``execute(context) -> dict``
Optional class-level audit declarations (used in history logs):
input_keys : list[str] β€” context keys this agent reads
output_keys : list[str] β€” context keys this agent writes
action : str β€” one-line description of what the agent does
The ``run()`` wrapper handles logging and error isolation automatically.
Agents should raise exceptions from ``execute()`` on unrecoverable errors;
for soft/non-fatal issues they should call ``add_error(ctx, msg)`` and
continue.
"""
#: Human-readable agent identifier used in logs and history.
name: str = "BaseAgent"
#: Audit metadata β€” override in subclasses for richer logs
input_keys: list[str] = []
output_keys: list[str] = []
action: str = ""
def run(self, context: dict[str, Any]) -> dict[str, Any]:
"""
Execute the agent with full error handling and audit logging.
This is the ONLY method callers should invoke externally.
Args:
context: The live request context dict.
Returns:
The (mutated) context dict.
Raises:
Exception: Re-raises any exception from execute() so the
orchestrator can decide whether to abort.
"""
logger.info("[%s] starting", self.name)
try:
context = self.execute(context)
log_step(
context,
self.name,
"success",
action = self.action or f"{self.name} completed",
input_keys = list(self.input_keys),
output_keys = list(self.output_keys),
)
logger.info("[%s] completed successfully", self.name)
except Exception as exc:
error_msg = f"{self.name} failed: {exc}"
logger.error(error_msg, exc_info=True)
add_error(context, error_msg)
log_step(
context,
self.name,
"error",
str(exc),
action = self.action or f"{self.name} failed",
input_keys = list(self.input_keys),
output_keys = [],
)
update_context(context, state="failed")
raise
return context
def execute(self, context: dict[str, Any]) -> dict[str, Any]:
"""
Agent-specific logic. Subclasses MUST override this method.
Args:
context: The live request context dict.
Returns:
The mutated context dict.
Raises:
NotImplementedError: If subclass does not implement this.
"""
raise NotImplementedError(
f"{self.__class__.__name__} must implement execute(context)."
)