# agentbond-api/app/workers/celery_worker.py
# Last commit 6ae9355 by Karan6124:
#   fix(agent): handle non-dict LLM response gracefully and delay setting task to FAILED during Celery retries
import os
import logging
from celery import Celery # type: ignore
from dotenv import load_dotenv
import time
from typing import Dict, Any
from app.services.database import SessionLocal
from app.services.context_manager import ContextManager
from app.agents.case_manager import CaseManagerAgent
from app.agents.research_agent import ResearchAgent
from app.models.schemas import CaseStatus
from app.observability.metrics import AGENT_LATENCY, AGENT_FAILURES
# Pull settings from a local .env file into the process environment before
# anything below reads it.
load_dotenv()

# Directory used for Prometheus multiprocess metrics: "prometheus_multiproc_dir",
# two levels above the directory containing this module.
# NOTE(review): this assignment runs after app.observability.metrics has been
# imported; if that module imports prometheus_client at import time the
# variable may be set too late to take effect -- confirm.
os.environ["PROMETHEUS_MULTIPROC_DIR"] = os.path.abspath(
    os.path.join(
        os.path.dirname(__file__),
        os.pardir,
        os.pardir,
        "prometheus_multiproc_dir",
    )
)

# Root logging at INFO, plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Redis connection string; defaults to a local instance, database 0.
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")

# Celery application; the same Redis URL serves as broker and result backend.
celery_app = Celery("investigator_tasks", broker=REDIS_URL, backend=REDIS_URL)

