# Proofly / core/orchestrator.py
# Dipan04's picture
# Initial clean commit for Hugging Face Space
# 2c41dce
"""
Orchestrator Module
Central controller that coordinates agent execution in a fixed sequence.
"""
from typing import Dict, Any, Optional
import logging
from core.agent_base import Agent
from core.errors import ProofSystemError
from agents.input_validator import InputValidatorAgent
from agents.text_extraction_agent import TextExtractionAgent
from agents.hashing_agent import HashingAgent
from agents.metadata_agent import MetadataAgent
from agents.proof_builder import ProofBuilderAgent
from agents.storage_agent import SupabaseStorageAgent
from agents.verification_agent import VerificationAgent
from models.proof import Proof, VerificationResult
from sidecar.gemini_sidecar import GeminiSidecar
# Configure the root logger at INFO level as a side effect of importing this
# module (basicConfig does nothing if the root logger already has handlers).
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used by every method below.
logger = logging.getLogger(__name__)
class Orchestrator:
    """
    Central orchestrator that manages the proof generation pipeline.

    Wires the agents together, runs them in a fixed order and converts every
    failure into a plain response dictionary so that no exception escapes to
    the caller. The AI sidecar is optional and non-authoritative: its output is
    only ever added under the ``"assistant"`` key and its failures never change
    the core response.
    """

    def __init__(self):
        """Initialize all agents in the pipeline."""
        self.input_validator = InputValidatorAgent()
        self.text_extraction_agent = TextExtractionAgent()
        self.hashing_agent = HashingAgent()
        self.metadata_agent = MetadataAgent()
        self.proof_builder = ProofBuilderAgent()
        self.storage_agent = SupabaseStorageAgent()
        # The verification agent shares the orchestrator's storage agent.
        self.verification_agent = VerificationAgent(self.storage_agent)
        # AI Sidecar (optional, non-authoritative)
        self.ai_sidecar = GeminiSidecar()
        logger.info("Orchestrator initialized with all agents")

    @staticmethod
    def _error_response(
        message: str,
        error: Exception,
        error_type: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Build the failure payload shared by every public method.

        Args:
            message: Human-readable summary of what failed.
            error: The exception that was caught.
            error_type: Label to report; defaults to the exception class name.

        Returns:
            {
                "success": False,
                "error": str,
                "error_type": str,
                "message": str
            }
        """
        return {
            "success": False,
            "error": str(error),
            "error_type": error_type or type(error).__name__,
            "message": message
        }

    def _attach_assistant(self, response: Dict[str, Any], explain, success_log: str) -> None:
        """
        Add an AI explanation to ``response`` on a best-effort basis.

        Does nothing when the sidecar is disabled. Any exception raised while
        producing the explanation is logged as a warning and swallowed, so the
        core response is never affected.

        Args:
            response: Success payload to extend in place with "assistant".
            explain: Zero-argument callable returning an object that exposes
                ``to_dict()``.
            success_log: Message logged once the explanation was attached.
        """
        if not self.ai_sidecar.enabled:
            return
        try:
            response["assistant"] = explain().to_dict()
            logger.info(success_log)
        except Exception as exc:
            # AI failure does not affect core response
            logger.warning("AI explanation failed (non-critical): %s", exc)

    def create_proof(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute the full proof creation pipeline.

        Args:
            input_data: {
                "type": "file" | "text",
                "content": bytes | str,
                "filename": str (optional)
            }

        Returns:
            On success:
            {
                "success": True,
                "proof_id": str,
                "proof": Proof,
                "message": str,
                "assistant": dict (only when the AI sidecar produced one)
            }
            On failure:
            {
                "success": False,
                "error": str,
                "error_type": str,
                "message": str
            }
        """
        try:
            logger.info("Starting proof creation pipeline")
            # Each agent consumes the previous agent's output, so order matters.
            # Step 3 is meant to hash the raw content, never the OCR output;
            # that guarantee lives inside the hashing agent, not here.
            stages = (
                ("Step 1/6: Validating input", self.input_validator),
                ("Step 2/6: Text extraction (OCR)", self.text_extraction_agent),
                ("Step 3/6: Generating hash from raw content", self.hashing_agent),
                ("Step 4/6: Generating metadata", self.metadata_agent),
                ("Step 5/6: Building proof object", self.proof_builder),
            )
            payload = input_data
            for label, agent in stages:
                logger.info(label)
                payload = agent.execute(payload)

            logger.info("Step 6/6: Saving proof to storage")
            proof = payload["proof"]
            self.storage_agent.save_proof(proof)
            proof_id = payload["proof_id"]
            logger.info("Proof created successfully: %s", proof_id)

            response = {
                "success": True,
                "proof_id": proof_id,
                "proof": proof,
                "message": "Proof created and stored successfully"
            }
            # OPTIONAL: AI Sidecar explains the proof (non-blocking)
            self._attach_assistant(
                response,
                lambda: self.ai_sidecar.explain_proof(proof),
                "AI explanation added to response"
            )
            return response
        except ProofSystemError as exc:
            logger.error("Proof creation failed: %s", exc)
            return self._error_response("Proof creation failed", exc)
        except Exception as exc:
            # logger.exception keeps the traceback for unexpected failures.
            logger.exception("Unexpected error: %s", exc)
            return self._error_response(
                "An unexpected error occurred", exc, "UnexpectedError"
            )

    def ask_assistant(
        self,
        question: str,
        proof_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Ask the AI assistant a question about a proof.

        This is a separate, optional endpoint - not part of core flow.

        Args:
            question: User's question
            proof_id: Optional proof ID for context

        Returns:
            On success:
            {
                "success": True,
                "assistant": AssistantResponse dict,
                "message": str
            }
            When the sidecar is disabled: {"success": False, "message": str}
            On failure:
            {
                "success": False,
                "error": str,
                "error_type": str,
                "message": str
            }
        """
        if not self.ai_sidecar.enabled:
            return {
                "success": False,
                "message": "AI assistant is not enabled. Set AI_ENABLED=true and configure GEMINI_API_KEY."
            }
        try:
            # Only a short prefix of the question is logged.
            logger.info("AI assistant query: %s...", question[:50])
            # Look up the proof only when the caller supplied an ID; otherwise
            # the assistant answers without proof context.
            proof = self.storage_agent.get_proof(proof_id) if proof_id else None
            answer = self.ai_sidecar.answer_question(question, proof)
            return {
                "success": True,
                "assistant": answer.to_dict(),
                "message": "Question answered"
            }
        except ProofSystemError as exc:
            logger.error("AI assistant query failed: %s", exc)
            return self._error_response("AI assistant query failed", exc)
        except Exception as exc:
            logger.exception("AI assistant query failed: %s", exc)
            return self._error_response(
                "AI assistant query failed", exc, "UnexpectedError"
            )

    def verify_proof(self, proof_id: str, content: bytes) -> Dict[str, Any]:
        """
        Verify an existing proof.

        Args:
            proof_id: Unique proof identifier
            content: Original content to verify

        Returns:
            On success:
            {
                "success": True,
                "verification_result": VerificationResult,
                "message": str,
                "assistant": dict (only when the AI sidecar produced one)
            }
            On failure:
            {
                "success": False,
                "error": str,
                "error_type": str,
                "message": str
            }
        """
        try:
            logger.info("Starting proof verification: %s", proof_id)
            agent_output = self.verification_agent.execute({
                "proof_id": proof_id,
                "content": content
            })
            verification_result = agent_output["verification_result"]
            logger.info("Verification completed: %s", verification_result.message)

            response = {
                "success": True,
                "verification_result": verification_result,
                "message": verification_result.message
            }

            def explain():
                # The proof lookup happens inside the best-effort section, so a
                # storage error here cannot fail an already completed check.
                proof = self.storage_agent.get_proof(proof_id)
                return self.ai_sidecar.explain_verification(
                    verification_result,
                    proof
                )

            # OPTIONAL: AI Sidecar explains verification (non-blocking)
            self._attach_assistant(
                response, explain, "AI verification explanation added"
            )
            return response
        except ProofSystemError as exc:
            logger.error("Verification failed: %s", exc)
            return self._error_response("Verification failed", exc)
        except Exception as exc:
            logger.exception("Unexpected error: %s", exc)
            return self._error_response(
                "An unexpected error occurred", exc, "UnexpectedError"
            )

    def get_proof(self, proof_id: str) -> Dict[str, Any]:
        """
        Retrieve a proof from storage.

        Args:
            proof_id: Unique proof identifier

        Returns:
            When found: {"success": True, "proof": Proof, "message": str}
            When missing: {"success": False, "proof": None, "message": str}
            On failure:
            {
                "success": False,
                "error": str,
                "error_type": str,
                "message": str
            }
        """
        try:
            logger.info("Retrieving proof: %s", proof_id)
            proof = self.storage_agent.get_proof(proof_id)
            # Any falsy storage result is reported as "not found".
            if not proof:
                return {
                    "success": False,
                    "proof": None,
                    "message": f"Proof not found: {proof_id}"
                }
            return {
                "success": True,
                "proof": proof,
                "message": "Proof retrieved successfully"
            }
        except ProofSystemError as exc:
            logger.error("Proof retrieval failed: %s", exc)
            return self._error_response("Proof retrieval failed", exc)
        except Exception as exc:
            logger.exception("Unexpected error: %s", exc)
            return self._error_response(
                "An unexpected error occurred", exc, "UnexpectedError"
            )