""" Orchestrator Module Central controller that coordinates agent execution in a fixed sequence. """ from typing import Dict, Any, Optional import logging from core.agent_base import Agent from core.errors import ProofSystemError from agents.input_validator import InputValidatorAgent from agents.text_extraction_agent import TextExtractionAgent from agents.hashing_agent import HashingAgent from agents.metadata_agent import MetadataAgent from agents.proof_builder import ProofBuilderAgent from agents.storage_agent import SupabaseStorageAgent from agents.verification_agent import VerificationAgent from models.proof import Proof, VerificationResult from sidecar.gemini_sidecar import GeminiSidecar logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class Orchestrator: """ Central orchestrator that manages the proof generation pipeline. Coordinates agent execution and handles failures gracefully. """ def __init__(self): """Initialize all agents in the pipeline.""" self.input_validator = InputValidatorAgent() self.text_extraction_agent = TextExtractionAgent() self.hashing_agent = HashingAgent() self.metadata_agent = MetadataAgent() self.proof_builder = ProofBuilderAgent() self.storage_agent = SupabaseStorageAgent() self.verification_agent = VerificationAgent(self.storage_agent) # AI Sidecar (optional, non-authoritative) self.ai_sidecar = GeminiSidecar() logger.info("Orchestrator initialized with all agents") def create_proof(self, input_data: Dict[str, Any]) -> Dict[str, Any]: """ Execute the full proof creation pipeline. Args: input_data: { "type": "file" | "text", "content": bytes | str, "filename": str (optional) } Returns: { "success": bool, "proof_id": str, "proof": Proof, "message": str } """ try: logger.info("Starting proof creation pipeline") # Step 1: Validate input logger.info("Step 1/6: Validating input") validated_data = self.input_validator.execute(input_data) # Step 2: Extract text (OCR if applicable) logger.info("Step 2/6: Text extraction (OCR)") ocr_data = self.text_extraction_agent.execute(validated_data) # Step 3: Generate hash (ALWAYS on raw content, never OCR output) logger.info("Step 3/6: Generating hash from raw content") hashed_data = self.hashing_agent.execute(ocr_data) # Step 4: Generate metadata (includes OCR results) logger.info("Step 4/6: Generating metadata") metadata_data = self.metadata_agent.execute(hashed_data) # Step 5: Build proof logger.info("Step 5/6: Building proof object") proof_data = self.proof_builder.execute(metadata_data) # Step 6: Save proof logger.info("Step 6/6: Saving proof to storage") storage_result = self.storage_agent.save_proof(proof_data["proof"]) logger.info(f"Proof created successfully: {proof_data['proof_id']}") # OPTIONAL: AI Sidecar explains the proof (non-blocking) # This does NOT affect the core response result = { "success": True, "proof_id": proof_data["proof_id"], "proof": proof_data["proof"], "message": "Proof created and stored successfully" } # Add AI explanation if available (optional, non-authoritative) if self.ai_sidecar.enabled: try: assistant_response = self.ai_sidecar.explain_proof( proof_data["proof"] ) result["assistant"] = assistant_response.to_dict() logger.info("AI explanation added to response") except Exception as e: logger.warning(f"AI explanation failed (non-critical): {str(e)}") # AI failure does not affect core response return result except ProofSystemError as e: logger.error(f"Proof creation failed: {str(e)}") return { "success": False, "error": str(e), "error_type": e.__class__.__name__, "message": "Proof creation failed" } except Exception as e: logger.error(f"Unexpected error: {str(e)}") return { "success": False, "error": str(e), "error_type": "UnexpectedError", "message": "An unexpected error occurred" } def ask_assistant( self, question: str, proof_id: Optional[str] = None ) -> Dict[str, Any]: """ Ask the AI assistant a question about a proof. This is a separate, optional endpoint - not part of core flow. Args: question: User's question proof_id: Optional proof ID for context Returns: { "success": bool, "assistant": AssistantResponse dict, "message": str } """ if not self.ai_sidecar.enabled: return { "success": False, "message": "AI assistant is not enabled. Set AI_ENABLED=true and configure GEMINI_API_KEY." } try: logger.info(f"AI assistant query: {question[:50]}...") # Get proof if provided proof = None if proof_id: proof = self.storage_agent.get_proof(proof_id) assistant_response = self.ai_sidecar.answer_question( question, proof ) return { "success": True, "assistant": assistant_response.to_dict(), "message": "Question answered" } except Exception as e: logger.error(f"AI assistant query failed: {str(e)}") return { "success": False, "error": str(e), "message": "AI assistant query failed" } def verify_proof(self, proof_id: str, content: bytes) -> Dict[str, Any]: """ Verify an existing proof. Args: proof_id: Unique proof identifier content: Original content to verify Returns: { "success": bool, "verification_result": VerificationResult, "message": str } """ try: logger.info(f"Starting proof verification: {proof_id}") result = self.verification_agent.execute({ "proof_id": proof_id, "content": content }) verification_result = result["verification_result"] logger.info(f"Verification completed: {verification_result.message}") result = { "success": True, "verification_result": verification_result, "message": verification_result.message } # OPTIONAL: AI Sidecar explains verification (non-blocking) if self.ai_sidecar.enabled: try: # Get original proof for context proof = self.storage_agent.get_proof(proof_id) assistant_response = self.ai_sidecar.explain_verification( verification_result, proof ) result["assistant"] = assistant_response.to_dict() logger.info("AI verification explanation added") except Exception as e: logger.warning(f"AI explanation failed (non-critical): {str(e)}") # AI failure does not affect core response return result except ProofSystemError as e: logger.error(f"Verification failed: {str(e)}") return { "success": False, "error": str(e), "error_type": e.__class__.__name__, "message": "Verification failed" } except Exception as e: logger.error(f"Unexpected error: {str(e)}") return { "success": False, "error": str(e), "error_type": "UnexpectedError", "message": "An unexpected error occurred" } def get_proof(self, proof_id: str) -> Dict[str, Any]: """ Retrieve a proof from storage. Args: proof_id: Unique proof identifier Returns: { "success": bool, "proof": Proof | None, "message": str } """ try: logger.info(f"Retrieving proof: {proof_id}") proof = self.storage_agent.get_proof(proof_id) if not proof: return { "success": False, "proof": None, "message": f"Proof not found: {proof_id}" } return { "success": True, "proof": proof, "message": "Proof retrieved successfully" } except ProofSystemError as e: logger.error(f"Proof retrieval failed: {str(e)}") return { "success": False, "error": str(e), "error_type": e.__class__.__name__, "message": "Proof retrieval failed" } except Exception as e: logger.error(f"Unexpected error: {str(e)}") return { "success": False, "error": str(e), "error_type": "UnexpectedError", "message": "An unexpected error occurred" }