Spaces:
Running
Running
| import os | |
| import logging | |
| from celery import Celery # type: ignore | |
| from dotenv import load_dotenv | |
| import time | |
| from typing import Dict, Any | |
| from app.services.database import SessionLocal | |
| from app.services.context_manager import ContextManager | |
| from app.agents.case_manager import CaseManagerAgent | |
| from app.agents.research_agent import ResearchAgent | |
| from app.models.schemas import CaseStatus | |
| from app.observability.metrics import AGENT_LATENCY, AGENT_FAILURES | |
| load_dotenv() | |
| # Set multiprocess directory for Prometheus metrics | |
| os.environ["PROMETHEUS_MULTIPROC_DIR"] = os.path.abspath( | |
| os.path.join(os.path.dirname(__file__), "..", "..", "prometheus_multiproc_dir") | |
| ) | |
| # Configure Celery Logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Retrieve Redis URL | |
| REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0") | |
| # Create Celery instance | |
| celery_app = Celery( | |
| "investigator_tasks", | |
| broker=REDIS_URL, | |
| backend=REDIS_URL | |
| ) | |
| # Configuration settings | |
| celery_app.conf.update( | |
| task_serializer="json", | |
| accept_content=["json"], | |
| result_serializer="json", | |
| timezone="UTC", | |
| enable_utc=True, | |
| ) | |
| def run_case_manager_task(self, case_id: str): | |
| """ | |
| Asynchronous Celery task that coordinates the multi-agent investigation: | |
| 1. Runs CaseManagerAgent to decompose the problem statement into hypotheses. | |
| 2. Runs ResearchAgent on each hypothesis to perform web research and gather evidence. | |
| 3. Saves all results and updates the case status to completed. | |
| """ | |
| logger.info(f"Starting async agent investigation for case ID: {case_id}") | |
| db = SessionLocal() | |
| try: | |
| # 1. Fetch case context from database | |
| context = ContextManager.get_context(db=db, case_id=case_id) | |
| if not context: | |
| error_msg = f"Case with ID {case_id} not found in database." | |
| logger.error(error_msg) | |
| return {"status": "failed", "error": error_msg} | |
| # Mark case as actively investigating | |
| ContextManager.update_status(db=db, case_id=case_id, status=CaseStatus.INVESTIGATING) | |
| # 2. Run the Case Manager Agent to generate hypotheses | |
| logger.info(f"Invoking CaseManagerAgent for problem: '{context.problem_statement}'") | |
| with AGENT_LATENCY.labels(agent_name="CaseManagerAgent").time(): | |
| case_manager = CaseManagerAgent() | |
| new_hypotheses = case_manager.execute(context) | |
| # Add generated hypotheses to database | |
| ContextManager.add_hypotheses(db=db, case_id=case_id, new_hypotheses=new_hypotheses) | |
| # 3. Retrieve refreshed case context containing assigned hypothesis database IDs | |
| refreshed_context = ContextManager.get_context(db=db, case_id=case_id) | |
| # 4. Loop through hypotheses and execute ResearchAgent on each (No delays since we are on paid Gemini API) | |
| for hypothesis in refreshed_context.hypotheses: | |
| logger.info(f"Investigating Hypothesis: '{hypothesis.statement}' (ID: {hypothesis.id})") | |
| # Mark hypothesis status as investigating | |
| ContextManager.update_hypothesis( | |
| db=db, | |
| hypothesis_id=hypothesis.id, | |
| status="investigating", | |
| assigned_investigator="ResearchAgent" | |
| ) | |
| # Run the Research Agent (which formulates searches & queries DDG) | |
| with AGENT_LATENCY.labels(agent_name="ResearchAgent").time(): | |
| researcher = ResearchAgent() | |
| verdict = researcher.execute_research(hypothesis.statement, refreshed_context) | |
| # Save supporting evidence | |
| if verdict.get("supporting_evidence"): | |
| new_evidence = [ | |
| { | |
| "hypothesis_id": hypothesis.id, | |
| "source": f"web_search: {verdict.get('search_query')}", | |
| "content": ev, | |
| "confidence": 0.85 | |
| } for ev in verdict["supporting_evidence"] | |
| ] | |
| ContextManager.add_evidence(db=db, case_id=case_id, new_evidence=new_evidence) | |
| # Save contrary evidence | |
| if verdict.get("contrary_evidence"): | |
| contrary_ev = [ | |
| { | |
| "hypothesis_id": hypothesis.id, | |
| "source": f"web_search: {verdict.get('search_query')}", | |
| "content": f"[CONTRARY] {ev}", | |
| "confidence": 0.85 | |
| } for ev in verdict["contrary_evidence"] | |
| ] | |
| ContextManager.add_evidence(db=db, case_id=case_id, new_evidence=contrary_ev) | |
| # Add conclusion as a case Fact | |
| conclusion_fact = [ | |
| { | |
| "source": "ResearchAgent", | |
| "content": f"Hypothesis Verdict: '{hypothesis.statement}' -> {verdict.get('conclusion')}" | |
| } | |
| ] | |
| ContextManager.add_facts(db=db, case_id=case_id, new_facts=conclusion_fact) | |
| # Update final status of the hypothesis | |
| ContextManager.update_hypothesis( | |
| db=db, | |
| hypothesis_id=hypothesis.id, | |
| status=verdict.get("status", "inconclusive") | |
| ) | |
| # 5. Mark overall case status as completed | |
| ContextManager.update_status(db=db, case_id=case_id, status=CaseStatus.COMPLETED) | |
| logger.info(f"Successfully completed investigation for case ID: {case_id}") | |
| return { | |
| "status": "success", | |
| "case_id": case_id, | |
| "hypotheses_investigated": len(refreshed_context.hypotheses) | |
| } | |
| except Exception as e: | |
| logger.error(f"Error executing run_case_manager_task: {str(e)}") | |
| AGENT_FAILURES.labels(agent_name="InvestigationPipeline", error_type=type(e).__name__).inc() | |
| # Only set status to FAILED if we are out of retries | |
| if self.request.retries >= self.max_retries: | |
| try: | |
| ContextManager.update_status(db=db, case_id=case_id, status=CaseStatus.FAILED) | |
| except Exception as rollback_err: | |
| logger.error(f"Failed to set status to FAILED for case ID {case_id}: {str(rollback_err)}") | |
| try: | |
| raise self.retry(exc=e, countdown=10) | |
| except Exception as retry_err: | |
| logger.error(f"Task retry failed: {str(retry_err)}") | |
| # Force FAILED status if we can't retry anymore | |
| try: | |
| ContextManager.update_status(db=db, case_id=case_id, status=CaseStatus.FAILED) | |
| except Exception: | |
| pass | |
| return {"status": "failed", "error": str(e)} | |
| finally: | |
| db.close() |