# JSON-only serialization and UTC timestamps.
celery_app.conf.update(
    {
        "task_serializer": "json",
        "accept_content": ["json"],
        "result_serializer": "json",
        "timezone": "UTC",
        "enable_utc": True,
    }
)
@celery_app.task(name="app.workers.celery_worker.run_case_manager_task", bind=True, max_retries=3)
def run_case_manager_task(self, case_id: str) -> Dict[str, Any]:
    """
    Asynchronous Celery task that coordinates the multi-agent investigation:
    1. Runs CaseManagerAgent to decompose the problem statement into hypotheses.
    2. Runs ResearchAgent on each hypothesis to perform web research and gather evidence.
    3. Saves all results and updates the case status to completed.

    Args:
        self: Bound Celery task instance (``bind=True``); used for retry bookkeeping.
        case_id: Identifier of the case to investigate.

    Returns:
        ``{"status": "success", "case_id": ..., "hypotheses_investigated": ...}`` when the
        pipeline completes, or ``{"status": "failed", "error": ...}`` when the case is not
        found, retries are exhausted, or a retry could not be scheduled.

    Raises:
        celery.exceptions.Retry: When the pipeline fails and a retry has been scheduled
            (10 second countdown). The case status is left untouched in that situation so
            the case is not reported as FAILED while another attempt is pending.
    """
    # Local import: needed to tell Celery's "retry scheduled" signal apart from real errors.
    from celery.exceptions import Retry

    logger.info(f"Starting async agent investigation for case ID: {case_id}")
    db = SessionLocal()
    try:
        # 1. Fetch case context from database
        context = ContextManager.get_context(db=db, case_id=case_id)
        if not context:
            error_msg = f"Case with ID {case_id} not found in database."
            logger.error(error_msg)
            return {"status": "failed", "error": error_msg}

        # Mark case as actively investigating
        ContextManager.update_status(db=db, case_id=case_id, status=CaseStatus.INVESTIGATING)

        # 2. Run the Case Manager Agent to generate hypotheses
        # NOTE(review): a retry re-runs the whole pipeline, including add_hypotheses below;
        # confirm that duplicates are de-duplicated or acceptable.
        logger.info(f"Invoking CaseManagerAgent for problem: '{context.problem_statement}'")
        with AGENT_LATENCY.labels(agent_name="CaseManagerAgent").time():
            case_manager = CaseManagerAgent()
            new_hypotheses = case_manager.execute(context)

        # Add generated hypotheses to database
        ContextManager.add_hypotheses(db=db, case_id=case_id, new_hypotheses=new_hypotheses)

        # 3. Retrieve refreshed case context containing assigned hypothesis database IDs
        refreshed_context = ContextManager.get_context(db=db, case_id=case_id)

        # 4. Loop through hypotheses and execute ResearchAgent on each
        #    (No delays since we are on paid Gemini API)
        for hypothesis in refreshed_context.hypotheses:
            logger.info(f"Investigating Hypothesis: '{hypothesis.statement}' (ID: {hypothesis.id})")

            # Mark hypothesis status as investigating
            ContextManager.update_hypothesis(
                db=db,
                hypothesis_id=hypothesis.id,
                status="investigating",
                assigned_investigator="ResearchAgent"
            )

            # Run the Research Agent (which formulates searches & queries DDG)
            with AGENT_LATENCY.labels(agent_name="ResearchAgent").time():
                researcher = ResearchAgent()
                verdict = researcher.execute_research(hypothesis.statement, refreshed_context)

            # Save supporting evidence
            if verdict.get("supporting_evidence"):
                new_evidence = [
                    {
                        "hypothesis_id": hypothesis.id,
                        "source": f"web_search: {verdict.get('search_query')}",
                        "content": ev,
                        "confidence": 0.85
                    } for ev in verdict["supporting_evidence"]
                ]
                ContextManager.add_evidence(db=db, case_id=case_id, new_evidence=new_evidence)

            # Save contrary evidence
            if verdict.get("contrary_evidence"):
                contrary_ev = [
                    {
                        "hypothesis_id": hypothesis.id,
                        "source": f"web_search: {verdict.get('search_query')}",
                        "content": f"[CONTRARY] {ev}",
                        "confidence": 0.85
                    } for ev in verdict["contrary_evidence"]
                ]
                ContextManager.add_evidence(db=db, case_id=case_id, new_evidence=contrary_ev)

            # Add conclusion as a case Fact
            conclusion_fact = [
                {
                    "source": "ResearchAgent",
                    "content": f"Hypothesis Verdict: '{hypothesis.statement}' -> {verdict.get('conclusion')}"
                }
            ]
            ContextManager.add_facts(db=db, case_id=case_id, new_facts=conclusion_fact)

            # Update final status of the hypothesis
            ContextManager.update_hypothesis(
                db=db,
                hypothesis_id=hypothesis.id,
                status=verdict.get("status", "inconclusive")
            )

        # 5. Mark overall case status as completed
        ContextManager.update_status(db=db, case_id=case_id, status=CaseStatus.COMPLETED)
        logger.info(f"Successfully completed investigation for case ID: {case_id}")
        return {
            "status": "success",
            "case_id": case_id,
            "hypotheses_investigated": len(refreshed_context.hypotheses)
        }
    except Exception as e:
        # logger.exception keeps the traceback, which str(e) alone would lose.
        logger.exception(f"Error executing run_case_manager_task: {str(e)}")
        AGENT_FAILURES.labels(agent_name="InvestigationPipeline", error_type=type(e).__name__).inc()

        if self.request.retries < self.max_retries:
            try:
                raise self.retry(exc=e, countdown=10)
            except Retry:
                # The retry has been scheduled. Retry is an Exception subclass, so it must
                # be re-raised explicitly; swallowing it would mark the case FAILED and
                # report this attempt as finished while another attempt is still queued.
                raise
            except Exception as retry_err:
                # The retry could not be scheduled (or the task was called directly);
                # fall through and record the failure.
                logger.error(f"Task retry failed: {str(retry_err)}")

        # Only reached when out of retries or when no retry could be scheduled.
        try:
            ContextManager.update_status(db=db, case_id=case_id, status=CaseStatus.FAILED)
        except Exception as status_err:
            # Best effort: the failure is logged rather than silently dropped.
            logger.error(f"Failed to set status to FAILED for case ID {case_id}: {str(status_err)}")
        return {"status": "failed", "error": str(e)}
    finally:
        db.close()